音讯行列
音讯行列形式
点对点
出产者出产音讯发送到Queue,顾客从Queue取出数据,并消费数据,数据被消费,Queue不再存储,Queue支撑多个顾客,一条音讯只能被消费一次(只需一个顾客可以消费到)
发布/订阅(一对多)
出产者发送音讯到topic中,多个顾客订阅topic,和点对点不同,发布到topic的音讯会被一切订阅者消费
kafka架构
- Producer:音讯出产者,向kafka broker发送音讯
- Consumer:音讯顾客,从kafka broker取音讯
- Consumer Group:多个consumer组成,顾客组内不同顾客担任消费不同分区的数据,kafka的topic下的一条音讯,只能被同一个顾客组的一个顾客消费到
- consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程
- group.id是一个字符串,仅有标识一个consumer group
- consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)
- Broker:一台服务器便是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic
- Topic:主题(行列)
- Partition:分区,kafka的扩展性表现,一个庞大的topic有许多分区(partition),可以分不到多个broker上去,每个 partition 是一个有序的行列, partition 中的每条音讯 都会被分配一个有序的 id (offset) kafka 只确保按一个 partition 中的次序将音讯发给consumer ,不确保一个 topic的全体(多个 partition 间)的次序;
- Replica:副本,当集群某个节点毛病时,该节点的partitiion数据不丢掉,kafka的副本机制,一个topic的每个分区有多个副本,一个leader和follower
- follower:每个分区的多个副本的“从”,实时从leader中同步数据,坚持leader数据的同步
- leader:每个分区副本的主,出产者发送数据的目标,顾客消费数据的目标
- Offset:每个partition都由一系列有序的、不可变的音讯组成,这些音讯被接连的追加到partition中。partition中的每个音讯都有一个接连的序列号叫做offset,用于partition仅有标识一条音讯;kafka 的存储文件都是依照 offset.kafka来命名,用 offset 名字的好处是便利查。例如你想坐落 2049,只需找到2048.kafka的文件即可。当然 the first offsetthe 就 是 00000000000.kafka ;
- zookeeper:保存offset数据(0.9版别之前),确保高可用,0.9版别之后offset数据存放在kafka的体系特定topic中;
装备文件
#装备文件
#broker的大局仅有编号
broker.id=0
#删去 topic
delete.topic.enable=true
#处理网络恳求
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区巨细
socket.send.buffer.bytes=102400
#接纳套字的缓冲区巨细
socket.receive.buffer.bytes=102400
#恳求套接字的缓冲区巨细
socket.request.max.bytes=104857600
#kafka运转日志存放的途径
log.dirs=/opt/module/kafka/logs
#topic在当时
num.partitions=1
#用来康复和清理data下数据的线程量
num.recovery.threads.per.data.dir=1
#segment文件保存的最长时刻,超将被删去
log.retention.hours=168
#装备衔接Zookeeper集群地址
zookeeper.connect=hadoop102:2181 ,
常用指令
#发动zookeeper
bash bin/kafka -topics.sh -- zookeeper localhost:2181 -- list
#发动kafka
bash /etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties
#创立topic
sh /etc/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create
--replication-factor 3 --partitions 1 --topic first
#删去topic
sh /etc/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic first
#消费音讯
sh /etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
#出产音讯
sh /etc/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first
Zookeepr
- kafka集群中有一个broker会被推举成Controller,担任办理集群broker的上下线、一切topic的分区副本分配和leader推举
- Controller办理依靠于Zookeeper
Prodeucer(出产者)
作业流程
- producer先从zookeeper的 “/brokers/…/state”节点找到该partition的leader
- producer将音讯发送给该leader
- leader将音讯写入本地log
- followers从leader pull音讯
- 写入本地log后向leader发送ACK
- leader收到一切ISR中的replication的ACK后,添加HW(high watermark,终究commit 的offset)并向producer发送ACK
- Kafka 中音讯是以topic 进行分类的,出产者出产音讯,顾客消费音讯,都是面向topic的。
- topic 是逻辑上的概念,而partition 是物理上的概念,每个partition 对应一个log 文件,该log 文件中存储的便是producer 出产的数据。Producer 出产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset。顾客组中的每个顾客,都会实时记载自己消费到了哪个offset,以便出错康复时,从上次的方位持续消费。
文件存储
- Broker:音讯中心件处理结点,一个Kafka节点便是一个broker,多个broker可以组成一个Kafka集群。
- Topic:一类音讯,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群可以一起担任多个topic的分发。
- Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的行列。
- Segment:partition物理上由多个segment组成
- offset:每个partition都由一系列有序的、不可变的音讯组成,这些音讯被接连的追加到partition中。partition中的每个音讯都有一个接连的序列号叫做offset,用于partition仅有标识一条音讯.
分析进程分为以下4个进程:
- topic中partition存储散布
- partiton中文件存储办法
- partiton中segment文件存储结构
- 在partition中怎么经过offset查找message
topic中partition存储散布
假定实验环境中Kafka集群只需一个broker,xxx/message-folder为数据文件存储根目录,在Kafka broker中server.properties文件装备(参数log.dirs=xxx/message-folder),
例如创立2个topic称号别离为report_push、launch_info, partitions数量都为partitions=4 存储途径和目录规矩为: xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规矩为topic称号+有序序号,第一个partiton序号从0开端,序号最大值为partitions数量减1
partiton中文件存储办法
- 每个partion(目录)相当于一个巨型文件被平均分配到多个巨细持平segment(段)数据文件中。但每个段segment file音讯数量不一定持平,这种特性便利old segment file快速被删去。
- 每个partiton只需求支撑次序读写就行了,segment文件生命周期由服务端装备参数决议。
这样做的好处便是能快速删去无用文件,有用进步磁盘运用率
partiton中segment文件存储结构
- segment file组成:由2大部分组成,别离为index file和data file,此2个文件一一对应,成对呈现,后缀”.index”和“.log”别离标明为segment索引文件、数据文件.
- segment文件命名规矩:partion大局的第一个segment从0开端,后续每个segment文件名为上一个segment文件终究一条音讯的offset值。数值最大为64位long巨细,19位数字字符长度,没有数字用0填充。
创立一个topicXXX包括1 partition,设置每个segment巨细为500MB,并发动producer向Kafka broker写入许多数据如下图2所示segment文件列表形象阐明晰上述2个规矩:
以上图中一对segment file文件为例,阐明segment中index<—->data file对应联系物理结构如下:
上图中索引文件存储许多元数据,数据文件存储许多音讯,索引文件中元数据指向对应数据文件中message的物理偏移地址。 其间以索引文件中元数据3,497为例,顺次在数据文件中标明第3个message(在大局partiton标明第368772(368796+3)个message)、以及该音讯的物理偏移地址为497。
segment data file由许多message组成,每个message物理结构如下:
关键字 | 解说阐明 |
---|---|
8 byte offset | 在parition(分区)内的每条音讯都有一个有序的id号,这个id号被称为偏移(offset),它可以仅有承认每条音讯在parition(分区)内的方位。即offset标明partiion的第多少message |
4 byte message size | message巨细 |
4 byte CRC32 | 用crc32校验message |
1 byte “magic” | 标明本次发布Kafka服务程序协议版别号 |
1 byte “attributes” | 标明为独立版别、或标识紧缩类型、或编码类型。 |
4 byte key length | 标明key的长度,当key为-1时,K byte key字段不填 |
K byte key | 可选 |
value bytes payload | 标明实践音讯数据。 |
经过offset查找message
例如读取offset=368776的message,需求经过下面2个进程查找。
- 第一步查找segment file ,其间00000000000000000000.index标明最开端的文件,开端偏移量(offset)为0;第二个文件00000000000000368769.index的音讯量开端偏移量为368770 = 368769 + 1.相同,第三个文件00000000000000737337.index的开端偏移量为737338=737337 + 1,其他后续文件顺次类推,以开端偏移量命名并排序这些文件,只需依据offset 二分查找文件列表,就可以快速定位到详细文件。 当offset=368776时定位到00000000000000368769.index|log
- 第二步经过segment file查找message 经过第一步定位到segment file,当offset=368776时,顺次定位到00000000000000368769.index的元数据物理方位和00000000000000368769.log的物理偏移地址,然后再经过00000000000000368769.log次序查找直到offset=368776为止。
segment index file采取稀少索引存储办法,它削减索引文件巨细,经过mmap可以直接内存操作,稀少索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需求消耗更多的时刻。
Kafka文件存储优势
Kafka运转时很少有许多读磁盘的操作,首要是定时批量写磁盘操作,因而操作磁盘很高效。这跟Kafka文件存储中读写message的规划是休戚相关的。Kafka中读写message有如下特色:
写message
- 音讯从java堆转入page cache(即物理内存)。
- 由异步线程刷盘,音讯从page cache刷入磁盘。
读message
- 音讯直接从page cache转入socket发送出去。
- 当从page cache没有找到相应数据时,此刻会产生磁盘IO,从磁 盘Load音讯到page cache,然后直接从socket发出去
Kafka高效文件存储规划特色
- Kafka把topic中一个parition大文件分红多个小文件段,经过多个小文件段,就容易定时铲除或删去现已消费完文件,削减磁盘占用。
- 经过索引信息可以快速定位message和承认response的最大巨细。
- 经过index元数据悉数映射到memory,可以防止segment file的IO磁盘操作。
- 经过索引文件稀少存储,可以大幅降低index文件元数据占用空间巨细。
零仿制
分区
分区原因
- 便利在集群中扩展 便利在集群中扩展 ,每个 Partition可以经过调整习惯它地点的机器,而一个 topic又可以有多个Partition组成,因而整个集群就可以习惯任意巨细的数据了
- 可以进步并发 ,由于可以以 Partition为单位读写了
从数据组织形式来说,kafka有三层形式,kafka有多个主题,每个主题有多个分区,每个分区又有多条音讯
分区个数
分区越多,所需求消耗的资源就越多。甚至假如足够大的时分,还会触发到操作体系的一些参数限制。比方linux中的文件描绘符限制,一般在创立线程,创立socket,翻开文件的场景下,linux默许的文件描绘符参数,只需1024,超越则会报错。
由于每个业务场景都不同,只能结合详细业务来看。假如每秒钟需求从主题写入和读取1GB数据,而顾客1秒钟最多处理50MB的数据,那么这个时分就可以设置20-25个分区,当然还要结合详细的物理资源状况。
而怎么无法估算出大约的处理速度和时刻,那么就用基准测验来测验吧。创立不同分区的topic,逐渐压测测出终究的成果。假如实在是懒得测,那比较无脑的承认分区数的办法便是broker机器数量的2~3倍。
出产者分区写入战略
出产者在将音讯发送到某个Topic ,需求经过拦截器、序列化器和分区器(
Partitioner
)的一系列效果之后才干发送到对应的Broker,在发往Broker之前是需求承认它所发往的分区,kafka怎么将数据分配到不同分区中的战略
- 假如音讯
ProducerRecord
指定了partition字段,那么就不需求分区器。 - 假如音讯
ProducerRecord
没有指定partition字段,那么就需求依靠分区器,依据key这个字段来核算partition的值。分区器的效果便是为音讯分配分区。
public class ProducerRecord<K, V> {
// 该音讯需求发往的主题
private final String topic;
// 该音讯需求发往的主题中的某个分区,假如该字段有值,则分区器不起效果,直接发往指定的分区
// 假如该值为null,则运用分区器进行分区的挑选
private final Integer partition;
private final Headers headers;
// 假如partition字段为null,则运用分区器进行分区挑选时会用到该key字段,该值可为空
private final K key;
private final V value;
private final Long timestamp;
默许
Kafka 中供给的默许分区器是 DefaultPartitioner
,它完结了Partitioner接口(用户可以完结这个接口来自定义分区器),其间的partition办法便是用来完结详细的分区分配逻辑:
- 假如在发音讯的时分指定了分区,则音讯投递到指定的分区。
- 假如没有指定分区,可是音讯的key不为空,则运用称之为
murmur
的Hash算法(非加密型Hash函数,具有高运算功能及低磕碰率)来核算分区分配。 - 假如既没有指定分区,且音讯的key也是空,则用轮询的办法挑选一个分区。
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 首要经过cluster从元数据中获取topic一切的分区信息
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 拿到该topic的分区数
int numPartitions = partitions.size();
// 假如音讯记载中没有指定key
if (keyBytes == null) {
// 则获取一个自增的值
int nextValue = nextValue(topic);
// 经过cluster拿到一切可用的分区(可用的分区这儿指的是该分区存在领袖副本)
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
// 假如该topic存在可用的分区
if (availablePartitions.size() > 0) {
// 那么将nextValue转成正数之后对可用分区数进行取余
int part = Utils.toPositive(nextValue) % availablePartitions.size();
// 然后从可用分区中回来一个分区
return availablePartitions.get(part).partition();
} else { // 假如不存在可用的分区
// 那么就从一切不可用的分区中经过取余的办法回来一个不可用的分区
return Utils.toPositive(nextValue) % numPartitions;
}
} else { // 假如音讯记载中指定了key
// 则运用该key进行hash操作,然后对一切的分区数进行取余操作,这儿的hash算法选用的是murmur2算法,然后再转成正数
//toPositive办法很简略,直接将给定的参数与0X7FFFFFFF进行逻辑与操作。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
// nextValue办法可以理解为是在音讯记载中没有指定key的状况下,需求生成一个数用来替代key的hash值
// 办法便是最开端先生成一个随机数,之后在这个随机数的基础上每次恳求时均进行+1的操作
private int nextValue(String topic) {
// 每个topic都对应着一个计数
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) { // 假如是第一次,该topic还没有对应的计数
// 那么先生成一个随机数
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
// 然后将该随机数与topic对应起来存入map中
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
// 之后把这个随机数回来
counter = currentCounter;
}
}
// 一旦存入了随机数之后,后续的恳求均在该随机数的基础上+1之后进行回来
return counter.getAndIncrement();
}
自定义分区
public class MyParatitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
//key不能空,假如key为空的会经过轮询的办法 挑选分区
if(keyBytes == null || (!(key instanceof String))){
throw new RuntimeException("key is null");
}
//获取分区列表
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//以下是上述各种战略的完结,不能共存
//随机战略
return ThreadLocalRandom.current().nextInt(partitions.size());
//按音讯键保存战略
return Math.abs(key.hashCode()) % partitions.size();
//自定义分区战略, 比方key为123的音讯,挑选放入终究一个分区
if(key.toString().equals("123")){
return partitions.size()-1;
}else{
//不然随机
ThreadLocalRandom.current().nextInt(partitions.size());
}
}
@Override
public void close() {
}
}
//生成kafka producer客户端的时分指定该类就行:
props.put("partitioner.class", "kafkaconf.MyParatitioner"); //首要这个装备指定分区类 //properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.Mypartition.Mypartition");
......其他装备
val producer = new KafkaProducer[String, String](properties)
副本机制
在kafka中,每个主题可以有多个分区,每个分区又可以有多个副本。这多个副本中,只需一个是leader,而其他的都是follower副本。仅有leader副本可以对外供给服务。
多个follower副本一般存放在和leader副本不同的broker中。经过这样的机制完结了高可用,当某台机器挂掉后,其他follower副本也能敏捷”转正“,开端对外供给服务。
副本效果
在kafka中,完结副本的意图便是冗余备份,且仅仅是冗余备份,一切的读写恳求都是由leader副本进行处理的。follower副本仅有一个功能,那便是从leader副本拉取音讯,尽量让自己跟leader副本的内容一起
假如follower副本也对外供给服务那会怎么样呢?首要,功能是肯定会有所提高的。但一起,会呈现一系列问题。相似数据库事务中的幻读,脏读。
比方你现在写入一条数据到kafka主题a,顾客b从主题a消费数据,却发现消费不到,由于顾客b去读取的那个分区副本中,最新音讯还没写入。而这个时分,另一个顾客c却可以消费到最新那条数据,由于它消费了leader副本。
数据牢靠(Producer的ACK机制)
为确保producer发送的数据,能牢靠到指定topic,topic的每个的partition收到 producer发送的数据后,都需求向producer发送 ack(acknowledgement承认收到),假如 producer收到 ack,就会进行下一轮的发送。
问题:分区中现有一个leader副本节点和多个follower副本节点,出产者将音讯发送过来的时分,何时回来ack给出产者?
leader副本担任读与写,follower副本同步leader的数据。
计划1:leader和一切的follower都同步完结,才发送ack给出产者 计划2:leader+follower同步完结的数量过半,就发送ack给出产者
计划 | 长处 | 缺点 |
---|---|---|
半数以上flower完结同步,就发送ack | 推迟低(follower有块有慢,当半数以上follower完结,就过滤剩下的follower) | 推举新leader时,忍受n节点毛病,需求2n+1个副本 |
悉数follower完结同步,才发送ack | 推举新leader时,忍受n态节点毛病,需求n+1个副本 | 推迟高(同步快的需求等同步慢的,导致推迟高) |
Kafka 挑选了第二种计划,原因如下:
- 相同为了忍受n 台节点的毛病,第一种计划需求2n+1 个副本,而第二种计划只需求n+1个副本,而Kafka 的每个分区都有许多的数据,第一种计划会形成许多数据的冗余。
- 尽管第二种计划的网络推迟会比较高,但网络推迟对Kafka 的影响较小。
- 挑选计划二会或许呈现这种状况,leader+follower彻底同步时,假如有1个leader+4个follower,1个leader和3个follower都同步完结,1个follower同步超级慢或许挂掉,会影响回来或许不回来ack
ISR
选用第二种计划之后,想象以下情景:
leader 收到数据,一切follower 都开端同步数据,但有一个follower,由于某种毛病,迟迟不能与leader 进行同步,那leader 就要一向等下去,直到它完结同步,才干发送ack。这个问题怎么处理呢
Leader维护了一个动态的in-sync replica set (ISR-同步副本列表),意为和leader坚持同步的follower调集。依据follower发来的FETCH恳求中的fetch offset判别ISR中的follower完结数据同步是否成功。假如follower长时刻未向leader同步数据,则该follower将被踢出ISR,该时刻阈值由replica.lag.time.max.ms参数设定。Leader产生毛病之后,就会从ISR中推举新的leader。
- ISR(In-Sync Replicas ):与leader坚持同步的follower调集
- AR(Assigned Replicas):分区的一切副本
- ISR是由leader维护,follower从leader同步数据有一些推迟(包括推迟时刻replica.lag.time.max.ms和推迟条数replica.lag.max.messages两个维度, 当时最新的版别0.10.x中只支撑replica.lag.time.max.ms这个维度),任意一个超越阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新参加的follower也会先存放在OSR中。
- AR=ISR+OSR。
ACK
关于某些不太重要的数据,对数据的牢靠性要求不是很高,可以忍受数据的少量丢掉,所以没必要等ISR中的follower悉数接纳成功。所以Kafka为用户供给了三种牢靠性级别,用户依据对牢靠性和推迟的要求进行权衡,挑选以下的装备。
ACK有三个值,别离为-1 0 1
- 当ACK=0时,producer不等候broker的ack,这一操作供给了一个最低的推迟,broker一接纳到还没有写入磁盘就现已回来,当broker毛病时有或许丢掉数据;
- ACK=1时,producer等候broker的ack,partition的leader落盘成功后回来ack,假如在follower同步成功之前leader毛病,而由于现已回来了ack,体系默许新推举的leader现已有了数据,然后不会进行失利重试,那么将会丢掉数据
- ACK=-1,producer等候broker的ack,partition的leader和follower悉数落盘成功后才回来ack。可是假如在follower同步完结后,broker发送ack之前,leader产生毛病,导致没有回来ack给Producer,由于失利重试机制,又会给新推举出来的leader发送数据,形成数据重复
数据一起性
假定leader接受了producer传来的数据为20条,ISR中三台follower(f1,f2,f3)开端同步数据,由于网络传输,三台follower同步数据的速率不同。当f1同步了15条数据,f2同步了10条数据,f3同步了13条数据,此刻,leader突然挂掉,从ISR中选取了f2作为主节点,此刻leader-f2同步了10条,f1同步15,f3同步13,就会形成leader和follower之间数据不一起问题。
处理办法:
- HW (High Watermark)俗称高水位,它标识了一个特定的音讯偏移量(offset),顾客只能拉取到这个offset之前的音讯,关于同一个副本目标而言,其HW值不会大于LEO值。小于等于HW值的一切音讯都被以为是“已备份”的(replicated) 。
- LEO(Log End Offset),即日志末端位移(log end offset),记载了该副本底层日志(log)中下一条音讯的位移值。**留意是下一条音讯!**也便是说,假如LEO=10,那么标明该副本保存了10条音讯,位移值范围是[0, 9]。LEO 的巨细相当于当时日志分区中终究一条音讯的offset值加1,分区 ISR 调集中的每个副本都会维护本身的 LEO,,而 ISR 调集中最小的 LEO 即为分区的 HW,对顾客而言只能消费 HW 之前的音讯(便是途中黄色块)。
上图中,HW值是7,标明位移是0-7的一切音讯都现已处于“已备份状况”(committed),而LEO值是15,那么8~14的音讯便是没有彻底备份(fully replicated)——为什么没有15?由于方才说过了,LEO指向的是下一条音讯到来时的位移,故上图运用虚线框标明。咱们总说consumer无法消费未提交音讯。这句话假如用以上名词来解读的话,应该表述为:consumer无法消费分区下leader副本中位移值大于分区HW的任何音讯。这儿需求特别留意分区HW便是leader副本的HW值。
follower副本何时更新LEO
follower副本仅仅被动地向leader副本恳求数据,详细表现为follower副本不停地向leader副本地点的broker发送FETCH恳求,一旦获取音讯后写入自己的日志中进行备份。那么follower副本的LEO是何时更新的呢?首要,Kafka有两套follower副本LEO:1. 一套LEO保存在follower副本地点broker的副本办理机中;2. 另一套LEO保存在leader副本地点broker的副本办理机中——换句话说,leader副本机器上保存了一切的follower副本的LEO。
为什么要保存两套?这是由于Kafka运用前者协助follower副本更新其HW值;而运用后者协助leader副本更新其HW运用
- follower副本端的follower副本LEO何时更新
follower副本端的LEO值便是其底层日志的LEO值,也便是说每当新写入一条音讯,其LEO值就会被更新(相似于LEO += 1)。当follower发送FETCH恳求后,leader将数据回来给follower,此刻follower开端向底层log写数据,然后自动地更新LEO值
- leader副本端的follower副本LEO何时更新?
leader副本端的follower副本LEO的更新产生在leader在处理follower FETCH恳求时。一旦leader接纳到follower发送的FETCH恳求,它首要会从自己的log中读取相应的数据,可是在给follower回来数据之前它先去更新follower的LEO(即上面所说的第二套LEO)
follower副本何时更新HW
follower更新HW产生在其更新LEO之后,一旦follower向log写完数据,它会测验更新它自己的HW值。详细算法便是比较当时LEO值与FETCH呼应中leader的HW值,取两者的小者作为新的HW值。这奉告咱们一个现实:假如follower的LEO值超越了leader的HW值,那么follower HW值是不会跳过leader HW值的。
leader副本何时更新LEO
和follower更新LEO道理相同,leader写log时就会自动地更新它自己的LEO值
leader副本何时更新HW值
前面说过了,leader的HW值便是分区HW值,因而何时更新这个值是咱们最关怀的,由于它直接影响了分区数据关于consumer的可见性 。以下4种状况下leader会测验去更新分区HW——切记是测验,有或许由于不满意条件而不做任何更新:
- 副本成为leader副本时:当某个副本成为了分区的leader副本,Kafka会测验去更新分区HW,这个副本的状况是一定要查看的!不过,本文评论的是当体系稳定后且正常作业时备份机制或许呈现的问题,故这个条件不在咱们的评论之列。
- broker呈现溃散导致副本被踢出ISR时:若有broker溃散则必须查看下是否会波及此分区,因而查看下分区HW值是否需求更新是有必要的。本文不对这种状况做深化评论
- producer向leader副本写入音讯时:由于写入音讯会更新leader的LEO,故有必要再查看下HW值是否也需求修正
- leader处理follower FETCH恳求时:当leader处理follower的FETCH恳求时首要会从底层的log读取数据,之后会测验更新分区HW值
特别留意上面4个条件中的终究两个。它提醒了一个现实——当Kafka broker都正常作业时,分区HW值的更新时机有两个:leader处理PRODUCE恳求时和leader处理FETCH恳求时。别的,leader是怎么更新它的HW值的呢?前面说过了,leader broker上保存了一套follower副本的LEO以及它自己的LEO。当测验承认分区HW时,它会选出一切满意条件的副本,比较它们的LEO(当然也包括leader自己的LEO),并挑选最小的LEO值作为HW值。这儿的满意条件首要是指副本要满意以下两个条件之一:
- 处于ISR中
- 副本LEO落后于leader LEO的时长不大于replica.lag.time.max.ms参数值(默许是10s)
乍看上去好像这两个条件说得是一回事,究竟ISR的定义便是第二个条件描绘的那样。但某些状况下Kafka确实或许呈现副本现已“追上”了leader的进展,但却不在ISR中——比方某个从failure中康复的副本。假如Kafka只判别第一个条件的话,承认分区HW值时就不会考虑这些未在ISR中的副本,但这些副本现已具有了“马上进入ISR”的资格,因而就或许呈现分区HW值跳过ISR中副本LEO的状况——这肯定是不允许的,由于分区HW实践上便是ISR中一切副本LEO的最小值。
举例
咱们假定有一个topic,单分区,副本因子是2,即一个leader副本和一个follower副本。当producer发送一条音讯时,broker端的副本到底会产生什么事情以及分区HW是怎么被更新的
下图是初始状况,
- 初始时leader和follower的HW和LEO都是0(严格来说源代码会初始化LEO为-1,不过这不影响之后的评论);
- leader中的remote LEO指的便是leader端保存的follower LEO,也被初始化成0。此刻,producer没有发送任何音讯给leader,而follower现已开端不断地给leader发送FETCH恳求了,但由于没有数据因而什么都不会产生。
- follower发送过来的FETCH恳求由于无数据而暂时会被存放到leader端的purgatory中,待500ms(replica.fetch.wait.max.ms参数)超时后会强制完结。倘若在存放期间producer端发送过来数据,那么会Kafka会自动唤醒该FETCH恳求,让leader持续处理之。
尽管purgatory不是本文的重点,但FETCH恳求发送和PRODUCE恳求处理的时时机影响咱们的评论。因而后续咱们也将分两种状况来评论分区HW的更新。
第一种状况
follower发送FETCH恳求在leader处理完produce恳求之后
producer给该topic分区发送了一条音讯。此刻的状况如下图所示:
如图所示,leader接纳到produce恳求首要做两件事情:
- 把音讯写入写底层log(一起也就自动地更新了leader的LEO)
- 测验更新leader HW值(前面leader副本何时更新HW值一节中的第三个条件触发)。咱们现已假定此刻follower没有发送FETCH恳求,那么leader端保存的remote LEO仍然是0,因而leader会比较它自己的LEO值和remote LEO值,发现最小值是0,与当时HW值相同,故不会更新分区HW值
所以,PRODUCE恳求处理完结后leader端的HW值仍然是0,而LEO是1,remote LEO是0。假定此刻follower发送了FETCH恳求(或许说follower早已发送了FETCH恳求,只不过在broker的恳求行列中排队),那么状况改动如下图所示:
本例中当follower发送FETCH恳求时,leader端的处理顺次是:
- 读取底层log数据
- 更新remote LEO = 0(为什么是0? 由于此刻follower还没有写入这条音讯。leader怎么承认follower还未写入呢?这是经过follower发来的FETCH恳求中的fetch offset来承认的)
- 测验更新分区HW——此刻leader LEO = 1,remote LEO = 0,故分区HW值= min(leader LEO, follower remote LEO) = 0
- 把数据和当时分区HW值(仍然是0)发送给follower副本
而follower副本接纳到FETCH response后顺次履行下列操作:
- 写入本地log(一起更新follower LEO)
- 更新follower HW——比较本地LEO和当时leader HW取小者,故follower HW = 0
此刻,第一轮FETCH RPC完毕,咱们会发现尽管leader和follower都现已在log中保存了这条音讯,但分区HW值没有被更新,实践上,它是在第二轮FETCH RPC中被更新的,如下图所示:
上图中,follower发来了第二轮FETCH恳求,leader端接纳到后仍然会顺次履行下列操作:
- 读取底层log数据
- 更新remote LEO = 1(这次为什么是1了? 由于这轮FETCH RPC带着的fetch offset是1,那么为什么这轮带着的便是1了呢,由于上一轮完毕后follower LEO被更新为1了)
- 测验更新分区HW——此刻leader LEO = 1,remote LEO = 1,故分区HW值= min(leader LEO, follower remote LEO) = 1。留意分区HW值此刻被更新了!!!
- 把数据(实践上没有数据)和当时分区HW值(已更新为1)发送给follower副本
相同地,follower副本接纳到FETCH response后顺次履行下列操作:
- 写入本地log,当然没东西可写,故follower LEO也不会改动,仍然是1
- 更新follower HW——比较本地LEO和当时leader LEO取小者。由于此刻两者都是1,故更新follower HW = 1
producer端发送音讯后broker端完好的处理流程就讲完了。此刻音讯现已成功地被仿制到leader和follower的log中且分区HW是1,标明consumer可以消费offset = 0的这条音讯。下面咱们来分析下PRODUCE和FETCH恳求交互的第二种状况。
第二种状况
FETCH恳求保存在purgatory中PRODUCE恳求到来
这种状况实践上和第一种状况差不多。前面说过了,当leader无法立即满意FECTH回来要求的时分(比方没有数据),那么该FETCH恳求会被暂存到leader端的purgatory中,待时机成熟时会测验再次处理它。不过Kafka不会无限期地将其缓存着,默许有个超时时刻(500ms),一旦超时时刻已过,则这个恳求会被强制完结。不过咱们要评论的场景是在存放期间,producer发送PRODUCE恳求然后使之满意了条件然后被唤醒。此刻,leader端处理流程如下:
- leader写入本地log(一起自动更新leader LEO)
- 测验唤醒在purgatory中存放的FETCH恳求
- 测验更新分区HW
至于唤醒后的FETCH恳求的处理与第一种状况彻底一起,故这儿不做详细展开了。
以上一切的东西其实就想阐明一件事情:Kafka运用HW值来决议副本备份的进展,而HW值的更新一般需求额定一轮FETCH RPC才干完结,故而这种规划是有问题的。它们或许引起的问题包括:
- 备份数据丢掉
- 备份数据不一起
数据丢掉
如前所述,运用HW值来承认备份进展时,其值的更新是在下一轮RPC中完结的。
假如follower副本在 follower副本接纳到FETCH response后履行操作的第一步与第二步之间产生溃散,那么就有或许形成数据的丢掉。咱们举个比如来看下。
上图中有两个副本:A和B。开端状况是A是leader。咱们假定producer端min.insync.replicas设置为1,那么当producer发送两条音讯给A后,A写入到底层log,此刻Kafka会告诉producer说这两条音讯写入成功。
可是在broker端,leader和follower底层的log虽都写入了2条音讯且分区HW现已被更新到2,但follower HW没有被更新(也便是操作的第二步没有履行)。倘若此刻副本B地点的broker宕机,那么重启回来后B会自动把LEO调整到之前的HW值,故副本B会做日志切断(log truncation),将offset = 1的那条音讯从log中删去,并调整LEO = 1,此刻follower副本底层log中就只需一条音讯,即offset = 0的音讯。
B重启之后需求给A发FETCH恳求,但若A地点broker机器在此刻宕机,那么Kafka会令B成为新的leader,而当A重启回来后也会履行日志切断,将HW调整回1。这样,位移=1的音讯就从两个副本的log中被删去,即永远地丢掉了。
这个场景丢掉数据的前提是在min.insync.replicas=1时,一旦音讯被写入leader端log即被以为是“已提交”,而推迟一轮FETCH RPC更新HW值的规划使得follower HW值是异步推迟更新的,倘若在这个进程中leader产生改动,那么成为新leader的follower的HW值就有或许是过期的,使得clients端以为是成功提交的音讯被删去。
leader/follower数据离散
除了或许形成的数据丢掉以外,这种规划还有一个潜在的问题,即形成leader端log和follower端log的数据不一起。比方leader端保存的记载序列是r1,r2,r3,r4,r5,….;而follower端保存的序列或许是r1,r3,r4,r5,r6…。这也是不合法的场景,由于望文生义,follower必须追随leader,完好地备份leader端的数据。
咱们仍然运用一张图来阐明这种场景是怎么产生的:
这种状况的初始状况与状况1有一些不同的:A仍然是leader,A的log写入了2条音讯,但B的log只写入了1条音讯。分区HW更新到2,但B的HW仍是1,一起producer端的min.insync.replicas = 1。
这次咱们让A和B地点机器一起挂掉,然后假定B先重启回来,因而成为leader,分区HW = 1。假定此刻producer发送了第3条音讯(绿色框标明)给B,所以B的log中offset = 1的音讯变成了绿色框标明的音讯,一起分区HW更新到2(A还没有回来,就B一个副本,故可以直接更新HW而不必理睬A)之后A重启回来,需求履行日志切断,但发现此刻分区HW=2而A之前的HW值也是2,故不做任何调整。此后A和B将以这种状况持续正常作业。
显然,这种场景下,A和B底层log中保存在offset = 1的音讯是不同的记载,然后引发不一起的情形呈现。
处理计划
Kafka 0.11.0.0.版别处理计划
形成上述两个问题的根本原因在于HW值被用于衡量副本备份的成功与否以及在呈现failture时作为日志切断的依据,但HW值的更新是异步推迟的,特别是需求额定的FETCH恳求处理流程才干更新,故这中心产生的任何溃散都或许导致HW值的过期。鉴于这些原因,Kafka 0.11引进了leader epoch来取代HW值。Leader端多拓荒一段内存区域专门保存leader的epoch信息,这样即使呈现上面的两个场景也能很好地躲避这些问题。
所谓leader epoch实践上是一对值:(epoch,offset)。epoch标明leader的版别号,从0开端,当leader改动过1次时epoch就会+1,而offset则对应于该epoch版别的leader写入第一条音讯的位移。因而假定有两对值:
(0, 0) (1, 120)
标明第一个leader从位移0开端写入音讯;共写了120条[0, 119];而第二个leader版别号是1,从位移120处开端写入音讯。
leader broker中会保存这样的一个缓存,并定时地写入到一个checkpoint文件中。
当leader写底层log时它会测验更新整个缓存——假如这个leader初次写音讯,则会在缓存中添加一个条目;不然就不做更新。而每次副本从头成为leader时会查询这部分缓存,获取出对应leader版别的位移,这就不会产生数据不一起和丢掉的状况。
躲避数据丢掉
上图左半边现已给出了扼要的流程描绘,这儿不详细展开详细的leader epoch完结细节(比方OffsetsForLeaderEpochRequest的完结),咱们只需求知道每个副本都引进了新的状况来保存自己当leader时开端写入的第一条音讯的offset以及leader版别。这样在康复的时分彻底运用这些信息而非水位来判别是否需求切断日志。
躲避数据不一起
相同的道理,依靠leader epoch的信息可以有用地躲避数据不一起的问题。
0.11.0.0版别的Kafka经过引进leader epoch处理了原先依靠水位标明副本进展或许形成的数据丢掉/数据不一起问题。
源代码方位:kafka.server.epoch.LeaderEpochCache.scala (leader epoch数据结构)、kafka.server.checkpoints.LeaderEpochCheckpointFile(checkpoint查看点文件操作类)还有散布在Log中的CRUD操作。
推举机制
ISR
- leader会追寻和维护ISR中一切follower的滞后状况。假如滞后太多(时刻滞后replica.lag.time.max.ms可装备),leader会把该replica从ISR中移除。被移除ISR的replica一向在追逐leader。leader写入数据后并不会commit,只需ISR列表中的一切folower同步之后才会commit,把滞后的follower移除ISR首要是防止写音讯推迟。设置ISR首要是为了broker宕掉之后,从头推举partition的leader从ISR列表中挑选。
leader
- 在kafka集群中有2个种leader,一种是broker的leader即controller leader,还有一种便是partition的leader,下面介绍一下2种leader的推举大致流程。
Controller leader
所谓操控器便是一个Borker,在一个kafka集群中,有多个broker节点,可是它们之间需求推举出一个leader,其他的broker充任follower人物。集群中第一个发动的broker会经过在zookeeper中创立暂时节点/controller
来让自己成为操控器,其他broker发动时也会在zookeeper中创立暂时节点,可是发现节点现已存在,所以它们会收到一个反常,意识到操控器现已存在,那么就会在zookeeper中创立watch目标,便于它们收到操控器改动的告诉。
那么假如操控器由于网络原因与zookeeper断开衔接或许反常退出,那么其他broker经过watch收到操控器改动的告诉,就会去测验创立暂时节点/controller
,假如有一个broker创立成功,那么其他broker就会收到创立反常告诉,也就意味着集群中现已有了操控器,其他broker只需创立watch目标即可。
假如集群中有一个broker产生反常退出了,那么操控器就会查看这个broker是否有分区的副本leader,假如有那么这个分区就需求一个新的leader,此刻操控器就会去遍历其他副本,决议哪一个成为新的leader,一起更新分区的ISR调集。
假如有一个broker参加集群中,那么操控器就会经过Broker ID去判别新参加的broker中是否含有现有分区的副本,假如有,就会从分区副本中去同步数据。
集群中每推举一次操控器,就会经过zookeeper创立一个controller epoch
,每一个推举都会创立一个更大,包括最新信息的epoch
,假如有broker收到比这个epoch
旧的数据,就会忽略它们,kafka也经过这个epoch
来防止集群产生“脑裂”。
分区副本推举机制
分区leader副本的推举由Kafka Controller (Controller leader)担任详细施行。当创立分区(创立主题或添加分区都有创立分区的动作)或分区上线(比方分区中原先的leader副本下线,此刻分区需求推举一个新的leader上线来对外供给服务)的时分都需求履行leader的推举动作
在kafka的集群中,会存在着多个主题topic,在每一个topic中,又被划分为多个partition,为了防止数据不丢掉,每一个partition又有多个副本,在整个集群中,总共有三种副本人物:
- 领袖副本(leader):也便是leader主副本,每个分区都有一个领袖副本,为了确保数据一起性,一切的出产者与顾客的恳求都会经过该副本来处理。
- 跟随者副本(follower):除了领袖副本外的其他一切副本都是跟随者副本,跟随者副本不处理来自客户端的任何恳求,只担任从领袖副本同步数据,确保与领袖坚持一起。假如领袖副本产生溃散,就会从这其间推举出一个leader。
- 首选leader副本:创立分区时指定的首选领袖。假如不指定,则为分区的第一个副本。
follower需求从leader中同步数据,可是由于网络或许其他原因,导致数据堵塞,呈现不一起的状况,为了防止这种状况,follower会向leader发送恳求信息,这些恳求信息中包括了follower需求数据的偏移量offset,而且这些offset是有序的。
假如有follower向leader发送了恳求1,接着发送恳求2,恳求3,那么再发送恳求4,这时就意味着follower现已同步了前三条数据,不然不会发送恳求4。leader经过盯梢 每一个follower的offset来判别它们的仿制进展。
默许的,假如follower与leader之间超越10s内没有发送恳求,或许说没有收到恳求数据,此刻该follower就会被以为“不同步副本”。而持续恳求的副本便是“同步副本”,当leader产生毛病时,只需“同步副本”才干够被推举为leader。其间的恳求超时时刻可以经过参数replica.lag.time.max.ms
参数来装备。
咱们希望每个分区的leader可以散布到不同的broker中,尽或许的达到负载均衡,所以会有一个首选领袖,假如咱们设置参数auto.leader.rebalance.enable
为true,那么它会查看首选领袖是否是真实的领袖,假如不是,则会触发推举,让首选领袖成为领袖。
根本思路是依照AR调集中副本的次序查找第一个存活的副本,而且这个副本在ISR调集中。一个分区的AR调集在分配的时分就被指定,而且只需不产生重分配的状况,调集内部副本的次序是坚持不变的,而分区的ISR调集中副本的次序或许会改动。留意这儿是依据AR的次序而不是ISR的次序进行推举的。
还有一些状况也会产生分区leader的推举,比方当分区进行重分配(reassign)的时分也需求履行leader的推举动作。这个思路比较简略:从重分配的AR列表中找到第一个存活的副本,且这个副本在现在的ISR列表中。
再比方当产生优先副本(preferred replica partition leader election)的推举时,直接将优先副本设置为leader即可,AR调集中的第一个副本即为优先副本。
还有一种状况便是当某节点被优雅地关闭(也便是履行ControlledShutdown)时,坐落这个节点上的leader副本都会下线,所以与此对应的分区需求履行leader的推举。这儿的详细思路为:从AR列表中找到第一个存活的副本,且这个副本在现在的ISR列表中,与此一起还要确保这个副本不处于正在被关闭的节点上。
leader产生毛病今后,会从ISR中选出一个新leader,为了确保多个副本之间的同步性和数据一起性,其他follower会将log文件中高于HW的部分截掉,然后从leader中同步数据
比如
分区中的leader挂掉后,需求从ISR的follower副本中推举出新的leader
图示HW为9,顾客只能看到0~9
假如leader挂掉了,推举follower2为leader,那么以新leader告诉其他节点以HW 9为基准,超越HW的部分需求截取掉,leader本身的(LEO为11 大于HW 9)不必截取掉。然后从新的leader开端同步,这个只会确保副本之间的数据一起性,并不能确保数据不丢掉或许不重复
消费组选主
在kafka的消费端,会有一个顾客和谐器以及消费组,组和谐器GroupCoordinator
需求为消费组内的顾客推举出一个消费组的leader,那么怎么推举的呢?
假如消费组内还没有leader,那么第一个参加消费组的顾客即为消费组的leader,假如某一个时刻leader顾客由于某些原因退出了消费组,那么就会从头推举leader,怎么推举?
private val members = new mutable.HashMap[String, MemberMetadata]
leaderId = members.keys.headOption
12
上面代码是kafka源码中的部分代码,member是一个hashmap的数据结构,key为顾客的member_id
,value是元数据信息,那么它会将leaderId推举为Hashmap中的第一个键值对,它和随机根本没啥差异。
关于整个推举算法的概况需求先了解Raft推举算法,kafka是依据该算法来完结leader推举的。可以参考文章【散布式一起性协议:Raft算法详解】。
每个Kafka副本目标都有两个重要的属性:LEO和HW。留意是一切的副本,而不仅仅leader副本。
Consumer 顾客
消费办法
push
push形式很难习惯不同消费速率的顾客,音讯发送速率是由broker决议的,broker会以最快的速度传递音讯,可是会形成consumer来不及处理音讯,会导致拒绝服务以及网络拥堵
pull
consumer选用pull形式从broker拉数据
不足之处便是,假如kafka没数据,顾客会陷入循环,一向回来空数据,所以kafka在消费数据的时分会传入一个时长参数timeout,假如当时没有数据,consumer会等一段时刻再回来,这个时长是timeout
顾客组
- 同一时刻,一条音讯只能被组中的一个顾客实例消费
- 顾客组订阅这个主题,意味着主题下的一切分区都会被组中的顾客消费到,假如依照从属联系来说的话便是,主题下的每个分区只从属于组中的一个顾客,不或许呈现组中的两个顾客担任同一个分区。
- 假如分区数大于或许等于组中的顾客实例数,那自然没有什么问题,无非一个顾客会担任多个分区,(PS:当然,最理想的状况是二者数量持平,这样就相当于一个顾客担任一个分区);
- 假如顾客实例的数量大于分区数,那么依照默许的战略(PS:之所以着重默许战略是由于你也可以自定义战略),有一些顾客是多余的,一向接不到音讯而处于空闲状况。
- 假如分区数目大于顾客数目,即多个顾客担任同一个分区,那么会有什么问题? 咱们知道,Kafka在规划的时分便是要确保分区下音讯的次序,也便是说音讯在一个分区中的次序是怎样的,那么顾客在消费的时分看到的便是什么样的次序,那么要做到这一点就首要要确保音讯是由顾客自动拉取的(pull),其次还要确保一个分区只能由一个顾客担任。倘若,两个顾客担任同一个分区,那么就意味着两个顾客一起读取分区的音讯,由于顾客自己可以操控读取音讯的offset,就有或许C1才读到2,而C1读到1,C1还没处理完,C2现已读到3了,则会形成许多糟蹋,由于这就相当于多线程读取同一个音讯,会形成音讯处理的重复,且不能确保音讯的次序,这就跟自动推送(push)无异。
顾客组
- consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程
- group.id是一个字符串,仅有标识一个consumer group
- consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)
顾客方位
顾客在消费的进程中需求记载自己消费了多少数据,即消费方位信息。在Kafka中这个方位信息有个专门的术语:位移(offset)。
许多音讯引擎都把这部分信息保存在服务器端(broker端)。这样做的好处当然是完结简略,但会有三个首要的问题:
- broker从此变成有状况的,会影响伸缩性;
- 需求引进应对机制(acknowledgement)来承认消费成功。
- 由于要保存许多consumer的offset信息,必然引进杂乱的数据结构,形成资源糟蹋。而Kafka挑选了不同的办法:每个consumer group保存自己的位移信息,那么只需求简略的一个整数标明方位就够了;一起可以引进checkpoint机拟定时耐久化,简化了应对机制的完结。
位移办理(offset management)
自动VS手动
Kafka默许是定时帮你自动提交位移的(enable.auto.commit = true),你当然可以挑选手动提交位移完结自己操控。别的kafka会定时把group消费状况保存起来,做成一个offset map,如下图所示:
上图中标明晰test-group这个组当时的消费状况。
//设置自动提交
props.put("enable.auto.commit", "true");
位移提交 — 逐渐削减与zk的耦合
老版别的位移是提交到zookeeper中的,目录结构是:/consumers/<group.id>/offsets/<topic><partitionId>,可是zookeeper并不合适进行大批量的读写操作,尤其是写操作。因而kafka供给了另一种处理计划:添加__consumeroffsets topic,将offset信息写入这个topic,脱节对zookeeper的依靠(指保存offset这件事情)。__consumer_offsets中的音讯保存了每个consumer group某一时刻提交的offset信息。仍然以上图中的consumer group为例,格局大约如下:
__consumers_offsets topic装备了compact战略,使得它总是可以保存最新的位移信息,既操控了该topic整体的日志容量,也能完结保存最新offset的意图。compact的详细原理请拜见:Log Compaction
kafka读取__consumers_offsets内容
由于Zookeeper并不合适大批量的频繁写入操作,新版Kafka已引荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,而且默许供给了kafka_consumer_groups.sh脚本供用户查看consumer信息
#创立topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3
#由于默许没有指定key,所以依据round-robin办法,音讯散布到不同的分区上
#创立consumer group
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test --from-beginning --new-consumer
#获取group id
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list --new-consumer
#输出 console-consumer-46965
s
#查询__consumer_offsets topic一切内容
#留意:运转下面指令前先要在consumer.properties中设置exclude.internal.topics=false
#0.11.0.0之前版别
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
#0.11.0.0之后版别(含)
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
默许状况下__consumer_offsets有50个分区,假如你的体系中consumer group也许多的话,那么这个指令的输出成果会许多。
核算指定consumer group在__consumer_offsets topic中分区信息,这时分就用到了之前获取的group.id(本例中是console-consumer-46965)。
Kafka会运用下面公式核算该group位移保存在__consumer_offsets的哪个分区上:
Math.abs(groupID.hashCode()) % numPartitions
所以在本例中,对应的分区=Math.abs(“console-consumer-46965”.hashCode()) % 50 = 11,即__consumer_offsets的分区11保存了这个consumer group的位移信息
#获取指定consumer group的位移信息
#0.11.0.0版别之前
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
#0.11.0.0版别今后(含)
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
精准一次性消费
手动办理offset
www.cnblogs.com/yn-huang/p/…
分区战略
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
假如是自定义分配战略的话可以承继AbstractPartitionAssignor这个类,它默许有3个完结,对应三种分区战略
range
range战略对应的完结类是org.apache.kafka.clients.consumer.RangeAssignor
这是默许的分配战略
可以经过顾客装备中partition.assignment.strategy参数来指定分配战略,它的值是类的全途径,是一个数组
/**
* The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
* and the consumers in lexicographic order. We then divide the number of partitions by the total number of
* consumers to determine the number of partitions to assign to each consumer. If it does not evenly
* divide, then the first few consumers will have one extra partition.
*
* For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
* resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
*
* The assignment will be:
* C0: [t0p0, t0p1, t1p0, t1p1]
* C1: [t0p2, t1p2]
*/
range战略是依据每个主题的,关于每个主题,咱们以数字次序排列可用分区,以字典次序排列顾客。然后,将分区数量除以顾客总数,以承认分配给每个顾客的分区数量。假如没有平均划分(PS:除不尽),那么开端的几个顾客将有一个额定的分区。
- range分配战略针对的是主题(PS:这儿所说的分区指的某个主题的分区,顾客指的是订阅这个主题的顾客组中的顾客实例)
- 首要,将分区按数字次序排行序,顾客按顾客称号的字典序排好序
- 然后,用分区总数除以顾客总数。假如可以除尽,则皆大欢喜,平均分配;若除不尽,则坐落排序前面的顾客将多担任一个分区
比如
假定有两个顾客C0和C1,两个主题t0和t1,而且每个主题有3个分区,分区的状况是这样的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2
那么,依据以上信息,终究顾客分配分区的状况是这样的:
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
关于主题t0,分配的成果是C0担任P0和P1,C1担任P2;关于主题t2,也是如此
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
// 主题与顾客的映射
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>());
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey(); // 主题
List<String> consumersForTopic = topicEntry.getValue(); // 顾客列表
// partitionsPerTopic标明主题和分区数的映射
// 获取主题下有多少个分区
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
// 顾客按字典序排序
Collections.sort(consumersForTopic);
// 分区数量除以顾客数量
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// 取模,余数便是额定的分区
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
// 分配分区
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
roundrobin(轮询)
roundronbin分配战略的详细完结是org.apache.kafka.clients.consumer.RoundRobinAssignor
轮询分配战略是依据一切可用的顾客和一切可用的分区的,与前面的range战略最大的不同便是它不再局限于某个主题
假如一切的顾客实例的订阅都是相同的,那么这样最好了,可用统一分配,均衡分配
例如,假定有两个顾客C0和C1,两个主题t0和t1,每个主题有3个分区,别离是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2
那么,终究分配的成果是这样的:
C0: [t0p0, t0p2, t1p1]
C1: [t0p1, t1p0, t1p2]
用图形标明大约是这样的:
rebalance
- Rebalance
rebalance本质上是一种协议,规定了一个consumer group下的一切consumer怎么达到一起来分配订阅topic的每个分区。比方某个group下有20个consumer,它订阅了一个具有100个分区的topic。正常状况下,Kafka平均会为每个consumer分配5个分区。这个分配的进程就叫rebalance。
rebalance时机
- 组成员产生改动(新consumer参加组、已有consumer自动脱离组或已有consumer溃散了)
- 订阅主题数产生改动——假如运用了正则表达式的办法进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
- 订阅主题的分区数产生改动
- consumer调用unsubscrible(),取消topic的订阅
rebalance分区分配
之前提到了group下的一切consumer都会和谐在一起一起参加分配,这是怎么完结的?Kafka新版别consumer默许供给了两种分配战略:range和round-robin。当然Kafka选用了可插拔式的分配战略,你可以创立自己的分配器以完结不同的分配战略。实践上,由于现在range和round-robin两种分配器都有一些弊端,Kafka社区现已提出第三种分配器来完结更加公正的分配战略,仅仅现在还在开发中。咱们这儿只需求知道consumer group默许现已帮咱们把订阅topic的分区分配作业做好了就行了。
简略举个比如,假定现在某个consumer group下有两个consumer: A和B,当第三个成员参加时,kafka会触发rebalance并依据默许的分配战略从头为A、B和C分配分区,如下图所示:
履行rebalance
Kafka供给了一个人物:coordinator来履行关于consumer group的办理。坦率说kafka关于coordinator的规划与修正是一个很长的故事。最新版别的coordinator也与开端的规划有了很大的不同。这儿只提及两次比较大的改动
首要是0.8版别的coordinator,那时分的coordinator是依靠zookeeper来完结关于consumer group的办理的。Coordinator监听zookeeper的/consumers/<group>/ids的子节点改动以及/brokers/topics/<topic>数据改动来判别是否需求进行rebalance。
group下的每个consumer都自己决议要消费哪些分区,并把自己的决议抢先在zookeeper中的/consumers/<group>/owners/<topic>/<partition>下注册。很明显,这种计划要依靠于zookeeper的协助,而且每个consumer是独自做决议的,没有那种“我们属于一个组,要洽谈做事情”的精神。
0.9版别的kafka改进了coordinator的规划,提出了group coordinator——每个consumer group都会被分配一个这样的coordinator用于组办理和位移办理。
这个group coordinator比本来承当了更多的责任,比方组成员办理、位移提交维护机制等。当新版别consumer group的第一个consumer发动的时分,它会去和kafka server承认谁是它们组的coordinator。之后该group内的一切成员都会和该coordinator进行和谐通讯。这种coordinator规划不再需求zookeeper了,功能上可以得到很大的提高。
怎么承认coordinator
- 承认consumer group位移信息写入__consumers_offsets的哪个分区。详细核算公式:
- __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
- groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默许是50个分区。
- 该分区leader地点的broker便是被选定的coordinator
Rebalance Generation
JVM GC的分代搜集便是这个词(严格来说是generational),它标明晰rebalance之后的一届成员,首要是用于维护consumer group,隔离无效offset提交的。
比方上一届的consumer成员是无法提交位移到新一届的consumer group中。咱们有时分可以看到ILLEGAL_GENERATION的过错,便是kafka在诉苦这件事情。每次group进行rebalance之后,generation号都会加1,标明group进入到了一个新的版别,如下图所示: Generation 1时group有3个成员,随后成员2退出组,coordinator触发rebalance,consumer group进入Generation 2,之后成员4参加,再次触发rebalance,group进入Generation 3
协议(protocol)
前面说过了, rebalance本质上是一组协议。group与coordinator一起运用它来完结group的rebalance。现在kafka供给了5个协议来处理与consumer group coordination相关的问题:
- Heartbeat恳求:consumer需求定时给coordinator发送心跳来标明自己还活着
- LeaveGroup恳求:自动奉告coordinator我要脱离consumer group
- SyncGroup恳求:group leader把分配计划奉告组内一切成员
- JoinGroup恳求:成员恳求参加组
- DescribeGroup恳求:显现组的一切信息,包括成员信息,协议称号,分配计划,订阅信息等。一般该恳求是给办理员运用
Coordinator在rebalance的时分首要用到了前面4种恳求。
liveness
consumer怎么向coordinator证明自己还活着?
经过定时向coordinator发送Heartbeat恳求。假如超越了设定的超时时刻,那么coordinator就以为这个consumer现已挂了。一旦coordinator以为某个consumer挂了,那么它就会敞开新一轮rebalance,而且在当时其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,奉告其他consumer:不好意思各位,你们从头申请参加组吧!
Rebalance进程
整体而言,rebalance分为2步:Join和Sync
- Join, 望文生义便是参加组。这一步中,一切成员都向coordinator发送JoinGroup恳求,恳求入组。一旦一切成员都发送了JoinGroup恳求,coordinator会从中挑选一个consumer担任leader的人物,并把组成员信息以及订阅信息发给leader——留意leader和coordinator不是一个概念,leader担任消费分配计划的拟定。
- Sync,这一步leader开端分配消费计划,即哪个consumer担任消费哪些topic的哪些partition。一旦完结分配,leader会将这个计划封装进SyncGroup恳求中发给coordinator,非leader也会发SyncGroup恳求,仅仅内容为空。coordinator接纳到分配计划之后会把计划塞进SyncGroup的response中发给各个consumer。这样组内的一切成员就都知道自己应该消费哪些分区了。
首要是参加组的进程:
在coordinator搜集到一切成员恳求前,它会把已收到恳求放入一个叫purgatory(炼狱)的当地。 然后是分发分配计划的进程,即SyncGroup恳求:
consumer group状况机
和许多kafka组件一样,group也做了个状况机来标明组状况的流转。coordinator依据这个状况时机对consumer group做不同的处理,如下图所示
简略阐明下图中的各个状况:
- Dead:组内现已没有任何成员的终究状况,组的元数据也现已被coordinator移除了。这种状况呼应各种恳求都是一个response: UNKNOWN_MEMBER_ID
- Empty:组内无成员,可是位移信息还没有过期。这种状况只能呼应JoinGroup恳求
- PreparingRebalance:组准备敞开新的rebalance,等候成员参加
- AwaitingSync:正在等候leader consumer将分配计划传给各个成员
- Stable:rebalance完结!可以开端消费了
rebalance场景分析
新成员参加组(member join)
组成员溃散(member failure)
前面说过了,组成员溃散和组成员自动脱离是两个不同的场景。由于在溃散时成员并不会自动地奉告coordinator此事,coordinator有或许需求一个完好的session.timeout周期才干检测到这种溃散,这必然会形成consumer的滞后。可以说脱离组是自动地建议rebalance;而溃散则是被动地建议rebalance
组成员自动离组(member leave group)
提交位移(member commit offset)
防止不必要的Rebalance
除掉consumer正常的添加和停掉导致rebalance外,在某些状况下,Consumer 实例会被 Coordinator 过错地以为 “已停止” 然后被“踢出”Group,导致rebalance,这种状况应该防止
状况一:
未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的
- 设置 session.timeout.ms = 6s。
- 设置 heartbeat.interval.ms = 2s。
- 要确保 Consumer 实例在被判定为 “dead” 之前,可以发送至少 3 轮的心跳恳求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
状况二:
Consumer 消费时刻过长导致的
max.poll.interval.ms 参数值的设置显得尤为关键。假如要防止非预期的 Rebalance,最好将该参数值设置得大一点,比你的下流最大处理时刻稍长一点
rebalance影响
-
或许重复消费: Consumer被踢出消费组,或许还没有提交offset,Rebalance时会Partition从头分配其它Consumer,会形成重复消费,虽有幂等操作但消耗消费资源,亦添加集群压力
-
集群不稳定:Rebalance扩散到整个ConsumerGroup的一切顾客,由于一个顾客的退出,导致整个Group进行了Rebalance,并在一个比较慢的时刻内达到稳定状况,影响面较大
-
影响消费速度:频繁的Rebalance反而降低了音讯的消费速度,大部分时刻都在重复消费和Rebalance
总结
本次首要讲解了kafka的整体根本架构和原理,并从出产者和顾客视点别离介绍了kafka的架构,便于我们从不同视点去理解这个高功能音讯行列。