JAVA线程池有哪些队列? 以及它们的适用场景案例

news/2025/1/8 9:55:37/文章来源:https://www.cnblogs.com/wgjava/p/18659077

大家好,我是 V 哥。在高并发应用场景下,线程池的使用是必然的,那在线程中的队列都有哪些呢?下面 V 哥整理的几种常见的线程池队列以及适用场景案例,分享给大家。

线程池中的队列主要用于存放等待执行的任务,以下是几种常见的线程池队列:

1. 无界队列(Unbounded Queue)

  • LinkedBlockingQueue(基于链表的阻塞队列)
    • 特点:它是一个基于链表实现的阻塞队列,默认情况下容量为 Integer.MAX_VALUE,也就是几乎可以看作是无界的(实际受限于系统内存等因素)。当线程池中的线程处理任务速度小于任务提交速度时,任务会不断被添加到这个队列中,理论上不会出现队列满的情况,因此可以避免任务拒绝的情况发生,但如果任务持续快速堆积,可能会导致内存溢出等问题。
    • 适用场景:适用于任务量波动较大,但对任务拒绝比较敏感,希望尽可能容纳所有提交任务的场景,比如一些后台异步任务处理场景,像日志记录异步处理等,只要内存资源允许,尽量接收并处理所有待记录的日志信息。

下面来看一个案例:

以下是一个简单的使用Java实现的LinkedBlockingQueue类似功能的代码示例,这里为了突出核心逻辑,简化了一些边界情况处理等,但涵盖了其主要的阻塞队列特性,比如当队列满时阻塞插入线程,队列空时阻塞获取线程等,示例代码如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;// 自定义的简单LinkedBlockingQueue实现
public class MyLinkedBlockingQueue<E> {// 链表节点类,用于存储队列中的元素private static class Node<E> {E item;Node<E> next;Node(E x) {item = x;}}// 队列头节点private Node<E> head;// 队列尾节点private Node<E> last;// 队列当前元素个数private int count;// 队列容量,这里设置为Integer.MAX_VALUE模拟无界(实际受内存限制)private final int capacity = Integer.MAX_VALUE;// 用于并发控制的锁private final Lock lock = new ReentrantLock();// 队列非空条件,用于获取元素时等待队列有元素可用private final Condition notEmpty = lock.newCondition();// 队列未满条件,用于插入元素时等待队列有空间private final Condition notFull = lock.newCondition();// 构造方法,初始化头节点和尾节点public MyLinkedBlockingQueue() {head = new Node<>(null);last = head;}// 往队列中插入元素的方法public void put(E e) throws InterruptedException {lock.lock();try {// 如果队列已满(这里实际很难满,除非内存耗尽等极端情况),阻塞等待有空间while (count == capacity) {notFull.await();}// 将新元素添加到队列尾部Node<E> newNode = new Node<>(e);last.next = newNode;last = newNode;count++;// 插入元素后通知等待获取元素的线程,队列有元素了notEmpty.signal();} finally {lock.unlock();}}// 从队列中获取元素的方法public E take() throws InterruptedException {lock.lock();try {// 如果队列空,阻塞等待有元素可获取while (count == 0) {notEmpty.await();}// 获取头节点的下一个节点(实际要获取的元素所在节点)Node<E> first = head.next;E element = first.item;// 将头节点指向下一个节点,移除当前获取的元素head.next = first.next;if (last == first) {last = head;}count--;// 通知等待插入元素的线程,队列有空间了notFull.signal();return element;} finally {lock.unlock();}}// 获取当前队列中元素的数量public int size() {lock.lock();try {return count;} finally {lock.unlock();}}
}

以下是一个简单的测试类,用于演示如何使用这个自定义的MyLinkedBlockingQueue来模拟处理日志记录这样的异步任务场景:

public class TestMyLinkedBlockingQueue {public static void main(String[] args) {MyLinkedBlockingQueue<String> queue = new MyLinkedBlockingQueue<>();// 模拟生产者线程,不断产生日志信息并放入队列Thread producerThread = new Thread(() -> {for (int i = 0; i < 1000; i++) {try {String logMessage = "Log message " + i;queue.put(logMessage);System.out.println("Produced: " + logMessage);Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});// 模拟消费者线程,从队列中获取日志信息并处理(这里简单打印模拟处理)Thread consumerThread = new Thread(() -> {while (true) {try {String log = queue.take();System.out.println("Consumed: " + log);Thread.sleep(200);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});producerThread.start();consumerThread.start();try {producerThread.join();consumerThread.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

在上述代码中:

  1. MyLinkedBlockingQueue类是核心的自定义阻塞队列实现:
    • 内部通过定义链表节点类Node来构建链表结构存储元素。
    • 使用ReentrantLock来实现并发控制,配合Condition对象(notEmptynotFull)来实现当队列空时阻塞获取元素的线程、队列满时阻塞插入元素的线程的功能。
    • put方法用于向队列中插入元素,当队列元素个数达到设定容量(这里模拟无界情况)时,线程会等待直到有空间可以插入元素,插入后会通知等待获取元素的线程。
    • take方法用于从队列中获取元素,当队列空时,线程会等待直到有元素可获取,获取元素后会通知等待插入元素的线程。
  2. TestMyLinkedBlockingQueue类是用于测试的主类:
    • 创建了自定义的阻塞队列实例,并启动了生产者线程和消费者线程。
    • 生产者线程不断生成日志信息(模拟)并放入队列,消费者线程从队列中取出日志信息并模拟处理(简单打印),展示了在异步任务处理场景下该阻塞队列的基本使用方式。

需要注意的是,真正的LinkedBlockingQueue在Java的java.util.concurrent包中有着更完善的功能、异常处理以及性能优化等方面的设计,比如支持可中断的插入和获取操作、更精细的内存管理等,但这个示例可以帮助理解其基本的阻塞队列原理和实现思路。

2. 有界队列(Bounded Queue)

  • ArrayBlockingQueue(基于数组的阻塞队列)
    • 特点:基于数组实现的阻塞队列,在创建时需要指定队列的容量大小。当队列已满时,若再有新的任务提交,提交任务的线程会被阻塞,直到队列有空闲空间为止。它是一个有界的、遵循先进先出(FIFO)原则的队列,保证了任务按照提交的先后顺序依次执行。
    • 适用场景:适用于对资源使用有明确限制,需要控制队列中任务数量的场景,例如在一个资源有限的服务器环境下,对同时处理的网络请求任务数量进行限制,避免过多任务堆积耗尽系统资源,通过设置合适的队列容量,确保系统的稳定性和响应性能。

来看一个案例实现:

以下是一个使用Java实现的简单ArrayBlockingQueue类似功能的代码示例,重点体现了其基于数组的阻塞队列特性,包括有界容量、队列满时阻塞插入线程、队列空时阻塞获取线程等关键功能,示例代码如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;// 自定义的简单ArrayBlockingQueue实现
public class MyArrayBlockingQueue<E> {private final E[] items; // 用于存储元素的数组private int takeIndex; // 下一个获取元素的索引private int putIndex; // 下一个插入元素的索引private int count; // 当前队列中元素的数量private final Lock lock = new ReentrantLock();private final Condition notEmpty = lock.newCondition();private final Condition notFull = lock.newCondition();// 构造方法,传入队列容量大小,初始化数组等相关属性@SuppressWarnings("unchecked")public MyArrayBlockingQueue(int capacity) {if (capacity <= 0) {throw new IllegalArgumentException("Capacity must be greater than 0");}items = (E[]) new Object[capacity];takeIndex = 0;putIndex = 0;count = 0;}// 向队列中插入元素的方法public void put(E e) throws InterruptedException {lock.lock();try {// 如果队列已满,阻塞等待有空间while (count == items.length) {notFull.await();}// 将元素放入数组指定位置(根据putIndex)items[putIndex] = e;// 更新putIndex,循环利用数组空间,达到数组末尾后回到开头putIndex = (putIndex + 1) % items.length;count++;// 插入元素后通知等待获取元素的线程,队列有元素了notEmpty.signal();} finally {lock.unlock();}}// 从队列中获取元素的方法public E take() throws InterruptedException {lock.lock();try {// 如果队列空,阻塞等待有元素可获取while (count == 0) {notEmpty.await();}// 获取当前takeIndex位置的元素E element = items[takeIndex];// 将该位置元素置空,方便垃圾回收items[takeIndex] = null;// 更新takeIndex,循环利用数组空间takeIndex = (takeIndex + 1) % items.length;count--;// 通知等待插入元素的线程,队列有空间了notFull.signal();return element;} finally {lock.unlock();}}// 获取当前队列中元素的数量public int size() {lock.lock();try {return count;} finally {lock.unlock();}}
}

来写一个简单的测试类,用于演示如何使用这个自定义的MyArrayBlockingQueue来模拟在资源有限环境下对网络请求任务数量进行限制的场景:

public class TestMyArrayBlockingQueue {public static void main(String[] args) {// 设置队列容量为5,模拟限制同时处理的任务数量MyArrayBlockingQueue<String> queue = new MyArrayBlockingQueue<>(5);// 模拟生产者线程,不断产生网络请求任务并放入队列Thread producerThread = new Thread(() -> {for (int i = 0; i < 10; i++) {try {String request = "Network Request " + i;queue.put(request);System.out.println("Produced: " + request);Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});// 模拟消费者线程,从队列中获取网络请求任务并处理(这里简单打印模拟处理)Thread consumerThread = new Thread(() -> {while (true) {try {String request = queue.take();System.out.println("Consumed: " + request);Thread.sleep(200);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});producerThread.start();consumerThread.start();try {producerThread.join();consumerThread.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

在上述代码中:

  1. MyArrayBlockingQueue类是核心的自定义阻塞队列实现:
    • 内部使用一个泛型数组items来存储队列中的元素,通过takeIndexputIndex来分别标记获取元素和插入元素的索引位置,利用count记录当前队列中元素的数量。
    • 使用ReentrantLock进行并发控制,并配合Condition对象(notEmptynotFull)实现了队列空时阻塞获取元素的线程、队列满时阻塞插入元素的线程的功能。
    • put方法用于向队列中插入元素,当队列已满(元素数量达到数组容量)时,线程会等待直到有空间可以插入元素,插入元素后会通知等待获取元素的线程。
    • take方法用于从队列中获取元素,当队列空时,线程会等待直到有元素可获取,获取元素后会通知等待插入元素的线程。
  2. TestMyArrayBlockingQueue类是用于测试的主类:
    • 创建了自定义的阻塞队列实例,并设置了容量为5,模拟对网络请求任务数量的限制场景。
    • 启动了生产者线程和消费者线程,生产者线程不断生成网络请求任务(模拟)并放入队列,消费者线程从队列中取出任务并模拟处理(简单打印),展示了在资源有限场景下该阻塞队列的基本使用方式。

以上案例代码,可以收藏起来,慢慢消化哈。

  • LinkedBlockingDeque(基于链表的双向阻塞队列)
    • 特点:它也是基于链表结构,但与LinkedBlockingQueue不同的是,它是一个双向队列,支持在队列的两端进行插入和移除操作,同样可以设置容量限制成为有界队列。在多线程环境下,这种双向操作特性可以提供更灵活的任务调度方式,比如可以实现将高优先级任务从队头插入优先执行等情况。
    • 适用场景:适合需要灵活调整任务执行顺序,同时又要对队列规模进行控制的场景,比如在一个任务处理系统中,有紧急任务需要插队优先处理时,可以通过在队头插入的方式让其尽快被执行,并且通过设置容量防止过多任务无序堆积。

下面来看一个案例:

以下是一个使用Java实现的简单LinkedBlockingDeque类似功能的代码示例,体现了其基于链表的双向阻塞队列特性,包括可以在两端插入和移除元素、设置容量限制、队列满或空时阻塞相应操作线程等关键功能,示例代码如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;// 自定义的简单LinkedBlockingDeque实现
public class MyLinkedBlockingDeque<E> {// 链表节点类,用于存储队列中的元素private static class Node<E> {E item;Node<E> prev;Node<E> next;Node(E x) {item = x;}}private Node<E> head; // 队列头节点private Node<E> tail; // 队列尾节点private int count; // 当前队列中元素的数量private final int capacity; // 队列容量,用于控制队列规模private final Lock lock = new ReentrantLock();private final Condition notEmpty = lock.newCondition();private final Condition notFull = lock.newCondition();// 构造方法,传入队列容量,初始化头节点和尾节点public MyLinkedBlockingDeque(int capacity) {if (capacity <= 0) {throw new IllegalArgumentException("Capacity must be greater than 0");}this.capacity = capacity;head = new Node<>(null);tail = new Node<>(null);head.next = tail;tail.prev = head;}// 在队列头部插入元素的方法public void putFirst(E e) throws InterruptedException {lock.lock();try {// 如果队列已满,阻塞等待有空间while (count == capacity) {notFull.await();}Node<E> newNode = new Node<>(e);// 将新节点插入到头部Node<E> next = head.next;head.next = newNode;newNode.prev = head;newNode.next = next;next.prev = newNode;count++;// 插入元素后通知等待获取元素的线程,队列有元素了notEmpty.signal();} finally {lock.unlock();}}// 在队列尾部插入元素的方法public void putLast(E e) throws InterruptedException {lock.lock();try {// 如果队列已满,阻塞等待有空间while (count == capacity) {notFull.await();}Node<E> newNode = new Node<>(e);// 将新节点插入到尾部Node<E> prev = tail.prev;prev.next = newNode;newNode.prev = prev;newNode.next = tail;tail.prev = newNode;count++;// 插入元素后通知等待获取元素的线程,队列有元素了notEmpty.signal();} finally {lock.unlock();}}// 从队列头部获取元素的方法public E takeFirst() throws InterruptedException {lock.lock();try {// 如果队列空,阻塞等待有元素可获取while (count == 0) {notEmpty.await();}Node<E> first = head.next;// 移除头节点Node<E> next = first.next;head.next = next;next.prev = head;E element = first.item;first.item = null;count--;// 通知等待插入元素的线程,队列有空间了notFull.signal();return element;} finally {lock.unlock();}}// 从队列尾部获取元素的方法public E takeLast() throws InterruptedException {lock.lock();try {// 如果队列空,阻塞等待有元素可获取while (count == 0) {notEmpty.await();}Node<E> last = tail.prev;// 移除尾节点Node<E> prev = last.prev;prev.next = tail;tail.prev = prev;E element = last.item;last.item = null;count--;// 通知等待插入元素的线程,队列有元素了notFull.signal();return element;} finally {lock.unlock();}}// 获取当前队列中元素的数量public int size() {lock.lock();try {return count;} finally {lock.unlock();}}
}

咱们来写一个测试类,用于演示如何使用这个自定义的MyLinkedBlockingDeque来模拟在任务处理系统中对任务执行顺序灵活调整以及控制队列规模的场景:

public class TestMyLinkedBlockingDeque {public static void main(String[] args) {// 设置队列容量为5,模拟控制队列规模MyLinkedBlockingDeque<String> queue = new MyLinkedBlockingDeque<>(5);// 模拟生产者线程,产生任务并插入队列(先插入普通任务到尾部)Thread producerThread = new Thread(() -> {for (int i = 0; i < 8; i++) {try {String task = "Task " + i;if (i < 5) {queue.putLast(task);} else {// 模拟有紧急任务,插入到头部queue.putFirst("Urgent Task " + (i - 5));}System.out.println("Produced: " + task);Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});// 模拟消费者线程,从队列中获取任务并处理(这里简单打印模拟处理)Thread consumerThread = new Thread(() -> {while (true) {try {String task = queue.takeFirst();System.out.println("Consumed: " + task);Thread.sleep(200);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});producerThread.start();consumerThread.start();try {producerThread.join();consumerThread.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

在上述代码中:

  1. MyLinkedBlockingDeque类是核心的自定义双向阻塞队列实现:
    • 内部通过定义Node类构建双向链表结构来存储元素,有头节点head和尾节点tail,通过指针维护节点之间的双向关系。
    • 使用ReentrantLock进行并发控制,并配合Condition对象(notEmptynotFull)实现队列空时阻塞获取元素的线程、队列满时阻塞插入元素的线程的功能。
    • putFirstputLast方法分别用于向队列头部和尾部插入元素,当队列已满时,相应线程会等待直到有空间可插入,插入后通知等待获取元素的线程。
    • takeFirsttakeLast方法分别用于从队列头部和尾部获取元素,当队列空时,相应线程会等待直到有元素可获取,获取后通知等待插入元素的线程。
  2. TestMyLinkedBlockingDeque类是用于测试的主类:
    • 创建了自定义的双向阻塞队列实例,并设置容量为5,模拟控制队列规模的场景。
    • 启动了生产者线程和消费者线程,生产者线程先正常往队列尾部插入任务,然后模拟有紧急任务往队列头部插入,消费者线程从队列头部获取任务并模拟处理(简单打印),展示了在任务处理系统中该双向阻塞队列灵活调整任务执行顺序以及控制队列规模的基本使用方式。

学肥了么,还不懂欢迎关注威哥爱编程私信给我,慢慢给你细说。

  • PriorityBlockingQueue(基于优先级的阻塞队列)
    • 特点:这是一个支持优先级排序的无界阻塞队列(虽然说是无界,但实际受系统资源限制),队列中的元素(即任务)需要实现 Comparable 接口或者在创建队列时传入自定义的比较器 Comparator,以此来确定任务的优先级顺序。每次从队列中取出任务时,会优先取出优先级最高的任务进行执行。
    • 适用场景:适用于任务有明显优先级区分的情况,例如在一个监控系统中,告警任务有不同的严重级别,严重级别高的告警任务(如服务器宕机告警)优先级更高,需要优先处理,就可以将这些告警任务放入PriorityBlockingQueue中,按照优先级高低依次执行。

来看一个案例:

以下是一个使用Java实现的简单PriorityBlockingQueue类似功能的代码示例,重点体现了基于优先级的阻塞队列特性,即队列中的元素需要实现Comparable接口来定义优先级顺序,队列能根据优先级高低来决定元素的取出顺序,同时具备阻塞等待的功能,示例代码如下:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;// 自定义的简单PriorityBlockingQueue实现
public class MyPriorityBlockingQueue<E extends Comparable<? super E>> {private final List<E> queue; // 用于存储元素的列表private final Lock lock = new ReentrantLock();private final Condition notEmpty = lock.newCondition();// 构造方法,初始化存储列表public MyPriorityBlockingQueue() {queue = new ArrayList<>();}// 向队列中插入元素的方法public void put(E e) throws InterruptedException {lock.lock();try {queue.add(e);// 插入元素后进行上浮操作,确保满足优先级顺序siftUp(queue.size() - 1);// 插入元素后通知等待获取元素的线程,队列有元素了notEmpty.signal();} finally {lock.unlock();}}// 从队列中获取并移除优先级最高的元素(即队头元素)的方法public E take() throws InterruptedException {lock.lock();try {// 如果队列空,阻塞等待有元素可获取while (queue.size() == 0) {notEmpty.await();}E result = queue.get(0);int lastIndex = queue.size() - 1;// 将队尾元素移到队头E last = queue.get(lastIndex);queue.set(0, last);queue.remove(lastIndex);// 进行下沉操作,重新调整优先级顺序siftDown(0);return result;} finally {lock.unlock();}}// 获取当前队列中元素的数量public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}// 上浮操作,确保元素在合适的优先级位置(类似堆排序中的上浮操作)private void siftUp(int k) {while (k > 0) {int parent = (k - 1) / 2;E element = queue.get(k);E parentElement = queue.get(parent);if (element.compareTo(parentElement) >= 0) {break;}// 交换元素位置swap(parent, k);k = parent;}}// 下沉操作,确保元素在合适的优先级位置(类似堆排序中的下沉操作)private void siftDown(int k) {int half = queue.size() / 2;while (k < half) {int leftChild = 2 * k + 1;int rightChild = leftChild + 1;int childToSwap = leftChild;E element = queue.get(k);E leftChildElement = queue.get(leftChild);if (rightChild < queue.size() && leftChildElement.compareTo(queue.get(rightChild)) > 0) {childToSwap = rightChild;}E childToSwapElement = queue.get(childToSwap);if (element.compareTo(childToSwapElement) <= 0) {break;}// 交换元素位置swap(k, childToSwap);k = childToSwap;}}// 交换列表中两个位置的元素private void swap(int i, int j) {E temp = queue.get(i);queue.set(i, queue.get(j));queue.set(j, temp);}
}

下面咱们来写个测试类,用于模拟在监控系统中使用这个自定义的MyPriorityBlockingQueue来处理不同优先级告警任务的场景:

class AlertTask implements Comparable<AlertTask> {private final String message;private final int priority;public AlertTask(String message, int priority) {this.message = message;this.priority = priority;}@Overridepublic int compareTo(AlertTask other) {// 按照优先级从小到大排序,优先级数值越小越优先,这里返回差值来比较return Integer.compare(this.priority, other.priority);}@Overridepublic String toString() {return "AlertTask{" +"message='" + message + '\'' +", priority=" + priority +'}';}
}public class TestMyPriorityBlockingQueue {public static void main(String[] args) {MyPriorityBlockingQueue<AlertTask> queue = new MyPriorityBlockingQueue<>();// 模拟生产者线程,产生不同优先级的告警任务并放入队列Thread producerThread = new Thread(() -> {AlertTask highPriorityTask = new AlertTask("Server Down Alert", 1);AlertTask mediumPriorityTask = new AlertTask("High CPU Usage Alert", 3);AlertTask lowPriorityTask = new AlertTask("Disk Space Low Alert", 5);try {queue.put(highPriorityTask);queue.put(mediumPriorityTask);queue.put(lowPriorityTask);} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 模拟消费者线程,从队列中获取告警任务并处理(这里简单打印模拟处理)Thread consumerThread = new Thread(() -> {while (true) {try {AlertTask task = queue.take();System.out.println("Processing: " + task);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});producerThread.start();consumerThread.start();try {producerThread.join();consumerThread.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

在上述代码中:

  1. MyPriorityBlockingQueue类是核心的自定义基于优先级的阻塞队列实现:
    • 内部使用ArrayList来存储队列中的元素,通过ReentrantLock进行并发控制,并配合Condition对象notEmpty实现队列空时阻塞获取元素的线程的功能。
    • put方法用于向队列中插入元素,插入后会调用siftUp方法进行上浮操作,以保证队列中的元素始终按照优先级顺序排列(基于元素实现的Comparable接口来比较),插入完成后还会通知等待获取元素的线程。
    • take方法用于从队列中获取并移除优先级最高的元素(即队头元素),当队列空时,线程会等待直到有元素可获取,获取元素前会先将队尾元素移到队头并调用siftDown方法进行下沉操作,重新调整优先级顺序,然后返回获取的元素。
    • siftUpsiftDown方法分别实现了类似堆排序中的上浮和下沉操作,通过不断比较元素的优先级并交换位置,来确保队列中的元素符合优先级顺序要求,swap方法用于交换列表中两个位置的元素。
  2. TestMyPriorityBlockingQueue类是用于测试的主类:
    • 首先定义了AlertTask类实现Comparable接口,用于表示告警任务并定义其优先级比较规则,根据给定的优先级数值来确定任务的优先级高低。
    • 创建了自定义的基于优先级的阻塞队列实例,启动了生产者线程和消费者线程,生产者线程生成不同优先级的告警任务并放入队列,消费者线程从队列中获取告警任务并模拟处理(简单打印),展示了在监控系统场景下该优先级阻塞队列的基本使用方式。

这个案例可以帮助你理解基于优先级的阻塞队列原理和实现思路。Get 到了么,有任何疑问可以关注威哥爱编程私信给我。

3. 同步队列(Synchronous Queue)

  • SynchronousQueue
    • 特点:它是一种特殊的队列,内部没有实际的存储容量,每插入一个任务必须等待有线程来获取并执行这个任务,反之,线程来获取任务时,如果没有任务可用,线程会被阻塞等待任务提交。这种队列更像是一种任务传递的媒介,直接将任务从提交者传递到执行线程手上,保证了任务的即时处理,不存在任务排队等待的情况。
    • 适用场景:适用于要求任务提交后能立即被执行,不允许有任务等待堆积的场景,比如在一些对实时性要求极高的交互场景中,像在线实时交易系统中处理下单请求,希望下单任务能马上被线程处理,而不是先放入队列等待,以保障交易的及时性和流畅性。

来看一个案例代码:

以下是一个使用Java实现的简单同步队列(SynchronousQueue)类似功能的代码示例,重点体现了其核心特性,即每插入一个任务必须等待有线程来获取并执行这个任务,反之,线程来获取任务时,如果没有任务可用,线程会被阻塞等待任务提交,示例代码如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;// 自定义的简单同步队列实现
public class MySynchronousQueue<E> {private E element; // 用于存放当前要传递的元素private boolean hasElement = false; // 标记是否有元素存在private final Lock lock = new ReentrantLock();private final Condition isEmpty = lock.newCondition();private final Condition isFull = lock.newCondition();// 向队列中插入元素的方法public void put(E e) throws InterruptedException {lock.lock();try {// 如果已经有元素了,阻塞等待元素被取走while (hasElement) {isFull.await();}element = e;hasElement = true;// 通知等待获取元素的线程,有元素可获取了isEmpty.signal();} finally {lock.unlock();}}// 从队列中获取元素的方法public E take() throws InterruptedException {lock.lock();try {// 如果没有元素,阻塞等待元素被放入while (!hasElement) {isEmpty.await();}E result = element;hasElement = false;// 通知等待插入元素的线程,可以插入新元素了isFull.signal();return result;} finally {lock.unlock();}}
}

来来来,写一个简单的测试类,模拟在线实时交易系统中处理下单请求这样的高实时性场景下使用这个自定义的同步队列:

public class TestMySynchronousQueue {public static void main(String[] args) {MySynchronousQueue<String> queue = new MySynchronousQueue<>();// 模拟生产者线程,不断产生下单请求并放入队列Thread producerThread = new Thread(() -> {for (int i = 0; i < 10; i++) {try {String orderRequest = "Order Request " + i;queue.put(orderRequest);System.out.println("Produced: " + orderRequest);Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});// 模拟消费者线程,从队列中获取下单请求并处理(这里简单打印模拟处理)Thread consumerThread = new Thread(() -> {while (true) {try {String order = queue.take();System.out.println("Consumed: " + order);Thread.sleep(200);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});producerThread.start();consumerThread.start();try {producerThread.join();consumerThread.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

在上述代码中:

  1. MySynchronousQueue类是核心的自定义同步队列实现:
    • 使用一个变量element来临时存放要传递的元素,通过hasElement变量来标记当前是否有元素存在于队列中(其实它内部没有真正的队列存储结构,只是起到一个元素传递的作用)。
    • 利用ReentrantLock进行并发控制,并配合Condition对象isEmptyisFull来实现当没有元素时阻塞获取元素的线程、有元素时阻塞插入元素的线程的功能。
    • put方法用于向队列中插入元素,当已经有元素存在(即hasElementtrue)时,插入线程会等待直到元素被取走,插入元素后会通知等待获取元素的线程。
    • take方法用于从队列中获取元素,当没有元素(即hasElementfalse)时,获取线程会等待直到有元素被放入,获取元素后会通知等待插入元素的线程。
  2. TestMySynchronousQueue类是用于测试的主类:
    • 创建了自定义的同步队列实例,启动了生产者线程和消费者线程,生产者线程不断生成下单请求(模拟)并放入队列,消费者线程从队列中取出请求并模拟处理(简单打印),展示了在高实时性要求场景下该同步队列的基本使用方式。

通过以上案例的学习,帮助咱们理解其基本的同步队列原理和实现思路。

最后

不同的线程池队列有着各自的特点和适用场景,在实际使用线程池时,需要根据具体的业务需求、系统资源状况以及对任务执行顺序、响应时间等方面的要求,合理选择相应的队列来构建线程池,以实现高效的任务处理。 关注威哥爱编程,学习编程不迷茫,关注威哥爱编程,代码世界任纵横。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/865766.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ABAP 如何得到完整的错误消息(转)

场景描述:BDC或者BAPI返回的系统消息需要组成完整的文本,显示出来供查错误原因.方法一,用message into,语法如下:MESSAGE ID sy-msgid TYPE sy-msgty NUMBER sy-msgnoINTO DATA(mtext)WITH sy-msgv1 sy-msgv2 sy-msgv3 sy-msgv4方法二,调用函数 MESSAGE_TEXT_BUILD() 作用…

CDS标准视图:技术对象检验级别描述 I_TechObjInspectionLevelText

视图名称:技术对象检验级别描述 I_TechObjInspectionLevelText 视图类型:基础视图 视图代码:点击查看代码 @AbapCatalog: {sqlViewName: ITECHOBJINSPLVLT,compiler.compareFilter: true,preserveKey: true }@AccessControl.authorizationCheck: #NOT_REQUIRED@EndUserText.…

面向强化学习的状态空间建模:RSSM的介绍和PyTorch实现

循环状态空间模型(Recurrent State Space Models, RSSM)最初由 Danijar Hafer 等人在论文《Learning Latent Dynamics for Planning from Pixels》中提出。该模型在现代基于模型的强化学习(Model-Based Reinforcement Learning, MBRL)中发挥着关键作用,其主要目标是构建可…

从接单到反馈:看板软件打造家政服务闭环管理

家政行业通过应用看板软件可以更加高效、有序地管理日常工作,优化工作流程,提高团队协作效率和服务质量。根据《2024年中国家政服务行业发展状况及消费洞察报告》,2023年中国家政服务市场规模达11641亿元,仍呈增长趋势;预计到2026年,中国家政服务行业的市场规模将突破130…

国产化板卡设计资料:2270-VC709E 基于FMC接口的JFM7VX690T36 PCIeX8 接口卡

VC709E 基于FMC接口的JFM7VX690T36 PCIeX8 接口卡 一、板卡概述 本板卡基于FPGA JFM7VX690T36 芯片,支持PCIeX8、两组 64bit DDR3容量8GByte,HPC的FMC连接器,板卡支持各种FMC子卡扩展。软件支持windows,Linux操作系统。二、功能和技术指标 : 四、应用领域软件无线电处理平…

SAP SE37函数模块异常抛出

今天写了一个功能,生产订单批量打删除标识,我用一个函数来封装相应的功能,并且使用一下RAISE异常的功能 在函数里面判断用户是否在ALV界面选中了数据,如果没有选中则抛出异常 打断点发现,RAISE异常之后代码会中断执行,退出相应的函数模块。是非常好用的代码。函数部分调用…

网站管理助手无法创建数据库及连接问题的解决方案

问题描述: 用户在使用网站管理助手创建数据库时遇到困难,不仅无法创建新的数据库,而且已创建的数据库也无法正常连接。这种情况严重影响了用户的数据库管理效率和网站的正常运行。 解决方案:确认工具与环境兼容性: 首先,确保使用的网站管理助手版本与当前的操作系统和数据…

如何解决无法登录云服务器的问题

问题描述: 我无法登录我的云服务器,输入正确的账户密码后仍然无法成功登录。请问如何解决这个问题? 回答: 当您无法登录云服务器时,可能是由于多种原因引起的。以下是一些详细的排查和解决方案:检查远程服务状态:重启远程服务:首先,尝试重启服务器的远程服务(如SSH或…

请问如何处理网站频繁被篡改的问题

问题描述: 用户报告其网站频繁被篡改,首页文件不断被修改,甚至文件名也会被更改。尽管采取了多种防护措施,问题仍然频繁发生。用户希望了解如何彻底解决网站被篡改的问题,确保网站安全稳定运行。 解决方案:确认木马病毒和漏洞: 确认网站是否存在木马病毒或程序漏洞。可以…

如何设置二级域名直接跳转至移动端网站,避免通过默认路径访问?

问题描述: 用户希望将二级域名(如 m.example.com)直接指向移动端网站,而不是通过主域名的 /m 文件夹进行访问。当前情况下,手机访问主域名时会自动跳转到 /m 文件夹,但希望通过二级域名直接访问移动端内容。 解决方案: 要实现二级域名直接访问移动端网站而不经过默认路径…

如何在织梦CMS中修改网站栏目标题以优化SEO效果

问题描述: 用户希望了解如何在织梦CMS(DedeCMS)中修改网站栏目标题,以提高搜索引擎优化(SEO)的效果。具体来说,用户想要知道如何通过后台设置或代码修改来实现这一目标。 回答: 在织梦CMS中,修改网站栏目标题是提升SEO效果的重要步骤之一。以下是详细的步骤和注意事项…

处理服务器遭受 DDoS 攻击后的解封与防护措施

当您的服务器遭受 DDoS 攻击后,机房可能会暂时封停服务器的 IP 地址,以保护网络稳定。为了尽快恢复服务并防止未来的攻击,您可以采取以下步骤:确认攻击类型:首先,确认攻击的具体类型(如 SYN Flood、UDP Flood 等)。这有助于您选择合适的防护措施。 通过云服务商提供的监…