概述
Kafka架构如下,首要由 Producer、Broker、Consumer 三部分组成。一条音讯从出产到消费完成这个过程,能够划分三个阶段,出产阶段、存储阶段、消费阶段。
- 出产阶段: 在这个阶段,从音讯在 Producer 创立出来,经过网络传输发送到 Broker 端。
- 存储阶段: 在这个阶段,音讯在 Broker 端存储,假如是集群,音讯会在这个阶段被复制到其他的副本上。
- 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取音讯,经过网络传输发送到Consumer上。
后边怎么保证音讯不丢首要从这三部分来分析。
音讯传递语义
在深度分析音讯丢掉场景之前,咱们先来聊聊「音讯传递语义」到底是个什么玩意?
所谓的音讯传递语义是 Kafka 供给的 Producer 和 Consumer 之间的音讯传递过程中音讯传递的保证性。首要分为三种, 如下图所示:
-
首要当 Producer 向 Broker 发送数据后,会进行
commit
,假如commit
成功,因为Replica
副本机制的存在,则意味着音讯不会丢掉,可是Producer
发送数据给Broker
后,遇到网络问题而造成通信中断,那么Producer
就无法准确判断该音讯是否现已被提交(commit
),这就或许造成at least once
语义。 -
在
Kafka 0.11.0.0
之前, 假如Producer
没有收到音讯commit
的呼应成果,它只能从头发送音讯,保证音讯现已被正确的传输到 Broker,从头发送的时分会将音讯再次写入日志中;而在0.11.0.0
版本之后,Producer
支撑幂等传递选项,保证从头发送不会导致音讯在日志呈现重复。为了完成这个,Broker
为Producer
分配了一个ID,并经过每条音讯的序列号进行去重。也支撑了相似业务语义来保证将音讯发送到多个 Topic 分区中,保证一切音讯要么都写入成功,要么都失利,这个首要用在 Topic 之间的exactly once
语义。其间启用幂等传递的办法装备:
enable.idempotence = true
。启用业务支撑的办法装备:设置属性
transcational.id = "指定值"
。 -
从
Consumer
角度来分析, 咱们知道Offset
是由Consumer
自己来维护的, 假如Consumer
收到音讯后更新Offset
, 这时Consumer
反常crash
掉, 那么新的Consumer
接收后再次重启消费,就会造成at most once
语义(音讯会丢,但不重复)。 -
假如
Consumer
消费音讯完成后, 再更新Offset
,假如这时Consumer crash
掉,那么新的Consumer
接收后从头用这个Offset
拉取音讯, 这时就会造成at least once
语义(音讯不丢,但被屡次重复处理)。
总结: 默认 Kafka 供给「at least once」语义的音讯传递,允许用户经过在处理音讯之前保存 Offset的方法供给 「at mostonce」 语义。假如咱们能够自己完成消费幂等,理想情况下这个体系的音讯传递便是严厉的「exactly once」, 也便是保证不丢掉、且只会被精确的处理一次,可是这样是很难做到的。
下面咱们分别从出产阶段、存储阶段、消费阶段这几方面看下怎么保证音讯不丢掉。
出产阶段
经过深入解析Kafka音讯发送过程咱们知道Kafka出产者异步发送音讯并回来一个Future,代表发送成果。首要需要咱们获取回来成果判断是否发送成功。
// 异步发送音讯,并设置回调函数
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("音讯发送失利: " + exception.getMessage());
} else {
System.out.println("音讯发送成功到主题: " + metadata.topic() + ", 分区: " + metadata.partition() + ", 偏移量: " + metadata.offset());
}
}
});
音讯行列经过最常用的恳求承认机制,来保证音讯的牢靠传递:当你的代码调用发音讯办法时,音讯行列的客户端会把音讯发送到 Broker,Broker 收到音讯后,会给客户端回来一个承认呼应,表明音讯现已收到了。客户端收到呼应后,完成了一次正常音讯的发送。
Producer(出产者)保证音讯不丢掉的办法:
- 发送承认机制:Producer能够运用Kafka的acks参数来装备发送承认机制。经过设置适宜的acks参数值,Producer能够在音讯发送后等候Broker的承认。承认机制供给了不同等级的牢靠性保证,包括:
-
- acks=0:Producer在发送音讯后不会等候Broker的承认,这或许导致音讯丢掉危险。
- acks=1:Producer在发送音讯后等候Broker的承认,保证至少将音讯写入到Leader副本中。
- acks=all或acks=-1:Producer在发送音讯后等候Broker的承认,保证将音讯写入到一切ISR(In-Sync Replicas)副本中。这供给了最高的牢靠性保证。
- 音讯重试机制:Producer能够完成音讯的重试机制来应对发送失利或反常情况。假如发送失利,Producer能够从头发送音讯,直到成功或达到最大重试次数。重试机制能够保证音讯不会因为临时的网络问题或Broker毛病而丢掉。
Broker存储阶段
正常情况下,只需 Broker 在正常运行,就不会呈现丢掉音讯的问题,可是假如 Broker 呈现了毛病,比方进程死掉了或许服务器宕机了,还是或许会丢掉音讯的。
在kafka高性能规划原理中咱们了解到kafka为了进步性能用到了 Page Cache
技能.在读写磁盘日志文件时,其实操作的都是内存,然后由操作体系决定什么时分将 Page Cache 里的数据真实刷入磁盘。假如内存中数据还未刷入磁盘服务宕机了,这个时分还是会丢音讯的。
为了最大程度地下降数据丢掉的或许性,咱们能够考虑以下办法:
-
耐久化装备优化:能够经过调整 Kafka 的耐久化装备参数来操控数据刷盘的频率,从而减少数据丢掉的或许性。例如,能够下降
flush.messages
和flush.ms
参数的值,以更频繁地刷写数据到磁盘。 - 副本因子增加:在 Kafka 中,能够为每个分区设置多个副本,以进步数据的牢靠性。当某个 broker 产生毛病时,其他副本依然可用,能够防止数据丢掉。
-
运用acks=all:在出产者装备中,设置
acks=all
能够保证音讯在一切ISR(In-Sync Replicas)中都得到承认后才被视为发送成功。这样能够保证音讯被复制到多个副本中,下降数据丢掉的危险。 - 备份数据:定时备份 Kafka 的数据,以便在产生灾难性毛病时能够进行数据康复。
消费阶段
消费阶段选用和出产阶段相似的承认机制来保证音讯的牢靠传递,客户端从 Broker 拉取音讯后,履行用户的消费业务逻辑,成功后,才会给 Broker 发送消费承认呼应。假如 Broker 没有收到消费承认呼应,下次拉音讯的时分还会回来同一条音讯,保证音讯不会在网络传输过程中丢掉,也不会因为客户端在履行消费逻辑中犯错导致丢掉。
- 主动提交位移:Consumer能够挑选启用主动提交位移的功能。当Consumer成功处理一批音讯后,它会主动提交当前位移,标记为已消费。这样即便Consumer产生毛病,它能够运用已提交的位移来康复并持续消费之前未处理的音讯。
- 手动提交位移:Consumer还能够挑选手动提交位移的方法。在消费一批音讯后,Consumer能够显式地提交位移,以保证处理的音讯被正确记录。这样能够防止重复消费和位移丢掉的问题。
下面是手动提交位移的比如:
// 创立消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
// 消费音讯
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 处理音讯逻辑
System.out.println("消费音讯:Topic = " + record.topic() +
", Partition = " + record.partition() +
", Offset = " + record.offset() +
", Key = " + record.key() +
", Value = " + record.value());
// 手动提交位移
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetMetadata = new OffsetAndMetadata(record.offset() + 1);
consumer.commitSync(Collections.singletonMap(topicPartition, offsetMetadata));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}