MQ介绍
MQ:MessageQueue,音讯行列。 行列,是一种FIFO 先进先出的数据结构。音讯由出产者发送到MQ进行排队,然后按本来的顺序交由音讯的顾客进行处理
MQ的效果主要有以下三个方面:
-
异步
:进步体系响应速度和吞吐量;(快递员放快递到菜鸟驿站,客户有时间再取) -
解耦
:减小服务间影响(进步稳定性),完成数据分发(发布/订阅形式,一个出产者对应多个顾客) -
削峰
:稳定的体系资源应对突发的流量冲击(避免突发流量形成服务宕机)
MQ优缺点
- 体系可用性下降(外部依靠增多,一旦MQ宕机,会对事务产生影响)
- 体系复杂性进步(同步变异步,引进音讯消费问题,音讯不丢掉,顺序消费,重复消费等)
- 音讯一致性问题(多个顾客怎么确保同时成功/失利)
常用的MQ产品包含Kafka、RabbitMQ和RocketMQ
RabbitMQ安装
Releases rabbitmq/erlang-rpm (github.com)下载地址
rpm -ivh rabbitmq-server-3.9.15-1.el7.noarch.rpm
# 发动服务,后台进程
service rabbitmq-server start
# 发动程序,前台运行,检查发动过程
rabbitmqctl start_app
# 状况
service rabbitmq-server status
# 封闭服务
rabbitmqctl stop_app
安装web控制台插件,重启后生效
rabbitmq-plugins enable rabbitmq_management
http://192.168.119.133:15672
创立管理员用户
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
集群搭建
一般形式
在这种集群形式下,集群各节点之间只会有相同的元数据
,而音讯不会冗余,只存在一个节点,顾客在消费时,请求到了没有该音讯的节点,RabbitMQ会临时在节点间进行数据传输。
这种形式下音讯牢靠性不高
。也不支撑高可用,某一个节点挂了之后,需求重启服务后,才能让这个节点的音讯正常消费。
镜像形式
RabbitMQ的官方HA高可用方案。需求在搭建了一般集群之后再弥补搭建。其本质区别在于,这种形式会在镜像节点中心主动进行音讯同步
,而不是在客户端拉取音讯时临时同步。
而且在集群内部有一个算法会选举产生master
和slave
,当一个master挂了后,也会主动选出一个来。从而给整个集群供给高可用才能。
这种形式的音讯牢靠性更高,由于每个节点上都存着全量的音讯
。而他的弊端也是明显的,集群内部的网络带宽会被这种同步通讯很多的耗费,进而下降整个集群的功用。这种形式下,行列数量最好不要过多。
1、同步集群节点的cookie,途径/var/lib/rabbitmq/.erlang.cookie
2、将worker1和worker3参加worker2的集群中rabbitmqctl join_cluster --ram rabbit@worker2
参加时首先要发动worker1和worker3上的服务,不然呈现如下错误
NODENAME=rabbit@worker3
依次履行
service rabbitmq-server start
rabbitmqctl start_app
rabbitmqctl join_cluster --ram rabbit@worker2
参加集群后如下所示,rabbitmqctl cluster_status
检查集群状况
--ram
,表明节点的元数据(交换机、行列定义信息)只保存在内存中;此时存在单点故障,如果worker2节点宕机,元数据有可能丢掉。所以官方不建议一切节点都运用ram。
通常在出产环境中,为了削减RabbitMQ集群之间的数据传输,在装备镜像战略时,会针对固定的虚拟主机virtual host来装备。
创立一个虚拟主机,并增加对应的镜像战略
rabbitmqctl add_vhost /mirror
rabbitmqctl set_policy ha-all --vhost "/mirror" "^" '{"ha-mode":"all"}'
通常镜像形式的集群已经足够满足大部分的出产场景了。尽管他对体系资源耗费比较高,可是在出产环境中,体系的资源都是会做预留的,所以正常的运用是没有问题的。可是在做事务集成时,仍是需求注意行列数量不宜过多,而且尽量不要让RabbitMQ产生很多的音讯堆积。
创立行列
根底概念
RabbitMQ是依据AMQP协议
(音讯行列协议,用于出产者和顾客之间通信)开发的一个MQ产品
虚拟主机Virtual host
在一个RabbitMQ Server或集群中能够划分出多个虚拟主机,每一个虚拟主机都有AMQP的全套组件
,而且能够针对每个虚拟主机进行权限和数据分配;虚拟主机之间是完全隔离的
。
衔接Connection
客户端与RabbitMQ进行交互,首先就需求树立一个TPC衔接,这个衔接便是 Connection。
信道Channel
一旦客户端与与RabbitMQ树立了衔接,就会分配一个AMQP信道 Channel。每个信道有仅有ID,数据操作根本在信道Channel中展开。RabbitMQ为了削减功用开销,会在一个Connection中树立多个Channel,这样便于客户端进行多线程衔接,这些衔接会复用同一个Connection的TCP通道。
交换机Exchange
音讯发送到RabbitMQ中后,会首先进入一个交换机,然后由交换机将数据转发到不同的行列中。RabbitMQ中有多种不同类型的交换机来支撑不同的路由战略。
-
Direct Exchange
:依据音讯的Routing key
,将音讯路由到匹配的行列 -
Topic Exchange
: 依据音讯的Routing key
和通配符(*
匹配一个单词,#
匹配多个单词),将音讯路由到匹配的行列 -
Headers Exchange
: 依据音讯的头部信息
路由音讯到匹配行列,头部信息能够是恣意键值对 -
Fanout Exchange
: 将音讯广播到与之绑定的一切行列
行列Queue
行列是实践保存数据的最小单位
,具备FIFO的特性,音讯会被发到行列中,然后才被顾客消费
Classic经典行列
在单机环境中,拥有比较高的音讯牢靠性
Quorum仲裁行列
Quorum是依据Raft一致性
协议完成的一种新式的分布式音讯行列,他完成了耐久化,多备份的FIFO行列,主要便是针对RabbitMQ的镜像形式
规划的。简单理解便是quorum行列中的音讯需求有集群中半数节点赞同承认
后,才会写入到行列中。这种行列类似于RocketMQ傍边的DLedger集群。这种方法能够确保音讯在集群内部不会丢掉
。同时,Quorum是以牺牲很多高档行列特性为价值,来进一步确保音讯在分布式环境下的高牢靠。
Feature | Classic Mirrored | Quorum | 注释 |
---|---|---|---|
Non-durable queues | yes | no不支撑 | 不耐久化行列 |
Exclusivity | yes | no | 只能由声明行列的connection运用 |
Per message persistence | per message | always | |
Membership changes | automatic | manual | |
Message TTL (Time-To-Live) | yes | yes (since 3.10) | |
Queue TTL | yes | partially (lease is not renewed on queue re-declaration) | |
Queue length limits | yes | yes (exceptx-overflow:reject-publish-dlx) | |
Lazy behaviour | yes | always (since 3.10) | |
Message priority | yes | no | |
Consumer priority | yes | yes | |
Dead letter exchanges | yes | yes | |
Adheres topolicies | yes | yes (seePolicy support) | |
Poison message handling | no | yes | 毒音讯 |
GlobalQoS Prefetch | yes | no |
Poison message handling
:毒音讯,音讯一向不能被顾客正常消费(消费失利或许消费逻辑有问题),就会导致音讯不断的从头入队,形成功用浪费;Quorum行列会跟踪音讯的失利次数,记录在x-delivery-count
头部参数中,然后通过设置Delivery limit
设置阈值,失利次数超过阈值就会删去音讯,或许装备了死信行列,就进入对应的死信行列
声明Quorum行列
Map<String,Object> params =newHashMap<>();
params.put("x-queue-type","quorum");
//声明Quorum行列的方法便是增加一个x-queue-type参数,指定为quorum。默认是classic
channel.queueDeclare(QUEUE_NAME,true,false,false, params);
Quorum行列更适合于行列长期存在,而且对容错、数据安全方面的要求比低推迟、不耐久等高档行列更能要求更严厉的场景。
例如 电商体系的订单,引进MQ后,处理速度能够慢一点,可是订单不能丢掉。
不适用的场景
- 临时运用的行列,或许常常修改和删去的行列
- 对音讯推迟要求高,Raft一致性算法会影响音讯的推迟
- 对数据安全性要求不高,Quorum行列需求顾客手动通知或许出产者手动承认
- 行列音讯积压严重或许音讯很大,Quorum行列会将一切音讯一直保存在内存中,直到撑爆内存
Stream行列
Stream行列是RabbitMQ自3.9.0版本开始引进的一种新的数据行列类型,也是目前官方最为引荐的行列类型。这种行列类型的音讯是耐久化到磁盘而且具备分布式备份的,更适合于顾客多,读音讯十分频频的场景。
Stream行列的核心是以append-only的方法将音讯耐久化到日志文件中,然后通过调整顾客的消费进展offset,完成音讯的屡次分发。以下是4个主要特点
- 大规模分发,已有的音讯行列是一个顾客绑定一个专用行列,Stream行列答应恣意数量的顾客运用同一个行列
- 音讯回溯,已有的行列中,音讯被顾客消费完后,会从行列中删去,无法读取消费过的音讯,Stream答应顾客从日志的任何节点开始从头读取音讯
- 高吞吐量
- 大日志,已有的行列中积累的音讯过多时,功用下降会十分明显,Stream行列的规划方针以最小的内存开销存储很多数据
功用比照
Feature | Classic | Stream |
---|---|---|
Non-durable queues | yes | no |
Exclusivity | yes | no |
Per message persistence | per message | always |
Membership changes | automatic | manual |
TTL | yes | no (but seeRetention) |
Queue length limits | yes | no (but seeRetention) |
Lazy behaviour | yes | inherent |
Message priority | yes | no |
Consumer priority | yes | no |
Dead letter exchanges | yes | no |
Adheres topolicies | yes | (seeRetention) |
Reacts tomemory alarms | yes | no (uses minimal RAM) |
Poison message handling | no | no |
GlobalQoS Prefetch | yes | no |
Map<String,Object> params =newHashMap<>();
params.put("x-queue-type","stream");
params.put("x-max-length-bytes",20_000_000_000L);// 日志文件的最大字节数: 20 GB
params.put("x-stream-max-segment-size-bytes",100_000_000);// 每一个日志文件的最大巨细: 100 MB
channel.queueDeclare(QUEUE_NAME,true,false,false, params);
RabbitMQ编程模型
根底模型
成功发送音讯
for (int i = 100; i < 200; i++) {
String newMessage = String.format("亚索%d级了", i);
channel.basicPublish("", QUEUE_NAME, null, newMessage.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + newMessage + "'");
}
消费2个音讯
pull,主动从服务器上获取音讯
GetResponse response = channel.basicGet(QUEUE_NAME, true);
push,服务端推送音讯过来,履行回调函数
channel.basicConsume(QUEUE_NAME, true, myconsumer);
官网模型
1、Hello World
Producer端发送一个音讯到指定的queue,中心不需求任何exchange规则,Consumer端按queue消费
Producer
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicPublish("", QUEUE_NAME,null, message.getBytes("UTF-8"));
------------------------------------------------------------------------------
Consumer:
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
GetResponse response = channel.basicGet(QUEUE_NAME, true);
2、Work Queues
Producer:发音讯到方针行列
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);//耐久化音讯
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
------------------------------------------------------------------------------
Consumer:
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
3、Publish/Subscribe
发布/订阅机制
Producer只担任发送音讯到Exchange,再由Exchange(type=fanout)分配到与该Exchange绑定的Queue,顾客创立行列绑定到Exchange上
Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
------------------------------------------------------------------------------
Consumer: 绑定到Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
4、Routing
Producer只担任发送音讯到Exchange,Exchange(type=direct)依据routingkey
将不同类别的音讯分发到不同的Queue
Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, ro utingKey, null, message.getBytes("UTF-8"));
------------------------------------------------------------------------------
Consumer:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
channel.basicConsume(queueName, true, consumer);
5、Topics
Producer只担任发送音讯到Exchange,Exchange(type=direct)依据routingkey
将不同类别的音讯分发到不同的Queue,routingkey
支撑含糊匹配,单词间用.
离隔,*
代表一个单词,#
代表0个或多个单词
Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, "hero.yasuo.lol", null, message.getBytes("UTF-8"));
------------------------------------------------------------------------------
Consumer:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.lol");
channel.queueBind(queueName, EXCHANGE_NAME, "#.lol");
channel.basicConsume(queueName, true, consumer);
6、RPC远程调用
异步降级为同步调用,不引荐运用
7、Publish Confirms
确保出产者发送成功,发送者发送音讯的根底APIProducer.basicPublish方法
是没有返回值的,也便是说,一次发送音讯是否成功,出产者是不知道的,这在事务上就容易形成音讯丢掉。而这个模块便是通过给发送者供给一些承认机制,来确保这个音讯发送的过程是成功的。