Redisson?框架中的分布式锁实现方法

 更新时间:2024年03月02日 08:58:44   作者:emanjusaka  
这篇文章主要介绍了Redisson?框架中的分布式锁,实现分布式锁通常有三种方式:数据库、Redis 和 Zookeeper,我们比较常用的是通过 Redis 和 Zookeeper 实现分布式锁,需要的朋友可以参考下
(福利推荐:【腾讯云】服务器最新限时优惠活动,云服务器1核2G仅99元/年、2核4G仅768元/3年,立即抢购>>>:9i0i.cn/qcloud

(福利推荐:你还在原价购买阿里云服务器?现在阿里云0.8折限时抢购活动来啦!4核8G企业云服务器仅2998元/3年,立即抢购>>>:9i0i.cn/aliyun

实现分布式锁通常有三种方式:数据库、Redis 和 Zookeeper。我们比较常用的是通过 Redis 和 Zookeeper 实现分布式锁。Redisson 框架中封装了通过 Redis 实现的分布式锁,下面我们分析一下它的具体实现。

by emanjusaka from https://www.emanjusaka.top/2024/03/redisson-distributed-lock 彼岸花开可奈何
本文欢迎分享与聚合,全文转载请留下原文地址。

关键点

  • 原子性

    要么都成功,要么都失败

  • 过期时间

    如果锁还没来得及释放就遇到了服务宕机,就会出现死锁的问题。给 Redis 的 key 设置过期时间,即使服务宕机了超过设置的过期时间锁会自动进行释放。

  • 锁续期

    因为给锁设置了过期时间而我们的业务逻辑具体要执行多长时间可能是变化和不确定的,如果设定了一个固定的过期时间,可能会导致业务逻辑还没有执行完,锁被释放了的问题。锁续期能保证锁是在业务逻辑执行完才被释放。

  • 正确释放锁

    保证释放自己持有的锁,不能出现 A 释放了 B 持有锁的情况。

Redis 实现分布式锁的几种部署方式

  • 单机

    在这种部署方式中,Redis 的所有实例都部署在同一台服务器上。这种部署方式简单易行,但存在单点故障的风险。如果 Redis 实例宕机,则所有分布式锁都将失效。

  • 哨兵

    在这种部署方式中,Redis 的多个实例被配置为哨兵。哨兵负责监控 Redis 实例的状态,并在主实例宕机时自动选举一个新的主实例。这种部署方式可以提供更高的可用性和容错性。

  • 集群

    在这种部署方式中,Redis 的多个实例被配置为一个集群。集群中的每个实例都是平等的,并且可以处理读写操作。这种部署方式可以提供最高的可用性和容错性。

  • 红锁

    搞几个独立的 Master,比如 5 个,然后挨个加锁,只要超过一半以上(这里是 5/2+1=3 个)就代表加锁成功,然后释放锁的时候也逐台释放。

使用方式

引入依赖

<!--        pom.xml文件-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.17.7</version>
</dependency>

版本依赖:

redisson-spring-data module nameSpring Boot version
redisson-spring-data-161.3.y
redisson-spring-data-171.4.y
redisson-spring-data-181.5.y
redisson-spring-data-2x2.x.y
redisson-spring-data-3x3.x.y

yml配置

spring:
  redis:
    redisson:
      config:
        singleServerConfig:
          address: redis://127.0.0.1:6379
          database: 0
          password: null
          timeout: 3000

直接注入使用

package top.emanjusaka;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
 * @Author emanjusaka www.emanjusaka.top
 * @Date 2024/2/28 16:41
 * @Version 1.0
 */
@Service
public class Lock {
    @Resource
    private RedissonClient redissonClient;
    public void lock() {
        // 写入redis的key值
        String lockKey = "lock-test";
        // 获取一个Rlock锁对象
        RLock lock = redissonClient.getLock(lockKey);
        // 获取锁,并为其设置过期时间为10s
        lock.lock(10, TimeUnit.SECONDS);
        try {
            // 执行业务逻辑....
            System.out.println("获取锁成功!");
        } finally {
            // 释放锁
            lock.unlock();
            System.out.println("释放锁成功!");
        }
    }
}

底层剖析

lock()

关键代码

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
                "if ((redis.call('exists', KEYS[1]) == 0) " +
                       "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }
  • RFuture<T>:表示返回一个异步结果对象,其中泛型参数 T 表示结果的类型。

  • tryLockInnerAsync 方法接受一下参数:

    • waitTime:等待时间,用于指定在获取锁时的最大等待时间。
    • leaseTime:租约时间,用于指定锁的持有时间
    • unit:时间单位,用于将 leaseTime 转换为毫秒
    • threadId:线程 ID,用于标识当前线程
    • command:Redis 命令对象,用于执行 Redis 操作
  • 方法体中的代码使用 Lua 脚本来实现分布式锁的逻辑。

    • if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)): 如果键不存在或者哈希表中已经存在对应的线程ID,则执行以下操作:
      • redis.call('hincrby', KEYS[1], ARGV[2], 1): 将哈希表中对应线程ID的值加1。
      • redis.call('pexpire', KEYS[1], ARGV[1]): 设置键的过期时间为租约时间。
      • return nil: 返回nil表示成功获取锁。
    • else: 如果键存在且哈希表中不存在对应的线程ID,则执行以下操作:
      • return redis.call('pttl', KEYS[1]): 返回键的剩余生存时间。
  • commandExecutor.syncedEval:表示同步执行 Redis 命令

  • LongCodec.INSTANCE:用于编码和解码长整型数据

  • Collections.singletonList(getRawName()):创建一个只包含一个元素的列表,元素为锁的名称

  • unit.toMillis(leaseTime):将租约时间转换为毫秒

  • getLockName(threadId):根据线程 ID 生成锁的名称

// 省去了那些无关重要的代码
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // tryAcquire就是上面分析的lua完整脚本
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // 返回null就代表上锁成功。
    if (ttl == null) {
        return;
    }
    // 如果没成功,也就是锁的剩余时间不是null的话,那么就执行下面的逻辑
    // 其实就是说 如果有锁(锁剩余时间不是null),那就死循环等待重新抢锁。
    try {
        while (true) {
            // 重新抢锁
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // 抢锁成功就break退出循环
            if (ttl == null) {
                break;
            }
            // 省略一些代码
        }
    } finally {}
}

上面代码实现了一个分布式锁的功能。它使用了Lua脚本来尝试获取锁,并在成功获取锁后返回锁的剩余时间(ttl)。如果获取锁失败,则进入一个死循环,不断尝试重新获取锁,直到成功为止。

unlock()

关键代码

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return nil;",
                Arrays.asList(getRawName(), getChannelName()),
                LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand());
    }
  • RFuture<Boolean>: 表示返回一个异步结果对象,其中泛型参数Boolean表示结果的类型。
  • unlockInnerAsync方法接受以下参数:
    • threadId: 线程ID,用于标识当前线程。
  • 方法体中的代码使用Lua脚本来实现分布式锁的解锁逻辑。以下是对Lua脚本的解释:
    • if (redis.call('hexists', KEYS[1], ARGV[3]) == 0): 如果哈希表中不存在对应的线程ID,则返回nil表示无法解锁。
    • local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1): 将哈希表中对应线程ID的值减1,并将结果赋值给变量counter。
    • if (counter > 0): 如果counter大于0,表示还有其他线程持有锁,执行以下操作:
      • redis.call('pexpire', KEYS[1], ARGV[2]): 设置键的过期时间为租约时间。
      • return 0: 返回0表示锁仍然被其他线程持有。
    • else: 如果counter等于0,表示当前线程是最后一个持有锁的线程,执行以下操作:
      • redis.call('del', KEYS[1]): 删除键,释放锁。
      • redis.call(ARGV[4], KEYS[2], ARGV[1]): 调用发布命令,通知其他线程锁已经释放。
      • return 1: 返回1表示成功释放锁。
    • return nil: 如果前面的条件都不满足,返回nil表示无法解锁。
  • evalWriteAsync方法用于执行Lua脚本并返回异步结果对象。
  • getRawName(): 获取锁的名称。
  • LongCodec.INSTANCE: 用于编码和解码长整型数据。
  • RedisCommands.EVAL_BOOLEAN: 指定Lua脚本的返回类型为布尔值。
  • Arrays.asList(getRawName(), getChannelName()): 创建一个包含两个元素的列表,元素分别为锁的名称和频道名称。
  • LockPubSub.UNLOCK_MESSAGE: 发布消息的内容。
  • internalLockLeaseTime: 锁的租约时间。
  • getLockName(threadId): 根据线程ID生成锁的名称。
  • getSubscribeService().getPublishCommand(): 获取发布命令。

锁续期

watchDog

核心工作流程是定时监测业务是否执行结束,没结束的话在看你这个锁是不是快到期了(超过锁的三分之一时间),那就重新续期。这样防止如果业务代码没执行完,锁却过期了所带来的线程不安全问题。

Redisson 的 watchDog 机制底层不是调度线程池,而是直接用的 netty 事件轮。

Redisson的WatchDog机制是用于自动续期分布式锁和监控对象生命周期的一种机制,确保了分布式环境下锁的正确性和资源的及时释放。

  • 自动续期:当Redisson客户端获取了一个分布式锁后,会启动一个WatchDog线程。这个线程负责在锁即将到期时自动续期,保证持有锁的线程可以继续执行任务。默认情况下,锁的初始超时时间是30秒,每10秒钟WatchDog会检查一次锁的状态,如果锁依然被持有,它会将锁的过期时间重新设置为30秒。
  • 参数配置:可以通过设置lockWatchdogTimeout参数来调整WatchDog检查锁状态的频率和续期的超时时间。这个参数默认值是30000毫秒(即30秒),适用于那些没有明确指定leaseTimeout参数的加锁请求。
  • 重连机制:除了锁自动续期外,WatchDog机制还用作Redisson客户端的自动重连功能。当客户端与Redis服务器失去连接时,WatchDog会自动尝试重新连接,从而恢复服务的正常运作。
  • 资源管理:WatchDog也负责监控Redisson对象的生命周期,例如分布式锁。当对象的生命周期到期时,WatchDog会将其从Redis中删除,避免过期数据占用过多内存空间。
  • 异步加锁:在加锁的过程中,WatchDog会在RedissonLock#tryAcquireAsync方法中发挥作用,该方法是进行异步加锁的逻辑所在。通过这种方式,加锁操作不会阻塞当前线程,提高了系统的性能。

到此这篇关于Redisson 框架中的分布式锁的文章就介绍到这了,更多相关Redisson分布式锁内容请搜索程序员之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持程序员之家!

相关文章

  • Deepin UOS编译安装Redis的实现步骤

    Deepin UOS编译安装Redis的实现步骤

    本文主要介绍了Deepin UOS编译安装Redis的实现步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-01-01
  • redis集群搭建_动力节点Java学院整理

    redis集群搭建_动力节点Java学院整理

    这篇文章主要介绍了redis集群搭建,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • Redis如何优雅的删除特定前缀key

    Redis如何优雅的删除特定前缀key

    这篇文章主要给大家介绍了关于Redis如何优雅的删除特定前缀key的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Redis具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-06-06
  • 全网最完整的Redis新手入门指导教程

    全网最完整的Redis新手入门指导教程

    这篇文章主要给大家介绍了Redis新手入门的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • redis的五大数据类型应用场景分析

    redis的五大数据类型应用场景分析

    这篇文章主要介绍了redis的五大数据类型实现原理,本文给大家分享五大数据类型的应用场景分析,需要的朋友可以参考下
    2021-08-08
  • Redis数据库安全详解

    Redis数据库安全详解

    这篇文章主要为大家介绍了Redis数据库安全详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-11-11
  • 在Centos?8.0中安装Redis服务器的教程详解

    在Centos?8.0中安装Redis服务器的教程详解

    由于考虑到linux服务器的性能,所以经常需要把一些中间件安装在linux服务上,今天通过本文给大家介绍下在Centos?8.0中安装Redis服务器的详细过程,感兴趣的朋友一起看看吧
    2022-03-03
  • 使用Redis解决高并发方案及思路解读

    使用Redis解决高并发方案及思路解读

    这篇文章主要介绍了使用Redis解决高并发方案及思路,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • Redis实现全局唯一Id的使用示例

    Redis实现全局唯一Id的使用示例

    全局唯一ID有多个方法可供选择,其中一种是使用Redis,本文就来介绍一下Redis实现全局唯一Id的使用示例,具有一定的参考价值,感兴趣的可以了解一下
    2023-12-12
  • 关于SpringBoot 使用 Redis 分布式锁解决并发问题

    关于SpringBoot 使用 Redis 分布式锁解决并发问题

    针对上面问题,一般的解决方案是使用分布式锁来解决,本文通过场景分析给大家介绍关于SpringBoot 使用 Redis 分布式锁解决并发问题,感兴趣的朋友一起看看吧
    2021-11-11

最新评论

?


http://www.vxiaotou.com