前语
生产者担任向Kakfa发送消息。扮演的是一个消息投递的人物。为了保证消息可以顺利安全地发送到Kafka broker里面,Kafka对生产者精心规划许多机制。
本篇介绍Kafka中生产者的一些根底配备和一些生产者的机制。
看完这篇文章你将会收成:
生产者
消息发送的流程
消息的构成:
- Topic:主题
- Partition:分区
- key:假设key有值,分区器会根据它进行分区
- valuea:消息内容
ProducerRecord
方针包含方针主题(Topic) 和要发送的内容。生产者需求把键和值方针序列化成字节数组。
然后数据会序列化器解析,得到解析后的数据,再扔给分区器。
假设在ProducerRecord方针里面指清楚分区,那么分区器就不多做处理。假设没有指明,那就靠分区器根据方针的键来选择一个分区。
选择好分区之后,生产者就知道往哪个主题和分区发送这条记载。这条记载会被追加到一个记载批次里面,一个批次里面的全部消息会被发送到相同的主题和分区上。这部分作业是有一个独立的线程担任把这些记载批次发送到相应的broker上。
服务器收到这些消息时,就会回来一个照应。
假设消息写入Kafka成功,就回来一个RecordMetaData
元数据方针,它包含了主题和分区信息,以及记载在分区的偏移量。
假设消息写入Kafka失利,就会回来一个差错,生产者收到差错之后会测验从头发送消息,几次之后假设仍是实不行,就回来差错信息。 (最大尽力交给)
生产者配备
要想往Kafka写入消息,第一步就是要创立一个生产者方针,并且设置一些配备特点
必选配备
下面罗列几个Kafka生产者有必选特点
:
bootstrap.servers
指定broker的地址清单,地址的格式为host:sport
。清单里面不需求包含全部broker地址,生产者会从给定的broker里面找到其他broker的信息。(建议是供应两个broker,一旦其间一个宕机,别的一个顶上去衔接到集群)
key.serializer
broker期望接收到的键值都是字节数组。生产者接口容许运用参数化类型(Java里面叫泛型) ,因此可以把java方针作为键或值发送给broker。默许供应了ByteArraySerializer
、StringSerializer
和IntegerSerializer
。
value.serializer
跟前者相同,假设key、value都是同一个类型,那么就跟key.serializer运用同一个序列化器即可。
可选配备
acks
满意acks的设置,才代表生产者成功写入消息
-
acks=0
:代表生产者不需求任何服务器的照应。这种情况下,消息丢了就丢了,生产者自己也不知道。合适高吞吐量,但是消息并不重要的场景。 -
acks=1
:只需首领副本收到消息,生产者就会收到来自服务器的成功照应。假设消息无法抵达首领节点(比如首领节点溃散,新的首领还没有被推举出来等),生产者就会收到一个差错照应。生产者会测验重发消息。此时的吞吐量取决于是同步发送方式,仍是异步发送方式。 -
acks=all
:只有当全部副本都收到消息,生产者才会收到来自服务器的一个成功照应。
buffer.memory
生产者内存缓冲池大小
用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息,
compression.type
紧缩算法,指的是消息被发送到broker之前运用哪一种紧缩算法进行紧缩。
消息在发送到broker之前,要走网络,那么就意味着需求消耗带宽。假设我们这个消息大小太大,功用上就很欠好。所以我们一般会采用某种紧缩算法来紧缩消息。
-
snappy
:snappy是谷歌研制的,它占用较少的CPU,却可以得到较好的功用和紧缩比,假设是比较重视功用和互联网带宽(比如直播体系) 可以运用这种算法。 -
gzip
:gzip一般会占用较多的CPU,但是会提高更高的紧缩比,假设带宽有限,可以选用这种算法。特别的gzip对文本类型紧缩有特别好的作用。 -
lz4
:lz4则是寻求紧缩解压速度,他的紧缩比并不是很好。假设网络带宽条件比较好,可以选用这种紧缩算法。
retries
生产者重发消息的次数
字面意思,生产者重发消息的次数,超过了之后就会抛出一个重试失常。
batch.size
批次的大小
按照字节数核算,不是按照消息个数来核算。当批次被填满的时分,批次里面的全部消息被发送出去。有时分也不需求等到被填满才发送,半满,乃至只包含1个消息的批次也有或许被发送。
- 批次太大:占内存。
- 批次太小:频频发送消息,带来额定的开支。
linger.ms
发送批次前等候下一个消息参加的时间,也就是批次的泊车等候时间
KafkaProducer会在批次被填满或许linger.ms抵达上限时把批次发送出去。
默许情况下,只需有可用的线程,生产者就会把消息发送出去。
max.in.flight.requests.per.connection
服务器照应之前能接受的消息数
生产者在收到服务器照应之前,可以发送多少个消息,它的值越高,约占内存,不过吞吐量也越高。
设置为1,则可以保证消息是按照发送次第写入服务器的,即就是发送了重试。
但是其实没必要阿,Kafka是可以保证同一个分区里的消息是有序得。只需生产者按照必定次第发送消息,broker就会按照这个次第将他们写入同一个分区。顾客消费的时分也是按次第消费的。
消息发送方法
实例化生产者方针之后,就可以向Kafka发送消息了,发送消息有3种方法。消息先是被放进缓冲区,然后运用独自的线程发送到服务端。
发送并忘掉(fire-and-forget)
发送给服务器之后,不关心他是否成功抵达,大多情况下,消息会正常抵达,因为Kafka是高可用,并且要害是生产者会自动测验重发,有必定几率会丢掉消息。
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products",
"France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
同步发送
运用
send()
方法发送消息,他会回来一个Future方针,通过调用get()方法进行等候,就可以知道消息是否发送成功。
同步,指的是发送给Kafka之后,我这边还需求有一个发送服务端等候服务请照应进程,通过调用那个Future方针的get方法来进行处理后续逻辑。
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
❌KafKaProducer一般会发生两种类型差错
:
- 一类是可重试差错,这类差错可以通过重发消息来处理。比如关于衔接差错,可以通过再次建立衔接来处理。“
无主
”差错(no-leader)则是可以通过从头为分区从头推举首领来处理。假设多次都无法处理问题,则会抛出一个重试失常。 - 别的一类为无法通过重试处理,比如消息太大的失常,这类失常KafkaProducer不会进行任何重试,直接抛出失常。
异步发送
调用
send()
方法,指定一个回调函数,服务器在回来照应时调用该函数。
异步,指的是我不需求去处理消息等候进程,我们通过指定一个回调函数,让Kafka那儿收到消息之后调用这个回调函数即可。
大多数情况,我们不需求等候照应,虽然说,Kafka会把方针主题、分区信息和消息的偏移量发送过来,但是关于发送端的应用程序来说,并不是有必要的。不过我们遇到消息发送失利的时分,需求抛出失常,记载差错日志,或许把消息写入“差错消息”文件以便日后剖析。
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
优点就是,我们可以对失常情况进行处理。
序列化器
之所以为什么需求运用序列化器,是因为我们有许多客户端,每个客户端传给Kafka的键值类型都是可以不同的,所以需求客户端指明运用的序列化器,让Kafka知道怎样去解析数据。
举荐的运用计划是:Avro
。(Hadoop也是运用这个)
Kafka有自己供应的默许序列化器(byteArray、String、Integer等) ,也可以我们用户自己去自定义序列化器。
自定义序列化器
我们可以用业内常见的序列化结构,如Avro
、Thrift
或是Protobuf
。不建议运用我们自定义序列化器。
因为当我们需求更新我们这个方针,新增或删去某一个字段,那么这个自定义序列化器就要发生改动。或许不止一个客户端用这个方针,那么全部的客户端序列化器都要跟着改。非常不利于扩展。
怎样写一个自定义序列化器?
首要我们要知道Kafka接受的是字节数组,所以我们只需求把方针序列为字节数组回来出去即可。
运用Avro序列化
Apache Avro是一种与编程言语无关的序列化格式。
Avro数据通过与言语无关的schema来定义。
schema通过JSON来描述,数据被序列化成二进制文件或许JSON文件。(一般都是用二进制文件)
Avro在读、写数据都需求用到schema。schema存在于Avro的数据文件中。
Avro的特色在于,担任写消息的应用程序假设运用了新的schema(比如新增或删去某一个字段),担任读消息的应用程序可以持续处理消息而无需做任何改动。所以它非常合适Kafka。
留心:虽然担任读消息的应用程序不需求改schema,但是它仍是会读不到最新的字段,只不过他不会回来差错或许失常,读取到的是null。
在Kafka中运用Avro
Avro的数据文件包含了整个schema,虽然说这种开支对是可接受的,但是Kafka有许多消息,那么每个消息所带来的负担是不行忽视的。怎样处理呢?Kafka有他自己的一套东西。
Kafka采用的是运用schema注册表来抵达方针,这个不是Kafka自己完结的,需求凭借外部来完结。我们用的是Confluent Schema Registry。
原理就是非常简略,原先Avro数据里面放schema,现在不放了,改为将全部的schema放在同一个当地存着(这个当地就是schema注册表) ,然后解析的时分根据Avro数据里的schema标识符去自己拉取schema下来。因此我们在发送消息的时分需求注册schema到schema注册表里面,然后塞入一个shcema标识符即可。
本质上:就是一个拉方式(poll) ,或许说是读扩散。
分区
一个主题下,有一个或多个分区,在同一个分区内,消息是具有次第性的。
ProducerRecord方针包含了方针主题、键、值。
键:可以设置为默许的null,不过大多数应用程序会用到null。键主要有2个用途:
- 可以作为消息的附加消息。
- 也可以抉择消息该写到主题的哪个分区。具有相同键的消息可以被写到同一个分区。
运用null作为键值,并且运用了默许的分区器,那么记载就会被随机地发送到主题内各个可用的分区上。
假设键不为空,并且运用了默许的分区器。Kafka会对键进行散列,然后根据散列后的成果,把消息映射到特定的分区上。所以同一个键总是会被映射到同一个分区上。
要害一点是:这边散列的分区是全部的分区,并不是可用的分区。所以有或许映射到不行用的分区。但是这种情况很少发生,并且Kafka具有仿制功用和可用性。
一般我们是不会简单改动主题分区数量。因为一旦改动了,那么全部的映射联系都会发生变化。有或许同一个键的数据会被映射到不同的分区。所以假设想要通过键来映射分区,那么最好在创立主题的时分就把分区规划好。
原则上是:永久不要新增分区
分区器
默许分区器就是上面所谈论的情况,他是运用次数最频频、最常用的分区器。它选用的是散列分区这么一个战略。
有些时分,我们的事务需求,需求我们对数据指定分区,并且支持独自性的分区,比如某个大客户的账号记载我们想要独自分配到某个分区。
黏性分区:StickyPartitioning Strategy
0.10版别之后的kafka完结了黏性分区战略,完结生产者发送数据分块优化。
我们知道,往Kafka发送消息,broker并不会立刻接收到消息。Kafka有按量和准时进行一个Batch批次的消息发送。
从这个规划上来说,我们当然期望是消息尽或许填满一个批次,这样是最赚的。
实际上,抉择Batch怎样构成的一个因素是分区战略(Partition Strategy)。
在Kafka2.4
版别之前,选用的默许分区战略是轮询(Round-Robin),(既没有指定partition,又没有指定key的情况下,假设多条消息不是被发送到相同的分区,那么他们就不能被放到一个batch里)
所以这样就会形成一个大的Batch被拆分红多个小Batch。因此社区推出了一种新的分区战略黏性分区。
黏性分区:会随机选择一个分区并尽或许地坚持运用该分区,代表黏住这个分区。
好处
:显著地下降给消息指定分区进程中地延时,有助于改进消息批处理,减少延迟,并减少broker的负载。