相关推荐recommended
Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)
作者:mmseoamin日期:2023-12-20

目录

  • 一、序言
  • 二、死信交换机和消息TTL实现延迟消息
    • 1、死信队列介绍
    • 2、代码示例
      • (1) 死信交换机配置
      • (2) 消息生产者
      • (3) 消息消费者
      • 3、测试用例
      • 三、延迟消息交换机实现延迟消息
        • 1、安装延时消息插件
        • 2、代码示例
          • (1) 延时消息交换机配置
          • (2) 消息生产者
          • (3) 消息消费者
          • 3、测试用例
          • 四、两种实现方式优缺点
            • 1、延时消息插件
            • 2、TLL&死信交换机

              一、序言

              业务开发中有很多延时操作的场景,比如最常见的超时订单自动关闭、延时异步处理,我们常用的实现方式有:

              • 定时任务轮询(有延时)。
              • 借助Redission的延时队列。
              • Redis的key过期事件通知机制(需开启key过期事件通知,对Redis有性能损耗)。
              • RocketMQ中定时消息推送(支持的时间间隔固定,不支持自定义)。
              • RabbitMQ中的死信队列和延迟消息交换机。

                其中用的最多的也是借助Redisson实现的数据结构延迟队列和RabbitMQ中的死信队列来实现,今天我们通过RabbitMQ死信队列和延迟消息交换机(新特性)来实现延时消息推送。


                二、死信交换机和消息TTL实现延迟消息

                1、死信队列介绍

                这种方式主要通过结合消息过期和私信交换机来实现延迟消息推送,首先先了解下哪些消息会进入死信队列:

                • 被消费者nack(negatively acknowleged)的消息。
                • TTL过期后未被消费的消息。
                • 超过队列长度限制后被丢弃的消息。

                  备注:更多信息请参考RabbitMQ中的 Dead Letter Exchange。

                  2、代码示例

                  (1) 死信交换机配置

                  @Configuration
                  protected static class DeadLetterExchangeConfig {
                  	@Bean
                  	public Queue deadLetterQueue(){
                  		return QueueBuilder.durable("dead-letter-queue").build();
                  	}
                  	@Bean
                  	public DirectExchange deadLetterExchange() {
                  		return ExchangeBuilder.directExchange("dead-letter-exchange").build();
                  	}
                  	@Bean
                  	public Binding bindQueueToDeadLetterExchange(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
                  		return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");
                  	}
                  	@Bean
                  	public Queue normalQueue() {
                  		return QueueBuilder.durable("normal-queue")
                  			.deadLetterExchange("dead-letter-exchange")
                  			.deadLetterRoutingKey("dead-letter-routing-key")
                  			.build();
                  	}
                  }
                  

                  (2) 消息生产者

                  @Slf4j
                  @Component
                  @RequiredArgsConstructor
                  public class RabbitMqProducer {
                  	private final RabbitTemplate rabbitTemplate;
                  	
                  	public void sendMsgToDeadLetterExchange(String body, int timeoutInMillSeconds) {
                  		log.info("开始发送消息到dead letter exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds, LocalDateTime.now());
                  		MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setExpiration(String.valueOf(timeoutInMillSeconds)).build();
                  		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
                  		rabbitTemplate.send("normal-queue", message);
                  	}
                  	
                  }
                  

                  (3) 消息消费者

                  @Slf4j
                  @Component
                  public class RabbitMqConsumer {
                  	@RabbitListener(queues = "dead-letter-queue")
                  	public void handleMsgFromDeadLetterQueue(String msg) {
                  		log.info("Message received from dead-letter-queue, message body: {}, current time:{}", msg, LocalDateTime.now());
                  	}
                  }
                  

                  3、测试用例

                  @RestController
                  @RequiredArgsConstructor
                  public class RabbitMsgController {
                  	private final RabbitMqProducer rabbitMqProducer;
                  	
                  	@RequestMapping("/exchange/dead-letter")
                  	public ResponseEntity sendMsgToDeadLetterExchange(String body, int timeout) {
                  		rabbitMqProducer.sendMsgToDeadLetterExchange(body, timeout);
                  		return ResponseEntity.ok("消息发送到死信交换机成功");
                  	}
                  	
                  }
                  

                  浏览器访问http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000,可以看到消息被延迟5s处理。

                  2023-11-26 11:50:33.041  INFO 19152 --- [nio-8080-exec-7] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到dead letter exchange 消息体:hello, 消息延迟:5000ms, 当前时间:2023-11-26T11:50:33.041
                  2023-11-26 11:50:38.054  INFO 19152 --- [ntContainer#4-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from dead-letter-queue, message body: hello, current time:2023-11-26T11:50:38.054
                  

                  三、延迟消息交换机实现延迟消息

                  上面通过消息TTL和死信交换机实现延迟消息的解决方案是由一个叫James Carr的人提出来的,后来RabbitMQ提供了一个开箱即用的解决方案,通过延时消息插件来实现。

                  该插件以前被当做是试验性产品,但是现在已经可以投产使用了。(PS:2015年4月16号就已经有该插件文档)

                  在Spring AMQP中,同样提供了对该延时消息插件的支持,并且在RabbitMQ 3.6.0版本就已经测试通过。

                  1、安装延时消息插件

                  该延时消息插件为社区插件,因此需要自己手动下载安装的RabbMQ版本对应的插件,下载地址:RabbitMQ延时消息插件releases。

                  我安装的RabbitMQ版本为3.9.9,3.9.0版本的插件对所有3.9.x版本的RabbitMQ都支持。

                  Spring RabbitMQ那些事(2-两种方式实现延时消息订阅),在这里插入图片描述,第1张

                  下载完后把.ez结尾的插件复制RabbitMQ的插件目录下,插件目录为/usr/lib/rabbitmq/plugins 。

                  Spring RabbitMQ那些事(2-两种方式实现延时消息订阅),在这里插入图片描述,第2张

                  通过命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange安装该插件,通过命令rabbitmq-plugins list查看插件列表,可以看到该延时消息插件已经成功安装。

                  Spring RabbitMQ那些事(2-两种方式实现延时消息订阅),在这里插入图片描述,第3张

                  2、代码示例

                  (1) 延时消息交换机配置

                  @Configuration
                  protected static class DelayedMsgExchangePluginConfig {
                  	@Bean
                  	public Queue delayedQueue() {
                  		return QueueBuilder.durable("delayed-queue").build();
                  	}
                  	@Bean
                  	public DirectExchange delayedExchange() {
                  		return ExchangeBuilder.directExchange("delayed-exchange").delayed().build();
                  	}
                  	@Bean
                  	public Binding bindDelayedQueueToDelayedChange(Queue delayedQueue, DirectExchange delayedExchange) {
                  		return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed-routing-key");
                  	}
                  }
                  

                  备注:延时交换机的类型可以为DirectExchage、TopicExcahge和FanoutExchange,这些都支持。

                  (2) 消息生产者

                  @Slf4j
                  @Component
                  @RequiredArgsConstructor
                  public class RabbitMqProducer {
                  	private final RabbitTemplate rabbitTemplate;
                  	public void sendDelayedMsg(String body, int timeoutInMillSeconds) {
                  		log.info("开始发送消息到delayed-exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds, LocalDateTime.now());
                  		MessageProperties messageProperties = new MessageProperties();
                  		messageProperties.setDelay(timeoutInMillSeconds);
                  		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
                  		rabbitTemplate.send("delayed-exchange", "delayed-routing-key", message);
                  	}
                  }
                  

                  (3) 消息消费者

                  @Slf4j
                  @Component
                  public class RabbitMqConsumer {
                  	@RabbitListener(queues = "delayed-queue")
                  	public void handleMsgFromDelayedQueue(String msg) {
                  		log.info("Message received from delayed-queue, message body: {}, current time:{}", msg, LocalDateTime.now());
                  	}
                  }
                  

                  3、测试用例

                  @RestController
                  @RequiredArgsConstructor
                  public class RabbitMsgController {
                  	private final RabbitMqProducer rabbitMqProducer;
                  	@RequestMapping("/exchange/delayed")
                  	public ResponseEntity sendMsgToHeadersExchange(String body, int timeout) {
                  		rabbitMqProducer.sendDelayedMsg(body, timeout);
                  		return ResponseEntity.ok("消息发送到延迟交换机成功");
                  	}
                  	
                  }
                  

                  浏览器访问http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000,可以看到消息被延迟5s处理。

                  2023-11-26 13:02:07.816  INFO 26524 --- [nio-8080-exec-3] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到delayed-exchange 消息体:Hello, 消息延迟:5000ms, 当前时间:2023-11-26T13:02:07.816
                  2023-11-26 13:02:12.830  INFO 26524 --- [ntContainer#5-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from delayed-queue, message body: Hello, current time:2023-11-26T13:02:12.829
                  

                  四、两种实现方式优缺点

                  1、延时消息插件