草庐IT

ReentrantLock 公平锁源码 第0篇

BNDong 2023-03-28 原文

ReentrantLock 0

关于ReentrantLock的文章其实写过的,但当时写的感觉不是太好,就给删了,那为啥又要再写一遍呢

最近闲着没事想自己写个锁,然后整了几天出来后不是跑丢线程就是和没加锁一样,而且五六段就一个cas性能很差,感觉离大师写的差十万八千里

于是!我就想重新研究研究看看大师咋写的,这篇博客也算个笔记吧,这篇看的是ReentrantLock的公平锁,准备写个两三篇关于ReentrantLock 就这两天写!

这篇博客完全个人理解,如果有不对的地方欢迎您评论或者私信我,我非常乐意接受您的意见或建议

CAS

首先要知道,ReentrantLock是基本都是在java代码层面实现的,而最主要的一个东西就是CAS compare and swap 比较并交换

这个操作可以看成为是一个原子性的,在java中可以使用反射获取到Unsafe类来进行cas操作

public test() {
    try {
        Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
        if (!unsafeField.isAccessible()) {
            unsafeField.setAccessible(true);
        }
        unsafe = (Unsafe) unsafeField.get(null);
    } catch (NoSuchFieldException | IllegalAccessException e) {
        e.printStackTrace();
    }
}

Park

在juc包下LockSupport类中有两个方法parkunpark 这两个就像是wait和notify/notifyAll,但是又不相同,可以暂时理解为就是暂停线程和启动线程

详细的介绍可以看看这个博客 : https://www.jianshu.com/p/da76b6ab56be

关于如何使用ReentrantLock就不在赘述了直接开始看代码,本来是想把类的关系图放这的,但是我的idea好像有点问题,你们可以自己打开idea看,ctrl+alt+u打开类的关系图

public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock(true);
    lock.lock();
    lock.unlock();
}

构造方法

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

lock方法

public void lock() {
    sync.lock();
}

点到里面实际调用的是FairSync类中的lock()方法

final void lock() {
    acquire(1);
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcuqire方法

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

首先是获取了当前的线程,之后有个getState,这个方法返回的是当前这个锁的状态

protected final int getState() {
    return state;
}

hasQueuedPredecessors

先来看Node,这个Node是一个组成双向链表的实体类,几个重要的属性

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

waitStatus 存放当前结点的状态

prev 存放上个结点

next 存放下个结点

thread 存放线程

nextWaiter 翻译是下个服务员,我理解是为下个节点服务,后面咱们细说

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

这里有两个属性,tail尾结点,head头结点,之后下面一个判断分别是

头结点不等于尾结点 并且 (头结点的下一结点不等于null或者头结点后面一个结点的线程不等于当前线程)

if (c == 0) {
    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
}

hasQueuedPredecessors()接着就是一个cas,修改的这个锁的状态

如果成功,则调用setExclusiveOwnerThread()

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

将当前线程存放到exclusiveOwnerThread属性中

那么在没有冲突的情况下lock的方法就走完了,现在咱们假设只有一个线程,从头来捋一下加锁的过程

试跑一下

咱们顺着逻辑捋一下,从最开始的lock()方法开始,前面的就不写了,直接到acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

进入acquire

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
}

因为这个getState()方法获取的是属性state 而这个属性又没有其他的赋值操作,初始化就是0,进入if(c==0)

之后进入hasQueuedPredecessors()

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

首先第一个判断就已经是false了,因为tail和head都没有进行过初始化,都是null,所以等于,直接返回 false,而在hasQueuedPredecessors()方法前面还有一个!取反为true,直接进入if代码块

设置完exclusiveOwnerThread属性后就return true,走出lock()方法,加锁方法结束

exclusiveOwnerThread属性存放的是当前那个线程在占有锁

这是在没有线程获取锁冲突的情况下,如果现在两个线程同时来的话,还是看tryAcquire方法

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

咱们现在假设线程A获取到锁,去执行业务代码了,线程B进入

getState()获取的值就不在是0了,因为线程A执行完compareAndSetState(0, acquires)改的就是getState()方法获取的state属性

那么进入else if (current == getExclusiveOwnerThread()) 哎这个不是获取当前占有锁的那个线程的方法吗,是的

那为什么有这个判断呢,ReentrantLock的特性 重入锁,啥叫重入锁?:同一个线程可以多次获取同一个锁 例如下面的例子

public class Test{
    private static final ReentrantLock LOCK=new ReentrantLock(true);

    public void a(){
        LOCK.lock();
        b();
        LOCK.unlock();
    }

    public void b(){
        LOCK.lock();
        //xxxxxx
        LOCK.unlock();
    }
}

如果没有重入锁的特性,那么这个方法是不是就死锁了呢?,假设当我们一个线程去调用a方法时..

  • a : 兄弟我需要锁才能执行你的代码啊

  • b : 那你先解锁啊

  • a : 我要调用完你我才能解锁啊

  • b : 那你不解锁咋调用我啊

........

好了回到代码中,就是将锁持有的状态+1,设置后返回true,因为我们现在是B线程,所以这个if不成立,返回false

回到acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

因为tryAcquire为false,取反继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

addWaiter

先来看里面的addWaiter方法吧,传了一个参数Node.EXCLUSIVE

static final Node EXCLUSIVE = null;

这个参数是Node类中的一个属性

private Node addWaiter(Node mode) {
    //创建一个Node
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

Node的有参构造如下

Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}

首先是创建了一个Node结点,之后判断如果tail结点不为null,因为A线程走完tryAcquire直接返回了,tail和head都是null,所以if不成立,进入enq方法

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

首先还是获取tail,那么这时候还是为null,因为我们的假设就两个线程,A线程已经去执行业务了,所以进去第一个if

通过cas来设置头节点为一个new Node() 注意!这里是新new的Node,设置完后将头在设置给尾,那么此时的节点关系如下

em?? 咱们这B节点也没有添加进链表啊,别急,看看上面的for(;;)

在下次循环的时候tail还等于null吗?答案是否定的

之后还是头节点赋值给t,将B节点的上一个设置为t,cas设置tail,成功后t节点的next设置为B节点,返回t (返回的值其实没接收)

挺简单的逻辑说的太费劲了,还是看图吧,执行完第二遍for后节点关系如下

这个enq方法是百分百能确定这个节点已经添加进去的,因为你不添加进去就出不来这个方法,那么返回addWaiter方法,走完这个enq就还有一句话,return node;

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

那么一进来就定义了一个failed用来处理如果发生错误需要将链表中错误的节点移除,咱先不看

之后的一个interrupted存放是否被打断过,继续发现还是一个for(;;),第一步执行了node.predecessor()

final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}

也就是先获取了下B节点的上一个,也就是那个线程为null的空节点(注意:不是null,而是一个空的Node)

判断上个节点是不是head,如果是,则尝试获取锁

假设现在A线程还是没有完成业务代码,没有执行unlock(),那么我们进入下个if,

if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
    interrupted = true;

shouldParkAfterFailedAcquire

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

代码不多,但是不太好理解 开始还是获取B节点上个节点的状态,也就是那个空节点,因为咱们一路代码跟过来的,没有看到哪里对空节点的state属性进行过修改,所以它还是0

那么第一个判断

static final int SIGNAL    = -1;  //Node类中的属性

空节点的状态是否为-1,显然不是,if(ws>0)也不会进,直接进入else,cas修改空节点的状态,改为了-1,然后返回

if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
    interrupted = true;

因为是&&阻断了后面的方法,所以不进入,那么本次循环结束,最外层是个for(;;)所以下次循环开始

我们还是假设通过tryAcquire()没有获取到锁,又进入了shouldParkAfterFailedAcquire

那么这次第一个if我们就会进去,因为上次循环已经将B节点前面的一个空节点的状态改为-1了,return true

回到if那么就进入parkAndCheckInterrupt方法

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); 
    return Thread.interrupted();
}

park,那么B线程就停在这里了,把目光回到A线程,它终于执行完业务代码了,执行unlock

unlock

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

看第一个if中的tryRelease

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

第一件事就是将锁的状态-1,因为它重入一次就+1,这也是为什么调用lock多少次就需要调用unlock多少次,因为要保证锁的状态为0

之后判断加锁的线程和解锁的线程是不是同一个,不是抛出异常

boolean free = false;这个来标识这个锁是不是已经没有人持有了,因为A线程就调用了一次lock,所以if(c==0)成立

将free 改为true后将当前持有锁的线程设置为null,设置锁的状态,返回true,回到release方法

因为返回true,所以进入if,判断head节点不为空,并且头节点的状态不为0

那么头节点是空的吗? -不是 因为B节点在初始化链表是添加了一个空的节点(再说一遍不是null!是空的Node节点)

那么头节点的状态是0吗? -不是 我们在第二次执行shouldParkAfterFailedAcquire()方法时已经将头节点的状态设置为-1了

那么进入unparkSuccessor()

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

获取,cas将头节点的状态赋值为0,获取头节点的下一个节点,也就是我们的B节点,那么if (s == null || s.waitStatus > 0) 为false,走最下面的if(s!=null)

就一句话 unpark(s.thread)

B线程被唤醒后回到acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

因为是个死循环,那么继续走if的判断if (p == head && tryAcquire(arg))这个时候tryAcquire方法就可以获取到锁,进入if,设置head并清除引用后方法结束

到此,AB线程都走完了

篇幅有点长了,这两天再接着写,拜拜?

有关ReentrantLock 公平锁源码 第0篇的更多相关文章

  1. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  2. elasticsearch源码关于TransportSearchAction【阶段三】 - 2

    1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>

  3. (附源码)vue3.0+.NET6实现聊天室(实时聊天SignalR) - 2

    参考文章搭建文章gitte源码在线体验可以注册两个号来测试演示图:一.整体介绍  介绍SignalR一种通讯模型Hub(中心模型,或者叫集线器模型),调用这个模型写好的方法,去发送消息。  内容有:    ①:Hub模型的方法介绍    ②:服务器端代码介绍    ③:前端vue3安装并调用后端方法    ④:聊天室样例整体流程:1、进入网站->调用连接SignalR的方法2、与好友发送消息->调用SignalR的自定义方法 前端通过,signalR内置方法.invoke()  去请求接口3、监听接受方法(渲染消息)通过new signalR.HubConnectionBuilder().on

  4. Cesium源码解析一(terrain文件的加载、解析与渲染全过程梳理) - 2

    快速导航(持续更新中…)Cesium源码解析一(terrain文件的加载、解析与渲染全过程梳理)Cesium源码解析二(metadataAvailability的含义)Cesium源码解析三(metadata元数据拓展中行列号的分块规则解析)Cesium源码解析四(Quantized-Mesh(.terrain)格式文件在CesiumJS和UE中加载情况的对比)目录1.前言2.本篇的由来3.terrain文件的加载3.1更新环境3.2更新和执行渲染命令3.3数据优化3.4结束当前帧4.总结1.前言  目前市场上三维比较火的实现方案主要有两种,b/s的方案主要是Cesium,c/s的方案主要是u

  5. 停车系统源码-基于springboot+uniapp开源项目 - 2

    Iparking停车收费管理系统-可商用介绍Iparking是一款基于springBoot的停车收费管理系统,支持封闭车场和路边车场,支持微信支付宝多种支付渠道,支持多种硬件,涵盖了停车场管理系统的所有基础功能。技术栈Springboot,MybatisPlus,Beetl,Mysql,Redis,RabbitMQ,UniApp功能云端功能序号模块功能描述1系统管理菜单管理配置系统菜单2系统管理组织管理管理组织机构3系统管理角色管理配置系统角色,包含数据权限和功能权限配置4系统管理用户管理管理后台用户5系统管理租户管理多租户管理6系统管理公众号配置租户公众号配置7系统管理操作日志审计日志8系统

  6. 打通源码,高效定位代码问题|云效工程师指北 - 2

    大家好,我叫胡飞虎,花名虎仔,目前负责云效旗下产品Codeup代码托管的设计与开发。代码作为企业最核心的数据资产,除了被构建、部署之外还有更大的价值。为了帮助企业和团队挖掘更多源代码价值以赋能日常代码研发、运维等工作,云效代码团队在大数据和智能化方向进行了一系列的探索和实践(例如代码搜索与推荐),本文主要介绍我们如何通过直接打通源代码来提高研发与运维效率。随着微服务架构的流行,一个业务流程需要多个微服务共同完成。一旦出现问题,运维人员在面对数量多、调用链路复杂的情况下,很难快速锁定导致问题发生的罪魁祸首:代码。为了提高排查效率,目前常见的解决方案是:链路跟踪+日志分析工具相结合。即通过链路跟踪

  7. Android Studio开发之使用内容组件Content获取通讯信息讲解及实战(附源码 包括添加手机联系人和发短信) - 2

    运行有问题或需要源码请点赞关注收藏后评论区留言一、利用ContentResolver读写联系人在实际开发中,普通App很少会开放数据接口给其他应用访问。内容组件能够派上用场的情况往往是App想要访问系统应用的通讯数据,比如查看联系人,短信,通话记录等等,以及对这些通讯数据及逆行增删改查。首先要给AndroidMaifest.xml中添加响应的权限配置 下面是往手机通讯录添加联系人信息的例子效果如下分成三个步骤先查出联系人的基本信息,然后查询联系人号码,再查询联系人邮箱代码 ContactAddActivity类packagecom.example.chapter07;importandroid

  8. java 版本企业电子招投标采购系统源码之登录页面 - 2

    ​ 信息数智化招采系统服务框架:SpringCloud、SpringBoot2、Mybatis、OAuth2、Security前端架构:VUE、Uniapp、Layui、Bootstrap、H5、CSS3涉及技术:Eureka、Config、Zuul、OAuth2、Security、OSS、Turbine、Zipkin、Feign、Monitor、Stream、ElasticSearch等企业电子化采购系统企业电子化采购系统是明理公司在多家大、中、小型企业采购需求的分析与实际应用的基础上,结合企业采购流程优化再造理念开发的一体化电子招标采购平台,对于招标项目提供交易过程的全流程电子化、规范化管

  9. 有符号距离场原理及实现源码 - 2

    有符号距离场(SDF:SignedDistanceField)是距离场的一种变体,它在3D(2D)空间中将位置映射到其到最近平面(边缘)的距离。距离场在图像处理、物理学和计算机图形学等许多研究中都有应用。在计算机图形的上下文中,距离场通常是有符号的,表示某个位置是否在网格内。在计算机图形学和游戏开发中,SDF显示出极大的通用性,它可以用于碰撞测试、网格表示、光线追踪等。此外,人们发现它在使用光线追踪渲染场景时也有一些好处(即,ray-marching)算法——几乎不需要额外成本就可以产生像软阴影和环境光遮蔽这样的阴影效果。这个项目是关于实时光线行进渲染器的从零开始的C++实现,它包括一个SDF

  10. SpringSecurity 源码理解及使用(三) - 2

    目录springSecurity授权权限管理策略基于url的权限管理基于方法的权限管理将url权限管理设为动态会话管理会话并发管理会话失效处理禁止再次登录会话共享源码分析CSRF跨站请求伪造开启CSRF防御传统web开发前后端分离开启CSRF防护csrf防御过程CORS跨域问题springBoot解决跨域的三种方式springSecurity解决跨域springSecurity授权认证与授权解耦授权:据系统提前设置好的规则,给用户分配可以访问某一个资源的权限,用户根据自己所具有权限,去执行相应操作。GrantedAuthority应该如何理解呢?是角色还是权限?权限是具体一些操作,角色是一些权

随机推荐