1、对接多个节点上的MQ(如master-MQ,slave-MQ),若读者需要自己模拟出两个MQ,可以部署多个VM然后参考 docker 安装rabbitmq_Steven-Russell的博客-CSDN博客
2、队列名称不是固定的,需要接受外部参数,并且通过模板进行格式化,才能够得到队列名称
3、需要在master-MQ上延迟一段时间,然后将消息再转发给slave-MQ
1、采用springboot的自动注入bean需要事先知道队列的名称,但是队列名称是动态的情况下,无法实现自动注入
2、mq弱依赖,在没有master-mq或者slave-mq时,不能影响到现有能力
1、由于mq的队列创建、exchange创建以及队列和exchange的绑定关系是可重入的,所以采用connectFactory进行手动声明
2、增加自定义条件OnMqCondition,防止不必要的bean创建
参考 搭建最简单的SpringBoot项目_Steven-Russell的博客-CSDN博客
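<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.11.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.28</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.40</version>
</dependency>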
在application.yml中增加如下配置
mq:
  master:
    addresses: 192.168.30.128:5672
    username: guest
    password: guest
    vhost: /
  slave:
    addresses: 192.168.30.131:5672
    username: guest
    password: guest
    vhost: /
package com.wd.config.condition;

import org.springframework.context.annotation.Conditional;

import java.lang.annotation.*;

/**
 * Makes a type or a bean method conditional on a set of configuration keys.
 *
 * <p>The decision is delegated to {@link OnMqCondition}, which receives the
 * {@link #keys()} declared here.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Conditional(OnMqCondition.class)
public @interface MqConditional {

    /**
     * @return the property keys that the condition looks up in the environment
     */
    String[] keys();
}
package com.wd.config.condition;

import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.lang.NonNull;
import org.springframework.util.ObjectUtils;

import java.util.Map;

/**
 * Condition behind {@link MqConditional}: matches only when every property key
 * listed in the annotation resolves to a non-empty value in the environment.
 */
public class OnMqCondition implements Condition {

    /**
     * @param context  gives access to the environment used for property lookup
     * @param metadata metadata of the annotated type or method
     * @return {@code true} when the element carries no {@link MqConditional}
     *         attributes, or when all listed keys have a non-empty value;
     *         {@code false} as soon as one key is missing or empty
     */
    @Override
    public boolean matches(@NonNull ConditionContext context, @NonNull AnnotatedTypeMetadata metadata) {
        Map<String, Object> annotationAttributes =
                metadata.getAnnotationAttributes(MqConditional.class.getName());
        if (annotationAttributes == null || annotationAttributes.isEmpty()) {
            // Nothing to check: treat the element as unconditional.
            return true;
        }

        String[] keys = (String[]) annotationAttributes.get("keys");
        for (String key : keys) {
            String property = context.getEnvironment().getProperty(key);
            if (ObjectUtils.isEmpty(property)) {
                return false;
            }
        }
        return true;
    }
}
package com.wd.config; import com.wd.config.condition.MqConditional; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @Configuration public class MqConnectionFactory { @Value("${mq.master.addresses}") private String masterAddresses; @Value("${mq.master.username}") private String masterUsername; @Value("${mq.master.password}") private String masterPassword; @Value("${mq.master.vhost}") private String masterVhost; @Value("${mq.slave.addresses}") private String slaveAddresses; @Value("${mq.slave.username}") private String slaveUsername; @Value("${mq.slave.password}") private String slavePassword; @Value("${mq.slave.vhost}") private String slaveVhost; @Bean @Primary @MqConditional(keys = {"mq.master.addresses", "mq.master.vhost", "mq.master.username", "mq.master.password"}) public ConnectionFactory masterConnectionFactory() { return doCreateConnectionFactory(masterAddresses, masterUsername, masterPassword, masterVhost); } @Bean @MqConditional(keys = {"mq.slave.addresses", "mq.slave.vhost", "mq.slave.username", "mq.slave.password"}) public ConnectionFactory slaveConnectionFactory() { return doCreateConnectionFactory(slaveAddresses, slaveUsername, slavePassword, slaveVhost); } private ConnectionFactory doCreateConnectionFactory(String addresses, String username, String password, String vhost) { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses(addresses); cachingConnectionFactory.setUsername(username); cachingConnectionFactory.setPassword(password); cachingConnectionFactory.setVirtualHost(vhost); return cachingConnectionFactory; } }
package com.wd.config; public enum DeclareQueueExchange { EXCHANGE("exchange"), DEAD_EXCHANGE("deadExchange"), DELAY_EXCHANGE("delayExchange"); private final String exchangeName; DeclareQueueExchange(String exchangeName) { this.exchangeName = exchangeName; } public String getExchangeName() { return exchangeName; } }
package com.wd.config; public enum DeclareQueueName { DELAY_QUEUE_NAME_SUFFIX("_delay"), DEAD_QUEUE_NAME_SUFFIX("_dead"), QUEUE_NAME_TEMPLATE("wd.simple.queue.{0}"); private final String queueName; DeclareQueueName(String queueName) { this.queueName = queueName; } public String getQueueName() { return queueName; } }
package com.wd.controller.vo;

import com.wd.pojo.Phone;
import lombok.Data;

/**
 * Request body carrying a queue id and the {@link Phone} payload to publish.
 */
@Data
public class DelayMsgVo {

    /** Value substituted into the queue-name template. */
    private String queueId;

    /** Payload that is serialized and sent as the message body. */
    private Phone phone;
}
package com.wd.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

/**
 * Message payload describing a phone and the users attached to it.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Phone implements Serializable {

    private static final long serialVersionUID = -1L;

    private String id;

    private String name;

    private Date createTime;

    /** Users attached to this phone. */
    private List<User> userList;
}
package com.wd.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

/**
 * Serializable user record with a name and a creation date.
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class User implements Serializable {

    private static final long serialVersionUID = -1L;

    /** User name. */
    private String username;

    /** Creation date. */
    private Date create;
}
package com.wd.config;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Holds the queue ids known to the application.
 */
public interface QueueIdListConfig {

    /**
     * Queue ids, kept in a local in-memory list for now.
     *
     * <p>Built as a plain {@link ArrayList} instead of double-brace
     * initialization, which would create an anonymous {@code ArrayList} subclass.
     */
    List<Integer> QUEUE_ID_LIST = new ArrayList<>(Arrays.asList(111, 222, 333));
}
注意:此处就以web用户输入为入口,所以创建controller
package com.wd.controller;

import com.alibaba.fastjson2.JSONObject;
import com.rabbitmq.client.*;
import com.wd.config.DeclareQueueExchange;
import com.wd.config.DeclareQueueName;
import com.wd.controller.vo.DelayMsgVo;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * REST entry point that declares the per-id delay/dead-letter topology on the
 * master broker and publishes a message into the delay queue.
 *
 * <p>NOTE(review): {@code @ConditionalOnBean} on a component-scanned class depends
 * on bean-definition registration order — confirm it is evaluated after
 * {@code masterConnectionFactory} has been registered.
 */
@RestController
@ConditionalOnBean(value = ConnectionFactory.class, name = "masterConnectionFactory")
public class DynamicCreateQueueController {

    private final ConnectionFactory masterConnectionFactory;

    public DynamicCreateQueueController(
            @Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory) {
        this.masterConnectionFactory = masterConnectionFactory;
    }

    /**
     * Publishes the phone payload of the request into the delay queue derived
     * from the request's queue id.
     *
     * @param delayMsgVo request body with the queue id and the payload
     * @return the literal {@code "success"} once the message has been published
     * @throws IOException              if a broker operation fails
     * @throws TimeoutException         if closing the channel times out
     * @throws IllegalArgumentException if the queue id is missing or blank
     */
    @PostMapping(value = "sendDelayMsg")
    public String sendMsg2DelayQueue(@RequestBody DelayMsgVo delayMsgVo) throws IOException, TimeoutException {
        doSendMsg2DelayQueue(delayMsgVo);
        return "success";
    }

    /**
     * Declares the dead-letter exchange/queue and the delay exchange/queue for
     * the given queue id, then publishes the payload to the delay queue.
     */
    private void doSendMsg2DelayQueue(DelayMsgVo delayMsgVo) throws IOException, TimeoutException {
        String queueId = delayMsgVo.getQueueId();
        if (queueId == null || queueId.trim().isEmpty()) {
            // Without this guard MessageFormat would render the literal text "null"
            // and queues named "wd.simple.queue.null..." would be created.
            throw new IllegalArgumentException("queueId must not be empty");
        }

        // Derive the queue names from the id.
        String queueName = MessageFormat.format(DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName(), queueId);
        String delayQueueName = queueName + DeclareQueueName.DELAY_QUEUE_NAME_SUFFIX.getQueueName();
        String deadQueueName = queueName + DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();

        String deadExchange = DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName();
        String delayExchange = DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName();

        // The exchange/queue declarations below are re-entrant: repeating them
        // with the same arguments is not reported as an error by the broker.
        try (Connection connection = masterConnectionFactory.createConnection();
             Channel channel = connection.createChannel(false)) {

            // Dead-letter exchange and queue, bound with the queue name as routing key.
            // The consumer of the dead-letter queue is attached by a scheduled task, not here.
            channel.exchangeDeclare(deadExchange, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(deadQueueName, true, false, false, null);
            channel.queueBind(deadQueueName, deadExchange, deadQueueName);

            // Delay queue: messages expire after the TTL and are dead-lettered
            // to the dead-letter exchange with the dead queue name as routing key.
            Map<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange", deadExchange);
            args.put("x-dead-letter-routing-key", deadQueueName);
            args.put("x-message-ttl", 10 * 1000);
            channel.queueDeclare(delayQueueName, true, false, false, args);
            channel.exchangeDeclare(delayExchange, BuiltinExchangeType.DIRECT);
            channel.queueBind(delayQueueName, delayExchange, delayQueueName);

            // Publish the payload as UTF-8 encoded JSON to the delay queue.
            byte[] body = JSONObject.toJSONString(delayMsgVo.getPhone()).getBytes(StandardCharsets.UTF_8);
            channel.basicPublish(delayExchange, delayQueueName, null, body);
        }
    }
}
package com.wd.mq.consumer;

import com.rabbitmq.client.*;
import com.wd.config.DeclareQueueExchange;
import com.wd.config.DeclareQueueName;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Dead-letter consumer: forwards every consumed message to the broker behind
 * {@code targetConnectionFactory}.
 */
public class MasterDeadQueueConsumer extends DefaultConsumer {

    private static final Logger LOGGER = Logger.getLogger(MasterDeadQueueConsumer.class.getName());

    private final ConnectionFactory targetConnectionFactory;

    /**
     * @param channel                 source channel the consumer is registered on
     * @param targetConnectionFactory factory for connections to the target broker
     */
    public MasterDeadQueueConsumer(Channel channel, ConnectionFactory targetConnectionFactory) {
        super(channel);
        this.targetConnectionFactory = targetConnectionFactory;
    }

    /**
     * Forwards the delivery to the target broker and acknowledges it on the
     * source channel. On a {@link TimeoutException} the delivery is rejected
     * with requeue instead, and is not acknowledged as well.
     *
     * @throws IOException if a broker operation fails
     */
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        // The routing key is the dead queue name; strip the suffix to obtain
        // the name of the queue on the target broker.
        String routingKey = envelope.getRoutingKey();
        String deadSuffix = DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();
        String targetQueueName = routingKey.substring(0, routingKey.length() - deadSuffix.length());
        String targetExchange = DeclareQueueExchange.EXCHANGE.getExchangeName();
        long deliveryTag = envelope.getDeliveryTag();

        try (Connection targetConnection = targetConnectionFactory.createConnection();
             Channel targetChannel = targetConnection.createChannel(false)) {
            // Declare exchange and queue on the target broker, then forward.
            targetChannel.exchangeDeclare(targetExchange, BuiltinExchangeType.DIRECT);
            targetChannel.queueDeclare(targetQueueName, true, false, false, null);
            targetChannel.queueBind(targetQueueName, targetExchange, targetQueueName);
            targetChannel.basicPublish(targetExchange, targetQueueName, properties, body);
        } catch (TimeoutException e) {
            LOGGER.log(Level.WARNING, "Forwarding to queue " + targetQueueName + " timed out, requeueing", e);
            // getChannel() is the SOURCE channel. Return here: a delivery tag must be
            // acknowledged only once, so the ack below must not run after the nack.
            getChannel().basicNack(deliveryTag, false, true);
            return;
        }

        // getChannel() is the SOURCE channel.
        getChannel().basicAck(deliveryTag, false);
    }
}
package com.wd.mq.consumer;

import com.alibaba.fastjson2.JSONObject;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.wd.pojo.Phone;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Consumer that parses each message body as a {@link Phone}, prints it and
 * acknowledges the delivery.
 */
public class SlaveQueueConsumer extends DefaultConsumer {

    /**
     * @param channel channel the consumer is registered on
     */
    public SlaveQueueConsumer(Channel channel) {
        super(channel);
    }

    /**
     * @throws IOException if acknowledging the delivery fails
     */
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String json = new String(body, StandardCharsets.UTF_8);
        Phone phone = JSONObject.parseObject(json, Phone.class);
        System.out.println("SlaveQueueConsumer consume ==> " + phone);
        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
}
注意:因为采用的是死信队列的方式实现的延迟效果,此处只需要消费对应的死信队列即可
package com.wd.mq.quartz;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.wd.config.DeclareQueueExchange;
import com.wd.config.DeclareQueueName;
import com.wd.config.QueueIdListConfig;
import com.wd.mq.consumer.MasterDeadQueueConsumer;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.concurrent.TimeoutException;

/**
 * Periodically makes sure that every known dead-letter queue on the master
 * broker has a consumer that forwards its messages to the slave broker.
 */
@Configuration
@ConditionalOnBean(value = ConnectionFactory.class, name = {"slaveConnectionFactory", "masterConnectionFactory"})
public class MasterDeadQueueSubscribeProcessor {

    private final ConnectionFactory masterConnectionFactory;

    private final ConnectionFactory slaveConnectionFactory;

    public MasterDeadQueueSubscribeProcessor(
            @Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory,
            @Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory) {
        this.masterConnectionFactory = masterConnectionFactory;
        this.slaveConnectionFactory = slaveConnectionFactory;
    }

    /**
     * Declares the dead-letter exchange, queue and binding for each id in
     * {@link QueueIdListConfig#QUEUE_ID_LIST} and attaches a
     * {@link MasterDeadQueueConsumer} when the queue currently has no consumer.
     *
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if closing a channel times out
     */
    @Scheduled(fixedDelay = 10 * 1000)
    public void subscribeMasterDeadQueue() throws IOException, TimeoutException {
        // The id list could come from an external cache kept in sync with the
        // sendDelayMsg endpoint; a local list stands in for it here.
        String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
        String deadSuffix = DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();
        String deadExchange = DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName();

        for (Integer id : QueueIdListConfig.QUEUE_ID_LIST) {
            // Pass the id as text: MessageFormat renders numeric arguments with
            // locale grouping (1000 -> "1,000"), which would not match the queue
            // names the controller builds from a String id.
            String deadQueueName = MessageFormat.format(queueNameTemplate, String.valueOf(id)) + deadSuffix;

            // NOTE(review): the consumer is registered on a channel that the
            // try-with-resources closes right afterwards; presumably this relies on
            // the connection factory keeping the underlying channel open — confirm.
            try (Connection connection = masterConnectionFactory.createConnection();
                 Channel channel = connection.createChannel(false)) {
                channel.exchangeDeclare(deadExchange, BuiltinExchangeType.DIRECT);
                AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare(deadQueueName, true, false, false, null);
                channel.queueBind(deadQueueName, deadExchange, deadQueueName);

                // Attach at most one consumer per queue so that messages are
                // consumed in order and consumers do not pile up on every run.
                if (queueDeclare.getConsumerCount() == 0) {
                    channel.basicConsume(deadQueueName, false,
                            new MasterDeadQueueConsumer(channel, slaveConnectionFactory));
                }
            }
        }
    }
}
package com.wd.mq.quartz;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.wd.config.DeclareQueueExchange;
import com.wd.config.DeclareQueueName;
import com.wd.config.QueueIdListConfig;
import com.wd.mq.consumer.SlaveQueueConsumer;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.concurrent.TimeoutException;

/**
 * Periodically makes sure that every known queue on the slave broker is
 * declared, bound and has a {@link SlaveQueueConsumer} attached.
 */
@Configuration
@ConditionalOnBean(value = ConnectionFactory.class, name = "slaveConnectionFactory")
public class SlaveQueueSubscribeProcessor {

    private final ConnectionFactory slaveConnectionFactory;

    public SlaveQueueSubscribeProcessor(
            @Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory) {
        this.slaveConnectionFactory = slaveConnectionFactory;
    }

    /**
     * Declares exchange, queue and binding for each id in
     * {@link QueueIdListConfig#QUEUE_ID_LIST} and attaches a consumer when the
     * queue currently has none.
     *
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if closing a channel times out
     */
    @Scheduled(fixedDelay = 10 * 1000)
    public void subscribeSlaveDeadQueue() throws IOException, TimeoutException {
        // The id list could come from an external cache kept in sync with the
        // sendDelayMsg endpoint; a local list stands in for it here.
        String queueNameTemplate = DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();
        String exchange = DeclareQueueExchange.EXCHANGE.getExchangeName();

        for (Integer id : QueueIdListConfig.QUEUE_ID_LIST) {
            // Pass the id as text: MessageFormat renders numeric arguments with
            // locale grouping (1000 -> "1,000"), which would not match the queue
            // names derived from a String id elsewhere.
            String queueName = MessageFormat.format(queueNameTemplate, String.valueOf(id));

            // NOTE(review): the consumer is registered on a channel that the
            // try-with-resources closes right afterwards; presumably this relies on
            // the connection factory keeping the underlying channel open — confirm.
            try (Connection connection = slaveConnectionFactory.createConnection();
                 Channel channel = connection.createChannel(false)) {
                channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
                AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare(queueName, true, false, false, null);
                channel.queueBind(queueName, exchange, queueName);

                // Attach at most one consumer per queue.
                if (queueDeclare.getConsumerCount() == 0) {
                    channel.basicConsume(queueName, false, new SlaveQueueConsumer(channel));
                }
            }
        }
    }
}
发现SlaveQueueConsumer打印如下日志:
消息传递流程如下,验证通过