Java自定义一个线程池

线程池图解 

线程池与主线程之间通过一个阻塞队列来平衡任务分配,阻塞队列中既可以满足线程等待,又要接收主线程的任务。

线程池实现

使用一个双向链表实现任务队列

 创建任务队列

//阻塞队列
public class BlockingQueue<T> {//双线链表private Deque<T> queue = new ArrayDeque();//锁private ReentrantLock lock =new ReentrantLock();//生产者条件变量private Condition fullWaitSet = lock.newCondition();//消费者条件变量private Condition emptyWaitSet = lock.newCondition();//容器容量大小private int capacity;//阻塞获取public T pull(long timeOut, TimeUnit unit){lock.lock();//判断链表中是否存在任务待处理try {//将尝试时间转化为纳秒long nanos = unit.toNanos(timeOut);while (queue.isEmpty()){try {if (nanos<0){return null;}//awaitNanos返回结果是最大等待时间减去睡眠时间的剩余时间nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}//阻塞添加public void put(T element){lock.lock();try{while(queue.size()==capacity){//说明满了,暂时无法添加新的任务try {fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(element);emptyWaitSet.signal();}finally {lock.unlock();}}//获取队列任务数量public int size(){lock.lock();try {return queue.size();}finally {lock.unlock();}}
}

创建线程池 

public class ThreadPool {//任务队列private BlockingQueue<Runnable> blockingQueue;//线程集合private HashSet<Worker> workers = new HashSet();//核心线程数private int coreNum;//超时时间private long timeOut;private TimeUnit unit;public ThreadPool(int coreNum, long timeOut, TimeUnit unit, int queueCapacity) {System.out.println("初始化线程池");this.coreNum = coreNum;this.timeOut = timeOut;this.unit = unit;this.blockingQueue = new BlockingQueue<>(queueCapacity);}//线程执行任务public void execute(Runnable task) {//当线程数没有超过coreNum时,直接交给Worker对象执行,如果超过了coreNum数,则加入BlockingQueuesynchronized (workers) {if (workers.size() < coreNum) {Worker worker = new Worker(task);System.out.println("新增worker"+worker);workers.add(worker);worker.start();} else {System.out.println("从消息队列中获取task");blockingQueue.put(task);}}}class Worker extends Thread {private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {while (task != null || (task = blockingQueue.pull(timeOut, unit)) != null) {try {System.out.println("Worker执行任务");task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers){System.out.println("Worker执行完毕"+this);workers.remove(this);}}}
}

测试 

public class Test {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(3,3000, TimeUnit.MILLISECONDS,5);for (int i = 0; i < 10; i++) {int j = i;threadPool.execute(()->{try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("生产任务:"+j);});}}
}

初始化线程池

新增workerThread[Thread-0,5,main]

新增workerThread[Thread-1,5,main]

新增workerThread[Thread-2,5,main]

Worker执行任务

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@7ba4f24f

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@3b9a45b3

加入任务队列TheadPool.Test$$Lambda$1/1078694789@7699a589

加入任务队列TheadPool.Test$$Lambda$1/1078694789@58372a00

加入任务队列TheadPool.Test$$Lambda$1/1078694789@4dd8dc3

等待加入任务队列TheadPool.Test$$Lambda$1/1078694789@6d03e736

生产任务:2

生产任务:1

生产任务:0

Worker执行任务

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@6d03e736

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@378bf509

生产任务:3

生产任务:4

生产任务:5

Worker执行任务

Worker执行任务

Worker执行任务

生产任务:6

生产任务:8

生产任务:7

Worker执行任务

Worker执行完毕Thread[Thread-1,5,main]

Worker执行完毕Thread[Thread-0,5,main]

生产任务:9

Worker执行完毕Thread[Thread-2,5,main]

添加拒绝策略

上面测试中,有一点不友好的是,当任务队列满了之后,再向其中添加任务时,主线程会死等任务添加成功。

对此我们可以选择多种解决方案

  • 死等
  • 添加超时时间
  • 让调用者方式执行
  • 让调用者抛出异常
  • 让调用者自己执行

创建拒绝策略

@FunctionalInterface
public interface RejectPolicy<T> {void reject(BlockingQueue<T> queue,T task);
}

修改线程池的执行方法 

	//添加属性private RejectPolicy rejectPolicy;//构造方法public ThreadPool(int coreNum, long timeOut, TimeUnit unit, int queueCapacity, RejectPolicy rejectPolicy) {System.out.println("初始化线程池");this.coreNum = coreNum;this.timeOut = timeOut;this.unit = unit;this.blockingQueue = new BlockingQueue<>(queueCapacity);this.rejectPolicy = rejectPolicy;}//线程执行任务public void execute(Runnable task) {//当线程数没有超过coreNum时,直接交给Worker对象执行,如果超过了coreNum数,则加入BlockingQueuesynchronized (workers) {if (workers.size() < coreNum) {Worker worker = new Worker(task);System.out.println("新增worker" + worker);workers.add(worker);worker.start();} else {
//                System.out.println("从消息队列中获取task");
//                blockingQueue.put(task);blockingQueue.tryPut(rejectPolicy,task);}}}

 任务队列添加方法

    public void tryPut(RejectPolicy rejectPolicy, T task) {lock.lock();try {if (queue.size() == capacity) {//如果满了,需要调用拒绝策略rejectPolicy.reject(this,task);} else {queue.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}

测试 

public class Test {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(3,3000,TimeUnit.MILLISECONDS,5,(queue,task)->{//由调用者决定任务队列满了之后如何处理后续任务queue.put(task);//死等queue.offer(task,1000,TimeUnit.MILLISECONDS);//超时返回//啥也不干,直接丢弃任务task.run();//调用者自己执行throw new RuntimeException("任务秩序异常");//抛出异常});for (int i = 0; i < 10; i++) {int j = i;threadPool.execute(()->{try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("生产任务:"+j);});}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/223268.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

京东秒杀之商品展示

1 在gitee上添加.yml文件 1.1 添加good-server.yml文件 server:port: 8084 spring:datasource:url: jdbc:mysql://localhost:3306/shop_goods?serverTimezoneGMT%2B8driverClassName: com.mysql.cj.jdbc.Drivertype: com.alibaba.druid.pool.DruidDataSourceusername: rootpa…

C语言:输入一行字符,分别统计出其中英文字母、空格、数字和其他字符的个数

分析&#xff1a; 在主函数 main 中&#xff0c;程序首先定义一个字符变量 c&#xff0c;以及四个整型变量 letters、k、s 和 o&#xff0c;并初始化它们的值为 0。然后使用 printf 函数输出提示信息&#xff0c;让用户输入一行字符。 接下来&#xff0c;程序通过 while 循环结…

线性分类器---损失函数与优化算法

如何衡量分类器对当前样本的效果好坏&#xff1f; 需要损失函数 什么是损失函数&#xff1f; 损失函数搭建了模型性能与模型参数之间的桥梁&#xff0c;指导 模型参数优化。  损失函数是一个函数&#xff0c;用于度量给定分类器的预测值与真实值 的不一致程度&#xff0c;…

线性分类器--分类模型

记录学习 北京邮电大学计算机学院 鲁鹏 为什么从线性分类器开始&#xff1f;  形式简单、易于理解  通过层级结构&#xff08;神经网络&#xff09;或者高维映射&#xff08;支撑向量机&#xff09;可以 形成功能强大的非线性模型 什么是线性分类器&#xff1f; 线性分…

入侵redis之准备---VMware安装部署kail镜像服务器【详细包含云盘镜像】

入侵redis之准备—VMware安装部署kail镜像服务器【详细包含云盘镜像】 kail是一个很好玩的操作系统&#xff0c;不多说了哈 下载kail镜像 kail官网:https://www.kali.org/get-kali/#kali-platforms 百度云盘下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1PRjo…

什么是半监督学习

1 概述 1.1 定义 半监督学习&#xff08;Semi-Supervised Learning&#xff09;是机器学习中的一个重要分支&#xff0c;它介于监督学习和无监督学习之间。半监督学习利用少量标注数据和大量未标注数据共同训练模型&#xff0c;旨在充分挖掘未标注数据中潜在的信息和模式&…

还不懂缓存穿透?Redis缓存穿透深度剖析

&#x1f388;个人公众号:&#x1f388; :✨✨✨ 可为编程✨ &#x1f35f;&#x1f35f; &#x1f511;个人信条:&#x1f511; 知足知不足 有为有不为 为与不为皆为可为&#x1f335; &#x1f349;本篇简介:&#x1f349;本篇记录Redis缓存穿透深度剖析命令操作&#xff0c;…

【云备份】数据管理模块

文章目录 1. 数据管理模块要管理什么数据&#xff1f;2. 数据管理模块如何管理数据&#xff1f;3. 数据管理模块的具体实现BackupInfo 数据信息类NewBackupInfo —— 获取各项属性信息 DataManager 数据管理类构造函数析构函数insert —— 新增update —— 修改GetOneByURL——…

Portraiture2024最新Photoshop磨皮插件更新啦

Portraiture是一款由Imagenomic公司研发的Photoshop磨皮插件。该插件以其优秀的磨皮效果&#xff0c;成为了众多摄影师和化妆师使用的首选插件。Portraiture主要用于影楼、婚纱、时尚摄影等各个领域。其主要特点是能够轻松地模拟人眼的视觉感受&#xff0c;自然地修饰人像照片。…

netty(三) taskQueue自定义任务,http服务器快速入门,netty核心模块,Unpooled

如果执行某些业务比较复杂&#xff0c;比较耗时&#xff0c;可以使用异步来完成 当然可以有多个任务 上面的结果是&#xff0c;在第一个任务处理完&#xff0c;再等20秒执行&#xff0c;简单来说&#xff0c;就是第一个在10秒执行&#xff0c;第二个在第30秒的时候执行&#…

Linux:通过VMWare,定制化Linux系统

一、原理图 二、新增磁盘&#xff08;对应上图sdb盘&#xff09; 三、挂载磁盘 主要是四步&#xff1a;查看磁盘&#xff0c;分区磁盘&#xff0c;格式化磁盘&#xff0c;挂载磁盘 1、查看磁盘 2、分区磁盘 3、格式化磁盘 4、挂载磁盘 创建两个备用目录&#xff0c;用于磁盘…

Qt_一个由单例引发的崩溃

Qt_一个由单例引发的崩溃 文章目录 Qt_一个由单例引发的崩溃摘要关于 Q_GLOBAL_STATIC代码测试布局管理器源码分析Demo 验证关于布局管理器析构Qt 类声明周期探索更新代码获取父类分析Qt 单例宏源码 关键字&#xff1a; Qt、 Q_GLOBAL_STATIC、 单例、 UI、 崩溃 摘要 今…