研讨一款入门级其他 RPC 结构 — XXL-RPC
XXL-RPC 结构是 XXL-JOB 作者出品, 官网地址 分布式服务结构XXL-RPC
XXL-RPC 是一款轻量、简略,但五脏俱全的 RPC 结构,本地环境搭建十分简略, 对初学 RPC 的人来说,十分的友好,建议阅览学习,关于了解 RPC 大有裨益。 本文也是环绕 XXL-RPC 结构行文的。
官网文章现已写得很好,而本文是基于自己的学习和了解而成文的;内容会涉及源码,篇幅偏长。为了更好了解 XXL-PRC,需求有 Spring 初始化进程和 Netty 知识背景。
一、RPC 根底
1.1 入门了解
PRC 是为调用长途的服务像调用本地一样服务简略。
举例:在 node1 节点上调用 node2 的 serviceB,就像 node1 上调用 node1 上的 serviceB 一样。
要完成这一方针,由上图可见,通讯组件必不可少。假如不考虑集群,能够将 node2 的各种信息装备写入 node1,可是考虑集群的扩容和缩容,就必需要有一个组件能及时地更新各节点的信息,暂时称这个组件为服务注册与发现组件。
对 RPC 有了最根本的了解后,接着咱们再进一步看 RPC 的理论和规划。
1.2 根底原理和规划
假如 node1 要调用 node2 服务就需求规划一个简略的通讯;能够粗略概括如下:
为了更好的了解上图,补充一下术语:
术语 | 了解 |
---|---|
stub | Stub 是在客户端端点的署理程序,用于代表客户端应用程序与长途服务进行通讯。它躲藏了底层的网络通讯细节,当客户端应用程序调用 stub 中的函数时,stub 将担任将恳求打包并经过网络发送给长途服务端。 |
skeleton | Skeleton 是在服务端端点的署理程序,用于接纳来自客户端的恳求,并将其解包后传递给实践的服务完成。Skeleton 躲藏了底层的网络通讯细节,并担任将恳求分派到相应的服务完成函数上。当服务完成函数履行完毕后,skeleton将担任将成果打包并发送给客户端 |
stub 和 skeleton 是 RPC 中用于署理客户端和服务端之间通讯的组件。
RPC 并没有具体规范,可是大致原理是一样的, 一次简略 RPC 调用进程如下:
有了这些根底概念以后,接下来咱们正式进入 XXL-RPC 这个结构的学习。
二、XXL-RPC 的规划和理论
2.1 XXL-RPC 的规划
XXL-RPC 中关于 RPC 的一次调用进程,原理分析如下,作者的规划完成,本文的代码分析也是环绕下面这张图具体展开了解的。
其间 TCP 的模型, XXL-RPC 选用 NIO 进行底层通讯。
XXL-RPC 底层通讯运用了 Netty。 假如要想学习了解 RPC 结构, Netty 是必需要掌握的。
客户端能够跟每一个服务供给者( ip + port
)树立一个 socket 链接。 假如服务供给者是集群,那么每个客户端能够跟这些服务供给者逐个树立链接,形成连接池。客户端恳求的时分,依据负载均衡算法从中取一个进行运用即可。
// 地址由 ip 和 port 组成
this.registryAddress = IpUtil.getIpPort(this.ip, this.port);
2.2 XXL-RPC 工程
XXL-RPC 工程代码如下,有两个示例,接下来就从示例进行下手 (provider 和 consumer/invoker) 到这儿了,不妨拉一下源码看看。
工程是以 spring 为根底的,从 @XxlRpcService
和 @XxlRpcReference
两个注解下手比较合适。而两个注解都是经过 spring 加载进程的后置处理器进行处理,假如把这两个注解捋顺了,整个进程就根本明晰了。
/xxl-rpc-sample-springboot-client :服务消费方 consumer 调用示例
/xxl-rpc-sample-springboot-server :服务供给方 provider 示例
三、源码了解
@XxlRpcService
和 @XxlRpcReference
结构是依靠 spring 容器。在发动进程中,后置处理器会进行各自的初始化处理,源码了解就环绕这两个注解进行展开。
3.1 @XxlRpcService
XxlRpcService 处理的入口是 XxlRpcSpringProviderFactory, 也是服务供给者的中心类; XxlRpcSpringProviderFactory 完成 ApplicationContextAware 和 InitializingBean 两个接口, 这个类在创立的时分,会扫描一切 XxlRpcService 的注解类。在 afterPropertiesSet 中发动 Netty 服务端。
接下来具体查看一下源码:
- 扫描一切 XxlRpcService 类,Spring 后置处理,比较简略。
// 将带有 XxlRpcService 注解的一切类都解析放入到 Map 中
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// valid
if (serviceBean.getClass().getInterfaces().length ==0) {
throw new XxlRpcException("XXL-RPC, service(XxlRpcService) must inherit interface.");
}
// add service
XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
String iface = serviceBean.getClass().getInterfaces()[0].getName();
String version = xxlRpcService.version();
// iface + version 作为 key,将其放入到 map 中
super.addService(iface, version, serviceBean);
}
}
}
- 回调 afterPropertiesSet ,触发 Netty 服务端的创立。
public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {
// 回调调用 start 创立服务
@Override
public void afterPropertiesSet() throws Exception {
super.start();
}
}
- 创立 Netty 服务端
Netty 服务端发动代码; 事务中心处理类是 NettyServerHandler 类;这个类会处理具体恳求,不杂乱,模板式的代码。
public class NettyServer extends Server {
private Thread thread;
@Override
public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {
thread = new Thread(new Runnable() {
@Override
public void run() {
// 设置线程数大小。默认 60、300
final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(
NettyServer.class.getSimpleName(),
xxlRpcProviderFactory.getCorePoolSize(),
xxlRpcProviderFactory.getMaxPoolSize());
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
// 心跳检活
.addLast(new IdleStateHandler(0,0, Beat.BEAT_INTERVAL*3, TimeUnit.SECONDS)) // beat 3N, close if idle
// 编解码
.addLast(new NettyDecoder(XxlRpcRequest.class, xxlRpcProviderFactory.getSerializerInstance()))
.addLast(new NettyEncoder(XxlRpcResponse.class, xxlRpcProviderFactory.getSerializerInstance()))
// 处理恳求的具体类
.addLast(new NettyServerHandler(xxlRpcProviderFactory, serverHandlerPool));
}
})
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();
// 激活服务注册
onStarted();
// wait util stop
future.channel().closeFuture().sync();
.......
}
上面的 Netty 服务端发动代码是十分规范的模板式代码。具体调用进程如下:
- NettyServerHandler 服务恳求处理的 handler
在这个 handler 中,将任务提交到线程池;再经过反射处理恳求,再将成果写回 Netty 通道。
@Override
public void channelRead0(final ChannelHandlerContext ctx, final XxlRpcRequest xxlRpcRequest) throws Exception {
......
// do invoke
try {
serverHandlerPool.execute(new Runnable() {
@Override
public void run() {
// invoke + response
// 真实做调用的当地
XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
// 写回数据
ctx.writeAndFlush(xxlRpcResponse);
}
});
} catch (Exception e) {
.......
ctx.writeAndFlush(xxlRpcResponse);
}
}
- 反射调用接口 xxlRpcProviderFactory.invokeService。 从 Map 中寻找 service,然后经过反射进行调用。
// 经过反射进行调用
Class<?> serviceClass = serviceBean.getClass();
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(serviceBean, parameters);
服务端的代码仍是比较简略和明晰。大致有以下几个进程:
- XxlRpcSpringProviderFactory 完成了 ApplicationContextAware 接口,所以 spring 发动会回调; 在回调的时分,将一切 XxlRpcService 注解的类都标记成为服务供给的类,用一个 Map 容器承载。
- XxlRpcSpringProviderFactory 完成了 InitializingBean 接口,在一切类都初始化完成后,回调 afterPropertiesSet() 办法,这个时分会经过这个办法发动 Netty 服务端。
- Netty 服务端是十分规范的模板代码,其间 NettyServerHandler 为中心的事务 handler,用来处理服务恳求的中心。
- NettyServerHandler 中运用线程池来,这样能够处理许多恳求;进行具体事务办法的调用
- xxlRpcProviderFactory.invokeService 完成具体办法调用。
上面的代码便是服务端(provider) 发动和调用的进程,比较简略明晰的。
服务注册发动是在 Netty 发动的时分,回调 start 事件发动的。
接下来讲解客户端(consumer/invoker) 的一个发动和调用进程
3.2 @XxlRpcReference
带有 XxlRpcReference 注解接口,客户端都没有显现的完成类, 因而需求依靠 XxlRpcReference 注解,署理生成一个具体的完成类,这个完成类在被调用具体办法的时分,能够映射成长途办法恳求,并将成果再回来。
整个调用能够分为以下两个中心点:
- 署理方针生成
- 长途办法调用
整理整个进程,大致如下:
接下来要点分析生成可长途调用方针的代码。
- XxlRpcSpringInvokerFactory 为入口类。在这个类,需求触发生成 @XxlRpcReference 的具体署理方针。
下图是发动的时分去填充 @XxlRpcReference 描绘的字段代码
比如 DemoService 被 @XxlRpcReference 描绘, 就需求生成具体的署理方针
@Controller
public class IndexController {
@XxlRpcReference
private DemoService demoService;
......
}
代码如下:给一切 @XxlRpcReference 描绘的特点字段生成具体的署理方针
public class XxlRpcSpringInvokerFactory implements InitializingBean, DisposableBean, BeanFactoryAware, InstantiationAwareBeanPostProcessor {
.......
// spring 后置处理接口,填充特点字段
@Override
public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {
// collection
final Set<String> serviceKeyList = new HashSet<>();
// parse XxlRpcReferenceBean
// 处理特点字段
ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
if (field.isAnnotationPresent(XxlRpcReference.class)) {
// valid
Class iface = field.getType();
if (!iface.isInterface()) {
throw new XxlRpcException("XXL-RPC, reference(XxlRpcReference) must be interface.");
}
XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);
// init reference bean
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
......
referenceBean.setInvokerFactory(xxlRpcInvokerFactory);
// get proxyObj 生成署理方针
Object serviceProxy = null;
try {
// 中心,生成长途调用的署理方针
serviceProxy = referenceBean.getObject();
} catch (Exception e) {
throw new XxlRpcException(e);
}
// set bean 反射设置特点
field.setAccessible(true);
field.set(bean, serviceProxy);
......
}
}
});
}
- serviceProxy = referenceBean.getObject(); 包装成长途调用的署理方针。
整个客户端的中心代码,有必要认真阅览一下
阅览前需求先了解 callType 有四种方法,而 referenceBean.getObject() 完成了下面 4 种方法的调用:
callType | 描绘 |
---|---|
SYNC | 同步等回来成果 |
FUTURE | future 获取成果 |
CALLBACK | 回来成果进行办法回调 |
ONEWAY | 不关注回来成果 |
由于 NIO 是异步通讯模型,调用线程并不会阻塞获取调用成果,因而,XXL-RPC 完成了在异步通讯模型上的同步调用,即 “sync-over-async” 。即怎么恰当地回来异步成果,下图是作者规划的思路,即经过 wait 和 notify 来完成异步成果的获取。
有了上面的了解,咱们再阅览中心的生成署理方针的代码就比较简略了。
下面生成署理的代码也是环绕怎么进行长途调用(省掉参数封装、泛化调用), 逻辑并不杂乱
public Object getObject() throws Exception {
......
// newProxyInstance 生成署理方针
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
.......
// send 中心,发送长途调用,对应四种方法
if (CallType.SYNC == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// do invoke
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
.....
} finally{
// future-response remove
futureResponse.removeInvokerFuture();
}
} else if (CallType.FUTURE == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// invoke future set
XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
return null;
} catch (Exception e) {
logger.info(">>>>>>>>>>> XXL-RPC, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
......
} else if (CallType.CALLBACK == callType) {
// get callback
XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("XXL-RPC XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
}
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
try {
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
} catch (Exception e) {
.......
}
return null;
} else if (CallType.ONEWAY == callType) {
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
return null;
....
}
}
});
}
关于怎么完成 “sync-over-async” 细节参考 3.3 sync 和 future 形式。
终究的长途通讯仍是运用 Netty。
- clientInstance.asyncSend(finalAddress, xxlRpcRequest);
@Override
public void send(XxlRpcRequest xxlRpcRequest) throws Exception {
this.channel.writeAndFlush(xxlRpcRequest).sync();
}
做一个简略的总结:
- XxlRpcSpringInvokerFactory 在发动的时分,完成了 postProcessAfterInstantiation 接口,在方针填充特点前调用,这个时分对具有 XxlRpcReference 特点字段的方针做该字段的填充,即生成具体的署理方针
- XxlRpcReferenceBean 署理中心类,最中心的作用便是包装,长途调用。
- 长途调用供给了四种类型。中心是完成 “sync-over-async”
根本上咱们现已摸清楚 provider/服务端 和 consumer/Invoker/客户端的完成和调用进程。
下面再追加 sync 和 future 形式的了解。
3.3 sync 和 future 形式
经过 wait() 和 notifyAll() 来处理异步成果,作者的解释,能够结合图进行了解
- 1、consumer建议恳求:consumer 会依据长途服务的 stub 实例化长途服务的署理服务,在建议恳求时,署理服务会封装本次恳求相关底层数据,如服务iface、methods、params等等,然后将数据经过 serialization 之后发送给provider;
- 2、provider 接纳恳求:provider 接纳到恳求数据,首先会deserialization获取原始恳求数据,然后依据 stub 匹配方针服务并调用;
- 3、provider 响应恳求:provider 在调用方针服务后,封装服务回来数据并进行serialization,然后把数据传输给 consumer;
- 4、consumer 接纳响应:consumer 接纳到相应数据后,首先会deserialization 获取原始数据,然后依据 stub 生成调用回来成果,回来给恳求调用处。结束。
get() 等候获取成果方针
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!done) {
// lock.wait() 等候调集
synchronized (lock) {
try {
if (timeout < 0) {
lock.wait();
} else {
long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
lock.wait(timeoutMillis);
}
} catch (InterruptedException e) {
throw e;
}
}
}
......
return response;
}
调用成功后,在 notifyAll()
// ---------------------- for invoke back ----------------------
public void setResponse(XxlRpcResponse response) {
this.response = response;
// 触摸等候
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
规划仍是比较精彩。
下面是 future 方法获取成果方针的方法。
// invoke future set
XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
重写了 future 。 其间 get 也是在复用上面才能。
public class XxlRpcInvokeFuture implements Future {
.......
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
try {
// 也是运用 await 和 notifyAll
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, unit);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} finally {
stop();
}
}
.......
}
经过上面两个注解进程的讲解,RPC 的调用进程就现已讲完了。
接下来再略微分析一下服务注册与发现(上面的内容是中心要点)
四、服务中心体系规划(粗讲)
内部经过播送机制,集群节点实时同步服务注册信息,保证共同。客户端凭借 long pollong 实时感知服务注册信息,简练、高效;
4.1 服务注册
服务端的调用是在 Netty 发动后注册的回调事件进行服务注册的。
serverInstance.setStartedCallback(new BaseCallback() { // serviceRegistry started
@Override
public void run() throws Exception {
// start registry 注册信息
if (serviceRegistry != null) {
registerInstance = serviceRegistry.newInstance();
registerInstance.start(serviceRegistryParam);
if (serviceData.size() > 0) {
// XxlRpcService 一切的接口。进行注册
registerInstance.registry(serviceData.keySet(), registryAddress);
}
}
}
});
serverInstance.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
@Override
public void run() {
// stop registry
if (registerInstance != null) {
if (serviceData.size() > 0) {
registerInstance.remove(serviceData.keySet(), registryAddress);
}
registerInstance.stop();
registerInstance = null;
}
}
});
然后在内部发动一个独立的线程,每隔 10 秒进行,上报注册信息
// registry thread
registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!registryThreadStop) {
try {
if (registryData.size() > 0) {
// 运用 http 上报元数据信息
boolean ret = registryBaseClient.registry(new ArrayList<XxlRpcAdminRegistryDataItem>(registryData));
logger.debug(">>>>>>>>>>> xxl-rpc, refresh registry data {}, registryData = {}", ret?"success":"fail",registryData);
}
} catch (Exception e) {
......
try {
// 歇息 10 s
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
......
}
}
}
});
服务供给者在发动 Netty 后,上报数据到服务注册中心。
运用的是 http 往注册中心进行上报(注册进程)。
4.2 服务发现
客户端调用,便是拉取注册中心的数据,选择一个合适的地址进行调用(负载均衡算法)
if (finalAddress==null || finalAddress.trim().length()==0) {
if (invokerFactory!=null && invokerFactory.getRegister()!=null) {
// 拉取数据
String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
TreeSet<String> addressSet = invokerFactory.getRegister().discovery(serviceKey);
// 负载均衡调用
if (addressSet==null || addressSet.size()==0) {
// pass
} else if (addressSet.size()==1) {
finalAddress = addressSet.first();
} else {
finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
}
}
}
另外在程序中有一个独立的线程在不断地进行数据的守时改写
com.xxl.rpc.core.registry.impl.xxlrpcadmin.XxlRpcAdminRegistryClient#XxlRpcAdminRegistryClient。 中心代码 refreshDiscoveryData
discoveryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!registryThreadStop) {
......
} else {
try {
.......
// 守时更新数据。(10s)
refreshDiscoveryData(discoveryData.keySet());
} catch (Exception e) {
.......
}
}
});
在 xxl-rpc-admin 模块中 com.xxl.rpc.admin.service.impl.XxlRpcRegistryServiceImpl#afterPropertiesSet 有几个线程在不断改写数据和播送音讯,完成服务治理,限于篇幅就不再深入。
负载均衡、泛化调用、服务监控就不逐个展开了,感兴趣的能够阅览代码进行研讨。
到这儿,对 RPC 的整体完成现已有了一个明晰的认识了。再来看看作者这张架构图就不难了解了。
✒️五、总结
XXL-RPC 是比较小而美的 RPC 结构,很轻量、简略; 关于 RPC 初学者十分的友好。 经过学习这个结构再去了解 Dubbo 就会比较轻松。
XXL-RPC 代码结构比较明晰, 底层 Netty 也是模板式的代码,XXL-RPC 作者对线程的了解也十分到位,值得推荐。
由于轻量,所以许多细节并没有考虑周全,比如线程池的高雅关闭等,这些问题,咱们学习 Dubbo 的时分再细说,本文到此结束。