一、文章核心内容
通过篇文章,将会用到以下的内容项,可以在生产中直接使用
-
RocketMQ与SpringBoot的整合以及基础的应用配置
-
RocketMQTjson怎么读emplate模板类的各种设计模式的集体教学活动方案使用,如顺序消息、异步消息、响应消http://192.168.1.1登录息、单向消息、指定消息k设计模式及其应用场景ey等等
-
R源码编辑器ocketMQ多租户/环境自动隔离topicjson格式怎么打开/group/tag,如只需实体类型要配置基础的topic、group、tag,部署到dev、test、pr实体类od环境json自动隔离,只需要写一次配置;多租户同样适用
-
Java时间模块支持,默认如果RocketMQMessageListener用实体类接收消息时,字段不支持LocalDate/Locahttp 500lDateTime类型,发送会报错,需要增加单独处理
-
启动日志报Rock源码交易平台etMQLog:WARN Pl源码编程器ease initialize thjson格式e l实体类ogger system properly错误解决
文章预告
-
下一篇文章分享在企业级项目中RocketMQ的二次封装,达到核JSON心代码抽离,实现类只需要关注业务模块,日志记录,自动重试啥啥的全都扔给抽象层
-
抽象层源码精灵永久兑换码是json怎么读解耦、复用的一大有效手段,需要结合业务、设计模式、实战经验,抽象http://www.baidu.com出一个适合自身项目的抽象层
特殊说明
-
这篇文章分享的是高级部分,并非基础RocketMQ,所以RocketMQ基础使用、Rhttp代理ocketMQ控制台等基础部分不会过多说明
二、核心使用
2.http://www.baidu.com1 与SpringBoot整合
整体项目结构如下,文实体类是什么章未列出所有代码,未提及的设计模式请参考下面源代码
- 文章源码:gitee.comjson数据/tianxincode…
- 添加maven设计模式有哪些依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.codecoord</groupId>
<artifactId>springboot-rocketmq</artifactId>
<version>1.0</version>
<name>springboot-rocketmq</name>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<!-- RocketMQ核心依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 配置文件增加服务器配置
- 核心的配置在application.yml配置注释中有说明
server:
port: 8888
spring:
application:
name: springboot-rocketmq
rocketmq:
# 多个NameServer,host:port;host:port,RocketMQProperties
name-server: 127.0.0.1:9876
producer:
# 发送同一类消息的设置为同一个group,保证唯一
group: springboot_producer_group
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
# 发送消息超时时间,默认3000
sendMessageTimeout: 3000
# 消息最大长度,默认1024 * 1024 * 4(默认4M)
maxMessageSize: 4096
# 压缩消息阈值,默认4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在内部发送失败时重试另一个broker,默认false
retryNextServer: false
# access-key
#accessKey: xxx
# secret-key
#secretKey: xxx
# 是否启用消息跟踪,默认false
enableMsgTrace: false
# 消息跟踪主题的名称值。如果不进行配置,可以使用默认的跟踪主题名称
customizedTraceTopic: RMQ_SYS_TRACE_TOPIC
consumer:
# 配置指定group是否启动监听器 group1.topic1 = false
listeners:
# key:group名称
rocketmq_source_code_group:
# value:{key: topic名称: value: true/false}
rocketmq_source_code: true
# 指定消费组
group: springboot_consumer_group
# 指定topic,启动时就会注册
#topic: springboot_consumer_xxx
# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
# 其他配置参考属性类
logging:
level:
io.netty: ERROR
# 关闭RocketmqClient info日志,不然每隔几秒就会打印broker信息
RocketmqClient: error
- 新建启动类
- 解决启动RocketMQLog:WARN Please initialize the logger system properly.报错
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* RocketMQ启动类
*/
@SpringBootApplication
public class RocketMqApplication {
public static void main(String[] args) {
/*
* 指定使用的日志框架,否则将会告警
* RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
* RocketMQLog:WARN Please initialize the logger system properly.
*/
System.setProperty("rocketmq.client.logUseSlf4j", "true");
SpringApplication.run(RocketMqApplication.class, args);
}
}
- 以上完成,和SpringBoot整合已经完成,基本上大部分框架与SpringBoot整合都只需要以下三步
- 添加pom
- applihttp协议cation.yml配置
- 启动类(如果需要开启注解)
2.2 项目基础数据类
- 新建消息实体类,LocalDate和LocalDateTime类型默认不支持,需要单独处理,参考后面配置,因为LocalDate和LocalDateTime日常使用中用的较多
import com.codecoord.rocketmq.config.RocketMqConfig;
import lombok.Data;
import java.time.LocalDate;
import java.time.LocalDateTime;
@Data
public class RocketMqMessage {
private Long id;
private String message;
private String version;
/**
* LocalDate和LocalDateTime默认不支持,需要单独处理
* {@link RocketMqConfig}
*/
private LocalDate currentDate;
private LocalDateTime currentDateTime;
}
- 新增常量类,包含主要源码编辑器topic等信息
- 生产环境中一但上线90%情况下不会去更改原有的tag或者topic消息信息,所json数据以可以将其定义为常量类,方便全局引用,避免出错
public interface RocketMqBizConstant {
String SOURCE_TOPIC = "rocketmq_source_code_topic";
String SOURCE_GROUP = "rocketmq_source_code_group";
String SOURCE_TAG = "rocketmq_source_code_tag";
}
- 也可以在配置文件中定义,然后通过 ${} 表达式注入,这json解析种方式适用于需要更改不同配置信息让队列生效情况
# 配置文件中配置组信息
service:
topic: rocketmq_source_code_topic
group: rocketmq_source_code_group
tag: rocketmq_source_code_tag
- 新增延迟等级常量类,延迟等级和基础RocketMQ中的延迟等级一致
public interface RocketMqDelayLevel {
int ONE_SECOND = 1;
int FIVE_SECOND = 2;
int TEN_SECOND = 3;
int THIRTY_SECOND = 4;
int ONE_MINUTE = 5;
int TWO_MINUTE = 6;
int THREE_MINUTE = 7;
int FOUR_MINUTE = 8;
int FIVE_MINUTE = 9;
int SIX_MINUTE = 10;
int SEVEN_MINUTE = 11;
int EIGHT_MINUTE = 12;
int NINE_MINUTE = 13;
int TEN_MINUTE = 14;
int TWENTY_MINUTE = 15;
int THIRTY_MINUTE = 16;
int ONE_HOUR = 17;
int TWO_HOUR = 18;
}
2.3 RocketMQ配置
- 配置跨json解析域,此配置可选,是为了方便网页版apipost请求的时候处理浏览器跨域问题,如果不需要测试可以不配置
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* web拦截器配置,可以使用网页版本的apipost
*/
@Component
public class WebConfig implements WebMvcConfigurer {
/**
* 跨域配置方式一
*/
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("*")
.allowedHeaders("*")
.allowedMethods("*")
// .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS")
.maxAge(3600L);
}
/**
* 跨域配置方式二
*/
/*@Bean
public CorsFilter corsFilter() {
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
CorsConfiguration configuration = new CorsConfiguration();
configuration.setAllowCredentials(true);
configuration.setMaxAge(3600L);
// 配置允许的原始ip、头、方法
configuration.addAllowedOrigin("*");
configuration.addAllowedHeader("*");
configuration.addAllowedMethod("*");
source.registerCorsConfiguration("/**", configuration);
return new CorsFilter(source);
}*/
}
- 重要:RocketMQ序列化器处理,增加Java8时间类型处理,配置完成之后可以在实httpwatch体类中使用Java8时间类型作为属性
- 未配置出现错误参考第四节
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import java.util.List;
/**
* RocketMQ序列化器处理
*/
@Configuration
public class RocketMqConfig {
/**
* 解决RocketMQ Jackson不支持Java时间类型配置
* 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
*/
@Bean
@Primary
public RocketMQMessageConverter createRocketMQMessageConverter() {
RocketMQMessageConverter converter = new RocketMQMessageConverter();
CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
for (MessageConverter messageConverter : messageConverterList) {
if (messageConverter instanceof MappingJackson2MessageConverter) {
MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
// 增加Java8时间模块支持,实体类可以传递LocalDate/LocalDateTime
objectMapper.registerModules(new JavaTimeModule());
}
}
return converter;
}
}
2源码之家.4 RocketMQTemplate发送消息
- SpringBoot中使用RocketMQTemplate模板类进行消息发送
- org.apache.rocketmq.spring.corejson是什么意思.RocketMQTemplate
- 多种发送消息方式示例
- 注意:发送带key消息是通过设置请求头完成
- org.apache.rocketmq.spring.support.RocketMQHeaders源码中的图片
- org.apache.rocketmq.spring.suppohttp://www.baidu.comrt.RocketMQUtil#convertToSpringMhttpwatchessage(MessageExt)
- 需要注意目的地发送格式为 topic:tag
- 下面不httpwatch同消息间用了注释,实体类的作用需要测试哪种发送消息方式打开注释即可
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.constant.RocketMqDelayLevel;
import com.codecoord.rocketmq.domain.RocketMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
@Resource(name = "rocketMQTemplate")
private RocketMQTemplate rocketMqTemplate;
/**
* 通过实体类发送消息,发送注意事项请参考实体类
* http://localhost:8888/rocketmq/entity/message
*/
@RequestMapping("/entity/message")
public Object sendMessage() {
// 目的:topic:tag,如果不指定则发往配置的默认地址
String destination = RocketMqBizConstant.SOURCE_TOPIC + ":" + RocketMqBizConstant.SOURCE_TAG;
RocketMqMessage message = new RocketMqMessage();
message.setId(System.currentTimeMillis());
message.setMessage("当前消息发送时间为:" + LocalDateTime.now());
// Java时间字段需要单独处理,否则会序列化失败
message.setCurrentDate(LocalDate.now());
message.setCurrentDateTime(LocalDateTime.now());
message.setVersion("1.0");
/// 发送同步消息,消息成功发送到Broker时才返回,message可以入参批量消息
// 通过SendResult来处理发送结果
// SendResult sendResult = rocketMqTemplate.syncSend(destination, message);
/// 发送时指定业务key
/*Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message)
// 设置keys
.setHeader(RocketMQHeaders.KEYS, message.getId())
.build();
SendResult sendResult = rocketMqTemplate.syncSend(destination, buildMessage);*/
/// 发送延迟消息
Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message).build();
SendResult sendResult = rocketMqTemplate.syncSend(destination, buildMessage, 3000, RocketMqDelayLevel.FIVE_SECOND);
/// 发送同步有序消息,需要指定hashKey,可以用业务唯一键
// rocketMqTemplate.syncSendOrderly(destination, message, message.getId().toString());
/// 发送异步消息,消息发送后及时返回,然后通过回调方法通知
// rocketMqTemplate.asyncSend(destination, message, new SendCallback() {
// @Override
// public void onSuccess(SendResult sendResult) {
// log.info("消息发送成功【{}】", JSONObject.toJSONString(sendResult));
// }
//
// @Override
// public void onException(Throwable e) {
// log.error("消息发送失败【{}】", e.getMessage());
// }
// });
/// 发送异步有序消息,需要指定hashKey,可以用业务唯一键
// rocketMqTemplate.asyncSendOrderly(destination, message, message.getId().toString(), new SendCallback() {
// @Override
// public void onSuccess(SendResult sendResult) {
// log.info("消息发送成功【{}】", JSONObject.toJSONString(sendResult));
// }
//
// @Override
// public void onException(Throwable e) {
// log.error("消息发送失败【{}】", e.getMessage());
// }
// });
/// 发送单向消息
// rocketMqTemplate.sendOneWay(destination, message);
/// 发送单向有序消息,通过MessageBuilder构建
// Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message).build();
// rocketMqTemplate.sendOneWayOrderly(destination, buildMessage, message.getId().toString());
/// 发送和接收回调消息,需要实现 RocketMQReplyListener 监听器类才可以,否则将会超时错误
// rocketMqTemplate.sendAndReceive(destination, message, new RocketMQLocalRequestCallback<String>() {
// @Override
// public void onSuccess(String message) {
// log.info("消息发送成功,消息类型【{}】", message);
// }
//
// @Override
// public void onException(Throwable e) {
// log.error("消息发送失败", e);
// }
// });
/// 调用抽象类方法发送,最终也是syncSend
// rocketMqTemplate.convertAndSend(destination, "convertAndSend");
// 转换消息和发送,底层使用的是syncSend(destination, message),将会被RocketEntityMessageListener消费
// Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message)
// // 设置请求头
// .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
// .build();
// 将会被RocketEntityMessageListener03消费
// Message<Object> buildMessage = MessageBuilder.withPayload(new Object()).build();
// rocketMqTemplate.send(destination, buildMessage);
/// 发送批量消息,批量消息最终转为单挑进行发送
// List<Message<String>> msgList = new ArrayList<>();
// for (int i = 0; i < 10; i++) {
// msgList.add(MessageBuilder.withPayload("消息:" + i).build());
// }
// rocketMqTemplate.syncSend(destination, msgList);
return message;
}
/**
* 直接将对象进行传输,也可以自己进行json转化后传输
*/
@RequestMapping("/messageExt/message")
public SendResult convertAndSend() {
String destination = "rocketmq_source_code_topic:rocketmq_source_code_ext_tag";
JSONObject jsonObject = new JSONObject();
jsonObject.put("type", "messageExt");
return rocketMqTemplate.syncSend(destination, jsonObject);
}
}
- 需要测试哪种发送方式,把其他方式注释打开需要的发送方式注释即可,上面发送延迟消息
2.5 RocketMQListener消费
- 消息消费需要使用注解RocketMQMessajson数据gejsonobjectListener进行监听
- electorExpression = “*”:指定t源码之家ag,*表示监听所有tag
- consumerGroup:指定消费组
- 一般可以将consumerGroup和selectorExpression设源码时代置为一样
- topic:指定topic,topic至关重要,通常表示某一类业务或者平台,例如订单topic、仓储topic
- 监听器注解类需要实现RocketMQListener接口,泛型为发送消息源码1688时的类型
- org.apache.rocketmq.spring.core.RocketMQListener
- 监听器实例
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 实体类消费监听器
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqBizConstant.SOURCE_TOPIC,
consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
selectorExpression = RocketMqBizConstant.SOURCE_TAG
)
public class RocketEntityMessageListener implements RocketMQListener<RocketMqMessage> {
/**
* 普通消息
*/
@Override
public void onMessage(RocketMqMessage message) {
log.info("收到消息【{}】", JSONObject.toJSON(message));
try {
// 方法执行完成之后会自动进行进行ack,后续会分享源码解读
TimeUnit.SECONDS.sleep(3);
// 出现异常,将会自动进入重试队列
// int ex = 10 / 0;
} catch (InterruptedException e) {
log.error(e.getMessage());
}
log.info("休眠了3s后消费完成");
}
}
- 通过表达式注入
@RocketMQMessageListener(
topic = "${service.topic}",
consumerGroup = "${service.group}",
selectorExpression = "${service.tag}"
)
2.6 发送消息测试
- RocketMqController已经提供了消息发设计模式属于行为型的是送测试接口,通过接口发送后,即可自动发送到指定的topic:tag,特别注意是发送到 topic:tag,而不是 group:tag
- 发送之后通过RocketMQ控制台查看
三、消费配置
3.1 配置线程数
- RocketMQMessjsonobjectageListener默认是64个线程并发json解析消息,一些时候需要源码1688单线程消费,所以可以配置 consumeThreadMax 参数指定并发消费json是什么意思线程数,避免太大源码中的图片导致资源不够
@RocketMQMessageListener(
topic = RocketMqBizConstant.SOURCE_TOPIC,
consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
selectorExpression = RocketMqBizConstant.SOURCE_TAG,
// 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小
consumeThreadMax = 5
)
3.2 消息确认
- RocketMQ在SpringBoot中,当方法执行成功且未抛出异常时会自动进行确认,如果出现异常将会进行重试
- 源码类:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
// 只要出现Exception异常,将会进行重试
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 如果没有出现异常则消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
- 其中delayLevelWhenNextConsume指定重试频率
- -1:不重试,直接放入死信队列
- 0:broker控制重设计模式有哪些试频率
- >0:客户端控制设计模式属于行为型的是重试频率
3.3 WARhttpclientN No appenders could be found for logger
- 引入SpringBoot之后不在启动类中做日志配置情况下将会有以下警告
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
- 需要在启动json格式类中启动前设置环境变量 rocketmq.client.logHTTPUseSlf4j 为 true 明确指定RocketMQ的日志框架
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMqApplication {
public static void main(String[] args) {
/*
* 指定使用的日志框架,否则将会报错
* RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
* RocketMQLog:WARN Please initialize the logger system properly.
*/
System.setProperty("rocketmq.client.logUseSlf4j", "true");
SpringApplication.run(RocketMqApplication.class, args);
}
}
四、Java8时间类型支持
2.3节有提到,在此处做详细说明
- RocketMQ内置使用的转换器是RocketMQMessageConverter
- org.apache.rocketmq.sp设计模式ring.autoconfigure.MessageConverterConfiguration
- 转换JSON时使用的是MappingJackson2MessageConverter,但是其不支持Java的时间类型,比如LocalDate
- 当json数据消息实体中存在上面的时间https和http的区别类型字段时源码网站将会报以下错误
java.lang.RuntimeException: cannot convert message to class com.codecoord.rocketmq.domain.RocketMqMessage
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.doConvertMessage(DefaultRocketMQListenerContainer.java:486) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.handleMessage(DefaultRocketMQListenerContainer.java:399) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.access$100(DefaultRocketMQListenerContainer.java:71) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:359) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:392) [rocketmq-client-4.9.1.jar:4.9.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_231]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_231]
at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_231]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_231]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_231]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Expected array or string.
- 所以需要自定义消息转换器,将MappingJacksoHTTPn2MessageConverter进行替换,然后添加支持时间模块
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import java.util.List;
/**
* 序列化器处理
*/
@Configuration
public class RocketMqConfig {
/**
* 解决RocketMQ Jackson不支持Java时间类型配置
* 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
*/
@Bean
@Primary
public RocketMQMessageConverter createRocketMQMessageConverter() {
RocketMQMessageConverter converter = new RocketMQMessageConverter();
CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
for (MessageConverter messageConverter : messageConverterList) {
if (messageConverter instanceof MappingJackson2MessageConverter) {
MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
objectMapper.registerModules(new JavaTimeModule());
}
}
return converter;
}
}
五、多环境自动隔离
5.1 环境隔离说明
- 从上面代码中可以看到,topic等源码时代信息通过代码常量方式维护,此时如果不对环境json进行隔离,将会导致消息被错误的环境消费
- 其中一种解决方案是通过$Value注入,但是这个需要每个环境都维护自己的topic等信息
- 我们可以进行环境区分自动隔离,比如dev、test、prod等不同环境只需要简单配置一个选项,所有的消息将被自动隔离源码网站,这样当协作人员需要单独测试的时候较为方便
- 环境隔离利用BeanPostProcessor的postProcessBeforeInitialization在监听器实例初始化前把对应topic修改
5.2 环境隔离配置
多环境自动隔离这部分实用性五颗星
- 配置文件中增加属性配置
# 自定义属性
system:
environment:
# 隔离环境名称,拼接到topic后,xxx_topic_tianxin,默认空字符串
name: tianxin
# 启动隔离,会自动在topic上拼接激活的配置文件,达到自动隔离的效果
# 默认为true,配置类:EnvironmentIsolationConfig
isolation: true
- 配置Bean修改配置,自动隔离核心原理
- Spring生命周期中BeanPostProcessor在实体类是什么类初始化前在postProcessBefor源码编程器eInitia设计模式的两大主题lization中搞事情
- 监听器类脑设计模式面试题袋上面有@Compo源码编辑器下载nent注解,所以一定会被Spring收HTTP拾到容器中
- 所以可以在初始化前,把监听器类的t设计模式opic/group/tag神不知鬼不觉给改掉,然后实例http 500化的时候用的http://www.baidu.com就是改后的值
- 可以选择隔离topic/group/tag或者全部隔离,如果是多租户那就拼接多租户信息即可
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.util.StringUtils;
/**
* RocketMQ多环境隔离配置
* 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉
*/
@Configuration
public class EnvironmentIsolationConfig implements BeanPostProcessor {
@Value("${system.environment.isolation:true}")
private boolean enabledIsolation;
@Value("${system.environment.name:''}")
private String environmentName;
@Override
public Object postProcessBeforeInitialization(@NonNull Object bean,
@NonNull String beanName) throws BeansException {
// DefaultRocketMQListenerContainer是监听器实现类
if (bean instanceof DefaultRocketMQListenerContainer) {
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
// 开启消息隔离情况下获取隔离配置,此处隔离topic,根据自己的需求隔离group或者tag
if (enabledIsolation && StringUtils.hasText(environmentName)) {
container.setTopic(String.join("_", container.getTopic(), environmentName));
}
return container;
}
return bean;
}
}
- 启动项目,从控制台中可以看到当开启隔离的时候会自动在topic后面加上隔离名称
- 提示:如果按照topic隔离,请提前创建好topic或者开启broker的topic自动创建功能