Redission tryLock

Redisson 的 tryLock

1
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)

在 waitTime 时间范围内尝试获取锁, 如果获取到锁, 设置过期时间 leaseTime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class RedissonLock extends RedissonBaseLock {
// 在waitTime时间范围内尝试获取锁,如果获取到锁,则设置锁过期时间leaseTime
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 第一步:尝试获取锁
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// ttl为空说明获取到了锁
if (ttl == null) {
return true;
}

// 判断尝试获取锁是否超过waitTime
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// 第二步:订阅解锁消息通知
current = System.currentTimeMillis();
// 订阅锁释放
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 等待锁释放消息,等待时间超过waitTime,获取锁失败
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
// 如果订阅解锁Future在执行中,等任务执行完后取消订阅锁释放
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
// 取消订阅解锁通知
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}

try {
// 判断尝试获取锁是否超过waitTime
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// 第三步:自旋尝试获取锁
while (true) {
long currentTime = System.currentTimeMillis();
// 1、尝试获取锁(下文会详细解析此方法)
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// ttl为空说明获取到了锁
if (ttl == null) {
return true;
}

// 判断尝试获取锁是否超过waitTime
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// 等待锁释放(信号量控制)
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 尝试获取信号量
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

// 判断尝试获取锁是否超过waitTime
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 第四步:取消解锁订阅
unsubscribe(subscribeFuture, threadId);
}
}
}

主要分为以下四步

  • tryAcquire 尝试获取锁, 获取到返回 true
  • 获取不到说明锁被占用了, 订阅结果消息通知
  • 收到消息解锁通知, 自旋获取锁, 直到 waitTime 获取锁失败
  • 不论是否获取锁成功, 取消解锁消息订阅

tryAcquire 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 这里需要注意的是leaseTime == -1,会触发redisson看门狗机制
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}

// 获取锁成功
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 锁自动续时(看门狗机制)触发条件leaseTime == -1
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}

tryLockInnerAsync 里面是尝试获取分布式锁的 lua 脚本
scheduleExpirationRenewal 锁自动续时, 看门狗机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// 如果key一开始就不存在,则直接创建一个key
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 这里是重入锁的实现,同一个线程多次获取锁只需要在value加1即可,value相当于一个加锁计数器
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 有其他线程持有锁,加锁失败,返回锁过期时间
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
  1. 加锁的 key 不存在就创建一个 redis hash key, field(当前线程 id), value(加锁次数)
  2. 有线程持有锁并且未解锁, 其他线程无法获取到锁
  3. 加锁成功返回 null, 加锁失败返回过期时间

锁过期时间自动续费

  1. 锁过期自动续费的触发条件为 tryLock 设置的锁到期时间为-1
  2. 自动续费的原理是创建一个定时任务, 每 internalLockLeaseTime / 3 时触发一次, 如果发现持有锁未释放, 把锁过期时间更新为 internalLockLeaseTime(默认为 30s)
  3. 锁过期时间更新后, 再次递归调用 renewExpiration 创建下一次定时任务

前面 tryLock 方法的订阅解锁消息通知是在 unlock 的时候发起的
unlockAsync 方法内部调用 lua 脚本, 调用 publish 推送解锁消息

Redis 的 publish

1
publish channel message
  • channel 指定要发布消息的频道
  • message 要发布的消息内容
    在 tryLock 和 unLock 中, 他们的 channel 是线程 id

Redission tryLock
http://showyoubug.cn/2024/05/28/Redission-tryLock/
作者
Dong Su
发布于
2024年5月28日
许可协议