读读源码-java中的阻塞队列blockingqueue

阻塞队列是一个在队列基础上又新增了两个附加操作的队列,用于解耦

支持阻塞的插入方法:队列满的时候,队列会阻塞插入元素的线程,直到队列不满。

支持阻塞的移除方法:队列空的时候,获取元素的线程会等待队列变为非空

 

blockingqueue继承queue,属于juc包。

boolean add(E e);boolean offer(E e);
boolean offer(E e, long timeout, TimeUnit unit)  throws InterruptedException;void put(E e) throws InterruptedException;E take() throws InterruptedException;E poll(long timeout, TimeUnit unit)  throws InterruptedException;boolean remove(Object o);

add与offer、remove方法是无阻塞的,是queue本身的接口。(add和offer的区别就是add在队列满的时候会抛异常,而offer则是返回一个false)

put与take、poll是阻塞的,都抛出了阻塞中断异常。(take和poll的区别就是take不论等待多久都要把元素从队列中移除,而poll会根据等待时间选择是否放弃)

offer也有阻塞方法,其中timeout设置的多少时间内还没有加入队列则抛弃该元素

 

ArrayBlockingQueue

数组必须指定大小,入队出队都会被锁上(也就是说他在只有入队高并发/出队高并发的时候表现比较好,同时高并发表现不好)

与queue不同的特点是一把锁和两个条件标识notFull和notEmpty

数据结构

    /** The queued items */final Object[] items;/** items index for next take, poll, peek or remove */int takeIndex;/** items index for next put, offer, or add */int putIndex;/** Number of elements in the queue */int count;/** Concurrency control uses the classic two-condition algorithm* found in any textbook.*//** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;

比普通的queue,他还多了一把可重入锁,两个条件(分别用于take和put)

 

构造方法,他的fair默认为false(非公平模式,即线程获取锁的顺序不固定。此外公平模式即FIFO,谁先请求的谁先获取锁的访问权)

    public ArrayBlockingQueue(int capacity) {this(capacity, false);}

 

put方法

    public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}

 入队完成后通知当前队列已是非空状态,在队列满的时候通过将下一指针指向0来实现循环数组,提高空间利用率

    private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}

 

take方法,这里的可重入锁和put用的是同一个

    public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}

出队原理同类似于入队

private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;}

 

LinkedBlockingQueue

两把锁分别控制队头队尾,put与put之间会互斥,take与take之间也会互斥,而put和take不互斥。

和array相比,容量近乎无限(整数最大值)

put会通知take,take也会通知put。put非满/take非空也会通知其他的put/take

数据结构

    static class Node<E> {E item;/*** One of:* - the real successor Node* - this Node, meaning the successor is head.next* - null, meaning there is no successor (this is the last node)*/Node<E> next;Node(E x) { item = x; }}/** The capacity bound, or Integer.MAX_VALUE if none */private final int capacity;/** Current number of elements */private final AtomicInteger count = new AtomicInteger();/*** Head of linked list.* Invariant: head.item == null*/transient Node<E> head;/*** Tail of linked list.* Invariant: last.next == null*/private transient Node<E> last;/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();

构造方法(可不设置容器大小,默认为整数最大值)

public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}
public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}

这里的put、take类似于ArrayBlockingQueue,不同的是这里的当前容量使用了AtomicInteger确保线程安全;并且由于两个锁是独立的,也需要互相通知。

在梳理时,c == capacity让我非常不解。后得到的答案是

如果移除一个元素前队列是满的,这时put那边是阻塞状态的,那说明需要通知put,你现在可以插入元素了

public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

 

 

PriortyBlockingQueue

大体上和ArrayBlockingQueue类似,主要是通过数组实现了一个二叉堆,实现按优先级出队列。

此外没有notFull,会自动扩容

数据结构

    /*** 优先级队列表示为平衡二进制堆:queue[n] 的两个子级是 queue[2*n+1] 和 queue[2*(n+1)]。优先级队列按 comparator 排序,
* 如果 comparator 为 null,则按元素的自然排序排序:对于堆中的每个节点 n 和 n 的每个后代 d,n <= d。
* 具有最低值的元素位于 queue[0] 中,假设队列为非空。
*/private transient Object[] queue;/*** The number of elements in the priority queue.*/private transient int size;/*** The comparator, or null if priority queue uses elements'* natural ordering.*/private transient Comparator<? super E> comparator;/*** Lock used for all public operations*/private final ReentrantLock lock;/*** Condition for blocking when empty*/private final Condition notEmpty;

take与put类似于ArrayBlockingQueue

这里说一说PriortyBlockingQueue的扩容机制,在不指定大小的情况下默认容器大小为11

    /*** Default array capacity.*/private static final int DEFAULT_INITIAL_CAPACITY = 11;
    /*** 创建一个 PriorityBlockingQueue 具有默认初始容量 (11) 的元素,* 该容量根据其 元素的自然顺序对其元素进行排序。*/public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}

在put、offer的时候会检测当前容量是否已满,满了会触发扩容机制进行扩容,然后才会通过比较器将数据放入堆中

public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}

扩容机制:

1.首先释放主锁,因为扩容比较耗时,释放锁可以让其他线程操作队列避免阻塞。

2.通过CAS将allocationSpinLock锁设置为1,确保只有当前线程在进行扩容操作

3.如果当前size小于64,则新的size为旧的翻倍再+2(快速扩容);如果当前size大于64,则扩大到150%(比例扩容)。计算当前容量是否超过最大值,超了则报OOM

4.创建新数组并释放锁

5.扩容失败则退让cpu,避免忙等

6.将新数组copy到队列

private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) // back off if another thread is allocating
            Thread.yield();lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/878344.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

为什么选择持续绩效管理(CPM)

组织变革和发展的关键始于你的员工基础。要想在组织中建立一个不断发展和创新的环境,你必须确保你的员工是以目标、绩效和反馈为导向的。 #持续绩效管理#鼓励上级和下级之间建立直接的双向沟通。一个持续的绩效评估可以确保平稳有效的联系,随着时间的推移,可以看到更好的结果…

unity直接安装插件

一共三步 1.窗口->包管理器2. 在弹出的界面里点左上角的加号 依次解读选项为disk 文件夹(zip压缩文件请提前解压)tarball 压缩包(只支持tar压缩包)git URL 网址下载(国内应该得设置代理)by name 指定软件和版本下载 3.以disk为例,进入解压的地方选中package.json接着…

北航计网课程笔记-四、网络层

第四章 网络层 本章重点 1、IP数据报的格式和分片 2、IP地址的表示和分类分配、子网掩码、地址有效性、广播地址 3、子网划分 4、CIDR路由聚合 5、ARP协议和ICMP协议 6、路由表、路由器的工作原理 7、RIP协议和OSPF协议 8、网络层的综合设计和应用 网络层的功能 网络层的主要任…

北航计网课程笔记-六、应用层

第六章 应用层 应用层概述 应用层对应用程序的通信提供服务。应用层的功能:文件传输、访问和管理 电子邮件 虚拟终端 查询服务和远程作业登录应用层的重要协议FTP SMTP、POP3 HTTP DNS网络应用模型 客户/服务器模型C/S客户是服务请求方,服务器是服务提供方 Web、FTP、远程登录…

北航计网课程笔记-七、一些总结

复习到最后想开摆了,也没总结啥,只是一些格式……一些总结MAC帧格式IP数据报格式 IP首部20B~60B IP总长最多65535BTCP报文格式 TCP首部20B~60B 数据部分最多65535-20-20 = 65495BUDP报文格式UDP&TCP伪首部格式HTTP报文格式

北航计网课程笔记-一、概述

写在前面:本笔记根据王道408完成,北航软院不考的内容不在其中。 总之复习期末把王道刷一遍就行了。 个人学习用,如有错误敬请指正。第一章 概述 计算机网络概念 计算机网络的定义:一些互联的、自治的计算机的集合,主体是多台计算机,媒介为通信媒介, 目的是通信与资源共享…

北航计网课程笔记-二、物理层

第二章 物理层 物理层的任务:将原始的比特流从一台机器上传输到另一台机器上。 确定与传输媒体的接口特性:机械、电气、功能、规程通信基础 一些基本概念数据,信号和码元数据是传送信息的实体;信号是数据的电气或电磁表现;码元是数字通信中数字信号的计量单位,1码元可以携…

Winform和MFC的一个区别

前段时间我嫌桌面快捷键太多,我使用C语言用MFC制作了桌面自定义系统(上图左),可是这个系统存在一些问题,比如(下图) 默认情况下它的右上角没有最大化和最小化的按钮,而Winform是有的(下图)。 还有一个问题:在任务栏,点击时MFC不能最小化,而Winform可以;还有一个很…

应用中的 PostgreSQL项目案例

title: 应用中的 PostgreSQL项目案例 date: 2025/2/3 updated: 2025/2/3 author: cmdragon excerpt: 随着大数据和云计算的兴起,企业在数据管理和数据分析方面面临着越来越复杂的挑战。PostgreSQL 作为一个开源关系型数据库,凭借其卓越的扩展性和强大的功能,逐渐成为众多企业…

来自aakennes的新年祝福

来自aakennes的新年祝福组题人: @aakennes \(A\) P888. 字符串会上树 \(AC\)基础字符串。点击查看代码 string s,t="",w=""; map<string,string>f; int main() { // #define Isaac #ifdef Isaacfreopen("in.in","r",stdin);fr…

1.3 决定程序流程的程序计数器

CPU先执行0100 CPU每执行一个指令,程序计数器的值就会自动加1 CPU的控制器就会参照程序计数器的数值,从内存中读取命令并执行。 程序计数器决定着程序的流程

【每日一题】20250203

你是万千星球的星球! 你是沸腾的准则!你是精心保存的潜伏的萌芽!你是核心!【每日一题】我国发射的“天宫一号”和“神州八号”在对接前,”天宫一号”的运行轨道高度为 \(350 \; \mathrm{km}\),“神州八号”的运行轨道高度为 \(343 \; \mathrm{km}\).它们的运行轨道均视为…