1. Redis 分布式锁说明
Java 实现 Redis 分布式锁案例已经上传到笔者的GitHub,欢迎下载参考,如有错误希望大佬指正。
在大多数情况下,应该都是使用成熟的分布式锁框架,如 Redisson
。这里只是根据 Redisson
部分源码思想进行的个人摸索,编写了一个利用Redis实现的分布式可重入锁,包含看门狗对锁进行续期。
1.1 什么是 Redis 分布式锁
- 在 Java 中提供了 synchronized 和 Lock 锁,来保证多线程程序中的线程安全问题。
- 分布式锁指的是,在分布式系统,不同的进程中,访问共享资源的一张锁的实现。
如果不同的系统
或同一个系统的不同主机
之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。
1.2 分布式锁需要满足的条件
- 互斥性。在任意时刻,只有一个线程能持有锁。
- 重入锁。一个线程能重复获取同一把锁。
- 不会发生死锁。即使有一个线程在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 具有容错性。只要大部分的 Redis 节点正常运行,就可以加锁和解锁。
- 解铃还须系铃人。加锁和解锁必须是同一个线程,自己不能把别人加的锁给解了。
- 锁过期续费。在线程任务未完成的情况下,需要自动续约锁,以防锁过期。
1.3 Redis 分布式锁原理
加锁
- 一个分布式系统中,可能会存在各种各样的分布式锁,这些锁,都应该有一个标识,如:
lock1
、lock2
、lock3
…
- 对于一把锁,如标识为
lock1
的锁,可能会有好几个不同机器上的线程在竞争。
- 竞争锁的线程,也应该给它们一个线程标识,如:
uuid1
、uuid2
、uuid3
…
- 如果线程
uuid1
获取了锁 lock1
,在还未释放锁的时候,允许线程 uuid1
能重复获取 lock1
(记录获取数量),释放直到获取该锁的数量为零
(锁不再被线程uuid1
持有)
Redis 哈希是结构化为字段值对集合的记录类型。可以使用散列来表示基本对象并存储计数器分组等。
散列表(hashmap)能够满足保存:锁标识
:线程标识
:重入次数
解锁
如上面加锁
中所述,锁是可以重入的。
- 一个线程可以重复获取同一把锁,因此每次解锁,该锁的记录值
value
应该减1
- 如果某个线程获取锁
lock1
的值为零了,锁应该被释放,这时候要允许别的线程获取锁 lock1
。
1.4 Redis 分布式锁实现原理(lua 脚本说明)
我们都知道 redis 是单线程的,因此可以通过lua脚本来获取锁、释放锁、锁续期,保证原子性。
加锁
KEYS[1] —- 锁标识
ARGV[1] —- 线程标识
ARGV[2] —- 锁过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[1], 1); redis.call('pexpire', KEYS[1], ARGV[2]); return 1; end;
local beforeGetCurrentLockHasNum = 0;
if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then beforeGetCurrentLockHasNum = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hincrby', KEYS[1], ARGV[1], 1); redis.call('pexpire', KEYS[1], ARGV[2]); local afterGetCurrentLockHasNum = redis.call('hget', KEYS[1], ARGV[1]) - beforeGetCurrentLockHasNum; return afterGetCurrentLockHasNum; end;
return beforeGetCurrentLockHasNum;
|
解锁
KEYS[1] —- 锁标识
ARGV[1] —- 线程标识
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return 0; end;
local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); if (counter == 0) then redis.call('del', KEYS[1]); return 1; end;
return 1;
|
锁续期
KEYS[1] —- 锁标识
ARGV[1] —- 线程标识
ARGV[2] —- 重置时间
1 2 3 4 5 6 7 8
| if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return 'fail'; end;
redis.call('pexpire', KEYS[1], ARGV[2]); return 'success';
|
2. Java 实现 Redis 分布式锁
这里通过 Spring Boot 示例项目,使用 spring-boot-starter-data-redis
来连接 redis。主要是体现其实现思想。
创建一个 RedisLock 类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import org.springframework.context.annotation.DependsOn; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import java.util.Collections; import java.util.UUID;
@Component(value = "redisLock") @DependsOn({"stringRedisTemplate", "redisTemplate", "redisConnectionFactory"}) public class RedisLock {
@Resource(name = "stringRedisTemplate") private StringRedisTemplate stringRedisTemplate;
public static String generationUuid() { return UUID.randomUUID().toString(); } }
|
2.1 获取锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
public boolean lock(String lockKey, String uuid, long expireTime) { if (expireTime <= 0) { return false; } boolean lock = false; String luaScriptStr = "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[1], 1); " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 1; " + "end; " + "local beforeGetCurrentLockHasNum = 0; " + "if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then " + "beforeGetCurrentLockHasNum = redis.call('hget', KEYS[1], ARGV[1]); " + "redis.call('hincrby', KEYS[1], ARGV[1], 1); " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "local afterGetCurrentLockHasNum = redis.call('hget', KEYS[1], ARGV[1]) - beforeGetCurrentLockHasNum; " + "return afterGetCurrentLockHasNum; " + "end; " + "return beforeGetCurrentLockHasNum; "; RedisScript<Long> luaScriptObj = new DefaultRedisScript<>(luaScriptStr, Long.class); String result = String.valueOf(stringRedisTemplate.execute( luaScriptObj, Collections.singletonList(lockKey), uuid, String.valueOf(expireTime) )); if (!"0".equals(result)) { lock = true; System.out.printf("线程:%s,获取锁:%s,过期时间:%s\n", uuid, lockKey, expireTime); } return lock; }
|
2.2 释放锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
public boolean unlock(String lockKey, String uuid) { boolean release = false; String luaScriptStr = "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " + "return 0; " + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); " + "if (counter == 0) then " + "redis.call('del', KEYS[1]); " + "return 1; " + "end; " + "return 1; "; RedisScript<Long> luaScriptObj = new DefaultRedisScript<>(luaScriptStr, Long.class); String result = String.valueOf(stringRedisTemplate.execute( luaScriptObj, Collections.singletonList(lockKey), uuid )); if ("1".equals(result)) { release = true; System.out.printf("线程:%s,释放锁:%s\n", uuid, lockKey); } return release; }
|
2.3 锁续期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
public boolean renewal(String lockKey, String uuid, long expireTime) { String luaScriptStr = "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " + "return 'fail'; " + "end; " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 'success'; "; if (expireTime <= 0) { return false; } boolean renewal = false; RedisScript<String> luaScriptObj = new DefaultRedisScript<>(luaScriptStr, String.class); String result = stringRedisTemplate.execute( luaScriptObj, Collections.singletonList(lockKey), uuid, String.valueOf(expireTime) ); if ("success".equals(result)) { renewal = true; System.out.printf("线程:%s,持有锁:%s,进行续期:%s\n", uuid, lockKey, expireTime); } return renewal; }
|
3. Java 实现锁续期
3.1 创建一个注解
创建一个注解,用于标识获取锁、释放锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface LockMethodListener {
boolean isGetLock() default false; }
|
3.2 使用注解
在获取锁方法【lock
】、释放锁方法【unlock
】上使用该注解:
1 2 3 4
| @LockMethodListener(isGetLock = true) public boolean lock(String lockKey, String uuid, long expireTime) { }
|
1 2 3 4
| @LockMethodListener(isGetLock = false) public boolean unlock(String lockKey, String uuid) { }
|
3.3 编写切面
前面虽说是已经完成了注解的编写,但无实质处理。下面针对使用了注解 LockMethodListener
的方法编写切面。下面代码需要用到 LockListener.java
锁监听类,后续 3.4 锁监听类
部分衔接。
创建切面类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit;
@Aspect @Component public class LockMethodListenerAspect {
private static final ConcurrentHashMap<String, LockInfo> WITCH_DOG_MAP = new ConcurrentHashMap<>();
private static final ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(30);
@Resource(name = "redisLock") private RedisLock redisLock;
private static class LockInfo {
public ScheduledFuture<?> future;
public int reentrant;
public LockInfo(ScheduledFuture<?> future, int reentrant) { this.future = future; this.reentrant = reentrant; } } }
|
切面规则/表达式
1 2 3
| @Pointcut("@annotation(lockMethodListener)") public void lockPointCut(LockMethodListener lockMethodListener) { }
|
编写切入逻辑
一旦有线程调用了获取锁/释放锁方法后,就会执行该逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @AfterReturning( returning = "resultValue", value = "lockPointCut(lockMethodListener)", argNames = "joinPoint,lockMethodListener,resultValue" ) public void afterMethodListener(JoinPoint joinPoint, LockMethodListener lockMethodListener, Boolean resultValue) { Object[] args = joinPoint.getArgs(); boolean isGetLock = lockMethodListener.isGetLock(); String lockKey = String.valueOf(args[0]); String uuid = String.valueOf(args[1]);
if (!resultValue) { return; }
if (isGetLock) { long expireTime = Long.parseLong(String.valueOf(args[2])); if (expireTime <= 0) { return; } witch(lockKey, uuid, expireTime); return; }
unWitch(lockKey, uuid); }
|
3.4 监听锁续期方法
看门狗
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
public void witch(String lockKey, String uuid, long expireTime) { String key = lockKey + uuid; LockInfo reentrantInfo = WITCH_DOG_MAP.get(lockKey + uuid); if (reentrantInfo != null) { ++reentrantInfo.reentrant; return; }
ScheduledFuture<?> future = SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate( new LockAsync(this.redisLock, lockKey, uuid, expireTime), expireTime / 2, expireTime / 2, TimeUnit.MILLISECONDS );
reentrantInfo = new LockInfo(future, 1); WITCH_DOG_MAP.put(key, reentrantInfo); }
public static class LockAsync implements Runnable {
private final RedisLock redisLock;
private final String lockKey;
private final String uuid;
private final long expireTime;
public LockAsync(RedisLock redisLock, String lockKey, String uuid, long expireTime) { this.redisLock = redisLock; this.lockKey = lockKey; this.uuid = uuid; this.expireTime = expireTime; }
@Override public void run() { this.redisLock.renewal(this.lockKey, this.uuid, this.expireTime); } }
|
关闭看门狗
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
public void unWitch(String lockKey, String uuid) { LockInfo reentrantInfo = WITCH_DOG_MAP.get(lockKey + uuid); if (reentrantInfo == null || reentrantInfo.future.isCancelled()) { return; } reentrantInfo.future.cancel(true); }
|
切面整体代码如下:
- 锁每隔半个过期时间,就会续期一次
- 每次续期后,过期时间恢复为最初的过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
| import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit;
@Aspect @Component public class LockMethodListenerAspect {
private static final ConcurrentHashMap<String, LockInfo> WITCH_DOG_MAP = new ConcurrentHashMap<>();
private static final ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(30);
@Resource(name = "redisLock") private RedisLock redisLock;
@Pointcut("@annotation(lockMethodListener)") public void lockPointCut(LockMethodListener lockMethodListener) { }
@AfterReturning( returning = "resultValue", value = "lockPointCut(lockMethodListener)", argNames = "joinPoint,lockMethodListener,resultValue" ) public void afterMethodListener(JoinPoint joinPoint, LockMethodListener lockMethodListener, Boolean resultValue) { Object[] args = joinPoint.getArgs(); boolean isGetLock = lockMethodListener.isGetLock(); String lockKey = String.valueOf(args[0]); String uuid = String.valueOf(args[1]);
if (!resultValue && !isGetLock) { return; }
if (resultValue && isGetLock) { long expireTime = Long.parseLong(String.valueOf(args[2])); if (expireTime <= 0) { return; } witch(lockKey, uuid, expireTime); return; }
if (isGetLock) { unWitch(lockKey, uuid); } }
public void witch(String lockKey, String uuid, long expireTime) { String key = lockKey + uuid; LockInfo reentrantInfo = WITCH_DOG_MAP.get(lockKey + uuid); if (reentrantInfo != null) { ++reentrantInfo.reentrant; return; }
ScheduledFuture<?> future = SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate( new LockAsync(this.redisLock, lockKey, uuid, expireTime), expireTime / 2, expireTime / 2, TimeUnit.MILLISECONDS );
reentrantInfo = new LockInfo(future, 1); WITCH_DOG_MAP.put(key, reentrantInfo); }
public void unWitch(String lockKey, String uuid) { LockInfo reentrantInfo = WITCH_DOG_MAP.get(lockKey + uuid); if (reentrantInfo == null || reentrantInfo.future.isCancelled()) { return; } reentrantInfo.future.cancel(true); }
private static class LockInfo {
public ScheduledFuture<?> future;
public int reentrant;
public LockInfo(ScheduledFuture<?> future, int reentrant) { this.future = future; this.reentrant = reentrant; } }
public static class LockAsync implements Runnable {
private final RedisLock redisLock;
private final String lockKey;
private final String uuid;
private final long expireTime;
public LockAsync(RedisLock redisLock, String lockKey, String uuid, long expireTime) { this.redisLock = redisLock; this.lockKey = lockKey; this.uuid = uuid; this.expireTime = expireTime; }
@Override public void run() { this.redisLock.renewal(this.lockKey, this.uuid, this.expireTime); } } }
|
到这里,Java 实现 redis 分布式锁已经完成。
4. 测试案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| import org.springframework.stereotype.Service;
import javax.annotation.Resource; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
@Service public class DemoService {
@Resource private RedisLock redisLock;
private final static String LOCK_KEY_DEMO = "LOCK_KEY";
private static final ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor( 10, 100, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>() );
public void demo() { Runnable runnable = () -> { long start = System.currentTimeMillis(); String threadKey = RedisLock.generationUuid(); boolean lock = false; try { lock = getLock(threadKey); } catch (InterruptedException e) { throw new RuntimeException(e); } if (!lock) { return; }
try { Thread.sleep(2000L); } catch (InterruptedException e) { throw new RuntimeException(e); }
boolean unLock = false; try { unLock = releaseLock(threadKey); } catch (InterruptedException e) { throw new RuntimeException(e); } };
for (int i = 0; i < 5; i++) { POOL_EXECUTOR.execute(runnable); } }
private boolean getLock(String threadKey) throws InterruptedException { boolean lock = redisLock.lock(LOCK_KEY_DEMO, threadKey, 1000L); if (!lock) { Thread.sleep(100); return getLock(threadKey); } return true; }
private boolean releaseLock(String threadKey) throws InterruptedException { boolean unlock = redisLock.unlock(LOCK_KEY_DEMO, threadKey); if (!unlock) { Thread.sleep(100); return releaseLock(threadKey); } return true; } }
|
执行demo()方法,结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| 线程:7d87528f-8da9-40f5-a32b-7d3cc10fbb17,获取锁:LOCK_KEY,过期时间:1000 线程:7d87528f-8da9-40f5-a32b-7d3cc10fbb17,持有锁:LOCK_KEY,进行续期:1000 线程:7d87528f-8da9-40f5-a32b-7d3cc10fbb17,持有锁:LOCK_KEY,进行续期:1000 线程:7d87528f-8da9-40f5-a32b-7d3cc10fbb17,持有锁:LOCK_KEY,进行续期:1000 线程:7d87528f-8da9-40f5-a32b-7d3cc10fbb17,持有锁:LOCK_KEY,进行续期:1000 线程:7d87528f-8da9-40f5-a32b-7d3cc10fbb17,释放锁:LOCK_KEY 线程:63a22f14-d504-4098-82dc-cd893e87acac,获取锁:LOCK_KEY,过期时间:1000 线程:63a22f14-d504-4098-82dc-cd893e87acac,持有锁:LOCK_KEY,进行续期:1000 线程:63a22f14-d504-4098-82dc-cd893e87acac,持有锁:LOCK_KEY,进行续期:1000 线程:63a22f14-d504-4098-82dc-cd893e87acac,持有锁:LOCK_KEY,进行续期:1000 线程:63a22f14-d504-4098-82dc-cd893e87acac,释放锁:LOCK_KEY 线程:39bb17d2-cd09-4336-9c6a-219484fc709f,获取锁:LOCK_KEY,过期时间:1000 线程:39bb17d2-cd09-4336-9c6a-219484fc709f,持有锁:LOCK_KEY,进行续期:1000 线程:39bb17d2-cd09-4336-9c6a-219484fc709f,持有锁:LOCK_KEY,进行续期:1000 线程:39bb17d2-cd09-4336-9c6a-219484fc709f,持有锁:LOCK_KEY,进行续期:1000 线程:39bb17d2-cd09-4336-9c6a-219484fc709f,持有锁:LOCK_KEY,进行续期:1000 线程:39bb17d2-cd09-4336-9c6a-219484fc709f,释放锁:LOCK_KEY 线程:fb209186-18f8-490c-ae5f-c1939b77d3a5,获取锁:LOCK_KEY,过期时间:1000 线程:fb209186-18f8-490c-ae5f-c1939b77d3a5,持有锁:LOCK_KEY,进行续期:1000 线程:fb209186-18f8-490c-ae5f-c1939b77d3a5,持有锁:LOCK_KEY,进行续期:1000 线程:fb209186-18f8-490c-ae5f-c1939b77d3a5,持有锁:LOCK_KEY,进行续期:1000 线程:fb209186-18f8-490c-ae5f-c1939b77d3a5,持有锁:LOCK_KEY,进行续期:1000 线程:fb209186-18f8-490c-ae5f-c1939b77d3a5,释放锁:LOCK_KEY 线程:8370aba2-2d3c-4852-a8a0-17e6dfbb76cc,获取锁:LOCK_KEY,过期时间:1000 线程:8370aba2-2d3c-4852-a8a0-17e6dfbb76cc,持有锁:LOCK_KEY,进行续期:1000 线程:8370aba2-2d3c-4852-a8a0-17e6dfbb76cc,持有锁:LOCK_KEY,进行续期:1000 线程:8370aba2-2d3c-4852-a8a0-17e6dfbb76cc,持有锁:LOCK_KEY,进行续期:1000 线程:8370aba2-2d3c-4852-a8a0-17e6dfbb76cc,释放锁:LOCK_KEY
|