🎉🎉欢迎光临🎉🎉
🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀
🌟特别推荐给大家我的最新专栏《Redis实战与进阶》
本专栏带你Redis从入门到入魔
这是苏泽的个人主页可以看到我其他的内容哦👇👇
努力的苏泽http://suzee.blog.csdn.net/
开发目的:
开发一个高并发预约管理处理系统,其中用户可以预约倾听者。由于并发预约可能导致冲突和混乱,需要实现分布式锁来确保同一时间段只有一个用户可以进行预约。为了实现这一目的,使用Redis作为分布式锁的存储介质,并设计一组表来存储倾听者的预约情况。
高并发预约管理:系统能够处理大量用户同时预约倾听者的情况,通过使用分布式锁来保证同一时间段只有一个用户可以进行预约,防止冲突和混乱。
分布式锁实现:系统使用Redis作为分布式锁的存储介质,通过设置键值对来实现分布式锁。具体地,使用一组表来存储倾听者的预约情况,表名由倾听者的ID和日期组成。每个表使用Redis的哈希表结构,其中键表示时间段,值表示该时间段是否已被预约(真或假)。通过对这些表的操作,系统实现了分布式锁的效果。
预约冲突检测:在用户进行预约时,系统会检查对应倾听者的预约情况表,判断该时间段是否已被其他用户预约。如果时间段已被预约,则系统会阻止当前用户的预约请求,以避免冲突。
数据持久化:用户的预约信息会被保存到数据库中,以便后续查询和处理。同时,通过RabbitMQ等消息队列技术,系统可以将预约信息发送到其他模块进行处理。
系统中已经集成了Spring Cloud、Redis和RabbitMQ相关依赖。
创建Redis分布式锁的工具类:
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.data.redis.core.script.ScriptExecutor; import org.springframework.data.redis.serializer.RedisSerializer; import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; public class RedisDistributedLock { private RedisTemplateredisTemplate; private ScriptExecutor scriptExecutor; private RedisSerializer redisSerializer; public RedisDistributedLock(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; this.scriptExecutor = this.redisTemplate.getScriptExecutor(); this.redisSerializer = this.redisTemplate.getStringSerializer(); } /** * 尝试获取分布式锁 * * @param lockKey 锁的key * @param requestId 锁的唯一标识 * @param expireTime 锁的过期时间(单位:秒) * @return 是否成功获取锁 */ public boolean tryLock(String lockKey, String requestId, long expireTime) { RedisScript script = new DefaultRedisScript<>( "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then " + "return redis.call('expire', KEYS[1], ARGV[2]) " + "else " + "return 0 " + "end", Long.class ); Long result = scriptExecutor.execute(script, Collections.singletonList(lockKey), requestId, expireTime); return result != null && result == 1; } /** * 释放分布式锁 * * @param lockKey 锁的key * @param requestId 锁的唯一标识 */ public void releaseLock(String lockKey, String requestId) { RedisScript script = new DefaultRedisScript<>( "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('del', KEYS[1]) " + "else " + "return 0 " + "end", Long.class ); scriptExecutor.execute(script, Collections.singletonList(lockKey), requestId); } /** * 生成唯一的锁标识 * * @return 锁的唯一标识 */ public String generateRequestId() { return UUID.randomUUID().toString(); } }
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service public class ReservationService { private RedisDistributedLock distributedLock; private ReservationRepository reservationRepository; private RabbitTemplate rabbitTemplate; @Autowired public ReservationService(RedisDistributedLock distributedLock, ReservationRepository reservationRepository, RabbitTemplate rabbitTemplate) { this.distributedLock = distributedLock; this.reservationRepository = reservationRepository; this.rabbitTemplate = rabbitTemplate; } @Transactional public void makeReservation(String listenerId, String userId, String timeSlot) { String lockKey = listenerId + "-" + timeSlot; String requestId = distributedLock.generateRequestId(); try { // 尝试获取分布式锁,设置锁的过期时间为一定的秒数(例如10秒) if (distributedLock.tryLock(lockKey, requestId, 10)) { // 获取到锁,可以进行预约操作 // 检查时间段是否已经被预约 if (!isTimeSlotBooked(listenerId, timeSlot)) { // 保存预约信息到数据库 Reservation reservation = new Reservation(listenerId, userId, timeSlot); reservationRepository.save(reservation); // 发送预约消息到RabbitMQ rabbitTemplate.convertAndSend("reservation-exchange", "reservation-queue", reservation); } else { // 时间段已经被预约,抛出异常或返回错误信息 throw new RuntimeException("The time slot is already booked."); } } else { // 未获取到锁,抛出异常或返回错误信息 throw new RuntimeException("Failed to acquire lock for the time slot."); } } finally { // 释放分布式锁 distributedLock.releaseLock(lockKey, requestId); } } private boolean isTimeSlotBooked(String listenerId, String timeSlot) { // 查询数据库或Redis,判断时间段是否已经被预约 // 返回true表示已被预约,返回false表示未被预约 } }
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; @Configuration public class AppConfig { private RedisConnectionFactory redisConnectionFactory; private ConnectionFactory rabbitConnectionFactory; @Autowired public AppConfig(RedisConnectionFactory redisConnectionFactory, ConnectionFactory rabbitConnectionFactory) { this.redisConnectionFactory = redisConnectionFactory; this.rabbitConnectionFactory = rabbitConnectionFactory; } @Bean public RedisTemplateredisTemplate() { RedisTemplate redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); rabbitTemplate.setConnectionFactory(rabbitConnectionFactory); return rabbitTemplate; } @Bean public RedisDistributedLock distributedLock(RedisTemplate redisTemplate) { return new RedisDistributedLock(redisTemplate); } }
RabbitMQ是一种消息队列系统,可以用于异步处理预约消息,提高系统的可伸缩性和稳定性。以下是实现RabbitMQ部分的步骤:
创建预约消息的实体类:
public class ReservationMessage { private String listenerId; private String userId; private String timeSlot; // 构造方法、Getter和Setter方法省略 }
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class ReservationMessageConsumer { @RabbitListener(queues = "reservation-queue") public void processReservationMessage(ReservationMessage reservationMessage) { // 处理预约消息,例如发送通知给倾听者或用户 System.out.println("Received reservation message: " + reservationMessage.toString()); } }
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean public Queue reservationQueue() { return new Queue("reservation-queue"); } @Bean public DirectExchange reservationExchange() { return new DirectExchange("reservation-exchange"); } @Bean public Binding reservationBinding(Queue reservationQueue, DirectExchange reservationExchange) { return BindingBuilder.bind(reservationQueue).to(reservationExchange).with("reservation-queue"); } }
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class ReservationService { private RabbitTemplate rabbitTemplate; @Autowired public ReservationService(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void makeReservation(String listenerId, String userId, String timeSlot) { // 创建预约消息 ReservationMessage reservationMessage = new ReservationMessage(listenerId, userId, timeSlot); // 发送预约消息到RabbitMQ rabbitTemplate.convertAndSend("reservation-exchange", "reservation-queue", reservationMessage); } }
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.HashOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Map; @Component public class ReservationStatusManager { private static final String TABLE_PREFIX = "listener_"; private RedisTemplateredisTemplate; private HashOperations hashOperations; @Autowired public ReservationStatusManager(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } @PostConstruct private void init() { hashOperations = redisTemplate.opsForHash(); } public void setReservationStatus(String listenerId, String date, String timeSlot, String status) { String tableName = TABLE_PREFIX + listenerId + "_" + date; hashOperations.put(tableName, timeSlot, status); } public String getReservationStatus(String listenerId, String date, String timeSlot) { String tableName = TABLE_PREFIX + listenerId + "_" + date; return hashOperations.get(tableName, timeSlot); } public void removeProcessedReservations(String listenerId, String date) { String tableName = TABLE_PREFIX + listenerId + "_" + date; redisTemplate.delete(tableName); } public Map getAllReservationStatus(String listenerId, String date) { String tableName = TABLE_PREFIX + listenerId + "_" + date; return hashOperations.entries(tableName); } }
我们通过注入RedisTemplate来操作Redis。在ReservationStatusManager类中,我们使用HashOperations来进行哈希表的操作。init()方法使用@PostConstruct注解进行初始化,确保RedisTemplate和HashOperations在对象创建后进行实例化。
setReservationStatus()方法用于设置预约状态,getReservationStatus()方法用于获取预约状态,removeProcessedReservations()方法用于删除已处理的预约状态。另外,getAllReservationStatus()方法可以用于获取某个倾听者在特定日期的所有预约状态。
那么这样就实现了 在多个用户想要同时预约同一个倾听者的同一时间时 所避免的数据混乱
我们这样做保证了数据的一致性 又使用了 以速度性能著称的Redis 和异步处理的RabbitMQ这样就能使得 在预约成功的一瞬间 完成通知倾听者的效果 达到了预期的业务效果