前言
上一篇讲了线程池的相关知识,这篇文章主要讲解一个
1.并发工具类如CountDownLatch、CyclicBarrier等。
2.线程安全和并发集合:
3.学习如何使用Java提供的线程安全的集合类,如ConcurrentHashMap、CopyOnWriteArrayList等。
并发工具类
- CountDownLatch(倒计数器):有两个常用场景
1.协调子线程结束动作,等待所有子线程运行结束。CountDownLatch允许一个或多个线程等待其他线程完成操作。
例如,我们很多人喜欢玩的王者荣耀,开黑的时候,得等所有人都上线之后,才能开打
public static void main(String[] args) throws InterruptedException
{ CountDownLatch countDownLatch = new CountDownLatch(5);
Thread 大乔 = new Thread(countDownLatch::countDown);Thread 兰陵王 = new Thread(countDownLatch::countDown);Thread 安其拉 = new Thread(countDownLatch::countDown);Thread 哪吒 = new Thread(countDownLatch::countDown);Thread 铠 = new Thread(() -> {
try {
// 稍等,上个卫生间,马上到... Thread.sleep(1500);
countDownLatch.countDown();
} catch (InterruptedException ignored) {}
});
大乔.start();
兰陵王.start();
安其拉.start();
哪吒.start();
铠.start(); countDownLatch.await();
System.out.println("所有玩家已经就位!");
}
- 协调线程开始动作,统一各线程开始的时机。
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Thread 大乔 = new Thread(() -> waitToFight(countDownLatch));
Thread 兰陵王 = new Thread(() - > waitToFight(countDownLatch));Thread 安其拉 = new Thread(() ->waitToFight(countDownLatch));
Thread 哪吒 = new Thread(() -> waitToFight(countDownLatch));
Thread 铠 = new Thread(() -> waitToFight(countDownLatch));
大乔.start();
兰陵王.start();
安其拉.start();
哪吒.start();
铠.start(); Thread.sleep(1000);
countDownLatch.countDown();
System.out.println("敌方还有5秒达到战场,全军出击!");
}private static void waitToFight(CountDownLatch countDownLatch) {
try {
countDownLatch.await(); // 在此等待信号再继续
System.out.println("收到,发起进攻!");
} catch (InterruptedException e)
{ e.printStackTrace();
}
}
countdownlatch核心方法:
- await():等待latch降为0
- boolean await(Long Time,TImeUnit unit) : 等待latch降为0,但是设置了超时时间,毕竟不能为了某人等到永久。
- counDown():latch数量减一
- getCount():获取当前数量
CyclicBarrier同步屏障
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情就是让一组线程到达一个屏障(也可叫做同步点)时候被阻塞,直到最后一个线程到达屏障的时候,屏障开门,所有的被拦截的线程才会被放行。
和countdownlatch类似,都有协调多线程的结束动作。
不同:countdownlatch的使用是一次性的,无法重复利用,而cyclicBarrier可以重复利用。
最核心的方法是await:如果当前的一个线程不是第一个到达的话,它将会进入等待,直到其他线程都到达,除非发生了被中断,屏障被破除,被拆除,屏障重设等。
countdownlatch和cyclicBarrier区别点
- countdownlatch是一次性的,而cyclicBarrier则可以设置多次的屏障,实现重复利用。
- countdownlatch中的各个子线程不可以等待其他线程执行完毕,只能执行自己的任务,而CyclicBarrier可以等待其他的线程。
|CyclicBarrier| CountDownLatch|
CyclicBarrier | CountDownLatch |
---|---|
CyclicBarrier是可以重复利用的,其中的线程可以等待所有的线程完成任务,这是屏障会拆除,并可以选择性的做一些特定的动作 | 是一次性的,不同的线程在同一个计数器上工作,直到线程为0 |
是面向线程数的 | 是面向任务数的 |
再使用 CyclicBarrier时候,必须再构造器中指定参加的线程数,这些线程必须实现await()方法 | 使用Countdownlatch必须要指定任务数,但是这些任务由那些线程完成无关紧要 |
CyclicBarrier可以再所有线程释放后重复使用 | COuntdownlatch在计数为0后就不能再使用了 |
再CyclicBarrier中,如果某个线程遇到中断,超时等问题,那么处于await的线程都会出现问题 | 再COuntdownlatch中一个线程出现问题,不会影响其他线程。 |
Semaphore(信号量)
semaphore信号量是用来控制同时访问特定的资源的线程数量,通过协调各个线程,以保证合理的使用公共资源。
一个Semaphore的用途:它可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool =
Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {for (int i = 0; i < THREAD_COUNT; i++){ threadPool.execute(new Runnable() {@Overridepublic void run(){ try {s.acquire();System.out.println("save data");s.release();} catch (InterruptedException e) {}}
});
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {threadPool.execute(new Runnable(){ @Overridepublic void run(){ try {String A = "银行流水A"; // A录入银行流水数据exgr.exchange(A);}threadPool.shutdown();
}
}在代码中,虽然有30个线程在执行,但是只允许10个并发执行。Semaphore的构造方法 Semaphore(int permits)接受一
个整型的数字,表示可用的许可证数量。 Semaphore(10) 表示允许10个线程获取许可证,也就是最大并发数是10。
Semaphore的用法也很简单,首先线程使用 Semaphore的acquire()方法获取一个许可 证,使用完之后调用release()方
法归还许可证。还可以用tryAcquire()方法尝试获取许可证。
Exchanger交换者
是一个用于线程之间协调协作的工具类,Exchanger用于线程之间的数据交换,他提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。
这两个线程通过 exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规
则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需要将纸制银行流水通过人工的方式录入成电子银行
流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个 Excel数据进
行校对,看看是否录入一致
qw
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable()
{ @Override
public void run()
{ try {
String A = "银行流水A"; // A录入银行流水数据
exgr.exchange(A);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() { @Override
public void run() { try {
String B = "银行流水B"; // B录入银行流水数据
String A = exgr.exchange("B");
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:" + A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
假如两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用
exchange(V x, long timeOut, TimeUnit unit)设置最大等待时长。
总结
本篇主要就是讲了两个并发工具类。Countdownlatch倒计数器和CyclicBarrier同步屏障,以及一个semaphore信号量。
ps:如果可以一键三连,给加点动力。