【转】-Java实现生产者和消费者的5种方式

Java实现生产者和消费者的5种方式

该博客转载自掘金Java实现生产者和消费者的5种方式

1. 前言

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

1706867654525.webp

现在用四种方式来实现生产者消费者模型

2. wait()和notify()方法的实现

这也是最简单最基础的实现,缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

/*** 生产者和消费者,wait()和notify()的实现* @author ZGJ* @date 2017年6月22日*/
public class Test1 {private static Integer count = 0;private static final Integer FULL = 10;private static String LOCK = "lock";public static void main(String[] args) {Test1 test1 = new Test1();new Thread(test1.new Producer()).start();new Thread(test1.new Consumer()).start();new Thread(test1.new Producer()).start();new Thread(test1.new Consumer()).start();new Thread(test1.new Producer()).start();new Thread(test1.new Consumer()).start();new Thread(test1.new Producer()).start();new Thread(test1.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}synchronized (LOCK) {while (count == FULL) {try {LOCK.wait();} catch (Exception e) {e.printStackTrace();}}count++;System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);LOCK.notifyAll();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (LOCK) {while (count == 0) {try {LOCK.wait();} catch (Exception e) {}}count--;System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count);LOCK.notifyAll();}}}}
}

结果:

Thread-0生产者生产,目前总共有1
Thread-4生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-1消费者消费,目前总共有0
Thread-2生产者生产,目前总共有1
Thread-6生产者生产,目前总共有2
Thread-7消费者消费,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-4生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-6生产者生产,目前总共有2
Thread-1消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-2生产者生产,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-4生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-6生产者生产,目前总共有1
Thread-2生产者生产,目前总共有2
Thread-1消费者消费,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-4生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-1消费者消费,目前总共有0
Thread-6生产者生产,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-2生产者生产,目前总共有1

3. 可重入锁ReentrantLock的实现

java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,通过对lock的lock()方法和unlock()方法实现了对锁的显示控制,而synchronize()则是对锁的隐性控制。

可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响,简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,函数调用结束计数器就减1,然后锁需要被释放两次才能获得真正释放。已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*** 生产者和消费者,ReentrantLock的实现* * @author ZGJ* @date 2017年6月22日*/
public class Test2 {private static Integer count = 0;private static final Integer FULL = 10;//创建一个锁对象private Lock lock = new ReentrantLock();//创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空private final Condition notFull = lock.newCondition();private final Condition notEmpty = lock.newCondition();public static void main(String[] args) {Test2 test2 = new Test2();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}//获取锁lock.lock();try {while (count == FULL) {try {notFull.await();} catch (InterruptedException e) {e.printStackTrace();}}count++;System.out.println(Thread.currentThread().getName()+ "生产者生产,目前总共有" + count);//唤醒消费者notEmpty.signal();} finally {//释放锁lock.unlock();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}lock.lock();try {while (count == 0) {try {notEmpty.await();} catch (Exception e) {e.printStackTrace();}}count--;System.out.println(Thread.currentThread().getName()+ "消费者消费,目前总共有" + count);notFull.signal();} finally {lock.unlock();}}}}
}

4. 阻塞队列BlockingQueue的实现

BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  1. 当队列满了的时候进行入队列操作
  2. 当队列空了的时候进行出队列操作
    因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
    从上可知,阻塞队列是线程安全的。
    下面是BlockingQueue接口的一些方法:
操作 抛异常 特定值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除 remove(o) poll(o) take(o) poll(timeout, timeunit)
检查 element(o) peek(o)

这四类方法分别对应的是:

  1. ThrowsException:如果操作不能马上进行,则抛出异常
  2. SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
  3. Blocks:如果操作不能马上进行,操作会被阻塞
  4. TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false

下面来看由阻塞队列实现的生产者消费者模型,这里我们使用take()和put()方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/*** 使用BlockingQueue实现生产者消费者模型* @author ZGJ* @date 2017年6月29日*/
public class Test3 {private static Integer count = 0;//创建一个阻塞队列final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);public static void main(String[] args) {Test3 test3 = new Test3();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}try {blockingQueue.put(1);count++;System.out.println(Thread.currentThread().getName()+ "生产者生产,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}try {blockingQueue.take();count--;System.out.println(Thread.currentThread().getName()+ "消费者消费,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();}}}}
}

5. 信号量Semaphore的实现

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,在操作系统中是一个非常重要的问题,可以用来解决哲学家就餐问题。Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量,可以使用acquire()方法获得一个许可,当许可不足时会被阻塞,release()添加一个许可。在下列代码中,还加入了另外一个mutex信号量,维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行

import java.util.concurrent.Semaphore;
/*** 使用semaphore信号量实现* @author ZGJ* @date 2017年6月29日*/
public class Test4 {private static Integer count = 0;//创建三个信号量final Semaphore notFull = new Semaphore(10);final Semaphore notEmpty = new Semaphore(0);final Semaphore mutex = new Semaphore(1);public static void main(String[] args) {Test4 test4 = new Test4();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}try {notFull.acquire();mutex.acquire();count++;System.out.println(Thread.currentThread().getName()+ "生产者生产,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();} finally {mutex.release();notEmpty.release();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}try {notEmpty.acquire();mutex.acquire();count--;System.out.println(Thread.currentThread().getName()+ "消费者消费,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();} finally {mutex.release();notFull.release();}}}}
}

6. 管道输入输出流PipedInputStream和PipedOutputStream实现

在java的io包下,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。
它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。

使用方法:先创建一个管道输入流和管道输出流,然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据,消费者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯,但是这种方式在生产者和生产者、消费者和消费者之间不能保证同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的,多个生成者和多个消费者者之间则不行

/*** 使用管道实现生产者消费者模型* @author ZGJ* @date 2017年6月30日*/
public class Test5 {final PipedInputStream pis = new PipedInputStream();final PipedOutputStream pos = new PipedOutputStream();{try {pis.connect(pos);} catch (IOException e) {e.printStackTrace();}}class Producer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = (int) (Math.random() * 255);System.out.println(Thread.currentThread().getName() + "生产者生产了一个数字,该数字为: " + num);pos.write(num);pos.flush();} } catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = pis.read();System.out.println("消费者消费了一个数字,该数字为:" + num);}} catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) {Test5 test5 = new Test5();new Thread(test5.new Producer()).start();new Thread(test5.new Consumer()).start();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/741553.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

geoserver忘记密码的解决过程

geoserver文档 https://www.osgeo.cn/geoserver-user-manual/security/passwd.html 在geoserver数据目录,找到 /data/GeoserverData/security/usergroup/default/users.xml文件将<user enabled="true" name="admin" password="***">中的p…

最近很火的Vue Vine是如何实现一个文件中写多个组件

相信你最近应该看到了不少介绍Vue Vine的文章,这篇文章我们另辟蹊径来讲讲Vue Vine是如何实现在一个文件里面写多个vue组件。前言 在今年的Vue Conf 2024大会上,沈青川大佬(维护Vue/Vite 中文文档)在会上介绍了他的新项目Vue Vine。Vue Vine提供了全新Vue组件书写方式,主要…

[Java SE] 字节操作工具类:ByteUtils

0 引言与嵌入式软件数据交互过程中,必然涉及各种的、大量的字节操作场景。如:16进制与10进制、2进制间的转换,字符串、byte数组与int之间的转换等。故此有此核心工具类的沉淀。1 ByteUtils 依赖 <properties><!-- 编程提效工具 --><lombok.version>1.18.2…

Android Framework之Activity启动流程

原文地址 https://juejin.cn/post/7212432799848579133 启动Activty触发点为Activity中的startActivity。 Activity startActivity -> Instrumentation --> execStartActivitytry {intent.migrateExtraStreamToClipData(who);intent.prepareToLeaveProcess(who);//int re…

3大主流分布式事务框架详解(图文总结)

1 简要介绍 随着微服务架构的不断发展,分布式系统逐渐普及到后端领域的每一个角落。 在分布式系统中,跨多个服务的数据一致性一直是一个重大挑战,为解决这一挑战,分布式事务应运而生。 作者在之前的文章《五种分布式事务解决方案》和《4大主流分布式算法介绍》中,详细介绍…

数据跨境传输法规日趋完善,企业如何规避合规风险?

随着全球化的发展,跨境数据传输变得日益频繁。在数字化时代,数据安全是企业运营的关键。数据跨境传输由于涉及不同国家和地区,其安全合规性面临着更大的风险和挑战。2022年,国家网信办发布了《数据出境安全评估办法》(以下简称《办法》),并于同年9月1日开始实施。《办法…

asp.net mvc中数据传递的几种方式

2.第二步,要想session正常使用,在启动文件中配置如下 3.第三步,接受数据

保密U盘仍然存在数据安全危机?该怎么用才能规避?

保密U盘以前主要用于国家涉密单位或部门,但随着人们对于信息安全的重视越来越高,在民用企事业单位以及个人用户方面也应用得日益广泛。使用保密U盘在安全性上比普通U盘具有优势,但却仍然存在安全危机,具体为: 病毒和木马程序的风险: 保密U盘在使用过程中,极易被植入“摆…

离散傅里叶变换(DFT)和快速傅里叶变换(FFT)

离散傅里叶变换(DFT)和快速傅里叶变换(FFT)是信号处理和数字信号处理中的基本工具。它们用于将时间域的信号转换为频率域的表示,帮助分析信号的频谱成分。 1. 离散傅里叶变换(DFT) 1.1 DFT的基本概念 DFT是将离散时间信号转换为频域表示的工具。对于长度为 N 的离散信号…

Simple WPF: WPF实现一个MINIO等S3兼容对象存储上传文件的小工具

之前在阿里云ECS 99元/年的活动实例上搭建了一个测试用的MINIO服务,以前都是直接当基础设施来使用的,这次准备自己学一下S3兼容API相关的对象存储开发,因此有了这个小工具。目前仅包含上传功能,后续计划开发一个类似图床的对象存储应用。最新内容优先发布于个人博客:小虎技…

前缀和简析

前缀和 前置知识:$\sum_{i = 1}^{r}{a_i} = a_l + a_{l + 1} + \dots + a_{r - 1} + a_r$ 考虑以下数组:$i$ $1$ $2$ $3$$a_I$ $3$ $5$ $7$如果我们要查询 $\sum_{i = 1}^{2}{a_i}$,很显然可以得到 $\sum_{i = 1}^{2}{a_i} = 3 + 5 = 8$。如果我们要查询 $\sum_{i = l}^{r}{…

推挽输出和开漏输出

一、推挽输出 1.1推挽输出的概念 推挽(push-pull)输出是由两个MOS或者三极管组成,两个管子始终保持一个导通,另一个截止的状态。当输入高电平时,叫做推;上管Q1导通,下管Q2关闭;电流走向VCC→Q1→Vout。当输入低电平时,叫做挽;上管Q1关闭,下管Q2导通;电流走向Vout→…