一、Raft 协议是什么?
Raft 协议是一种分布式共同性算法,它用于在分布式体系中的多个节点之间达成共同性。Raft 协议的方针是供给一种相对简略、易于了解和完成的方法,以保证在网络分区、节点毛病等状况下,体系依然能够坚持共同性和可用性。
运用服务关于恳求的处理流程图:
以下是 Raft 协议的核心架构组件和流程:
1、节点人物:
- Leader:担任办理整个集群,处理客户端恳求,建议日志仿制,以及触发新的推举。
- Follower:被动节点,接纳并仿制 Leader 的日志条目,响应 Leader 的心跳和日志仿制恳求。
- Candidate:当 Follower 在推举超时时间内未收到 Leader 的心跳时,它会变成 Candidate 并建议推举。
当一个节点发动的时分,需求将本身节点信息注册到集群中Leader节点
2、领导者推举(Leader Election):
- 当集群发动或 Leader 失效时,Follower 会等候一段时间(随机化超时时间)后变成 Candidate。
- Candidate 建议推举,向其他节点发送恳求投票(RequestVote RPC)。
- 假如 Candidate 取得大多数节点的投票,它就成为新的 Leader。
3、日志仿制(Log Replication):
- Leader 处理客户端恳求,将每个恳求作为新的日志条目追加到其日志中。
- Leader 向其他节点发送 AppendEntries RPC 来仿制日志条目。
- 当日志条目被仿制到大多数节点时,Leader 将这些条方针记为已提交,并通知 Follower 运用这些更改。
4、日志压缩(Log Compaction):
- 为了削减日志的大小,Raft 答应 Leader 删去那些现已被大多数节点仿制并提交的日志条目。
5、安全性和共同性:
- Raft 保证在任何时分,只要当时任期的日志条目能够被提交。 经过领导者的推举机制和日志仿制战略,Raft 保证了集群状况的共同性。
6、成员变更(Membership Changes):
- Raft 答应在不停机的状况下更改集群的成员。
- Leader 能够向 Follower 发送装备更改的日志条目,这些更改在被仿制和提交后生效。
7、心跳和超时:
- Leader 定时向 Follower 发送心跳(Heartbeat)以维持其领导地位。
- Follower 在未收到心跳的状况下会触发新的推举。
8、日志共同性:
- Raft 经过保证一切已提交的日志条目在集群中的一切节点上都是共同的,来保护共同性。
Raft 协议的架构规划强调了简略性和易于了解,一起供给了强壮的共同性和容错能力。这种规划使得 Raft 成为了许多分布式体系和数据库的首选共同性算法。
人物转化 这幅图是首领、提名人和大众的人物切换图,我先简略总结一下:
- 大众 -> 提名人:当开端推举,或许“推举超时”时
- 提名人 -> 提名人:当“推举超时”,或许开端新的“任期”
- 提名人 -> 首领:获取大多数投票时
- 提名人 -> 大众:其它节点成为首领,或许开端新的“任期”
- 首领 -> 大众:发现自己的任期ID比其它节点分任期ID小时,会主动放弃首领方位
Raft 协议经过这些机制解决了分布式体系中的共同性问题,特别是在领导者推举和日志仿制方面。它被广泛运用于各种分布式体系和服务中,例如 etcd(一个分布式键值存储体系),它被用作 Kubernetes 的后端存储。Raft 协议的规划使得它在实际运用中既高效又可靠。
二、Raft 协议运用场景
Raft 协议作为一种分布式共同性算法,被广泛运用于需求在多个节点间坚持数据共同性的分布式体系场景中。以下是一些典型的 Raft 协议运用场景:
1、分布式存储体系:
Raft 协议被用于分布式存储体系中,以保证数据在多个节点间的共同性和可用性。例如,分布式键值存储(如 etcd、Consul)和分布式数据库(如 TiKV)都采用了 Raft 协议。
2、装备办理服务:
在装备办理服务中,Raft 用于保证集群中的一切节点都能访问到最新的装备信息。例如,Consul 供给了一个服务发现和装备的工具,它运用 Raft 来保证装备的共同性。
3、服务发现和注册:
服务发现和注册体系(如 etcd)运用 Raft 来保护服务实例的注册信息,保证客户端能够发现和连接到正确的服务实例。
4、分布式锁服务:
分布式锁服务需求在多个节点间和谐资源的访问,Raft 协议能够帮助完成一个高可用和共同性的分布式锁。
5、分布式使命调度:
在分布式使命调度体系中,Raft 能够用来推举使命调度器的领导者,保证使命分配的共同性和次序履行。
6、分布式状况机:
Raft 协议能够用来构建分布式状况机,其中每个节点都保护一个状况机的副本,Raft 保证这些状况机的状况共同。
7、分布式日志体系:
分布式日志体系(如 Apache Kafka)能够运用 Raft 来保证日志数据在多个副本之间的共同性。
8、集群办理:
在集群办理工具中,Raft 能够用于推举集群领导者,办理集群状况,以及处理集群成员的加入和退出。
9、分布式业务:
尽管 Raft 本身不直接处理分布式业务,但它能够作为分布式业务协议的一部分,用于保证业务日志的共同性。
Raft 协议因其易于了解和完成,以及在实践中的高效性和可靠性,成为了构建分布式体系时的首选共同性算法之一。在这些运用场景中,Raft 协议帮助体系在面对网络分区、节点毛病等分布式体系常见问题时,依然能够坚持数据的共同性和体系的可用性。
三、Kafka Raft(KRaft)
Kafka Raft(KRaft)与 Apache ZooKeeper 是两种不同的分布式和谐服务,它们在 Kafka 集群中扮演着不同的人物。以下是 KRaft 与 ZooKeeper 的对比:
1、依靠性:
- ZooKeeper:在 KRaft 出现之前,Kafka 严重依靠于 ZooKeeper 来办理集群的元数据,如 broker 注册、主题分区、控制器推举等。
- KRaft:KRaft 是 Kafka 内部完成的共同性协议,它答应 Kafka 集群在不依靠 ZooKeeper 的状况下运转,从而简化了 Kafka 的架构。
2、共同性协议:
- ZooKeeper:运用 ZAB(ZooKeeper Atomic Broadcast)协议,它是一个为分布式体系供给共同性服务的协议。
- KRaft:根据 Raft 共同性协议,它供给了一种更易于了解和完成的领导者推举和日志仿制机制。
3、功用和可伸缩性:
- ZooKeeper:在大型集群中,ZooKeeper 可能会成为功用瓶颈,由于它需求处理很多的客户端恳求和保护杂乱的会话状况。
- KRaft:KRaft 旨在提高 Kafka 的功用和可伸缩性,经过内部办理元数据,削减了对外部和谐服务的依靠。
4、布置和办理:
- ZooKeeper:布置和保护 ZooKeeper 集群需求额定的工作,包括装备、监控和毛病康复。
- KRaft:由于 KRaft 集成在 Kafka 中,布置和办理 Kafka 集群变得愈加简略,不再需求单独的 ZooKeeper 集群。
5、可靠性和可用性:
- ZooKeeper:ZooKeeper 供给了强共同性保证,但在推举过程中可能会有短暂的不可用性。
- KRaft:KRaft 同样供给了强共同性保证,而且经过内部的控制器集群(Controller Quorum)来提高体系的可靠性和可用性。
6、未来开展:
- ZooKeeper:随着 KRaft 的引入,Kafka 社区逐步削减了对 ZooKeeper 的依靠,这可能会影响 ZooKeeper 在 Kafka 生态体系中的地位。
- KRaft:KRaft 是 Kafka 未来开展的方向,它标志着 Kafka 朝着更轻量级、更易于办理的方向开展。
KRaft 形式的首要优势包括:
-
去中心化:Kafka 集群不再依靠于外部的 ZooKeeper 集群,简化了布置和运维。
-
功用提升:由于不再需求与 ZooKeeper 进行通信,Kafka 集群的功用得到了提升。
-
扩展性:KRaft 形式答应 Kafka 集群更灵敏地扩展,不再遭到 ZooKeeper 集群规模的限制。
-
共同性和可用性:Raft 协议保证了即使在部分控制器节点失败的状况下,集群的元数据依然能够坚持共同性和可用性。
-
简化的毛病康复:在 KRaft 形式下,Kafka 集群的毛病康复过程愈加简略和直接。
KRaft 形式在 Kafka 3.3.1 版别中被标记为能够在出产环境中运用。这意味着 Kafka 用户现在能够挑选 KRaft 形式来布置他们的 Kafka 集群,以取得更好的功用和更简略的运维体验。但是,需求注意的是,KRaft 形式目前依然是一个相对较新的功用,因此在出产环境中运用时,建议亲近重视 Kafka 社区的更新和最佳实践。
四、根据KRaft 协议布置Kafka(不依靠与Zookeeper)
关于更多为啥会扔掉Zookeeper的原因能够参阅我这篇文章:为何Kafka在2.8版别开端会“扔掉”Zookeeper?
首要来看一下KRaft在体系架构层面和之前的版别有什么区别。KRaft形式提出往来不断zookeeper后的kafka整体架构入下图是前后架构图对比:
1)下载 Kafka
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
2)装备修正
修正kafka目录下的config/kraft/server.properties文件。三个服务器都需求修正。 特别注意:每个服务器(broker)上的装备里的node.id有必要是数字,而且不能重复。
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present. See config/kraft/README.md for details.
#
############################# Server Basics #############################
# The role of this server. Setting this puts us in KRaft mode
# 节点人物(修正)
process.roles=broker,controller
# The node id associated with this instance's roles
# 节点ID,和节点所承担的人物想相关(修正)
node.id=1
# The connect string for the controller quorum
# 装备标识有哪些节点是 **Quorum** 的投票者节点
controller.quorum.voters=1@192.168.182.110:9093,2@192.168.182.111:9093,3@192.168.182.112:9093
############################# Socket Server Settings #############################
# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://:9092
# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
# 这里我修正了日志文件的路径,默许是在/tmp目录下的
log.dirs=/data/kraft-combined-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
三个broker的装备根本都和上面的装备一样,不同的地方就是node.id:
kraft1:node.id=1
kraft2:node.id=2
kraft3:node.id=3
别的还有两处需求修正。
- controller.quorum.voters=1@kraft1:9093,2@kraft2:9093,3@kraft3:9093【以逗号分隔的{id}@{host}:{port}投票者列表。例如:1@localhost:9092,2@localhost:9093,3@localhost:9094】
- log.dirs=/home/vagrant/kraft-combined-logs【日志路径,默许是/temp下的文件下,出产环境不要运用,由于linux会整理/tmp目录下的文件,会形成数据丢掉】
Process.Roles:
每个Kafka服务器现在都有一个新的装备项,叫做Process.Roles, 这个参数能够有以下值:
- 假如Process.Roles = Broker, 服务器在KRaft形式中充任 Broker。
- 假如Process.Roles = Controller, 服务器在KRaft形式下充任 Controller。
- 假如Process.Roles = Broker,Controller,服务器在KRaft形式中一起充任 Broker 和Controller。
- 假如process.roles 没有设置。那么集群就假定是运转在ZooKeeper形式下。
如前所述,目前不能在不从头格局化目录的状况下在ZooKeeper形式和KRaft形式之间来回转化。一起充任Broker和Controller的节点称为“组合”节点。
关于简略的场景,组合节点更容易运转和布置,能够避免多进程运转时,JVM带来的相关的固定内存开支。要害的缺点是,控制器将较少地与体系的其余部分阻隔。例如,假如代理上的活动导致内存不足,则服务器的控制器部分不会与该OOM条件阻隔。
Quorum Voters
-
体系中的一切节点都有必要设置
controller.quorum.voters
装备。这个装备标识有哪些节点是 Quorum 的投票者节点。一切想成为控制器的节点都需求包括在这个装备里面。这类似于在运用ZooKeeper时,运用ZooKeeper.connect装备时有必要包括一切的ZooKeeper服务器。 -
但是,与ZooKeeper装备不同的是,
controller.quorum.voters
装备需求包括每个节点的id。格局为: id1@host1:port1,id2@host2:port2。
3)生成集群ID
随便找一个服务器,进入kafka目录,运用kafka-storage.sh生成一个uuid,一个集群只能有一个uuid!!!
./bin/kafka-storage.sh random-uuid
# 这个ID就能够作为集群的ID
# AxAUvePAQ364y4mxggF35w
4)用 kafka-storage.sh 格局化存储数据的目录
三个机器上都需求履行
#./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
./bin/kafka-storage.sh format -t AxAUvePAQ364y4mxggF35w -c config/kraft/server.properties
5)用bin/kafka-server-start.sh 发动Kafka Server
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
6)测试验证
./bin/kafka-topics.sh --create --topic kafkaraftTest --partitions 1 --replication-factor 1 --bootstrap-server 192.168.182.110:9092
检查topic
./bin/kafka-topics.sh --list --bootstrap-server 192.168.182.110:9092
./bin/kafka-topics.sh --describe --topic kafkaraftTest --bootstrap-server 192.168.182.110:9092
有任何疑问也可重视我大众号:大数据与云原生技术分享,进行技术交流,如本篇文章对您有所帮助,麻烦帮助一键三连(点赞、转发、保藏)~