详解AQS七:深入理解信号量机制Semaphore

Published on 2025-01-19 14:38 in 分类: 博客 with 狂盗一枝梅
分类: 博客

所谓的共享锁就是在同一时刻允许多个线程持有的锁,和CountDownLatch不同,Semaphore是一种真正的共享锁:Semaphore可以用来控制在同一时刻访问共享资源的线程数量,通过协调各个线程以保证共享资源的合理使用。Semaphore维护了一组虚拟许可,它的数量可以通过构造器的参数指定。线程在访问共享资源前必须调用Semaphore的acquire()方法获得许可,如果许可数量为0,该线程就一直阻塞。线程访问完资源后,必须调用Semaphore的release()方法释放许可。更形象的说法是:Semaphore是一个许可管理器。

以买票为例说明Semaphore的使用场景:

假如汽车站有四个卖票窗口,那最多只能接待四个人同时买票,无论有多少买票的人,超出四个人的必须得在后面排队等待,等窗口上的人离开以后,才能再去一个人在空出来的窗口上买票。使用信号量机制模拟这个过程,谁才是Semaphore管理的对象呢,是买票人,窗口数量,还是卖票人?答案是窗口数量。总结下来,Semaphore要求管理的目标有以下特性:

  • 资源数量固定,卖票窗口数量是固定的
  • 资源可以获取和释放,买票人买票的时候占据窗口,买票人离开的时候释放窗口

一、Semaphore使用方式

Semaphore的常用方法就三个:构造方法、获取共享锁的方法、释放共享锁的方法。

以售票为例说明Semaphore的使用方式:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @author kdyzm
 * @date 2025/1/4
 */
@Slf4j
public class SemaphoreDemo {

    //100个买票的人
    private static final int BUYER_COUNT = 100;

    //四个售票窗口,许可证数量是4
    private static final Semaphore TICKET_WINDOWS = new Semaphore(4);

    //倒计时计数器
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(BUYER_COUNT);

    public static void main(String[] args) {

        for (int i = 0; i < BUYER_COUNT; i++) {
            Thread thread = new Thread(new Buyer(), "线程" + i);
            thread.start();
        }
        try {
            COUNT_DOWN_LATCH.await();
        } catch (InterruptedException e) {
            log.error("", e);
        }
    }


    static class Buyer implements Runnable {

        @Override
        public void run() {
            try {
                //获取共享锁:获取一个许可证,如果许可证不够,则阻塞等待释放
                TICKET_WINDOWS.acquire();
                log.info("【{}】购票中......", Thread.currentThread().getName());
                //模拟业务耗时
                TimeUnit.SECONDS.sleep(2);
                //释放共享锁:释放一个许可证
                TICKET_WINDOWS.release();
            } catch (InterruptedException e) {
                log.error("", e);
            }
            COUNT_DOWN_LATCH.countDown();
        }
    }
}

运行结果如下:

动画9

可以看到,每四个线程可以同时执行任务,其它线程则需要等待。

在这个案例中,使用semaphore管理4个窗口的使用,每个购票者都要先抢占共享锁才能进行购票,购票完成之后释放共享锁,共享锁允许四个线程同时访问,超出线程数则需要阻塞等待。

Sempahore的核心方法其实就三个:

构造方法:public Semaphore(int permits),该构造方法初始化允许N个线程同时访问的的共享锁。

**acquire方法:**获取共享锁

**release方法:**释放共享锁

Semaphore的使用很简单,那它的实现原理是怎么样的呢?

二、Semaphore的实现原理

Semaphore是基于AQS实现的共享锁,从其代码结构上就能看出来:

image-20250113163528942

Semaphore的代码结构和CountDownLatch以及ReetrantLock几乎是一模一样的,内部都维护了一个Sync实例,Semaphore的获取锁以及释放锁都会委托给Sync类中的方法,而Sync类中只是实现了AQS的钩子方法,Semaphore中比较重要的acquire和release方法实际上都是调用的AQS中的方法。

1、获取锁:acquire方法

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquire方法直接调用了AQS中的acquireSharedInterruptibly方法

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //先判断中断标志位,如果发生了中断则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    /**
     * 尝试获取锁,如果获取锁失败,则进入队列等待
     *
     */
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

根据方法名可以知道,该方法是个“可中断”的方法,所以该方法会抛出IE异常;之后会先尝试一次获取锁,如果返回值小于0,表示获取锁失败了,这时候就要调用doAcquireSharedInterruptibly方法在AQS中排队等待别人释放锁。

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
     //将节点标记为共享类型,并加入AQS同步队列。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            //获取节点的前置节点
            final Node p = node.predecessor();
            //判断是否是头结点,是头结点就表示自己可以尝试获取锁了。
            if (p == head) {
                //尝试获取共享锁
                int r = tryAcquireShared(arg);
                //r>=0表示已经成功获取到了锁
                if (r >= 0) {
                    //执行获取锁成功的流程,由于是共享锁,如果剩余许可数量大于0,需要自动唤醒下一个节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //检查是否应该在失败后挂起。
            if (shouldParkAfterFailedAcquire(p, node) &&
                //挂起线程
                parkAndCheckInterrupt())
                //如果发生了中断,就抛出IE异常。
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireSharedInterruptibly 方法在之前分析CountDownLatch的时候已经讲过:该方法会将节点添加到AQS同步队列中并不断尝试获取锁;获取失败就挂起等待,唤醒之后继续尝试获取锁,不断重复此过程直到成功获取到锁。

2、钩子方法:tryAcquireShared

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

tryAcquireShared方法是AQS的钩子方法,对于非公平锁来说,它会调用nonfairTryAcquireShared方法获取锁

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        //获取可用的许可数量
        int available = getState();
        //计算如果分配出去acquires个许可,还剩下多少个许可
        int remaining = available - acquires;
        if (remaining < 0 //如果小于0,表示获取锁失败,直接返回remaining
            || compareAndSetState(available, remaining))//如果大于0则执行CAS方法更新剩余可用许可数量
            return remaining;
    }
}

tryAcquireShared方法实际上就是更新可用许可数量的方法,其返回值是剩余许可数量;所以获取锁失败的时候返回值为负值,之后acquire方法就会让当前线程在AQS同步队列中排队等待。

总结一下,tryAcquireShared方法才是真正获取锁的方法,它的返回值会决定当前节点后续是否进入AQS同步队列。

3、释放锁:release方法

public final boolean releaseShared(int arg) {
    //尝试释放锁
    if (tryReleaseShared(arg)) {
        //如果释放锁成功,则尝试唤醒后续节点来抢锁
        doReleaseShared();
        return true;
    }
    return false;
}

在这个方法中,会先调用模板方法tryReleaseShared方法释放许可

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //获取当前剩余许可数量
        int current = getState();
        //增加release个许可的数量
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //更新许可数量,如果失败则for循环下次重试
        if (compareAndSetState(current, next))
            return true;
    }
}

可以看到tryReleaseShared方法很简单,实际上就是恢复了之前用掉的许可。

调用tryReleaseShared方法恢复许可之后,会调用doReleaseShared方法,在CountDownLatch中已经分析过该方法,但是由于执行的上下文已经变成了Semaphore,它的一些方法执行逻辑已经发生了变化,而且变得比较复杂,这在下一章节再分析。

三、PROPAGATE状态的作用

上一章节讲到了释放锁的问题,为何突然转变成研究PROPAGATE状态的作用了?

PROPAGATE状态是共享锁AQS同步队列中节点的专属状态,而且并非所有节点的状态都是PROPAGATE,只有特殊场景下才会触发将节点状态更改为PROPAGATE,目的是为了解决某些场景下明明许可数量足够,但是无法唤醒后继节点的BUG。

首先看下tryReleaseShared方法

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        //如果AQS同步队列存在等待中的节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            /**
             * 节点入队之后调用shouldParkAfterFailedAcquire会将前置节点都改成SIGNAL状态,
             * 所以大部分情况该IF条件都会得到满足
             */
            if (ws == Node.SIGNAL) {
                //使用CAS无锁操作保证接下来的唤醒操作在当前节点在头结点的位置上的情况下只会执行一次
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;     
                //唤醒后继节点
                unparkSuccessor(h);
            }
            /**
             * 划重点的代码:这段代码很多人可能觉得不会执行到,不理解这段代码为什么会存在,
             * 实际上是为为了解决有些情况下许可数量充足但是后继节点无法被唤醒的问题
             */
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        //for循环停止的条件是head值没有发生变化
        if (h == head)                   
            break;
    }
}

tryReleaseShared方法在Semaphore信号量机制中,可能会在以下两个地方被调用到:

  1. acquire获取锁成功以后,在setHeadAndPropagate方法中被调用到
  2. release释放锁成功以后被调用到

经过之前ReentrantLock独占锁的学习之后,在我们的认知里,释放锁之后唤醒后继线程是常规操作,为什么获取锁成功以后也会唤醒后继线程呢?我认为有以下两点原因:

  1. Semaphore是共享锁,当有足够的许可时,获取锁成功的线程立即通知后继节点抢锁,可以提高锁获取的效率
  2. 有些情况下,头结点状态是0,释放锁的线程没有机会调用unparkSuccessor(h);方法;如果没有抢锁线程抢到锁之后主动唤醒后继节点,后继结点将无法被唤醒。

第二种情况,使用PROPAGATE状态加上抢锁线程主动唤醒后继线程,就可以解决。

1、2009年的一个bug

PROPAGATE状态并非一开始就设计好的,而是因为有人在2009年提了一个bug:Concurrent Semaphore release may cause some require thread not signaled ,Doug Lea 则在同年解决了这个bug:Concurrent Semaphore release may cause some require thread not signaled ,引入PROPAGATE状态正是为了解决该Bug。

image-20250115230534811

Bug的标题翻译成中文:Semaphore锁释锁以后或许会导致一些正在获取锁的线程无法被唤醒。Bug在当时的JDK1.6下通过以下代码会重现:

import java.util.concurrent.Semaphore;

public class TestSemaphore {

    private static Semaphore sem = new Semaphore(0);

    private static class Thread1 extends Thread {
        @Override
        public void run() {
            sem.acquireUninterruptibly();
        }
    }

    private static class Thread2 extends Thread {
        @Override
        public void run() {
            sem.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10000000; i++) {
            Thread t1 = new Thread1();
            Thread t2 = new Thread1();
            Thread t3 = new Thread2();
            Thread t4 = new Thread2();
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t1.join();
            t2.join();
            t3.join();
            t4.join();
            System.out.println(i);
        }
    }
}

上面这段代码定义了一个初始许可数量为0的Semaphore共享锁,这意味着任何获取锁的线程都会失败然后乖乖在AQS队列中排队,但是Bug的提出人定义了两个线程,一个线程获取锁,一个线程释放锁,而且分别定义了两个实例,再定义10000000次循环,每次循环都有两个线程获取锁,两个线程释放锁,而且每次循坏内主线程都会等待子线程执行完毕,理论上来说程序应该会稳定运行直到程序结束,但是这个Bug的提出人运行该代码出现了一种诡异的现象:

程序运行在某次循环的时候卡住了,卡住的时候Semaphore共享锁还剩下一个许可,AQS队列中还有一个线程在排队等待但是没有人唤醒它。

2、未修复bug前的代码

在分析Bug形成原因前,先看看未修复bug前的代码长什么样子,完整的AQS代码如下:https://github.com/openjdk/jdk8u/blob/ca6e1aecc352d16fd2a731ad6e388b018b9ae4b0/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java ,代码很长,在未修复前,PROPAGATE状态还不存在,doReleaseShared方法不存在,提炼出来关键的四个方法如下:

//获取锁的方法
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


/**
 * 该方法是线程在获取共享锁成功以后如果存在可用的许可就调用该方法
 */
private void setHeadAndPropagate(Node node, int propagate) {
    //将当前节点设置为头结点
    setHead(node);
    if (propagate > 0 && node.waitStatus != 0) {
        /*
         * Don't bother fully figuring out successor.  If it
         * looks null, call unparkSuccessor anyway to be safe.
         */
        Node s = node.next;
        if (s == null || s.isShared())
            unparkSuccessor(node);
    }
}

/**
 * 该方法在释放锁的时候会被调用
 *
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //唤醒后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    //先将头结点状态改为0
	compareAndSetWaitStatus(node, Node.SIGNAL, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

3、bug分析

未修复前的代码比修复后的代码简单很多,这段代码出现了什么问题导致的Bug出现呢?由于每次循环主线程都要等待子线程执行完毕,如果顺利执行完毕,Semaphore的许可数量都会又变成0,所以循环之间彼此不受影响,Bug的提出者之所以要循环10000000次,只是为了让Bug能稳定重现。所以Bug是有可能在第一次循环就出现的。

从代码中可以看到,t1、t2是获取锁的线程,t3、t4是释放锁的线程,一开始的时候Semaphore的许可数量是0,下面通过时序图重现该bug

image-20250116174737357

t1和t2线程是获取锁的线程,假设它们先执行了,然后获取锁失败了,这时候AQS同步队列中的数据结构如下所示

image-20250116175729203

接着线程3开始释放锁,并且释放成功,此时Semaphore的许可数量为1,它将头结点修改为0之后唤醒了线程1,之后线程结束,此时AQS同步队列中的数据结构如下:

image-20250116220627837

线程1被唤醒后发现许可数量为1,于是成功获取到了锁,许可数量减一变成0,线程1接下来本该执行setHeadAndPropagate方法自己成为head节点并唤醒后继结点,无奈发生了线程切换,切换到了线程4释放锁,此时AQS同步队列中的数据结构仍然没有发生变化:

image-20250116220627837

线程4释放锁成功了,此时Semaphore许可数量变成了1,它判断了当前AQS队列的头结点状态,发现是0,于是就没有唤醒后继结点;线程切换到线程1,线程1继续执行setHeadAndPropagate方法,重新设置了头部节点为自己,虽然最新的许可数量已经变成了1,但是由于没有获取最新的许可数量,线程1错以为没有许可可用了,所以也没有唤醒后继结点,此时AQS数据结构如下所示:

image-20250116221415167

其结果就是队列中线程2还在等待别人唤醒它,Semaphore许可数量为1却没有线程来获取。

4、使用PROPAGATE状态破局

队列中的线程2应该被谁唤醒?再看一看AQS同步队列中的数据结构:

image-20250116221415167

AQS同步队列的基本规则就是前置节点唤醒后继节点,所以线程2理应当被线程1唤醒,线程1当初是因为在setHeadAndPropagate方法中判断没有可用的许可才没有唤醒后继结点

if (propagate > 0 && node.waitStatus != 0) {
}

线程1执行到setHeadAndPropagate方法的时候,AQS同步队列结构如下所示:

image-20250116220627837

Doug Lea的解决办法是线程4在释放锁的时候发现头部节点状态为0,就将其改成-3;线程1发现head状态是-3,就无条件唤醒后继节点:

image-20250116223024043

这样就解决了问题。-3是**PROPAGATE的状态值,PROPAGATE**状态也因此被引入进来,而且PROPAGATE状态在JDK代码中也仅仅在doReleaseShared方法中被直接引用,它实际上就是为了解决该特定场景下的问题而存在的。

5、修复bug后的代码

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);
    //这里不仅仅判断了剩余的许可数量,还判断了头结点状态是否小于0,自然也包括PROPAGATE状态
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            //划重点,头结点状态为0的时候要将其状态更改为PROPAGATE
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

6、总结

由于多线程环境下线程都是并行运行的,头结点没有及时更新(中间态)而且状态变成0就有可能导致后继节点无法被唤醒,通过使用PROPAGATE状态可以让被唤醒的线程无条件继续唤醒后继线程,从而解决该问题。

四、全流程分析Semaphore共享锁

接下来全面分析一下Semaphore共享锁在运行中的时候,AQS同步队列中的数据结构和状态的变化。

还是以一开始的卖票为例:

@Slf4j
public class SemaphoreDemo {

    //10000个买票的人
    private static final int BUYER_COUNT = 10000;

    //四个售票窗口
    private static final Semaphore TICKET_WINDOWS = new Semaphore(4);

    //倒计时计数器
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(BUYER_COUNT);

    public static void main(String[] args) {
        for (int i = 0; i < BUYER_COUNT; i++) {
            Thread thread = new Thread(new Buyer(), "线程" + i);
            thread.start();
        }
        try {
            COUNT_DOWN_LATCH.await();
        } catch (InterruptedException e) {
            log.error("", e);
        }
    }


    static class Buyer implements Runnable {

        @Override
        public void run() {
            try {
                //为了充分模拟复杂环境,这里延时获取锁
                TimeUnit.MILLISECONDS.sleep(200);
                //获取共享锁
                TICKET_WINDOWS.acquire();
                log.info("【{}】购票中......", Thread.currentThread().getName());
                //模拟业务耗时
                TimeUnit.SECONDS.sleep(2);
                //释放共享锁
                TICKET_WINDOWS.release();
            } catch (InterruptedException e) {
                log.error("", e);
            }
            COUNT_DOWN_LATCH.countDown();
        }
    }
}

上面的程序运行的时候,一方面AQS队列尾部在不断添加节点,另一方面头部节点也在不断获取锁并出队列,程序运行结果就是每隔两秒就有四个线程打印输出。

1、前4个线程

Semaphore初始化有4个许可,所以前4个线程会直接获取到锁,此时AQS同步队列并不会被初始化;前四个线程等待两秒钟期间,每隔200毫秒就有新的线程尝试获取锁,当然肯定是获取失败了的,获取失败之后,它们就会在AQS同步队列中排队,此时的AQS同步队列结构如下所示:

image-20250117175555136

前四个线程等待两秒钟之后将会执行release方法释放锁,它们有可能会同时进入doReleaseShared方法:

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
}

这时候可能会出现很多种情况:

假如四个线程同时拿到当前AQS的head,同时执行到if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))方法,很明显的,由于CAS的原子性,只有一个线程能CAS操作成功,假设线程1CAS成功了,然后它唤醒了后继节点线程5;线程2/3/4则由于CAS失败,执行了continue命令进行了下一轮循环,它们重新获取head节点,这时候有两种情况

  1. head节点还是上一轮循环的节点,线程5虽然被唤醒,但是没有来得及执行setHeadAndPropagate方法改变head节点为自身
  2. head节点已经变成了线程5

三个线程有可能都是情况1,有可能都是情况2,甚至有部分情况1,部分情况2,情况已经变得很复杂,无论是什么情况,其目的只有一个:唤醒后继节点,而唤醒后继节点的唯一条件就是头结点的状态是SIGNAL,而且当前线程通过CAS操作将其状态更改为0,这实际上就是加上了一把锁,保证了头结点没发生变化的时候只会执行一次唤醒后继节点的操作,防止无意义的重复唤醒。

当然,由于是CAS操作,是有可能出现ABA问题的:对head节点CAS操作成功将状态-1改为0之后,此时队尾正在添加节点,它会将前置节点都更改为SIGNAL状态。这样其实也无妨,在后续的unparkSuccessor方法中还会检查一次将状态重新更改为0;如果其它线程发现head状态又变成了-1,想要再次执行唤醒动作,那就执行好了,不影响程序结果。

2、以后的线程

前四个节点完成线程任务之后,其中一个线程唤醒了AQS队列head节点的后继节点,由于许可充足,它获取到锁以后立马唤醒了后继节点,此时它还没开始执行自己的任务;唤醒后继节点以后,才开始干自己的活儿,后继节点重复此操作,直到许可数量不足。

在这个过程中,最新获取到锁的Node立马成为新head,而唤醒它的线程虽然还没开始运行自己的run方法,自己的节点却已经在AQS队列中被移除,此时AQS队列结构张这样:

image-20250117233116660

游离的线程节点变成了3个,而获取到最后一个许可的线程成为了头结点,在唤醒下一个节点之前并不会从AQS队列中被移除。

以后的过程就是重复上述过程,直到最后一个节点。

五、一些疑问

1、会不会出现正在enq的节点无法被唤醒的情况

假如AQS队列中只有一个head节点了,一个新节点想要获取共享锁,但是许可数量不足,于是它执行了addWaiter方法去排队

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

结果节点还没入队,头节点已经执行完成了唤醒完成了后继节点的操作,岂不是没法唤醒了自己了?其实并不会发生这种情况,因为这种情况下新入队的节点并不会挂起:addWaiter方法入队成功之后,在doAcquireSharedInterruptibly方法中的for循环中会立即尝试抢占锁,这时候由于head节点已经释放了锁,所以会立即获取到锁:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    	//入队前可能前置节点已经执行完唤醒操作
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //入队成功之后会立即尝试获取锁,这种情况下必定会获取成功
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


参考文章:https://yangsanity.me/2022/06/11/AQS-PROPAGATE

END.


#java #多线程编程
复制 复制成功