前语
RabbitMQ 是在 AMQP(Advanced Message Queuing Protocol) 协议规范基础上完整的,可复用的企业音讯体系。它遵循 Mozilla Public License 开源协议,选用 Erlang 完成的工业级的音讯行列(MQ)服务器,建立在 Erlang OTP 平台上(因为选用 Erlang 开发,所以 RabbitMQ 稳定性和牢靠性比较高)
其他主流 MQ 产品
- ActiveMQ:Apache 出品,最盛行的,能力强劲的开源音讯总线,基于 JMS(Java Message Service)规范
- RocketMQ:阿里低推迟、高并发、高可用、高牢靠的分布式音讯中间件,基于 JMS,现在由 Apache 基金会维护
- Kafka:分布式,分区的,多副本的,多订阅者的音讯发布订阅体系(分布式 MQ 体系),能够用于查找日志,监控日志,拜访日志等
本文为 RabbitMQ 入门教程,主要将会解说 RabbitMQ 装置装备(Windows),相关概念,及项目中具体运用
装置
Erlang
官网下载链接:Downloads – Erlang/OTP
RabbitMQ 服务器有必要首要装置 Erlang 运转环境,一起装置时需求注意 RabbityMQ 所依靠的 Erlang 版别,咱们能够检查下方官方版别对应信息
版别对应:RabbitMQ Erlang Version Requirements — RabbitMQ
本次运用版别 Erlang OTP 25.3(点击跳转下载链接)
双击履行 exe 装置程序,除了装置途径其他都依照默许即可
然后装备环境变量
ERLANG_HOME = D:ErlangErlangErlang OTP
并且添加 /bin 目录到 Path 环境变量中,即添加 %ERLANG_HOME%bin
到 Path 中
装置装备之后,翻开 CMD,输入 erl 然后回车键,会弹出版别信息,表示 Erlang 装置成功
RabbitMQ
下载链接: RabbitMQ 3.12.0
装置 exe 文件,履行装置包,同样除了装置途径外其他保持默许
装备环境变量
RABBITMQ_SERVER = D:RabbitMQRabbitMQrabbitmq_server-3.12.0
然后添加 %RABBITMQ_SERVER%sbin
到 Path 环境变量中
检查一切插件
rabbitmq-plugins list
注:假如出现问题请参阅最后一章 彻底卸载
之后咱们需求装置 rabbitmq_management 插件,能够运用可视化的方法检查 RabbitMQ 服务器实例的状况,以及操控 RabbitMQ 服务器
# 装置插件
rabbitmq-plugins enable rabbitmq_management
拜访办理界面: http://localhost:15672/ (账号密码:guest / guest)
前期装置装备完毕,下面能够配合官方入门文档学习
音讯行列
界说
音讯指的是两个运用间传递的数据。数据的类型有很多种方法,或许只包括文本字符串,也或许包括嵌入对象。
“音讯行列(Message Queue)”是在音讯的传输过程中保存音讯的容器。在音讯行列中,一般有生产者和顾客两个角色。生产者只负责发送数据到音讯行列,谁从音讯行列中取出数据处理,他不论。顾客只负责从音讯行列中取出数据处理,他不论这是谁发送的数据
作用
解耦。如图所示。假设有体系 B、C、D 都需求体系 A 的数据,所以体系 A 调用三个办法发送数据到 B、C、D。这时,体系 D 不需求了,那就需求在体系 A 把相关的代码删掉。假设这时有个新的体系 E 需求数据,这时体系 A 又要添加调用体系 E 的代码。为了降低这种强耦合,就能够运用 MQ,体系 A 只需求把数据发送到 MQ,其他体系假如需求数据,则从 MQ 中获取即可
异步。如图所示。一个客户端恳求发送进来,体系 A 会调用体系 B、C、D 三个体系,同步恳求的话,呼应时刻便是体系 A、B、C、D 的总和,也便是 800ms。假如运用 MQ,体系 A 发送数据到 MQ,然后就能够回来呼应给客户端,不需求再等待体系 B、C、D 的呼应,能够大大地提高功能。关于一些非必要的事务,比方发送短信,发送邮件等等,就能够选用 MQ
削峰。如图所示。这其实是 MQ 一个很重要的运用。假设体系 A 在某一段时刻恳求数暴增,有 5000 个恳求发送过来,体系 A 这时就会发送 5000 条 SQL 进入 MySQL 进行履行,MySQL 关于如此巨大的恳求当然处理不过来,MySQL 就会溃散,导致体系瘫痪。假如运用 MQ,体系 A 不再是直接发送 SQL 到数据库,而是把数据发送到 MQ,MQ 短时刻积压数据是能够承受的,然后由顾客每次拉取 2000 条进行处理,防止在恳求峰值时期大量的恳求直接发送到 MySQL 导致体系溃散
特点
牢靠性:通过支撑音讯耐久化,支撑事务,支撑消费和传输的 ack 等来保证牢靠性
路由机制:支撑主流的订阅消费形式,如播送,订阅,headers 匹配等
扩展性:多个 RabbitMQ 节点能够组成一个集群,也能够根据实践事务状况动态地扩展集群中节点
高可用性:行列能够在集群中的机器上设置镜像,使得在部分节点出现问题的状况下队依然可用
多种协议:RabbitMQ 除了原生支撑 AMQP 协议,还支撑 STOMP,MQTT 等多种音讯中间件协议
多语言客户端:RabbitMQ 简直支撑一切常用语言,比方 Java、Python、Ruby、PHP、C#、JavaScript 等
办理界面:RabbitMQ 供给了易用的用户界面,使得用户能够监控和办理音讯、集群中的节点等
插件机制:RabbitMQ 供给了许多插件,以完成从多方面进行扩展,当然也能够编写自己的插件
运用
本章将会集成 rabbitmq 到 SpringBoot 中,并运用 rabbitmq-provider (生产者)和 rabbitmq-consumer(顾客) 两个项目进行具体解说, 也能够在父项目中创立这两个模块(本文选用父子模块方法)
一切代码示例已经上传到 GitHub 库房
库房地址:ReturnTmp/rabbitmq-demo: rabbitmq 实例代码 (github.com)
生产者
装备
创立子模块 rabbitmq-provider
依靠装备(也能够 IDEA 初始化模块直接勾选)
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml
server:
port: 8021
spring:
application:
name: rabbitmq-provider
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 111111
virtual-host: RootHost
其间虚拟 host 装备项不是有必要的,需求自行创立 vhost,假如未自行创立,默许为 virtual-host: /
注:vhost 能够理解为虚拟 broker,即 mini-RabbitMQserver,其内部均含有独立的 queue、bind、exchange 等,最重要的是具有独立的权限体系,能够做到 vhost 范围内的用户控制。当然,从 RabbitMQ 全局角度,vhost 能够作为不同权限阻隔的手段
能够依照如下过程创立 vhost
然后创立用户(办理员)
然后咱们需求为用户分配权限,指定运用咱们刚刚创立的 vhost
代码
创立直连交换机装备类
注:RabbitMQ 共有四种交换机,分别为:直连交换机,扇形交换机,主题交换机,首部交换机。这儿运用直连交换机演示,其他读者能够自行测验
@Configuration
public class DirectRabbitConfig {
//行列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否耐久化,默许是false,耐久化行列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存行列:当时衔接有效
// exclusive:默许也是false,只能被当时创立的衔接运用,并且当衔接关闭后行列即被删去。此参阅优先级高于durable
// autoDelete:是否主动删去,当没有生产者或者顾客运用此行列,该行列会主动删去。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下行列的耐久化就好,其余两个便是默许false
return new Queue("TestDirectQueue", true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange", true, false);
}
//绑定 将行列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
然后写简略的接口进行音讯推送(能够视状况写为守时使命)
@RestController
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //运用RabbitTemplate,这供给了接纳/发送等等办法
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
//将音讯带着绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}
发动项目,调用接口: http://localhost:8021/sendDirectMessage
检查 RabbitMQ 办理页面检查是否推送成功
顾客
装备
创立子模块 rabbitmq-consumer
依靠装备
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml
server:
port: 8022
spring:
application:
name: rabbitmq-consumer
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 111111
virtual-host: RootHost
代码
创立音讯接纳监听类
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver receive message: " testMessage.toString());
}
}
之后发动项目,检查顾客接纳状况
序列化
发送接纳音讯或许出现 Failed to convert message
问题,能够通过运用 JSON 序列化传输信息方法处理
生产者
@Configuration
public class RabbitMQConfig implements InitializingBean {
/**
* 主动注入RabbitTemplate模板
*/
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送音讯JSON序列化
*/
@Override
public void afterPropertiesSet() {
//运用JSON序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
}
}
顾客
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
}
彻底卸载
咱们装置中或许出现各种问题,一般状况下是 RabbitMQ 和 Erlang 版别不对应,需求彻底卸载 RabbitMQ 和 Erlang,能够依照如下过程卸载
注:博主首次装置运用的是 Erlang 20.3 Rabbit 3.7.15 ,之后似乎小版别不对应,出现问题,需求从头卸载装置
(1)翻开 Windows 控制面板,双击“程序和功能”。
(2)在当时装置的程序列表中,右键单击 RabbitMQ Server,然后单击“卸载”。
(3)在当时装置的程序列表中,右键单击“Erlang OTP”,然后单击“卸载”。
(4)翻开 Windows 使命办理器。
(5)在使命办理器中,查找进程 epmd.exe。 假如此进程仍在运转,请右键单击该进程,然后单击“结束进程”。
(6)删去 RabbitMQ 和 Erlang 的一切装置目录。
(7)删去文件 C:WindowsSystem32configsystemprofile.erlang.cookie
(假如存在)。
(8)转到用户文件夹:C:Users[username]
,然后删去文件.erlang.cookie。
(9)同样在 User 文件夹中,转到 AppData Roaming RabbitMQ
。删去 RabbitMQ 文件夹。
(10)删去注册表 HKEY_LOCAL_MACHINESOFTWAREEricssonErlangErlSrv
的子项。
(11)翻开运转 cmd->sc delete RabbitMQ。
(12)翻开运转->regedit 找到 RabbitMQ 节点,删掉即可(假如存在)
参阅链接
- Windows 下装置 RabbitMQ 服务器及基本装备 – 蓝之风 – 博客园 (cnblogs.com)
- RabbitMQ Windows 装置、装备、运用 – 小白教程-阿里云开发者社区 (aliyun.com)
- Windows 如何彻底卸载 RabbitMQ 和 Erlang 删去注册表
- windows 下 Erlang 与 RabbitMQ 从头装置时,因为卸载不干净导致各类错误
- 超具体的 RabbitMQ 入门,看这篇就够了!-阿里云开发者社区 (aliyun.com)
- RabbitMQ 整合 Spring Boot,完成 Hello World
- Springboot 整合 RabbitMq ,用心看完这一篇就够了
- RabbitMq 核心知识点小结 – 知乎 (zhihu.com)
- RabbitMQ消费音讯坑:failed to convert serialized Message content – jiuchengi
本文由博客一文多发平台 OpenWrite 发布!