RabbitMQ的Topic模式是一种基于主题的消息传递模式。它允许发送者向一个特定的主题(topic)发布消息,同时,订阅者也可以针对自己感兴趣的主题进行订阅。
在Topic模式中, 主题通过一个由单词和点号组成的字符串来描述。例如,“#.china”表示匹配所有以“china”为结尾的主题,比如“bj.china”或“china”等。( ‘ # ’ 和 ‘ * ’ 会再后面介绍)
当一个消息被发布到Topic交换机(Exchange)时,交换机会将消息转发给所有与该主题匹配的队列。消费者(即订阅者)可以对队列进行绑定,通过指定自己感兴趣的主题进行绑定。
通过使用Topic模式,我们可以实现高度灵活的信息交换模式,同时,确保只有感兴趣的消费者才会收到消息,提高了系统的效率和可靠性。



package com.hong.rabbitmq8;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
 * @Description: Topic模式生产者
 * @Author: hong
 * @Date: 2024-01-16 20:09
 * @Version: 1.0
 **/
public class TopicSend {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        Map bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
        for (Map.Entry bindingKeyEntry : bindingKeyMap.entrySet()) {
            String bindingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送完成------" +bindingKey+ message);
        }
    }
}
   
package com.hong.rabbitmq8;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
/**
 * @Description: Topic模式接受者1-接收*.orange.*
 * @Author: hong
 * @Date: 2024-01-16 20:07
 * @Version: 1.0
 **/
public class TopicReceiver1 {
    public static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = "Q1";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
        DeliverCallback deliverCallback = (comsumerTag, message) -> {
            System.out.println("接收队列:" + queueName + ",routingKey:" + message.getEnvelope().getRoutingKey() + ",消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        CancelCallback cancelCallback = var -> {
        };
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
    }
}
 
package com.hong.rabbitmq8;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
/**
 * @Description: Topic模式接受者1-接收*.*.rabbit和lazy.#
 * @Author: hong
 * @Date: 2024-01-16 20:07
 * @Version: 1.0
 **/
public class TopicReceiver2 {
    public static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = "Q2";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
        DeliverCallback deliverCallback = (comsumerTag, message) -> {
            System.out.println("接收队列:" + queueName + ",routingKey:" + message.getEnvelope().getRoutingKey() + ",消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        CancelCallback cancelCallback = var -> {
        };
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
    }
}
 



| 例子 | 说明 | 
|---|---|
| quick.orange.rabbit | 被队列 Q1Q2 接收到 | 
| lazy.orange.elephant | 被队列 Q1Q2 接收到 | 
| quick.orange.fox | 被队列 Q1 接收到 | 
| lazy.brown.fox | 被队列 Q2 接收到 | 
| lazy.pink.rabbit | 虽然满足两个绑定但只被队列 Q2 接收一次 | 
| quick.brown.fox | 不匹配任何绑定不会被任何队列接收到会被丢弃 | 
| quick.orange.male.rabbit | 是四个单词不匹配任何绑定会被丢弃 | 
| lazy.orange.male.rabbit | 是四个单词但匹配 Q2 | 
上一篇:算法面试题:合并两个有序链表