RabbitMQ是一个开源的消息代理软件,用于处理应用程序之间的消息传递。它实现了高级消息队列协议(AMQP),是一个可靠且灵活的消息中间件,广泛用于构建分布式系统、微服务架构以及异构系统之间的通信 ,本期文章将为大家带来RabbitMQ的基本使用及入门
RabbitMQ可以实现可靠的异步通信,解耦系统中的组件,以及处理高并发和大规模的消息流。它在许多领域,如微服务架构、日志处理、任务调度等方面都有广泛的应用
消息队列(Message Queue): RabbitMQ作为消息中间件,它允许应用程序之间通过消息进行通信。消息被发送到队列中,然后由消费者从队列中接收并处理。
生产者(Producer): 生产者是将消息发送到RabbitMQ的应用程序。它将消息发布到一个或多个队列。
消费者(Consumer): 消费者是从RabbitMQ接收消息并进行处理的应用程序。一个队列可以有多个消费者,但每条消息只会被一个消费者处理。
交换机(Exchange): 交换机负责将消息路由到一个或多个队列。生产者将消息发送到交换机,而交换机则将消息路由到相关的队列。
队列(Queue): 队列是存储消息的地方,它是消息的终点。消费者从队列中获取消息并进行处理。
绑定(Binding): 绑定是交换机和队列之间的关系,它定义了如何将消息从交换机路由到队列。
持久性(Durable): RabbitMQ支持将队列和消息标记为持久性,以确保在服务器重启时数据不会丢失。
虚拟主机(Virtual Host): 虚拟主机是RabbitMQ中消息隔离的单位,允许在同一RabbitMQ服务器上创建多个独立的消息代理。
ACK(Acknowledgment): 消费者在处理完消息后发送ACK,通知RabbitMQ该消息已被成功处理。这确保消息不会在处理失败时丢失。
插件系统: RabbitMQ具有丰富的插件系统,允许扩展和定制其功能
docker pull rabbitmq:management
命令详解:
在后台运行一个RabbitMQ容器,通过主机的5672端口可以访问RabbitMQ的AMQP服务,通过主机的15672端口可以访问RabbitMQ的管理界面。RabbitMQ的默认虚拟主机为"my_vhost",默认用户名和密码为"admin"。容器的主机名被设置为"my-rabbitmq-host"。容器将在守护进程启动时自动重新启动
在虚拟机中运行以下命令
docker run -d \ --name my-rabbitmq \ -p 5672:5672 -p 15672:15672 \ --hostname my-rabbitmq-host \ -e RABBITMQ_DEFAULT_VHOST=my_vhost \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ --restart=always \ rabbitmq:management
开放端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --zone=public --add-port=5672/tcp --permanent
容器创建运行完成后我们可以在自己的主机上通过IP:15672进入到管理员界面,账号密码为我们上面命令指定的admin,登录即可
添加用户输入账户名和密码指定当前用户身份
刚创建完的用户还不能进行一个登录,需要给它分配一个能够操作的虚拟主机
Virtal Host是可以分配的虚拟主机,Set permission可以对其进行分配
分配完成后的用户就可以对当前虚拟主机进行登录等操作了
我们以生产者和消费者的形式创建SpringBoot项目
创建时需要注意,以SpringBoot项目为例在勾选依赖时,要将Spring for Rabbit勾选
生产者和消费者的配置文件都需要更改
注意:端口不能相同防止端口冲突,要将ip修改为自己的主机ip以及用户名和密码
server: port: 8888 spring: rabbitmq: host: Rabbit所在主机的ip username: YU password: 123456 port: 5672 virtual-host: my_vhost
通过@Bean注解,它定义了一个名为firstQueue的Bean,该Bean是一个Queue实例,代表了一个名为"firstQueue"的RabbitMQ队列。这样的配置允许应用程序使用该队列进行消息的生产和消费
package com.yu.publisher; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @SuppressWarnings("all") public class RabbitConfig { @Bean public Queue firstQueue() { return new Queue("firstQueue"); } }
我们通过导入Spring AMQP框架中用于发送消息的AmqpTemplate接口,调用amqpTemplate的convertAndSend方法向名为"firstQueue"的RabbitMQ队列发送消息,消息内容为"hello world",发送成功后返回一个字符串"🚀",作为HTTP响应体。
package com.yu.publisher; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TestController { @Autowired private AmqpTemplate amqpTemplate; @RequestMapping("send1") public String send1(){ //向消息队列发送消息 amqpTemplate.convertAndSend("firstQueue","hello world"); return "🚀"; } }
测试阶段:
启动生产者,在浏览器调用send1(Controller中的方法)进行访问测试
这时我们打开RabbitMQ控制面板中的消息队列就可以看到我们刚刚发送的消息了
我们通过注解@RabbitHandler指定该类监听名为"firstQueue"的RabbitMQ队列和@RabbitHandler注解标识该方法为处理RabbitMQ消息的方法,使用Lombok生成的日志记录器记录接收到的消息,将"接收到:"与消息内容拼接并记录为警告级别的日志,当队列"firstQueue"中有消息到达时,process方法会被调用,将接收到的消息记录到日志中
package com.yu.consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "firstQueue") public class Receiver { @RabbitHandler public void process(String msg) { log.warn("接收到:" + msg); } }
启动消费者,这时,当我们再去访问生产者时,在日志信息中就会返回生产者所发送的消息
在RabbitMQ中发送对象通常需要进行对象的序列化和反序列化,以便将对象转换为消息体发送到队列,然后从队列接收消息并还原为对象
实现序列化接口
package com.yu.publisher; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; @SuppressWarnings("all") @Data @AllArgsConstructor @NoArgsConstructor public class User implements Serializable { private String username; private String userpwd; }
package com.yu.publisher; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @SuppressWarnings("all") public class RabbitConfig { @Bean public Queue secondQueue() { return new Queue("secondQueue"); } }
使用ObjectMapper将其序列化为JSON字符串,然后通过amqpTemplate.convertAndSend将这个JSON字符串发送到名为"secondQueue"的RabbitMQ队列
package com.yu.publisher; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TestController { @Autowired private AmqpTemplate amqpTemplate; @Autowired private ObjectMapper objectMapper; @RequestMapping("send2") public String send2() throws Exception{ User user = new User("YU","123"); String YU = objectMapper.writeValueAsString(user); //向消息队列发送消息 amqpTemplate.convertAndSend("secondQueue",YU); return "🚀"; } }
声明一个方法作为消息处理器,标记方法用于处理从RabbitMQ队列接收到的消息,处理消息的方法,该方法接收一个JSON字符串参数使用ObjectMapper将JSON字符串转换为User对象,使用Slf4j日志框架记录接收到的User对象,这里使用warn级别,接收并处理从"secondQueue"队列中接收到的消息
package com.yu.consumer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "secondQueue") public class PojoReceiver { @Autowired private ObjectMapper objectMapper; @RabbitHandler public void process(String json) throws Exception { User user = objectMapper.readValue(json, User.class); log.warn("接收到:" + user); } }
测试阶段:
当我们向消息队列中发送一个对象,消费者再进行接收