1、介绍
Apache Avro 是一种高效的数据序列化体系,用于在不同的应用程序和渠道之间传输和存储数据。它供给了一种紧凑且高效的二进制数据编码格局,比较其他常见的序列化方法,Avro能够完成更快的序列化和更小的数据存储。
而Confluent Schema Registry是由Confluent公司供给的一个开源组件,旨在处理分布式体系中的数据模式演化和兼容性的问题。它是建立在Apache Avro之上的一个服务,能够用于集中管理和存储Avro数据的模式(Schema),确保分布式体系中的数据一致性和兼容性。它广泛应用于事件流处理渠道(如Kafka),为数据流的可靠性和互操作性供给了支持。
本文将介绍如安在Spring Boot应用程序中整合Apache Avro和Confluent Schema Registry,以完成高效的数据序列化和管理。
本文代码示例: GitHub库房地址
2、Confluent Schema
1、下载
软件下载地址:Previous Versions – Confluent
本次运用:confluent-community-7.3.3 社区版,下载上传至Linux解压。
2、修正装备
修正装备文件:在 confluent-7.3.3/etc
文件夹下
confluent-7.3.3/etc/schema-registry/schema-registry.properties
# 装备Confluent Schema Registry 服务的访问IP和端口
listeners=http://0.0.0.0:8081
# 修正 Kafka集群指定引导服务器
kafkastore.bootstrap.servers=PLAINTEXT://xx.xx.xx.xx:9092,xx.xx.xx.xx:9192
# kafkastore.connection.url 装备zookeeper地址方法已弃用
# 存储 schema 的 topic
kafkastore.topic=_schemas
# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
3、发动
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
4、验证
curl -X POST 'http://localhost:8081/subjects/topic-test/versions' \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\": \"string\"}"}'
返回成果:{"id":1}
2、Springboot整合
1、引进xml
<dependencies>
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.7</version>
</dependency>
<!--avro-->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-schema-registry-client -->
<!--schema-registry-client-->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.4.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
</dependencies>
<!--schema-registry-client 远程库房-->
<repositories>
<!-- other maven repositories the project -->
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
2、导入Avro构建插件
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<!--schema 文件所在目录-->
<sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
<!--依据schema 文件生成的类文件目录-->
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
3、生成实体类
-
在 ${project.basedir}/src/main/resources/avro/ 下创建
user.avsc
文件。 -
通过 Maven
mvn compile
命令生产实体类
{
"namespace": "com.jinunn.kraft.avro", // 实体类寄存途径
"type": "record",
"name": "User", // 实体类文件名
"fields": [ // 实体类属性
{
"name": "id", // 属性名
"type": "int" // 类型
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
]
}
4、SpringBoot装备文件
server:
port: 8086
spring:
application:
name: kafka
kafka:
# 集群地址
bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9192,xx.xx.xx.xx:9292
producer:
# 设置key的序列化类
key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
# 设置value的序列化类
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
# ack策略
# 0:生产者发送音讯就不管了,效率高,但是容易丢数据,且没有重试机制
# 1:音讯发送到Leader并落盘后就返回,假如Leader挂了而且Follower还没有同步数据就会丢失数据
#-1:音讯要所有副本都拷贝才返回,确保数据不丢失(但是有或许重复消费)
acks: 1
# 失败重试次数
retries: 3
# 批量提交的数据巨细
batch-size: 16384
# 生产者暂存数据的缓冲区巨细
buffer-memory: 33554432
consumer:
# key的反序列化类
key-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
# 是否主动提交偏移量,假如要手动承认音讯,就要设置为false
enable-auto-commit: false
# 消费音讯后距离多长时间提交偏移量(ms)
auto-commit-interval: 100
# 默许的顾客组,假如不指定就会用这个
group-id: groupId
# kafka意外宕机时的音讯消费策略
# earliest:当各分区下有已提交的offset时,从提交的offset开端消费;无提交的offset时,从头开端消费
# latest:当各分区下有已提交的offset时,从提交的offset开端消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开端消费;只需有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: latest
listener:
# 手动承认音讯
ack-mode: manual_immediate
# 顾客运行的线程数
concurrency: 2
properties:
# confluent schema 地址
schema:
registry:
url: http://xx.xx.xx.xx:8081
5、音讯生产者
@RestController
@RequestMapping("/send")
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class Producer {
private final KafkaTemplate<String, User> kafkaTemplate;
@GetMapping("/test")
public void sendMsg() {
for (int i = 0; i < 10; i++) {
User user = new User();
user.setId(i);
user.setName("Jin" + i);
user.setAge(35 + i);
kafkaTemplate.send(AvroConsumer.TOPIC_NAME, user);
}
}
}
6、音讯顾客
@Slf4j
@Component
public class AvroConsumer {
public static final String TOPIC_NAME = "test";
@KafkaListener(topics = TOPIC_NAME, groupId = "test-group")
public void consume(ConsumerRecord<String, User> record, Acknowledgment ack) {
log.info("value #=>:{}", record.value());
// 手动提交ack
ack.acknowledge();
}
}
7、成果