奥卡姆剃刀原理,“如无必要,勿增实体”。
在一些小型项目当中,没有引进音讯中间件,也不想引进,但有一些业务逻辑想要解耦异步,那怎么办呢?

咱们的web项目,单独内网布置,由于大数据布景,公司音讯中间件共同运用的kafka,在一些小项目上kafka就显得很粗笨。 引进rocketmq或rabittmq也没必要。 事情或多线程也不适合。


详细一点的,之前对接的一个体系,一张记载表有10+以上的类型状况,新的需求是,针对每种状况做出对应的不同的操作。
之前写入这张记载表的时分,方法也是五花八门,有的是单条记载写入,有的是批量写入,有的调用了共同的service,有的呢直接调用了DAO层mapper直接写入。

所以想找到一个共同入口进行切入处理,就不行了。

这个时分就算引进音讯行列,也需求在不同的业务办法里进行写入音讯的操作。业务方也不太乐意配合改。

能够运用触发器,但它是归于上个时代的产品,槽点太多。(这儿并不是彻底不主张运用触发器,技能永远是为业务服务的,只要评价觉得可行,就能够运用)
那么这个时分,CDC技能就能够粉墨登场了。
CDC(change data capture)数据更改捕获。
常见的数据更改捕获都是经过数据库比方mysql的binlog来到达目的。
咱们能够监控mysql binlog日志,当写入一条数据的时分,接收到数据改变日志,做出相应的操作。
这样的好处是,只需导入依靠,不额定引进组件,一起无需改动之前的代码。 两头彻底解耦,互不搅扰。


常见的CDC结构,比方,canal (非Camel)

  • canal [k’nl],译意为水道/管道/沟渠,主要用途是依据 MySQL 数据库增量日志解析,供给增量数据订阅和消费 前期阿里巴巴由于杭州和美国双机房布置,存在跨机房同步的业务需求,完结方法主要是依据业务 trigger 获取增量改变。 从 2010 年开端,业务逐渐尝试数据库日志解析获取增量改变进行同步,由此衍生出了很多的数据库增量订阅和消费业务。

  • 它是依据日志增量订阅和消费的业务,包含

数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 改写
带业务逻辑的增量数据处理

不想引入mq?试试debezium

它的原理

  • canal 模仿 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 恳求,开端推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 目标(原始为 byte 流)

再比方,debezium(音同 dbzm 滴BZ姆)很多人或许不太了解. 包含databus,maxwell,flink cdc(大数据范畴)等等,它们同属CDC捕获数据更改(change data capture)类的技能。

不想引入mq?试试debezium


为什么是debezium


这么多技能结构,为什么选debezium?

看起来很多。但一一扫除下来就debezium和canal。

sqoop,kettle,datax之类的东西,归于前大数据时代的产品,地位类似于web范畴的structs2。而且,它们依据查询而非binlog日志,其实不归于CDC。首先扫除。

flink cdc是大数据范畴的结构,一般web项目的数据量归于大材小用了。

一起databus,maxwell相对比较冷门,用得比较少。


最终不必canal的原因有以下几点。


  1. canal需求安装,这违反了“如非必要,勿增实体”的准则。
  2. canal只能对MYSQL进行CDC监控。有很大的局限性。
  3. 大数据范畴非常盛行的flink cdc(阿里团队主导)底层运用的也是debezium,而非同是阿里出品的canal。
  4. debezium可借助kafka组件,将改变的数据发到kafka topic,后续的读取操作只需读取kafka,可有用减少数据库的读取压力。可保证一次语义,至少一次语义。
    一起,也可依据内嵌布置形式,无需咱们手动布置kafka集群,可满意”如非必要,勿增实体“的准则。

不想引入mq?试试debezium

Debezium是一个捕获数据更改(CDC)渠道,而且运用Kafka和Kafka Connect完结了自己的耐久性、可靠性和容错性。
每一个布置在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一个上游数据库服务器,捕获一切的数据库更改,
然后记载到一个或许多个Kafka topic(一般一个数据库表对应一个kafka topic)。
Kafka保证一切这些数据更改事情都能够多副本而且总体上有序(Kafka只能保证一个topic的单个分区内有序),这样,
更多的客户端能够独立消费相同的数据更改事情而对上游数据库体系形成的影响降到很小(假如N个运用都直接去监控数据库更改,对数据库的压力为N,
而用debezium报告数据库更改事情到kafka,一切的运用都去消费kafka中的音讯,能够把对数据库的压力降到1)。另外,客户端能够随时中止消费,然后重启,
从前次中止消费的当地接着消费。每个客户端能够自行决定他们是否需求exactly-once或许at-least-once音讯交给语义保证,
而且一切的数据库或许表的更改事情是依照上游数据库产生的次序被交给的。

不想引入mq?试试debezium

对于不需求或许不想要这种容错等级、性能、可扩展性、可靠性的运用,他们能够运用内嵌的Debezium connector引擎来直接在运用内部运转connector。
这种运用仍需求消费数据库更改事情,但更期望connector直接传递给它,而不是耐久化到Kafka里。


简介

Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)供给了一个低推迟的流式处理渠道。你能够安装而且装备Debezium去监控你的数据库,然后你的运用就能够消费对数据库的每一个行等级(row-level)的更改。只要已提交的更改才是可见的,所以你的运用不必忧虑业务(transaction)或许更改被回滚(roll back)。Debezium为一切的数据库更改事情供给了一个共同的模型,所以你的运用不必忧虑每一种数据库办理体系的错综杂乱性。另外,由于Debezium用耐久化的、有副本备份的日志来记载数据库数据改变的前史,因此,你的运用能够随时中止再重启,而不会错过它中止运转时产生的事情,保证了一切的事情都能被正确地、彻底地处理掉。


监控数据库,而且在数据改变的时分获得告诉一直是很杂乱的事情。关系型数据库的触发器能够做到,可是只对特定的数据库有用,而且一般只能更新数据库内的状况(无法和外部的进程通信)。一些数据库供给了监控数据改变的API或许结构,可是没有一个标准,每种数据库的完结方法都是不同的,而且需求很多特定的知识和理解特定的代码才能运用。保证以相同的次序查看和处理一切更改,一起最小化影响数据库仍然非常具有挑战性。


Debezium供给了模块为你做这些杂乱的作业。一些模块是通用的,而且能够适用多种数据库办理体系,但在功用和性能方面仍有一些限制。另一些模块是为特定的数据库办理体系定制的,所以他们一般能够更多地运用数据库体系自身的特性来供给更多功用。

github官网上罗列的一些
典型运用场景


  • 缓存失效(Cache invalidation)
    经典问题 Redis与MySQL双写共同性如何保证?Debezium运用kafka单分区的有序性(疏忽mysql binlog自身或许的推迟和乱序),可彻底处理此问题。
    在缓存中缓存的条目(entry)在源头被更改或许被删去的时分立即让缓存中的条目失效。 假如缓存在一个独立的进程中运转(例如Redis,Memcache,Infinispan或许其他的),那么简略的缓存失效逻辑能够放在独立的进程或服务中, 从而简化主运用的逻辑。在一些场景中,缓存失效逻辑能够更杂乱一点,让它运用更改事情中的更新数据去更新缓存中受影响的条目。

  • 简化单体运用(Simplifying monolithic applications) 许多运用更新数据库,然后在数据库中的更改被提交后,做一些额定的作业:更新查找索引,更新缓存,发送告诉,运转业务逻辑,等等。 这种情况一般称为双写(dual-writes),由于运用没有在一个业务内写多个体系。这样不仅运用逻辑杂乱难以维护, 而且双写简单丢掉数据或许在一些体系更新成功而另一些体系没有更新成功的时分形成不同体系之间的状况不共同。运用捕获更改数据技能(change data capture,CDC), 在源数据库的数据更改提交后,这些额定的作业能够被放在独立的线程或许进程(服务)中完结。这种完结方法的容错性更好,不会丢掉事情,简单扩展,而且更简单支撑升级。

  • 共享数据库(Sharing databases) 当多个运用共用同一个数据库的时分,一个运用提交的更改一般要被另一个运用感知到。一种完结方法是运用音讯总线, 尽管非业务性(non-transactional)的音讯总线总会受上面提到的双写(dual-writes)影响。可是,另一种完结方法,即Debezium,变得很直接:每个运用能够直接监控数据库的更改,而且响应更改。

  • 数据集成(Data integration) 数据一般被存储在多个当地,尤其是当数据被用于不同的目的的时分,会有不同的形式。坚持多体系的同步是很有挑战性的, 可是能够经过运用Debezium加上简略的事情处理逻辑来完结简略的ETL类型的处理方案。

  • 指令查询责任分离(CQRS) 在指令查询责任分离 Command Query Responsibility Separation (CQRS) 架构形式中,更新数据运用了一种数据模型, 读数据运用了一种或许多种数据模型。由于数据更改被记载在更新侧(update-side),这些更改将被处理以更新各种读展现。 所以CQRS运用一般更杂乱,尤其是他们需求保证可靠性和全序(totally-ordered)处理。Debezium和CDC能够使这种方法更可行: 写操作被正常记载,可是Debezium捕获数据更改,而且耐久化到全序流里,然后供那些需求异步更新只读视图的服务消费。 写侧(write-side)表能够表示面向范畴的实体(domain-oriented entities),或许当CQRS和 Event Sourcing 结合的时分,写侧表仅仅用做追加操作指令事情的日志。


springboot 整合 Debezium


依靠

<debezium.version>1.7.0.Final</debezium.version>
 <mysql.connector.version>8.0.26</mysql.connector.version>
 <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.connector.version}</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${debezium.version}</version>
    <exclusions>
	<exclusion>
	    <groupId>mysql</groupId>
	    <artifactId>mysql-connector-java</artifactId>
	</exclusion>
    </exclusions>
</dependency>

注意debezium版别为1.7.0.Final,对应mysql驱动为8.0.26,低于这个版别会报兼容过错。


装备


相应的装备

debezium.datasource.hostname = localhost
debezium.datasource.port = 3306
debezium.datasource.user = root
debezium.datasource.password = 123456
debezium.datasource.tableWhitelist = test.test
debezium.datasource.storageFile = E:/debezium/test/offsets/offset.dat
debezium.datasource.historyFile = E:/debezium/test/history/custom-file-db-history.dat
debezium.datasource.flushInterval = 10000
debezium.datasource.serverId = 1
debezium.datasource.serverName = name-1

然后进行装备初始化。

主要的装备项:

connector.class

  • 监控的数据库类型,这儿选mysql。

offset.storage

  • 挑选FileOffsetBackingStore时,意思把读取进展存到本地文件,由于咱们不必kafka,当运用kafka时,选KafkaOffsetBackingStore

offset.storage.file.filename

  • 寄存读取进展的本地文件地址。

offset.flush.interval.ms

  • 读取进展改写保存频率,默许1分钟。假如不依靠kafka的话,应该就没有exactly once只读取一次语义,应该是至少读取一次。意味着或许重复读取。假如web容器挂了,最新的读取进展没有改写到文件里,下次重启时,就会重复读取binlog。

table.whitelist

  • 监控的表名白名单,主张设置此值,只监控这些表的binlog。

database.whitelist

  • 监控的数据库白名单,假如选此值,会疏忽table.whitelist,然后监控此db下一切表的binlog。

import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.relational.history.FileDatabaseHistory;
import lombok.Data;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.File;
import java.io.IOException;
/**
 * @className: MysqlConfig
 * @author: nyp
 * @description: TODO
 * @date: 2023/8/7 13:53
 * @version: 1.0
 */
@Configuration
@ConfigurationProperties(prefix ="debezium.datasource")
@Data
public class MysqlBinlogConfig {
    private String hostname;
    private String port;
    private String user;
    private String password;
    private String tableWhitelist;
    private String storageFile;
    private String historyFile;
    private Long flushInterval;
    private String serverId;
    private String serverName;
    @Bean
    public io.debezium.config.Configuration MysqlBinlogConfig () throws Exception {
        checkFile();
        io.debezium.config.Configuration configuration = io.debezium.config.Configuration.create()
                .with("name", "mysql_connector")
                .with("connector.class", MySqlConnector.class)
                // .with("offset.storage", KafkaOffsetBackingStore.class)
                .with("offset.storage", FileOffsetBackingStore.class)
                .with("offset.storage.file.filename", storageFile)
                .with("offset.flush.interval.ms", flushInterval)
                .with("database.history", FileDatabaseHistory.class.getName())
                .with("database.history.file.filename", historyFile)
                .with("snapshot.mode", "Schema_only")
                .with("database.server.id", serverId)
                .with("database.server.name", serverName)
                .with("database.hostname", hostname)
//                .with("database.dbname", dbname)
                .with("database.port", port)
                .with("database.user", user)
                .with("database.password", password)
//                .with("database.whitelist", "test")
                .with("table.whitelist", tableWhitelist)
                .build();
        return configuration;
    }
    private void checkFile() throws IOException {
        String dir = storageFile.substring(0, storageFile.lastIndexOf("/"));
        File dirFile = new File(dir);
        if(!dirFile.exists()){
            dirFile.mkdirs();
        }
        File file = new File(storageFile);
        if(!file.exists()){
            file.createNewFile();
        }
    }
}

snapshot.mode 快照形式,指定连接器启动时运转快照的条件。或许的设置有:

  • initial 只要在没有为逻辑服务器名记载偏移量时,连接器才运转快照。

  • When_needed 当连接器认为有必要时,它会在启动时运转快照。也就是说,当没有可用的偏移量时,或许当先前记载的偏移量指定了服务器中不可用的binlog位置或GTID时。

  • Never 连接器从不运用快照。在榜首次运用逻辑服务器名启动时,连接器从binlog的开头读取。谨慎装备此行为。只要当binlog保证包含数据库的整个前史记载时,它才有用。

  • Schema_only 连接器运转形式而不是数据的快照。当您不需求主题包含数据的共同快照,而只需求主题包含自连接器启动以来的更改时,此设置非常有用。

  • Schema_only_recovery 这是现已捕获更改的连接器的康复设置。当您重新启动连接器时,此设置答应康复损坏或丢掉的数据库前史主题。您能够定时将其设置为“整理”意外增长的数据库前史主题。数据库前史主题需求无限保存。


database.server.id

  • 伪装成slave的Debezium服务的id,自定义,有多个Debezium服务不能重复,假如重复的话会报以下异常。
io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'binlog.000013' at 46647257, the last event read from './binlog.000013' at 125, the last byte read from './binlog.000013' at 46647257. Error code: 1236; SQLSTATE: HY000.
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1167)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1212)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'binlog.000013' at 46647257, the last event read from './binlog.000013' at 125, the last byte read from './binlog.000013' at 46647257.
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:944)
	... 3 common frames omitted

监听


装备监听服务

import com.alibaba.fastjson.JSON;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
/**
 * @projectName: test
 * @package: com.test.config
 * @className: MysqlBinlogListener
 * @author: nyp
 * @description: TODO
 * @date: 2023/8/7 13:56
 * @version: 1.0
 */
@Component
@Slf4j
public class MysqlBinlogListener {
    @Resource
    private Executor taskExecutor;
    private final List<DebeziumEngine<ChangeEvent<String, String>>> engineList = new ArrayList<>();
    private MysqlBinlogListener (@Qualifier("mysqlConnector") Configuration configuration) {
            this.engineList.add(DebeziumEngine.create(Json.class)
                    .using(configuration.asProperties())
                    .notifying(record -> receiveChangeEvent(record.value()))
                    .build());
    }
    private void receiveChangeEvent(String value) {
        if (Objects.nonNull(value)) {
            Map<String, Object> payload = getPayload(value);
            String op = JSON.parseObject(JSON.toJSONString(payload.get("op")), String.class);
            if (!(StringUtils.isBlank(op) || Envelope.Operation.READ.equals(op))) {
                ChangeData changeData = getChangeData(payload);
                log.info("changeData = " + changeData);
            }
        }
    }
    @PostConstruct
    private void start() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            taskExecutor.execute(engine);
        }
    }
    @PreDestroy
    private void stop() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            if (engine != null) {
                try {
                    engine.close();
                } catch (IOException e) {
                    log.error("", e);
                }
            }
        }
    }
    public static Map<String, Object> getPayload(String value) {
        Map<String, Object> map = JSON.parseObject(value, Map.class);
        Map<String, Object> payload = JSON.parseObject(JSON.toJSONString(map.get("payload")), Map.class);
        return payload;
    }
    public static ChangeData getChangeData(Map<String, Object> payload) {
        Map<String, Object> source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class);
        return ChangeData.builder()
                .op(payload.get("op").toString())
                .table(source.get("table").toString())
                .after(JSON.parseObject(JSON.toJSONString(payload.get("after")), Map.class))
                .source(JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class))
                .before(JSON.parseObject(JSON.toJSONString(payload.get("before")), Map.class))
                .build();
    }
    @Data
    @Builder
    public static class ChangeData {
        /**
         * 更改前数据
         */
        private Map<String, Object> after;
        private Map<String, Object> source;
        /**
         * 更改后数据
         */
        private Map<String, Object> before;
        /**
         * 更改的表名
         */
        private String table;
        /**
         * 操作类型, 枚举 Envelope.Operation
         */
        private String op;
    }
}

将监听到的binlog日志封装为ChangeData目标,包含表名,更改前后的数据,

以及操作类型

READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d"),
TRUNCATE("t");

测试


update操作输出

MysqlListener.ChangeData(after = {
	name = Suzuki Mio2,
	id = 1
}, source = {
	file = binlog .000013,
	connector = mysql,
	pos = 42587833,
	name = test - 1,
	row = 0,
	server_id = 1,
	version = 1.7 .0.Final,
	ts_ms = 1691458956000,
	snapshot = false,
	db = test
	table = test
}, before = {
	name = Suzuki Mio,
	id = 1
}, table = test, op = u)
data = {
	name = Suzuki Mio2,
	id = 1
}

新增操作输出

MysqlListener.ChangeData(after = {
	name = 王五,
	id = 0
}, source = {
	file = binlog .000013,
	connector = mysql,
	pos = 42588175,
	name = test - 1,
	row = 0,
	server_id = 1,
	version = 1.7 .0.Final,
	ts_ms = 1691459066000,
	snapshot = false,
	db = test,
	table = test
}, before = null, table = test, op = c)

删去操作输出

MysqlListener.ChangeData(after = null, source = {
	file = binlog .000013,
	connector = mysql,
	pos = 42588959,
	name = test - 1,
	row = 0,
	server_id = 1,
	version = 1.7 .0.Final,
	ts_ms = 1691459104000,
	snapshot = false,
	db = test
	table = test
}, before = {
	name = 王五,
	id = 0
}, table = test, op = d)

咱们之前装备的保存读取进展的文件storageFile,类似于kafka的偏移量,记载的内容如下:

不想引入mq?试试debezium

中止服务,对数据库进行操作,再次重启,会依据进展重新读取。


小结

本文介绍了debezium,更多的时分,咱们一谈到CDC,榜首想到的是很多数据同步的东西。 但其实也能够运用其数据改变捕获的特性,来到达一部份音讯行列的效果。
但其毕竟不能彻底替代音讯行列。我们理性看待与挑选。

本文的重点在介绍一种思路,详细的某项技能反而不那么重要。


参考:

debezium.io/documentati… debezium.io/documentati… github.com/alibaba/can… github.com/debezium/de…