作者:小傅哥
博客:bugstack.cn
原文:bugstack.cn/md/road-map…

沉积、共享、生长,让自己和别人都能有所收成!

本文的宗旨在于经过简略干净实践的办法教会读者,为什么要运用Dubbo、怎样运用Dubbo、Dubbo通讯的原理是什么。在学习本文后,你能够避开很多关于 Dubbo 运用时的坑,也能更清楚自己的编码是在做什么。

本文涉及的工程:

  • xfg-dev-tech-dubbo:gitcode.net/KnowledgePl…
  • xfg-dev-tech-dubbo-test:gitcode.net/KnowledgePl…

一、为什么运用

跟着互联网场景中所要面临的用户规模和体量的增加,体系的也需求做相应的拆分规划和完成。随之而来的,曾经的一套体系,现在成了多个微服务。如;电商体系,曾经就在一个工程中写就能够了,现在需求拆分出,用户、付出、产品、配送、活动、风控等各个模块。那么这些模块拆分后,如何高效的通讯呢?

Dubbo 我手写几行代码,就把通信模式给你解释清楚!
  • 关于通讯,就引进了 RPC 框架,而 Dubbo 便是其间的一个完成办法。
  • 那为啥用 Dubbo 呢?其实中心问题就一个,为了进步通讯效率。由于 Dubbo 的底层通讯是 Socket 而不是 HTTP 所以通讯的性能会更好。同时 Dubbo 又有分布式的高可用规划,在一组布置了买卖服务的实例宕机后,会被从注册中心去除,之后流量会打到其他服务上。

二、要怎样运用

Dubbo 我手写几行代码,就把通信模式给你解释清楚!

Dubbo 的运用分为2方,一个是接口的供给方,别的一个是接口的调用方。接口的供给方需求供给出被调用方运用接口的描述性信息。这个信息包含;接口称号、接口入参、接口出参,只需让调用方拿到这些信息今后,它才干依托于这样的接口信息做一个署理操作,并在署理类中运用 Socket 完成两边的信息交互。

所以你看上去调用 RPC 接口如同和运用 HTTP 也没啥差异,无非便是引进了 POM 装备,之后再装备了注解就能够运用了。但其实,它是把你的 Jar 当做署理的必要参数运用了。本文也会介绍,详细是怎样署理的

三、运用的事例

关于编程的学习来说,其实最开始的那一下,不是搞明白一切原理,而是先让自己能够看到运转出来的作用。哎,之后就去剖析原理,这样会舒畅的多。

所以小傅哥这儿供给了一套简略的 Dubbo 运用事例,只需你满足最基本的装备条件,就能够运转出作用;

  1. JDK 1.8
  2. Maven 3.x – jdk1.8支撑的就能够
  3. Dubbo 3.1.4 – POM 中已经装备,与2.x最大的运用上的差异便是一些注解的运用
  4. Zookeeper 3.4.x – 假如你只是依照本文中的直连形式测验,那么不安装 Zookeeper 也能够

1. 接口供给方

工程事例创立结构,选用的是 DDD 结构。但和 DDD 一点联系没有。假如你对工程创立有疑惑,能够参阅 《Java 简明教程》之 DDD 架构

Dubbo 我手写几行代码,就把通信模式给你解释清楚!

1.1 接口定义

源码cn.bugstack.dev.tech.dubbo.api.IUserService

public interface IUserService {
    Response<UserResDTO> queryUserInfo(UserReqDTO reqDTO);
}
  • 接口定义平平无奇,但第1个坑暗藏玄机!
  • 也便是,一切的 Dubbo 接口,出入参,默许都需求承继 Serializable 接口。也便是 UserReqDTO、UserResDTO、Response 这3个类,都得承继 Serializable 序列化接口。

1.2 接口完成

源码cn.bugstack.dev.tech.dubbo.trigger.rpc.UserService

@Slf4j
@DubboService(version = "1.0.0")
public class UserService implements IUserService {
    @Override
    public Response<UserResDTO> queryUserInfo(UserReqDTO reqDTO) {
        log.info("查询用户信息 userId: {} reqStr: {}", reqDTO.getUserId(), JSON.toJSONString(reqDTO));
        try {
            // 1. 模仿查询【你能够从数据库或者Redis缓存获取数据】
            UserResDTO resDTO = UserResDTO.builder()
                    .userId(reqDTO.getUserId())
                    .userName("小傅哥")
                    .userAge(20)
                    .build();
            // 2. 回来成果
            return Response.<UserResDTO>builder()
                    .code(Constants.ResponseCode.SUCCESS.getCode())
                    .info(Constants.ResponseCode.SUCCESS.getInfo())
                    .data(resDTO).build();
        } catch (Exception e) {
            log.error("查询用户信息失败 userId: {} reqStr: {}", reqDTO.getUserId(), JSON.toJSONString(reqDTO), e);
            return Response.<UserResDTO>builder()
                    .code(Constants.ResponseCode.UN_ERROR.getCode())
                    .info(Constants.ResponseCode.UN_ERROR.getInfo())
                    .build();
        }
    }
}
  • 接口完成平平无奇,但第2个坑暗藏玄机!
  • Dubbo 的完成接口,需求被 Dubbo 自己办理。所以 Dubbo 供给了 @DubboService 注解。有些小卡拉米,运用的是不是 Spring 的 @Service 呀?尤其是曾经的 Dubbo 版别 2.7.* 它的注解也是 @Service 也不留神就用成了 Spring 的 @Service。一个小bug,又调了一上午。

1.3 工程装备

application.yml

dubbo:
  application:
    name: xfg-dev-tech-dubbo
    version: 1.0.0
  registry:
    address: zookeeper://127.0.0.1:2181 # N/A - 无zookeeper可装备 N/A 走直连形式测验
  protocol:
    name: dubbo
    port: 20881
  scan:
    base-packages: cn.bugstack.dev.tech.dubbo.api
  • 装备信息平平无奇,但第3个坑暗藏玄机!
  • base-packages 扫描的是哪里装备了 Dubbo 的 API 入口,给它入口就行,它会自己找到完成类。但!你要知道 Java 的 Spring 运用能扫描到,能被 Spring 办理,那么 pom 要直接或者直接的引导到定义了 Dubbo 的模块。
  • 再有一个问题,Spring 运用开发,讲究约定大于装备。你 Application 运用,的包名应该是能够掩盖到其他包名的。比方 Application 都装备到 cn.bugstack.dev.tech.dubbo.a.b.c.d.* 去了,它默许就扫不到 cn.bugstack.dev.tech.dubbo.api 了。一个小bug,一下午又过去了。
  • 留意:address:假如装备的是 N/A 便是不走任何注册中心,便是个直连,首要用于本地验证的。假如你装备了 zookeeper://127.0.0.1:2181 就需求先安装一个 zookeeper 别的,即便你装备了注册中心的办法,也能够直连测验。

1.4 运用构建

以上信息都准备了,一群小卡拉米开始掉到第4个坑里了!

你有2个运用,一个Dubbo接口供给方、一个Dubbo接口运用方。那么你在给你别的一个运用运用接口的时分,你在 InelliJ IDEA 的 Maven 中执行 Install 了吗?

Install 是干啥的?它是为了让你运用了同一个本地 Maven 装备的运用,能够引进到对方供给的 Jar 包。你 Install 今后,这个 Jar 包就会进入到本地 Maven 仓库了。假如是公司里开发,会有专门的自己家布置的,私有Maven中心仓库,就能够经过 deploy 把本地 Jar 发布上去,那么公司里的同伴,也就都能够引用了。

Dubbo 我手写几行代码,就把通信模式给你解释清楚!
  • 你要先点击 root 下的 install 操作,这样就会自动构建了。
  • 假如你电脑装备有点低,也会出现一些气人怪相,比方就刷不进去,install 了也引用不了。记得要 clean 清空下,也能够直接到 maven 文件件去清空。

2. 接口运用方

有些小卡拉米觉得前面的抗都扫干净了,就完事了。没有接下来还有坑,让你一搞搞一天,半夜也睡不好。

2.1 POM 引进

<dependency>
    <groupId>cn.bugstack</groupId>
    <artifactId>xfg-dev-tech-dubbo-api</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
  • POM 的装备,便是把 Jar 包给引用进来。由于 Dubbo 需求依据这个接口,做一个署理操作。不引进,你代码就爆红啦!爆红啦!

2.2 消费装备

源码application.yml

dubbo:
  application:
    name: xfg-dev-tech-dubbo
    version: 1.0.0
  registry:
     address: zookeeper://127.0.0.1:2181
#    address: N/A
  protocol:
    name: dubbo
    port: 20881
  • 装备了 zookeeper 你就用第一个,代码中对应 @DubboReference(interfaceClass = IUserService.class, version = "1.0.0")
  • 装备了 N/A 你就用第二个,代码中有必要指定直连。@DubboReference(interfaceClass = IUserService.class, url = "dubbo://127.0.0.1:20881", version = "1.0.0")

2.3 代码装备

源码cn.bugstack.dev.tech.dubbo.consumer.test.ApiTest

// 直连形式;@DubboReference(interfaceClass = IUserService.class, url = "dubbo://127.0.0.1:20881", version = "1.0.0")
@DubboReference(interfaceClass = IUserService.class, version = "1.0.0")
private IUserService userService;
@Test
public void test_userService() {
    UserReqDTO reqDTO = UserReqDTO.builder().userId("10001").build();
    Response<UserResDTO> resDTO = userService.queryUserInfo(reqDTO);
    log.info("测验成果 req: {} res: {}", JSON.toJSONString(reqDTO), JSON.toJSONString(resDTO));
}

测验成果

2023-07-08 15:37:22.291  INFO 62481 --- [           main] c.b.d.tech.dubbo.consumer.test.ApiTest   : 测验成果 req: {"userId":"10001"} res: {"code":"0000","data":{"userAge":20,"userId":"10001","userName":"小傅哥"},"info":"成功"}
2023-07-08 15:37:22.324  INFO 62481 --- [tor-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl        : backgroundOperationsLoop exiting
  • 假如不出啥意外,到这你就能够直接启动运转了。并看到测验成果。
  • 但别忘记了,你启动的时分,需求先启动 xfg-dev-tech-dubbo 让接口供给方跑起来。

四、原理的剖析

都说 Jar 是供给可描述性信息的,对方才干署理调用。那么这个进程是怎样干的呢,总不能一问这个,就让小卡拉米们去手写 Dubbo 呀!所以小傅哥会经过最简略模型结构,让你了解这个 Dubbo 通讯的原理,方便小卡拉米们上手。

Dubbo 我手写几行代码,就把通信模式给你解释清楚!
  • 假如所示,接口运用方,对接口进行署理。什么是署理呢,署理便是用一个包装的结构,代替原有的操作。在这个包装的结构里,你能够自己扩展出恣意的办法。
  • 那么,这儿的署理。便是依据接口的信息,创立出一个署理目标,在署理目标中,供给 Socket 恳求。当调用这个接口的时分,就能够对接口供给方的,建议 Socket 恳求了。
  • 而 Socket 接收方,也便是接口供给方。他收到信息今后,依据接口的描述性内容,进行一个反射调用。这下就把信息给恳求出来,之后再经过 Socket 回来回去就能够了。

好,中心的原理就这么点。接下来,我们从代码中看看。

1. 接口署理 – 供给方

源码cn.bugstack.dev.tech.dubbo.trigger.socket.RpcServerSocket

@Slf4j
@Service
public class RpcServerSocket implements Runnable {
    private ApplicationContext applicationContext;
    public RpcServerSocket(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
        new Thread(this).start();
    }
    @Override
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) {
                            channel.pipeline().addLast(new ObjectEncoder());
                            channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                            channel.pipeline().addLast(new SimpleChannelInboundHandler<Map<String, Object>>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext channelHandlerContext, Map<String, Object> request) throws Exception {
                                    // 解析参数
                                    Class<?> clazz = (Class<?>) request.get("clazz");
                                    String methodName = (String) request.get("methodName");
                                    Class<?>[] paramTypes = (Class<?>[]) request.get("paramTypes");
                                    Object[] args = (Object[]) request.get("args");
                                    // 反射调用
                                    Method method = clazz.getMethod(methodName, paramTypes);
                                    Object invoke = method.invoke(applicationContext.getBean(clazz), args);
                                    // 封装成果
                                    Map<String, Object> response = new HashMap<>();
                                    response.put("data", invoke);
                                    log.info("RPC 恳求调用 clazz:{} methodName:{}, response:{}", clazz.getName(), methodName, JSON.toJSON(response));
                                    // 回写数据
                                    channelHandlerContext.channel().writeAndFlush(response);
                                }
                            });
                        }
                    });
            ChannelFuture f = b.bind(22881).sync();
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

这段代码首要供给的功用包含;

  1. Netty Socket 启动一个服务端
  2. 注入 ApplicationContext applicationContext 用于在接收到恳求接口信息后,获取对应的 Bean 目标。
  3. 依据恳求来的 Bean 目标,以及参数的必要信息。进行接口的反射调用。
  4. 最后一步,便是把接口反射恳求的信息,再经过 Socket 回来回去。

2. 接口反射 – 调用方

翻开工程:xfg-dev-tech-dubbo-test

源码cn.bugstack.dev.tech.dubbo.consumer.config.RPCProxyBeanFactory

@Slf4j
@Component("rpcProxyBeanFactory")
public class RPCProxyBeanFactory implements FactoryBean<IUserService>, Runnable {
    private Channel channel;
    // 缓存数据,实际RPC会对每次的调用生成一个ID来符号获取
    private Object responseCache;
    public RPCProxyBeanFactory() throws InterruptedException {
        new Thread(this).start();
        while (null == channel) {
            Thread.sleep(150);
            log.info("Rpc Socket 链接等候...");
        }
    }
    @Override
    public IUserService getObject() throws Exception {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Class<?>[] classes = {IUserService.class};
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (Object.class.equals(method.getDeclaringClass())) {
                    return method.invoke(this, args);
                }
                Map<String, Object> request = new HashMap<>();
                request.put("clazz", IUserService.class);
                request.put("methodName", method.getName());
                request.put("paramTypes", method.getParameterTypes());
                request.put("args", args);
                channel.writeAndFlush(request);
                // 模仿超时等候,一般RPC接口恳求,都有一个超时等候时长。
                Thread.sleep(350);
                return responseCache;
            }
        };
        return (IUserService) Proxy.newProxyInstance(classLoader, classes, handler);
    }
    @Override
    public Class<?> getObjectType() {
        return IUserService.class;
    }
    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.AUTO_READ, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new ObjectEncoder());
                            channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                            channel.pipeline().addLast(new SimpleChannelInboundHandler<Map<String, Object>>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext channelHandlerContext, Map<String, Object> data) throws Exception {
                                    responseCache = data.get("data");
                                }
                            });
                        }
                    });
            ChannelFuture channelFuture = b.connect("127.0.0.1", 22881).syncUninterruptibly();
            this.channel = channelFuture.channel();
            channelFuture.channel().closeFuture().syncUninterruptibly();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

这段代码首要供给的功用包含;

  1. 完成 FactoryBean<IUserService> 为的是把这样一个署理目标,交给 Spring 容器办理。
  2. 完成 Runnable 接口,并在接口中,创立 Netty 的 Socket 客户端。客户端中接收来自服务端的音讯,并临时存放到缓存中。留意 Dubbo 中这块的处理会杂乱一些,以及恳求同步呼应通讯,这样才干把各个接口的调集记录下来
  3. getObject() 目标中,供给署理操作。署理里,就能够自己想咋搞咋搞了。而 Dubbo 也是在署理里,供给了如此的操作,对接口供给方发送恳求音讯,并在超时时间内回来接口信息。由于反射调用,需求你供给类办法入参类型入参内容,所以我们要把这些信息传递给接口供给方。

3. 服务测验 – 消费验证

  • 启动 xfg-dev-tech-dubbo
  • 测验 xfg-dev-tech-dubbo-test
@Resource(name = "rpcProxyBeanFactory")
private IUserService proxyUserService;
@Test
public void test_proxyUserService(){
    UserReqDTO reqDTO = UserReqDTO.builder().userId("10001").build();
    Response<UserResDTO> resDTO = proxyUserService.queryUserInfo(reqDTO);
    log.info("测验成果 req: {} res: {}", JSON.toJSONString(reqDTO), JSON.toJSONString(resDTO));
}

测验成果

2023-07-08 16:14:51.322  INFO 74498 --- [           main] c.b.d.tech.dubbo.consumer.test.ApiTest   : 测验成果 req: {"userId":"10001"} res: {"code":"0000","data":{"userAge":20,"userId":"10001","userName":"小傅哥"},"info":"成功"}
  • 这儿我们给 IUserService 注入一个自己署理好的目标,之后就能够调用验证了。
  • 好啦,到这我们就把关于 Dubbo 的事交代明白了,以上内容较多。小卡拉米需求细细的品尝吸收!