首先新建一个线程池管理工具类ThreadPoolManagerUtil
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;/*** 线程池管理工具类* 提供单例线程池和多实例线程池的管理功能,支持动态配置、线程池状态监控、优雅关闭等操作。*/ @Slf4j public class ThreadPoolManagerUtil {/*** 单例线程池实例(全局通用线程池)*/private static volatile ThreadPoolExecutor singleThreadPoolExecutor;/*** 多线程池实例集合*/private static final ConcurrentHashMap<String, ThreadPoolExecutor> threadPools = new ConcurrentHashMap<>();/*** 线程池默认核心线程数*/private static final int DEFAULT_CORE_POOL_SIZE = 4;/*** 线程池默认最大线程数*/private static final int DEFAULT_MAXIMUM_POOL_SIZE = 8;/*** 线程池默认空闲线程存活时间*/private static final long DEFAULT_KEEP_ALIVE_TIME = 60L;/*** 线程池默认阻塞队列容量*/private static final int DEFAULT_QUEUE_CAPACITY = 100;/*** 私有构造方法,避免外部实例化*/private ThreadPoolManagerUtil() {}/*** 获取单例线程池(默认配置)** @return 单例线程池实例*/public static ThreadPoolExecutor getSingleThreadPoolExecutor() {if (singleThreadPoolExecutor == null) {initializeSingleThreadPoolExecutor();}return singleThreadPoolExecutor;}/*** 初始化单例线程池(懒加载,双重检查锁实现单例)* 1. 单例线程池的创建采用懒加载(Lazy Initialization)方式,即在需要的时候才初始化线程池,避免在类加载时就初始化占用资源。* 2. 使用双重检查锁(Double-Checked Locking)模式,保证线程池的实例化过程是线程安全的,同时避免了不必要的同步开销。* 3. 线程池创建时可以自定义核心线程数、最大线程数、线程空闲时间以及阻塞队列的容量等参数。* 4. 使用自定义线程工厂命名线程,方便日志排查,以及设置线程为守护线程(Daemon Thread)。* 5. 采用 CallerRunsPolicy 拒绝策略,保证在线程池资源耗尽时,任务不会被直接丢弃,而是由调用者线程处理任务。*/private static void initializeSingleThreadPoolExecutor() {if (singleThreadPoolExecutor == null) {synchronized (ThreadPoolManagerUtil.class) {if (singleThreadPoolExecutor == null) {log.info("Initializing single thread pool with custom configuration...");singleThreadPoolExecutor = new ThreadPoolExecutor(ThreadPoolManagerUtil.DEFAULT_CORE_POOL_SIZE,ThreadPoolManagerUtil.DEFAULT_MAXIMUM_POOL_SIZE,ThreadPoolManagerUtil.DEFAULT_KEEP_ALIVE_TIME,TimeUnit.SECONDS,new ArrayBlockingQueue<>(ThreadPoolManagerUtil.DEFAULT_QUEUE_CAPACITY),createCustomThreadFactory("SingleThreadPool"),new ThreadPoolExecutor.CallerRunsPolicy());}}}}/*** 动态调整线程池参数(单例线程池)** @param corePoolSize 新的核心线程数* @param maximumPoolSize 新的最大线程数*/public static void adjustSingleThreadPool(int corePoolSize, int maximumPoolSize) {if (singleThreadPoolExecutor != null) {log.info("Adjusting single thread pool: CorePoolSize={}, MaximumPoolSize={}", corePoolSize, maximumPoolSize);singleThreadPoolExecutor.setCorePoolSize(corePoolSize);singleThreadPoolExecutor.setMaximumPoolSize(maximumPoolSize);}}/*** 优雅关闭单例线程池*/public static void shutdownSingleThreadPool() {if (singleThreadPoolExecutor != null && !singleThreadPoolExecutor.isShutdown()) {log.info("Shutting down single thread pool...");singleThreadPoolExecutor.shutdown();try {if (!singleThreadPoolExecutor.awaitTermination(60, TimeUnit.SECONDS)) {singleThreadPoolExecutor.shutdownNow();}} catch (InterruptedException e) {singleThreadPoolExecutor.shutdownNow();Thread.currentThread().interrupt();}}}/*** 获取多实例线程池(通过名称区分)** @param name 线程池名称* @param corePoolSize 核心线程数* @param maximumPoolSize 最大线程数* @param keepAliveTime 空闲线程存活时间* @param queueCapacity 阻塞队列容量* @return 对应名称的线程池实例*/public static ThreadPoolExecutor getThreadPoolExecutor(String name,int corePoolSize,int maximumPoolSize,long keepAliveTime,int queueCapacity) {return threadPools.computeIfAbsent(name, k -> {log.info("Initializing thread pool: {}", name);return new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.SECONDS,new ArrayBlockingQueue<>(queueCapacity),createCustomThreadFactory(name),new ThreadPoolExecutor.CallerRunsPolicy());});}/*** 提供默认配置的多实例线程池** @param name 线程池名称* @return 对应名称的线程池实例*/public static ThreadPoolExecutor getThreadPoolExecutor(String name) {return getThreadPoolExecutor(name, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME, DEFAULT_QUEUE_CAPACITY);}/*** 优雅关闭所有多实例线程池*/public static void shutdownAllThreadPools() {log.info("Shutting down all thread pools...");threadPools.forEach((name, executor) -> {if (!executor.isShutdown()) {log.info("Shutting down thread pool: {}", name);executor.shutdown();try {if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow();}} catch (InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}}});}/*** 自定义线程工厂,支持线程命名和守护线程** @param threadNamePrefix 线程名前缀* @return 自定义线程工厂*/private static ThreadFactory createCustomThreadFactory(String threadNamePrefix) {return new ThreadFactory() {private final AtomicInteger threadNumber = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, threadNamePrefix + "-thread-" + threadNumber.getAndIncrement());// 设置为守护线程thread.setDaemon(true);return thread;}};}/*** 获取线程池状态信息** @param executor 线程池实例* @return 线程池状态信息*/public static String getThreadPoolStatus(ThreadPoolExecutor executor) {return String.format("ThreadPool Status: [CorePoolSize: %d, MaximumPoolSize: %d, ActiveCount: %d, QueueSize: %d, CompletedTaskCount: %d]",executor.getCorePoolSize(),executor.getMaximumPoolSize(),executor.getActiveCount(),executor.getQueue().size(),executor.getCompletedTaskCount());} }
简易使用示例
public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = ThreadPoolManagerUtil.getSingleThreadPoolExecutor();List<String> list = new ArrayList<>();List<CompletableFuture<Void>> futures = list.stream().map(vo -> CompletableFuture.runAsync(() -> {// todo 业务处理// ...// ... }, threadPoolExecutor)).toList();// 等待所有任务完成CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();// 根据需要确认是否关闭单例线程池 ThreadPoolManagerUtil.shutdownSingleThreadPool();}