目录
1.普通生产者与消费者方式
2.加入阻塞队列
等待唤醒机制即通过wait和notify实现多线程之间的通信,进而实现多线程协同工作。
1.普通生产者与消费者方式
生产者和消费者是一种经典的多线程协同工作模式,以厨师(生产者)和顾客(消费者)为例:
厨师和顾客中间有张桌子,厨师会将做好的饭放到桌子上,顾客需要从桌子上拿到饭并吃掉。初始状态下厨师还没有做饭,桌子上是空的,顾客也还没有开始吃饭。
对于厨师而言,他要做以下五种事情:
- 检查桌子上是否还有饭
- 如果桌子上有饭则等待顾客吃完
- 如果没有,则开始做饭
- 将做好的饭放到桌子上
- 通知顾客来吃
对于顾客而言,他要做以下四种事情:
- 检查桌子上是否有饭
- 如果没有饭则等待厨师做好饭
- 如果有饭则开始吃饭
- 吃完饭后通知厨师继续开始做饭
如果用代码实现上面的通知和等待,则要用到下面三种方法:
其中通知经常是用notifyAll,因为如果线程有很多,使用notify仅能随机唤醒一个线程,那可能要唤醒好多次才能唤醒所需要的线程,而notifyAll能唤醒所有线程,自然也会唤醒所需要的线程,所以一般使用notifyAll用来通知。
用代码实现上面的顾客、厨师以及桌子:
//桌子Table
public class Table {//桌子上是否有饭的信号,0代表没有饭,1代表有饭public static int flag=0;//顾客吃饭的目标public static int count=3;//顾客和厨师共享的锁,因为对于共享资源加的锁必须是唯一的public static Object object=new Object();}//顾客Customer
public class Customer extends Thread{@Overridepublic void run() {synchronized (Table.object){while (true){//先检查自己是否已经达成了吃饭的目标//如果已经达到了就break//如果没有则进行接下来的操作if(Table.count==0){System.out.println("顾客达成了目标,结账走人了。");break;}else{//1.先检查桌子上有没有饭//2.如果有则执行吃饭操作//3.如果没有则等待厨师做饭if(Table.flag==1){System.out.println("顾客正在吃饭中...");//将饭从桌子上拿走Table.flag=0;//吃了这碗饭后离目标又近了一步Table.count--;System.out.println("饭吃完了,还剩"+Table.count+"碗饭没吃,正在通知厨师做饭...");//4.吃完饭后通知厨师继续做饭Table.object.notifyAll();}else {try {Table.object.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}}}
}//厨师Cook
public class Cook extends Thread{@Overridepublic void run() {synchronized (Table.object) {while (true) {//先判断是否顾客已经达成吃饭的目标//如果吃饭的目标达成则退出循环//如果没有则继续接下来的操作if(Table.count==0){System.out.println("厨师了解到顾客达成了目标,结束做饭。");break;}else{//1.先检查桌子上有没有饭//2.如果没有则开始做饭//3.如果有则进入等待if(Table.flag==0){System.out.println("厨师正在做饭中...");//4.饭做好了,将饭放到桌子上Table.flag=1;//5.通知顾客来吃//唤醒所有绑定到这个锁的线程System.out.println("饭做好了,已放到桌子上,正在通知顾客来吃...");Table.object.notifyAll();}else{try {//绑定加上这个共享锁的线程,并让该线程进入等待状态Table.object.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}}}
}//模拟顾客和厨师的过程
public class Main {public static void main(String[] args) {Thread t1=new Customer();Thread t2=new Cook();t1.start();t2.start();}
}
运行结果:
首先要注意的是顾客和厨师是两个线程类而不是两个普通的类。然后就是Table相当于管理共享的变量,共享的变量要设置为静态的。最后就是在调用wait和notifyAll时要使用共享锁来调用,用共享锁调用wait会将这个锁和添加这个锁的线程进行绑定,在用这个共享锁调用notifyAll时会唤醒绑定这个锁的所有线程,从而在唤醒时不会通知其他无关的线程。
下面使用lambda表达式来实现:
//资源类,饭
public class Food {//桌子上的食物数量private int num=0;//目标一共需要吃十碗饭private int sumnum=10;//厨师的做饭方法public synchronized boolean cook() throws InterruptedException {//检查是否达到了目标if(sumnum==0){return false;}//是否需要等待if(num!=0){this.wait();}else{//做饭的业务代码num++;System.out.println(Thread.currentThread().getName()+"做好了一碗饭,当前有"+num+"碗饭");//饭做好了,通知顾客来吃this.notifyAll();}return true;}//顾客的吃饭方法public synchronized boolean eat() throws InterruptedException {//检查是否达到了目标if(sumnum==0){return false;}//是否需要等待if(num==0){this.wait();}else{//吃饭的业务代码num--;sumnum--;System.out.println(Thread.currentThread().getName()+"吃了一碗饭,当前有"+num+"碗饭");//饭吃完了,通知厨师继续做this.notifyAll();}return true;}
}//实现
public class Main {public static void main(String[] args) {//创建资源类Food的对象Food food=new Food();//创建厨师线程Thread c1=new Thread(()->{while (true){try {if(!food.cook())break;} catch (InterruptedException e) {throw new RuntimeException(e);}}},"做饭线程1");//创建顾客线程Thread e1=new Thread(()->{while (true){try {if(!food.eat())break;} catch (InterruptedException e) {throw new RuntimeException(e);}}},"吃饭线程1");//启动厨师和顾客的线程c1.start();e1.start();}
}
注意一定要使用if-else语句,将业务代码和通知放在else中:
//正确格式
if(等待条件){wait;}else{业务代码;notifyAll;
}//错误格式
if(等待条件){wait;
}
业务代码;
notifyAll;
如果使用错误格式将业务代码和通知放到外面,那么这个线程在被唤醒后会接着执行业务代码,但是如果这个线程还不满足执行业务代码的条件,由于其他线程调用了notifyAll唤醒了所有的线程,那么这个线程会在不满足条件下开始执行业务代码,这就是虚假唤醒。如果将这些代码放入else中,即使这个线程在不满足条件下被唤醒也不会执行业务代码,而是会进入下一次循环,在进入下一次循环后又会判断是否满足条件。
在使用等待唤醒机制实现线程之间的通信时,可以利用口诀“等待、业务、通知”来编写要执行的方法:
//先判断是否需要等待,如果满足等待条件,执行wait
//业务代码
//使用notifyAll通知其他线程
if(等待条件){this.wait();}else{业务代码this.notifyAll();
}
2.加入阻塞队列
阻塞队列就相当于替换掉了中间的桌子,顾客吃饭时从阻塞队列中拿,厨师做好饭后将饭放到阻塞队列中;阻塞队列由于是一个队列,会遵循先入先出。桌子只能放一碗饭,而阻塞队列可以放多碗饭;并且阻塞队列的底层已经加锁了,不需要我们手动加锁。
阻塞队列有两种,一种是ArrayBlockingQueue<类型>,底层是基于数组的,所以这种阻塞队列的空间是有限的;另一种是LinkedBlockingQueue<类型>,底层是基于链表的,所以这种阻塞队列的空间理论上是无限的,但实际上它的最大长度不能超过int的范围。
用阻塞队列实现起来很简单,生产者只需要向里面放东西,也就是调用put方法,消费者只需要从里面取东西就行了,也就是调用take方法。下面是put和take的源码:
put:
take:
下面用阻塞队列来实现顾客和厨师:
//厨师
public class Cook extends Thread{ArrayBlockingQueue<String> queue;public Cook(ArrayBlockingQueue<String> queue){this.queue=queue;}@Overridepublic void run() {while (true){try {queue.put("饭");System.out.println("厨师做好了一碗饭");} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}//顾客
public class Customer extends Thread{ArrayBlockingQueue<String> queue;public Customer(ArrayBlockingQueue<String> queue){this.queue=queue;}@Overridepublic void run() {while (true){try {String food=queue.take();System.out.println("顾客拿走了一碗饭");} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}//模拟顾客和厨师的过程
public class Main {public static void main(String[] args) {//由于阻塞队列只能有一个,所以阻塞队列不能在Cook或Customer中创建,直接从main中创建即可ArrayBlockingQueue<String> queue=new ArrayBlockingQueue<String>(1);//将这个阻塞队列分别传递给Cook和Customer,保证两者使用的是同一个阻塞队列Thread t1=new Cook(queue);Thread t2=new Customer(queue);t1.start();t2.start();}
}
运行结果:
在上面的代码中, 我们给阻塞队列设置的空间为1,那应该输出的是厨师放一碗饭顾客拿一碗饭才对,而这里却出现了厨师连放多次饭以及顾客连取多次饭的情况,这难道是取和放的时候出现了问题吗?
其实并不是,因为put和take的底层已经进行了加锁和解锁的操作,所以这两个过程是没有问题的,问题出在后面的输出语句。虽然put和take进行了加锁,但是输出的操作并没有加锁,这很可能出现乱输出的情况,毕竟线程的运行完全随机,永远不知道下一秒是哪个线程在占用CPU。不过虽然输出操作会出现问题,但这对实现顾客和厨师的过程并不会产生影响,因为输出操作只是用来给自己看的,实际运行过程不出错即可。