消息生产者
public class Send { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "这是一条消息!!!"; // 发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("Send:" + message); } } }
消息消费者(会一直监听队列)
public class Recv { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv:" + message); }; // 自动确认消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
工作队列
生产者
public class Send { private static final String QUEUE_NAME = "work_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 批量发送10个消息 for (int i = 0; i < 10; i++) { String message = "这是一条消息!!!" + i; // 发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("Send:" + message); } } } }
消费者1
public class Recv1 { private static final String QUEUE_NAME = "work_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { // 模拟消费者缓慢 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv1:" + message); // 手工确认消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 关闭自动确认消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
消费者2
public class Recv2 { private static final String QUEUE_NAME = "work_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { // 模拟消费者缓慢 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv2:" + message); // 手工确认消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 关闭自动确认消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
轮训策略验证
公平策略验证
什么是RabbitMQ的发布订阅模式
发布订阅模型应用场景
RabbitMQ发布订阅模型
发送端
public class Send { private static final String EXCHANGE_NAME = "fan_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 绑定交换机,广播类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "广播发送消息:这是一条消息!!!"; // 发送消息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("Send:" + message); } } }
消费端(两个节点)
public class Recv1 { private static final String EXCHANGE_NAME = "fan_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机,广播类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 获取队列(排它队列) String queueName = channel.queueDeclare().getQueue(); // 绑定队列和交换机 channel.queueBind(queueName, EXCHANGE_NAME, ""); // 回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv1:" + message); }; // 自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
什么是RabbitMQ的路由模式
交换机类型是direct
队列和交换机绑定,需要指定一个路由键(也叫binding key)
消息生产者发送消息给交换机,需要指定路由键
交换机根据消息的路由键,转发给对应的队列
消息生产者
public class Send { private static final String EXCHANGE_NAME = "direct_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 绑定交换机,直连类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String error = "我是错误日志"; String info = "我是info日志"; String warning = "我是warning日志"; // 发送消息 channel.basicPublish(EXCHANGE_NAME, "error", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "info", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "warning", null, warning.getBytes(StandardCharsets.UTF_8)); System.out.println("Send:消息发送成功!"); } } }
消费者一(只接收错误消息)
public class Recv1 { private static final String EXCHANGE_NAME = "direct_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机,直连类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 获取队列 String queueName = channel.queueDeclare().getQueue(); // 绑定队列和交换机 channel.queueBind(queueName, EXCHANGE_NAME, "error"); // 回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv1:" + message); }; // 自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
消费者二(接收全部消息)
public class Recv2 { private static final String EXCHANGE_NAME = "direct_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机,直连类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 获取队列 String queueName = channel.queueDeclare().getQueue(); // 绑定队列和交换机 channel.queueBind(queueName, EXCHANGE_NAME, "error"); channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); // 回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv2:" + message); }; // 自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
什么是RabbitMQ的主题模式
生产者
public class Send { private static final String EXCHANGE_NAME = "topic_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 绑定交换机,主题类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String error = "我是错误日志"; String info = "我是info日志"; String warning = "我是warning日志"; // 发送消息 channel.basicPublish(EXCHANGE_NAME, "error", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "info", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "warning", null, warning.getBytes(StandardCharsets.UTF_8)); System.out.println("Send:消息发送成功!"); } } }
消费者一(只接收错误消息)
public class Recv1 { private static final String EXCHANGE_NAME = "topic_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机,主题类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 获取队列 String queueName = channel.queueDeclare().getQueue(); // 绑定队列和交换机 channel.queueBind(queueName, EXCHANGE_NAME, "error"); // 回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv1:" + message); }; // 自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
消费者二(接收全部消息)
public class Recv2 { private static final String EXCHANGE_NAME = "topic_mq"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机,主题类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 获取队列 String queueName = channel.queueDeclare().getQueue(); // 绑定队列和交换机 channel.queueBind(queueName, EXCHANGE_NAME, "#"); // 回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv2:" + message); }; // 自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
简单模式
工作队列模式
发布订阅模式
路由模式
通配符模式