RabbitMQ之消息的可靠性传递
作者:mmseoamin日期:2024-02-06

系列文章目录

提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加

RabbitMQ之消息的可靠性传递


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 系列文章目录
  • 前言
  • 一、消息的可靠性传递的概念
  • 二、三种模式的实现
    • 环境准备
    • 确认模式
    • 退回模式
    • 消费者确认
    • 总结

      前言

      提示:这里可以添加本文要记录的大概内容:

      在当今的信息化时代,消息传递在企业级应用和分布式系统中扮演着至关重要的角色。而 RabbitMQ 作为一款强大的消息队列中间件,以其可靠性和高性能成为了众多开发者的首选。本文将深入探讨 RabbitMQ 中消息的可靠性传递机制,以及如何在实际应用中确保消息的不丢失。

      通过阅读本文,您将了解到 RabbitMQ 可靠消息传递的核心概念和工作原理。我们将探讨消息确认、持久性、队列和交换机的配置以及错误处理等关键主题,以帮助您构建高度可靠的消息传递系统。

      无论您是刚刚开始接触 RabbitMQ,还是已经在实际项目中使用它,本文都将为您提供有价值的见解和实用的指导。让我们一起深入了解 RabbitMQ 的可靠性特性,掌握构建可靠系统的关键技能。


      提示:以下是本篇文章正文内容,下面案例可供参考

      一、消息的可靠性传递的概念

      在RabbitMQ中,消息投递的路径为:生产者->交换机->队列->消费者。而在消息的投递过程中,每一个环节都可能投递失败,那么RabbitMQ是通过什么方法确认消息投递成功的呢?

      • 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
      • 退回模式(return)可以监听消息是否从交换机成功传递到队列。
      • 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

        二、三种模式的实现

        环境准备

        1.首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:

        spring:
          rabbitmq:
           host: 192.168.0.162
           port: 5672
           username: zhangsan
           password: zhangsan
           virtual-host: /
          
        #日志格式
        logging:
          pattern:
           console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
        

        2.在生产者的配置类创建交换机和队列

        @Configuration
        public class RabbitConfig {
          private final String EXCHANGE_NAME="my_topic_exchange";
          private final String QUEUE_NAME="my_queue";
          // 1.创建交换机
          @Bean("bootExchange")
          public Exchange getExchange(){
            return ExchangeBuilder
                 .topicExchange(EXCHANGE_NAME) // 交换机类型
                 .durable(true) // 是否持久化
                   .build();
           }
          // 2.创建队列
          @Bean("bootQueue")
          public Queue getMessageQueue(){
            return QueueBuilder
                 .durable(QUEUE_NAME) // 队列持久化
                 .build();
           }
          // 3.将队列绑定到交换机
          @Bean
          public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){
            return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
           }
        }
        

        确认模式

        1.生产者配置文件开启确认模式

        spring:
          rabbitmq:
           host: 192.168.0.162
           port: 5672
           username: zhangsan
           password: zhangsan
           virtual-host: /
          # 开启确认模式
           publisher-confirm-type: correlated
        

        2.生产者定义确认模式的回调方法

        @SpringBootTest
        public class ProducerTest {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          @Test
          public void testConfirm(){
            // 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
              /**
               * 被调用的回调方法
               * @param correlationData 相关配置信息
               * @param ack 交换机是否成功收到了消息
               * @param cause 失败原因
               */
              @Override
              public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                  System.out.println("confirm接受成功!");
                 }else{
                  System.out.println("confirm接受失败,原因为:"+cause);
                  // 做一些处理。
                 }
               }
             });
            rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");
           }
        }
        

        退回模式

        1.生产者配置文件开启退回模式

        spring:
          rabbitmq:
           host: 192.168.0.162
           port: 5672
           username: zhangsan
           password: zhangsan
           virtual-host: /
          # 开启确认模式
           publisher-confirm-type: correlated
          # 开启回退模式
           publisher-returns: true
        

        2.生产者定义退回模式的回调方法

        @SpringBootTest
        public class ProducerTest {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          @Test
          public void testReturn(){
            // 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
              /**
               * @param returned 失败后将失败信息封装到参数中
               */
              @Override
              public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息对象:"+returned.getMessage());
                System.out.println("错误码:"+returned.getReplyCode());
                System.out.println("错误信息:"+returned.getReplyText());
                System.out.println("交换机:"+returned.getExchange());
                System.out.println("路由键:"+returned.getRoutingKey());
                // 处理消息...
               }
             });
            rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","send message...");
           }
        }
        

        消费者确认

        在 RabbitMQ 中,当消费者接收到消息后,会向队列发送一个确认消息,表示已经成功接收并处理了该消息。只有当确认消息被发送后,该消息才会从队列中被移除。这种机制被称为消费者消息确认(Consumer Acknowledge,简称 Ack)。这就类似于快递员派送快递时需要我们签收一样,否则快递就会一直存在于快递公司的系统中。

        消息确认分为自动确认和手动确认两种方式。自动确认意味着只要消费者接收到消息,无论是否成功处理了该消息,都会自动发送确认消息,并将消息从队列中移除。然而,在实际开发过程中,如果在接收到消息后业务处理出现异常,那么消息就可能会丢失。因此,需要设置手动确认,也就是只有在业务处理成功后,才会发送确认消息通知队列;如果出现异常,则会发送拒绝消息,让消息仍然保留在队列中。

        • 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
        • 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”

          1.消费者配置开启手动签收

          spring:
            rabbitmq:
             host: 192.168.0.162
             port: 5672
             username: zhangsan
             password: zhangsan
             virtual-host: /
            # 开启手动签收
             listener:
              simple:
               acknowledge-mode: manual
          

          2.消费者处理消息时定义手动签收和拒绝签收的情况

          @Component
          public class AckConsumer {
            @RabbitListener(queues = "my_queue")
            public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
              // 消息投递序号,消息每次投递该值都会+1
              long deliveryTag = message.getMessageProperties().getDeliveryTag();
              try {
                int i = 1/0; //模拟处理消息出现bug
                System.out.println("成功接受到消息:"+message);
                // 签收消息
                /**
                 * 参数1:消息投递序号
                 * 参数2:是否一次可以签收多条消息
                 */
                channel.basicAck(deliveryTag,true);
               }catch (Exception e){
                System.out.println("消息消费失败!");
                Thread.sleep(2000);
                // 拒签消息
                /**
                 * 参数1:消息投递序号
                 * 参数2:是否一次可以拒签多条消息
                 * 参数3:拒签后消息是否重回队列
                 */
                channel.basicNack(deliveryTag,true,true);
               }
             }
          }
          

          总结

          提示:这里对文章进行总结:

          在 RabbitMQ 中,为了确保消息的可靠性传递,需要使用确认应答和重传机制。当生产者将消息发送到 RabbitMQ 服务器时,服务器会向生产者返回一个确认应答,表示消息已成功接收。如果生产者没有收到确认应答,它可以在一定时间内重传消息,直到收到确认应答或达到重试次数限制。

          当消费者从队列中消费消息时,它会向 RabbitMQ 服务器发送一个确认应答,表示消息已成功处理。如果消费者在处理消息时出现异常或崩溃,RabbitMQ 服务器可以通过重传机制将消息重新推送给其他消费者进行处理,以确保消息不会丢失。