简介
本文面向BitSail的Connector开发人员,经过开发者的角度全面的论述开发一个完整Connector的全流程,快速上手Connector开发。
目录结构
首要开发者需求经过git下载最新代码到本地,并导入到IDE中。一起创立自己的作业分支,使用该分支开发自己的Connector。项目地址:github.com/bytedance/b…
项目结构如下:
开发流程
BitSail 是一款依据分布式架构的数据集成引擎,Connector会并发履行。并由BitSail 结构来负责任务的调度、并发履行、脏数据处理等,开发者只需求完结对应接口即可,详细开发流程如下:
-
工程装备,开发者需求在
bitsail/bitsail-connectors/pom.xml
模块中注册自己的Connector,一起在bitsail/bitsail-dist/pom.xml
增加自己的Connector模块,一起为你的连接器注册装备文件,来使得结构能够在运行时动态发现它。
- Connector开发,完结Source、Sink供给的抽象办法,详细细节参考后续介绍。
- 数据输出类型,现在支撑的数据类型为BitSail Row类型,无论是Source在Reader中传递给下流的数据类型,仍是Sink从上游消费的数据类型,都应该是BitSail Row类型。
Architecture
当前Source API的规划一起兼容了流批一批的场景,换言之便是一起支撑pull & push 的场景。在此之前,咱们需求首要再过一遍传统流批场景中各组件的交互模型。
Batch Model
传统批式场景中,数据的读取一般分为如下几步:
-
createSplits
:一般在client端或许中心节点履行,意图是将完整的数据依照指定的规矩尽可能拆分为较多的rangeSplits
,createSplits
在作业生命周期内有且履行一次。
-
runWithSplit
: 一般在履行节点节点履行,履行节点发动后会向中心节点恳求存在的rangeSplit
,然后再本地进行履行;履行完结后会再次向中心节点恳求直到一切splits
履行完结。
-
commit
:全部的split的履行完结后,一般会在中心节点履行commit
的操作,用于将数据对外可见。
Stream Model
传统流式场景中,数据的读取一般分为如下几步:
-
createSplits
:一般在client端或许中心节点履行,意图是依据滑动窗口或许翻滚窗口的战略将数据流划分为rangeSplits
,createSplits
在流式作业的生命周期中依照划分窗口的会一向履行。
-
runWithSplit
: 一般在履行节点节点履行,中心节点会向可履行节点发送rangeSplit
,然后在可履行节点本地进行履行;履行完结后会将处理完的splits
数据向下流发送。
-
commit
:全部的split的履行完结后,一般会向方针数据源发送retract message
,实时动态展示结果。
BitSail Model
-
createSplits
:BitSail经过SplitCoordinator
模块划分rangeSplits
,在流式作业中的生命周期中createSplits
会周期性履行,而在批式作业中仅仅会履行一次。
-
runWithSplit
: 在履行节点节点履行,BitSail中履行节点包含Reader
和Writer
模块,中心节点会向可履行节点发送rangeSplit
,然后在可履行节点本地进行履行;履行完结后会将处理完的splits
数据向下流发送。
-
commit
:writer
在完结数据写入后,committer
来完结提交。在不敞开checkpoint
时,commit
会在一切writer
都结束后履行一次;在敞开checkpoint
时,commit
会在每次checkpoint
的时候都会履行一次。
Source Connector
- Source: 数据读取组件的生命周期办理,首要负责和结构的交互,构架作业,不参加作业真正的履行
- SourceSplit: 数据读取分片;大数据处理结构的中心意图便是将大规模的数据拆分成为多个合理的Split
- State:作业状况快照,当敞开checkpoint之后,会保存当前履行状况。
- SplitCoordinator: 已然提到了Split,就需求有相应的组件去创立、办理Split;SplitCoordinator承当了这样的人物
- SourceReader: 真正负责数据读取的组件,在接收到Split后会对其进行数据读取,然后将数据传输给下一个算子
Source Connector开发流程如下
- 首要需求创立
Source
类,需求完结Source
和ParallelismComputable
接口,首要负责和结构的交互,构架作业,它不参加作业真正的履行
-
BitSail
的Source
选用流批一体的规划思维,经过getSourceBoundedness
办法设置作业的处理方式,经过configure
办法界说readerConfiguration
的装备,经过createTypeInfoConverter
办法来进行数据类型转化,能够经过FileMappingTypeInfoConverter
得到用户在yaml文件中自界说的数据源类型和BitSail类型的转化,完结自界说化的类型转化。
- 最终,界说数据源的数据分片格局
SourceSplit
类和闯将办理Split
的人物SourceSplitCoordinator
类
- 最终完结
SourceReader
完结从Split
中进行数据的读取。
Job Type | Boundedness |
---|---|
batch | Boundedness.BOUNDEDNESS |
stream | Boundedness.UNBOUNDEDNESS |
- 每个
SourceReader
都在独立的线程中履行,并确保SourceSplitCoordinator
分配给不同SourceReader
的切片没有交集
- 在
SourceReader
的履行周期中,开发者只需求关注如何从结构好的切片中去读取数据,之后完结数据类型对转化,将外部数据类型转化成BitSail
的Row
类型传递给下流即可
Reader示例
public class FakeSourceReader extends SimpleSourceReaderBase<Row> {
private final BitSailConfiguration readerConfiguration;
private final TypeInfo<?>[] typeInfos;
private final transient int totalCount;
private final transient RateLimiter fakeGenerateRate;
private final transient AtomicLong counter;
private final FakeRowGenerator fakeRowGenerator;
public FakeSourceReader(BitSailConfiguration readerConfiguration, Context context) {
this.readerConfiguration = readerConfiguration;
this.typeInfos = context.getTypeInfos();
this.totalCount = readerConfiguration.get(FakeReaderOptions.TOTAL_COUNT);
this.fakeGenerateRate = RateLimiter.create(readerConfiguration.get(FakeReaderOptions.RATE));
this.counter = new AtomicLong();
this.fakeRowGenerator = new FakeRowGenerator(readerConfiguration, context.getIndexOfSubtask());
}
@Override
public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
fakeGenerateRate.acquire();
pipeline.output(fakeRowGenerator.fakeOneRecord(typeInfos));
}
@Override
public boolean hasMoreElements() {
return counter.incrementAndGet() <= totalCount;
}
}
Sink Connector
- Sink:数据写入组件的生命周期办理,首要负责和结构的交互,构架作业,它不参加作业真正的履行。
- Writer:负责将接收到的数据写到外部存储。
- WriterCommitter(可选):对数据进行提交操作,来完结两阶段提交的操作;完结exactly-once的语义。
开发者首要需求创立Sink
类,完结Sink
接口,首要负责数据写入组件的生命周期办理,构架作业。经过configure
办法界说writerConfiguration
的装备,经过createTypeInfoConverter
办法来进行数据类型转化,将内部类型进行转化写到外部体系,同Source
部分。之后咱们再界说Writer
类完结详细的数据写入逻辑,在write
办法调用时将BitSail Row
类型把数据写到缓存队列中,在flush
办法调用时将缓存队列中的数据刷写到方针数据源中。
Writer示例
public class PrintWriter implements Writer<Row, String, Integer> {
private static final Logger LOG = LoggerFactory.getLogger(PrintWriter.class);
private final int batchSize;
private final List<String> fieldNames;
private final List<String> writeBuffer;
private final List<String> commitBuffer;
private final AtomicInteger printCount;
public PrintWriter(int batchSize, List<String> fieldNames) {
this(batchSize, fieldNames, 0);
}
public PrintWriter(int batchSize, List<String> fieldNames, int alreadyPrintCount) {
Preconditions.checkState(batchSize > 0, "batch size must be larger than 0");
this.batchSize = batchSize;
this.fieldNames = fieldNames;
this.writeBuffer = new ArrayList<>(batchSize);
this.commitBuffer = new ArrayList<>(batchSize);
printCount = new AtomicInteger(alreadyPrintCount);
}
@Override
public void write(Row element) {
String[] fields = new String[element.getFields().length];
for (int i = 0; i < element.getFields().length; ++i) {
fields[i] = String.format(""%s":"%s"", fieldNames.get(i), element.getField(i).toString());
}
writeBuffer.add("[" + String.join(",", fields) + "]");
if (writeBuffer.size() == batchSize) {
this.flush(false);
}
printCount.incrementAndGet();
}
@Override
public void flush(boolean endOfInput) {
commitBuffer.addAll(writeBuffer);
writeBuffer.clear();
if (endOfInput) {
LOG.info("all records are sent to commit buffer.");
}
}
@Override
public List<String> prepareCommit() {
return commitBuffer;
}
@Override
public List<Integer> snapshotState(long checkpointId) {
return Collections.singletonList(printCount.get());
}
}
将连接器注册到装备文件中
为你的连接器注册装备文件,来使得结构能够在运行时动态发现它,装备文件的界说如下:
以hive为例,开发者需求在resource目录下新增一个json文件,名字示例为bitsail-connector-hive.json,只要不和其他连接器重复即可
{
"name": "bitsail-connector-hive",
"classes": [
"com.bytedance.bitsail.connector.hive.source.HiveSource",
"com.bytedance.bitsail.connector.hive.sink.HiveSink"
],
"libs": [
"bitsail-connector-hive-${version}.jar"
]
}
测验模块
在Source或许Sink连接器所在的模块中,新增ITCase测验用例,然后依照如下流程支撑
- 经过testcontainer来发动相应的组件
- 编写相应的装备文件
{
"job": {
"common": {
"job_id": 313,
"instance_id": 3123,
"job_name": "bitsail_clickhouse_to_print_test",
"user_name": "test"
},
"reader": {
"class": "com.bytedance.bitsail.connector.clickhouse.source.ClickhouseSource",
"jdbc_url": "jdbc:clickhouse://localhost:8123",
"db_name": "default",
"table_name": "test_ch_table",
"split_field": "id",
"split_config": "{"name": "id", "lower_bound": 0, "upper_bound": "10000", "split_num": 3}",
"sql_filter": "( id % 2 == 0 )",
"columns": [
{
"name": "id",
"type": "int64"
},
{
"name": "int_type",
"type": "int32"
},
{
"name": "double_type",
"type": "float64"
},
{
"name": "string_type",
"type": "string"
},
{
"name": "p_date",
"type": "date"
}
]
},
"writer": {
"class": "com.bytedance.bitsail.connector.legacy.print.sink.PrintSink"
}
}
}
- 经过代码EmbeddedFlinkCluster.submit来进行作业提交
@Test
public void testClickhouseToPrint() throws Exception {
BitSailConfiguration jobConf = JobConfUtils.fromClasspath("clickhouse_to_print.json");
EmbeddedFlinkCluster.submitJob(jobConf);
}
提交PR
当开发者完结自己的Connector后,就能够相关自己的issue,提交PR到github上了,提交之前,开发者记得Connector添加文档,经过review之后,咱们贡献的Connector就成为BitSail的一部分了,咱们依照贡献程度会选取活泼的Contributor成为咱们的Committer,参加BitSail社区的重大决议计划,期望咱们积极参加!
活动引荐
快来加入BitSail 鼓励计划,成为Contributor!
不仅能够Get到新技术,提升出产效率,还能结识到一群情投意合的小伙伴一起讨论和生长~
更有蓝牙耳机、键盘、音箱等鼓励好礼,送给为BitSail做出积极贡献的你!
issue招领链接:github.com/bytedance/b…
【BitSail种子用户】招募ing,福利多多!
活动流程:
(1)填写开源调研问卷,参加抽奖。
(2)咱们将抽取部分填写问卷用户,参加一对一用户访谈。
问卷链接: www.wjx.cn/vm/w8po1FA.…
填写开源调研问卷,即可参加抽取字节跳动数据平台帆布包、充电宝等精巧礼品。
立即跳转BitSail GitHub代码库房了解更多!
github.com/bytedance/b…