著名面试八股文之kafka为什么读写效率高,写的答案之一是partition次序写,因而能确保分区内的不连续的有序性。
这儿的重点是有序追加到磁盘,而不是严厉意义上的彻底有序性。
几年前参加了一大数据岗位面试,95%的时刻在扯java根底(这个能够有)和java web相关。剩余大约5%的时刻换了人聊了一个kafka问题,算是大数据直接相关的东西吧。
于是有以下对话。
M:kafka能确保次序消费吗?
我:呃,我觉得不能。
几秒停顿,措词中。。。。。
M:kafka分区内能确保次序消费啊!
M君带着一丝得意,看看我的简历。
又抬起头,似乎在说,你改悔罢!
我:是的,但有条件,不能彻底确保,得看场景。。。
M:其实咱们公司没有大数据开发,大数据相关用的XXX(不记得了,大约是某公司的一个什么大数据一揽子处理计划)
我:???
该公司是做车联相关的产品的(没有自己独立的大数据渠道,应该车辆用户不多,数据不大,事务不杂乱),
凑巧,我也刚好做过某网红新能源车相关的大数据渠道。
这儿结合新能源车背景来聊一聊kafka在该背景事务场景下,单分区次序消费究竟靠不靠谱。
咱们从数据出产消费两端分别讲一讲。
出产端
1.终端问题
终端毛病,网络或不知道原因
比方车辆传感器毛病等问题导致自身就乱序发送了,徒之奈何?
比方咱们在T+1做守时使命核算车辆前一天的充电行程等使命时,就少部份地发现,还有前两天三天的数据,推迟尺度到达了天。
惯例性地,网络状况到达小时等级的推迟。
之所以是凌晨守时使命跑前一天的数据,便是由于数据推迟时有发生。
假设实时核算,需求数据推迟尽可能的小,在watermark机制(这部份最后会提到)下,超出部份数据将不会被归入核算。这样行程充电等事务就会被漏算,或许一个完整的进程会被切割等反常景象。
关于数据推迟这一块,某些景象上游甲方厂商可能能够处理,有些景象它也束手无策啊,它操控不了终端操作用户的行为。
这时分作为一线开发者,假设一开始容许了产品/运维为了时效性而运用实时核算,到时分出了问题,你能用各种理由解释不是咱们的问题?
开始规划选型的时分考虑到了吗?有备案吗?对方现在还认可吗?
等一系列甩锅扯皮问题。
2.数据歪斜
当时咱们的事务主要是根据某车怎么样进行核算。想要对车辆发生的数据进行次序消费,至少应该将单辆车的数据一致发送到固定的某个partition分区。
对吧?
也便是咱们今天评论的条件是根据一个常识,当咱们评论kafka能否次序消费,一定是分区内才有评论的可能,跨分区整个topic是不能够的。
当然,你也能够说我需求根据上百万辆车悉数进行次序消费。那每辆车有一千多个传感信号,只要在操作进程中,每两秒钟相关的信号都会上报一条记载,每天几十上百亿的数据悉数一致次序处理?
这样kafka topic就只能有一个分区,这样的kafka集群吞吐量不敢想象。
要确保某辆车发生的数据固定发到某个分区,一般状况下,是对车辆的VIN码(车辆唯一标识,相当于人的身份证)对分区数求模,得到的便是该车辆应该发送的分区ID。
kafka的发送分区战略:
-
假设未自定义分区战略,且key为空,轮询分区发送,确保各分区数据平衡。
kafkaTemplate.send(topic, info);
-
假设未自定义分区战略,指定了key,则运用默许分区战略。key对分区数求模得到发送的分区。
kafkaTemplate.send(topic,key, info);
默许分区战略为org.apache.kafka.clients.producer.internals.DefaultPartitioner
- 假设指定了自定义分区战略,不管指没指定key,以自定义战略为准。
@Component
public class DefinePartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 这儿也能够装备分区数或许守时获取分区数
return key.hashCode() % (cluster.partitionsForTopic(topic).size() - 1);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
然后指定分区战略
spring.kafka.producer.properties.partitioner.class = com.nyp.test.service.DefinePartitioner
将vin作为key,或许自定义分区,能够将同一辆车发送的数据指定到同一分区。
但是在实践的进程傍边,咱们会发现,有的车作为长途或短途的运输车辆,或许作为网约车,那么每天上报的数据会相对较大,
而有的车当天没有出行或其它充电等任何操作,则没有上报数据。
这样就会形成数据歪斜,导致各节点(broker)各分区之间数据严峻不平衡。
可能会导致以下状况(2,3主要针对大数据结构)
- GC 频频 过多的数据集中在某些分区,使得JVM 的内存资源缺少,导致频频 GC。
- 吞吐下降、推迟增大 数据单点和频频 GC 导致吞吐下降、推迟增大。
- 体系溃散 严峻状况下,过长的 GC 导致 TaskManager 失联,体系溃散。
3.扩容分区的价值
kafka的缺点,也是Pulsar的优点。
简单点说,kafka的数据与broker是存放在一同的,假设要加broker,就需求将数据平衡到新的broker。
而Pulsar的架构则是节点与数据分离,音讯服务层与存储层彻底解耦,从而使各层能够独立扩展,所以扩容的时分会十分便当。当然这不是本文的重点。
总归,
当kafka需求扩容或许对topic添加分区时,由第2点咱们得知,数据将发往哪个分区将由key%分区数
决议,当分区数量变化后,一切的现有数据在进行扩容或重分区的时分都有必要进行key%分区数
进行重路由。
这一步的价值有必要考虑进去。
4.单分区,A,B音讯次序发送,A失利B成功,A再重试发送,变成BA次序?
4.1 音讯的发送
kafka需求在单分区确保音讯按发生时刻正序排列,至少应该确保按音讯发生的时刻正序发送。
假定音讯源严厉依照时刻发生的条件,
- 能够同步发送,一次只发送一条。
同步发送,堵塞直至发送成功,回来SendResult
目标,里边包含ProducerRecord
和RecordMetadata
目标。
SendResult result = kafkaTemplate.send(topic, key, info).get();
- 也能够异步发送,当数据到达一定巨细批量提交到集群,或许3秒钟提交一次到集群。
异步发送,回来一个ListenableFuture
目标,大家应该对Future
不生疏。此目标能够添加回调方法。在成功或失利时履行相应的使命。
ListenableFuture<SendResult<Object, Object>> listenableFuture = kafkaTemplate.send(topic, key, info);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable ex) {
}
@Override
public void onSuccess(SendResult<Object, Object> result) {
}
});
一同,异步发送需求添加相应的装备,比方一次提交多少条数据,比方假设数据迟迟没有到达发送数据量,需求设定一个最大时刻,超越这个时刻阀值需提交一次,等等。
留意后两个参数的装备。
不同版别之间,参数称号会有差异。
-
batch.size
每当多个记载被发送到同一个分区时,出产者将尝试将记载批处理到更少的恳求中。这有助于进步客户机和服务器上的功能。此装备操控以字节为单位的默许批处理巨细。 较小的批巨细将使批处理不那么常见,并可能降低吞吐量(批巨细为零将彻底禁用批处理)。十分大的批处理巨细可能会更糟蹋内存,由于咱们总是会分配指定批处理巨细的缓冲区,以预期会有额定的记载。
留意几点:
- 此参数操控的发送批次的巨细是以字节数,而不是数据条数。
- 此参数操控粒度为分区,而不是topic。当发往某个分区的数据大于等于此巨细时将建议一次提交。
- 合理操控此参数。
-
linger.ms
这个设置给出了批处理推迟的上限:一旦咱们获得了一个分区的batch_size
值的记载,不管这个设置如何,它都会当即发送,但是假设咱们为这个分区积累的字节少于这个数,咱们将在指定的时刻内“逗留”,等候更多的记载呈现。该设置默许为0(即没有推迟)。例如,设置LINGER_MS_CONFIG =5能够削减发送的恳求数量,但在没有负载的状况下,发送的记载将添加5ms的推迟。 -
max.block.ms
前两个参数能堵塞(等候)多长时刻。 -
buffer.memory
出产者能够用来缓冲等候发送到服务器的记载的内存的总字节数。假设发送记载的速度比发送到服务器的速度快,出产者将堵塞max.block.ms,之后它将抛出反常
。
这个设置应该大致对应于出产者将运用的总内存,但不是硬性约束,由于不是出产者运用的一切内存都用于缓冲。一些额定的内存将用于紧缩(假设启用了紧缩)以及保护正在运行的恳求。
4.2 音讯的承认(ack)
前面音讯现已发送出去了,但要确保不丢音讯,不重发音讯,即Exactly Once
精次一次性消费,至少需求确保出产端的音讯承认机制。
acks
参数操控的是音讯发出后,kafka集群是否需求呼应,以及呼应的等级。
-
acks=0
假设设置为0,那么出产者将不会等候服务器的任何承认。该记载将当即添加到套接字缓冲区并被以为已发送。在这种状况下,不能确保服务器现已接纳到记载,重试装备将不会生效(由于客户端通常不会知道任何失利)。为每条记载回来的偏移量将一直设置为-1。
为便当回忆,这儿的0指是的需求0个节点承认。
-
ack= 1
这将意味着leader将记载写入其本地日志,但将在不等候一切follower彻底承认的状况下进行呼应。在这种状况下,假设leader在承认记载后当即失利,但在follower复制它之前,那么记载将丢失。为便当回忆,这儿的1指的是只需一个节点承认,这儿一个节点肯定指的是主节点leader.
-
ack=all或-1
这是最高等级的承认机制,一同也意味着吞吐量受到约束。它将等候leader和一切follower副本都呼应,才以为发送完毕。为便当回忆,这儿的all指的是需求一切节点承认。
4.3 幂等性
回到第4末节的主题,当由于网络抖动或许其它任何已知不知道原因,音讯AB发送次序由于A失利重试终究变成了BA的倒序,那么kafka分区还能坚持开始期望中的AB有序性吗?
答案是能够,只要敞开幂等性,在Producer ID(即PID)和Sequence Number的根底上,音讯终究将坚持AB的次序。
幂等性关于WEB程序员应该不会生疏,前端调用后端接口,写入订单或许建议付出,由于用户重复操作网络重试等各种反常原因导致屡次恳求,后端应确保只呼应一次恳求或/且终究效果一致。
后端各微服务之间调用也有重试,也是相同的道理。
详细到kafka音讯发送,跟4.2末节中的Exactly Once
实践上有类似的地方,经过设置enable.idempotence=true
敞开幂等性,它的根底或条件条件是,会主动设置ack=all
。
如何设置kafka出产端的幂等性?
-
enable.idempotence=true 显式敞开幂等性。kafka 3.0以上的版别,此值为false,这儿应该显式设置。
-
replication.factor kafka集群的副本数 至少应大于1
-
acks=all kafka 3.0 今后的版别,此值为1,这儿应该显式设置。
-
max.in.flight.requests.per.connection=1
在堵塞之前,客户端将在单个连接上发送的未承认恳求的最大数量。请留意,假设将此设置设置为大于1,而且存在失利的发送,则存在由于重试(即,假设启用了重试)而导致音讯从头排序的危险。 默许值为5,假设要敞开幂等性,此值应<=5。 但假设引值>1 <=5 不会报错,但仍是有乱序的危险。 -
retries > 0 重试次数应大于0,否则没有重试。那样的话,A失利后也不能再发成功,即4末节开始的问题。
留意:当用户设置了enable.idempotence=true
,但没有显式设置3,4,5,则体系将选择合适的值。假设设置了不兼容的值,将抛出ConfigException。
一同,为确保完整性,消费端应确保 enable.auto.commit=false
,isolation.level=read_committed
,即主动承认改为手动承认
,事务隔离等级改为读已提交
。
4.3 幂等性原理
kafka为处理数据乱序和重发引进了PID和Sequence Number的概念。 每个producer都会有一个producer id即PID。这对用户不可见。
出产端发送的每条音讯的每个<Topic, Partition>都对应一个单调递加的Sequence Number。
相同,Broker端也会为每个<PID, Topic, Partition>保护一个序号,而且每Commit一条音讯时将其对应序号递加。
-
关于接纳的每条音讯,假设其序号比Broker保护的序号大1,则Broker会接受它,否则将其丢掉.
-
假设音讯序号比Broker保护的序号差值比1大,阐明中间有数据尚未写入,即乱序,此刻Broker回绝该音讯
-
假设音讯序号小于等于Broker保护的序号,阐明该音讯已被保存,即为重复音讯,Broker直接丢掉该音讯
发送失利后会重试,这样能够确保每个音讯都被发送到broker。
这儿再解释一下为什么能处理乱序,假定broker在接纳到 A音讯之前的Sequence Number为10,
A在出产端为11,B为12,
由于某种原因,A失利了,此刻broker端的Sequence Number依然为10
此刻,B到达broker,它为12,大于10,且它们之间的差异大于1,此刻回绝音讯B.B音讯发送失利。
然后A重试,成功,Sequence Number变为11,
再然后B重试,此刻成功。
终究,AB两条音讯以开始的次序写入成功。
消费端(非大数据形式)
5 单线程和多线程都不能确保跨分区次序
音讯量十分大,topic具有几十几百分区的状况下,消费端只用一个线程去消费,单是想想就知道不太实际,功能拉跨。
先搞搞一个测验demo测验多线程消费
向10个分区随机发送100条数据,数据末尾带上1-100递加的序号.
public void sendDocInfo(String info) {
try {
Random random = new Random();
kafkaTemplate.send("test10", random.nextInt(9)+"", info + "_" + i).get();
} catch (Exception e) {
log.error("kafka发送反常 " + e);
}
}
在消费端打印消费,带上分区ID。
@KafkaListener(
topics = "test10",
groupId = "heilu-group"
)
public void handle(List<ConsumerRecord<String, String>> records, Acknowledgment ack){
records.forEach(e -> {
log.info(e.partition() +" 分区接纳到音讯 : " + e.value());
});
ack.acknowledge();
}
能够很明显的看到跨分区乱序。
6.线程-分区一一对应
这种状况能确保某个线程内的有序性。
但假设有上百个分区,需求手动写这么多套代码,这好吗?
每个线程只消费一个对应的分区
@KafkaListener(
groupId = "test-group",
topicPartitions ={@TopicPartition(topic = "test10", partitions = { "0"})}
)
如图
至于Retry的状况,根据源码,需求kafka集群模仿一个反常才干完成,在本地经过拦截器或其它方式都是模仿不出来的。
所以没做这块的演示。
response.error
不为NONE
的状况下,才做canRetry
判别.
7.大数据领域的处理(缓解)计划watermark机制。
在任何出产领域,数据的推迟和乱序是一定会发生的。无非是概率巨细,严峻程度不同而已。
关于这种状况,大数据结构的共识是,关于数据乱序推迟,咱们要等,但不能无限等候下去。
因而flink/spark引进了watermark俗称水印机制。
请留意,此机制是为了缓解
数据的推迟和乱序,而不是彻底处理该问题。
就像开篇所说的第1点,车辆跑在路上总会有各种突发状况,传感器会老化,深山老林信号不好,这种状况连终端出产厂商都无法彻底处理,下流数据厂商怎么能根除呢?
watermark一般配合Window一同运用。
假设对window不了解的,能够参阅我之前写的这篇文章 关于我由于flink成为spark源码贡献者这件小事
能够简单理解为一个时刻段(微批,短至毫秒,长可至时分秒),处理一批数据。
不是搞大数据的,对大数据不感兴趣的,能够跳过这一部份。
- watermark的本质是一个时刻戳,它是为了应对数据乱序和推迟的一种机制。
- watermark = max(eventTime) – 允许迟到的长度
- window中,不考虑allowLateness,当watermark等于大于end-of-window时,窗口触发核算和销毁。
比方:
- 有一个窗口`[12:00-12:05)`,watermark允许迟到1分钟, 接纳到两条数据时刻分别为`12:03:43`,`12:05:23`,
那么watermark = `12:05:23 – 1 minute = 12:04:23` 小于12:05,所以窗口没有结束,不触发核算
注:严厉意义来讲,[watermark = `12:05:23 – 1 minute -1ms`] 由于 end-of-window判别的时分是>= - 当接纳到一条数据时刻为12:06时,窗口触发核算 假设allowLateness>0,窗口推迟销毁,假设来了一条数据时刻为12:04:49会再次触发窗口核算 假设来了一条数据时刻为12:05:01,不会进行当时窗口,会进入到下一个窗口
- 有一个窗口`[12:00-12:05)`,watermark允许迟到1分钟, 接纳到两条数据时刻分别为`12:03:43`,`12:05:23`,
那么watermark = `12:05:23 – 1 minute = 12:04:23` 小于12:05,所以窗口没有结束,不触发核算
- 考虑到代码并发度与上游(如kafka,socket)分区数不匹配可能会导致有些分区消费不到数据,如测验socket只要一个分区,而flink代码中有8个并发度,
那么
- 会有7个并发度里消费不到数据,它的watermark为Long.minvalue,
- 而flink的watermark在多并发度下,以最迟的那个为准,所以
-
整个flink使命中的watermark就为Long.minvalue,这时整个使命不会输出使命数据,由于watermark过小,触发不了使命window.
类似于木桶理论,一个木桶能装多少水由最短的那根木桶决议;相同的,flink使命中的watermark由最小的分区的watermark决议。
处理方法:
- 设置两边分区度坚持一致
- 高版别里 .withIdleness(Duration.ofSeconds(x)) 在这个时刻里,假设有闲暇分区没有消费数据,那么它将不持有水印, 即全局水印的推动将不考虑这些闲暇分区。
- 假设flink使命收到一个过错数据,远超现在的体系时刻,如2100-09-09 00:00:00,在除了闲暇分区外的分区都收到这样的数据,那么flink使命的watermark 将超越体系时刻,那么正常数据将不会被体系正常处理。这时,在watermark生成器这儿要做特殊处理。
- Watermark怎样生成?实时生成和周期性生成(时刻或许条数),别忘了第5条。
这部份的源码,感兴趣的能够试一下,引进flink依靠,版别1.14,没有运用kafka,运用
nc -lk 9090
可出产数据。我的观念是,每一个后端程序员都应该了解一点大数据核算。
能够看下我这篇文章。揭开神秘面纱,会stream流就会大数据
public static void main(String[] args) { Configuration configuration = new Configuration(); configuration.setInteger("heartbeat.timeout", 180000); configuration.setInteger(RestOptions.PORT, 8082); StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration); // 并行度和上游数据分区数对watermark生效的影响 // streamExecutionEnvironment.setParallelism(1); // nc -lk 9090 DataStream<TestObject> dataStream = streamExecutionEnvironment .socketTextStream( "192.168.124.123", 9090) .map( e -> { try { Gson gson = new GsonBuilder() .setDateFormat("yyyy-MM-dd HH:mm:ss") .create(); TestObject object = gson.fromJson(e, TestObject.class); return object; } catch (Exception exception) { exception.printStackTrace(); System.out.println("反常数据 = " + e); return new TestObject(); } }); try { OutputTag<TestObject> lateOutput = new OutputTag<>("lateData", TypeInformation.of(TestObject.class)); SingleOutputStreamOperator result = dataStream .filter(e -> StringUtils.isNoneBlank(e.key)) .assignTimestampsAndWatermarks( (WatermarkStrategy<TestObject>) WatermarkStrategy .<TestObject> forBoundedOutOfOrderness(Duration.ofSeconds(10)) .withTimestampAssigner( (row, ts) -> { System.out.println("source = " + row); DateTimeFormatter dtf2 = DateTimeFormatter .ofPattern("yyyy-MM-dd HH:mm:ss", Locale.CHINA); Long time = row.getTime().getTime(); System.out.println("time = " + time); // 假设eventTime > 体系时刻,这儿要做处理 // TODO 假设eventTime远小于体系时刻,可能会拖慢全体的Watermark Long now = System.currentTimeMillis(); return time > now ? now : time; } ) .withIdleness(Duration.ofSeconds(5)) ) .keyBy(e -> e.key) .window( SlidingEventTimeWindows.of( Time.seconds(60 * 2), Time.seconds(60))) // 将推迟的数据旁路输出 .sideOutputLateData(lateOutput) .process( new ProcessWindowFunction<TestObject, Object, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<TestObject> elements, Collector<Object> out) throws Exception { System.out.println("watermark = " + context.currentWatermark()); System.out.println("watermark = " + new Timestamp(context.currentWatermark()) +" window.start = " + new Timestamp(context.window().getStart()) +" window.end = " + new Timestamp(context.window().getEnd())); elements.forEach(e -> System.out.println("e + " + e)); } }); result.print(); // 迟到不处理的数据 result.getSideOutput(lateOutput).print(); streamExecutionEnvironment.execute("WaterMark test"); } catch (Exception exception) { exception.printStackTrace(); } } @Data @NoArgsConstructor public static class TestObject { private String key; private Timestamp time; private float price; }
8. 小结
kafka为了吞吐量,在出产端设计了次序追加形式,这两者才是因果, 得益于此,kafka单分区内的数据
能够
变得有序,这仅仅一个副产品。它一同得考虑到数据终端带来的先天不足,
分区节点间的数据歪斜带来的功能问题,
分区节点扩容的价值,
幂等性所需求价值带来的吞吐量约束,
以及消费端的约束。种种问题考量。
幂等性更多的是做一次精准消费,防止重复消费,有序仅仅副产品。 有且只要一次精准消费,可比什么劳什子有序消费重要得多!
就像摩托车是一个交通工具,能跑在廉价的路途(一般服务器)上,将便当(从前巨大上的大数据)带到千家万户(一般小公司)。
但它不是装X工具。给我个人的感觉,假设真要把kafka的分区有序性强行用到出产环境,就像下图这样。
告辞。
参阅:
docs.confluent.io/cloud/curre…
nightlies.apache.org/flink/flink…
nightlies.apache.org/flink/flink…
/post/720067…
/post/722661…