RabbitMQ如何保證消息的可靠投遞?

RabbitMQ如何保證消息的可靠投遞?

在這裡插入圖片描述

Spring Boot整合RabbitMQ

github地址:
https://github.com/erlieStar/rabbitmq-examples

Spring有三種配置方式

  1. 基於XML

  2. 基於JavaConfig

  3. 基於註解

當然現在已經很少使用XML來做配置了,只介紹一下用JavaConfig和註解的配置方式

RabbitMQ整合Spring Boot,我們只需要增加對應的starter即可

<code> 
   org.springframework.boot
   spring-boot-starter-amqp
 
/<code>

基於註解

在application.yaml的配置如下

<code>spring:
  rabbitmq:
    host: myhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

log:
  exchange: log.exchange
  info:
    queue: info.log.queue
    binding-key: info.log.key
  error:
    queue: error.log.queue
    binding-key: error.log.key
  all:
    queue: all.log.queue
    binding-key: '*.log.key'
/<code>

消費者代碼如下

<code>@Slf4j
@Component
public class LogReceiverListener {

    /**
     * 接收info級別的日誌
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${log.info.queue}", durable = "true"),
                    exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
                    key = "${log.info.binding-key}"
            )
    )
    public void infoLog(Message message) {
        String msg = new String(message.getBody());
        log.info("infoLogQueue 收到的消息為: {}", msg);
    }

    /**
     * 接收所有的日誌
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${log.all.queue}", durable = "true"),
                    exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
                    key = "${log.all.binding-key}"
            )
    )
    public void allLog(Message message) {
        String msg = new String(message.getBody());
        log.info("allLogQueue 收到的消息為: {}", msg);
    }
}
/<code> 

生產者如下

<code>@RunWith(SpringRunner.class)
@SpringBootTest
public class MsgProducerTest {

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Value("${log.exchange}")
    private String exchange;
    @Value("${log.info.binding-key}")
    private String routingKey;

    @SneakyThrows
    @Test
    public void sendMsg() {
        for (int i = 0; i /<code>

Spring Boot針對消息ack的方式和原生api針對消息ack的方式有點不同

原生api消息ack的方式

消息的確認方式有2種

自動確認(autoAck=true)
手動確認(autoAck=false)

消費者在消費消息的時候,可以指定autoAck參數

String basicConsume(String queue, boolean autoAck, Consumer callback)

autoAck=false: RabbitMQ會等待消費者顯示回覆確認消息後才從內存(或者磁盤)中移出消息

autoAck=true: RabbitMQ會自動把發送出去的消息置為確認,然後從內存(或者磁盤)中刪除,而不管消費者是否真正的消費了這些消息

手動確認的方法如下,有2個參數

basicAck(long deliveryTag, boolean multiple)

deliveryTag: 用來標識信道中投遞的消息。RabbitMQ 推送消息給Consumer時,會附帶一個deliveryTag,以便Consumer可以在消息確認時告訴RabbitMQ到底是哪條消息被確認了。


RabbitMQ保證在每個信道中,每條消息的deliveryTag從1開始遞增

multiple=true: 消息id<=deliveryTag的消息,都會被確認

myltiple=false: 消息id=deliveryTag的消息,都會被確認

消息一直不確認會發生啥?

如果隊列中的消息發送到消費者後,消費者不對消息進行確認,那麼消息會一直留在隊列中,直到確認才會刪除。
如果發送到A消費者的消息一直不確認,只有等到A消費者與rabbitmq的連接中斷,rabbitmq才會考慮將A消費者未確認的消息重新投遞給另一個消費者

Spring Boot中針對消息ack的方式

有三種方式,定義在AcknowledgeMode枚舉類中


RabbitMQ如何保證消息的可靠投遞?


spring boot針對消息默認的ack的方式為AUTO。

在實際場景中,我們一般都是手動ack。

application.yaml的配置改為如下

<code>spring:
  rabbitmq:
    host: myhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 手動ack,默認為auto
/<code>

相應的消費者代碼改為

<code>@Slf4j
@Component
public class LogListenerManual {

    /**
     * 接收info級別的日誌
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${log.info.queue}", durable = "true"),
                    exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
                    key = "${log.info.binding-key}"
            )
    )
    public void infoLog(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("infoLogQueue 收到的消息為: {}", msg);
        try {
            // 這裡寫各種業務邏輯
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}
/<code>

我們上面用到的註解,作用如下


RabbitMQ如何保證消息的可靠投遞?


基於JavaConfig

既然用註解這麼方便,為啥還需要JavaConfig的方式呢?
JavaConfig方便自定義各種屬性,比如同時配置多個virtual host等

具體代碼看GitHub把

RabbitMQ如何保證消息的可靠投遞

一個消息往往會經歷如下幾個階段

RabbitMQ如何保證消息的可靠投遞?

在這裡插入圖片描述


所以要保證消息的可靠投遞,只需要保證這3個階段的可靠投遞即可


生產階段

這個階段的可靠投遞主要靠ConfirmListener(發佈者確認)和ReturnListener(失敗通知)
前面已經介紹過了,一條消息在RabbitMQ中的流轉過程為
producer -> rabbitmq broker cluster -> exchange -> queue -> consumer

ConfirmListener可以獲取消息是否從producer發送到broker
ReturnListener可以獲取從exchange路由不到queue的消息

我用Spring Boot Starter 的api來演示一下效果

application.yaml

<code>spring:
  rabbitmq:
    host: myhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 手動ack,默認為auto

log:
  exchange: log.exchange
  info:
    queue: info.log.queue
    binding-key: info.log.key
/<code>

發佈者確認回調

<code>@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private MessageSender messageSender;

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String msgId = correlationData.getId();
        String msg = messageSender.dequeueUnAckMsg(msgId);
        if (ack) {
            System.out.println(String.format("消息 {%s} 成功發送給mq", msg));
        } else {
            // 可以加一些重試的邏輯
            System.out.println(String.format("消息 {%s} 發送mq失敗", msg));
        }
    }
}
/<code>

失敗通知回調

<code>@Component
public class ReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        String msg = new String(message.getBody());
        System.out.println(String.format("消息 {%s} 不能被正確路由,routingKey為 {%s}", msg, routingKey));
    }
}
/<code>
<code>@Configuration
public class RabbitMqConfig {

    @Bean
    public ConnectionFactory connectionFactory(
            @Value("${spring.rabbitmq.host}") String host,
            @Value("${spring.rabbitmq.port}") int port,
            @Value("${spring.rabbitmq.username}") String username,
            @Value("${spring.rabbitmq.password}") String password,
            @Value("${spring.rabbitmq.virtual-host}") String vhost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         ReturnCallback returnCallback, ConfirmCallback confirmCallback) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // 要想使 returnCallback 生效,必須設置為true
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}
/<code>

這裡我對RabbitTemplate做了一下包裝,主要就是發送的時候增加消息id,並且保存消息id和消息的對應關係,因為RabbitTemplate.ConfirmCallback只能拿到消息id,並不能拿到消息內容,所以需要我們自己保存這種映射關係。在一些可靠性要求比較高的系統中,你可以將這種映射關係存到數據庫中,成功發送刪除映射關係,失敗則一直髮送

<code>@Component
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public final Map unAckMsgQueue = new ConcurrentHashMap<>();

    public void convertAndSend(String exchange, String routingKey, String message) {
        String msgId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(msgId);
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        unAckMsgQueue.put(msgId, message);
    }

    public String dequeueUnAckMsg(String msgId) {
        return unAckMsgQueue.remove(msgId);
    }

}
/<code>

測試代碼為

<code>@RunWith(SpringRunner.class)
@SpringBootTest
public class MsgProducerTest {

    @Autowired
    private MessageSender messageSender;
    @Value("${log.exchange}")
    private String exchange;
    @Value("${log.info.binding-key}")
    private String routingKey;

    /**
     * 測試失敗通知
     */
    @SneakyThrows
    @Test
    public void sendErrorMsg() {
        for (int i = 0; i /<code>

先來測試失敗者通知

輸出為

<code>消息 {this is error message 0} 不能被正確路由,routingKey為 {test}
消息 {this is error message 0} 成功發送給mq
消息 {this is error message 2} 不能被正確路由,routingKey為 {test}
消息 {this is error message 2} 成功發送給mq
消息 {this is error message 1} 不能被正確路由,routingKey為 {test}
消息 {this is error message 1} 成功發送給mq
/<code>

消息都成功發送到broker,但是並沒有被路由到queue中

再來測試發佈者確認

輸出為

<code>消息 {this is info message 0} 成功發送給mq
infoLogQueue 收到的消息為: {this is info message 0}
infoLogQueue 收到的消息為: {this is info message 1}
消息 {this is info message 1} 成功發送給mq
infoLogQueue 收到的消息為: {this is info message 2}
消息 {this is info message 2} 成功發送給mq
/<code>

消息都成功發送到broker,也成功被路由到queue中

存儲階段

這個階段的高可用還真沒研究過,畢竟集群都是運維搭建的,後續有時間的話會把這快的內容補充一下

消費階段

消費階段的可靠投遞主要靠ack來保證。
前文已經介紹了原生api ack的方式和Spring Boot框架ack的方式

總而言之,在生產環境中,我們一般都是單條手動ack,消費失敗後不會重新入隊(因為很大概率還會再次失敗),而是將消息重新投遞到死信隊列,方便以後排查問題

總結一下各種情況

  1. ack後消息從broker中刪除

  2. nack或者reject後,分為如下2種情況
    (1) reque=true,則消息會被重新放入隊列
    (2) reque=fasle,消息會被直接丟棄,如果指定了死信隊列的話,會被投遞到死信隊列

相關閱讀


分享到:


相關文章: