【并发编程】手写线程池阻塞队列

       📝个人主页:五敷有你      
 🔥系列专栏:并发编程
⛺️稳重求进,晒太阳

示意图 

步骤1:自定义任务队列

变量定义

  1. 用Deque双端队列来承接任务
  2. 用ReentrantLock 来做锁
  3. 并声明两个条件变量 Condition fullWaitSet emptyWaitSet
  4. 最后定义容量 capcity

方法:

  1. 添加任务
    1. 注意点:
      1. 任务容量慢了 用await
      2. 每个添加都进行一个emptyWaitSet.signalAll 唤醒沉睡的线程
      3. 考虑万一死等的情况,加入时间的判断
  2. 取出任务
    1. 注意点:
      1. 任务空了 用await
      2. 每个任务取出来都进行一个fullWaitSet.signAll来唤醒沉睡的线程
      3. 考虑超时的情况,加入时间的判断
public class MyBlockQueue<T> {//1.任务队列private Deque<T> deque=new ArrayDeque();//2.锁private ReentrantLock lock=new ReentrantLock();//3.生产者条件变量private Condition fullWaitSet=lock.newCondition();//4.消费者条件变量private Condition emptyWaitSet=lock.newCondition();//5.容量private int capcity;public MyBlockQueue(int capcity) {this.capcity = capcity;}//带超时的阻塞获取public T poll(long timeOut, TimeUnit unit){lock.lock();try {//将timeOUt转换成统一转换为nslong nanos = unit.toNanos(timeOut);while (deque.isEmpty()) {try {//返回值=等待时间-经过的时间if(nanos<=0){return null;}nanos= emptyWaitSet.awaitNanos(nanos);}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}//6. 阻塞获取public T take() {lock.lock();try {while (deque.isEmpty()) {try {emptyWaitSet.await();}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}//阻塞添加public void put(T element){lock.lock();try {while (deque.size()==capcity){try {fullWaitSet.await();}catch (Exception e){}}deque.addLast(element);emptyWaitSet.signalAll();} finally {lock.unlock();}}public int size(){lock.lock();try {return deque.size();}finally {lock.unlock();}}}

步骤2:自定义线程池

  1. 定义变量:
    1. 任务队列 taskQueue
    2. 队列的容量
    3. 线程的集合
    4. 核心线程数
    5. 获取任务的超时时间
    6. 时间单位
  2. 方法
    1. 构造方法 初始化一些核心的参数
    2. 执行方法 execute(task) 里面处理任务
      1. 每执行一个任务就放入一个worker中,并开启线程执行 同时放入workers集合中
      2. 当任务数量>核心数量时,就加入到阻塞队列中
  3. 自定义的类worker
    1. 继承Thread 重写Run方法
      1. 执行传递的任务,每次任务执行完毕,不回收,
      2. 去队列中拿任务 当队列也空了之后 workers集合中移除线程,线程停止。
package com.aqiuo.juc;import java.util.HashSet;
import java.util.concurrent.TimeUnit;public class ThreadPool {//任务队列private MyBlockQueue<Runnable> taskQueue;//队列容量int queueCapcity;//线程集合private HashSet<Worker> workers=new HashSet();//线程池的核心线程private int coreSize;//获取任务的超时时间private long timeOut;//时间单位private TimeUnit timeUnit;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;taskQueue=new MyBlockQueue<>(queueCapcity);}public void exectue(Runnable task){//当任务数没有超过coreSize时,直接交给work对象执行//如果任务超过coreSize时,加入任务队列synchronized (workers){if(workers.size()<coreSize){Worker worker=new Worker(task);System.out.println("新增worker");workers.add(worker);worker.start();//任务数超过了核心数}else{System.out.println(task+"加入任务队列");taskQueue.put(task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task){this.task=task;}@Overridepublic void run() {//执行任务//1)当task不为空,执行任务//2)当task执行完毕,再接着从任务队列中获取任务while (task!=null||(task=taskQueue.take())!=null){try {System.out.println("正在执行worker"+this);sleep(10000);task.run();} catch (Exception e) {}finally {task=null;}}//执行完任务后销毁线程synchronized (workers){workers.remove(this);}}}}

测试

开启15个线程测试

public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);for (int i=0;i<15;i++){int j=i;threadPool.exectue(()->{System.out.println(j);});}}

        执行过程中,超过了队列容量之后,就会发生fullWaitSet阻塞。这个阻塞的线程就开始等待,当有队列不满之后,唤醒fullWaitSet阻塞的队列,

        同理,当队列为空,emptyWaitSet小黑屋阻塞,当有任务被放入,EmptyWaitSet唤醒所有的线程。

这就有一个执行完毕之后,线程不会停止,他会一定等待拿去任务,线程阻塞了EmptyWaitSet

改进

获取任务的超时结束

获取任务take的增强 超时

  //带超时的阻塞获取public T poll(long timeOut, TimeUnit unit){lock.lock();try {//将timeOUt转换成统一转换为nslong nanos = unit.toNanos(timeOut);while (deque.isEmpty()) {try {//返回值=等待时间-经过的时间if(nanos<=0){return null;}nanos= emptyWaitSet.awaitNanos(nanos);}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}

修改worker的run函数

      public void run() {//执行任务//1)当task不为空,执行任务//2)当task执行完毕,再接着从任务队列中获取任务
//            while (task!=null||(task=taskQueue.take())!=null){//修改如下while (task!=null||(task=taskQueue.poll(timeOut,timeUnit))!=null){try {System.out.println("正在执行worker"+this);sleep(1000);task.run();} catch (Exception e) {}finally {task=null;}}

正常结束了

放入任务的超时结束offer()

那么有装入任务 的增强 ,就再提供一个超时装入入offer()吧 ,当放入一个满的队列时,超时后返回false不再放入

//带有超时的队列添加
public Boolean offer(T element,long timeOut, TimeUnit unit){lock.lock();long nanos = unit.toNanos(timeOut);try {while (deque.size()==capcity){try {long l = fullWaitSet.awaitNanos(nanos);if(l<=0){return false;}}catch (Exception e){}}deque.addLast(element);emptyWaitSet.signalAll();return true;} finally {lock.unlock();}
}

拒绝策略

函数式接口

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {void reject(MyBlockQueue<T> queue, T task);
}

代码改进

如下部分代码是存入任务的部分

public void exectue(Runnable task){//当任务数没有超过coreSize时,直接交给work对象执行//如果任务超过coreSize时,加入任务队列synchronized (workers){if(workers.size()<coreSize){Worker worker=new Worker(task);System.out.println("新增worker");workers.add(worker);worker.start();//任务数超过了核心数}else{//存入任务//taskQueue.put(task);//当队列满了之后 执行的策略//1) 死等//2)带有超时的等待//3)当调用者放弃任务执行//4)让调用者抛出异常//5)让调用者自己执行任务...//为了增加灵活性,这里不写死,交给调用者//重新写了一个放入任务的方法taskQueue.tryPut(rejectPolicy,task);}}}

阻塞队列里的tryPut

public void tryPut(ThreadPool.RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {//如果队列容量满了,就开始执行拒绝策略if(capcity>= deque.size()){rejectPolicy.reject(this,task);}else{//不满就正常加入到队列中System.out.println(task+"正常加入到队列");deque.addLast(task);}}finally {lock.unlock();}}

//1) 死等

//2)带有超时的等待

//3)当调用者放弃任务执行

//4)让调用者抛出异常

//5)让调用者自己执行任务...

谁调用方法,谁写拒绝策略

为了传入策略,就再构造函数里面加入一个方法的参数传入

//部分代码...
//拒绝策略
RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;taskQueue=new MyBlockQueue<>(queueCapcity);this.rejectPolicy=rejectPolicy;
}

主函数编写拒绝的策略,就lamda表达式会把...

public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{//死等
//            queue.put(task);//超时添加
//            System.out.println(queue.offer(task, 100, TimeUnit.NANOSECONDS));//放弃执行
//            System.out.print("我放弃");//调用者抛出异常
//            throw new RuntimeException("任务执行失败");//调用者执行
//            task.run();});for (int i=0;i<5;i++){int j=i;threadPool.exectue(()->{System.out.println(j);});}}

五种拒绝策略的结果(我不会用slog4j)

1.死等的结果

2.超时拒绝的结果(每个false都是时间到了,每加进去)

3.不作为,调用者放弃任务

4.抛出异常,停止

5.调用者线程执行了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/457057.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

debian12 - openssh-9.6.P1的编译安装

文章目录 debian12 - openssh-9.6.P1的编译安装概述笔记备注END debian12 - openssh-9.6.P1的编译安装 概述 在debian12上, 源码编译安装了openssl3.2 导致ssh失败. lostspeeddebian12d4x64:~$ openssl version OpenSSL 3.2.0 23 Nov 2023 (Library: OpenSSL 3.2.0 23 Nov 2…

基于SpringBoot开发的校刊投稿系统[附源码]

基于SpringBoot开发的校刊投稿系统[附源码] &#x1f345; 作者主页 央顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; &#x1f345; 查看下方微信号获取联系方式 承接各种定制系统 &#x1f…

计算机网络-流量控制(数据链路层的流量控制及与传输层流量控制的区别 流量控制的方法 可靠传输,滑动窗口,流量控制三者关系)

文章目录 数据链路层的流量控制及与传输层流量控制的区别流量控制的方法各方法对应的发生窗口和接收窗口大小 可靠传输&#xff0c;滑动窗口&#xff0c;流量控制三者关系小结 数据链路层的流量控制及与传输层流量控制的区别 端到端&#xff1a;两个主机之间的 点对点&#xf…

蓝桥杯Web应用开发-CSS3 新特性

CSS3 新特性 专栏持续更新中 在前面我们已经学习了元素选择器、id 选择器和类选择器&#xff0c;我们可以通过标签名、id 名、类名给指定元素设置样式。 现在我们继续选择器之旅&#xff0c;学习 CSS3 中新增的三类选择器&#xff0c;分别是&#xff1a; • 属性选择器 • 子…

c++二叉树寒假特训题目(1)

大家好&#xff0c;我是周曦&#xff0c;今天给大家推荐一些二叉树题目。 题目 二叉树存储 这道题是道水题&#xff0c;找找规律ok&#xff0c;本人代码10行。 淘汰赛 这道题推荐使用桶数组 做比较合适&#xff08;就是有点绕&#xff09;。 二叉树深度 这题是一道深搜题&a…

安装newman显示required node version >=16解决办法

直接安装对应版本就行&#xff0c;我的Node.js是14.16.0的版本&#xff0c;newman安装5.2.2的就不会出错。 npm install -g newman5.2.2

精选30个炫酷的数据可视化大屏(含源码),拿走就用!

大屏数据可视化是以大屏为主要展示载体的数据可视化设计。 “大面积、炫酷动效、丰富色彩”,大屏易在观感上给人留下震撼印象,便于营造某些独特氛围、打造仪式感。 原本看不见的数据可视化后,便能调动人的情绪、引发人的共鸣。 大屏数据可视化目前主要有信息展示、数据分…

警惕“中等数字化陷阱”,大力发扬先进基础设施“长板”

上世纪七、八十年代&#xff0c;拉美国家发展由富转穷&#xff0c;人均GDP发展至3000美金左右就开始停滞不前。研究界将这一现象归结为一个极具争议的概念——“中等收入陷阱”。 如今&#xff0c;在我国数字化发展当中&#xff0c;也有一种“中等数字化陷阱”正露出苗头&…

Flutter 网络请求之Dio库

Flutter 网络请求之Dio库 前言正文一、配置项目二、网络请求三、封装① 单例模式② 网络拦截器③ 返回值封装④ 封装请求 四、结合GetX使用五、源码 前言 最近再写Flutter系列文章&#xff0c;在了解过状态管理之后&#xff0c;我们再来学习一下网络请求。 正文 网络请求对于一…

ICLR 2024 | Mol-Instructions: 面向大模型的大规模生物分子指令数据集

发表会议&#xff1a;ICLR 2024 论文标题&#xff1a;Mol-Instructions: A Large-Scale Biomolecular Instruction Dataset for Large Language Models 论文链接&#xff1a;https://arxiv.org/pdf/2306.08018.pdf 代码链接&#xff1a;https://github.com/zjunlp/Mol-Instruct…

QT上位机:串口调试助手

前言 上位机的简单编写可以帮我们测试并完善平台&#xff0c;QT作为一款跨平台的GUI开发框架&#xff0c;提供了非常丰富的常用串口api。本文先从最简单的串口调试助手开始&#xff0c;编写平台软件的串口控制界面 工程配置 QT 串口通信基于QT的QSerialPort类&#xff0c;先在…

AR人脸106240点位检测解决方案

美摄科技针对企业需求推出了AR人脸106/240点位检测解决方案&#xff0c;为企业提供高效、精准的人脸识别服务&#xff0c;采用先进的人脸识别算法和机器学习技术&#xff0c;通过高精度、高速度的检测设备&#xff0c;对人脸进行快速、准确地定位和识别。该方案适用于各种应用场…