阻塞队列底层原理分析(一)

文章目录

    • 一、阻塞队列介绍
      • 1. Queue接口
      • 2. BlockingQueue接口
      • 3. 阻塞队列特性
    • 二、 ArrayBlockingQueue
      • 1. 简介
      • 3. 使用
      • 3. 底层原理
    • 三、LinkedBlockingQueue(使用最多的阻塞队列)
      • 1. 简介
      • 2. 使用
      • 3. 底层原理
      • 4. LinkedBlockingQueue与ArrayBlockingQueue对比
    • 四、LinkedBlockingDeque
      • 1. 简介
      • 2. 底层原理

一、阻塞队列介绍

1. Queue接口

它定义了队列的一些基本的操作规范,但这些方法都不会对队列进行阻塞

public interface Queue<E> extends Collection<E> {boolean add(E e);E remove();E poll();E element();E peek();
}

2. BlockingQueue接口

它继承了Queue接口,是队列的一种,阻塞队列对Queue接口进行了扩展:

  • take方法:队列空时,获取元素的线程会等待队列变为非空
  • put方法:队列满是,队列会阻塞插入元素的线程,直到队列不满
public interface BlockingQueue<E> extends Queue<E> {boolean add(E e);boolean offer(E e);void put(E e) throws InterruptedException;boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;E take() throws InterruptedException;E poll(long timeout, TimeUnit unit)throws InterruptedException;int remainingCapacity();boolean remove(Object o);boolean contains(Object o);int drainTo(Collection<? super E> c);int drainTo(Collection<? super E> c, int maxElements);
}

在这里插入图片描述

3. 阻塞队列特性

阻塞队列区别于其他类型的队列的最主要的特点就是“阻塞”这两个字,所以下面重点介绍阻塞功能:阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。

  • take

take 方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。

  • put

put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。

  • 是否有界

阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。

  • 应用场景

BlockingQueue 是线程安全的,我们在很多场景下都可以利用线程安全的队列来优雅地解决我们业务自身的线程安全问题。比如说,使用生产者/消费者模式的时候,我们生产者只需要往队列里添加元素,而消费者只需要从队列里取出它们就可以了,如图所示:

在这里插入图片描述
因为阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问 题。生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题。这也就意味着,考虑锁等线程安全问题的重任从“你”转移到了“队列”上,降低了我们开 发的难度和工作量。同时,队列它还能起到一个隔离的作用。比如说我们开发一个银行转账的程序,那么生产者线程不需要关心具体的转账逻辑,只需要把转账任务,如账户和金额等信息放到队列中就可以,而不需要去关心银行这个类如何实现具体的转账业务。而作为银行这个类来讲,它会去从队列里取出来将要执行的具体的任务,再去通过自己的各种方法来完成本次转账。这样就实现了具体任务与执行任务类之间的解耦,任务被放在了阻塞队列中,而负责放任务的线程是无法直接访问到我们银行具体实现转账操作的对象的,实现了隔离,提高了安全性。

  • 常见的阻塞队列

在这里插入图片描述

二、 ArrayBlockingQueue

1. 简介

ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用 ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满, 大量生产线程被阻塞。使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发 场景下会成为性能瓶颈。

3. 使用

BlockingQueuequeue=newArrayBlockingQueue(1024);  queue.put("1");//向队列中添加元素
Objectobject=queue.take();//从队列中取出元素

3. 底层原理

利用了Lock锁的Condition通知机制进行阻塞控制。

//存放元素数组final Object[] items;
//取指针,take、poll、peek和remove操作都会使用这个指针int takeIndex;
//放指针,put、offer和add操作都会使用到这个指针int putIndex;
//定义了独占锁
final ReentrantLock lock;
//定了了两个条件队列,一个用于消费者等待一个用于生产者等待
private final Condition notEmpty;
private final Condition notFull;

put方法,是如何往阻塞队列中放置元素的

    public void put(E e) throws InterruptedException {Objects.requireNonNull(e);//定义了独占锁final ReentrantLock lock = this.lock;//加的可中断的锁lock.lockInterruptibly();try {//当元素满了while (count == items.length)//在notFull条件队列中等待notFull.await();//否则进行入队操作enqueue(e);} finally {//解锁lock.unlock();}}

下面看看enqueue(e)是如何实现真正入队的

   private void enqueue(E e) {//获得元素数组final Object[] items = this.items;//使用putIndex将元素放入队列中items[putIndex] = e;//当前已经满了if (++putIndex == items.length) //putIndex=0,有一种循环数组的味道了,主要是为了避免删除元素时,出现元素移位的情况(数组本身存在的问题)putIndex = 0;//元素数量+1count++;//唤醒消费者队列notEmpty.signal();}

take方法,是如何向阻塞队列中取元素的

 public E take() throws InterruptedException {//获取锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//队列为空,阻塞while (count == 0)notEmpty.await();//获取元素return dequeue();} finally {//解锁lock.unlock();}}

下面看看dequeue是如何获取元素的

private E dequeue() {//获取元素数组final Object[] items = this.items;@SuppressWarnings("unchecked")//获取takeindex位置的元素E e = (E) items[takeIndex];//删除takeindex地方的元素items[takeIndex] = null;//这里同样是循环数组的元素if (++takeIndex == items.length) takeIndex = 0;//数量-1count--;if (itrs != null)itrs.elementDequeued();//唤醒生产者线程notFull.signal();return e;}

三、LinkedBlockingQueue(使用最多的阻塞队列)

1. 简介

LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存, 则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。 LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

2. 使用

BlockingQueue<Integer>boundedQueue=newLinkedBlockingQueue<>(100); 
BlockingQueue<Integer>unboundedQueue=newLinkedBlockingQueue<>();

3. 底层原理

首先看看它采用的数据结构

//定义了一个静态内部类,代表我们链表上的节点
static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}//阻塞队列的容量
private final int capacity;
//当前队列的容量,是一个原子变量
private final AtomicInteger count = new AtomicInteger();
//头节点
transient Node<E> head;
//尾节点
private transient Node<E> last;
//取锁
private final ReentrantLock takeLock = new ReentrantLock();
//放锁
private final ReentrantLock putLock = new ReentrantLock();
//取等待队列
private final Condition notEmpty = takeLock.newCondition();
//放等待队列private final Condition notFull = putLock.newCondition();

put方法

public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();final int c;//创建一个新的节点final Node<E> node = new Node<E>(e);//获取放锁final ReentrantLock putLock = this.putLock;//获取当前阻塞队列中元素的数量final AtomicInteger count = this.count;//加中断锁putLock.lockInterruptibly();try {//当阻塞队列容量满了while (count.get() == capacity) {//此时放原属线程需要阻塞notFull.await();}//进行入队操作enqueue(node);//元素增加,这里使用原子变量的原因是放和取本身不是互斥的c = count.getAndIncrement();if (c + 1 < capacity)//当前元素数量c+1小于容量,唤醒阻塞的生产者线程(提高吞吐量)notFull.signal();} finally {//解锁putLock.unlock();}if (c == 0)//元素个数为1,唤醒消费者现场signalNotEmpty();}

enqueue方法

//一个简单的单链表尾部加元素的操作private void enqueue(Node<E> node) {last = last.next = node;}

take方法

public E take() throws InterruptedException {final E x;final int c;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {//队列为空,取线程阻塞while (count.get() == 0) {notEmpty.await();}//元素出队列x = dequeue();c = count.getAndDecrement();if (c > 1)//还有元素,可以提前唤醒消费者线程notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}
//一个单链表删除头部元素的逻辑private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}

4. LinkedBlockingQueue与ArrayBlockingQueue对比

LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和 ArrayBlockingQueue的不同点在于:

  • 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而 LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
  • 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而 LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
  • 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影 响。
  • 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而 LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

四、LinkedBlockingDeque

1. 简介

LinkedBlockingDeque是一个基于链表实现的双向阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,可以看做无界队列,但也可以设置容量限制,作为有界队列。相比于其他阻塞队列,LinkedBlockingDeque 多了 addFirst、addLast、peekFirst、peekLast 等方法。以first结尾的方法,表示插入、获取或移除双端队列的第一个元素。以 last 结尾的方法,表示插入、获取或移除双端队列的最后一个元素。但本质上并没有优化锁的竞争情况,因为不管是从队首还是队尾,都是在竞争同一把锁,只不过数据插入和获取的方式多了。

2. 底层原理

//链表节点
static final class Node<E> {E item;Node<E> prev;Node<E> next;Node(E x) {item = x;}}
//首节点
transient Node<E> first;
//尾节点
transient Node<E> last;
//只定义了一把锁
final ReentrantLock lock = new ReentrantLock();
//定义了两个条件等待队列private final Condition notEmpty = lock.newCondition();private final Condition notFull = lock.newCondition();

put方法

public void put(E e) throws InterruptedException {putLast(e);}

putLast方法

    public void putLast(E e) throws InterruptedException {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {while (!linkLast(node))notFull.await();} finally {lock.unlock();}}

linkLast实际链表尾部加元素的逻辑

 private boolean linkLast(Node<E> node) {// assert lock.isHeldByCurrentThread();if (count >= capacity)return false;Node<E> l = last;node.prev = l;last = node;if (first == null)first = node;elsel.next = node;++count;notEmpty.signal();return true;}

take方法

 public E take() throws InterruptedException {return takeFirst();}

takeFirst方法

public E takeFirst() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock();try {E x;while ( (x = unlinkFirst()) == null)notEmpty.await();return x;} finally {lock.unlock();}}

unlinkFirst实际删除元素方法

private E unlinkFirst() {// assert lock.isHeldByCurrentThread();Node<E> f = first;if (f == null)return null;Node<E> n = f.next;E item = f.item;f.item = null;f.next = f; // help GCfirst = n;if (n == null)last = null;elsen.prev = null;--count;notFull.signal();return item;}

上面take和put方法逻辑和LinkedBlockingQeque一样,都是队首删元素,队尾添加元素。但LinkedBlockingDeque是一个双向链表,所以它添加了许多其它方法,原理都差不多,这里就不详细介绍了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/269667.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

原创改进|多策略融合的改进蜣螂优化算法

作者在前段时间的一篇文章中介绍过了蜣螂优化算法(dung beetle optimizer&#xff0c;DBO)的原理及实现&#xff0c;该算法是由东华大学沈波教授团队在2022年提出[1]&#xff0c;其灵感来自蜣螂的滚球、跳舞、觅食、偷窃和繁殖行为这5种习性&#xff0c;其不同的子种群执行了不…

论文阅读三——端到端的帧到凝视估计

论文阅读三——端到端的帧到凝视估计 主要内容研究问题文章的解题思路文章的主要结构 论文实验关于端到端凝视估计的数据集3种基线模型与EFE模型的对比在三个数据集中与SOTA进行比较 问题分析重要架构U-Net 基础知识 主要内容 文章从端到端的方法出发&#xff0c;提出了根据he…

西南交通大学【数据结构实验8】

实验内容及要求&#xff1a; 编写控制台应用程序&#xff0c;提供以下菜单项&#xff1a; 插入元素 从键盘输入若干两两互不相同的非0整数&#xff0c;直到输入0时停止。将输入的所有非0整数按输入次序插入二叉排序树(初始时是空树)。 插入某个非0整数时&#xff0c;若该整…

MySQL笔记-第11章_数据处理之增删改

视频链接&#xff1a;【MySQL数据库入门到大牛&#xff0c;mysql安装到优化&#xff0c;百科全书级&#xff0c;全网天花板】 文章目录 第11章_数据处理之增删改1. 插入数据1.1 实际问题1.2 方式1&#xff1a;VALUES的方式添加1.3 方式2&#xff1a;将查询结果插入到表中 2. 更…

Latex/Overleafc插入eps格式图片不显示、png转eps等解决问题

最近在搞论文&#xff0c;有些程序生成的图片大部分为png格式&#xff0c;但很多期刊要求eps矢量图格式&#xff0c;特此整理下遇到的各种坑&#xff0c;虽然不算什么技术问题&#xff0c;但是确实也浪费了不少时间。。。 1. png转eps在线神器&#xff1a; 【cloudconvert】 …

Python从入门到精通九:Python异常、模块与包

了解异常 什么是异常 当检测到一个错误时&#xff0c;Python解释器就无法继续执行了&#xff0c;反而出现了一些错误的提示&#xff0c;这就是所谓的“异常”, 也就是我们常说的BUG bug单词的诞生 早期计算机采用大量继电器工作&#xff0c;马克二型计算机就是这样的。 19…

Redis生产实战-热key、大key解决方案、数据库与缓存最终一致性解决方案

生产环境中热 key 处理 热 key 问题就是某一瞬间可能某条内容特别火爆&#xff0c;大量的请求去访问这个数据&#xff0c;那么这样的 key 就是热 key&#xff0c;往往这样的 key 也是存储在了一个 redis 节点中&#xff0c;对该节点压力很大 那么对于热 key 的处理就是通过热…

【JVM从入门到实战】(四)类的生命周期

什么是类的生命周期 类的生命周期描述了一个类加载、连接、初始化、使用、卸载的整个过程 一个类完整的生命周期如下&#xff1a; 加载阶段 加载阶段第一步是类加载器根据类的全限定名通过不同的渠道以二进制流的方式获取字节码信息。 程序员可以使用Java代码拓展的不同的渠道…

CentOS7安装MySQL8.0

一、使用Yum安装 1. 使用wget下载MySQL的rpm包 wget https://repo.mysql.com//mysql80-community-release-el7-3.noarch.rpm2. 安装下载好的rpm包 yum localinstall mysql80-community-release-el7-3.noarch.rpm 3. 安装mysql&#xff08;该步可能出现问题&#xff09; yum…

vue项目中 CDN 是vue本身的依赖可以按需加载还是项目中所有的第三方库都可以按需加载?

这是我看到CDN简介时产生的问题 相信很多小伙伴会有 和我一样的疑问 在这里 我也统一回答一下 CDN&#xff08;内容分发网络&#xff09;是一种通过将数据分发到全球各个节点&#xff0c;以提供快速、可靠的内容传输的技术。在Vue项目中&#xff0c;CDN可以用于按需加载Vue本…

GEE:使用网格搜索法(Grid Search)求机器学习的最优参数或者参数组合

作者:CSDN @ _养乐多_ 本文记录了在 Google Earth Engine(GEE)平台中,计算机器学习分类算法最优参数的代码,其中包括单一参数的最优和不同参数组合的最优。使用的最优参数计算方法是网格搜索法(Grid Search),GEE 平台上并没有现成的网格搜索法 API,因此,本文在 GEE …

Android studio 无法查看源码

Android studio 查看源码时提示 Decompiled .class file,bytecode version:52.0(java 8) 1、检查 buildToolsVersion 2、检查相关资源文件