9、阻塞队列

一、什么是阻塞队列

**阻塞队列(BlockingQueue)是一种线程安全的队列**
  • 线程安全:多个线程可以同时对阻塞队列进行读写操作,而不会导致数据不一致或并发问题。
  • 阻塞操作:当生产者线程试图将元素放入已满的队列时,或者消费者线程试图从空队列中取出元素时,这些线程将会被自动阻塞,即进入等待状态,直到队列状态改变,允许进行相应的插入或移除操作。
  • 通知机制:阻塞队列内部采用条件变量和锁等机制来协调生产和消费,当队列状态变化(如非空或非满)时,会自动唤醒相应等待的线程。

二、阻塞队列的实现思路

1、数组作为数据结构:

  • 首先,创建一个固定大小的数组来存储元素。由于是阻塞队列,通常会设计为循环数组(环形缓冲区)以提高空间利用率。

2、头尾指针:

  • 维护两个指针,一个指向队列头部(出队位置),另一个指向队尾(入队位置)。当出队时更新头部指针,入队时更新尾部指针,并且要处理数组边界情况,使得指针能够绕回到数组开头继续使用。

3、线程安全:

  • 使用锁机制(如 synchronized 关键字或 java.util.concurrent.locks.Lock接口)确保在并发环境下对头尾指针以及数组元素的操作是原子性的。
  • 在Java中,还可以利用条件变量(如 Condition 对象)来控制线程在队列为空时等待生产者入队,在队列满时等待消费者出队。

4、容量检查:

  • 每次入队前检查队列是否已满,若满则阻塞入队操作;每次出队前检查队列是否为空,若空则阻塞出队操作。

5、阻塞和唤醒:

  • 当队列状态改变时,通过条件变量的 signal() 或 signalAll() 方法唤醒等待中的线程。

三、代码案例

创建一个公共接口

package queue;/*** 阻塞队列公共接口*/
public interface BlockingQueue<E> {/*** 向队列尾插入值** @param value 添加的值* @return 插入成功返回 true, 插入失败返回 false*/void enqueue(E value) throws InterruptedException;/*** 从队列头获取值, 并从队列中移除获取的值** @return 如果队列非空返回队列头值, 否则返回 null*/E dequeue() throws InterruptedException;/*** 检查队列是否为空** @return 空返回 true, 否则返回 false*/boolean isEmpty();/*** 检查队列是否已满** @return 满返回 true, 否则返回 false*/boolean isFull();
}

单锁案例

单锁案例代码

package queue;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/**1. 单锁阻塞队列2.  3. @param <E>*/
public class SingleLockBlockingQueue<E> implements BlockingQueue<E> {/*** 使用数组作为队列的数据结构*/private E[] array;/*** 队列头指针*/private int head;/*** 队列尾指针*/private int tail;/*** 队列元素个数*/private int size;/*** 定义一个Lock可重入锁*/private Lock lock;/*** 定义一个头指针线程类型(表示获取值的线程)*/private Condition headCondition;/*** 定义一个尾指针线程类型(表示添加值的线程)*/private Condition tailCondition;/*** @param capacity 设置队列的容量*/public SingleLockBlockingQueue(int capacity) {this.array = (E[]) new Object[capacity];this.head = 0;this.tail = 0;this.size = 0;this.lock = new ReentrantLock();this.headCondition = lock.newCondition();this.tailCondition = lock.newCondition();}/*** 向队列尾插入值** @param value 添加的值* @return 插入成功返回 true, 插入失败返回 false*/@Overridepublic void enqueue(E value) throws InterruptedException {lock.lockInterruptibly();//尝试获取锁。try {while (isFull()) {//如果队列已满的话,添加线程进入等待状态。(不要用if判断,否则会出现虚假唤醒)tailCondition.await();//添加线程进入等待状态,并且释放lock锁。}array[tail] = value;//将值写入队列tail = (tail + 1) % array.length; // 更新尾指针size++;headCondition.signal();//唤醒头指针线程类型的线程进行获取操作。} finally {lock.unlock();//释放锁。}}/*** 向队列尾插入值;* 超过等待时间就结束。** @param value   添加的值* @param timeout 超时时间:如果超过这个时间,表示线程不再继续等待。任务结束。* @return 插入成功返回 true, 插入失败返回 false*/public void enqueue(E value, long timeout) throws InterruptedException {lock.lockInterruptibly();//尝试获取锁。try {long t = TimeUnit.MILLISECONDS.toNanos(timeout);//毫秒转纳秒while (isFull()) {//如果队列已满的话,添加线程进入等待状态。(不要用if判断,否则会出现虚假唤醒)if (t <= 0) {return;}t = tailCondition.awaitNanos(t);//添加线程进入等待状态,并且释放lock锁。返回一个剩余等待时间}array[tail] = value;//将值写入队列tail = (tail + 1) % array.length; // 更新尾指针size++;headCondition.signal();//唤醒获取值的线程进行获取操作。} finally {lock.unlock();//释放锁。}}/*** 从队列头获取值, 并从队列中移除获取的值** @return 如果队列非空返回队列头值, 否则返回 null*/@Overridepublic E dequeue() throws InterruptedException {lock.lockInterruptibly();//尝试获取锁。如果没有获取到锁,则线程进入等待状态try {while (isEmpty()) { //如果队列为空的话,获取线程进入等待状态。(不要用if判断,否则会出现虚假唤醒)headCondition.await(); //获取线程进入等待状态,并且释放lock锁。}E e = array[head];//拿到值array[head] = null; //将拿到的值赋值为null;手动垃圾回收head = (head + 1) % array.length; //更新 head索引位置size--;tailCondition.signal(); //唤醒添加线程。return e;} finally {lock.unlock();//释放锁。}}/*** 检查队列是否为空** @return 空返回 true, 否则返回 false*/@Overridepublic boolean isEmpty() {return size == 0;}/*** 检查队列是否已满** @return 满返回 true, 否则返回 false*/@Overridepublic boolean isFull() {return size == array.length;}
}

单锁代码测试结果

1、创建一个阻塞队列对象。
2、创建一个获取线程去队列中获取值。
3、创建一个添加线程去队列中添加值。
4、将获取队列线程写在前面。演示队列为空时。获取线程进入等待状态状态。等添加线程执行完。然后再唤醒获取线程进行获取值。
在这里插入图片描述

单锁案例缺点

  1. 并发性能限制
    当只有一个锁时,任何时刻只能有一个线程进行入队或出队操作。这意味着在高并发场景下,生产者和消费者线程可能会因为争夺这把全局锁而产生不必要的等待,从而影响系统的整体吞吐量。

  2. 无条件唤醒
    使用单个条件变量(如 notEmpty 和 notFull)时,当一个线程被唤醒时,可能并不能立即执行其所需的操作。例如,当一个生产者线程因 notFull 条件变为真而被唤醒时,它需要重新获取锁才能入队,但在竞争过程中可能被其他线程抢占,导致再次陷入等待状态。

  3. 伪公平性问题
    单锁不能提供完全公平的调度策略,即使有多个线程都在等待,但操作系统决定哪个线程获得锁并执行操作时可能并没有遵循先来后到的原则。特别是在非公平锁的情况下,新到达的线程可能比已经等待很久的线程更快地获取到锁。

  4. 饥饿风险
    在极端情况下,如果总是有新的生产者或消费者线程不断地加入并获取到锁,可能导致某些已存在的等待线程长时间得不到执行机会,从而出现“饥饿”现象。

  5. 效率损失
    单锁模型下,即使队列中还有空间或者有元素可消费,但由于只有一个共享资源,仍然会导致所有线程在锁上发生串行化,降低了并行度。

  6. 分离锁优化缺失
    对于一些特殊的阻塞队列实现,比如 LinkedBlockingQueue,采用分离锁技术(两个独立的锁分别保护队列的头部和尾部),可以在一定程度上提高并发性能。单锁无法利用这种优化方式。

为了改善这些缺点,可以考虑使用更复杂的同步机制,如多锁、分离锁、CAS原子操作等技术来设计并发数据结构,以提高并发处理能力和避免上述问题。例如,在Java中,ConcurrentLinkedQueue基于无锁算法实现,而LinkedBlockingQueue则采用了两把锁分别控制生产和消费端的操作。

双锁案例

双锁代码案例

package queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** 双锁阻塞队列** @param <E>*/
public class DoubleLockBlockingQueue<E> implements BlockingQueue<E> {/*** 使用数组作为队列的数据结构*/private E[] array;/*** 队列头指针*/private int head;/*** 队列尾指针*/private int tail;/*** 队列元素个数* 多锁情况下。需要用原子类型,AtomicInteger 是线程安全的*/private AtomicInteger size;/*** 定义一个头指针Lock可重入锁*/private Lock headLock;/*** 定义一个头指针线程类型(表示获取值的线程)*/private Condition headCondition;/*** 定义一个尾指针Lock可重入锁*/private Lock tailLock;/*** 定义一个尾指针线程类型(表示添加值的线程)*/private Condition tailCondition;/*** @param capacity 设置队列的容量*/public DoubleLockBlockingQueue(int capacity) {this.array = (E[]) new Object[capacity];this.head = 0;this.tail = 0;this.size=new AtomicInteger(0);this.headLock = new ReentrantLock();this.headCondition = headLock.newCondition();this.tailLock = new ReentrantLock();this.tailCondition = tailLock.newCondition();}/*** 向队列尾插入值** @param value 添加的值* @return 插入成功返回 true, 插入失败返回 false*/@Overridepublic void enqueue(E value) throws InterruptedException {tailLock.lockInterruptibly();//尝试获取锁。int s;//记录size自增之前的值。try {while (isFull()) {//如果队列已满的话,添加线程进入等待状态。(不要用if判断,否则会出现虚假唤醒)tailCondition.await();//添加线程进入等待状态,并且释放lock锁。}array[tail] = value;//将值写入队列tail = (tail + 1) % array.length; // 更新尾指针s = size.getAndIncrement();//相当于size++;getAndIncrement()方法会返回size自增之前的值//如果size在自增后仍然没有把队列添加满。那么由添加线程唤醒其他的添加线程(否则就由获取方法中的获取线程唤醒添加线程)。if (s + 1 < array.length) {tailCondition.signal();}} finally {tailLock.unlock();//释放锁。}//如果添加线程,在没有添加之前,队列是空的,那么需要当前添加线程去唤醒取值线程if (s == 0) {//由于锁必须配对使用了。所以在唤醒头指针线程类型的线程时,必须先获取锁-》唤醒线程-》释放锁。headLock.lockInterruptibly();try {headCondition.signal();//唤醒获取值线程进行获取操作。} finally {headLock.unlock();}}}/*** 从队列头获取值, 并从队列中移除获取的值** @return 如果队列非空返回队列头值, 否则返回 null*/@Overridepublic E dequeue() throws InterruptedException {headLock.lockInterruptibly();//尝试获取锁。如果没有获取到锁,则线程进入等待状态E e;int s;//记录size自减之前的值。try {while (isEmpty()) { //如果队列为空的话,获取线程进入等待状态。(不要用if判断,否则会出现虚假唤醒)headCondition.await(); //获取线程进入等待状态,并且释放lock锁。}e = array[head];//拿到值array[head] = null; //将拿到的值赋值为null;手动垃圾回收head = (head + 1) % array.length; //更新 head索引位置s = size.getAndDecrement();//相当于size--;getAndDecrement()方法会返回size自减之前的值。//如果队列再取值前还有队列中还有2个或以上的值。那么由当前获取线程唤醒其他获取线程if (s > 1) {headCondition.signal();}} finally {headLock.unlock();//释放锁。}//如果队列在当前取值线程在取值前,队列中的值是满的,那么需要当前取值线程唤醒添加线程。if (s == array.length) {//由于锁必须配对使用了。所以在唤醒尾指针线程类型的线程时,必须先获取锁-》唤醒线程-》释放锁。tailLock.lockInterruptibly();try {tailCondition.signal(); //唤醒添加线程。} finally {tailLock.unlock();}}return e;}/*** 检查队列是否为空** @return 空返回 true, 否则返回 false*/@Overridepublic boolean isEmpty() {return size.get() == 0;}/*** 检查队列是否已满** @return 满返回 true, 否则返回 false*/@Overridepublic boolean isFull() {return size.get() == array.length;}
}

双锁队列代码注意事项

1、 由于用的是双锁,所以size必须要用原子对象类型。

将一个 AtomicInteger 变量 size 赋值为0,你可以使用 set 方法或者直接在初始化时传入0。以下是两种方法的示例:

AtomicInteger size = new AtomicInteger(0);

或者

AtomicInteger size = new AtomicInteger(); // 默认初始值为 0,也可以是其他数值
size.set(0);

获取值是:

AtomicInteger size = new AtomicInteger(); // 默认初始值为 0,也可以是其他数值
int s = size.get();

在这里插入图片描述

2、 lock锁必须是获取锁和解锁一起配对使用。,唤醒线程也是一样。必须在获取锁和解锁中间,不能单独使用。

在这里插入图片描述
在这里插入图片描述
3. 由于添加方法中有释放获取线程的代码,反之 获取方法有释放添加线程的代码。所以要防止出现死锁。解决的方法是:
不要把【唤醒获取线程的代码】写在添加代码的锁之内。
不要把【唤醒添加线程的代码】写在获取代码的锁之内。
把它们上下依次编写

添加方法错误示范
在这里插入图片描述
添加方法的正确写法
在这里插入图片描述

获取方法的错误示范
在这里插入图片描述

获取方法的正确写法
在这里插入图片描述

3、优化锁定性能。(通过线程之间的级联通知)

级联通知:一个线程完成任务释放资源后唤醒其他等待的线程,被唤醒的线程继续执行又可能导致其他线程被唤醒,这种间接影响可以类比为级联通知。

  1. 如果队列在添加之后仍然没有把队列添加满。那么由添加线程唤醒其他的添加线程(否则就由获取方法中的获取线程唤醒添加线程)。如果在添加之前,队列是空的,那么需要当前添加线程去唤醒取值线程
    在这里插入图片描述

  2. 如果队列再取值前还有队列中还有2个或以上的值。那么由当前获取线程唤醒其他获取线程;如果队列在当前取值线程在取值前,队列中的值是满的,那么需要当前取值线程唤醒添加线程。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/441379.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Meteor Lake上测试基于Stable Diffusion的AI应用

上个月刚刚推出的英特尔新一代Meteor Lake CPU&#xff0c;预示着AI PC的新时代到来。AI PC可以不依赖服务器直接在PC端处理AI推理工作负载&#xff0c;例如生成图像或转录音频。这些芯片的正式名称为Intel Core Ultra处理器&#xff0c;是首款配备专门用于处理人工智能任务的 …

LeetCode —— 43. 字符串相乘

&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️Take your time ! &#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️…

【国产MCU】-CH32V307-GPIO控制:输入与输出

GPIO控制:输入与输出 文章目录 GPIO控制:输入与输出1、GPIO简单介绍2、驱动API介绍3、GPIO配置代码实现3.1 GPIO配置为输出3.2 GPIO配置为输入CH32V307的GPIO口可以配置成多种输入或输出模式,内置可关闭的上拉或下拉电阻,可以配置成推挽或开漏功能。GPIO口还可以复用成其他…

Java面试架构篇【一览众山小】

文章目录 &#x1f6a1; 简介☀️ Spring&#x1f425; 体系结构&#x1f420; 生命周期 &#x1f341; SpringMVC&#x1f330; 执行流程 &#x1f31c; SpringBoot&#x1f30d; 核心组件&#x1f38d; 自动装配&#x1f391; 3.0升级 &#x1f505; spring Cloud Alibaba&am…

Redis缓存穿透、缓存击穿、缓存雪崩的解决方案

一、背景 不管是实际工作还是面试&#xff0c;这3个问题都是非常常见的&#xff0c;今天我们就好好探讨一下这个三个问题的解决方案 三者的区别&#xff1a; 缓存穿透&#xff1a;查询缓存和数据库都不存在的数据&#xff0c;缓存没有&#xff0c;数据库也没有 缓存击穿&#…

系统架构设计师-22年-下午题目

系统架构设计师-22年-下午题目 更多软考知识请访问 https://ruankao.blog.csdn.net/ 试题一必答&#xff0c;二、三、四、五题中任选两题作答 试题一 (25分) 说明 某电子商务公司拟升级其会员与促销管理系统&#xff0c;向用户提供个性化服务&#xff0c;提高用户的粘性。…

redis 极简分布式锁实现

写在前面 工作中遇到&#xff0c;整理 reids 做简单分布式锁的思考博文适合刚接触 redis 的小伙伴理解不足小伙伴帮忙指正 对每个人而言&#xff0c;真正的职责只有一个&#xff1a;找到自我。然后在心中坚守其一生&#xff0c;全心全意&#xff0c;永不停息。所有其它的路都是…

(一)PySpark3:安装教程及RDD编程(非常详细)

目录 一、pyspark介绍 二、PySpark安装 三、RDD编程 1、创建RDD 2、常用Action操作 ①collect ②take ③takeSample ④first ⑤count ⑥reduce ⑦foreach ⑧countByKey ⑨saveAsTextFile 3、常用Transformation操作 ①map ②filter ③flatMap ④sample ⑤d…

从比亚迪的整车智能战略,看王传福的前瞻市场布局

众所周知&#xff0c;作为中国新能源汽车的代表企业&#xff0c;比亚迪在中国乃至全球的新能源汽车市场一直都扮演着引领者的角色。2024年新年伊始&#xff0c;比亚迪又为新能源汽车带来了一项重磅发布。 整车智能才是真智能 近日&#xff0c;在“2024比亚迪梦想日”上&#xf…

webassembly003 TTS BARK.CPP-02-bark_tokenize_input(ctx, text);

bark_tokenize_input函数 bark是没有语言控制选项的&#xff0c;但是官方的版本无法运行中文bark_tokenize_input会调用bert_tokenize函数&#xff0c;bark_tokenize_input函数对中文分词失效&#xff0c;也就是导致不支持中文的原因。 void bark_tokenize_input(struct bark_…

jenkins部署(docker)

docker部署&#xff0c;避免安装tomcat 1.拉镜像 docker pull jenkins/jenkins2.宿主机创建文件夹 mkdir -p /lzp/jenkins_home chmod 777 /lzp/jenkins_home/3.启动容器 docker run -d -p 49001:8080 -p 49000:50000 --privilegedtrue -v /lzp/jenkins_home:/var/jenkins_…

2401Idea用GradleKotlin编译Java控制台中文出乱码解决

解决方法 解决方法1 在项目 build.gradle.kts 文件中加入 tasks.withType<JavaCompile> {options.encoding "UTF-8" } tasks.withType<JavaExec> {systemProperty("file.encoding", "utf-8") }经测试, 只加 tasks.withType<…