文章目录
- 一、阻塞队列介绍
- 1. Queue接口
- 2. BlockingQueue接口
- 3. 阻塞队列特性
- 二、 ArrayBlockingQueue
- 1. 简介
- 3. 使用
- 3. 底层原理
- 三、LinkedBlockingQueue(使用最多的阻塞队列)
- 1. 简介
- 2. 使用
- 3. 底层原理
- 4. LinkedBlockingQueue与ArrayBlockingQueue对比
- 四、LinkedBlockingDeque
- 1. 简介
- 2. 底层原理
一、阻塞队列介绍
1. Queue接口
它定义了队列的一些基本的操作规范,但这些方法都不会对队列进行阻塞
public interface Queue<E> extends Collection<E> {boolean add(E e);E remove();E poll();E element();E peek();
}
2. BlockingQueue接口
它继承了Queue接口,是队列的一种,阻塞队列对Queue接口进行了扩展:
- take方法:队列空时,获取元素的线程会等待队列变为非空
- put方法:队列满是,队列会阻塞插入元素的线程,直到队列不满
public interface BlockingQueue<E> extends Queue<E> {boolean add(E e);boolean offer(E e);void put(E e) throws InterruptedException;boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;E take() throws InterruptedException;E poll(long timeout, TimeUnit unit)throws InterruptedException;int remainingCapacity();boolean remove(Object o);boolean contains(Object o);int drainTo(Collection<? super E> c);int drainTo(Collection<? super E> c, int maxElements);
}
3. 阻塞队列特性
阻塞队列区别于其他类型的队列的最主要的特点就是“阻塞”这两个字,所以下面重点介绍阻塞功能:阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。
- take
take 方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。
- put
put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。
- 是否有界
阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。
- 应用场景
BlockingQueue 是线程安全的,我们在很多场景下都可以利用线程安全的队列来优雅地解决我们业务自身的线程安全问题。比如说,使用生产者/消费者模式的时候,我们生产者只需要往队列里添加元素,而消费者只需要从队列里取出它们就可以了,如图所示:
因为阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问 题。生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题。这也就意味着,考虑锁等线程安全问题的重任从“你”转移到了“队列”上,降低了我们开 发的难度和工作量。同时,队列它还能起到一个隔离的作用。比如说我们开发一个银行转账的程序,那么生产者线程不需要关心具体的转账逻辑,只需要把转账任务,如账户和金额等信息放到队列中就可以,而不需要去关心银行这个类如何实现具体的转账业务。而作为银行这个类来讲,它会去从队列里取出来将要执行的具体的任务,再去通过自己的各种方法来完成本次转账。这样就实现了具体任务与执行任务类之间的解耦,任务被放在了阻塞队列中,而负责放任务的线程是无法直接访问到我们银行具体实现转账操作的对象的,实现了隔离,提高了安全性。
- 常见的阻塞队列
二、 ArrayBlockingQueue
1. 简介
ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用 ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满, 大量生产线程被阻塞。使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发 场景下会成为性能瓶颈。
3. 使用
BlockingQueuequeue=newArrayBlockingQueue(1024); queue.put("1");//向队列中添加元素
Objectobject=queue.take();//从队列中取出元素
3. 底层原理
利用了Lock锁的Condition通知机制进行阻塞控制。
//存放元素数组final Object[] items;
//取指针,take、poll、peek和remove操作都会使用这个指针int takeIndex;
//放指针,put、offer和add操作都会使用到这个指针int putIndex;
//定义了独占锁
final ReentrantLock lock;
//定了了两个条件队列,一个用于消费者等待一个用于生产者等待
private final Condition notEmpty;
private final Condition notFull;
put方法,是如何往阻塞队列中放置元素的
public void put(E e) throws InterruptedException {Objects.requireNonNull(e);//定义了独占锁final ReentrantLock lock = this.lock;//加的可中断的锁lock.lockInterruptibly();try {//当元素满了while (count == items.length)//在notFull条件队列中等待notFull.await();//否则进行入队操作enqueue(e);} finally {//解锁lock.unlock();}}
下面看看enqueue(e)是如何实现真正入队的
private void enqueue(E e) {//获得元素数组final Object[] items = this.items;//使用putIndex将元素放入队列中items[putIndex] = e;//当前已经满了if (++putIndex == items.length) //putIndex=0,有一种循环数组的味道了,主要是为了避免删除元素时,出现元素移位的情况(数组本身存在的问题)putIndex = 0;//元素数量+1count++;//唤醒消费者队列notEmpty.signal();}
take方法,是如何向阻塞队列中取元素的
public E take() throws InterruptedException {//获取锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//队列为空,阻塞while (count == 0)notEmpty.await();//获取元素return dequeue();} finally {//解锁lock.unlock();}}
下面看看dequeue是如何获取元素的
private E dequeue() {//获取元素数组final Object[] items = this.items;@SuppressWarnings("unchecked")//获取takeindex位置的元素E e = (E) items[takeIndex];//删除takeindex地方的元素items[takeIndex] = null;//这里同样是循环数组的元素if (++takeIndex == items.length) takeIndex = 0;//数量-1count--;if (itrs != null)itrs.elementDequeued();//唤醒生产者线程notFull.signal();return e;}
三、LinkedBlockingQueue(使用最多的阻塞队列)
1. 简介
LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存, 则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。 LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
2. 使用
BlockingQueue<Integer>boundedQueue=newLinkedBlockingQueue<>(100);
BlockingQueue<Integer>unboundedQueue=newLinkedBlockingQueue<>();
3. 底层原理
首先看看它采用的数据结构
//定义了一个静态内部类,代表我们链表上的节点
static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}//阻塞队列的容量
private final int capacity;
//当前队列的容量,是一个原子变量
private final AtomicInteger count = new AtomicInteger();
//头节点
transient Node<E> head;
//尾节点
private transient Node<E> last;
//取锁
private final ReentrantLock takeLock = new ReentrantLock();
//放锁
private final ReentrantLock putLock = new ReentrantLock();
//取等待队列
private final Condition notEmpty = takeLock.newCondition();
//放等待队列private final Condition notFull = putLock.newCondition();
put方法
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();final int c;//创建一个新的节点final Node<E> node = new Node<E>(e);//获取放锁final ReentrantLock putLock = this.putLock;//获取当前阻塞队列中元素的数量final AtomicInteger count = this.count;//加中断锁putLock.lockInterruptibly();try {//当阻塞队列容量满了while (count.get() == capacity) {//此时放原属线程需要阻塞notFull.await();}//进行入队操作enqueue(node);//元素增加,这里使用原子变量的原因是放和取本身不是互斥的c = count.getAndIncrement();if (c + 1 < capacity)//当前元素数量c+1小于容量,唤醒阻塞的生产者线程(提高吞吐量)notFull.signal();} finally {//解锁putLock.unlock();}if (c == 0)//元素个数为1,唤醒消费者现场signalNotEmpty();}
enqueue方法
//一个简单的单链表尾部加元素的操作private void enqueue(Node<E> node) {last = last.next = node;}
take方法
public E take() throws InterruptedException {final E x;final int c;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {//队列为空,取线程阻塞while (count.get() == 0) {notEmpty.await();}//元素出队列x = dequeue();c = count.getAndDecrement();if (c > 1)//还有元素,可以提前唤醒消费者线程notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}
//一个单链表删除头部元素的逻辑private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}
4. LinkedBlockingQueue与ArrayBlockingQueue对比
LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和 ArrayBlockingQueue的不同点在于:
- 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而 LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
- 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而 LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
- 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影 响。
- 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而 LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
四、LinkedBlockingDeque
1. 简介
LinkedBlockingDeque是一个基于链表实现的双向阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,可以看做无界队列,但也可以设置容量限制,作为有界队列。相比于其他阻塞队列,LinkedBlockingDeque 多了 addFirst、addLast、peekFirst、peekLast 等方法。以first结尾的方法,表示插入、获取或移除双端队列的第一个元素。以 last 结尾的方法,表示插入、获取或移除双端队列的最后一个元素。但本质上并没有优化锁的竞争情况,因为不管是从队首还是队尾,都是在竞争同一把锁,只不过数据插入和获取的方式多了。
2. 底层原理
//链表节点
static final class Node<E> {E item;Node<E> prev;Node<E> next;Node(E x) {item = x;}}
//首节点
transient Node<E> first;
//尾节点
transient Node<E> last;
//只定义了一把锁
final ReentrantLock lock = new ReentrantLock();
//定义了两个条件等待队列private final Condition notEmpty = lock.newCondition();private final Condition notFull = lock.newCondition();
put方法
public void put(E e) throws InterruptedException {putLast(e);}
putLast方法
public void putLast(E e) throws InterruptedException {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {while (!linkLast(node))notFull.await();} finally {lock.unlock();}}
linkLast实际链表尾部加元素的逻辑
private boolean linkLast(Node<E> node) {// assert lock.isHeldByCurrentThread();if (count >= capacity)return false;Node<E> l = last;node.prev = l;last = node;if (first == null)first = node;elsel.next = node;++count;notEmpty.signal();return true;}
take方法
public E take() throws InterruptedException {return takeFirst();}
takeFirst方法
public E takeFirst() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock();try {E x;while ( (x = unlinkFirst()) == null)notEmpty.await();return x;} finally {lock.unlock();}}
unlinkFirst实际删除元素方法
private E unlinkFirst() {// assert lock.isHeldByCurrentThread();Node<E> f = first;if (f == null)return null;Node<E> n = f.next;E item = f.item;f.item = null;f.next = f; // help GCfirst = n;if (n == null)last = null;elsen.prev = null;--count;notFull.signal();return item;}
上面take和put方法逻辑和LinkedBlockingQeque一样,都是队首删元素,队尾添加元素。但LinkedBlockingDeque是一个双向链表,所以它添加了许多其它方法,原理都差不多,这里就不详细介绍了。