首先明确,这里的consumer不是一台消费者机器,而是rabbitMq的最小消费单位,一台机器可以开启多个消费者,一个消费者总是对应一个channel。
一个TCP 被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。
也就是rabbitMq采用一个TCP连接处理多个消费者的多线程请求,实际上就是多路复用。
simple模式每个消费者都有其私有的线程,可以增加消费者,也会自动增加消费线程,不管消费者是不是在处理消息,可能会造成资源线程的浪费。
看的出来direct的线程模型更简单,也因此压力集中在Connection线程池上,线程可以复用与多个消费者,但是如果采用这种模式,需要设置Connection线程池合适的参数。
看起来direct更好一些,那么如何选择
看官网的说法
2.0版本引入了DirectMessageListenerContainer(DMLC)。此前,仅SimpleMessageListenerContainer(SMLC) 可用。SMLC 对每个消费者使用一个内部队列和一个专用线程。如果容器配置为侦听多个队列,则使用同一个消费者线程来处理所有队列。并发控制由concurrentConsumers和其他属性。当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们传递给消费者线程。之所以需要这种架构,是因为在 RabbitMQ 客户端的早期版本中,多个并发传送是不可能的。较新版本的客户端具有修订的线程模型,现在可以支持并发。这允许引入 DMLC,现在可以直接在 RabbitMQ 客户端线程上调用侦听器。因此,它的架构实际上比 SMLC“更简单”。然而,这种方法存在一些局限性,并且 DMLC 不具备 SMLC 的某些功能。consumersPerQueue此外,并发性由(以及客户端库的线程池)控制。concurrentConsumers此容器不提供 和关联的属性。
以下功能适用于 SMLC,但不适用于 DMLC:
然而,与 SMLC 相比,DMLC 具有以下优点:
消费者数量配置
图中consumer数量可进行配置
listener: simple: concurrency: 50 maxConcurrency: 100
支持可伸缩的配置,根据前面的线程模型,我们知道simple配置的并发数实际上也是消费线程的数量。
direct: consumersPerQueue: 50 #每个队列消费者数量
根据前面的线程模型,使用direct模式需要设置合理的连接线程池,因为连接线程池还需要进行业务逻辑的处理,配置如下
@Bean(name = "connectionFactory") @Primary public ConnectionFactory connectionFactory( @Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.port}") int port, @Value("${spring.rabbitmq.username}") String username, @Value("${spring.rabbitmq.password}") String password) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setExecutor(createThreadPool(coreSize, maxSize, "mq-connection-", "mq-connection-group")); return connectionFactory; }
配置后,rabbitMq控制台有会体现出来
首先要明白批量消费的意义,消费者可以批处理,还可以批量确认。减少ack次数
listener: simple: batch-size: 10 acknowledge-mode: auto consumer-batch-enabled: true
配置批量消费
@Primary public SimpleRabbitListenerContainerFactory normalFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("connectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setBatchListener(true); configurer.configure(factory, connectionFactory); return factory; }
监听类
@RabbitListener(queues = "QUEUE_DEMO_DIRECT") public void ListenerQueue01(Listmessage, Channel channel) throws IOException, InterruptedException { logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message); }
这样在消息足够的情况下
List中就是10条消息。但是只需要确认最后一条消息就好了
原理
rabbitmq支持批量确认
channel.basicAck(deliveryTag,multiple)
在RabbitMQ中,channel.basicAck方法用于确认已经接收并处理了消息。该方法有两个参数:
2、 multiple:表示是否批量确认消息。当multiple为false时,只确认当前deliveryTag对应的消息;当multiple为true时,会确认当前deliveryTag及之前所有未确认的消息。
看过前面我们知道DirectMessageListenerContainer并不像SimpleMessageListenerContainer能够支持批量消息,但是其支持一个参数
messagesPerAck,也可以在处理多少个消息之后进行ack,减少ack次数。
首先要了解rabbitMq的channel.basicQos方法
/** * Request a specific prefetchCount "quality of service" settings * for this channel. ** Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1). * * @param prefetchCount maximum number of messages that the server * will deliver, 0 if unlimited * @param global true if the settings should be applied to the * entire channel rather than each consumer * @throws java.io.IOException if an error is encountered * @see #basicQos(int, int, boolean) */ void basicQos(int prefetchCount, boolean global) throws IOException;
pprefetchCount,服务器一次请求将传递的最大消息数,如果没有限制,则为0。调用此方法时,该值必填。默认值:0,简单说就是一个消费者的最大处理消息数,broker服务器发现一个消费者还有prefetchCount个消息未ack,那么就不会再给它发送消息。防止消息堆积
global,是否将设置应用于整个频道,而不是每个消费者
direct和simple配置字段相同都是prefetchCount
注意在simple模式下如果prefetchCount配置小于batchSize,那么prefetchCount就会被batchSize覆盖。
如果在direct模式下prefetchCount配置小于messagesPerAck,那么prefetchCount就会被messagesPerAck覆盖。
rabbitMq的消费端的重试机制指的是本地重试。
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初识的失败等待时长为1秒 multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
结论:
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现: