单个确认发布是一种同步确认发布方式,也就是发布一个消息后只有它被确认发布,后续的消息才能继续发布。
缺点:发布速度特别慢,因为若是没有确认发布的消息会阻塞所有后续消息的发布
package com.hong.rabbitmq5; import com.hong.utils.RabbitMQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.util.UUID; /** * @Description: 发布确认-单个确认发布 * 单个确认发布是一种同步确认发布方式,也就是发布一个消息后只有它被确认发布,后续的消息才能继续发布。 * 缺点:发布速度特别慢,因为若是没有确认发布的消息会阻塞所有后续消息的发布 * @Author: hong * @Date: 2023-12-21 20:52 * @Version: 1.0 **/ public class Task5 { public static void main(String[] args) throws Exception{ String queueName = UUID.randomUUID().toString(); Channel channel = RabbitMQUtil.getChannel(); //队列持久化 true持久化 channel.queueDeclare(queueName,true,false,false,null); //开启发布确认 默认关闭 channel.confirmSelect(); long startTime = System.currentTimeMillis(); for(int i = 0; i <= 1000; i++){ String message = i + ""; //消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN channel.basicPublish("",queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8")); //发布就确认 boolean flag = channel.waitForConfirms(); if(flag){ System.out.println("第"+i+"个消息发布成功!"); } } long endTime = System.currentTimeMillis(); System.out.println("发布1000个消息单个确认发布总耗时:"+(endTime - startTime)+"ms"); } }
批量确认发布也是一种同步确认发布方式,一批确认一次,相比单个确认发布极大地提升了吞吐量
缺点:一旦出故障难以确认到底是哪个消息出问题了
package com.hong.rabbitmq5; import com.hong.utils.RabbitMQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.util.UUID; /** * @Description: 批量确认发布 * 批量确认发布也是一种同步确认发布方式,一批确认一次,相比单个确认发布极大地提升了吞吐量 * 缺点:一旦出故障难以确认到底是哪个消息出问题了 * @Author: hong * @Date: 2024-01-03 17:44 * @Version: 1.0 **/ public class BatchConfirmPublish { public static void main(String[] args) throws Exception { String queueName = UUID.randomUUID().toString(); Channel channel = RabbitMQUtil.getChannel(); //队列持久化 true持久化 channel.queueDeclare(queueName,true,false,false,null); //开启发布确认 默认关闭 channel.confirmSelect(); //100个确认一次 int batchSize = 100; long startTime = System.currentTimeMillis(); for(int i = 1; i <= 1000; i++){ String message = i + ""; //消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN channel.basicPublish("",queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8")); if(i%batchSize == 0){ //发布就确认 channel.waitForConfirms(); } } long endTime = System.currentTimeMillis(); System.out.println("发布1000个消息,批量确认发布总耗时:"+(endTime - startTime)+"ms"); } }
package com.hong.rabbitmq5; import com.hong.utils.RabbitMQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback; import com.rabbitmq.client.MessageProperties; import java.util.UUID; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; /** * @Description: 异步确认发布 * @Author: hong * @Date: 2024-01-06 20:41 * @Version: 1.0 **/ public class AsynConfirmPublish { public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtil.getChannel(); //队列的声明 String queueName = UUID.randomUUID().toString(); //队列持久化 true持久化 channel.queueDeclare(queueName,true,false,false,null); //开启发布确认 默认关闭 channel.confirmSelect(); ConcurrentSkipListMapconcurrentSkipListMap = new ConcurrentSkipListMap<>(); //消息确认回调的函数 ConfirmCallback ackCallback = (deliveryTag, multiple) ->{ if(multiple) { //2.删除掉已经确认的消息 剩下的就是未确认的消息 ConcurrentNavigableMap confirmed = concurrentSkipListMap.headMap(deliveryTag); confirmed.clear(); }else { concurrentSkipListMap.remove(deliveryTag); } System.out.println("确认的消息:" + deliveryTag); }; //消息确认失败回调函数 ConfirmCallback nackCallback= (deliveryTag,multiple) ->{ //3.打印一下未确认的消息都有哪些 String message = concurrentSkipListMap.get(deliveryTag); System.out.println("未确认的消息tag:" + deliveryTag + "-----------未确认的消息是:" + message); }; /* * 消息的监听器 异步通知 */ channel.addConfirmListener(ackCallback,nackCallback); long begin = System.currentTimeMillis(); //批量发送消息 for (int i = 1; i <= 1000; i++) { String message= "消息" + i; //消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN channel.basicPublish("",queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8")); //1.此处记录下所有要发送的消息 消息的总和 concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message); } //结束时间 long end = System.currentTimeMillis(); System.out.println("发布1000个消息,异步确认消息总耗时:"+(end-begin)+"ms"); } }