简略开始入门了解canal
是不是还有为数据同步找不到适宜的计划忧愁?本文将介绍一款阿里开源的数据同步框架
布景
在作业中咱们咱们或许需求实时的统计或许处理一些事务表中的数据时分,咱们都会进行数据同步或许叫数据迁移的步骤,是为了不影响主事务表的稳定性,当然假如对数据的实时性没有很高的要求的话、咱们可以使用守时使命去守时抓取数据,可是这种情况下需求考虑时间距离,以及每次抓取的数据量,假如事务层面每分钟有300条数据进来,可是我的守时使命却是每分钟抓200条这是不是就出现问题了?或许说某一天事务做大了数据量增加的情况下,先不说查询量大,占用资源,在程序方面就或许形成许多问题,严峻的话或许会影响到其他的事务。那么这里就有必要给咱们介绍一款数据同步框架了。
介绍
前期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的事务需求,完成方法主要是根据事务 trigger 获取增量改变。从 2010 年开始,事务逐渐测验数据库日志解析获取增量改变进行同步,由此衍生出了很多的数据库增量订阅和消费事务。
canal [k’nl] ,译意为水道/管道/沟渠,主要用途是根据 MySQL 数据库增量日志解析,供给增量数据订阅和消费
根据日志增量订阅和消费的事务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 事务 cache 改写
- 带事务逻辑的增量数据处理
当时的 canal 支撑源端 MySQL 版别包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
原理
MySQL主备仿制原理
- MySQL master 将数据改变写入二进制日志( binary log, 其间记录叫做二进制日志事情binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事情,将数据改变反映它自己的数据
canal 作业原理
- canal 模仿 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 目标(原始为 byte 流)
canal的环境建立
敞开binlog日志
上面介绍了canal的完成是根据mysql的binlog日志,那就是说咱们首先要把mysql的binlog日志功用敞开了。
先进入mysql:
mysql -uroot -p
履行这个:
show variables like 'log_%';
看到这个log_bin打开即可
假如显示为OFF状态说明没有打开,需求修正mysql的配置文件my.cnf
vi /etc/my.cnf
### 追加内容
log-bin=mysql-bin #binlog 文件名
binlog_format=ROW #挑选row模式
server_id=1 #mysql的实例id,不能和canal的slaveId重复
### 重启mysql
service mysql restart
### 登录查看、
show variables like 'log_%';
创立canal用户
root用户进入mysql环境,输入以下指令
# 修正暗码校验规矩
set global validate_password_length=0;
set global validate_password_policy=LOW;
# 创立用户canal,暗码为canal
CREATE USER canal IDENTIFIED BY 'canal';
# canal用户授权
GRANT SELECT,UPDATE,INSERT,DELETE,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 改写权限
FLUSH PRIVILEGES;
配置文件
### vi conf/example/instance.properties
#需求改成自己的数据库信息
canal.instance.master.address=192.168.44.132:3306
#需求改成自己的数据库用户名与暗码
canal.instance.dbUsername=root
canal.instance.dbpassword=root
#需求改成同步的数据库表规矩,例如仅仅同步一下表
canal.instance.filter.regex=.*canal.instance.filter.regex=guli ucenter.ucenter member
正则规矩:
- 多个正则之间以逗号(,)分隔,转义符需求双斜杠
- 一切表:
.*
or.***
- 库下的一切表 :
canal..*
- 库下的以canal打头的表:
canal.canal.*
- 库下的一张表:
canal.test1
- 多个规矩组合使用
,
隔开
注意:此过滤条件只针对row模式的数据有效(ps mixed/statement因为不解析sql
启动
进入bin目录下启动
sh bin/startup.sh
整合Spring
maven依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
</dependency>
</dependencies>
配置文件
# 服务端口
server.port=10001
# 服务名
spring.application.name=canal-client
# 环境设置:dev、test、prod
spring.profiles.active=dev
# mysql数据库衔接
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/guli?serverTimezone=GMT+8
spring.datasource.username=root
spring.datasource.password=root
编写代码
@Component
public class CanalClient {
//sql队列
private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
@Resource
private DataSource dataSource;
/**
* canal入库方法
*/
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.61.111",
11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*..*");
connector.rollback();
try {
while (true) {
//测验从master那边拉去数据batchSize条记录,有多少取多少
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) { //数据没有改变(size>=1,有改变)
Thread.sleep(1000);
} else {
dataHandle(message.getEntries());
}
connector.ack(batchId);
//当队列里面堆积的sql大于必定数值的时分就模仿履行
if (SQL_QUEUE.size() >= 1) {
executeQueueSql();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} finally {
connector.disconnect();
}
}
/**
* 模仿履行队列里面的sql句子
*/
public void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i ) {
String sql = SQL_QUEUE.poll();
System.out.println("[sql]----> " sql);
this.execute(sql.toString());
}
}
/**
* 数据处理
*
* @param entrys
*/
private void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
for (CanalEntry.Entry entry : entrys) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == CanalEntry.EventType.INSERT) {
saveInsertSql(entry);
}
}
}
}
/**
* 保存更新句子
*
* @param entry
*/
private void saveUpdateSql(CanalEntry.Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " entry.getHeader().getTableName() " set ");
for (int i = 0; i < newColumnList.size(); i ) {
sql.append(" " newColumnList.get(i).getName()
" = '" newColumnList.get(i).getValue() "'");
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : oldColumnList) {
if (column.getIsKey()) {
//暂时只支撑单一主键
sql.append(column.getName() "=" column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 保存删除句子
*
* @param entry
*/
private void saveDeleteSql(CanalEntry.Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " entry.getHeader().getTableName() " where ");
for (CanalEntry.Column column : columnList) {
if (column.getIsKey()) {
//暂时只支撑单一主键
sql.append(column.getName() "=" column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 保存刺进句子
*
* @param entry
*/
private void saveInsertSql(CanalEntry.Entry entry) {
try {
RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into " entry.getHeader().getTableName() " (");
for (int i = 0; i < columnList.size(); i ) {
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i ) {
sql.append("'" columnList.get(i).getValue() "'");
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(")");
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 入库
* @param sql
*/
public void execute(String sql) {
Connection con = null;
try {
if(null == sql) return;
con = dataSource.getConnection();
QueryRunner qr = new QueryRunner();
int row = qr.execute(con, sql);
System.out.println("update: " row);
} catch (SQLException e) {
e.printStackTrace();
} finally {
DbUtils.closeQuietly(con);
}
}
}
总结
现在大多数企业都在挑选canal作为数据同步的技术计划,在分布式系统中有着广泛的使用,其实其间的内容还有许多,企业中大多是结合kafka和zookpeer去进行多节点注册处理数据,本文仅仅简略了解,供给一种处理日常开发规划的经历。咱们可以更好的处理数据同步的问题。