在上一篇13、Nacos 源码剖析-Distro协议(上)中,咱们剖析了和Distro协议相关的一些中心类,但是剖析的更多是作为建议方的一个处理逻辑,本篇站在Dirstro协议的接收方的角度持续剖析接收到之后恳求后的处理逻辑。

回想一下上篇中,咱们剖析到clusterRpcClientProxy.sendRequest(member, request)后,就未再持续往下持续盯梢了,本次接着这部分代码持续深入,首先咱们确认的是这个requestDistroDataRequest类型,然后找到对应requesthandler便是他的处理逻辑。

DistroDataRequest的处理逻辑便是DistroDataRequestHandlerhandle办法中。

@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                // 处理验证恳求
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                // 处理快照恳求
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                // 处理数据变化的恳求
                return handleSyncData(request.getDistroData());
            case QUERY:
                // 处理查询数据的恳求
                return handleQueryData(request.getDistroData());
            default:
                // 默许的回来
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}

上面的代码能够看出,DistroDataRequest有这多种数据操作类型,根据不同的类型有着不同的处理方式。其处理方式别离有处理验证恳求处理快照恳求处理数据变化的恳求处理查询数据的恳求。下面咱们逐个剖析下。

handleVerify

private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
    DistroDataResponse result = new DistroDataResponse();
    // 运用distroProtocol进行验证
    if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
        result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
    }
    return result;
}

代码中直接运用了distroProtocol.onVerify办法。

public boolean onVerify(DistroData distroData, String sourceAddress) {
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
                             distroData.getDistroKey());
    }
    String resourceType = distroData.getDistroKey().getResourceType();
    // 寻找对应的处理器履行,这儿回想一下DistroClientComponentRegistry#doRegister()办法,在register中注册的处理器,实践便是DistroClientDataProcessor
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
        return false;
    }
    // 通过DistroClientDataProcessor履行验证
    return dataProcessor.processVerifyData(distroData, sourceAddress);
}

既然咱们知道了dataProcessor便是DistroClientDataProcessor,那咱们接着往下看

public boolean processVerifyData(DistroData distroData, String sourceAddress) {
    // 对数据进行反序列化为DistroClientVerifyInfo对象
    DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class)
        .deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
    // 通过clientManager进行验证
    if (clientManager.verifyClient(verifyData)) {
        return true;
    }
    Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
    return false;
}

clientManager是一个托付类,在Distro协议中,托付的便是EphemeralIpPortClientManager,由于ephemeral默许是true(在1、Nacos 服务注册客户端源码剖析中剖析过)

@Override
public boolean verifyClient(DistroClientVerifyInfo verifyData) {
    String clientId = verifyData.getClientId();
    IpPortBasedClient client = clients.get(clientId);
    if (null != client) {
        // 旧版本的长途节点将始终运用零修订进行验证
        if (0 == verifyData.getRevision() || client.getRevision() == verifyData.getRevision()) {
            NamingExecuteTaskDispatcher.getInstance()
                .dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
            return true;
        } else {
            Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] IpPortBasedClient[{}] revision local={}, remote={}",
                                client.getClientId(), client.getRevision(), verifyData.getRevision());
        }
    }
    return false;
}

如果版本一致的话,就会履行ClientBeatUpdateTask,看下ClientBeatUpdateTaskrun办法

@Override
public void run() {
    long currentTime = System.currentTimeMillis();
    for (InstancePublishInfo each : client.getAllInstancePublishInfo()) {
        ((HealthCheckInstancePublishInfo) each).setLastHeartBeatTime(currentTime);
    }
    client.setLastUpdatedTime();
}

这个办法便是更新了client的最新的更新时刻。

handleSnapshot

private DistroDataResponse handleSnapshot() {
    DistroDataResponse result = new DistroDataResponse();
    // 运用的是distroProtocol的onSnapshot办法
    DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
    result.setDistroData(distroData);
    return result;
}

代码中直接运用了distroProtocol.onSnapshot办法。

public DistroData onSnapshot(String type) {
    // 寻找对应的DistroDataStorage履行,这儿回想一下DistroClientComponentRegistry#doRegister()办法,在register中的registerDataStorage,实践便是DistroClientDataProcessor
    DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
    if (null == distroDataStorage) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
        return new DistroData(new DistroKey("snapshot", type), new byte[0]);
    }
    return distroDataStorage.getDatumSnapshot();
}

没想到吧,DistroDataStorage也是DistroClientDataProcessor,持续往下看

public DistroData getDatumSnapshot() {
    List<ClientSyncData> datum = new LinkedList<>();
    for (String each : clientManager.allClientId()) {
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) {
            continue;
        }
        // 获取SyncData
        datum.add(client.generateSyncData());
    }
    ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
    snapshot.setClientSyncDataList(datum);
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot);
    // 回来了clientManager中一切client的ClientSyncData
    return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}

回来了一切实例的ClientSyncData。那generateSyncData()是生成了什么数据呢?

@Override
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<String> batchNamespaces = new LinkedList<>();
    List<String> batchGroupNames = new LinkedList<>();
    List<String> batchServiceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();
    BatchInstanceData  batchInstanceData = new BatchInstanceData();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        InstancePublishInfo instancePublishInfo = entry.getValue();
        if (instancePublishInfo instanceof BatchInstancePublishInfo) {
            BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;
            batchInstancePublishInfos.add(batchInstance);
            buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);
            batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);
        } else {
            namespaces.add(entry.getKey().getNamespace());
            groupNames.add(entry.getKey().getGroup());
            serviceNames.add(entry.getKey().getName());
            instances.add(entry.getValue());
        }
    }
    ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
    data.getAttributes().addClientAttribute(REVISION, getRevision());
    return data;
}

ClientSyncData便是一个实例的一切的特点信息,包括namespacegroupNameserviceName等信息。

handleSyncData

private DistroDataResponse handleSyncData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
    // distroProtocol承受distroData
    if (!distroProtocol.onReceive(distroData)) {
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("[DISTRO-FAILED] distro data handle failed");
    }
    return result;
}

仍是distroProtocol处理

public boolean onReceive(DistroData distroData) {
    Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
                        distroData.getDistroKey());
    String resourceType = distroData.getDistroKey().getResourceType();
    // 获取到DistroClientDataProcessor
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
        return false;
    }
    return dataProcessor.processData(distroData);
}

依然是调用DistroClientDataProcessorprocessData办法

@Override
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            // 反序列化ClientSyncData
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                .deserialize(distroData.getContent(), ClientSyncData.class);
            // 处理新增和更新的办法
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            // 删去则断开衔接
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

在新增的和修改的时分调用了handlerClientSyncData办法。

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO
        .info("[Client-Add] Received distro client sync data {}, revision={}", clientSyncData.getClientId(),
              clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
    // 同步客户端的衔接
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // 对client更新
    upgradeClient(client, clientSyncData);
}

syncClientConnected

这个syncClientConnected运用的是EphemeralIpPortClientManagersyncClientConnected

@Override
public boolean clientConnected(String clientId, ClientAttributes attributes) {
    // 创立一个client,并进行衔接
    return clientConnected(clientFactory.newClient(clientId, attributes));
}
@Override
public IpPortBasedClient newClient(String clientId, ClientAttributes attributes) {
    long revision = attributes.getClientAttribute(REVISION, 0);
    // 创立了一个基于ip和端口的客户端
    IpPortBasedClient ipPortBasedClient = new IpPortBasedClient(clientId, true, revision);
    ipPortBasedClient.setAttributes(attributes);
    return ipPortBasedClient;
}
@Override
public boolean clientConnected(final Client client) {
    clients.computeIfAbsent(client.getClientId(), s -> {
        Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
        IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
        // 对客户端进行初始化
        ipPortBasedClient.init();
        return ipPortBasedClient;
    });
    return true;
}
public void init() {
    if (ephemeral) {
        beatCheckTask = new ClientBeatCheckTaskV2(this);
        // 守时,每隔5s做一次健康检查
        HealthCheckReactor.scheduleCheck(beatCheckTask);
    } else {
        healthCheckTaskV2 = new HealthCheckTaskV2(this);
        HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
    }
}

在同步客户端衔接的时分,后台会履行一个每隔5s的守时任务,对衔接的客户端进行健康检查。

upgradeClient

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    Set<Service> syncedService = new HashSet<>();
    // process batch instance sync logic
    processBatchInstanceDistroData(syncedService, client, clientSyncData);
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            // 处理变化的新增数据
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            // 处理变化的多余数据
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

upgradeClient则是对同步的数据clientSyncData进行遍历,找到变化的数据,然后进行处理。当然处理的逻辑都是发布事情,将事情和逻辑区分隔。由事情驱动。这儿就不再剖析各个事情,如果有感兴趣的小伙伴能够自行剖析。

handleQueryData

private DistroDataResponse handleQueryData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
    DistroKey distroKey = distroData.getDistroKey();
    // distroProtocol履行onQuery
    DistroData queryData = distroProtocol.onQuery(distroKey);
    result.setDistroData(queryData);
    return result;
}

看下distroProtocolonQuery

public DistroData onQuery(DistroKey distroKey) {
    String resourceType = distroKey.getResourceType();
    // 这儿拿到的DistroClientDataProcessor
    DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(resourceType);
    if (null == distroDataStorage) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", resourceType);
        return new DistroData(distroKey, new byte[0]);
    }
    // DistroClientDataProcessor的getDistroData
    return distroDataStorage.getDistroData(distroKey);
}

DistroClientDataProcessor#getDistroData的办法

public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    // 拿到client.generateSyncData()
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

查询的办法比较简单,便是拿到client.generateSyncData(),序列化后组成DistroData回来。

总结

承受到Distro的相关恳求后,通过DistroDataRequestHandler处理,而DistroDataRequestHandler又由DistroProtocol中的处理逻辑进行处理的。然后咱们剖析了上篇没有剖析的DistroProtocol中的几个相关办法。

DistroProtocol中又主要由DistroClientDataProcessor进行处理。其主要仍是将各个节点的特点的数据存储DistroData中,然后由各个服务节点之间的缓存进行保存的。所以咱们一直称之为Distro是一个临时性的同步协议,它的数据仅仅保存再运行态,当服务关闭了就不存在了。发动的时分拉取最新的数据进行保存,然后通过一系列Rpc的恳求和内部各个事情,比如衔接注册事情,断开事情等进行事情的驱动,一环扣一环,确保数据的一致性。