让ThreadPoolExecutor无所遁形:Java线程池运行原理详解

ThreadPoolExecutor的核心工作原理

当我们在Java中讨论并发和多线程时,ThreadPoolExecutor 是不可或缺的一个类。在 java.util.concurrent 包下,该类负责管理线程池内的线程,包括线程的创建、执行、管理以及线程池的监控等。理解 ThreadPoolExecutor 如何保证线程池正确运作是非常关键的。本章将带您深入源码,解析 ThreadPoolExecutor 的核心工作原理。

线程池状态和控制流程概述

file
首先,让我们来了解一下 ThreadPoolExecutor 是如何通过内部的状态控制来管理线程池的。ThreadPoolExecutor 类维护着一个 ctl 的原子整型变量,该变量高位存储线程池状态,低位存储线程数量。

ctl的位字段表示方法

在 ThreadPoolExecutor 源码中,您会看到如下定义:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 状态在高位
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

此代码段定义了 ThreadPoolExecutor 状态。它用位操作确保状态存储在高位,而工作线程数量存储在低位。线程池的状态反映了线程池的生命周期,比如运行中、关闭中等。

状态转换及控制

线程池状态通过 ctl 变量控制,并提供了几个辅助函数来管理这些状态:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

这些函数帮助我们获取当前的状态以及当前正在运行的线程数量,还通过 compareAndSet 方法保证状态更新操作的原子性。
综上所述,ThreadPoolExecutor 类的 ctl 相关属性是线程池正确运行的关键部分,它不仅标示了线程池的状态,还协调着线程的工作流程。

ThreadPoolExecutor中的属性详解

在深度了解 ThreadPoolExecutor 之前,我们需要了解一些它的键属性。这些属性共同决定了线程池的行为和性能。

核心线程与最大线程数

corePoolSize 和 maximumPoolSize 这两个参数分别代表着线程池中核心线程数量和允许的最大线程数量。核心线程会一直存活,即使它们处于空闲状态,而非核心线程如果空闲时间超过了 keepAliveTime 就会被终止。

private volatile int corePoolSize;
private volatile int maximumPoolSize;

任务队列与SynchronousQueue特性

任务队列是存放提交未执行任务的缓冲队列。ThreadPoolExecutor 使用一个 BlockingQueue 来存放任务。

private final BlockingQueue<Runnable> workQueue;

其中 SynchronousQueue 是一种无存储空间的阻塞队列,每个 put 必须等待一个 take,反之亦然。

keepAliveTime与TimeUnit的作用

当线程数量超过核心线程数时,多余的空闲线程将在等待新任务的时候,最多等待 keepAliveTime 所指定的时间长度,超时后将会被终止。

private volatile long keepAliveTime;

这里的 TimeUnit 是一个枚举,用来指定 keepAliveTime 的时间单位。

private final TimeUnit unit;

ThreadFactory的自定义线程创建

用户可以通过实现 ThreadFactory 接口来自定义线程创建方式,比如设置线程名、分组、优先级等。

private volatile ThreadFactory threadFactory;

Handler的拒绝策略

当任务太多,无法及时处理时,我们必须提供一种策略来处理这些额外的任务。ThreadPoolExecutor 支持几种拒绝策略。

private volatile RejectedExecutionHandler handler;

ThreadPoolExecutor类中的重要操作方法解析

ThreadPoolExecutor 中定义了几种核心的方法,用于执行任务、管理线程池的生命周期等。在本章中,我们将通过源码来分析这些关键方法是如何实现的。

execute方法的任务提交流程

execute 方法是线程池的核心之一,用于提交任务供线程池执行。这个方法主要涉及到任务的接收和线程的调度处理。

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}

在这段代码中,首先检查传入的任务对象是否为 null,然后根据当前线程数量和核心线程数比较,以便决定是否需要添加新的工作线程。如果当前运行状态是 RUNNING 并且任务可以成功添加到队列中,那么会再次检查线程池的状态并尝试添加一个非核心线程。如果这些步骤都失败了,那么将会调用拒绝策略来处理这个任务。

submit方法与FutureTask的协同

除了 execute 方法,ThreadPoolExecutor 同时提供了 submit 方法,这个方法允许提交带返回值的任务。其内部实际上是转化为 FutureTask 来处理。

public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}

submit 方法首先将 Callable 任务转化为 FutureTask,再调用 execute 方法进行执行。返回的 Future 对象可以用来获取异步执行结果。

shutdown和shutdownNow方法的区别

shutdown 方法用于平缓的关闭线程池,它会等待正在执行的任务执行完毕,但不接收新任务。

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}
}

而 shutdownNow 方法则更加激进,它试图停止所有正在执行的任务,并返回等待执行的任务列表。

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}return tasks;
}

ThreadPoolExecutor类中的重要内部类

ThreadPoolExecutor 的功能实现,除了依赖于它的核心属性和方法之外,还依赖于一些关键的内部类。这些内部类扮演着线程池运作中不可或缺的角色。

Worker类的作用和生命周期

ThreadPoolExecutor 使用 Worker 类来封装线程池中的每个工作线程。Worker 类是 ThreadPoolExecutor 的一个私有内部类,并且实现了 Runnable 接口。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{final Thread thread;Runnable firstTask;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this);}// 省略其他方法
}

这段代码展示了 Worker 类的简化结构。Worker 类首先使用给定的 ThreadFactory 来创建新的线程,并在创建时可以选择传入一个首个任务。这个类还利用了AQS来控制线程的状态。每个工作线程的生命周期开始于它对 firstTask 的执行,然后它会持续从任务队列获取并执行任务,直到线程池终止或线程无任务可执行而被终止。

内部阻塞队列的实现与特性

线程池中使用阻塞队列来暂存待执行的任务。这是一种线程安全的队列实现,能够在高并发场景下正确地管理任务。

private final BlockingQueue<Runnable> workQueue;

ThreadPoolExecutor 支持不同类型的阻塞队列,如 ArrayBlockingQueue、LinkedBlockingQueue 和 PriorityBlockingQueue。每种队列都有其特殊的使用场景和性能特点。

LinkedBlockingQueue与ArrayBlockingQueue

LinkedBlockingQueue 和 ArrayBlockingQueue 是两种常见的阻塞队列实现。LinkedBlockingQueue 基于链表结构,理论上具有更高的处理效率,而 ArrayBlockingQueue 基于数组实现,固定容量并且在预分配内存方面性能更优。

DelayedWorkQueue的定时任务处理

如果 ThreadPoolExecutor 配置为 ScheduledThreadPoolExecutor,则使用 DelayedWorkQueue 来处理定时任务。这个队列能够让线程延时或周期性地执行任务。

ThreadPoolExecutor的扩展与自定义

ThreadPoolExecutor 提供了几个可以覆写的钩子方法,使得我们可以根据业务需求来扩展和自定义线程池的行为。同时,我们可通过自定义拒绝策略来处理无法立即执行的任务。本章将介绍如何利用这些功能来增强 ThreadPoolExecutor 的功能。

beforeExecute、afterExecute和terminated方法的扩展

ThreadPoolExecutor 允许我们在任务执行前后以及线程池终止时执行一些自定义逻辑。这是通过覆写 beforeExecute、afterExecute 和 terminated 方法来实现的。

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

这些方法默认是空实现,我们可以根据需要来重写这些方法,比如记录日志、计算任务执行时间、收集线程池统计信息等。

自定义拒绝策略与RejectedExecutionHandler的应用

我们可以实现 RejectedExecutionHandler 接口来创建自己的拒绝策略。例如,我们可以写一个记录日志并且尝试重新提交任务的拒绝策略。

public class LogAndRetryRejectedExecutionHandler implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {System.out.println("Task rejected, retrying...");// Retry submission of the taskexecutor.execute(r);} else {System.err.println("Task rejected and cannot retry, executor has shut down.");}}
}

将上面的自定义拒绝策略应用到 ThreadPoolExecutor 中:

ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,workQueue, threadFactory,new LogAndRetryRejectedExecutionHandler()
);

实战案例:定制一个监控线程池运行状态的ThreadPoolExecutor

现在让我们来看一个实战案例,其中我们将创建一个监控线程池的 ThreadPoolExecutor 并实施自定义扩展。

public class MonitoringThreadPoolExecutor extends ThreadPoolExecutor {private final ConcurrentHashMap<String, Long> timing = new ConcurrentHashMap<>();// 构造方法和其他必要的方法@Overrideprotected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);timing.put(String.valueOf(r.hashCode()), System.nanoTime());}@Overrideprotected void afterExecute(Runnable r, Throwable t) {Long startTime = timing.remove(String.valueOf(r.hashCode()));long taskDuration = System.nanoTime() - startTime;// Record or log the task duration}@Overrideprotected void terminated() {super.terminated();// Do some final logging or resource release}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/652121.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

汽车底盘域的学习笔记

前言&#xff1a;底盘域分为传统车型底盘域和新能源车型底盘域&#xff08;新能源系统又可以分为纯电和混动车型&#xff0c;有时间可以再研究一下&#xff09; 1&#xff1a;传统车型底盘域 细分的话可以分为四个子系统 传动系统 行驶系统 转向系统 制动系统 1.1传动系…

STM32H7 HSE时钟的使用方法介绍

目录 概述 1 STM32H750 HSE时钟介绍 2 使用STM32Cube创建Project 3 认识HSE时钟 3.1 HSE时钟的特性 3.2 HSE的典型应用电路 4 STM32Cube中配置时钟 4.1 时钟需求 4.2 配置参数 4.2.1 使能外围资源 4.2.2 使用STM32Cube注意项 4.2.3 配置参数 5 总结 概述 本文主要…

正点原子[第二期]Linux之ARM(MX6U)裸机篇学习笔记-6.3

前言&#xff1a; 本文是根据哔哩哔哩网站上“正点原子[第二期]Linux之ARM&#xff08;MX6U&#xff09;裸机篇”视频的学习笔记&#xff0c;在这里会记录下正点原子 I.MX6ULL 开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了正点原子教学视频和链接中的内容。…

便携式应急指挥箱规格参数

概况: 微缩型的无线视频音频传输的机动挥所。体积小、重量轻、公配电方便、携带便携、功能齐全。可进行单兵作战&#xff0c;通过此无线音频视频传输的指挥箱能完成现场图像、语音、数据的采集等功能&#xff0c;可以通过5G/4G/WIFI等多种无线网络完成传输的需求&#xff0c;或…

【深度学习】YOLOv5,金属表面的缺陷检测,GC10-DET数据集

目录&#xff1a; 文章目录 数据集数据集转换下载yolov5创建 dataset.yaml训练参数开始训练数据分布训练结果问询、帮助 数据集 数据集地址&#xff1a; https://github.com/lvxiaoming2019/GC10-DET-Metallic-Surface-Defect-Datasets 数据集下载方式&#xff1a; Downlo…

openWebUI+ollamawindows+不用docker+webLite本地安装

openWebUI & ollama & windows & 不用docker & webLite 本地安装 总结一下安装教程 10核CPU16G内存 两个web框架都可以&#xff0c;先说简单的 ollama-webui-lite(https://github.com/ollama-webui/ollama-webui-lite) 轻量级&#xff0c;只使用nodejs 先装…

kaggle之皮肤癌数据的深度学习测试

kaggle之皮肤癌数据的深度学习测试 近期一直在肝深度学习 很久之前&#xff0c;曾经上手搞过一段时间的深度学习&#xff0c;似乎是做轮胎花纹的识别&#xff0c;当初用的是TensorFlow&#xff0c;CPU版本的&#xff0c;但已经很长时间都没弄过了 现在因为各种原因&#xff…

编程学习路线

Java最强学习路线 快来官网定制一套属于自己的学习路线吧 官方网址&#xff1a; Learn to become a modern Java developerCommunity driven, articles, resources, guides, interview questions, quizzes for java development. Learn to become a modern Java developer by…

MySQL中脏读与幻读

一般对于我们的业务系统去访问数据库而言&#xff0c;它往往是多个线程并发执行多个事务的&#xff0c;对于数据库而言&#xff0c;它会有多个事务同时执行&#xff0c;可能这多个事务还会同时更新和查询同一条数据&#xff0c;所以这里会有一些问题需要数据库来解决 我们来看…

Vscode上使用Clang,MSVC, MinGW, (Release, Debug)开发c++完全配置教程(包含常见错误),不断更新中.....

1.VSCode报错头文件找不到 clang(pp_file_not_found) 在Fallback Flags中添加 -I&#xff08;是-include的意思&#xff0c;链接你的编译器对应头文件地址&#xff0c;比如我下面的是MSVC的地址&#xff09; 问题得到解决~

springboot如何使用RedisTemplate

第一步&#xff1a;创建一个spring boot项目 第二步&#xff1a;pom导入redis相关依赖 <!--reids依赖--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </depen…

Office Word自动编号转文本

原理 使用office自带的宏功能&#xff0c;一键替换 过程 调出word的“开发工具”选项 文件->选项->自定义功能区->选中开发工具->确定 创建宏 开发工具->宏->创建宏 编写宏 在弹出来的框里&#xff0c;替换代码为 Sub num2txt() ActiveDocument.…