阻塞队列是一个在队列基础上又新增了两个附加操作的队列,用于解耦
支持阻塞的插入方法:队列满的时候,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法:队列空的时候,获取元素的线程会等待队列变为非空
blockingqueue继承queue,属于juc包。
boolean add(E e);boolean offer(E e); boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;void put(E e) throws InterruptedException;E take() throws InterruptedException;E poll(long timeout, TimeUnit unit) throws InterruptedException;boolean remove(Object o);
add与offer、remove方法是无阻塞的,是queue本身的接口。(add和offer的区别就是add在队列满的时候会抛异常,而offer则是返回一个false)
put与take、poll是阻塞的,都抛出了阻塞中断异常。(take和poll的区别就是take不论等待多久都要把元素从队列中移除,而poll会根据等待时间选择是否放弃)
offer也有阻塞方法,其中timeout设置的多少时间内还没有加入队列则抛弃该元素
ArrayBlockingQueue
数组必须指定大小,入队出队都会被锁上(也就是说他在只有入队高并发/出队高并发的时候表现比较好,同时高并发表现不好)
与queue不同的特点是一把锁和两个条件标识notFull和notEmpty
数据结构
/** The queued items */final Object[] items;/** items index for next take, poll, peek or remove */int takeIndex;/** items index for next put, offer, or add */int putIndex;/** Number of elements in the queue */int count;/** Concurrency control uses the classic two-condition algorithm* found in any textbook.*//** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;
比普通的queue,他还多了一把可重入锁,两个条件(分别用于take和put)
构造方法,他的fair默认为false(非公平模式,即线程获取锁的顺序不固定。此外公平模式即FIFO,谁先请求的谁先获取锁的访问权)
public ArrayBlockingQueue(int capacity) {this(capacity, false);}
put方法
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}
入队完成后通知当前队列已是非空状态,在队列满的时候通过将下一指针指向0来实现循环数组,提高空间利用率
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}
take方法,这里的可重入锁和put用的是同一个
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}
出队原理同类似于入队
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;}
LinkedBlockingQueue
两把锁分别控制队头队尾,put与put之间会互斥,take与take之间也会互斥,而put和take不互斥。
和array相比,容量近乎无限(整数最大值)
put会通知take,take也会通知put。put非满/take非空也会通知其他的put/take
数据结构
static class Node<E> {E item;/*** One of:* - the real successor Node* - this Node, meaning the successor is head.next* - null, meaning there is no successor (this is the last node)*/Node<E> next;Node(E x) { item = x; }}/** The capacity bound, or Integer.MAX_VALUE if none */private final int capacity;/** Current number of elements */private final AtomicInteger count = new AtomicInteger();/*** Head of linked list.* Invariant: head.item == null*/transient Node<E> head;/*** Tail of linked list.* Invariant: last.next == null*/private transient Node<E> last;/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();
构造方法(可不设置容器大小,默认为整数最大值)
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);} public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}
这里的put、take类似于ArrayBlockingQueue,不同的是这里的当前容量使用了AtomicInteger确保线程安全;并且由于两个锁是独立的,也需要互相通知。
在梳理时,c == capacity让我非常不解。后得到的答案是
如果移除一个元素前队列是满的,这时put那边是阻塞状态的,那说明需要通知put,你现在可以插入元素了
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}
PriortyBlockingQueue
大体上和ArrayBlockingQueue类似,主要是通过数组实现了一个二叉堆,实现按优先级出队列。
此外没有notFull,会自动扩容
数据结构
/*** 优先级队列表示为平衡二进制堆:queue[n] 的两个子级是 queue[2*n+1] 和 queue[2*(n+1)]。优先级队列按 comparator 排序,
* 如果 comparator 为 null,则按元素的自然排序排序:对于堆中的每个节点 n 和 n 的每个后代 d,n <= d。
* 具有最低值的元素位于 queue[0] 中,假设队列为非空。*/private transient Object[] queue;/*** The number of elements in the priority queue.*/private transient int size;/*** The comparator, or null if priority queue uses elements'* natural ordering.*/private transient Comparator<? super E> comparator;/*** Lock used for all public operations*/private final ReentrantLock lock;/*** Condition for blocking when empty*/private final Condition notEmpty;
take与put类似于ArrayBlockingQueue
这里说一说PriortyBlockingQueue的扩容机制,在不指定大小的情况下默认容器大小为11
/*** Default array capacity.*/private static final int DEFAULT_INITIAL_CAPACITY = 11;
/*** 创建一个 PriorityBlockingQueue 具有默认初始容量 (11) 的元素,* 该容量根据其 元素的自然顺序对其元素进行排序。*/public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}
在put、offer的时候会检测当前容量是否已满,满了会触发扩容机制进行扩容,然后才会通过比较器将数据放入堆中
public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}
扩容机制:
1.首先释放主锁,因为扩容比较耗时,释放锁可以让其他线程操作队列避免阻塞。
2.通过CAS将allocationSpinLock锁设置为1,确保只有当前线程在进行扩容操作
3.如果当前size小于64,则新的size为旧的翻倍再+2(快速扩容);如果当前size大于64,则扩大到150%(比例扩容)。计算当前容量是否超过最大值,超了则报OOM
4.创建新数组并释放锁
5.扩容失败则退让cpu,避免忙等
6.将新数组copy到队列
private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) // back off if another thread is allocating Thread.yield();lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}