JUC并发编程(5)(自定义线程池 + 共享模型之工具2)

JUC并发编程(5)(自定义线程池 + 共享模型之工具2)

笔记内容来源于黑马程序员教学视频

一、共享模型之工具2

①:读写锁

1、ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用读写锁读-读可以并发,提高性能。 类似于数据库中的select ... from ... lock in share mode

提供一个数据容器类内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

测试

class DataContainer {private Object data;private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();private ReentrantReadWriteLock.ReadLock r = rw.readLock();private ReentrantReadWriteLock.WriteLock w = rw.writeLock();public Object read() {log.debug("获取读锁...");r.lock();try {log.debug("读取");sleep(1);return data;} finally {log.debug("释放读锁...");r.unlock();}}public void write() {log.debug("获取写锁...");w.lock();try {log.debug("写入");sleep(1);} finally {log.debug("释放写锁...");w.unlock();}}
}

测试读锁-读锁可以并发

DataContainer dataContainer = new DataContainer();
new Thread(() -> {dataContainer.read();
}, "t1").start();
new Thread(() -> {dataContainer.read();
}, "t2").start();

输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响

14:05:14.341 c.DataContainer [t2] - 获取读锁... 
14:05:14.341 c.DataContainer [t1] - 获取读锁... 
14:05:14.345 c.DataContainer [t1] - 读取
14:05:14.345 c.DataContainer [t2] - 读取
14:05:15.365 c.DataContainer [t2] - 释放读锁... 
14:05:15.386 c.DataContainer [t1] - 释放读锁... 

测试读锁-写锁相互阻塞

DataContainer dataContainer = new DataContainer();
new Thread(() -> {dataContainer.read();
}, "t1").start();
Thread.sleep(100);
new Thread(() -> {dataContainer.write();
}, "t2").start();

输出结果

14:04:21.838 c.DataContainer [t1] - 获取读锁... 
14:04:21.838 c.DataContainer [t2] - 获取写锁... 
14:04:21.841 c.DataContainer [t2] - 写入
14:04:22.843 c.DataContainer [t2] - 释放写锁... 
14:04:22.843 c.DataContainer [t1] - 读取
14:04:23.843 c.DataContainer [t1] - 释放读锁... 

写锁-写锁也是相互阻塞的,这里就不测试了

注意事项

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
r.lock();
try {// ...w.lock();try {// ...} finally{w.unlock();}
} finally{r.unlock();
}
  • 重入时降级支持:即持有写锁的情况下去获取读锁
class CachedData {Object data;// 是否有效,如果失效,需要重新计算 datavolatile boolean cacheValid;final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();void processCachedData() {rwl.readLock().lock();if (!cacheValid) {// 获取写锁前必须释放读锁rwl.readLock().unlock();rwl.writeLock().lock();try {// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新if (!cacheValid) {data = ...cacheValid = true;}// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存rwl.readLock().lock();} finally {rwl.writeLock().unlock();}}// 自己用完数据, 释放读锁 try {use(data);} finally {rwl.readLock().unlock();}}
}

*2、应用之缓存

01. 缓存更新策略

更新时,是先清缓存还是先更新数据库

先清缓存

image.png

先更新数据库

image.png

补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询

这种情况的出现几率非常小,见 facebook 论文

02. 读写锁实现一致性缓存

使用读写锁实现一个简单的按需加载缓存

class GenericCachedDao<T> {// HashMap 作为缓存非线程安全, 需要保护HashMap<SqlPair, T> map = new HashMap<>();ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); GenericDao genericDao = new GenericDao();public int update(String sql, Object... params) {SqlPair key = new SqlPair(sql, params);// 加写锁, 防止其它线程对缓存读取和更改lock.writeLock().lock();try {int rows = genericDao.update(sql, params);map.clear();return rows;} finally {lock.writeLock().unlock();}}public T queryOne(Class<T> beanClass, String sql, Object... params) {SqlPair key = new SqlPair(sql, params);// 加读锁, 防止其它线程对缓存更改lock.readLock().lock();try {T value = map.get(key);if (value != null) {return value;}} finally {lock.readLock().unlock();}// 加写锁, 防止其它线程对缓存读取和更改lock.writeLock().lock();try {// get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据// 为防止重复查询数据库, 再次验证T value = map.get(key);if (value == null) {// 如果没有, 查询数据库value = genericDao.queryOne(beanClass, sql, params);map.put(key, value);}return value;} finally {lock.writeLock().unlock();}}// 作为 key 保证其是不可变的class SqlPair {private String sql;private Object[] params;public SqlPair(String sql, Object[] params) {this.sql = sql;this.params = params;}@Overridepublic boolean equals(Object o) {if (this == o) {return true;}if (o == null || getClass() != o.getClass()) {return false;}SqlPair sqlPair = (SqlPair) o;return sql.equals(sqlPair.sql) &&Arrays.equals(params, sqlPair.params);}@Overridepublic int hashCode() {int result = Objects.hash(sql);result = 31 * result + Arrays.hashCode(params);return result;}}
}

注意

  • 以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑

    • 适合读多写少,如果写操作比较频繁,以上实现性能低
    • 没有考虑缓存容量
    • 没有考虑缓存过期
    • 只适合单机
    • 并发性还是低,目前只会用一把锁
    • 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)
  • 乐观锁实现:用 CAS 去更新

*3、读写锁原理

01. 图解流程

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个

t1 w.lock,t2 r.lock

1) t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁 使用的是 state 的高 16 位

image.png

2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写 锁占据,那么 tryAcquireShared 返回 -1 表示失败

tryAcquireShared 返回值表示

  • -1 表示失败
  • 0 表示成功,但后继节点不会继续唤醒
  • 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1

image.png

3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

image.png

4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁

5)如果没有成功,在 doAcquireShared 内 for (;😉 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;😉 循环一 次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park

image.png

t3 r.lock,t4 w.lock

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

image.png

t1 w.unlock

这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

image.png

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

image.png

这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

image.png

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

image.png

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

image.png

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

image.png

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点

t2 r.unlock,t3 r.unlock

t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

image.png

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

image.png

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;😉 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束

image.png

02. 源码分析
1. 写锁上锁流程
static final class NonfairSync extends Sync {// ... 省略无关代码// 外部类 WriteLock 方法, 方便阅读, 放在此处public void lock() {sync.acquire(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final void acquire(int arg) {if (// 尝试获得写锁失败!tryAcquire(arg) &&// 将当前线程关联到一个 Node 对象上, 模式为独占模式// 进入 AQS 队列阻塞acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {selfInterrupt();}}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();// 获得低 16 位, 代表写锁的 state 计数int w = exclusiveCount(c);//表示有写锁或者有读锁if (c != 0) {if (// c != 0 and w == 0 表示有读锁, 或者w == 0 ||// 如果 exclusiveOwnerThread 不是自己current != getExclusiveOwnerThread()) {// 获得锁失败return false;}// 写锁计数超过低 16 位, 报异常if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 写锁重入, 获得锁成功setState(c + acquires);return true;} if (// 判断写锁是否该阻塞, 或者//非公平锁下,总是返回falsewriterShouldBlock() ||// 尝试更改计数失败!compareAndSetState(c, c + acquires)) {// 获得锁失败return false;}// 获得锁成功setExclusiveOwnerThread(current);return true;}// 非公平锁 writerShouldBlock 总是返回 false, 无需阻塞final boolean writerShouldBlock() {return false;}
}

总结:

  • lock -> syn.acquire ->tryAquire

    • 如果有锁:

      • 如果是写锁或者锁持有者不为自己,返回false
      • 如果时写锁且为自己持有,则重入
    • 如果无锁:

      • 判断无序阻塞并设置state成功后,将owner设为自己,返回true
  • 成功,则获得了锁

  • 失败:

    • 调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)进入阻塞队列,将节点状态设置为EXCLUSIVE,之后的逻辑与之前的aquireQueued类似。
2. 写锁释放流程
static final class NonfairSync extends Sync {// ... 省略无关代码// WriteLock 方法, 方便阅读, 放在此处public void unlock() {sync.release(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final boolean release(int arg) {// 尝试释放写锁成功if (tryRelease(arg)) {// unpark AQS 中等待的线程Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryRelease(int releases) {if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc = getState() - releases;// 因为可重入的原因, 写锁计数为 0, 才算释放成功boolean free = exclusiveCount(nextc) == 0;if (free) {setExclusiveOwnerThread(null);}setState(nextc);return free;}
}

总结:

  • unlock->syn.release->tryRelease

    • state状态减少

      • 如果减为零,表示解锁成功,返回true
      • 没有减为0,当前线程依旧持有锁
  • 成功:解锁成功

    • 如果ASQ队列不为空,则唤醒第一个节点。
  • 失败:解锁失败。

3. 读锁上锁流程
static final class NonfairSync extends Sync {// ReadLock 方法, 方便阅读, 放在此处public void lock() {sync.acquireShared(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final void acquireShared(int arg) {// tryAcquireShared 返回负数, 表示获取读锁失败//大于0的情况在读写锁这里无区别,后面信号量会做进一步处理。if (tryAcquireShared(arg) < 0) {doAcquireShared(arg);}}// Sync 继承过来的方法, 方便阅读, 放在此处protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();// 如果是其它线程持有写锁, 获取读锁失败if ( exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current) {return -1;}int r = sharedCount(c);if (// 读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且!readerShouldBlock() &&// 小于读锁计数, 并且r < MAX_COUNT &&// 尝试增加计数成功compareAndSetState(c, c + SHARED_UNIT)) {// ... 省略不重要的代码return 1;}return fullTryAcquireShared(current);}// 非公平锁 readerShouldBlock 看 AQS 队列中第一个节点是否是写锁// true 则该阻塞, false 则不阻塞final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}// AQS 继承过来的方法, 方便阅读, 放在此处// 与 tryAcquireShared 功能类似, 但会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) {int c = getState();if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;} else if (readerShouldBlock()) {// ... 省略不重要的代码}if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) {// ... 省略不重要的代码return 1;}}}// AQS 继承过来的方法, 方便阅读, 放在此处private void doAcquireShared(int arg) {// 将当前线程关联到一个 Node 对象上, 模式为共享模式final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {// 再一次尝试获取读锁int r = tryAcquireShared(arg);// 成功if (r >= 0) {// ㈠// r 表示可用资源数, 在这里总是 1 允许传播//(唤醒 AQS 中下一个 Share 节点)setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (// 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)shouldParkAfterFailedAcquire(p, node) &&// park 当前线程parkAndCheckInterrupt()) {interrupted = true;}}} finally {if (failed)cancelAcquire(node);}}// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check below// 设置自己为 headsetHead(node);// propagate 表示有共享资源(例如共享读锁或信号量)// 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE// 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATEif (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;// 如果是最后一个节点或者是等待共享读锁的节点if (s == null || s.isShared()) {// 进入 ㈡doReleaseShared();}}}// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处private void doReleaseShared() {// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark// 如果 head.waitStatus == 0 ==> Node.PROPAGATE, 为了解决 bug, 见后面分析for (;;) {Node h = head;// 队列还有节点if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases// 下一个节点 unpark 如果成功获取读锁// 并且下下个节点还是 shared, 继续 doReleaseSharedunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
}

总结:

  • lock->syn.acquireShare->tryAcquireShare

    • 如果其他线程持有写锁:则失败,返回-1
    • 否则:判断无需等待后,将state加上一个写锁的单位,返回1
  • 返回值大于等于0:成功

  • 返回值小于0:

    • 调用doAcquireShare,类似之前的aquireQueued,将当前线程关联节点,状态设置为SHARE,插入AQS队列尾部。在for循环中判断当前节点的前驱节点是否为头节点

      • 是:调用tryAcquireShare

        • 如果返回值大于等于0,则获取锁成功,并调用setHeadAndPropagate,出队,并不断唤醒AQS队列中的状态为SHARE的节点,直到下一个节点为EXCLUSIVE。记录打断标记,之后退出方法(不返回打断标记)
    • 判断是否在失败后阻塞

      • 是:阻塞住,并监测打断信号。
      • 否则:将前驱节点状态设为-1。(下一次循环就又要阻塞了)
4. 读锁释放流程
static final class NonfairSync extends Sync {// ReadLock 方法, 方便阅读, 放在此处public void unlock() {sync.releaseShared(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryReleaseShared(int unused) {// ... 省略不重要的代码for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc)) {// 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程// 计数为 0 才是真正释放return nextc == 0;}}}// AQS 继承过来的方法, 方便阅读, 放在此处private void doReleaseShared() {// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark// 如果 head.waitStatus == 0 ==> Node.PROPAGATE for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0// 防止 unparkSuccessor 被多次执行if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}// 如果已经是 0 了,改为 -3,用来解决传播性,见后文信号量 bug 分析else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}} 
}

总结:

  • unlock->releaseShared->tryReleaseShared,将state减去一个share单元,最后state为0则返回true,不然返回false。
  • 返回tue:调用doReleaseShare,唤醒队列中的节点。
  • 返回false:解锁不完全。

4、StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用 加解读锁

long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){// 锁升级
}

提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法

class DataContainerStamped {private int data;private final StampedLock lock = new StampedLock();public DataContainerStamped(int data) {this.data = data;}public int read(int readTime) {//获取戳long stamp = lock.tryOptimisticRead();log.debug("optimistic read locking...{}", stamp);//读取数据sleep(readTime);//读取数据之后再验戳if (lock.validate(stamp)) {log.debug("read finish...{}, data:{}", stamp, data);return data;}//如果验戳失败,说明已经数据已经被修改,需要升级锁重新读。// 锁升级 - 读锁log.debug("updating to read lock... {}", stamp);try {stamp = lock.readLock();log.debug("read lock {}", stamp);sleep(readTime);log.debug("read finish...{}, data:{}", stamp, data);return data;} finally {log.debug("read unlock {}", stamp);lock.unlockRead(stamp);}}public void write(int newData) {long stamp = lock.writeLock();log.debug("write lock {}", stamp);try {sleep(2);this.data = newData;} finally {log.debug("write unlock {}", stamp);lock.unlockWrite(stamp);}}
}

测试读-读可以优化

public static void main(String[] args) {DataContainerStamped dataContainer = new DataContainerStamped(1);new Thread(() -> {dataContainer.read(1);}, "t1").start();sleep(0.5);new Thread(() -> {dataContainer.read(0);}, "t2").start();
}

输出结果,可以看到实际没有加读锁

15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 
15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1 

测试读-写时优化读补加读锁

public static void main(String[] args) {DataContainerStamped dataContainer = new DataContainerStamped(1);new Thread(() -> {dataContainer.read(1);}, "t1").start();sleep(0.5);new Thread(() -> {dataContainer.write(100);}, "t2").start();
}

输出结果

15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 
15:57:00.717 c.DataContainerStamped [t2] - write lock 384 
15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 
15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 
15:57:02.719 c.DataContainerStamped [t1] - read lock 513 
15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 
15:57:03.719 c.DataContainerStamped [t1] - read unlock 513 

注意

  • StampedLock 不支持条件变量
  • StampedLock 不支持可重入

②:Semaphore

1、基本使用

[ˈsɛməˌfɔr] 信号量,用来限制能同时访问共享资源的线程上限。

public static void main(String[] args) {// 1. 创建 semaphore 对象Semaphore semaphore = new Semaphore(3);// 2. 10个线程同时运行for (int i = 0; i < 10; i++) {new Thread(() -> {// 3. 获取许可try {semaphore.acquire();//对于非打断式获取,如果此过程中被打断,线程依旧会等到获取了信号量之后才进入catch块。//catch块中的线程依旧持有信号量,捕获该异常后catch块可以不做任何处理。} catch (InterruptedException e) {e.printStackTrace();}try {log.debug("running...");sleep(1);log.debug("end...");} finally {// 4. 释放许可semaphore.release();}}).start();}
}

输出

07:35:15.485 c.TestSemaphore [Thread-2] - running... 
07:35:15.485 c.TestSemaphore [Thread-1] - running... 
07:35:15.485 c.TestSemaphore [Thread-0] - running... 
07:35:16.490 c.TestSemaphore [Thread-2] - end... 
07:35:16.490 c.TestSemaphore [Thread-0] - end... 
07:35:16.490 c.TestSemaphore [Thread-1] - end... 
07:35:16.490 c.TestSemaphore [Thread-3] - running... 
07:35:16.490 c.TestSemaphore [Thread-5] - running... 
07:35:16.490 c.TestSemaphore [Thread-4] - running... 
07:35:17.490 c.TestSemaphore [Thread-5] - end... 
07:35:17.490 c.TestSemaphore [Thread-4] - end... 
07:35:17.490 c.TestSemaphore [Thread-3] - end... 
07:35:17.490 c.TestSemaphore [Thread-6] - running... 
07:35:17.490 c.TestSemaphore [Thread-7] - running... 
07:35:17.490 c.TestSemaphore [Thread-9] - running... 
07:35:18.491 c.TestSemaphore [Thread-6] - end... 
07:35:18.491 c.TestSemaphore [Thread-7] - end... 
07:35:18.491 c.TestSemaphore [Thread-9] - end... 
07:35:18.491 c.TestSemaphore [Thread-8] - running... 
07:35:19.492 c.TestSemaphore [Thread-8] - end... 

说明:

  • Semaphore有两个构造器:Semaphore(int permits)Semaphore(int permits,boolean fair)
  • permits表示允许同时访问共享资源的线程数。
  • fair表示公平与否,与之前的ReentrantLock一样。

*2、Semaphore 应用

semaphore 限制对共享资源的使用

  • 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机 线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)
  • 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好, 注意下面的实现中线程数和数据库连接数是相等的
@Slf4j(topic = "c.Pool")
class Pool {// 1. 连接池大小private final int poolSize;// 2. 连接对象数组private Connection[] connections;// 3. 连接状态数组 0 表示空闲, 1 表示繁忙private AtomicIntegerArray states;private Semaphore semaphore;// 4. 构造方法初始化public Pool(int poolSize) {this.poolSize = poolSize;// 让许可数与资源数一致this.semaphore = new Semaphore(poolSize);this.connections = new Connection[poolSize];this.states = new AtomicIntegerArray(new int[poolSize]);for (int i = 0; i < poolSize; i++) {connections[i] = new MockConnection("连接" + (i+1));}}// 5. 借连接public Connection borrow() {// t1, t2, t3// 获取许可try {semaphore.acquire(); // 没有许可的线程,在此等待} catch (InterruptedException e) {e.printStackTrace();}for (int i = 0; i < poolSize; i++) {// 获取空闲连接if(states.get(i) == 0) {if (states.compareAndSet(i, 0, 1)) {log.debug("borrow {}", connections[i]);return connections[i];}}}// 不会执行到这里return null;}// 6. 归还连接public void free(Connection conn) {for (int i = 0; i < poolSize; i++) {if (connections[i] == conn) {states.set(i, 0);log.debug("free {}", conn);semaphore.release();break;}}}
}

*3、Semaphore 原理

01. 加锁解锁流程

Semaphore有点像一个停车场,permits就好像停车位数量,当线程获得了permits就像是获得了停车位,然后停车场显示空余车位减一。

刚开始,permits(state)为 3,这时 5 个线程来获取资源

image.png

假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞

image.png
这时 Thread-4 释放了 permits,状态如下

image.png

接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接 下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

image.png

02. 源码分析
static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {// permits 即 statesuper(permits);}// Semaphore 方法, 方便阅读, 放在此处public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}// 尝试获得共享锁protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}// Sync 继承过来的方法, 方便阅读, 放在此处final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires; if (// 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptiblyremaining < 0 ||// 如果 cas 重试成功, 返回正数, 表示获取成功compareAndSetState(available, remaining)) {return remaining;}}}// AQS 继承过来的方法, 方便阅读, 放在此处private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {// 再次尝试获取许可int r = tryAcquireShared(arg);if (r >= 0) {// 成功后本线程出队(AQS), 所在 Node设置为 head// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark// 如果 head.waitStatus == 0 ==> Node.PROPAGATE // r 表示可用资源数, 为 0 则不会继续传播setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}// Semaphore 方法, 方便阅读, 放在此处public void release() {sync.releaseShared(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
}
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check below// 设置自己为 headsetHead(node);// propagate 表示有共享资源(例如共享读锁或信号量)// 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE// 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATEif (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;// 如果是最后一个节点或者是等待共享读锁的节点if (s == null || s.isShared()) {doReleaseShared();}}
}
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}
}
1. 加锁流程总结:
  • acquire->acquireSharedInterruptibly(1)->tryAcquireShared(1)->nonfairTryAcquireShared(1),如果资源用完了,返回负数,tryAcquireShared返回负数,表示失败。否则返回正数,tryAcquireShared返回正数,表示成功。

    • 如果成功,获取信号量成功。

    • 如果失败,调用doAcquireSharedInterruptibly,进入for循环:

      • 如果当前驱节点为头节点,调用tryAcquireShared尝试获取锁

        • 如果结果大于等于0,表明获取锁成功,调用setHeadAndPropagate,将当前节点设为头节点,之后又调用doReleaseShared,唤醒后继节点。
      • 调用shoudParkAfterFailure,第一次调用返回false,并将前驱节点改为-1,第二次循环如果再进入此方法,会进入阻塞并检查打断的方法。

2. 解锁流程总结:
  • release->sync.releaseShared(1)->tryReleaseShared(1),只要不发生整数溢出,就返回true

    • 如果返回true,调用doReleaseShared,唤醒后继节点。
    • 如果返回false,解锁失败。
03. 为什么要有 PROPAGATE

③:CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时。

其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3);new Thread(() -> {log.debug("begin...");sleep(1);latch.countDown();log.debug("end...{}", latch.getCount());}).start();new Thread(() -> {log.debug("begin...");sleep(2);latch.countDown();log.debug("end...{}", latch.getCount());}).start();new Thread(() -> {log.debug("begin...");sleep(1.5);latch.countDown();log.debug("end...{}", latch.getCount());}).start();log.debug("waiting...");latch.await();log.debug("wait end...");
}

输出

18:44:00.778 c.TestCountDownLatch [main] - waiting... 
18:44:00.778 c.TestCountDownLatch [Thread-2] - begin... 
18:44:00.778 c.TestCountDownLatch [Thread-0] - begin... 
18:44:00.778 c.TestCountDownLatch [Thread-1] - begin... 
18:44:01.782 c.TestCountDownLatch [Thread-0] - end...2 
18:44:02.283 c.TestCountDownLatch [Thread-2] - end...1 
18:44:02.782 c.TestCountDownLatch [Thread-1] - end...0 
18:44:02.782 c.TestCountDownLatch [main] - wait end... 

相比于join,CountDownLatch能配合线程池使用。

public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3);ExecutorService service = Executors.newFixedThreadPool(4);service.submit(() -> {log.debug("begin...");sleep(1);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(() -> {log.debug("begin...");sleep(1.5);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(() -> {log.debug("begin...");sleep(2);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(()->{try {log.debug("waiting...");latch.await();log.debug("wait end...");} catch (InterruptedException e) {e.printStackTrace();}});
}

*1、应用之同步等待多线程准备完毕

AtomicInteger num = new AtomicInteger(0);
ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {return new Thread(r, "t" + num.getAndIncrement());
});
CountDownLatch latch = new CountDownLatch(10);
String[] all = new String[10];
Random r = new Random();
for (int j = 0; j < 10; j++) {int x = j;service.submit(() -> {for (int i = 0; i <= 100; i++) {try {//随机休眠,模拟网络延迟Thread.sleep(r.nextInt(100));} catch (InterruptedException e) {}all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")";//\r可以让当前输出覆盖上一次的输出。System.out.print("\r" + Arrays.toString(all));}latch.countDown();});
}
latch.await();
System.out.println("\n游戏开始...");
service.shutdown();

中间输出

[t0(52%), t1(47%), t2(51%), t3(40%), t4(49%), t5(44%), t6(49%), t7(52%), t8(46%), t9(46%)] 

最后输出

[t0(100%), t1(100%), t2(100%), t3(100%), t4(100%), t5(100%), t6(100%), t7(100%), t8(100%), 
t9(100%)] 
游戏开始... 

*2、应用之同步等待多个远程调用结束

@RestController
public class TestCountDownlatchController {@GetMapping("/order/{id}")public Map<String, Object> order(@PathVariable int id) {HashMap<String, Object> map = new HashMap<>();map.put("id", id);map.put("total", "2300.00");sleep(2000);return map;}@GetMapping("/product/{id}")public Map<String, Object> product(@PathVariable int id) {HashMap<String, Object> map = new HashMap<>();if (id == 1) {map.put("name", "小爱音箱");map.put("price", 300);} else if (id == 2) {map.put("name", "小米手机");map.put("price", 2000);}map.put("id", id);sleep(1000);return map;}@GetMapping("/logistics/{id}")public Map<String, Object> logistics(@PathVariable int id) {HashMap<String, Object> map = new HashMap<>();map.put("id", id);map.put("name", "中通快递");sleep(2500);return map;}private void sleep(int millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}}
}

rest远程调用

RestTemplate restTemplate = new RestTemplate();
log.debug("begin");
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(4);
Future<Map<String,Object>> f1 = service.submit(() -> {Map<String, Object> r =restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);return r;
});
Future<Map<String, Object>> f2 = service.submit(() -> {Map<String, Object> r =restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);return r;
});
Future<Map<String, Object>> f3 = service.submit(() -> {Map<String, Object> r =restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);return r;
});
Future<Map<String, Object>> f4 = service.submit(() -> {Map<String, Object> r =restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);return r;
});
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
log.debug("执行完毕");
service.shutdown();

执行结果

19:51:39.711 c.TestCountDownLatch [main] - begin 
{total=2300.00, id=1} 
{price=300, name=小爱音箱, id=1} 
{price=2000, name=小米手机, id=2} 
{name=中通快递, id=1} 
19:51:42.407 c.TestCountDownLatch [main] - 执行完毕

说明:

  • 这种等待多个带有返回值的任务的场景,还是用future比较合适,CountdownLatch适合任务没有返回值的场景。

④:CyclicBarrier

CountdownLatch的缺点在于不能重用,见下:

private static void test1() {ExecutorService service = Executors.newFixedThreadPool(5);for (int i = 0; i < 3; i++) {CountDownLatch latch = new CountDownLatch(2);service.submit(() -> {log.debug("task1 start...");sleep(1);latch.countDown();});service.submit(() -> {log.debug("task2 start...");sleep(2);latch.countDown();});try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}log.debug("task1 task2 finish...");}service.shutdown();
}

想要重复使用CountdownLatch进行同步,必须创建多个CountDownLatch对象。

[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执 行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行

CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行
new Thread(()->{System.out.println("线程1开始.."+new Date());try {cb.await(); // 当个数不足时,等待} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}System.out.println("线程1继续向下运行..."+new Date());
}).start();
new Thread(()->{System.out.println("线程2开始.."+new Date());try { Thread.sleep(2000); } catch (InterruptedException e) { }try {cb.await(); // 2 秒后,线程个数够2,继续运行} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}System.out.println("线程2继续向下运行..."+new Date());
}).start();

注意

  • CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比 喻为『人满发车』
  • CountDownLatch的计数和阻塞方法是分开的两个方法,而CyclicBarrier是一个方法。
  • CyclicBarrier的构造器还有一个Runnable类型的参数,在计数为0时会执行其中的run方法。

⑤:线程安全集合类概述

image-20220316174115768

线程安全集合类可以分为三大类:

  • 遗留的线程安全集合如HashtableVector

  • 使用Collections装饰的线程安全集合,如:

    • Collections.synchronizedCollection
    • Collections.synchronizedList
    • Collections.synchronizedMap
    • Collections.synchronizedSet
    • Collections.synchronizedNavigableMap
    • Collections.synchronizedNavigableSet
    • Collections.synchronizedSortedMap
    • Collections.synchronizedSortedSet
    • 说明:以上集合均采用修饰模式设计,将非线程安全的集合包装后,在调用方法时包裹了一层synchronized代码块。其并发性并不比遗留的安全集合好。
  • java.util.concurrent.*

重点介绍java.util.concurrent.*下的线程安全集合类,可以发现它们有规律,里面包含三类关键词: Blocking、CopyOnWrite、Concurrent

  • Blocking 大部分实现基于锁,并提供用来阻塞的方法

  • CopyOnWrite 之类容器修改开销相对较重

  • Concurrent 类型的容器

    • 内部很多操作使用 cas 优化,一般可以提供较高吞吐量

    • 弱一致性

      • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍 历,这时内容是旧的
      • 求大小弱一致性,size 操作未必是 100% 准确
      • 读取弱一致性

遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出 ConcurrentModificationException,不再继续遍历

⑥:ConcurrentHashMap

1、应用之单词计数

搭建练习环境:

public class Test {public static void main(String[] args){//在main方法中实现两个接口}//开启26个线程,每个线程调用get方法获取map,从对应的文件读取单词并存储到list中,最后调用accept方法进行统计。public static <V> void  calculate(Supplier<Map<String,V>> supplier, BiConsumer<Map<String,V>, List<String>> consumer) {Map<String, V> map = supplier.get();CountDownLatch count = new CountDownLatch(26);for (int i = 1; i < 27; i++) {int k = i;new Thread(()->{ArrayList<String> list = new ArrayList<>();read(list,k);consumer.accept(map,list);count.countDown();}).start();}try {count.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(map.toString());}//读单词方法的实现public static void read(List<String> list,int i){try{String element;BufferedReader reader = new BufferedReader(new FileReader(i + ".txt"));while((element = reader.readLine()) != null){list.add(element);}}catch (IOException e){}}//生成测试数据public void construct(){String str = "abcdefghijklmnopqrstuvwxyz";ArrayList<String> list = new ArrayList<>();for (int i = 0; i < str.length(); i++) {for (int j = 0; j < 200; j++) {list.add(String.valueOf(str.charAt(i)));}}Collections.shuffle(list);for (int i = 0; i < 26; i++) {try (PrintWriter out = new PrintWriter(new FileWriter(i + 1 + ".txt"))) {String collect = list.subList(i * 200, (i + 1) * 200).stream().collect(Collectors.joining("\n"));out.println(collect);} catch (IOException e) {e.printStackTrace();}}}
}
01. 实现一:
demo(// 创建 map 集合// 创建 ConcurrentHashMap 对不对?() -> new ConcurrentHashMap<String, Integer>(),// 进行计数(map, words) -> {for (String word : words) {Integer counter = map.get(word);int newValue = counter == null ? 1 : counter + 1;map.put(word, newValue);}}
);

输出:

{a=186, b=192, c=187, d=184, e=185, f=185, g=176, h=185, i=193, j=189, k=187, l=157, m=189, n=181, o=180, p=178, q=185, r=188, s=181, t=183, u=177, v=186, w=188, x=178, y=189, z=186}
47

错误原因:

  • ConcurrentHashMap虽然每个方法都是线程安全的,但是多个方法的组合并不是线程安全的。
02. 正确答案一:
demo(() -> new ConcurrentHashMap<String, LongAdder>(),(map, words) -> {for (String word : words) {// 注意不能使用 putIfAbsent,此方法返回的是上一次的 value,首次调用返回 nullmap.computeIfAbsent(word, (key) -> new LongAdder()).increment();}}
);

说明:

  • computIfAbsent方法的作用是:当map中不存在以参数1为key对应的value时,会将参数2函数式接口的返回值作为value,put进map中,然后返回该value。如果存在key,则直接返回value
  • 以上两部均是线程安全的。
03. 正确答案二:
demo(() -> new ConcurrentHashMap<String, Integer>(),(map, words) -> {for (String word : words) {// 函数式编程,无需原子变量map.merge(word, 1, Integer::sum);}}
);

*2、ConcurrentHashMap 原理

01. JDK 7 HashMap 并发死链
1. 测试代码

注意

  • 要在 JDK 7 下运行,否则扩容机制和 hash 的计算方法都变了
  • 以下测试代码是精心准备的,不要随便改动
public static void main(String[] args) {// 测试 java 7 中哪些数字的 hash 结果相等System.out.println("长度为16时,桶下标为1的key");for (int i = 0; i < 64; i++) {if (hash(i) % 16 == 1) {System.out.println(i);}}System.out.println("长度为32时,桶下标为1的key");for (int i = 0; i < 64; i++) {if (hash(i) % 32 == 1) {System.out.println(i);}}// 1, 35, 16, 50 当大小为16时,它们在一个桶内final HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();// 放 12 个元素map.put(2, null);map.put(3, null);map.put(4, null);map.put(5, null);map.put(6, null);map.put(7, null);map.put(8, null);map.put(9, null);map.put(10, null);map.put(16, null);map.put(35, null);map.put(1, null);System.out.println("扩容前大小[main]:"+map.size());new Thread() {@Overridepublic void run() {// 放第 13 个元素, 发生扩容map.put(50, null);System.out.println("扩容后大小[Thread-0]:"+map.size());}}.start();new Thread() {@Overridepublic void run() {// 放第 13 个元素, 发生扩容map.put(50, null);System.out.println("扩容后大小[Thread-1]:"+map.size());}}.start();
}
final static int hash(Object k) {int h = 0;if (0 != h && k instanceof String) {return sun.misc.Hashing.stringHash32((String) k);}h ^= k.hashCode();h ^= (h >>> 20) ^ (h >>> 12);return h ^ (h >>> 7) ^ (h >>> 4);
}
2. 死链复现

调试工具使用 idea

在 HashMap 源码 590 行加断点

int newCapacity = newTable.length;

断点的条件如下,目的是让 HashMap 在扩容为 32 时,并且线程为 Thread-0 或 Thread-1 时停下来

newTable.length==32 &&(Thread.currentThread().getName().equals("Thread-0")||Thread.currentThread().getName().equals("Thread-1"))

断点暂停方式选择 Thread,否则在调试 Thread-0 时,Thread-1 无法恢复运行

运行代码,程序在预料的断点位置停了下来,输出

长度为16时,桶下标为1的key 
1 
16 
35 
50 
长度为32时,桶下标为1的key 
1 
35 
扩容前大小[main]:12 

接下来进入扩容流程调试

在 HashMap 源码 594 行加断点

Entry<K,V> next = e.next; // 593
if (rehash) // 594
// ...

这是为了观察 e 节点和 next 节点的状态,Thread-0 单步执行到 594 行,再 594 处再添加一个断点(条件 Thread.currentThread().getName().equals(“Thread-0”))

这时可以在 Variables 面板观察到 e 和 next 变量,使用view as -> Object查看节点状态

e (1)->(35)->(16)->null 
next (35)->(16)->null 

在 Threads 面板选中 Thread-1 恢复运行,可以看到控制台输出新的内容如下,Thread-1 扩容已完成

newTable[1] (35)->(1)->null 
扩容后大小:13 

这时 Thread-0 还停在 594 处, Variables 面板变量的状态已经变化为

e (1)->null 
next (35)->(1)->null 

为什么呢,因为 Thread-1 扩容时链表也是后加入的元素放入链表头,因此链表就倒过来了,但 Thread-1 虽然结 果正确,但它结束后 Thread-0 还要继续运行

接下来就可以单步调试(F8)观察死链的产生了

下一轮循环到 594,将 e 搬迁到 newTable 链表头

newTable[1] (1)->null 
e (35)->(1)->null 
next (1)->null

下一轮循环到 594,将 e 搬迁到 newTable 链表头

newTable[1] (35)->(1)->null 
e (1)->null 
next null 

再看看源码

e.next = newTable[1];
// 这时 e (1,35)
// 而 newTable[1] (35,1)->(1,35) 因为是同一个对象
newTable[1] = e; 
// 再尝试将 e 作为链表头, 死链已成
e = next;
// 虽然 next 是 null, 会进入下一个链表的复制, 但死链已经形成了
3. 源码分析

HashMap 的并发死链发生在扩容时

// 将 table 迁移至 newTable
void transfer(Entry[] newTable, boolean rehash) { int newCapacity = newTable.length;for (Entry<K,V> e : table) {while(null != e) {Entry<K,V> next = e.next;// 1 处if (rehash) {e.hash = null == e.key ? 0 : hash(e.key);}int i = indexFor(e.hash, newCapacity);// 2 处// 将新元素加入 newTable[i], 原 newTable[i] 作为新元素的 nexte.next = newTable[i];newTable[i] = e;e = next;}}
}

假设 map 中初始元素是

原始链表,格式:[下标] (key,next)
[1] (1,35)->(35,16)->(16,null)
线程 a 执行到 1 处 ,此时局部变量 e 为 (1,35),而局部变量 next 为 (35,16) 线程 a 挂起
线程 b 开始执行
第一次循环
[1] (1,null)
第二次循环
[1] (35,1)->(1,null)
第三次循环
[1] (35,1)->(1,null)
[17] (16,null)
切换回线程 a,此时局部变量 e 和 next 被恢复,引用没变但内容变了:e 的内容被改为 (1,null),而 next 的内
容被改为 (35,1) 并链向 (1,null)
第一次循环
[1] (1,null)
第二次循环,注意这时 e 是 (35,1) 并链向 (1,null) 所以 next 又是 (1,null)
[1] (35,1)->(1,null)
第三次循环,e 是 (1,null),而 next 是 null,但 e 被放入链表头,这样 e.next 变成了 35 (2 处)
[1] (1,35)->(35,1)->(1,35)
已经是死链了

小结

  • 究其原因,是因为在多线程环境下使用了非线程安全的 map 集合
  • JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能 够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)
02. JDK 8 ConcurrentHashMap
1. 重要属性和内部类
// 默认为 0
// 当初始化时, 为 -1
// 当扩容时, 为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为 下一次的扩容的阈值大小
private transient volatile int sizeCtl;
// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> {}
// hash 表
transient volatile Node<K,V>[] table;
// 扩容时的 新 hash 表
private transient volatile Node<K,V>[] nextTable;
// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
static final class ForwardingNode<K,V> extends Node<K,V> {}
// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> {}
// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> {}
// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNode<K,V> extends Node<K,V> {}
2. 重要方法
// 获取 Node[] 中第 i 个 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)// cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)// 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
3. 构造器分析

可以看到实现了懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel) // Use at least as many binsinitialCapacity = concurrencyLevel; // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);// tableSizeFor 仍然是保证计算的大小是 2^n, 即 16,32,64 ... int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;
}
4. get流程
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// spread 方法能确保返回结果是正数int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// 如果头结点已经是要查找的 keyif ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}// hash 为负数表示该 bin 在扩容中或是 treebin, 这时调用 find 方法来查找else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;// 正常遍历链表, 用 equals 比较while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}

总结:

  • 如果table不为空且长度大于0且索引位置有元素

    • if 头节点key的hash值相等

      • 头节点的key指向同一个地址或者equals

        • 返回value
    • else if 头节点的hash为负数(bin在扩容或者是treebin)

      • 调用find方法查找
    • 进入循环(e不为空):

      • 节点key的hash值相等,且key指向同一个地址或equals

        • 返回value
  • 返回null

5. put 流程

以下数组简称(table),链表简称(bin)

public V put(K key, V value) {return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 其中 spread 方法会综合高位低位, 具有更好的 hash 性int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {// f 是链表头节点// fh 是链表头结点的 hash// i 是链表在 table 中的下标Node<K,V> f; int n, i, fh;// 要创建 tableif (tab == null || (n = tab.length) == 0)// 初始化 table 使用了 cas, 无需 synchronized 创建成功, 进入下一轮循环tab = initTable();// 要创建链表头节点else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 添加链表头使用了 cas, 无需 synchronizedif (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;}// 帮忙扩容else if ((fh = f.hash) == MOVED)// 帮忙之后, 进入下一轮循环tab = helpTransfer(tab, f);else {V oldVal = null;// 锁住链表头节点synchronized (f) {// 再次确认链表头节点没有被移动if (tabAt(tab, i) == f) {// 链表if (fh >= 0) {binCount = 1;// 遍历链表for (Node<K,V> e = f;; ++binCount) {K ek;// 找到相同的 keyif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;// 更新if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;// 已经是最后的节点了, 新增 Node, 追加至链表尾if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 红黑树else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;// putTreeVal 会看 key 是否已经在树中, 是, 则返回对应的 TreeNodeif ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}// 释放链表头节点的锁}if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD)// 如果链表长度 >= 树化阈值(8), 进行链表转为红黑树treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 增加 size 计数addCount(1L, binCount);return null;
}
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)Thread.yield();// 尝试将 sizeCtl 设置为 -1(表示初始化 table)else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 获得锁, 创建 table, 这时其它线程会在 while() 循环中 yield 直至 table 创建try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}break;}}return tab;
}
// check 是之前 binCount 的个数
private final void addCount(long x, int check) {CounterCell[] as; long b, s;if (// 已经有了 counterCells, 向 cell 累加(as = counterCells) != null ||// 还没有, 向 baseCount 累加!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (// 还没有 counterCellsas == null || (m = as.length - 1) < 0 ||// 还没有 cell(a = as[ThreadLocalRandom.getProbe() & m]) == null ||// cell cas 增加计数失败!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// 创建累加单元数组和cell, 累加重试fullAddCount(x, uncontended);return;}if (check <= 1)return;// 获取元素个数s = sumCount();}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;// newtable 已经创建了,帮忙扩容if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 需要扩容,这时 newtable 未创建else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}
}

总结:

  • 进入for循环:

    • if table为null或者长度 为0

      • 初始化表
    • else if 索引处无节点

      • 创建节点,填入key和value,放入table,退出循环
    • else if 索引处节点的hash值为MOVE(ForwardingNode),表示正在扩容和迁移

      • 帮忙
    • else

      • 锁住头节点

        • if 再次确认头节点没有被移动

          • if 头节点hash值大于0(表示这是一个链表)

            • 遍历链表找到对应key,如果没有,创建。
          • else if 节点为红黑树节点

            • 调用putTreeVal查看是否有对应key的数节点

              • 如果有且为覆盖模式,将值覆盖,返回旧值
              • 如果没有,创建并插入,返回null
        • 解锁

      • if binCount不为0

        • 如果binCount大于树化阈值8

          • 树化
        • 如果旧值不为null

          • 返回旧值
        • break

  • 增加size计数

  • return null

6. size 计算流程

size 计算实际发生在 put,remove 改变集合元素的操作之中

  • 没有竞争发生,向 baseCount 累加计数

  • 有竞争发生,新建 counterCells,向其中的一个 cell 累加计

    • counterCells 初始有两个 cell
    • 如果计数竞争比较激烈,会创建新的 cell 来累加计数
public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}
final long sumCount() {CounterCell[] as = counterCells; CounterCell a;// 将 baseCount 计数与所有 cell 计数累加long sum = baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;
}
7. 总结

Java 8 数组(Node) +( 链表 Node | 红黑树 TreeNode ) 以下数组简称(table),链表简称(bin)

  • 初始化,使用 cas 来保证并发安全,懒惰初始化 table
  • 树化,当 table.length < 64 时,先尝试扩容,超过 64 时,并且 bin.length > 8 时,会将链表树化,树化过程 会用 synchronized 锁住链表头
  • put,如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,锁住链表头进行后续 put 操作,元素 添加至 bin 的尾部
  • get,无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新 table 进行搜索
  • 扩容,扩容时以 bin 为单位进行,需要对 bin 进行 synchronized,但这时妙的是其它竞争线程也不是无事可 做,它们会帮助把其它 bin 进行扩容,扩容时平均只有 1/6 的节点会把复制到新 table 中
  • size,元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell[] 当中。最后统计数量时累加 即可

源码分析 http://www.importnew.com/28263.html

其它实现 Cliff Click’s high scale lib

03. JDK 7 ConcurrentHashMap

它维护了一个 segment 数组,每个 segment 对应一把锁

  • 优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 jdk8 中是类似的
  • 缺点:Segments 数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化
1. 构造器分析
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;// ssize 必须是 2^n, 即 2, 4, 8, 16 ... 表示了 segments 数组的大小int sshift = 0;int ssize = 1;while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}// segmentShift 默认是 32 - 4 = 28this.segmentShift = 32 - sshift;// segmentMask 默认是 15 即 0000 0000 0000 1111this.segmentMask = ssize - 1;if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;int cap = MIN_SEGMENT_TABLE_CAPACITY;while (cap < c)cap <<= 1;// 创建 segments and segments[0]Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments = ss;
}

可以看到 ConcurrentHashMap 没有实现懒惰初始化,空间占用不友好

其中 this.segmentShift 和 this.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment

例如,根据某一 hash 值求 segment 位置,先将高位向低位移动 this.segmentShift 位

image.png

结果再与 this.segmentMask 做位于运算,最终得到 1010 即下标为 10 的 segment

image-20220317180935914

image.png

2. put 流程
public V put(K key, V value) {Segment<K,V> s;if (value == null)throw new NullPointerException();int hash = hash(key);// 计算出 segment 下标int j = (hash >>> segmentShift) & segmentMask;// 获得 segment 对象, 判断是否为 null, 是则创建该 segmentif ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) {// 这时不能确定是否真的为 null, 因为其它线程也发现该 segment 为 null,// 因此在 ensureSegment 里用 cas 方式保证该 segment 安全性s = ensureSegment(j);}// 进入 segment 的put 流程return s.put(key, hash, value, false);
}

segment 继承了可重入锁(ReentrantLock),它的 put 方法为

final V put(K key, int hash, V value, boolean onlyIfAbsent) {// 尝试加锁HashEntry<K,V> node = tryLock() ? null :// 如果不成功, 进入 scanAndLockForPut 流程// 如果是多核 cpu 最多 tryLock 64 次, 进入 lock 流程// 在尝试期间, 还可以顺便看该节点在链表中有没有, 如果没有顺便创建出来scanAndLockForPut(key, hash, value);// 执行到这里 segment 已经被成功加锁, 可以安全执行V oldValue;try {HashEntry<K,V>[] tab = table;int index = (tab.length - 1) & hash;HashEntry<K,V> first = entryAt(tab, index);for (HashEntry<K,V> e = first;;) {if (e != null) {// 更新K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) { oldValue = e.value;if (!onlyIfAbsent) {e.value = value;++modCount;} break;}e = e.next;}else {// 新增// 1) 之前等待锁时, node 已经被创建, next 指向链表头if (node != null)node.setNext(first);else// 2) 创建新 nodenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1; // 3) 扩容if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);else// 将 node 作为链表头setEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {unlock();}return oldValue;
}
3. rehash 流程

发生在 put 中,因为此时已经获得了锁,因此 rehash 时不需要考虑线程安全

private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;int oldCapacity = oldTable.length;int newCapacity = oldCapacity << 1;threshold = (int)(newCapacity * loadFactor);HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];int sizeMask = newCapacity - 1;for (int i = 0; i < oldCapacity ; i++) {HashEntry<K,V> e = oldTable[i];if (e != null) {HashEntry<K,V> next = e.next;int idx = e.hash & sizeMask;if (next == null) // Single node on listnewTable[idx] = e;else { // Reuse consecutive sequence at same slotHashEntry<K,V> lastRun = e;int lastIdx = idx;// 过一遍链表, 尽可能把 rehash 后 idx 不变的节点重用for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;if (k != lastIdx) {lastIdx = k;lastRun = last;}}newTable[lastIdx] = lastRun;// 剩余节点需要新建for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {V v = p.value;int h = p.hash;int k = h & sizeMask;HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(h, p.key, v, n);}}}}// 扩容完成, 才加入新的节点int nodeIndex = node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] = node;// 替换为新的 HashEntry tabletable = newTable;
}

附,调试代码

public static void main(String[] args) {ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();for (int i = 0; i < 1000; i++) {int hash = hash(i);int segmentIndex = (hash >>> 28) & 15;if (segmentIndex == 4 && hash % 8 == 2) {System.out.println(i + "\t" + segmentIndex + "\t" + hash % 2 + "\t" + hash % 4 +"\t" + hash % 8);}}map.put(1, "value");map.put(15, "value"); // 2 扩容为 4 15 的 hash%8 与其他不同map.put(169, "value");map.put(197, "value"); // 4 扩容为 8map.put(341, "value");map.put(484, "value");map.put(545, "value"); // 8 扩容为 16map.put(912, "value");map.put(941, "value");System.out.println("ok");
}
private static int hash(Object k) {int h = 0;if ((0 != h) && (k instanceof String)) {return sun.misc.Hashing.stringHash32((String) k);}h ^= k.hashCode();// Spread bits to regularize both segment and index locations,// using variant of single-word Wang/Jenkins hash.h += (h << 15) ^ 0xffffcd7d;h ^= (h >>> 10);h += (h << 3);h ^= (h >>> 6);h += (h << 2) + (h << 14);int v = h ^ (h >>> 16);return v;
}
4. get 流程

get 时并未加锁,用了 UNSAFE 方法保证了可见性,扩容过程中,get 先发生就从旧表取内容,get 后发生就从新 表取内容

public V get(Object key) {Segment<K,V> s; // manually integrate access methods to reduce overheadHashEntry<K,V>[] tab;int h = hash(key);// u 为 segment 对象在数组中的偏移量long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;// s 即为 segmentif ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {K k;if ((k = e.key) == key || (e.hash == h && key.equals(k)))return e.value;}}return null;
}
5. size 计算流程
  • 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回
  • 如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回
public int size() {// Try a few times to get accurate count. On failure due to// continuous async changes in table, resort to locking.final Segment<K,V>[] segments = this.segments;int size;boolean overflow; // true if size overflows 32 bitslong sum; // sum of modCountslong last = 0L; // previous sumint retries = -1; // first iteration isn't retrytry {for (;;) {if (retries++ == RETRIES_BEFORE_LOCK) {// 超过重试次数, 需要创建所有 segment 并加锁for (int j = 0; j < segments.length; ++j)ensureSegment(j).lock(); // force creation}sum = 0L;size = 0;overflow = false;for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);if (seg != null) {sum += seg.modCount;int c = seg.count;if (c < 0 || (size += c) < 0)overflow = true;}}if (sum == last)break;last = sum;}} finally {if (retries > RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)segmentAt(segments, j).unlock();}}return overflow ? Integer.MAX_VALUE : size;
}

⑦:BlockingQueue

*1、BlockingQueue 原理

01. 基本的入队出队
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {static class Node<E> {E item;/*** 下列三种情况之一* - 真正的后继节点* - 自己, 发生在出队时* - null, 表示是没有后继节点, 是最后了*/Node<E> next;Node(E x) { item = x; }}
}
1. 初始化链表

last = head = new Node(null);Dummy 节点用来占位,item 为 null

2. 当一个节点入队

last = last.next = node;

再来一个节点入队last = last.next = node;

3. 出队
//临时变量h用来指向哨兵
Node<E> h = head;
//first用来指向第一个元素
Node<E> first = h.next;
h.next = h; // help GC
//head赋值为first,表示first节点就是下一个哨兵。
head = first;
E x = first.item;
//删除first节点中的数据,表示真正成为了哨兵,第一个元素出队。
first.item = null;
return x;

h = head

image.png

first = h.next

image.png

h.next = h

image.png

head = first

image.png

E x = first.item;
first.item = null;
return x;

image.png

02. 加锁分析

高明之处在于用了两把锁和 dummy 节点

  • 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行

  • 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行

    • 消费者与消费者线程仍然串行
    • 生产者与生产者线程仍然串行

线程安全分析

  • 当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全。两把锁保证了入队和出队没有竞争
  • 当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争
  • 当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
// 用于 put(阻塞) offer(非阻塞)
private final ReentrantLock putLock = new ReentrantLock();
// 用户 take(阻塞) poll(非阻塞)
private final ReentrantLock takeLock = new ReentrantLock();

put 操作

public void put(E e) throws InterruptedException {//LinkedBlockingQueue不支持空元素if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;// count 用来维护元素计数final AtomicInteger count = this.count;putLock.lockInterruptibly();try {// 满了等待while (count.get() == capacity) {// 倒过来读就好: 等待 notFullnotFull.await();}// 有空位, 入队且计数加一enqueue(node);c = count.getAndIncrement(); // 除了自己 put 以外, 队列还有空位, 由自己叫醒其他 put 线程if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}// 如果队列中有一个元素, 叫醒 take 线程if (c == 0)// 这里调用的是 notEmpty.signal() 而不是 notEmpty.signalAll() 是为了减少竞争signalNotEmpty();
}

take 操作

public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 如果队列中只有一个空位时, 叫醒 put 线程// 如果有多个线程进行出队, 第一个线程满足 c == capacity, 但后续线程 c < capacityif (c == capacity)// 这里调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争signalNotFull()return x;
}

由 put 唤醒 put 是为了避免信号不足

03. 性能比较

主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较

  • Linked 支持有界,Array 强制有界
  • Linked 实现是链表,Array 实现是数组
  • Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
  • Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
  • Linked 两把锁,Array 一把锁

⑧:ConcurrentLinkedQueue

ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像,也是

  • 两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
  • dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争
  • 只是这【锁】使用了 cas 来实现

事实上,ConcurrentLinkedQueue 应用还是非常广泛的

例如之前讲的 Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,正是采用了 ConcurrentLinkedQueue 将 SocketChannel 给 Poller 使用

Connector->NIO EndPoint
Executor
有读
有读
socketProcessor
socketProcessor
LimitLatch
Acceptor
SocketChannel 1
SocketChannel 2
Poller
worker1
worker2

*1、ConcurrentLinkedQueue 原理

01. 模仿 ConcurrentLinkedQueue

初始代码

package cn.itcast.concurrent.thirdpart.test;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
public class Test3 {public static void main(String[] args) {MyQueue<String> queue = new MyQueue<>();queue.offer("1");queue.offer("2");queue.offer("3");System.out.println(queue);}
}
class MyQueue<E> implements Queue<E> {@Overridepublic String toString() {StringBuilder sb = new StringBuilder();for (Node<E> p = head; p != null; p = p.next.get()) {E item = p.item;if (item != null) {sb.append(item).append("->");}}sb.append("null");return sb.toString();}@Overridepublic int size() {return 0;}@Overridepublic boolean isEmpty() {return false;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<E> iterator() {return null;}@Overridepublic Object[] toArray() {return new Object[0];}@Overridepublic <T> T[] toArray(T[] a) {return null;}@Overridepublic boolean add(E e) {return false;}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean containsAll(Collection<?> c) {return false;}@Overridepublic boolean addAll(Collection<? extends E> c) {return false;}@Overridepublic boolean removeAll(Collection<?> c) {return false;}@Overridepublic boolean retainAll(Collection<?> c) {return false;}@Overridepublic void clear() {}@Overridepublic E remove() {return null;}@Overridepublic E element() {return null;}@Overridepublic E peek() {return null;}public MyQueue() {head = last = new Node<>(null, null);}private volatile Node<E> last;private volatile Node<E> head;private E dequeue() {/*Node<E> h = head;Node<E> first = h.next;h.next = h;head = first;E x = first.item;first.item = null;return x;*/return null;}@Overridepublic E poll() {return null;}@Overridepublic boolean offer(E e) {return true;}static class Node<E> {volatile E item;public Node(E item, Node<E> next) {this.item = item;this.next = new AtomicReference<>(next);}AtomicReference<Node<E>> next;}
}

offer

public boolean offer(E e) {Node<E> n = new Node<>(e, null);while(true) {// 获取尾节点AtomicReference<Node<E>> next = last.next;// S1: 真正尾节点的 next 是 null, cas 从 null 到新节点if(next.compareAndSet(null, n)) {// 这时的 last 已经是倒数第二, next 不为空了, 其它线程的 cas 肯定失败// S2: 更新 last 为倒数第一的节点last = n;return true;}}
}

⑨:CopyOnWriteArrayList

CopyOnWriteArraySet是它的马甲 底层实现采用了 写入时拷贝 的思想,增删改操作会将底层数组拷贝一份,更 改操作在新数组上执行,这时不影响其它线程的并发读读写分离。 以新增为例:

public boolean add(E e) {synchronized (lock) {// 获取旧的数组Object[] es = getArray();int len = es.length;// 拷贝新的数组(这里是比较耗时的操作,但不影响其它读线程)es = Arrays.copyOf(es, len + 1);// 添加新元素es[len] = e;// 替换旧的数组setArray(es);return true;}
}

这里的源码版本是 Java 11,在 Java 1.8 中使用的是可重入锁而不是 synchronized

其它读操作并未加锁,例如:

public void forEach(Consumer<? super E> action) {Objects.requireNonNull(action);for (Object x : getArray()) {@SuppressWarnings("unchecked") E e = (E) x;action.accept(e);}
}

适合『读多写少』的应用场景

1、get 弱一致性!

image.png

时间点操作
1Thread-0 getArray()
2Thread-1 getArray()
3Thread-1 setArray(arrayCopy)
4Thread-0 array[index]

不容易测试,但问题确实存在

2、迭代器弱一致性

CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Iterator<Integer> iter = list.iterator();
new Thread(() -> {list.remove(0);System.out.println(list);
}).start();
sleep1s();
//此时主线程的iterator依旧指向旧的数组。
while (iter.hasNext()) {System.out.println(iter.next());
}

不要觉得弱一致性就不好

  • 数据库的 MVCC 都是弱一致性的表现
  • 并发高和一致性是矛盾的,需要权衡

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/164354.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Tomcat的类加载器

详情可以参考&#xff1a;https://tomcat.apache.org/tomcat-10.1-doc/class-loader-howto.html 简要说明 Tomcat安装了多种类加载器&#xff0c;以便容器的不同部分、容器中的应用访问能够不同的类和资源。 在Java环境中&#xff0c;类加载器被组织为父-子树的形式。通常情况…

发电机综合特性测试仪

发电机综合特性测试仪是可以测量发电机的电压、电流、功率因数、频率等参数&#xff0c;以评估发电机的质量和性能。可以测量发电机的输出电压&#xff0c;以确保其符合规定的标准和要求。测量发电机的输出电流&#xff0c;以确定其负载能力和稳定性。可以测量发电机的功率因数…

五、计算机网络

&#xff08;一&#xff09;OSI/RM 七层模型 七层模型是计算机网络的基石&#xff0c;整个计算机网络是构建与七层模型之上的。 在数据链路层&#xff0c;数据开始以帧为单位&#xff0c;网卡的 MAC 地址就是数据帧的地址&#xff0c;数据的传输开始有地址了。 局域网是工作…

关于稳定扩散最详细的介绍

推荐基于稳定扩散(stable diffusion) AI 模型开发的自动纹理工具&#xff1a; DreamTexture.js自动纹理化开发包 - NSDT Stable Diffusion 用途广泛&#xff0c;因为它可以以多种不同的方式使用。首先&#xff0c;让我们关注仅从文本 &#xff08;text2img&#xff09; 生成图像…

Vb6 TCP Server服务端监听多个RFID读卡器客户端上传的刷卡数据

本示例使用设备介绍&#xff1a;WIFI无线4G网络RFID云读卡器远程网络开关物流网阅读器TTS语音-淘宝网 (taobao.com) Option ExplicitConst BUSY As Boolean False 定义常量 Const FREE As Boolean TrueDim ConnectState() As Boolean 定义连接状态 Dim ServerSendbuf(…

ansible第一天

ansible 第一天 以上主机使用rhel-8.2-x86_64-dvd.iso镜像&#xff0c;配置ip、yum源&#xff0c;关闭防火墙和selinux规则 安装中文包&#xff0c;重启生效 [rootcontrol ~]# yum -y install langpacks-zh_CN.noarch && reboot 配置名称解析 [rootcontrol ~]# echo…

HUAWEI悦盒ec6108v9c 如何刷成海纳思系统(家用低功耗服务器,使用Home Assistant服务)

环境&#xff1a; 1.HW悦盒ec6108v9c一套 2.16G U盘 3.格式化软件USB_format.exe 4.固件 mv100-mdmo1g-usb-flash.zip&#xff08;底层是Ubuntu 20.04系统&#xff09; 5.十字螺丝刀 6.翘片/薄铲子 7.有线网络环境 8.镊子/回形针 问题描述&#xff1a; 最近玩智能家居…

R语言实操记录——导出高清图片(矢量图)

R语言 R语言实操记录——导出高清图片&#xff08;矢量图&#xff09; 文章目录 R语言一、起因&#xff08;闲聊&#xff0c;可跳过&#xff09;二、如何在R中导出高清图片&#xff08;矢量图&#xff09;2.1、保存为EPS图片格式后转AI编辑2.2、保存为PDF格式&#xff08;推荐…

使用PCtoLCD2002提取字模

“模式”---“字符模式” LCD显示&#xff0c;汉字使用宋体还是比较好的&#xff0c;16*16是长、宽都是16个像素显示。

aardio 中文字符转换unicode及Unicode 编码转换为中文

废话不多说 直接开干&#xff01; 知识点 需要库 import string; import inet.whttp; import console; import inet import inet.url string.unescape Unicode解码 string.fromto 编码 inet.url.encode 解码 import win.ui; import string; import inet.whttp; import consol…

【1107】

interface是面向对象编程语言中接口操作的关键字&#xff0c;功能是把所需成员组合起来&#xff0c;用来封装一定功能的集合。 它好比一个模板&#xff0c;在其中定义了对象必须实现的成员&#xff0c;通过类或结构来实现它。 接口不能直接实例化&#xff0c;即ICount icnew iC…

2023中国视频云市场报告:腾讯云音视频解决方案份额连续六次蝉联榜首,加速全球化布局

近日&#xff0c;国际数据公司&#xff08;IDC&#xff09;发布了《中国视频云市场跟踪&#xff08;2023上半年&#xff09;》报告&#xff0c;腾讯云音视频的解决方案份额连续六次蝉联榜首&#xff0c;并在视频生产创作与媒资管理市场份额中排名第一。同时&#xff0c;在实时音…