一、前语

RocketMQ是一款最初由阿里巴巴开源、后边贡献给Apache并成为顶级项目。

这次文章主要将的是如何搭建RocketMQ的源码调试环境。有了源码环境后,可以经过官方的单元测试进行调试、有利于更进一步去了解设计和源码。

二、源码下载

  1. 给出官方的地址:https://github.com/apache/rocketmq/releases
  2. 本次下载版别为:rocketmq-all-4.9.6,对应的链接是:https://github.com/apache/rocketmq/archive/refs/tags/rocketmq-all-4.9.6.zip
  3. 下载zip并进行解压,经过 Idea 导入即可,最终的效果图是。

源码解析RocketMQ之源码环境搭建

略微讲一下对应的目录:

  • acl、access control list 访问控制列表
  • broker、Broker人物的代码
  • client、客户端(顾客、生产者)的代码,类sdk
  • common、公共模块
  • distribution、用于布置RocketMQ,如bin目录、conf目录
  • example、使用RocketMQ的使用事例(比较全)
  • filter,RocketMQ的相关过滤器如
  • logging、日志输出
  • namesrv、RocketMQ命名服务人物的代码
  • openmessaging、开放音讯标准
  • remoting、RocketMQ用于长途访问的模块代码
  • srvutil、用于办理和操作NameServer的命令行工具
  • store、负责音讯的存储和办理
  • test、单元测试代码
  • tools、包含多个实用的调集,用于辅佐RocketMQ的布置、办理和调试

三、源码环境搭建

大体上,环境的准备以及人物的发动和实际场景中差不多。我罗列一下:

  1. 发动命名服务NameServer:由于Broker发动过程中需要注册进去
  2. 发动Broker
  3. 经过RocketMQ提供的事例进行测试

3.1 NameServer发动

3.1.1 修正发动参数

先在Idea界面顺次点击

源码解析RocketMQ之源码环境搭建

然后会弹框斤进行环境变量装备、对应的变量名为:ROCKETMQ_HOME

源码解析RocketMQ之源码环境搭建

接下来保存好后,进入下一步。

PS:ROCKETMQ_HOME的途径是自定义的!自定义的!自定义的!我这里为了便利直接放D盘了。

3.1.2 仿制装备文件

接下来,在刚配的ROCKETMQ_HOME文件夹下面,创建3个子文件夹。

源码解析RocketMQ之源码环境搭建

conf:为了放装备文件;
logs:为了放日志文件;
store:为了放音讯存储类的文件;

建完文件夹,咱们需要去源码仿制一些装备文件。

源码解析RocketMQ之源码环境搭建

把这3货copy到conf文件夹下面,最终效果如下:

源码解析RocketMQ之源码环境搭建

3.1.3 修正装备文件

首先来改broker.conf

# 这是nameserver的地址
namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.3.6
# 这是存储途径,你设置为你的rocketmq运转目录的store子目录
storePathRootDir=<你自己ROCKETMQ_HOME途径>/store
# 这是commitLog的存储途径
storePathCommitLog=<你自己ROCKETMQ_HOME途径>/store/commitlog
# consume queue文件的存储途径
storePathConsumeQueue=<你自己ROCKETMQ_HOME途径>E/store/consumequeue
# 音讯索引文件的存储途径
storePathIndex=<你自己ROCKETMQ_HOME途径>/store/index
# checkpoint文件的存储途径
storeCheckpoint=<你自己ROCKETMQ_HOME途径>/store/checkpoint
# abort文件的存储途径
abortFile=<你自己ROCKETMQ_HOME途径>/store/abort

略微讲下参数的意思:
namesrvAddr指的是命名服务NameServer的IP地址;
brokerIP1指的是broker对应的IP地址;

3.1.4 发动

发动这步十分简略,直接点击即可。

源码解析RocketMQ之源码环境搭建

当输出以下内容则算是成功了。

源码解析RocketMQ之源码环境搭建

3.2 Broker发动

Broker发动也是十分简略。咱们依旧需要进行几步:

  1. 修正发动参数
  2. 发动

3.2.1 修正发动参数

先在Idea界面顺次点击

源码解析RocketMQ之源码环境搭建

在弹出窗口中,咱们需要装备两个参数:

  1. Program arguments
  2. Environment variables

源码解析RocketMQ之源码环境搭建

3.2.2 发动

发动也是按照图的步骤来即可。

源码解析RocketMQ之源码环境搭建

最终发动的效果图如下:

源码解析RocketMQ之源码环境搭建

3.3 事例测试

很漂亮,咱们已经快到最终一步了。现在只需要做两件事:

  1. 发送者发送音讯
  2. 顾客处理音讯

“什么?要写简易demo吗?我不想写”。
不,rocketMQ十分交心的帮你找好了demo,就在example模块里边。

3.3.1 Producer demo 生产者

按照上述进行点击。

源码解析RocketMQ之源码环境搭建

图中producer是生产者、consumer是顾客;先翻开Producer.java,然后进行IP装备。下面我就直接粘贴代码了。


import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
    public static final int MESSAGE_COUNT = 1000;
    public static final String PRODUCER_GROUP = "groupName";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";
    public static final String TAG = "TagA";
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.setProducerGroup(PRODUCER_GROUP);
        // 装备命名服务
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        // 发动
        producer.start();
        // 循环发送音讯
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {
                Message msg = new Message(TOPIC /* Topic */,
                        TAG /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // 封闭
        producer.shutdown();
    }
}

然后点击发动,若是下图效果则成功

源码解析RocketMQ之源码环境搭建

3.3.2 Consumer demo 顾客

同理,顾客也需要对Consumer.java进行处理:

  1. 翻开Consumer.java
  2. 修正注册中心地址
  3. 运转

下面是对应的代码

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
/**
 * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
 */
public class Consumer {
    public static final String CONSUMER_GROUP = "groupName";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";
    public static void main(String[] args) throws InterruptedException, MQClientException {
        /*
         * 初始化
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        /*
            设置注册中心
         */
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        /*
         * 订阅 topic
         */
        consumer.subscribe(TOPIC, "*");
        /*
         *  注册回调函数,便于进行监听
         */
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 发动并持续监听
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

对应的运转效果图

源码解析RocketMQ之源码环境搭建

3.3 存储级别校验

这时分咱们来看store文件夹,咱们会发现多了许多文件夹以及文件,也就是属于RocketMQ的存储系统相关的文件。下节我会持续去分析对应的存储模型。

源码解析RocketMQ之源码环境搭建

四、问题记录

4.1 closeChannel: close the connection to remote address[] result: true

这个报错一般是生产者/顾客已经连上了命名服务,但是没有对应的顾客/生产者地址导致的;

4.2 org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

连接超时。这个报错一般是,你的broker.conf对应的brokerIP地址没写对,导致生产者连接broker的时分,超时导致;

4.3 java: 程序包xxx不存在

当你遇到类似的报错的时分

源码解析RocketMQ之源码环境搭建

请记住这一般是jdk的问题。处理的方法:

  1. 确认你现在jdk的版别。现在我自己运转在jdk1.8和jdk11是没什么问题
  2. 修正模块的jdk版别为你现在运转的版别(修正Target bytecode version

源码解析RocketMQ之源码环境搭建

源码解析RocketMQ之源码环境搭建

  1. 修正对应项目的版别

源码解析RocketMQ之源码环境搭建

源码解析RocketMQ之源码环境搭建

再重新发动试试,祝好运 :)

五、重视我

微信查找公众号:野区杰西