延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后才收到消息。
延迟任务:设置在一定时间之后才执行的任务。
延迟消息有以下三种实现方案:
TTL
死信交换机
成为死信(dead letter)的条件:
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
package com.itheima.consumer.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.security.auth.login.CredentialNotFoundException; @Configuration public class DlxExchangeConfiguration { /** * 声明 TTL 队列 * 1. 指定消息的 TTL * 2. 指定死信交换机 * 3. 指定死信交换机的 RoutingKey */ @Bean public Queue ttlQueue() { return QueueBuilder .durable("ttl.queue") // 指定队列的名称 //.ttl(10000) // 指定 TTL 为 10 秒,这里可设置过期时间,但我这里测试在发送消息时设置过期时间 .deadLetterExchange("dlx.direct") // 指定死信交换机 .deadLetterRoutingKey("dlx") // 指定死信交换机的 RoutingKey .build(); } /** * 声明TTl交换机 * @return */ @Bean public DirectExchange directExchange(){ return new DirectExchange("ttl.direct"); } /** * 声明ttl交换机与队列的关联关系 * @return */ @Bean public Binding directBinding(){ return BindingBuilder.bind(ttlQueue()).to(directExchange()).with("ttl"); } /** * 声明死信交换机 * @return */ @Bean public DirectExchange dlxDirect(){ return new DirectExchange("dlx.direct"); } /** * 声明死信队列 * @return */ @Bean public Queue dlxQueue(){ return new Queue("dlx.queue"); } /** * 声明死信交换机与队列关联关系 * @return */ @Bean public Binding tlxBinding(){ return BindingBuilder.bind(dlxQueue()).to(dlxDirect()).with("dlx"); } }
@Test void testSendTTLMessage(){ rabbitTemplate.convertAndSend("ttl.direct", "ttl", "hello", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000"); return message; } }); log.info("消息发送成功!"); }
@RabbitListener(queues = "dlx.queue") //监听的队列:dlx.queue public void listenDlxQueue(String msg){ log.info("dlx.queue的消息:【"+msg+"】"); }
4.1、发送时间,设置10s过期
4.2、死信队列消费消息时间
RabbitMQ官方推出的插件,原生支持延迟消息的功能。其原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
本地RabbitMQ官网下载rabbitmq_delayer_message_exchange插件地址:
https://www.rabbitmq.com/community-plugins.html
代码实现:
声明延迟交换机方式一:
@Bean public DirectExchange delayExchange(){ return ExchangeBuilder .directExchange("delay.direct") .delayed() //设置delay的属性为true .durable(true) .build(); }
方式二:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(value = "delay.direct",delayed = "true"), //delayed 这是延迟交换机开启 key = "delay" )) public void listenDelayMessage(String msg){ log.info("接收到delay.queue的延迟消息:{}",msg); }
测试发送消息
@Test void testSendDelayMessage(){ rabbitTemplate.convertAndSend("delay.direct", "delay", "hello", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(1000); return message; } }); log.info("消息发送成功!"); }
电商项目设置30分钟后检测订单支付状态,并完成取消超时订单
设置30分钟检查订单状态存在两个问题:
解决措施:设置多个延迟消息交换机,如设置不同的等待时间:10s、10s、10s、15s、15s....,这些时间相加得到30分钟,不同延迟时间过滤掉大部分的消息,给MQ减压
首先先查询支付状态,判断是否支付,如果状态显示未支付,则获取下次延迟时间,判断是否有延迟时间,有则重发延迟消息,没有延迟消息则取消订单。如果订单显示已支付,则标记未已支付。
定义延迟时间集合及相关方法:
package com.itheima.consumer.config; import lombok.Data; import java.util.Arrays; import java.util.List; @Data public class MultiDelayMessage{ //消息体 private T data; //记录延迟消息时间的集合 private List delayMillis; public MultiDelayMessage(T data, List delayMillis){ this.data = data; this.delayMillis = delayMillis; } public static MultiDelayMessage of(T data, Long ... delayMillis){ return new MultiDelayMessage<>(data, Arrays.asList(delayMillis)); } //获取并移除下一个延迟时间 //Returns: 队列中的第一个延迟时间 public Long removeNextDelay(){ return delayMillis.remove(0); } //是否还有下一个延迟时间 public boolean hasNextDelay(){ return !delayMillis.isEmpty(); } }
定义交换机、队列名称以及key:
public interface MqConstants { String DELAY_EXCHANGE = "trade.delay.topic"; //交换机名称 String DELAY_ORDER_QUEUE = "trade.order.delay.queue"; //队列名称 String DELAY_PRDER_ROUTING_KEY = "order.query"; //key }
定义延迟消息体:
import lombok.RequiredArgsConstructor; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; @RequiredArgsConstructor public class DelayMessageProcessor implements MessagePostProcessor { private final int delay; @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(delay); return null; } }
实现发送延迟消息:
/* * 生成订单后,发送延迟检查订单消息 */ public void testOrderStatic(){ try{ MultiDelayMessagemsg = MultiDelayMessage.of("这里是订单ID", 1000L, 1000L, 1000L, 1500L, 1500L); //这里的消息体是订单ID,后面是延迟消息时间集合 rabbitTemplate.convertAndSend( MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_PRDER_ROUTING_KEY,msg, new DelayMessageProcessor(msg.removeNextDelay().intValue()) ); }catch (AmqpException e){ log.error("延迟消息发送失败!"); } }
消费延迟消息的大概思路:
import com.itheima.constants.DelayMessageProcessor; import com.itheima.constants.MqConstants; import com.itheima.constants.MultiDelayMessage; import lombok.RequiredArgsConstructor; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; @RequiredArgsConstructor public class OrderStatusCheckListener { private RabbitTemplate rabbitTemplate; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"), exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE, delayed = "true" ,type = ExchangeTypes.TOPIC), key = MqConstants.DELAY_PRDER_ROUTING_KEY )) public void listenOderDelayMessage(MultiDelayMessagemsg){ //1、查询订单状态 //2、判断是否已支付 //2.1订单不存在或者已经处理(订单显示已支付)----交易服务显示已支付,则表示已支付,直接return //(交易服务显示未支付的情况下)3、去支付服务查询真正的支付状态 ---- 这里是在交易服务查询显示未支付,但不一定是未支付,需要去支付服务查询确定一下 //3.1、判断支付服务的订单支付状态,已支付则标记订单状态为已支付,直接return //4、判断是否存在延迟时间 if (msg.hasNextDelay()){ //4.1、存在,则重发延迟消息 Long nestDelay = msg.removeNextDelay(); rabbitTemplate.convertAndSend( MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_PRDER_ROUTING_KEY, msg, new DelayMessageProcessor(nestDelay.intValue()) ); return; } //5、不存在,取消订单,修改订单状态为取消订单 //6、恢复库存 } }