前语
目前正在出一个Kafka专题
系列教程, 篇幅会较多, 喜爱的话,给个关注❤️ ~
本节给咱们讲一下Kafka中偏移量(offset)的概念
并结合经典面试题来看下它的实践运用场景~
好了, 废话不多说直接开整吧~
什么是分区 & Partition
在讲之前呢,先理一下什么是分区
,在第一节的时分有给咱们提到过
在Kafka
中,一个主题(topic)
能够分红多个分区
。每个分区都是一个有序
的音讯行列
,它们能够在不同的服务器上进行仿制,以提高可靠性
和可扩展性
。每个分区都有一个仅有的标识符(partition ID)
,用于标识该分区。
什么是偏移量 & Offset
偏移量
是Kafka
中用于标识音讯在分区中方位
的一个数字
。每个音讯都有一个仅有
的偏移量,它是由Kafka
分配的,而且在分区
中是递增
的。偏移量
能够用于回溯
分区中的音讯,也能够用于跟踪现已消费
的音讯。
+---------+ +---------+
| | | |
|Topic | |Topic |
| | | |
+---------+ +---------+
| | | |
|Partition 1 Partition 2
| | | |
+---------+ +---------+
| | | |
|Offset 1 | |Offset 1 |
| | | |
+---------+ +---------+
| | | |
|Offset 2 | |Offset 2 |
| | | |
+---------+ +---------+
| | | |
| ... | | ... |
| | | |
+---------+ +---------+
在这个暗示图中,Kafka
中有两个主题(Topic)
,每个主题都有两个分区(Partition)
。每个分区都有一个仅有的分区ID
,而且包含一系列有序
的音讯。每个音讯都有一个仅有
的偏移量(Offset)
,用于标识它在分区中的方位。
实践运用 & offset
Kafka
中的偏移量(Offset)
是一个非常重要的概念,它指的是顾客在一个特定分区的音讯中的方位
。Kafka
运用偏移量
来确保顾客能够从前次离开的地方持续
消费,从而确保音讯的次序性
和可靠性
。
以下是一些实践运用场景,演示了怎么运用偏移量来处理不同的状况:
-
从头消费音讯
:假定一个顾客在处理音讯时发生了毛病或错误
,导致它无法处理后续的音讯。在这种状况下,咱们能够将顾客的偏移量重置
为较早的方位,以从头消费
之前未能处理的音讯。 -
手动提交偏移量
:Kafka顾客API
支持手动
提交偏移量
的方法,这能够用于优化顾客的性能
和操控偏移量的提交
。在手动提交偏移量时,顾客能够依据自己的需求决定何时提交偏移量,而且能够依据音讯的处理状况进行批量提交或独自提交。 -
顾客组和谐器
:Kafka顾客API
中的顾客组和谐器担任办理顾客组中的偏移量。当顾客参加或离开顾客组时,和谐器将从头分配偏移量,以确保顾客能够从正确
的方位开端消费。 -
并发消费
:在某些状况下,咱们或许需求运用多个顾客来并发地消费同一个主题的音讯。在这种状况下,每个顾客都能够独登时办理自己的偏移量,并依据自己的需求进行提交,以确保每个顾客都能够独登时处理音讯。
总之,偏移量在Kafka中具有重要的作用,它能够协助咱们完成音讯的次序性
和可靠性
,并供给了一些方便的方法来处理不同的运用场景。
Kafka 音讯丢掉
Kafka
一定能确保音讯不丢掉吗?答案是否定的。前面几节在讲顾客消费音讯的时分都是主动提交偏移量
,这儿说一下主动提交
的概念,
默许状况下,Kafka
会运用主动提交偏移量
的方法来办理偏移量
。这意味着,每当顾客
从Kafka
中拉取一批音讯并消费结束
后,它将主动
提交偏移量,以便下一次顾客拉取音讯时能够从前次提交
的偏移量处开端消费音讯。
接下来,咱们就简单的模仿一下音讯丢掉
的场景~
新建一个OffsetController
,完成一个小需求,首要发送10
条音讯到topic1
,消费成功后将结果发送到topic2
@Slf4j
@RestController
public class OffsetController {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@GetMapping("/hello")
public String hello() throws Exception {
// 发送音讯
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
kafkaTemplate.send("topic1", message);
log.info("Sent message: {}", message);
Thread.sleep(1000);
}
return "hello";
}
@KafkaListener(topics = "topic1", id = "to1")
public void listen1(String message) {
log.info("listen1 Received message >>> {}", message);
kafkaTemplate.send("topic2", message);
}
@KafkaListener(topics = "topic2", id = "to2")
public void listen2(String message) {
log.info("listen2 Received message >>> {}", message);
}
}
看下正常状况下的音讯消费:
2023-03-21 14:47:14.161 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 0
2023-03-21 14:47:14.177 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 0
2023-03-21 14:47:14.211 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 0
2023-03-21 14:47:15.168 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 1
2023-03-21 14:47:15.173 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 1
2023-03-21 14:47:15.178 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 1
2023-03-21 14:47:16.177 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 2
2023-03-21 14:47:16.184 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 2
2023-03-21 14:47:16.187 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 2
2023-03-21 14:47:17.189 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 3
2023-03-21 14:47:17.193 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 3
2023-03-21 14:47:17.198 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 3
2023-03-21 14:47:18.202 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 4
2023-03-21 14:47:18.205 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 4
2023-03-21 14:47:18.209 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 4
2023-03-21 14:47:19.205 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 5
2023-03-21 14:47:19.210 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 5
2023-03-21 14:47:19.218 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 5
2023-03-21 14:47:20.206 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 6
2023-03-21 14:47:20.210 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 6
2023-03-21 14:47:20.213 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 6
2023-03-21 14:47:21.221 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 7
2023-03-21 14:47:21.226 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 7
2023-03-21 14:47:21.232 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 7
2023-03-21 14:47:22.228 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 8
2023-03-21 14:47:22.232 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 8
2023-03-21 14:47:22.241 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 8
2023-03-21 14:47:23.230 INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 9
2023-03-21 14:47:23.234 INFO 117020 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 9
2023-03-21 14:47:23.238 INFO 117020 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 9
能够看到音讯是正常消费, 而且是次序消费
下面来改造一下顾客,假定顾客to1
在某些状况下发生了反常或者宕机了
@KafkaListener(topics = "topic1", id = "to1")
public void listen1(String message) {
if(message.contains("6"))
throw new RuntimeException("体系反常");
log.info("listen1 Received message >>> {}", message);
kafkaTemplate.send("topic2", message);
}
@KafkaListener(topics = "topic2", id = "to2")
public void listen2(String message) {
if(String.format("Message %d", 6 + 1).equals(message)) {
log.error("音讯丢掉, 音讯为 >>> {}", 6);
}
log.info("listen2 Received message >>> {}", message);
}
看下反常状况下的输出
2023-03-21 15:21:28.405 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 0
2023-03-21 15:21:28.421 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 0
2023-03-21 15:21:28.430 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 0
2023-03-21 15:21:29.409 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 1
2023-03-21 15:21:29.413 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 1
2023-03-21 15:21:29.417 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 1
2023-03-21 15:21:30.415 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 2
2023-03-21 15:21:30.420 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 2
2023-03-21 15:21:30.424 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 2
2023-03-21 15:21:31.416 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 3
2023-03-21 15:21:31.420 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 3
2023-03-21 15:21:31.424 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 3
2023-03-21 15:21:32.430 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 4
2023-03-21 15:21:32.434 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 4
2023-03-21 15:21:32.438 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 4
2023-03-21 15:21:33.432 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 5
2023-03-21 15:21:33.436 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 5
2023-03-21 15:21:33.441 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 5
2023-03-21 15:21:34.435 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 6
2023-03-21 15:21:34.445 ERROR 130336 --- [ to1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = topic1, partition = 0, offset = 73, CreateTime = 1679383294435, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 6)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.kafka.study.controller.OffsetController.listen1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: 体系反常; nested exception is java.lang.RuntimeException: 体系反常
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1261) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1188) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1159) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1099) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.RuntimeException: 体系反常
at com.kafka.study.controller.OffsetController.listen1(OffsetController.java:36) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
... 8 common frames omitted
2023-03-21 15:21:35.439 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 7
2023-03-21 15:21:35.442 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 7
2023-03-21 15:21:35.445 ERROR 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : 音讯丢掉, 音讯为 >>> 6
2023-03-21 15:21:35.445 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 7
2023-03-21 15:21:36.448 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 8
2023-03-21 15:21:36.451 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 8
2023-03-21 15:21:36.455 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 8
2023-03-21 15:21:37.449 INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 9
2023-03-21 15:21:37.455 INFO 130336 --- [ to1-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 9
2023-03-21 15:21:37.460 INFO 130336 --- [ to2-0-C-1] c.k.study.controller.OffsetController : listen2 Received message >>> Message 9
从结果来看当顾客to1
消费到Message 6
这条音讯的时分报了反常导致音讯没有被消费成功,但是仍是正常提交了offset
,接着持续消费,这就导致了音讯的丢掉
。理论上讲没有消费成功的音讯应当从头消费,然后提交offset
那么怎么不主动提交offset
呢? 这是一道比较经典的面试题
Kafka 手动提交 offset
由于咱们运用了主动提交偏移量
的方法,而这个新的音讯是在顾客提交偏移量之后
发送的,因而顾客不会收到这条新的音讯,这就导致了音讯丢掉的状况。为了避免这种状况,咱们应该运用手动提交偏移量
的方法,以便在顾客完成一切音讯的消费后手动提交偏移量。这样一来,即便在顾客消费音讯的进程中出现反常或者顾客运用程序被封闭,咱们也能够确保音讯的完整性和可靠性。
接下来,咱们就去解决上述的问题。
首要更改一下KafkaConfig
, 上节给咱们讲过,直接在这上边改, 在kafkaListenerContainerFactory()
中增加如下代码
// 开启手动提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
接着修正consumerConfigs()
, 增加如下代码, 将主动提交封闭
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
然后更改下咱们的controller
代码
/**
* 手动提交偏移量
*/
@GetMapping("/hello1")
public String hello1() throws Exception {
// 发送音讯
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
kafkaTemplate.send("topic3", message);
log.info("Sent message: {}", message);
Thread.sleep(1000);
}
return "hello1";
}
@KafkaListener(topics = "topic3", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
public void onMessage(String message, Acknowledgment acknowledgment) {
log.info("listen1 Received message >>> {}", message);
}
这个时分我并没有在监听器里手动提交offset
, 运行之后恳求下,看下操控台信息
2023-03-21 16:45:38.408 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 1
2023-03-21 16:45:38.411 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 1
2023-03-21 16:45:39.415 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 2
2023-03-21 16:45:39.418 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 2
2023-03-21 16:45:40.419 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 3
2023-03-21 16:45:40.423 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 3
2023-03-21 16:45:41.434 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 4
2023-03-21 16:45:41.438 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 4
2023-03-21 16:45:42.447 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 5
2023-03-21 16:45:42.452 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 5
2023-03-21 16:45:43.460 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 6
2023-03-21 16:45:43.463 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 6
2023-03-21 16:45:44.462 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 7
2023-03-21 16:45:44.465 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 7
2023-03-21 16:45:45.474 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 8
2023-03-21 16:45:45.478 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 8
2023-03-21 16:45:46.484 INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController : Sent message: Message 9
2023-03-21 16:45:46.488 INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 9
假如契合预期的话,顾客再次发动的时分,应该从前次消费的方位开端消费,下面咱们重启运用,过几秒后看下操控台信息
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 0
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 1
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 2
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 3
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 4
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 5
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 6
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 7
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 8
2023-03-21 16:47:57.286 INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController : listen1 Received message >>> Message 9
从结果来看,契合咱们的预期,由于Message 0
这条音讯并没有手动提交offset
所以下次进来的时分仍是从这个方位开端消费音讯
那怎么手动提交呢?其实很简单,增加如下代码即可
acknowledgment.acknowledge();
这样,在音讯被成功消费后,由运用本身手动提交offset
这样能够确保咱们的音讯不会被丢掉
结束语
本节提到了ack
的概念,它是kafka
的一种承认机制,它用于确定音讯是否现已被成功处理,下节就结合实践运用场景给咱们顺一下这个概念~
本着把自己知道的都告知咱们,假如本文对您有所协助,点赞+关注
鼓舞一下呗~
相关文章
- 一起来学kafka之Kafka集群建立
- 一起来学kafka之整合SpringBoot根本运用
- 一起来学kafka之整合SpringBoot深入运用(一)
项目源码(源码已更新 欢迎star⭐️)
- springboot-kafka-all: https://github.com/qiuChengleiy/springboot-kafka-all
ElasticSearch 专题学习
-
运用docker建立es集群
-
一起来学ElasticSearch(一)
-
一起来学ElasticSearch(二)
-
一起来学ElasticSearch(三)
-
一起来学ElasticSearch(四)
-
一起来学ElasticSearch(五)
-
一起来学ElasticSearch(六)
-
一起来学ElasticSearch(七)
-
一起来学ElasticSearch(八)
-
一起来学ElasticSearch(九)
-
一起来学ElasticSearch(十)
-
一起来学ElasticSearch之整合SpringBoot(一)
-
一起来学ElasticSearch之整合SpringBoot(二)
-
一起来学ElasticSearch之整合SpringBoot(三)
项目源码(源码已更新 欢迎star⭐️)
- springboot-es-all: https://github.com/qiuChengleiy/springboot-es-all
往期并发编程内容引荐
- Java多线程专题之线程与进程概述
- Java多线程专题之线程类和接口入门
- Java多线程专题之进阶学习Thread(含源码剖析)
- Java多线程专题之Callable、Future与FutureTask(含源码剖析)
- 面试官: 有了解过线程组和线程优先级吗
- 面试官: 说一下线程的生命周期进程
- 面试官: 说一下线程间的通讯
- 面试官: 说一下Java的共享内存模型
- 面试官: 有了解过指令重排吗,什么是happens-before
- 面试官: 有了解过volatile关键字吗 说说看
- 面试官: 有了解过Synchronized吗 说说看
- Java多线程专题之Lock锁的运用
- 面试官: 有了解过ReentrantLock的底层完成吗?说说看
- 面试官: 有了解过CAS和原子操作吗?说说看
- Java多线程专题之线程池的根本运用
- 面试官: 有了解过线程池的作业原理吗?说说看
- 面试官: 线程池是怎么做到线程复用的?有了解过吗,说说看
- 面试官: 阻塞行列有了解过吗?说说看
- 面试官: 阻塞行列的底层完成有了解过吗? 说说看
- 面试官: 同步容器和并发容器有用过吗? 说说看
- 面试官: CopyOnWrite容器有了解过吗? 说说看
- 面试官: Semaphore在项目中有运用过吗?说说看(源码剖析)
- 面试官: Exchanger在项目中有运用过吗?说说看(源码剖析)
- 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
- 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
- 面试官: Phaser有了解过吗?说说看
- 面试官: Fork/Join 有了解过吗?说说看(含源码剖析)
- 面试官: Stream并行流有了解过吗?说说看
引荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)
-
springboot-all
-
地址
: github.com/qiuChenglei… -
SpringBoot系列教程合集
-
一起来学SpringCloud合集
-
SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 完成授权码形式的服务认证(一)
-
SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 完成授权码形式的服务认证(二)
博客(阅览体会较佳)
-
我的博客(阅览体会较佳)
-
地址
: github.com/qiuChenglei…