Redis实现分布式锁

这句话基本上是正确的,但是需要一些小的澄清和补充以更准确地描述分布式锁和Redis在其中的作用。

分布式锁

锁的作用域和分布式环境

确实,传统的Java锁(如synchronized关键字或java.util.concurrent.locks包中的锁)设计用于单个JVM进程内多线程之间的同步。这些锁不能直接用于多个JVM进程之间(即分布式环境中)的同步,因为每个进程拥有自己的内存空间和锁状态,它们之间无法共享这种状态。

Redis实现分布式锁

引入Redis来实现分布式锁是一种流行的做法,确实可以利用Redis的某些特性来实现跨多个进程的同步。Redis的操作通常是原子性的,这意味着每个命令在执行过程中不会被其他命令中断。加之Redis是单线程的,保证了在任何给定时间内只执行一个命令,这为实现分布式锁提供了基础。

Redis单线程的特点

虽然Redis是单线程的,但是“利用Redis单线程的特点来实现分布式锁”这句话可能会造成一些误解。实现分布式锁的关键不仅仅在于Redis的单线程模型,而更在于它提供的命令可以被用来原子性地创建锁。例如,SET key value NX PX milliseconds命令可以原子性地设置一个值,如果这个键不存在的话,并设置键的过期时间。这意味着只有第一个尝试设置该键的进程/线程可以成功获得锁,而其他进程/线程则因为NX条件(键必须不存在)而失败,从而实现了锁的效果。

Redis实现分布式锁

Redis分布式锁最简单的实现

想要实现分布式锁,必须要求 Redis 有「互斥」的能力,我们可以使用 SETNX 命令,这个命令表示SET if Not Exists,即如果 key 不存在,才会设置它的值,否则什么也不做。

两个客户端进程可以执行这个命令,达到互斥,就可以实现一个分布式锁。

客户端 1 申请加锁,加锁成功:

客户端 2 申请加锁,因为它后到达,加锁失败:

image.png

此时,加锁成功的客户端,就可以去操作「共享资源」,例如,修改 MySQL 的某一行数据,或者调用一个 API 请求。

操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?

也很简单,直接使用 DEL 命令删除这个 key 即可,这个逻辑非常简单。

image.png

但是,它存在一个很大的问题,当客户端 1 拿到锁后,如果发生下面的场景,就会造成「死锁」:

1、程序处理业务逻辑异常,没及时释放锁

2、进程挂了,没机会释放锁

这时,这个客户端就会一直占用这个锁,而其它客户端就「永远」拿不到这把锁了。怎么解决这个问题呢?

如何避免死锁?

我们很容易想到的方案是,在申请锁时,给这把锁设置一个「租期」。

在 Redis 中实现时,就是给这个 key 设置一个「过期时间」。这里我们假设,操作共享资源的时间不会超过 10s,那么在加锁时,给这个 key 设置 10s 过期即可:

1
2
SETNX lock 1    // 加锁
EXPIRE lock 10 // 10s后自动过期

image.png

这样一来,无论客户端是否异常,这个锁都可以在 10s 后被「自动释放」,其它客户端依旧可以拿到锁。

但现在还是有问题:

现在的操作,加锁、设置过期是 2 条命令,有没有可能只执行了第一条,第二条却「来不及」执行的情况发生呢?例如:

  • SETNX 执行成功,执行EXPIRE 时由于网络问题,执行失败
  • SETNX 执行成功,Redis 异常宕机,EXPIRE 没有机会执行
  • SETNX 执行成功,客户端异常崩溃,EXPIRE也没有机会执行

总之,这两条命令不能保证是原子操作(一起成功),就有潜在的风险导致过期时间设置失败,依旧发生「死锁」问题。

在 Redis 2.6.12 之后,Redis 扩展了 SET 命令的参数,用这一条命令就可以了:

1
SET lock 1 EX 10 NX

image.png

锁被别人释放怎么办?

上面的命令执行时,每个客户端在释放锁时,都是「无脑」操作,并没有检查这把锁是否还「归自己持有」,所以就会发生释放别人锁的风险,这样的解锁流程,很不「严谨」!如何解决这个问题呢?

解决办法是:客户端在加锁时,设置一个只有自己知道的「唯一标识」进去。

例如,可以是自己的线程 ID,也可以是一个 UUID(随机且唯一),这里我们以UUID 举例:

1
SET lock $uuid EX 20 NX

之后,在释放锁时,要先判断这把锁是否还归自己持有,伪代码可以这么写:

1
2
if redis.get("lock") == $uuid:
redis.del("lock")

这里释放锁使用的是 GET + DEL 两条命令,这时,又会遇到我们前面讲的原子性问题了。这里可以使用lua脚本来解决。

原子性问题锁误释放锁

如果操作不是原子性的,那么在检查锁的值和实际进行删除之间的时间窗口内,锁可能已经被释放(例如因为超时)并被另一个客户端获取。这种情况下,一个客户端可能会错误地释放了属于另一个客户端的锁。

1
2
3
4
5
6
if redis.call("GET",KEYS[1]) == ARGV[1]
then
return redis.call("DEL",KEYS[1])
else
return 0
end

好了,这样一路优化,整个的加锁、解锁的流程就更「严谨」了。

这里我们先小结一下,基于 Redis 实现的分布式锁,一个严谨的的流程如下:

1、加锁

1
SET lock_key $unique_id EX $expire_time NX

2、操作共享资源

3、释放锁:Lua 脚本,先 GET 判断锁是否归属自己,再DEL 释放锁

代码

介绍代码前先介绍这几个类的作用

private ThreadLocal<String> lockerId = new ThreadLocal<>(); 这句代码在Java中声明了一个ThreadLocal类型的变量lockerId,用于存储与当前线程关联的String类型的数据。

ThreadLocal 是什么?

ThreadLocal是Java提供的一个线程局部变量工具,它可以为每个使用该变量的线程提供一个独立的变量副本。这意味着每个线程都可以独立地改变自己的副本,而不会影响到其他线程中的副本。这在进行多线程编程时是非常有用的,特别是当你需要避免因变量共享而导致的并发问题。

注意

虽然ThreadLocal提供了很好的线程隔离能力,但它也可能导致内存泄露问题,特别是在使用线程池时。因为线程池中的线程会被重用,如果不及时清理ThreadLocal变量,那么已经结束生命周期的对象可能会一直被ThreadLocal引用而无法被垃圾收集器回收。因此,使用ThreadLocal时应确保及时调用remove()方法来避免内存泄露。

AtomicReferenceLinkedBlockingQueue都是Java并发包中的类,用于处理多线程编程的不同方面。

AtomicReference

AtomicReference类位于java.util.concurrent.atomic包中,提供了一个可以原子读写的引用变量。这意味着对于任何单个操作,如赋值或者更新操作,AtomicReference保证操作的原子性,即在完成操作之前,不会被其他线程打断。

AtomicReference是用于对象引用的原子操作,并且常用于无锁设计或算法中,尤其是在需要对共享对象进行原子更新时。例如,你可能会使用AtomicReference来原子更新一个共享的配置对象,而不需要同步访问该对象。

LinkedBlockingQueue

LinkedBlockingQueue类位于java.util.concurrent包中,是一个基于已链接节点的、线程安全的阻塞队列。它实现了BlockingQueue接口。这种队列的元素遵循先进先出(FIFO)原则。LinkedBlockingQueue可以用作生产者消费者模式的基础,其中多个线程生产对象到队列中,而多个消费者线程从队列中取出并处理这些对象。

LinkedBlockingQueue的一个关键特性是,它在尝试添加元素到满队列或从空队列获取元素时,可以让线程阻塞(等待),直到操作可以成功进行。这样,它在处理多线程间的协作时非常有用。

对比

  • 用途不同AtomicReference主要用于实现无锁的线程安全操作,特别是对共享对象的原子更新。而LinkedBlockingQueue用于在生产者和消费者模式中,安全地在多个线程间传递数据。
  • 功能不同AtomicReference提供原子操作,以保证更新操作的原子性。LinkedBlockingQueue提供阻塞操作,以协调生产者和消费者之间的数据流。
  • 性能考虑:使用AtomicReference可能有助于减少锁的使用,从而在某些场景下提高性能。而LinkedBlockingQueue通过提供等待/通知机制,使得生产者和消费者能够有效地同步数据流,可能在不同的使用场景下提高性能。

两者都是解决并发问题的重要工具,选择使用哪一个取决于你的具体需求和应用场景。

分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
* @author roxanne_waar
* @date 2024/3/15 11:18
* @description 实现一个简易的redis分布式锁
*/
@Component
public class RedisDistributedLock implements Lock {
private final static String LOCK_NAME = "redis:lock";
private final static int EX_TIME = 5 ;

@Autowired
private JedisPool jedisPool;
private ThreadLocal<String> thread_id = new ThreadLocal<>();

private Thread owner_thread = null;


@Override
public void lock() {
try{
while(!tryLock()){
Thread.sleep(1000);
}
}catch (Exception e){
e.printStackTrace();
}

}

@Override
public void lockInterruptibly() throws InterruptedException {

}

@Override
public boolean tryLock() {
Thread me = Thread.currentThread();
if(owner_thread == me){
//防止重入
return true;
}else if(owner_thread != null){
//本进程其余线程拥有锁
return false;
}

Jedis jedis = null;
//尝试获取锁
try{
jedis = jedisPool.getResource();
SetParams setParams = new SetParams();
setParams.nx().ex(EX_TIME);
String my_id = UUID.randomUUID().toString();

if(owner_thread == null && "OK".equals(jedis.set(LOCK_NAME,my_id,setParams))){
//获取到锁
thread_id.set(my_id);
owner_thread = me;
return true;
}else {
return false;
}

}catch (Exception e){
e.printStackTrace();
}finally {
jedis.close();
}
return false;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void unlock() {
Jedis resource = null;
try{
resource = jedisPool.getResource();
String unlock_lua = getLuaScript(resource);
Long res = 0L;
if(thread_id.get() != null){
res = (Long)resource.eval(unlock_lua,1,LOCK_NAME,thread_id.get());
}
if(res == 1) System.out.println("锁释放成功");
else System.out.println("锁释放失败");


} catch (Exception e){
e.printStackTrace();
}finally {

if(resource != null){
resource.close();
}

if(owner_thread == Thread.currentThread()){
thread_id.remove();
owner_thread = null;
System.out.println("本地锁所有权已经石房");
}

}

}

@Override
public Condition newCondition() {
return null;
}

public String getLuaScript(Jedis jedis) throws Exception {
File lua = null;
FileInputStream inputStream = null;
String res = "";
try{
lua = ResourceUtils.getFile("classpath:script/unlock.lua");
inputStream = new FileInputStream(lua);

byte[] buffer = new byte[1024 * 5];
int len = 0;
while((len = inputStream.read(buffer)) != -1){
res += new String(buffer,0,len);
}
}catch (Exception e){
e.printStackTrace();
}finally {
inputStream.close();
jedis.close();
}
return res;
}
}

lua脚本

1
2
3
4
5
6
7
8
local key = KEYS[1]
local id = ARGV[1]
local lock_id = redis.call("get",key)
if(id == lock_id) then
redis.call("del",key)
return 1
end
return 0

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void testLock1() throws Exception{
int a = 1;
CountDownLatch countDownLatch = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);

for (int i = 1; i <= 5; i++) {

service.submit(() -> {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "准备开始累加");
Thread.sleep(1000);
res++;
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println(res);
}

加入看门狗优化

​ 上面的分布式锁可以简单的实现功能,但是在实际环境,我们不可能设置固定的过期时间。毕竟如果我们调用服务也就是获取锁和解锁之间的时间大于这个过期时间。会造成服务还没完,我们锁就被自动释放了导致线程安全问题。因此我们如果可以的话定期查询锁的状态,自动续期就好了。这个时候我们就会引入看门狗机制。

​ 加锁时,先设置一个过期时间,然后我们开启一个守护线程、,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间。这个守护线程我们一般也把它叫做看门狗线程。

​ 但是我们看门狗线程不做没用的轮序询问是否快过期,我们只需要使用DelayQueue延迟队列来实现,当快要过期之前我们就从队列中取出我们的守护进程进行延长过期时间。


分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/**
* @author roxanne_waar
* @date 2024/3/15 11:18
* @description 实现一个简易的redis分布式锁加上了看门狗
*/
@Component
public class RedisDistributedLockWithDog implements Lock {
private final static String LOCK_NAME = "redis:lock";
private final static long EX_TIME = 5 * 1000;

@Autowired
private JedisPool jedisPool;
private ThreadLocal<String> thread_id = new ThreadLocal<>();

private Thread owner_thread = null;
private Thread watch_dog = null;
private DelayQueue<DelayedElement> queue = new DelayQueue<>();


@Override
public void lock() {
try{
while(!tryLock()){
Thread.sleep(1000);
}
}catch (Exception e){
e.printStackTrace();
}

}

@Override
public void lockInterruptibly() throws InterruptedException {

}

@Override
public boolean tryLock() {
Thread me = Thread.currentThread();
if(owner_thread == me){
//防止重入
return true;
}else if(owner_thread != null){
//本进程其余线程拥有锁
return false;
}

Jedis jedis = null;
//尝试获取锁
try{
jedis = jedisPool.getResource();
SetParams setParams = new SetParams();
setParams.nx().px(EX_TIME);
String my_id = UUID.randomUUID().toString();

if(owner_thread == null && "OK".equals(jedis.set(LOCK_NAME,my_id,setParams))){
//获取到锁
thread_id.set(my_id);
owner_thread = me;
if(watch_dog == null){
watch_dog = new Thread(new WatchDog(),"看门狗线程"+UUID.randomUUID().toString());
//设置看门狗为当前进程守护进程
watch_dog.setDaemon(true);
watch_dog.start();
}
//往延迟队列添加看门狗进程
queue.add(new DelayedElement(System.currentTimeMillis() + EX_TIME - 1000,LOCK_NAME,my_id));


return true;
}else {
return false;
}

}catch (Exception e){
e.printStackTrace();
}finally {
jedis.close();
}
return false;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void unlock() {
Jedis resource = null;
try{
resource = jedisPool.getResource();
String unlock_lua = unlock(resource);
Long res = 0L;
if(thread_id.get() != null){
res = (Long)resource.eval(unlock_lua,1,LOCK_NAME,thread_id.get());
}
if(res == 1) System.out.println("锁释放成功");
else System.out.println("锁释放失败");


} catch (Exception e){
e.printStackTrace();
}finally {

if(resource != null){
resource.close();
}

if(owner_thread == Thread.currentThread()){
thread_id.remove();
owner_thread = null;
System.out.println("本地锁所有权已经石房");
}

}

}

@Override
public Condition newCondition() {
return null;
}

public String unlock(Jedis jedis) throws Exception {
File lua = null;
FileInputStream inputStream = null;
String res = "";
try{
lua = ResourceUtils.getFile("classpath:script/unlock.lua");
inputStream = new FileInputStream(lua);

byte[] buffer = new byte[1024 * 5];
int len = 0;
while((len = inputStream.read(buffer)) != -1){
res += new String(buffer,0,len);
}

}catch (Exception e){
e.printStackTrace();
}finally {
inputStream.close();
jedis.close();
}
return res;
}

public String addExp(Jedis jedis) throws Exception {
File lua = null;
FileInputStream inputStream = null;
String res = "";
try{
lua = ResourceUtils.getFile("classpath:script/add_exp.lua");
inputStream = new FileInputStream(lua);

byte[] buffer = new byte[1024 * 5];
int len = 0;
while((len = inputStream.read(buffer)) != -1){
res += new String(buffer,0,len);
}

}catch (Exception e){
e.printStackTrace();
}finally {
inputStream.close();
jedis.close();
}
return res;
}

// 看门狗线程
private class WatchDog implements Runnable{

@Override
public void run() {
System.out.println("看门狗线程启动");
Jedis jedis = null;
//当监控线程还在运行的情况下
try {
jedis = jedisPool.getResource();
while(!Thread.currentThread().isInterrupted()){
try {
//阻塞式取
DelayedElement de = queue.take();
String key = de.getKey();
String value = de.getValue();

long result = (long) jedis.eval(addExp(jedis),1,key,value, String.valueOf(EX_TIME));
if(result == 0L){
System.out.println("看门狗加时失败,锁已被释放!");
}
else {
//往延迟队列添加看门狗进程
queue.add(new DelayedElement(System.currentTimeMillis() + EX_TIME - 1000,key,value));
System.out.println("看门狗"+ Thread.currentThread().getName()+"已经为进程加时!");
}

} catch (Exception e) {
System.out.println("看门狗被中断!");
break;
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
jedis.close();
}

System.out.println("看门狗线程准备关闭");
watch_dog = null;


}
}
@PreDestroy
public void closeExpireThread(){
if(null!=watch_dog){
watch_dog.interrupt();
}
}
}

延迟队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* @author roxanne_waar
* @date 2024/3/15 17:12
* @description DelayedElement
*/
@AllArgsConstructor
@Data
public class DelayedElement implements Delayed {

// 截至时间
private long delayTime;
private String key;
private String value;
/**
* 返回元素到激活时刻的剩余时长
*/
public long getDelay(TimeUnit unit) {
long d = unit.convert(this.delayTime
- System.currentTimeMillis(),unit);
return d;
}

/**按剩余时长排序*/
public int compareTo(Delayed o) {
long d = (getDelay(TimeUnit.MILLISECONDS)
-o.getDelay(TimeUnit.MILLISECONDS));
if (d==0){
return 0;
}else{
if (d<0){
return -1;
}else{
return 1;
}
}
}
}

注意:我们的守护进程其实æ¯针对一个jvm中的线程进行守护的,也就是说我们测试的时候,多个进程用的看门狗其实是一个。