1. synchronized中的线程通信
调用wait方法会使线程处于等待状态,直到另一个线程调用notify线程才会唤醒等待中的某个线程,生产者和消费者模型可以很好的使用到该例子。
生产者代码:
public class Producer implements Runnable {private Queue<String> bags;private int maxSize;public Producer(Queue<String> bags, int maxSize) {this.bags = bags;this.maxSize = maxSize;}@Overridepublic void run() {int i = 0;while (true) {i++;synchronized (bags) {if (bags.size() == maxSize) {System.out.println("bags 满了");try {bags.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("生产者生产: bag" + i);bags.add("bag" + i);bags.notify(); //表示当前已经生产了数据,提示消费者可以消费了}}}
}
消费者代码:
public class Consumer implements Runnable{private Queue<String> bags;private int maxSize;public Consumer(Queue<String> bags, int maxSize) {this.bags = bags;this.maxSize = maxSize;}@Overridepublic void run() {int i = 0;while (true) {synchronized (bags) {if (bags.isEmpty()) {System.out.println("bags为空");try {bags.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}String bag = bags.remove();System.out.println("消费者消费:" + bag);bags.notify(); //这里只是唤醒Producer线程,并不能马上执行} // 同步代码块执行结束,monitorexit指令执行完成}}
}
客户端代码:
public class ProducerConsumerExample {public static void main(String[] args) throws InterruptedException {LinkedList<String> strings = new LinkedList<>();Producer producer = new Producer(strings, 10);Consumer consumer = new Consumer(strings, 10);new Thread(producer).start();Thread.sleep(100);new Thread(consumer).start();}
}
分析:客户端中分别启动了两个线程producer和consumer,producer中当生产的bag数达到最大值时,就会处于阻塞状态,此时消费者开始消费,消费者每消费一个,就会去唤醒处于阻塞中的producer线程,但是producer线程并不会立即执行,当它被唤醒后,此时consumer线程也在执行中,因此它们两个线程会去竞争锁,谁竞争到了就执行谁,直到consumer线程把bags消费完后,才会阻塞。
1.1 线程通信的猜想条件
1.wait和notify必须在synchronized中,有锁的竞争才会存在线程之间的通信
2.它们是基于共享的数据结构进行通信的,通信的话必然涉及到线程,而线程又是并行的,因此需要一个条件的变量去做互斥。
3.在等待或唤醒的过程中,需要进行一个等待或唤醒的通信,必须要有一个队列来进行处理
synchronized的底层线程通信:
2.Condition设计猜想‘
wait/notify,锁的实现是synchronized
在J.U.C中,锁的实现是Lock,而Condition的作用就类似于wait/notify
猜想:
1.作用:实现线程的阻塞和唤醒
2.前提条件:必须要先获得锁
3.await->让线程阻塞,并且释放锁
4.signal->唤醒阻塞的线程
5.加锁的操作,必然会涉及到AQS的阻塞队列
6.await释放锁的时候->AQS队列中是不会存在已经释放锁的线程的,这个被释放的线程去了哪里?
await方法释放的线程,必须要有一个地方来存储,并且还需要被阻塞:->那么就会存在一个等待队列,LockSuppart.park阻塞
7.signal唤醒被阻塞的线程,从哪里唤醒?
上面猜想到的等待队列中,唤醒一个线程,放哪里去?是不是应该再放到AQS队列中,去抢占锁。
2.1Condition源码分析之await
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 1. 添加到等待队列Node node = addConditionWaiter();// 2. 完整的释放锁(考虑重入问题)int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this); // 3. 阻塞当前线程// 4. 要判断当前被阻塞的线程是否是因为interrupt唤醒的if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 5. 重新竞争所,savedstate表示的是被释放锁的重入次数if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
a.加入到Condition的等待队列中addConditionWaiter,并且node的节点状态被标记为CONDITION
private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
b..完成的释放锁fullRelease,返回的savedState表示重入次数
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();// 释放锁if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}
c.判断node是否在同步队列中isOnSyncQueue(node)
final boolean isOnSyncQueue(Node node) {// 状态仍是CONDITION那一定不在,同步队列会有一个冗余的前置节点,如果prev无前置节点,则一定不在if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 如果next节点不为空,则一定在if (node.next != null) // If has successor, it must be on queuereturn true;// 兜底方法,自旋遍历整个队列链表return findNodeFromTail(node);}
d..如果不在同步队列中,则就阻塞。 LockSupport.park(this);
e..如果当前节点是在同步队列中,就会去竞争锁1.同步队列的prev 为head,则会去尝试竞争锁2.如果prev不为head,则会将状态由CONDITION使用CAS机制更改为SIGNAL
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// 1. 去竞争锁if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 修改Node节点的状态if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
f..处理interrupt的中断响应checkInterruptWhileWaiting(node),LockSupport.park已经把当前线程挂起了,因此到这个地方一个可能是interrupt中断从而唤醒了该线程,而另一个可能就是Signal唤醒了该线程
private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}
分析:如果不是interrupt唤醒的,则无事发生,返回0;如果是intertupt唤醒的,则执行transferAfterCancelledWait(node),判断线程是发出SIgNAL之前还是之后,如果在发出信号之前,执行enq(node),加入到同步队列,抛出中断异常,如果是在发出SIGNAL信号之后,则可能出现的情况时,线程A发出了SIGNAL信号,但是还没有将线程同步到同步队列呢,因此执行Thread,yield先让线程缓一缓,保证同步到同步队列中,之后再正常中断掉该线程
final boolean transferAfterCancelledWait(Node node) {// 发生在signal信号之前 会抛出异常if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {enq(node);return true;}// 上面CAS失败,说明在Signal之后了。如果还不在同步队列中,则让线程先让步,缓一缓while (!isOnSyncQueue(node))Thread.yield();return false;}
2.2Condition源码分析之signal
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}final boolean transferForSignal(Node node) {if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 1. 将节点加入到同步队列中Node p = enq(node);int ws = p.waitStatus;// 2.修改节点状态为SIGNAL 并且唤醒这个node 是等待队列中的node的线程if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
分析: signal主要做了两件事,拿到等待队列中的Node,将它加入到同步队列中,并更新状态为SIGNAL,然后唤醒刚才等待队列中的那个node,再回到wait中的LockSupport.park方法之后接着执行2.1中的e和f。
3.总结
1.线程的通信是基于在同一个锁下,且线程之间存在竞争,并且还需要有一个共享的资源,彼此之间基于共享资源来进行交互的一种通信方式。
2.wait/notify是基于synchronized同步锁实现的
3.Condition是J.U.C中的实现,基于Lock锁