手写线程池

文章目录


在这里插入图片描述

线程池最主要有两部分,阻塞队列和线程池,阻塞队列使用双端队列,我们需要定义队列的容量、 还需要来保证将任务放入队列和取出队列的正确性、使用Reentrentlock定义两个条件变量:生产者条件变量和消费者条件变量,当队列满时任务不能放入队列,在生产者条件变量await,当队列为空时不能从队列中取任务,在消费者的条件变量中await,当然的话还需要有唤醒逻辑,以及取任务和存任务都可以带超时时间,避免总等待的情况。

线程池:定义了线程集合(hashset)、核心线程数、获取任务的超时时间、任务队列、拒绝策略(当没能力处理任务时,这些任务怎么办)
线程池中处理任务的逻辑:当任务数没有超过核心线程数时,每个任务都有对应的线程处理,并且线程的run方法里面是有while循环的,只要任务队列中还有任务时,会源源不断的执行这些任务,直到没有任务时,才将线程池中的线程回收,当然这个是线程池有能力处理这些任务的情况下,当任务队列满了,可以使用策略模式,将策略传入阻塞队列,然后执行拒绝的这种策略。

package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;
import org.springframework.core.log.LogDelegateFactory;import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{// 1. 死等
//            queue.put(task);// 2) 带超时等待
//            queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3) 让调用者放弃任务执行
//            log.debug("放弃{}", task);// 4) 让调用者抛出异常
//            throw new RuntimeException("任务执行失败 " + task);// 5) 让调用者自己执行任务task.run();});for (int i = 0; i < 4; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {void reject(BlockingQueue<T> queue, T task);
}@Slf4j(topic = "c.ThreadPool")
class ThreadPool {// 任务队列private BlockingQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();// 核心线程数private int coreSize;// 获取任务时的超时时间private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 执行任务public void execute(Runnable task) {// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行// 如果任务数超过 coreSize 时,加入任务队列暂存synchronized (workers) {if(workers.size() < coreSize) {Worker worker = new Worker(task);log.debug("新增 worker{}, {}", worker, task);workers.add(worker);worker.start();} else {
//                taskQueue.put(task);// 1) 死等// 2) 带超时等待// 3) 让调用者放弃任务执行// 4) 让调用者抛出异常// 5) 让调用者自己执行任务taskQueue.tryPut(rejectPolicy, task);}}}public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);this.rejectPolicy = rejectPolicy;}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1) 当 task 不为空,执行任务// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
//            while(task != null || (task = taskQueue.take()) != null) {while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行...{}", task);task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers) {log.debug("worker 被移除{}", this);workers.remove(this);}}}
}
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {// 1. 任务队列private Deque<T> queue = new ArrayDeque<>();// 2. 锁private ReentrantLock lock = new ReentrantLock();// 3. 生产者条件变量private Condition fullWaitSet = lock.newCondition();// 4. 消费者条件变量private Condition emptyWaitSet = lock.newCondition();// 5. 容量private int capcity;public BlockingQueue(int capcity) {this.capcity = capcity;}// 带超时阻塞获取public T poll(long timeout, TimeUnit unit) {lock.lock();try {// 将 timeout 统一转换为 纳秒long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {try {// 返回值是剩余时间if (nanos <= 0) {return null;}nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞获取public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞添加public void put(T task) {lock.lock();try {while (queue.size() == capcity) {try {log.debug("等待加入任务队列 {} ...", task);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();} finally {lock.unlock();}}// 带超时时间阻塞添加public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capcity) {try {if(nanos <= 0) {return false;}log.debug("等待加入任务队列 {} ...", task);nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();return true;} finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否满if(queue.size() == capcity) {rejectPolicy.reject(this, task);} else {  // 有空闲log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/265183.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

系列学习前端之第 3 章:一文精通 css

全套学习 HTMLCSSJavaScript 代码和笔记请下载网盘的资料&#xff1a; 链接: 百度网盘 请输入提取码 提取码: 6666 一、CSS基础 1. CSS简介 CSS 的全称为&#xff1a;层叠样式表 ( Cascading Style Sheets ) 。 CSS 也是一种标记语言&#xff0c;用于给 HTML 结构设…

【微软技术栈】发布自己造的轮子 -- 创建Nuget包(分布操作)

目录 1、您的项目 2、创建 .nuspec 文件 3、一张图片胜过一千个拉取请求 4、包括自述文件 MD 文件 5、构建软件包 6、将包部署到 Nuget.Org 7、手动上传软件包 8、自动化和脚本化部署 9、我们如何构建和部署 ErrLog.IO Nuget 包 10、Nuget统计数据 11、最后的思考 创建 Nuget 包…

关于ASJ 系列剩余电流动作继电器的详细介绍-安科瑞 蒋静

1 概述 ASJ 系列剩余电流动作继电器可与低压断路器或低压接触器等组成组合式剩余电流保护装置&#xff0c;主要适用 于交流 50Hz &#xff0c;额定电压 400V 及以下的 TT 和 TN 系统配电线路&#xff0c;用来对电气线路进行接地故障保护&#xff0c;防止 接地故障电…

安全访问服务边缘(SASE):解决第三方风险的全方位解决方案

随着数字化时代的到来&#xff0c;企业和组织对于网络安全的需求越来越迫切。传统的安全解决方案已经无法满足复杂多变的网络环境&#xff0c;因此新兴的安全访问服务边缘&#xff08;SASE&#xff09;应运而生。本文将介绍SASE的概念和工作原理&#xff0c;并重点阐述它作为第…

深入学习Redis:从入门到实战

Redis快速入门 1.初识Redis1.1.认识NoSQL1.1.1.结构化与非结构化1.1.2.关联和非关联1.1.3.查询方式1.1.4.事务1.1.5.总结 1.2.认识Redis1.3.安装Redis1.3.1.依赖库1.3.2.上传安装包并解压1.3.3.启动1.3.4.默认启动1.3.5.指定配置启动1.3.6.开机自启 1.4.Redis桌面客户端1.4.1.R…

awt中文乱码-Intellij IDEA

乱码的根本原因在于秦始皇嘎太早了&#xff08;bushi 解决方法&#xff1a;肉眼可见的编码设置统一为GBK 1.打开设置找到文件编码 2.肉眼可见的编码统统改成GBK 有人该问了&#xff0c;为什么不改成utf-8&#xff0c;因为awt的编码由操作系统决定&#xff0c;我的是win家庭中…

队列的模拟实现

队列的模拟实现 文章目录 队列的模拟实现前言一、队列的基本原理1&#xff09;队列的定义2&#xff09;队列的特性3&#xff09;队列的应用场景 二、模拟实现STL中队列的功能1&#xff09;设计数据结构2&#xff09;初始化队列&#xff08;QueueInit&#xff09;3&#xff09;入…

MySQL 中Relay Log打满磁盘问题的排查方案

MySQL 中Relay Log打满磁盘问题的排查方案 引言&#xff1a; MySQL Relay Log&#xff08;中继日志&#xff09;是MySQL复制过程中的一个重要组件&#xff0c;它用于将主数据库的二进制日志事件传递给从数据库。然而&#xff0c;当中继日志不断增长并最终占满磁盘空间时&…

Flutter:web项目跨域问题解决

前后端解决系列 文章目录 一、Flutter web客户端解决本地环境调试跨域问题二、Flutter web客户端解决线上环境跨域问题 一、Flutter web客户端解决本地环境调试跨域问题 就一句命令【--web-browser-flag "--disable-web-security"】&#xff0c;用来屏蔽浏览器域名请…

简单实现Spring容器(五) 实现bean后置处理器BeanPostProcessor机制

阶段5: // 1.编写自己的Spring容器,实现扫描包,得到bean的class对象. // 2.扫描将 bean 信息封装到 BeanDefinition对象,并放入到Map. // 3.初始化单例池并完成getBean() createBean()方法 // 4.完成依赖注入(如果创建某个Bean对象,存在依赖注入,需要进行bean组装操作) 5.bean…

城市基础设施智慧路灯改造的特点

智慧城市建设稳步有序推进。作为智慧城市的基础设施&#xff0c;智能照明是智慧城市的重要组成部分&#xff0c;而叁仟智慧路灯是智慧城市理念下的新产品。随着物联网和智能控制技术的飞速发展&#xff0c;路灯被赋予了新的任务和角色。除了使道路照明智能化和节能化外&#xf…

机器学习 类别特征编码:Category Encoders 库的使用

✅作者简介&#xff1a;人工智能专业本科在读&#xff0c;喜欢计算机与编程&#xff0c;写博客记录自己的学习历程。 &#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&…