使用当前(2022年12月初)最新的版本:3.18.1;
org.redisson redisson 3.18.1
案例采用redis-cluster集群的方式;
public class Main { public static void main(String[] args) throws Exception { // 1.配置Redis-Cluster集群节点的ip和port Config config = new Config(); config.useClusterServers() .addNodeAddress("redis://127.0.0.1:7001") .addNodeAddress("redis://127.0.0.1:7002") .addNodeAddress("redis://127.0.0.1:7003") .addNodeAddress("redis://127.0.0.1:7004"); // 2.创建Redisson的客户端 RedissonClient redisson = Redisson.create(config); // 3.测试Redisson可重⼊锁的加锁、释放锁 testLock(redisson); } private static void testLock(RedissonClient redisson) throws InterruptedException { // 1.获取key为"anyLock"的锁对象 final RLock lock = redisson.getLock("test_lock"); boolean locked = true; try { //2.1:加锁 lock.lock(); // 2.2:加锁,并设置尝试获取锁超时时间30s、锁超时⾃动释放的时间10s // locked = lock.tryLock(30, 10, TimeUnit.SECONDS); if (locked) System.out.println("加锁成功!" + new Date()); Thread.sleep(20 * 1000); System.out.println("锁逻辑执行完毕!" + new Date()); } finally { // 3.释放锁 lock.unlock(); } } }
redission支持4种连接redis方式,分别为单机、主从、Sentinel、Cluster 集群;在分布式锁的实现上区别在于hash槽的获取方式。
具体配置方式见Redisson的GitHub(https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95#21-%E7%A8%8B%E5%BA%8F%E5%8C%96%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95)
分布式锁主要需要以下redis命令:
EXISTS key:当 key 存在,返回1;不存在,返回0。
GETSET key value:将给定 key 的值设为 value ,并返回 key 的旧值 (old value);当 key 存在但不是字符串类型时,返回一个错误;当key不存在时,返回nil。
GET key:返回 key 所关联的字符串值,如果 key 不存在那么返回 nil。
DEL key [KEY …]:删除给定的一个或多个 key(不存在的 key 会被忽略),返回实际删除的key的个数(integer)。
DEL key1 key2 key3HSET key field value:给一个key 设置一个{field=value}的组合值,如果key没有就直接赋值并返回1;如果field已有,那么就更新value的值,并返回0。
HEXISTS key field:当key中存储着field的时候返回1,如果key或者field有一个不存在返回0。
HINCRBY key field increment:将存储在key中的哈希(Hash)对象中的指定字段field的值加上增量increment;
- 如果键key不存在,一个保存了哈希对象{field=value}的key将被创建;
- 如果字段field不存在,在进行当前操作前,feild将被创建,且对应的值被置为0;返回值是increment。
PEXPIRE key milliseconds:设置存活时间,单位是毫秒。EXPIRE操作单位是秒。
PUBLISH channel message:向channel post一个message内容的消息,返回接收消息的客户端数。
Redisson源码中,执行redis命令的是lua脚本,其中主要有如下几个概念:
- redis.call():执行redis命令。
- KEYS[n]:指脚本中第n个参数,比如KEYS[1]指脚本中的第一个参数。
- ARGV[n]:指脚本中第n个参数的值,比如ARGV[1]指脚本中的第一个参数的值。
- 返回值中nil与false同一个意思。
在redis执行lua脚本时,相当于一个redis级别的锁,不能执行其他操作,类似于原子操作,这也是redisson实现的一个关键点。
另外,如果lua脚本执行过程中出现了异常或者redis服务器宕机了,会将脚本中已经执行的命令在AOF、RDB日志中删除;即LUA脚本执行报错会进行回滚操作。
RLock接口主要继承了Lock接口,并扩展了部分方法,比如:tryLock(long waitTime, long leaseTime, TimeUnit unit)方法中加入的leaseTime参数,用来设置锁的过期时间,如果超过leaseTime还没有解锁的话,redis就强制解锁;leaseTime的默认时间是30s。
RLock lock = redissonClient.getLock("test_lock");
RLock对象表示⼀个锁对象,我们要某一个key加锁时,需要先获取⼀个锁对象。
这里并没有具体请求Redis进行加锁的逻辑,而只是调用RedissonLock的构造函数,设置一些变量。
进入到Rlock#lock()方法,先看主流程;关于竞争锁等待时间、锁超时释放时间的配置、使用,在流程中穿插着聊。
lock()方法执行链路:
走到这里,已经可以看到加锁的底层逻辑:LUA脚本。
而lua脚本只是⼀⼤串字符串,作为evalWriteAsync()⽅法的⼀个参数⽽已;所以下⼀步进到evalWriteAsync()⽅法中:
走到这里会调用ConnectionManager#getEntry(String)方法;
在创建RedissonClient时,笔者配置的是Redis-Cluster,而走到这里却会进入到MasterSlaveConnectionManager,实际上实例化的ConnectionManager就是RedisCluster模式下的ClusterConnectionManager,而ClusterConnectionManager继承自MasterSlaveConnectionManager,并且ClusterConnectionManager没有重写getEntry(String)方法,所以会进入到MasterSlaveConnectionManager#getEntry(String)方法。
ConnectionManager#getEntry(String)方法会根据传入的key名称找到相应的Redis节点、目标master。
Redis-Cluster集群中的数据分布式是 通过⼀个⼀个的hash slot来实现的,Redis-Cluster集群总共16384个hash slot,它们都 会被均匀分布到所有的master节点上;这里ClusterConnectionManager通过key名称计算出相应的hash slot方式如下:
@Override public int calcSlot(String key) { if (key == null) { return 0; } int start = key.indexOf('{'); if (start != -1) { int end = key.indexOf('}'); if (end != -1 && start + 1 < end) { key = key.substring(start + 1, end); } } int result = CRC16.crc16(key.getBytes()) % MAX_SLOT; log.debug("slot {} for {}", result, key); return result; }
这⾥计算出key的hash slot之后,就可以通过hash slot 去看⼀看哪个master上有这个hash slot,如果某个master上有个这个hash slot,那么这个 key当然就会落到该master节点上,执⾏加锁指令也就应该在该master上执⾏。
下面进入本文重点,可重入锁的各种加锁、释放锁。
在寻找应该在哪台Redis机器上加锁时,在RedissonLock#tryLockInnerAsync()方法中我们看到了一堆LUA脚本:
LUA脚本参数解析:
- KEYS[1] 表示的是 getName() ,即锁key的名称,比如案例中的 test_lock;
- ARGV[1] 表示的是 internalLockLeaseTime 默认值是30s;
- ARGV[2] 表示的是 getLockName(threadId) ,唯一标识当前访问线程,使用锁对象id+线程id(UUID:ThreadId)方式表示,用于区分不同服务器上的线程。
- UUID用来唯⼀标识⼀个客户端,因为会有多个客户端的多个线程加锁;
- 结合起来的UUID:ThreadId 表示:具体哪个客户端上的哪个线程过来加锁,通 过这样的组合⽅式唯⼀标识⼀个线程。
LUA脚本逻辑:
- 如果锁名称不存在;
- 则向redis中添加一个key为test_lock的HASH结构、添加一个field为线程id,值=1的键值对{field:increment},表示此线程的重入次数为1;
- 设置test_lock的过期时间,防止当前服务器出问题后导致死锁,然后return nil; end;返回nil,lua脚本执行完毕;
- 如果锁存在,检测当前线程是否持有锁;
- 如果是当前线程持有锁,hincrby将该线程重入的次数++;并重新设置锁的过期时间;返回nil,lua脚本执行完毕;
- 如果不是当前线程持有锁,pttl返回锁的过期时间,单位ms。
第一次加锁时,key肯定不存在与master节点上;会执行下列LUA脚本对应的Redis指令:
hset test_lock UUID:ThreadId 1 pexpire test_lock 30000
此时,Redis中多一个Hash结构的key(test_lock):
test_lock : { UUID:ThreadId:1 }
这里的1使用来做锁重入的。
pexpire指令为test_lock这个key设置过期时间为30s,即:30s后这个key会⾃动过期被删除,key对应的锁在那时也就被释放了。
总体来看,加锁的逻辑很简单:
成功加锁后,lua脚本返回nil,即null。
加锁成功之后,tryLockInnerAsync()⽅法返回;再结合Java8的Stream,对加锁结果进一步处理;
因为加锁成功后返回的是nil,这是lua脚本的返回形式,体现到java代码中的返回值为:null。
又由于RLock#lock()方法传入的leaseTime是-1,所以进入到scheduleExpirationRenewal(long)方法做锁续约。
renewExpirationAsync()方法负责做具体的锁续约:
protected CompletionStagerenewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
这里LUA脚本的逻辑很简单:
锁续约(看门狗机制)其实就是每次加锁成功后,会⻢上开启⼀个后台线程, 每隔10s检查⼀下key是否存在,如果存在就为key续期30s。
如果当前持有锁的线程被中断了,会停止锁续约,即杀死看门狗;
protected void cancelExpirationRenewal(Long threadId) { ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null) { return; } if (threadId != null) { task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null) { timeout.cancel(); } EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } }
所谓的停止锁续约,实际就是将当前线程的threadId从看门狗缓存中移除,后续在执行锁续约时,如果发现看门狗缓存中已经没有了当前线程threadId,则直接退出锁续约 并且 不再延时10s开启一个定时任务。
如果加锁时指定了leaseTime > 0,则不会开门狗机制,表示强制锁leaseTime 毫秒后过期。一共有三种加锁方式可以做到,如下:
- RLock#lock(long leaseTime, TimeUnit unit)
- RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
- RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
再次回到加锁的LUA脚本:
同一个线程对分布式锁多次加锁时,会走以下逻辑:
对应的Redis命令为:
hexists test_lock UUID:ThreadId hincrby test_lock UUID:ThreadId 1 pexpire test_lock 30000
此时Redis中test_key对应的数据结构从
test_lock : { UUID:ThreadId:1 }
变成:
test_lock : { UUID:ThreadId:2 }
并将key的过期时间重新设置为30s。
锁重入成功之后,后台也会开启⼀个watchdog后台线程做锁续约,每隔10s检查⼀下key,如果key存在就将key的过期时间重新设置为30s。
Redisson可重⼊加锁的语义,实际是通过Hash结构的key中某个线程(UUID:ThreadId)对应的加锁次数来表示的。
再再次回到加锁的LUA脚本:
如果分布式锁已经被其他线程持有,LUA脚本会执行以下逻辑:
对应的Redis的命令为:
pttl test_lock
针对加锁方式的不同,加锁失败的逻辑也不同;可以分两大类:指定了加锁失败的等待时间waitTime和未指定waitTime。
可以简单的概述为RLock接口下的tryLock()方法获取锁会失败,lock()方法获取锁一定会成功。
以Rlock#lock()方法为例:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } CompletableFuturefuture = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { // lock() 或 lockInterruptibly()为入口走到这里时。leaseTime为-1,表示会开始开门狗;如果leaseTime大于0,则不会开启开门狗; ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { // 因为Semaphore的可用资源为0,所以这里就等价于Thread.sleep(ttl); entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); } }
首先订阅解锁channel(命名格式:redisson_lock__channel:{keyName}),其他线程解锁后,会发布解锁的消息;这里收到消息会立即尝试获取锁;订阅解锁channel的超时时间默认为7.5s。也就说获取锁失败7.5s之内,如果其他线程释放锁,当前线程可以立即尝试获取到锁。
获取锁失败之后会进⼊⼀个while死循环中:
- 每休息锁的存活时间ttl之后,就尝试去获取锁,直到成功获取到锁才会跳出while死循环。
以Rlock#tryLock(long waitTime, TimeUnit unit)为例:
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 获取锁剩余的等待时长 time -= System.currentTimeMillis() - current; if (time <= 0) { // 获取锁超时,返回获取分布式锁失败 acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); CompletableFuturesubscribeFuture = subscribe(threadId); try { // 订阅解锁channel的超时时长为 获取锁剩余的等待时长 subscribeFuture.get(time, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { if (!subscribeFuture.completeExceptionally(new RedisTimeoutException( "Unable to acquire subscription lock after " + time + "ms. " + "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) { subscribeFuture.whenComplete((res, ex) -> { if (ex == null) { unsubscribe(res, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } catch (ExecutionException e) { acquireFailed(waitTime, unit, threadId); return false; } try { // 收到解锁channel的消息之后,走到这里,再次判断获取锁等待时长是否超时 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // while循环中尝试去获取锁 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { // 如果获取锁失败后,锁存活时长 小于 剩余锁等待时长,则线程睡眠 锁存活时长 commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { // 如果获取锁失败后,锁存活时间 大于等于 剩余锁等待时长,则线程睡眠 锁等待时长 commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); } }
加锁存在超时时间 相比于 一直重试直到加锁成功,只是多一个时间限制,具体差异体现在:订阅解锁channel的超时时长、获取锁失败后线程的睡眠时长、重试获取锁次数的限制;
- 获取分布式锁失败之后,立即判断当前获取锁是否超时,如果超时,则返回加锁失败;
- 否者,订阅解锁channel(命名格式:redisson_lock__channel:{keyName}),其他线程解锁后,会发布解锁的消息;订阅解锁channel的超时时间为 获取锁剩余的等待时长。
- 在这个时间范围之内,如果其他线程释放锁,当前线程收到解锁channel的消息之后再次判断获取锁是否超时,如果不超时,尝试获取锁。
- 获取锁之后会进⼊⼀个while死循环中:
- 如果获取锁超时,则返回加锁失败;
- 否者让线程睡眠:
- 如果锁存活时长ttl 小于 剩余锁等待时长,则线程睡眠 锁存活时长;
- 如果锁存活时间ttl 大于等于 剩余锁等待时长,则线程睡眠 锁等待时长;
- 线程睡眠完之后,判断获取锁是否超时,不超时则尝试去获取锁。
进入到Rlock#unlock()方法;
和加锁的方式⼀样,释放锁也是通过lua脚本来完成的;
LUA脚本参数解析:
- KEYS[1] 表示的是 getName() ,代表的是锁名 test_lock;
- KEYS[2] 表示getChanelName() 表示的是发布订阅过程中使用的Chanel;
- ARGV[1] 表示的是LockPubSub.unLockMessage,解锁消息,实际代表的是数字 0,代表解锁消息;
- ARGV[2] 表示的是internalLockLeaseTime 默认的有效时间 30s;
- ARGV[3] 表示的是 getLockName(thread.currentThread().getId()) 代表的是 UUID:ThreadId 用锁对象id+线程id, 表示当前访问线程,用于区分不同服务器上的线程。
LUA脚本逻辑:
- 如果锁名称不存在;
- 可能是因为锁过期导致锁不存在,也可能是并发解锁。
- 则发布锁解除的消息,返回1,lua脚本执行完毕;
- 如果锁存在,检测当前线程是否持有锁;
- 如果是当前线程持有锁,定义变量counter,接收执行incrby将该线程重入的次数–的结果;
- 如果重入次数大于0,表示该线程还有其他任务需要执行;重新设置锁的过期时间;返回0,lua脚本执行完毕;
- 否则表示该线程执行结束,del删除该锁;并且publish发布该锁解除的消息;返回1,lua脚本执行完毕;
- 如果不是当前线程持有锁 或 其他情况,都返回nil,lua脚本执行完毕。
脚本执行结束之后,如果返回值不是0或1,即当前线程去释放其他线程的加锁时,抛出异常。
通过LUA脚本释放锁成功之后,会将看门狗杀死;
forceUnlockAsync()方法被调用的地方很多,大多都是在清理资源时删除锁。
@Override public RFutureforceUnlockAsync() { cancelExpirationRenewal(null); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE); }
LUA脚本逻辑:
如果Redisson客户端刚加锁成功,并且未指定releaseTime,后台会启动一个定时任务watchdog每隔10s检查key:key如果存在就为它⾃动续命到30s;在watchdog定时任务存在的情况下,如果不是主动释放锁,那么key将会⼀直的被watchdog这个定时任务维持加锁。
但是如果客户端宕机了,定时任务watchdog也就没了,也就没有锁续约机制了,那么过完30s之后,key会⾃动被删除、key对应的锁也自动被释放了。
如果在加锁时指定了leaseTime,加锁成功之后,后台并不会启动一个定时任务watchdog做锁续约;key存活leaseTime 毫秒之后便会自动被删除、key对应的锁也就自动被释放了;无论当前线程的业务逻辑是否执行完毕。
比如使用如下方式加锁:
上一篇:MySQL SQL 注入