在不同进程需要互斥地访问共享资源时,分布式锁是一种非常有用的技术手段。有很多三方库和文章描述如何用Redis实现一个分布式锁管理器,但是这些库实现的方式差别很大,而且很多简单的实现其实只需采用稍微增加一点复杂的设计就可以获得更好的可靠性。这篇文章的目的就是尝试提出一种官方权威的用Redis实现分布式锁管理器的算法,我们把这个算法称为RedLock。
Redlock是redis官方提出的实现分布式锁管理器的算法。这个算法会比一般的普通方法更加安全可靠。关于这个算法的讨论可以看下官方文档。
在Redis2.6.12版本之前,使用setnx命令设置key-value、使用expire命令设置key的过期时间获取分布式锁,使用del命令释放分布式锁。

setnx命令设置完key-value后,还没来得及使用expire命令设置过期时间,当前线程挂掉了,会导致当前线程设置的key一直有效,后续线程无法正常通过setnx获取锁,造成死锁。
解决思路:死锁问题是因为两个命令是分开执行并且不具备原子特性,如果能将这两个命令合二为一就可以解决问题了。在Redis2.6.12版本中实现了这个功能,Redis为set命令增加了一系列选项,可以通过 SET resource_name my_random_value NX PX max-lock-time来获取分布式锁,这个命令仅在不存在key(resource_name)的时候才能被执行成功(NX选项),并且这个key有一个max-lock-time秒的自动失效时间(PX属性)。这个key的值是“my_random_value”,它是一个随机值,这个值在所有的机器中必须是唯一的,用于安全释放锁。
在分布式环境下,线程A通过这种实现方式获取到了锁,但是在获取到锁之后,执行被阻塞了,导致该锁失效,此时线程B获取到该锁,之后线程A恢复执行,执行完成后释放该锁,直接使用del命令,将会把线程B的锁也释放掉,而此时线程B还没执行完,将会导致不可预知的问题。
解决思路:为了解决第二个问题,用到了“my_random_value”,释放锁的时候,只有key存在并且存储的“my_random_value”值和指定的值一样才执行del命令,此过程可以通过以下Lua脚本实现:
- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
Redis分布式锁会有个缺陷,就是在Redis的sentinel哨兵模式下:
这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。
缺陷在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。
解决思路:第三个问题是因为采用了主从复制导致的,解决方案是不采用主从复制,使用RedLock算法,RedLock底层逻辑和ZK锁很类似。
这里引用网上一段关于RedLock算法的描述:
在Redis的分布式环境中,我们有N个Redis master节点,这些节点完全互相独立,不存在主从复制或者其他集群协调机制。为了取到锁,客户端应该执行以下操作,比如有5个Redis master:
当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。通过上面的解决方案可以实现一个高效、高可用的分布式锁,这里推荐一个成熟、开源的分布式锁实现,即Redisson。
优点:高性能,借助Redis实现比较方便。
缺点:线程获取锁后,如果处理时间过长会导致锁超时失效,所以,通过锁超时机制不是十分可靠。
详解:
首先要有多个(最好是奇数个)对等的(没有主从关系)Redis结点。当进行加锁时(比如是用SETNX命令),则这个设置key-value的命令会发给每个Redis结点执行,当且仅当客户端收到超过半数的结点写成功的消息时,才认为加锁成功,才开始执行后面的业务代码。

下图中,Client 1向Redis 1/2/3三个结点去写key-value,假设当前处在Redis 1和Redis 2写入成功了,Redis 3还没有写入成功的状态,这个时候Client 1就已经认为加锁成功了,实际上已经可以执行业务代码了。
此时,假设有一个Redis结点挂了(最坏的情况就是已经写入了key的一个结点挂了,如下图所示Redis 1挂了),这个时候假设Client 2也要尝试加锁,此时Redis 2由于已经被Client 1写过了,没法写入成功,但是Redis 3可以写入成功。此时只有1个结点能写入成功,所以认为加锁不成功,这样Client 2就不会开始错误的执行业务代码,也就不会出现并发安全问题。
RedLock目前还有一些争议,所以很多时候可能不会用这种解决方案。下面是老师上课时展示的一块用Redisson实现RedLock的伪代码。在使用的时候,大体就是对不同的Redis实例,用Redisson获取RLock对象,然后用这些RLock对象来构造RedissonRedLock对象,然后用它来实现加锁解锁的逻辑即可。

“分布式锁”这个东西从概念上就和“高并发”是背道而驰的,只是为了解决高并发量下的程序并发安全问题,用一把锁实际上把所有控制的代码(线程)变成了顺序执行,这样其实会损失很多性能,但是在并发量不是太高的场景下一般也就直接这样用就够了。
ConcurrentHashMap是并发安全的集合,而且在高并发的场景下性能表现也很不错,可以参考下它的底层实现——分段锁。利用这种思想就可以从业务角度出发,对操作的业务实体进行分段,来优化分布式锁。
比如在秒杀场景下,iPhone这个商品有200个,要做秒杀,那么可以每20个分成一段,让它变成10个物品:
iPhone_1(20个), iPhone_2(20个), ..., iPhone_10(20个)
这个时候如果要做秒杀,则不同段的“商品”iPhone_x可以并行执行,在上面这种方式下并发量就提升到了10倍。
如果某个段的“商品”库存不够减了,比如iPhone_2只剩1个了,但是这个时候要一次性减掉3个,这个时候就可以合并其他段位的“商品”,然后再去减库存。
Zookeeper(以后简称ZK)也是一种k-v形式的存储中间件,只是它的内部结构是树形的,在ZK的集群里也有主从的概念,主结点叫Leader,从结点叫Follower,如果使用ZK就能解决这种主从同步引起的分布式锁失效问题。
基于zookeeper临时有序节点可以实现的分布式锁。
大致思想即为:每个客户端对某个方法加锁时,在zookeeper上的与该客户端对应的指定节点的目录下,生成一个唯一的瞬时有序节点。 判断是否获取锁的方式很简单,只需要判断当前生成的序号是否是此目录下有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
来看下Zookeeper能不能解决前面提到的问题。

ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:
1、创建一个目录mylock;
2、线程A想获取锁就在mylock目录下创建临时顺序节点;
3、获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
4、线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
5、线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。
这里推荐一个apache的开源库Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。
优点:具备高可用、可重入、阻塞锁特性,可解决失效死锁问题。
缺点:因为需要频繁的创建和删除节点,性能上不如Redis方式。
redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.9.0</version>
</dependency>
集群模式除了适用于Redis集群环境,也适用于任何云计算服务商提供的集群模式,例如AWS ElastiCache集群版、Azure Redis Cache和阿里云(Aliyun)的云数据库Redis版。
程序化配置集群的用法:

基于Redis的Redisson红锁 RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个 RLock对象关联为一个红锁,每个 RLock对象实例可以来自于不同的Redisson实例。

Config config1 = new Config();
config1.useSingleServer().setAddress("redis://192.168.0.1:5378")
.setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://192.168.0.1:5379")
.setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://192.168.0.1:5380")
.setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
String resourceName = "REDLOCK_KEY";
RLock lock1 = redissonClient1.getLock(resourceName);
RLock lock2 = redissonClient2.getLock(resourceName);
RLock lock3 = redissonClient3.getLock(resourceName);
// 向3个redis实例尝试加锁
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
// isLock = redLock.tryLock();
// 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
System.out.println("isLock = "+isLock);
if (isLock) {
//TODO if get lock success, do something;
}
} catch (Exception e) {
} finally {
// 无论如何, 最后都要解锁
redLock.unlock();
}
实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock("REDLOCK_KEY"),源码在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
return id + ":" + threadId;
}
获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 获取锁时需要在redis实例上执行的lua命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 获取分布式锁的KEY的失效时间毫秒数
"return redis.call('pttl', KEYS[1]);",
// 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
释放锁的代码为redLock.unlock(),核心源码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 释放锁时需要在redis实例上执行的lua命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果分布式锁KEY不存在,那么向channel发布一条消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果就是当前线程占有分布式锁,那么将重入次数减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
大家都知道,如果负责储存某些分布式锁的某些Redis节点宕机以后,而且这些锁正好处于锁住的状态时,这些锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。
默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改
Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了 leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。
自己的理解就是:
在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。
但在实际开发中会有下面一种情况:
//设置锁1秒过去
redissonLock.lock(“redisson”, 1);
/**
* 业务逻辑需要咨询2秒
*/
redissonLock.release(“redisson”);
( * 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,
这个不用多说,主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性。
Redisson可以实现可重入加锁机制的原因,我觉得跟两点有关:
● Redis存储锁的数据类型是 Hash类型
● Hash数据类型的key值包含了当前线程信息。
下面是redis存储的数据

这里表面数据类型是Hash类型,Hash类型相当于我们java的 <key,<key1,value>> 类型,这里key是指 ‘redisson’
它的有效期还有9秒,我们再来看里们的key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45它的组成是:guid + 当前线程的ID。后面的value是就和可重入加锁有关。

上面这图的意思就是可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
我有一个启动DRb服务的脚本,然后生成处理程序对象并通过DRb.thread.join等待。我希望脚本一直运行直到被明确杀死,所以我添加了trap"INT"doDRb.stop_serviceend在Ruby1.8下成功停止DRb服务并退出,但在1.9下似乎死锁(在OSX10.6.7上)。对该进程进行采样显示在semaphore_wait_signal_trap中有几个线程在旋转。我假设我在调用stop_service时做错了什么,但我不确定是什么。谁能给我任何关于如何正确处理它的指示? 最佳答案 好的,我想我已经找到了解决方案。如
BigData/CloudComputing:基于阿里云技术产品的人工智能与大数据/云计算/分布式引擎的综合应用案例目录来理解技术交互流程目录一、云计算网站建设:部署与发布网站建设:简单动态网站搭建云服务器管理维护云数据库管理与数据迁移云存储:对象存储管理与安全超大流量网站的负载均衡二、大数据MOOC网站日志分析搭建企业级数据分析平台基于LBS的热点店铺搜索基于机器学习PAI实现精细化营销基于机器学习的客户流失预警分析使用DataV制作实时销售数据可视化大屏使用MaxCompute进行数据质量核查使用Quick BI制作图形化报表使用时间序列分解模型预测商品销量三、云安全云平台使用安全云上服务
我不太确定如何表达这一点,所以我只是举个例子。如果我写:some_method(["a","b"],3)我希望它返回某种形式的[{"a"=>0,"b"=>3},{"a"=>1,"b"=>2},{"a"=>2,"b"=>1},{"a"=>3,"b"=>0}]如果我传入some_method(%w(abc),2)期望的返回值应该是[{"a"=>2,"b"=>0,"c"=>0},{"a"=>1,"b"=>1,"c"=>0},{"a"=>1,"b"=>0,"c"=>1},{"a"=>0,"b"=>2,"c"=>0},{"a"=>0,"b"=>1,"c"=>1},{"a"=>0,"b"=>0,"
文章目录概述定义使用场景特点工作流程连接器转换为何选择SeaTunnel安装下载配置文件部署模式入门示例启动脚本配置文件使用参数示例Kafka进Kafka出的ETL示例FlinkRun传递参数概述定义SeaTunnel官网http://seatunnel.incubator.apache.org/SeaTunnel最新版本官网文档http://seatunnel.incubator.apache.org/docs/2.1.3/intro/aboutSeaTunnelGitHub地址https://github.com/apache/incubator-seatunnelSeaTunnel是一个
用ruby生成正态分布随机数的代码是什么?(注意:我回答了我自己的问题,但我会等几天再接受,看看是否有人有更好的答案。)编辑:为此,我查看了两次搜索产生的SO上的所有页面:+“正态分布”ruby和+高斯+随机ruby 最佳答案 Python的random.gauss()和Boost的normal_distribution都使用Box-Mullertransform,所以这对Ruby来说也应该足够好了。defgaussian(mean,stddev,rand)theta=2*Math::PI*rand.callrho=Math.s
一、知识框架二、练习题调节一个装瓶机使其对每个瓶子的灌装量均值为μ盎司,通过观察这台装瓶机对每个瓶子的灌装量服从标准差σ=1.0盎司的正态分布。随机抽取这台机器灌装的9个瓶子组成一个样本,并测定每个瓶子的灌装量。试确定样本均值偏离总体均值不超过0.3盎司的概率。解:设每个瓶子的灌装量为X,X为样本均值,样本容量为n。由于总体X服从正态分布,样本均值X也服从正态分布,且均值相同,标准差为所以三、简述题1什么是统计量?为什么要引进统计量?统计量中为什么不含任何未知参数?答:(1)统计量的定义:设X1,X2,…,Xn是从总体X中抽取的容量为n的一个样本,如果由此样本构造一个函数T(X1,X2,…,X
嘿,有没有办法选择均匀分布的随机数?我用过这个功能Math.floor(Math.random()*2)返回1或0。但是,我不认为它有确切的50%的机会产生任何一个。更好的想法?谢谢 最佳答案 如果你不相信,检查:vartotal=0;varones=0;for(vari=0;i此代码给出0.49972-非常接近50%。 关于javascript-均匀分布的随机数,我们在StackOverflow上找到一个类似的问题: https://stackoverflo
我想返回一个数组,其中包含一组根据自定义频率随机分布的唯一元素。我的真实用例是根据对这些图像的流行程度进行定性加权来重复轮播图像。例如假设我有5个带权重的元素:一个,20%B、50%C、80%D、10%我想写一个函数,在给定长度的情况下,尝试逼近一个序列,使得C出现的频率是D的八倍;D出现的次数比B少5倍;A的出现频率是C的三倍。 最佳答案 CwillappeareighttimesmoreoftenthanD;Dwillappear5timeslessoftenthanB;Awillappearthreetimeslessofte
文章目录一、前言二、概述三、TM事务管理器初始化1、TM初始化流程图2、TM初始化流程1)获取TmNettyRemotingClient实例1>TmNettyRemotingClient实例化2>AbstractNettyRemotingClient实例化2)初始化TmNettyRemotingClient1>注册一些请求处理组件2>初始化AbstractNettyRemotingClient(1)AbstractNettyRemoting初始化(2)启动netty客户端组件Abs