Redis分布式锁
1、概述
- 如果在一个分布式系统中,我们从数据库中读取一个数据,然后修改保存,这种情况很容易遇到并发问题。因为读取和更新保存不是一个原子操作,在并发时就会导致数据的不正确。这种场景其实并不少见,比如电商秒杀活动,库存数量的更新就会遇到。如果是单机应用,直接使用本地锁就可以避免。如果是分布式应用,本地锁派不上用场,这时就需要引入分布式锁来解决。 由此可见分布式锁的目的其实很简单,就是为了保证多台服务器在执行某一段代码时保证只有一台服务器执行。
- 为了保证分布式锁的可用性,至少要确保锁的实现要同时满足以下几点:
- 互斥性。在任何时刻,保证只有一个客户端持有锁。
- 不能出现死锁。如果在一个客户端持有锁的期间,这个客户端崩溃了,也要保证后续的其他客户端可以上锁。
- 保证上锁和解锁都是同一个客户端。
- 一般来说,实现分布式锁的方式有以下几种:
- 使用MySQL,基于唯一索引。
- 使用ZooKeeper,基于临时有序节点。
- 使用Redis,基于setnx命令。
2、实际案例
现在使用SpringBoot+redis搭建一个简单的模拟买卖平台的程序,使用redis分布式锁实现多个服务间保证同一时刻同一时间段内同一用户只能有一个请求(防止关键业务出现并发攻击)。
①创建SpringBoot应用,添加pom依赖如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>application.properties配置文件内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13server.port=1111
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
#连接池最大连接数(使用负值表示没有限制)默认8
spring.redis.lettuce.pool.max-active=8
#连接池最大阻塞等待时间(使用负值表示没有限制)默认-1
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接默认8
spring.redis.lettuce.pool.max-idle=8
#连接池中的最小空闲连接默认0
spring.redis.lettuce.pool.min-idle=0②在redis中存入key=goods:001,value=100的数据,代表当前goods:001的商品剩余库存100件。
1
2
3
4
5
6
7[root bin]# pwd
/usr/local/redis/bin
[root bin]# ./redis-cli
127.0.0.1:6379> set goods:001 100
OK
127.0.0.1:6379> get goods:001
"100"③创建Controller,编写下单接口如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class GoodController {
private StringRedisTemplate stringRedisTemplate;
private String serverPort;
public String buy_Goods(){
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0){
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort;
}else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort;
}
}运行程序访问http://localhost:1111/buy_goods后如图:
出现的问题:没有加锁,并发下数字不对,出现超卖现象。
改进1:使用synchronized或者ReentrantLock。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class GoodController {
private StringRedisTemplate stringRedisTemplate;
private String serverPort;
public String buy_Goods() {
synchronized (this) {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
}
public String buy_Goods2(){
if (lock.tryLock()){
try {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0){
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort;
}
}finally {
lock.unlock();
}
}else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort;
}
}- 出现的问题:在单机环境下,可以使用synchronized或Lock来实现。但是在分布式系统中,因为竞争的线程可能不在同一个节点上(不同一个jvm中),不同进程jvm层面的锁就不管用了,所以需要一个让所有进程都能访问到的锁来实现,比如redis或者zookeeper来构建。即可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程。
改进2:使用redis分布式锁setnx。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class GoodController {
private StringRedisTemplate stringRedisTemplate;
private String serverPort;
public static final String REDIS_LOCK_KEY = "mylock";
public String buy_Goods() {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
//setIfAbsent() 就是如果不存在就新建,且是原子性的
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value);//setnx
if (!lockFlag) {
return "抢锁失败,┭┮﹏┭┮";
} else {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort);
stringRedisTemplate.delete(REDIS_LOCK_KEY);//释放锁
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
}
}- 出现的问题:出异常的话,可能无法释放锁, 必须要在代码层面finally释放锁。
改进3:必须要在代码层面finally释放锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class GoodController {
private StringRedisTemplate stringRedisTemplate;
private String serverPort;
public static final String REDIS_LOCK_KEY = "mylock";
public String buy_Goods() {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
//setIfAbsent() 就是如果不存在就新建
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value);//setnx
if (!lockFlag) {
return "抢锁失败,┭┮﹏┭┮";
} else {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
} finally {
stringRedisTemplate.delete(REDIS_LOCK_KEY);//释放锁
}
}
}- 出现的问题:部署了微服务jar包的机器挂了,代码层面根本没有走到finally这块, 没办法保证解锁,这个key没有被删除,需要加入一个过期时间限定key。
改进4:需要对lockKey有过期时间的设定。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class GoodController {
private StringRedisTemplate stringRedisTemplate;
private String serverPort;
public static final String REDIS_LOCK_KEY = "mylock";
public String buy_Goods() {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
//setIfAbsent() 就是如果不存在就新建
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value);//setnx
stringRedisTemplate.expire(REDIS_LOCK_KEY, 10L, TimeUnit.SECONDS);
if (!lockFlag) {
return "抢锁失败,┭┮﹏┭┮";
} else {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
} finally {
stringRedisTemplate.delete(REDIS_LOCK_KEY);//释放锁
}
}
}- 出现的问题:设置key+过期时间分开了,必须要合并成一行具备原子性。
改进5:将key+过期时间的设置变成原子性的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class GoodController {
private StringRedisTemplate stringRedisTemplate;
private String serverPort;
public static final String REDIS_LOCK_KEY = "mylock";
public String buy_Goods() {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
//setIfAbsent() == setnx 就是如果不存在就新建,同时加上过期时间保证原子性
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value,10L, TimeUnit.SECONDS);
if (!lockFlag) {
return "抢锁失败,┭┮﹏┭┮";
} else {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
} finally {
stringRedisTemplate.delete(REDIS_LOCK_KEY);//释放锁
}
}
}- 出现的问题:张冠李戴,可能会删除了别人的锁。当业务处理大于过期时间,第一个线程创建的REDIS_LOCK_KEY失效。此时进程2进来,又创建了一个REDIS_LOCK_KEY,继续执行其业务。此时如果线程1刚好执行完成,在finally块中误删线程2创建的REDIS_LOCK_KEY。
改进6:改成只能自己删除自己的,不许动别人的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class GoodController {
private StringRedisTemplate stringRedisTemplate;
private String serverPort;
public static final String REDIS_LOCK_KEY = "mylock";
public String buy_Goods() {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
//setIfAbsent() == setnx 就是如果不存在就新建,同时加上过期时间保证原子性
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value, 10L, TimeUnit.SECONDS);
stringRedisTemplate.expire(REDIS_LOCK_KEY, 10L, TimeUnit.SECONDS);
if (!lockFlag) {
return "抢锁失败,┭┮﹏┭┮";
} else {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
} finally {
if (value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(REDIS_LOCK_KEY))) {
stringRedisTemplate.delete(REDIS_LOCK_KEY);//释放锁
}
}
}
}- 出现的问题:finally块的判断+del删除操作不是原子性的。
改进7:用redis自身的事务或者Lua脚本。
①用redis自身的事务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class GoodController {
private StringRedisTemplate stringRedisTemplate;
private String serverPort;
public static final String REDIS_LOCK_KEY = "mylock";
public String buy_Goods() {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
//setIfAbsent() == setnx 就是如果不存在就新建,同时加上过期时间保证原子性
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value, 10L, TimeUnit.SECONDS);
stringRedisTemplate.expire(REDIS_LOCK_KEY, 10L, TimeUnit.SECONDS);
if (!lockFlag) {
return "抢锁失败,┭┮﹏┭┮";
} else {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
} finally {
while (true) {
stringRedisTemplate.watch(REDIS_LOCK_KEY); //加事务,乐观锁
if (value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(REDIS_LOCK_KEY))) {
stringRedisTemplate.setEnableTransactionSupport(true);
stringRedisTemplate.multi();//开始事务
stringRedisTemplate.delete(REDIS_LOCK_KEY);
List<Object> list = stringRedisTemplate.exec();
if (list == null) { //如果等于null,就是没有删掉,删除失败,再回去while循环那再重新执行删除
continue;
}
}
//如果删除成功,释放监控器,并且breank跳出当前循环
stringRedisTemplate.unwatch();
break;
}
}
}
}②使用Lua脚本:
1
2
3
4
5
6
7
8
9
10
11
12
public class RedisConfig {
private String redisHost;
public Redisson redisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://"+redisHost+":6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class GoodController {
private StringRedisTemplate stringRedisTemplate;
private String serverPort;
public static final String REDIS_LOCK_KEY = "mylock";
public String buy_Goods() throws Exception {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
//setIfAbsent() == setnx 就是如果不存在就新建,同时加上过期时间保证原子性
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value, 10L, TimeUnit.SECONDS);
stringRedisTemplate.expire(REDIS_LOCK_KEY, 10L, TimeUnit.SECONDS);
if (!lockFlag) {
return "抢锁失败,┭┮﹏┭┮";
} else {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
} finally {
Jedis jedis = RedisUtils.getJedis();
String script = "if redis.call('get', KEYS[1]) == ARGV[1]" + "then "
+ "return redis.call('del', KEYS[1])" + "else " + " return 0 " + "end";
try {
Object result = jedis.eval(script, Collections.singletonList(REDIS_LOCK_KEY), Collections.singletonList(value));
if ("1".equals(result.toString())) {
System.out.println("------del REDIS_LOCK_KEY success");
} else {
System.out.println("------del REDIS_LOCK_KEY error");
}
} finally {
if (null != jedis) {
jedis.close();
}
}
}
}
}出现的问题:无法确保redisLock过期时间大于业务执行时间。注意:
- redis(集群是AP,单机CP)异步复制可能造成锁丢失,比如主节点没来的及把刚刚set进来这条数据给从节点,就挂了。而Zookeeper(CP)是确保数据同步完毕才给客户端返回成功信息,即保证强一致性。
改进8:使用Redisson。
1
2
3
4
5
6
7
8
9
10
11
12
public class RedisConfig {
private String redisHost;
public Redisson redisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://"+redisHost+":6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class GoodController {
private StringRedisTemplate stringRedisTemplate;
private String serverPort;
public static final String REDIS_LOCK_KEY = "mylock";
private Redisson redisson;
public String buy_Goods() {
//String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
RLock redissonLock = redisson.getLock(REDIS_LOCK_KEY);
redissonLock.lock();
try {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
} finally {
//还在持有锁的状态,并且是当前线程持有的锁再解锁
if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()) {
redissonLock.unlock();
}
}
}
}
3、redisson
使用redisson的基本套路如下:
1
2
3
4
5
6
7
8
9
10RLock redissonLock = redisson.getLock("mylock");
try {
redissonLock.lock();
…………
} finally {
//还在持有锁的状态,并且是当前线程持有的锁再解锁
if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()) {
redissonLock.unlock();
}
}原理图如下:
加锁机制:
线程去获取锁,获取成功,则执行lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败,则一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。
- KEYS[1]:指加锁时指定的key,即上方例子的”myLock”。
- ARGV[1]:指锁key的默认生存时间,默认30秒。
- ARGV[2]:指加锁的客户端的ID,例如:078e44a3-5f95-4e24-b6aa-80684655a15a:45,即guid+当前线程的ID。
- 上述代表”078e44a3-5f95-4e24-b6aa-80684655a15a:45”这个客户端对”myLock”这个锁key完成了加锁。接着会执行”pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒,到此为止加锁就完成了。
锁互斥机制:
- 如果客户端2来尝试加锁,执行了同样的一段lua脚本。第一个if判断会执行”exists myLock”,发现myLock这个锁key已经存在了。 接着第二个if判断myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。
- 所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间,比如还剩15000毫秒的生存时间。此时客户端2会进入一个while循环,不停的尝试加锁。
watch dog自动延期机制:
- 使用redisson加锁时一般会设置个过期时间,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,这样的目的主要是防止死锁的发生。 此时可能会出现业务执行时间大于锁过期时间,因此如果想要再继续持有锁的话,就需要启动一个watch dog后台线程,不断的延长锁key的生存时间。 (使用看门狗后不设定锁的时间则默认锁30s,而看门狗的作用是每10s看这个锁释放了没有,如果还没释放就自动发送lua到redis补全30s锁存在的时间)
可重入加锁机制:
Redis存储锁的数据类型是Hash类型。
Hash数据类型的key值包含了当前线程信息。
例如使用redisson存储的hash数据如下:
Hash类型相当于java的<key,<key1,value>>类型,这里key是指”redisson”,有效期剩下9秒,key1的值为078e44a3-5f95-4e24-b6aa-80684655a15a:45, 即guid+当前线程的ID,后面的value是就和可重入加锁有关。
当发生重入时,lua脚本的第一个if判断肯定不成立,”exists myLock”会显示锁key已经存在了。第二个if判断会成立,因为myLock的hash数据结构中包含客户端1的ID,也就是”078e44a3-5f95-4e24-b6aa-80684655a15a:45”,此时就会执行可重入加锁的逻辑”incrby myLock 078e44a3-5f95-4e24-b6aa-80684655a15a:45” 对客户端1的加锁次数累加1。
释放锁的机制:
- 通过执行lock.unlock()就可以释放分布式锁,即就是每次都对myLock数据结构中的那个加锁次数减1,如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用”del myLock”命令,从redis里删除这个key。
上述Redis分布式锁的缺点 :
- 在Redis哨兵模式下,客户端1对某个master节点写入了redisson锁,此时会异步复制给对应的slave节点。但是这个过程中一旦发生master节点宕机,主备切换,slave节点从变为了master节点。这时客户端2来尝试加锁的时候,在新的master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。 解决办法是使用redLock。
参考链接:
①https://www.cnblogs.com/qdhxhz/p/11046905.html
②https://blog.csdn.net/qq_37286668/article/details/107402862