Java开发大厂面试第04讲:深入理解ThreadPoolExecutor,参数含义与源码执行流程全解

线程池是为了避免线程频繁的创建和销毁带来的性能消耗,而建立的一种池化技术,它是把已创建的线程放入“池”中,当有任务来临时就可以重用已有的线程,无需等待创建的过程,这样就可以有效提高程序的响应速度。但如果要说线程池的话一定离不开 ThreadPoolExecutor ,在阿里巴巴的《Java 开发手册》中是这样规定线程池的:

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的读者更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

  • CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

  • `ThreadPoolExecutor` 是 Java 中用于创建和管理线程池的类。它允许你控制线程池的大小、工作队列的大小和线程的创建方式等。下面是 `ThreadPoolExecutor` 构造函数的参数含义:
  1. corePoolSize(核心线程数):

    • 线程池中保持活动状态的最小线程数。
    • 即使在没有任务需要执行时,线程池也会保持 corePoolSize 数量的线程。
    • 如果当前线程数少于 corePoolSize,则创建新线程来处理新提交的任务。
  2. maximumPoolSize(最大线程数):

    • 线程池中允许的最大线程数。
    • 当工作队列已满,并且当前线程数小于 maximumPoolSize 时,线程池会创建新的线程来处理任务。
    • 注意,这个值不能超过 Integer.MAX_VALUE
  3. keepAliveTime(空闲线程存活时间):

    • 当线程数大于 corePoolSize 时,这是多余的空闲线程在终止前等待新任务的最长时间。
    • 这个时间只对非核心线程有效。
    • 如果在 keepAliveTime 时间段内没有新的任务提交,那么非核心线程将被终止。
  4. unit(时间单位):

    • 用于 keepAliveTime 参数的时间单位,如 TimeUnit.SECONDSTimeUnit.MILLISECONDS 等。
  5. workQueue(工作队列):

    • 用于保存待执行的任务的队列。
    • 它是一个 BlockingQueue,用于存放待执行的任务。
    • 当核心线程都在忙碌时,新任务将被放入此队列中等待执行。
  6. threadFactory(线程工厂):

    • 用于创建新线程的工厂。
    • 你可以通过自定义的 ThreadFactory 来设置新线程的创建方式,如设置线程名称、优先级等。
  7. handler(拒绝策略):

    • 当工作队列已满,并且线程数已达到 maximumPoolSize 时,如果再有新的任务提交,线程池将使用此拒绝策略来处理该任务。
    • Java 提供了几种预定义的拒绝策略,如 AbortPolicy(直接抛出异常)、CallerRunsPolicy(调用者运行任务)、DiscardPolicy(丢弃任务)和 DiscardOldestPolicy(丢弃队列中最旧的任务)。

关于 ThreadPoolExecutor 的源码执行流程,这涉及到具体的实现细节和并发控制,因此在这里只能给出一个大致的概述:

  • 当提交新任务到 ThreadPoolExecutor 时,它首先会尝试使用核心线程来执行该任务。
  • 如果核心线程都在忙碌,并且工作队列未满,那么任务将被放入工作队列中等待执行。
  • 如果工作队列已满,并且当前线程数小于 maximumPoolSize,那么线程池将创建新的线程来执行任务。
  • 如果工作队列已满,并且线程数已达到 maximumPoolSize,那么线程池将使用拒绝策略来处理该任务。

在整个过程中,ThreadPoolExecutor 使用了内部锁和条件变量来实现对线程和任务的并发控制。它还提供了丰富的 API 来监控和管理线程池的状态,如获取线程池大小、当前活跃线程数、已完成任务数等。

其实当我们去看 Executors 的源码会发现,Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor() 和 Executors.newCachedThreadPool() 等方法的底层都是通过 ThreadPoolExecutor 实现的,所以本课时我们就重点来了解一下 ThreadPoolExecutor 的相关知识,比如它有哪些核心的参数?它是如何工作的?

典型回答

ThreadPoolExecutor 的核心参数指的是它在构建时需要传递的参数,其构造方法如下所示:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||// maximumPoolSize 必须大于 0,且必须大于 corePoolSizemaximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

第 1 个参数:corePoolSize 表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程(创建和销毁的原因会在本课时的下半部分讲到);如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。

第 2 个参数:maximumPoolSize 表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。

第 3 个参数:keepAliveTime 表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。

第 4 个参数:unit 表示存活时间的单位,它是配合 keepAliveTime 参数共同使用的。

第 5 个参数:workQueue 表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。

第 6 个参数:threadFactory 表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程,源代码如下:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {// Executors.defaultThreadFactory() 为默认的线程创建工厂this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
public static ThreadFactory defaultThreadFactory() {return new DefaultThreadFactory();
}
// 默认的线程创建工厂,需要实现 ThreadFactory 接口
static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = “pool-” +
                      poolNumber.getAndIncrement() +
                     “-thread-”;
    }
    // 创建线程
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon()) 
            t.setDaemon(false); // 创建一个非守护线程
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY); // 线程优先级设置为默认值
        return t;
    }
}

我们也可以自定义一个线程工厂,通过实现 ThreadFactory 接口来完成,这样就可以自定义线程的名称或线程执行的优先级了。

第 7 个参数:RejectedExecutionHandler 表示指定线程池的拒绝策略,当线程池的任务已经在缓存队列 workQueue 中存储满了之后,并且不能创建新的线程来执行此任务时,就会用到此拒绝策略,它属于一种限流保护的机制。

线程池的工作流程要从它的执行方法 execute() 说起,源码如下:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 当前工作的线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {// 创建新的线程执行此任务if (addWorker(command, true))return;c = ctl.get();}// 检查线程池是否处于运行状态,如果是则把任务添加到队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次检查线程池是否处于运行状态,防止在第一次校验通过后线程池关闭// 如果是非运行状态,则将刚加入队列的任务移除if (! isRunning(recheck) && remove(command))reject(command);// 如果线程池的线程数为 0 时(当 corePoolSize 设置为 0 时会发生)else if (workerCountOf(recheck) == 0)addWorker(nullfalse); // 新建线程执行任务}// 核心线程都在忙且队列都已爆满,尝试新启动一个线程执行失败else if (!addWorker(command, false)) // 执行拒绝策略reject(command);
}

其中 addWorker(Runnable firstTask, boolean core) 方法的参数说明如下:

  • firstTask,线程应首先运行的任务,如果没有则可以设置为 null;

  • core,判断是否可以创建线程的阀值(最大值),如果等于 true 则表示使用 corePoolSize 作为阀值,false 则表示使用 maximumPoolSize 作为阀值。

考点分析

本课时的这道面试题考察的是你对于线程池和 ThreadPoolExecutor 的掌握程度,也属于 Java 的基础知识,几乎所有的面试都会被问到,其中线程池任务执行的主要流程,可以参考以下流程图:

与 ThreadPoolExecutor 相关的面试题还有以下几个:

  • ThreadPoolExecutor 的执行方法有几种?它们有什么区别?

  • 什么是线程的拒绝策略?

  • 拒绝策略的分类有哪些?

  • 如何自定义拒绝策略?

  • ThreadPoolExecutor 能不能实现扩展?如何实现扩展?

知识扩展

execute() VS submit()

execute() 和 submit() 都是用来执行线程池任务的,它们最主要的区别是,submit() 方法可以接收线程池执行的返回值,而 execute() 不能接收返回值。

来看两个方法的具体使用:

ThreadPoolExecutor executor = new ThreadPoolExecutor(21010L,TimeUnit.SECONDS, new LinkedBlockingQueue(20));
// execute 使用
executor.execute(new Runnable() {@Overridepublic void run() {System.out.println("Hello, execute.");}
});
// submit 使用
Future<String> future = executor.submit(new Callable<String>() {@Overridepublic String call() throws Exception {System.out.println("Hello, submit.");return "Success";}
});
System.out.println(future.get());

以上程序执行结果如下:

Hello, submit.
Hello, execute.
Success

从以上结果可以看出 submit() 方法可以配合 Futrue 来接收线程执行的返回值。它们的另一个区别是 execute() 方法属于 Executor 接口的方法,而 submit() 方法则是属于 ExecutorService 接口的方法,它们的继承关系如下图所示:

线程池的拒绝策略

当线程池中的任务队列已经被存满,再有任务添加时会先判断当前线程池中的线程数是否大于等于线程池的最大值,如果是,则会触发线程池的拒绝策略。

Java 自带的拒绝策略有 4 种:

  • AbortPolicy,终止策略,线程池会抛出异常并终止执行,它是默认的拒绝策略;

  • CallerRunsPolicy,把任务交给当前线程来执行;

  • DiscardPolicy,忽略此任务(最新的任务);

  • DiscardOldestPolicy,忽略最早的任务(最先加入队列的任务)。

例如,我们来演示一个 AbortPolicy 的拒绝策略,代码如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1310,TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.AbortPolicy()); // 添加 AbortPolicy 拒绝策略
for (int i = 0; i < 6; i++) {executor.execute(() -> {System.out.println(Thread.currentThread().getName());});
}

以上程序的执行结果:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.lagou.interview.ThreadPoolExample$$Lambda$1/1096979270@448139f0 rejected from java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at com.lagou.interview.ThreadPoolExample.rejected(ThreadPoolExample.java:35)at com.lagou.interview.ThreadPoolExample.main(ThreadPoolExample.java:26)

可以看出当第 6 个任务来的时候,线程池则执行了 AbortPolicy  拒绝策略,抛出了异常。因为队列最多存储 2 个任务,最大可以创建 3 个线程来执行任务(2+3=5),所以当第 6 个任务来的时候,此线程池就“忙”不过来了。

自定义拒绝策略

自定义拒绝策略只需要新建一个 RejectedExecutionHandler 对象,然后重写它的 rejectedExecution() 方法即可,如下代码所示:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1310,TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),new RejectedExecutionHandler() {  // 添加自定义拒绝策略@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 业务处理方法System.out.println("执行自定义拒绝策略");}});
for (int i = 0; i < 6; i++) {executor.execute(() -> {System.out.println(Thread.currentThread().getName());});
}

以上代码执行的结果如下:

执行自定义拒绝策略
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2

可以看出线程池执行了自定义的拒绝策略,我们可以在 rejectedExecution 中添加自己业务处理的代码。

ThreadPoolExecutor 扩展

ThreadPoolExecutor 的扩展主要是通过重写它的 beforeExecute() 和 afterExecute() 方法实现的,我们可以在扩展方法中添加日志或者实现数据统计,比如统计线程的执行时间,如下代码所示:

public class ThreadPoolExtend {public static void main(String[] args) throws ExecutionException, InterruptedException {// 线程池扩展调用MyThreadPoolExecutor executor = new MyThreadPoolExecutor(2410,TimeUnit.SECONDS, new LinkedBlockingQueue());for (int i = 0; i < 3; i++) {executor.execute(() -> {Thread.currentThread().getName();});}}/*** 线程池扩展*/static class MyThreadPoolExecutor extends ThreadPoolExecutor {// 保存线程执行开始时间private final ThreadLocal<Long> localTime = new ThreadLocal<>();public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}/*** 开始执行之前* @param t 线程* @param r 任务*/@Overrideprotected void beforeExecute(Thread t, Runnable r) {Long sTime = System.nanoTime(); // 开始时间 (单位:纳秒)localTime.set(sTime);System.out.println(String.format("%s | before | time=%s",t.getName(), sTime));super.beforeExecute(t, r);}/*** 执行完成之后* @param r 任务* @param t 抛出的异常*/@Overrideprotected void afterExecute(Runnable r, Throwable t) {Long eTime = System.nanoTime(); // 结束时间 (单位:纳秒)Long totalTime = eTime - localTime.get(); // 执行总时间System.out.println(String.format("%s | after | time=%s | 耗时:%s 毫秒",Thread.currentThread().getName(), eTime, (totalTime / 1000000.0)));super.afterExecute(r, t);}}
}

以上程序的执行结果如下所示:

pool-1-thread-1 | before | time=4570298843700
pool-1-thread-2 | before | time=4570298840000
pool-1-thread-1 | after | time=4570327059500 | 耗时:28.2158 毫秒
pool-1-thread-2 | after | time=4570327138100 | 耗时:28.2981 毫秒
pool-1-thread-1 | before | time=4570328467800
pool-1-thread-1 | after | time=4570328636800 | 耗时:0.169 毫秒

最后

最后我们总结一下:线程池的使用必须要通过 ThreadPoolExecutor 的方式来创建,这样才可以更加明确线程池的运行规则,规避资源耗尽的风险。同时,也介绍了 ThreadPoolExecutor 的七大核心参数,包括核心线程数和最大线程数之间的区别,当线程池的任务队列没有可用空间且线程池的线程数量已经达到了最大线程数时,则会执行拒绝策略,Java 自动的拒绝策略有 4 种,用户也可以通过重写 rejectedExecution() 来自定义拒绝策略,我们还可以通过重写 beforeExecute() 和 afterExecute() 来实现 ThreadPoolExecutor 的扩展功能。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/703830.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LLama3大模型本地部署 仅需6步完成对话模型本地安装部署。附送可视化ui安装、自定义模型目录,修改模型保存地址,第三方微调模型、中文模型下载地址

本篇分为三部分 一&#xff1a;6步完成llama3大模型本地部署 二&#xff1a;8步完成llama3可视化对话界面安装 三&#xff1a;重设模型文件路径 四&#xff1a;微调模型、中文模型下载资源分享 一、LLama3 大模型本地部署安装 首先去mata官网下载ollama客户端 Ollama 选择合适…

论:即时战略RTS游戏的小地图采用 自下而上的汇报式 还是 自上而下的查找式?

关键词&#xff1a;RTS 小地图 游戏设计 思路 卫星 位置映射 阵营 更新 汇报 询问 UE4 UE5 Unreal Engine 前言 你是否想过类似红色警戒的战略小地图的要素是采用何种方式更新数据的。大量数据实时更新&#xff0c;考虑频率&#xff0c;运行效率&#xff0c;开发中如何选型&a…

价格战开卷!字节发布豆包大模型,比行业便宜99.3%

豆包大模型正式亮相 5月15日&#xff0c;在2024春季火山引擎Force原动力大会上&#xff0c;字节跳动自研豆包大模型正式亮相。 &#xff08;图源&#xff1a;证券时报&#xff09; 火山引擎是字节跳动旗下云服务平台&#xff0c;据火山引擎总裁谭待介绍&#xff0c;豆包大模型…

民航电子数据库:select查询时部分字段缺失

目录 前言异常排查原因解决使用systemPath标签引入本地Jar包后无法打包 前言 1、对接民航电子数据库 2、框架为shardingsphere caedb mybatis 3、部分SQL查询时&#xff0c;会出现字段缺失的情况 4、查看日志打印出来的SQL&#xff0c;字段并未缺失 异常 这里省略SQL语句…

✅什么是时间轮?

一、问题解析 时间轮算法&#xff08;Time Wheel Algorithm&#xff09;是一种用于处理定时任务和调度的常见算法。 时间轮算法主要需要定义一个时间轮盘&#xff0c;在一个时间轮盘中划分出多个槽位&#xff0c;每个槽位表示一个时间段&#xff0c;这个段可以是秒级、分钟级、…

Invalid bound statement (not found) 六种解决方法

前五种参考博文&#xff1a; Invalid bound statement (not found) 五种解决方法-CSDN博客 第六种&#xff1a; 在启动类上加上MapperScan&#xff0c;指定扫描包

web前端框架设计第八课-表单控件绑定

web前端框架设计第八课-表单控件绑定 一.预习笔记 1.v-model实现表单数据双向绑定 2.搜索数据的实现 3.全选案例实现1—JQ方法 4.单选案例实现 5.数据级联&#xff08;二级级联&#xff09; 6.v-model中的修饰符 二.课堂笔记 三.课后回顾 –行动是治愈恐惧的良药&#xff0c…

DolphinScheduler(海豚调度)- docker部署实战

1.官方文档 https://dolphinscheduler.apache.org/zh-cn/docs/3.2.1/guide/start/docker 2.docker环境安装 版本情况&#xff08;这个地方踩了不少坑&#xff09;&#xff1a;docker-26.1.2&#xff0c;docker-compose-v2.11.0。 具体可使用我上传的安装包&#xff0c;一键安…

C++ | string详解

1、string是什么 string是STL文档的容器之一&#xff0c;是一个自定义类型&#xff0c;是一个类&#xff0c;由类模板basic_string实例化出来的一个类&#xff1b; 类模板basic_string实例化出来了四个类&#xff0c;如下图所示&#xff1b; 实例化出的这四个类不同的是他们的编…

2024最新互联网公司工作时长排行榜出炉!

“工作时长”&#xff0c;是选择公司的一个非常重要的参考指标。 我们在选择一个公司的时候&#xff0c;除了需要关注总收入package 以外&#xff0c;还需要考虑这家公司的加班时长是否人性化。 我们的工作时长是周工作小时数。法定工作时间是40小时(955)。大小周通常折算为周…