redis cluster集群模式下实现批量可重入锁

 更新时间:2024年02月28日 09:32:17   作者:RachelHwang  
本文主要介绍了使用redis cluster集群版所遇到的问题解决方案及redis可重入锁是否会有死锁的问题等,具有一定的参考价值,感兴趣的可以了解一下
(福利推荐:【腾讯云】服务器最新限时优惠活动,云服务器1核2G仅99元/年、2核4G仅768元/3年,立即抢购>>>:9i0i.cn/qcloud

(福利推荐:你还在原价购买阿里云服务器?现在阿里云0.8折限时抢购活动来啦!4核8G企业云服务器仅2998元/3年,立即抢购>>>:9i0i.cn/aliyun

一、redis cluster 集群版

在Redis 3.0版本以后,Redis发布了Redis Cluster。该集群主要支持搞并发和海量数据处理等优势,当 Redis 在集群模式下运行时,它处理数据存储的方式与作为单个实例运行时不同。这是因为它应该准备好跨多个节点分发数据,从而实现水平可扩展性。具体能力表现为:

  • 自动分割数据到不同的节点上
  • 整个集群的部分节点失败或者不可达的情况下能够继续处理命令

Redis没有使用一致性hash,而是引入哈希槽的概念,也就是 Hash Slot。Redis集群由16384个哈希槽slot,每个key通过CRC16校验后对16384取模来决定放置那个槽,集群的每个节点负责一部分hash槽,也就是说数据存放在hash槽里,而每个节点只负责部分hash槽(这样数据就存放在不同的节点)。

例如:node1、node2、node3三个节点,node1节点负责0到5500号hash槽,node2节点负责5501到11000号hash槽,node3节点负责11001到16384号hash槽。这种结构很容易添加或者删除节点,比如如果我想新添加个节点node4, 我需要从节点 node1, node2, node3中得部分槽到node4上. 如果我想移除节点node1,需要将node1中的槽移到node2和node3节点上,然后将没有任何槽的node1节点从集群中移除即可. 由于从一个节点将哈希槽移动到另一个节点并不会停止服务,所以无论添加删除或者改变某个节点的哈希槽的数量都不会造成集群不可用的状态。

在这里插入图片描述

在某些集群方案中,涉及多个key的操作会被限制在一个slot中,如Redis Cluster中的mget/mset操作。这里就会涉及到 哈希标签 Hash Tag 的概念。

Hash Tag是用于计算哈希槽时的一个特殊场景,是一种确保多个键分配到同一个哈希槽中的方法。这是为了在Redis集群中实现多键操作而使用的。为了实现Hash Tag,在某些情况下,会以稍微不同的方式计算key的哈希槽。如果key包含"{…}"模式,则仅对{和}之间的子字符串进行散列以获取哈希槽。但由于可能存在多个{或}出现,因此该算法遵循以下规则:

  • 如果key包含字符 {
  • 并且如果 } 字符位于 { 的右侧
  • 并且在第一个 { 和第一个 } 之间存在一个或多个字符

对于符合上述规则的key,则不会对整个key进行散列处理,而只会对第一次出现 { 和随后第一次出现 } 之间的内容进行散列。否则,对整个key进行散列处理。

在这里插入图片描述

不使用hash tag批量设置不同名称的key:

127.0.0.1:6379> mset name name1 name2 name3
(error) CROSSSLOT Keys in request don't hash to the same slot

显示错误信息:CROSSSLOT 请求中的key没有哈希到同一个插槽。这个问题是因为多键操作的时候每个键对应的slot可能不是一个,客户端没法做move操作。

解决思路就是采用redis cluster的hashTag,当redis的key加上hashTag时,集群算key的slot是按照hashTag进行计算,即可保证hashTag一致的key能分配到相同的stlot中。:

127.0.0.1:6379> mset name {name} {name}1 {name}2 {name}3

二、redis 分布式锁

Redis锁使用起来比较简单,既可以锁定单个键,也可以批量锁定多个键,以实现更大规模的操作。它也是分布式应用中使用最广泛的分布式锁实施方式,可以有效解决单点故障、死锁和负载失衡等问题。

大规模锁定Redis,实现批量操作,一般通过以下实现:

  • 使用Redis的消息订阅机制,创建消息频道,用于锁定指定键之间的多个键。消息频道的名字称为锁名,它代表锁定的范围和跨度。
  • 然后,通过Redis的SUBSCRIBE命令订阅消息频道名字,比如“ lock_key”,并调用Redis BLPOP,将锁定的键占据,以实现批量锁定。
  • 此外,也可以使用Redis的Lua脚本实现批量锁定。获取带锁的Key数组,这里以数组形式表示。同时,以原子的形式执行多个SETNX命令,一旦全部执行成功,则实现批量锁定:
local locks = red:lrange("lock_keys", 1, -1)
for i, v in iprs(locks) do
    if redis.call("setnx", v, field) == 1 then
        red.lpush("locked_keys", v)
    end
end

释放锁定的键,实现批量解锁,语句如下:

local unlocked_locks = red:lrange("locked_keys",1, -1)
for i, v in iprs(unlocked_locks) do
    red.del(v)
end
red.del("locked_keys")

使用Redis的WATCH功能,防止多个客户端同时更新同一键,即如果更新发生乐观锁的冲突的情况下,返回失败给客户端,从而保证了锁定的原子性:

-- 使用Redis watch,开始监听
red.watch("lock_keys")
-- 进行具体操作
-- …
-- 解锁操作
red.unwatch()

Redis锁使用起来非常简单,可以用于单个键锁定和大规模锁定,从而实现批量操作,有效解决分布式应用中的死锁、负载失衡、单点故障等问题。

三、如何使用 redis 实现批量可重入锁?

1、方案一:Lua脚本批量加锁

Lua加锁脚本处理:

	/**
	 * 加锁脚本
	 * KEYS[1] key
	 * ARGV[1] value
	 * ARGV[2] expire
	 * 判断key是否存在,不存在则加锁,并记录加锁次数+1;若存在,则判断value是否相等,相等则记录加锁次数+1,不相等则返回0
	 */
	private static final String REENTRANT_LOCK_SCRIPT = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
        "    redis.call('SET', KEYS[1], ARGV[1]) " +
        "    redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
        "    redis.call('INCR', 'lockCount:' .. KEYS[1]) " +
        "    return 1 " +
        "else " +
        "    if redis.call('GET', KEYS[1]) == ARGV[1] then " +
        "        redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
        "        redis.call('INCR', 'lockCount:' .. KEYS[1]) " +
        "        return 1 " +
        "    else " +
        "        return 0 " +
        "    end " +
        "end";
        
    @Bean
    public DefaultRedisScript<Long> reentrantLockRedisScript(){
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(REENTRANT_LOCK_SCRIPT, Long.class);
        try {
            List<Boolean> results= redisTemplate.getConnectionFactory().getConnection().scriptExists(redisScript.getSha1());
            if(Boolean.FALSE.equals(results.get(0)))
            {
                redisTemplate.getConnectionFactory().getConnection().scriptLoad(redisScript.getScriptAsString().getBytes());
                log.info("redis reentrantLockRedisScript load success");
            }

        }catch (Exception ex){
            log.error("redis reentrantLockRedisScript load error",ex);
        }

        return redisScript;
    }

Lua解锁脚本处理:

    private static final String RELEASE_REENTRANT_LOCK_SCRIPT = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
            "    return 0 " +
            "else " +
            "    if redis.call('GET', KEYS[1]) == ARGV[1] then " +
            "        redis.call('DECR', 'lockCount:' .. KEYS[1]) " +
            "        if redis.call('GET', 'lockCount:' .. KEYS[1]) == '0' then " +
            "            redis.call('DEL', KEYS[1]) " +
            "        end " +
            "        return 1 " +
            "    else " +
            "        return 0 " +
            "    end " +
            "end";
    @Bean
    public DefaultRedisScript<Long> releaseReentrantLockRedisScript(){
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_REENTRANT_LOCK_SCRIPT, Long.class);
        try {
            List<Boolean> results= redisTemplate.getConnectionFactory().getConnection().scriptExists(redisScript.getSha1());
            if(Boolean.FALSE.equals(results.get(0)))
            {
                redisTemplate.getConnectionFactory().getConnection().scriptLoad(redisScript.getScriptAsString().getBytes());
                log.info("redis releaseReentrantLockRedisScript load success");
            }

        }catch (Exception ex){
            log.error("redis releaseReentrantLockRedisScript load error",ex);
        }

        return redisScript;
    }

redis set 多个key场景需要hash tag处理:

private static final String KEY_FORMAT = "%s{%s}";
String slotKey = String.format(KEY_FORMAT, key, key);

结合以上封装,批量可重入锁的方法处理如下:

/**
     * 可重入锁,value相同的情况下,可重复加锁,当所有加锁方都解锁后才会释放锁
     *
     * @param key           锁的key
     * @param value         锁的value
     * @param expireSeconds 锁过期时间
     * @param waitSeconds   等待时间
     * @param process
     * @return
     */
    public boolean reentrantLock(String key, String value, long expireSeconds, long waitSeconds, Runnable process) {
        boolean lock = false;
        try {
            //特殊处理key,为了保证原key和计数key落在同一个slot,将原key拼装成: key{key}
            String slotKey = String.format(KEY_FORMAT, key, key);
            long start = System.currentTimeMillis();
            while (!(lock = redisUtil.execute(reentrantLockRedisScript, Collections.singletonList(slotKey), value, expireSeconds) == 1)) {
                Thread.sleep(100);
                if ((System.currentTimeMillis() - start) / 1000 > waitSeconds) {
                    break;
                }
            }
            //加锁成功,执行传入的方法,最后用lua脚本判断锁的value是否还是当前的value,是则执行解锁
            if (lock) {
                try {
                    process.run();
                } catch (Exception ex) {
                    throw ex instanceof BusinessException ? ex : new BusinessException(ex.getMessage());
                } finally {
                    redisUtil.execute(releaseReentrantLockRedisScript, Collections.singletonList(slotKey), value);
                }
            }

        } catch (BusinessException businessException) {
            throw businessException;
        } catch (Exception e) {
            log.error("redis lockAndRun error!lockKey=" + key, e);
        }
        return lock;
    }

    /**
     * 先获取可重入锁,获取成功后批量加锁,执行传入的方法
     *
     * @param keys
     * @param reentrantKey
     * @param value
     * @param expireSeconds
     * @param waitSeconds
     * @param process
     * @return
     */
    public boolean batchReentrantLock(Set<String> keys, String reentrantKey, String value, long expireSeconds, long waitSeconds, Runnable process) {
        List<Boolean> result = new ArrayList<>(1);
        boolean reentrantLock = reentrantLock(reentrantKey, value, expireSeconds, waitSeconds, () -> {
            result.add(batchLockAndRun(keys, expireSeconds, waitSeconds, process));
        });
        return reentrantLock && result.get(0);
    }

2、方案二:pipeline批量加锁

  • 不用lua以避免cross slot error
  • 批量加锁失败后立即全部解锁,防止死锁
	/**
	 * 使用redisTemplate
	 * @param script
	 * @param keys
	 * @param args
	 * @param <T>
	 */
	public List<Object> executePipelined(String[] keys, String[] values, long time, TimeUnit timeUnit) {
	      return redisTemplate.executePipelined(new SessionCallback<Object>() {
	           @Override
	           public Object execute(RedisOperations operations) throws DataAccessException {
	               for (int i = 0; i < keys.length; i++) {
	                   operations.opsForValue().setIfAbsent(keys[i], values[i], time, timeUnit);
	               }
	               return null;
	           }
	       });
	}
	
	/**
	 * pipeline批量加锁
	 * @param keys          需要加锁的key
	 * @param values        锁的value,用于解锁时判断是否是当前线程加的锁
	 * @param expireSeconds 锁过期时间
	 * @return boolean 是否加锁成功
	 */
	private boolean tryBatchLock(String[] keys, String[] values, long expireSeconds) {
	    List<Object> results = redisUtil.executePipelined(keys, values, expireSeconds, TimeUnit.SECONDS);
	    if (results == null || results.size() != keys.length || results.contains(false)) {
	        //加锁失败,立即解锁
	        redisUtil.executePipelined(releaseReentrantLockRedisScript, keys, values);
	        return false;
	    }
	    return true;
	}

四、总结

以上是使用redis cluster集群版所遇到的问题以及解决方案,主要在业务实现过程中,需要注意redis cluster key会被划分到不同的槽中的问题,以及redis可重入锁是否会有死锁的问题等,更多相关redis cluster 批量可重入锁内容请搜索程序员之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持程序员之家!

相关文章

  • 利用Redis进行数据缓存的项目实践

    利用Redis进行数据缓存的项目实践

    在实际的业务场景中,Redis 一般和其他数据库搭配使用,用来减轻后端数据库的压力,本文就介绍了利用Redis进行数据缓存的项目实践,具有一定的参考价值,感兴趣的可以了解一下
    2022-06-06
  • Redis?哈希Hash底层数据结构详解

    Redis?哈希Hash底层数据结构详解

    这篇文章主要介绍了Redis?哈希Hash底层数据结构详解的相关资料,需要的朋友可以参考下
    2022-08-08
  • Redis主从复制详解

    Redis主从复制详解

    今天小编就为大家分享一篇关于Redis主从复制详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • redis中scan命令的基本实现方法

    redis中scan命令的基本实现方法

    这篇文章主要给大家介绍了关于redis中scan命令的基本实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-10-10
  • Redis高可用梳理详解

    Redis高可用梳理详解

    高可用的本质是有备份,在出现故障的时候,有backup可以提供服务,本文详细介绍了Redis的高可用,感兴趣的同学可以参考阅读
    2023-05-05
  • Redis的4种缓存模式分享

    Redis的4种缓存模式分享

    这篇文章主要介绍了Redis的4种缓存模式分享,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下
    2022-07-07
  • redisson滑动时间窗应用场景解决方案

    redisson滑动时间窗应用场景解决方案

    前10分钟内累计3次验证失败后,增加图形验证码验证条件,前10分钟内累计6次验证失败后,系统自动锁定该账号15分钟,15分钟后自动解锁,本文给大家分享redisson滑动时间窗应用场景解决方案,感兴趣的朋友一起看看吧
    2024-01-01
  • 关于SpringBoot 使用 Redis 分布式锁解决并发问题

    关于SpringBoot 使用 Redis 分布式锁解决并发问题

    针对上面问题,一般的解决方案是使用分布式锁来解决,本文通过场景分析给大家介绍关于SpringBoot 使用 Redis 分布式锁解决并发问题,感兴趣的朋友一起看看吧
    2021-11-11
  • redis实现排行榜功能

    redis实现排行榜功能

    排行榜在很多地方都能使用到,redis的zset可以很方便地用来实现排行榜功能,本文就来简单的介绍一下如何使用,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-05-05
  • Redis RDB技术底层原理详解

    Redis RDB技术底层原理详解

    为了使Redis在重启之后仍能保证数据不丢失,需要将数据从内存中以某种形式同步到硬盘中,这一过程就是持久化,本文重点给大家介绍Redis RDB技术底层原理实现方法,一起看看吧
    2021-09-09

最新评论

?


http://www.vxiaotou.com