Redis分布式可重入锁实现方案

 更新时间:2024年02月19日 09:40:03   作者:程序员小潘  
在单进程环境下,要保证一个代码块的同步执行,直接用synchronized 关键字或ReetrantLock 即可,在分布式环境下,要保证多个节点的线程对代码块的同步访问,就必须要用到分布式锁方案,本文介绍一下基于 Redis实现的分布式锁方案,感兴趣的朋友一起看看吧
(福利推荐:【腾讯云】服务器最新限时优惠活动,云服务器1核2G仅99元/年、2核4G仅768元/3年,立即抢购>>>:9i0i.cn/qcloud

(福利推荐:你还在原价购买阿里云服务器?现在阿里云0.8折限时抢购活动来啦!4核8G企业云服务器仅2998元/3年,立即抢购>>>:9i0i.cn/aliyun

前言

在单进程环境下,要保证一个代码块的同步执行,直接用synchronized 关键字或ReetrantLock 即可。在分布式环境下,要保证多个节点的线程对代码块的同步访问,就必须要用到分布式锁方案。
分布式锁实现方案有很多,有基于关系型数据库行锁实现的;有基于ZooKeeper临时顺序节点实现的;还有基于 Redis setnx 命令实现的。本文介绍一下基于 Redis 实现的分布式锁方案。

理解分布式锁

实现分布式锁有几个要求

  • 互斥性:任意时刻,最多只会有一个客户端线程可以获得锁
  • 可重入:同一客户端的同一线程,获得锁后能够再次获得锁
  • 避免死锁:客户端获得锁后即使宕机,后续客户端也可以获得锁
  • 避免误解锁:客户端A加的锁只能由A自己释放
  • 释放锁通知:持有锁的客户端释放锁后,最好可以通知其它客户端继续抢锁
  • 高性能和高可用

Redis 服务端命令是单线程串行执行的,天生就是原子的,并且支持执行自定义的 lua 脚本,功能上更加强大。
关于互斥性,我们可以用 setnx 命令实现,Redis 可以保证只会有一个客户端 set 成功。但是由于我们要实现的是一个分布式的可重入锁,数据结构得用 hash,用客户端ID+线程ID作为 field,value 记作锁的重入次数即可。
关于死锁,代码里建议把锁的释放写在 finally 里面确保一定执行,针对客户端抢到锁后宕机的场景,可以给 redis key 设置一个超时时间来解决。
关于误解锁,客户端在释放锁时,必须判断 field 是否当前客户端ID以及线程ID一致,不一致就不执行删除,这里需要用到 lua 脚本判断。
关于释放锁通知,可以利用 Redis 发布订阅模式,给每个锁创建一个频道,释放锁的客户端负责往频道里发送消息通知等待抢锁的客户端。
最后关于高性能和高可用,因为 Redis 是基于内存的,天生就是高性能的。但是 Redis 服务本身一旦出现问题,分布式锁也就不可用了,此时可以多部署几台独立的示例,使用 RedLock 算法来解决高可用的问题。

设计实现

首先我们定义一个 RedisLock 锁对象的抽象接口,只有尝试加锁和释放锁方法

public interface RedisLock {
    boolean tryLock();
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);
    void unlock();
}

然后提供一个默认实现 DefaultRedisLock

public class DefaultRedisLock implements RedisLock {
    // 客户端ID UUID
    private final String clientId;
    private final StringRedisTemplate redisTemplate;
    // 锁频道订阅器 接收释放锁通知
    private final LockSubscriber lockSubscriber;
    // 加锁的key
    private final String lockKey;
}

关于tryLock() ,首先执行lua脚本尝试获取锁,如果加锁失败则返回其它客户端持有锁的过期时间,客户端订阅锁对应的频道,然后sleep,直到收到锁释放的通知再继续抢锁。最终不管有没有抢到锁,都会在 finally 取消频道订阅。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
    final long timeout = System.currentTimeMillis() + unit.toMillis(waitTime);
    final long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
        return true;
    }
    if (System.currentTimeMillis() >= timeout) {
        return false;
    }
    final Semaphore semaphore = lockSubscriber.subscribe(getChannel(lockKey), threadId);
    try {
        while (true) {
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                return true;
            }
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            semaphore.tryAcquire(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lockSubscriber.unsubscribe(getChannel(lockKey), threadId);
    }
    return false;
}

tryAcquire() 就是执行lua脚本来加锁,解释一下这段脚本的逻辑:首先判断 lockKey 是否存在,不存在则直接设置 lockKey并且设置过期时间,返回空,表示加锁成功。存在则判断 field 是否和当前客户端ID+线程ID一致,一致则代表锁重入,递增一下value即可,不一致代表加锁失败,返回锁的过期时间

private Long tryAcquire(long leaseTime, TimeUnit timeUnit, long threadId) {
    return redisTemplate.execute(RedisScript.of(
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "return redis.call('pttl', KEYS[1]);", Long.class), Collections.singletonList(lockKey),
            String.valueOf(timeUnit.toMillis(leaseTime)), getLockName(threadId));
}

lockName是由客户端ID和线程ID组成的:

private String getLockName(long threadId) {
    return clientId + ":" + threadId;
}

如果加锁失败,客户端会尝试订阅对应的频道,名称规则是:

private String getChannel(String lockKey) {
    return "__lock_channel__:" + lockKey;
}

订阅方法是LockSubscriber#subscribe ,同一个频道无需订阅多个监听器,所以用一个 Map 记录。订阅成功以后,会返回当前线程对应的一个 Semaphore 对象,默认许可数是0,当前线程会调用Semaphore#tryAcquire 等待许可数,监听器在收到锁释放消息后会给 Semaphore 对象增加许可数,唤醒线程继续抢锁。

@Component
public class LockSubscriber {
    @Autowired
    private RedisMessageListenerContainer messageListenerContainer;
    private final Map<String, Map<Long, Semaphore>> channelSemaphores = new HashMap<>();
    private final Map<String, MessageListener> listeners = new HashMap<>();
    private final StringRedisSerializer serializer = new StringRedisSerializer();
    public synchronized Semaphore subscribe(String channelName, long threadId) {
        MessageListener old = listeners.put(channelName, new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                String channel = serializer.deserialize(message.getChannel());
                String ignore = serializer.deserialize(message.getBody());
                Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channel);
                if (semaphoreMap != null && !semaphoreMap.isEmpty()) {
                    semaphoreMap.values().stream().findFirst().ifPresent(Semaphore::release);
                }
            }
        });
        if (old == null) {
            messageListenerContainer.addMessageListener(listeners.get(channelName), new ChannelTopic(channelName));
        }
        Semaphore semaphore = new Semaphore(0);
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.getOrDefault(channelName, new HashMap<>());
        semaphoreMap.put(threadId, semaphore);
        channelSemaphores.put(channelName, semaphoreMap);
        return semaphore;
    }
    public synchronized void unsubscribe(String channelName, long threadId) {
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channelName);
        if (semaphoreMap != null) {
            semaphoreMap.remove(threadId);
            if (semaphoreMap.isEmpty()) {
                MessageListener listener = listeners.remove(channelName);
                if (listener != null) {
                    messageListenerContainer.removeMessageListener(listener);
                }
            }
        }
    }
}

对于 unlock,就只是一段 lua 脚本,这里解释一下:判断当前客户端ID+线程ID 这个 field 是否存在,存在说明是自己加的锁,可以释放。不存在说明不是自己加的锁,无需做任何处理。因为是可重入锁,每次 unlock 都只是递减一下 value,只有当 value 等于0时才是真正的释放锁。释放锁的时候会 del lockKey,再 publish 发送锁释放通知,让其他客户端可以继续抢锁。

@Override
public void unlock() {
    long threadId = Thread.currentThread().getId();
    redisTemplate.execute(RedisScript.of(
                    "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
                            "return nil;end;" +
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); " +
                            "if (counter > 0) then " +
                            "return 0; " +
                            "else " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], 1); " +
                            "return 1; " +
                            "end; " +
                            "return nil;", Long.class), Arrays.asList(lockKey, getChannel(lockKey)),
            getLockName(threadId));
}

最后,我们需要一个 RedisLockFactory 来创建锁对象,它同时会生成客户端ID

@Component
public class RedisLockFactory {
    private static final String CLIENT_ID = UUID.randomUUID().toString();
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private LockSubscriber lockSubscriber;
    public RedisLock getLock(String lockKey) {
        return new DefaultRedisLock(CLIENT_ID, redisTemplate, lockSubscriber, lockKey);
    }
}

至此,一个基于 Redis 实现的分布式可重入锁就完成了。

尾巴

目前这个版本的分布式锁,保证了互斥性、可重入、避免死锁和误解锁、实现了释放锁通知,但是并没有高可用的保证。如果 Redis 是单实例部署,就会存在单点问题,Redis 一旦故障,整个分布式锁将不可用。如果 Redis 是主从集群模式部署,虽然有主从自动切换,但是 Master 和 Slave 之间的数据同步是存在延迟的,分布式锁可能会出现问题。比如:客户端A加锁成功,lockKey 写入了 Master,此时 Master 宕机,其它 Slave 升级成了 Master,但是还没有同步到 lockKey,客户端B来加锁也会成功,这就没有保证互斥性。针对这个问题,可以参考 RedLock 算法,部署多个单独的 Redis 示例,只要一半以上的Redis节点加锁成功就算成功,来尽可能的保证服务高可用。

到此这篇关于Redis分布式可重入锁实现方案的文章就介绍到这了,更多相关Redis重入锁内容请搜索程序员之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持程序员之家!

相关文章

  • Spring?redis使用报错Read?timed?out排查及解决过程

    Spring?redis使用报错Read?timed?out排查及解决过程

    项目使用spring集成redis,偶尔会出现read timed out的情况,刚开始以为是网络不稳定引起的,后面发现影响业务测试的准确性,这篇文章主要给大家介绍了关于Spring redis使用报错Read timed out排查及解决过程的相关资料,需要的朋友可以参考下
    2024-02-02
  • 如何通过redis减库存的秒杀场景实现

    如何通过redis减库存的秒杀场景实现

    本文通过解决秒杀系统中的一个场景即数据预加载,即把库存数据事先加载到缓存,然后通过缓存来更新库存,简单介绍了如何通过redis减库存的秒杀场景实现,感兴趣的可以了解一下
    2022-06-06
  • redis批量操作pipeline管道操作方法

    redis批量操作pipeline管道操作方法

    Redis本身是基于一个Request一个Response方式的同步请求,正常情况下,客户端发送一个命令,这篇文章主要介绍了redis批量操作pipeline管道,需要的朋友可以参考下
    2022-09-09
  • 如何利用Redis?List实现Java数据库分页快速查询

    如何利用Redis?List实现Java数据库分页快速查询

    这篇文章主要给大家介绍了关于如何利用Redis?List实现Java数据库分页快速查询的相关资料,Redis是一个高效的内存数据库,它支持包括String、List、Set、SortedSet和Hash等数据类型的存储,需要的朋友可以参考下
    2024-02-02
  • Redis使用元素删除的布隆过滤器来解决缓存穿透问题

    Redis使用元素删除的布隆过滤器来解决缓存穿透问题

    本文主要介绍了Redis使用元素删除的布隆过滤器来解决缓存穿透问题,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08
  • Redis集群方案

    Redis集群方案

    前段时间搞了搞Redis集群,想用做推荐系统的线上存储,说来挺有趣,这边基础架构不太完善,因此需要我们做推荐系统的自己来搭这个存储环境,就自己折腾了折腾
    2020-07-07
  • 如何使用注解方式实现?Redis?分布式锁

    如何使用注解方式实现?Redis?分布式锁

    这篇文章主要介绍了如何使用注解方式实现Redis分布式锁,文章围绕主题展开详细的内容介绍,教大家如何优雅的使用Redis分布式锁,感兴趣的小伙伴可以参考一下
    2022-07-07
  • Redis过期键与内存淘汰策略深入分析讲解

    Redis过期键与内存淘汰策略深入分析讲解

    因为redis数据是基于内存的,然而内存是非常宝贵的资源,然后我们就会对一些不常用或者只用一次的数据进行存活时间设置,这样才能提高内存的使用效率,下面这篇文章主要给大家介绍了关于Redis中过期键与内存淘汰策略,需要的朋友可以参考下
    2022-11-11
  • odoo中使用redis实现缓存的步骤

    odoo中使用redis实现缓存的步骤

    这篇文章主要介绍了odoo中使用redis实现缓存的步骤,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-04-04
  • 浅谈Redis的异步机制

    浅谈Redis的异步机制

    命令操作、系统配置、关键机制、硬件配置等会影响 Redis 的性能,还要提前准备好应对异常的方案,本文主要介绍了Redis的异步机制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-05-05

最新评论

?


http://www.vxiaotou.com