Redis Stream解密:探秘数据流处理的黑科技【一】
解锁Redis Stream新境界:高级用法大揭秘【二】
Redis List:打造高效消息队列的秘密武器【redis实战 一】
在快节奏的技术世界中,消息队列是连接不同服务和组件的关键。而在这个领域,Redis Streams作为一种新兴的消息队列解决方案,以其高性能和易用性吸引了众多开发者的目光。当这项技术遇到了Spring Boot —— 当今最受欢迎的Java开发框架,它们的结合将如何开启新的可能性?让我们开始这趟探索之旅,深入了解如何将这两种强大的技术融合在一起,打造出优雅而强大的消息队列系统。
Redis Streams是Redis数据库的一个强大类型,于Redis 5.0中引入。它主要用于消息队列和事件流的存储与传递,是一个高性能、持久化的日志数据结构。以下是Redis Streams的一些基本概念和核心特性:
org.springframework.boot spring-boot-starter-data-redis org.projectlombok lombok
package fun.bo.config; import fun.bo.consumer.MessageConsumer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import java.time.Duration; /** * @author xiaobo */ @Configuration public class StreamConfig { @Bean public StreamMessageListenerContainer> streamMessageListenerContainer( RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) { // 用于配置消息监听容器的选项。在这个方法中,通过设置不同的选项,如轮询超时时间和消息的目标类型,可以对消息监听容器进行个性化的配置。 StreamMessageListenerContainer.StreamMessageListenerContainerOptions > options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() // 设置了轮询超时的时间为100毫秒。这意味着当没有新的消息时,容器将每隔100毫秒进行一次轮询。 .pollTimeout(Duration.ofMillis(100)) // 指定了消息的目标类型为 String。这意味着容器会将接收到的消息转换为 String 类型,以便在后续的处理中使用。 .targetType(String.class) .build(); // 创建一个可用于监听Redis流的消息监听容器。 StreamMessageListenerContainer > listenerContainer = StreamMessageListenerContainer.create(connectionFactory, options); // 方法配置了容器来接收来自特定消费者组和消费者名称的消息。它还指定了要读取消息的起始偏移量,以确定从哪里开始读取消息。 listenerContainer.receive( Consumer.from("your-consumer-group", "your-consumer-name"), StreamOffset.create("your-stream-name", ReadOffset.lastConsumed()), messageConsumer); // 方法启动了消息监听容器,使其开始监听消息。一旦容器被启动,它将开始接收并处理来自Redis流的消息。 listenerContainer.start(); return listenerContainer; } }
package fun.bo.produce; import lombok.RequiredArgsConstructor; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.Map; /** * @author xiaobo */ @Service @RequiredArgsConstructor public class MessageProducer { private final RedisTemplateredisTemplate; public void sendMessage(String streamKey, String messageKey, String message) { Map messageMap = new HashMap<>(); messageMap.put(messageKey, message); RecordId recordId = redisTemplate.opsForStream().add(streamKey, messageMap); if (recordId != null) { System.out.println("Message sent to Stream '" + streamKey + "' with RecordId: " + recordId); } } }
package fun.bo.consumer; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.stream.StreamListener; import org.springframework.stereotype.Service; /** * @author xiaobo */ @Service public class MessageConsumer implements StreamListener> { @Override public void onMessage(ObjectRecord message) { String stream = message.getStream(); String messageId = message.getId().toString(); String messageBody = message.getValue(); System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId); System.out.println("Message body: " + messageBody); } }
如果是已经存在stream,则可以不配置,这个主要是为了防止启动报错,org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key ‘your-stream-name’ or consumer group ‘your-consumer-group’ in XREADGROUP with GROUP option
public void initializeStream() { StreamOperationsstreamOperations = redisTemplate.opsForStream(); // 创建一个流 try { streamOperations.createGroup("your-stream-name", ReadOffset.from("0"), "your-consumer-group"); } catch (Exception e) { // 流可能已存在,忽略异常 } }
Redis Streams作为消息队列相比于使用传统的Redis List类型,引入了一系列改进和新功能,同时也与专业的高级消息队列系统(如RabbitMQ、Kafka等)相比存在一些差距。以下是详细的分析:
更好的消息顺序保证:
消费者组支持:
消息持久化和读取:
复杂的读取操作:
消息确认和重试:
事务和消息持久性保证:
集群和分区:
管理和监控工具:
高级消息路由和过滤:
消息传递语义:
Redis Streams提供了一个轻量级、高性能且功能丰富的消息队列实现,解决了使用List作为队列时的许多痛点,特别适合需要快速部署、低延迟和简单可靠的场景。然而,对于需要复杂事务处理、高级路由和过滤、或更丰富管理工具的复杂应用场景,专业的消息队列系统可能更加适合。选择哪种方案,应根据你的具体需求、资源和技术栈来决定。
上一篇:每日coding