概述

Kafka架构如下,首要由 Producer、Broker、Consumer 三部分组成。一条音讯从出产到消费完成这个过程,能够划分三个阶段,出产阶段、存储阶段、消费阶段。

kafka如何保证消息不丢

  • 出产阶段: 在这个阶段,从音讯在 Producer 创立出来,经过网络传输发送到 Broker 端。
  • 存储阶段: 在这个阶段,音讯在 Broker 端存储,假如是集群,音讯会在这个阶段被复制到其他的副本上。
  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取音讯,经过网络传输发送到Consumer上。

后边怎么保证音讯不丢首要从这三部分来分析。

音讯传递语义

在深度分析音讯丢掉场景之前,咱们先来聊聊「音讯传递语义」到底是个什么玩意?

所谓的音讯传递语义是 Kafka 供给的 Producer 和 Consumer 之间的音讯传递过程中音讯传递的保证性。首要分为三种, 如下图所示:

kafka如何保证消息不丢

  1. 首要当 Producer 向 Broker 发送数据后,会进行 commit,假如 commit 成功,因为 Replica 副本机制的存在,则意味着音讯不会丢掉,可是 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那么 Producer 就无法准确判断该音讯是否现已被提交(commit),这就或许造成 at least once 语义。

  2. Kafka 0.11.0.0 之前, 假如 Producer 没有收到音讯 commit 的呼应成果,它只能从头发送音讯,保证音讯现已被正确的传输到 Broker,从头发送的时分会将音讯再次写入日志中;而在 0.11.0.0 版本之后, Producer 支撑幂等传递选项,保证从头发送不会导致音讯在日志呈现重复。为了完成这个, BrokerProducer 分配了一个ID,并经过每条音讯的序列号进行去重。也支撑了相似业务语义来保证将音讯发送到多个 Topic 分区中,保证一切音讯要么都写入成功,要么都失利,这个首要用在 Topic 之间的 exactly once 语义。

    其间启用幂等传递的办法装备enable.idempotence = true

    启用业务支撑的办法装备:设置属性 transcational.id = "指定值"

  3. Consumer 角度来分析, 咱们知道 Offset 是由 Consumer 自己来维护的, 假如 Consumer 收到音讯后更新 Offset, 这时 Consumer 反常 crash 掉, 那么新的 Consumer 接收后再次重启消费,就会造成 at most once 语义(音讯会丢,但不重复)。

  4. 假如 Consumer 消费音讯完成后, 再更新 Offset,假如这时 Consumer crash 掉,那么新的 Consumer 接收后从头用这个 Offset 拉取音讯, 这时就会造成 at least once 语义(音讯不丢,但被屡次重复处理)。

总结: 默认 Kafka 供给「at least once」语义的音讯传递,允许用户经过在处理音讯之前保存 Offset的方法供给 「at mostonce」 语义。假如咱们能够自己完成消费幂等,理想情况下这个体系的音讯传递便是严厉的「exactly once」, 也便是保证不丢掉、且只会被精确的处理一次,可是这样是很难做到的。

下面咱们分别从出产阶段、存储阶段、消费阶段这几方面看下怎么保证音讯不丢掉。

出产阶段

经过深入解析Kafka音讯发送过程咱们知道Kafka出产者异步发送音讯并回来一个Future,代表发送成果。首要需要咱们获取回来成果判断是否发送成功。

// 异步发送音讯,并设置回调函数 
producer.send(record, new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) { 
            System.err.println("音讯发送失利: " + exception.getMessage()); 
        } else { 
            System.out.println("音讯发送成功到主题: " + metadata.topic() + ", 分区: " + metadata.partition() + ", 偏移量: " + metadata.offset()); 
        } 
    } 
});

音讯行列经过最常用的恳求承认机制,来保证音讯的牢靠传递:当你的代码调用发音讯办法时,音讯行列的客户端会把音讯发送到 Broker,Broker 收到音讯后,会给客户端回来一个承认呼应,表明音讯现已收到了。客户端收到呼应后,完成了一次正常音讯的发送。

Producer(出产者)保证音讯不丢掉的办法:

  1. 发送承认机制:Producer能够运用Kafka的acks参数来装备发送承认机制。经过设置适宜的acks参数值,Producer能够在音讯发送后等候Broker的承认。承认机制供给了不同等级的牢靠性保证,包括:
    • acks=0:Producer在发送音讯后不会等候Broker的承认,这或许导致音讯丢掉危险。
    • acks=1:Producer在发送音讯后等候Broker的承认,保证至少将音讯写入到Leader副本中。
    • acks=all或acks=-1:Producer在发送音讯后等候Broker的承认,保证将音讯写入到一切ISR(In-Sync Replicas)副本中。这供给了最高的牢靠性保证。
  1. 音讯重试机制:Producer能够完成音讯的重试机制来应对发送失利或反常情况。假如发送失利,Producer能够从头发送音讯,直到成功或达到最大重试次数。重试机制能够保证音讯不会因为临时的网络问题或Broker毛病而丢掉。

Broker存储阶段

正常情况下,只需 Broker 在正常运行,就不会呈现丢掉音讯的问题,可是假如 Broker 呈现了毛病,比方进程死掉了或许服务器宕机了,还是或许会丢掉音讯的。

在kafka高性能规划原理中咱们了解到kafka为了进步性能用到了 Page Cache 技能.在读写磁盘日志文件时,其实操作的都是内存,然后由操作体系决定什么时分将 Page Cache 里的数据真实刷入磁盘。假如内存中数据还未刷入磁盘服务宕机了,这个时分还是会丢音讯的。

为了最大程度地下降数据丢掉的或许性,咱们能够考虑以下办法:

  1. 耐久化装备优化:能够经过调整 Kafka 的耐久化装备参数来操控数据刷盘的频率,从而减少数据丢掉的或许性。例如,能够下降 flush.messagesflush.ms 参数的值,以更频繁地刷写数据到磁盘。
  2. 副本因子增加:在 Kafka 中,能够为每个分区设置多个副本,以进步数据的牢靠性。当某个 broker 产生毛病时,其他副本依然可用,能够防止数据丢掉。
  3. 运用acks=all:在出产者装备中,设置 acks=all 能够保证音讯在一切ISR(In-Sync Replicas)中都得到承认后才被视为发送成功。这样能够保证音讯被复制到多个副本中,下降数据丢掉的危险。
  4. 备份数据:定时备份 Kafka 的数据,以便在产生灾难性毛病时能够进行数据康复。

消费阶段

消费阶段选用和出产阶段相似的承认机制来保证音讯的牢靠传递,客户端从 Broker 拉取音讯后,履行用户的消费业务逻辑,成功后,才会给 Broker 发送消费承认呼应。假如 Broker 没有收到消费承认呼应,下次拉音讯的时分还会回来同一条音讯,保证音讯不会在网络传输过程中丢掉,也不会因为客户端在履行消费逻辑中犯错导致丢掉。

  1. 主动提交位移:Consumer能够挑选启用主动提交位移的功能。当Consumer成功处理一批音讯后,它会主动提交当前位移,标记为已消费。这样即便Consumer产生毛病,它能够运用已提交的位移来康复并持续消费之前未处理的音讯。
  2. 手动提交位移:Consumer还能够挑选手动提交位移的方法。在消费一批音讯后,Consumer能够显式地提交位移,以保证处理的音讯被正确记录。这样能够防止重复消费和位移丢掉的问题。

下面是手动提交位移的比如:

// 创立消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList(topic));
try {
    while (true) {
        // 消费音讯
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // 处理音讯逻辑
            System.out.println("消费音讯:Topic = " + record.topic() +
                    ", Partition = " + record.partition() +
                    ", Offset = " + record.offset() +
                    ", Key = " + record.key() +
                    ", Value = " + record.value());
            // 手动提交位移
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata offsetMetadata = new OffsetAndMetadata(record.offset() + 1);
            consumer.commitSync(Collections.singletonMap(topicPartition, offsetMetadata));
        }
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}