一、背景

Netty 是由 JBOSS 供给的一个 java 开源结构,现为 Github 上的独立项目。Netty 供给异步的、事情驱动的网络应用程序结构和东西,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个依据NIO的客户、服务器端的编程结构,运用 Netty 能够确保你快速和简略的开宣布一个网络应用,例如完结了某种协议的客户端、服务端应用。Netty 相当于简化和流线化了网络应用的编程开发进程,例如:依据 TCP 和 UDP 的 socket 服务开发。

今日,这儿就分享一个依据 Netty 完结的 RPC 结构。

什么是RPC

RPC,是指 Remote Procedure Call Protocol,中文释义为长途进程调用,是一种进程间通讯办法,它是一种技术的思想,它答应程序调用另一个地址空间(同享网络的另一台机器上)的进程或函数。

简略来说,调用长途的服务时,给你的感觉就像是调用本地的服务相同,RPC 通讯对用户来说是完全透明的,运用者无需关怀恳求是怎么宣布去的、发到了哪里,每次调用只需要拿到正确的调用成果就行。

干流的 RPC 结构有 dubbo、spring-cloud 等,而 dubbo 的底层通讯正是依据 Netty 完结的。

项目根本流程

1. 客户端 Clustomer 调用长途拜访接口 PersonService
2. PersonService 在客户端发动时会主动创立 PersonService 的署理目标 Proxy
3. Proxy 将恳求数据发给客户端处理器 RpcClientHandler
4. RpcClientHandler 将恳求数据发给服务端处理器 RpcServerHandler
5. RpcServerHandler 经过反射 Reflect 找到长途拜访接口完结类 PersonServiceImpl
6. 履行 PersonServiceImpl 的办法,获取履行成果 
7. RpcServerHandler 将履行成果传输给 RpcClientHandler
8. RpcClientHandler 接纳到服务端的呼应后,经过调用链路将履行成果回传 Clustomer

Netty 完成的一个 RPC 框架

代码中的出现自界说注解/自界说类,假如没有将它们的代码贴出来,这些注解或目标只需知道它们是干嘛的,暂时不理睬它们是怎么完结的(由于该项目还自界说了 IOC 容器,这儿暂时不对 IOC 容器进行讲解,并且有其他简便的办法能够替换它们完结的功用),想要了解能够前往 github 地址看源码

项目源码:github.com/yehuisheng-…

二、引进 maven 依靠

<dependencies>
    <!-- 引进netty依靠 -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.76.Final</version>
    </dependency>
    <!-- 反射库 -->
    <dependency>
        <groupId>org.reflections</groupId>
        <artifactId>reflections</artifactId>
        <version>0.10.2</version>
    </dependency>
    <!-- 序列化 -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.9</version>
    </dependency>
</dependencies>

三、界说传输格式

1、自界说协议

  • MsgProtocol 是自界说协议,规则了通讯两边数据处理的格式
public class MsgProtocol {
    private int length;
    private byte[] msg;
    public int getLength() {
        return length;
    }
    public byte[] getMsg() {
        return msg;
    }
    public void setLength(int length) {
        this.length = length;
    }
    public void setMsg(byte[] msg) {
        this.msg = msg;
    }
    public MsgProtocol() {
    }
    public MsgProtocol(byte[] msg) {
        this.msg = msg;
        this.length = msg.length;
    }
}

2、恳求目标

  • RequestMsg 是恳求目标,包含了客户端恳求的接口、办法、形参等数据
public class RequestMsg implements Serializable {
    /** 恳求参数 */
    private Object[] params;
    /** 参数类型 */
    private Class<?>[] paramsType;
    /** 恳求接口 */
    private Class<?> clazz;
    /** 恳求办法 */
    private String methodName;
    public Object[] getParams() {
        return params;
    }
    public void setParams(Object[] params) {
        this.params = params;
    }
    public Class<?> getClazz() {
        return clazz;
    }
    public void setClazz(Class<?> clazz) {
        this.clazz = clazz;
    }
    public String getMethodName() {
        return methodName;
    }
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
    public Class<?>[] getParamsType() {
        return paramsType;
    }
    public void setParamsType(Class<?>[] paramsType) {
        this.paramsType = paramsType;
    }
    @Override
    public String toString() {
        return "RpcRequestMsg{" +
                "params=" + Arrays.toString(params) +
                ", clazz=" + clazz +
                ", methodName=" + methodName +
                '}';
    }
}

3、解码器/编码器

  • ProDeCoder 自界说解码器,将 Pipeline 上接纳的数据转为 MsgProtocol 目标
public class ProDeCoder extends ReplayingDecoder<Void> {
    /**
     *  解码器 - 从字节省的缓冲区中解析通讯节点发送过来的数据
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
        // 获取数据长度
        int length = byteBuf.readInt();
        // 树立缓存区
        byte[] bytes = new byte[length];
        // 正确读取数据到缓冲区中
        byteBuf.readBytes(bytes);
        /*
         *  将解码的数据经过 pipeline 调用链给下一个处理器
         */
        MsgProtocol protocol = new MsgProtocol();
        protocol.setLength(length);
        protocol.setMsg(bytes);
        list.add(protocol);
    }
}
  • ProEnCoder 自界说编码器,将 Pipeline 上要传输出去的 MsgProtocol 目标转为 byte 数组
public class ProEnCoder extends MessageToByteEncoder<MsgProtocol> {
    /**
     *  编码器 - 将数据的长度放入缓冲区中,发送给通讯的节点
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MsgProtocol msgProtocol, ByteBuf byteBuf) {
        byteBuf.writeInt(msgProtocol.getLength());
        byteBuf.writeBytes(msgProtocol.getMsg());
    }
}

4、字节操作东西类

  • ByteUtil 是字节操作东西类,只要两个办法:将字节数组转为 Java 目标、将 Java 目标转为字节数组
public class ByteUtil {
    /**
     * @param bytes     字节数组
     * @param clazz     类型
     * @param <T>       泛型转化
     * @return  将字节数组转为 Java 目标
     */
    public static <T> T cast(byte[] bytes, Class<T> clazz) {
        if (bytes == null || clazz == null) {
            return null;
        }
        // 读取字节数组,转为 Object
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
             ObjectInputStream ois = new ObjectInputStream(bis)) {
            Object obj = ois.readObject();
            if (obj instanceof String) {
                // 用 Gson 进行反序列化
                Gson gson = new Gson();
                return gson.fromJson(obj.toString(), clazz);
            } else {
                return clazz.cast(obj);
            }
        } catch (Exception e) {
            throw new ClassCastException("类型转化反常," + e.getMessage());
        }
    }
    /**
     * @param obj    Object 类型的目标
     * @return      将目标转为字节数组
     */
    public static byte[] getBytes(Object obj) {
        if (obj instanceof Serializable) {
            // 现已序列化了
            byte[] bytes = null;
            try (ByteArrayOutputStream bo = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bo)) {
                oos.writeObject(obj);
                bytes = bo.toByteArray();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return bytes;
        } else {
            // 运用 Gson 进行序列化
            Gson gson = new Gson();
            String json = gson.toJson(obj);
            return getBytes(json);
        }
    }
}

四、建立服务端

  • ConfigProperties 是自界说装备类,作用是加载 properties 文件的装备信息,这儿用到的装备只要 IP 地址和端口,感兴趣的话也能够将 ServerBootstrap 目标的装备加到装备文件,由用户决定装备项

  • @ObjectScan 是自界说注解,类似于 Spring 的 @Component 注解,作用是协助 IOC 容器发动时主动创立目标

@ObjectScan
public class ConfigProperties {
    private final Properties properties;
    public ConfigProperties() {
        this.properties = new Properties();
        loadConfig();
    }
    /** 地址 */
    private String address;
    /** 端口 */
    private int port;
    /**
     *  加载装备文件
     */
    private void loadConfig() {
        File directory = new File("");
        String courseFile;
        try {
            courseFile = directory.getCanonicalPath();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        String path = courseFile + "\target\classes\rpc.properties";
        try (FileInputStream fis = new FileInputStream(path);
             InputStream is = new BufferedInputStream(fis)) {
            properties.load(is);
        } catch (FileNotFoundException ignored) {
            // 装备文件不存在就运用默许值
        } catch (Exception e) {
            e.printStackTrace();
        }
        refreshContext();
    }
    /**
     *  改写装备信息
     */
    private void refreshContext() {
        this.address = temple("address", "127.0.0.1", String.class);
        this.port = temple("port", 9999, Integer.class);
        System.out.println("加载装备文件,address = " + address
                + ", port = " + port);
    }
    /**
     *
     * @param fieldName     字段名
     * @param defaultValue  默许值
     * @param clazz         类型
     * @param <T>       泛型
     * @return          回来装备文件中的值,假如没有则运用默许值
     */
    private <T> T temple(String fieldName, T defaultValue, Class<T> clazz) {
        Object o = properties.get(fieldName);
        if (o != null) {
            String s = o.toString().replaceAll(" ", "");
            if ("".equals(s)) {
                return defaultValue;
            } else if (Integer.class.equals(clazz) && isNumber(s)) {
                return clazz.cast(Integer.parseInt(s));
            } else if (String.class.equals(clazz)) {
                return clazz.cast(s);
            } else {
                throw new RuntimeException("装备文件属性" + fieldName +  "类型过错,请查看");
            }
        }
        return defaultValue;
    }
    /**
     * @param value     字段值
     * @return          正则表达式判别是否整数
     */
    private boolean isNumber(String value) {
        return value.matches("-?\d+");
    }
    public String getAddress() {
        return address;
    }
    public int getPort() {
        return port;
    }
}

1、channel 处理器

  • @AutoImport 是自界说注解,类似于 Spring 的 @Autowired 注解,作用是对增加注解的字段进行主动注入

  • BeanFactory 是自界说 Bean 工厂(IOC容器),类似于 Spring 的 BeanFactory 目标,作用是管理 IOC 容器的目标

/**
 *
 *  @ChannelHandler.Sharable 增加注解只是标明当时 Handler 是可同享的,会在增加到 Pipeline 时去做判别,
 *                             假如 Handler 是单例,可是没有增加 Sharable 注解,Netty 就会抛反常。
 *                             Netty 并不会帮你完结单例,你增加了注解后,还需要自行将 Handler 设置为单例。
 *
 * @author yehuisheng
 */
@ObjectScan
@ChannelHandler.Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<MsgProtocol> {
    @AutoImport
    private BeanFactory beanFactory;
    /**
     *  有客户端衔接,就会触发该办法
     * @param ctx   channel上下文目标
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("有客户端衔接了");
    }
    /**
     *  读取客户端的恳求,进行处理
     * @param ctx   channel上下文目标
     * @param msg   音讯体
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MsgProtocol msg) throws Exception {
        // 解析长途服务调用的恳求数据
        RequestMsg request = ByteUtil.cast(msg.getMsg(), RequestMsg.class);
        // 经过 Bean 工厂获取接口完结类
        Object instance = beanFactory.getServiceImpl(request.getClazz());
        // 依据参数类型和办法名找到办法目标,履行该办法取得接口的履行成果
        Class<?>[] paramsType = request.getParamsType();
        Method method = ObjectUtil.isEmpty(paramsType)
                ? instance.getClass().getDeclaredMethod(request.getMethodName())
                : instance.getClass().getDeclaredMethod(request.getMethodName(), paramsType);
        Object res = method.invoke(instance, request.getParams());
        /*
         *  判别有无回来值,Void 只完结了 Serializable 接口的空目标,
         *  只是表明没有回来值,在客户端获取成果的办法中能够看到它们的运用
         */
        boolean hasReturn = !Void.class.getSimpleName().toLowerCase()
                .equalsIgnoreCase(method.getReturnType().getName());
        /*
         *  Null 只完结了 Serializable 接口的空目标,只是表明数据为空,
         *  为了在调用 ByteUtil.getBytes() 能够将空值序列化
         */
        Object data = hasReturn ? (res == null ? new Null() : res) : new Void();
        // 将数据封装为自界说协议,回来客户端
        byte[] bytes = ByteUtil.getBytes(data);
        ctx.writeAndFlush(new MsgProtocol(bytes));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
        cause.printStackTrace();
    }
}

2、服务端

@ObjectScan
public class RpcNettyServer {
    @AutoImport
    private ConfigProperties configProperties;
    @AutoImport
    private RpcServerHandler rpcServerHandler;
    private EventExecutorGroup eventExecutors;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    public void start() {
        final int processors = Runtime.getRuntime().availableProcessors();
        // 担任接纳客户端衔接的线程池
        bossGroup = new NioEventLoopGroup(processors > 1 ? processors/2 : processors);
        // 担任IO操作/使命处理的线程池
        workerGroup = new NioEventLoopGroup(processors * 2);
        // 自界说异步使命线程组
        eventExecutors = new DefaultEventExecutorGroup(processors);
        try {
            /*
             *  1、初始化两个线程组
             *  2、设置 NIO 通讯 channel
             *  3、界说堵塞行列的长度
             *  4、设置是否监控客户端的衔接状况
             *  5、增加信道(channel)的处理器
             */
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 增加编码器,解码器,handler,处理 handler 的线程池
                            pipeline.addLast(new ProDeCoder())
                                    .addLast(new ProEnCoder())
                                    .addLast(eventExecutors, rpcServerHandler);
                        }
                    });
            // 绑定装备类加载的端口
            ChannelFuture channelFuture = bootstrap.bind(configProperties.getPort()).sync();
            System.out.println("RPC 服务发动成功。。。");
            // 堵塞当时代码,使 netty 服务器一向处于运转状况
            channelFuture.channel().closeFuture().sync();
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }
    public void close() {
        boolean success = false;
        if (ObjectUtil.canShutdownThreadPool(bossGroup)) {
            bossGroup.shutdownGracefully();
            success = true;
        }
        if (ObjectUtil.canShutdownThreadPool(workerGroup)) {
            workerGroup.shutdownGracefully();
            success = true;
        }
        if (ObjectUtil.canShutdownThreadPool(eventExecutors)) {
            eventExecutors.shutdownGracefully();
            success = true;
        }
        if (success) {
            System.out.println("服务端封闭服务了。。。");
        }
    }
}

五、建立客户端

1、chanel 处理器

客户端处理器做这样的作业:

  1. 获取接口的恳求办法、形参,将这些数据发送到服务端
  2. 数据发送后,调用 Object 的 wait 办法,线程进入等候状况
  3. 处理器接纳到服务端的处理成果后,就调用 Object 的 notify 办法唤醒等候的线程
  4. 等候的线程唤醒后,履行下一步操作,将处理成果回来给长途拜访接口的调用方
/**
 *
 *  @ChannelHandler.Sharable 增加注解只是标明当时 Handler 是可同享的,会在增加到 Pipeline 时去做判别,
 *                             假如 Handler 是单例,可是没有增加 Sharable 注解,Netty 就会抛反常。
 *                             Netty 并不会帮你完结单例,你增加了注解后,还需要自行将 Handler 设置为单例。
 *
 * @author yehuisheng
 */
@ObjectScan
@ChannelHandler.Sharable
public class RpcClientHandler<T> extends SimpleChannelInboundHandler<MsgProtocol> implements Supplier<T> {
    private ChannelHandlerContext channel;
    /** 恳求成果 */
    private Object result;
    /** 恳求目标 */
    private RequestMsg request;
    public void setRequest(RequestMsg request) {
        this.request = request;
    }
    /**
     *  与服务端成功树立衔接
     * @param ctx   channel上下文目标
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        channel = ctx;
    }
    /**
     *  读取服务器的信息
     * @param ctx   channel上下文目标
     * @param msg   音讯体
     */
    @Override
    protected synchronized void channelRead0(ChannelHandlerContext ctx, MsgProtocol msg) {
        this.result = ByteUtil.cast(msg.getMsg(), Object.class);
        // 接纳到服务端的恳求后,唤醒 get 办法持续履行
        this.notify();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
    /**
     * @return  获取服务端的长途接口的处理成果
     */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized T get() {
        if (request == null) {
            throw new IllegalArgumentException("找不到恳求信息");
        }
        if (channel == null) {
            throw new IllegalArgumentException("服务端未敞开");
        }
        try {
            byte[] bytes = ByteUtil.getBytes(this.request);
            // 向服务端发送恳求
            channel.writeAndFlush(new MsgProtocol(bytes));
            // 等候 channelRead0() 呼应服务端的恳求成果
            this.wait();
        } catch (Exception e) {
            e.printStackTrace();
        }
        /*
         *  假如办法无回来值,服务端也不回传音讯,客户端就会一向堵塞在 Object.wait() 中,
         *  因而需要运用特定目标表明
         */
        return (result instanceof Void || result instanceof Null) ? null : (T) result;
    }
}

2、客户端

客户端主要做两个作业:

  1. 树立与服务端的衔接
  2. 给长途拜访接口创立署理目标,当长途署理目标调用办法时,将当时接口的类型、办法和参数封装成 RequestMsg 目标,再由 clientHandler 目标带着数据拜访服务端,获取接口的处理成果

clientHandler 是同享目标,为了线程安全需要在署理接口调用的方位加锁。

这儿有一个问题,clientHandler 内部运用了 synchronized + wait/notify,wait 的办法会开释当时线程持有的目标锁。因而,假如署理接口调用 clientHandler 时运用 synchronized 对 clientHandler 目标加锁,那么当 clientHandler 内部调用 Object 的 wait 办法时,也会把这个方位的锁给开释掉,就会出现线程安全问题。

怎么解决这个问题呢?

计划一: 署理接口调用处理器的方位 synchronized 不直接锁 clientHandler 目标,而是创立一个 Object 目标当作锁,锁定 clientHandler 目标

计划二: clientHandler 目标内部的 wait/notify 换成 Lock 接口的 await/signal

@ObjectScan
public class RpcNettyClient {
    @AutoImport
    private ConfigProperties configProperties;
    @AutoImport
    private RpcClientHandler<?> clientHandler;
    private EventLoopGroup eventLoopGroup;
    private final Object lock = new Object();
    /**
     * @param clazz     获取的接口类型
     * @param <T>       泛型
     * @return          获取接口的署理目标
     */
    public <T> T getBeanInterface(Class<T> clazz) {
        if (!clazz.isInterface()) {
            throw new IllegalArgumentException("clazz不是接口类型");
        }
        // 经过 JDK 动态署理创立署理目标
        Object instance = Proxy.newProxyInstance(
                Thread.currentThread().getContextClassLoader(),
                new Class[]{clazz},
                (proxy, method, args) -> {
                    // 设置恳求目标的参数
                    RequestMsg request = new RequestMsg();
                    request.setClazz(clazz);
                    request.setParamsType(method.getParameterTypes());
                    request.setMethodName(method.getName());
                    request.setParams(args);
                    synchronized (lock) {
                        clientHandler.setRequest(request);
                        return clientHandler.get();
                    }
                }
        );
        return clazz.cast(instance);
    }
    /**
     *  敞开客户端,衔接 netty 服务器
     */
    public void start() {
        eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    // 设置线程池
                    .group(eventLoopGroup)
                    // 设置 NIO 通道
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            // 增加编解码器和自界说事务处理器
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ProDeCoder())
                                    .addLast(new ProEnCoder())
                                    .addLast(clientHandler);
                        }
                    });
            System.out.println("衔接长途服务成功。。。");
            // 绑定服务器地址和端口号
            ChannelFuture channelFuture = bootstrap.connect(configProperties.getAddress(), configProperties.getPort());
            // 不堵塞客户端
            channelFuture.sync();
            // 堵塞客户端
//            channelFuture.channel().closeFuture().sync();
        } catch (Throwable e) {
            e.printStackTrace();
            close();
        }
    }
    public void close() {
        boolean success = false;
        if (ObjectUtil.canShutdownThreadPool(eventLoopGroup)) {
            eventLoopGroup.shutdownGracefully();
            success = true;
        }
        if (success) {
            System.out.println("客户端封闭服务了。。。");
        }
    }
}

六、创立长途拜访接口

1、接口

public interface PersonService {
    void add(String name, String sex, int age);
    Object get(String name);
    Object remove(String name);
    int count();
    int number(Integer n);
    int number(int n);
}

2、接口完结类

  • @Service 是自界说注解,作用是服务端将当时目标向外暴露,使得客户端能够长途拜访这个目标
@Service
public class PersonServiceImpl implements PersonService {
    private final Map<String, JsonObject> personMap = new ConcurrentHashMap<>();
    @Override
    public void add(String name, String sex, int age) {
        if (ObjectUtil.isEmpty(name) || ObjectUtil.isEmpty(sex)
                || (!"男".equals(sex) && !"女".equals(sex))) {
            return;
        }
        JsonObject person = new JsonObject();
        person.addProperty("name", name);
        person.addProperty("sex", sex);
        person.addProperty("age", age);
        personMap.put(name, person);
    }
    @Override
    public JsonObject get(String name) {
        return ObjectUtil.isEmpty(name) ? null : personMap.get(name);
    }
    @Override
    public JsonObject remove(String name) {
        return ObjectUtil.isEmpty(name) ? null : personMap.remove(name);
    }
    @Override
    public int count() {
        return personMap.size();
    }
    @Override
    public int number(Integer n) {
        return n == null ? -1 : n + 10;
    }
    @Override
    public int number(int n) {
        return n;
    }
}

七、创立服务供给者和顾客

1、线程池

  • RpcThreadPool 自界说线程池
  • SelfRejectedPolicy 自界说回绝战略,线程被回绝后,睡眠一段时刻,再持续测验参加线程池的作业行列,假如某线程超越指定的时刻阈值,还加不入线程池的作业行列,就抛弃该线程(也能够音讯行列处理、短信邮件报警、记载到日志文件中等其他办法)
@ObjectScan
public class RpcThreadPool extends ThreadPoolExecutor {
    private static final int PROCESSORS = Runtime.getRuntime().availableProcessors();
    public RpcThreadPool() {
        // 依据服务器装备和项目处理的使命复杂度,装备合适的线程池参数
        super(PROCESSORS > 2 ? PROCESSORS/3 : PROCESSORS,
                PROCESSORS*2,
                3L, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(PROCESSORS * 500),
                Executors.defaultThreadFactory(),
                new SelfRejectedPolicy());
    }
}
class SelfRejectedPolicy implements RejectedExecutionHandler {
    /**
     *  测验再次进入线程池作业行列的时刻(单位毫秒)
     *  假如线程超越这个时刻还未进入作业行列,则抛弃当时线程
     */
    private final long againTryAddQueueTime;
    /** 被回绝的线程睡眠多长时刻后再测验参加线程池的作业行列(单位毫秒) */
    private final long sleepTime;
    SelfRejectedPolicy() {
        // 默许 againTryAddQueueTime = 300
        this(300);
    }
    SelfRejectedPolicy(long againTryAddQueueTime) {
        // 默许 sleepTime = 50
        this(againTryAddQueueTime, 50);
    }
    SelfRejectedPolicy(long againTryAddQueueTime, long sleepTime) {
        this.againTryAddQueueTime = againTryAddQueueTime;
        this.sleepTime = sleepTime;
    }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        Runnable runnable;
        // 判别当时线程是否被线程池回绝过
        if (r instanceof RejectedRunnable) {
            RejectedRunnable rejectedRunnable = (RejectedRunnable) r;
            // 判别线程花费在进入作业行列的时刻,有没有超出阈值
            if (rejectedRunnable.getAliveTime() > againTryAddQueueTime) {
                System.err.println("抛弃线程:" + rejectedRunnable);
                return;
            }
            runnable = r;
        } else {
            // 包装该线程,并默许记载花费在进入作业行列的时刻
            runnable = new RejectedRunnable(r);
        }
        try {
            // 睡眠一段时刻后,再运用线程池调用该使命
            TimeUnit.MILLISECONDS.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 无论是 Runnable 仍是 Callable 接口,终究都是运用 execute() 参加使命行列
        executor.execute(runnable);
    }
}
  • RejectedRunnable 回绝线程类,包装被线程池回绝的使命,并在自界说回绝战略中测验再次参加线程池的作业行列
public class RejectedRunnable implements Runnable {
    /** 被线程池回绝的线程 */
    private final Runnable runnable;
    /** 当时目标的创立时刻 */
    private final long createTime;
    public RejectedRunnable(Runnable runnable) {
        this.runnable = runnable;
        this.createTime = System.currentTimeMillis();
    }
    @Override
    public void run() {
        runnable.run();
    }
    /**
     * @return  获取当时目标的存活时刻
     */
    public long getAliveTime() {
        return System.currentTimeMillis() - createTime;
    }
}

2、进口类

  • Applications 服务敞开/封闭的进口类,当时类只要两个重要的办法
  1. run() 经过注解判别当时是客户端仍是服务端,并终究调用 RpcNettyClient 或 RpcNettyServer 的 start 办法敞开服务

  2. close() 封闭当时的服务和线程池

public class Applications {
    private static BeanFactory clientBeanFactory;
    private static BeanFactory serverBeanFactory;
    /**
     *  获取办法调用者,并且判别是否有敞开注解,然后运转对应的 IOC 容器
     */
    public static void run() {
        try {
            // 获取调用方的类目标
            String className = new Exception().getStackTrace()[1].getClassName();
            Class<?> clazz = Class.forName(className);
            BeanFactory beanFactory;
            // 注解判别服务端仍是客户端
            if (clazz.isAnnotationPresent(EnableRpcClient.class)) {
                clientBeanFactory = new ClientBeanFactory();
                beanFactory = clientBeanFactory;
            } else if (clazz.isAnnotationPresent(EnableRpcServer.class)) {
                serverBeanFactory = new ServerBeanFactory();
                beanFactory = serverBeanFactory;
            } else {
                throw new RuntimeException("请敞开RPC注解功用");
            }
            // 发动服务,客户端默许不堵塞,而服务端会在这个办法堵塞,监听 Channel 事情
            beanFactory.refresh();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     *  封闭客户端或服务端
     */
    public static void close() {
        if (serverBeanFactory != null) {
            close(serverBeanFactory);
        }
        if (clientBeanFactory != null) {
            close(clientBeanFactory);
        }
    }
    /**
     *  封闭目标工厂和线程池
     * @param beanFactory   目标工厂
     */
    private static void close(BeanFactory beanFactory) {
        RpcThreadPool threadPool = beanFactory.get(RpcThreadPool.class);
        if (threadPool != null && !threadPool.isShutdown()) {
            threadPool.shutdown();
        }
        beanFactory.close();
    }
}

3、服务供给者

  • @EnableRpcServer 自界说注解,作用是敞开服务供给者的注解功用
@EnableRpcServer
public class Provider {
    public static void main(String[] args) {
        Applications.run();
    }
}

4、顾客

  • @EnableRpcClient 自界说注解,作用是敞开服务顾客的注解功用

  • @Reference 自界说注解,作用是标示当时接口是长途拜访接口,和上文创立接口完结类的 @Service 注解一一对应

顾客这儿运用了线程池,创立多个线程一起拜访服务端,测验高并发下是否存在某些问题

@EnableRpcClient
public class Customer {
    @Reference
    private static PersonService personService;
    @AutoImport
    private static RpcThreadPool rpcThreadPool;
    /** 调用次数 */
    private final static int TIMES = 5000;
    private final static Random RANDOM = new Random();
    private static Integer num;
    public static void main(String[] args) {
        Applications.run();
        long millis = System.currentTimeMillis();
        // 多线程履行长途拜访
        for (int i = 0; i < TIMES; i++) {
            int finalI = i;
            num = i;
            rpcThreadPool.execute(() -> {
                String name = name();
                String sex = sex();
                int age = age();
                personService.add(name, sex, age);
                name = name();
                Object person = personService.get(name);
                System.out.println(finalI + " -> call get(" + name + ") = " + person);
                name = name();
                Object remove = personService.remove(name);
                System.out.println(finalI + " -> call remove(" + name + ") = " + remove);
                System.out.println(finalI + " -> call count = " + personService.count());
                System.out.println(finalI + " -> call number(int) = " + personService.number(finalI));
                System.out.println(num + " -> call number(Integer) = " + personService.number(num));
            });
        }
        // 线程池的使命履行完毕,才履行剩余的代码
        while (rpcThreadPool.getTaskCount() != rpcThreadPool.getCompletedTaskCount()) {}
        System.err.println("交由线程池的总使命数量:" + TIMES);
        System.err.println("线程池完结的使命数量:" + rpcThreadPool.getCompletedTaskCount());
        System.err.println("线程池未完结的使命数量:" + (TIMES - rpcThreadPool.getCompletedTaskCount()));
        long time = System.currentTimeMillis() - millis;
        System.err.println("耗时:" + (time/1000) + "秒" + (time%1000) + "毫秒");
        Applications.close();
    }
    /**
     * @return  主动生成姓名
     */
    private static String name() {
        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
        int nameLength = RANDOM.nextInt(6);
        return uuid.length() < nameLength ? uuid : uuid.substring(0, nameLength);
    }
    /**
     * @return  随机选择性别
     */
    private static String sex() {
        return (RANDOM.nextInt() % 2 == 0) ? "男" : "女";
    }
    /**
     * @return  随机获取年龄
     */
    private static int age() {
        return RANDOM.nextInt(100);
    }
}

八、测验

  • 发动服务端
  • 发动客户端,并经过线程池调用长途拜访接口

Netty 完成的一个 RPC 框架

Netty 完成的一个 RPC 框架

能够看到测验成功,功用都是正常的,好了,一个依据 Netty 完结的 RPC 结构就此完结