Redis实现分布式锁
这句话基本上是正确的,但是需要一些小的澄清和补充以更准确地描述分布式锁和Redis在其中的作用。
分布式锁
锁的作用域和分布式环境
确实,传统的Java锁(如synchronized
关键字或java.util.concurrent.locks
包中的锁)设计用于单个JVM进程内多线程之间的同步。这些锁不能直接用于多个JVM进程之间(即分布式环境中)的同步,因为每个进程拥有自己的内存空间和锁状态,它们之间无法共享这种状态。
Redis实现分布式锁
引入Redis来实现分布式锁是一种流行的做法,确实可以利用Redis的某些特性来实现跨多个进程的同步。Redis的操作通常是原子性的,这意味着每个命令在执行过程中不会被其他命令中断。加之Redis是单线程的,保证了在任何给定时间内只执行一个命令,这为实现分布式锁提供了基础。
Redis单线程的特点
虽然Redis是单线程的,但是“利用Redis单线程的特点来实现分布式锁”这句话可能会造成一些误解。实现分布式锁的关键不仅仅在于Redis的单线程模型,而更在于它提供的命令可以被用来原子性地创建锁。例如,SET key value NX PX milliseconds
命令可以原子性地设置一个值,如果这个键不存在的话,并设置键的过期时间。这意味着只有第一个尝试设置该键的进程/线程可以成功获得锁,而其他进程/线程则因为NX条件(键必须不存在)而失败,从而实现了锁的效果。
Redis实现分布式锁
Redis分布式锁最简单的实现
想要实现分布式锁,必须要求 Redis 有「互斥」的能力,我们可以使用 SETNX 命令,这个命令表示SET if Not Exists,即如果 key 不存在,才会设置它的值,否则什么也不做。
两个客户端进程可以执行这个命令,达到互斥,就可以实现一个分布式锁。
客户端 1 申请加锁,加锁成功:
客户端 2 申请加锁,因为它后到达,加锁失败:

此时,加锁成功的客户端,就可以去操作「共享资源」,例如,修改 MySQL 的某一行数据,或者调用一个 API 请求。
操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?
也很简单,直接使用 DEL 命令删除这个 key 即可,这个逻辑非常简单。

但是,它存在一个很大的问题,当客户端 1 拿到锁后,如果发生下面的场景,就会造成「死锁」:
1、程序处理业务逻辑异常,没及时释放锁
2、进程挂了,没机会释放锁
这时,这个客户端就会一直占用这个锁,而其它客户端就「永远」拿不到这把锁了。怎么解决这个问题呢?
如何避免死锁?
我们很容易想到的方案是,在申请锁时,给这把锁设置一个「租期」。
在 Redis 中实现时,就是给这个 key 设置一个「过期时间」。这里我们假设,操作共享资源的时间不会超过 10s,那么在加锁时,给这个 key 设置 10s 过期即可:
1 2
| SETNX lock 1 // 加锁 EXPIRE lock 10 // 10s后自动过期
|

这样一来,无论客户端是否异常,这个锁都可以在 10s 后被「自动释放」,其它客户端依旧可以拿到锁。
但现在还是有问题:
现在的操作,加锁、设置过期是 2 条命令,有没有可能只执行了第一条,第二条却「来不及」执行的情况发生呢?例如:
- SETNX 执行成功,执行EXPIRE 时由于网络问题,执行失败
- SETNX 执行成功,Redis 异常宕机,EXPIRE 没有机会执行
- SETNX 执行成功,客户端异常崩溃,EXPIRE也没有机会执行
总之,这两条命令不能保证是原子操作(一起成功),就有潜在的风险导致过期时间设置失败,依旧发生「死锁」问题。
在 Redis 2.6.12 之后,Redis 扩展了 SET 命令的参数,用这一条命令就可以了:

锁被别人释放怎么办?
上面的命令执行时,每个客户端在释放锁时,都是「无脑」操作,并没有检查这把锁是否还「归自己持有」,所以就会发生释放别人锁的风险,这样的解锁流程,很不「严谨」!如何解决这个问题呢?
解决办法是:客户端在加锁时,设置一个只有自己知道的「唯一标识」进去。
例如,可以是自己的线程 ID,也可以是一个 UUID(随机且唯一),这里我们以UUID 举例:
之后,在释放锁时,要先判断这把锁是否还归自己持有,伪代码可以这么写:
1 2
| if redis.get("lock") == $uuid: redis.del("lock")
|
这里释放锁使用的是 GET + DEL 两条命令,这时,又会遇到我们前面讲的原子性问题了。这里可以使用lua脚本来解决。
原子性问题锁误释放锁
如果操作不是原子性的,那么在检查锁的值和实际进行删除之间的时间窗口内,锁可能已经被释放(例如因为超时)并被另一个客户端获取。这种情况下,一个客户端可能会错误地释放了属于另一个客户端的锁。
1 2 3 4 5 6
| if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("DEL",KEYS[1]) else return 0 end
|
好了,这样一路优化,整个的加锁、解锁的流程就更「严谨」了。
这里我们先小结一下,基于 Redis 实现的分布式锁,一个严谨的的流程如下:
1、加锁
1
| SET lock_key $unique_id EX $expire_time NX
|
2、操作共享资源
3、释放锁:Lua 脚本,先 GET 判断锁是否归属自己,再DEL 释放锁
代码
介绍代码前先介绍这几个类的作用
private ThreadLocal<String> lockerId = new ThreadLocal<>();
这句代码在Java中声明了一个ThreadLocal
类型的变量lockerId
,用于存储与当前线程关联的String
类型的数据。
ThreadLocal 是什么?
ThreadLocal
是Java提供的一个线程局部变量工具,它可以为每个使用该变量的线程提供一个独立的变量副本。这意味着每个线程都可以独立地改变自己的副本,而不会影响到其他线程中的副本。这在进行多线程编程时是非常有用的,特别是当你需要避免因变量共享而导致的并发问题。
注意
虽然ThreadLocal
提供了很好的线程隔离能力,但它也可能导致内存泄露问题,特别是在使用线程池时。因为线程池中的线程会被重用,如果不及时清理ThreadLocal
变量,那么已经结束生命周期的对象可能会一直被ThreadLocal
引用而无法被垃圾收集器回收。因此,使用ThreadLocal
时应确保及时调用remove()
方法来避免内存泄露。
AtomicReference
和LinkedBlockingQueue
都是Java并发包中的类,用于处理多线程编程的不同方面。
AtomicReference
AtomicReference
类位于java.util.concurrent.atomic
包中,提供了一个可以原子读写的引用变量。这意味着对于任何单个操作,如赋值或者更新操作,AtomicReference
保证操作的原子性,即在完成操作之前,不会被其他线程打断。
AtomicReference
是用于对象引用的原子操作,并且常用于无锁设计或算法中,尤其是在需要对共享对象进行原子更新时。例如,你可能会使用AtomicReference
来原子更新一个共享的配置对象,而不需要同步访问该对象。
LinkedBlockingQueue
LinkedBlockingQueue
类位于java.util.concurrent
包中,是一个基于已链接节点的、线程安全的阻塞队列。它实现了BlockingQueue
接口。这种队列的元素遵循先进先出(FIFO)原则。LinkedBlockingQueue
可以用作生产者消费者模式的基础,其中多个线程生产对象到队列中,而多个消费者线程从队列中取出并处理这些对象。
LinkedBlockingQueue
的一个关键特性是,它在尝试添加元素到满队列或从空队列获取元素时,可以让线程阻塞(等待),直到操作可以成功进行。这样,它在处理多线程间的协作时非常有用。
对比
- 用途不同:
AtomicReference
主要用于实现无锁的线程安全操作,特别是对共享对象的原子更新。而LinkedBlockingQueue
用于在生产者和消费者模式中,安全地在多个线程间传递数据。
- 功能不同:
AtomicReference
提供原子操作,以保证更新操作的原子性。LinkedBlockingQueue
提供阻塞操作,以协调生产者和消费者之间的数据流。
- 性能考虑:使用
AtomicReference
可能有助于减少锁的使用,从而在某些场景下提高性能。而LinkedBlockingQueue
通过提供等待/通知机制,使得生产者和消费者能够有效地同步数据流,可能在不同的使用场景下提高性能。
两者都是解决并发问题的重要工具,选择使用哪一个取决于你的具体需求和应用场景。
分布式锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
@Component public class RedisDistributedLock implements Lock { private final static String LOCK_NAME = "redis:lock"; private final static int EX_TIME = 5 ;
@Autowired private JedisPool jedisPool; private ThreadLocal<String> thread_id = new ThreadLocal<>();
private Thread owner_thread = null;
@Override public void lock() { try{ while(!tryLock()){ Thread.sleep(1000); } }catch (Exception e){ e.printStackTrace(); }
}
@Override public void lockInterruptibly() throws InterruptedException {
}
@Override public boolean tryLock() { Thread me = Thread.currentThread(); if(owner_thread == me){ return true; }else if(owner_thread != null){ return false; }
Jedis jedis = null; try{ jedis = jedisPool.getResource(); SetParams setParams = new SetParams(); setParams.nx().ex(EX_TIME); String my_id = UUID.randomUUID().toString();
if(owner_thread == null && "OK".equals(jedis.set(LOCK_NAME,my_id,setParams))){ thread_id.set(my_id); owner_thread = me; return true; }else { return false; }
}catch (Exception e){ e.printStackTrace(); }finally { jedis.close(); } return false; }
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; }
@Override public void unlock() { Jedis resource = null; try{ resource = jedisPool.getResource(); String unlock_lua = getLuaScript(resource); Long res = 0L; if(thread_id.get() != null){ res = (Long)resource.eval(unlock_lua,1,LOCK_NAME,thread_id.get()); } if(res == 1) System.out.println("锁释放成功"); else System.out.println("锁释放失败");
} catch (Exception e){ e.printStackTrace(); }finally {
if(resource != null){ resource.close(); }
if(owner_thread == Thread.currentThread()){ thread_id.remove(); owner_thread = null; System.out.println("本地锁所有权已经石房"); }
}
}
@Override public Condition newCondition() { return null; }
public String getLuaScript(Jedis jedis) throws Exception { File lua = null; FileInputStream inputStream = null; String res = ""; try{ lua = ResourceUtils.getFile("classpath:script/unlock.lua"); inputStream = new FileInputStream(lua);
byte[] buffer = new byte[1024 * 5]; int len = 0; while((len = inputStream.read(buffer)) != -1){ res += new String(buffer,0,len); } }catch (Exception e){ e.printStackTrace(); }finally { inputStream.close(); jedis.close(); } return res; } }
|
lua脚本
1 2 3 4 5 6 7 8
| local key = KEYS[1] local id = ARGV[1] local lock_id = redis.call("get",key) if(id == lock_id) then redis.call("del",key) return 1 end return 0
|
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Test public void testLock1() throws Exception{ int a = 1; CountDownLatch countDownLatch = new CountDownLatch(5); ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 1; i <= 5; i++) {
service.submit(() -> { try { lock.lock(); System.out.println(Thread.currentThread().getName() + "准备开始累加"); Thread.sleep(1000); res++; } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } countDownLatch.countDown(); }); } countDownLatch.await(); System.out.println(res); }
|
加入看门狗优化
上面的分布式锁可以简单的实现功能,但是在实际环境,我们不可能设置固定的过期时间。毕竟如果我们调用服务也就是获取锁和解锁之间的时间大于这个过期时间。会造成服务还没完,我们锁就被自动释放了导致线程安全问题。因此我们如果可以的话定期查询锁的状态,自动续期就好了。这个时候我们就会引入看门狗机制。
加锁时,先设置一个过期时间,然后我们开启一个守护线程、,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间。这个守护线程我们一般也把它叫做看门狗线程。
但是我们看门狗线程不做没用的轮序询问是否快过期,我们只需要使用DelayQueue
延迟队列来实现,当快要过期之前我们就从队列中取出我们的守护进程进行延长过期时间。
分布式锁

|
@Component public class RedisDistributedLockWithDog implements Lock { private final static String LOCK_NAME = "redis:lock"; private final static long EX_TIME = 5 * 1000;
@Autowired private JedisPool jedisPool; private ThreadLocal<String> thread_id = new ThreadLocal<>();
private Thread owner_thread = null; private Thread watch_dog = null; private DelayQueue<DelayedElement> queue = new DelayQueue<>();
@Override public void lock() { try{ while(!tryLock()){ Thread.sleep(1000); } }catch (Exception e){ e.printStackTrace(); }
}
@Override public void lockInterruptibly() throws InterruptedException {
}
@Override public boolean tryLock() { Thread me = Thread.currentThread(); if(owner_thread == me){ return true; }else if(owner_thread != null){ return false; }
Jedis jedis = null; try{ jedis = jedisPool.getResource(); SetParams setParams = new SetParams(); setParams.nx().px(EX_TIME); String my_id = UUID.randomUUID().toString();
if(owner_thread == null && "OK".equals(jedis.set(LOCK_NAME,my_id,setParams))){ thread_id.set(my_id); owner_thread = me; if(watch_dog == null){ watch_dog = new Thread(new WatchDog(),"看门狗线程"+UUID.randomUUID().toString()); watch_dog.setDaemon(true); watch_dog.start(); } queue.add(new DelayedElement(System.currentTimeMillis() + EX_TIME - 1000,LOCK_NAME,my_id));
return true; }else { return false; }
}catch (Exception e){ e.printStackTrace(); }finally { jedis.close(); } return false; }
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; }
@Override public void unlock() { Jedis resource = null; try{ resource = jedisPool.getResource(); String unlock_lua = unlock(resource); Long res = 0L; if(thread_id.get() != null){ res = (Long)resource.eval(unlock_lua,1,LOCK_NAME,thread_id.get()); } if(res == 1) System.out.println("锁释放成功"); else System.out.println("锁释放失败");
} catch (Exception e){ e.printStackTrace(); }finally {
if(resource != null){ resource.close(); }
if(owner_thread == Thread.currentThread()){ thread_id.remove(); owner_thread = null; System.out.println("本地锁所有权已经石房"); }
}
}
@Override public Condition newCondition() { return null; }
public String unlock(Jedis jedis) throws Exception { File lua = null; FileInputStream inputStream = null; String res = ""; try{ lua = ResourceUtils.getFile("classpath:script/unlock.lua"); inputStream = new FileInputStream(lua);
byte[] buffer = new byte[1024 * 5]; int len = 0; while((len = inputStream.read(buffer)) != -1){ res += new String(buffer,0,len); }
}catch (Exception e){ e.printStackTrace(); }finally { inputStream.close(); jedis.close(); } return res; }
public String addExp(Jedis jedis) throws Exception { File lua = null; FileInputStream inputStream = null; String res = ""; try{ lua = ResourceUtils.getFile("classpath:script/add_exp.lua"); inputStream = new FileInputStream(lua);
byte[] buffer = new byte[1024 * 5]; int len = 0; while((len = inputStream.read(buffer)) != -1){ res += new String(buffer,0,len); }
}catch (Exception e){ e.printStackTrace(); }finally { inputStream.close(); jedis.close(); } return res; }
private class WatchDog implements Runnable{
@Override public void run() { System.out.println("看门狗线程启动"); Jedis jedis = null; try { jedis = jedisPool.getResource(); while(!Thread.currentThread().isInterrupted()){ try { DelayedElement de = queue.take(); String key = de.getKey(); String value = de.getValue();
long result = (long) jedis.eval(addExp(jedis),1,key,value, String.valueOf(EX_TIME)); if(result == 0L){ System.out.println("看门狗加时失败,锁已被释放!"); } else { queue.add(new DelayedElement(System.currentTimeMillis() + EX_TIME - 1000,key,value)); System.out.println("看门狗"+ Thread.currentThread().getName()+"已经为进程加时!"); }
} catch (Exception e) { System.out.println("看门狗被中断!"); break; } } }catch (Exception e){ e.printStackTrace(); }finally { jedis.close(); }
System.out.println("看门狗线程准备关闭"); watch_dog = null;
} } @PreDestroy public void closeExpireThread(){ if(null!=watch_dog){ watch_dog.interrupt(); } } }
|
延迟队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
@AllArgsConstructor @Data public class DelayedElement implements Delayed {
private long delayTime; private String key; private String value;
public long getDelay(TimeUnit unit) { long d = unit.convert(this.delayTime - System.currentTimeMillis(),unit); return d; }
public int compareTo(Delayed o) { long d = (getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); if (d==0){ return 0; }else{ if (d<0){ return -1; }else{ return 1; } } } }
|
注意:我们的守护进程其实æ¯针对一个jvm中的线程进行守护的,也就是说我们测试的时候,多个进程用的看门狗其实是一个。