上一篇文章叙述了以RocketMQ
源码的方法发起NameServer
和broker
进行单机布置及收发消息的流程,其实就是简略的quickstart
,后端君在实际操作过之后就现已可以依据RocketMQ
进行简略业务的音] l _ j &讯传递,结束比如异步消费搜集日志这样的小功用了。
在RocketMQ
的官网上还0 m E Y有许多不同品种的消息示例,建议想要学习RocketMQ
的同学们先去手写一下这些Demo
,了解过如次序消息、广播消息、守时消息、批消息等消息类型的特性之后再去看源码。
接下来便c j S !是学习RocketMQ
的第二天正文内容,今日咱们来聊聊Name~ ! f BServer
的发起C : o r流程以及NameServer
首要功用的源码分析0 ? v ? R 8。
NameServer的总述
NameServer
是一个供给轻量g / ` ! u P # – 2级服务发现和路由的服务器,它首H % : S .要包括两个功用:
-
代理处理, NameServer
从Broker
集群承受注册,并供给心跳机制来检查Broker
是否活动。 -
路由处理,每个称谓服务器将保存关于 Broker
集群的整个路由& q 3 d r x 4 K E信息和用于客户机查询的行列信息。
咱们可以将NameServM 3 i B ! D Y l &er
就当成是一个轻量级的注册中心。事实上,Q ( @ 8 6 6 3从前Rocketo G $ J a ) I ,MQ
就用过Zookeeper
来作r w E 为注册中心,后来因为RocketMQ
本身架构的原因不7 8 R W : w需求像Zookee5 L X uper
那样的推举机制来选择master
节点,所以移除了Zookeeper
依靠,并运用NameServer
来替代。
作为RoS y & 5 I ; DcketMQ
的注册中心,NameServer
接搜集群中所有Broker
的注册,并每隔10s供给心跳机制来检查Broker
的是否,假如Broker
有F v j V j 2 O w超越120s没有更新,那么将被视为失效并从集群中移除。
了解了NameServer
的功用之后,后端君不由会想,NameServer
的这些功用是怎么完成的?这就需求翻阅源码了。
NG : M ? $ h U 6 #ameserver发起流程分析
上一篇文章《快速入门》中也S : g h 3提到过,发起NameServer
需求找到nam[ W [ esrv
包中的发起类NamesrcStartup
类,而研讨N_ N a u o wameServer
的发起流程也需求从这个类的main
方法Z Y V S g ` [ t初步。
解析配备
发起NameServer
的第一. V y K 0 Q & 0步是结构一个NamesrvController
实例,这个类是NameServerK X 9 u ) - P + K
的中心类。
public static NamesrvController main0(String[] args) {
try {
// 结构 NamesrvController 类
NamesrvController coL x Y c u C 8 o 4ntroller = createNamesrvController(args);
// 初始化、发起 NamesrvCm 9 S W V g [ Rontroller 类
start(controller);
StriA u &ng tip = "The Name Server bb Z 0 @oot success. serializeType=" + RemotingCommand.getSeriE + v z ualizeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catchR g T + w [ E _ (Throwabq 3 j 8le e) {
e.A V K { dprintStackTrace();
System.ex~ j H p i : it(-1);
}
return null;
}
而createNamesrvController
方法,就是从指令行接收参数,然后将解析成装R W F 5 Z ; B J备类NamesrvConfig
和NettyServerConfig
。
final NamesrvCos M $ enfig namesrvConfig = new NamesrvConfig();
final Netr Z D I i G & N xtyServerCy } 2 K I Vonfig netL C ) q F } | FtyServerConH N t +fig = new NettyServerConfig();
// RocketMQ 默许端口为9876
nettyServerCo] P l W G !nfig.setListenPort(9876);
// 通过 -c 参数指定配备文件
if (commandLine.hasOption('c? E = , 4 ~ :')) {
String fi0 ; @ A d 0le = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyS} c y L = U * | !erverConfig);
namesrvConfig.setConfigStorePathj + .(file);
System.z y T 8 8 W V I 5out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 通过 -p 参数打印当时装V t ) [备,并退出程序
if (commandLine.hasOption([ 8 w X E A H V !'p')) {
InternalLogger console = InternalLoggerFactory.getLogger(Logge! J 4rName.NAMESRV_CONSOLE_NAME);
MixAll.printObject/ _ % X V NProperties(console,H , ] namesrvConfig);
MixAll.printObjectProperties(console, nettyServerCon) = Efig);
System.exit(0): s / G 8 M 5 e;
}
// 通过"--具体特色名 特色值"指定特色值
MixAll.properties2Obj3 * t 9ect(ServerUtil.commandLine2Properties(comz t 1 zmandLine), namesrvConfig);
咱们知道在指令行中运行RocketMQj - w C B + z ` 8
是可以指定参数的,它的原理就是上面代码展示的那样。
通过-c
指令可以指定配备文件,将配备文件中的内容解析成java.util.Properties
类,然后赋值给NaI : DmesrvConfig
和NettyServerConfig
类结束配备文件的解析与映射。
假如指定了-p
指令,则会在控制台打印配备信息,然后程序直接退出。
除此之外还可以运用-n
参数指定namesrvAddr
的值,这是在org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions
方法中指定的参数3 J 3 !,不在本节的评论范围内,有爱好的同学, 1 `可以自己去翻阅源码调试下看看。
当结束配备特色的映射,就会依据配备类N. I A I 7 G wamesrvConfig
和NettyServerConfig
结构一个NamesrvController
实例。
fin] * ; s 7 } Pal NamesrvController controller8 V L ) B M / = new NamesrvController(namesrvConfig, nettyServerCog 3 7 Znfig);
初始化及心跳机制
发起NameServer
的第二步是通过NamesrvController#initialize
结束初始化。
public boolean init# L K @ Qialize() {
// 加载`kvConfig.? d Yjss @ % d : Bon`装 ] h备文件中的`KV`配备,然后将这些配备W & h : p M放到`KVConfigManager#configTable`特色中
this.kvConfigManage] a ^ F kr.load();
// 依据`NettyServerConfig`发起一个`Netty`服务器
this.remotingServer = new Netty d ! F F B BRemotingServerx V f . E I + ;(this.nettyServerConfig, this.broB % , M ! _ TkerHousekeepingService);
//5 ! Q % ` ~ E p u 初始化负责处理`Netty`网络交互数据的线程池
this.remotingExecutor = Executors.newFixedThreadPo^ 5 D V ` , 3ol(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExeO ; w | ( HcutorThread_"));
this.registerProcessor();5 & $ p ~ :
// 注册心跳机制线程池
this.scheduledExe~ 7 Q ) E # FcutorService.scheduleAtFixedRate(new Runnable() {
@Override
public voi1 k - # % / G }d run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 注册打印KV配备线程池
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(% X ) t +) {
@Override
publicb u T void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 省掉以下代码...
return true;
}
在初始化NamesrvController
过程中,会注册一个心跳机制的线程池,它会在发起后5秒初步每隔10秒扫描一次不生动的bN A 4 . /roker
。
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> i: Z l # ~ ^ | r Xt = thi/ h m G B 9s.brokerLiveTable.entrySet().iterator();
while (iw k a M O 8t.I s l F q PhasNext6 2 c & B()) {
Entry<String, BrokerLiveInfo> nextP S e 6 W C = it.next();
long lasts e s y b | } = next.getValue().getLastUpdateTimen u + | { lstamp();
// BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;即120秒
if ((last + BROKER_CHANNELx x 4 i 1 ^_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChanns [ ) 1 s N N [ Nel());
//v P S A - f h 将该 broker 从 brokerLiveTable 中移除
it.reZ 5 , k = 1 @move();
lF 4 # v Sog.warn("The broker chI S E : bannel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDe W c P ;estroy(next.getKey(), next.getValue().getChannel());
}
}
}
可以看到,在w b v B : K [ jscanNotActiveBrT w h h Aoker
方法中,Namt } feServer
会遍历RouteInfoManager#brokerLiveTable
这个特色。
private final HashMap<String/* brd Z | | S 2 $okerAddr */, BrokerLiveInfo> brokerLiveTable;
class I 8 : BrokerLiveInfo {
// broker上一次更新的生动时间戳
private long lastUpdateTimestamp;
private DataVersionG x , ) c J b dataVer] r ?sion;
private Channel channel;
private String haServerAddrl $ x % I ~;
}
RouteInfoManager#brokerLiveTable
特色存储的是集群中所有broker
的生动信息,首要是BrokerLiveInfo#lastUpdateTimestamp
特色,它描述了broker
上一次更0 B k | ^ L新的生动时间戳。若lastUpdateTimestamp
特色超越120秒未更新,则该broker
会被视为失效并从brokerLiveTable
中移除。
除了心跳机制的线程池外,还会g P S N $ a . 1 }注册别的一个线程池,它会每隔10秒打印一次所有的KV
装G ` + q p | *备信息。
典雅停机
NameServer
发起x B ) g ( d o a的终究一步,是注m t |册了a – 0 V & l t ~ J一个JVM
的钩子函数,它会在JVM
封闭之前实行。这个钩子函数的作用是开释资源,如封闭NeG _ n y /tty
服务器,封闭线程池等。
RuR G ; N kntime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callabp 6 0 q Z Tle<Void>(| p P B * :) {
@Override
public Void call() throwsR O & F H 2 @ Exception {
controller.shutdown();
return nulC x Ol;
}
}));
小p g o l结
本文Y m N m r叙述了NameServer
的作用,同时依据其发起类NamesrvStartup
类分析了发起流程,以及心跳机制和典雅停机的完成+ e X & A原理。
希望可以帮忙到咱们。
参考文献
-
RocketMQ Architecture -
RoI 6 ) Y ~ r ncketMQ Namesrv发起流程
版权声明:本文为Planez $ = _ u I Oswalker23所创,转载请带上原文链接,感谢。
本文已上传S B U个人公众号,欢迎扫码重视。
本文运用 m7 = E U z pdnice 排版