
Redis Stream解密:探秘数据流处理的黑科技【一】
解锁Redis Stream新境界:高级用法大揭秘【二】
Redis List:打造高效消息队列的秘密武器【redis实战 一】
在快节奏的技术世界中,消息队列是连接不同服务和组件的关键。而在这个领域,Redis Streams作为一种新兴的消息队列解决方案,以其高性能和易用性吸引了众多开发者的目光。当这项技术遇到了Spring Boot —— 当今最受欢迎的Java开发框架,它们的结合将如何开启新的可能性?让我们开始这趟探索之旅,深入了解如何将这两种强大的技术融合在一起,打造出优雅而强大的消息队列系统。
Redis Streams是Redis数据库的一个强大类型,于Redis 5.0中引入。它主要用于消息队列和事件流的存储与传递,是一个高性能、持久化的日志数据结构。以下是Redis Streams的一些基本概念和核心特性:
org.springframework.boot spring-boot-starter-data-redis org.projectlombok lombok 
package fun.bo.config;
import fun.bo.consumer.MessageConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import java.time.Duration;
/**
 * @author xiaobo
 */
@Configuration
public class StreamConfig {
    @Bean
    public StreamMessageListenerContainer> streamMessageListenerContainer(
            RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {
        // 用于配置消息监听容器的选项。在这个方法中,通过设置不同的选项,如轮询超时时间和消息的目标类型,可以对消息监听容器进行个性化的配置。
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 设置了轮询超时的时间为100毫秒。这意味着当没有新的消息时,容器将每隔100毫秒进行一次轮询。
                        .pollTimeout(Duration.ofMillis(100))
                        // 指定了消息的目标类型为 String。这意味着容器会将接收到的消息转换为 String 类型,以便在后续的处理中使用。
                        .targetType(String.class)
                        .build();
        // 创建一个可用于监听Redis流的消息监听容器。
        StreamMessageListenerContainer> listenerContainer =
                StreamMessageListenerContainer.create(connectionFactory, options);
        // 方法配置了容器来接收来自特定消费者组和消费者名称的消息。它还指定了要读取消息的起始偏移量,以确定从哪里开始读取消息。
        listenerContainer.receive(
                Consumer.from("your-consumer-group", "your-consumer-name"),
                StreamOffset.create("your-stream-name", ReadOffset.lastConsumed()), messageConsumer);
        // 方法启动了消息监听容器,使其开始监听消息。一旦容器被启动,它将开始接收并处理来自Redis流的消息。
        listenerContainer.start();
        return listenerContainer;
    }
}
    
package fun.bo.produce;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
 * @author xiaobo
 */
@Service
@RequiredArgsConstructor
public class MessageProducer {
    private final RedisTemplate redisTemplate;
    public void sendMessage(String streamKey, String messageKey, String message) {
        Map messageMap = new HashMap<>();
        messageMap.put(messageKey, message);
        RecordId recordId = redisTemplate.opsForStream().add(streamKey, messageMap);
        if (recordId != null) {
            System.out.println("Message sent to Stream '" + streamKey + "' with RecordId: " + recordId);
        }
    }
}
   
package fun.bo.consumer; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.stream.StreamListener; import org.springframework.stereotype.Service; /** * @author xiaobo */ @Service public class MessageConsumer implements StreamListener> { @Override public void onMessage(ObjectRecord message) { String stream = message.getStream(); String messageId = message.getId().toString(); String messageBody = message.getValue(); System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId); System.out.println("Message body: " + messageBody); } } 
如果是已经存在stream,则可以不配置,这个主要是为了防止启动报错,org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key ‘your-stream-name’ or consumer group ‘your-consumer-group’ in XREADGROUP with GROUP option
public void initializeStream() {
  StreamOperations streamOperations = redisTemplate.opsForStream();
  // 创建一个流
  try {
    streamOperations.createGroup("your-stream-name", ReadOffset.from("0"), "your-consumer-group");
  } catch (Exception e) {
    // 流可能已存在,忽略异常
  }
}
  

Redis Streams作为消息队列相比于使用传统的Redis List类型,引入了一系列改进和新功能,同时也与专业的高级消息队列系统(如RabbitMQ、Kafka等)相比存在一些差距。以下是详细的分析:
更好的消息顺序保证:
消费者组支持:
消息持久化和读取:
复杂的读取操作:
消息确认和重试:
事务和消息持久性保证:
集群和分区:
管理和监控工具:
高级消息路由和过滤:
消息传递语义:
Redis Streams提供了一个轻量级、高性能且功能丰富的消息队列实现,解决了使用List作为队列时的许多痛点,特别适合需要快速部署、低延迟和简单可靠的场景。然而,对于需要复杂事务处理、高级路由和过滤、或更丰富管理工具的复杂应用场景,专业的消息队列系统可能更加适合。选择哪种方案,应根据你的具体需求、资源和技术栈来决定。
上一篇:每日coding