相关推荐recommended
RabbitMQ发布与订阅模式类型
作者:mmseoamin日期:2024-02-20

🍁博客主页:👉不会压弯的小飞侠

✨欢迎关注:👉点赞👍收藏⭐留言✒

✨系列专栏:👉Linux专栏

🔥欢迎大佬指正,一起学习!一起加油!

RabbitMQ发布与订阅模式类型,在这里插入图片描述,第1张

目录

  • 🍁模式说明
  • 🍁发布与订阅模式完成消息传递
  • 🍁总结

    🍁模式说明

    • 工作队列背后的假设是每个任务都是 只交付给一名工人。在这一部分中,我们将做一些事情 完全不同的 - 我们将向多个传递消息 消费者。此模式称为“发布/订阅”。

      RabbitMQ发布与订阅模式类型,在这里插入图片描述,第2张

    • 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

      🍁发布与订阅模式完成消息传递

      • 编写生产者发送消息
        • 编写消息生产者 Producter
          public class Producer {
              public static void main(String[] args) throws Exception {
                  Connection connection = ConnectionUtil.getConnection();
                  Channel channel = connection.createChannel();
                  /*
                 exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
                 参数:
                  1. exchange:交换机名称
                  2. type:交换机类型
                      DIRECT("direct"),:定向
                      FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                      TOPIC("topic"),通配符的方式
                      HEADERS("headers");参数匹配
                  3. durable:是否持久化
                  4. autoDelete:自动删除
                  5. internal:内部使用。 一般false
                  6. arguments:参数
                  */
                  String exchangeName = "test_fanout";
                  //5. 创建交换机
                  channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
                  //6. 创建队列
                  String queue1Name = "test_fanout_queue1";
                  String queue2Name = "test_fanout_queue2";
                  channel.queueDeclare(queue1Name,true,false,false,null);
                  channel.queueDeclare(queue2Name,true,false,false,null);
                  //7. 绑定队列和交换机
                  /*
                  queueBind(String queue, String exchange, String routingKey)
                  参数:
                      1. queue:队列名称
                      2. exchange:交换机名称
                      3. routingKey:路由键,绑定规则
                          如果交换机的类型为fanout ,routingKey设置为""
                   */
                  channel.queueBind(queue1Name,exchangeName,"");
                  channel.queueBind(queue2Name,exchangeName,"");
                  String body = "此消息被交换机发到两个队列中!!!";
                  //8. 发送消息
                  channel.basicPublish(exchangeName,"",null,body.getBytes());
                  //9. 释放资源
                  channel.close();
                  connection.close();
              }
          }
          
          • 测试发布者

            RabbitMQ发布与订阅模式类型,在这里插入图片描述,第3张

            • 编写消费者接收消息
              • 编写消息消费者Consumer1
                public class Consumer1 {
                    public static void main(String[] args) throws Exception {
                        Connection connection = ConnectionUtil.getConnection();
                        Channel channel = connection.createChannel();
                        String queue1Name = "test_fanout_queue1";
                        Consumer consumer = new DefaultConsumer(channel){
                            @Override
                            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                System.out.println("body:"+new String(body));
                            }
                        };
                        channel.basicConsume(queue1Name,true,consumer);
                    }
                }
                
                • 编写消费者接收消息
                  • 编写消息消费者Consumer2
                    public class Consumer2 {
                        public static void main(String[] args) throws Exception {
                            Connection connection = ConnectionUtil.getConnection();
                            Channel channel = connection.createChannel();
                            String queue2Name = "test_fanout_queue2";
                            Consumer consumer = new DefaultConsumer(channel){
                                @Override
                                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                    System.out.println("body:"+new String(body));
                                }
                            };
                            channel.basicConsume(queue2Name,true,consumer);
                        }
                    }
                    
                    • 测试
                    • 启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。

                      RabbitMQ发布与订阅模式类型,在这里插入图片描述,第4张

                      RabbitMQ发布与订阅模式类型,在这里插入图片描述,第5张

                      🍁总结

                      • 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
                      • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
                      • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
                      • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机