详解AQS六:深入理解循环栅栏CyclicBarrier

Published on 2025-01-04 12:34 in 分类: 博客 with 狂盗一枝梅
分类: 博客

从字面意思直译过来,CyclicBarrier的意思是“循环屏障”,在国内普遍叫它“循环栅栏”,它和CountDownLatch一样,都是JUC下的线程同步工具。

在上一篇文章《详解AQS五:深入理解共享锁CountDownLatch》讲了CountDownLatch,它的“N次countDown方法调用,一次await方法调用”模式中实际上也有一个屏障,所有线程到达屏障之后屏障之后的代码才能执行,这其实也正是CyclicBarrier的作用,只是和CountDownLatch不同的是,CyclicBarrier的屏障都能无限次使用,而CountDownLatch的屏障只能使用一次。但是这并不能说CyclicBarrier比CountDownLatch高级;相反的,CyclicBarrier的实现要比CountDownLatch的实现简单的多,其使用场景也多有受限。这等之后仔细分析。

一、CyclicBarrier的使用

CyclicBarrier的使用比较简单,首先创建CyclicBarrier实例

CyclicBarrier cb = new CyclicBarrier(THREAD_COUNT, new Runnable() {
    @Override
    public void run() {
        log.info("最终执行任务");
    }
});

构造方法有两个参数:

parties :代表线程数量,这些线程执行完await方法之后就到达屏障了,触发执行barrierAction

barrierAction :到达屏障以后,触发执行的任务,此参数可为空,这样触发屏障之后就没有任务执行了。

创建完CyclicBarrier实例之后,在线程内执行await方法,到达parties数量之后,就会触发执行barrierAction任务了。

以下是一个参考案例:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * 循环栅栏示例
 *
 * @author kdyzm
 * @date 2025/1/2
 */
@Slf4j
public class CyclicBarrierDemo {
    public static final int THREAD_COUNT = 5;
    public static void main(String[] args) {
        CyclicBarrier cb = new CyclicBarrier(THREAD_COUNT, new Runnable() {
            @Override
            public void run() {
                log.info("最终执行任务");
            }
        });

        //执行两次for循环,触发两次屏障
        for (int j = 0; j < 2; j++) {
            for (int i = 0; i < THREAD_COUNT; i++) {
                new Thread(new Worker(cb)).start();
            }
        }
    }

    static class Worker implements Runnable {
        private CyclicBarrier cb;

        public Worker(CyclicBarrier cb) {
            this.cb = cb;
        }

        @Override
        public void run() {
            log.info("执行任务");
            try {
                TimeUnit.SECONDS.sleep(2);
                int await = cb.await();
                log.info("index={}", await);
            } catch (InterruptedException | BrokenBarrierException e) {
                log.error("", e);
            }
        }
    }

}

二、CyclicBarrier源码解析

由于CyclicBarrier的使用几乎只涉及到了构造方法和await方法,所以从这两个方法着手即可。先看看CyclicBarrier整体结构

public class CyclicBarrier {
    
    /**
     * 静态内部类,用来标记当前轮次是否终止
     */
	private static class Generation {
        boolean broken = false;
    }
    
    /**
     * 使用显式锁创建Condition
     */
    private final ReentrantLock lock = new ReentrantLock();
    
    /**
     * 线程等待的核心,实际上使用了显式锁的等待通知机制
     */
    private final Condition trip = lock.newCondition();
    
    /**
     * 代表线程数量,这些线程执行完await方法之后就到达屏障了,触发执行barrierAction
     */
    private final int parties;
    
    /**
     * 到达屏障以后,触发执行的任务,可为空
     */
    private final Runnable barrierCommand;
    
    /**
     * 当前“代”,每开启新的轮次,generation都将会更新
     */
    private Generation generation = new Generation();
    
    /**
     * 在本轮次中,距离触发执行barrierCommand任务还需要的await方法调用次数;
     * 当变成0的时候,将会触发执行barrierCommand;在下轮次开始的时候会重置为parties值
     */
    private int count;
    
    /**
     * CyclicBarrier的核心代码,如何触发barrierCommand以及如何开启下一轮次,异常处理等均在该方法内
     */
    private int dowait(boolean timed, long nanos){
		//核心代码
    }
    
}

从源码整体结构中可以看到,CyclicBarrier并没有像ReentrantLock或者CountDownLatch一样内部维护了Sync类并实现AQS相关钩子方法,它仅仅只是使用了ReentrantLock的等待通知机制而已。

1、构造方法

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

构造方法没有做特殊的事情,除了初始化parties和barrierAction成员变量之外,还初始化了count参数。

注意之后的parties参数不可变,count参数每执行一次await方法该参数都将会自减一,到0的时候触发执行barrierCommand;到下一轮次开启的时候,count变量的值会被重置为parties的值。

2、await方法

await方法啥都没做,直接将参数转给了doAwait方法

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

doAwait方法就是核心方法了。

/**
 * 核心代码
 */
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    //整个方法都是临界区,被lock/unlock包裹住
    lock.lock();
    try {
        //暂存当前轮次,方便以后被唤醒及时察觉是否已经换了轮次
        final Generation g = generation;
        
        if (g.broken)
            throw new BrokenBarrierException();

        //如果当前轮次发生了中断,就代表循环屏障已经被破坏,必须停止
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        //当前线程的序号
        int index = --count;
        //如果为0,表示已经到达屏障处,需要触发执行barrierCommand任务了
        if (index == 0) {  
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                //执行barrierCommand任务
                if (command != null)
                    command.run();
                ranAction = true;
                //开启下一轮
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        //index不为0时,说明还未到达屏障处,线程需要排队等待
        for (;;) {
            try {
                //线程排队到AQS等待队列
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
			
            //线程被唤醒之后先检查下是否需要停止任务
            if (g.broken)
                throw new BrokenBarrierException();
			/**
			 *如果if条件成立,表示已经进入下一轮次,当前结束循环即可
			 *如果if条件不成立,那表示还没有到达屏障处:
			 *    如果自己是被误唤醒的,那自己下次循环继续等待即可
			 *    如果自己不是被误唤醒的,那大概率是等待超时了,抛出超时异常
			 */
            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

await方法很简单,上面的代码注释已经写的比较清楚,为了更深入的了解CyclicBarrier的原理,自问自答几个问题。

三、关于CyclicBarrier的疑问

1、有几个线程进入了AQS条件等待队列

对于如下一个循环栅栏,N个线程中有几个线程执行了await方法后进入了AQS条件等待队列?

 CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
    @Override
    public void run() {
        log.info("最终执行任务");
    }
});

答案是N-1个。

看下doAwait方法中的片段:

//当前线程的序号
int index = --count;
//如果为0,表示已经到达屏障处,需要触发执行barrierCommand任务了
if (index == 0) {  
    boolean ranAction = false;
    try {
        final Runnable command = barrierCommand;
        //执行barrierCommand任务
        if (command != null)
            command.run();
        ranAction = true;
        //开启下一轮
        nextGeneration();
        return 0;
    } finally {
        if (!ranAction)
            breakBarrier();
    }
}

count值先执行了自减,如果为0,则执行barrierCommand任务,开启下一轮次的循环栅栏之后,就return 0了,最后一个线程并没有进入AQS等待队列,所以是N-1个线程进入了AQS条件等待队列。

2、为什么执行结果串行化了

回想下一开始的案例

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * 循环栅栏示例
 *
 * @author kdyzm
 * @date 2025/1/2
 */
@Slf4j
public class CyclicBarrierDemo {
    public static final int THREAD_COUNT = 5;
    public static void main(String[] args) {
        CyclicBarrier cb = new CyclicBarrier(THREAD_COUNT, new Runnable() {
            @Override
            public void run() {
                log.info("最终执行任务");
            }
        });

        //执行两次for循环,触发两次屏障
        for (int j = 0; j < 2; j++) {
            for (int i = 0; i < THREAD_COUNT; i++) {
                new Thread(new Worker(cb)).start();
            }
        }
    }

    static class Worker implements Runnable {
        private CyclicBarrier cb;

        public Worker(CyclicBarrier cb) {
            this.cb = cb;
        }

        @Override
        public void run() {
            log.info("执行任务");
            try {
                TimeUnit.SECONDS.sleep(2);
                int await = cb.await();
                log.info("index={}", await);
            } catch (InterruptedException | BrokenBarrierException e) {
                log.error("", e);
            }
        }
    }

}

它的执行结果如下:

image-20250103173303354

很神奇的执行结果并不在预想中,本来按照我预想,代码执行结果应该是以下的样子:

2025-01-03 17:32:22.710 [INFO ] [Thread-2  ] - 执行任务
2025-01-03 17:32:22.710 [INFO ] [Thread-8  ] - 执行任务
2025-01-03 17:32:22.710 [INFO ] [Thread-4  ] - 执行任务
2025-01-03 17:32:22.710 [INFO ] [Thread-9  ] - 执行任务
2025-01-03 17:32:22.710 [INFO ] [Thread-5  ] - 执行任务
2025-01-03 17:32:22.710 [INFO ] [Thread-3  ] - 执行任务
2025-01-03 17:32:22.710 [INFO ] [Thread-10 ] - 执行任务
2025-01-03 17:32:22.710 [INFO ] [Thread-1  ] - 执行任务
2025-01-03 17:32:22.710 [INFO ] [Thread-6  ] - 执行任务
2025-01-03 17:32:22.710 [INFO ] [Thread-7  ] - 执行任务
2025-01-03 17:32:24.724 [INFO ] [Thread-3  ] - 最终执行任务
2025-01-03 17:32:24.724 [INFO ] [Thread-4  ] - index=0
2025-01-03 17:32:24.724 [INFO ] [Thread-10 ] - index=1
2025-01-03 17:32:24.724 [INFO ] [Thread-7  ] - index=2
2025-01-03 17:32:24.724 [INFO ] [Thread-8  ] - index=3
2025-01-03 17:32:24.724 [INFO ] [Thread-2  ] - index=4
2025-01-03 17:32:24.724 [INFO ] [Thread-2  ] - 最终执行任务
2025-01-03 17:32:24.724 [INFO ] [Thread-3  ] - index=0
2025-01-03 17:32:24.724 [INFO ] [Thread-5  ] - index=1
2025-01-03 17:32:24.724 [INFO ] [Thread-9  ] - index=2
2025-01-03 17:32:24.724 [INFO ] [Thread-1  ] - index=3
2025-01-03 17:32:24.724 [INFO ] [Thread-6  ] - index=4

也就是说,应该等前一轮方法都执行完毕以后,才能开启下一轮循环栅栏吧,为什么两个最终任务一起执行完毕以后才连续打印了10条线程内的日志?

答案就是因为线程被唤醒以后要重新获取锁,获取锁失败以后就要乖乖去排队。 下面来仔细分析下这个过程:

doAwait方法由于方法全程加锁,所以各个线程会排队进入AQS同步队列,等待获取锁。由于主线程执行速度比较快,所以可以认为10个线程执行了await方法,AQS此时内部队列结构如下:

image-20250103175117568

十个线程中第一个线程获得了执行权,进入了临界区正在运行,其它九个线程都在AQS同步队列中等待(上图画错了,第一个线程并没有进入AQS等待队列,是游离的在运行中的节点,第二个线程进入队列会创建虚拟节点,该虚拟节点表示运行中的线程,实际上可以认为是线程1,画图麻烦,不改了😂)。然后第一个线程没有达到屏障处,所以它进入等待状态,节点从AQS队列移动到了条件等待队列,直到第五个线程获取到了执行权,此时AQS同步队列和条件等待队列如下所示:

image-20250103180007271

第五个线程触发了循环屏障,执行了nextGeneration方法开启下一轮循环栅栏,唤醒了等待队列中的所有节点,等待队列中有四个节点,被唤醒后立马开始尝试获取锁,此时锁仍然被第五个线程持有,所以它们获取锁失败,全部在AQS同步队列末尾排队,这时条件队列被清空,AQS同步队列新增四个节点,AQS同步队列结构如下所示:

image-20250104103804417

在CyclicBarrier中使用的ReentrantLock显式锁是非公平锁,之前的文章《详解AQS三:ReentrantLock非公平锁原理》已经讲过,ReentrantLock非公平锁在两处设置了“非公平性”:

  1. lock方法被调用,线程直接抢锁而无视AQS同步队列中是否有等待的节点,抢锁失败则入队等待
  2. 在AQS队列中被唤醒,无视前面是否有等待的节点,直接抢锁,抢锁失败的话就继续等待

由于线程5持续占有锁,条件等待队列中的线程失去了第一个非公平抢锁的机会,直接进入了AQS等待队列;而且没有意外情况的话,AQS同步队列中的节点是前面节点唤醒后面节点,所以后面从条件等待队列中转移到AQS同步队列中的节点必须要等待前面六个节点依次获取锁并释放锁才能轮到它们。

随着程序运行,线程5和线程10依次从AQS队列中被移除,线程6、7、8、9则先后经历了获取锁->await方法->等待在条件等待队列->被线程10全部唤醒->转移到AQS同步队列的过程,此时控制台输出已经变成了如下所示

2025-01-04 10:54:41.653 [INFO ] [Thread-9  ] - 执行任务
2025-01-04 10:54:41.653 [INFO ] [Thread-10 ] - 执行任务
2025-01-04 10:54:41.653 [INFO ] [Thread-5  ] - 执行任务
2025-01-04 10:54:41.653 [INFO ] [Thread-8  ] - 执行任务
2025-01-04 10:54:41.653 [INFO ] [Thread-4  ] - 执行任务
2025-01-04 10:54:41.653 [INFO ] [Thread-6  ] - 执行任务
2025-01-04 10:54:41.653 [INFO ] [Thread-3  ] - 执行任务
2025-01-04 10:54:41.653 [INFO ] [Thread-2  ] - 执行任务
2025-01-04 10:54:41.653 [INFO ] [Thread-1  ] - 执行任务
2025-01-04 10:54:41.653 [INFO ] [Thread-7  ] - 执行任务
2025-01-04 10:54:43.661 [INFO ] [Thread-8  ] - 最终执行任务
2025-01-04 10:54:43.661 [INFO ] [Thread-5  ] - 最终执行任务

而AQS同步队列已经变成了这样:

image-20250104110022270

上图是线程10即将从AQS队列中被移除时AQS同步队列的内部结构。

剩下八个在AQS队列中等待的线程都是因为在doAwait方法被唤醒后因为获取锁失败被挂起的线程,它们代码执行到哪里了呢?它们还在CyclicBarrier的doAwait方法中等待:

image-20250104110726383

然后线程10从AQS同步队列中被移除,线程1重新获得锁,然后继续执行,在

if (g != generation)
    return index;

代码处结束doAwait方法——实际上generation已经换了两个了,既不是原来的那个generation,也不是第一轮循环栅栏的generation,甚至不是第二轮的generation,而是第二轮循环栅栏结束之后新创建的第三轮的循环栅栏generation。

接下来线程1继续执行原来线程中的业务代码:

image-20250104111600081

就打印出来类似index=1这种日志了,接下来剩余七个线程依次获取锁,然后打印index=xxx,最后就变成了八个线程集中打印index=xxx。

这样就破案了,由于锁的公平性(虽然是非公平锁),导致线程被唤醒后在AQS同步队列中排队。如何解决这个问题呢?实际上就是让每轮循环栅栏结束后暂停一下,让后续新线程不要排队到从条件等待队列中转移到AQS队列中的那些节点前面就好了。

修改后的代码如下:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * 循环栅栏示例
 *
 * @author kdyzm
 * @date 2025/1/2
 */
@Slf4j
public class CyclicBarrierDemo {

    public static final int THREAD_COUNT = 5;


    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cb = new CyclicBarrier(THREAD_COUNT, new Runnable() {
            @Override
            public void run() {
                log.info("最终执行任务");
            }
        });

        for (int j = 0; j < 2; j++) {
            for (int i = 0; i < THREAD_COUNT; i++) {
                new Thread(new Worker(cb)).start();
            }
            //等待两秒钟再进行下一轮循环,防止新线程排队到从条件等待队列中转移到AQS队列中的那些节点前面
            TimeUnit.SECONDS.sleep(2);
        }
    }

    static class Worker implements Runnable {
        private CyclicBarrier cb;

        public Worker(CyclicBarrier cb) {
            this.cb = cb;
        }

        @Override
        public void run() {
            log.info("执行任务");
            try {
                int await = cb.await();
                log.info("index={}", await);
            } catch (InterruptedException | BrokenBarrierException e) {
                log.error("", e);
            }
        }
    }

}

运行结果如下所示:

image-20250104121038812

四、CountDownLatch Vs CyclicBarrier

我觉得CountDownLatch和CyclicBarrier的最大区别就在于CountDownLatch的屏障能阻塞调用方代码继续运行,而CyclicBarrier提供一个Runnable构造方法中的参数作为“回调函数”,回调函数的执行并不影响调用方代码继续运行。

另外一个重要的区别就是CountDownLatch不可重复使用,而CyclicBarrier可重复使用。

最后就是CountDownLatch的实现和CyclicBarrier不同,CountDownLatch实现比较复杂,它内部维护了Sync类并实现AQS接口;CyclicBarrier则是直接利用了ReentrantLock的等待通知机制,实际上CyclicBarrier和AQS并没有直接使用关系。



END.


#java #多线程编程
目录
复制 复制成功