我的编程空间,编程开发者的网络收藏夹
学习永远不晚

【Redis进阶】一文搞懂Redisson的看门狗机制底层实现

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

【Redis进阶】一文搞懂Redisson的看门狗机制底层实现

文章目录

1. 看门狗机制概述

看门狗机制是Redission提供的一种自动延期机制,这个机制使得Redission提供的分布式锁是可以自动续期的

private long lockWatchdogTimeout = 30 * 1000;

看门狗机制提供的默认超时时间是30*1000毫秒,也就是30秒

如果一个线程获取锁后,运行程序到释放锁所花费的时间大于锁自动释放时间(也就是看门狗机制提供的超时时间30s),那么Redission会自动给redis中的目标锁延长超时时间。

在Redission中想要启动看门狗机制,那么我们就不用获取锁的时候自己定义leaseTime(锁自动释放时间)

如果自己定义了锁自动释放时间的话,无论是通过lock还是tryLock方法,都无法启用看门狗机制。

但是,如果传入的leaseTime为-1,也是会开启看门狗机制的。

分布式锁是不能设置永不过期的,这是为了避免在分布式的情况下,一个节点获取锁之后宕机从而出现死锁的情况,所以需要个分布式锁设置一个过期时间。但是这样会导致一个线程拿到锁后,在锁的过期时间到达的时候程序还没运行完,导致锁超时释放了,那么其他线程就能获取锁进来,从而出现问题。

所以,看门狗机制的自动续期,就很好地解决了这一个问题。


2. 源码解读

进入tryLock方法,这里的tryLock(waitTime, -1, unit)有三个参数

  1. waitTime:获取锁的最大等待时间(没有传默认为-1)
  2. leaseTime:锁自动释放的时间(没有传的话默认-1)
  3. unit:时间的单位(等待时间和锁自动释放的时间单位)
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {    return tryLock(waitTime, -1, unit);}
    @Override    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {        long time = unit.toMillis(waitTime);        long current = System.currentTimeMillis();        long threadId = Thread.currentThread().getId();        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);        // lock acquired        if (ttl == null) {            return true;        }                time -= System.currentTimeMillis() - current;        if (time <= 0) {            acquireFailed(waitTime, unit, threadId);            return false;        }                current = System.currentTimeMillis();        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {            if (!subscribeFuture.cancel(false)) {                subscribeFuture.onComplete((res, e) -> {                    if (e == null) {                        unsubscribe(subscribeFuture, threadId);                    }                });            }            acquireFailed(waitTime, unit, threadId);            return false;        }        try {            time -= System.currentTimeMillis() - current;            if (time <= 0) {                acquireFailed(waitTime, unit, threadId);                return false;            }                    while (true) {                long currentTime = System.currentTimeMillis();                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);                // lock acquired                if (ttl == null) {                    return true;                }                time -= System.currentTimeMillis() - currentTime;                if (time <= 0) {                    acquireFailed(waitTime, unit, threadId);                    return false;                }                // waiting for message                currentTime = System.currentTimeMillis();                if (ttl >= 0 && ttl < time) {                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                } else {                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);                }                time -= System.currentTimeMillis() - currentTime;                if (time <= 0) {                    acquireFailed(waitTime, unit, threadId);                    return false;                }            }        } finally {            unsubscribe(subscribeFuture, threadId);        }//        return get(tryLockAsync(waitTime, leaseTime, unit));    }

这上面一坨主要是锁重试的代码,感兴趣可以看【Redis】4.万字文章带你深入Redisson与源码解读(建议收藏)——起名方面没有灵感的博客-CSDN博客

而看门狗机制的相关代码主要在tryAcquire方法上,在这个方法里主要看到方法是tryAcquireAsync(waitTime, leaseTime, unit, threadId)

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}

由于在tryLock方法中没传leaseTime,所以leaseTime为默认值-1

调用tryLockInnerAsync,如果获取锁失败,返回的结果是这个key的剩余有效期,如果获取锁成功,则返回null。

获取锁成功后,如果检测不存在异常并且获取锁成功`(ttlRemaining == null)。

那么则执行this.scheduleExpirationRenewal(threadId);来启动看门狗机制。

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {    if (leaseTime != -1L) {        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);    } else {        //如果获取锁失败,返回的结果是这个key的剩余有效期        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);        //上面获取锁回调成功之后,执行这代码块的内容        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {            //不存在异常            if (e == null) {                //剩余有效期为null                if (ttlRemaining == null) {                    //这个函数是解决最长等待有效期的问题                    this.scheduleExpirationRenewal(threadId);                }            }        });        return ttlRemainingFuture;    }}<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {    internalLockLeaseTime = unit.toMillis(leaseTime);    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,                          // 锁不存在,则往redis中设置锁信息                          "if (redis.call('exists', KEYS[1]) == 0) then " +                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +                          "return nil; " +                          "end; " +                          // 锁存在                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +                          "return nil; " +                          "end; " +                          "return redis.call('pttl', KEYS[1]);",                          Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

一个锁就对应自己的一个ExpirationEntry类,

EXPIRATION_RENEWAL_MAP存放的是所有的所信息。

根据锁的名称从EXPIRATION_RENEWAL_MAP里面获取锁,如果存在这把锁则冲入,如果不存在,则将这个新锁放置进EXPIRATION_RENEWAL_MAP,并且开启看门狗机制。

private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();private void scheduleExpirationRenewal(long threadId) {    ExpirationEntry entry = new ExpirationEntry();    //这里EntryName是指锁的名称    ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);    if (oldEntry != null) {        //重入        //将线程ID加入        oldEntry.addThreadId(threadId);    } else {        //将线程ID加入        entry.addThreadId(threadId);        //续约        this.renewExpiration();    }}

首先,从EXPIRATION_RENEWAL_MAP中获取这个锁,接下来定义一个延迟任务task,这个任务的步骤如下

  1. 新创建了一个子线程去反复调用
  2. EXPIRATION_RENEWAL_MAP中获取这把锁,如果这把锁不存在了,说明被删除了,不在需要续期了。
  3. 从锁中获取获得这把锁的线程IDthreadId
  4. 调用renewExpirationAsync方法刷新最长等待时间
  5. 如果刷新成功,则进来递归调用这个函数renewExpiration()

这个任务task设置为 this.internalLockLeaseTime / 3L,也是锁自动释放时间,因为没传,也就是10s。

也就是说,这个延迟任务延迟十秒执行一次。

最后,为这把锁ee设置延迟任务task即可

private void renewExpiration() {    //先从map里得到这个ExpirationEntry    ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());    if (ee != null) {        //这个是一个延迟任务        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {            //延迟任务内容            public void run(Timeout timeout) throws Exception {                //拿出ExpirationEntry                ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());                if (ent != null) {                    //从ExpirationEntry拿出线程ID                    Long threadId = ent.getFirstThreadId();                    if (threadId != null) {                        //调用renewExpirationAsync方法刷新最长等待时间                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);                        future.onComplete((res, e) -> {if (e != null) {    RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);} else {    if (res) {        //renewExpirationAsync方法执行成功之后,进行递归调用,调用自己本身函数        //那么就可以实现这样的效果        //首先第一次进行这个函数,设置了一个延迟任务,在10s后执行        //10s后,执行延迟任务的内容,刷新有效期成功,那么就会再新建一个延迟任务,刷新最长等待有效期        //这样这个最长等待时间就会一直续费        RedissonLock.this.renewExpiration();    }}                        });                    }                }            }        },                       //这是锁自动释放时间,因为没传,所以是看门狗时间=30*1000                      //也就是10s                      this.internalLockLeaseTime / 3L,                       //时间单位                      TimeUnit.MILLISECONDS);        //给当前ExpirationEntry设置延迟任务        ee.setTimeout(task);    }}// 刷新等待时间protected RFuture<Boolean> renewExpirationAsync(long threadId) {    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +                          "return 1; " +                          "end; " +                          "return 0;",                          Collections.singletonList(getName()),                          internalLockLeaseTime, getLockName(threadId));}

最后,在释放锁的时候,就会关闭所有的延迟任务,核心代码如下

public RFuture<Void> unlockAsync(long threadId) {    RPromise<Void> result = new RedissonPromise();    RFuture<Boolean> future = this.unlockInnerAsync(threadId);    future.onComplete((opStatus, e) -> {        //取消锁更新任务        this.cancelExpirationRenewal(threadId);        if (e != null) {            result.tryFailure(e);        } else if (opStatus == null) {            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);            result.tryFailure(cause);        } else {            result.trySuccess((Object)null);        }    });    return result;}void cancelExpirationRenewal(Long threadId) {    //获得当前这把锁的任务    ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());    if (task != null) {        //当前锁的延迟任务不为空,且线程id不为空        if (threadId != null) {            //先把线程ID去掉            task.removeThreadId(threadId);        }        if (threadId == null || task.hasNoThreads()) {            //然后取出延迟任务            Timeout timeout = task.getTimeout();            if (timeout != null) {                //把延迟任务取消掉                timeout.cancel();            }//再把ExpirationEntry移除出map            EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());        }    }}

3. 总结

在使用Redis实现分布式锁的时候,会存在很多问题。

比如说业务逻辑处理时间>自己设置的锁自动释放时间的话,Redis就会按超时情况把锁释放掉,而其他线程就会趁虚而入抢夺锁从而出现问题,因此需要有一个续期的操作。

并且,如果释放锁的操作在finally完成,需要判断一下当前锁是否是属于自己的锁,防止释放掉其他线程的锁,这样释放锁的操作就不是原子性了,而这个问题很好解决,使用lua脚本即可。

Redisson的出现,其中的看门狗机制很好解决续期的问题,它的主要步骤如下:

  1. 在获取锁的时候,不能指定leaseTime或者只能将leaseTime设置为-1,这样才能开启看门狗机制。
  2. tryLockInnerAsync方法里尝试获取锁,如果获取锁成功调用scheduleExpirationRenewal执行看门狗机制
  3. scheduleExpirationRenewal中比较重要的方法就是renewExpiration,当线程第一次获取到锁(也就是不是重入的情况),那么就会调用renewExpiration方法开启看门狗机制。
  4. renewExpiration会为当前锁添加一个延迟任务task,这个延迟任务会在10s后执行,执行的任务就是将锁的有效期刷新为30s(这是看门狗机制的默认锁释放时间)
  5. 并且在任务最后还会继续递归调用renewExpiration

也就是总的流程就是,首先获取到锁(这个锁30s后自动释放),然后对锁设置一个延迟任务(10s后执行),延迟任务给锁的释放时间刷新为30s,并且还为锁再设置一个相同的延迟任务(10s后执行),这样就达到了如果一直不释放锁(程序没有执行完)的话,看门狗机制会每10s将锁的自动释放时间刷新为30s。

而当程序出现异常,那么看门狗机制就不会继续递归调用renewExpiration,这样锁会在30s后自动释放。

或者,在程序主动释放锁后,流程如下:

  1. 将锁对应的线程ID移除
  2. 接着从锁中获取出延迟任务,将延迟任务取消
  3. 在将这把锁从EXPIRATION_RENEWAL_MAP中移除。

来源地址:https://blog.csdn.net/weixin_51146329/article/details/129612350

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

【Redis进阶】一文搞懂Redisson的看门狗机制底层实现

下载Word文档到电脑,方便收藏和打印~

下载Word文档

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录