1. 等待与通知
为了支持多线程之间的协作,JDK 中提供了两个非常重要的方法:wait() 和 notify() ,这两个方法定义在 Object 类中,这意味着任何 Java 对象都可以调用者两个方法。如果一个线程调用了 object.wait() 方法,那么它就会进入该对象的等待队列中,这个队列中可能包含了多个线程,此时代表多个线程都在等待同一个对象;当 object.notify() 方法被调用时,它就会从这个等待队列中随机唤醒一个线程。
需要特别注意的是在调用这两个方法时,它们都必须位于对应对象的 synchronzied 语句中,因为这两个方法在调用前都需要获得对应对象的监视器(内部锁),过程如下:
使用示例如下:
public class J3_WaitAndNotify { private static final Object object = new Object(); public static void main(String[] args) {
new Thread(() -> {
synchronized (object) {
try {
System.out.println("对象object等待");
object.wait();
System.out.println("线程1后续操作");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
synchronized (object) {
System.out.println("线程2开始操作");
System.out.println("对象object唤醒");
object.notify();
}
}).start();
}
}// 输出
对象object等待
线程2开始操作
对象object唤醒
线程1后续操作
notify() 表示随机唤醒任意一个等待线程,如果想要唤醒所有等待线程,则可以使用 notifyAll() 方法:
public class J5_NotifyAll {private static final Object object = new Object();public static void main(String[] args) {new Thread(() -> {
synchronized (object) {try {
System.out.println("对象object在线程1等待");object.wait();
System.out.println("线程1后续操作");} catch (InterruptedException e) {
e.printStackTrace();}}}).start();new Thread(() -> {
synchronized (object) {try {
System.out.println("对象object在线程2等待");object.wait();
System.out.println("线程2后续操作");} catch (InterruptedException e) {
e.printStackTrace();}}}).start();new Thread(() -> {
synchronized (object) {
System.out.println("线程3开始操作");
System.out.println("对象object唤醒");object.notifyAll();}}).start();}
}// 输出
对象object在线程1等待
对象object在线程2等待
线程3开始操作
对象object唤醒
线程2后续操作
线程1后续操作
在上面的示例中,由于有两个线程处于等待状态,所以 notifyAll() 的效果等价于调用 notify() 两次:
object.notify();
object.notify();
2. 条件变量
综上所述可以使用 wait() 和 notify() 配合内部锁 synchronized 可以实现线程间的等待与唤醒,如果你使用的是显示锁而不是内部锁,此时可以使用 Condition 来实现同样的效果。Condition 接口中定义了如下方法:
await():使得当前线程进入等待状态,类似于 object.wait();
awaitUninterruptibly():与 await() 类似,但它不会在等待过程中响应中断;
awaitNanos(long nanosTimeout) & await(long time, TimeUnit unit) & awaitUntil(Date deadline):有时间限制的等待;
signal():用于随机唤醒一个等待;
signalAll():用于唤醒所有等待。
和 object 的 wait()\notify()\notifyAll() 一样,在使用 condition 的 await()\signal()\signalAll() 前,也要求线程必须持有相关的重入锁, 示例如下:
public class AwaitAndSignal {private static ReentrantLock lock = new ReentrantLock();private static Condition condition = lock.newCondition();static class IncreaseTask implements Runnable {
@Overridepublic void run() {try {lock.lock();String threadName = Thread.currentThread().getName();
System.out.println(threadName + "线程等待通知...");
condition.await();
System.out.println(threadName + "线程后续操作");} catch (InterruptedException e) {
e.printStackTrace();} finally {lock.unlock();}}}public static void main(String[] args) throws InterruptedException {Thread thread1 = new Thread(new IncreaseTask());
thread1.start();
Thread.sleep(1000);
System.out.println("主线程开始操作");lock.lock();
System.out.println("主线程唤醒");
condition.signal();lock.unlock();}
}// 输出:
Thread-0线程等待通知...
主线程开始操作
主线程唤醒
Thread-0线程后续操作
3. Join
Thread.join() 可以让当前线程等待目标线程结束后再开始运行,示例如下:
public class J1_Normal {private static int j = 0;public static void main(String[] args) {Thread thread = new Thread(() -> {for (int i = 0; i < 100000; i++) {
j++;}});
thread.start();
System.out.println(j);}
}
// 此时主线程不等待子线程运行完成,通常输出结果为:0public class J2_Join {private static int j = 0;public static void main(String[] args) throws InterruptedException {Thread thread = new Thread(() -> {for (int i = 0; i < 100000; i++) {
j++;}});
thread.start();
thread.join();
System.out.println(j);}
}
// 此时主线程需要等待子线程运行完成,输出结果为:100000
4. CountDownLatch
Thread.join() 可以让当前线程等待目标线程结束后再开始运行,但大多数时候,你只需要等待目标线程完成特定的操作,而不必等待其完全终止。此时可以使用条件变量 Condition 来实现,也可以使用更为简单的工具类 CountDownLatch 。CountDownLatch 会在内部维护一个计数器,每次完成一个任务,则计数器减 1,当计数器为 0 时,则唤醒所有的等待线程,示例如下:
public class j1_Normal {private static AtomicInteger integer = new AtomicInteger(0);static class IncreaseTask implements Runnable {
@Overridepublic void run() {try {// 假设这是一个耗时的任务
Thread.sleep(3000);
integer.incrementAndGet();} catch (InterruptedException e) {
e.printStackTrace();}}}public static void main(String[] args) {IncreaseTask task = new IncreaseTask();ExecutorService executorService = Executors.newFixedThreadPool(100);for (int i = 0; i < 100; i++) {
executorService.submit(task);}
System.out.println("integer:" + integer);
executorService.shutdown();}
}// 不使用CountDownLatch 时,主线程不会子线程等待计算完成,此时输出通常为: 0public class J2_CountDown {private static int number = 100;// 指定计数器的初始值private static CountDownLatch latch = new CountDownLatch(number);private static AtomicInteger integer = new AtomicInteger(0);static class IncreaseTask implements Runnable {
@Overridepublic void run() {try {// 假设这是一个耗时的任务
Thread.sleep(3000);
integer.incrementAndGet();// 计数器减1
latch.countDown();} catch (InterruptedException e) {
e.printStackTrace();}}}public static void main(String[] args) throws InterruptedException {IncreaseTask task = new IncreaseTask();ExecutorService executorService = Executors.newFixedThreadPool(100);for (int i = 0; i < number; i++) {
executorService.submit(task);}// 等待计数器为0时唤醒所有等待的线程
latch.await();
System.out.println("integer:" + integer);
executorService.shutdown();}
}// 使用CountDownLatch 时,主线程需要等待所有的子线程计算完成后再输出,计算结果为:100
5. CyclicBarrier
CyclicBarrier 和 CountDownLatch 类似,都是用于等待一个或者多个线程完成特定的任务后再执行某项操作,但不同的是它可以循环使用,示例如下:
/**
* 每五个人完成任务后,则算一个小组已完成
*/
public class J1_CyclicBarrier {private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("五人小组任务执行完成"));static class Task implements Runnable {
@Overridepublic void run() {try {long l = new Double(Math.random() * 5000).longValue();
Thread.sleep(l);
System.out.println("任务" + Thread.currentThread().getId() + "执行完成");
cyclicBarrier.await();} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();}}}public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(20);for (int j = 0; j < 10; j++) {
executorService.submit(new Task());}
executorService.shutdown();}
}// 输出如下:
任务21执行完成
任务20执行完成
任务15执行完成
任务14执行完成
任务22执行完成
五人小组任务执行完成
任务17执行完成
任务13执行完成
任务19执行完成
任务18执行完成
任务16执行完成
五人小组任务执行完成
基于 CyclicBarrier 的特性,通常可以用于在测试环境来模仿高并发,如每次等待一万个线程启动后再让其并发执行某项压力测试。
6. Semaphore
信号量(Semaphore)可以看做是锁的扩展,由于锁的排它性,所以一次只允许一个线程来访问某个特定的资源, 而 Semaphore 则允许多个线程并发的访问某个特定的资源,并且可以通过配置许可证的数量来限制并发访问的线程数,因此其可以用于流量控制等场景中:
public class J1_Semaphore {// 限制并发访问的线程的数量为5private static Semaphore semaphore = new Semaphore(5);static class IncreaseTask implements Runnable {
@Overridepublic void run() {try {
semaphore.acquire();
System.out.println(Thread.currentThread().getId() + "获得锁!");
Thread.sleep(5000);
semaphore.release();} catch (InterruptedException e) {
e.printStackTrace();}}}public static void main(String[] args) {IncreaseTask task = new IncreaseTask();for (int i = 0; i < 20; i++) {new Thread(task).start();}}
}// 输出如下,至多只能有五个线程并发获得锁
13获得锁!
15获得锁!
16获得锁!
18获得锁!
17获得锁!
....
19获得锁!
20获得锁!
21获得锁!
22获得锁!
23获得锁!
....
7. LockSupport
LockSupport 可以在线程内的任意位置实现阻塞。它采用和 Semaphore 类似的信号量机制:它为每个线程准备一个许可,如果许可可用,则 park() 方法会立即返回,并且消费掉这个许可,让许可不可用;此时因为许可不可用,相应的线程就会被阻塞。而 unpark() 则会使得一个许可从不可用变为可用。但和 Semaphore 不同的是:它的许可不能累加,你不可能拥有超过一个许可,它永远只有一个:
public class J1_LockSupport {static class Task implements Runnable {
@Overridepublic void run() {long id = Thread.currentThread().getId();
System.out.println("线程" + id + "开始阻塞");
LockSupport.park();
System.out.println("线程" + id + "解除阻塞");}}public static void main(String[] args) throws InterruptedException {Thread thread01 = new Thread(new Task());Thread thread02 = new Thread(new Task());
thread01.start();
thread02.start();
Thread.sleep(3000);
System.out.println("主线程干预");
LockSupport.unpark(thread01);
LockSupport.unpark(thread02);}
}// 输出:
线程13开始阻塞
线程14开始阻塞
主线程干预
线程13解除阻塞
线程14解除阻塞