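In the previous article, "13. Nacos Source Code Analysis: Distro Protocol (Part 1)", we analyzed some of the core classes related to the Distro protocol, but mostly from the point of view of the initiating side. This article takes the point of view of the receiving side of the Distro protocol and continues with the processing logic that runs once a request has been received.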
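Recall that in the previous article we followed the code as far as clusterRpcClientProxy.sendRequest(member, request) and then stopped tracing. This time we pick up from that point and go deeper. The first thing to establish is that this request is of type DistroDataRequest; the handler registered for that request type contains its processing logic.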
For DistroDataRequest, that processing logic lives in the handle method of DistroDataRequestHandler.
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                // Handle a verify request
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                // Handle a snapshot request
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                // Handle a data change request
                return handleSyncData(request.getDistroData());
            case QUERY:
                // Handle a data query request
                return handleQueryData(request.getDistroData());
            default:
                // Default: return an empty response
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}
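As the code above shows, a DistroDataRequest carries one of several data operation types, and each type is handled differently. The four paths are: handling verify requests, handling snapshot requests, handling data change requests, and handling data query requests. Let's go through them one by one.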
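To make the routing easier to see at a glance, here is a minimal, self-contained sketch of the same switch. SketchOperation and DispatchSketch are hypothetical stand-ins written for this article, not Nacos classes; the sketch only returns the name of the handler method that each operation type would reach.

```java
// Hypothetical stand-in for the operation types seen in the switch above.
enum SketchOperation { VERIFY, SNAPSHOT, ADD, CHANGE, DELETE, QUERY }

// Hypothetical helper: reports which handler method an operation is routed to.
final class DispatchSketch {
    private DispatchSketch() {
    }

    static String route(SketchOperation op) {
        switch (op) {
            case VERIFY:
                return "handleVerify";
            case SNAPSHOT:
                return "handleSnapshot";
            case ADD:
            case CHANGE:
            case DELETE:
                // Three operation types fall through to a single handler.
                return "handleSyncData";
            case QUERY:
                return "handleQueryData";
            default:
                // Unknown types get an empty response in the original code.
                return "emptyResponse";
        }
    }
}
```

Note that ADD, CHANGE and DELETE share one entry point here; they are only told apart later, inside processData.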
handleVerify
private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
    DistroDataResponse result = new DistroDataResponse();
    // Verify through distroProtocol
    if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
        result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
    }
    return result;
}
The code calls the distroProtocol.onVerify method directly.
public boolean onVerify(DistroData distroData, String sourceAddress) {
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
                distroData.getDistroKey());
    }
    String resourceType = distroData.getDistroKey().getResourceType();
    // Find the matching processor. Recall DistroClientComponentRegistry#doRegister():
    // the processor registered there is actually DistroClientDataProcessor
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
        return false;
    }
    // Run the verification through DistroClientDataProcessor
    return dataProcessor.processVerifyData(distroData, sourceAddress);
}
Now that we know dataProcessor is DistroClientDataProcessor, let's keep reading:
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
    // Deserialize the payload into a DistroClientVerifyInfo object
    DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class)
            .deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
    // Verify through clientManager
    if (clientManager.verifyClient(verifyData)) {
        return true;
    }
    Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
    return false;
}
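clientManager is a delegate class. In the Distro protocol it delegates to EphemeralIpPortClientManager, because ephemeral defaults to true (as analyzed in "1. Nacos Service Registration Client Source Code Analysis").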
@Override
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
    String clientId = verifyData.getClientId();
    IpPortBasedClient client = clients.get(clientId);
    if (null != client) {
        // Remote nodes running an old version always verify with revision zero
        if (0 == verifyData.getRevision() || client.getRevision() == verifyData.getRevision()) {
            NamingExecuteTaskDispatcher.getInstance()
                    .dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
            return true;
        } else {
            Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] IpPortBasedClient[{}] revision local={}, remote={}",
                    client.getClientId(), client.getRevision(), verifyData.getRevision());
        }
    }
    return false;
}
If the revisions match (or the remote revision is 0), ClientBeatUpdateTask is executed. Here is the run method of ClientBeatUpdateTask:
@Override
public void run() {
    long currentTime = System.currentTimeMillis();
    for (InstancePublishInfo each : client.getAllInstancePublishInfo()) {
        ((HealthCheckInstancePublishInfo) each).setLastHeartBeatTime(currentTime);
    }
    client.setLastUpdatedTime();
}
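This method refreshes the last heartbeat time of every instance the client has published, and then the client's own last-updated time.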
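The verify rule can be boiled down to a few lines. The sketch below is a simplified model under stated assumptions, not the Nacos implementation: VerifySketch, its two maps, and the explicit now parameter are all hypothetical, and a plain map write stands in for dispatching ClientBeatUpdateTask.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the receiver-side verify rule.
final class VerifySketch {
    // clientId -> revision known on this node
    private final Map<String, Long> localRevisions = new ConcurrentHashMap<>();
    // clientId -> time of the last successful verify (stands in for the beat update)
    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();

    void putClient(String clientId, long revision) {
        localRevisions.put(clientId, revision);
    }

    boolean verify(String clientId, long remoteRevision, long now) {
        Long local = localRevisions.get(clientId);
        if (local == null) {
            // Unknown client: verification fails.
            return false;
        }
        // Revision 0 (sent by older nodes) or an exact match passes.
        if (remoteRevision == 0 || local.longValue() == remoteRevision) {
            lastBeat.put(clientId, now);
            return true;
        }
        return false;
    }

    Long lastBeat(String clientId) {
        return lastBeat.get(clientId);
    }
}
```

A failed verify leaves the stored heartbeat time untouched, which matches the original code: the beat update task is only dispatched on the success branch.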
handleSnapshot
private DistroDataResponse handleSnapshot() {
    DistroDataResponse result = new DistroDataResponse();
    // Uses the onSnapshot method of distroProtocol
    DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
    result.setDistroData(distroData);
    return result;
}
The code calls the distroProtocol.onSnapshot method directly.
public DistroData onSnapshot(String type) {
    // Find the matching DistroDataStorage. Recall DistroClientComponentRegistry#doRegister():
    // the storage registered there via registerDataStorage is actually DistroClientDataProcessor
    DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
    if (null == distroDataStorage) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
        return new DistroData(new DistroKey("snapshot", type), new byte[0]);
    }
    return distroDataStorage.getDatumSnapshot();
}
Perhaps surprisingly, the DistroDataStorage is also DistroClientDataProcessor. Let's keep reading:
public DistroData getDatumSnapshot() {
    List<ClientSyncData> datum = new LinkedList<>();
    for (String each : clientManager.allClientId()) {
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) {
            continue;
        }
        // Collect the client's sync data
        datum.add(client.generateSyncData());
    }
    ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
    snapshot.setClientSyncDataList(datum);
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot);
    // Return the ClientSyncData of every ephemeral client in clientManager
    return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}
This returns the ClientSyncData of every ephemeral client. So what data does generateSyncData() actually produce?
@Override
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<String> batchNamespaces = new LinkedList<>();
    List<String> batchGroupNames = new LinkedList<>();
    List<String> batchServiceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();
    BatchInstanceData batchInstanceData = new BatchInstanceData();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        InstancePublishInfo instancePublishInfo = entry.getValue();
        if (instancePublishInfo instanceof BatchInstancePublishInfo) {
            BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;
            batchInstancePublishInfos.add(batchInstance);
            buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);
            batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);
        } else {
            namespaces.add(entry.getKey().getNamespace());
            groupNames.add(entry.getKey().getGroup());
            serviceNames.add(entry.getKey().getName());
            instances.add(entry.getValue());
        }
    }
    ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
    data.getAttributes().addClientAttribute(REVISION, getRevision());
    return data;
}
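ClientSyncData holds everything a client has published, including the namespace, groupName and serviceName of each service together with the corresponding instance information.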
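One detail worth spelling out is that the non-batch data travels as index-aligned lists: position i of namespaces, groupNames, serviceNames and instances together describes one published instance. The sketch below illustrates only that layout. ServiceKey and SyncDataSketch are hypothetical simplifications, and plain strings stand in for InstancePublishInfo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified service identity (namespace, group, name).
final class ServiceKey {
    final String namespace;
    final String group;
    final String name;

    ServiceKey(String namespace, String group, String name) {
        this.namespace = namespace;
        this.group = group;
        this.name = name;
    }
}

// Hypothetical model of the index-aligned lists carried in the sync data.
final class SyncDataSketch {
    final List<String> namespaces = new ArrayList<>();
    final List<String> groupNames = new ArrayList<>();
    final List<String> serviceNames = new ArrayList<>();
    final List<String> instances = new ArrayList<>();

    // Flatten "service -> instance" entries into four parallel lists.
    static SyncDataSketch from(Map<ServiceKey, String> publishers) {
        SyncDataSketch data = new SyncDataSketch();
        for (Map.Entry<ServiceKey, String> entry : publishers.entrySet()) {
            data.namespaces.add(entry.getKey().namespace);
            data.groupNames.add(entry.getKey().group);
            data.serviceNames.add(entry.getKey().name);
            data.instances.add(entry.getValue());
        }
        return data;
    }

    // Walk the lists by index to put each record back together.
    List<String> describe() {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < namespaces.size(); i++) {
            out.add(namespaces.get(i) + "/" + groupNames.get(i) + "/" + serviceNames.get(i)
                    + " -> " + instances.get(i));
        }
        return out;
    }
}
```

The receiving side relies on the same alignment: upgradeClient loops over namespaces.size() and reads the other three lists at the same index.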
handleSyncData
private DistroDataResponse handleSyncData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
    // distroProtocol receives the distroData
    if (!distroProtocol.onReceive(distroData)) {
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("[DISTRO-FAILED] distro data handle failed");
    }
    return result;
}
Once again, distroProtocol does the work:
public boolean onReceive(DistroData distroData) {
    Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
            distroData.getDistroKey());
    String resourceType = distroData.getDistroKey().getResourceType();
    // Look up the processor, which is DistroClientDataProcessor
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
        return false;
    }
    return dataProcessor.processData(distroData);
}
This again ends up in DistroClientDataProcessor, this time in its processData method:
@Override
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            // Deserialize the ClientSyncData
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            // Handle additions and updates
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            // On delete, disconnect the client
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}
For ADD and CHANGE, it calls the handlerClientSyncData method.
private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO
            .info("[Client-Add] Received distro client sync data {}, revision={}", clientSyncData.getClientId(),
                    clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
    // Sync the client connection
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // Update the client
    upgradeClient(client, clientSyncData);
}
syncClientConnected
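The syncClientConnected called here is the one on EphemeralIpPortClientManager. The listing below shows the clientConnected overloads, the client factory's newClient method, and the client's init method.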
@Override
public boolean clientConnected(String clientId, ClientAttributes attributes) {
    // Create a client and connect it
    return clientConnected(clientFactory.newClient(clientId, attributes));
}

@Override
public IpPortBasedClient newClient(String clientId, ClientAttributes attributes) {
    long revision = attributes.getClientAttribute(REVISION, 0);
    // Create a client based on IP and port
    IpPortBasedClient ipPortBasedClient = new IpPortBasedClient(clientId, true, revision);
    ipPortBasedClient.setAttributes(attributes);
    return ipPortBasedClient;
}

@Override
public boolean clientConnected(final Client client) {
    clients.computeIfAbsent(client.getClientId(), s -> {
        Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
        IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
        // Initialize the client
        ipPortBasedClient.init();
        return ipPortBasedClient;
    });
    return true;
}

public void init() {
    if (ephemeral) {
        beatCheckTask = new ClientBeatCheckTaskV2(this);
        // Scheduled task: run a health check every 5s
        HealthCheckReactor.scheduleCheck(beatCheckTask);
    } else {
        healthCheckTaskV2 = new HealthCheckTaskV2(this);
        HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
    }
}
When a client connection is synced, a scheduled task that runs every 5s is started in the background to health-check the connected client.
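Because clientConnected goes through computeIfAbsent, the initialization lambda only runs the first time a given client ID is seen; later syncs for the same ID leave the existing client in place. The sketch below demonstrates just that property. ConnectSketch is a hypothetical class, a string stands in for the client object, and a counter stands in for the call to init().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of "initialize once per client ID".
final class ConnectSketch {
    private final ConcurrentMap<String, String> clients = new ConcurrentHashMap<>();
    // Counts how many times the stand-in for init() has run.
    final AtomicInteger initCount = new AtomicInteger();

    boolean clientConnected(String clientId) {
        clients.computeIfAbsent(clientId, id -> {
            // Runs only when no client is stored under this ID yet.
            initCount.incrementAndGet();
            return id;
        });
        // Like the original, report success whether or not the client was new.
        return true;
    }

    int size() {
        return clients.size();
    }
}
```

Read this way, repeated ADD or CHANGE syncs for one client should not schedule a second health-check task for it, since init() is reached only from inside the lambda.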
upgradeClient
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    Set<Service> syncedService = new HashSet<>();
    // process batch instance sync logic
    processBatchInstanceDistroData(syncedService, client, clientSyncData);
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            // Handle instances that are new or have changed
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            // Handle services that are no longer present in the synced data
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}
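upgradeClient walks through the synced clientSyncData, finds what has changed, and applies the changes. The actual processing is done by publishing events, which keeps the events separate from the logic that reacts to them; the flow is event-driven. We won't analyze each event here; interested readers can explore them on their own.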
handleQueryData
private DistroDataResponse handleQueryData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
    DistroKey distroKey = distroData.getDistroKey();
    // distroProtocol runs onQuery
    DistroData queryData = distroProtocol.onQuery(distroKey);
    result.setDistroData(queryData);
    return result;
}
Here is onQuery in distroProtocol:
public DistroData onQuery(DistroKey distroKey) {
    String resourceType = distroKey.getResourceType();
    // What we get here is DistroClientDataProcessor
    DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(resourceType);
    if (null == distroDataStorage) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", resourceType);
        return new DistroData(distroKey, new byte[0]);
    }
    // getDistroData of DistroClientDataProcessor
    return distroDataStorage.getDistroData(distroKey);
}
The DistroClientDataProcessor#getDistroData method:
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    // Serialize the result of client.generateSyncData()
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}
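The query path is fairly simple: it takes the result of client.generateSyncData(), serializes it, and wraps it in a DistroData to return. If no client is found for the key, it returns null.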
Summary
When a Distro request arrives, it is handled by DistroDataRequestHandler, which in turn hands the work to the processing logic in DistroProtocol. Along the way we analyzed the DistroProtocol methods that the previous article did not cover.
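Inside DistroProtocol, most of the work is done by DistroClientDataProcessor. In essence, each node's client data is packed into DistroData and then kept in the in-memory caches of the server nodes. That is why Distro is described as an ephemeral synchronization protocol: its data exists only at runtime and is gone once the service shuts down. On startup a node pulls the latest data and stores it; after that, a series of RPC requests and internal events, such as connection, registration and disconnection events, drive the flow one link after another to keep the data consistent across nodes.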