上篇说到Nacos客户端的注册流程(没看过的小伙伴可以点击这儿1、Nacos 服务注册客户端源码剖析),那注册发送到服务端,服务端是怎么处理的呢?

本篇就通过源码来剖析一下服务端的处理流程。

Nacos注册服务端源码解析

回顾一下,客户端注册代码。客户端通过com.alibaba.nacos.common.remote.client.RpcClient#request(com.alibaba.nacos.api.remote.request.Request, long)进行调用,那调用后服务端的代码进口在哪里呢?

咱们知道Nacos注册服务是通过Grpc长途调用,现在客户端建议调用,必定有个Grpc的服务端在承受服务。

Grpc 服务端

com.alibaba.nacos.core.remote.BaseRpcServer是Grpc服务端的一个笼统类,其有一个PostConstruct,表明在构造办法后履行,下面看看它详细做了什么事情

@PostConstruct
public void start() throws Exception {
    String serverName = getClass().getSimpleName();
    Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());
    // 发动Grpc的服务端
    startServer();
    Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort());
    // 增加一个封闭的钩子函数,当虚拟机承受封闭退出信号的时分封闭服务,详细的也便是封闭Grpc的服务端
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
        try {
            BaseRpcServer.this.stopServer();
            Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
        } catch (Exception e) {
            Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
        }
    }));
}

这个startServer()是一个笼统办法,详细由其子类进行完成,查看子类,发现是由BaseGrpcServer完成

@Override
public void startServer() throws Exception {
    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
    // server interceptor to set connection id.
    ServerInterceptor serverInterceptor = new ServerInterceptor() {
        @Override
        public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
                                                           ServerCallHandler<T, S> next) {
            Context ctx = Context.current()
                .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
            if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                Channel internalChannel = getInternalChannel(call);
                ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
            }
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    };
    // 注册服务
    addServices(handlerRegistry, serverInterceptor);
    // 创立grpc的server端
    server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
        .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
        .compressorRegistry(CompressorRegistry.getDefaultInstance())
        .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
        .addTransportFilter(new ServerTransportFilter() {
            @Override
            public Attributes transportReady(Attributes transportAttrs) {
                InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                    .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                    .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                int remotePort = remoteAddress.getPort();
                int localPort = localAddress.getPort();
                String remoteIp = remoteAddress.getAddress().getHostAddress();
                Attributes attrWrapper = transportAttrs.toBuilder()
                    .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                    .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                    .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
                Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
                return attrWrapper;
            }
            @Override
            public void transportTerminated(Attributes transportAttrs) {
                String connectionId = null;
                try {
                    connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                } catch (Exception e) {
                    // Ignore
                }
                if (StringUtils.isNotBlank(connectionId)) {
                    Loggers.REMOTE_DIGEST
                        .info("Connection transportTerminated,connectionId = {} ", connectionId);
                    connectionManager.unregister(connectionId);
                }
            }
        }).build();
    // 发动grpc server
    server.start();
}

注册服务的过程中就能看到增加了Grpc的处理方式

private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
    // unary common call register.
    final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
        .setType(MethodDescriptor.MethodType.UNARY)
        .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
        .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
        .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
    // 处理类
    final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
        .asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
    final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
        .addMethod(unaryPayloadMethod, payloadHandler).build();
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
    // bi stream register.
    // 双向流处理类
    final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
        (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
    final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
        .setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
                                                                               .generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
        .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
        .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
    final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
        .builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
}

而这两个类也便是完成了nacos_grpc_service.proto的两个子处理类。

public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase
public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase

对应nacos_grpc_service.proto中的两个办法处理

service Request {
  // Sends a commonRequest
  rpc request (Payload) returns (Payload) {
  }
}
service BiRequestStream {
  // Sends a biStreamRequest
  rpc requestBiStream (stream Payload) returns (stream Payload) {
  }
}

咱们再看下BaseRpcServer,这个类是一个笼统类,它有两个子类,分别是GrpcClusterServerGrpcSdkServer。这两个类是从哪里创立的呢?又是做什么用的呢?带着这两个问题,咱们剖析一下这两个Server类。

GrpcClusterServerGrpcSdkServer剖析

Server的创立

这两个类其实很简单,这儿直接贴一下源码

@Service
public class GrpcSdkServer extends BaseGrpcServer {
    @Override
    public int rpcPortOffset() {
        return Constants.SDK_GRPC_PORT_DEFAULT_OFFSET;
    }
    @Override
    public ThreadPoolExecutor getRpcExecutor() {
        return GlobalExecutor.sdkRpcExecutor;
    }
}
@Service
public class GrpcClusterServer extends BaseGrpcServer {
    @Override
    public int rpcPortOffset() {
        return Constants.CLUSTER_GRPC_PORT_DEFAULT_OFFSET;
    }
    @Override
    public ThreadPoolExecutor getRpcExecutor() {
        if (!GlobalExecutor.clusterRpcExecutor.allowsCoreThreadTimeOut()) {
            GlobalExecutor.clusterRpcExecutor.allowCoreThreadTimeOut(true);
        }
        return GlobalExecutor.clusterRpcExecutor;
    }
}

类中就两个办法,一个设置端口,一个是获取衔接池。然后在类上有个@Servcice的注解。这个注解是spring的注解,只需归spring扫描到的话,就可以自动帮咱们创立并放入到spring容器中。而Nacos的发动类是com.alibaba.nacos.Nacos,这儿放一下源码。

@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {
    public static void main(String[] args) {
        SpringApplication.run(Nacos.class, args);
    }
}

可以看出这是一个Spring Boot工程,扫描的时分扫描了com.alibaba.nacos包,当然也包含其子包。这样这两个Server类均会被扫描到,进行注入,从而调用com.alibaba.nacos.core.remote.BaseRpcServer#start,从而进行一系列的创立操作。

Server的效果

这儿创立了两个Server,从Server的姓名上能看出,一个是SDK服务调用,一个是Cluster服务调用,可是这两个类并没有写的很清楚究竟做啥用的。看办法也基本共同,那咱们就从客户端进口,看客户端调用有啥差异来辨认服务端的差异。

在两个类中,有个端口偏移量的处理,在GrpcSdkServer中的是SDK_GRPC_PORT_DEFAULT_OFFSET,也便是偏移1000,而GrpcClusterServer中的是CLUSTER_GRPC_PORT_DEFAULT_OFFSET为1001。从端口咱们去查找客户端代码,发现客户端的两个类,分别是GrpcSdkClientGrpcClusterClient。他们的创立在com.alibaba.nacos.common.remote.client.RpcClientFactory中,其代码如下

public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,Integer threadPoolMaxSize, Map<String, String> labels) {
    if (!ConnectionType.GRPC.equals(connectionType)) {
        throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
    }
    return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
        LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
        try {
            // 创立SDK客户端
            return new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels);
        } catch (Throwable throwable) {
            LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
            throw throwable;
        }
    });
}
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
        Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
    if (!ConnectionType.GRPC.equals(connectionType)) {
        throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
    }
    // 创立cluster客户端
    return CLIENT_MAP.computeIfAbsent(clientName,
            clientNameInner -> new GrpcClusterClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize,labels));
}

从这两段代码看不出差异,不过别急,立刻就能看到差异了。

再看下是谁在调用这两个办法。

createClient是被ClientWorkerNamingGrpcClientProxy调用,这两个类是位于nacos-client傍边的,也便是一般咱们需要引进的客户端,由客户端建议调用。

2、Nacos 服务注册服务端源码分析(一)

createClusterClient是被ClusterRpcClientProxyRpcClientFactory调用,这两个类位于nacos-core中。nacos-core是服务端引证,在服务与服务端内部调用。

2、Nacos 服务注册服务端源码分析(一)

至此,这两个类水落石出了。

假如是咱们的事务工程代码,引进的是nacos-client,这样调用的服务端便是GrpcSdkServer,由这个服务来处理客户端恳求,针对是外部服务的调用。而GrpcClusterServer不处理外部的恳求,处理的内部的服务器端和服务器端的调用。

假如看不懂一个服务代码的调用,可以一向溯源,溯源到恳求方,看是假如建议的恳求,或许通过断点法,看IDE的仓库来向上溯源,这两种都是很好的办法。

服务端处理办法剖析

在上面咱们现已剖析,不过什么Grpc的调用,都会通过grpcCommonRequestAcceptor.request(request, responseObserver)grpcBiStreamRequestAcceptor.requestBiStream(responseObserver)的处理。而接下来要点解说下com.alibaba.nacos.core.remote.grpc.GrpcRequestAcceptor#request的处理。

这个办法中,前面都是校验和转化,这儿就不细看了,直接抓要点

Request request = (Request) parseObj;
try {
    // 获取衔接
    Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
    // 创立并设置恳求的节本信息
    RequestMeta requestMeta = new RequestMeta();
    requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
    requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
    requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
    requestMeta.setLabels(connection.getMetaInfo().getLabels());
    // 刷新活泼时刻,根据这个时刻看是否超时的
    connectionManager.refreshActiveTime(requestMeta.getConnectionId());
    // 要点!!!处理恳求
    Response response = requestHandler.handleRequest(request, requestMeta);
    // 拿到恳求解析,完成恳求
    Payload payloadResponse = GrpcUtils.convert(response);
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
} catch (Throwable e) {
    Loggers.REMOTE_DIGEST
        .error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId,
               e);
    Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(e));
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
}

要点在网络处理requestHandle.handleRequest(request, requestMeta)的处理中。

 public Response handleRequest(T request, RequestMeta meta) throws NacosException {
     for (AbstractRequestFilter filter : requestFilters.filters) {
         try {
             Response filterResult = filter.filter(request, meta, this.getClass());
             if (filterResult != null && !filterResult.isSuccess()) {
                 return filterResult;
             }
         } catch (Throwable throwable) {
             Loggers.REMOTE.error("filter error", throwable);
         }
     }
     // 调用处理办法
     return handle(request, meta);
 }
// 笼统办法,由子类处理
public abstract S handle(T request, RequestMeta meta) throws NacosException;

这儿采用了模板办法的规划模式。模板办法的规划模式是界说顶层的处理逻辑,而详细步骤的完成由子类去完成。

当咱们想看下这个handle是怎么处理的时分,发现其子类非常多。

2、Nacos 服务注册服务端源码分析(一)

注册的方式的处理类是哪个呢?

看到这个类有个泛型T extends Request,必定跟Request有关,咱们回去找下咱们的Request详细是哪个子类。而Request是由客户端建议的,那就需要回到上一篇看下,详细的Request是怎么来的。咱们可以自己找下。我相信聪明的各位必定最后会找到com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doRegisterService办法。

public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    // 构建Request目标
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                                                  NamingRemoteConstants.REGISTER_INSTANCE, instance);
    requestToServer(request, Response.class);
    redoService.instanceRegistered(serviceName, groupName);
}

通过这个Request目标,咱们就可以很快找到详细的处理类了。

2、Nacos 服务注册服务端源码分析(一)

没错,便是这个com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler。在这个类中,就有着com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler#handle办法。

@Override
@Secured(action = ActionTypes.WRITE)
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
    Service service = Service
        .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
    switch (request.getType()) {
        case NamingRemoteConstants.REGISTER_INSTANCE:
            // 处理注册
            return registerInstance(service, request, meta);
        case NamingRemoteConstants.DE_REGISTER_INSTANCE:
            // 处理下线
            return deregisterInstance(service, request, meta);
        default:
            throw new NacosException(NacosException.INVALID_PARAM,
                                     String.format("Unsupported request type %s", request.getType()));
    }
}
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
    throws NacosException {
    // 注册实例
    clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
    // 发布事情
    NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
                                                             meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
                                                             request.getInstance().getIp(), request.getInstance().getPort()));
    return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
@Override
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (!singleton.isEphemeral()) {
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                                        String.format("Current service %s is persistent service, can't register ephemeral instance.",
                                                      singleton.getGroupedServiceName()));
    }
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();
    client.recalculateRevision();
    // 持续发布事情
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}

咱们看到注册逻辑好像并没有处理什么内容,实践都是在发布事情后,承受事情的处理逻辑进行处理的。而详细是怎么处理的呢?由于NotifyCenter是个很重要的类。剖析起来又要花很多时刻。所以,本篇就先解说到这儿。

总结

本篇花了很大的篇幅解说了Grpc服务端,包含发动的两个Server,GrpcClusterServerGrpcSdkServer。他们是怎么发动的和他们各自的效果。后边还介绍了咱们怎么寻找处理的Handler类。假如有过源码阅读经验的小伙伴看到Handler这个词就会发生反射,这一定是一个网络处理类。由于从Netty(当然可能其他框架,或许更早的框架)都是用Handler来对网络恳求进行处理的。

NotifyCenter的解说可以看这篇3、Nacos 服务注册服务端源码剖析(二)