使用消息队列(如:RabbitMQ
)的系统都是异步通信的,这种通信方式不能够保证发送的消息被对端成功的接收并且处理,因为有各种原因可能导致消息的丢失,所以发布者(Publisher
)和消费者(Consumer
)都需要一个机制来保证消息成功的传递和接收
Publisher
确保消息传递的两种方法
事务机制
在标准的AMQP 0-9-1
协议当中,确保消息不会丢失是通过使用事务的方式,就是对将用发送的消息创建一个Transaction channel
,下面的代码就是通过一个事务发送消息:
ch.QueueDeclare(queue, false, true, false, false, nil)
if err := ch.Tx(); err != nil {
t.Fatalf("tx.select failed")
}
ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")})
if err := ch.TxCommit(); err != nil {
t.Fatalf("tx.commit failed")
}
使用事务可以保证消息可以成功的被发送到消息队列当中,但同样会很大程度上降低吞吐量,在RabbitMQ
官方文档中记载会降低250
倍
当然,还有一种方式既可以保证吞吐量,还可以保证消息成功的传递,这种方式就是:Publisher Confirm
,这是在AMQP
协议中支持的消息确认机制
发布者确认机制
发布者的确认机制被称为Publisher Confirms
,它为发布者提供了一种机制去追踪被RabbitMQ
成功接收的消息,保证消息可以被成功的发送到消息队列中
在这种机制中,Client
会发送confirm.select
方法,然会broker
会向客户端回传confirm.select-ok
,前提是要设置nowait=false
,否则就会立即确认;下面是代码示例:
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
conn, _ := amqp.Dial("amqp://test:123456@localhost:5673/test")
defer conn.Close()
channel, err := conn.Channel()
if err != nil {
fmt.Printf("error: %s n", err.Error())
return
}
defer channel.Close()
// 将 channel 放置到 confirm mode
err = channel.Confirm(false)
if err != nil {
fmt.Printf("error: %s n", err.Error())
return
}
queue, err := channel.QueueDeclare("confirm:message", false, false, false, false, nil)
if err != nil {
fmt.Printf("error: %s n", err.Error())
return
}
//添加监听器,监听 broker 的回应
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1))
defer confirmOne(confirms)
err = channel.Publish("", queue.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("confirm message"),
})
if err != nil {
fmt.Printf("error: %s n", err.Error())
return
}
fmt.Println("success")
}
func confirmOne(confirms <-chan amqp.Confirmation) {
if confirmed := <-confirms; confirmed.Ack {
fmt.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
} else {
fmt.Printf("confirmed delivery of delivery tag: %d", confirmed.DeliveryTag)
}
}
当channel
处于confirm mode
时,broker
和client
都会从1
开始统计消息数,当broker
收到消息之后会给client
发送一个basic.ok
数据,在broker
回传给client
的数据中包含字段delivery-tag
为确认的消息序列号
当然,如果消息不能被成功的处理,那么就会发送basic.nack
,如果发送的数据中requeue=false
,那么同样也表明消息被成功处理了,而如果requeue=true
时,那么消息就会再次入队,然会进行重发;代码如下:
// only ack the source delivery when the destination acks the publishing
if confirmed := <-confirms; confirmed.Ack {
msg.Ack(false)
} else {
// 第一个参数为 multiple,表示是否一次确认多条
// 第二个参数为 requeue,表示是否重新入队
msg.Nack(false, false)
}
上面说的broker
回应确认消息是当消息到了exchange
之后,而不是到了具体的队列当中,所以如果消息到了exchange
之后无法路由到任何队列时,消息也会丢失
但是如果设置发送消息时的mandatory=true
,那么就会给client
发送一个basic.return
,这样发布者就可以对其进行进一步的处理,保证消息成功发送到队列当中,代码如下所示:
package main
import (
"github.com/streadway/amqp"
"log"
)
type MQ struct {
Channel *amqp.Channel
notifyConfirm chan amqp.Confirmation
notifyReturn chan amqp.Return
}
func (this *MQ) NotiryReturn() {
conn, _ := amqp.Dial("amqp://test:123456@localhost:5673/test")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
this.notifyReturn = this.Channel.NotifyReturn(make(chan amqp.Return))
// listen basic.return message
go this.listenReturn()
}
//消息是否正确入列
func (this *MQ) listenReturn() {
ret := <-this.notifyReturn
if len(string(ret.Body)) != 0 {
log.Println("message can be route to queue:", string(ret.Body))
}
}
一般来说,消息的确认和消息的发送是按照相同顺序的,但是对于一些特殊情况,比如:异步回应确认消息,或者有的消息需要持久化,有的不需要,所以就有可能出现乱序的情况;所以,发布者不应该根据确认的顺序来决定消息是否成功被处理了
注:对于持久化的消息则会在消息持久化之后再发送
basic.ok
,也就是说,消息存储到磁盘之后才会发送确认消息,所以会消费更多的时间
消费者交付确认 ((Consumer) Delivery Acknowledgements
)
消费者交付确认是指RabbitMQ Server
确定消息是否传递成功,因为会有很多的原因会导致消息传递失败,比如:连接断开,Consumer APP
退出,channel
异常关闭等
当Broker
将消息传递给消费者时,Broker
就需要决定该消息是否被消费者接受并处理了,如果被成功处理了,那么Broker
就会立刻删除掉这条消息,否则就需要判断是否重发该消息
RabbitMQ
提供了两种消息取人模式:
-
自动确认(
Automatic Acknowledgements
):即消息发送出去之后就认为消息被成功传递了,这种模式下消息吞吐量会很高,但是会有丢失消息的风险,比如:消息发出之后TCP
连接断了,那么这样消息就会丢失了当然,自动确认模式还有一点,就是巨大的吞吐量会给消费者带来压力,如果消费者不能及时处理消息,那么就会出现消息积压了,这样很容易撑爆消费者端的内存
-
手动确认(
manully Acknowledgements
):需要消费者手动回应确认消息;手动确认模式也有可能存在消息积压,但是可以通过设置消费者的prefetch
来限制消费数量;prefetch
就类似于tcp
的滑动窗口一样,只有消费者确认了消息之后,broker
才能继续发送消息给该消费者,否则,消息数的上限就是该prefetch
的值
消费者是通过什么手动确认消息的呢
这跟发布者一样,Broker
发送消息给消费者的时候会给消费者发送一个devlivery tag
,它是一个从1
开始向上递增的正数,当消费者需要手动确认该消息时就需要将这个delivery tag
作为参数回送给Broker
,这样Broker
就能够确认了
当然,这个delivery tag
是在一个channel
中的唯一标识,所以消费消息的channel
和回应确认消息的channel
必须是一个,否则就会出错关闭channel
消费者发送的确认有那几种呢?
消费者回应的确认可以是积极的,也有可能是消极的,不同的确认方法对应不同的协议,如下所示:
-
basic.ack
:用于积极确认 -
basic.nack
:用于消极确认 -
basic.reject
:用于消极确认,但与basic.nack
相比有一个限制
如果是积极确认的话就会broker
就会认为消息已传递成功,就会删除这条消息;而若收到basic.reject
这种消极确认也是一样会删除消息,它们两个只是语义的不同,前者是认为消息已经被成功处理了,后者则认为消息不会被处理
下面是手动消息确认的代码示例:
package main
import (
"bytes"
"log"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
//如果此处为 true,则表示消息自动确认,false:手动确认
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
//确认消息,参数为 multiple:表示一次确认多条消息,这可以减少网络压力
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
当处于手动确认模式时,如果没有收到确认消息,那么当传递消息的channel
被关闭时就会将未确认的消息重新入队,被重新发送的消息会有一个字段redeliver=true
,表明这条消息是重发的,第一次被发送的消息为false
参考文档:www.rabbitmq.com/confirms.ht…