概念:CyclicBarrier翻译为循环(屏障/栅栏),当一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续工作。
设计目的:和CountDownLatch都为同步辅助类,因为CountDownLatch的计数器是一次性的,属于一对多,
1、【某一个线程需要其余线程执行完之后再执行 (一个等多个)】,
2、【一些线程需要在某个时刻同时执行,就像等待裁判员枪响后,才能同时起跑(多个等一个)】
所以,CyclicBarrier被设计成为计数器可循环使用,多对多。只要多个线程都达到后,自会执行接下来的事,没有CountDownLatch的一个等多个,多个等一个的现象。
源码解析:
public class CyclicBarrier {/*** Each use of the barrier is represented as a generation instance.* The generation changes whenever the barrier is tripped, or* is reset. There can be many generations associated with threads* using the barrier - due to the non-deterministic way the lock* may be allocated to waiting threads - but only one of these* can be active at a time (the one to which {@code count} applies)* and all the rest are either broken or tripped.* There need not be an active generation if there has been a break* but no subsequent reset.* 屏障的每次使用都表示为一个生成实例。当屏障被触发或重置时,生成就会改变。可以有许多代与使用barrier的线程相关联——由于锁可能以不确定的方式分配给等待线程——但是一次只能有一个是活动的({@code count}应用的那个),其余的要么中断,要么被触发。如果有中断,但没有随后的重置,则不需要活动代。*/private static class Generation {boolean broken = false;}/** The lock for guarding barrier entry保护屏障的锁,重入锁,调用await()方法的时候会用到,只有一个线程会执行成功,所以多线程高并发的情况下CyclicBarrier的执行效率不是很高 */private final ReentrantLock lock = new ReentrantLock();/** Condition to wait on until tripped待跳闸的条件( Condition本身用于线程间通信,在这就是阻塞和唤醒线程用的)*/private final Condition trip = lock.newCondition();/** The number of parties这个就是计数器初始值,新建类的时候传入的数值会赋值给他 */private final int parties;/* The command to run when tripped 跳闸时要执行操作的线程*/private final Runnable barrierCommand;/** The current generation 当前代*/private Generation generation = new Generation();/*** Number of parties still waiting. Counts down from parties to 0* on each generation. It is reset to parties on each new* generation or when broken.还有很多任务在等待。每一代从计数器初始值(parties)降到0。它会在每一代新创建或用尽时重置。*/private int count;/*** Updates state on barrier trip and wakes up everyone.* Called only while holding lock.只在持有锁时调用,去更新屏障的状态并唤醒其他线程。(意思代表下一代,其实就是新建一个代对象重新赋值)*/private void nextGeneration() {// signal completion of last generation//当计数器减少为零证明这代已经满了,唤醒所有当前代阻塞的线程,这是Condition的方法trip.signalAll();// set up next generation//将初始需要拦截的计数器初始值初始化给count计数器count = parties;//上一代已经结束了,就要重新开启下一代generation = new Generation();}/**这个方法就是告诉这一代线程出问题了: 屏障里面的某个线程中断了,那就唤醒所有线程,然后这个屏障拦截的所有线程都会被唤醒,然后抛出异常,并告诉这一代坏掉了* Sets current barrier generation as broken and wakes up everyone.* Called only while holding lock.*/private void breakBarrier() {//将这代标记为true,告诉已经坏掉了generation.broken = true;//这一代重新计数count = parties;//唤醒这一代里面的所有阻塞线程trip.signalAll();}/*** Main barrier code, covering the various policies.主要的屏障代码,涵盖了各种策略*/private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {//当多线程并发情况下只有一个线程能获得锁(注意:该锁为jvm级锁,只能保证单实例有效,多进程下管理共享资源无效)final ReentrantLock lock = this.lock;lock.lock();try {//获取当前代的信息final Generation g = generation;//调用await方法之前需要判断这一代是否已经坏掉了,如果坏掉了直接抛出异常if (g.broken)throw new BrokenBarrierException();//判断当前线程是否中断了,如果中断了也会抛出异常,并调用breakBarrier方法if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}
//如果这代没有坏掉,当前线程也没有中断,那么就将计数器减去1int index = --count;//减去1之后的数值如果为零,证明这一代已经满了,然后唤醒屏障拦截的所有线程if (index == 0) { // trippedboolean ranAction = false;try {//获取初始化的那个线程或者传入的线程final Runnable command = barrierCommand;if (command != null)//线程执行任务command.run();ranAction = true;//执行换代操作nextGeneration();return 0;} finally {//如果上的操作执行出现问题了,那么就执行breakBarrier方法,唤醒所有线程if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed out//自旋for (;;) {try {//await()方法可以传入两个参数的,一个是否需要判断等待时间,一个等待的时间//如果不需要判断等待时间,那么直接调用await()方法,这是Condition类的方法if (!timed)trip.await();//如果需要等待的时间大于0,那么线程阻塞要设置时间,超过这个时间需要被唤醒else if (nanos > 0L)//设置超时阻塞时间,线程就被阻塞了,程序执行到这里就不会继续向下执行,直到这个线程被唤醒,才会继续向下执行nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {//当前线程在阻塞过程,被中断了或者由于这一代屏障前的其他线程原因导致的代损坏了//执行breakBarrier方法唤醒其他线程breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.//如果不是上述两个原因造成的,就中断当前线程Thread.currentThread().interrupt();}}// 当有任何一个线程中断,会调用 breakBarrier 方法.// 就会唤醒其他的线程,其他线程醒来后,也要抛出异常//走到这里这名线程被唤醒了,唤醒的方式可能是计数器为零被唤醒,也有可能调用breakBarrier//方法导致代坏掉了而唤醒的,那么需要需要抛出代 破损异常if (g.broken)throw new BrokenBarrierException();// g != generation 正常换代了,因为计数器为零了,代更新了//一切正常,返回当前线程所在屏障的下标// 如果 g == generation,说明还没有换代,那为什么会醒了?这里就是代的作用// 因为一个线程可以使用多个屏障,当别的屏障唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。// 正是因为这个原因,才需要 generation 来保证正确。 if (g != generation)return index;//如果不需要设置等待时间并且传入的时间不合法小于0,那么执行breakBarrier抛出异常if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}/*** Creates a new {@code CyclicBarrier} that will trip when the* given number of parties (threads) are waiting upon it, and which* will execute the given barrier action when the barrier is tripped,* performed by the last thread entering the barrier.** @param parties the number of threads that must invoke {@link #await}* before the barrier is tripped* @param barrierAction the command to execute when the barrier is* tripped, or {@code null} if there is no action* @throws IllegalArgumentException if {@code parties} is less than 1*/public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}/*** Creates a new {@code CyclicBarrier} that will trip when the* given number of parties (threads) are waiting upon it, and* does not perform a predefined action when the barrier is tripped.** @param parties the number of threads that must invoke {@link #await}* before the barrier is tripped* @throws IllegalArgumentException if {@code parties} is less than 1*/public CyclicBarrier(int parties) {this(parties, null);}/*** Returns the number of parties required to trip this barrier.** @return the number of parties required to trip this barrier*/public int getParties() {return parties;}/*** Waits until all {@linkplain #getParties parties} have invoked* {@code await} on this barrier.** <p>If the current thread is not the last to arrive then it is* disabled for thread scheduling purposes and lies dormant until* one of the following things happens:* <ul>* <li>The last thread arrives; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* one of the other waiting threads; or* <li>Some other thread times out while waiting for barrier; or* <li>Some other thread invokes {@link #reset} on this barrier.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** <p>If the barrier is {@link #reset} while any thread is waiting,* or if the barrier {@linkplain #isBroken is broken} when* {@code await} is invoked, or while any thread is waiting, then* {@link BrokenBarrierException} is thrown.** <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,* then all other waiting threads will throw* {@link BrokenBarrierException} and the barrier is placed in the broken* state.** <p>If the current thread is the last thread to arrive, and a* non-null barrier action was supplied in the constructor, then the* current thread runs the action before allowing the other threads to* continue.* If an exception occurs during the barrier action then that exception* will be propagated in the current thread and the barrier is placed in* the broken state.** @return the arrival index of the current thread, where index* {@code getParties() - 1} indicates the first* to arrive and zero indicates the last to arrive* @throws InterruptedException if the current thread was interrupted* while waiting* @throws BrokenBarrierException if <em>another</em> thread was* interrupted or timed out while the current thread was* waiting, or the barrier was reset, or the barrier was* broken when {@code await} was called, or the barrier* action (if present) failed due to an exception*/public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}/*** Waits until all {@linkplain #getParties parties} have invoked* {@code await} on this barrier, or the specified waiting time elapses.** <p>If the current thread is not the last to arrive then it is* disabled for thread scheduling purposes and lies dormant until* one of the following things happens:* <ul>* <li>The last thread arrives; or* <li>The specified timeout elapses; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* one of the other waiting threads; or* <li>Some other thread times out while waiting for barrier; or* <li>Some other thread invokes {@link #reset} on this barrier.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** <p>If the specified waiting time elapses then {@link TimeoutException}* is thrown. If the time is less than or equal to zero, the* method will not wait at all.** <p>If the barrier is {@link #reset} while any thread is waiting,* or if the barrier {@linkplain #isBroken is broken} when* {@code await} is invoked, or while any thread is waiting, then* {@link BrokenBarrierException} is thrown.** <p>If any thread is {@linkplain Thread#interrupt interrupted} while* waiting, then all other waiting threads will throw {@link* BrokenBarrierException} and the barrier is placed in the broken* state.** <p>If the current thread is the last thread to arrive, and a* non-null barrier action was supplied in the constructor, then the* current thread runs the action before allowing the other threads to* continue.* If an exception occurs during the barrier action then that exception* will be propagated in the current thread and the barrier is placed in* the broken state.** @param timeout the time to wait for the barrier* @param unit the time unit of the timeout parameter* @return the arrival index of the current thread, where index* {@code getParties() - 1} indicates the first* to arrive and zero indicates the last to arrive* @throws InterruptedException if the current thread was interrupted* while waiting* @throws TimeoutException if the specified timeout elapses.* In this case the barrier will be broken.* @throws BrokenBarrierException if <em>another</em> thread was* interrupted or timed out while the current thread was* waiting, or the barrier was reset, or the barrier was broken* when {@code await} was called, or the barrier action (if* present) failed due to an exception*/public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));}/*** Queries if this barrier is in a broken state.** @return {@code true} if one or more parties broke out of this* barrier due to interruption or timeout since* construction or the last reset, or a barrier action* failed due to an exception; {@code false} otherwise.*/public boolean isBroken() {final ReentrantLock lock = this.lock;lock.lock();try {return generation.broken;} finally {lock.unlock();}}/*** Resets the barrier to its initial state. If any parties are* currently waiting at the barrier, they will return with a* {@link BrokenBarrierException}. Note that resets <em>after</em>* a breakage has occurred for other reasons can be complicated to* carry out; threads need to re-synchronize in some other way,* and choose one to perform the reset. It may be preferable to* instead create a new barrier for subsequent use.重置整个屏障为初始状态,将已经拦截的释放掉,全部重新开始计数*/public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {//执行breakBarrier方法唤醒其他线程breakBarrier(); // break the current generation//更新屏障的状态并唤醒其他线程nextGeneration(); // start a new generation} finally {lock.unlock();}}/*** Returns the number of parties currently waiting at the barrier.* This method is primarily useful for debugging and assertions.** @return the number of parties currently blocked in {@link #await}*/public int getNumberWaiting() {final ReentrantLock lock = this.lock;lock.lock();try {return parties - count;} finally {lock.unlock();}}
}
实例:
假如公司团建,大家一起做大巴车,在大巴车出发之前,肯定是需要点名的,只有大家都到车上之后,才会发车,然后到了到了目的地之后,肯定是所有人都下车了,司机才能把车开走,这个过程中涉及了2次大家都就位之后,司机才能继续操作,可以证明CyclicBarrier可以循环使用计数器。
package com.runlion.middleground.marginal;import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;/*** * @description:* @date 2024年04月30日 17:26:05*/
@Slf4j
public class StudyTest {@Getter@Setterstatic class Flag{public int count=0;}@Test@SneakyThrowspublic void testCyclicBarrier() {Flag flag=new Flag();CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {if(flag.getCount()==0){System.out.println("所有人都上车了,可以发车了");flag.setCount(1);}else {System.out.println("所有人都下车了,司机可以走了");}});for (int i = 1; i < 6; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "号上车了");cyclicBarrier.await();System.out.println(Thread.currentThread().getName() + "号开始休息了");TimeUnit.MILLISECONDS.sleep(new Random().nextInt(2000));System.out.println(Thread.currentThread().getName() + "号下车了");cyclicBarrier.await();System.out.println(Thread.currentThread().getName() + "号到达目的地了");}catch (Exception e){log.info(e.getMessage());}}, String.valueOf(i)).start();}System.out.println("主线程不阻塞");}
}