预备知识:Java 线程挂起的常用方式有以下几种
Thread.sleep(long millis)
:这个方法可以让线程挂起一段时间,并释放 CPU 时间片,等待一段时间后自动恢复执行。这种方式可以用来实现简单的定时器功能,但如果不恰当使用会影响系统性能。Object.wait()
和Object.notify()
或Object.notifyAll()
:这是一种通过等待某个条件的发生来挂起线程的方式。wait()
方法会让线程等待,直到其他线程调用了notify()
或notifyAll()
方法来通知它。这种方式需要使用 synchronized 或者 ReentrantLock 等同步机制来保证线程之间的协作和通信。LockSupport.park()
和LockSupport.unpark(Thread thread)
:这两个方法可以让线程挂起和恢复。park()
方法会使当前线程挂起,直到其他线程调用了unpark(Thread thread)
方法来唤醒它。这种方式比较灵活,可以根据需要控制线程的挂起和恢复。
先上结论:
1.futureTask.get时通过LockSupport.park()挂起线程
2.在Thread.run() 方法中 调用 setException(ex)或set(result),然后调用LockSupport.unpark(t)唤醒线程。
一:示例-引入主题
public class FutureTaskDemo {public static void main(String[] args) {FutureTask<String> futureTask = new FutureTask<>(new Callable() {@Overridepublic Object call() throws Exception {System.out.println("异步线程执行");Thread.sleep(3000);//模拟线程执行任务需要3秒return "ok";}});Thread t1 = new Thread(futureTask, "线程一");t1.start();try {//关键代码String s = futureTask.get(2, TimeUnit.SECONDS); //最大等待线程2秒} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}
二:进入futureTask.get(2, TimeUnit.SECONDS);
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //重点awaitDone,即完成了最大等待,依然没有结果就抛出异常逻辑throw new TimeoutException();return report(s);}
awaitDone返回线程任务执行状态,即小于等于COMPLETING(任务正在运行,等待完成)抛出异常TimeoutException
三:进入(awaitDone(true, unit.toNanos(timeout)))原理分析
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}
3.1 总体解读awaitDone
利用自旋(for (;😉)的方式 ,检查state(任务状态)与waitNode(维护等待的线程),
第一步:首先检查if (Thread.interrupted()) 线程是否被打断(LockSupport.parkNanos挂起的线程被打断不抛出异常),
第二步:判断任务状态与waitNode是否入队+确定最大等待时间
若已完成(if (s > COMPLETING))返回任务状态
若已完成(if (s == COMPLETING))-->表示正在完成,但尚未完成。则让出 CPU,进入就绪状态,等待其他线程的执行
若if (q == null)==>创建等待等待节点
若if (!queued)==>表示上一步创建的节点没有和当前线程绑定,故绑定
最后else if (timed)与else,判断最大等待时间
static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
state可能转换的过程 1.NEW -> COMPLETING -> NORMAL (成功完成)2.NEW -> COMPLETING -> EXCEPTIONAL (异常)3.NEW -> CANCELLED (任务被取消)4.NEW -> INTERRUPTING -> INTERRUPTED(任务被打断)
3.2 关键代码
LockSupport.park(this, nanos) ==内部实现==> UNSAFE.park(false, nanos)();
即让当前线程堵塞直至指定的时间(nanos),该方法同Thread.sleep()一样不会释放持有的对象锁,但不同的是Thread.sleep会被打断(interrupted)并抛出异常,而LockSupport.park被打断不会抛出异常,故在自旋时(for (;😉)需判断if (Thread.interrupted())线程是否被打断(手动抛出异常)。
四:线程运行时state的变化轨迹
4.1:新建时利用构造器设置state=NEW
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // 赋值状态}
4.2: 线程运行时state可能变化轨迹
public void run() {..........防止多次运行stat()方法..............try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex); //异常轨迹---> 见下分析}if (ran)set(result); // 正常轨迹--->见下分析}} finally {runner = null;//----最后结束---防止线程被打断int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}
异常轨迹setException(ex)
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();//轨迹变化 2.NEW -> COMPLETING -> EXCEPTIONAL (异常)}//否则1: 3.NEW -> CANCELLED (任务被取消)//否则2: 4.NEW -> INTERRUPTING -> INTERRUPTED(任务被打断)
}
正常轨迹 set(result);
1.NEW -> COMPLETING -> NORMAL (成功完成)