目录
- wait & notify
- wait notify 原理
- 常用API
- sleep(long n)和wait(long n)的区别
- wait notify的使用套路
- 同步模式之保护性暂停
- 实现
- 带超时版
- 多任务版
- 生产者/消费者模式
- park & unpark
- 基本使用
- 原理
wait & notify
wait notify 原理
假设一个线程,在获取锁执行代码块后,可能由于某些条件不满足,在代码块里一直等着条件,这样就会一直占用着锁,其它人就得一直阻塞,效率太低。
在前面一节学习Monitor
时,提到过Monitor里的WaitSet
,它得主要是一些前面获取过锁,但是在等待某些条件重新唤醒的线程。
WaitSet里的线程与EntrySet里不同的是:WaitSet里的线程以前已经获取过锁了,只是由于不满足一些条件暂时阻塞了,里面的线程是不会给分配锁的。EntrySet里的线程都是等待分配锁的线程,可能包含第一次进入队列的,也可能有从WaitSet里被唤醒的。
wait/notify
配合Monitor里的WaitSet,就可以解决上面说的一直占用锁的问题。
原理如下:
- 当某个线程获取锁在代码块执行时,发现条件不满足,调用 wait 方法,释放锁,即可进入 WaitSet队列, 变为 WAITING 状态;
- WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入EntryList 重新竞争。
BLOCKED 和 WAITING 的线程区别:
BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片;但是WAITING线程需要Owner 线程notify唤醒,BLOCKED 线程需要 Owner 线程释放锁时唤醒;
注意:
- wait/notify在调用前一定要获得锁,如果在调用前没有获得锁,程序会抛出异常;
- 如果获得的不是同一把锁,notify不起作用。
- 执行notify方法后,当前线程并不会立即释放锁,要等到程序执行完,即退出synchronized同步区域后
总结:wait 方法使线程暂停运行,而notify 方法通知暂停的线程继续运行。
常用API
obj.wait()
让进入 object 监视器的线程到 waitSet 等待wait(long n)
让进入 object 监视器的线程到 waitSet 等待,等待时间为n毫秒obj.notify()
在 object 上正在 waitSet 等待的线程中随机挑一个唤醒obj.notifyAll()
让 object 上正在 waitSet 等待的线程全部唤醒
它们都是线程之间进行协作的手段,都属于 Object 对象的方法。
示例:
import lombok.extern.slf4j.Slf4j;@Slf4j
public class WaitFyTest01 {final static Object obj = new Object();public static void main(String[] args) throws InterruptedException {new Thread(() -> {synchronized (obj) {log.info("线程1执行....");try {obj.wait(); // 让线程在obj上一直等待下去} catch (InterruptedException e) {e.printStackTrace();}log.info("线程1执行完毕");}}).start();new Thread(() -> {synchronized (obj) {log.info("线程2执行....");try {obj.wait(); // 让线程在obj上一直等待下去} catch (InterruptedException e) {e.printStackTrace();}log.info("线程2执行完毕");}}).start();// 主线程两秒后执行Thread.sleep(2000);log.info("唤醒 obj 上其它线程");synchronized (obj) {obj.notify(); // 唤醒obj上一个线程// obj.notifyAll(); // 唤醒obj上所有等待线程}}}
notify 的一种结果如下:
2023-05-29 23:38:30,536 - 0 INFO [Thread-0] up.cys.chapter03.WaitFyTest01:16 - 线程1执行....
2023-05-29 23:38:30,544 - 8 INFO [Thread-1] up.cys.chapter03.WaitFyTest01:28 - 线程2执行....
2023-05-29 23:38:32,542 - 2006 INFO [main] up.cys.chapter03.WaitFyTest01:39 - 唤醒 obj 上其它线程
2023-05-29 23:38:32,545 - 2009 INFO [Thread-0] up.cys.chapter03.WaitFyTest01:22 - 线程1执行完毕
notifyAll 的结果如下:
2023-05-29 23:40:18,855 - 0 INFO [Thread-0] up.cys.chapter03.WaitFyTest01:16 - 线程1执行....
2023-05-29 23:40:18,863 - 8 INFO [Thread-1] up.cys.chapter03.WaitFyTest01:28 - 线程2执行....
2023-05-29 23:40:20,862 - 2007 INFO [main] up.cys.chapter03.WaitFyTest01:39 - 唤醒 obj 上其它线程
2023-05-29 23:40:20,866 - 2011 INFO [Thread-0] up.cys.chapter03.WaitFyTest01:22 - 线程1执行完毕
2023-05-29 23:40:20,867 - 2012 INFO [Thread-1] up.cys.chapter03.WaitFyTest01:34 - 线程2执行完毕
wait() 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到notify 为止。
sleep(long n)和wait(long n)的区别
sleep是让线程睡眠一段时间,wait是让线程等待一段时间。虽然二者都是让线程等待一段时间再执行,但是二者完全不同。
- sleep 是 Thread 方法,而 wait 是 Object 的方法
- sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
- sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
- 两种等待下,它们的线程状态都是 TIMED_WAITING
wait notify的使用套路
一般我们使用wait notify时:
- 在wait端:一般有个while循环一直来判断条件是否满足,如果不满足就进入wait等待,满足就执行逻辑
- 在notify端一般使用notifyAll来唤醒所有等待的线程。因为notify只能随机唤醒一个,会存在虚假唤醒(通知了但没有真正唤醒)的情况。
代码结构如下:
synchronized(lock) {while(条件不成立) {lock.wait();}// 干活
}
//另一个线程
synchronized(lock) {lock.notifyAll();
}
同步模式之保护性暂停
实现
学完wait notify,来利用它实现一个模式,即保护性暂停。
假如有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject(如果有结果不断从一个线程到另一个线程那么可以使用消息队列,即消费者/生产者模式)。JDK 中,join 的实现、Future 的实现,采用的就是此模式。因为要等待另一方的结果,因此归类到同步模式。
首先实现GuardedObject
类,它主要用来存储response,并提供获取和设置response值的方法,代码如下:
class GuardedObject {private Object response;/*** 锁对象*/private final Object lock = new Object();/*** 获取response* @return*/public Object get() {synchronized (lock) {// 条件不满足则等待while (response == null) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();} }return response; }}/***设置response* @param response*/public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;lock.notifyAll();}}
}
使用时,测试类代码如下:
public static void main(String[] args) {GuardedObject guardedObject = new GuardedObject();// 一个子线程设置responsenew Thread(() -> {try {// 休息2秒再设置Thread.sleep(2000);log.info("set response complete...");guardedObject.complete(1);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 主线程等着获取response,阻塞log.info("waiting...");Object response = guardedObject.get();log.info("get response: {}", response);}
执行结果如下:
2023-05-31 21:33:48,583 - 0 INFO [main] up.cys.chapter03.GuardedObjectTest:32 - waiting...
2023-05-31 21:33:50,585 - 2002 INFO [Thread-0] up.cys.chapter03.GuardedObjectTest:24 - set response complete...
2023-05-31 21:33:50,604 - 2021 INFO [main] up.cys.chapter03.GuardedObjectTest:34 - get response: 1
与前面我们了解的join相比,功能类似,但是相比join:
- 主线程不需要等待子线程结束,保护性暂停模式不需要,只要等唤醒就可以,唤醒后子线程还可以做其他事
- join等待结果的变量只能设置为全局的,这样其他线程才可以拿到,但是这个模式中的response是局部的,通过一个对象来传递。
带超时版
上面实现的get方法会一直等待,如果想设置一个等待的超时时间,如何实现?
主要思路:修改GuardedObject的get方法,增加一个参数,为超时时间;在wait时,设置wait超时时间。仅仅如此还不够,因为我们有while循环,所以,当下次被虚假唤醒后,还没有response时,又再次进入了循环,重新等待了,时间也不对了。所以需要记录这个等待时间,每次循环重新进来时,要重新计算等待时间。具体代码如下:
import lombok.extern.slf4j.Slf4j;@Slf4j
public class GuardedObjectTest {public static void main(String[] args) {GuardedObject guardedObject = new GuardedObject();// 一个子线程设置responsenew Thread(() -> {try {// 休息2秒再设置Thread.sleep(2000);log.info("set response complete...");guardedObject.complete(1);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 主线程等着获取response,阻塞log.info("waiting...");Object response = guardedObject.get(3000);log.info("get response: {}", response);}}@Slf4j
class GuardedObject {private Object response;/*** 锁对象*/private final Object lock = new Object();/*** 获取response* @return*/public Object get(long millis) {synchronized (lock) {// 1) 记录最初时间long begin = System.currentTimeMillis();// 2) 已经经历的时间long timePassed = 0;while (response == null) {// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等long waitTime = millis - timePassed;log.debug("waitTime: {}", waitTime);if (waitTime <= 0) {log.debug("break...");break; }try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}// 3) 如果提前被唤醒,这时已经经历的时间假设为 400timePassed = System.currentTimeMillis() - begin;}return response;}}/***设置response* @param response*/public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;lock.notifyAll();}}
}
多任务版
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员。如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。
新增 id 用来标识 Guarded Object:
class GuardedObjectProved {// 标识 Guarded Objectprivate int id;public GuardedObjectProved(int id) {this.id = id;}public int getId() {return id;}private Object response;/*** 锁对象*/private final Object lock = new Object();/*** 获取response* @return*/public Object get(long millis) {synchronized (lock) {// 1) 记录最初时间long begin = System.currentTimeMillis();// 2) 已经经历的时间long timePassed = 0;while (response == null) {// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等long waitTime = millis - timePassed;log.debug("waitTime: {}", waitTime);if (waitTime <= 0) {log.debug("break...");break; }try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}// 3) 如果提前被唤醒,这时已经经历的时间假设为 400timePassed = System.currentTimeMillis() - begin;}return response;}}/***设置response* @param response*/public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;lock.notifyAll();}}
}
中间解耦类,相当于邮箱:
class Mailboxes {private static Map<Integer, GuardedObjectProved> boxes = new Hashtable<>();private static int id = 1;// 产生唯一 idprivate static synchronized int generateId() {return id++;}public static GuardedObjectProved getGuardedObject(int id) {return boxes.remove(id);}public static GuardedObjectProved createGuardedObject() {GuardedObjectProved go = new GuardedObjectProved(generateId());boxes.put(go.getId(), go);return go;}public static Set<Integer> getIds() {return boxes.keySet();}
}
业务相关类
@Slf4j
class People extends Thread{@Overridepublic void run() {// 收信GuardedObjectProved guardedObject = Mailboxes.createGuardedObject();log.info("开始收信 id:{}", guardedObject.getId());Object mail = guardedObject.get(5000);log.info("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);}
}@Slf4j
class Postman extends Thread {private int id;private String mail;public Postman(int id, String mail) {this.id = id;this.mail = mail;}@Overridepublic void run() {GuardedObjectProved guardedObject = Mailboxes.getGuardedObject(id);log.info("送信 id:{}, 内容:{}", id, mail);guardedObject.complete(mail);}
}
测试:
public class GuardedObjectTest02 {public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 3; i++) {new People().start();}Thread.sleep(1000);for (Integer id : Mailboxes.getIds()) {new Postman(id, "内容" + id).start();}}
}
生产者/消费者模式
与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应。消费队列可以用来平衡生产和消费的线程资源。生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据。消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据。JDK 中各种阻塞队列,采用的就是这种模式。
在实现时注意以下几点:
- 这里消息队列与我们以前认知的不同,以前的大多数是进程间通信使用的。
- 消息需要有唯一标识,用来区分不同消息,因此自己定义一个消息类。
具体实现代码如下:
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;@Slf4j
public class ProductConsumerTest {public static void main(String[] args) {MessageQueue messageQueue = new MessageQueue(2);// 4 个生产者线程for (int i = 0; i < 4; i++) {int id = i;new Thread(() -> {try {log.info("download...");log.info("try put message({})", id);Thread.sleep(1000);messageQueue.put(new Message(id, "response"));} catch (InterruptedException e) {e.printStackTrace();}}, "生产者" + i).start();}// 1 个消费者线程, 处理结果new Thread(() -> {while (true) {Message message = messageQueue.take();String response = (String) message.getMessage();log.info("take message({}): {}", message.getId(), response);}}, "消费者").start();}
}class Message {private int id;private Object message;public Message(int id, Object message) {this.id = id;this.message = message;}public int getId() {return id;}public Object getMessage() {return message;}
}@Slf4j
class MessageQueue {private LinkedList<Message> queue;private int capacity;public MessageQueue(int capacity) {this.capacity = capacity;queue = new LinkedList<>();}public Message take() {synchronized (queue) {while (queue.isEmpty()) {log.info("没货了, wait");try {queue.wait();} catch (InterruptedException e) {e.printStackTrace();}}Message message = queue.removeFirst();queue.notifyAll();return message;}}public void put(Message message) {synchronized (queue) {while (queue.size() == capacity) {log.info("库存已达上限, wait");try {queue.wait();} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(message);queue.notifyAll();}}
}
运行如下:
2023-06-02 20:59:07,643 - 0 INFO [生产者1] up.cys.chapter03.ProductConsumerTest:23 - download...
2023-06-02 20:59:07,644 - 1 INFO [生产者3] up.cys.chapter03.ProductConsumerTest:23 - download...
2023-06-02 20:59:07,644 - 1 INFO [生产者2] up.cys.chapter03.ProductConsumerTest:23 - download...
2023-06-02 20:59:07,644 - 1 INFO [生产者0] up.cys.chapter03.ProductConsumerTest:23 - download...
2023-06-02 20:59:07,644 - 1 INFO [消费者] up.cys.chapter03.MessageQueue:73 - 没货了, wait
2023-06-02 20:59:07,673 - 30 INFO [生产者0] up.cys.chapter03.ProductConsumerTest:24 - try put message(0)
2023-06-02 20:59:07,673 - 30 INFO [生产者3] up.cys.chapter03.ProductConsumerTest:24 - try put message(3)
2023-06-02 20:59:07,673 - 30 INFO [生产者2] up.cys.chapter03.ProductConsumerTest:24 - try put message(2)
2023-06-02 20:59:07,673 - 30 INFO [生产者1] up.cys.chapter03.ProductConsumerTest:24 - try put message(1)
2023-06-02 20:59:08,683 - 1040 INFO [生产者0] up.cys.chapter03.MessageQueue:88 - 库存已达上限, wait
2023-06-02 20:59:08,684 - 1041 INFO [消费者] up.cys.chapter03.ProductConsumerTest:37 - take message(2): response
2023-06-02 20:59:08,686 - 1043 INFO [消费者] up.cys.chapter03.ProductConsumerTest:37 - take message(3): response
2023-06-02 20:59:08,686 - 1043 INFO [消费者] up.cys.chapter03.ProductConsumerTest:37 - take message(1): response
2023-06-02 20:59:08,686 - 1043 INFO [消费者] up.cys.chapter03.ProductConsumerTest:37 - take message(0): response
2023-06-02 20:59:08,686 - 1043 INFO [消费者] up.cys.chapter03.MessageQueue:73 - 没货了, wait
park & unpark
基本使用
park/unpark
与 wait/notify
功能类似,都是用来暂停和唤醒线程。park用来暂停线程,unpark用来将暂停的线程恢复。两个都是LockSupport
类下的方法。
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)
示例:
先 park 再 unpark
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.LockSupport;@Slf4j
public class ParkUnparkTest01 {public static void main(String[] args) {Thread t1 = new Thread(() -> {log.info("start...");// 子线程阻塞1秒try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}log.info("park...");LockSupport.park();log.info("resume...");},"t1");t1.start();// 主线程阻塞2秒try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}log.info("unpark...");LockSupport.unpark(t1);}
}
输出如下:
2023-06-02 21:42:03,990 - 0 INFO [t1] up.cys.chapter03.ParkUnparkTest01:17 - start...
2023-06-02 21:42:05,004 - 1014 INFO [t1] up.cys.chapter03.ParkUnparkTest01:24 - park...
2023-06-02 21:42:05,992 - 2002 INFO [main] up.cys.chapter03.ParkUnparkTest01:36 - unpark...
2023-06-02 21:42:05,993 - 2003 INFO [t1] up.cys.chapter03.ParkUnparkTest01:26 - resume...
如果先 unpark 再 park,如下:
2023-06-02 21:44:09,154 - 0 INFO [t1] up.cys.chapter03.ParkUnparkTest01:17 - start...
2023-06-02 21:44:10,160 - 1006 INFO [main] up.cys.chapter03.ParkUnparkTest01:36 - unpark...
2023-06-02 21:44:11,166 - 2012 INFO [t1] up.cys.chapter03.ParkUnparkTest01:24 - park...
2023-06-02 21:44:11,166 - 2012 INFO [t1] up.cys.chapter03.ParkUnparkTest01:26 - resume...
注意运行结果,主线程先进行了unpark,当t1线程park后,但是紧接着就打印了resume,并没有暂停,这是为什么呢?主要由于先unpark后,会保存一个状态,下次park也不会暂停线程了。
特点:
- wait、notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
- park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify
原理
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter
, _cond
和 _mutex
。
_cond
:类似队列,当线程暂停时,存放线程的地方_counter
:判断条件,有0 和1两个状态_mutex
:互斥锁
- 当调用
park()
方法时
检查 _counter
:
- 如果_counter是0,这时,获得 _mutex 互斥锁,线程进入 _cond 条件变量阻塞,并再次设置 _counter = 0
- 如果_counter是1,则线程继续运行
- 当调用
unpark((Thread_0)
方法时
-
如果这时线程在
_cond
中:设置 _counter 为 1,唤醒_cond
条件变量中的 Thread_0,Thread_0 恢复运行,最后设置 _counter 为 0 -
如果这时线程在运行中:设置
_counter
为 1即可;如果后面又调用了park
方法,则检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行, 设置 _counter 为 0