The previous article covered the Nacos client-side registration flow (if you haven't read it yet, see "1. Nacos Service Registration: Client-Side Source Code Analysis"). Once the registration request is sent to the server, how does the server handle it?
This article walks through the server-side handling flow via the source code.
Nacos Registration: Server-Side Source Code Analysis
A quick recap of the client-side registration code: the client issues the call through com.alibaba.nacos.common.remote.client.RpcClient#request(com.alibaba.nacos.api.remote.request.Request, long). So where is the server-side entry point for that call?
We know Nacos registration goes over a gRPC remote call, so if the client is initiating calls, there must be a gRPC server accepting them.
The gRPC Server
com.alibaba.nacos.core.remote.BaseRpcServer is an abstract class for the gRPC server. It has a method annotated with @PostConstruct, which means it runs right after the bean is constructed. Let's look at what it actually does.
@PostConstruct
public void start() throws Exception {
String serverName = getClass().getSimpleName();
Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());
// Start the gRPC server
startServer();
Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort());
// Add a shutdown hook: when the JVM receives a shutdown signal, stop the service, i.e. shut down the gRPC server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
try {
BaseRpcServer.this.stopServer();
Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
} catch (Exception e) {
Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
}
}));
}
startServer() is an abstract method implemented by subclasses. Checking the subclasses, we find the implementation in BaseGrpcServer:
@Override
public void startServer() throws Exception {
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
// server interceptor to set connection id.
ServerInterceptor serverInterceptor = new ServerInterceptor() {
@Override
public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
ServerCallHandler<T, S> next) {
Context ctx = Context.current()
.withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
.withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
.withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
.withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
Channel internalChannel = getInternalChannel(call);
ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
}
return Contexts.interceptCall(ctx, call, headers, next);
}
};
// Register the services
addServices(handlerRegistry, serverInterceptor);
// Build the gRPC server
server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
.maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.addTransportFilter(new ServerTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
int remotePort = remoteAddress.getPort();
int localPort = localAddress.getPort();
String remoteIp = remoteAddress.getAddress().getHostAddress();
Attributes attrWrapper = transportAttrs.toBuilder()
.set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
.set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
.set(TRANS_KEY_LOCAL_PORT, localPort).build();
String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
return attrWrapper;
}
@Override
public void transportTerminated(Attributes transportAttrs) {
String connectionId = null;
try {
connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
} catch (Exception e) {
// Ignore
}
if (StringUtils.isNotBlank(connectionId)) {
Loggers.REMOTE_DIGEST
.info("Connection transportTerminated,connectionId = {} ", connectionId);
connectionManager.unregister(connectionId);
}
}
}).build();
// Start the gRPC server
server.start();
}
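Note how the interceptor above stashes the connection id and peer addresses into the gRPC Context, which is how they are read back later (e.g. CONTEXT_KEY_CONN_ID.get() in the request acceptor). Here is a minimal, standalone sketch of that mechanism; the key name and sample value are made up for illustration:
import io.grpc.Context;

public class ContextKeyDemo {
    // A Context.Key lets an interceptor pass per-call values to handlers
    // without adding method parameters.
    static final Context.Key<String> CONN_ID = Context.key("conn_id");

    public static void main(String[] args) throws Exception {
        Context ctx = Context.current()
                .withValue(CONN_ID, "1700000000000_127.0.0.1_52431");
        // Contexts.interceptCall attaches such a context around the call;
        // inside it, any code can read the value back with CONN_ID.get().
        ctx.call(() -> {
            System.out.println("connection id = " + CONN_ID.get());
            return null;
        });
    }
}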
In the service-registration step (addServices) you can see the gRPC handlers being added:
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
// unary common call register.
final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
// Handler for the unary call
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
.addMethod(unaryPayloadMethod, payloadHandler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
// bi stream register.
// Handler for the bidirectional stream
final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
(responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
.builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
}
The two acceptors used above are the handler classes implementing the services generated from nacos_grpc_service.proto:
public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase
public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase
They correspond to the two RPC methods in nacos_grpc_service.proto:
service Request {
// Sends a commonRequest
rpc request (Payload) returns (Payload) {
}
}
service BiRequestStream {
// Sends a biStreamRequest
rpc requestBiStream (stream Payload) returns (stream Payload) {
}
}
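The split matters: the unary request method is a plain request/response call, while the bidirectional stream stays open so either side can send at any time. A conceptual sketch of the two shapes (illustrative only, not the generated Nacos stubs; Payload here is a type parameter standing in for the real protobuf message):
import io.grpc.stub.StreamObserver;

// Conceptual sketch, not generated code.
interface RequestServiceShape<Payload> {
    // Unary: one request in, one response out.
    void request(Payload request, StreamObserver<Payload> responseObserver);

    // Bidirectional stream: each side holds a StreamObserver and can send
    // at any time; this is also the channel Nacos uses for server push.
    StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver);
}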
Now back to BaseRpcServer. It is an abstract class with two subclasses, GrpcClusterServer and GrpcSdkServer. Where are these two classes created, and what are they for? With these two questions in mind, let's analyze the two server classes.
Analyzing GrpcClusterServer and GrpcSdkServer
Creating the servers
The two classes are actually very simple, so here is the source directly:
@Service
public class GrpcSdkServer extends BaseGrpcServer {
@Override
public int rpcPortOffset() {
return Constants.SDK_GRPC_PORT_DEFAULT_OFFSET;
}
@Override
public ThreadPoolExecutor getRpcExecutor() {
return GlobalExecutor.sdkRpcExecutor;
}
}
@Service
public class GrpcClusterServer extends BaseGrpcServer {
@Override
public int rpcPortOffset() {
return Constants.CLUSTER_GRPC_PORT_DEFAULT_OFFSET;
}
@Override
public ThreadPoolExecutor getRpcExecutor() {
if (!GlobalExecutor.clusterRpcExecutor.allowsCoreThreadTimeOut()) {
GlobalExecutor.clusterRpcExecutor.allowCoreThreadTimeOut(true);
}
return GlobalExecutor.clusterRpcExecutor;
}
}
Each class has just two methods: one sets the port offset and one returns the thread pool used to execute RPC requests. Each class also carries the @Service annotation. This is a Spring annotation: as long as the class is reached by Spring's component scan, Spring creates the bean automatically and puts it into the container. Nacos's startup class is com.alibaba.nacos.Nacos; here is its source.
@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {
public static void main(String[] args) {
SpringApplication.run(Nacos.class, args);
}
}
As you can see, this is a Spring Boot project that scans the com.alibaba.nacos package, including its subpackages. Both server classes are therefore scanned and registered as beans, which triggers com.alibaba.nacos.core.remote.BaseRpcServer#start and, in turn, the whole creation sequence above.
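To recap the lifecycle this relies on, here is a minimal, illustrative sketch (not Nacos code): Spring instantiates the @Service bean, then invokes its @PostConstruct method, which is exactly where the gRPC server gets started.
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Service;

@Service
public class LifecycleDemo {

    public LifecycleDemo() {
        System.out.println("1. constructor runs first");
    }

    @PostConstruct
    public void init() {
        // Dependencies are injected by now; safe to start servers here.
        System.out.println("2. @PostConstruct runs after construction");
    }
}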
What the servers are for
Two servers are created here. From their names you can tell that one handles SDK calls and the other cluster calls, but neither class spells out exactly what it is used for, and their methods look almost identical. So let's start from the client entry point and identify the server-side difference from how the client calls differ.
Both classes handle a port offset: GrpcSdkServer uses SDK_GRPC_PORT_DEFAULT_OFFSET, an offset of 1000, while GrpcClusterServer uses CLUSTER_GRPC_PORT_DEFAULT_OFFSET, which is 1001. Searching the client code for these ports, we find two client classes, GrpcSdkClient and GrpcClusterClient. They are created in com.alibaba.nacos.common.remote.client.RpcClientFactory, whose code is as follows:
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,Integer threadPoolMaxSize, Map<String, String> labels) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
try {
// Create the SDK client
return new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels);
} catch (Throwable throwable) {
LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
throw throwable;
}
});
}
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
// Create the cluster client
return CLIENT_MAP.computeIfAbsent(clientName,
clientNameInner -> new GrpcClusterClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize,labels));
}
These two snippets alone don't reveal any difference, but don't worry, it shows up right away.
Let's look at who calls these two methods.
createClient is called by ClientWorker and NamingGrpcClientProxy. Both classes live in nacos-client, the module our applications normally import, so these calls are initiated by clients.
createClusterClient is called by ClusterRpcClientProxy and RpcClientFactory, which live in nacos-core. nacos-core is a server-side dependency, used for calls between server nodes.
At this point the roles of the two classes are clear.
If our business project imports nacos-client, the server handling its calls is GrpcSdkServer; it processes requests from external clients. GrpcClusterServer, by contrast, does not handle external requests at all; it handles internal server-to-server calls.
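To make the port split concrete, here is a back-of-the-envelope sketch. It assumes the default main server port 8848, which is not stated above, so treat the numbers as illustrative:
public class PortOffsetDemo {
    static final int MAIN_PORT = 8848; // assumed default Nacos server port
    static final int SDK_GRPC_PORT_DEFAULT_OFFSET = 1000;
    static final int CLUSTER_GRPC_PORT_DEFAULT_OFFSET = 1001;

    public static void main(String[] args) {
        // 9848: the port GrpcSdkServer listens on for client SDKs
        System.out.println("sdk grpc port     = " + (MAIN_PORT + SDK_GRPC_PORT_DEFAULT_OFFSET));
        // 9849: the port GrpcClusterServer listens on for cluster peers
        System.out.println("cluster grpc port = " + (MAIN_PORT + CLUSTER_GRPC_PORT_DEFAULT_OFFSET));
    }
}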
If you can't work out how a piece of server code gets invoked, trace it all the way back to the request initiator to see how the request is made, or set a breakpoint and walk up the call stack in your IDE. Both are good techniques.
Analyzing the server-side handler methods
As analyzed above, every gRPC call goes through either grpcCommonRequestAcceptor.request(request, responseObserver) or grpcBiStreamRequestAcceptor.requestBiStream(responseObserver). Next we focus on com.alibaba.nacos.core.remote.grpc.GrpcRequestAcceptor#request.
The first part of this method is validation and conversion, which we won't dwell on; let's go straight to the key part.
Request request = (Request) parseObj;
try {
// Get the connection
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
// Build and populate the request's basic metadata
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// Refresh the last-active time, which is used to detect connection timeouts
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
// Key point: handle the request
Response response = requestHandler.handleRequest(request, requestMeta);
// Convert the response and complete the call
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
} catch (Throwable e) {
Loggers.REMOTE_DIGEST
.error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId,
e);
Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(e));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
}
The key lies in the call to requestHandler.handleRequest(request, requestMeta):
public Response handleRequest(T request, RequestMeta meta) throws NacosException {
for (AbstractRequestFilter filter : requestFilters.filters) {
try {
Response filterResult = filter.filter(request, meta, this.getClass());
if (filterResult != null && !filterResult.isSuccess()) {
return filterResult;
}
} catch (Throwable throwable) {
Loggers.REMOTE.error("filter error", throwable);
}
}
// Invoke the concrete handler
return handle(request, meta);
}
// Abstract method, implemented by subclasses
public abstract S handle(T request, RequestMeta meta) throws NacosException;
This is the template method design pattern: the top-level processing logic is defined once in the base class, while the concrete steps are left to subclasses.
When we try to see how handle is implemented, we find that it has a great many subclasses.
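For readers unfamiliar with the pattern, here is a minimal, generic sketch (the names are illustrative, not Nacos classes):
abstract class BaseHandler<T, S> {
    // Template method: the fixed top-level flow (filter, then handle).
    public final S handleRequest(T request) {
        if (!preFilter(request)) {
            throw new IllegalArgumentException("rejected by filter");
        }
        return handle(request); // the variable step
    }

    protected boolean preFilter(T request) {
        return request != null;
    }

    // Each concrete handler implements only this step.
    protected abstract S handle(T request);
}

class EchoHandler extends BaseHandler<String, String> {
    @Override
    protected String handle(String request) {
        return "echo: " + request;
    }
}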
So which handler handles registration?
Since the class carries the generic bound T extends Request, it must be tied to a concrete Request subclass, so let's work out which subclass our request actually is. The Request is initiated by the client, which means going back to the previous article to see where it comes from. You can trace it yourself; I'm confident you will eventually arrive at the com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doRegisterService method.
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
// Build the Request object
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
redoService.instanceRegistered(serviceName, groupName);
}
This Request object (an InstanceRequest) leads us straight to the concrete handler. That's right: it is com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler.
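How does the server map an InstanceRequest to this handler? The real routing lives in nacos-core's handler registry (com.alibaba.nacos.core.remote.RequestHandlerRegistry); the sketch below is only a hypothetical illustration of that kind of type-keyed dispatch, not the actual implementation:
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: handlers registered under the simple name of the
// request class they declare, then looked up by the incoming request's type.
class HandlerRegistrySketch {
    private final Map<String, Object> handlers = new HashMap<>();

    void register(String requestType, Object handler) {
        handlers.put(requestType, handler);
    }

    Object lookup(Object request) {
        return handlers.get(request.getClass().getSimpleName());
    }
}
Inside InstanceRequestHandler sits the com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler#handle method: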
@Override
@Secured(action = ActionTypes.WRITE)
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE:
// Handle registration
return registerInstance(service, request, meta);
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
// Handle deregistration
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", request.getType()));
}
}
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
throws NacosException {
// Register the instance
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
// Publish an event
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
request.getInstance().getIp(), request.getInstance().getPort()));
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
@Override
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
client.recalculateRevision();
// Publish further events
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
Notice that the registration logic itself doesn't seem to do much: the real work happens in the listeners that process these events after they are published. How exactly is that done? NotifyCenter is an important class in its own right, and analyzing it takes considerable time, so this article stops here.
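As a preview, here is a minimal sketch of how a component can listen for the registration event published above, using the Subscriber API from nacos-common. Treat the exact packages and the printed output as assumptions; it is a sketch, not a walkthrough of the real subscribers:
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent;

public class RegisterEventSubscriberSketch extends Subscriber<ClientOperationEvent.ClientRegisterServiceEvent> {

    @Override
    public void onEvent(ClientOperationEvent.ClientRegisterServiceEvent event) {
        // React to a client registering a service instance.
        System.out.println("received: " + event);
    }

    @Override
    public Class<? extends Event> subscribeType() {
        return ClientOperationEvent.ClientRegisterServiceEvent.class;
    }

    public static void main(String[] args) {
        NotifyCenter.registerSubscriber(new RegisterEventSubscriberSketch());
    }
}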
Summary
This article spent considerable space on the gRPC server side, covering the two servers that get started, GrpcClusterServer and GrpcSdkServer: how they start and what each one is for. We then showed how to track down the Handler class that processes a request. If you have some source-reading experience, the word Handler should trigger a reflex: it is almost certainly a network-request handling class, since Netty (and other frameworks, some of them older) all use Handlers to process network requests.
For an explanation of NotifyCenter, see "3. Nacos Service Registration: Server-Side Source Code Analysis (Part 2)".