使用消息队列(如:RabbitMQ)的系统都是异步通信的,这种通信方式不能够保证发送的消息被对端成功的接收并且处理,因为有各种原因可能导致消息的丢失,所以发布者(Publisher)和消费者(Consumer)都需要一个机制来保证消息成功的传递和接收

Publisher确保消息传递的两种方法

事务机制

在标准的AMQP 0-9-1协议当中,确保消息不会丢失是通过使用事务的方式,就是对将用发送的消息创建一个Transaction channel ,下面的代码就是通过一个事务发送消息:

ch.QueueDeclare(queue, false, true, false, false, nil)
​
if err := ch.Tx(); err != nil {
    t.Fatalf("tx.select failed")
}
​
ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")})
​
if err := ch.TxCommit(); err != nil {
    t.Fatalf("tx.commit failed")
}

使用事务可以保证消息可以成功的被发送到消息队列当中,但同样会很大程度上降低吞吐量,在RabbitMQ官方文档中记载会降低250

当然,还有一种方式既可以保证吞吐量,还可以保证消息成功的传递,这种方式就是:Publisher Confirm,这是在AMQP协议中支持的消息确认机制

发布者确认机制

发布者的确认机制被称为Publisher Confirms,它为发布者提供了一种机制去追踪被RabbitMQ成功接收的消息,保证消息可以被成功的发送到消息队列中

在这种机制中,Client会发送confirm.select方法,然会broker会向客户端回传confirm.select-ok,前提是要设置nowait=false,否则就会立即确认;下面是代码示例:

package main
​
import (
    "fmt"
    "github.com/streadway/amqp"
)
​
func main() {
    conn, _ := amqp.Dial("amqp://test:123456@localhost:5673/test")
    defer conn.Close()
    channel, err := conn.Channel()
​
    if err != nil {
        fmt.Printf("error: %s n", err.Error())
        return
    }
​
    defer channel.Close()
  
  // 将 channel 放置到 confirm mode 
    err = channel.Confirm(false)
​
    if err != nil {
        fmt.Printf("error: %s n", err.Error())
        return
    }
​
    queue, err := channel.QueueDeclare("confirm:message", false, false, false, false, nil)
    if err != nil {
        fmt.Printf("error: %s n", err.Error())
        return
    }
  //添加监听器,监听 broker 的回应
    confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1))
​
    defer confirmOne(confirms)
    err = channel.Publish("", queue.Name, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body:     []byte("confirm message"),
    })
​
    if err != nil {
        fmt.Printf("error: %s n", err.Error())
        return
    }
​
    fmt.Println("success")
​
}
​
func confirmOne(confirms <-chan amqp.Confirmation) {
    if confirmed := <-confirms; confirmed.Ack {
        fmt.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
    } else {
        fmt.Printf("confirmed delivery of delivery tag: %d", confirmed.DeliveryTag)
    }
}

channel处于confirm mode时,brokerclient都会从1开始统计消息数,当broker收到消息之后会给client发送一个basic.ok数据,在broker回传给client的数据中包含字段delivery-tag为确认的消息序列号

当然,如果消息不能被成功的处理,那么就会发送basic.nack,如果发送的数据中requeue=false,那么同样也表明消息被成功处理了,而如果requeue=true时,那么消息就会再次入队,然会进行重发;代码如下:

// only ack the source delivery when the destination acks the publishing
if confirmed := <-confirms; confirmed.Ack {
    msg.Ack(false)
} else {
  // 第一个参数为 multiple,表示是否一次确认多条
  // 第二个参数为 requeue,表示是否重新入队
    msg.Nack(false, false)
}

上面说的broker回应确认消息是当消息到了exchange之后,而不是到了具体的队列当中,所以如果消息到了exchange之后无法路由到任何队列时,消息也会丢失

但是如果设置发送消息时的mandatory=true,那么就会给client发送一个basic.return,这样发布者就可以对其进行进一步的处理,保证消息成功发送到队列当中,代码如下所示:

package main
​
import (
    "github.com/streadway/amqp"
    "log"
)
​
type MQ struct {
    Channel *amqp.Channel
    notifyConfirm chan amqp.Confirmation
    notifyReturn chan amqp.Return
}
​
func (this *MQ) NotiryReturn() {
    conn, _ := amqp.Dial("amqp://test:123456@localhost:5673/test")
    defer conn.Close()
​
    ch, _ := conn.Channel()
    defer ch.Close()
​
    this.notifyReturn = this.Channel.NotifyReturn(make(chan amqp.Return))
    // listen basic.return message
    go this.listenReturn()
}
​
//消息是否正确入列
func (this *MQ) listenReturn() {
    ret := <-this.notifyReturn
    if len(string(ret.Body)) != 0 {
        log.Println("message can be route to queue:", string(ret.Body))
    }
}

一般来说,消息的确认和消息的发送是按照相同顺序的,但是对于一些特殊情况,比如:异步回应确认消息,或者有的消息需要持久化,有的不需要,所以就有可能出现乱序的情况;所以,发布者不应该根据确认的顺序来决定消息是否成功被处理了

注:对于持久化的消息则会在消息持久化之后再发送basic.ok,也就是说,消息存储到磁盘之后才会发送确认消息,所以会消费更多的时间

消费者交付确认 ((Consumer) Delivery Acknowledgements

消费者交付确认是指RabbitMQ Server确定消息是否传递成功,因为会有很多的原因会导致消息传递失败,比如:连接断开,Consumer APP退出,channel异常关闭等

Broker将消息传递给消费者时,Broker就需要决定该消息是否被消费者接受并处理了,如果被成功处理了,那么Broker就会立刻删除掉这条消息,否则就需要判断是否重发该消息

RabbitMQ提供了两种消息取人模式:

  • 自动确认(Automatic Acknowledgements):即消息发送出去之后就认为消息被成功传递了,这种模式下消息吞吐量会很高,但是会有丢失消息的风险,比如:消息发出之后TCP连接断了,那么这样消息就会丢失了

    当然,自动确认模式还有一点,就是巨大的吞吐量会给消费者带来压力,如果消费者不能及时处理消息,那么就会出现消息积压了,这样很容易撑爆消费者端的内存

  • 手动确认(manully Acknowledgements):需要消费者手动回应确认消息;手动确认模式也有可能存在消息积压,但是可以通过设置消费者的prefetch来限制消费数量;

    prefetch就类似于tcp滑动窗口一样,只有消费者确认了消息之后,broker才能继续发送消息给该消费者,否则,消息数的上限就是该prefetch的值

消费者是通过什么手动确认消息的呢

这跟发布者一样,Broker发送消息给消费者的时候会给消费者发送一个devlivery tag,它是一个从1开始向上递增的正数,当消费者需要手动确认该消息时就需要将这个delivery tag作为参数回送给Broker,这样Broker就能够确认了

当然,这个delivery tag是在一个channel中的唯一标识,所以消费消息的channel和回应确认消息的channel必须是一个,否则就会出错关闭channel

消费者发送的确认有那几种呢?

消费者回应的确认可以是积极的,也有可能是消极的,不同的确认方法对应不同的协议,如下所示:

  • basic.ack:用于积极确认
  • basic.nack:用于消极确认
  • basic.reject:用于消极确认,但与basic.nack相比有一个限制

如果是积极确认的话就会broker就会认为消息已传递成功,就会删除这条消息;而若收到basic.reject这种消极确认也是一样会删除消息,它们两个只是语义的不同,前者是认为消息已经被成功处理了,后者则认为消息不会被处理

下面是手动消息确认的代码示例:

package main
​
import (
    "bytes"
    "log"
    "time""github.com/streadway/amqp"
)
​
func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}
​
func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()
​
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()
​
    q, err := ch.QueueDeclare(
        "task_queue", // name
        true,     // durable
        false,    // delete when unused
        false,    // exclusive
        false,    // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")
​
    err = ch.Qos(
        1,   // prefetch count
        0,   // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")
​
    msgs, err := ch.Consume(
        q.Name, // queue
        "",   // consumer
    //如果此处为 true,则表示消息自动确认,false:手动确认
        false, // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil,  // args
    )
    failOnError(err, "Failed to register a consumer")
​
    var forever chan struct{}
​
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            dotCount := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dotCount)
            time.Sleep(t * time.Second)
            log.Printf("Done")
      //确认消息,参数为 multiple:表示一次确认多条消息,这可以减少网络压力
            d.Ack(false)
        }
    }()
​
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

当处于手动确认模式时,如果没有收到确认消息,那么当传递消息的channel被关闭时就会将未确认的消息重新入队,被重新发送的消息会有一个字段redeliver=true,表明这条消息是重发的,第一次被发送的消息为false

参考文档:www.rabbitmq.com/confirms.ht…