本节内容主要介绍springboot项目通过集成redis,如何利用redis的订阅发布机制,完成系统消息的发布与订阅功能。Redis中的发布与订阅是一种消息通信模式,允许发送者(发布者)将消息发送给多个接收者(订阅者)。在 Redis中,发布与订阅通过PUBLISH和SUBSCRIBE命令实现。频道(Channel):频道是消息的通道,用于区分不同类型或主题的消息。订阅者可以选择订阅感兴趣的频道,以接收相应的消息。Redis的发布与订阅模式是无状态的,即发布者在发送消息之后不需要关心是否有订阅者接收到消息,也不需要维护订阅者的信息。当发布者向某个频道发布消息时,所有订阅了该频道的订阅者都会接收到相同的消息。这种机制使得消息的发布者和订阅者之间能够实现解耦,并支持一对多的消息传递方式,即广播形式。
①创建一个web项目,引入redis启动器的pom依赖
org.springframework.boot spring-boot-starter-data-redisorg.springframework.boot spring-boot-starter-weborg.projectlombok lombok
② 在application.yml中添加redis的配置
③创建redis的配置类, 初始化redis工具类RedisTemplate和redis订阅消息的监听容器RedisMessageListenerContainer
package com.yundi.atp.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisConfig { /** * 初始化一个Redis消息监听容器 * @param connectionFactory * @return */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 添加其他配置,如线程池大小等 return container; } @Bean public RedisTemplateredisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(connectionFactory); redisTemplate.setDefaultSerializer(new StringRedisSerializer()); return redisTemplate; } }
④创建redis消息频道的常量
package com.yundi.atp.constant; public class ChannelConstant { /** * 广播通道 */ public static final String CHANNEL_GLOBAL_NAME = "channel-global"; /** * 单播通道 */ public static final String CHANNEL_SINGLE_NAME = "channel-single"; }
⑤ 创建一个http请求,用于发布基于redis的消息供客户端订阅
package com.yundi.atp.controller; import com.yundi.atp.constant.ChannelConstant; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RequestMapping(value = "base") @RestController public class BaseController { @Resource private RedisTemplate redisTemplate; /** * 发布广播消息 * * @param msg */ @GetMapping(value = "/publish/{msg}") public void sendMsg(@PathVariable(value = "msg") String msg) { redisTemplate.convertAndSend(ChannelConstant.CHANNEL_GLOBAL_NAME, msg); } }
⑥ 创建一个消息订阅者,实现MessageListener接口,通过重写onMessage方法订阅消息
package com.yundi.atp.listen; import com.yundi.atp.constant.ChannelConstant; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.nio.charset.StandardCharsets; @Slf4j @Component public class RedisMessageSubscriber implements MessageListener { @Autowired private RedisMessageListenerContainer redisMessageListenerContainer; /** * 订阅消息:将订阅者添加到指定的频道 */ @PostConstruct public void subscribeToChannel() { //广播消息 redisMessageListenerContainer.addMessageListener(this, new ChannelTopic(ChannelConstant.CHANNEL_GLOBAL_NAME)); } @Override public void onMessage(Message message, byte[] bytes) { String channel = new String(message.getChannel(), StandardCharsets.UTF_8); String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); log.info("Received message: " + messageBody + " from channel: " + channel); } }
⑦启动项目,通过http请求发布消息,查看是否能够订阅成功消息
⑧开启redis客户端测试,同样能够订阅到消息,证明redis的消息的订阅与发布是无状态的且是广播模式
关于springboot集成redis实现消息的订阅与发布的内容到这里就结束了,我们下期见。。。。。。