一、前语
RocketMQ是一款最初由阿里巴巴开源、后边贡献给Apache并成为顶级项目。
这次文章主要将的是如何搭建RocketMQ的源码调试环境。有了源码环境后,可以经过官方的单元测试进行调试、有利于更进一步去了解设计和源码。
二、源码下载
- 给出官方的地址:
https://github.com/apache/rocketmq/releases
- 本次下载版别为:
rocketmq-all-4.9.6
,对应的链接是:https://github.com/apache/rocketmq/archive/refs/tags/rocketmq-all-4.9.6.zip
- 下载zip并进行解压,经过 Idea 导入即可,最终的效果图是。
略微讲一下对应的目录:
- acl、access control list 访问控制列表
- broker、Broker人物的代码
- client、客户端(顾客、生产者)的代码,类sdk
- common、公共模块
- distribution、用于布置RocketMQ,如bin目录、conf目录
- example、使用RocketMQ的使用事例(比较全)
- filter,RocketMQ的相关过滤器如
- logging、日志输出
- namesrv、RocketMQ命名服务人物的代码
- openmessaging、开放音讯标准
- remoting、RocketMQ用于长途访问的模块代码
- srvutil、用于办理和操作NameServer的命令行工具
- store、负责音讯的存储和办理
- test、单元测试代码
- tools、包含多个实用的调集,用于辅佐RocketMQ的布置、办理和调试
三、源码环境搭建
大体上,环境的准备以及人物的发动和实际场景中差不多。我罗列一下:
- 发动命名服务
NameServer
:由于Broker发动过程中需要注册进去 - 发动Broker
- 经过RocketMQ提供的事例进行测试
3.1 NameServer发动
3.1.1 修正发动参数
先在Idea界面顺次点击
然后会弹框斤进行环境变量装备、对应的变量名为:ROCKETMQ_HOME
接下来保存好后,进入下一步。
PS:ROCKETMQ_HOME
的途径是自定义的!自定义的!自定义的!我这里为了便利直接放D盘了。
3.1.2 仿制装备文件
接下来,在刚配的ROCKETMQ_HOME文件夹下面,创建3个子文件夹。
conf:为了放装备文件;
logs:为了放日志文件;
store:为了放音讯存储类的文件;
建完文件夹,咱们需要去源码仿制一些装备文件。
把这3货copy到conf文件夹下面,最终效果如下:
3.1.3 修正装备文件
首先来改broker.conf
# 这是nameserver的地址
namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.3.6
# 这是存储途径,你设置为你的rocketmq运转目录的store子目录
storePathRootDir=<你自己ROCKETMQ_HOME途径>/store
# 这是commitLog的存储途径
storePathCommitLog=<你自己ROCKETMQ_HOME途径>/store/commitlog
# consume queue文件的存储途径
storePathConsumeQueue=<你自己ROCKETMQ_HOME途径>E/store/consumequeue
# 音讯索引文件的存储途径
storePathIndex=<你自己ROCKETMQ_HOME途径>/store/index
# checkpoint文件的存储途径
storeCheckpoint=<你自己ROCKETMQ_HOME途径>/store/checkpoint
# abort文件的存储途径
abortFile=<你自己ROCKETMQ_HOME途径>/store/abort
略微讲下参数的意思:
namesrvAddr指的是命名服务NameServer的IP地址;
brokerIP1指的是broker对应的IP地址;
3.1.4 发动
发动这步十分简略,直接点击即可。
当输出以下内容则算是成功了。
3.2 Broker发动
Broker发动也是十分简略。咱们依旧需要进行几步:
- 修正发动参数
- 发动
3.2.1 修正发动参数
先在Idea界面顺次点击
在弹出窗口中,咱们需要装备两个参数:
- Program arguments
- Environment variables
3.2.2 发动
发动也是按照图的步骤来即可。
最终发动的效果图如下:
3.3 事例测试
很漂亮,咱们已经快到最终一步了。现在只需要做两件事:
- 发送者发送音讯
- 顾客处理音讯
“什么?要写简易demo吗?我不想写”。
不,rocketMQ十分交心的帮你找好了demo,就在example模块里边。
3.3.1 Producer demo 生产者
按照上述进行点击。
图中producer是生产者、consumer是顾客;先翻开Producer.java,然后进行IP装备。下面我就直接粘贴代码了。
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static final int MESSAGE_COUNT = 1000;
public static final String PRODUCER_GROUP = "groupName";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TopicTest";
public static final String TAG = "TagA";
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setProducerGroup(PRODUCER_GROUP);
// 装备命名服务
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// 发动
producer.start();
// 循环发送音讯
for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
Message msg = new Message(TOPIC /* Topic */,
TAG /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
// 封闭
producer.shutdown();
}
}
然后点击发动,若是下图效果则成功
3.3.2 Consumer demo 顾客
同理,顾客也需要对Consumer.java进行处理:
- 翻开Consumer.java
- 修正注册中心地址
- 运转
下面是对应的代码
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
/**
* This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
*/
public class Consumer {
public static final String CONSUMER_GROUP = "groupName";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TopicTest";
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* 初始化
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
/*
设置注册中心
*/
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* 订阅 topic
*/
consumer.subscribe(TOPIC, "*");
/*
* 注册回调函数,便于进行监听
*/
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 发动并持续监听
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
对应的运转效果图
3.3 存储级别校验
这时分咱们来看store文件夹,咱们会发现多了许多文件夹以及文件,也就是属于RocketMQ的存储系统相关的文件。下节我会持续去分析对应的存储模型。
四、问题记录
4.1 closeChannel: close the connection to remote address[] result: true
这个报错一般是生产者/顾客已经连上了命名服务,但是没有对应的顾客/生产者地址导致的;
4.2 org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
连接超时。这个报错一般是,你的broker.conf对应的brokerIP地址没写对,导致生产者连接broker的时分,超时导致;
4.3 java: 程序包xxx不存在
当你遇到类似的报错的时分
请记住这一般是jdk的问题。处理的方法:
- 确认你现在jdk的版别。现在我自己运转在jdk1.8和jdk11是没什么问题
- 修正模块的jdk版别为你现在运转的版别(修正
Target bytecode version
)
- 修正对应项目的版别
再重新发动试试,祝好运 :)
五、重视我
微信查找公众号:野区杰西