通信工具类

news/2024/9/8 9:08:58/文章来源:https://www.cnblogs.com/sprinining/p/18326732
作用
Semaphore 限制线程的数量
Exchanger 两个线程交换数据
CountDownLatch 线程等待直到计数器减为 0 时开始工作
CyclicBarrier 作用跟 CountDownLatch 类似,但是可以重复使用
Phaser 增强的 CyclicBarrier

Semaphore


Semaphore 翻译过来是信号的意思。顾名思义,这个工具类提供的功能就是多个线程彼此“传信号”。而这个“信号”是一个int类型的数据,也可以看成是一种“资源”。

// 传入初始资源总数,默认情况下,是非公平的同步器
public Semaphore(int permits) {sync = new NonfairSync(permits);
}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

最主要的方法是 acquire 方法和 release 方法。acquire()方法会申请一个 permit,而 release 方法会释放一个 permit。当然,你也可以申请多个 acquire(int permits)或者释放多个 release(int permits)

每次 acquire,permits 就会减少一个或者多个。如果减少到了 0,再有其他线程来 acquire,那就要阻塞这个线程直到有其它线程 release permit 为止。

Semaphore 往往用于资源有限的场景中,去限制线程的数量。举个例子,我想限制同时只能有 3 个线程在工作:

public class Main {static class MyThread implements Runnable {private final int value;private final Semaphore semaphore;public MyThread(int value, Semaphore semaphore) {this.value = value;this.semaphore = semaphore;}@Overridepublic void run() {try {// 获取 permitsemaphore.acquire();System.out.printf("当前线程是%d, 还剩%d个资源,还有%d个线程在等待%n",value, semaphore.availablePermits(), semaphore.getQueueLength());// 睡眠随机时间,打乱释放顺序Random random = new Random();Thread.sleep(random.nextInt(1000));System.out.printf("线程%d释放了资源%n", value);} catch (InterruptedException e) {e.printStackTrace();} finally {// 释放 permitsemaphore.release();}}}public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);for (int i = 0; i < 10; i++) {new Thread(new MyThread(i, semaphore)).start();}}
}

Semaphore 默认的 acquire 方法是会让线程进入等待队列,且抛出异常中断。但它还有一些方法可以忽略中断或不进入阻塞队列:

// 忽略中断
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)// 不进入等待队列,底层使用CAS
public boolean tryAcquire
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException
public boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore 内部有一个继承了 AQS 的同步器 Sync,重写了tryAcquireShared方法。在这个方法里,会去尝试获取资源。如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入 AQS 的等待队列。

Exchanger


Exchanger 类用于两个线程交换数据。它支持泛型,也就是说可以在两个线程之间传送任何数据。

public class Main {public static void main(String[] args) throws InterruptedException {Exchanger<String> exchanger = new Exchanger<>();new Thread(() -> {try {System.out.println("这是线程A,得到了另一个线程的数据:"+ exchanger.exchange("这是来自线程A的数据"));} catch (InterruptedException e) {e.printStackTrace();}}).start();// 当一个线程调用 exchange 方法后,会处于阻塞状态,只有当另一个线程也调用了 exchange 方法,它才会继续执行System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");Thread.sleep(1000);new Thread(() -> {try {System.out.println("这是线程B,得到了另一个线程的数据:"+ exchanger.exchange("这是来自线程B的数据"));} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}

根据 JDK 里面注释的说法,可以总结为一下特性:

  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于基因算法、流水线设计等场景。

Exchanger 类还有一个有超时参数的方法,如果在指定时间内没有另一个线程调用 exchange,就会抛出一个超时异常。

public V exchange(V x, long timeout, TimeUnit unit)

Exchanger 是可以重复使用的。也就是说。两个线程可以使用 Exchanger 在内存中不断地再交换数据。

CountDownLatch


public class CountDownLatch {private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;// 计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且 CountDownLatch没有提供任何机制去重新设置这个计数值Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}private final Sync sync;public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}// 等待public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}// 超时等待public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}// count - 1public void countDown() {sync.releaseShared(1);}// 获取当前还有多少countpublic long getCount() {return sync.getCount();}public String toString() {return super.toString() + "[Count = " + sync.getCount() + "]";}
}
public class Main {// 定义前置任务线程static class PreTaskThread implements Runnable {private final String task;private final CountDownLatch countDownLatch;public PreTaskThread(String task, CountDownLatch countDownLatch) {this.task = task;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {Random random = new Random();Thread.sleep(random.nextInt(1000));System.out.println(task + " - 任务完成");countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {// 假设有三个模块需要加载CountDownLatch countDownLatch = new CountDownLatch(3);// 主任务new Thread(() -> {try {System.out.println("等待数据加载...");System.out.printf("还有%d个前置任务%n", countDownLatch.getCount());countDownLatch.await();System.out.println("数据加载完成,正式开始游戏!");} catch (InterruptedException e) {e.printStackTrace();}}).start();// 前置任务new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();}
}

CyclicBarrier


CyclicBarrirer 从名字上来理解是“循环屏障”的意思。前面提到了 CountDownLatch 一旦计数值count被降为 0 后,就不能再重新设置了,它只能起一次“屏障”的作用。而 CyclicBarrier 拥有 CountDownLatch 的所有功能,还可以使用reset()方法重置屏障。

如果参与者(线程)在等待的过程中,Barrier 被破坏,就会抛出 BrokenBarrierException。可以用isBroken()方法检测 Barrier 是否被破坏。

  1. 如果有线程已经处于等待状态,调用 reset 方法会导致已经在等待的线程出现 BrokenBarrierException 异常。并且由于出现了 BrokenBarrierException,将会导致始终无法等待。
  2. 如果在等待的过程中,线程被中断,会抛出 InterruptedException 异常,并且这个异常会传播到其他所有的线程。
  3. 如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出 BrokenBarrierException,屏障被损坏。
  4. 如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线程会抛出 BrokenBarrierException 异常。
// 构造方法
public CyclicBarrier(int parties) {this(parties, null);
}public CyclicBarrier(int parties, Runnable barrierAction) {// 具体实现
}
public class Main {static class PreTaskThread implements Runnable {private final String task;private final CyclicBarrier cyclicBarrier;public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {this.task = task;this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {// 假设总共三个关卡for (int i = 1; i < 4; i++) {try {Random random = new Random();Thread.sleep(random.nextInt(1000));System.out.printf("关卡%d的任务%s完成%n", i, task);// 一旦调用 await 方法的线程数量等于构造方法中传入的任务总量(这里是 3),就代表达到屏障了。cyclicBarrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}}}public static void main(String[] args) {// CyclicBarrier 允许我们在达到屏障的时候可以执行一个任务,可以在构造方法传入一个 Runnable 类型的对象。CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {System.out.println("本关卡所有前置任务完成,开始游戏...");});new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();}
}

CyclicBarrier 内部使用的是 Lock + Condition 实现的等待/通知模式。详情可以查看这个方法的源码:

private int dowait(boolean timed, long nanos)

Phaser


Phaser 是 Java 7 中引入的一个并发同步工具,它提供了对动态数量的线程的同步能力,这与 CyclicBarrier 和 CountDownLatch 不同,因为它们都需要预先知道等待的线程数量。Phaser 是多阶段的,意味着它可以同步不同阶段的多个操作。

前面我们介绍了 CyclicBarrier,可以发现它在构造方法里传入了“任务总量”parties之后,就不能修改这个值了,并且每次调用await()方法也只能消耗一个parties计数。但 Phaser 可以动态地调整任务总量!

Phaser 是阶段性的,所以它有一个内部的阶段计数器。每当我们到达一个阶段的结尾时,Phaser 会自动前进到下一个阶段。

名词解释:

  • Party:Phaser 的上下文中,一个 party 可以是一个线程,也可以是一个任务。当我们在 Phaser 上注册一个 party 时,Phaser 会递增它的参与者数量。
  • arrive:对应一个 party 的状态,初始时是 unarrived,当调用arriveAndAwaitAdvance()或者 arriveAndDeregister()进入 arrive 状态,可以通过getUnarrivedParties()获取当前未到达的数量。
  • register:注册一个新的 party 到 Phaser。
  • deRegister:减少一个 party。
  • phase:阶段,当所有注册的 party 都 arrive 之后,将会调用 Phaser 的onAdvance()方法来判断是否要进入下一阶段。

Phaser 的终止有两种途径,Phaser 维护的线程执行完毕或者onAdvance()返回true

public class Main {static class PreTaskThread implements Runnable {private final String task;private final Phaser phaser;public PreTaskThread(String task, Phaser phaser) {this.task = task;this.phaser = phaser;}@Overridepublic void run() {for (int i = 1; i < 4; i++) {try {// 从第二个关卡起,不加载新手教程if (i >= 2 && "加载新手教程".equals(task)) {continue;}Random random = new Random();Thread.sleep(random.nextInt(1000));System.out.printf("关卡%d,需要加载%d个模块,当前模块【%s】%n",i, phaser.getRegisteredParties(), task);// 从第二个关卡起,不加载新手教程if (i == 1 && "加载新手教程".equals(task)) {System.out.println("下次关卡移除加载【新手教程】模块");// 移除一个模块phaser.arriveAndDeregister();} else {phaser.arriveAndAwaitAdvance();}} catch (InterruptedException e) {e.printStackTrace();}}}}public static void main(String[] args) {Phaser phaser = new Phaser(4) {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {System.out.printf("第%d次关卡准备完成%n", phase + 1);return phase == 3 || registeredParties == 0;}};new Thread(new PreTaskThread("加载地图数据", phaser)).start();new Thread(new PreTaskThread("加载人物模型", phaser)).start();new Thread(new PreTaskThread("加载背景音乐", phaser)).start();new Thread(new PreTaskThread("加载新手教程", phaser)).start();}
}

这里要注意关卡 1 的输出,在“加载新手教程”线程中调用了arriveAndDeregister()减少一个 party 之后,后面的线程使用getRegisteredParties()得到的是已经被修改后的 parties 了。但是当前这个阶段(phase),仍然是需要 4 个 parties 都 arrive 才触发屏障的。从下一个阶段开始,才需要 3 个 parties 都 arrive 就触发屏障。

Phaser 类用来控制某个阶段的线程数量很有用,但它并不在意这个阶段具体有哪些线程 arrive,只要达到它当前阶段的 parties 值,就触发屏障。所以我这里的案例虽然制定了特定的线程(加载新手教程)来更直观地表述 Phaser 的功能,但其实 Phaser 是没有分辨具体是哪个线程的功能的,它在意的只是数量。

它内部使用了两个基于 Fork-Join 框架的原子类辅助:

private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;static final class QNode implements ForkJoinPool.ManagedBlocker {// 实现代码
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/772528.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

正新轮胎真的挺好

sc12 龙王胎, 雨天根本不滑7月份买的,2424 ,新的很,就一百多,换好160左右。还是很值的,别用什么顾满德,人家推这个产品是因为利润,不是因为他觉得轮胎好, 之前老板前后轮卖给我400块钱,包安装,含泪赚了至少200 。。太坑了,轮胎本身也没什么问题,就是硬,耐磨,有点…

perf抓取火焰图

目录一、Perf工具安装(压缩包在帖子下方的附件中)二、热点信息生成火焰图1、容器外抓取热点信息生成火焰图2、容器内抓取热点信息生成火焰图3、生成火焰图效果三、在容器中查看实时热点信息 一、Perf工具安装(压缩包在帖子下方的附件中) *****************************ARM*****…

OpenAI深夜丢炸弹硬杠谷歌搜索

这几年科技变革太快,AI更是飞速发展,作为一名IT老兵,使用过的搜索引擎也是一换再换。这不,刚消停了一段时间的OpenAI又丢出一个炸弹SearchGPT,直接跟谷歌掀桌子了。这几年科技变革太快,AI更是飞速发展,作为一名IT老兵,使用过的搜索引擎也是一换再换。这不,刚消停了一段…

GDB详解

GDB简介 GDB(GNU Debugger)是Linux下一款C/C++程序调试工具,通过在命令行中执行相应的命令实现程序的调试,使用GDB时只需要在shell中输入gdb命令或gdb filename(filename为可执行程序文件名)即可进入GDB调试环境。 GDB主要有以下功能:设置断点 单步调试 查看变量的值 动态…

从零开始搭建博客系列-终

结束,也是新的开始。结束,也是新的开始。 ‍ 不知不觉也写了接近 30 篇博客了,也帮助到了很多人,甚是欣慰。 本文就做一个小结吧 🎉 ‍ 搭建博客非一日之功 我从 2022 年 4 月开始搭建博客,每天/每周都抽出点时间搞,前前后后花了几个月时间,基本功能才算完成了,并且几…

进度报告9

(1)1.学习string和arraylist集合的使用2.案例练习

在python3.8虚拟环境 执行pip 安装Excel的库

1、在开始菜单 打开Anaconda prompt(anaconda3) 2、查看环境列表 3、进入虚拟环境 4、在虚拟换进下使用清华源安装读取excel的库 和写入excel的库 读取Excel文件的库:pip install xlrd -i https://pypi.tuna.tsinghua.edu.cn/simple 写入Excel文件的库:pip install xlwt …

51nod-基因匹配+luogu-【模板】最长公共子序列

https://www.luogu.com.cn/problem/P1439 https://class.51nod.com/Html/Textbook/ChapterIndex.html#textbookId=126&chapterId=338 以上两个都是特例,一个是每个元素不重复,一个是每个元素均有5个。正确性说明参考:https://www.luogu.com.cn/article/1bcs9052 由于一般…

大语言模型的Scaling Law:如何随着模型大小、训练数据和计算资源的增加而扩展

人工智能的世界正在经历一场革命,大型语言模型正处于这场革命的前沿,它们似乎每天都在变得更加强大。从BERT到GPT-3再到PaLM,这些AI巨头正在推动自然语言处理可能性的边界。但你有没有想过是什么推动了它们能力的飞速提升? 在这篇文章中,我们将介绍使这些模型运作的秘密武…

教你如何管理Linux网络,一招鲜吃遍天?!

01 准备工作 当前操作的虚拟机版本信息:CentOS8 当前操作的虚拟化软件:VMware workstation 由于虚拟化软件中有3种网络模式,我们这里选择使用NAT模式 提前查看虚拟机的网段信息是多少,方便我们后续配置网络能够有效使用在配置网络之前您需要了解一些基础知识: 在给Linux系…

使用iwctl连接无线网络

检查wifi模块驱动是否正确 ip addr #输出的信息查看是否 包含‘w’开头的网卡安装iwd这里使用iwd管理WiFi,主要原因是小巧,方便使用无需额外配置# 安装 apt install iwd # 设置开机启动 systemctl enable --now iwd # 查看无线网卡 iwctl device list# 扫描并获取无线网络 iw…

Hisiphp2.0.11的文件上传

php第二个复现漏洞,危险函数PclZip()侵权声明 本文章中的所有内容(包括但不限于文字、图像和其他媒体)仅供教育和参考目的。如果在本文章中使用了任何受版权保护的材料,我们满怀敬意地承认该内容的版权归原作者所有。 如果您是版权持有人,并且认为您的作品被侵犯,请通过以…