本文已参加“新人创造礼”活动,一同敞开创造之路。
之前一篇博客记录了将Oracle的数据实时同步到MySQL中,由于项目需求,将多个数据源的数据先集成到一致的MySQL数据库中,然后再将这些数据传输到Rabbitmq中,最终经过Spark传输到HBASE中。之前也考虑过将这些制作到一个管道中,最终由于Rabbitmq我并没有找到能够作为中间的过程的组件,只能拆分红两个管道,这篇就记录一下将MySQL中的数据传输到Rabbitmq Producer中。关于前面讲Oracle数据同步到MySQL的能够看这篇:运用Streamsets将Oracle数据实时同步到MySQL中
1、首要制作整个的管道
2、装备MySQL Binary Log参数
A、装备MySQL Binary Log
这儿假如需求从二进制日志的最初开端读取事情则勾上。未挑选时,原点将从上次保存的偏移开端读取事情。
B、填写衔接MySQL的账号与密码
C、装备Advanced
读取二进制日志文件中的事情时要监听的表的列表。假如是多个表的话格局:
<database name>.<table name>,<database name>.<table name>......
3、装备Rabbitmq Producer参数
A、装备通用参数
B、装备RabbitMQ
RabbitMQ URI。通常运用以下格局: amqp:< host >:< port >/< virtualhost >。
C、装备Queue
这儿的Name填写的是运用或新创建的行列的名称。
D、装备Exchange
能够挑选为要运用的binding装备以下绑定特点。假如未装备任何binding,则运用默许交流。这些特点直接对应于RabbitMQ特点。有关更多信息,请拜见RabbitMQ文档。
这儿binding的类别有三种:direct exchange、topic exchange与fanout exchange。
direct exchange:
此类型的exchange路由规矩很简单,exchange在和queue进行binding时会设置routingkey, 然后我们在将音讯发送到exchange时会设置对应的routingkey。
在direct类型的exchange中,只要这两个routingkey完全相同,exchange才会挑选对应的binging进行音讯路由。
topic exchange:
此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全持平,这儿的routingkey能够有通配符:’‘,’#’.
其间’‘表示匹配一个单词, ‘#’则表示匹配没有或许多个单词
fanout exchange:
此exchange的路由规矩很简单直接将音讯路由到一切绑定的行列中,无须对音讯的routingkey进行匹配操作。
E、装备Advanced
F、装备DataFormat
这儿DataFormat支持的音讯的数据格局:
- Avro
- Binary
- Delimited
- JSON
- Protobuf
- SDC Record
- Text
Text Field Path:包含要写入的文本数据的字段。一切数据必须合并到指定字段中。
4、解决问题
第一次启动时报了个错:
报错:
RUNNING_ERROR: MYSQL_006 – MySql server error: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event ‘binlog.000006’ at 1661, the last event read from ‘./binlog.000006’ at 1692, the last byte read from ‘./binlog.000006’ at 1692…………
由于MySQL服务器之前测验具有相同server_uuid / server_id的从服务器已衔接到主服务器,所以这儿需求到 MySQL Binary Log中修改Server ID。
5、测验数据
Oracle中增加了数据后:
由于之前的管道流装备MySQL中也会增加数据:
最终经过这次装备的管道流增加数据到Rabbitmq中。能够打开rabbitmq的管理界面: http://192.168.105.77:15672
登录后挑选Queues,能够看到刚刚新建的:
然后再Streamsets的管道中看到启动成功: