🎉🎉欢迎来到我的CSDN主页!🎉🎉
🏅我是君易--鑨,一个在CSDN分享笔记的博主。📚📚
🌟推荐给大家我的博客专栏《RabbitMQ系列之死信交换机的使用》。🎯🎯
🎁如果感觉还不错的话请给我关注加三连吧!🎁🎁
在我们上一期的RabbitMQ博客系列的分享中我们分享了有关RabbitMQ中交换机的使用,其中讲解到什么是交换机,以及交换机的类型。主要讲述了直连交换机、主题交换机、扇形交换机的示例使用,还有一个死信交换机的示例使用没有讲到,本期博客就针对与死信交互机详细的讲解一下。
死信交换机(Dead-Letter-Exchange,简称DLX)是 RabbitMQ 的一种消息机制,用于处理那些无法被正常消费的消息。当消息在一个队列中满足某些条件(如过期、被拒绝等)时,这些消息会被放入死信队列中。如果队列配置了死信交换机属性,这些死信消息会被投递到指定的交换机中,这个交换机就被称为死信交换机。
死信交换机的使用可以帮助避免消息的丢失,并提供了重试和诊断问题的机制。当消息成为死信后,可以将其重新投递到另一个队列中,以便再次尝试消费。此外,死信交换机还可以用于实现延时队列的功能。
当我们发送一个消息到我们的RabbitMQ中,我们的一个消息被消费者拒绝后,可以通过设置requeue参数为false,使其消息成为死信消息并被投递到死信交换机中。死信交换机可以与普通交换机一样绑定队列,将死信路由与他绑定的队列中。通过这种的方式可以实现对问题的诊断和重试消费。
死信交换机是一种网络设备,它可以在数据包从一个端口发送到另一个端口时确保数据包的安全性和可靠性。它通常用于安全传输敏感信息,如公司内部的机密文件或政府机构的敏感信息。以下是死信交换机的一些优缺点:
优点 | 说明 |
数据包安全性和可靠性 | 由于数据包不会直接从一个端口发送到另一个端口,攻击者无法窃取数据包并篡改其内容。这确保了数据包的安全性和可靠性。 |
数据包完整性验证 | 当数据包到达死信邮箱时,接收方可以确认接收到的数据包的完整性并进行相关操作。这有助于防止数据包在传输过程中被篡改或损坏。 |
防止拒绝服务攻击 | 死信交换机可以用于防止拒绝服务攻击。在这些攻击中,攻击者会向目标服务器发送大量垃圾流量,以使其崩溃或过载。使用死信交换机可以避免这种情况的发生,因为攻击者无法直接攻击目标服务器,而只能攻击死信交换机上的邮箱地址。 |
缺点 | 说明 |
对服务器和数据库性能要求高(要求高) | 由于死信交换机需要处理大量数据包和请求,因此对服务器和数据库的性能要求较高。如果服务器或数据库性能不足,可能会导致数据包处理延迟或丢失。 |
处理大量订单力不从心(效率低) | 对于需要处理大量订单的应用程序,死信交换机可能无法满足实时性的要求。由于数据包需要经过多个中间节点,可能会导致处理时间延迟。 |
配置和维护复杂(成本高) | 死信交换机需要正确配置和持续维护,以确保其正常工作并保持最佳性能。这可能需要较高的技术水平和资源投入。 |
总之,死信交换机在确保数据包的安全性和可靠性方面非常有用,但在使用过程中需要注意其性能和配置要求。在选择使用死信交换机时,需要根据实际需求和资源状况进行权衡和考虑。
当队列中的消息满足以下情况之一时,可以成为死信:
- 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置成了false。
- 消息是一个过期消息,超时无人消费。
- 要投递的队列消息满了,最新进来的消息变成死信。
不懂的小伙伴可以观看视频学习了解。
阿里一面——说说RabbitMQ死信队列、延时队列
我们基于上一期博客代码进行实战演示,我们重新创建两个队列用于模拟实现我们的功能,
// 创建两个队列 @Bean // 声明第一个队列 。 public Queue queueA(){ return new Queue("queueA"); } @Bean // 声明第二个队列 public Queue queueB(){ return new Queue("queueB"); }
对应上述的图解我们创建两个交换机,一个是普通交换机,一个是死信交换机。
// 创建两个交换机 一个是死信交换机 一个是普通交换机 @Bean public DirectExchange exchangeA(){ return new DirectExchange("Exchange-A"); } @Bean public DirectExchange exchangeB (){ return new DirectExchange("Exchange-B"); }
@Bean public Binding bindingA(){ return BindingBuilder .bind(queueA()) .to(exchangeA()) .with("A"); } @Bean public Binding bindingB(){ return BindingBuilder .bind(queueB()) .to(exchangeB()) .with("B"); }
我们现在两个交换机之间没有任何关系,因此我们对其进行操作使其关联起来。我们对queueA()方法进行修改。
@Bean // 声明第一个队列 。 public Queue queueA(){ Mapconfig = new HashMap<>(); //message在该队列queue的存活时间最大为10秒 config.put("x-message-ttl", 10000); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX) config.put("x-dead-letter-exchange", "ExchangeB"); //x-dead-letter-routing-key参数是给这个DLX指定路由键 config.put("x-dead-letter-routing-key", "BB"); return new Queue("queueA",true,false,false,config); }
在我们的控制台中编写对应的请求方法发送信息到我们的A队列中
@RequestMapping("/send7") public String send7() { // 向交换机发送消息 amqpTemplate.convertAndSend("ExchangeA","AA","25965616516646465"); return "木易Love馨月"; }
我们在对应的消费者这边创建一个接受类,专门接受QueueB的消息。
package com.yx.consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "queueB") //接收的队列 public class ReceiverQB { @RabbitHandler public void process(String id) { log.warn("QB接收到:" + id); } }
我们还是先启动我们的生产者服务,并且在我们的网页上去访问我们编写的send7请求方法。在访问前我们的队列并没有生成。
当我们重新发送三次请求给我们的A队列,如下图所示
我们等待我们的队列A的过期时间重新刷新页面,刷新后的页面显示如下
· 消息从队列A跳转到队列B的原因是我们没有写队列A的消费者,使其消息在队列A中等待过期时间已过就会将消息提交到我们的队列B中,我们队列B进行接收队列A过期的消息。接下来我们运行启动我们的消费者服务。
我们将下述配置添加到我们消费者的配置文件中去
listener: simple: acknowledge-mode: manual
我们在消费者中编写一个消息接收类用于接收队列A中的消息。
package com.yx.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; @Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "queueA") //接收的队列 public class ReceiverQA { @RabbitHandler public void process(String id, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { log.warn("QA接收到:" + id); channel.basicAck(tag, true);//确认消息成功消费 } }
我们重启我们的消费者服务进行网页测试接口方法
我们现在将队列A的改为拒接消息让消息重新入列,并且设置延迟时间1000ms。true:接收;fasle:拒绝
我们重新访问请求方法,进行测试
我们再将true改为false重新启动服务测试。
接下来我们前往官网页面进行查看是发现我们的消息全部在我们的QB中,并且是未被处理的。
消息存在的原因因为我们死信队列的消费者中并没有进行对消息的确认。只要我们仿照QA进行编写确认即可。
我们再去官网查看一下就会发现我们的消息都处理了。
🎉🎉本期的博客分享到此结束🎉🎉
📚📚各位老铁慢慢消化📚📚
🎯🎯下期博客博主会带来新货🎯🎯
🎁三连加关注,阅读不迷路 !🎁