RabbitMQ的基本使用,进行实例案例的消息队列
作者:mmseoamin日期:2024-01-23

目录

一、介绍

1. 概述

2. 作用

3. 工作原理

二、RabbitMQ安装部署

1. 安装

2. 部署

3. 增加用户

三、实现案例

1. 项目创建

2. 项目配置

3. 生产者代码

4. 消费者代码

四、测试

每篇一获


一、介绍

1. 概述

RabbitMQ 是一种开源的消息代理和队列服务器,用于通过简单和可扩展的方式在分布式系统中传递消息。它实现了高级消息队列协议(AMQP)。

  • 服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信)
  • 消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送  给另一端,称为延迟消息通讯(异步通信)
  • 一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议
  • 消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.

以下是关于RabbitMQ的一些详细信息:

  1. 消息代理:RabbitMQ是一个消息代理,它接受并转发消息。你可以把它想象成一个邮局:当你把邮件放在邮筒里时,你可以确定邮差最终会把邮件送到你的收件人。在这个比喻中,RabbitMQ是邮筒、邮局和邮差。

  2. 可靠性:RabbitMQ支持消息持久化、传递确认、发布者确认和高可用性。

  3. 灵活的路由:RabbitMQ提供了多种消息路由模式,包括点对点、发布/订阅和路由模式。

  4. 集群:多个RabbitMQ服务器可以组成一个集群,形成一个高可用、负载均衡的系统。

  5. 多协议支持:RabbitMQ支持多种消息队列协议,包括AMQP、STOMP、MQTT等。

  6. 客户端支持:RabbitMQ为多种编程语言提供了客户端库,包括Java、.NET、Python、Ruby、PHP等。

  7. 管理界面:RabbitMQ提供了一个易于使用的用户界面,用于管理和监控你的RabbitMQ服务器。

  8. 跟踪:如果你需要查看消息传递的详细信息,RabbitMQ提供了消息跟踪功能。

  9. 插件机制:RabbitMQ支持通过插件扩展其核心功能。

2. 作用

RabbitMQ的主要作用和优势如下:

  1. 解耦:在系统设计中,组件之间的高度耦合是非常不利的。RabbitMQ作为消息队列中间件,可以有效地解耦系统,使得系统组件之间不直接通信,只通过消息队列来交换信息。

  2. 异步通信:RabbitMQ提供了异步处理的能力。当一个操作需要大量时间时,可以将该操作作为一个消息发送到队列,然后立即返回。这样,用户不需要等待这个操作完成,提高了系统的响应性能。

  3. 缓冲:RabbitMQ可以在处理高负载的情况下起到缓冲的作用。当消息的产生速度超过处理速度时,RabbitMQ可以暂存这些消息,等待处理程序准备好后再进行处理。

  4. 可靠性:RabbitMQ提供了消息持久化、传递确认、发布者确认等机制,确保消息不会丢失。

  5. 路由能力:RabbitMQ提供了灵活的消息路由能力,如点对点、发布/订阅等模式,满足不同的业务需求。

  6. 扩展性:RabbitMQ支持集群,可以通过增加更多的RabbitMQ节点来提高系统的处理能力。

  7. 跨平台和语言无关:RabbitMQ提供了多种语言的客户端,如Java、.NET、Python等,可以在不同的平台和语言之间进行通信。

  8. 监控:RabbitMQ提供了管理界面,可以方便地监控和管理消息队列的状态。

RabbitMQ作为一个消息队列中间件,可以帮助我们构建高效、可靠、可扩展的分布式系统。

3. 工作原理

RabbitMQ的基本使用,进行实例案例的消息队列,第1张

RabbitMQ的工作原理主要基于生产者-消费者模型和消息队列。以下是其基本的工作流程:

  1. 生产者:生产者是创建消息的应用程序。它创建消息并发送到RabbitMQ。

  2. 队列:队列是RabbitMQ的内部结构,用于存储消息。多个生产者可以发送消息到一个队列,多个消费者可以从一个队列中获取消息。

  3. 交换器:生产者发送消息到交换器(Exchange),然后交换器根据一定的规则(路由键)将消息路由到一个或多个队列。RabbitMQ提供了几种类型的交换器,如直接交换器、主题交换器、头交换器和扇出交换器。

  4. 消费者:消费者是接收消息的应用程序。消费者连接到RabbitMQ并订阅一个队列,当新的消息到达队列时,RabbitMQ会将消息推送给消费者,或者消费者可以主动从队列中拉取消息。

  5. 消息确认:当消费者处理完一个消息后,它需要向RabbitMQ发送一个确认,告诉RabbitMQ这个消息已经被处理,可以从队列中删除。如果消费者处理消息时发生错误,它可以发送一个拒绝,告诉RabbitMQ这个消息没有被正确处理。

  6. 持久化:为了防止消息丢失,RabbitMQ提供了消息持久化的功能。生产者在发送消息时可以设置消息为持久化,RabbitMQ会将这些消息存储到磁盘,即使RabbitMQ服务器重启,这些消息也不会丢失。

通过这种方式,RabbitMQ可以在分布式系统中实现消息的可靠传递。 

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言.

RabbitMQ的基本使用,进行实例案例的消息队列,第2张

  • Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
  • Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue.
  • Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
  • ExchangeType:交换机类型决定了路由消息行为,
  • RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
  • Message Queue:消息队列,用于存储还未被消费者消费的消息. Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内 容.
  • BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来.

    二、RabbitMQ安装部署

    1. 安装

    在虚拟机中下载RabbitMQ的镜像

    命令:

    docker pull rabbitmq:management 

    2. 部署

    查看防火墙列表的端口是否开启

    5672(RabbitMQ的用户端口)和15672(RabbitMQ的管理员端口)

    命令:

    firewall-cmd --zone=public --list-ports

    开放端口5672:

    firewall-cmd --zone=public --add-port=5672/tcp --permanent 

    开放端口15672:

    firewall-cmd --zone=public --add-port=15672/tcp --permanent

    更新防火墙端口:

    firewall-cmd --reload

    创建并运行RabbitMQ的容器:

    docker run -d \
    --name my-rabbitmq \
    -p 5672:5672 -p 15672:15672 \
    --hostname my-rabbitmq-host \
    -e RABBITMQ_DEFAULT_VHOST=my_vhost \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=admin \
    --restart=always \
    rabbitmq:management 

    --hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)

    -e:指定环境变量:

    RABBITMQ_DEFAULT_VHOST:默认虚拟机名

    RABBITMQ_DEFAULT_USER:默认的用户名

    RABBITMQ_DEFAULT_PASS:默认用户名的密码

    最后是运用RabbitMQ镜像;

    RabbitMQ的基本使用,进行实例案例的消息队列,第3张

    3. 增加用户

    使用虚拟机IP和RabbitMQ的管理员端口登入后台管理:

    RabbitMQ的基本使用,进行实例案例的消息队列,第4张

    如图所示增加用户:

    RabbitMQ的基本使用,进行实例案例的消息队列,第5张

    点击创建的用户,在点击设置应用

    RabbitMQ的基本使用,进行实例案例的消息队列,第6张

    RabbitMQ的基本使用,进行实例案例的消息队列,第7张

    之后退出,登入创建的用户

    RabbitMQ的基本使用,进行实例案例的消息队列,第8张

    RabbitMQ的基本使用,进行实例案例的消息队列,第9张

    三、实现案例

    在实现案例的时候,虚拟机的RabbitMQ容器不用停止运行,虚拟机不用关闭。

    1. 项目创建

    打开我们的开放工具,创建项目,来实现生产者-消费者的消息队列:

    根据如图创建项目:

    父项目

    RabbitMQ的基本使用,进行实例案例的消息队列,第10张

    生产者(publisher)在父项目中

    RabbitMQ的基本使用,进行实例案例的消息队列,第11张

    RabbitMQ的基本使用,进行实例案例的消息队列,第12张

    创建消费者(consumer)在父项目中:

    RabbitMQ的基本使用,进行实例案例的消息队列,第13张

    RabbitMQ的基本使用,进行实例案例的消息队列,第14张

    2. 项目配置

    在生产者(publisher)项目中配置yml文件:

    server:
        port: 9949
    spring:
        rabbitmq:
            host: 192.***.***.***
            username: Jun
            password: 123456
            port: 5672
            virtual-host: my_vhost

    server.port:配置生产者的端口

    host:配置虚拟机的IP(这里需要根据自己的虚拟机IP进行填写)

    username:配置我们在RabbitMQ中创建的用户名称

    password:配置我们在RabbitMQ中创建的用户密码

    port:配置RabbitMQ的用户端口

    virtual-host: 配置默认虚拟机名(my_vhost)

    消费者(consumer)项目中配置yml文件:

    server:
        port: 8848
    spring:
        rabbitmq:
            host: 192.168.211.129
            username: Jun
            password: 123456
            port: 5672
            virtual-host: my_vhost
    

    server.port:配置消费者的端口

    host:配置虚拟机的IP(这里需要根据自己的虚拟机IP进行填写)

    username:配置我们在RabbitMQ中创建的用户名称

    password:配置我们在RabbitMQ中创建的用户密码

    port:配置RabbitMQ的用户端口

    virtual-host: 配置默认虚拟机名(my_vhost)

    3. 生产者代码

    在生产者中创建一个配置类,使用@Configuration注解的类表示这个类包含了一个或多个@Bean注解的方法,这些方法将会被Spring容器调用,其返回值将被添加到Spring的应用上下文中,作为一个bean供其他部分使用。

    这个配置类的名字是RabbitConfig(可以自己修改),它的主要作用是配置RabbitMQ的队列。

    RabbitConfig:

    package com.cloudjun.publisher;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    @SuppressWarnings("all")
    public class RabbitConfig {
        // 创建队列
        @Bean
        public Queue messageQueue() {
            return new Queue("messageQueue");
        }
        @Bean
        public Queue messageUser() {
            return new Queue("messageUser");
        }
    }

    创建实体对象来作为传输信息内容:

    User:

     

    package com.cloudjun.publisher;
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Setter;
    import java.io.Serializable;
    @SuppressWarnings("all")
    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    public class User implements Serializable {
            private String username;
            private String userpwd;
    }

    创建一个控制器类,使用@RestController注解的类表示这个类是一个控制器,它可以处理HTTP请求。

    这个控制器类的名字是TestController,它的主要作用是处理HTTP请求,并通过RabbitMQ发送消息。

    TestController

    package com.cloudjun.publisher;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    /**
     * @author CloudJun
     */
    @RestController
    public class TestController {
        @Autowired
        private AmqpTemplate template;
        @Autowired
        private ObjectMapper objectMapper;
        @RequestMapping("test01")
        public String test01(){
            // 发送消息到名为messageQueue的队列
            // 这里的messageQueue是RabbitMQ中定义的队列名称
            // 这里的"Hello World!"是发送的消息内容
            template.convertAndSend("messageQueue", "HelloWorld!");
            return "💖";
        }
        @RequestMapping("test02")
        public String test02() throws Exception {
            // 发送消息到名为messageQueue的队列
            // 这里的messageQueue是RabbitMQ中定义的队列名称
            User user = new User("Jun", "123456");
            // 序列化对象转换为JSON字符串
            String json = objectMapper.writeValueAsString(user);
            template.convertAndSend("messageUser", json);
            return "💖";
        }
    }
    

    类及代码说明:

    在这个类中,使用了@Autowired注解来自动注入AmqpTemplate和ObjectMapper对象。AmqpTemplate是Spring提供的一个操作RabbitMQ的工具,可以用来发送和接收消息。ObjectMapper是Jackson库提供的一个工具,可以用来将对象转换为JSON字符串,或者将JSON字符串转换为对象。

    1. test01方法发送了一个字符串"Hello World!"到名为messageQueue的队列。
    2. test02方法创建了一个User对象,然后使用ObjectMapper将这个对象转换为JSON字符串,然后发送这个JSON字符串到名为messageUser的队列。

    4. 消费者代码

    创建实体对象来作为接收生产者的信息内容:

    User:

    package com.cloudjun.consumer;
    import lombok.*;
    import java.io.Serializable;
    @SuppressWarnings("all")
    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    public class User implements Serializable {
            private String username;
            private String userpwd;
    }

    创建消息消费者类,使用@Component注解的类表示这个类是一个组件,它会被Spring管理。

    这个消息消费者类的名字是Receiver(名称可以直接修改),它的主要作用是接收并处理RabbitMQ的消息。

    Receiver:

    package com.cloudjun.consumer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    @Component
    @SuppressWarnings("all")
    @Slf4j
    @RabbitListener(queues = "messageQueue")
    public class Receiver {
            @RabbitHandler
            public void messageMsg(String msg) {
                log.warn("接收到:" + msg);
            }
    }
    

    类及代码说明:

    在这个类中,使用了@Slf4j注解来启用日志,使用了@RabbitListener注解来监听名为messageQueue的队列,这个队列是在前面的RabbitConfig配置类中定义的。

    这个类定义了一个名为process的方法,这个方法使用了@RabbitHandler注解,表示这个方法是处理消息的方法。当messageQueue队列中有新的消息时,这个方法会被调用,方法的参数msg就是接收到的消息。

    在process方法中,使用了log.warn来打印接收到的消息,这样我们就可以在日志中看到接收到的消息。

    总的来说,这个类的作用是接收并处理RabbitMQ的消息,并将接收到的消息打印在日志中。

    在创建一个消息消费者类,这个消息消费者类的名字是PojoReceiver,它的主要作用是接收并处理RabbitMQ的消息。

    PojoReceiver:

    package com.cloudjun.consumer;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    @Component
    @SuppressWarnings("all")
    @Slf4j
    @RabbitListener(queues = "messageUser")
    public class PojoReceiver {
            @Autowired
            private ObjectMapper objectMapper;
            @RabbitHandler
            public void messageUser(String json) throws Exception {
                User user = objectMapper.readValue(json, User.class);
                // 处理user对象
                log.warn("接收到:" + user);
            }
    }
    

    类及代码说明:

    这个类定义了一个名为process的方法,这个方法使用了@RabbitHandler注解,表示这个方法是处理消息的方法。当messageUser队列中有新的消息时,这个方法会被调用,方法的参数json就是接收到的消息。

    不同于前面的Receiver类,这个类在接收到消息后,会使用ObjectMapper将消息从JSON字符串转换为User对象。这样,我们就可以在处理消息时,直接操作User对象,而不需要手动解析JSON字符串。

    在process方法中,使用了log.warn来打印接收到的User对象,这样我们就可以在日志中看到接收到的消息。

    总的来说,这个类的作用是接收并处理RabbitMQ的消息,并将接收到的消息从JSON字符串转换为User对象,然后将User对象打印在日志中。 

    四、测试

    启动两个项目,在浏览器中访问生产者的配置路径,并且在消费者中看看是否可以查看到,生产者转递过来的信息。

    方法一:

    RabbitMQ的基本使用,进行实例案例的消息队列,第15张

    RabbitMQ的基本使用,进行实例案例的消息队列,第16张

    方法二:

    RabbitMQ的基本使用,进行实例案例的消息队列,第17张

    RabbitMQ的基本使用,进行实例案例的消息队列,第18张

    每篇一获

    学习RabbitMQ的基本使用后,你可以从以下几个方面受益:

    1. 可靠性:通过使用RabbitMQ消息队列技术,可以确保消息的可靠性,即使在消息处理过程中出现故障,也可以确保消息不会丢失。

    2. 异步处理:使用RabbitMQ可以实现异步处理,将消息发送到队列中,然后再异步处理它们。这样可以加速应用程序的响应时间,提高系统的吞吐量。

    3. 解耦合:使用RabbitMQ可以实现应用程序之间的解耦合,例如一个应用程序可以发送消息到一个队列中,而另一个应用程序可以从该队列中接收并处理消息。这样可以降低应用程序之间的依赖性,提高系统的可维护性和可扩展性。

    4. 伸缩性:使用RabbitMQ可以轻松地水平扩展消息处理能力,通过添加更多的消费者来实现更高的吞吐量。

    5. 可视化管理:RabbitMQ提供了一个易于使用的Web管理界面,可以监控和管理RabbitMQ服务器,包括队列、交换机、绑定等等。

    总之,学习RabbitMQ可以帮助你更好地理解消息队列的概念和实现方式,并且可以应用到实际项目中,提高应用程序的可靠性、响应时间和可维护性。