一、Kafka Streams概述
官网文档:kafka.apache.org/32/document…
1)Kafka Streams是什么
Kafka Streams是一套客户端类库,它建立在重要的流处理概念之上,它能够对存储在Kafka内的数据进行流式处理和剖析,简称“流式核算”。
2)流式核算与批量核算差异
- 流式核算:输入是持续的,一般先界说方针核算,然后数据到来之后将核算逻辑应用于数据,往往用增量核算替代全量核算。
- 批量核算:一般先有全量数据集,然后界说核算逻辑,并将核算应用于全量数据。特色是全量核算,而且核算结果一次性全量输出。
3)Kafka Streams特色
- 简单轻量级的客户端库,能够轻松嵌入任何 Java 应用程序,并与用户为其流应用程序具有的任何现有打包、布置和操作工具集成。
- 除了作为内部音讯传递层的Apache Kafka自身之外,对体系没有外部依赖;值得注意的是,它运用 Kafka 的分区模型来水平扩展处理,同时坚持强壮的排序确保。
- 支撑容错本地状况,这能够完成十分快速和高效的有状况操作,例如窗口衔接和聚合。
- 支撑一次性处理语义,以确保每条记载将被处理一次且仅处理一次,即使在处理过程中 Streams 客户端或 Kafka 署理出现毛病时也是如此。
- 采用一次一条记载的处理来完成毫秒处理推迟,并支撑依据事情时刻的窗口操作,记载无序到达。
- 供给必要的流处理原语,以及高级 Streams DSL和低级处理器 API。
二、Kafka Streams流处理拓扑
1)相关概念
- 流是 Kafka Streams 供给的最重要的抽象:它代表了一个无限的、不断更新的数据集。流是不可变数据记载的有序、可重放和容错序列,其间数据记载被界说为键值对。
- 流处理应用程序是任何运用 Kafka Streams 库的程序。它经过一个或多个处理器拓扑界说其核算逻辑,其间处理器拓扑是由流(边)衔接的流处理器(节点)的图。
- 流处理器是处理器拓扑中的一个节点;它表示一个处理过程,经过从拓扑中的上游处理器一次接纳一个输入记载,将其操作应用到它,并或许随后产生一个或多个输出记载到其下流处理器,然后转换流中的数据。
拓扑中有两个特别的处理器:
- 源处理器:源处理器是一种特别类型的流处理器,它没有任何上游处理器。它经过运用来自这些主题的记载并将它们转发到其下流处理器,从一个或多个 Kafka 主题生成其拓扑的输入流。
- 接纳器处理器:接纳器处理器是一种特别类型的流处理器,它没有下流处理器。它将任何从其上游处理器接纳到的记载发送到指定的 Kafka 主题。
2)Kafka Streams中两种界说流处理的办法
- Kafka Streams DSL map供给了最常见的数据转换操作;例如开箱即filter用join;
- aggregations较低等级的处理器 API答应开发人员界说和衔接自界说处理器以及与状况存储交互。
3)流处理中的三种时刻
流处理的一个关键方面是时刻的概念,以及它是如何建模和集成的。例如,一些操作,如加窗,是依据时刻鸿沟界说的。
- 事情时刻——事情或数据记载产生的时刻点,即开始是“在源头”创建的。
示例:假如事情是由轿车中的 GPS 传感器陈述的地理方位改变,则相关的事情时刻将是 GPS 传感器捕获方位改变的时刻。
- 处理时刻——事情或数据记载恰好被流处理应用程序处理的时刻点,即记载被消费的时刻。处理时刻或许比原始事情时刻晚几毫秒、几小时或几天等。
示例:想象一个剖析应用程序,它读取并处理从轿车传感器陈述的地理方位数据,以将其出现给车队办理仪表板。在这儿,剖析应用程序中的处理时刻或许是事情时刻之后的毫秒或秒(例如,关于依据 Apache Kafka 和 Kafka Streams 的实时管道)或几小时(例如,关于依据 Apache Hadoop 或 Apache Spark 的批处理管道)。
- 吸取时刻——事情或数据记载由 Kafka 署理存储在主题分区中的时刻点。与事情时刻的差异在于,这个吸取时刻戳是在 Kafka 署理将记载附加到方针主题时生成的,而不是在“在源”创建记载时生成的。处理时刻的差异在于处理时刻是流处理应用程序处理记载的时刻。
例如,假如一条记载从未被处理过,它就没有处理时刻的概念,但它仍然有一个吸取时刻。
【温馨提示】事情时刻 和 吸取时刻 之间的选择实际上是经过 Kafka的装备来完成的(不是 Kafka Streams):从 Kafka 0.10.x 开始,时刻戳会主动嵌入到 Kafka 音讯中。依据 Kafka 的装备,这些时刻戳代表事情时刻或吸取时刻。
4)KTable和KSteam
-
KStream
是一个数据流,能够以为一切的记载都经过Insert only的方式刺进进这个数据流中。 -
KTable
代表一个完整的数据集,能够理解为数据库中的表。每条记载都是KV键值对,key能够理解为数据库中的主键,是仅有的,而value代表一条记载。咱们能够以为KTable中的数据时经过Update only的方式进入的。假如是相同的key,会覆盖掉本来的那条记载。
综上来说,KStream是数据流,来多少数据就刺进多少数据,是Insert only;KTable是数据集,相同key只答应保留最新的记载,也便是Update only。
5)窗口
流式数据在时刻上无界的,可是聚合操作只能作用在特定(有界)的数据集,这时分就有了窗口的概念,在时刻无界的数据流中界说一个鸿沟来用于核算。
⼀个窗⼝包括窗⼝⼤⼩和滑动步长两个特点:
- 窗⼝⼤⼩:⼀条记载在窗⼝中持续的时刻,持续时刻超越窗⼝⼤⼩的记载将会被删去;
- 滑动步长:指定了⼀个窗⼝每次相关于前⼀个窗⼝向前移动的间隔
【温馨提示】滑动步长不能超越窗⼝⼤⼩,假如超越窗⼝⼤⼩则会导致部分记载不归于任何窗⼝⽽不被处理。
Kafka Streams界说了三种窗⼝:
-
跳跃时刻窗⼝(
hopping time window
):⼤⼩固定,或许会堆叠的窗⼝模型 -
翻转时刻窗⼝(
tumbling time window
):⼤⼩固定,不可堆叠,⽆间隙的⼀类窗⼝模型 -
滑动窗⼝(
sliding window
):⼤⼩固定而且沿着时刻轴连续滑动的窗⼝模型,假如两条记载时刻戳之差在窗⼝⼤⼩之内,则这两条数据记载归于同⼀个窗⼝。在Kafka流中,滑动窗⼝只要在join操作的时分才⽤到。
三、Kafka Streams原理与架构
Kafka Streams 经过构建 Kafka 生产者和顾客库并运用 Kafka 的本机功能来供给数据并行性、分布式和谐、容错和操作简单性,然后简化了应用程序开发。
1)流分区和使命
Kafka 的音讯传递层对数据进行分区以进行存储和传输。Kafka Streams 对数据进行分区以进行处理。在这两种情况下,这种分区是完成数据局部性、弹性、可伸缩性、高性能和容错的原因。Kafka Streams 运用分区和使命的概念作为其依据 Kafka 主题分区的并行模型的逻辑单元。在并行性方面,Kafka Streams 和 Kafka 之间有着亲近的联系:
- 每个流分区是一个完全有序的数据记载序列,并映射到一个 Kafka主题分区。
- 流中的数据记载映射到来自该主题的 Kafka音讯。
- 数据记载的键决定了 Kafka 和 Kafka Streams 中数据的分区,即数据如何路由到主题内的特定分区。
【例如】假如您的输入主题有 5 个分区,那么您最多能够运转 5 个应用程序实例。这些实例将协作处理主题的数据。假如您运转的应用程序实例数量多于输入主题的分区,“剩下”的应用程序实例将发动但坚持闲暇;可是,假如其间一个繁忙的实例出现毛病,则其间一个闲暇的实例将恢复前者的工作。
下图显现了两个使命,每个使命分配有输入流的一个分区:
2)线程模型
Kafka Streams 答应用户装备库可用于并行化应用程序实例中的处理的线程数。每个线程能够运用其处理器拓扑独登时执行一个或多个使命。例如,下图显现了一个流线程运转两个流使命:
发动更多流线程或应用程序的更多实例仅相当于仿制拓扑并让它处理不同的 Kafka 分区子集,然后有效地并行处理。值得注意的是,线程之间没有共享状况,因而不需要线程间和谐。
3)本地状况存储
-
Kafka Streams
供给了所谓的状况存储,流处理应用程序能够运用它来存储和查询数据,这是完成有状况操作时的重要才能。 -
Kafka Streams
应用程序中的每个流使命都能够嵌入一个或多个本地状况存储,这些本地状况存储能够经过 API 拜访,以存储和查询处理所需的数据。Kafka Streams 为此类本地状况存储供给容错和主动恢复。
【例如】Kafka Streams DSL会在您调用有状况运算符(例如join()or aggregate())或窗口化流时主动创建和办理此类状况存储。
下图显现了两个流使命及其专用的本地状况存储:
4)容错
Kafka Streams
建立在 Kafka 原生集成的容错功能之上。Kafka 分区具有高可用性和可仿制性;因而,当流数据被耐久化到 Kafka 时,即使应用程序失利并需要重新处理它,它仍然可用。Kafka Streams 中的使命运用 Kafka 顾客客户端供给的容错才能来处理毛病。假如使命在失利的机器上运转,Kafka Streams 会主动在应用程序的剩下运转实例之一中重新发动使命。
四、简单应用(WordCount示例)
官网示例:kafka.apache.org/32/document…
源代码:github.com/apache/kafk…
1)发动zookeeper和kafka(无鉴权)
$ cd $KAFKA_HOME
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
$ ./bin/kafka-server-start.sh ./config/server.properties
2)创建topic
创建名为streams-plaintext-input
的输入topic和名为streams-wordcount-output
的输出topic:
$ cd $KAFKA_HOME
$ bin/kafka-topics.sh --create \
--bootstrap-server hadoop-node1:19092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
$ bin/kafka-topics.sh --create \
--bootstrap-server hadoop-node1:19092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
【温馨提示】咱们创建启用压缩的输出topic,因为输出流是一个改变日志流,关于具有相同键的多条记载,后面的每条记载都是对前一条记载的更新。
查看topic
$ bin/kafka-topics.sh --bootstrap-server hadoop-node1:19092 --describe
3)Maven依赖装备
<!-- (必需)Kafka 客户端库。包括内置的序列化器/反序列化器 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- (必需)Kafka Streams 的根底库 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.2.0</version>
</dependency>
<!-- (可选)用于 Scala 库的 Kafka Streams DSL,用于编写 Scala Kafka Streams 应用程序。不运用 SBT 时,您需要在工件 ID 后缀上您的应用程序运用的正确版本的 Scala ( _2.12, _2.13) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-scala_2.13</artifactId>
<version>3.2.0</version>
</dependency>
4)修正源码
【温馨提示】这儿主要修正BOOTSTRAP_SERVERS_CONFIG装备,假如运用带鉴权的kafka就得装备鉴权了。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
/**
* Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
* that computes a simple word occurrence histogram from an input text.
* <p>
* In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages
* represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
* is an updated count of a single word.
* <p>
* Before running this example you must create the input topic and the output topic (e.g. via
* {@code bin/kafka-topics.sh --create ...}), and write some data to the input topic (e.g. via
* {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic.
*/
public final class WordCountDemo {
public static final String INPUT_TOPIC = "streams-plaintext-input";
public static final String OUTPUT_TOPIC = "streams-wordcount-output";
static Properties getStreamsConfig(final String[] args) throws IOException {
final Properties props = new Properties();
if (args != null && args.length > 0) {
try (final FileInputStream fis = new FileInputStream(args[0])) {
props.load(fis);
}
if (args.length > 1) {
System.out.println("Warning: Some command line arguments were ignored. This demo only accepts an optional configuration file.");
}
}
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
//修正的地方
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092");
props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
static void createWordCountStream(final StreamsBuilder builder) {
final KStream<String, String> source = builder.stream(INPUT_TOPIC);
final KTable<String, Long> counts = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count();
// need to override value serde to Long type
counts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
}
public static void main(final String[] args) throws IOException {
final Properties props = getStreamsConfig(args);
final StreamsBuilder builder = new StreamsBuilder();
createWordCountStream(builder);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (final Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
5)发动 Wordcount 应用程序
1、在IDEA发动应用程序 2、现在咱们能够在一个独自的终端中发动控制台生产者来向这个主题写入一些输入数据:
$ cd $KAFKA_HOME
$ bin/kafka-console-producer.sh --bootstrap-server hadoop-node1:19092 --topic streams-plaintext-input
输入以下数据:
all streams lead to kafka
3、并经过在独自的终端中运用控制台运用者读取其输出topic来查看 WordCount 演示应用程序的输出:
$ cd $KAFKA_HOME
$ bin/kafka-console-consumer.sh --bootstrap-server hadoop-node1:19092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
6)经过jar包发动程序
将java文件打包成jar包,DIEA打包过程如下:
- File->Project Structure->Artifacts
- Build->Build Artifacts(开始打包)
找到对应的jar包程序,放在$KAFKA_HOME/libs/目录下,运转程序
修正名称
$ mv kafka.jar $KAFKA_HOME/libs/WordCountDemo.jar
运转
$ cd $KAFKA_HOME
# 发动生产者
$ bin/kafka-console-producer.sh --bootstrap-server hadoop-node1:19092 --topic streams-plaintext-input
# 输入以下数据:
hello kafka streams
# 发动应用程序(数据处理,数据统计)
$ bin/kafka-run-class.sh bigdata.kstreams.com.WordCountDemo
# 假如没有修正装备,运用官方供给的以下指令
$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
# 发动别的一个客户端,显现输出(顾客)
$ bin/kafka-console-consumer.sh --bootstrap-server hadoop-node1:19092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
【总结】Kafka Streams的作用和意图便是数据剖析与处理,充当着spark和flink等核算引擎的角色,可是现在企业里把kafka作为核算引擎不多,还是传统用法多(数据缓冲、解耦、异步通讯),所以kafka在核算引擎全面化落地还有很长的路要走。等待kafka的一体化完成(数据解耦+数据核算和剖析)。
Kafka Streams的介绍就先到这儿了,后续会有更全面的kafka API介绍和操作,有疑问的小伙伴,欢迎给我留言哦~