草庐IT

分布式锁 - Redisson的看门狗(watchdog)机制

提灯笼的鱼 2023-06-15 原文

分布式锁 - Redisson的看门狗(watchdog)机制

前言

本篇文章从Redisson的加锁(tryLock)入手,带大家由源码来了解一下watchdog的自动延迟加锁操作,如果对Redisson的加锁机制没有了解,建议可以先看一下本人的另一篇博客分布式锁-Redisson的使用及源码分析

结论

  • 想要触发Redisson看门狗机制,不能自定义 leaseTime(或者传参 -1)
  • Redisson默认加锁30秒,每隔10秒刷新加锁时间
  • watchdog的延时时间 可以由 lockWatchdogTimeout指定默认延时时间,但是不要设置太小
  • Redisson是通过Future和Timeout功能来实现异步延时

源码分析

前文讲到 tryLock() 中最重要的加锁逻辑是 tryAcquire(waitTime, leaseTime, unit, threadId),今天我们就来从加锁的方法入手看一下如何完成WatchDog

Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);

接下来,我们进入这个方法看一下

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // get() 方法异步阻塞式获取加锁结果    
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }

/**
 * 重点方法 看这里
 */
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Boolean> acquiredFuture;
    
    	// 1、判断锁的持有时间是否由用户自定义(这里我不在细节讨论,感兴趣的童鞋们可以去看上一篇文章)
        if (leaseTime != -1) {
            acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        } else {
            // 2、当用户没有自定义锁占有时间时,默认传入 internalLockLeaseTime
            // private long lockWatchdogTimeout = 30 * 1000; 默认30秒
            acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }

        CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
            // lock acquired
            if (acquired) {
                if (leaseTime != -1) {
                    // 3、如果用户传入占用时间直接转换,把默认值internalLockLeaseTime 更新为用户自定义的占有时间
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    // 4、看这里,看这里,没错,这里就是触发看门狗机制的方法
                    // 重点:只有当 leaseTime == -1时才会触发看门狗机制
                    scheduleExpirationRenewal(threadId);
                }
            }
            return acquired;
        });
        return new CompletableFutureWrapper<>(f);
    }

我们来看一下方法:scheduleExpirationRenewal(threadId);

protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
    	// 1、EXPIRATION_RENEWAL_MAP 是一个全局的静态常量Map
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            // 2、oldEntry != null 说明该线程不是第一次触发
            oldEntry.addThreadId(threadId);
        } else {
            // 3、oldEntry == null 说明该线程是第一次触发
            entry.addThreadId(threadId);
            try {
                // 4、更新过期时间
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }

接下来,我们一起看一下renewExpiration()是如何更新过期时间的

private void renewExpiration() {
    	// 1、获取当前线程的更新对象
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
    	// 2、创建了一个定时任务
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                // 3、异步更新过期时间
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                // 4、res 执行结果,e 异常
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        // 5、如果出现异常,从map中删除,直接返回
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // 6、如果没有报错,就再次定时延期
                        renewExpiration();
                    } else {
                        // 7、否则取消定时
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    	// 8、延迟时间为:internalLockLeaseTime / 3 也就是10秒    
    
        ee.setTimeout(task);
    }

// 通过lua脚本判断给key重新设置过期时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

有关分布式锁 - Redisson的看门狗(watchdog)机制的更多相关文章

  1. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  2. ruby - 停止分布式 Ruby 服务 - 2

    我有一个启动DRb服务的脚本,然后生成处理程序对象并通过DRb.thread.join等待。我希望脚本一直运行直到被明确杀死,所以我添加了trap"INT"doDRb.stop_serviceend在Ruby1.8下成功停止DRb服务并退出,但在1.9下似乎死锁(在OSX10.6.7上)。对该进程进行采样显示在semaphore_wait_signal_trap中有几个线程在旋转。我假设我在调用stop_service时做错了什么,但我不确定是什么。谁能给我任何关于如何正确处理它的指示? 最佳答案 好的,我想我已经找到了解决方案。如

  3. ruby - Ruby 是否提供响应 OS X 上的 Apple 事件的机制? - 2

    我正在使用Ruby-Tk为OSX开发一个桌面应用程序,我想为该应用程序提供一个AppleEvents接口(interface)。这意味着应用程序将定义它将响应的AppleScript命令的字典(对应于发送到应用程序的Apple事件),并且用户/其他应用程序可以使用AppleScript命令编写Ruby-Tk应用程序的脚本。其他脚本语言支持此类功能——Python通过位于http://appscript.svn.sourceforge.net/viewvc/appscript/py-aemreceive/的py-aemreceive库和Tcl通过位于http://tclae.source

  4. ruby - Ruby 的方法解除绑定(bind)机制有什么意义? - 2

    Method#unbind返回对该方法的UnboundMethod引用,稍后可以使用UnboundMethod#bind将其绑定(bind)到另一个对象.classFooattr_reader:bazdefinitialize(baz)@baz=bazendendclassBardefinitialize(baz)@baz=bazendendf=Foo.new(:test1)g=Foo.new(:test2)h=Bar.new(:test3)f.method(:baz).unbind.bind(g).call#=>:test2f.method(:baz).unbind.bind(h).

  5. BigData/Cloud Computing:基于阿里云技术产品的人工智能与大数据/云计算/分布式引擎的综合应用案例目录来理解技术交互流程 - 2

    BigData/CloudComputing:基于阿里云技术产品的人工智能与大数据/云计算/分布式引擎的综合应用案例目录来理解技术交互流程目录一、云计算网站建设:部署与发布网站建设:简单动态网站搭建云服务器管理维护云数据库管理与数据迁移云存储:对象存储管理与安全超大流量网站的负载均衡二、大数据MOOC网站日志分析搭建企业级数据分析平台基于LBS的热点店铺搜索基于机器学习PAI实现精细化营销基于机器学习的客户流失预警分析使用DataV制作实时销售数据可视化大屏使用MaxCompute进行数据质量核查使用Quick BI制作图形化报表使用时间序列分解模型预测商品销量三、云安全云平台使用安全云上服务

  6. ruby - 数组的所有可能分布,来自一个数字 - 2

    我不太确定如何表达这一点,所以我只是举个例子。如果我写:some_method(["a","b"],3)我希望它返回某种形式的[{"a"=>0,"b"=>3},{"a"=>1,"b"=>2},{"a"=>2,"b"=>1},{"a"=>3,"b"=>0}]如果我传入some_method(%w(abc),2)期望的返回值应该是[{"a"=>2,"b"=>0,"c"=>0},{"a"=>1,"b"=>1,"c"=>0},{"a"=>1,"b"=>0,"c"=>1},{"a"=>0,"b"=>2,"c"=>0},{"a"=>0,"b"=>1,"c"=>1},{"a"=>0,"b"=>0,"

  7. Seatunnel超高性能分布式数据集成平台使用体会 - 2

    文章目录概述定义使用场景特点工作流程连接器转换为何选择SeaTunnel安装下载配置文件部署模式入门示例启动脚本配置文件使用参数示例Kafka进Kafka出的ETL示例FlinkRun传递参数概述定义SeaTunnel官网http://seatunnel.incubator.apache.org/SeaTunnel最新版本官网文档http://seatunnel.incubator.apache.org/docs/2.1.3/intro/aboutSeaTunnelGitHub地址https://github.com/apache/incubator-seatunnelSeaTunnel是一个

  8. Selenium等待机制之显示等待 - 2

    显示等待需要用到两个类:WebDriverWait和expected_conditions两个类WebDriverWait:指定轮询间隔、超时时间等expected_conditions:指定了很多条件函数(也可以自定义条件函数)具体可以参考官网:selenium.webdriver.support.expected_conditions—Selenium4.5documentationfromseleniumimportwebdriverfromselenium.webdriver.common.byimportByfromselenium.webdriver.support.uiimpor

  9. ruby - 在 Ruby 中生成高斯(正态分布)随机数的代码 - 2

    用ruby​​生成正态分布随机数的代码是什么?(注意:我回答了我自己的问题,但我会等几天再接受,看看是否有人有更好的答案。)编辑:为此,我查看了两次搜索产生的SO上的所有页面:+“正态分布”ruby和+高斯+随机ruby 最佳答案 Python的random.gauss()和Boost的normal_distribution都使用Box-Mullertransform,所以这对Ruby来说也应该足够好了。defgaussian(mean,stddev,rand)theta=2*Math::PI*rand.callrho=Math.s

  10. ruby - 不支持您提供的授权机制。请使用 AWS4-HMAC-SHA256 - 2

    我收到错误AWS::S3::Errors::InvalidRequest不支持您提供的授权机制。请使用AWS4-HMAC-SHA256.当我尝试将文件上传到新法兰克福地区的S3存储桶时。所有适用于USStandard区域。脚本:backup_file='/media/db-backup_for_dev/2014-10-23_02-00-07/slave_dump.sql.gz's3=AWS::S3.new(access_key_id:AMAZONS3['access_key_id'],secret_access_key:AMAZONS3['secret_access_key'])s3_

随机推荐