java八股文面试[多线程]——阻塞队列

阻塞队列大纲:

什么是阻塞队列

阻塞队列:从名字可以看出,他也是队列的一种,那么他肯定是一个先进先出(FIFO)的数据结构。与普通队列不同的是,他支持两个附加操作,即阻塞添加阻塞删除方法。

那么阻塞添加跟阻塞删除是什么意思呢?

阻塞队列的特点

image-20200620174922523.png

如上图,线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素。而在这一系列操作必须符合以下规定:

  • 阻塞添加:当阻塞队列是满时,往队列里添加元素的操作将被阻塞。
  • 阻塞移除:当阻塞队列是空时,从队列中获取元素/删除元素的操作将被阻塞。

上面的解释可能还有点晦涩难懂,那我举个例子吧:

现有三个角色:顾客,休息区,银行办理窗口。(Thread1为顾客,BlockingQueue为休息区,Thread2为银行办理窗口)。

  1. 正常情况下,一个银行办理窗口同一时间只能对接一个顾客;

  2. 恰巧今天办理的顾客有3个人,另外2个顾客怎么办,你总不至于给人家说不办了,快回家吧;

  3. 而正确的做法是你可以让这两个人在休息区等候,等银行窗口空闲了,然后去办理。

一个人正在银行办理业务,你后面的人不能打断(保证了原子性),或者争抢(有序性,先进先出),只能在休息区等待,直到上一个人释放资源,才轮到下一个人。

其实上面的情况面临的问题是:当一个线程占有资源的时候,你后面线程请求不得不阻塞,但这也不一定是缺点,反而更像是一件好事,因为他并不暴力的解决问题。

我们再来看一下关于阻塞的定义:

在多线程中,阻塞的意思是,在某些情况下会挂起线程,一旦条件成熟,被阻塞的线程就会被自动唤醒。

也就是说,线程的wait和notify机制是需要我们自己去手动控制,但是我们自己认为的控制是很容易出现问题的,比如死锁,逻辑判断等…

但是有了阻塞队列,一切的问题就迎刃而解了。

阻塞队列的好处

阻塞队列不用手动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。

BlockingQueue的主要方法

BlockingQueue提供的部分方法:

1592650647(1).jpg

根据插入和取出两种类型的操作,具体分为下面一些类型:

方法类型抛出异常返回布尔阻塞超时
插入add(E e)offer(E e)put(E e)offer(E e,Time,TimeUnit)
取出remove()poll()take()poll(Time,TimeUnit)
队首element()peek()
  • 抛出异常是指当队列满时,再次插入会抛出异常(如果队列未满,插入返回值未true);

  • 返回布尔是指当队列满时,再次插入会返回false;

  • 阻塞是指当队列满时,再次插入会被阻塞,直到队列取出一个元素,才能插入。

  • 超时是指当一个时限过后,才会插入或者取出。

生产
add、offer、put这3个方法都是往队列尾部添加元素,区别如下:
add:不会阻塞,添加成功时返回true,不响应中断,当队列已满导致添加失败时抛出IllegalStateException
offer:不会阻塞,添加成功时返回true,因队列已满导致添加失败时返回false不响应中断
put:会阻塞会响应中断。

消费
take、poll方法能获取队列头部第1个元素,区别如下:
take:会响应中断,会一直阻塞直到取得元素或当前线程中断。
poll:会响应中断,会阻塞,阻塞时间参照方法里参数timeout.timeUnit,当阻塞时间到了还没取得元素会返回null

add方法代码

public class BlockingQueueTest {public static void main(String[] args) {BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);System.out.println("--------以下为add的相关操作---------");addRemoveTest(blockingQueue);}public static void addRemoveTest(BlockingQueue<String> blockingQueue) {System.out.println("添加状态+\t"+blockingQueue.add("1"));System.out.println("添加状态+\t"+blockingQueue.add("2"));System.out.println("添加状态+\t"+blockingQueue.add("3"));
//        System.out.println("添加状态+\t"+blockingQueue.add("4"));System.out.println("队首元素+\t"+blockingQueue.element());System.out.println("删除元素+\t"+blockingQueue.remove());System.out.println("队首元素+\t"+blockingQueue.element());System.out.println("删除元素+\t"+blockingQueue.remove());System.out.println("队首元素+\t"+blockingQueue.element());System.out.println("删除元素+\t"+blockingQueue.remove());
//        System.out.println("队首元素+\t"+blockingQueue.element());
//        System.out.println("删除元素+\t"+blockingQueue.remove(blockingQueue.element()));}
}

未打开注释代码输出如下:

--------以下为add的相关操作---------
添加状态+	true
添加状态+	true
添加状态+	true
队首元素+	1
删除元素+	1
队首元素+	2
删除元素+	2
队首元素+	3
删除元素+	3

当队列已满,继续添加元素时(打开注释代码),输出如下:

--------以下为add的相关操作---------
添加状态+	true
添加状态+	true
添加状态+	true
Exception in thread "main" java.lang.IllegalStateException: Queue fullat java.util.AbstractQueue.add(AbstractQueue.java:98)...略

BlockingQueue的实现类

从整体架构图上来看,BlockingQueue是实现了Queue接口,而Queue是属于Collection接口下的派生类。

ArrayBlockingQueue 由数组构成的有界阻塞队列
LinkedBlockingQueue 由链表构成的有界阻塞队列
PriorityBlockingQueue 支持优先级排序的无界阻塞队列
DelayQueue 支持优先级的延迟无界阻塞队列
SynchronousQueue 单个元素的阻塞队列
LinkedTransferQueue 由链表构成的无界阻塞队列
LinkedBlockingDeque 由链表构成的双向阻塞队列

粗体标记的三个用得比较多,许多消息中间件底层就是用它们实现的,也是我们下面着重说明的。

SynchronousQueue: 队列只有一个元素,如果想插入多个,必须等队列元素取出后,才能插入,只能有一个“坑位”,用一个插一个。

需要注意的是LinkedBlockingQueue虽然是有界的,但有个巨坑,其默认大小是Integer.MAX_VALUE,高达21亿,一般情况下内存早爆了(在线程池的ThreadPoolExecutor有体现)。
 

SynchronusQueue

在这里需要特别演示以下synchronusQueue的使用,他是不存储元素的,来一个,消费一个。(同一时间内只能添加一个元素)

代码演示如下:

public class SynchronusQueueTest {public static void main(String[] args) {BlockingQueue<String> synchronusQueue = new SynchronousQueue<>();new Thread(() ->{try{System.out.println(Thread.currentThread().getName()+"\t put 1");synchronusQueue.put("1");System.out.println(Thread.currentThread().getName()+"\t put 2");synchronusQueue.put("2");System.out.println(Thread.currentThread().getName()+"\t put 3");synchronusQueue.put("3");}catch(Exception e){e.getStackTrace();}},"Prod").start();new Thread(() ->{try {try{ TimeUnit.SECONDS.sleep(3); }catch (InterruptedException e){ e.printStackTrace(); }System.out.println(Thread.currentThread().getName()+"\t take "+synchronusQueue.take());try{ TimeUnit.SECONDS.sleep(3); }catch (InterruptedException e){ e.printStackTrace(); }System.out.println(Thread.currentThread().getName()+"\t take "+synchronusQueue.take());try{ TimeUnit.SECONDS.sleep(3); }catch (InterruptedException e){ e.printStackTrace(); }System.out.println(Thread.currentThread().getName()+"\t take"+synchronusQueue.take());} catch (Exception e) {e.printStackTrace();}},"Cons").start();}

输出如下:(有时间间隔

Prod	 put 1
Cons	 take 1
Prod	 put 2
Cons	 take 2
Prod	 put 3
Cons	 take3

ArrayListBlockingQueue原理

通过源码来看,其实ArrayListBlockingQueue是通过ReentrantLock和Condition条件队列来实现阻塞的。一些成员变量如下:

    //存储数据final Object[] items;//返回获取数据的索引,主要用于take、poll、peek、remove方法int takeIndex;//返回添加数据的索引,主要用于 put、offer、add 方法int putIndex;// 队列元素的个数int count;//可重入锁final ReentrantLock lock;//条件对象,用于通知take方法队列的线程private final Condition notEmpty;//条件对象,用于通知put方法队列的线程private final Condition notFull;//迭代器transient Itrs itrs = null;

添加元素原理

我们再来看看元素的插入是怎么使用这个Condition条件的。添加方法有add,offer,put。我们先来看看**add(offer)**方法:

//add方法
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}//offer方法
public boolean offer(E e) {//判断是否为nullcheckNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {//判断队列是否满if (count == items.length)return false;else {//添加元素到队列enqueue(e);return true;}} finally {lock.unlock();}}//元素入队操作
private void enqueue(E x) {//获取当前数组final Object[] items = this.items;//通过putIndex索引对数组进行赋值items[putIndex] = x;//索引自增,如果已是最后一个位置,重新设置 putIndex = 0;if (++putIndex == items.length)putIndex = 0;//队列中元素数量加1count++;//唤醒调用take()方法的线程,执行元素获取操作。notEmpty.signal();
}

首先add方法其实本质调用的是offer方法,而在offer的最关键处,也就是enqueue入队操作。

  1. reentrantLock保证的线程的互斥性,即同一时间只能有一个线程操作。如果队列已满,返回ture,add方法则是抛出异常;如果队列未满,则开始入队操作。

  2. 在入队操作时,他会通过一个全局变量putIndex作为索引,指引着新来元素的位置。在这里有个小细节,就是判断putIndex是否与队列长度相等,如果队列已满,而且队列的操作时先进先出,索引下一次来插入元素的位置肯定时队头,也就是索引0的位置;

  3. 队内已经有元素了,然后开始唤醒take操作来消费元素。这个signal()其实时notify()的升级版

在添加操作中,还有一个put方法,他是会导致线程阻塞的。具体源码如下:

//put方法,阻塞时可中断public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();//该方法可中断try {//当队列元素个数与数组长度相等时,无法添加元素while (count == items.length)//将当前调用线程挂起,添加到notFull条件队列中等待唤醒notFull.await();enqueue(e);//如果队列没有满直接添加。。} finally {lock.unlock();}}

他是通过condition的await方法来实现阻塞的,但由于又添加了lockInterruptibly标识,说明其阻塞可被打断

获取元素/删除元素原理

添加方法有remove,poll,take。我们先来看看poll方法

	public E poll() {//reentrantLock互斥锁final ReentrantLock lock = this.lock;lock.lock();try {//如果队列为0,返回null,反之进入移除队列return (count == 0) ? null : dequeue();} finally {lock.unlock();}}//移除队列private E dequeue() {//获取当前队列数据final Object[] items = this.items;@SuppressWarnings("unchecked")//获取当前队头数据E x = (E) items[takeIndex];//将队头数据置为nullitems[takeIndex] = null;//如果队头索引自增与数组长度相等,则将其索引设置为第一位if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)//更新迭代器中的元素数据itrs.elementDequeued();//唤醒put/offer/add等方法notFull.signal();return x;}

poll方法他是通过删除队列头数据来进行移除元素的,唤醒与沉睡机制采用reentrantLock的 condition机制。接下来我们来看一下take方法:

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();//中断try {//队列没有元素,阻塞移除方法的线程while (count == 0)notEmpty.await();//有元素进行元素移除操作return dequeue();} finally {lock.unlock();}}

take方法跟poll方法一样,也是通过dequeue()方进行移除元素,但不同的是,他会进行一个线程阻塞,也就是运用condition的await方法,同时这个阻塞时可被打断的,关键词lockInterruptibly

remove方法相对来说比较复杂,他跟以上两个方法的不同点在于remove可以根据索引来删除元素,而另两个则是通过删除队列的头元素。

	public boolean remove(Object o) {//确保传入元素不为nullif (o == null) return false;//获取队列当前数据final Object[] items = this.items;final ReentrantLock lock = this.lock;lock.lock();try {if (count > 0) {final int putIndex = this.putIndex;int i = takeIndex;//找出O元素的索引值do {if (o.equals(items[i])) {//如果匹配到,删除元素,i为o的索引removeAt(i);return true;}//只有一个元素时,重置索引值if (++i == items.length)i = 0;} while (i != putIndex);}return false;} finally {lock.unlock();}}

以下是removeAt方法:

void removeAt(final int removeIndex) {final Object[] items = this.items;//判断当前元素是否是头部索引值if (removeIndex == takeIndex) {items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();} else {//如果不是,通过移动元素位置,将要删除的元素置为队尾删除final int putIndex = this.putIndex;for (int i = removeIndex;;) {//确保当前队列大小大于1int next = i + 1;if (next == items.length)next = 0;//如果不是队尾元素,进行元素移动if (next != putIndex) {items[i] = items[next];i = next;} else {//如果是队尾,元素移动完毕,直接将队尾为null,即删除items[i] = null;this.putIndex = i;break;}}count--;if (itrs != null)itrs.removedAt(removeIndex);}notFull.signal();}

阻塞队列的应用场景

目前我所接触到的场景有两个:

  1. 线程池的底层存储
  2. 生产消费队列模式。

下面我们将着重讲解生产消费队列模式,线程池我们会在专门的章节说到。

首先,我们先来演示以下如果没有阻塞队列,生产消费模式是怎样写的。然后我们在去用阻塞队列去实现。

传统模式

要求:初始值为0的变量,两个线程交替操作,一个+1,一个-1,执行五轮。

//资源类
class MyResource{int number = 0;private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();//自增public void increaseNum(){lock.lock();try{//判断while(number != 0){condition.await();}//干活number++;System.out.println(Thread.currentThread().getName() + "\t" + number);//唤醒condition.signalAll();}catch(Exception e){e.getStackTrace();}finally {lock.unlock();}}//自减public void decrNum(){lock.lock();try{//判断while(number == 0){condition.await();}//干活number--;System.out.println(Thread.currentThread().getName() + "\t" + number);//唤醒condition.signalAll();}catch(Exception e){e.getStackTrace();}finally {lock.unlock();}}}public class ProdConsTradTest {public static void main(String[] args) {MyResource myResource = new MyResource();new Thread(() ->{for (int i = 0; i < 5; i++) {myResource.increaseNum();}},"Prod").start();new Thread(() ->{for (int i = 0; i < 5; i++) {myResource.decrNum();}},"Cons").start();}
}

输出如下:

Prod	1
Cons	0
Prod	1
Cons	0
Prod	1
Cons	0
Prod	1
Cons	0
Prod	1
Cons	0

其实,通过上述代码以及上述阻塞队列的源码可以知道,传统模式的实现就是ArrayListBlockingQueue底层代码,通过condition去通知沉睡与唤醒。

阻塞队列模式

class MyData {//全局开关private volatile boolean flag = true;private BlockingQueue<String> queue;private AtomicInteger atomicInteger =  new AtomicInteger();public MyData(BlockingQueue<String> queue) {this.queue = queue;}public void myProd() throws InterruptedException {String data = null;boolean isOfferSuccess;while(flag){data = atomicInteger.incrementAndGet()+"";isOfferSuccess = queue.offer(data, 2l, TimeUnit.SECONDS);if(isOfferSuccess){System.out.println(Thread.currentThread().getName()+"线程\t 插入队列成功 \t 插入队列的值为:"+data);}else{System.out.println(Thread.currentThread().getName()+"线程\t 插入队列失败");}TimeUnit.SECONDS.sleep(1);}}public void myCons() throws InterruptedException {String result =null;while(flag){result = queue.poll(2l, TimeUnit.SECONDS);if(Objects.isNull(result) || result.equalsIgnoreCase("")){flag = false;System.err.println(Thread.currentThread().getName() + "\t超过2秒钟没有消费,退出消费");return;}System.out.println(Thread.currentThread().getName() + "\t消费队列\t消费\t" + result + "\t成功");}}public void stop() {this.flag = false;}
}public class ProdConsBlockingQueueTest {public static void main(String[] args) {MyData myData = new MyData(new ArrayBlockingQueue<>(10));new Thread(() ->{System.out.println(Thread.currentThread().getName()+" \t 生产者线程开始生产");try {myData.myProd();} catch (InterruptedException e) {e.printStackTrace();}},"Pord").start();new Thread(() ->{System.out.println(Thread.currentThread().getName()+" \t 消费者线程开始消费");try {myData.myCons();} catch (InterruptedException e) {e.printStackTrace();}},"Cons").start();try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}System.err.println("5秒钟后,叫停");myData.stop();}
}

输出如下:

Pord 	 生产者线程开始生产
Cons 	 消费者线程开始消费
Pord线程	 插入队列成功 	 插入队列的值为:1
Cons	消费队列	消费	1	成功
Pord线程	 插入队列成功 	 插入队列的值为:2
Cons	消费队列	消费	2	成功
Pord线程	 插入队列成功 	 插入队列的值为:3
Cons	消费队列	消费	3	成功
Pord线程	 插入队列成功 	 插入队列的值为:4
Cons	消费队列	消费	4	成功
Pord线程	 插入队列成功 	 插入队列的值为:5
Cons	消费队列	消费	5	成功
5秒钟后,叫停
Cons	超过2秒钟没有消费,退出消费

LinkedBlockingQueue和ArrayBlockingQueue区别
队列大小不同;

ArrayBlockingQueue在初始化的时候,必须指定队列的大小

而LinkedBlockingQueue在初始化的时候,如果你没有指定大小,则会默认Integer.MAX_VALUE,是一个很大的值,这样就会导致数据在一个不可控范围,一旦添加速度远大于移除的速度时,可能会有内存泄漏的风险。

底层实现不同;

ArrayBlockingQueue的底层是一个数组,而LinkedBlockingQueue底层是一个链表结构。官方文档介绍中,LinkedBlockingQueue的吞吐行是高于arrayBlockingQueue;但是在添加或移除元素中,LinkedBlockingQueue则会生成一个额外的Node对象,对GC可能存在影响;

至于为什么说LinkedBlockingQueue的吞吐性是高于arrayBlockingQueue:

吞吐性能强是因为有两个锁,试想一下,Array里面使用的是一个锁,不管put还是take行为,都可能被这个锁卡住,而Linked里面put和take是两个锁,put只会被put行为卡住,而不会被take卡住,因此吞吐性能自然强于Array。 而“less predictable performance”这个也是显而易见的,Array采用的时固定内存,而Linked采用的时动态内存,无论是分配内存还是释放内存(甚至GC)动态内存的性能自然都会比固定内存要差

锁机制不一样;

ArrayBlockingQueue使用的一个锁来控制,LinkedBlockingQueue使用了2个锁来控制,一个名为putLock,另一个是takeLock,但是锁的本质都是ReentrantLock

总结
其实在阻塞队列这一块,他对并发理论的运用还是很多的,都是将其封装为底层,然后直接使用,直接让我们不用要去考虑来回通知唤醒,而且也极大的可能性避免了线程死锁问题。
 

知识来源:

【并发与线程】 阻塞和非阻塞队列的并发安全原理是什么?_哔哩哔哩_bilibili

阻塞队列详解_Zz罗伯特的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/92340.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学术加油站|基于端到端性能的学习型基数估计器综合测评

编者按 本文系东北大学李俊虎所著&#xff0c;也是「 OceanBase 学术加油站」系列第 11 篇内容。 「李俊虎&#xff1a;东北大学计算机科学与工程学院在读硕士生&#xff0c;课题方向为数据库查询优化&#xff0c;致力于应用 AI 技术改进传统基数估计器&#xff0c;令数据库选…

【Python】Python 实现自动发送邮件功能

目录 Python 自动发送邮件 可能会遇到的问题 1. SMTP认证错误&#xff1a; 2. SMTP服务器连接问题&#xff1a; 3. 邮件格式错误&#xff1a; 4. 邮件被拦截或进入垃圾邮件箱&#xff1a; 5. 网络连接问题&#xff1a; 6. 安全性设置问题&#xff1a; 注意事项 1. 发…

项目-IM

tim-server tim-server启动类实现CommandLineRunner接口&#xff0c;重写run()方法 run()方法开启一个线程&#xff0c;创建zk持久父节点&#xff0c;创建临时顺序子节点&#xff0c;将netty-server信息写入 1.1 用户登录 1.2 gateway向认证授权中心请求token 1.3 从zookee…

langchain介绍之-Prompt

LangChain 是一个基于语言模型开发应用程序的框架。它使得应用程序具备以下特点&#xff1a;1.数据感知&#xff1a;将语言模型与其他数据源连接起来。2.代理性&#xff1a;允许语言模型与其环境进行交互 LangChain 的主要价值在于&#xff1a;组件&#xff1a;用于处理语言模型…

MySQL高阶查询语句

目录 一、常用查询 1、按关键字排序 1.1 升序排序 1.2 降序排序 1.3 结合where进项条件过滤再排序 1.4 多条件排序 2、区间判断及查询不重复记录 2.1 and/or&#xff08;且/或&#xff09; 2.2 嵌套 /多条件 2.3 distinct 查询不重复记录 3、对结果进行分组 4、限…

VM装MACos

准备工具: 下载macOS Ventura 13 ISO镜像文件、VMware Workstation Pro最新版并激活(自行官网下载即可,需要镜像和key可以最下边的云盘自取) 下载Unlocker for VMware Workstation Pro,该工具用于解锁 macOS 系统支持、目前已支持macOS 13。 有需要的部分工具我放到最后 …

JavaScript Web APIs - 06 正则表达式

Web APIs - 06 文章目录 Web APIs - 06正则表达式正则基本使用元字符边界符量词范围字符类 替换和修饰符正则插件change 事件判断是否有类 目标&#xff1a;能够利用正则表达式完成小兔鲜注册页面的表单验证&#xff0c;具备常见的表单验证能力 正则表达式综合案例阶段案例 正…

python 笔记(2)——文件、异常、面向对象、装饰器、json

目录 1、文件操作 1-1&#xff09;打开文件的两种方式&#xff1a; 1-2&#xff09;文件操作的简单示例&#xff1a; write方法: read方法&#xff1a; readline方法&#xff1a; readlines方法&#xff1a; 2、异常处理 2-1&#xff09;不会中断程序的异常捕获和处理…

Java实现根据商品ID获取京东商品详情数据,1688商品详情接口,1688API接口封装方法

要通过京东的API获取商品详情数据&#xff0c;您可以使用京东开放平台提供的接口来实现。以下是一种使用Java编程语言实现的示例&#xff0c;展示如何通过京东开放平台API获取商品详情&#xff1a; 首先&#xff0c;确保您已注册成为京东开放平台的开发者&#xff0c;并创建一…

【枚举区间】CF Edu10 C

Problem - C - Codeforces 题意&#xff1a; 思路&#xff1a; 应该反思一下这么典的思路为什么会想不到 枚举区间&#xff0c;一个很经典的套路是&#xff0c;枚举 l&#xff0c;对 r 计数 对于一个l&#xff0c;r取这么多限制中离 l 最近的那个 Code&#xff1a; #inclu…

基于Laravel通用型内容建站企业官网系统源码 可免费商用

是一个基于 Laravel 企业内容建站系统。模块市场拥有丰富的功能应用&#xff0c;支持后台一键快速安装&#xff0c;让开发者能快的实现业务功能开发。 系统完全开源&#xff0c;免费且不限制商业使用 2023年08月23日增加了以下12个特性&#xff1a; [新功能] 手机端Banner支持…