摘要:当你使用java实现一个线程同步的对象时,一定会包含一个问题:你该如何保证多个线程访问该对象时,正确地进行阻塞等待,正确地被唤醒?
本文分享自华为云社区《JUC中的AQS底层详细超详解,剖析AQS设计中所需要考虑的各种问题!》,作者: breakDawn 。
当你使用java实现一个线程同步的对象时,一定会包含一个问题:
你该如何保证多个线程访问该对象时,正确地进行阻塞等待,正确地被唤醒?
关于这个问题,java的设计者认为应该是一套通用的机制
因此将一套线程阻塞等待以及被唤醒时锁分配的机制称之为AQS
全称 AbstractQuenedSynchronizer
中文名即抽象的队列式同步器 。
基于AQS,实现了例如ReentenLock之类的经典JUC类。
AQS中的资源是一个int值,而且是volatile的,并提供了3个方法给子类使用:
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// cas方法
compareAndSetState(int oldState, int newState);
如果state上限只有1,那么就是独占模式Exclusive,例如 ReentrantLock
如果state上限大于1,那就是共享模式Share,例如 Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier
对外暴露的getter/setter方法,是走不了CAS的。而且setter/getter没有被synchronized修饰。所以必须要volatile,保证可见性
这样基于AQS的实现可以直接通过getter/setter操作state变量,并且保证可见性,也避免重排序带来的影响。比如CountDownLatch,ReentrantReadWriteLock,Semaphore都有体现(各种getState、setState)
volatile的state成员有一个问题,就是如果是复合操作的话不能保证复合操作的原子性
因此涉及 state增减的情况,采用CAS
如果是state设置成某个固定值,则使用setState
这个队列的目的是为了公平锁的实现
即为了保证先到先得,要求每个线程封装后的Node按顺序拼接起来。
不是的,本质上是一个链表式的队列
因此核心在于链表节点Node的定义
除了比较容易想到的prev和next指针外
还包含了该节点内的线程
以及 waitStatus 等待状态
4种等待状态如下:
入队过程可能引发冲突
因此会用CAS保障入队安全。
private Node enq(final Node node) {
//多次尝试,直到成功为止
for (;;) {
Node t = tail;
//tail不存在,设置为首节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//设置为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
一旦有节点出队,说明有线程释放资源了,队头的等待线程可以开始尝试获取了。
于是首节点的线程释放同步状态后,将会唤醒它的后继节点(next)
而后继节点将会在获取同步状态成功时将自己设置为首节点
注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态
AQS使用的设计模式是模板方法模式。
具体代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 发现中断过,则触发中断异常
selfInterrupt();
}
即AQS抽象基类AbstractQueuedSynchronizer给外部调用时,都是调的acquire(int arg)方法。这个方法的内容是写死的。
而acquire中,需要调用tryAcquire(arg), 这个方法是需要子类实现的,作用是判断资源是否足够获取arg个
(下面部分代码注释选自: (2条消息) AQS子类的tryAcquire和tryRelease的实现_Mutou_ren的博客-CSDN博客_aqs tryacquire )
这里暂时只谈论一种容易理解的tryAcuire实现,其他附加特性的tryAcquire先不提。
里面主要就做这几件事:
protected final boolean tryAcquire(int acquires){
final Thread current = Thread.currentThread();
int c = getState();
// state==0代表当前没有锁,可以进行获取
if (c == 0) {
// 非公平才有的判断,会判断是否还有前驱节点,直接自己为头节点了或者同步队列空了才会继续后面的锁的获取操作
if (!hasQueuedPredecessors()
//CAS设置state为acquires,成功后标记exclusiveOwnerThread为当前线程
&& compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前占有线程等于自己,代表重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 出现负数,说明溢出了
if (nextc < 0) //
throw new Error("Maximum lock count exceeded");
// 因为是重入操作,可以直接进行state的增加,所以不需要CAS
setState(nextc);
return true;
}
return false;
}
当获取资源失败,会进行addWaiter(Node.EXCLUSIVE), arg)。
目的是创建一个等待节点Node,并添加到等待队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 通过CAS竞争队尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 竞争队尾失败,于是进行CAS频繁循环竞争队尾
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
并在 "处于头节点时尝试获取资源->睡眠->唤醒“中循环。
当已经跑完任务的线程释放资源时,会唤醒之前阻塞的线程。
当被唤醒后,就会检查自己是不是头节点,如果不是,且认为可以阻塞,那就继续睡觉去了
(下面代码注释部分选自AQS(acquireQueued(Node, int) 3)–队列同步器 - 小窝蜗 - 博客园 (http://cnblogs.com) )
final boolean acquireQueued(final Node node, int arg) {
// 标识是否获取资源失败
boolean failed = true;
try {
// 标识当前线程是否被中断过
boolean interrupted = false;
// 自旋操作
for (;;) {
// 获取当前节点的前继节点
final Node p = node.predecessor();
// 如果前继节点为头结点,说明排队马上排到自己了,可以尝试获取资源,若获取资源成功,则执行下述操作
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为头结点
setHead(node);
// 说明前继节点已经释放掉资源了,将其next置空,好让虚拟机提前回收掉前继节点
p.next = null; // help GC
// 获取资源成功,修改标记位
failed = false;
// 返回中断标记
return interrupted;
}
// 若前继节点不是头结点,或者获取资源失败,
// 则需要判断是否需要阻塞该节点持有的线程
// 若可以阻塞,则继续执行parkAndCheckInterrupt()函数,
// 将该线程阻塞直至被唤醒
// 唤醒后会检查是否已经被中断,若返回true,则将interrupted标志置于true
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 最终获取资源失败,则当前节点放弃获取资源
if (failed)
cancelAcquire(node);
}
}
该方法不会直接阻塞线程,因为一旦线程挂起,后续就只能通过唤醒机制,中间还发生了内核态用户态切换,消耗很大。
因此会先不断确认前继节点的实际状态,在只能阻塞的情况下才会去阻塞。
并且会过滤掉cancel的线程节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前继节点的等待状态
int ws = pred.waitStatus;
// 如果等待状态为Node.SIGNAL(-1),则直接返回true即可以阻塞
// 因为这说明前继节点完成资源的释放或者中断后,会主动唤醒后继节点的(这也即是signal信号的含义),因此方法外面不用再反复CAS了,直接阻塞吧
if (ws == Node.SIGNAL) return true;
// 如果前继节点的等待值大于0即CANCELLED(1),说明前继节点的线程发生过cancel动作
// 那就继续往前遍历,直到当前节点的前继节点的状态不为cancel
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前继节点的等待状态不为SIGNAL(-1),也不为Cancel(1)
// 那么只能是PROPAGATE(-3)或者CONDITION(-2)或者INITIAL(0)
// 直接设置成SIGNAL,下一次还没CAS成功,就直接睡觉了
// 因此在前面所有节点没辩护的情况下, 最多一次之后就会返回true让外面阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
使用LockSupport.park来阻塞当前这个对象所在的线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 确认是否是中断导致的park结束,并清除中断标记
return Thread.interrupted();
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
如果还要深挖底层实现原理,可以详细见该链接
简而言之,是用mutex和condition保护了一个_counter的变量,当park时,这个变量置为了0,当unpark时,这个变量置为1。
底层用的C语言的pthread_mutex_unlock、pthread_cond_wait 、pthread_cond_signal ,但是针对了mutex和_cond两个变量进行加锁。
对线程调用 t1.interrupt();时
会导致 LockSupport.park() 阻塞的线程重新被唤醒
即有两种唤醒情况: 被前置节点唤醒,或者被外部中断唤醒
这时候要根据调用的acuire类型决定是否在中断发生时结束锁的获取。
上面介绍的是不可中断锁。
在parkAndCheckInterrupt中,当park结束阻塞时时,使用的是 Thread.interrupted() 而不是 .isInterrupted() 来返回中断状态
因为前者会返回线程当前的中断标记状态同时清除中断标志位(置为false)
外层CAS循环时, 就不会让线程受中断标记影响,只是记录一下是否发生过中断
当获取锁成功后,如果发现有过线程中断,则会触发中断异常,
之后便由获取锁的调用者自己决定是否要处理线程中断。像下面这样:
reentrantLock.lock();
try {
System.out.println("t1");
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
那么另一种情况就是可中断锁了。
ReentranLock有一个lockInterruptibly()方法就是这种情况
线程被唤醒时,如果发现自己被中断过,就会直接抛异常而不是继续获取锁
因此如果你的线程对中断很敏感,那么就是用可中断锁,及时响应。
如果不敏感,也要注意处理中断异常。
首先AQS提供的模板方法为release方法。
核心逻辑就是对资源进行尝试性释放
如果成功,就唤醒等待队列中的第一个头节点
public final boolean release(int arg) {
// 是否释放成功,tryRelease是子类要实现的方法
if (tryRelease(arg)) {
Node h = head;
// 判断头节点是否正在阻塞中,是的话唤醒
if (h != null && h.waitStatus != 0)
// 唤醒头节点
unparkSuccessor(h);
return true;
}
return false;
}
看一下ReteenLock中的tryRelease实现
就是减一下资源值。
当资源值清零,则说明可以解除了对当前点的占用
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 设置当前占用线程为null
setExclusiveOwnerThread(null);
}
// 不需要CAS,因为只有持有锁的人才能做释放,不担心竞争
setState(c);
return free;
}
以ReteenLock为例,它内部tryAcquire有两种同步器的实现
公平同步器和非公平同步器都是ReentrantLock中定义的一个static内部类
ReentrantLock根据配置的不同,使用这2个同步器做资源的获取和同步操作
他们二者的提供的lock操作,本质上就是AQS的acquire(1)
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
二者在公平和非公平的实现区别上,就是唤醒线程后,只有等待队列的队头节点才会尝试竞争。
而非公平锁是只要唤醒了就可以尝试竞争。
因此核心区别在于hasQueuedPredecessors方法!
非公平锁可能引发“饥饿”,即一个线程反复抢占获取,而其他线程一直拿不到。
而公平锁不存在饥饿,只要排上队了就一定能拿到
非公平锁的平均性能比公平锁要高, 因为非公平锁中所有人都可以CAS抢占,如果同步块的时间非常短,那么可能所有人都不需要阻塞,减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。
性能测试中公平锁的耗时是非公平锁的94.3倍, 总切换次数是133倍
默认是非公平的,原因就是上文考虑的性能差距过大问题, 因此公平锁只能用于特定对性能要求不高且饥饿发生概率不大的场景中。
先实现了一个静态内部类Sync
和上面的RLock类一个区别在于需要state初始化值,不一定为1
Sync(int permits) {
setState(permits);
}
再继承实现了FairSync和NoFairSync
使用CAS实现值的增加或者减少
公平/非公平的区别同样是hasQueuedPredecessors的判断
protected int tryAcquireShared(int acquires) {
for (;;) {
// 队头判断,公平锁核心
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
// 信号量不足,直接返回负数
if (remaining < 0 ||
// 能抢成功,返回修改后的值,抢失败则for循环继续
compareAndSetState(available, remaining))
return remaining;
}
}
通过current == getExclusiveOwnerThread()来判断并进行非CAS的setState操作
if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 出现负数,说明溢出了
if (nextc < 0) //
throw new Error("Maximum lock count exceeded");
// 因为是重入操作,可以直接进行state的增加,所以不需要CAS
setState(nextc);
return true;
}
注意处理重入问题时,如果是独占锁,是可以直接setState而不需要CAS的,因为不会竞争式地重入!
ReentrantLock释放时,也会处理重入,关键点就是对getState() - release后的处理,是否返回true或者false
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 只有资源数为0才会解锁
// 才算释放成功,否则这锁还是占住了
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
AQS提供的方法中带有Nanos后缀的方法就是支持超时中断的方法。
核心逻辑就是每次阻塞前,确认nanosTimeout是否已经超时了。
每次唤醒时,将nanosTimeout减去阻塞所花的时间,重新确认,并修改lastTime
关键部分见下图
首先这个值是写死的1000L即1000纳秒
1000纳秒是个非常小的数字,而小于等于1000纳秒的超时等待,无法做到十分的精确,那么就不要使用这么短的一个超时时间去影响超时计算的精确性,所以这时线程不做超时等待,直接做自旋就好了。
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer
刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr
我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢
我注意到像bundler这样的项目在每个specfile中执行requirespec_helper我还注意到rspec使用选项--require,它允许您在引导rspec时要求一个文件。您还可以将其添加到.rspec文件中,因此只要您运行不带参数的rspec就会添加它。使用上述方法有什么缺点可以解释为什么像bundler这样的项目选择在每个规范文件中都需要spec_helper吗? 最佳答案 我不在Bundler上工作,所以我不能直接谈论他们的做法。并非所有项目都checkin.rspec文件。原因是这个文件,通常按照当前的惯例,只
我正在使用active_admin,我在Rails3应用程序的应用程序中有一个目录管理,其中包含模型和页面的声明。时不时地我也有一个类,当那个类有一个常量时,就像这样:classFooBAR="bar"end然后,我在每个必须在我的Rails应用程序中重新加载一些代码的请求中收到此警告:/Users/pupeno/helloworld/app/admin/billing.rb:12:warning:alreadyinitializedconstantBAR知道发生了什么以及如何避免这些警告吗? 最佳答案 在纯Ruby中:classA