目录
一:初始RabbitMQ
1. 同步和异步通讯
1.1 同步调用
1.2 异步调用
2. MQ常见框架
二:RabbitMQ快速入门
1. RabbitMQ概述和安装
3. 快速入门案例
三:SpringAMQP
1. Basic Queue 简单队列模型
2. Work Queue 工作队列模型
3. 发布订阅模型-Fanout 发布
4. 发布订阅模型-Direct 发布
5. 发布订阅模型-Topic 发布
6. 消息转换器
前些天突然发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,感兴趣的同学可以进行学习人工智能学习
同步通讯和异步通讯理解
生活中就有很多同步和异步的案例,例1:假如你现在与一个妹子聊天,采用同步通信更像是打视频电话,就像直播一样,所得到的信息都能立刻同步过去,具有一定的优势;而异步通信更像是微信聊天,别人不想理你也不知道,时效性不是那么好,但也有自己的优点。例2:假如你现在在和三个妹子聊天,同步通信只能一个妹子聊,就会错失很多良机;异步通信可以多个妹子一块聊,还不会被发现;所以那么牛的技术我们当然要好好学习!
案例:前面学习的微服务间基于Feign的调用就属于同步方式,就存在以下问题:
耦合度高:每次加入新的需求,都要修改原来的代码
对于一个订单业务,我们支付成功后就需要更改订单服务修改订单状态,然后进行发货;支付服务调用订单服务还是存储服务都需要等待对方的响应,是实时的调用。此时一个完整的系统开发好了,如果产品经理需要增加一个短信通知服务等功能,此时就需要在支付服务里增加代码;每次增加一个业务,代码就需要更改,具有很强的耦合性!
性能下降(吞吐量):调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和
假如现在调用支付服务需要50ms,支付服务调用其它服务都需要150ms,支付服务调用每个服务都是同步调用,所以只能进行等待当前调用完成才可以调用其它的服务;所以一个完整的服务调用下来就需要500ms,这相当于1s中只能处理请求。数以十万百万的请求过来根本顶不住,性能下降、吞吐量也下降了!
资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
在支付服务等待订单服务的过程,CPU和内存都在占用着啥都不干,只有某个服务调用完成才会执行下一个,在等待的过程中浪费大量资源,资源利用的不够充分!
级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障
假如现在存储服务挂了,此时支付服务进行访问,就会一直进入阻塞状态,这个请求就不会被释放,后面阻塞的越来越多,等待资源耗尽,支付服务就进不去了,相当于支付服务也挂了;所以造成整个服务就瘫痪了!
总结同步调用:
优点:时效性强,可以立即得到结果。
缺点:耦合度高、性能和吞吐能力下降、有额外的资源消耗、有级联失联问题。
异步调用常见实现就是事件驱动模式!
在支付服务与其它服务之间引入一个Broker(事件代理者)。一旦有人支付成功就是一个事件,这个事件交给Broker来管理;而订单、仓储等服务就会找Broker这个老大哥,一旦有人支付成功你要通知我们(订阅事件);所以一旦有人支付成功,Broker就会发布支付成功事件(这里通知完就会返回给用户,不会等待其它服务响应完)去通知其它服务有人支付成功了,此时其它服务就会去修改订单状态!
优势一:服务解耦
原来增加业务需要更改业务的代码,现在就不用了;因为现在支付服务不负责调用,只负责发送一个事件到Broker,至于是谁接收?什么时间接收?有没有完成?完全不用管。所以一旦有新的业务只需要订阅新的Broker事件即可(到时候直接大喇嘛一喊,就能通知到你)!注:这样将来增加或删除业务就不需要更改代码,只需要订阅或取消订阅事件即可。
优势二:性能提升,吞吐量提升
以前的耗时是总耗时加在一起50+150*3=500ms,现在只要支付成功,支付服务就向Broker发布事件,立刻就能返还给用户支付成功50+10=60ms。而Broker通知其它服务,什么时候去完成?多久去完成?完全不用管。
优势三:服务没有强依赖,不担心级联失败问题,没有资源浪费
支付服务相当于借用Broker去通知而不是调用,此时仓储服务挂了,也和我没关系,只需要重启仓储服务即可。既然没有强的依赖关系,我不调用你,也不需要等待你,所以就没有了资源浪费。
优势四:流量削峰
假设现在有多个用户发出请求,此时Broker就起到一个缓冲的作用,把请求都放到让订单服务、仓储等服务按照自己的能力去处理业务,处理完再去Broker取,现在此时的压力是Broker扛着。
总结异步调用:
优点:耦合度低,性能和吞吐量提升,故障隔离,没有资源消耗,没有级联失联问题,流量消峰。
缺点:依赖于Broker的可靠性、安全性、吞吐能力,架构复杂了,业务没有明显的流程线,不好追踪管理。
MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Brokeri
MQ常见的四种实现:RabbitMQ、ActiveMQ、RocketMQ、Kafka
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
RabbitMQ概述
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ
RabbitMQ安装
单机部署:基于Centos7虚拟机中使用Docker来安装!
第一步:下载镜像
①在线拉取
docker pull rabbitmq:3-management
②从本地加载,使用本地已经安装的镜像包
上传到虚拟机目录后(例如tmp目录),使用命令加载镜像即可:
docker load -i mq.tar
第二步:安装MQ
执行下面的命令来运行MQ容器:
docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ # -e设置环境变量:用户名和密码 -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ # --hostname配置主机名,集群部署需要配置这个 -p 15672:15672 \ # 管理平台的端口 -p 5672:5672 \ # 消息通信的端口 -d \ rabbitmq:3-management
第三步:查看状态
docker ps
成功启动
第四步:登录管理品台页面
注:如果出现第二天登录不上的情况,请重启docker,service docker restart
192.168.#.#:15672 # 前面是虚拟机IP,后面是端口
输入设置的账户密码
需要注意的是:每个用户都需要有自己独享的虚拟主机
RabbitMQ的结构和概念
Publisher是消息的发送者,Consumer是消息的消费者。发送者将来会把消息发送到exchange(交换机),交换机会把消息路由到queue(队列),队列负责暂存消息;而后消费者从队列中获取消息,然后处理消息!
注:每创建一个用户都对应一个VirtualHost(虚拟主机),各个虚拟主机之间是相互隔离的,看不到,这样可以避免干扰。
总结RabbitMQ中的几个概念:
①channel:操作MQ的工具;
②exchange:路由消息到队列中 ;
③queue:缓存消息 ;
④virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组、隔离;
MQ的官方文档中给出了7个MQ的Demo示例,其中与消息发送和接收有关系的就是前5个:
①其中前2个命名为基本消息队列(BasicQueue)和工作消息队列(WorkQueue),这两种有一个共同的特征:消息的发送和接收都是基于队列来完成的(没有通过交换机),其中P代表发送者、C代表消费者、中间的红色部分代表消息队列。
②后3个都属于发布订阅(Publish、Subscribe),只是交换机类型不同分为三种:Fanout Exchange(广播)、Direct Exchange(路由)、Topic Exchange(主题),其中紫色的部分就代表交换机。
HelloWorld案例---》基本消息队列入门
注:mq-demo是父工程用来做依赖管理,consumer和publisher是两个子工程
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
publisher:消息发布者,将消息发送到队列;
queue:消息队列,负责接受并缓存消息;
consumer:订阅队列,处理队列中的消息;
注:其中queue是由MQ进行管理的,所以我们只需要写publisher和consumer这两部分代码
mq-demo父工程
pom.xml
4.0.0 cn.itcast.demo mq-demo1.0-SNAPSHOT publisher consumer pom org.springframework.boot spring-boot-starter-parent2.3.9.RELEASE 8 8 org.projectlombok lombokorg.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-starter-test
publisher消息的发送者
其中启动类和yml文件是SpringBoot工程必备的,没什么好说的,最主要的是测试类
①首先要先创建连接,需要连接工厂ConnectioFactory;
②根据连接工厂,去设置连接的信息:连接的地址、端口号、虚拟主机、用户名、密码;
③前面连接工厂和参数都准备好了,然后就是调用连接工厂ConnectionFactory的newConnection方法,正式建立连接connection;
④正式建立连接后,就需要调用connection的createChannel建立通道channnel,这样生产者和消费者才能完成消息的发送和接收;
⑤通道有了就可以基于通道向队列queue中发送消息了,首先是声明了队列的名称,然后调用通道的queueDeclare方法向队列中发送消息;
⑥有了队列,生产者就可以向队列中发送消息了,把准备的消息发送到队列当中,以字节的形式发送出去。
⑦最后在关闭通道和连接。
注:无论是声明队列还是向队列中发送消息实际上使用的都是通道channel
package cn.itcast.mq.helloworld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.150.101"); factory.setPort(5672); // 5672是通信的端口,15672是管理的接口 factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } }
1. 正式建立连接connection,管理界面就会有连接的信息
2. 连接正式建立后,就会创建通道channel供消息的发送和接收使用
3. 根据通道向队列queue发送消息
4. 消息发送到队列后,就关闭通道和连接(发完就不管了,解除了耦合)
①控制台
② 管理的页面queue,都表示消息已经成功发出去
Consumer消息的接收者
①消费者就需要从队列中接收消息,所以也会有创建连接工厂、准备参数、创建通道等操作,这些代码不变;
②值得注意的是在这里我们又创建了队列,这是为什么呢?这是因为我们生产者和消费者的启动顺序是不同的,万一消费者先启动找队列找不到怎办?为了避免这种情况的发生都声明了对列。并且如果这个对列已经创建过了不会再次创建;
③下面实际上就相当于回调函数,调用basicConsume方法,表示消费一条消息,那么去干什么呢,什么行为?这里就采用了匿名内部类对象DefaultConsumer(默认的消费者),重写了handleDelivery方法(处理投递的消息),把处理的行为挂载到队列queueName当中;一旦消息队列中有了消息,这个回调函数就会执行。
package cn.itcast.mq.helloworld; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.2.129"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); // 发的时候是字节,接的时候也必须是字节,这里在转换为字符串 System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
此时的执行结果
此时先打印的是 ”等待接收消息。。。。” ,实际上这就是回调机制,前面的代码只是让回调函数和队列进行绑定,此时的消息还没过来,代码会继续执行,一直到MQ把消息投递过来才会打印。这也再次证明了是异步的!
一旦消息被消费,队列中的就会被删除!
前面我们使用官方的API实现了简单的MQ程序,但是发现程序非常的麻烦;接下来就学习一下SpringAMQP,大大简化了消息的发送和接收。
什么是SpringAMQP
SpringAmqp的官方地址:Spring AMQP,是应用间消息通信的一种协议,与语言平台无关。
AMQP:在学习SpringAMQP之前需要先了解一下AMQP,Advanced Message Queuing Protocol(高级消息队列协议),是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
①用于异步处理入站消息的监听器容器;
②用于发送和接收消息的 RabbitTemplate;
③RabbitAdmin 实现自动化的声明队列、交换和绑定,自动创建队列;
接下来就是用SpringAMQP实现消息队列的五种类型!
案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能
第一步:在父工程中引入spring-amqp的依赖
org.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-starter-test
第二步:在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
①在publisher服务中编写application.yml,添加mq连接信息
以配置的方式制定建立连接的一些信息。
spring: rabbitmq: host: 192.168.2.129 # IP地址 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码
②在publisher服务中新建一个测试类,编写测试方法:
直接使用RabbitTemplate工具类发送信息即可。
注:springamqd不会帮你创建队列,只能存在已有的队列中,所以要自己提前在浏览器的控制页面上创建这个对列!
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { // 注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; // 调用工具类的方法 @Test public void testSendMessageSimpleQueue(){ // 第一个参数队列的名称 String queueName = "simple2.queue"; // 第二个参数消息 String message = "hello SpringAMQP!"; rabbitTemplate.convertAndSend(queueName,message); } }
成功发送
第三步:在consumer服务中编写消费逻辑,绑定simple.queue这个队列,进行监听
①在consumer服务中编写application.yml,添加mq连接信息:
spring: rabbitmq: host: 192.168.2.129 port: 5672 virtual-host: / username: itcast password: 123321
②在consumer服务中新建一个类,添加@Component注解,类中声明方法添加@RabbitListener注解,编写消费逻辑:
package cn.itcast.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component // 纳入Spring管理 public class SpringRabbitListener { // 声明监听那个队列 @RabbitListener(queues = "simple2.queue") // 行为,封装成方法 public void listenSimpleQueueMessage(String msg){ // Spring会把消息传递过来给msg参数 System.out.println("消费者接收到的消息是:"+msg); } }
运行主函数,启动上面的Bean
前面已经学习了简单队列的发送和接收,一旦有人拿到消息,就会从队列中删除,其它消费者根本拿不到。那如果有多个消息怎么办呢?就可以基于上述的特性让多个消费者合作处理。接下来就学习一下Work queue(工作队列)可以提高消息处理速度,避免队列消息堆积。
案例:模拟WorkQueue,实现一个队列绑定多个消费者
第一步:在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { // 注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; // 发送消息 @Test public void testSendMessageWorkQueue(){ String queueName = "simple2.queue"; String message = "hello--->"; // 利用for循环发送50条消息 for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName,message+i); // 休眠20毫秒 try { Thread.sleep(20); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
第二步:在consumer服务中定义两个消息监听者,都监听simple.queue队列
注:消费者1每秒处理50条消息,消费者2每秒处理10条消息。
package cn.itcast.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @Component // 纳入Spring管理 public class SpringRabbitListener { // 消费者1 @RabbitListener(queues = "simple2.queue") // 行为,封装成方法 public void listenWorkQueue1Message(String msg){ System.out.println("消费者1接收到的消息是:"+msg+ LocalDateTime.now()); // 每秒处理50条消息 try { Thread.sleep(20); } catch (InterruptedException e) { throw new RuntimeException(e); } } // 消费者2 @RabbitListener(queues = "simple2.queue") // 行为,封装成方法 public void listenWorkQueue2Message(String msg){ System.out.println("消费者2接收到的消息是---》"+msg+LocalDateTime.now()); // 每秒处理10条消息 try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
执行结果:
理论上1秒处理完,实际上却是2秒才处理完,并没有做到能者多劳,消费者1实际上在1秒内很快就处理完消息,而消费者2因为能力不够却需要2秒。实际上这是因为MQ的预取机制,才开始就优先从队列中拿过来,并没有考虑到消费能力如何!
第三步:消费预取限制
修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限!
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码 listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
执行结果:能者多劳,可以在1秒内完成
发布订阅模式
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者,实现方式是加入了exchange(交换机)。到底是发给谁?这是由交换机的类型决定的:
①Fanout:广播;
②Direct:路由;
③Topic:话题;
注意:exchange负责消息路由,而不是存储,路由失败则消息丢失;消息的存储是由队列完成的
发布订阅-Fanout Exchange
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue!
案例:利用SpringAMQP演示FanoutExchange的使用
第一步:在consumer服务中,声明队列(Queue)、交换机(Exchange),并将两者绑定(Binding)
①SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:
②在consumer服务创建一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:
package cn.itcast.mq.config; import com.rabbitmq.client.impl.AMQImpl; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { // 声明交换机fanout.exchange @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanout.exchange"); } // 声明队列1 fanout.queue1 @Bean public Queue queue1(){ return new Queue("fanout.queue1"); } @Bean // 声明队列2 fanout.queue2 public Queue queue2(){ return new Queue("fanout.queue2"); } // 进行绑定 @Bean public Binding bindingQueue1(FanoutExchange fanoutExchange,Queue queue1){ return BindingBuilder.bind(queue1).to(fanoutExchange); } @Bean public Binding bindingQueue2(FanoutExchange fanoutExchange,Queue queue2){ return BindingBuilder.bind(queue2).to(fanoutExchange); } }
成功声明交换机
成功声明队列
绑定成功
第二步:在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
package cn.itcast.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.LocalTime; @Component // 纳入Spring管理 public class SpringRabbitListener { // 声明监听那个队列 @RabbitListener(queues = "fanout.queue1") // 行为,封装成方法 public void listenFanoutQueueMessage1(String msg){ // Spring会把消息传递过来给msg参数 System.out.println("消费者1接收到的消息是:"+msg); } @RabbitListener(queues = "fanout.queue2") // 行为,封装成方法 public void listenFanoutQueueMessage2(String msg){ // Spring会把消息传递过来给msg参数 System.out.println("消费者2接收到的消息是:"+msg); } }
第三步:在publisher中编写测试方法,向交换机itcast.fanout发送消息
注:以前是发送到queue,现在是发送到exchange,注意区别!
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { // 注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; // 向交换机发送消息 @Test public void testSendFanoutExchange(){ // 交换机 String exchangeName = "fanout.exchange"; // 信息 String message = "Hello eyeryone"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName,"",message); // 中间参数routingKey后面会讲,这里先设置为空 } }
执行结果:
发布订阅-DirectExchange
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式。
①每一个Queue都与Exchange设置一个BindingKey;相当于暗号密码!
②发布者发送消息到Exchange时,也要指定一个消息的RoutingKey;与上面的BindingKey对上就发给谁!
③Exchange将消息路由到BindingKey与消息RoutingKey一致的队列;并且一个队列能绑定多个key;如果两个队列的BindingKey都能与RountingKey对上就都会发送(就相当于广播)!
声明单个key
声明多个key
案例:利用SpringAMQP演示DirectExchange的使用
第一步:在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
注:前面使用Bean方式声明一个个类,发现太麻烦了,所以这里就学习一下使用利用@RabbitListener注解声明Exchange、Queue、RoutingKey。
package cn.itcast.mq.listener; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.LocalTime; @Component // 纳入Spring管理 public class SpringRabbitListener { // DirectExchange,使用注解的形式绑定 @RabbitListener(bindings = @QueueBinding( value = @Queue("direct.queue1"), exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"red","blue"} )) public void LitenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:["+msg+"]"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"red","yellow"} )) public void ListenDirectQueue2(String msg){ System.out.println("消费者接收到direct.queue2的消息:["+msg+"]"); } }
声明后启动!查看控制页面,成功绑定
第二步:在publisher中编写测试方法,向itcast. direct发送消息
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { // 注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendDirectExchange(){ String exchangeName = "itcast.direct"; String message = "hello blue"; rabbitTemplate.convertAndSend(exchangeName,"blue",message); } }
此时RoutingKey为blue,只有direct.quque1能接收到:
如果此时RoutingKey为red
@Test public void testSendDirectExchange(){ String exchangeName = "itcast.direct"; String message = "hello red"; rabbitTemplate.convertAndSend(exchangeName,"red",message); }
则direct.quque1和direct.queue2都能接收到:
总结:所以相对于Fanout Exchange,Direct Exchange更加的灵活,可以通过key这个标记把消息传递给某一个或者所有,Fanout Exchange可以看做是Direct Exchange的一种特殊存在。
发布订阅-TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以点 “ .” 分割。 例如:china.news 代表中国的新闻消息; japan.news 则代表日本新闻。
Queue与Exchange指定BindingKey时可以使用通配符:
①#:代指0个或多个单词;
②*:代指一个单词;
案例:利用SpringAMQP演示TopicExchange的使用
第一步:利用@RabbitListener声明Exchange、Queue、RoutingKey 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
// topic Exchange @RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue1"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "china.#" )) public void ListenTopicQueue1(String msg){ System.out.println("消费者接收到topic.queue1的消息:["+msg+"]"); } @RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue2"), exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC), key = "#.news" )) public void ListenTopicQueue2(String msg){ System.out.println("消费者接收到topic.queue2的消息:["+msg+"]"); }
成功声明与绑定
第二步:在publisher中编写测试方法,向itcast. topic发送消息
@Test public void testSendTopicExchange(){ String exchangeName = "itcast.topic"; String message = "It's a nice day "; rabbitTemplate.convertAndSend(exchangeName,"china.weath",message); }
此时是topic.queue1接收到消息
总结:Topic Exchange和Direct Exchange的本质相同,Topic Exchange可以指定通配符的方式来表达BindingKey,相对于Direct Exchange灵活度又变高了。
说明:在SpringAMQP的发送方法中,接收消息的类型实际上是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
案例:测试发送Object类型消息
在consumer中利用@Bean声明一个队列:
package cn.itcast.mq.config; import com.rabbitmq.client.impl.AMQImpl; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { // 声明一个队列 @Bean public Queue objectQueue(){ return new Queue("object.queue"); } }
发送一个Map集合到object.queue队列
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { // 注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendObjectQueue(){ // 准备一个Map集合 Mapmsg = new HashMap<>(); msg.put("name", "Jack"); msg.put("age", 21); // 发送 rabbitTemplate.convertAndSend("object.queue",msg); } }
执行结果:最终的结果是通过JDK序列化转换成字节发送的
注:JDK序列化性能比较差、安全性比较差容易出现注入的情况、数据长度太长了占用额外的内存空间。
消息转换器
①Spring的对消息对象的处理是由org.springframework.amqp.support.converter.
MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化!
②如果要修改只需要定义一个MessageConverter 类型的Bean即可,推荐用JSON方式序列化!步骤如下:
第一步:在父工程中引入jackson依赖
com.fasterxml.jackson.core jackson-databind
第二步:在publisher启动类声明MessageConverter,覆盖掉原来的配置
package cn.itcast.mq; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class PublisherApplication { public static void main(String[] args) { SpringApplication.run(PublisherApplication.class); } // 覆盖原理的序列化方式 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
执行结果:成功转换成Json格式
第三步:在consumer服务中MessageConverter并监听object.queue队列并消费消息
启动类声明MessageConverter,覆盖掉原来的配置
package cn.itcast.mq; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } // 覆盖原理的序列化方式 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
消费object.queue队列的消息
@RabbitListener(queues = "object.queue") public void ListenOjectQueue(Mapmsg){ System.out.println("消费者接收到object.queue的消息:["+msg+"]"); }
成功消费:
上一篇:mysql 根据时间范围查询数据