【Java 集合】ArrayBlockingQueue

ArrayBlockingQueue, 顾名思义: 基于数组的阻塞队列, 位于 JUC (java.util.concurrent) 下, 是一个线程安全的集合, 其本身具备了

  1. 不支持 null 元素: 存入 null 元素会抛出异常
  2. 固定容量: 在初始化时需要指定一个固定的容量大小。这意味着一旦队列达到最大容量,将不再接受新的元素,直到有消费者取出元素为止
  3. 有序性: 内部采用数组作为底层数据结构,保持了元素的有序性。这意味着当你向队列中添加元素时,它们将按照添加的顺序排列,而消费者线程将按照相同的顺序取出这些元素
  4. 阻塞特性:ArrayBlockingQueue 会在队列满时, 阻塞添加数据的线程直至队列非满状态, 同样, 在队列空时, 阻塞获取数据的线程直至队列重新非空
  5. 支持锁公平性配置: 在初始化时可以指定是否使用公平锁, 默认为非公平锁。公平锁通常会降低吞吐量, 但是减少了可变性和避免了线程饥饿问题

1 实现的数据结构

通常, 队列的实现方式有数组和链表两种方式。

ArrayBlockingQueue 的选择数组作为自己底层的数据结构, 同时通过维护 1 个队头指针 + 1 个队尾指针指针, 达到数据的入队和出队操作。
同时他内部对数组的使用做了一些小心思, 使得入队和出队操作都可以在 O(1) 的时间内完成。

入队涉及到的是数组的添加数据, 同理, 出队涉及到的是数组的删除数据。
而数组的删除操作, 通常的步骤是

  1. 删除数组中的某个元素
  2. 将数组中删除元素后的所有元素往前移动一个位置

而 ArrayBlockingQueue 的是队列, 出队固定的第一个节点, 也就是数组的第一个, 所以内部通过维护的 2 个指针, 简化了数组删除的操作

  1. 删除数组中的某个元素
  2. 将头指针指向下一个元素, 如果头指针指向了数组的最后一个元素, 那么将头指针重新指向数组的第一个元素,
  3. 数组的添加也是到了数组的最后一个元素, 重新回到数组的头部, 通过这种方式避免了删除时, 对数组元素的移动

Alt 'ArrayBlockingQueue 内部数组指针移动方向'

2 源码分析

分析完了 ArrayBlockingQueue 的逻辑实现方式, 下面我们通过源码的形式, 更进一步的了解 ArrayBlockingQueue 的实现。

2.1 ArrayBlockingQueue 持有的属性

public class ArrayBlockingQueue<E> {// 队列的底层实现结构, 数组final Object[] items;// 当前队列的队头指针int takeIndex;// 当前队列的队尾指针int putIndex;// 队列中的元素个数int count;// 用于并发控制的可重入锁final ReentrantLock lock;// 并发时的两种状态// 非空等待条件 (内部实际就是一个队列, 所以可以理解为一个等待队列), 也就是数组中重新有数据了, 可以继续取数据了// 当某个线程尝试从当前的队列中获取元素时, 如果数组中没有数据, 会把这个线程放到这个等待条件中// 在另外一个线程中添加元素到数组中,数组变为非空状态, 会唤醒等待在这个等待条件中的线程private final Condition notEmpty;// 非满等待条件 (内部实际就是一个队列, 所以可以理解为一个等待队列), 也就是数组中的数据没有达到上限, 可以继续添加数据// 当某个线程尝试向当前的队列添加元素, 但是当前数组已经满了, 会把这个线程放到这个等待条件中// 在另一个线程中从当前队列中获取一个元素时, 数组变为非满状态, 会唤醒等待在这个等待条件中的线程private final Condition notFull;
}

items 是一个数组, 用来存放入队的数据, count 表示队列中元素的个数。takeIndex 和 putIndex 分别代表队头和队尾指针。

2.2 ArrayBlockingQueue 构造函数

public class ArrayBlockingQueue<E> {// 指定容量构造函数public ArrayBlockingQueue(int capacity) {// 调用自带的指定容量和锁公平性配置的构造函数, 默认为非公平的this(capacity, false);}// 指定容量和锁公平性配置的构造函数public ArrayBlockingQueue(int capacity, boolean fair) {// 容量小于等于 0, 直接抛异常if (capacity <= 0)throw new IllegalArgumentException();// 声明数组    this.items = new Object[capacity];// 创建可重入锁, 锁公平性由参数配置lock = new ReentrantLock(fair);// 获取非空等待条件notEmpty = lock.newCondition();// 获取非满等待条件notFull =  lock.newCondition();}// 指定容量, 公平性和初始元素的构造函数    public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {// // 指定容量和锁公平性配置的构造函数this(capacity, fair);// 上锁lock.lock(); try {int i = 0;try {// 依次遍历入参的集合, 添加到当前的队列中for (E e : c) {// 队列中的数据不能为空checkNotNull(e);items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}// 算出队列中的元素个数count = i;// 下次添加元素的位置  =  当前队列中的元素个数等于容量上限了 ? 0 (重新回到队头) : 元素的个数 (当前队列的队尾)putIndex = (i == capacity) ? 0 : i;} finally {// 释放锁lock.unlock();}}
}

三个构造函数

指定队列大小的非公平锁构造函数
指定队列大小和锁公平性的构造函数
指定队列大小, 锁公平性和初始元素的构造函数

在第三个函数中, 将入参的集合元素依次添加到当前的队列的过程前, 先使用了 ReentrantLock 来加锁, 再把传入的集合元素按顺序一个个放入 items 中, 这个加锁的操作有必要吗?
一个实例的构造函数不可能存在并发调用的, 那么这个锁的作用是什么呢?

在 Happens-Before 规则中, 有一条监视器锁规则 (Monitor Lock Rule), 简单理解就是线程 A 加锁, 做了数据变更, 线程 A 解锁, 线程 B 加上同一个锁, 这时线程 A 做的变更对线程 B 都是可见的。
创建 ArrayBlockingQueue 的线程是加锁初始 ArrayBlockingQueue 的属性, 后面线程调用 ArrayBlockingQueue 的其他方法时, 都会遇到这个锁, 就会获取到最新的数据。

既然为了可见性, 为什么不使用 volatile 修饰 items 数组呢?
这就涉及到 volatile 的特性了, volatile 修饰的变量, 只能保证可见性, 而这里的 items 数组是一个引用类型, 如果对 items 的引用做了修改 (比如重新赋值, 置为空),
那么其他的线程可以感知到, 但是修改数组里面的数据, volatile 不会保证他们的可见性。

2.3 ArrayBlockingQueue 支持的方法

2.3.1 数据入队方法

ArrayBlockingQueue 提供了多种入队操作的实现来满足不同情况下的需求, 入队操作有如下几种:

  1. boolean add(E e)
  2. void put(E e)
  3. boolean offer(E e)
  4. boolean offer(E e, long timeout, TimeUnit unit)

add(E e)


public class ArrayBlockingQueue<E> extends AbstractQueue<E> {public boolean add(E e) {return super.add(e);}// super.add(E e), 父级的 add 方法, 也就是 AbstractQueue 方法public boolean add(E e) {// 调用自身的 offer     if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}
}

可以看到 add 方法调用的是父类, 也就是 AbstractQueue 的 add 方法, 而 AbstractQueue 的 add 方法又重新调用会子类的 offer 方法。

offer(E e)

顺着 add 方法, 看一下 offer 方法:


public class ArrayBlockingQueue<E> {public boolean offer(E e) {// 非空校验, 为空会抛出一个异常checkNotNull(e);final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 当前存储数据的数组的长度 == 存储的数组元素个数的, 达到上限了// 直接返回 falseif (count == items.length)return false;else {// 调用自身的 enqueue 方法将元素添加到队列中enqueue(e);return true;}} finally {lock.unlock();}}private void enqueue(E x) {final Object[] items = this.items;// 将当前的元素添加到数组的 putIndex 位置items[putIndex] = x;// putIndex + 1 后如果已经等于当前数组的长度了, 也就是达到了数组的尾部最后一个了, 直接将 putIndex 设置为 0// 下次添加元素的位置从 0 开始if (++putIndex == items.length)putIndex = 0;// 元素个数 + 1    count++;// 队列里面又有数据了, 非空了, 唤醒在 notEmpty 里面等待的线程notEmpty.signal();}
}

offer 方法的逻辑很简单

入参非空校验
加锁
队列中的元素达到上限, 直接返回 false, 并释放锁
队列中的元素达未到上限, 将元素添加到队列中, 唤醒在非空等待条件中等待的线程, 返回 ture, 并释放锁

在 enqueue 方法中, 将元素放到队列后, 会计算下次元素存放的位置, 这个计算过程实际就是一个取模操作, 当下一个元素存放的位置超过了队列的长度, 那么将元素重新存放到队列的头部, 也就是我们上面说的指针回到数组头部。

offer(E e, long timeout, TimeUnit unit)

offer(E e, long timeout, TimeUnit unit) 方法只是在 offer(E e) 的基础上增加了超时时间的概念。在队列上阻塞了多少时间后, 队列还是满的, 就返回。


public class ArrayBlockingQueue<E> {public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {// 先进行非空校验checkNotNull(e);// 把超时时间转换成纳秒long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;// 获取一个可中断的互斥锁lock.lockInterruptibly();try {// while 循环的目的是防止在中断后没有到达传入的 timeout 时间, 可以继续重试// 当数组的元素个数等于数组的长度了, 达到上限了, 先进入阻塞while (count == items.length) {// 已经达到超时时间了, 直接返回 false, 结束if (nanos <= 0)return false;// 将当前线程阻塞在 非满 等待条件上 nanos 纳秒// 唤醒后, 返回剩余的等待时间 (可被中断)nanos = notFull.awaitNanos(nanos);}// 入队操作enqueue(e);return true;} finally {lock.unlock();}}
}

该方法利用了 Condition 的 awaitNanos 方法, 等待指定时间, 因为该方法可中断, 所以这里利用 while 循环来处理中断后还有剩余时间的问题, 等待时间到了以后数组非满时, 可以调用 enqueue 方法放入队列。

put(E e)

public class ArrayBlockingQueue<E>  {public void put(E e) throws InterruptedException {// 非空校验checkNotNull(e);final ReentrantLock lock = this.lock;// 可中断锁获取lock.lockInterruptibly();try {// 当数组的元素个数 等于数组的长度了, 达到上限了, 进入阻塞等待唤醒while (count == items.length)notFull.await();// 入队操作    enqueue(e);} finally {lock.unlock();}}
}

put() 方法在 count 等于 items 长度时, 即队列已经满时, 进入阻塞, 然后一直等待, 直到被其他线程唤醒。唤醒后调用 enqueue 方法放入队列。

2.3.2 数据出队方法

同入队的方法一样, 出队也有多种实现, ArrayBlockingQueue 提供了好几种出队的方法, 大体如下:

  1. E poll()
  2. E poll(long timeout, TimeUnit unit)
  3. E take()

poll()

public class ArrayBlockingQueue<E>  {public E poll() {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 如果当前数组的元素个数为 0, 直接返回 null,// 否则调用 dequeue 方法获取一个元素返回return (count == 0) ? null : dequeue();} finally {lock.unlock();}}private E dequeue() {final Object[] items = this.items;// 获取 taskIndex 位置的元素, 同时将 taskIndex 位置置为 null@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;// taskIndex + 1 后, 如果等于数组的长度, 达到了数组的长度, 将 taskIndex 置为 0, 从头开始if (++takeIndex == items.length)takeIndex = 0;// 元素个数 - 1    count--;// 迭代器不为空, 也要进行元素的弹出 (这里可以先暂时不处理)if (itrs != null)itrs.elementDequeued();// 唤醒在 notFull 等待条件上的线程notFull.signal();return x;}
}

poll() 如果队列没有元素返回 null, 否则调用 dequeue() 方法把队头的元素出队列。
dequeue 会根据 takeIndex 获取到该位置的元素, 并把该位置置为 null, 然后将队头的指针指向下一个元素, 当当前指针已经在数组的最后一个元素, 则重新回到数组的头部, 最后唤醒 notFull 等待条件中的线程。

poll(long timeout, TimeUnit unit)

该方法是 poll() 的可配置超时等待方法。
和入队方法 offer() 方法一样, 使用 while 循环 + Condition 的 awaitNanos 来进行等待, 等待时间到后, 队列有数据, 就执行 dequeue 获取元素。


public class ArrayBlockingQueue<E>  {public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 转换为纳秒long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;// 获取可中断的锁lock.lockInterruptibly();try {// 数组的元素个数为 0 while (count == 0) {// 超过了等待时间了, 返回 nullif (nanos <= 0)return null;// 带超时的的等待nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}
}

take()


public class ArrayBlockingQueue<E>  {public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 当前数组的容量为 0, 将当前线程阻塞在 notEmpty 的等待条件中, 等待唤醒while (count == 0)notEmpty.await();// 线程唤醒了, 调用 dequeue 获取数据    return dequeue();} finally {lock.unlock();}}
}

take() 方法和 put() 方法类似, 当队列为空时, 进入阻塞, 一直等待, 直到被唤醒, 唤醒后调用 dequeue() 方法获取队列中的元素。

2.3.3 获取元素方法

获取数据的方法就一个。

peek()

public class ArrayBlockingQueue<E>  {public E peek() {final ReentrantLock lock = this.lock;// 尝试获取锁lock.lock();try {// 直接返回当前数组的 takeIndex 位置的元素, 也就是队头, 可能为空return itemAt(takeIndex);} finally {lock.unlock();}}final E itemAt(int i) {return (E) items[i];}}

这里获取元素时上锁是为了避免脏数据的产生。

2.3.4 删除元素方法

因为删除元素是指定元素删除,删除的位置不确定,所以只能像普通的数组删除一样, 对应位置的元素删除后, 后面的元素向前移动一个位置。

remove(Object o)


public class ArrayBlockingQueue<E> {public boolean remove(Object o) {// 需要删除的元素为空, 直接返回 falseif (o == null)return false;final Object[] items = this.items;final ReentrantLock lock = this.lock;lock.lock();try {// 数组里面没有数据, 可以直接不处理if (count > 0) {final int putIndex = this.putIndex;int i = takeIndex;// 从 takeIndex 一直遍历到 putIndex, (遍历途中, 如果到了数组的尾部, 就从 0 继续开始) // 直到找到和元素 o 相同的元素, 调用 removeAt 进行删除do {if (o.equals(items[i])) {removeAt(i);return true;}if (++i == items.length)i = 0;} while (i != putIndex);}} finally {lock.unlock();}}void removeAt(final int removeIndex) {final Object[] items = this.items;// 移除的位置刚好是 taskIndex, 也就是数组的头部if (removeIndex == takeIndex) {// 直接将 taskIndex 置为 nullitems[takeIndex] = null;// taskIndex + 1 后等于数组的长度, 达到了尾部了, 回到头部if (++takeIndex == items.length)takeIndex = 0;// 元素个数 - 1    count--;// 迭代器不为空, 进行迭代器的元素删除if (itrs != null)itrs.elementDequeued();} else {final int putIndex = this.putIndex;// 将 removeIndex 到 putIndex 间所有的元素都向前移动一位, 移动到尾部了, 就从 0 继续开始for (int i = removeIndex;;) {// 从下一个位置开始int next = i + 1;// 下一个位置为数组的尾部了, 从 0 继续开始if (next == items.length)next = 0;// 当前要处理的位置 i 的下一个位置不等于 putIndex if (next != putIndex) {// 将当前的位置 i 的值修改为下一个位置的值items[i] = items[next];// 更新需要处理的位置为下一个位置i = next;} else {// 当前要处理的位置 i 的下一个位置为 putIndex // 将当前位置置为 nullitems[i] = null;// 当前的 putIndex = 当前的位置this.putIndex = i;// 跳出循环break;}}// 元素个数减 1count--;// 迭代器不为空, 进行迭代器的元素删除if (itrs != null)itrs.removedAt(removeIndex);}// 唤醒等待在 notFull 上的线程notFull.signal();}
}

remove 整体的逻辑比较简单, 从 takeIndex 开始一直遍历到 putIndex, 直到找到和元素 o 相同的元素, 调用 removeAt 方法进行删除。

而 removeAt 方法的处理方式分为两种情况来考虑

  1. removeIndex == takeIndex, 这时后面的元素不需要往前移, 而只需要把 takeIndex 的指向下一个元素即可
  2. removeIndex != takeIndex, 这时通过 putIndex 将 removeIndex 后的元素往前移一位

3 总结

它是 BlockingQueue 接口的一种实现,通过固定大小的数组来存储元素,
同时借助 ReentrantLock 和 ReentrantLock 的 Condition 提供了阻塞操作,使得在队列已满或为空时,线程能够安全地等待。
内部借助头尾 2 个指针的移动达到一种循环数组的效果, 避免了整个元素删除时, 数组需要将后面的元素迁移的操作。

4 参考

【细谈Java并发】谈谈ArrayBlockingQueue

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/284858.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

人工智能原理课后习题(考试相关的)

文章目录 问答题知识表示一阶谓词逻辑表示法语义网络表示法 确定推理谓词公式永真和可满足性内容归结演绎推理 不确定推理主观贝叶斯可信度方法证据理论 搜索策略机器学习 问答题 什么是人工智能&#xff1f; 人工智能就是让机器看起来像人类表现出的智能水平一样 人工智能就是…

磁盘及文件系统(上)

这次博客我们将重点理解Ext2文件系统。 首先我们要理解什么是文件系统。 在之前我们一直理解的文件都是一个被打开的文件&#xff0c;而os为了能够管理这样的文件创建了struct_file这样的结构体对象在内核中描述被打开的文件&#xff0c;这个结构体对象中包含了被打开文件的基…

9、ble_mesh基础

node节点&#xff0c;不属于网络的设备称为未配置设备。未配置的设备无法发送或接收网格消息&#xff1b;但是&#xff0c;它会向 Provisioners 宣传其存在。 Provisioner供应&#xff0c;验证&#xff0c;邀请&#xff0c;加入网络成为节点。 一个节点有多个控制或开关&#x…

音视频学习(二十一)——rtmp收流(tcp方式)

前言 本文主要介绍rtmp协议收流流程&#xff0c;在linux上搭建rtmp服务器&#xff0c;通过自研的rtmp收流库发起取流请求&#xff0c;使用ffmpegqt实现视频流的解码与播放。 关于rtmp协议基础介绍可查看&#xff1a;https://blog.csdn.net/www_dong/article/details/13102607…

webpack学习-6.缓存

webpack学习-6.缓存 1.前言2.输出文件的文件名3. 提取引导模板4.模块标识符5.总结 1.前言 webpack 会在打包后生成可部署的 /dist 目录&#xff0c;并将打包后的内容放在此目录。一旦 /dist 目录中的内容部署到服务器上&#xff0c;客户端&#xff08;通常是浏览器&#xff09…

一文搞懂系列——DBC数据库信号解析规则及案例

背景 最近在项目中&#xff0c;同事遇到了一个dbc数据库解析错误的问题&#xff1a;基于ekuiper 对can报文解析&#xff0c;发现实际输出结果与预期差距较大。当时他第一反应是ekuiper的解析规则有误&#xff0c;因此就没有跟踪下去了。因为之前我用过ekuiper的CAN报文解析功能…

采购oled屏幕,应注意什么

在采购OLED屏幕时&#xff0c;应注意以下几点&#xff1a; 规格和参数&#xff1a;了解OLED屏幕的规格和参数&#xff0c;包括尺寸、分辨率、亮度、对比度、响应时间等。确保所采购的屏幕符合项目的需求和预期效果。 品质和可靠性&#xff1a;选择具有可靠品质和稳定性的OLED屏…

Shell三剑客:sed(命令)二

一、插入命令&#xff1a;i&#xff08;之前&#xff09; [rootlocalhost ~]# sed -r 2i aaaaaaa passwd.txt root:x:0:0:root:/root:/bin/bash aaaaaaa bin:x:1:1:bin:/bin:/sbin/nologin[rootlocalhost ~]# sed -r 2i aaaaaaa\ > bbb\ > ccc passwd.txt root:x:0:0:r…

Stable-Diffusion|从图片反推prompt的工具:Tagger(五)

stable-diffusion-webui-wd14-tagger 前面几篇&#xff1a; Stable-Diffusion|window10安装GPU版本的 Stable-Diffusion-WebUI遇到的一些问题&#xff08;一&#xff09; 【Stable-Diffusion|入门怎么下载与使用civitai网站的模型&#xff08;二&#xff09;】 Stable-Diffusi…

【深度强化学习】确定性策略梯度算法 DDPG

前面讲到如 REINFORCE&#xff0c;Actor-Critic&#xff0c;TRPO&#xff0c;PPO 等算法&#xff0c;它们都是随机性策略梯度算法&#xff08;Stochastic policy&#xff09;&#xff0c;在广泛的任务上表现良好&#xff0c;因为这类方法鼓励了算法探索&#xff0c;给出的策略是…

档案数字化管理可以提供什么服务?

档案数字化管理提供了便捷、高效和安全的档案管理服务&#xff0c;帮助组织更好地管理和利用自己的档案资源。 具体来说&#xff0c;专久智能档案数字化管理可以提供以下服务&#xff1a; 1. 档案扫描和数字化&#xff1a;将纸质档案通过扫描仪转换为数字格式&#xff0c;包括文…

Convolutional Neural Network(CNN)——卷积神经网络

1.NN的局限性 拓展性差 NN的计算量大性能差&#xff0c;不利于在不同规模的数据集上有效运行若输入维度发生变化&#xff0c;需要修改并重新训练网络容易过拟合 全连接导致参数量特别多&#xff0c;容易过拟合如果增加更多层&#xff0c;参数量会翻倍无法有效利用局部特征 输入…