定义:让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。线程池便是这种模式。
饥饿:
固定大小线程池也会有饥饿现象。
若一个线程池有两个线程,能够处理两种任务。但是两种任务间有先后顺序。若来一个任务的时候,线程A先处理,然后需要线程B在线程A内帮助处理后续问题。当来两个任务时,线程A,B同时处理这两个任务,但是没有线程处理后续问题。这时便出现了饥饿现象。
当两个线程处理一个任务时:
import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;@Slf4j(topic = "TC47")
public class TC47 {static final List<String> foods = Arrays.asList("地三鲜","锅包肉","小炒肉","菠萝饭");static Random random = new Random();static String cooking() { return foods.get(random.nextInt(foods.size()));}public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2);//result: 中间有线程2帮助做饭//16:09:36.117 [pool-1-thread-1] DEBUG TC47 - 开始点单了....//16:09:36.148 [pool-1-thread-2] DEBUG TC47 - 开始做饭了....//16:09:36.148 [pool-1-thread-1] DEBUG TC47 - 上菜了: 小炒肉....pool.execute(()->{log.debug("开始点单了....");Future<String> food = pool.submit(() -> {log.debug("开始做饭了....");return cooking();});try {log.debug("上菜了: {}....",food.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}});}
}
当两个线程处理两个任务时--出现饥饿
import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;@Slf4j(topic = "TC47")
public class TC47 {static final List<String> foods = Arrays.asList("地三鲜","锅包肉","小炒肉","菠萝饭");static Random random = new Random();static String cooking() { return foods.get(random.nextInt(foods.size()));}public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2);//result: 线程1,2同时点单,没有线程帮助做饭,出现饥饿//16:13:21.932 [pool-1-thread-2] DEBUG TC47 - 开始点单了....//16:13:21.932 [pool-1-thread-1] DEBUG TC47 - 开始点单了....pool.execute(()->{log.debug("开始点单了....");Future<String> food = pool.submit(() -> {log.debug("开始做饭了....");return cooking();});try {log.debug("上菜了: {}....",food.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}});pool.execute(()->{log.debug("开始点单了....");Future<String> food = pool.submit(() -> {log.debug("开始做饭了....");return cooking();});try {log.debug("上菜了: {}....",food.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}});}
}
避免饥饿
不同类型的任务应使用不同类型的线程池
import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;@Slf4j(topic = "TC48")
public class TC48 {static final List<String> foods = Arrays.asList("地三鲜","锅包肉","小炒肉","菠萝饭");static Random random = new Random();static String cooking() {return foods.get(random.nextInt(foods.size()));}public static void main(String[] args) {ExecutorService waiterPool = Executors.newFixedThreadPool(1);ExecutorService cookPool = Executors.newFixedThreadPool(1);waiterPool.execute(()->{log.debug("开始点单....");Future<String> food = cookPool.submit(() -> {log.debug("开始做菜....");return cooking();});try {log.debug("上菜了: {}",food.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}});waiterPool.execute(()->{log.debug("开始点单....");Future<String> food = cookPool.submit(() -> {log.debug("开始做菜....");return cooking();});try {log.debug("上菜了: {}",food.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}});}
}
合理创建线程池
- 线程池过小会导致不能充分利用系统资源,容易产生饥饿现象
- 线程池过大会导致更多的线程上下文切换,占用更多内存,影响性能
CPU密集型运算
适合做数据分析
线程数=CPU核数+1
I/O密集型运算
当I/O密集运算时,可以利用闲置的CPU。
任务调度线程池
Timer
Timer执行定时功能但是task2受task1的Sleep影响。
import lombok.extern.slf4j.Slf4j;import java.util.Timer;
import java.util.TimerTask;
@Slf4j(topic = "TC49")
public class TC49 {public static void main(String[] args) {Timer time = new Timer();TimerTask task1 = new TimerTask() {@Overridepublic void run() {log.debug("Task1...");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {log.debug("Task2...");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}};log.debug("start");time.schedule(task1,1000);time.schedule(task2,1000);}
}
ScheduleExecutorService
schedule()
若任务中发生异常,它不会抛出异常也不会在控制台里打印出来,需要我们自己进行try/catch 捕获或这throw。
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@Slf4j(topic = "TC50")
public class TC50 {public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);//延迟1S执行pool.schedule(()->{log.debug("Taks1....");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}log.debug("Finish task1...");},1, TimeUnit.SECONDS);pool.schedule(()->{log.debug("Taks2....");log.debug("Finish task2...");},1, TimeUnit.SECONDS);}
}
SchedulelAtFixedRate() 执行时间会抵消delay时间
//result: 如果有sleep, scheduleAtFixedRate的delay会被第一次执行的时间抵消掉。//task3 第一次执行2S完后,抵消了第二次的delay 1S,所以第二次就直接在2S后执行。
//15:54:14.730 [pool-1-thread-1] DEBUG TC50 - Taks3....
//15:54:16.755 [pool-1-thread-1] DEBUG TC50 - Taks3....
//15:54:18.762 [pool-1-thread-1] DEBUG TC50 - Taks3....
//15:54:20.766 [pool-1-thread-1] DEBUG TC50 - Taks3....//任务,初始延迟时间,延迟周期,时间单位pool.scheduleAtFixedRate(()->{log.debug("Taks3....");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}},1,1,TimeUnit.SECONDS);
scheduleWithFixedDelay() 执行时间不会抵消delay时间
//result: 如果有sleep, scheduleWithFixedDelay的delay不会被第一次执行的时间抵消掉,而是在第二次开始重新计算。
//task4 第一次执行2S完后,再delay 1S,再次执行第二次。//16:04:06.244 [pool-1-thread-1] DEBUG TC50 - Taks4....//16:04:09.280 [pool-1-thread-1] DEBUG TC50 - Taks4....//16:04:12.302 [pool-1-thread-1] DEBUG TC50 - Taks4....//16:04:15.329 [pool-1-thread-1] DEBUG TC50 - Taks4....pool.scheduleWithFixedDelay(()->{log.debug("Taks4....");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}},1,1,TimeUnit.SECONDS);
异常处理
- 自己使用try/catch 捕获
- 使用Callable +Future-->如果有异常就能使用get()拿到异常。