大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包含以下专题,有爱好的小伙伴就跟着我一起学习吧
本章源码地址:gitee.com/baojh123/se…
自定义注解 -> opt-01
服务提供者收发音讯根底完成 -> opt-01
自定义网络传输协议的完成 -> opt-02
自定义编解码完成 -> opt-03
服务提供者调用实在办法完成 -> opt-04
完善服务顾客发送音讯根底功用 -> opt-05
注册中心根底功用完成 -> opt-06
服务提供者整合注册中心 -> opt-07
服务顾客整合注册中心 -> opt-08
完善服务顾客接收呼应成果 -> opt-09
服务顾客,服务提供者整合SpringBoot -> opt-10
动态署理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制根底功用完成 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16
前语
在上一章中咱们规划了自己的网络传输协议,而且完成了。但是现在的服务端只能接收String类型的协议。也就是Netty自带的。假如咱们此刻直接运用是会有问题的。所以咱们还需要完成自己的编解码器
规划
服务顾客通过自定义网络传输协议发送恳求,首先会通过编码器的处理,然后通过网络传输到达服务提供者,服务提供者通过解码器处理之后,拿到咱们能够处理的数据。之后返回呼应数据
完成
创立子项目 xpc-rpc-codec,pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xpc-rpc</artifactId>
<groupId>com.xpc</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>xpc-rpc-codec</artifactId>
<dependencies>
<dependency>
<groupId>com.xpc</groupId>
<artifactId>xpc-rpc-protocol</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.xpc</groupId>
<artifactId>xpc-rpc-serialization-jdk</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.xpc</groupId>
<artifactId>xpc-rpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
创立一个通用的接口 com.xpc.rpc.codec.RpcCoder
package com.xpc.rpc.codec;
import com.xpc.rpc.common.serialization.Serialization;
/**
* 通用接口
*/
public interface RpcCoder {
/**
* 获取具体的序列化接口
* 后续会运用SPI机制进行扩展,现在默认给一个JDK的序列化方式
*/
Serialization getJdkSerialization();
}
创立解码器:com.xpc.rpc.codec.RpcDecoder
package com.xpc.rpc.codec;
import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.common.serialization.Serialization;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import com.xpc.rpc.serialization.jdk.JdkSerialization;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 解码器
*/
public class RpcDecoder extends ByteToMessageDecoder implements RpcCoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.readableBytes() < 16) {
return;
}
//拿到恳求音讯类型
int msgType = in.readInt();
//拿到恳求体长度
int bodyLen = in.readInt();
//拿到恳求id
long requestId = in.readLong();
if(in.readableBytes() < bodyLen) {
//表示读取的数据还不行
in.resetReaderIndex();
return;
}
//获取恳求体
byte[] data = new byte[bodyLen];
in.readBytes(data);
RpcHeader rpcHeader = new RpcHeader();
rpcHeader.setRequestId(requestId);
rpcHeader.setBodyLen(bodyLen);
rpcHeader.setMsgType(msgType);
ProtocolMessage protocolMessage = new ProtocolMessage();
protocolMessage.setRpcHeader(rpcHeader);
//获取序列化接口,后续会运用SPI机制扩展
Serialization jdkSerialization = getJdkSerialization();
RpcMsgType rpcMsgType = RpcMsgType.findByType(msgType);
switch (rpcMsgType){
case REQUEST:
RpcRequest request = jdkSerialization.deserialize(data, RpcRequest.class);
protocolMessage.setT(request);
break;
case RESPONSE:
RpcResponse response = jdkSerialization.deserialize(data, RpcResponse.class);
protocolMessage.setT(response);
break;
}
//更新读索引
in.markReaderIndex();
//最后通过管道写出去
out.add(protocolMessage);
}
@Override
public Serialization getJdkSerialization() {
return new JdkSerialization();
}
}
关于该类中第一个判断为什么是16,这是解释一下,一个完好的恳求包含恳求头和恳求体,咱们恳求头包含 音讯类型,恳求体长度,恳求仅有ID,int是4字节,long占8字节,所以恳求头一共占16字节,那还有恳求体呢? 其实一个恳求的恳求体为空也是有可能的,也就是 bodyLen = 0的时候,所以一个完好的恳求大小至少要等于 16字节
/**
* 恳求类型,是普通恳求,或者是心跳音讯等
*/
private int msgType;
/**
* 音讯体的长度
*/
private int bodyLen;
/**
* 恳求的仅有ID
*/
private Long requestId;
创立编码器:com.xpc.rpc.codec.RpcEncoder
package com.xpc.rpc.codec;
import com.xpc.rpc.common.serialization.Serialization;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.serialization.jdk.JdkSerialization;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class RpcEncoder extends MessageToByteEncoder<ProtocolMessage> implements RpcCoder {
@Override
public Serialization getJdkSerialization() {
return new JdkSerialization();
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMessage protocolMessage, ByteBuf out) throws Exception {
out.writeInt(protocolMessage.getRpcHeader().getMsgType());
byte[] bytes = getJdkSerialization().serialize(protocolMessage.getT());
out.writeInt(bytes.length);
out.writeLong(protocolMessage.getRpcHeader().getRequestId());
out.writeBytes(bytes);
}
}
然后将服务端和客户端的编解码器都替换成咱们自己完成的编解码器
服务端:com.xpc.rpc.provider.server.base.BaseServer,修正startNettyServer()办法
@Override
public void startNettyServer() {
//其它代码省掉
bootstrap.group(bossGroup,workerGroup)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//Netty自带的编解码器
//pipeline.addLast(new StringDecoder());
//pipeline.addLast(new StringEncoder());
//替换成咱们自己的编解码器
pipeline.addLast(new RpcDecoder());
pipeline.addLast(new RpcEncoder());
//咱们自己完成的处理器
pipeline.addLast(new RpcProviderHandler(handlerMap));
}
});
try {
ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 21778).sync();
LOGGER.info("Netty 服务端发动成功............");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOGGER.error("Netty 服务端发动失利:{}",e);
}finally {
shutDown();
}
}
修正com.xpc.rpc.provide.handler.RpcProviderHandler
package com.xpc.rpc.provider.handler;
import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class RpcProviderHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcRequest>> {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class);
private Map<String,Object> handlerMap;
public RpcProviderHandler(Map<String,Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage<RpcRequest> msg) throws Exception {
LOGGER.info("className: {}",msg.getT().getClassName());
LOGGER.info("method:{}",msg.getT().getMethodName());
LOGGER.info("ParameterTypes:{}",msg.getT().getParameterTypes());
LOGGER.info("Parameters:{}",msg.getT().getParameters());
//写回呼应数据
ProtocolMessage<RpcResponse> protocolMessage = new ProtocolMessage<RpcResponse>();
RpcHeader rpcHeader = msg.getRpcHeader();
rpcHeader.setMsgType(RpcMsgType.RESPONSE.getType());
protocolMessage.setRpcHeader(rpcHeader);
RpcResponse response = new RpcResponse();
response.setCode(200);
response.setData("success");
protocolMessage.setT(response);
ctx.writeAndFlush(protocolMessage);
}
}
修正 com.xpc.rpc.consumer.RpcConsumer 的构造办法办法
public RpcConsumer() {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//Netty自带的编解码器
//pipeline.addLast(new StringDecoder());
//pipeline.addLast(new StringEncoder());
//替换成咱们自己的编解码器
pipeline.addLast(new RpcDecoder());
pipeline.addLast(new RpcEncoder());
pipeline.addLast(new RpcConsumerHandler());
}
});
try {
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 21778).sync();
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if(future.isSuccess()) {
LOGGER.info("客户端衔接成功........");
}else {
LOGGER.info("客户端衔接失利........");
}
}
});
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
修正com.xpc.rpc.consumer.handler.RpcConsumerHandler
package com.xpc.rpc.consumer.handler;
import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RpcConsumerHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcResponse>> {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//写一条音讯到服务端
ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<RpcRequest>();
RpcHeader rpcHeader = new RpcHeader();
rpcHeader.setMsgType(RpcMsgType.REQUEST.getType());
rpcHeader.setRequestId(1L);
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName("test");
rpcRequest.setMethodName("hello");
rpcRequest.setParameterTypes(new Class[]{String.class});
rpcRequest.setParameters(new Object[]{"coco"});
protocolMessage.setRpcHeader(rpcHeader);
protocolMessage.setT(rpcRequest);
ctx.writeAndFlush(protocolMessage);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtocolMessage<RpcResponse> protocolMessage) throws Exception {
LOGGER.info("code: {}",protocolMessage.getT().getCode());
LOGGER.info("data: {}",protocolMessage.getT().getData());
}
}
测试
先发动服务提供者:com.xpc.test.netty.ProviderTest
然后发动服务顾客:com.xpc.test.netty.ConsumerTest
服务端日志:
客户端日志: