【JavaEE】多线程案例-阻塞队列

在这里插入图片描述

1. 前言

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

  • 在队列为空时,获取元素的线程会等待队列变为非空
  • 当队列满时,存储元素的线程会等待队列可用

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

2. 什么是生产者-消费者模型

生产者消费者模型是一种多线程并发协作的模型,由两类线程和一个缓冲区组成:生产者线程生产数据并把数据放在缓冲区,消费者线程从缓冲区取数据并消费。生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品。当存储空间为空时,消费者阻塞;当存储空间满时,生产者阻塞。
在这里插入图片描述

3. 生产者-消费者模型的意义

  1. 解耦合
  2. 削峰填谷

3.1 解耦合

两个模块的联系越紧密,耦合度就越高,耦合度越高就意味着两个模块的影响程度很大,当一个模块出现问题的时候,另一个模块也会受到影响而导致出现问题,特别是对分布式系统来说:解耦合是非常重要的。

在这里插入图片描述
假设上面是一个简单的分布式系统,服务器 A 和服务器 B 之间直接进行交互(服务器 A 向服务器 B 发送请求并接收服务器 B 返回的信息,服务器 B 向服务器 A 发送请求,以及接收服务器 A 返回的信息),服务器 A 和服务器 B 之间的耦合度比较高,当两个服务器之间的一个发生故障的时候就会导致两个服务器都无法使用。

不仅如此,当我们想要再添加一个服务器 C 与服务器 A 之间进行交互的时候,不仅需要对服务器 C 做出修改,还需要对服务器 A 作出修改。

相比上面的情况,如果我们使用生产者-消费者模型的话就可以解决上面的耦合度过高的问题。

在这里插入图片描述
服务器 A 接收到客户端发来的请求不是直接发送给服务器 B ,而是将接收到的请求加入到阻塞队列中,然后服务器 B 从阻塞队列中获取到请求,这样就避免了两个服务器之间进行直接的交互,降低了耦合性;不仅如此,当我们需要额外添加一个服务器 C 的时候,就不需要对服务器 A 做出修改,而是直接从阻塞队列获取请求信息。

在这里插入图片描述

3.2 削峰填谷

当客户端向服务器 A 短时间发出大量请求信息的话,那么当服务器 A 接收到客户端发来的请求的时候,就会立即将收到的所有信息都发送给服务器 B ,但是由于虽然服务器 A 能够接收的请求量可以很多,但是服务器 B 却不能一次接收这么多请求,就会导致服务器 B 会挂掉。

如果使用生产者-消费者莫模型的话,当客户端向服务器 A 短时间发送大量请求的话,服务器 A 不会将请求发送给 B ,而是发送给阻塞队列中,当阻塞队列满了的时候,服务器 A 就会停止向阻塞队列中发送请求,陷入阻塞状态,等服务器 B 向阻塞队列中受到请求使得阻塞队列容量减少的时候,服务器 A 才会继续向阻塞队列中发送收到的请求,这样就避免了服务器 B 短时间内受到大量的请求而挂掉的情况;如果阻塞队列中收到的请求信息被读取完的时候,服务器 B 就会停止从阻塞队列中读取请求,进入阻塞状态,直到服务器 A 向阻塞队列中发送请求。
在这里插入图片描述

4. 如何使用Java标准库提供的阻塞队列

当知道了生产者-消费者模型的意义之后,我们就来看看如何使用阻塞队列。在Java标准库中提供了阻塞队列 BlockingQueue 可以直接使用。

在这里插入图片描述
因为 BlockingQueu 是一个接口,无法实例化,所以需要创建出实现了 BlockingQueue 接口的类,而 ArrayBlockingQueue 和 LinkedBlockingQueue 实现了这个接口。

我们还可以观察到,BlockingQueue 还继承了 Queue ,也就是说我们也可以使用 Queue 中的方法,比如 offer 和 poll 等,但是在阻塞队列中不使用这两个方法,因为这两个方法不具有阻塞特性,而是使用 put 和 take 方法。

public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new LinkedBlockingQueue<>();queue.put("123");queue.put("234");queue.put("345");queue.put("456");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());}
}

在这里插入图片描述
这里向阻塞队列中加入了四个数据,但是读取的时候读取了五次,所以看到线程进入了阻塞状态。

5. 自己实现一个阻塞队列

阻塞队列是建立在队列的基础上的,所以要想实现一个阻塞队列,首先需要实现出来一个队列,那么就先来看看如何实现出一个循环队列。

5.1 实现出循环队列

队列比较容易实现,但是循环队列该如何实现呢?当数据到达数组的最后的时候,将数组的下标修改为0,这样就可以达到循环的目的。

在这里插入图片描述
当 tail == head 的时候有两种情况:

  1. 队列中没有数据的时候
  2. 队列满了的时候

为了区分这两种情况,我们可以使用两种方法:

  1. 浪费一个空间,当tail++之后,如果tail + 1 == head,则表示队列满了,将 tail 修改为 0
  2. 定义一个size变量来表示队列中有效数据的个数,当size == queue.length的时候,表示队列满了
class MyQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) {//当添加数据的时候需要判断队列的容量是否已经满了if(size == data.length) return;data[tail++] = str;size++;if(tail == data.length) tail = 0;}public String take() {//读取数据的时候需要判断队列是否为空if(size == 0) return null;String ret = data[head++];size--;if(head == data.length) head = 0;return ret;}
}

5.2 实现阻塞队列

阻塞队列就是在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,存储元素的线程会等待队列可用。并且因为阻塞队列运用的环境是多线程,需要考虑到线程安全的问题。

5.2.1 加锁

当需要进行查询和修改的操作时,需要对该操作进行加锁。因为我们的 put 和 take 基本上都在查询和修改数据,所以可以将这两个操作直接进行加锁操作。

class MyBlockingQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) {synchronized (this) {if(size == data.length) return;data[tail++] = str;size++;if(tail == data.length) tail = 0;}}public String take() {synchronized (this) {if(size == 0) return null;String ret = data[head++];size--;if(head == data.length) head = 0;return ret;}}
}

5.2.2 进行阻塞操作

当进行完加锁操作之后,我们还需要实现阻塞的作用,当添加数据的时候,如果队列中容量满了的时候就进入阻塞等待状态,直到进行了 take 读取数据操作删除数据的时候,才停止等待;当读取数据的时候,如果队列为空,那么该线程就进入阻塞等待状态,直到进行了 put 操作。

class MyBlockingQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) throws InterruptedException {synchronized (this) {if(size == data.length) {this.wait();}data[tail++] = str;size++;if(tail == data.length) tail = 0;//这个 notify 用来唤醒 take 操作的等待this.notify();}}public String take() throws InterruptedException {synchronized (this) {if(size == 0) {this.wait();}String ret = data[head++];size--;if(head == data.length) head = 0;//这个 notify 用来唤醒 put 操作的等待this.notify();return ret;}}
}

5.2.3 解决因被 interrupt 唤醒 waiting 状态的问题

当使用了 wait 和 notify 对 put 和 take 操作进行了阻塞等待和唤醒操作之后,我们还需要注意,难道只有 notify 才会唤醒 WAITING 状态吗?前面我们学习了使用 interrupt 来终止线程,但是 interrup 还会唤醒处于 WAITING 状态的线程,也就是说这里的 WAITING 状态的线程不仅可以被 notify 唤醒,还可以被 interrupt 唤醒。

  1. 当线程是因为 put 操作队列满了的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 take 操作的 notify 唤醒的时候就意味着此时队列还是满的,当进行添加操作的时候,就会将有效的数据覆盖掉;
  2. 当线程是因为 take 操作队列为空的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 put 操作的 notify 唤醒的时候就意味着此时队列还是空的,如果进行删除操作,并没有意义。

为了解决 WAITING 状态被 interrupt 唤醒而造成的问题,当线程被唤醒的时候,需要进行判断 size 是否还等于 0 或者 queue.length,如果还等于,就继续进入 WAITING 状态,但是光一次判断是不够的,因为还可能是被 interrupt 唤醒的,所以需要进行多次判断,可以用 while 循环来解决。

class MyBlockingQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) throws InterruptedException {synchronized (this) {while(size == data.length) {this.wait();}data[tail++] = str;size++;if(tail == data.length) tail = 0;//这个 notify 用来唤醒 take 操作的等待this.notify();}}public String take() throws InterruptedException {synchronized (this) {while(size == 0) {this.wait();}String ret = data[head++];size--;if(head == data.length) head = 0;//这个 notify 用来唤醒 put 操作的等待this.notify();return ret;}}
}

5.2.4 解决因指令重排序造成的问题

因为 put 和 take 操作要进行读和写的操作,可能会因为指令重排序的问题造成其他问题,这里就需要使用 volatile 解决指令重排序问题。

class MyBlockingQueue {private final String[] data = new String[1000];private volatile int size;private volatile int head = 0;private volatile int tail = 0;public void put(String str) throws InterruptedException {synchronized (this) {while(size == data.length) {this.wait();}data[tail++] = str;size++;if(tail == data.length) tail = 0;//这个 notify 用来唤醒 take 操作的等待this.notify();}}public String take() throws InterruptedException {synchronized (this) {while(size == 0) {this.wait();}String ret = data[head++];size--;if(head == data.length) head = 0;//这个 notify 用来唤醒 put 操作的等待this.notify();return ret;}}
}

测试实现的阻塞队列

public class Demo4 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();Thread t1 = new Thread(() -> {while(true) {try {System.out.println("消费元素" + queue.take());} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(() -> {int count = 1;while(true) {try {queue.put(count + "");System.out.println("生产元素" + count);count++;Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}

让生产速度较慢,使得读取操作阻塞等待插入数据才执行。

在这里插入图片描述

public class Demo4 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();Thread t1 = new Thread(() -> {while(true) {try {System.out.println("消费元素" + queue.take());Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(() -> {int count = 1;while(true) {try {queue.put(count + "");System.out.println("生产元素" + count);count++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}

在这里插入图片描述

让生产 put 操作进入阻塞等待状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/111573.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

精益求精:Android应用体积优化的终极指南

精益求精&#xff1a;Android应用体积优化的终极指南 1. 介绍 在当今移动应用生态系统中&#xff0c;Android应用的体积优化是开发者需要高度重视的关键方面之一。一个庞大的应用体积不仅会对用户体验造成负面影响&#xff0c;还会导致以下问题&#xff1a; 下载速度延迟&am…

Re-Learn Linux Part1

1. Linux的目录结构 在Linux文件系统中有两个特殊的目录&#xff1a; 一个用户所在的工作目录&#xff0c;也叫当前目录&#xff0c;可以使用一个点 . 来表示&#xff1b;另一个是当前目录的上一级目录&#xff0c;也叫父目录&#xff0c;可以使用两个点 .. 来表示。 . &#…

Linux 忘记root密码解决方法(CentOS7.9)

忘记Linux系统的root密码&#xff0c;可以不用重新安装系统&#xff0c;进入单用户模式重新更改一下root密码即可。 步骤如下&#xff1a; 首先&#xff0c;启动Linux系统&#xff0c;进入开机界面&#xff0c;在界面中按"e"进入编辑界面&#xff0c;动作要快。按&q…

P2P协议的传输艺术

TP 采用两个 TCP 连接来传输一个文件。 控制连接&#xff1a;服务器以被动的方式&#xff0c;打开众所周知用于 FTP 的端口 21&#xff0c;客户端则主动发起连接。该连接将命令从客户端传给服务器&#xff0c;并传回服务器的应答。常用的命令有&#xff1a;list——获取文件目…

如何在 Excel 中进行加,减,乘,除

在本教程中&#xff0c;我们将执行基本的算术运算&#xff0c;即加法&#xff0c;减法&#xff0c;除法和乘法。 下表显示了我们将使用的数据以及预期的结果。 | **S / N** | **算术运算符** | **第一个号码** | **第二个号码** | **结果** | | 1 | 加法&#xff08;&#xff…

探索Redis速度之谜

Redis&#xff0c;作为一款高性能的内存数据库&#xff0c;一直以来都因其出色的速度而闻名。然而&#xff0c;Redis的速度之快究竟源自何处&#xff0c;其中隐藏着怎样的奥秘&#xff1f;在这篇博客中&#xff0c;我们将深入探索Redis速度之谜&#xff0c;揭开其快速性能背后的…

华为云云耀云服务器L实例评测|使用宝塔10分钟部署一个围猫猫小游戏

目录 前言一、选择华为云云耀云服务器L实例的原因二、华为云云耀云服务器的优势三、快速部署一个小游戏&#xff08;1&#xff09;终端部署1、使用Termius工具连接终端2、安装Nginx3、上传打包文件 &#xff08;2&#xff09;宝塔可视化面板部署1、进入宝塔2、宝塔菜单3、上传代…

千兆以太网网络层 ARP 协议的原理与 FPGA 实现

文章目录 前言一、ARP 帧的应用场景和存在目的二、ARP 帧工作原理三、以太网 ARP 帧发包实例设计四、以太网 CRC校验代码五、以太网 ARP 帧发包测试---GMII1.模拟数据发送2.仿真模块3.仿真波形六、以太网 ARP 帧发包测试---RGMII1.顶层文件2 .仿真代码七、上板测试(RGMII)前言…

Java的序列化

写在前面 本文看下序列化和反序列化相关的内容。 源码 。 1&#xff1a;为什么&#xff0c;什么是序列化和反序列化 Java对象是在jvm的堆中的&#xff0c;而堆其实就是一块内存&#xff0c;如果jvm重启数据将会丢失&#xff0c;当我们希望jvm重启也不要丢失某些对象&#xff…

css自学框架之图片懒加载

首先解释一下什么叫图片懒加载。图片懒加载是一种在页面加载时&#xff0c;延迟加载图片资源的技术&#xff0c;也就是说图片资源在需要的时候才会加载&#xff0c;就是在屏幕显示范围内加载图片&#xff0c;屏幕显示范围外图片不加载。 一、关键函数 用到的关键函数&#xf…

The driver has not received any packets from the server

在测试数据迁移时遇到的错误。 目录 一、错误 二、解决 三、数据迁移测试 3.1 环境 3.2 源码及测试 3.2.1 源码 3.2.2 测试结果&#xff08;太慢&#xff09; 3.2.3 源码修改 3.2.4 异常及解决 一、错误 The driver has not received any packets from the server. 二…

Openresty(二十一)ngx.balance和balance_by_lua灰度发布

一 openresty实现灰度发布 ① 灰度发布 说明&#xff1a; 早期博客对灰度发布的概念进行解读,并且对原生 nginx灰度实现进行讲解后续&#xff1a; 主要拿节点引流的灰度发布,并且关注gray灰度策略 相关借鉴 ② 回顾HTTP反向代理流程 ngx_http_upstream 可操作点&#…