整合RabbitMQ实现消息异步发送
作者:mmseoamin日期:2024-04-27

消息队列中间件

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削峰等问题。

中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC 的调用等等。

使用较多的消息队列有 ActiveMQ(安全),RabbitMQ,ZeroMQ,Kafka(大数据),MetaMQ,RocketMQ。

AMQP高级消息队列协议

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ

RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

RabbitMQ的优势

可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。

灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。

消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。

高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。

多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。

管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。

跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。

插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。

RabbitMQ安装(docker)

查询RabbitMQ镜像

docker search rabbitmq

 
  1. [root@localhost ~]# docker search rabbitmq
  2. NAME DESCRIPTION STARS OFFICIAL AUTOMATED
  3. rabbitmq RabbitMQ is an open source multi-protocol me… 4798 [OK]
  4. bitnami/rabbitmq Bitnami Docker Image for RabbitMQ 100 [OK]
  5. bitnami/rabbitmq-exporter 2
  6. circleci/rabbitmq This image is for internal use 0
  7. circleci/rabbitmq-delayed https://github.com/circleci/rabbitmq-delayed… 1

整合RabbitMQ实现消息异步发送,image-20230617101359800,第1张

拉取镜像

docker pull rabbitmq

 
  1. [root@localhost ~]# docker pull rabbitmq
  2. Using default tag: latest
  3. latest: Pulling from library/rabbitmq
  4. 7b1a6ab2e44d: Pull complete
  5. 37f453d83d8f: Pull complete
  6. e64e769bc4fd: Pull complete
  7. c288a913222f: Pull complete
  8. 12addf9c8bf9: Pull complete
  9. eaeb088e057d: Pull complete
  10. b63d48599313: Pull complete
  11. 05c99d3d2a57: Pull complete
  12. 43665bfbc3f9: Pull complete
  13. Digest: sha256:884146137011519524d506a12687127f3d2c7c37c2cc11206dc72c59bedea5e2
  14. Status: Downloaded newer image for rabbitmq:latest
  15. docker.io/library/rabbitmq:latest

创建运行rabbitmq容器

docker run -it —name=rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123 -p 15672:15672 -p 5672:5672 rabbitmq

 
  1. docker run -it \
  2. --name rabbitmq \
  3. -v /etc/localtime:/etc/localtime \
  4. -v /usr/local/software/mq/data:/var/lib/rabbitmq \
  5. -e RABBITMQ_DEFAULT_USER=admin \
  6. -e RABBITMQ_DEFAULT_PASS=123 \
  7. --network wn_docker_net \
  8. --ip 172.18.12.20 \
  9. -p 15672:15672 \
  10. -p 5672:5672 \
  11. -d rabbitmq

整合RabbitMQ实现消息异步发送,image-20230722104404686,第2张

 

查看运行日志

 
  1. [root@localhost ~]# docker logs rabbitmq
  2. 2023-06-17 02:35:10.091757+00:00 [info] <0.222.0> Feature flags: list of feature flags found:
  3. 2023-06-17 02:35:10.107600+00:00 [info] <0.222.0> Feature flags: [ ] implicit_default_bindings
  4. 2023-06-17 02:35:10.107642+00:00 [info] <0.222.0> Feature flags: [ ] maintenance_mode_status
  5. 2023-06-17 02:35:10.107661+00:00 [info] <0.222.0> Feature flags: [ ] quorum_queue
  6. 2023-06-17 02:35:10.107675+00:00 [info] <0.222.0> Feature flags: [ ] stream_queue
  7. 2023-06-17 02:35:10.107749+00:00 [info] <0.222.0> Feature flags: [ ] user_limits
  8. 2023-06-17 02:35:10.107764+00:00 [info] <0.222.0> Feature flags: [ ] virtual_host_metadata
  9. 2023-06-17 02:35:10.107778+00:00 [info] <0.222.0> Feature flags: feature flag states written to disk: yes
  10. :
  11. :
  12. ## ## RabbitMQ 3.9.11
  13. ## ##
  14. ########## Copyright (c) 2007-2021 VMware, Inc. or its affiliates.
  15. ###### ##
  16. ########## Licensed under the MPL 2.0. Website: https://rabbitmq.com
  17. Erlang: 24.2 [jit]
  18. TLS Library: OpenSSL - OpenSSL 1.1.1m 14 Dec 2021
  19. Doc guides: https://rabbitmq.com/documentation.html
  20. Support: https://rabbitmq.com/contact.html
  21. Tutorials: https://rabbitmq.com/getstarted.html
  22. Monitoring: https://rabbitmq.com/monitoring.html

启动管理插件

进入rabbitmq容器

 
  1. [root@localhost ~]# docker exec -it rabbitmq bash

启动插件

 
  1. root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_management
  2. Enabling plugins on node rabbit@6d2342d51b11:
  3. rabbitmq_management
  4. The following plugins have been configured:
  5. rabbitmq_management
  6. rabbitmq_management_agent
  7. rabbitmq_prometheus
  8. rabbitmq_web_dispatch
  9. Applying plugin configuration to rabbit@6d2342d51b11...
  10. The following plugins have been enabled:
  11. rabbitmq_management
  12. started 1 plugins.

浏览器测试

在浏览器中输入 linux的ip地址:15672

整合RabbitMQ实现消息异步发送,image-20230617104552831,第3张

RabbitMQ hello

引入依赖

 
  1. org.springframework.boot
  2. spring-boot-starter-amqp

配置

 
  1. spring:
  2. rabbitmq:
  3. host: 192.168.20.65
  4. username: admin
  5. password: 123
  6. port: 5672

生产者

发送数据

java对象

使用序列化

 
  1. rabbitTemplate.convertAndSend(
  2. "hello_queue",
  3. bookType
  4. );

整合RabbitMQ实现消息异步发送,image-20240119151600306,第4张

json

 
  1. @Configuration
  2. public class RabbitmqConfig {
  3. @Bean
  4. public MessageConverter messageConverter(){
  5. return new Jackson2JsonMessageConverter();
  6. }
  7. @Bean
  8. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
  9. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  10. rabbitTemplate.setConnectionFactory(connectionFactory);
  11. rabbitTemplate.setMessageConverter(messageConverter());
  12. return rabbitTemplate;
  13. }
  14. }

整合RabbitMQ实现消息异步发送,image-20240119152219328,第5张