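Hi everyone, I'm Xiaopacai (小趴菜). In this series I'll hand-write an RPC framework from 0 to 1. The series covers the topics below; if you're interested, follow along and learn with me.
Source code for this chapter: gitee.com/baojh123/se…
Custom annotations -> opt-01
Basic message sending and receiving in the service provider -> opt-01
Custom network transport protocol -> opt-02
Custom encoder and decoder -> opt-03
Service provider invokes the real method -> opt-04
Improve the service consumer's basic message sending -> opt-05
Basic registry functionality -> opt-06
Integrate the service provider with the registry -> opt-07
Integrate the service consumer with the registry -> opt-08
Improve how the service consumer receives response results -> opt-09
Integrate the service consumer and service provider with SpringBoot -> opt-10
Dynamic proxy to hide the low-level details of RPC calls -> opt-10
Basic SPI mechanism -> opt-11
SPI extension: random load-balancing strategy -> opt-12
SPI extension: round-robin load-balancing strategy -> opt-13
SPI extension: JDK serialization -> opt-14
SPI extension: JSON serialization -> opt-15
SPI extension: Protostuff serialization -> opt-16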
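Preface
In the previous chapters we implemented message sending and receiving in the service provider, and the provider can now invoke the real method and return a response.
However, the service consumer only sent a single message at the moment it registered, which clearly does not meet our needs.
In this chapter we improve the service consumer's basic message-sending functionality.
Implementation
Modify the service consumer: com.xpc.rpc.consumer.RpcConsumer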
package com.xpc.rpc.consumer;

import com.xpc.rpc.codec.RpcDecoder;
import com.xpc.rpc.codec.RpcEncoder;
import com.xpc.rpc.consumer.handler.RpcConsumerHandler;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.request.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RpcConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumer.class);

    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup;
    // Caches one handler (and therefore one channel) per "className#methodName" key
    private final Map<String, RpcConsumerHandler> handlerMap = new ConcurrentHashMap<>();

    // The constructor only configures the Bootstrap; no connection is opened here
    public RpcConsumer() {
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        // Our own decoder and encoder
                        pipeline.addLast(new RpcDecoder());
                        pipeline.addLast(new RpcEncoder());
                        pipeline.addLast(new RpcConsumerHandler());
                    }
                });
    }
    public void sendRequest(ProtocolMessage<RpcRequest> requestProtocolMessage) {
        // Hard-coded for now; later chapters obtain the address from the registry
        String host = "127.0.0.1";
        int port = 21778;
        RpcRequest request = requestProtocolMessage.getT();
        String key = buildHandlerMapKey(request.getClassName(), request.getMethodName());
        RpcConsumerHandler consumerHandler = handlerMap.get(key);
        Channel channel = consumerHandler == null ? null : consumerHandler.getChannel();
        // Connect (or reconnect) when no handler is cached or its channel is no longer usable
        if (channel == null || !channel.isOpen() || !channel.isActive()) {
            // getConsumerHandler() also stores the new handler in handlerMap under this key
            consumerHandler = getConsumerHandler(key, host, port);
            if (consumerHandler == null) {
                throw new RuntimeException("Failed to connect to " + host + ":" + port);
            }
        }
        // Send the message
        consumerHandler.sendRequest(requestProtocolMessage);
    }
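    private RpcConsumerHandler getConsumerHandler(String key, String host, int port) {
        try {
            // sync() blocks until the connect attempt finishes and rethrows the cause if it failed
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        LOGGER.info("Client connected successfully........");
                    } else {
                        LOGGER.info("Client failed to connect........");
                    }
                }
            });
            // Each channel gets its own RpcConsumerHandler from the ChannelInitializer
            RpcConsumerHandler rpcConsumerHandler = channelFuture.channel().pipeline().get(RpcConsumerHandler.class);
            handlerMap.put(key, rpcConsumerHandler);
            return rpcConsumerHandler;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while connecting to {}:{}", host, port, e);
            return null;
        }
    }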
    public void close() {
        if (eventLoopGroup != null) {
            eventLoopGroup.shutdownGracefully();
        }
    }

    // Builds the cache key in the form "className#methodName"
    private String buildHandlerMapKey(String className, String methodName) {
        return className + "#" + methodName;
    }
}
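- First, the constructor was changed: it no longer opens a connection directly and only performs initialization.
- A new sendRequest() method was added; this is the method the service consumer uses to send messages.
- It in turn calls RpcConsumerHandler.sendRequest(), which actually sends the message to the service provider.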
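The connect-on-demand idea in the points above can be sketched without Netty. This is only a minimal illustration under stated assumptions, not the project's code: `FakeConnection`, `LazyConnectionCache`, and the injected connector are hypothetical names that stand in for the Netty channel, the `handlerMap` logic, and `bootstrap.connect(...)`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a Netty channel: it only tracks whether it is still active.
class FakeConnection {
    private volatile boolean active = true;
    boolean isActive() { return active; }
    void close() { active = false; }
}

// Hypothetical cache mirroring the handlerMap idea: connect on first use, reuse afterwards.
class LazyConnectionCache {
    private final Map<String, FakeConnection> connections = new ConcurrentHashMap<>();
    private final Supplier<FakeConnection> connector;

    LazyConnectionCache(Supplier<FakeConnection> connector) {
        // Like the RpcConsumer constructor, this only stores configuration; nothing connects yet.
        this.connector = connector;
    }

    static String key(String className, String methodName) {
        return className + "#" + methodName;
    }

    FakeConnection get(String className, String methodName) {
        // compute() runs atomically per key, so concurrent callers do not open duplicate connections.
        return connections.compute(key(className, methodName),
                (k, existing) -> (existing != null && existing.isActive()) ? existing : connector.get());
    }
}

class LazyConnectionCacheDemo {
    public static void main(String[] args) {
        AtomicInteger connects = new AtomicInteger();
        LazyConnectionCache cache = new LazyConnectionCache(() -> {
            connects.incrementAndGet();
            return new FakeConnection();
        });
        FakeConnection first = cache.get("DemoService", "hello");
        FakeConnection second = cache.get("DemoService", "hello");
        System.out.println(first == second);  // prints true: the cached connection is reused
        first.close();
        FakeConnection third = cache.get("DemoService", "hello");
        System.out.println(third != first);   // prints true: a closed connection is replaced
        System.out.println(connects.get());   // prints 2: only two connects were performed
    }
}
```

One trade-off of this sketch is that the connector runs inside `compute()`, so a slow connect delays other callers asking for the same key; the chapter's `RpcConsumer` instead checks the map and connects in separate steps.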
Modify the service consumer's custom handler: com.xpc.rpc.consumer.handler.RpcConsumerHandler
package com.xpc.rpc.consumer.handler;

import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcConsumerHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcResponse>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class);

    // Set on the Netty event loop thread and read by the calling thread, hence volatile
    private volatile Channel channel;

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        // Keep a reference to the channel so sendRequest() can write to it later
        this.channel = ctx.channel();
    }

    // Receive the response result
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtocolMessage<RpcResponse> protocolMessage) throws Exception {
        LOGGER.info("code: {}", protocolMessage.getT().getCode());
        LOGGER.info("data: {}", protocolMessage.getT().getData());
    }

    // Send the request to the service provider
    public void sendRequest(ProtocolMessage<RpcRequest> requestProtocolMessage) {
        channel.writeAndFlush(requestProtocolMessage);
    }

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }
}
That completes the code changes; next comes testing.
Testing
Start the service provider: com.xpc.test.netty.ProviderTest
package com.xpc.test.netty;

import com.xpc.rpc.provider.server.base.BaseServer;
import org.junit.Test;

public class ProviderTest {

    @Test
    public void startNetty() {
        BaseServer baseServer = new BaseServer("com.xpc");
        baseServer.startNettyServer();
    }
}
Modify and start the service consumer: com.xpc.test.netty.ConsumerTest
package com.xpc.test.netty;

import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.consumer.RpcConsumer;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import org.junit.Test;

public class ConsumerTest {

    @Test
    public void startConsumer() throws Exception {
        RpcConsumer rpcConsumer = new RpcConsumer();
        // Send the request
        rpcConsumer.sendRequest(getRequest());
        // Give the response time to arrive before shutting down
        Thread.sleep(5000);
        rpcConsumer.close();
    }

    // Build the request
    private ProtocolMessage<RpcRequest> getRequest() {
        ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
        RpcHeader rpcHeader = new RpcHeader();
        rpcHeader.setMsgType(RpcMsgType.REQUEST.getType());
        rpcHeader.setRequestId(1L);
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName("com.xpc.test.scanner.DemoService");
        rpcRequest.setMethodName("hello");
        rpcRequest.setParameterTypes(new Class[]{String.class});
        rpcRequest.setParameters(new Object[]{"coco"});
        protocolMessage.setRpcHeader(rpcHeader);
        protocolMessage.setT(rpcRequest);
        return protocolMessage;
    }
}
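The fixed `Thread.sleep(5000)` in the test is there so the response can arrive before `close()` is called. One possible alternative, sketched here with plain JDK classes and not part of this chapter's code, is to wait on a `CountDownLatch` that a response callback releases. `ResponseWaiter` and `onResponse` are hypothetical names; in the real project something like `onResponse` would have to be called from `channelRead0`, and the response value below is a simulated placeholder.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: lets a test block until a response arrives or a timeout expires.
class ResponseWaiter {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile Object data;

    // Would be called from the handler thread when a response is read.
    void onResponse(Object responseData) {
        this.data = responseData;
        latch.countDown();
    }

    // Returns true if a response arrived within the timeout, false otherwise.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    Object getData() {
        return data;
    }
}

class ResponseWaiterDemo {
    public static void main(String[] args) {
        ResponseWaiter waiter = new ResponseWaiter();
        // Simulate the I/O thread delivering a response (placeholder value, not real output).
        Thread ioThread = new Thread(() -> waiter.onResponse("simulated response"));
        ioThread.start();
        boolean arrived = waiter.await(5, TimeUnit.SECONDS);
        System.out.println(arrived + " " + waiter.getData());
    }
}
```

Compared with a fixed sleep, this returns as soon as the response is delivered and still gives up after the timeout if nothing arrives.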
Service provider log:
Service consumer log: