详解AQS二:ReentrantLock公平锁原理

Published on 2024-12-17 17:02 in 分类: 博客 with 狂盗一枝梅
分类: 博客

ReentrantLock作为我们使用频率最高的显式锁,它是AQS的经典实现,本篇文章将以ReentrantLock公平锁为例讲解AQS的实现。

一、ReentrantLock

在之前的文章《线程同步机制一:内部锁和显式锁》中已经提到过关于显式锁ReentrantLock的简单使用

private final Lock lock=new ReentrantLock(); // 创建一个Lock接口实例
……

lock.lock(); // 申请锁lock
try{
  // 在此对共享数据进行访问,即此区域为临界区代码
  ……
}finally{
  // 总是在finally块中释放锁,以避免锁泄漏
  lock.unlock(); // 释放锁lock
}

接下来深入ReentrantLock源码查看其具体实现。

1、ReentrantLock中的设计模式

ReentrantLock的实现使用了模板设计模式,关于模板设计模式,详情可参考《设计模式(十五):模板模式(Template Method Pattern)》。模板设计模式中要有一个抽象类,将部分逻辑以具体方法的形式实现,然后声明一些抽象方法来迫使子类实现剩余的逻辑,该抽象类就是AbstractQueuedSynchronizer,也就是AQS。下面看ReentrantLock的类图

AbstractQueuedSynchronizer类图

ReentrantLock内部维护着两种锁类型:公平锁FairSync、非公平锁NonfairSync,ReentrantLock在构造方法中确认当前锁实例是公平锁还是非公平锁:

/**
* 默认是非公平锁
*/
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
* 通过构造方法传参确定是公平锁还是非公平锁
*/
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

代码中的sync是ReentrantLock中的同步器,ReentrantLock的加锁、解锁调用的都是同步器的相关方法:

/**
* 加锁
*/
public void lock() {
    sync.lock();
}

/**
* 解锁
*/
public void unlock() {
    sync.release(1);
}

可以看到ReentrantLock遵循了分离变与不变的原则,将公平锁和非公平锁的变化各自封装在FairSync和NonfairSync中,不变的部分都在AQS中;而ReentrantLock本身面向抽象编程,其加锁和解锁的方法都委托给了抽象同步器Syn,无论内部使用了公平锁还是非公平锁,它都不需要改变调用的代码,这是模板方法模式的典型应用。

2、ReentrantLock的核心:AQS

AQS是ReentrantLock“模板方法模式”中的“模板”,是ReentrantLock显式锁实现的核心,如果想搞明白ReentrantLock,AQS是必须要掌握的类。AQS是AbstractQueuedSynchronizer类的缩写,该类带着注释足足有两千三百多行代码,可以说是个很复杂的类,那么如何去学习该类?胡乱看源码肯定是不可取的,突破口就在于ReentrantLock类的加锁和释放锁方法上。

对于显式锁ReentrantLock来说,核心方法就俩:lock和unlock,顺着这俩方法查找调用链,也就搞清楚了AQS的作用原理。接下来先看看AQS的代码结构。

二、AQS代码结构概览

AQS的代码实现非常长,这里说下比较重要的部分以方便之后源码追踪。

1、状态标志位

/**
 * The synchronization state.
 */
private volatile int state;

state变量是AQS的成员变量,它用来标志当前锁的状态。在ReentrantLock中,state字段初始化为0,一个线程在抢占锁之后会将它设置为1,释放锁后会将它重新设置为0;当同一个线程重复获取锁的时候,该字段会累加,释放锁的时候依次递减直到变成0,这就是可重入的概念。

2、队列节点类

ReentrantLock是独占锁,抢锁失败的线程要去队列中排队,AQS将线程封装到一个一个的节点中,通过指针将各个节点链接起来。

static final class Node {
        /** 节点类型:表示该节点是共享锁节点 */
        static final Node SHARED = new Node();
        /** 节点类型:表示该节点是独占锁节点 */
        static final Node EXCLUSIVE = null;

        /** 节点等待状态值1:该节点已取消 */
        static final int CANCELLED =  1;
        /** 节点等待状态值-1:表示下一个等待的节点需要被唤醒 */
        static final int SIGNAL    = -1;
        /** 节点等待状态值-2:表示当前节点正等待在条件队列 */
        static final int CONDITION = -2;
        /** 节点等待状态值-3:标识下一次共享锁的acquireShared操作需要无条件传播 */
        static final int PROPAGATE = -3;

        /**
         * 等待状态值,只能是以下等待状态值中的其中之一:
         *   SIGNAL:     表示该节点的后继节点正处于阻塞状态,所以当前节点释放锁
         *               以后需要唤醒后续节点。后续节点被唤醒以后会尝试获取锁,
         *               如果失败了则会再次进入阻塞状态,重复这个过程。
         *   CANCELLED:  取消状态,由于超时或者中断的原因,节点可能会被取消等待获取锁。
         *               节点一旦进入这个状态,就不能再转变成其它状态了。
         *   CONDITION:  表示当前节点正处于条件等待队列。需要注意的是该状态只能用于
         *               条件等待队列。
         *   PROPAGATE:  标识下一次共享锁的acquireShared操作需要无条件传播
         *   0:          None of the above
         *
         * 等待状态的这几个值的排序是有一定意义的,你会发现只有CANCELLED状态是大于0的。
         * 非负值意味着其后续节点不需要被唤醒。所以对于大多数情况下来说,不需要关心这些
         * 状态值是否大于0、小于0,在需要唤醒后续节点的时候,通过大于0这种简单的判断,
         * 可以简化代码。
         *
         * 在一般的同步队列中,等待状态初始值是0;在条件等待队列中,初始值则是CONDITION。
         * 修改改值的方法一般是使用CAS修改,以避免线程安全性问题。
         */
        volatile int waitStatus;

        /**
         * 前置节点的指针,可以方便当前节点查询前置节点的等待状态。在进入等待队列的时候会
         * 被赋值,在出队列的时候会被置为null(为了GC)。对于一个被取消的前置节点,当前
         * 节点会一直往前寻找,直到找到一个非取消状态的节点,然后更新自己的前置节点。不要
         * 担心找不到非取消状态的节点,头结点一定不是取消状态的节点(头节点只有获取到锁才
         * 能成为头节点,所以它不可能是取消状态),这样查询到最后查到头节点,也能满足更新
         * 前置节点的条件。
         * 
         */
        volatile Node prev;

        /**
         * 后继节点的指针,可以方便当前节点释放锁以后唤醒后继节点获取锁。
         */
        volatile Node next;

        /**
         * 抢锁的线程
         */
        volatile Thread thread;
    
        /**
         * 条件等待队列下的下一个等待者。
         */
        Node nextWaiter;
}

队列节点类有很多常量:节点类型和节点状态,全放在一起了,具体看代码注释。

另外就是五个成员变量:waitStatus、prev、next、thread、nextWaiter,其中waitStatus、prev、next、thread都用了volatile关键字修饰保证了可见性。

三、公平锁锁抢占的原理

ReentrantLock的公平锁FairSync相对于非公平锁来说简单些,这里就以公平锁的锁抢占作为切入口看看AQS锁抢占的过程。

1、FairSync的lock方法

static final class FairSync extends Sync {
        final void lock() {
            //调用AQS的模板方法acquire
            acquire(1);
        }
    ...省略其它代码...
}

FairSync的加锁代码很简单,直接调用了AQS的模板方法acquire方法,并且传了个参数1,除此之外没有任何代码。

2、AQS模板方法:acquire

/**
 * 该方法仅在独占模式下被调用,并且忽略中断的影响(不会抛出中断异常)。 该方法首先会 
 * 尝试调用tryAcquire方法获取锁,如果成功了就直接返回。否则,当前线程会排队进入队列
 * 等待,重复阻塞-被唤醒-尝试获取锁的步骤,直到成功。该方法通常是实现了Lock接口
 * 的类通过lock方法调用。
 *
 * @param arg 该传参没有什么特别的意义,它可以代表我们希望它代表的任何意思,接下来它
 *        会作为入参传递给tryAcquire方法调用
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire是获得、获取的的意思,该方法在这里实际上就是“获取锁”的意思。可以看到这个方法的代码很简洁,但是包含的代码逻辑不少,只是它将方法调用和返回值判断放在了同一个if判断语句中,所以显的代码比较少而已。

现在看看acquire方法中调用的四个方法:

  • tryAcquire(arg):尝试获取锁
  • addWaiter(Node.EXCLUSIVE):当前线程排队进入等待队列
  • acquireQueued:自旋抢占锁,重复阻塞-被唤醒-尝试获取锁的步骤,直到成功。
  • selfInterrupt():若是自旋抢占锁的过程中线程出现了中断,则执行线程中断操作。

总之这个方法就是抢占锁的完整代码,其整体逻辑如下图所示

image-20241212135504605

3、钩子方法:tryAcquire

tryAcquire是AQS类中的一个钩子方法,也是最核心的抢占锁的方法,它默认是抛出了一个异常,意思就是强制要求子类重写该方法(真让人疑惑,既然如此为啥不定义为抽象方法呢?)

/**
 * 该方法用于尝试获取一个独占锁。 在这个方法中会查询当前锁状态是否允许
 * 获取锁,如果允许就占有锁。
 *
 * 但凡一个线程尝试获取独占锁,这个方法必定会被调用到。如果该方法返回值为false,
 * 那么有可能当前线程会排队进入等待队列(如果还没有在等待队列中),直到其它线程
 * 释放了锁,则该线程就才有机会获取锁。该方法可用于Lock.tryLock()方法中。
 */
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

接下来看看ReentrantLock中的公平锁的方法实现

/**
 * 该方法只有三种情况会被调用到:
 * 1. 第一次获取独占锁的时候
 * 2. 等待队列为空的时候(没有等待者了)
 * 3. 等待队列中的线程被唤醒之后尝试获取锁的时候
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //获取当前AQS同步器的锁状态
    int c = getState();
    //若是处于锁空闲状态
    if (c == 0) {
        //即使锁空闲也要满足条件才能占有锁
        if (
            //①判定是否还存在前置等待节点,只有不存在前置节点才允许获取锁
            !hasQueuedPredecessors() &&
            //CAS设置锁状态为被占用状态
            compareAndSetState(0, acquires)) {
            //设置抢占锁成功的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //②锁已经被占用,而且占用锁的线程和当前线程是同一个线程
    else if (current == getExclusiveOwnerThread()) {
        //state累加
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        //更新锁状态值
        setState(nextc);
        return true;
    }
    //其它情况锁已经被占用而且当前线程不是占有锁的线程,则不允许抢占锁
    return false;
}

在上面的源代码中,①和②处的代码比较有趣,先说下②:

ReentrantLock中的Reentrant其实就是“可重入”的意思,这可重入就体现在同一个线程在持有锁的情况下可以重复获取锁,②处正是实现“重入”的关键。state值在ReentrantLock中为0表示锁空闲,大于0就表示锁已经被占用,每次持有锁的线程重复获取锁,该值都会累加1。

再说下①:

!hasQueuedPredecessors() 作为一个判断条件是当前线程能否获取锁的关键条件,看下它的源代码

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

这段代码的作用是判断当前节点之前是否有其他节点在排队等待,如果存在排队的前驱节点,则返回 true,否则返回 false

在这个方法块中,判断是否该节点前面有等待的节点的依据是:判断头节点的下一个节点是否是当前线程。为什么要判断队列中的第二个节点是否是当前线程?这个答案之后在讲节点入队出队的时候再解答。

首先先思考一个问题,当只有一个线程的时候,AQS队列是否会初始化并入队该线程节点?代码示例如下所示

public class AQSDemo {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock(true);
        lock.lock();
        try {
            System.out.println("Hello,word");
        } finally {
            lock.unlock();
        }
    }
}

调用lock方法的时候,代码又回到了hasQueuedPredecessors方法,由于AQS队列还不存在,所以tail和head此时都为null,那h!=t的判定就是false,hasQueuedPredecessors返回false,而且没有其它线程竞争锁,所以CAS也必定会成功,最后会获取到锁

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //只有一个线程的时候hasQueuedPredecessors会返回false
        if (!hasQueuedPredecessors() &&
            //由于没有竞争,这里CAS必然会成功
            compareAndSetState(0, acquires)) {
            //获取到锁所有权
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

结论就是如果只有一个线程,这个线程将会获取到锁,而且AQS队列不会初始化。

第一个线程获取到锁的情况下,第二个线程来获取锁了,那必然会产生锁竞争,第二个线程执行tryAcquire将会返回false,表示尝试获取锁失败:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //由于第一个线程已经将state字段修改成了1,所以第二个线程执行到此处时不满足状态判断条件
    if (c == 0) {
        //假设由于高并发两个线程同时进入了if语句块同时满足了!hasQueuedPredecessors条件
        if (!hasQueuedPredecessors() &&
            //由于这里是CAS操作,保证了原子性,所以两个线程必然只有一个成功
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

我在上述代码中注释已经写明白了,CAS操作state方法调用确保了无论什么情况下,两个线程同时tryAcquire获取锁只有一个成功,那另外一个失败的会怎么样呢?

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

答案是失败了就会执行addWaiter方法,也就是进队列排队等待锁释放。

tryAcquire流程图如下所示

image-20241217135243014

4、入队方法:addWaiter

/**
 * 封装当前线程和指定的模式成一个新Node后排队进入等待队列
 *
 * @param mode mode模式可以是独占类型:Node.EXCLUSIVE ,也可以是共享类型: Node.SHARED
 * @return 返回值是封装的Node节点
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    //如果队列非空,先尝试一次快速入队
    if (pred != null) {
        node.prev = pred;
        //CAS方法入队,防止线程安全性问题发生
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果上面快速入队方法失败了,enq方法作为fallback方法执行自旋入队
    enq(node);
    return node;
}

简单总结下,addWaiter方法会首先尝试一次快速入队,如果失败了,就走自旋入队逻辑:enq(node)。

5、自旋入队:enq

/**
 * 向队列中插入一个节点,有必要的话需要先初始化队列。
 * @return 插入节点的前置节点,其实没啥用,返回值没用到
 */
private Node enq(final Node node) {
    //自旋重试入队,直到入队成功
    for (;;) {
        Node t = tail;
        //如果尾部节点为空表示AQS队列未初始化
        if (t == null) { 
            //必须先初始化队列
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            //如果队列非空,尝试CAS方式排队到队尾
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

有意思的是初始化队列的这块代码

 if (compareAndSetHead(new Node()))
                tail = head;

初始化队列并没有拿当前待入队的节点初始化,而是new了一个新Node,没有任何意义的Node入队作为头部节点,这是为什么?

因为AQS队列的规则就是:**头部节点是已经获取到锁的线程的节点,第二个节点以及以后的节点则是接下来要被唤醒的节点。**在讲解tryAcquire的时候说过,只有一个线程的时候线程不会入队,第二个线程抢占锁失败之后要入队,结果发现队列是空的,它如果占据了第一个头部节点,则表示它才是持有锁的线程,这就违反了AQS的设计策略,所以这里要搞一个假Node占据头部节点,这样才能保证以后的唤醒等待节点流程以及锁释放流程不会出问题。

所以这个入队的线程,至少需要执行两次for循环才能入队,第一次执行AQS队列初始化,第二次如果CAS入队成功了才会成功。

下面是addWaiter方法完整的流程图:

image-20241217143832775

6、自旋抢占锁:acquireQueued

/**
 * 已经在队列中的线程通过独占模式获取锁的方法,该方法不受中断的影响。
 * 该方法也同样用于等待在条件等待队列中的线程获取锁。
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            //如果前置节点是头部节点,当前节点就尝试抢占锁
            if (p == head && tryAcquire(arg)) {
                //抢锁成功后将抢锁节点设置为头结点
                setHead(node);
                //释放头结点利于GC
                p.next = null; 
                failed = false;
                return interrupted;
            }
            //如果应该阻塞等待就挂起线程进入阻塞状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

当节点进入等待队列后,就开始执行acquireQueued方法自旋抢占锁。在这个方法中,明确设置了抢锁条件:前置节点是头部节点。这也侧面解释了为什么enq方法中AQS队列初始化的时候必须搞一个没啥意义的Node实例作为头部节点。

从代码中还能看到一点:线程释放锁之后并非在线程内将自身节点从等待队列中移除,而是通过唤醒下一个节点,下一个节点获取锁之后将上一个获取锁的线程节点从等待队列中移除。到此未知可以知道,AQS的等待队列是“懒加载”的,等待队列初始化是在enq中第一个节点入队的时候做的,节点移除则是下一个唤醒的节点获取锁之后移除的。

最后就是如果不满足抢锁条件,或者抢锁失败,会执行shouldParkAfterFailedAcquire方法判定是否应该挂起线程,以避免大量的for循环导致的cpu资源浪费。这点正是和原生CLH队列锁最大的不同之处:原生CLH队列锁不会挂起,它会死循环一直查询前置节点状态直到前置节点释放锁,这会浪费大量的CPU资源(详情参考《详解AQS一:CLH队列锁》)。

7、挂起预判:shouldParkAfterFailedAcquire

shouldParkAfterFailedAcquire()方法的主要功能是:将当前节点的有效前驱节点(是指有效节点不是CANCELLED类型的节点)找到,并且将有效前驱节点的状态设置为SIGNAL,之后返回true代表当前线程可以马上被阻塞了。

/**
 * @param pred 抢锁线程节点的前置节点
 * @param node 抢锁线程节点
 * @return 如果应该挂起就返回true;否则返回false
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 前置节点是SIGNAL状态,则后继结点可以挂起了。
         */
        return true;
    if (ws > 0) {
        /*
         * 大于0特指CANCELLED状态1,表示前置节点已经取消,则当前节点应该朝前继续查找
         * 非CANCELLED状态的节点,找到后修改当前节点的前置节点指针指向
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 其它类型状态:0、PROPAGATE、CONDITION,那就修改前置节点类型的状态为SIGNAL,
         * 修改完前置节点状态以后,当前节点并不会立即挂起,而是会再执行一次tryAcquire,确
         * 保挂起之前无法获取到锁。
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    
    return false;
}

shouldParkAfterFailedAcquire方法是在acquireQueued方法中的无限for循环中被调用的:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //shouldParkAfterFailedAcquire如果返回true,则会执行
            //parkAndCheckInterrupt方法
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

从代码中可以看到,一旦shouldParkAfterFailedAcquire方法返回了true,就将执行parkAndCheckInterrupt方法,执行线程挂起。

8、线程挂起:parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

这个方法非常简单,就做了两件事:

  1. 挂起当前线程
  2. 被唤醒以后判定当前线程是否发生了中断

注意这里调用的方法:Thread.interrupted,这个中断方法调用之后会重置中断状态,比如线程中断状态已经是true,第一次调用Thread.interrupted()方法会返回true,调用第二次就会返回false,详情可以参考《java并发编程:线程中断方法interrupt详解》。

为什么要这么做呢?

首先要明白,如果中断状态为true,那么park无法阻塞。LockSupport的park方法如果遇到中断后会响应中断立即停止阻塞,在线程中断状态下,调用再多次的park方法都无法阻塞线程了。acquireQueued方法是个不受中断影响的方法,它的目的是抢锁,至于中断是否要执行,得调用方去判断,所以它定义了一个局部变量interrupted来暂存这个线程中断状态,等线程获取到锁以后告诉调用方中断状态。

为了防止线程不受阻塞的进入临界区破坏线程安全性,必须要让线程的中断状态重置成false,以便下次抢锁失败再次调用LockSupport的park方法的时候能够成功阻塞线程。所以说这里调用Thread.interrupted()方法可以说是极其巧妙了:一方面查询到了实际的中断状态,另一方面实行了中断重置将中断状态改成了false。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; 
                failed = false;
                //获取到锁以后返回真实的中断状态,让调用者执行中断
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //如果检测到了中断,将中断状态暂存到局部变量中
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

自旋抢占锁(acquireQueued)的完整流程如下所示:

image-20241217155055416

9、恢复中断状态:selfInterrupt()

再回过头来看看最开始调用的acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquireQueued方法返回的是线程获取到锁以后检查到的线程中断状态,true表示发生过中断,false表示未发生过中断。如果检测到了中断状态,则执行selfInterrupt方法:

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

这个方法就干了一件事:执行了当前线程的interrupt()方法,这个方法实际上就是设置了一个中断标志,将中断标志设置为true,它并不能立即将线程终止,除非线程正处于等待状态。

这样实际上就是恢复了线程的中断状态,当线程执行的过程中遇到了sleep等方法的调用,就将抛出InterruptedException异常实现线程中断。

四、公平锁锁释放的原理

公平锁的锁释放调用的是ReentrantLock的unlock方法

public void unlock() {
    sync.release(1);
}

它调用的是FairSyn类的release方法,但是FairSyn并没有重写release方法,实际上直接调用的AQS的release方法

1、AQS模板方法:release

/**
 * 该方法是独占模式下的锁释放方法.
 */
public final boolean release(int arg) {
    //尝试释放锁资源
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //唤醒后继结点获取锁
            unparkSuccessor(h);
        return true;
    }
    return false;
}

在release方法中,首先需要调用tryRelease(arg)方法释放锁资源,注意tryRelease方法返回的是true/false,那问题就来了,什么时候返回true,什么时候返回false呢?

这里先确定下结论:当AQS锁状态为0的时候返回true,否则返回false,这个实际上和重入锁有关系,ReentrantLock是重入锁,其代码示例如下所示

public class AQSDemo {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock(true);
        lock.lock();//state变成1
        try {
            lock.lock();//state变成2
            try {
                System.out.println("重入锁1");
            } finally {
                lock.unlock();//state变成1
            }
            System.out.println("重入锁2");
        } finally {
            lock.unlock();//state变成0
        }
    }
}

经过两次加锁,AQS的state值已经变成了2,第一次锁释放state值会变成1,那这时候是否应该唤醒后继节点让其抢占锁?答案是否定的,肯定是要彻底释放锁之后才能唤醒后继结点来抢占锁。

2、钩子方法实现:tryRelease

tryRelease方法是Syn类的方法,它并没有被公平锁类FairSync重写

protected final boolean tryRelease(int releases) {
    //计算锁释放后的state值
    int c = getState() - releases;
    //如果释放锁的线程不是持有锁的线程,就报异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //检查释放锁后的state值是否是0
    if (c == 0) {
        //如果是0表示锁已经彻底释放
        free = true;
        //将持有锁的线程标记为null
        setExclusiveOwnerThread(null);
    }
    //更新state值
    setState(c);
    return free;
}

tryRelease方法很简单,一方面重新计算并更新了state值,另一方面标记当前获取锁的线程为null。

正如之前所说,该方法只有state为0的时候才会返回true,表示可以唤醒后续等待队列中的节点了。

最后注意一个细节:更新state值的方法是调用的setState方法,明明抢占锁的时候都是调用compareAndSetState方法的呀。这是因为释放锁的时候锁已经被当前线程独占获取到了,所以锁释放的时候就不会有线程安全性问题了,就没必要调用CAS方法设置state状态了。

3、唤醒后继节点:unparkSuccessor

unparkSuccessor方法用于唤醒等待队列中的后继节点。

/**
 * @param node 这里的node是头部节点
 */
private void unparkSuccessor(Node node) {
    /*
     * 检查头部节点是否是负数,如果是负数,尝试将其更新成0.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 正常来说头部节点的下一个节点就是将要被唤醒的节点,但是有可能下一个节点
     * 被取消了,那这时候就要从尾部到头部挨个遍历寻找第一个未被取消的后继结点作
     * 为真正需要被唤醒的节点。
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //唤醒后继结点
        LockSupport.unpark(s.thread);
}

该方法被执行后,后继节点被唤醒,会继续执行parkAndCheckInterrupt方法,检查中断,然后在acquireQueued方法中进行下一次for循环,尝试抢锁,如果抢锁失败,则会继续阻塞;如果抢锁成功,如果之前检测出了中断,则恢复中断状态。

但是这个方法有两个疑问:

第一个疑问:当发现节点等待状态小于0,为什么要更新waitStatus状态为0

if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

我们知道Node节点的waitStatus状态初始值是0,直到进入AQS队列中排队等待获取锁,它的值也并没有发生变化;但是一旦有了后继节点,后继结点在自旋抢占锁的过程中会判定前置节点状态,如果是0、-2、-3的话,会将前置节点状态改成-1:SIGNAL状态,后继结点再进入挂起状态。

现在将头结点waitStatus改成了0,那接下来唤醒后继结点以后,后继结点如果抢锁失败,还是会将前置节点waitStatus改成-1,自己再进入挂起状态,这时候就没人能唤醒自己了。。当然在公平锁中后继节点被唤醒以后肯定能获取到锁。

所以为什么要将waitStatus更改为0呢?

第二个疑问:为什么遍历AQS队列要从尾部朝前遍历

Node s = node.next;
if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
            s = t;
}

AQS队列是双向队列,为什么不直接从当前节点查询next节点呢?从代码逻辑上来看,从尾部向前查询是为了防止有NULL节点的情况发生,不明白为什么会有NULL节点出现。


#java #多线程编程
目录
复制 复制成功