上一篇文章,主要讲述了如果通过线程池进行执行任务,以及相关的核心流程,线程执行框架本身提供了一系列的类,封装了线程创建、关闭、执行、管理等跟业务逻辑无关的代码逻辑,一方面将业务和非业务逻辑进行解耦合,另一方面也可以达到复用。
Executor、ExecutorService、Executors
Executor和 ExecutorService都是接口,前者定义了execute方法,后者添加了一些基础的线程关闭提交等方法。Executors是一个工具类。用来创建执行器。
public interface Executor {void execute(Runnable command);
}
newFixedThreadPool
newFixedThreadPool 是一个创建固定线程池的,核心线程和最大都是nThreads,可以看出都是核心线程池,所以线程都不会销毁。但是工作队列 LinkedBlockingQueue 却是一个无界队列,默认是 Integer.MAX_VALUE。所以如果是使用这种方式,虽然工作线程是固定的数量,但是任务队列是无界的,如果人多比较多,那么处理慢的话,队列可能快速挤压,撑爆内存OOM。永远不会执行拒绝策略。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}
newSingleThreadExecutor
创建一个单线程进行处理,核心线程就是1,最大线程数也是1。但是任务队列也是Integer的最大值。
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
newCachedThreadPool
核心线程是0,最大线程是Integer的最大值,超过60S就会销毁。但是任务队列是长度为0的阻塞队列,不存储任何的等待执行的任务,如果线程池有空闲线程,那么空闲线程进行处理,没有的话,就会创建新的线程进行处理。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
newScheduledThreadPool
newScheduledThreadPool 定时或者周期性的执行任务,线程池的核心线程大小为corePoolSize ,最大线程池大小为 Integer.MAX_VALUE
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
在阿里的手册中,也标记的有 1.要使用线程池进行处理任务,2.不要使用Executors去创建任务。Fixed 和single的任务队列是Integer的最大值,有大量请求的时候可能OOM, cache的最大线程池是Integer.MaxValue值,会频繁创建线程。
上述的方式其实就有问题,没有定义任务队列的大小,如果任务过多的时候,其实会撑爆内存,OOM。
OOM问题
执行之后,会循环1亿次,然后因为使用的是cached所以会不断的创建线程处理任务。最终
Exception in thread “pool-1-thread-63” java.lang.OutOfMemoryError: Java heap space
private void oom1() throws InterruptedException {ExecutorService threadPool = Executors.newCachedThreadPool();for (int i = 0; i < 100000000; i++) {threadPool.execute(() -> {String payload = IntStream.rangeClosed(1, 1000000).mapToObj(__ -> "a").collect(Collectors.joining("")) + UUID.randomUUID().toString();try {TimeUnit.HOURS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(payload);});}threadPool.shutdown();threadPool.awaitTermination(1, TimeUnit.HOURS);}
实际应用
所以在实际的开发中,如果需要使用多线程进行处理任务,那么一定不要使用juc内置的方法,而要根据自己业务的QPS 衡量下 应该设置的核心、最大、回收策略、工作队列的类型等。一般都需要设置有届的工作队列和可控的线程数,
1.手动创建 2.定义自定义的线程名
public static MdcThreadPoolExecutor newCustomThreadPool(int corePoolSize, int maximumPoolSize, int capacity, String featureOfGroup) {return new MdcThreadPoolExecutor(corePoolSize, maximumPoolSize,0L, new LinkedBlockingQueue<>(capacity), featureOfGroup);}private MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, BlockingQueue<Runnable> workQueue, String featureOfGroup) {super(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQueue,new NamedThreadFactory(featureOfGroup));this.featureOfGroup = featureOfGroup;}ExecutorService service = MdcThreadPoolExecutor.newCustomThreadPool(6, 6, 50, "xxxx");public class NamedThreadFactory implements ThreadFactory {private final String namePrefix;private final AtomicInteger nextId = new AtomicInteger(1);public NamedThreadFactory(String featureOfGroup) {namePrefix = "NamedThreadFactory's " + featureOfGroup + "-Worker-";}@Overridepublic Thread newThread(Runnable task) {String name = namePrefix + nextId.getAndDecrement();Thread thread = new Thread(null, task, name, 0);if (thread.isDaemon()) {thread.setDaemon(false);}if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}
}