一文掌握线程池实现原理

线程池简介

Java在使用线程执行程序时,需要调用操作系统内核的API创建一个内核线程,操作系统要为线程分配一系列的资源;当该Java线程被终止时,对应的内核线程也会被回收。因此,频繁的创建和销毁线程需要消耗大量资源。此外,由于CPU核数有限,大量的线程上下文切换会增加系统的性能开销,无限制地创建线程还可能导致内存溢出。为此,Java在JDK1.5版本中引入了线程池。

线程池是一种重用线程的机制,用于提高线程的利用率和管理线程的生命周期,常用于多线程编程异步编程

线程池的优点:

  • 降低资源消耗:线程池中的线程可以重复使用,避免因频繁创建和销毁线程而带来的性能开销。
  • 提高响应速度:向线程池中提交任务时,无需创建线程即可执行任务处理。
  • 方便线程管理:线程池可以对其中的线程进行统一管理、监控,避免因大量创建线程而导致内存溢出。

线程池实现原理

Java线程池的核心实现类为ThreadPoolExecutor。

ThreadPoolExecutor依赖关系

ThreadPoolExecutor依赖关系图:

ThreadPoolExecutor依赖关系图

其中:

  • Executor(接口):该接口线程池处理任务的顶级接口,定义了一个用于执行任务的方法execute(Runnable command)。其中参数command为实现了Runnable或Callable接口的Task任务。
  • ExecutorService(接口):该接口继承了Executor接口,它扩展了Executor接口,并添加了一些管理线程池的方法(如:提交任务、关闭线程池等)。
  • AbstractExecutorService(抽象类):该类实现了ExecutorService接口。

构造函数

如ThreadPoolExecutor依赖关系图所示,ThreadPoolExecutor类提供了四个构造函数,其中原始的构造函数(另外三个构造函数由原始构造函数衍生而来):

线程池的构造函数:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

核心参数:

  • corePoolSize:线程池中的核心线程数,当向线程池中提交任务时,线程池创建新线程执行任务处理,直到当前线程数达到corePoolSize设定的值。此时提交的新任务会被添加到阻塞队列中,等待其他线程执行完成。默认情况下,空闲的核心线程并不会被回收(即:属性allowCoreThreadTimeOut的值默认为false),如果需要回收空闲的核心线程,设置属性allowCoreThreadTimeOut的值为true即可。

  • maximumPoolSize:线程池中的最大线程数,当线程池中的线程数达到corePoolSize设定的值且核心线程均被占用时,如果阻塞队列已满并向线程池中继续提交任务,则创建新的线程执行任务处理,直到当前线程数达到maximumPoolSize设定的值。

  • keepAliveTime:线程池中线程的存活时间,该参数只会在线程数大于corePoolSize设定的值时才生效,即:非核心线程的空闲时间超过keepAliveTime设定的值时会被回收。

  • unit:线程池中参数keepAliveTime的时间单位,默认为TimeUnit.MILLISECONDS(毫秒),其他时间单位:

    • TimeUnit.NANOSECONDS(纳秒)
    • TimeUnit.MICROSECONDS(微秒)
    • TimeUnit.MILLISECONDS(毫秒)
    • TimeUnit.SECONDS(秒)
    • TimeUnit.MINUTES(分钟)
    • TimeUnit.HOURS(小时)
    • TimeUnit.DAYS(天)
  • workQueue:线程池中保存任务的阻塞队列,当线程池中的线程数达到corePoolSize设定的值,继续提交任务时会将任务添加到阻塞队列中等待。默认为LinkedBlockingQueue,可选择的阻塞队列:

    • LinkedBlockingQueue(基于链表实现的阻塞队列)。
    • ArrayBlockingQueue(基于数组实现的阻塞队列)。
    • SynchronousQueue(只有一个元素的阻塞队列)。
    • PriorityBlockingQueue(实现了优先级的阻塞队列)。
    • DelayQueue(实现了延迟功能的阻塞队列)。
  • threadFactory:线程池中创建线程的工厂,可以通过自定义线程工厂的方式为线程设置一个便于识别的线程名,默认为DefaultThreadFactory。

  • handler:线程池的拒绝策略(又称饱和策略),当线程池中的线程数达到maximumPoolSize设定的值且阻塞队列已满时,继续向线程池中提交任务,就会触发拒绝策略,默认为AbortPolicy。可选的拒绝策略:

    • AbortPolicy:丢弃任务,并抛出RejectedExecutionException异常。
    • DiscardPolicy:丢弃任务,不抛出异常。
    • DiscardOldestPolicy:丢弃队列中最早的未处理的任务,执行当前任务。
    • CallerRunsPolicy:由调用者所在的线程来执行任务。

线程池生命周期

在ThreadPoolExecutor类中定义了线程池的五种状态。

源码如下:

// ctl共32位,其中高3位表示线程池运行状态,低29位表示线程池中的线程数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 二进制为0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
// RUNNING状态:二进制为111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN状态:二进制为000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP状态:二进制为001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING状态:二进制为010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED状态:二进制011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 线程池中的线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

其中:

ctl是一个AtomicInteger类型的变量:

  • 高3位表示线程池的运行状态。
  • 低29位表示线程池中的线程数量。

线程池运行状态:

  • RUNNING:处于RUNNING状态的线程池会接受新任务提交,同时也会处理任务队列中的任务。
  • SHUTDOWN:处于SHUTDOWN状态的线程池不会接受新任务提交,但是会处理任务队列中的任务。
  • STOP:处于STOP状态的线程池不会接受新任务提交,也不会处理任务队列中的任务,并且会中断正在执行的任务。
  • TIDYING:当所有的任务已终止、workerCount数量为0时,线程池会进入TIDYING状态,进入TIDYING状态会执行钩子方法terminated()。
  • TERMINATED:执行完钩子方法terminated()后进入TERMINATED状态,此时线程池已终止。

线程池状态转换,如图所示:

线程池状态转换图

工作线程

Worker(工作线程)是ThreadPoolExecutor的一个内部类,它继承了
AbstractQueuedSynchronizer(即:AQS)并实现了Runnable接口。其中:

  • 通过继承AbstractQueuedSynchronizer类,可以控制在主线程中调用shutdown()方法时不会中断正在执行的工作线程。注意:调用shutdownNow()时会中断正在执行的工作线程。
  • 通过实现Runnable接口,可以在run()方法中调用ThreadPoolExecutor#runWorker方法执行任务处理。

提交任务

向线程池中提交任务的方式有两种:

  • 调用execute()方法。
  • 调用submit()方法。

execute与submit的区别:

  • 1)execute是Executor接口的方法,submit是ExecuteService接口的方法。
  • 2)execute的入参为Runnable,submit的入参可以为Runnable、Callable、Runnable和一个返回值。
  • 3)execute没有返回值,submit有返回值。
  • 4)异常处理:execute会直接抛出异常,submit会在获取结果时抛出异常,如果不获取结果,submit不抛出异常。

以execute()方法为例,ThreadPoolExecutor实现了Executor接口定义的execute(Runnable command) 方法,该方法的主要作用是将任务提交到线程池中的执行。

execute(Runnable command) 方法源码如下:

public void execute(Runnable command) {// 如果command为null,则抛出空指针异常if (command == null)throw new NullPointerException();// 获取ctl变量值,ctl低29位用来表示线程池中的线程数量。int c = ctl.get();// 如果线程池中的线程数小于设定的核心线程数,则将任务封装成Worker线程并调用其start()方法启动线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 如果线程池处于运行状态并向任务队列中添加任务成功,则执行如下逻辑(此处线程池中的线程数大于等于核心线程数)if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 继续判断线程池处于运行状态,如果线程池不是运行状态且从线程池中移除任务成功,则执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 如果线程池中线程数为0,则创建Worker线程并执行任务else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果创建Worker线程失败,则执行拒绝策略else if (!addWorker(command, false))reject(command);
}

ThreadPoolExecutore#xecute()方法执行逻辑,如图所示:

ThreadPoolExecutor#execute()方法执行逻辑

处理流程:

1)主线程通过execute()方法向线程池中提交任务:

  • 线程池中的线程数小于核心线程数,继续向线程池中提交任务时,创建线程执行任务处理。

  • 线程池中的线程数大于等于核心线程数:

    a) 如果阻塞队列未满,继续向线程池中提交任务时,将任务添加到阻塞队列中,等待处理。

    b) 如果阻塞队列已满且线程数小于最大线程数,继续向线程池中提交任务时,创建线程执行任务处理。

    c) 如果阻塞队列已满且线程数大于等于最大线程数,继续向线程池中提交任务时,执行拒绝策略。

执行任务

任务提交到线程池中后,会执行工作线程(Worker)的ThreadPoolExecutor.Worker#run方法执行任务处理,而run()方法中会调用ThreadPoolExecutor#runWorker方法。

ThreadPoolExecutor#runWorker方法中,通过自旋的方式从Worker工作线程或阻塞队列中获取任务进行处理:

  • 如果Worker工作线程中存在任务,则执行Worker工作线程中的任务。
  • 如果Worker工作线程中不存在任务,则通过getTask()方法从获取任务,并执行该任务。
  • 如果Worker工作线程和阻塞队列中没有可执行的任务,则退出并销毁Worker工作线程。

线程池工作流程

线程池工作流程,如图所示:

线程池工作流程

处理流程:

1)向线程池中提交任务,判断线程池中线程数是否小于核心线程数:

  • 线程数小于核心线程数,则创建线程执行任务处理。

  • 线程数大于等于核心线程数,则判断线程池中阻塞队列是否已满:

    a) 阻塞队列未满,则将任务添加到阻塞队列中,等待执行。

    b) 阻塞队列已满,则判断线程池中的线程数是否小于最大线程数:

    线程数小于最大线程数,则创建线程执行任务处理。

    线程数大于等于最大线程数,则执行拒绝策略。

线程池创建方式

线程池的创建方式一般分为两种:

  • 通过Executors类创建线程池。
  • 通过ThreadPoolExecutor类创建线程池。

通过Executors类创建线程池

Executors类是JDK提供的一个创建线程池的工具类,内部通过调用ThreadPoolExecutor构造函数来实现,通过该类提供的静态方法可以快速创建一些常用线程池。

newFixedThreadPool

创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。

静态方法:

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

其中:

  • 核心线程数为初始化时指定的线程数,核心线程数等于最大线程数。
  • 线程存活时间keepAliveTime为0,表示线程空闲时立刻被回收。
  • 阻塞队列为LinkedBlockingQueue(默认值)。

newCachedThreadPool

创建一个可缓存的线程池,该线程池的大小为Integer.MAX_VALUE,对于提交的新任务:

  • 如果存在空闲线程,则使用空闲线程来执行任务处理。
  • 如果不存在空闲线程,则新建一个线程来执行任务处理。
  • 如果线程池中的线程数超过处理任务需要的线程数,则空闲线程缓存一段时间(60s)后会被回收。

静态方法:

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

其中:

  • 核心线程数为0。
  • 最大线程数为Integer.MAX_VALUE。
  • 线程存活时间keepAliveTime为60s。
  • 阻塞队列为SynchronousQueue(同步队列)。

newSingleThreadExecutor

创建一个单线程化的线程池,它只有一个工作线程来执行任务,所有任务按照先进先出的顺序执行。

静态方法:

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

其中:

  • 核心线程数为1。
  • 最大线程数为1。
  • 线程存活时间keepAliveTime为0,表示线程空闲时立刻被回收。
  • 阻塞队列为LinkedBlockingQueue。

newScheduledThreadPool

创建一个计划线程池,支持定时或周期性的执行任务(如:延时任务)。

静态方法:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}

其中:

  • 核心线程数为初始化时指定的线程数。
  • 最大线程数为Integer.MAX_VALUE。
  • 线程存活时间keepAliveTime为0,表示线程空闲时立刻被回收。
  • 阻塞队列为DelayedWorkQueue(延时队列)。

其他线程池

Executors工具类除了能快速创建以上的常用线程池外,还可以创建很多其他线程池。如:

  • Executors.newSingleThreadScheduledExecutor:创建一个单线程化的计划线程池(newScheduledThreadPool的单线程版本)。
  • Executors.newWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定),该线程池JDK1.8添加。

代码示例:

ExecutorsExample.java

@Slf4j
public class ExecutorsExample {/*** 创建一个固定大小的线程池*/private static ExecutorService executor = Executors.newFixedThreadPool(10);public static void main(String[] args) {for (int i = 0; i < 10000; i++) {// 在线程池中执行任务executor.execute(new Task());}}/*** 任务线程*/static class Task implements Runnable {@Overridepublic void run() {try {Thread.sleep(1000);log.info("执行线程:{}", Thread.currentThread().getName());} catch (InterruptedException e) {log.info("线程中断异常");}}}
}

不推荐使用Executors类创建线程池,主要原因:Executors类创建线程池默认使用LinkedBlockingQueue,LinkedBlockingQueue默认大小为Integer.MAX_VALUE(相当于无界队列),在高负载情况下很容易导致OOM。因此,强烈建议使用有界队列创建线程池。

通过ThreadPoolExecutor类创建线程池

通过ThreadPoolExecutor类手动创建线程池(推荐方式)。

代码示例:

ThreadPoolExecutorExample.java

@Slf4j
public class ThreadPoolExecutorExample {/*** 定义线程池*/private static ExecutorService pool ;public static void main(String[] args) {// 创建线程池pool = new ThreadPoolExecutor(10, 20, 0,TimeUnit.SECONDS,new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 在线程池中执行任务for (int i = 0; i < 10000; i++) {pool.execute(new ThreadPoolExecutorExample.Task());}}/*** 任务线程*/static class Task implements Runnable {@Overridepublic void run() {try {Thread.sleep(1000);log.info("执行线程:{}", Thread.currentThread().getName());} catch (InterruptedException e) {log.info("线程中断异常");}}}
}

其他推荐方式:通过commons-lang3、com.google.guava等工具包创建线程池,创建示例自行查阅相关资料即可。

线程池大小设置

在并发编程领域,提升性能本质上就是提升硬件的利用率(即:提升CPU和I/O设备综合利用率)的问题。因此,线程池大小需要根据CPU密集型和I/O密集型场景进行设置:

  • 对于CPU密集型场景,线程池大小一般设置为:CPU核数 + 1。
  • 对于I/O密集型场景,线程池大小一般设置为:CPU核数 * [1 +(I/O耗时 / CPU耗时)]。

以上公式仅作为参考值,具体情况需要根据压测情况进行设置。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/593218.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS(鸿蒙)——单击事件

2.4 实现ClickedListener接口并重写onClick方法 2.5 实现onClick方法中的具体逻辑&#xff0c;以此完成点击事件的相关业务操作 三、测试 3.1 登录远程模拟器 3.2 运行项目 四、精选好文 一、简介 1.1 什么是组件 组件就是文本、按钮、图片等元素的统称 1.2 什么是事件 …

机器学习——卷积的变种

机器学习——卷积的变种 卷积神经网络&#xff08;Convolutional Neural Networks, CNNs&#xff09;是深度学习领域中最重要的技术之一&#xff0c;它在图像处理、语音识别、自然语言处理等领域取得了巨大成功。在CNN中&#xff0c;卷积层是最核心的组成部分之一&#xff0c;…

可以写网易云的了!

你好&#xff0c;我是云桃桃。 一个希望帮助更多朋友快速入门 WEB 前端的程序媛。 1枚程序媛&#xff0c;大专生&#xff0c;2年时间从1800到月入过万&#xff0c;工作5年买房。 分享成长心得。 259篇原创内容-gzh 后台回复“前端工具”可获取开发工具&#xff0c;持续更新中…

问题解决:写CSDN博文时图片大小不适应,不清晰,没法排版

项目环境&#xff1a; Window10&#xff0c;Edge123.0.2420.65 问题描述&#xff1a; 当我在CSDN写博文的时候&#xff0c;会经常插入一些图片&#xff0c;但有时候我插入的图片太大了&#xff0c;影响了整体排版。 比如我加入了一张图片&#xff0c;就变成了下面这个样子&…

x86汇编写矩阵乘法问题(实现一个3×3矩阵乘法的汇编代码)

&#x1f3c6;本文收录于「Bug调优」专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收藏&&…

微信小程序python+uniapp高校图书馆图书借阅管理系统ljr9i

根据日常实际需要&#xff0c;一方面需要在系统中实现基础信息的管理&#xff0c;同时还需要结合实际情况的需要&#xff0c;提供图书信息管理功能&#xff0c;方便图书管理工作的展开&#xff0c;综合考虑&#xff0c;本套系统应该满足如下要求&#xff1a; 首先&#xff0c;在…

《数据结构学习笔记---第十篇》--- 堆堆排序(超详细图解)

目录 1.堆是什么? 2.问题引入&#xff1a;当我们插入一个新的元素时&#xff0c;那么他还是堆吗。 3.堆的元素插入 4.问题引入&#xff1a;当我们删除一个堆顶元素时&#xff0c;我们又该如何调整呢&#xff1f; 5.堆顶元素删除 6.如何建堆&#xff1f; 6.1向上调整建堆…

图解PyTorch中的torch.gather函数和 scatter 函数

前言 torch.gather在目前基于 transformer or query based 的目标检测中&#xff0c;在最后获取目标结果时&#xff0c;经常用到。 这里记录下用法&#xff0c;防止之后又忘了。 介绍 torch.gather 官方文档对torch.gather()的定义非常简洁 定义&#xff1a;从原tensor中获…

ctf_show笔记篇(web入门---SSRF)

ssrf简介 ssrf产生原理&#xff1a; 服务端存在网络请求功能/函数&#xff0c;例如&#xff1a;file_get_contens()这一类类似于curl这种函数传入的参数用户是可控的没有对用户输入做过滤导致的ssrf漏洞 ssrf利用: 用于探测内网服务以及端口探针存活主机以及开放服务探针是否存…

深入浅出 -- 系统架构之分布式架构

​​​​​​分布式架构&#xff1a; 根据业务功能对系统做拆分&#xff0c;每个业务功能模块作为独立项目开发&#xff0c;称为一个服务。 当垂直应用越来越多时&#xff0c;应用之间的交互不可避免&#xff0c;可将共用的基础服务或核心模块抽取出来作为独立服务&#xff0c…

ubuntu无法粘贴复制windows中的内容,分辨率无法自适应电脑自带系统

1、直接在命令行执行以下命令 sudo apt-get autoremove open-vm-tools //卸载已有的工具 sudo apt-get install open-vm-tools //安装工具open-vm-tools sudo apt-get install open-vm-tools-desktop //安装open-vm-tools-desktop 2、重启Ubuntu系统即可 3.如果上述…

mbti,ESTP型人格的心理问题分析

什么是ESTP型人格 ESTP分别代表外向&#xff0c;实感&#xff0c;理智&#xff0c;依赖&#xff0c;而ESTP型人格则是一种性格上十分激进&#xff0c;喜欢冒险&#xff0c;并且总是因为情绪起伏过大&#xff0c;而一下子做出应激行为的相对冒险的人格。具有ESTP型人格的人一般…