前言
SpringBoot 集成 RabbitMQ 公司老迈觉得运用注解太繁琐了,而且不能动态生成行列所以让我研讨是否能够动态绑定,所以就有了这个事情。打工人便是命苦没办法,硬着头皮直接就上了,接下来进入主题吧。
需求思路剖析
依据老迈的需求,大致分为运用装备文件进行装备,然后代码动态产生行列,交换机,生产者,顾客,以及假如装备了死信行列则动态绑定死信行列。由此得出一切的这些都是依据装备进行操作。然后百度有无代码创立就完事了。
装备文件思路剖析
问百度 RabbItMQ 支撑代码创立行列,交换机,以及两者之间绑定的代码,依据这些资料得出以下装备,下面示例装备只给出常用装备,其他装备后面会有个装备类
spring:
rabbitmq:
# 动态创立和绑定行列、交换机的装备
modules:
- routing-key: 路由KEY
producer: 生产者
consumer: 顾客
autoAck: 是否主动ACK
queue: 行列
name: 行列称号
dead-letter-exchange: 死信行列交换机
dead-letter-routing-key: 死信行列路由KEY
arguments: 行列其他参数,此装备支撑RabbitMQ的扩展装备
# 1分钟(测验),单位毫秒
x-message-ttl: 3000 # 推迟行列
exchange: 交换机
name: 交换机称号
..... 省略其他装备
从这儿开端便是界说中心代码模块需求的类,能够从这儿开端越过,直接看中心装备逻辑和中心代码,后续需求了解详细类的功用再回来看
装备类完成
装备有了,接下来便是创立Java目标把装备目标化了,因为支撑多个所以用的是调集接纳装备
/**
* 绑定装备根底类
* @author FJW
* @version 1.0
* @date 2023年04月11日 14:58
*/
@Data
@Configuration
@ConfigurationProperties("spring.rabbitmq")
public class RabbitModuleProperties {
/**
* 模块装备
*/
List<ModuleProperties> modules;
}
对应YML的装备类
/**
* YML装备类
* @author FJW
* @version 1.0
* @date 2023年04月11日 17:16
*/
@Data
public class ModuleProperties {
/**
* 路由Key
*/
private String routingKey;
/**
* 生产者
*/
private String producer;
/**
* 顾客
*/
private String consumer;
/**
* 主动承认
*/
private Boolean autoAck = true;
/**
* 行列信息
*/
private Queue queue;
/**
* 交换机信息
*/
private Exchange exchange;
/**
* 交换机信息类
*/
@Data
public static class Exchange {
/**
* 交换机类型
* 默许主题交换机
*/
private RabbitExchangeTypeEnum type = RabbitExchangeTypeEnum.TOPIC;
/**
* 交换机称号
*/
private String name;
/**
* 是否耐久化
* 默许true耐久化,重启音讯不会丢失
*/
private boolean durable = true;
/**
* 当一切队绑定列均不在运用时,是否主动删去交换机
* 默许false,不主动删去
*/
private boolean autoDelete = false;
/**
* 交换机其他参数
*/
private Map<String, Object> arguments;
}
/**
* 行列信息类
*/
@Data
public static class Queue {
/**
* 行列称号
*/
private String name;
/**
* 是否耐久化
*/
private boolean durable = true; // 默许true耐久化,重启音讯不会丢失
/**
* 是否具有排他性
*/
private boolean exclusive = false; // 默许false,可多个顾客消费同一个行列
/**
* 当顾客均断开衔接,是否主动删去行列
*/
private boolean autoDelete = false; // 默许false,不主动删去,避免顾客断开行列丢掉音讯
/**
* 绑定死信行列的交换机称号
*/
private String deadLetterExchange;
/**
* 绑定死信行列的路由key
*/
private String deadLetterRoutingKey;
/**
* 交换机其他参数
*/
private Map<String, Object> arguments;
}
}
生产者&顾客,这儿只需求界说个接口,后续会有完成类进行完成
生产者
/**
* 生产者接口
* @author FJW
* @version 1.0
* @date 2023年04月11日 13:52
*/
public interface ProducerService {
/**
* 发送音讯
* @param message
*/
void send(Object message);
}
顾客, 这儿需求继承 RabbitMQ 的顾客接口,后续会直接把此接口给动态绑定到 RabbitMQ 中
/**
* 顾客接口
* @author FJW
* @version 1.0
* @date 2023年04月11日 13:52
*/
public interface ConsumerService extends ChannelAwareMessageListener {
}
重试处理器
/**
* 重试处理器
* @author FJW
* @version 1.0
* @date 2023年04月19日 16:40
*/
public interface CustomRetryListener {
/**
* 最终一次重试失利回调
* @param context
* @param callback
* @param throwable
* @param <E>
* @param <T>
*/
public <E extends Throwable, T> void lastRetry(RetryContext context, RetryCallback<T,E> callback, Throwable throwable);
/**
* 每次失利的回调
* @param context
* @param callback
* @param throwable
* @param <E>
* @param <T>
*/
public <E extends Throwable, T> void onRetry(RetryContext context, RetryCallback<T,E> callback, Throwable throwable);
}
常量枚举界说
交换机类型枚举
/**
* 交换机类型枚举
* @author FJW
* @version 1.0
* @date 2023年04月11日 15:19
*/
public enum RabbitExchangeTypeEnum {
/**
* 直连交换机
* <p>
* 依据routing-key精准匹配行列(最常运用)
*/
DIRECT,
/**
* 主题交换机
* <p>
* 依据routing-key含糊匹配行列,*匹配任意一个字符,#匹配0个或多个字符
*/
TOPIC,
/**
* 扇形交换机
* <p>
* 直接分发给一切绑定的行列,忽略routing-key,用于播送音讯
*/
FANOUT,
/**
* 头交换机
* <p>
* 相似直连交换机,不同于直连交换机的路由规则建立在头属性上而不是routing-key(运用较少)
*/
HEADERS;
}
行列,交换机,路由 常量枚举
/**
* 行列,交换机。路由 常量枚举
* @author FJW
* @version 1.0
* @date 2023年04月18日 16:39
*/
public enum RabbitEnum {
QUEUE("xxx.{}.queue", "行列称号"),
EXCHANGE("xxx.{}.exchange", "交换机称号"),
ROUTER_KEY("xxx.{}.key", "路由称号"),
;
RabbitEnum(String value, String desc) {
this.value = value;
this.desc = desc;
}
@Getter
private String value;
@Getter
private String desc;
}
中心代码
生产者完成类封装
/**
* 生产者完成类
* @author FJW
* @version 1.0
* @date 2023年04月18日 14:32
*/
@Slf4j
public class AbsProducerService implements ProducerService {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 交换机
*/
private String exchange;
/**
* 路由
*/
private String routingKey;
@Override
public void send(Object msg) {
MessagePostProcessor messagePostProcessor = (message) -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setMessageId(IdUtil.randomUUID());
messageProperties.setTimestamp(new Date());
return message;
};
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentEncoding("UTF-8");
messageProperties.setContentType("text/plain");
String data = JSONUtil.toJsonStr(msg);
Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend(this.exchange, this.routingKey, message, messagePostProcessor);
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
}
顾客完成类封装
/**
* @author FJW
* @version 1.0
* @date 2023年04月18日 17:53
*/
@Slf4j
public abstract class AbsConsumerService<T> implements ConsumerService {
private Class<T> clazz = (Class<T>) new TypeToken<T>(getClass()) {}.getRawType();
/**
* 音讯
*/
private Message message;
/**
* 通道
*/
private Channel channel;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
this.message = message;
this.channel = channel;
String body = new String(message.getBody());
onConsumer(genObject(body));
}
/**
* 依据反射获取泛型
* @param body
* @return
*/
private T genObject(String body) throws JsonProcessingException, IllegalAccessException, InstantiationException {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(body, clazz);
}catch (Exception e) {
log.error("MQ转发层过错,请检查泛型是否与实践类型匹配, 指定的泛型是: {}", clazz.getName(), e);
}
return clazz.newInstance();
}
/**
* 扩展消费办法,对音讯进行封装
* @param data
* @throws IOException
*/
public void onConsumer(T data) throws IOException {
log.error("未对此办法进行完成: {}", data);
}
/**
* 承认音讯
*/
protected void ack() throws IOException {
ack(Boolean.FALSE);
}
/**
* 回绝音讯
*/
protected void nack() throws IOException {
nack(Boolean.FALSE, Boolean.FALSE);
}
/**
* 回绝音讯
*/
protected void basicReject() throws IOException {
basicReject(Boolean.FALSE);
}
/**
* 回绝音讯
* @param multiple 当时 DeliveryTag 的音讯是否承认一切 true 是, false 否
*/
protected void basicReject(Boolean multiple) throws IOException {
this.channel.basicReject(this.message.getMessageProperties().getDeliveryTag(), multiple);
}
/**
* 是否主动承认
* @param multiple 当时 DeliveryTag 的音讯是否承认一切 true 是, false 否
*/
protected void ack(Boolean multiple) throws IOException {
this.channel.basicAck(this.message.getMessageProperties().getDeliveryTag(), multiple);
}
/**
* 回绝音讯
* @param multiple 当时 DeliveryTag 的音讯是否承认一切 true 是, false 否
* @param requeue 当时 DeliveryTag 音讯是否重回行列 true 是 false 否
*/
protected void nack(Boolean multiple, Boolean requeue) throws IOException {
this.channel.basicNack(this.message.getMessageProperties().getDeliveryTag(), multiple, requeue);
}
}
音讯监听工厂类完成,此完成非常重要,此处的代码便是绑定顾客的中心代码
/**
* MQ详细音讯监听器工厂
* @author FJW
* @version 1.0
* @date 2023年04月18日 10:48
*/
@Data
@Slf4j
@Builder
public class ConsumerContainerFactory implements FactoryBean<SimpleMessageListenerContainer> {
/**
* MQ衔接工厂
*/
private ConnectionFactory connectionFactory;
/**
* 操作MQ管理器
*/
private AmqpAdmin amqpAdmin;
/**
* 行列
*/
private Queue queue;
/**
* 交换机
*/
private Exchange exchange;
/**
* 顾客
*/
private ConsumerService consumer;
/**
* 重试回调
*/
private CustomRetryListener retryListener;
/**
* 最大重试次数
*/
private final Integer maxAttempts = 5;
/**
* 是否主动承认
*/
private Boolean autoAck;
@Override
public SimpleMessageListenerContainer getObject() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setAmqpAdmin(amqpAdmin);
container.setConnectionFactory(connectionFactory);
container.setQueues(queue);
container.setPrefetchCount(20);
container.setConcurrentConsumers(20);
container.setMaxConcurrentConsumers(100);
container.setDefaultRequeueRejected(Boolean.FALSE);
container.setAdviceChain(createRetry());
container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
if (Objects.nonNull(consumer)) {
container.setMessageListener(consumer);
}
return container;
}
/**
* 装备重试
* @return
*/
private Advice createRetry() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
// 第一次重试调用
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
if (Objects.nonNull(retryListener)) {
retryListener.onRetry(context, callback, throwable);
if (maxAttempts.equals(context.getRetryCount())) {
retryListener.lastRetry(context, callback, throwable);
}
}
}
});
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(maxAttempts));
retryTemplate.setBackOffPolicy(genExponentialBackOffPolicy());
return RetryInterceptorBuilder.stateless()
.retryOperations(retryTemplate).recoverer(new RejectAndDontRequeueRecoverer()).build();
}
/**
* 设置过期时间
* @return
*/
private BackOffPolicy genExponentialBackOffPolicy() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
// 重试距离基数(秒)
backOffPolicy.setInitialInterval(5000);
// 从重试的第一次至最终一次,最大时间距离(秒)
backOffPolicy.setMaxInterval(86400000);
// 重试指数
backOffPolicy.setMultiplier(1);
return backOffPolicy;
}
@Override
public Class<?> getObjectType() {
return SimpleMessageListenerContainer.class;
}
}
中心装备类
/**
* RabbitMQ 全局装备,SpringBoot 发动后会回调此类
* @author FJW
* @version 1.0
* @date 2023年04月11日 13:55
*/
@Slf4j
public class RabbitMqConfig implements SmartInitializingSingleton {
/**
* MQ链接工厂
*/
private ConnectionFactory connectionFactory;
/**
* MQ操作管理器
*/
private AmqpAdmin amqpAdmin;
/**
* YML装备
*/
private RabbitModuleProperties rabbitModuleProperties;
@Autowired
public RabbitMqConfig(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties, ConnectionFactory connectionFactory) {
this.amqpAdmin = amqpAdmin;
this.rabbitModuleProperties = rabbitModuleProperties;
this.connectionFactory = connectionFactory;
}
@Override
public void afterSingletonsInstantiated() {
StopWatch stopWatch = StopWatch.create("MQ");
stopWatch.start();
log.debug("初始化MQ装备");
List<ModuleProperties> modules = rabbitModuleProperties.getModules();
if (CollUtil.isEmpty(modules)) {
log.warn("未装备MQ");
return;
}
for (ModuleProperties module : modules) {
try {
Queue queue = genQueue(module);
Exchange exchange = genQueueExchange(module);
queueBindExchange(queue, exchange, module);
bindProducer(module);
bindConsumer(queue, exchange, module);
} catch (Exception e) {
log.error("初始化失利", e);
}
}
stopWatch.stop();
log.info("初始化MQ装备成功耗时: {}ms", stopWatch.getTotal(TimeUnit.MILLISECONDS));
}
/**
* 绑定生产者
* @param module
*/
private void bindProducer(ModuleProperties module) {
try {
AbsProducerService producerService = SpringUtil.getBean(module.getProducer());
producerService.setExchange(module.getExchange().getName());
producerService.setRoutingKey(module.getRoutingKey());
log.debug("绑定生产者: {}", module.getProducer());
} catch (Exception e) {
log.warn("无法在容器中找到该生产者[{}],若需求此生产者则需求做详细完成", module.getConsumer());
}
}
/**
* 绑定顾客
* @param queue
* @param exchange
* @param module
*/
private void bindConsumer(Queue queue, Exchange exchange, ModuleProperties module) {
CustomRetryListener customRetryListener = null;
try {
customRetryListener = SpringUtil.getBean(module.getRetry());
}catch (Exception e) {
log.debug("无法在容器中找到该重试类[{}],若需求重试则需求做详细完成", module.getRetry());
}
try {
ConsumerContainerFactory factory = ConsumerContainerFactory.builder()
.connectionFactory(connectionFactory)
.queue(queue)
.exchange(exchange)
.consumer(SpringUtil.getBean(module.getConsumer()))
.retryListener(customRetryListener)
.autoAck(module.getAutoAck())
.amqpAdmin(amqpAdmin)
.build();
SimpleMessageListenerContainer container = factory.getObject();
if (Objects.nonNull(container)) {
container.start();
}
log.debug("绑定顾客: {}", module.getConsumer());
} catch (Exception e) {
log.warn("无法在容器中找到该顾客[{}],若需求此顾客则需求做详细完成", module.getConsumer());
}
}
/**
* 行列绑定交换机
* @param queue
* @param exchange
* @param module
*/
private void queueBindExchange(Queue queue, Exchange exchange, ModuleProperties module) {
log.debug("初始化交换机: {}", module.getExchange().getName());
String queueName = module.getQueue().getName();
String exchangeName = module.getExchange().getName();
module.setRoutingKey(StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), module.getRoutingKey()));
String routingKey = module.getRoutingKey();
Binding binding = new Binding(queueName,
Binding.DestinationType.QUEUE,
exchangeName,
routingKey,
null);
amqpAdmin.declareQueue(queue);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareBinding(binding);
log.debug("行列绑定交换机: 行列: {}, 交换机: {}", queueName, exchangeName);
}
/**
* 创立交换机
* @param module
* @return
*/
private Exchange genQueueExchange(ModuleProperties module) {
ModuleProperties.Exchange exchange = module.getExchange();
RabbitExchangeTypeEnum exchangeType = exchange.getType();
exchange.setName(StrUtil.format(RabbitEnum.EXCHANGE.getValue(), exchange.getName()));
String exchangeName = exchange.getName();
Boolean isDurable = exchange.isDurable();
Boolean isAutoDelete = exchange.isAutoDelete();
Map<String, Object> arguments = exchange.getArguments();
return getExchangeByType(exchangeType, exchangeName, isDurable, isAutoDelete, arguments);
}
/**
* 依据类型生成交换机
* @param exchangeType
* @param exchangeName
* @param isDurable
* @param isAutoDelete
* @param arguments
* @return
*/
private Exchange getExchangeByType(RabbitExchangeTypeEnum exchangeType, String exchangeName, Boolean isDurable, Boolean isAutoDelete, Map<String, Object> arguments) {
AbstractExchange exchange = null;
switch (exchangeType) {
// 直连交换机
case DIRECT:
exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
// 主题交换机
case TOPIC:
exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
//扇形交换机
case FANOUT:
exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
// 头交换机
case HEADERS:
exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
default:
log.warn("未匹配到交换机类型");
break;
}
return exchange;
}
/**
* 创立行列
* @param module
* @return
*/
private Queue genQueue(ModuleProperties module) {
ModuleProperties.Queue queue = module.getQueue();
queue.setName(StrUtil.format(RabbitEnum.QUEUE.getValue(), queue.getName()));
log.debug("初始化行列: {}", queue.getName());
Map<String, Object> arguments = queue.getArguments();
if (MapUtil.isEmpty(arguments)) {
arguments = new HashMap<>();
}
// 转化ttl的类型为long
if (arguments.containsKey("x-message-ttl")) {
arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
}
// 绑定死信行列
String deadLetterExchange = queue.getDeadLetterExchange();
String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {
deadLetterExchange = StrUtil.format(RabbitEnum.EXCHANGE.getValue(), deadLetterExchange);
deadLetterRoutingKey = StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), deadLetterRoutingKey);
arguments.put("x-dead-letter-exchange", deadLetterExchange);
arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
log.debug("绑定死信行列: 交换机: {}, 路由: {}", deadLetterExchange, deadLetterRoutingKey);
}
return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
}
}
以上装备完成后,开端RUN代码
运用示例
这儿装备了推迟行列和死信行列,开RUN
spring:
rabbitmq:
# 动态创立和绑定行列、交换机的装备
modules:
# 正常行列
- routing-key: test
consumer: testConsumerService
producer: testProducerService
autoAck: false
queue:
name: test
dead-letter-exchange: dead
dead-letter-routing-key: dead
arguments:
# 1分钟(测验),单位毫秒
x-message-ttl: 3000
exchange:
name: test
# 死信行列
- routing-key: dead
consumer: deadConsumerService
producer: deadProducerService
autoAck: false
queue:
name: dead
exchange:
name: dead
项目发动部分日志
MQ管理端查看是否绑定成功
交换机
行列
由此能够得出代码是正常的,都有
生产者发送音讯
PostMan
顾客,因为测验死信行列,所以这儿回绝消费了
顾客消费信息
搞定收工,接下来还有什么问题等到详细生产上去发现了,横竖需求完成了,假如你们有方面需求此文章仍是个不错的参考,各位看客记住留下赞哦