咱们好呀,我是楼仔。
现如今市面上注册中心的轮子许多,我实际运用过的就有三款:Eureka、Gsched、Nacos,由于当时参与 Nacos 集群的保护和开发工作,期间也参与了 Nacos 社区的一些开发和 Bug Fix 工作,进程中对 Nacos 原理有了必定的堆集,今日给咱们共享一下 Nacos 动态服务发现的原理。
不 BB,上文章目录:
01 什么是动态服务发现?
服务发现是指运用一个注册中心来记录分布式系统中的全部服务的信息,以便其他服务能够快速的找到这些已注册的服务。
在单体运用中,DNS+Nginx 能够满意服务发现的要求,此时服务的IP列表装备在 nginx 上。在微服务架构中,由于服务粒度变的更细,服务的上下线愈加频繁,咱们需要一款注册中心来动态感知服务的上下线,而且推送IP列表变化给服务顾客,架构如下图。
02 Nacos 完成动态服务发现的原理
Nacos完成动态服务发现的中心原理如下图,咱们接下来的内容将围绕这个图来进行。
2.1 通讯协议
整个服务注册与发现进程,都离不开通讯协议,在1.x的 Nacos 版别中服务端只支持 http 协议,后来为了提高功用在2.x版别引入了谷歌的 grpc,grpc 是一款长衔接协议,极大的减少了 http 恳求频繁的衔接创建和销毁进程,能大幅度提高功用,节约资源。
据官方测试,Nacos服务端 grpc 版别,比较 http 版别的功用提高了9倍以上。
2.2 Nacos 服务注册
简略来讲,服务注册的目的便是客户端将自己的ip端口等信息上报给 Nacos 服务端,进程如下:
-
创建长衔接:Nacos SDK 经过Nacos服务端域名解析出服务端ip列表,挑选其间一个ip创建 grpc 衔接,并守时检查衔接状态,当衔接断开,则自动挑选服务端ip列表中的下一个ip进行重连。
-
健康检查恳求:在正式建议注册之前,Nacos SDK 向服务端发送一个空恳求,服务端回应一个空恳求,若Nacos SDK 未收到服务端回应,则以为服务端不健康,并进行必定次数重试,假如都未收到回应,则注册失利。
-
建议注册:当你检查Nacos java SDK的注册方法时,你会发现没有返回值,这是由于Nacos SDK做了补偿机制,在实在给服务端上报数据之前,会先往缓存中刺进一条记录表明开始注册,注册成功之后再从缓存中符号这条记录为注册成功,当注册失利时,缓存中这条记录是未注册成功的状态,Nacos SDK敞开了一个守时使命,守时查询异常的缓存数据,重新建议注册。
Nacos SDK注册失利时的自动补偿机制时序图。
相关源码如下:
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
//添加redo日志
redoService.cacheInstanceForRedo(serviceName, groupName, instance);
doRegisterService(serviceName, groupName, instance);
}
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
//向服务端建议注册
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
//符号注册成功
redoService.instanceRegistered(serviceName, groupName);
}
履行补偿守时使命RedoScheduledTask。
@Override
public void run() {
if (!redoService.isConnected()) {
LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
return;
}
try {
redoForInstances();
redoForSubscribes();
} catch (Exception e) {
LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
}
}
private void redoForInstances() {
for (InstanceRedoData each : redoService.findInstanceRedoData()) {
try {
redoForInstance(each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(),
each.getGroupName(), each.getServiceName(), e);
}
}
}
-
服务端数据同步(Distro协议):Nacos SDK只会与服务端某个节点建立长衔接,当服务端接受到客户端注册的实例数据后,还需要将实例数据同步给其他节点。Nacos自己完成了一个一致性协议名为Distro,服务注册的时分会触发Distro一次同步,每个Nacos节点之间会守时互相发送Distro数据,以此确保数据最终一致。
-
服务实例上线推送:Nacos服务端收到服务实例数据后会将服务的最新实例列表经过grpc推送给该服务的一切订阅者。
-
服务注册进程源码时序图:整理了一下服务注册进程整体时序图,对源码完成感兴趣的能够按照根据这个时序图view一下源码。
2.3 Nacos 心跳机制
现在主流的注册中心,比方Consul、Eureka、Zk包括咱们公司自研的Gsched,都是经过心跳机制来感知服务的下线。Nacos也是经过心跳机制来完成的。
Nacos现在SDK保护了两个分支的版别(1.x、2.x),这两个版别心跳机制的完成不一样。其间1.x版别的SDK经过http协议来守时向服务端发送心跳保持自己的健康状态,2.x版别的SDK则经过grpc自身的心跳机制来保活,当Nacos服务端接受不到服务实例的心跳,会以为实例下线。如下图:
grpc监测到衔接断开事情,发送ClientDisconnectEvent。
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
//衔接断开,发送衔接断开事情
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
}
移除客户端注册的服务实例
public class ClientServiceIndexesManager extends SmartSubscriber {
@Override
public void onEvent(Event event) {
//接纳失掉衔接事情
if (event instanceof ClientEvent.ClientDisconnectEvent) {
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
} else if (event instanceof ClientOperationEvent) {
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
Client client = event.getClient();
for (Service each : client.getAllSubscribeService()) {
removeSubscriberIndexes(each, client.getClientId());
}
//移除客户端注册的服务实例
for (Service each : client.getAllPublishedService()) {
removePublisherIndexes(each, client.getClientId());
}
}
//移除客户端注册的服务实例
private void removePublisherIndexes(Service service, String clientId) {
if (!publisherIndexes.containsKey(service)) {
return;
}
publisherIndexes.get(service).remove(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
}
2.4 Nacos 服务订阅
当一个服务产生上下线,Nacos怎么知道要推送给哪些客户端?
Nacos SDK 提供了订阅和撤销订阅方法,当客户端向服务端建议订阅恳求,服务端会记录建议调用的客户端为该服务的订阅者,同时将服务的最新实例列表返回。当客户端建议了撤销订阅,服务端就会从该服务的订阅者列表中把当时客户端移除。
当客户端建议订阅时,服务端除了会同步返回最新的服务实例列表,还会异步的经过grpc推送给该订阅者最新的服务实例列表,这样做的目的是为了异步更新客户端本地缓存的服务数据。
当客户端订阅的服务上下线,该服务一切的订阅者会马上收到最新的服务列表而且将服务最新的实例数据更新到内存。
咱们也看一下相关源码,服务端接纳到订阅数据,首要保存到内存中。
@Override
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
//校验长衔接是否正常
if (!clientIsLegal(client, clientId)) {
return;
}
//保存订阅数据
client.addServiceSubscriber(singleton, subscriber);
client.setLastUpdatedTime();
//发送订阅事情
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
//处理订阅操作
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
removeSubscriberIndexes(service, clientId);
}
}
然后发布订阅事情。
private void addSubscriberIndexes(Service service, String clientId) {
//保存订阅数据
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// Fix #5404, Only first time add need notify event.
if (subscriberIndexes.get(service).add(clientId)) {
//发布订阅事情
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}
服务端自己消费订阅事情,而且推送给订阅的客户端最新的服务实例数据。
@Override
public void onEvent(Event event) {
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
if (event instanceof ServiceEvent.ServiceChangedEvent) {
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// If service is subscribed by one client, only push this client.
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}
2.5 Nacos 推送
推送方法
前面说了服务的注册和订阅都会产生推送(服务端->客户端),那推送到底是怎么完成的呢?
在早期的Nacos版别,当服务实例变化,服务端会经过udp协议将最新的数据发送给客户端,后来发现udp推送有必定的丢包率,所以新版别的Nacos支持了grpc推送。Nacos服务端会自动判别客户端的版别来挑选哪种方法来进行推送,假如你运用1.4.2以前的SDK进行注册,那Nacos服务端会运用udp协议来进行推送,反之则运用grpc。
推送失利重试
当发送推送时,客户端可能正在重启,或许衔接不稳定导致推送失利,这个时分Nacos会进行重试。Nacos将每个推送都封装成一个使命目标,放入到行列中,再敞开一个线程不停的从行列取出使命履行,履行之前会先删除该使命,假如履行失利则将使命重新添加到行列,该线程会记录使命履行的时刻,假如超过1秒,则会记录到日志。
推送部分源码
添加推送使命到履行行列中。
private static class PushDelayTaskProcessor implements NacosTaskProcessor {
private final PushDelayTaskExecuteEngine executeEngine;
public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
this.executeEngine = executeEngine;
}
@Override
public boolean process(NacosTask task) {
PushDelayTask pushDelayTask = (PushDelayTask) task;
Service service = pushDelayTask.getService();
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
return true;
}
}
推送使命PushExecuteTask 的履行。
public class PushExecuteTask extends AbstractExecuteTask {
//..省略
@Override
public void run() {
try {
//封装要推送的服务实例数据
PushDataWrapper wrapper = generatePushData();
ClientManager clientManager = delayTaskEngine.getClientManager();
//假如是服务上下线导致的推送,获取一切订阅者
//假如是订阅导致的推送,获取订阅者
for (String each : getTargetClientIds()) {
Client client = clientManager.getClient(each);
if (null == client) {
// means this client has disconnect
continue;
}
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
//推送给订阅者
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
//当推送产生异常,重新将推送使命放入履行行列
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}
//假如是服务上下线导致的推送,获取一切订阅者
//假如是订阅导致的推送,获取订阅者
private Collection<String> getTargetClientIds() {
return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)
: delayTask.getTargetClients();
}
履行推送使命线程InnerWorker 的履行。
/**
* Inner execute worker.
*/
private class InnerWorker extends Thread {
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
@Override
public void run() {
while (!closed.get()) {
try {
//从行列中取出使命PushExecuteTask
Runnable task = queue.take();
long begin = System.currentTimeMillis();
//履行PushExecuteTask
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
log.warn("task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e.toString(), e);
}
}
}
}
2.6 Nacos SDK 查询服务实例
服务顾客首要需要调用Nacos SDK的接口来获取最新的服务实例,然后才能从获取到的实例列表中以加权轮询的方法挑选出一个实例(包括ip,port等信息),最终再建议调用。
前面已经说到Nacos服务产生上下线、订阅的时分都会推送最新的服务实例列表到当客户端,客户端再更新本地内存中的缓冲数据,所以调用Nacos SDK提供的查询实例列表的接口时,不会直接恳求服务端获取数据,而是会优先运用内存中的服务数据,只要内存中查不到的状况下才会建议订阅恳求服务端数据。
Nacos SDK内存中的数据除了接受来自服务端的推送更新之外,自己本地也会有一个守时使命守时去获取服务端数据来进行兜底。Nacos SDK在查询的时分也了容灾机制,即从磁盘获取服务数据,而这个磁盘的数据其实也是来自于内存,有一个守时使命守时从内存缓存中获取然后加载到磁盘。Nacos SDK的容灾机制默许关闭,可经过设置环境变量failover-mode=true来敞开。
架构图
用户查询流程
查询服务实例部分源码
private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
//这里默许传过来是true
if (subscribe) {
//从本地内存获取服务数据,假如获取不到则从磁盘获取
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
//假如从本地获取不到数据,则调用订阅方法
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
//适用于不走订阅,直接从服务端获取数据的状况
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
}
//从本地内存获取服务数据,假如敞开了毛病转移则直接从磁盘获取,由于当服务端挂了,本地启动时内存中也没有数据
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String key = ServiceInfo.getKey(groupedServiceName, clusters);
//毛病转移则直接从磁盘获取
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
//返回内存中数据
return serviceInfoMap.get(key);
}
3. 结语
本篇文章向咱们介绍 Nacos 服务发现的基本概念和中心能力以及完成的原理,旨在让咱们对 Nacos 的服务注册与发现功用有更多的了解,做到心中有数。
这篇文章原作者是我好友,小米大佬胡俊,假如对 Nacos 开源感兴趣的同学,也能够和我联络。
精品 PDF 获取
对于一个想去大厂的后端研发,能否熟练掌握高并发是判别他优异与否的重要规范之一,这本手册《高并发手册》相当经典,触及缓存、异步、限流、熔断、降级、分片、雪崩、主从、一致性等一系列知识点。
PDF 获取地址:mp.weixin.qq.com/s/twRVqcjqb…
我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆普通,你好,陌生人,一同共勉。