文章目录
- 1. 线程和现场池的对比
- 2. ThreadPoolExecutor分析
- 3. ThreadPoolExecutor源码详解
1. 线程和现场池的对比
下面代码创建了100000个线程
public class Main {public static void main(String[] args) throws InterruptedException {Long start = System.currentTimeMillis();final Random random = new Random();final List<Integer> list = new ArrayList<Integer>();for(int i=0; i<100000; i++){Thread thread = new Thread() {@Overridepublic void run() {list.add(random.nextInt());}};thread.start();thread.join();}System.out.println("时间:" + (System.currentTimeMillis()-start));System.out.println("大小:" + list.size());}
}
使用了14s左右的时间执行
而下面代码则使用线程池
public class Main {public static void main(String[] args) throws InterruptedException {Long start = System.currentTimeMillis();final Random random = new Random();final List<Integer> list = new ArrayList<Integer>();//创建只有一个线程的线程池ExecutorService executorService = Executors.newSingleThreadExecutor();for(int i=0;i<100000;i++){executorService.execute(new Runnable() {@Overridepublic void run() {list.add(random.nextInt());} });}executorService.shutdown();executorService.awaitTermination(1, TimeUnit.DAYS);System.out.println("时间:"+(System.currentTimeMillis()-start));System.out.println("大小:"+list.size());}
}
发现时间只需要97ms,性能得到了大大的提升,那为什么提升会如此大?原因是代码1使用了100000个线程,而代码2只使用到了1个线程,要知道线程的创建以及上下文切换都是十分消耗CPU资源的,而代码1大量的时间都花到了这部分上面。所以在这种场景下使用线程池会更快一些,但使用线程池一定会更快吗? 看下面的代码:
public class Main {public static void main(String[] args) throws InterruptedException {ExecutorService executorService1 = Executors.newCachedThreadPool();ExecutorService executorService2 = Executors.newFixedThreadPool(10);ExecutorService executorService3 = Executors.newSingleThreadExecutor();for (int i = 0; i < 100; i++) {executorService3.execute(new MyTask(i));}}
}class MyTask implements Runnable {int i = 0;public MyTask(int i) {this.i = i;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"第"+i+"个任务");try{Thread.sleep(300L);}catch (Exception e){e.printStackTrace();}}
}
上面代码我们分别用三个线程池来执行100个任务,执行结果可以发现,三个线程池由上到下速度越来越慢,前面我们代码知道了Executors.newSingleThreadExecutor
这个线程池执行100000个任务的速度非常快,那为什么这里执行100个任务却这么慢呢。这时候我们就需要从我们执行的任务类型开始分析,首先从源码开始分析:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
可以发现三个线程池底层都是基于ThreadPoolExecutor
这个类创建的,我们来重点分析一下这个类。
2. ThreadPoolExecutor分析
ThreadPoolExecutor的构造函数源码如下
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}
corePoolSize
:核心线程数,核心线程数是线程池中一直存在的线程数量,即使它们处于空闲状态
maximumPoolSize
:最大线程数,最大线程数是线程池允许的最大线程数量,包括空闲和活动线程。
BlockingQueue
:阻塞队列,存储我们要执行的任务
ThreadFactory
:线程工厂,用来创建线程
keepAliveTime
:线程生命周期
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
corePoolSize
:核心线程数为0
maximumPoolSize
:最大线程数为Integer.MAX_VALUE
线程生命周期
:60s
可以发现newCachedThreadPool这个现场池没有核心线程,它返回一个可以根据需要创建新线程的线程池,但在先前构建的线程可用时将重用它们。它的特点主要如下:
- 动态线程数: newCachedThreadPool 创建一个具有可变大小的线程池,根据需要动态地创建新的线程。如果有可用的空闲线程,它将重用这些线程,否则将创建新线程。
- 无限制线程数: 线程池的最大线程数理论上是无限的,因为它可以根据需要动态创建新线程。这可能会导致线程数在极端情况下无限增长,因此在某些情况下需要谨慎使用,以防止资源耗尽。
- 自动线程回收: 如果某个线程在60秒内没有被使用(空闲状态),则该线程将被终止并从线程池中移除。这有助于自动释放系统资源。
- 适用于短生命周期的异步任务: 适用于执行大量的、相对较短的任务,因为线程池会根据需要创建新线程,并在空闲一段时间后终止不再需要的线程。
- 不适用于长时间运行的任务: 由于线程池的最大线程数理论上是无限的,不适用于长时间运行的任务,因为这可能导致线程数过多,从而影响性能。
newFixedThreadPool
newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}
corePoolSize
:核心线程数为1
maximumPoolSize
:最大线程数为10
线程生命周期
:永久
LinkedBlockingQueue
:阻塞队列的大小无限大
该线程池将一直保持指定数量的线程,即核心线程数,在任何时候都是活动的。如果有新的任务提交,而线程池中的线程数尚未达到核心线程数,将会创建新线程来处理任务。以下是 newFixedThreadPool 的主要特点:
- 固定线程数: newFixedThreadPool 创建一个具有固定数量线程的线程池,这个数量在创建线程池时被指定。
- 无限制的工作队列: 线程池使用一个无界的工作队列(LinkedBlockingQueue)来保存等待执行的任务。这意味着如果任务提交速度超过了线程池处理任务的速度,任务将被放入队列中等待执行,而不会导致线程数继续增加。
- 适用于长时间运行的任务: 适用于执行长时间运行的任务,因为线程池中的线程数是固定的,不会根据任务的数量而动态变化。
- 控制资源占用: 由于线程数是固定的,可以更好地控制系统资源的占用,不会因为任务数量过多而导致过度的线程创建。
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
corePoolSize
:核心线程数为1
maximumPoolSize
:最大线程数为1
线程生命周期
:永久
LinkedBlockingQueue
:阻塞队列的大小无限大
是一个只包含单个线程的线程池。该线程池确保所有任务按照它们被提交的顺序执行,并且在任何时候都只有一个线程处于活动状态。以下是 newSingleThreadExecutor 的主要特点:
- 单线程执行: newSingleThreadExecutor 创建一个只有一个线程的线程池。这意味着所有任务都将按照它们被提交的顺序在同一个线程中执行。
- 无限制的工作队列: 类似于 newFixedThreadPool,newSingleThreadExecutor 也使用一个无界的工作队列(LinkedBlockingQueue)来保存等待执行的任务。
- 适用于需要顺序执行的任务: 适用于需要按照提交顺序执行的任务,确保任务之间不会并发执行。
- 线程异常处理: 由于只有一个线程,因此可以更容易地捕获和处理线程抛出的异常。
下面就来解释一下为什么上面不同的线程池执行速度会不同,对于newCachedThreadPool,它会根据需要创建很多线程,也会出现线程复用的情况所以它执行速度很快,然后newFixedThreadPool它只创建了10个线程去执行100个任务,速度会比newCachedThreadPool慢,最后是newSingleThreadExecutor它只有一个线程执行100个任务所以它是最慢的。这就是从线程池角度出发,为什么会出现上面不同线程池执行速度不同的情况。然后从任务的角度出发,对于newSingleThreadExecutor它前面执行100000个任务,由于任务很小(代码可以看出)能在瞬间完成,这个唯一的线程能不断复用,所以速度会非常快。而对于Mytask任务,该任务里面执行了sleep()
函数,算一个比较大的任务,所以单一线程会执行非常慢。这就是出现前面情况的原因。
注意:阿里规范是不推荐使用JDK提供的线程池的
- 线程资源无限制: Executors.newCachedThreadPool 和 Executors.newFixedThreadPool 等工厂方法创建的线程池,由于没有明确地设置最大线程数或工作队列容量,可能会导致线程资源无限制地增长,从而影响系统的稳定性。
- 无法控制线程池的最大大小: 这些工厂方法创建的线程池,虽然能够满足任务提交需求,但是它们没有提供直接的方式来设置合理的最大线程数,特别是在并发任务量剧增的情况下。
- 无界队列可能导致内存溢出: Executors.newCachedThreadPool 和 Executors.newFixedThreadPool 默认使用无界的任务队列,如果任务提交速度远远快于线程池处理速度,会导致队列无限增长,最终可能导致内存溢出。
相反,阿里规范推荐使用ThreadPoolExecutor手动创建线程池,以便有更多的控制权。通过手动配置 ThreadPoolExecutor 的参数,可以限制线程的数量、设置合理的队列大小,并明确定义饱和策略。这样可以更好地适应不同的业务场景和负载情况,提高系统的稳定性和可维护性。
3. ThreadPoolExecutor源码详解
如下面我们自定义了一个线程池来执行前面的Mytask任务,现场池有10个核心线程,20个最大现场数,阻塞队列的大小为10个,仔细观察执行结果:
public class Main {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(10,20,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));//自定义线程for (int i = 0; i < 100; i++) {threadPoolExecutor.execute(new MyTask(i));}}
}
- 我们发现执行任务是无序的
- 出现了拒绝任务的错误
为了分析出现上面两个情况的原因,我们要深入ThreadPoolExecutor的源码进行分析。
- ThreadPoolExecutor类结构图
- Executor类
Executor 接口是 Java 并发框架的一部分,它定义了一种执行任务的简单机制。
public interface Executor {/*** 在将来的某个时间执行给定的命令。* 该命令可以在新线程、池化线程或调用线程中执行,这由 Executor 的实现决定。** @param command 要执行的命令* @throws RejectedExecutionException 如果不能接受该任务,则抛出此异常。* @throws NullPointerException 如果命令为 null,则抛出此异常。*/void execute(Runnable command);
}
- ExecutorService
ExecutorService 是 Executor 接口的子接口,它扩展了 Executor 的功能,提供了更丰富的任务执行和管理功能。ExecutorService 主要用于执行和管理异步任务,可以提交任务、取消任务、获取任务执行的结果等。
public interface ExecutorService extends Executor {
//平缓地关闭执行器,不再接受新任务,但会等待已经提交的任务执行完成。void shutdown();//立即关闭执行器,尝试停止所有正在执行的任务,并返回等待执行的任务列表。List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;//提交一个可调用的任务,并返回一个表示该任务未来结果的 Future。<T> Future<T> submit(Callable<T> task);//提交一个可运行的任务,并返回一个表示该任务未来结果的 Future。<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);//执行给定的任务列表,返回一个包含 Future 的列表,表示任务的未来结果。在所有任务完成之前,此方法将阻塞。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
- AbstractExecutorService
AbstractExecutorService 是 ExecutorService 接口的抽象实现,它提供了一些基本的执行器服务功能,使得实现 ExecutorService 接口的类能够更容易地实现这个接口。
public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}}
ThreadPoolExecutor
我们在定义完ThreadPoolExecutor后会调用excute方法去执行任务
execute
public void execute(Runnable command) { //任务为空抛出空指针异常if (command == null)throw new NullPointerException();//获取当前线程池的控制状态(ctl 是 AtomicInteger 类型的变量,用于表示线程池的状态和工作线程数)。int c = ctl.get();//检查当前工作线程数量是否小于核心线程数 (corePoolSize)。如果是,尝试通过 addWorker 方法增加一个核心线程来执行任务。如果成功添加了线程,则直接返回,任务将由新线程执行。如果无法添加线程,则重新获取控制状态并继续执行后续逻辑。if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果无法添加核心线程,检查线程池是否处于运行状态(isRunning(c))。如果是,并且成功将任务放入工作队列 (workQueue.offer(command)),则继续执行。if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//如果此时线程池不再运行,并且任务无法从队列中移除,说明任务将被拒绝(reject)。if (! isRunning(recheck) && remove(command))reject(command);//如果此时线程池中没有工作线程,则尝试添加一个非核心线程(addWorker(null, false))。else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果无法将任务添加到工作队列中,则尝试通过 addWorker 方法添加一个非核心线程来执行任务。如果无法添加非核心线程,则说明线程池已经饱和,任务将被拒绝。else if (!addWorker(command, false))reject(command);}
- 1-10的任务可以交给核心线程去执行
- 11-20的任务会放在阻塞队列中
- 21-30的任务由于阻塞队列满了,所以会创建10个非核心线程执行
- 31的任务无法处理了直接执行拒绝策略
以上就是为什么会出现拒绝错误的问题
addWorker
上面execute源码我们可以看出多次调用了addWorker,这个方法用于尝试添加一个新的工作线程到线程池,下面我们来详细分析一下这个方法。
private boolean addWorker(Runnable firstTask, boolean core) {retry://首先,使用一个无限循环标签retry开始迭代。在每次迭代中,首先获取当前的控制状态c和运行状态rs。for (;;) {int c = ctl.get();int rs = runStateOf(c);// 然后,检查线程池的运行状态是否已经达到 SHUTDOWN 或更高,并且工作队列为空。如果是,说明线程池已经关闭或正在关闭,并且工作队列已经为空,此时不应该再添加新的线程,返回false。if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;
//在内层循环中,检查工作线程数是否达到线程池容量的上限(CAPACITY)或者达到了当前设置的核心线程数或最大线程数上限。如果工作线程数已经超过上限,则直接返回 false,不再添加新的线程。for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//如果可以增加工作线程数,通过 compareAndIncrementWorkerCount(c) 尝试原子地增加工作线程数。如果成功增加,则跳出外层循环,返回 true 表示成功添加了工作线程。如果增加失败,重新获取控制状态 c 并检查运行状态是否发生了变化。如果运行状态发生变化,重新进入外层循环进行重试。if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}//workerStarted 和 workerAdded 是用于记录是否成功启动工作线程的标志。boolean workerStarted = false;boolean workerAdded = false;//创建一个Worker 对象,Worker 是 ThreadPoolExecutor中的内部类,表示线程池中的工作线程。Worker w = null;try {//首先通过new Worker(firstTask) 创建一个新的工作线程对象,final Thread t = w.thread; 获取该工作线程的Thread对象。w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//获取mainLock,这是线程池的主锁,确保在更新线程池状态和工作线程集合时是同步的。final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//在获取锁的情况下,再次检查线程池的运行状态 rs。只有在线程池处于运行状态(SHUTDOWN 状态及以上),或者线程池处于关闭状态且提交的任务为 null 时,才允许添加工作线程。int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();//在获取锁的情况下,再次检查线程池的运行状态 rs。只有在线程池处于运行状态(SHUTDOWN 状态及以上),或者线程池处于关闭状态且提交的任务为 null 时,才允许添加工作线程。if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//如果成功添加了工作线程(非核心线程),调用 t.start() 启动工作线程,并将 workerStarted 设置为 truet.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
简而言之上面代码就是做了一件事,检查现场池的状态,然后根据状态是否考虑将工作现场加入到现场池中开始执行任务。执行任务调用的是t.start()方法,默认执行现场的Worker类的run方法,如下
public void run() {runWorker(this);}
run方法中调用了一个runWorker方法
final void runWorker(Worker w) {
//Thread wt = Thread.currentThread(); 获取当前工作线程的 Thread 对象。Thread wt = Thread.currentThread();//Runnable task = w.firstTask; 获取工作线程的初始任务。Runnable task = w.firstTask;//w.firstTask = null; 清空工作线程的初始任务。(用于线程复用)w.firstTask = null;//w.unlock(); 解锁工作线程,允许被中断。w.unlock(); // allow interrupts//用于标记工作线程是否突然终止,默认为 true。boolean completedAbruptly = true;try {//这是一个主要的工作循环,该循环不断从工作队列中取任务并执行while (task != null || (task = getTask()) != null) {//获取工作线程的独占锁,确保在执行任务过程中是独占锁的w.lock();//判断线程池是否正在关闭。如果线程池处于 STOP 状态或更高状态,并且当前线程未被中断,就中断当前线程。if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务的run方法。task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
这段代码描述了工作线程的主要执行逻辑,其中包括任务的获取、执行、异常处理、线程状态的检查以及线程的清理工作。