前言
在某些场景中,多个进程必须以互斥的方式独占共享资源,这时用分布式锁是最直接有效的。
随着技术快速发展,数据规模增大,分布式系统越来越普及,一个应用往往会部署在多台机器上(多节点),在有些场景中,为了保证数据不重复,要求在同一时刻,同一任务只在一个节点上运行,即保证某一方法同一时刻只能被一个线程执行。在单机环境中,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。而在多机部署环境中,不同机器不同进程,就需要在多进程下保证线程的安全性了。因此,分布式锁应运而生。
以往的工作中看到或用到几种实现方案,有基于zk的,也有基于redis的。由于实现上逻辑不严谨,线上时不时会爆出几个死锁case。那么,究竟什么样的分布式锁实现,才算是比较好的方案?
常见分布式锁方案对比
分类 | 方案 | 实现原理 | 优点 | 缺点 |
基于数据库 | 基于mysql 表唯一索引 | 1.表增加唯一索引 2.加锁:执行insert语句,若报错,则表明加锁失败 3.解锁:执行delete语句 |
完全利用DB现有能力,实现简单 | 1.锁无超时自动失效机制,有死锁风险 2.不支持锁重入,不支持阻塞等待 3.操作数据库开销大,性能不高 |
基于MongoDB findAndModify原子操作 | 1.加锁:执行findAndModify原子命令查找document,若不存在则新增 2.解锁:删除document |
实现也很容易,较基于MySQL唯一索引的方案,性能要好很多 | 1.大部分公司数据库用MySQL,可能缺乏相应的MongoDB运维、开发人员 2.锁无超时自动失效机制 |
|
基于分布式协调系统 | 基于ZooKeeper | 1.加锁:在/lock目录下创建临时有序节点,判断创建的节点序号是否最小。若是,则表示获取到锁;否,则则watch /lock目录下序号比自身小的前一个节点 2.解锁:删除节点 |
1.由zk保障系统高可用 2.Curator框架已原生支持系列分布式锁命令,使用简单 |
需单独维护一套zk集群,维保成本高 |
基于缓存 | 基于redis命令 | 1. 加锁:执行setnx,若成功再执行expire添加过期时间 2. 解锁:执行delete命令 |
实现简单,相比数据库和分布式系统的实现,该方案最轻,性能最好 | 1.setnx和expire分2步执行,非原子操作;若setnx执行成功,但expire执行失败,就可能出现死锁 2.delete命令存在误删除非当前线程持有的锁的可能 3.不支持阻塞等待、不可重入 |
基于redis Lua脚本能力 | 1. 加锁:执行SET lock_name random_value EX seconds NX 命令
2. 解锁:执行Lua脚本,释放锁时验证random_value if redis.call(“get”, KEYS[1]) == ARGV[1] then return redis.call(“del”,KEYS[1]) else return 0 end |
同上;实现逻辑上也更严谨,除了单点问题,生产环境采用用这种方案,问题也不大。 | 不支持锁重入,不支持阻塞等待 |
表格中对比了几种常见的方案,redis+lua基本可应付工作中分布式锁的需求。然而,当偶然看到redisson分布式锁实现方案(传送门),相比以上方案,redisson保持了简单易用、支持锁重入、支持阻塞等待、Lua脚本原子操作,不禁佩服作者精巧的构思和高超的编码能力。下面就来学习下redisson这个牛逼框架,是怎么实现的。
分布式锁需满足四个条件
首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了,即不能误解锁。
- 具有容错性。只要大多数Redis节点正常运行,客户端就能够获取和释放锁。
Redisson分布式锁的实现
Redisson 分布式重入锁用法
Redisson 支持单点模式、主从模式、哨兵模式、集群模式,这里以单点模式为例:
// 1.构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:5379").setPassword("123456").setDatabase(0);
// 2.构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 3.获取锁对象实例(无法保证是按线程的顺序获取到)
RLock rLock = redissonClient.getLock(lockKey);
try {
/**
* 4.尝试获取锁
* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
*/
boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
if (res) {
//成功获得锁,在这里处理业务
}
} catch (Exception e) {
throw new RuntimeException("aquire lock fail");
}finally{
//无论如何, 最后都要解锁
rLock.unlock();
}
redisson这个框架重度依赖了Lua脚本和Netty,代码很牛逼,各种Future及FutureListener的异步、同步操作转换。
自己先思考下,如果要手写一个分布式锁组件,怎么做?肯定要定义2个接口:加锁、解锁;大道至简,redisson的作者就是在加锁和解锁的执行层面采用Lua脚本,逼格高,而且重要有原子性保证啊。当然,redisson的作者毕竟牛逼,加锁和解锁过程中还巧妙地利用了redis的发布订阅功能,后面会讲到。下面先对加锁和解锁Lua脚本了解下。
加锁&解锁Lua脚本
加锁、解锁Lua脚本是redisson分布式锁实现最重要的组成部分。首先不看代码,先研究下Lua脚本都是什么逻辑
1、加锁Lua脚本
- 脚本入参
参数 | 示例值 | 含义 |
KEY个数 | 1 | KEY个数 |
KEYS[1] | my_first_lock_name | 锁名 |
ARGV[1] | 60000 | 持有锁的有效时间:毫秒 |
ARGV[2] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | 唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID |
- 脚本内容
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);
- 脚本解读
-
-
Q:返回nil、返回剩余过期时间有什么目的?
A:当且仅当返回nil,才表示加锁成功;客户端需要感知加锁是否成功的结果2、解锁Lua脚本
- 脚本入参
参数 示例值 含义 KEY个数 2 KEY个数 KEYS[1] my_first_lock_name 锁名 KEYS[2] redisson_lock__channel:{my_first_lock_name} 解锁消息PubSub频道 ARGV[1] 0 redisson定义0表示解锁消息 ARGV[2] 30000 设置锁的过期时间;默认值30秒 ARGV[3] 58c62432-bb74-4d14-8a00-9908cc8b828f:1 唯一标识;同加锁流程 - 脚本内容
-- 若锁不存在:则直接广播解锁消息,并返回1 if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end; -- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; -- 若锁存在,且唯一标识匹配:则先将锁重入计数减1 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then -- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期 redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else -- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程 redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
- 脚本解读
-
Q1:广播解锁消息有什么用?
A:是为了通知其他争抢锁阻塞住的线程,从阻塞中解除,并再次去争抢锁。Q2:返回值0、1、nil有什么不一样?
A:当且仅当返回1,才表示当前请求真正触发了解锁Lua脚本;但客户端又并不关心解锁请求的返回值,好像没什么用?源码搞起
1、加锁流程源码
读加锁源码时,可以把
tryAcquire(leaseTime, unit, threadId)
方法直接视为执行加锁Lua脚本。直接进入org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit)
源码@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 获取锁能容忍的最大等待时长 long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); // 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间 time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); // 【核心点2】订阅解锁消息,见org.redisson.pubsub.LockPubSub#onMessage /** * 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题: * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争 * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败 * 当 this.await返回true,进入循环尝试获取锁 */ final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); //await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future) if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } // 订阅成功 try { // 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间 time -= (System.currentTimeMillis() - current); if (time <= 0) { // 超出可容忍的等待时长,直接返回获取锁失败 acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); // 尝试获取锁;如果锁被其他线程占用,就返回锁剩余过期时间【同上】 ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); // 【核心点3】根据锁TTL,调整阻塞等待时长; // 注意:这里实现非常巧妙,1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁; //2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。 if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } } } finally { // 取消解锁消息的订阅 unsubscribe(subscribeFuture, threadId); } }
接下的再获取锁方法 tryAcquire的实现,真的就是执行Lua脚本!
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { // tryAcquireAsync异步执行Lua脚本,get方法同步获取返回结果 return get(tryAcquireAsync(leaseTime, unit, threadId)); } // 见org.redisson.RedissonLock#tryAcquireAsync private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { // 实质是异步执行加锁Lua脚本 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { //先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果 if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired //如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } // 见org.redisson.RedissonLock#tryLockInnerAsync <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
加锁过程小结
1、锁其实也是一种资源,各线程争抢锁操作对应到redisson中就是争抢着去创建一个hash结构,谁先创建就代表谁获得锁;hash的名称为锁名,hash里面内容仅包含一条键值对,键为redisson客户端唯一标识+持有锁线程id,值为锁重入计数;给hash设置的过期时间就是锁的过期时间。放个图直观感受下:
-
2、加锁流程核心就3步
Step1:尝试获取锁,这一步是通过执行加锁Lua脚本来做;
Step2:若第一步未获取到锁,则去订阅解锁消息,当获取锁到剩余过期时间后,调用信号量方法阻塞住,直到被唤醒或等待超时
Step3:一旦持有锁的线程释放了锁,就会广播解锁消息。于是,第二步中的解锁消息的监听器会释放信号量,获取锁被阻塞的那些线程就会被唤醒,并重新尝试获取锁。比如 RedissonLock中的变量internalLockLeaseTime,默认值是30000毫秒,还有调用tryLockInnerAsync()传入的一个从连接管理器获取的getLockWatchdogTimeout(),他的默认值也是30000毫秒,这些都和redisson官方文档所说的watchdog机制有关,看门狗,还是很形象的描述这一机制,那么看门狗到底做了什么,为什么这么做,来看下核心代码.
先思考一个问题,假设在一个分布式环境下,多个服务实例请求获取锁,其中服务实例1成功获取到了锁,在执行业务逻辑的过程中,服务实例突然挂掉了或者hang住了,那么这个锁会不会释放,什么时候释放?回答这个问题,自然想起来之前我们分析的lua脚本,其中第一次加锁的时候使用pexpire给锁key设置了过期时间,默认30000毫秒,由此来看如果服务实例宕机了,锁最终也会释放,其他服务实例也是可以继续获取到锁执行业务。但是要是30000毫秒之后呢,要是服务实例1没有宕机但是业务执行还没有结束,所释放掉了就会导致线程问题,这个redisson是怎么解决的呢?这个就一定要实现自动延长锁有效期的机制。
异步执行完lua脚本执行完成之后,设置了一个监听器,来处理异步执行结束之后的一些工作。在操作完成之后会去执行operationComplete方法,先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果,如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal,回想一下之前的lua脚本,当加锁逻辑
处理结束,返回了一个nil;如此说来 就一定会走定时任务了。来看下定时调度scheduleExpirationRenewal代码private void scheduleExpirationRenewal(final long threadId) { if (expirationRenewalMap.containsKey(getEntryName())) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { expirationRenewalMap.remove(getEntryName()); if (!future.isSuccess()) { log.error("Can't update lock " + getName() + " expiration", future.cause()); return; } if (future.getNow()) { // reschedule itself scheduleExpirationRenewal(threadId); } } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) { task.cancel(); } }
首先,会先判断在expirationRenewalMap中是否存在了entryName,这是个map结构,主要还是判断在这个服务实例中的加锁客户端的锁key是否存在,如果已经存在了,就直接返回;第一次加锁,肯定是不存在的,接下来就是搞了一个TimeTask,延迟internalLockLeaseTime/3之后执行,这里就用到了文章一开始就提到奇妙的变量,算下来就是大约10秒钟执行一次,调用了一个异步执行的方法
如图也是调用异步执行了一段lua脚本,首先判断这个锁key的map结构中是否存在对应的key8a9649f5-f5b5-48b4-beaa-d0c24855f9ab:anyLock:1,如果存在,就直接调用pexpire命令设置锁key的过期时间,默认30000毫秒。
OK,现在思路就清晰了,在上面任务调度的方法中,也是异步执行并且设置了一个监听器,在操作执行成功之后,会回调这个方法,如果调用失败会打一个错误日志并返回,更新锁过期时间失败;然后获取异步执行的结果,如果为true,就会调用本身,如此说来又会延迟10秒钟去执行这段逻辑,所以,这段逻辑在你成功获取到锁之后,会每隔十秒钟去执行一次,并且,在锁key还没有失效的情况下,会把锁的过期时间继续延长到30000毫秒,也就是说只要这台服务实例没有挂掉,并且没有主动释放锁,看门狗都会每隔十秒给你续约一下,保证锁一直在你手中。完美的操作。
到现在来说,加锁,锁自动延长过期时间,都OK了,然后就是说在你执行业务,持有锁的这段时间,别的服务实例来尝试加锁又会发生什么情况呢?或者当前客户端的别的线程来获取锁呢?很显然,肯定会阻塞住,我们来通过代码看看是怎么做到的。还是把眼光放到之前分析的那段加锁lua代码上,当加锁的锁key存在的时候并且锁key对应的map结构中当前客户端的唯一key也存在时,会去调用hincrby命令,将唯一key的值自增一,并且会pexpire设置key的过期时间为30000毫秒,然后返回nil,可以想象这里也是加锁成功的,也会继续去执行定时调度任务,完成锁key过期时间的续约,这里呢,就实现了锁的可重入性。
那么当以上这种情况也没有发生呢,这里就会直接返回当前锁的剩余有效期,相应的也不会去执行续约逻辑。此时一直返回到上面的方法,如果加锁成功就直接返回;否则就会进入一个死循环,去尝试加锁,并且也会在等待一段时间之后一直循环尝试加锁,阻塞住,知道第一个服务实例释放锁。对于不同的服务实例尝试会获取一把锁,也和上面的逻辑类似,都是这样实现了锁的互斥。紧接着,我们来看看锁释放的逻辑,其实也很简单,调用了lock.unlock()方法,跟着代码走流程发现,也是异步调用了一段lua脚本,lua脚本,应该就比较清晰,也就是通过判断锁key是否存在,如果不存在直接返回;否则就会判断当前客户端对应的唯一key的值是否存在,如果不存在就会返回nil;否则,值自增-1,判断唯一key的值是否大于零,如果大于零,则返回0;否则删除当前锁key,并返回1;返回到上一层方法,也是针对返回值进行了操作,如果返回值是1,则会去取消之前的定时续约任务,如果失败了,则会做一些类似设置状态的操作,这一些和解锁逻辑也没有什么关系,可以不去看他。
解锁流程源码
解锁流程相对比较简单,完全就是执行解锁Lua脚本,无额外的代码逻辑,直接看
org.redisson.RedissonLock#unlock
代码@Override public void unlock() { // 执行解锁Lua脚本,这里传入线程id,是为了保证加锁和解锁是同一个线程,避免误解锁其他线程占有的锁 Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { cancelExpirationRenewal(); } } // 见org.redisson.RedissonLock#unlockInnerAsync protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
c.加锁&解锁流程串起来
上面结合Lua脚本和源码,分别分析了加锁流程和解锁流程。下面升级下挑战难度,模拟下多个线程争抢锁会是怎样的流程。示意图如下,比较关键的三处已用红色字体标注。
概括下整个流程
1、线程A和线程B两个线程同时争抢锁。线程A很幸运,最先抢到了锁。线程B在获取锁失败后,并未放弃希望,而是主动订阅了解锁消息,然后再尝试获取锁,顺便看看没有抢到的这把锁还有多久就过期,线程B就按需阻塞等锁释放。
2、线程A拿着锁干完了活,自觉释放了持有的锁,于此同时广播了解锁消息,通知其他抢锁的线程再来枪;
3、解锁消息的监听者LockPubSub收到消息后,释放自己持有的信号量;线程B就瞬间从阻塞中被唤醒了,接着再抢锁,这次终于抢到锁了!后面再按部就班,干完活,解锁
其他料
Q1:订阅频道名称(如:
redisson_lock__channel:{my_first_lock_name}
)为什么有大括号?
A:
1.在redis集群方案中,如果Lua脚本涉及多个key的操作,则需限制这些key在同一个slot中,才能保障Lua脚本执行的原子性。否则运行会报错Lua script attempted to access a non local key in a cluster node . channel
;
2.HashTag是用{}
包裹key的一个子串,若设置了HashTag,集群会根据HashTag决定key分配到哪个slot;HashTag不支持嵌套,只有第一个左括号{
和第一个右括号}
里面的内容才当做HashTag参与slot计算;通常,客户端都会封装这个计算逻辑。// 见org.redisson.cluster.ClusterConnectionManager#calcSlot @Override public int calcSlot(String key) { if (key == null) { return 0; } int start = key.indexOf('{'); if (start != -1) { int end = key.indexOf('}'); key = key.substring(start+1, end); } int result = CRC16.crc16(key.getBytes()) % MAX_SLOT; log.debug("slot {} for {}", result, key); return result; }
3.在解锁Lua脚本中,操作了两个key:一个是锁名
my_lock_name
,一个是解锁消息发布订阅频道redisson_lock__channel:{my_first_lock_name}
,按照上面slot计算方式,两个key都会按照内容my_first_lock_name
来计算,故能保证落到同一个slotQ2:redisson代码几乎都是以Lua脚本方式与redis服务端交互,如何跟踪这些脚本执行过程?
A:启动一个redis客户端终端,执行monitor
命令以便在终端上实时打印 redis 服务器接收到的命令;然后debug执行redisson加锁/解锁测试用例,即可看到代码运行过程中实际执行了哪些Lua脚本eg:上面整体流程示意图的测试用例位:
@RunWith(SpringRunner.class) @SpringBootTest public class RedissonDistributedLockerTest { private static final Logger log = LoggerFactory.getLogger(RedissonDistributedLocker.class); @Resource private DistributedLocker distributedLocker; private static final ExecutorService executorServiceB = Executors.newSingleThreadExecutor(); private static final ExecutorService executorServiceC = Executors.newSingleThreadExecutor(); @Test public void tryLockUnlockCost() throws Exception { StopWatch stopWatch = new StopWatch("加锁解锁耗时统计"); stopWatch.start(); for (int i = 0; i < 10000; i++) { String key = "mock-key:" + UUID.randomUUID().toString().replace("-", ""); Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000); Assert.assertTrue(optLocked.isPresent()); optLocked.get().unlock(); } stopWatch.stop(); log.info(stopWatch.prettyPrint()); } @Test public void tryLock() throws Exception { String key = "mock-key:" + UUID.randomUUID().toString().replace("-", ""); Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000); Assert.assertTrue(optLocked.isPresent()); Optional<LockResource> optLocked2 = distributedLocker.tryLock(key, 600000, 600000); Assert.assertTrue(optLocked2.isPresent()); optLocked.get().unlock(); } /** * 模拟2个线程争抢锁:A先获取到锁,A释放锁后,B再获得锁 */ @Test public void tryLock2() throws Exception { String key = "mock-key:" + UUID.randomUUID().toString().replace("-", ""); CountDownLatch countDownLatch = new CountDownLatch(1); Future<Optional<LockResource>> submit = executorServiceB.submit(() -> { countDownLatch.await(); log.info("B尝试获得锁:thread={}", currentThreadId()); return distributedLocker.tryLock(key, 600000, 600000); } ); log.info("A尝试获得锁:thread={}", currentThreadId()); Optional<LockResource> optLocked = distributedLocker.tryLock(key, 300000, 600000); Assert.assertTrue(optLocked.isPresent()); log.info("A已获得锁:thread={}", currentThreadId()); countDownLatch.countDown(); optLocked.get().unlock(); log.info("A已释放锁:thread={}", currentThreadId()); Optional<LockResource> lockResource2 = submit.get(); Assert.assertTrue(lockResource2.isPresent()); executorServiceB.submit(() -> { log.info("B已获得锁:thread={}", currentThreadId()); lockResource2.get().unlock(); log.info("B已释放锁:thread={}", currentThreadId()); }); } /** * 模拟3个线程争抢锁:A先获取到锁,A释放锁后,B和C同时争抢锁 */ @Test public void tryLock3() throws Exception { String key = "mock-key:" + UUID.randomUUID().toString().replace("-", ""); log.info("A尝试获得锁:thread={}", currentThreadId()); Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000); if (optLocked.isPresent()) { log.info("A已获得锁:thread={}", currentThreadId()); } Assert.assertTrue(optLocked.isPresent()); CyclicBarrier cyclicBarrier = new CyclicBarrier(2); Future<Optional<LockResource>> submitB = executorServiceB.submit(() -> { cyclicBarrier.await(); log.info("B尝试获得锁:thread={}", currentThreadId()); return distributedLocker.tryLock(key, 600000, 600000); } ); Future<Optional<LockResource>> submitC = executorServiceC.submit(() -> { cyclicBarrier.await(); log.info("C尝试获得锁:thread={}", currentThreadId()); return distributedLocker.tryLock(key, 600000, 600000); } ); optLocked.get().unlock(); log.info("A已释放锁:thread={}", currentThreadId()); CountDownLatch countDownLatch = new CountDownLatch(2); executorServiceB.submit(() -> { log.info("B已获得锁:thread={}", currentThreadId()); try { submitB.get().get().unlock(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } log.info("B已释放锁:thread={}", currentThreadId()); countDownLatch.countDown(); }); executorServiceC.submit(() -> { log.info("C已获得锁:thread={}", currentThreadId()); try { submitC.get().get().unlock(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } log.info("C已释放锁:thread={}", currentThreadId()); countDownLatch.countDown(); }); countDownLatch.await(); } private static Long currentThreadId() { return Thread.currentThread().getId(); } @Test public void tryLockWaitTimeout() throws Exception { String key = "mock-key:" + UUID.randomUUID().toString(); Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 2000); Assert.assertTrue(optLocked.isPresent()); Optional<LockResource> optLockResource = CompletableFuture.supplyAsync(() -> { long now = System.currentTimeMillis(); Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 1000, 10); long cost = System.currentTimeMillis() - now; log.info("cost={}", cost); return optLockedAgain; }).exceptionally(th -> { log.error("Exception: ", th); return Optional.empty(); }).join(); Assert.assertTrue(!optLockResource.isPresent()); } @Test public void tryLockWithLeaseTime() throws Exception { String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString(); Optional<LockResource> optLocked = distributedLocker.tryLock(key, 3000, 1000); Assert.assertTrue(optLocked.isPresent()); // 可重入 Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 3000, 1000); Assert.assertTrue(optLockedAgain.isPresent()); } /** * 模拟1000个并发请求枪一把锁 */ @Test public void tryLockWithLeaseTimeOnMultiThread() throws Exception { int totalThread = 1000; String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString(); AtomicInteger tryAcquireLockTimes = new AtomicInteger(0); AtomicInteger acquiredLockTimes = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(totalThread); for (int i = 0; i < totalThread; i++) { executor.submit(new Runnable() { @Override public void run() { tryAcquireLockTimes.getAndIncrement(); Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 10000); if (optLocked.isPresent()) { acquiredLockTimes.getAndIncrement(); } } }); } executor.awaitTermination(15, TimeUnit.SECONDS); Assert.assertTrue(tryAcquireLockTimes.get() == totalThread); Assert.assertTrue(acquiredLockTimes.get() == 1); } @Test public void tryLockWithLeaseTimeOnMultiThread2() throws Exception { int