线程池图解
线程池与主线程之间通过一个阻塞队列来平衡任务分配,阻塞队列中既可以满足线程等待,又要接收主线程的任务。
线程池实现
使用一个双向链表实现任务队列
创建任务队列
//阻塞队列
public class BlockingQueue<T> {//双线链表private Deque<T> queue = new ArrayDeque();//锁private ReentrantLock lock =new ReentrantLock();//生产者条件变量private Condition fullWaitSet = lock.newCondition();//消费者条件变量private Condition emptyWaitSet = lock.newCondition();//容器容量大小private int capacity;//阻塞获取public T pull(long timeOut, TimeUnit unit){lock.lock();//判断链表中是否存在任务待处理try {//将尝试时间转化为纳秒long nanos = unit.toNanos(timeOut);while (queue.isEmpty()){try {if (nanos<0){return null;}//awaitNanos返回结果是最大等待时间减去睡眠时间的剩余时间nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}//阻塞添加public void put(T element){lock.lock();try{while(queue.size()==capacity){//说明满了,暂时无法添加新的任务try {fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(element);emptyWaitSet.signal();}finally {lock.unlock();}}//获取队列任务数量public int size(){lock.lock();try {return queue.size();}finally {lock.unlock();}}
}
创建线程池
public class ThreadPool {//任务队列private BlockingQueue<Runnable> blockingQueue;//线程集合private HashSet<Worker> workers = new HashSet();//核心线程数private int coreNum;//超时时间private long timeOut;private TimeUnit unit;public ThreadPool(int coreNum, long timeOut, TimeUnit unit, int queueCapacity) {System.out.println("初始化线程池");this.coreNum = coreNum;this.timeOut = timeOut;this.unit = unit;this.blockingQueue = new BlockingQueue<>(queueCapacity);}//线程执行任务public void execute(Runnable task) {//当线程数没有超过coreNum时,直接交给Worker对象执行,如果超过了coreNum数,则加入BlockingQueuesynchronized (workers) {if (workers.size() < coreNum) {Worker worker = new Worker(task);System.out.println("新增worker"+worker);workers.add(worker);worker.start();} else {System.out.println("从消息队列中获取task");blockingQueue.put(task);}}}class Worker extends Thread {private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {while (task != null || (task = blockingQueue.pull(timeOut, unit)) != null) {try {System.out.println("Worker执行任务");task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers){System.out.println("Worker执行完毕"+this);workers.remove(this);}}}
}
测试
public class Test {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(3,3000, TimeUnit.MILLISECONDS,5);for (int i = 0; i < 10; i++) {int j = i;threadPool.execute(()->{try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("生产任务:"+j);});}}
}
初始化线程池
新增workerThread[Thread-0,5,main]
新增workerThread[Thread-1,5,main]
新增workerThread[Thread-2,5,main]
Worker执行任务
Worker执行任务
加入任务队列TheadPool.Test$$Lambda$1/1078694789@7ba4f24f
Worker执行任务
加入任务队列TheadPool.Test$$Lambda$1/1078694789@3b9a45b3
加入任务队列TheadPool.Test$$Lambda$1/1078694789@7699a589
加入任务队列TheadPool.Test$$Lambda$1/1078694789@58372a00
加入任务队列TheadPool.Test$$Lambda$1/1078694789@4dd8dc3
等待加入任务队列TheadPool.Test$$Lambda$1/1078694789@6d03e736
生产任务:2
生产任务:1
生产任务:0
Worker执行任务
Worker执行任务
加入任务队列TheadPool.Test$$Lambda$1/1078694789@6d03e736
Worker执行任务
加入任务队列TheadPool.Test$$Lambda$1/1078694789@378bf509
生产任务:3
生产任务:4
生产任务:5
Worker执行任务
Worker执行任务
Worker执行任务
生产任务:6
生产任务:8
生产任务:7
Worker执行任务
Worker执行完毕Thread[Thread-1,5,main]
Worker执行完毕Thread[Thread-0,5,main]
生产任务:9
Worker执行完毕Thread[Thread-2,5,main]
添加拒绝策略
上面测试中,有一点不友好的是,当任务队列满了之后,再向其中添加任务时,主线程会死等任务添加成功。
对此我们可以选择多种解决方案
- 死等
- 添加超时时间
- 让调用者方式执行
- 让调用者抛出异常
- 让调用者自己执行
创建拒绝策略
@FunctionalInterface
public interface RejectPolicy<T> {void reject(BlockingQueue<T> queue,T task);
}
修改线程池的执行方法
//添加属性private RejectPolicy rejectPolicy;//构造方法public ThreadPool(int coreNum, long timeOut, TimeUnit unit, int queueCapacity, RejectPolicy rejectPolicy) {System.out.println("初始化线程池");this.coreNum = coreNum;this.timeOut = timeOut;this.unit = unit;this.blockingQueue = new BlockingQueue<>(queueCapacity);this.rejectPolicy = rejectPolicy;}//线程执行任务public void execute(Runnable task) {//当线程数没有超过coreNum时,直接交给Worker对象执行,如果超过了coreNum数,则加入BlockingQueuesynchronized (workers) {if (workers.size() < coreNum) {Worker worker = new Worker(task);System.out.println("新增worker" + worker);workers.add(worker);worker.start();} else {
// System.out.println("从消息队列中获取task");
// blockingQueue.put(task);blockingQueue.tryPut(rejectPolicy,task);}}}
任务队列添加方法
public void tryPut(RejectPolicy rejectPolicy, T task) {lock.lock();try {if (queue.size() == capacity) {//如果满了,需要调用拒绝策略rejectPolicy.reject(this,task);} else {queue.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}
测试
public class Test {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(3,3000,TimeUnit.MILLISECONDS,5,(queue,task)->{//由调用者决定任务队列满了之后如何处理后续任务queue.put(task);//死等queue.offer(task,1000,TimeUnit.MILLISECONDS);//超时返回//啥也不干,直接丢弃任务task.run();//调用者自己执行throw new RuntimeException("任务秩序异常");//抛出异常});for (int i = 0; i < 10; i++) {int j = i;threadPool.execute(()->{try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("生产任务:"+j);});}}
}