/** * 创建队列 * 1.队列名 * 2.是否持久化 * 3.是否排他 * 4.是否自动删除 * * @return */ @Bean public Queue NoticeQueue() { Maparguments = new HashMap<>(); // 正常队列设置死信交换机 arguments.put("x-dead-letter-exchange", RabbitConstant.NOTICE_DEAD_EXCHANGE); // 设置死信routingkey arguments.put("x-dead-letter-routing-key", RabbitConstant.NOTICE_DEAD_ROUTING_KEY); // 设置正常队列长度的限制 // arguments.put("x-max-length", 6); // 设置延迟时间 // arguments.put("x-message-ttl", 10000); return new Queue(RabbitConstant.NOTICE_QUEUE, true, false, false, arguments); }
RabbitMQ可以对消息和队列设置TTL. 目前有两种方法可以设置。
/** * 创建队列 * 1.队列名 * 2.是否持久化 * 3.是否排他 * 4.是否自动删除 * * @return */ @Bean public Queue NoticeQueue() { Maparguments = new HashMap<>(); // 设置延迟时间 arguments.put("x-message-ttl", 10000); return new Queue(RabbitConstant.NOTICE_QUEUE, true, false, false, arguments); }
public void test(String param) { // 消息传输加工机 此处用于为消息设置TTL时间 MessagePostProcessor messagePostProcessor = message -> { message.getMessageProperties().setExpiration(param);// Expiration 到期 message.getMessageProperties().setContentEncoding("UTF-8"); return message; }; }
延时消息实现的两种方案
1.插件下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
下面延迟插件版本为3.12.0 RabbitMQ 3.12.7 Erlang 25.3.2.9
2.下载插件后, 将插件放到plugins目录中
3.在sbin中执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 启动插件
4.确认成功 在可视化界面中 exchange 中创建新交换机类型 有 x-delayed-message
5.创建交换机 声明为延时交换机
@Bean public DirectExchange noticeExchange() { DirectExchange directExchange = new DirectExchange(RabbitConstant.NOTICE_EXCHANGE); // 声明 延时交换机 directExchange.setDelayed(true); return directExchange; }
6.发送带有延时参数的消息
public void test(String param) { // 消息传输加工机 此处用于为消息设置TTL时间 MessagePostProcessor messagePostProcessor = message -> { message.getMessageProperties().setContentEncoding("UTF-8"); message.getMessageProperties().setDelay(Integer.valueOf(param)); return message; }; rabbitTemplate.convertAndSend(RabbitConstant.NOTICE_EXCHANGE, RabbitConstant.NOTICE_ROUTING_KEY, "测试消息" + param, messagePostProcessor); log.info("我TM进来了"); }
1.创建正常的交换机和队列,死信交换机和死信队列, 队列绑定死信交换机和死信routing-key
@Bean public DirectExchange noticeExchange() { DirectExchange directExchange = new DirectExchange(RabbitConstant.NOTICE_EXCHANGE); return directExchange; } @Bean public DirectExchange deadNoticeExchange() { return new DirectExchange(RabbitConstant.NOTICE_DEAD_EXCHANGE); } @Bean public Queue NoticeQueue() { Maparguments = new HashMap<>(); // 正常队列设置死信交换机 arguments.put("x-dead-letter-exchange", RabbitConstant.NOTICE_DEAD_EXCHANGE); // 设置死信routingkey arguments.put("x-dead-letter-routing-key", RabbitConstant.NOTICE_DEAD_ROUTING_KEY); // 设置正常队列长度的限制 // arguments.put("x-max-length", 6); // 设置延迟时间 // arguments.put("x-message-ttl", 10000); return new Queue(RabbitConstant.NOTICE_QUEUE, true, false, false, arguments); } @Bean public Queue deadNoticeQueue() { return new Queue(RabbitConstant.NOTICE_DEAD_QUEUE, true, false, false); } @Bean public Binding binding() { return BindingBuilder.bind(NoticeQueue()).to(noticeExchange()).with(RabbitConstant.NOTICE_ROUTING_KEY); } @Bean public Binding bindingDead() { return BindingBuilder.bind(deadNoticeQueue()).to(deadNoticeExchange()).with(RabbitConstant.NOTICE_DEAD_ROUTING_KEY); }
2.发送带有 过期时间的消息
@PostMapping("test") public void demoJobHandler(String param) { // 消息传输加工机 此处用于为消息设置TTL时间 MessagePostProcessor messagePostProcessor = message -> { message.getMessageProperties().setExpiration(param);// Expiration 到期 message.getMessageProperties().setContentEncoding("UTF-8"); return message; }; /** * 1.交换机 * 2.队列 * 3.消息 */ rabbitTemplate.convertAndSend(RabbitConstant.NOTICE_EXCHANGE, RabbitConstant.NOTICE_ROUTING_KEY, "测试消息" + param, messagePostProcessor); log.info("我TM进来了"); }
3.监听死信队列中的消息
@RabbitListener(bindings = { @QueueBinding(value = @Queue(RabbitConstant.NOTICE_DEAD_QUEUE), // 队列 exchange = @Exchange(value = RabbitConstant.NOTICE_DEAD_EXCHANGE), // 交换机 key = RabbitConstant.NOTICE_DEAD_ROUTING_KEY) // 路由键 }) public void consumerMessage1(Message message, Channel channel) { log.info("接受到消息"); RabbitMQUtil rabbitMQUtil = new RabbitMQUtil(message, channel); String messageBody = rabbitMQUtil.getMessageBody(); long messageTag = rabbitMQUtil.getMessageTag(); log.info("消息内容:{}, 消息标识:{}", messageBody, messageTag); if (!rabbitMQUtil.ack()) { log.info("消息已处理完成 回调失败"); } log.info("消息已处理完成 回调成功"); }
第一种 是可以完全理解上的延迟消息 到达时间就会消费
第二种 假设第一条消息的过期时间为30s 第二条消息为10s 但是因为实现方式问题, 队列为先进先出 ,所以只有第一条30s消费完后才会消费10s的消息 所以是满足不了场景