CyclicBarrier
CyclicBarrier is similar to CountDownLatch; the main difference is that a CyclicBarrier can be reused. Typical usage:
CyclicBarrier barrier = new CyclicBarrier(3); // Condition: 3 threads must reach the barrier (no barrier action specified)
barrier.await(); // If fewer than 3 threads have arrived, the calling thread blocks until the third one arrives

// With a barrier action
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
    @Override
    public void run() {
        System.out.println("All parties have arrived at the barrier, let's continue.");
    }
});
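Internally, await() decrements a counter that starts at the number of parties (in the JDK this is the count field of CyclicBarrier, guarded by a ReentrantLock). When the last required thread arrives, the counter reaches 0, all waiting threads are released, and the counter is reset to the number of parties. This reset is what makes the barrier reusable. A CountDownLatch never resets its count (its AQS state), so it cannot be reused once it has reached zero.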
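The count-and-reset behavior described above can be illustrated with a toy barrier. This is a simplified sketch, not the JDK source: SimpleBarrier, SimpleBarrierDemo, generation, and runRounds are hypothetical names, and the sketch leaves out the barrier action, timeouts, and broken-barrier handling that the real CyclicBarrier provides.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical toy barrier for illustration only; not the JDK implementation.
class SimpleBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties; // threads required per round
    private int count;         // threads still missing in the current round
    private int generation;    // how many times the barrier has tripped

    SimpleBarrier(int parties) {
        this.parties = parties;
        this.count = parties;
    }

    void await() throws InterruptedException {
        lock.lock();
        try {
            int myGeneration = generation;
            if (--count == 0) {
                // Last thread of the round: reset the count so the barrier can be reused.
                count = parties;
                generation++;
                trip.signalAll();
                return;
            }
            // Earlier arrivals wait until the round they joined has finished.
            while (myGeneration == generation) {
                trip.await();
            }
        } finally {
            lock.unlock();
        }
    }

    int generation() {
        lock.lock();
        try {
            return generation;
        } finally {
            lock.unlock();
        }
    }
}

class SimpleBarrierDemo {
    // Runs `rounds` rounds with `parties` threads and returns how often the barrier tripped.
    static int runRounds(int parties, int rounds) {
        SimpleBarrier barrier = new SimpleBarrier(parties);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object serves every round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return barrier.generation();
    }

    public static void main(String[] args) {
        System.out.println("Barrier tripped " + runRounds(3, 2) + " times");
    }
}
```

The generation counter is there so that a thread woken from round 1 cannot be confused by a faster thread that has already entered round 2; the real CyclicBarrier uses a similar idea with its own internal generation object.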
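When the waiting tasks run in a thread pool, the pool needs at least as many threads as the barrier has parties (the simplest choice is to make the two equal). With fewer threads, the tasks already waiting at the barrier occupy every worker, so the remaining tasks never get to run and the barrier never trips.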
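A small experiment can make the pool-size rule concrete. The sketch below is an assumption-laden demo rather than a recommended pattern: BarrierPoolDemo and countPassed are hypothetical names, and the timed await(1, TimeUnit.SECONDS) is used only so that the undersized case terminates instead of hanging forever.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierPoolDemo {
    // Submits `parties` tasks to a pool of `poolSize` threads
    // and returns how many of them got past the barrier.
    static int countPassed(int poolSize, int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger passed = new AtomicInteger();
        for (int i = 0; i < parties; i++) {
            pool.execute(() -> {
                try {
                    // The timeout exists only so the undersized case ends instead of hanging.
                    barrier.await(1, TimeUnit.SECONDS);
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException | TimeoutException e) {
                    // Not enough threads reached the barrier in time.
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("pool=3, parties=3 -> passed: " + countPassed(3, 3));
        System.out.println("pool=2, parties=3 -> passed: " + countPassed(2, 3));
    }
}
```

With a pool of 3 all three tasks should pass. With a pool of 2 the third task stays queued while the first two wait, so the timed wait expires, the barrier is broken, and no task passes.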
Usage example 1
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        // Create a CyclicBarrier that requires 3 threads to reach the barrier
        CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                // Barrier action: runs once all 3 threads have reached the barrier
                System.out.println("All threads have reached the barrier, resuming execution.");
            }
        });

        // Start 3 threads; each of them will reach the barrier
        for (int i = 0; i < 3; i++) {
            new Thread(new Worker(barrier)).start();
        }
    }
}

class Worker implements Runnable {
    private final CyclicBarrier barrier;

    public Worker(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " is waiting at the barrier.");
            // The current thread is suspended here; the next line is not printed
            // until 3 threads have reached the barrier and this thread resumes
            barrier.await();
            System.out.println(Thread.currentThread().getName() + " has passed the barrier.");
        } catch (InterruptedException | BrokenBarrierException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Usage example 2
The barrier is used for two phases. The second phase does not create a new CyclicBarrier, which demonstrates that it is reusable.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierReusableExample {
    public static void main(String[] args) {
        final int numberOfThreads = 3;
        final CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, new Runnable() {
            @Override
            public void run() {
                System.out.println("Barrier reached! All threads completed the phase.");
            }
        });

        Runnable task = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + " is performing task.");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName() + " finished task and waiting at the barrier.");
                barrier.await();
                // Phase complete, proceed to the next phase
                System.out.println(Thread.currentThread().getName() + " moving to the next phase.");
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Launch threads for phase 1
        for (int i = 0; i < numberOfThreads; i++) {
            new Thread(task).start();
        }

        // Wait for all threads to complete phase 1 and then start phase 2
        try {
            Thread.sleep(2000); // Simulate some delay between phases
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Starting next phase...");

        // Launch threads for phase 2, using the same barrier
        for (int i = 0; i < numberOfThreads; i++) {
            new Thread(task).start();
        }
    }
}
The same program rewritten with CountDownLatch. Because a latch's count is never reset, a new CountDownLatch has to be created for each phase. Each worker calls countDown() when it arrives and then await(), so the count actually reaches zero and the waiting threads are released.
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) {
        final int numberOfThreads = 3;

        // Start phase 1
        startPhase(numberOfThreads);

        // Wait for some time before starting the next phase
        try {
            Thread.sleep(3000); // Simulate some delay between phases
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Starting next phase...");

        // Start phase 2
        startPhase(numberOfThreads);
    }

    // Starts one phase; a latch cannot be reset, so every phase gets its own latch
    static void startPhase(int numberOfThreads) {
        final CountDownLatch latch = new CountDownLatch(numberOfThreads);

        Runnable task = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + " is performing task.");
                // Simulate some work
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName() + " finished task and waiting at the latch.");
                latch.countDown(); // Signal that this thread has arrived
                latch.await(); // Wait until the latch count reaches zero
                // Phase complete, proceed to the next phase
                System.out.println(Thread.currentThread().getName() + " moving to the next phase.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Start threads
        for (int i = 0; i < numberOfThreads; i++) {
            new Thread(task).start();
        }
    }
}