觉得不错请按下图操作,掘友们,哈哈哈!!!

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

一:Canal 官网介绍

canal :译意为水道/管道/水沟,主要用途是根据 MySQL 数据库增量日志解析,供给增量数据订阅和消费

前期阿里巴巴由于杭州和美国双机房布置,存在跨机房同步的事务需求,完成方式主要是根据事务 trigger 获取增量改变。从 2010 年开端,事务逐渐测验数据库日志解析获取增量改变进行同步,由此衍生出了很多的数据库增量订阅和消费事务。

根据日志增量订阅和消费的事务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 事务 cache 改写
  • 带事务逻辑的增量数据处理

当前的 canal 支撑源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

二:作业原理

运用Canal 首要咱们要了解MySql主从复制的作业原理。

2.1 MySQL主从复制原理

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

  • MySQL master 将数据改变写入二进制日志( binary log, 其中记录叫做二进制日志事情binary log events,能够经过 show binlog events 进行检查)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事情,将数据改变反映它自己的数据

2.2 canal 作业原理

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

  • canal 模仿 MySQL slave 的交互协议,假装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开端推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 目标(原始为 byte 流)

三:canal 装置运用

3.1 canal 简介

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

去官网下载页面进行下载:github.com/alibaba/can…

我这儿下载的是1.1.6的版本:

canal 对应包的下载和装置的教程,都间接看 canal官网github,装置包现在有三兄弟:

  • canal deployer:又称 canal server,是真正监听 mysql 日志的服务端。
  • canal adapter:望文生义“适配器”,搭配 canal server,现在能完成mysql 数据到 hbase、rdb、es的增量同步,妥妥的 ETL 东西。
  • canal admin:也是为 canal server 服务的,为canal供给全体装备办理、节点运维等面向运维的性能,供给肯定敌对的WebUI操作界面。假如 canal server 要搭建集群环境,必少不了 canal admin 这样业余的运维东西。

3.2 mysql 相关装备

1. MySql 相关装备

  • 装置canal前咱们先敞开MySql的 binlog,在MySQL装备文件my.cnf设置如下信息:
[mysqld]
# 翻开binlog 
log-bin=mysql-bin 
# 选择ROW(行)形式 
binlog-format=ROW 
# 装备MySQL replaction需求定义,不要和canal的slaveId重复 
server_id=1
  • 改了装备文件之后,重启MySQL,运用指令检查是否翻开binlog形式:

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

检查binlog日志文件列表:

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

检查当前正在写入的binlog文件:

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

MySQL服务器这边装备就完成。

  • 在mysql中给canal独自建一个用户,给全库全表的读,拷贝,复制的权限
-- 运用指令登录:
mysql -u root -p 
-- 创立用户 用户名:canal 密码:Canal@123456 
create user 'canal'@'%' identified by 'Canal@123456';
-- 授权 *.*表明所有库 
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';

3.3 装置canal

3.3.1 canal.deployer

解压canal.deployer-1.1.6.tar.gz,咱们能够看到里面有四个文件夹:

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

接着翻开装备文件conf/example/instance.properties,装备信息如下:

## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会主动生成slaveId,所以能够不用装备
# canal.instance.mysql.slaveId=0
# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志称号
canal.instance.master.journal.name=mysql-bin.000001
# mysql主库链接时开始的binlog偏移量
canal.instance.master.position=154
# mysql主库链接时开始的binlog的时刻戳
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex .*\..*表明监听所有表 也能够写具体的表名,用,离隔
canal.instance.filter.regex=.*\..*
# mysql 数据解析表的黑名单,多个表用,离隔
canal.instance.filter.black.regex=

我这儿用的是家里的win10体系,公司的mac没遇到这个问题,所以在bin目录下找到startup.bat发动:

可是发动就报错,要踩坑了???

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

后来修正一下发动的脚本startup.bat解决了:

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

然后再发动脚本:

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

这就发动成功了。

3.3.2 canal adapter

作用:

  1. 对接上游音讯,包括kafka、rocketmq、canal-server
  2. 完成mysql数据的增量同步
  3. 完成mysql数据的全量同步
  4. 下流写入支撑mysql、es、hbase等

它既然是适配器,那么就得介绍“源头”和“目标”这两个部位数据的对接:

  • 源头
  • (1)canal adapter 能够直连 canal server ,出产 instance的数据;
  • (2)也能够在让 canal server 将数据投递到 MQ,而后 cancal adapter 出产 MQ 中的数据。
  • 目标:对接上游音讯,包括kafka、rocketmq、canal-server 完成mysql数据的增量同步 完成mysql数据的全量同步 下流写入支撑mysql、es、hbase

现在adapter是支撑动态装备的,也便是说修正装备文件后无需重启,使命会主动改写装备!

(1) 修正application.yml 履行vim conf/application.yml修正consumerProperties、srcDataSources、canalAdapters的装备

canal.conf:
  mode: tcp # kafka rocketMQ                # canal client的形式: tcp kafka rocketMQ
  flatMessage: true                         # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ形式下有用
  syncBatchSize: 1000                       # 每次同步的批数量
  retries: 0                                # 重试次数, -1为无限重试
  timeout:                                  # 同步超时时刻, 单位毫秒
  consumerProperties:
    canal.tcp.server.host:                  # 对应单机形式下的canal
    canal.tcp.zookeeper.hosts: 127.0.0.1:2181 # 对应集群形式下的zk地址, 假如装备了canal.tcp.server.host, 则以canal.tcp.server.host为准
    canal.tcp.batch.size: 500               # tcp每次拉取音讯的数量
  srcDataSources:                           # 源数据库
    defaultDS:                              # 自定义称号
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true   # jdbc url 
      username: root                                            # jdbc 账号
      password: 123                                          # jdbc 密码
  canalAdapters:                            # 适配器列表
  - instance: example                       # canal 实例名或许 MQ topic 名
    groups:                                 # 分组列表
    - groupId: g1                           # 分组id, 假如是MQ形式将用到该值
      outerAdapters:                        # 分组内适配器列表
      - name: es7                           # es7适配器
        mode: rest                          # transport or rest
        hosts: 127.0.0.1:9200               # es地址
        security.auth: test:123456          # 拜访es的认证信息,如没有则不需求填
        cluster.name: my-es                 # 集群称号,transport形式必需装备
...... 
  1. 一份数据能够被多个group同时消费, 多个group之间会是一个并行履行, 一个group内部是一个串行履行多个outerAdapters, 比方比方中logger和hbase
  2. 现在client adapter数据订阅的方式支撑两种,直连canal server 或许 订阅kafka/RocketMQ的音讯

(2) conf/es7目录下新增映射装备文件

adapter将会主动加载conf/es7下的所有.yml结束的装备文件

新增表映射的装备文件,如sys_user.yml内容如下:

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: sys_goods
  _id: id
  upsert: true
  sql: "select id, goodsname, price from sys_goods"
  etlCondition: "where update_time>={}"
  commitBatch: 3000
  • dataSourceKey装备application.ymlsrcDataSources的值
  • destination装备canal.deployerInstance
  • groupId装备application.ymlcanalAdapters.groups的值
  • _index装备索引名
  • _id装备主键对应的字段
  • upsert是否更新
  • sql映射sql
  • etlConditionetl 的条件参数,全量同步时能够运用
  • commitBatch提交批巨细

sql映射支撑多表相关自由组合, 可是有必定的约束:

  1. 主表不能为子查询句子
  2. 只能运用left outer join即最左表必定要是主表
  3. 相关从表假如是子查询不能有多张表
  4. 主sql中不能有where查询条件(从表子查询中能够有where条件可是不引荐, 可能会形成数据同步的不一致, 比方修正了where条件中的字段内容)
  5. 相关条件只允许主外键的’=’操作不能呈现其他常量判别比方: on a.role_id=b.id and b.statues=1
  6. 相关条件必须要有一个字段呈现在主查询句子中比方: on a.role_id=b.id 其中的 a.role_id 或许 b.id 必须呈现在主select句子中

Elastic Search的mapping 特点与sql的查询值将一一对应(不支撑 select *), 比方: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这儿以别号(假如有别号)作为终究的映射字段. 这儿的_id能够填写到装备文件的 _id: _id映射

四: 项目实战

4.1 项目运用场景

在现有表演事务中,咱们有这样一个场景,办理员在后台办理体系中能够对先有表演项目,场次,售卖时刻,票种等信息进行修正,比方原先我有一个表演项目 “周杰伦演唱会”,这时分用户在C端就能够看到称号便是“周杰伦演唱会”,可是我觉的这个项目的名字不够大气,这时分我在后台改了叫 “周杰伦嘉年华演唱会”,在后台更改后首要项目不会当即收效,而是要经过一个发布的过程C端才能看到更改后的。 那么由于C端查询量巨,特别是在这种大型演唱会的时分,面对这么大的流量咱们怎样去规划呢??? 所以这便是咱们运用的场景。

4.2 规划比照

  • 流程上触及的模型直接增加Redis 缓存,在事务侧组装查询逻辑
    • 这是咱们原先的运用方案,可是对Redis 依赖很大,并且在高峰期的时分呈现过缓存击穿的问题
    • 由于查询逻辑复杂,事务侧能完成查询事务的组装,可是在查询高峰期内存耗费极高,效率也不是很高。
  • 数据库数据更改后在事务层再去更新ES,ES供给对外的查询能力
    • 这便是上边方案的变种,ES有天然的查询优势,并且能应对复杂的查询条件,所以在上边很多事务组装的复杂查询此刻都能够用ES来完成。可是数据同步ES的当地就比较分散,每个更改表的当地都要考虑,假如同步的表比较多就很难维护了。
  • 运用Canal 监听数据库binary log,同步到Kafka,专门的服务消费kafka音讯,同步ES
    • 第三种方案,需求进入中间件,canal和kafka,可是这个方案只会在服务搭建的时分比较复杂一些,归于一劳永逸那种,canal 咱们监听数据库 binary log ,然后将 binary log 同步到kafka中,新增消费服务读取kafka 中 binary log,处理ES逻辑。这样事务方能够提各种各样的查询需求,咱们只需求在查询服务中运用ES进行组装即可。

4.3 运用:

在咱们的项目中,数据流向包括:腾讯云订阅服务 -> canal adapter -> es

只运用canal的客户端功用也便是canal adapter,由于咱们咱们数据库是运用的腾讯云Mql,腾讯云供给了数据订阅功用。

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

在这儿说明下,实际上腾讯云,阿里云都供给了这样的服务,他们对数据库指定的表监听binglog,然后写入到kakaka中,比方上边的截图。

4.4 问题以及解决方案

问题1:监听多张表怎样确保数据音讯不乱?

【干货】使用Canal 解决数据同步场景中的疑难杂症!!!

就像这个截图,咱们能够按照表名进行分区,也便是说,同一张表数据的改变只会在一个partion上,这样表改变的音讯在一个partion 上就变得有序了。

问题2:假如数据库中数据和ES中的数据对不上怎样处理?

现在咱们是供给一个job,每十分钟跑一次去比照数据库中有用数据的数量和ES中的数据数量,可是Job很慢,能够考虑,从数据库捞数据,分批跑(shard的方式,先取比方订单号是1的,后续一直累加到9),假如有多台机器就能够散布式跑,每台下放shardIndex ,数据库sql就捞 orderNum%10 = shardIndex的数据,这样也不会跑重,处理的时分再用多线程。

问题3: 怎样进步处理速度以及效率 在探索中咱们采用了一种办法:便是对音讯批量处理,咱们从音讯中心拿音讯的时分批量去拿,举个比方吧,goods 表对应的音讯现在有1000条,orderId =100 的数据在占了100个,散布在不同的offset中,假如我每次都是拿一条音讯去消费一次这样重复的逻辑我要跑100次,可是们能够批量拿200条音讯,然后再对音讯中的OrderId进行分组,得到终究要履行的OrderId这样是不是就能够少跑很多次。拿到要履行的OrderId后咱们就能够找到对应的Sql得到最终的Select结果(这个当地有个点,我不关心你是什么操作【DML】,我只要操作的数据ID,然后履行我的Select sql 去查询最终的结果,然后再去同步到ES中),处理的过程中咱们也能够运用多线程来加速处理速度,这都是一些优化的点。

【干货】常见库存规划方案-各种方案比照总有一个适合你

JYM来一篇音讯行列-Kafka的干货吧!!!

规划形式:

JYM 规划形式系列- 单例形式,适配器形式,让你的代码更优雅!!!

JYM 规划形式系列- 责任链形式,装修形式,让你的代码更优雅!!!

JYM 规划形式系列- 战略形式,模板办法形式,让你的代码更优雅!!!

JYM 规划形式系列-工厂形式,让你的代码更优雅!!!

Spring相关:

Spring源码解析-陈词滥调Bean ⽣命周期 ,但这个你值得看!!!

Spring 源码解析-JYM你值得拥有,从源码角度看bean的循环依赖!!!

Spring源码解析-Spring 事务

本文正在参与「金石方案」