好文推荐:
2.5万字详解23种设计模式
基于Netty搭建websocket集群实现服务器消息推送
2.5万字讲解DDD领域驱动设计
延时队列:是一种消息队列,可以用于在指定时间或经过一定时间后执行某种操作。
小编已经做好了Kafka延时队列的封装,以后只需要一行代码就可以实现kafka延时队列了,代码中有详细注释,完整代码已经给大家整理好了,领取方式放在了文章末。
1,订单超时自动取消:用户下单后,如果在指定时间内未完成支付,系统会自动取消订单,释放库存。
2,定时推送:比如消息通知,用户预约某个服务,系统会在服务开始前一定时间发送提醒短信。
3,定时任务:将需要定时执行的任务放入延时队列中,等到指定的时间到达时再进行执行,例如生成报表、统计数据等操作。
4,限时抢购:将限时抢购的结束时间放入延时队列中,当时间到达时自动下架商品。
…
1.1 优点: ①Redis的延迟队列是基于Redis的sorted set实现的,性能较高。 ②Redis的延迟队列可以通过TTL设置过期时间,灵活性较高。 ③简单易用,适用于小型系统。 ④性能较高,支持高并发。 1.2 缺点: ①可靠性相对较低,可能会丢失消息,就算redis最高级别的持久化也是有可能丢一条的,每次请求都做aof,但是aof是异步的,所以不保证这一条操作能被持久化。 ②而且Redis持久化的特性也导致其在数据量较大时,存储和查询效率逐渐降低,此时会需要对其进行分片和负载均衡。 ③Redis的延迟队列需要手动实现消息重试机制,更严谨的消息队列需要数据库兜底。 1.3 应用场景: ①适用于较小规模的系统,实时性要求较高的场景。 ②适用于轻量级的任务调度和消息通知场景,适合短期延迟任务,不适合长期任务,例如订单超时未支付等。
2.1 优点: ①Kafka的优点在于其高并发、高吞吐量和可扩展性强,同时支持分片。 ②可靠性高,支持分布式和消息持久化。 ③消费者可以随时回溯消费。 ④支持多个消费者并行消费、消费者组等机制。 2.2 缺点: ①没有原生的延迟队列功能,需要使用topic和消费者组来实现,实现延迟队列需要额外的开发工作。 ②消费者需要主动拉取数据,可能会导致延迟,精度不是特别高。 在此案例中代码已经实现了,直接拿来使用就可以了。 2.3 应用场景: 适用于大规模的数据处理,实时性要求较高的,高吞吐量的消息处理场景。
3.1 优点: ①RabbitMQ的延迟队列是通过RabbitMQ的插件实现的,易于部署和使用。 ②RabbitMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。 ③支持消息持久化和分布式。 ④支持优先级队列和死信队列。 ⑤提供了丰富的插件和工具。 3.2 缺点: ①RabbitMQ的延迟队列性能较低,不适用于高吞吐量的场景。 ②性能较低,不适合高并发场景。 ③实现延迟队列需要额外的配置,但是配置就很简单了。 3.3应用场景: 适用于中小型的任务调度和消息通知,对可靠性要求高的场景。
4.1 优点: ①RocketMQ的延迟队列是RocketMQ原生支持的,易于使用和部署。 ②RocketMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。 ③高性能和高吞吐量,支持分布式和消息持久化。 ④RocketMQ使用简单,性能好,并且支持延迟队列功能。 4.2 缺点: ①RocketMQ的延迟队列不支持动态添加或删除队列。 ②RocketMQ的延迟队列需要保证消息的顺序,可能会导致消息延迟。 ③在节点崩溃后,RocketMQ有可能发生消息丢失。 4.3 应用场景: ①适用于大规模的数据处理,对性能和吞吐量要求较高的场景。 ②适合于任务量较大、需要延迟消息和定时消息的场景。例如电商平台、社交软件等。 ③适用于分布式任务调度和高可靠性消息通知场景。
基于以上四种实现延时队列的分析来,选择对应的技术方案的基础上呢,不同公司的mq的基础设施不同,如果只有Kafka,也没必要引入RabbitMQ和RocketMq来实现,引入新的组件也会顺便带来新的问题。
网上搜Kafka实现延时队列有很多文章,很多文章说使用Kafka内部的时间轮,支持延时操作,但这是Kafka自己内部使用的,时间轮只是一个工具类,用户无法将其作为延迟队列来使用。
Kafka延时队列的最佳实践,使用Kafka消费者的暂停和恢复机制来实现。

以下代码只列出了核心实现,完整代码已经给大家整理好了,可以关注【微信公众号】微信搜索【老板来一杯java】,然后【加群】直接获取源码,在自己项目中引入即用!
源码目录:

定义一个Kafka延期队列,包含的内容:KafkaDelayQueue,其中有延迟队列配置,主题,消费组,延迟时间,目标主题,KafkaSyncConsumer,ApplicationContext,poll线程池,delay线程池等等
package com.wdyin.kafka.delay; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import java.time.Duration; import java.util.Collections; import java.util.concurrent.ThreadPoolExecutor; /** * kafka延时队列 * * @Author WDYin * @Date 2022/7/2 **/ @Slf4j @Getter @Setter class KafkaDelayQueue{ private String topic; private String group; private Integer delayTime; private String targetTopic; private KafkaDelayConfig kafkaDelayConfig; private KafkaSyncConsumer kafkaSyncConsumer; private ApplicationContext applicationContext; private ThreadPoolTaskScheduler threadPoolPollTaskScheduler; private ThreadPoolTaskScheduler threadPoolDelayTaskScheduler; ...... }
Kafka延期队列的工厂,用于及其管理延迟队列
package com.wdyin.kafka.delay;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.ApplicationContext;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.Properties;
/**
* 延时队列工厂
* @author WDYin
* @date 2023/4/17
**/
@Data
public class KafkaDelayQueueFactory {
private KafkaDelayConfig kafkaDelayConfig;
private Properties properties;
private ApplicationContext applicationContext;
private Integer concurrency;
public KafkaDelayQueueFactory(Properties properties, KafkaDelayConfig kafkaDelayConfig) {
Assert.notNull(properties, "properties cannot null");
Assert.notNull(kafkaDelayConfig.getDelayThreadPool(), "delayThreadPool cannot null");
Assert.notNull(kafkaDelayConfig.getPollThreadPool(), "pollThreadPool cannot null");
Assert.notNull(kafkaDelayConfig.getPollInterval(), "pollInterval cannot null");
Assert.notNull(kafkaDelayConfig.getPollTimeout(), "timeout cannot null");
this.properties = properties;
this.kafkaDelayConfig = kafkaDelayConfig;
}
public void listener(String topic, String group, Integer delayTime, String targetTopic) {
if (StringUtils.isEmpty(topic)) {
throw new RuntimeException("topic cannot empty");
}
if (StringUtils.isEmpty(group)) {
throw new RuntimeException("group cannot empty");
}
if (StringUtils.isEmpty(delayTime)) {
throw new RuntimeException("delayTime cannot empty");
}
if (StringUtils.isEmpty(targetTopic)) {
throw new RuntimeException("targetTopic cannot empty");
}
KafkaSyncConsumer kafkaSyncConsumer = createKafkaSyncConsumer(group);
KafkaDelayQueue kafkaDelayQueue = createKafkaDelayQueue(topic, group, delayTime, targetTopic, kafkaSyncConsumer);
kafkaDelayQueue.send();
}
private KafkaDelayQueue createKafkaDelayQueue(String topic, String group, Integer delayTime, String targetTopic, KafkaSyncConsumer kafkaSyncConsumer) {
KafkaDelayQueue kafkaDelayQueue = new KafkaDelayQueue<>(kafkaSyncConsumer, kafkaDelayConfig);
Assert.notNull(applicationContext, "kafkaDelayQueue need applicationContext");
kafkaDelayQueue.setApplicationContext(applicationContext);
kafkaDelayQueue.setDelayTime(delayTime);
kafkaDelayQueue.setTopic(topic);
kafkaDelayQueue.setGroup(group);
kafkaDelayQueue.setTargetTopic(targetTopic);
return kafkaDelayQueue;
}
private KafkaSyncConsumer createKafkaSyncConsumer(String group) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
return new KafkaSyncConsumer<>(properties);
}
}
package com.wdyin.kafka.delay; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.springframework.context.ApplicationListener; import org.springframework.kafka.core.KafkaTemplate; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.*; /** * 延时队列监听 * @Author : WDYin * @Date : 2021/5/7 * @Desc : */ @Slf4j public class KafkaPollListenerimplements ApplicationListener > { private KafkaTemplate kafkaTemplate; public KafkaPollListener(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @Override public void onApplicationEvent(KafkaPollEvent event) { ConsumerRecords records = (ConsumerRecords ) event.getSource(); Integer delayTime = event.getDelayTime(); KafkaDelayQueue kafkaDelayQueue = event.getKafkaDelayQueue(); KafkaSyncConsumer kafkaSyncConsumer = kafkaDelayQueue.getKafkaSyncConsumer(); Set partitions = records.partitions(); Map commitMap = new HashMap<>(); partitions.forEach((partition) -> { List > consumerRecords = records.records(partition); for (ConsumerRecord record : consumerRecords) { long startTime = (record.timestamp() / 1000) * 1000; long endTime = startTime + delayTime; long now = System.currentTimeMillis(); if (endTime > now) { kafkaSyncConsumer.pauseAndSeek(partition, record.offset()); kafkaDelayQueue.getThreadPoolPollTaskScheduler().schedule(kafkaDelayQueue.delayTask(partition), new Date(endTime)); break; } log.info("{}: partition:{}, offset:{}, key:{}, value:{}, messageDate:{}, nowDate:{}, messageDate:{}, nowDate:{}", Thread.currentThread().getName() + "#" + Thread.currentThread().getId(), record.topic() + "-" + record.partition(), record.offset(), record.key(), record.value(), LocalDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()), LocalDateTime.now(), startTime, Instant.now().getEpochSecond()); kafkaTemplate.send(kafkaDelayQueue.getTargetTopic(), record.value()); commitMap.put(partition, new OffsetAndMetadata(record.offset() + 1)); } }); if (!commitMap.isEmpty()) { kafkaSyncConsumer.commit(commitMap); } } }
package com.wdyin.kafka.delay;
import lombok.Data;
/**
* 延时队列配置
* @author WDYin
* @date 2023/4/16
**/
@Data
public class KafkaDelayConfig {
private Integer pollInterval;
private Integer pollTimeout;
private Integer pollThreadPool;
private Integer delayThreadPool;
public KafkaDelayConfig() {
}
......
}
自己项目中引入以上代码之后,使用KafkaDelayApplication:一个Kafka延迟任务注册程序,注意一个延时主题对应一个延迟时间,后续有新的延迟任务只需要在此注册延迟任务的监听即可!开箱即用!
使用流程:
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* @author WDYin
* @date 2023/4/18
**/
@Component
public class KafkaDelayApplication {
@Resource
private KafkaDelayQueueFactory kafkaDelayQueueFactory;
/**
* 延迟任务都可以配置在这里
* Kafka将消息从【延时主题】经过【延时时间】后发送到【目标主题】
*/
@PostConstruct
public void init() {
//延迟30秒
kafkaDelayQueueFactory.listener("delay-30-second-topic", "delay-30-second-group", 1 * 30 * 1000, "delay-60-second-target-topic");
//延迟60秒
kafkaDelayQueueFactory.listener("delay-60-second-topic", "delay-60-second-group", 1 * 60 * 1000, "delay-60-second-target-topic");
//延迟30分钟
kafkaDelayQueueFactory.listener("delay-30-minute-topic", "delay-30-minute-group", 30 * 60 * 1000, "delay-30-minute-target-topic");
}
}


好文推荐:
2.5万字详解23种设计模式
微服务springcloud环境下基于Netty搭建websocket集群实现服务器消息推送----netty是yyds
2.5万字讲解DDD领域驱动设计