文章目录
线程池最主要有两部分,阻塞队列和线程池,阻塞队列使用双端队列
,我们需要定义队列的容量
、 还需要锁
来保证将任务放入队列和取出队列的正确性、使用Reentrentlock定义两个条件变量:生产者条件变量和消费者条件变量,当队列满时任务不能放入队列,在生产者条件变量await,当队列为空时不能从队列中取任务,在消费者的条件变量中await,当然的话还需要有唤醒逻辑,以及取任务和存任务都可以带超时时间,避免总等待的情况。
线程池:定义了线程集合(hashset)、核心线程数、获取任务的超时时间、任务队列、拒绝策略(当没能力处理任务时,这些任务怎么办)
线程池中处理任务的逻辑:当任务数没有超过核心线程数时,每个任务都有对应的线程处理,并且线程的run方法里面是有while循环的,只要任务队列中还有任务时,会源源不断的执行这些任务,直到没有任务时,才将线程池中的线程回收,当然这个是线程池有能力处理这些任务的情况下,当任务队列满了,可以使用策略模式,将策略传入阻塞队列,然后执行拒绝的这种策略。
package cn.itcast.n8;import lombok.extern.slf4j.Slf4j;
import org.springframework.core.log.LogDelegateFactory;import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{// 1. 死等
// queue.put(task);// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3) 让调用者放弃任务执行
// log.debug("放弃{}", task);// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);// 5) 让调用者自己执行任务task.run();});for (int i = 0; i < 4; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {void reject(BlockingQueue<T> queue, T task);
}@Slf4j(topic = "c.ThreadPool")
class ThreadPool {// 任务队列private BlockingQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();// 核心线程数private int coreSize;// 获取任务时的超时时间private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 执行任务public void execute(Runnable task) {// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行// 如果任务数超过 coreSize 时,加入任务队列暂存synchronized (workers) {if(workers.size() < coreSize) {Worker worker = new Worker(task);log.debug("新增 worker{}, {}", worker, task);workers.add(worker);worker.start();} else {
// taskQueue.put(task);// 1) 死等// 2) 带超时等待// 3) 让调用者放弃任务执行// 4) 让调用者抛出异常// 5) 让调用者自己执行任务taskQueue.tryPut(rejectPolicy, task);}}}public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);this.rejectPolicy = rejectPolicy;}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1) 当 task 不为空,执行任务// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// while(task != null || (task = taskQueue.take()) != null) {while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行...{}", task);task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers) {log.debug("worker 被移除{}", this);workers.remove(this);}}}
}
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {// 1. 任务队列private Deque<T> queue = new ArrayDeque<>();// 2. 锁private ReentrantLock lock = new ReentrantLock();// 3. 生产者条件变量private Condition fullWaitSet = lock.newCondition();// 4. 消费者条件变量private Condition emptyWaitSet = lock.newCondition();// 5. 容量private int capcity;public BlockingQueue(int capcity) {this.capcity = capcity;}// 带超时阻塞获取public T poll(long timeout, TimeUnit unit) {lock.lock();try {// 将 timeout 统一转换为 纳秒long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {try {// 返回值是剩余时间if (nanos <= 0) {return null;}nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞获取public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞添加public void put(T task) {lock.lock();try {while (queue.size() == capcity) {try {log.debug("等待加入任务队列 {} ...", task);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();} finally {lock.unlock();}}// 带超时时间阻塞添加public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capcity) {try {if(nanos <= 0) {return false;}log.debug("等待加入任务队列 {} ...", task);nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();return true;} finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否满if(queue.size() == capcity) {rejectPolicy.reject(this, task);} else { // 有空闲log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}
}