【Java并发知识总结 | 第四篇】Java线程池详讲

在这里插入图片描述

文章目录

  • 4.Java线程池详讲
    • 4.1线程池介绍
      • 4.1.1什么是线程池
      • 4.1.2线程池的作用
      • 4.1.3没有线程池的线程执行任务模型
    • 4.2JAVA线程池
      • 4.2.1Executor框架介绍
      • 4.2.2Executor框架的组成
        • (1)任务(Runnable/Callable)
        • (2)任务的执行(Executor)
        • (3)异步计算的结果(Future)
      • 4.2.3Executor框架的使用步骤
    • 4.3ThreadPoolExecutor类介绍(重要)
      • 4.3.1构造方法介绍
      • 4.3.2`ThreadPoolExecutor` 饱和策略定义
      • 4.3.3线程池创建的两种方式
        • (1)通过`ThreadPoolExecutor`构造函数来创建(推荐)
        • (2)通过 `Executor` 框架的工具类 `Executors` 来创建
      • 4.3.4线程池常用的阻塞队列总结
    • 4.4线程池原理分析(重要)
      • 4.4.1ThreadPoolExecutor
        • (1)创建任务
        • (2)创建线程池
        • (3)输出结果
      • 4.4.2线程池原理流程分析
    • 4.5几个常见的对比
      • 4.5.1Runnable VS Callable
      • 4.5.2 execure() VS submit()
      • 4.5.3 shutdown() VS shutdownNow()
      • 4.5.4 isTerminated() VS isShutdown()
    • 4.6几种常见的内置线程池
      • 4.6.1FixedThreadPool
        • (1)基本介绍
        • (2)执行任务过程介绍
        • (3)不使用FixedThreadPool的原因
      • 4.6.2SingleThreadExecutor
        • (1)基本介绍
        • (2)执行任务过程介绍
        • (3)不使用SingleThreadExecutor的原因
      • 4.6.3CachedThreadPool
        • (1)基本介绍
        • (2)执行任务过程介绍
        • (3)不适用CachedThreadPool的原因

4.Java线程池详讲

4.1线程池介绍

4.1.1什么是线程池

  1. 线程池是一种利用池化技术思想来实现的线程管理技术,主要是为了复用线程、便利地管理线程和任务、并将线程的创建和任务的执行解耦开来。
  2. 我们可以创建线程池来复用已经创建的线程降低频繁创建和销毁线程所带来的资源消耗
  3. 在JAVA中主要是使用ThreadPoolExecutor类来创建线程池,并且JDK中也提供了Executors工厂类来创建线程池(不推荐使用)。

4.1.2线程池的作用

  1. 降低资源消耗,复用已创建的线程来降低创建和销毁线程的消耗。
  2. 提高响应速度,任务到达时,可以不需要等待线程的创建立即执行。
  3. 提高线程的可管理性,使用线程池能够统一的分配、调优和监控。

4.1.3没有线程池的线程执行任务模型

在没有线程池之前创建线程执行任务的模型如下图所示:

image-20240313214833899

从上面可以看出之前显示的创建线程的一些缺点:

  1. 不受控制风险,对于每个创建的线程没有统一管理的地方,每个线程创建后我们不知道线程的去向。
  2. 每执行一个任务都需要创建新的线程来执行,创建线程对系统来说开销很高

4.2JAVA线程池

4.2.1Executor框架介绍

Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Threadstart 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题

this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用,调用尚未构造完全的对象的方法可能引发令人疑惑的错误。

Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。

4.2.2Executor框架的组成

Executor 框架结构主要由三大部分组成:

(1)任务(Runnable/Callable)

执行任务需要实现的 Runnable 接口Callable接口Runnable 接口Callable 接口 实现类都可以被 ThreadPoolExecutorScheduledThreadPoolExecutor 执行。

(2)任务的执行(Executor)

如下图所示,包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutorScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。

Executor接口提供了将任务的执行线程的创建以及使用解耦开来的抽象

ExecutorService接口继承了Executor接口,在Executor的基础上,增加了一些关于管理线程池本身的一些方法,比如查看任务的状态stop/terminal线程池获取线程池的状态等等。

img

Java中线程池的核心实现类是ThreadPoolExecutor,可以通过该类地构造方法来构造一个线程池

(3)异步计算的结果(Future)

Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。

当我们把 Runnable接口Callable 接口 的实现类提交给 ThreadPoolExecutorScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

4.2.3Executor框架的使用步骤

Executor 框架的使用示意图

  1. 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象

  2. 把创建完成的实现 Runnable/Callable接口的对象直接交给 ExecutorService 执行:

    • ExecutorService.execute(Runnable command)
    • ExecutorService 执行(ExecutorService.submit(Runnable task)
    • ExecutorService.submit(Callable <T> task))。
  3. 如果执行 ExecutorService.submit(…)ExecutorService返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别

    submit()会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。

  4. 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行

4.3ThreadPoolExecutor类介绍(重要)

线程池实现类 ThreadPoolExecutorExecutor 框架最核心的类。

4.3.1构造方法介绍

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生

    /*** 用给定的初始参数创建一个新的ThreadPoolExecutor。*/public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量int maximumPoolSize,//线程池的最大线程数long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间TimeUnit unit,//时间单位BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 任务队列未达到队列容量时,最大可以同时运行的线程数量
  • maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数
  • workQueue: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数 :

  • keepAliveTime:线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁。
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :executor 创建新线程的时候会用到。
  • handler :饱和策略。关于饱和策略下面单独介绍一下

线程池各个参数的关系

4.3.2ThreadPoolExecutor 饱和策略定义

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求。

举个例子:

Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列

4.3.3线程池创建的两种方式

(1)通过ThreadPoolExecutor构造函数来创建(推荐)

通过构造方法实现

(2)通过 Executor 框架的工具类 Executors 来创建

我们可以创建多种类型的 ThreadPoolExecutor

  • FixedThreadPool:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。
    • 当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。
    • 若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor 该方法返回一个只有一个线程的线程池
    • 若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
  • ScheduledThreadPool:该返回一个用来在给定的延迟后运行任务或者定期执行任务的线程池。

对应 Executors 工具类中的方法如图所示:

img

Executors 返回线程池对象的弊端如下(后文会详细介绍到):

  • FixedThreadPoolSingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
  • ScheduledThreadPoolSingleThreadScheduledExecutor : 使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
// 无界队列 LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}// 无界队列 LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}// 同步队列 SynchronousQueue,没有容量,最大线程数是 Integer.MAX_VALUE`
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}// DelayedWorkQueue(延迟阻塞队列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}

4.3.4线程池常用的阻塞队列总结

新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

不同的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。

  • 容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列):FixedThreadPoolSingleThreadExectorFixedThreadPool最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExector只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。
  • SynchronousQueue(同步队列):CachedThreadPoolSynchronousQueue 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,CachedThreadPool 的最大线程数是 Integer.MAX_VALUE ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。
  • DelayedWorkQueue(延迟阻塞队列):ScheduledThreadPoolSingleThreadScheduledExecutorDelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程

4.4线程池原理分析(重要)

写一个 ThreadPoolExecutor 的小 Demo

4.4.1ThreadPoolExecutor

(1)创建任务

首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口)

import java.util.Date;/*** 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。* @author shuang.kou*/
public class MyRunnable implements Runnable {private String command;public MyRunnable(String s) {this.command = s;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());processCommand();System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());}private void processCommand() {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic String toString() {return this.command;}
}
(2)创建线程池

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolExecutorDemo {private static final int CORE_POOL_SIZE = 5;private static final int MAX_POOL_SIZE = 10;private static final int QUEUE_CAPACITY = 100;private static final Long KEEP_ALIVE_TIME = 1L;public static void main(String[] args) {//使用阿里巴巴推荐的创建线程池的方式//通过ThreadPoolExecutor构造函数自定义参数创建ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,new ArrayBlockingQueue<>(QUEUE_CAPACITY),new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 0; i < 10; i++) {//创建WorkerThread对象(WorkerThread类实现了Runnable 接口)Runnable worker = new MyRunnable("" + i);//执行Runnableexecutor.execute(worker);}//终止线程池executor.shutdown();while (!executor.isTerminated()) {}System.out.println("Finished all threads");}
}
  • corePoolSize: 核心线程数为 5。
  • maximumPoolSize:最大线程数 10
  • keepAliveTime : 等待时间为 1L。
  • unit: 等待时间的单位为 TimeUnit.SECONDS。
  • workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  • handler:饱和策略为 CallerRunsPolicy
(3)输出结果
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
Finished all threads  // 任务全部执行完了才会跳出来,因为executor.isTerminated()判断为true了才会跳出while循环,当且仅当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

4.4.2线程池原理流程分析

我们通过前面的代码输出结果可以看出:线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。

  • 分析execute()方法,使用executor.execute(worker)t提交一个任务到线程池中
   // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static int workerCountOf(int c) {return c & CAPACITY;}//任务队列private final BlockingQueue<Runnable> workQueue;public void execute(Runnable command) {// 如果任务为null,则抛出异常。if (command == null)throw new NullPointerException();// ctl 中保存的线程池当前的一些状态信息int c = ctl.get();//  下面会涉及到 3 步 操作// 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里,表明创建新的线程失败。// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。if (!isRunning(recheck) && remove(command))reject(command);// 如果当前工作线程数量为0,新创建一个线程并执行。else if (workerCountOf(recheck) == 0)addWorker(null, false);}//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。// 传入 false 代表增加线程时判断当前线程数是否少于 maxPoolSize//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。else if (!addWorker(command, false))reject(command);}

这里简单分析一下整个流程(对整个逻辑进行了简化,方便理解):

  1. 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
  2. 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行
  3. 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
  4. 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,饱和策略会调用RejectedExecutionHandler.rejectedExecution()方法

图解线程池实现原理

execute 方法中,多次调用 addWorker 方法。addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。

    // 全局锁,并发操作必备private final ReentrantLock mainLock = new ReentrantLock();// 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合private int largestPoolSize;// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合private final HashSet<Worker> workers = new HashSet<>();//获取线程池状态private static int runStateOf(int c)     { return c & ~CAPACITY; }//判断线程池的状态是否为 Runningprivate static boolean isRunning(int c) {return c < SHUTDOWN;}/*** 添加新的工作线程到线程池* @param firstTask 要执行* @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小* @return 添加成功就返回true否则返回false*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {//这两句用来获取线程池的状态int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {//获取线程池中工作的线程的数量int wc = workerCountOf(c);// core参数为false的话表明队列也满了,线程池大小变为 maximumPoolSizeif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//原子操作将workcount的数量加1if (compareAndIncrementWorkerCount(c))break retry;// 如果线程的状态改变了就再次执行上述操作c = ctl.get();if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 标记工作线程是否启动成功boolean workerStarted = false;// 标记工作线程是否创建成功boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//获取线程池状态int rs = runStateOf(ctl.get());//rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中//(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker// firstTask == null证明只新建线程而不执行任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);//更新当前工作线程的最大容量int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;// 工作线程是否启动成功workerAdded = true;}} finally {// 释放锁mainLock.unlock();} 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例if (workerAdded) {t.start();/// 标记线程启动成功workerStarted = true;}}} finally {// 线程启动失败,需要从工作线程中移除对应的Workerif (! workerStarted)addWorkerFailed(w);}return workerStarted;}

我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。

4.5几个常见的对比

4.5.1Runnable VS Callable

  1. Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。Runnable 接口会返回结果或抛出检查异常,但是 Callable 接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。
  2. 工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象。(Executors.callable(Runnable task)Executors.callable(Runnable task, Object result))。
@FunctionalInterface
public interface Runnable {/*** 被线程执行,没有返回值也无法抛出异常*/public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {/*** 计算结果,或在无法这样做时抛出异常。* @return 计算得出的结果* @throws 如果无法计算结果,则抛出异常*/V call() throws Exception;
}

4.5.2 execure() VS submit()

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Futureget()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法的话,如果在 timeout 时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException

举例1:使用 get()方法获取返回值

ExecutorService executorService = Executors.newFixedThreadPool(3);Future<String> submit = executorService.submit(() -> {try {Thread.sleep(5000L);} catch (InterruptedException e) {e.printStackTrace();}return "abc";
});String s = submit.get();
System.out.println(s);
executorService.shutdown();
  • 输出:
abc

示例 2:使用 get(long timeout,TimeUnit unit)方法获取返回值。

ExecutorService executorService = Executors.newFixedThreadPool(3);Future<String> submit = executorService.submit(() -> {try {Thread.sleep(5000L);} catch (InterruptedException e) {e.printStackTrace();}return "abc";
});String s = submit.get(3, TimeUnit.SECONDS);
System.out.println(s);
executorService.shutdown();
  • 返回值
Exception in thread "main" java.util.concurrent.TimeoutExceptionat java.util.concurrent.FutureTask.get(FutureTask.java:205)

4.5.3 shutdown() VS shutdownNow()

  • shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN线程池不再接受新任务了,但是队列里的任务得执行完毕
  • shutdownNow() :关闭线程池,线程池的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List

4.5.4 isTerminated() VS isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

4.6几种常见的内置线程池

4.6.1FixedThreadPool

(1)基本介绍

FixedThreadPool 被称为==可重用固定线程数==的线程池。通过 Executors 类中的相关源代码来看一下相关实现:

   /*** 创建一个可重用固定数量线程的线程池*/public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}
  • FixedThreadPool 的实现方法
    public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

上面源代码可以看出新创建的 FixedThreadPoolcorePoolSizemaximumPoolSize 都被设置为 nThreads这个 nThreads 参数是我们使用的时候自己传递的

即使 maximumPoolSize 的值比 corePoolSize 大,也至多只会创建 corePoolSize 个线程。这是因为FixedThreadPool 使用的是容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列),队列永远不会被放满

(2)执行任务过程介绍

FixedThreadPoolexecute() 方法运行示意图(该图片来源:《Java 并发编程的艺术》):

FixedThreadPool的execute()方法运行示意图

过程步骤:

  1. 如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务;
  2. 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入 LinkedBlockingQueue
  3. 线程池中的线程执行完 手头的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行
(3)不使用FixedThreadPool的原因

FixedThreadPool 使用无界队列 LinkedBlockingQueue(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize
  2. 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPoolcorePoolSizemaximumPoolSize 被设置为同一个值。
  3. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
  4. 运行中的 FixedThreadPool(未执行 shutdown()shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出

4.6.2SingleThreadExecutor

(1)基本介绍

SingleThreadExecutor 是只有一个线程的线程池。下面看看SingleThreadExecutor 的实现:

   /***返回只有一个线程的线程池*/public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}
   public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}

从上面源代码可以看出新创建的 SingleThreadExecutorcorePoolSizemaximumPoolSize 都被设置为 1,其他参数和 FixedThreadPool 相同。

(2)执行任务过程介绍

SingleThreadExecutor 的运行示意图(该图片来源:《Java 并发编程的艺术》):

SingleThreadExecutor的运行示意图

上图说明

  1. 如果当前运行的线程数少于 corePoolSize,则创建一个新的线程执行任务;
  2. 当前线程池中有一个运行的线程后,将任务加入 LinkedBlockingQueue
  3. 线程执行完当前的任务后,会在循环中反复从LinkedBlockingQueue 中获取任务来执行
(3)不使用SingleThreadExecutor的原因

SingleThreadExecutorFixedThreadPool 一样,使用的都是容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列)作为线程池的工作队列。SingleThreadExecutor 使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点,就是可能会导致 OOM。

4.6.3CachedThreadPool

(1)基本介绍

CachedThreadPool 是一个会根据需要创建新线程的线程池。下面通过源码来看看 CachedThreadPool 的实现:

    /*** 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。*/public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}
    public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}

CachedThreadPoolcorePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

(2)执行任务过程介绍

CachedThreadPoolexecute() 方法的执行示意图(该图片来源:《Java 并发编程的艺术》):

CachedThreadPool的execute()方法的执行示意图

过程说明:

  1. 首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2;
  2. 当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成
(3)不适用CachedThreadPool的原因

CachedThreadPool 使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/537231.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视频格式怎么批量转换?5 个批量视频转换器分享

可以同时转换多个视频吗&#xff1f;您是否正在寻找一款有用的批量视频转换器&#xff1f;最好的批量视频转换器是什么&#xff1f; 使用批量视频转换器同时转换多个视频文件是一个好方法。这篇文章为您总结了 5 个最好的批量视频转换器。 5 个批量视频转换器分享 1、奇客视频…

不用ChatGPT会扣分!韩国大学大胆拥抱生成式AI

“请尝试使用人工智能(AI)进行创作”。 这是韩国成均馆大学影像系教授李慧旼(音)在上学期的课堂中给学生们留的作业。她希望学生们能利用生成型绘图AI软件Dall-E或Midjourney来创作作品。学生们利用AI创作的作品非常有趣&#xff0c;一名学生用文字描述了喜欢用的香水后&#x…

【相关问题解答1】bert中文文本摘要代码:import时无法找到包时,几个潜在的原因和解决方法

【相关问题解答1】bert中文文本摘要代码 写在最前面问题1问题描述一些建议import时无法找到包时&#xff0c;几个潜在的原因和解决方法1. 模块或包的命名冲突解决方法&#xff1a; 2. 错误的导入路径解决方法&#xff1a; 3. 第三方库的使用错误解决方法&#xff1a; 4. 包未正…

【C++教程从0到1入门编程】第八篇:STL中string类的模拟实现

一、 string类的模拟实现 下面是一个列子 #include <iostream> namespace y {class string{public: //string() //无参构造函数// :_str(nullptr)//{}//string(char* str) //有参构造函数// :_str(str)//{}string():_str(new char[1]){_str[0] \0;}string(c…

wordpress博客趣主题个人静态网页模板

博客趣页面模板适合个人博客&#xff0c;个人模板等内容分享。喜欢的可以下载套用自己熟悉的开源程序建站。 博客趣主题具有最小和清洁的设计&#xff0c;易于使用&#xff0c;并具有有趣的功能。bokequ主题简约干净的设计、在明暗风格之间进行现场切换。 下载地址 清新个人…

上海亚商投顾:沪指缩量调整 传媒、游戏股逆势大涨

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 三大指数昨日震荡调整&#xff0c;上证50指数跌近1%&#xff0c;保险等权重板块走低&#xff0c;中国太保跌超…

yolov5模型压缩-torch_pruning

参考论文:DepGraph: Towards Any Structural Pruning(https://arxiv.org/abs/2301.12900) 主要原理:物理的移除参数,并自动找出层与层以及层之间的依赖,完成模型的自动裁剪 模型压缩效果:yolov5剪枝流程如下: pip install torch_pruning 新建prune.py: import torch_…

MySQL的事务隔离是如何实现的?

目录 从一个例子说起 快照读和当前读 事务的启动时机和读视图生成的时刻 MVCC 隐藏字段 Undo Log回滚日志 Read View - 读视图 可重复读(RC)隔离级别下的MVCC 读提交(RR)隔离级别下的MCC 关于MVCC的一些疑问 1.为什么需要 MVCC &#xff1f;如果没有 MVCC 会怎样&am…

开源分子对接程序rDock使用方法(2)-高通量虚拟筛选HTVS

欢迎浏览我的CSND博客&#xff01; Blockbuater_drug …点击进入 文章目录 前言一、rDock用于高通量虚拟筛选HTVSMulti-Step Protocol HTVS步骤及注意事项 二、rDock中Multi-Step Protocol用于HTVS的用法Step 1. Exhaustive dockingStep 2. sdreport summaryStep 3. 运行rbhtfi…

C++进阶--红黑树

概念与特性 红黑树&#xff0c;是一种自平衡的二叉搜索树&#xff0c;它具有以下特点&#xff1a; 每个节点要么是红色&#xff0c;要么是黑色根节点是黑色的。每个叶子节点&#xff08;NIL节点&#xff0c;空节点&#xff09;都是黑色的。如果一个节点是红色的&#xff0c;那…

Python 查找PDF中的指定文本并高亮显示

在处理大量PDF文档时&#xff0c;有时我们需要快速找到特定的文本信息。本文将提供以下三个Python示例来帮助你在PDF文件中快速查找并高亮指定的文本。 查找并高亮PDF中所有的指定文本查找并高亮PDF某个区域内的指定文本使用正则表达式搜索指定文本并高亮 本文将用到国产第三方…

mac启动skywalking报错

这个命令显示已经成功 但是日志报错了以上内容。 然后去修改。vim .bash_profile 查看全局变量&#xff0c;这个jdk却是有2个。所以这个问题没解决。