相关推荐recommended
【RabbitMQ】3 RabbitMQ使用及交换机
作者:mmseoamin日期:2024-01-21

目录

  • 代码示例
  • 交换机概述
  • 无名交换机
  • 绑定(binding)
  • 交换机的类型
    • Fanout
    • Direct
    • Topic
    • 官网地址:https://www.rabbitmq.com/getstarted.htm

      代码示例

      先来看下如何使用rabbitmq:

      使用 Java 编写两个程序,发送单个消息的生产者和接收消息并打印出来的消费者。

      在下图中,P 是生产者,C 是消费者,中间的框是一个队列(代表使用者保留的消息缓冲区)。

      【RabbitMQ】3 RabbitMQ使用及交换机,在这里插入图片描述,第1张

      生产者:

      import cn.hutool.core.map.MapUtil;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      import java.io.IOException;
      import java.nio.charset.StandardCharsets;
      import java.util.concurrent.TimeoutException;
      /**
       * 生产者
       */
      public class Producer {
          /**
           * 队列的名称
           */
          public static final String QUEUE_NAME = "hello";
          public static void main(String[] args) throws IOException, TimeoutException {
              // 创建连接工厂
              ConnectionFactory connectionFactory = new ConnectionFactory();
              // 设置连接 RabbitMQ 的信息
              connectionFactory.setHost("192.168.65.137");
              connectionFactory.setVirtualHost("/");
              connectionFactory.setPort(5672);
              connectionFactory.setUsername("guest");
              connectionFactory.setPassword("guest");
              // 创建连接
              Connection connection = connectionFactory.newConnection();
              // 创建信道
              Channel channel = connection.createChannel();
              // 声明创建队列
              /**
               * 第一个参数:队列的名称
               * 第二个参数:是否持久化【存储在磁盘上】,默认为 false ,表示存储在内存中。
               * 第三个参数:
               *      当 exclusive = true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列 仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除 ;
               *      当 exclusive = false 则设置队列为非排他的,此时不同连接(Connection)的管道Channel可以使用该队列 ;
               * 第四个参数:是否自动删除。如果autoDelete = true,当所有消费者都与这个队列断开连接时,这个队列会自动删除。注意: 不是说该队列没有消费者连接时该队列就会自动删除,因为当生产者声明了该队列且没有消费者连接消费时,该队列是不会自动删除的。
               * 第五个参数:其他参数
               */
              channel.queueDeclare(QUEUE_NAME, true, false, false, MapUtil.newHashMap());
              // 发送消息
              String msg = "你好啊";
              channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
              System.out.println("消息发送完毕");
              // 关闭信道
              channel.close();
              // 关闭连接
              connection.close();
              // 关闭连接工厂
              connectionFactory.clone();
          }
      }
      

      消费者:

      import com.rabbitmq.client.*;
      import java.io.IOException;
      import java.nio.charset.StandardCharsets;
      import java.util.concurrent.TimeoutException;
      /**
       * 消费者
       */
      public class Consumer {
          /**
           * 队列的名称
           */
          public static final String QUEUE_NAME = "hello";
          public static void main(String[] args) throws IOException, TimeoutException {
              // 创建连接工厂
              ConnectionFactory connectionFactory = new ConnectionFactory();
              // 设置连接 RabbitMQ 的信息
              connectionFactory.setHost("192.168.65.137");
              connectionFactory.setVirtualHost("/");
              connectionFactory.setPort(5672);
              connectionFactory.setUsername("guest");
              connectionFactory.setPassword("guest");
              // 创建连接
              Connection connection = connectionFactory.newConnection();
              // 创建信道
              Channel channel = connection.createChannel();
              // 声明消费者成功消费的回调
              DeliverCallback deliverCallback = (consumerTag, message) -> {
                  System.out.println("consumerTag = " + consumerTag);
                  System.out.println("message = " + new String(message.getBody(), StandardCharsets.UTF_8));
              };
              // 声明消费者取消消费的回调
              CancelCallback cancelCallback = (consumerTag) -> {
                  System.out.println("consumerTag = " + consumerTag);
              };
              // 第一个参数:队列的名称
              // 第二个参数:是否自动确认
              // 第三个参数:消费者成功消费的回调
              // 第四个参数:消费者取消消费的回调
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
              // 关闭信道
              channel.close();
              // 关闭连接
              connection.close();
              // 关闭连接工厂
              connectionFactory.clone();
          }
      }
      

      交换机概述

      RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。一般而言,生产者甚至不知道这些消息传递到了那些队列中。相反,生产者只能将消息发送到交换机(exchange)。

      交换机的工作内容非常简单,一方面它接收来自生产者的消息,另一方面将消息推送到队列中。交换机必须确切的知道如何处理收到的消息,是将这些消息放到特定的队列、放到许多队列中或直接丢失它们,这是由交换机的类型决定的。

      交换机通过routing key 来实现。

      【RabbitMQ】3 RabbitMQ使用及交换机,在这里插入图片描述,第2张

      无名交换机

      当我们不指定交换机的名字,通过空字符串("")进行标识。依然能够将消息发送给队列,那是就是使用的默认的交换机,即无名交换机。

      channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
      

      第一个参数是交换机的名称。空字符串表示默认或无名交换机。

      【RabbitMQ】3 RabbitMQ使用及交换机,在这里插入图片描述,第3张

      绑定(binding)

      绑定就是交换机(exchange)和队列(queue)之间的桥梁,它告诉我们交换机和那个队列进行了绑定关系。

      交换机和队列的绑定关系是通过routing key来确认的。

      注意要区分消费者和队列的绑定。一般消费者和队列的绑定是binding key来确认的。

      交换机的类型

      RabbitMQ常用的交换器类型有: fanout 、 direct 、 topic 、 headers 四种。

      常用的有 fanout 、 direct 、 topic

      Fanout

      Fanout 这种类型非常简单,它将接收到所有消息并广播到它知道的所有队列中。系统中默认有 fanout 类型的交换机。

      示例:

      生产者:

      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.Channel;
      import java.nio.charset.StandardCharsets;
      public class Producer {
          /**
           * 交换机名称
           */
          public static final String EXCHANGE_NAME = "logs";
          public static void main(String[] args) throws Exception {
            	ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setVirtualHost("/");
              connectionFactory.setUsername("guest");
              connectionFactory.setPassword("guest");
              connectionFactory.setPort(5672);
              connectionFactory.setHost("127.0.0.1");
              Connection connection = connectionFactory.newConnection();
            	// 获取信道
              Channel channel =  connection.createChannel();
            
              // 声明交换机,类型为FANOUT
              channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
              // 发送消息,队列名字为空,使用默认的队列
              channel.basicPublish(EXCHANGE_NAME, "", null, "你好啊".getBytes(StandardCharsets.UTF_8));
              System.out.println("消息发送完毕");
          }
      }
      

      消费者:

      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.CancelCallback;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      import java.nio.charset.StandardCharsets;
      public class Consumer1 {
          /**
           * 交换机名称
           */
          public static final String EXCHANGE_NAME = "logs";
          public static void main(String[] args) throws Exception {
              ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setVirtualHost("/");
              connectionFactory.setUsername("guest");
              connectionFactory.setPassword("guest");
              connectionFactory.setPort(5672);
              connectionFactory.setHost("127.0.0.1");
              Connection connection = connectionFactory.newConnection();
            	// 获取信道
              Channel channel =  connection.createChannel();
            
              // 声明交换机,注意类型为FANOUT
              channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
              // 声明一个临时队列
              String queueName = channel.queueDeclare().getQueue();
              // 绑定交换机和队列
              channel.queueBind(queueName, EXCHANGE_NAME, "");
              System.out.println("等待接收消息,将接收到的消息打印在屏幕上。。。");
              DeliverCallback deliverCallback = (consumerTag, message) -> {
                  System.out.println("消费者1 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
              };
              CancelCallback cancelCallback = (consumerTag) -> {
                  System.out.println("consumerTag = " + consumerTag);
              };
              channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
          }
      }
      

      可以再来一个消费者2

      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.CancelCallback;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      import java.nio.charset.StandardCharsets;
      public class Consumer2 {
          /**
           * 交换机名称
           */
          public static final String EXCHANGE_NAME = "logs";
          public static void main(String[] args) throws Exception {
              ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setVirtualHost("/");
              connectionFactory.setUsername("guest");
              connectionFactory.setPassword("guest");
              connectionFactory.setPort(5672);
              connectionFactory.setHost("127.0.0.1");
              Connection connection = connectionFactory.newConnection();
            	// 获取信道
              Channel channel =  connection.createChannel();
            
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
              // 声明一个临时队列
              String queueName = channel.queueDeclare().getQueue();
              // 绑定交换机和队列
              channel.queueBind(queueName, EXCHANGE_NAME, "");
              System.out.println("等待接收消息,将接收到的消息打印在屏幕上。。。");
              DeliverCallback deliverCallback = (consumerTag, message) -> {
                  System.out.println("消费者2 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
              };
              CancelCallback cancelCallback = (consumerTag) -> {
                  System.out.println("consumerTag = " + consumerTag);
              };
              channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
          }
      }
      

      先启动消费者,再启动生产者,可以看到,生产者发送的消息,两个消费者都可以收到。

      Direct

      在 Fanout 类型的示例中,我们是将所有的消息广播给所有的消费者,但是有时我们希望这样,有的消费者消费 error 级别信息,有些消费者消费 warning 级别消息,有些消费者消费info 级别的日志信息,此时 Fanout 类型的交换机就不能满足这样的需求,就需要使用 direct 这种类型的交换机来实现这样的功能。

      我们将使用direct类型的交换器,路由算法也很简单 – 信息发到binding key和消息的routing key可以完全匹配的队列。

      【RabbitMQ】3 RabbitMQ使用及交换机,在这里插入图片描述,第4张

      如上图,我们可以看到,一个direct交换器x有两个队列绑定它,第一个队列通过叫orange的binding key绑定,第二个通过队列有两个bindings,一个叫black,另一个叫green。

      这样,routing key 为orange的消息就发送到Q1队列,routing key 为black和green的消息就发送到Q2队列,其他所有的消息都会被丢弃。

      一个binding key绑定多个队列也是允许的。这样的话,direct类型的交换器就会像fanout类型的一样,把消息发送给所有匹配的队列。如下图:

      【RabbitMQ】3 RabbitMQ使用及交换机,在这里插入图片描述,第5张

      示例:

      有如下的绑定关系

      【RabbitMQ】3 RabbitMQ使用及交换机,在这里插入图片描述,第6张

      消费者1:

      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      import java.nio.charset.StandardCharsets;
      public class Consumer1 {
          /**
           * 交换机名称
           */
          public static final String EXCHANGE_NAME = "direct_logs";
          /**
           * 队列名称
           */
          public static final String QUEUE_NAME = "disk";
          public static void main(String[] args) throws Exception {
              ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setVirtualHost("/");
              connectionFactory.setUsername("guest");
              connectionFactory.setPassword("guest");
              connectionFactory.setPort(5672);
              connectionFactory.setHost("127.0.0.1");
              Connection connection = connectionFactory.newConnection();
            	// 获取信道
              Channel channel =  connection.createChannel();
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
              // 绑定交换机和队列
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
              // 消费
              DeliverCallback deliverCallback = (consumerTag, message) -> {
                  System.out.println("消费者1 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
                  System.out.println("消费者1 消费的交换机的名称是 = " + message.getEnvelope().getExchange());
                  System.out.println("消费者1 消费的 routing key 是 = " + message.getEnvelope().getRoutingKey());
              };
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {});
          }
      }
      

      消费者 2 :

      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.DeliverCallback;
      import java.nio.charset.StandardCharsets;
      public class Consumer2 {
          /**
           * 交换机名称
           */
          public static final String EXCHANGE_NAME = "direct_logs";
          /**
           * 队列名称
           */
          public static final String QUEUE_NAME = "console";
          public static void main(String[] args) throws Exception {
             ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setVirtualHost("/");
              connectionFactory.setUsername("guest");
              connectionFactory.setPassword("guest");
              connectionFactory.setPort(5672);
              connectionFactory.setHost("127.0.0.1");
              Connection connection = connectionFactory.newConnection();
            	// 获取信道
              Channel channel =  connection.createChannel();
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
              // 绑定交换机和队列
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
              // 消费
              DeliverCallback deliverCallback = (consumerTag, message) -> {
                  System.out.println("消费者2 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
                  System.out.println("消费者2 消费的交换机的名称是 = " + message.getEnvelope().getExchange());
                  System.out.println("消费者2 消费的 routing key 是 = " + message.getEnvelope().getRoutingKey());
              };
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {});
          }
      }
      

      生产者:

      import com.rabbitmq.client.Channel;
      import java.io.IOException;
      import java.nio.charset.StandardCharsets;
      import java.util.HashMap;
      import java.util.Map;
      public class Producer {
          public static final String EXCHANGE_NAME = "direct_logs";
          public static void main(String[] args) throws Exception {
              ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setVirtualHost("/");
              connectionFactory.setUsername("guest");
              connectionFactory.setPassword("guest");
              connectionFactory.setPort(5672);
              connectionFactory.setHost("127.0.0.1");
              Connection connection = connectionFactory.newConnection();
            	// 获取信道
              Channel channel =  connection.createChannel();
            
              Map map = new HashMap<>();
              map.put("info", "info 级别的日志信息");
              map.put("warning", "warning 级别的日志信息");
              map.put("error", "error 级别的日志信息");
              map.put("debug", "debug 级别的日志信息");
              map.forEach((k, v) -> {
                  try {
                      channel.basicPublish(EXCHANGE_NAME, k, null, v.getBytes(StandardCharsets.UTF_8));
                  } catch (IOException e) {
                      throw new RuntimeException(e);
                  }
              });
          }
      }
      

      Topic

      Topic类型和 Direct 相比,都是可以根据 routing key 将消息路由到不同的队列,只不过 Exchange 类型为 Topic 可以让队列在绑定 routing key 的时候使用通配符。

      发送到topic交换器的消息不能有随意的routing key ,必须是一个以逗号分割的词列表。单词可以是任何词,但是要能说明连接的消息的特征。一个有效的routing key 的例子:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。每个单词的最大长度为255字节。

      一般binding keys(交换器和队列的连接)也要相同的形式。topic交换器背后的逻辑和direct类似 - 发送一个具有特定routing key的消息会被传递到所有绑定了匹配的binding key的所有队列。binding key有两个非常重要点:

      • *(star)代表一个单词
      • #(hash)代表0个或多个单词

        【RabbitMQ】3 RabbitMQ使用及交换机,在这里插入图片描述,第7张

        在上面图中的例子中,我们打算发送描述动物的消息。消息有三个单词(两个逗号)组成的routing key。在routing key的第一个单词描述敏捷性,第二个描述颜色,第三个描述物种:“…”。

        我们将创建三个binding:Q1绑定*orange.*的binding key,Q2绑定*.*.rabbit和lazy.#。

        总结以上binding如下:

        • Q1接受所有颜色为orange的动物
        • Q2接受所有的rabbit和lazy类型的动物

          下面是一些绑定关系示例:

          routing key描述
          quick.orange.rabbitQ1 和 Q2 能接收到
          lazy.orange.elephantQ1 和 Q2 能接收到
          quick.orange.foxQ1 能接收到
          lazy.brown.foxQ2 能接收到
          lazy.pink.rabbitQ2 能接收到
          quick.brown.fox不匹配任何绑定,不会被任何队列接收到,会被丢弃
          quick.orange.male.rabbit是四个单词,不匹配任何绑定,会被丢弃
          lazy.orange.male.rabbit是四个单词,但匹配 Q2

          如果我们打破协议,发送一个routing key有四个单词的消息会怎么样,比如"quick.orange.male.rabbit"?其实,这种消息不匹配任何bindings而被丢弃。

          但是,另一方面,“lazy.orange.male.rabbit”,尽管有4个单词,却匹配最后一个binding,所以会发送到第二个队列。

          补充:

          • topic交换器非常强大,可以表现为其他的交换器。
          • 当队列和"#"binding key绑定 - 它就会接受所有消息,忽略routing key - 就像fanout交换器;
          • 当在binding中没有使用"*“和”#"两个字符时,就会表现为direct交换器类型。

            示例:

            消费者1:

            import com.rabbitmq.client.BuiltinExchangeType;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.DeliverCallback;
            import java.nio.charset.StandardCharsets;
            public class Consumer1 {
                /**
                 * 交换机名称
                 */
                public static final String EXCHANGE_NAME = "topic_logs";
                /**
                 * 队列名称
                 */
                public static final String QUEUE_NAME = "Q1";
                public static void main(String[] args) throws Exception {
                   ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
                    connectionFactory.setPort(5672);
                    connectionFactory.setHost("127.0.0.1");
                    Connection connection = connectionFactory.newConnection();
                  	// 获取信道
                    Channel channel =  connection.createChannel();
                  
                    // 声明交换机
                    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                    // 声明一个临时队列
                    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                    // 绑定交换机和队列
                    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
                    System.out.println("等待接收消息,将接收到的消息打印在屏幕上。。。");
                    // 消费
                    DeliverCallback deliverCallback = (consumerTag, message) -> {
                        System.out.println("消费者1 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
                        System.out.println("消费者1 消费的交换机的名称是 = " + message.getEnvelope().getExchange());
                        System.out.println("消费者1 消费的 routing key 是 = " + message.getEnvelope().getRoutingKey());
                    };
                    channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {});
                }
            }
            

            消费者2:

            package com.github;
            import com.rabbitmq.client.BuiltinExchangeType;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.DeliverCallback;
            import java.nio.charset.StandardCharsets;
            public class Consumer2 {
                /**
                 * 交换机名称
                 */
                public static final String EXCHANGE_NAME = "topic_logs";
                /**
                 * 队列名称
                 */
                public static final String QUEUE_NAME = "Q2";
                public static void main(String[] args) throws Exception {
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
                    connectionFactory.setPort(5672);
                    connectionFactory.setHost("127.0.0.1");
                    Connection connection = connectionFactory.newConnection();
                  	// 获取信道
                    Channel channel =  connection.createChannel();
                  
                    // 声明交换机
                    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                    // 声明一个临时队列
                    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                    // 绑定交换机和队列
                    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
                    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
                    System.out.println("等待接收消息,将接收到的消息打印在屏幕上。。。");
                    // 消费
                    DeliverCallback deliverCallback = (consumerTag, message) -> {
                        System.out.println("消费者2 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
                        System.out.println("消费者2 消费的交换机的名称是 = " + message.getEnvelope().getExchange());
                        System.out.println("消费者2 消费的 routing key 是 = " + message.getEnvelope().getRoutingKey());
                    };
                    channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {});
                }
            }
            

            生产者:

            package com.github;
            import com.rabbitmq.client.Channel;
            import java.io.IOException;
            import java.nio.charset.StandardCharsets;
            import java.util.HashMap;
            import java.util.Map;
            public class Producer {
                public static final String EXCHANGE_NAME = "topic_logs";
                public static void main(String[] args) throws Exception {
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
                    connectionFactory.setPort(5672);
                    connectionFactory.setHost("127.0.0.1");
                    Connection connection = connectionFactory.newConnection();
                  	// 获取信道
                    Channel channel =  connection.createChannel();
                    Map map = new HashMap<>();
                    map.put("quick.orange.rabbit", "Q1 和 Q2 能接收到");
                    map.put("lazy.orange.elephant", "Q1 和 Q2 能接收到");
                    map.put("quick.orange.fox", "Q1 能接收到");
                    map.put("lazy.brown.fox", "Q2 能接收到");
                    map.put("lazy.pink.rabbit", "Q2 能接收到");
                    map.put("quick.brown.fox", "不匹配任何绑定,不会被任何队列接收到,会被丢弃");
                    map.put("quick.orange.male.rabbit", "是四个单词,不匹配任何绑定,会被丢弃");
                    map.put("lazy.orange.male.rabbit", "是四个单词,但匹配 Q2");
                    map.forEach((k, v) -> {
                        try {
                            channel.basicPublish(EXCHANGE_NAME, k, null, v.getBytes(StandardCharsets.UTF_8));
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    });
                }
            }