学会RabbitMQ的延迟队列,提高消息处理效率
作者:mmseoamin日期:2023-12-11

系列文章目录

手把手教你,本地RabbitMQ服务搭建(windows)

消息队列选型——为什么选择RabbitMQ

RabbitMQ灵活运用,怎么理解五种消息模型

RabbitMQ 能保证消息可靠性吗

推或拉? RabbitMQ 消费模式该如何选择

死信是什么,如何运用RabbitMQ的死信机制?

真的好用吗?鲜有人提的 RabbitMQ-RPC模式


如何利用RabbitMQ的延迟队列提高消息处理效率

  • 系列文章目录
  • 一、什么是延迟队列?
  • 二、延迟队列的实现
    • 1. x-delayed-message插件
    • 2. TTL + 死信队列
    • 三、手写延时队列
      • 1. 时间轮概念
      • 2. JAVA演示
      • 四、应用场景与注意事项
        • 1. 应用场景
        • 2. 注意事项
        • 总结

          学会RabbitMQ的延迟队列,提高消息处理效率,在这里插入图片描述,第1张

          前面我们讲到了RabbitMQ的死信队列,其实除了死信队列,RabbitMQ还有一个常用的延迟队列设计。今天,我们就来说一下这个延迟队列

          📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容

          📗本文收录于 RabbitMQ ,有需要者,可直接订阅专栏实时获取更新

          📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导

          📙Zookeeper Redis kafka docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


          提示:以下是本篇文章正文内容,下面案例可供参考

          一、什么是延迟队列?

          延迟队列指的是当我们将消息发送到RabbitMQ时,可以指定消息的有效期或者消息需要在未来某个时间点才能被消费。这种消息被称为“延迟消息”。因此,RabbitMQ支持通过延迟队列来实现延迟消息的发送和消费。

          二、延迟队列的实现

          延迟队列的实现原理其实就是将消息放入到一个普通的队列中,只不过这个队列有一个特殊的属性:消息的消费被延迟一段时间。这个延迟时间可以是任意的,也可以是固定的。当消息进入队列时,会有一个定时器在计时,当计时器到达设定的时间时,消息会被转移至消费队列等待被消费。

          在RabbitMQ中,延迟队列的实现有两种方式:一种是通过x-delayed-message插件实现;另一种是通过TTL(Time To Live)和死信队列实现。

          1. x-delayed-message插件

          x-delayed-message插件可以让RabbitMQ支持延迟消息功能,它是一个非官方插件,需要自行下载并安装。其源码地址如下:github地址 或 gitee地址;如果你是从笔者之前的安装博客 手把手教你,本地RabbitMQ服务搭建(windows) 过来的,那么你用的可能是RabbitMQ V3.12,可以直接下载我上传的资源 3.12-插件

          首先,需要在RabbitMQ服务器上安装x-delayed-message插件。把上述的插件复制进我们RabbitMQ的服务插件目录下

          学会RabbitMQ的延迟队列,提高消息处理效率,在这里插入图片描述,第2张

          然后执行插件的启用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 即可

          然后,在Java代码中定义queue、exchange和connectionFactory,代码如下:

          ConnectionFactory connectionFactory = new ConnectionFactory();
          connectionFactory.setHost(HOST_NAME);
          connectionFactory.setUsername(USERNAME);
          connectionFactory.setPassword(PASSWORD);
          connectionFactory.setPort(PORT);
          Connection connection = connectionFactory.newConnection();
          Channel channel = connection.createChannel();
          Map arguments = new HashMap();
          arguments.put("x-delayed-type", "direct");
          channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, arguments);
          channel.queueDeclare("delayed_queue", true, false, false, null);
          channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");
          

          不难发现,此时其实是交换机在做延迟,

          学会RabbitMQ的延迟队列,提高消息处理效率,在这里插入图片描述,第3张

          当然,除了交换机的设置,在发送消息时,还需要在消息头部设置x-delay属性,代码如下:

          AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
          builder.deliveryMode(2);
          builder.headers(new HashMap(){{put("x-delay", 5000);}});
          AMQP.BasicProperties properties = builder.build();
          channel.basicPublish("delayed_exchange", "delayed_routing_key", properties, message.getBytes());
          

          2. TTL + 死信队列

          此种方式的原理其实我们在学习死信队列的时候应该就察觉到了,就是利用消息超时(TTL)后会转入死信交换机的机制,其模型如下:

          学会RabbitMQ的延迟队列,提高消息处理效率,在这里插入图片描述,第4张

          首先,需要在Java代码中定义queue、exchange和connectionFactory,代码如下:

          ConnectionFactory connectionFactory = new ConnectionFactory();
          connectionFactory.setHost(HOST_NAME);
          connectionFactory.setUsername(USERNAME);
          connectionFactory.setPassword(PASSWORD);
          connectionFactory.setPort(PORT);
          Connection connection = connectionFactory.newConnection();
          Channel channel = connection.createChannel();
          Map arguments = new HashMap();
          arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
          arguments.put("x-dead-letter-routing-key", "dead_letter_routing_key");
          arguments.put("x-message-ttl", 5000);
          channel.exchangeDeclare("normal_exchange", "direct", true, false, null);
          channel.exchangeDeclare("dead_letter_exchange", "direct", true, false, null);
          channel.queueDeclare("normal_queue", true, false, false, arguments);
          channel.queueDeclare("dead_letter_queue", true, false, false, null);
          channel.queueBind("normal_queue", "normal_exchange", "normal_routing_key");
          channel.queueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");
          

          在发送消息时,只需要将消息发送到normal_exchange交换机下,代码如下:

          channel.basicPublish("normal_exchange", "normal_routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
          

          三、手写延时队列

          当然,除了RabbitMQ,实现延时队列的方式还有很多,我们甚至可以自己实现,本节,我们就尝试自己写个延时队列

          1. 时间轮概念

          在关于计时或定时的设计里,时间轮是一种用于处理定时任务的数据结构。它通过将时间划分为一系列的时刻,每个时刻对应一个槽,将任务存储在相应的槽中

          学会RabbitMQ的延迟队列,提高消息处理效率,在这里插入图片描述,第5张

          时间轮通常包含多个槽和指针,其中指针指向当前时刻对应的槽,每过单位时间,指针就指向下一个槽,这样任务调度时按照指针的移动依次执行槽中的任务

          学会RabbitMQ的延迟队列,提高消息处理效率,在这里插入图片描述,第6张

          2. JAVA演示

          我们先使用JUC相关内容实现一个时间轮

          import java.util.*;
          import java.util.concurrent.*;
          class TimeWheel {
              private int size;
              private int currentIndex;
              private List> slots;
              private Executor executor;
              public TimeWheel(int size, Executor executor) {
                  this.size = size;
                  this.slots = new ArrayList<>(size);
                  for (int i = 0; i < size; i++) {
                      slots.add(new LinkedBlockingQueue<>());
                  }
                  this.executor = executor;
              }
              public void addTask(Task task) {
                  int expireIndex = (int)(currentIndex + task.getDelay() / 1000) % size;
                  slots.get(expireIndex).add(task);
              }
              public void start() {
                  new Thread(() -> {
                      while (true) {
                          currentIndex = (currentIndex + 1) % size;
                          BlockingQueue currentSlot = slots.get(currentIndex);
                          List tasks = new ArrayList<>();
                          currentSlot.drainTo(tasks);
                          for (Task task : tasks) {
                              executor.execute(task);
                          }
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  }).start();
              }
          }
          class Task implements Runnable {
              private long delay; // 延迟时间,单位毫秒
              private Runnable task; // 任务
              public Task(long delay, Runnable task) {
                  this.delay = delay;
                  this.task = task;
              }
              public long getDelay() {
                  return delay;
              }
              @Override
              public void run() {
                  task.run();
              }
          }
          

          我们可以使用main方法来尝试验证这个时间轮效果:

              public static void main(String[] args) {
                  TimeWheel timeWheel = new TimeWheel(60 * 60, Executors.newFixedThreadPool(10));
                  // 添加任务,延迟5秒执行
                  timeWheel.addTask(new Task(5000, () -> System.out.println("Task 1 executed!")));
                  // 添加任务,延迟10秒执行
                  timeWheel.addTask(new Task(10000, () -> System.out.println("Task 2 executed!")));
                  // 启动时间轮
                  timeWheel.start();
              }
          

          学会RabbitMQ的延迟队列,提高消息处理效率,在这里插入图片描述,第7张

          当然,以上代码只是一个简化的实现,实际情况中需要考虑任务执行时间和时间轮的精度等问题。

          四、应用场景与注意事项

          1. 应用场景

          1. 红包预告

            在现在的抢红包的场景下,当用户发起红包活动后,可能不希望立即开抢,而是设定在一段时间后开启。那么我们可以将将红包信息发送到一个延迟队列中,一定时间后,系统会自动激活红包,此时用户才可以真正抢红包

            学会RabbitMQ的延迟队列,提高消息处理效率,在这里插入图片描述,第8张

          2. 订单系统

            在订单系统中,有一些订单需要在未来某个时间点才能被处理。例如,有些订单需要在一定的时间之后才能发货或者确认收货。这时候,我们可以将这些订单放到延迟队列中,当时间到达时再进行处理。

          3. 优惠券系统

            在优惠券系统中,有一些优惠券需要在未来某个时间点才能使用。这时候,我们可以将这些优惠券放到延迟队列中,当时间到达时再进行激活。

          2. 注意事项

          1. 延迟队列不要使用太多

            使用延迟队列可以在一定程度上减少系统的负载,但是使用过多的延迟队列会导致系统变得更加复杂,维护起来也更加困难。

          2. 延迟队列可能会导致消息丢失

            在RabbitMQ中,当一个带有TTL消息被发送到队列中时,如果队列中的消息太多,或者队列的消费者速度太慢,就会导致消息失效,如果没有使用死信机制,消息就会被丢失。为了避免这种情况发生,我们需要对队列进行监控,及时发现问题并进行处理。

          3. 设置合适的延迟时间

            在使用延迟队列时,需要根据实际需求设置合适的延迟时间。如果延迟时间太短,可能会导致消息延迟效果不明显;如果延迟时间太长,可能会导致系统累积大量的消息,导致负载过高。

          总结

          RabbitMQ的延迟队列是一种非常实用的特性,可以帮助我们实现定时任务、限流、削峰等功能。但是,在使用延迟队列时,需要谨慎对待,根据实际需求设置合适的延迟时间,并及时监控队列中的消息,避免出现消息丢失的情况。