我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Redis分布式锁如何自动续期的实现

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Redis分布式锁如何自动续期的实现

Redis 实现分布式锁

  • 指定一个 key 作为锁标记,存入 Redis 中,指定一个 唯一的用户标识作为 value。
  • 当 key 不存在时才能设置值,确保同一时间只有一个客户端进程获得锁,满足互斥性特性。
  • 设置一个过期时间,防止因系统异常导致没能删除这个 key,满足防死锁特性。
  • 当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足只有加锁的人才能释放锁 。

问题

如果这个锁的过期时间是30秒,但是业务运行超过了30秒,比如40秒,当业务运行到30秒的时候,锁过期了,其他客户端拿到了这个锁,怎么办

我们可以设置一个合理的过期时间,让业务能够在这个时间内完成业务逻辑,但LockTime的设置原本就很不容易。

  • LockTime设置过小,锁自动超时的概率就会增加,锁异常失效的概率也就会增加;
  • LockTime设置过大,万一服务出现异常无法正常释放锁,那么出现这种异常锁的时间也就越长。

我们只能通过经验去配置,一个可以接受的值,基本上是这个服务历史上的平均耗时再增加一定的buff。总体来说,设置一个合理的过期时间并不容易

我们也可以不设置过期时间,让业务运行结束后解锁,但是如果客户端出现了异常结束了或宕机了,那么这个锁就无法解锁,变成死锁;

自动续期

我们可以先给锁设置一个LockTime,然后启动一个守护线程,让守护线程在一段时间后,重新去设置这个锁的LockTime。

看起来很简单,但实现起来并不容易

  • 和释放锁的情况一样,我们需要先判断持有锁客户端是否有变化。否则会造成无论谁持有锁,守护线程都会去重新设置锁的LockTime。
  • 守护线程要在合理的时间再去重新设置锁的LockTime,否则会造成资源的浪费。不能动不动就去续。
  • 如果持有锁的线程已经处理完业务了,那么守护线程也应该被销毁。不能业务运行结束了,守护者还在那里继续运行,浪费资源。

看门狗

Redisson的看门狗机制就是这种机制实现自动续期的

在这里插入图片描述

Redissson tryLock


public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // 1.尝试获取锁
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }

        // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        current = System.currentTimeMillis();

        
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future)
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            // 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;

              }

            
            while (true) {
                long currentTime = System.currentTimeMillis();

                // 再次尝试获取锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
                // 超过最大等待时间则返回 false 结束循环,获取锁失败
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    //如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    //则就在wait time 时间范围内等待可以通过信号量
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                // 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 7.无论是否获得锁,都要取消订阅解锁消息
            unsubscribe(subscribeFuture, threadId);
        }
        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
  • 尝试获取锁,返回 null 则说明加锁成功,返回一个数值,则说明已经存在该锁,ttl 为锁的剩余存活时间。
  • 如果此时客户端 2 进程获取锁失败,那么使用客户端 2 的线程 id(其实本质上就是进程 id)通过 Redis 的 channel 订阅锁释放的事件。如果等待的过程中一直未等到锁的释放事件通知,当超过最大等待时间则获取锁失败,返回 false,也就是第 39 行代码。如果等到了锁的释放事件的通知,则开始进入一个不断重试获取锁的循环。
  • 循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。如果在重试中拿到了锁,则直接返回。如果锁当前还是被占用的,那么等待释放锁的消息,具体实现使用了信号量 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的 release() 方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。
  • 当锁正在被占用时,等待获取锁的进程并不是通过一个 while(true) 死循环去获取锁,而是利用了 Redis 的发布订阅机制,通过 await 方法阻塞等待锁的进程,有效的解决了无效的锁申请浪费资源的问题。

看门狗如何自动续期

Redisson看门狗机制, 只要客户端加锁成功,就会启动一个 Watch Dog。


private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

leaseTime 必须是 -1 才会开启 Watch Dog 机制,如果需要开启 Watch Dog 机制就必须使用默认的加锁时间为 30s。

如果你自己自定义时间,超过这个时间,锁就会自定释放,并不会自动续期。

续期原理

续期原理其实就是用lua脚本,将锁的时间重置为30s


private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()),
        internalLockLeaseTime, getLockName(threadId));
}

Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 还持有锁 key(判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。

如果服务宕机了,Watch Dog 机制线程也就没有了,此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。

到此这篇关于Redis分布式锁如何自动续期的实现的文章就介绍到这了,更多相关Redis分布式锁自动续期内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Redis分布式锁如何自动续期的实现

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何在Redis中实现分布式锁的自动续期机制

实现分布式锁的自动续期机制可以通过以下步骤在Redis中实现:获取锁时设置一个过期时间,确保锁在一定时间内会自动释放。使用一个后台线程或定时任务来定期更新锁的过期时间,以实现自动续期。在获取锁时,可以使用Redis的SET命令来设置锁的
如何在Redis中实现分布式锁的自动续期机制
2024-03-14

Redis分布式锁该怎么实现续期

这篇文章将为大家详细讲解有关Redis分布式锁该怎么实现续期,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。Redis分布式锁如何续期Redis分布式锁的正确姿势据肥朝了解,很多同学在用分布式
2023-06-26

Redisson如何实现分布式锁、锁续约

这篇文章主要介绍了Redisson如何实现分布式锁、锁续约的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Redisson如何实现分布式锁、锁续约文章都会有所收获,下面我们一起来看看吧。一、基础0)Redisso
2023-07-05

SpringBoot+Redis如何实现分布式锁

这篇文章主要介绍了SpringBoot+Redis如何实现分布式锁,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。jedis的nx生成锁 如何删除锁 模拟抢单动作(10w个人开
2023-06-16

Redis中如何实现分布式锁

这篇文章给大家介绍Redis中如何实现分布式锁,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。Redis要实现分布式锁,以下条件应该得到满足互斥性 在任意时刻,只有一个客户端能持有锁。不能死锁 客户端在持有锁的期间崩溃而
2023-06-16

redis如何实现分布式共享锁

Redis可以通过以下两种方式实现分布式共享锁:1. 使用SETNX命令:在Redis中,可以使用SETNX命令(即SET if Not eXists)来实现分布式锁。当一个客户端尝试设置一个键的值时,如果该键不存在,SETNX会设置成功并
2023-09-04

Redis如何实现分布式锁功能

Redis如何实现分布式锁功能分布式锁是在分布式系统中常用的一种同步机制,它可以帮助我们在多个进程或多台服务器之间实现对共享资源的互斥访问。Redis作为一种高性能的缓存和消息队列中间件,也提供了实现分布式锁的功能。本文将介绍Redis如何
Redis如何实现分布式锁功能
2023-11-07

如何在Redis中实现分布式锁

在Redis中实现分布式锁可以通过以下方式:使用SETNX命令:在Redis中可以使用SETNX命令(SET if Not eXists)来设置一个键值对,只有在键不存在的情况下才会执行设置操作。通过SETNX命令可以尝试在指定的键上设置一
如何在Redis中实现分布式锁
2024-04-09

Redis分布式锁的实现方式

目录一、分布式锁是什么1、获取锁2、释放锁二、代码实例上面代码存在锁误删问题:三、基于SETNX实现的分布式锁存在下面几个问题1、不可重入2、不可重试3、超时释放4、主从一致性四、Redisson实现分布式锁1、pom2、配置类3、测试类五
2023-04-03

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录