Linux线程(四) 生产者消费者模型

目录

一、什么是生产者消费者模型

基本概念

优点以及应用场景

二、 基于阻塞队列的生产者消费者模型

三、POSIX信号量

四、基于环形队列的生产消费模型


一、什么是生产者消费者模型

        Linux下的生产者消费者模型是一种经典的多线程或多进程编程设计模式,它用于解决资源访问的同步问题,特别是在涉及任务分配、数据处理和资源共享的场景中。

基本概念

生产者:负责生成数据项并将其放入共享的缓冲区(队列)。当缓冲区满时,生产者可能需要等待(阻塞)直到有空间可用。

消费者:从缓冲区中取出数据项进行处理。如果缓冲区为空,消费者可能需要等待(阻塞)直到有新数据产生。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

优点以及应用场景

生产者消费者模型作为一种经典的并发设计模式,在软件开发中特别是涉及多线程或多进程协作的场景下,展现出诸多优势。

解耦:生产者和消费者之间通过共享缓冲区(如队列)进行间接通信,减少了直接的依赖关系,使得生产者和消费者的代码可以独立开发和维护,提高了模块的复用性和系统的灵活性。

支持并发:生产者和消费者通常作为独立的执行单元运行,可以并行工作,充分利用多核处理器的计算能力,提升系统整体的吞吐量和响应速度。

平衡资源利用:通过调整缓冲区的大小和管理生产者与消费者的数量,可以有效平衡生产速率和消费速率,防止生产过剩导致资源浪费或者消费过快导致资源饥饿,从而优化系统性能。

应用场景:

生产者消费者模型广泛应用于各种领域,如网络通信中的数据包处理、数据库的异步写入、GUI应用中的事件处理系统、多线程下载和处理等,任何需要解耦数据生产与数据消费过程的场景都可以考虑使用这一模式。 

二、 基于阻塞队列的生产者消费者模型

        在这个模型中,阻塞队列扮演了生产者和消费者之间的中介角色,它负责存储生产者产生的数据,并安全地传递给消费者处理。关键在于,阻塞队列能够自动管理同步问题,确保线程安全,同时提供阻塞机制来平衡生产与消费的速度。

阻塞队列属于仓库这一临界资源,而同一时刻只能有一个线程进入阻塞队列进行操作,所以要用到互斥锁,同时还要思考如果是消费者该如何知道有东西可以买了呢,如果是生产者如何知道仓库的东西不够了需要生产呢,这个时候就需要两个条件变量push_cond和pop_cond

关于条件变量在上篇文章中讲过,可以参考:

Linux线程(三)死锁与线程同步

push_cond

当生产者将阻塞队列放满时,就需要等待消费者消费完来唤醒生产者继续生产。

pop_cond

当消费者把队列消费空时,消费者会等待生产者往阻塞队列加资源后来唤醒消费者继续消费。 

接下来我们来实现一个基于阻塞队列的生产者消费者模型

访问阻塞队列一定会涉及到加锁,我们首先可以设计一个LockGuard(RAII)思想,利用类出作用域自动销毁来实现解锁,防止忘记解锁造成死锁。

LockGuard.hpp

#pragma once
#include <pthread.h>class Mutex
{
private:pthread_mutex_t* _mutex;public:Mutex(pthread_mutex_t* lock):_mutex(lock){}void Lock(){pthread_mutex_lock(_mutex);}void Unlock(){pthread_mutex_unlock(_mutex);}~Mutex(){}
};class LockGuard
{
private:Mutex _mutex;
public:LockGuard(pthread_mutex_t *lock):_mutex(lock){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}
};

随后我们来实现一个任务类,模仿消费者拿到资源:

Task.hpp:

#pragma once
#include <iostream>
#include <string>const int defaultvalue = 0;enum
{ok = 0,div_zero,mod_zero,unknow
};const std::string opers = "+-*/%)(&";class Task
{
public:Task(){}Task(int x, int y, char op): data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok){}void Run(){switch (oper){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':{if (data_y == 0)code = div_zero;elseresult = data_x / data_y;}break;case '%':{if (data_y == 0)code = mod_zero;elseresult = data_x % data_y;}break;default:code = unknow;break;}}void operator()(){Run();}std::string PrintTask(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=?";return s;}std::string PrintResult(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=";s += std::to_string(result);s += " [";s += std::to_string(code);s += "]";return s;}~Task(){}private:int data_x;int data_y;char oper; // + - * / %int result;int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4
}; 

随后我们来实现一个阻塞队列,要注意一个时刻只能有一个线程访问,所以再push操作和pop操作时要加锁。

block_queue.hpp:

#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
#include"LockGuard.hpp"const int defaultcap=5;//默认容量为5
template<class T>
class block_queue
{
private:std::queue<T> _q;int _capacity;   //_q.size() == _capacity, 满了,不能在生产,_q.size() == 0, 空,不能消费了pthread_mutex_t _mutex;pthread_cond_t _push_cond;  //给生产者pthread_cond_t _pop_cond;  //给消费者// int _consumer_water_line;  // _consumer_water_line = _capacity / 3 * 2// int _productor_water_line; // _productor_water_line = _capacity / 3
public:block_queue(int cap=defaultcap):_capacity(cap){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_push_cond,nullptr);pthread_cond_init(&_pop_cond,nullptr);}bool IsFull(){return _q.size() == _capacity;}bool IsEmpty(){return _q.size() == 0;}void Push(const T &in){LockGuard lockguard(&_mutex);while(IsFull())pthread_cond_wait(&_push_cond,&_mutex);_q.push(in);//通知消费者可以消费了// if(_q.size() > _productor_water_line) pthread_cond_signal(&_c_cond);  // 也可以是当资源数量大于指定阈值时再通知pthread_cond_signal(&_pop_cond);}void Pop(T *out)//要取出任务{LockGuard lockgugrd(&_mutex);while(IsEmpty())pthread_cond_wait(&_pop_cond,&_mutex);*out=_q.front();_q.pop();//通知生产者可以生产了// if(_q.size() < _consumer_water_line) pthread_cond_signal(&_p_cond);      //也可以是当资源数量大于指定阈值时再通知pthread_cond_signal(&_push_cond);}~block_queue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_push_cond);pthread_cond_destroy(&_pop_cond);}};

makefile文件,当然也可手动生成可执行文件,使用这个较为方便

test_block:Main.ccg++ -o  $@ $^ -lpthread -std=c++11.PHONY:clean
clean:rm -f test_block

使用Main.cc来测试这个模型:

#include"block_queue.hpp"
#include"Task.hpp"
#include<pthread.h>
#include<ctime>
#include<sys/types.h>
#include<unistd.h>void *consumer(void *args)
{block_queue<Task> *bq=static_cast<block_queue<Task>* >(args);while(true){sleep(1);Task t;//取出任务bq->Pop(&t);t();//运行任务std::cout<<"consumer data: "<<t.PrintResult()<<std::endl;}return nullptr;
}void *producror(void *args)
{block_queue<Task> *bq=static_cast<block_queue<Task>*>(args);while(true){sleep(1);int x=rand()%10;usleep(rand()%123);int y=rand()%10;usleep(rand()%1234);char oper=opers[rand()%(opers.size())];Task t(x,y,oper);std::cout<<"productor data: "<<t.PrintTask()<<std::endl;bq->Push(t);}return nullptr;
}int main()
{srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self()); // 只是为了形成更随机的数据block_queue<Task> *bq=new block_queue<Task>();pthread_t c,p;pthread_create(&c,nullptr,consumer,bq);pthread_create(&p,nullptr,producror,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}

运行结果如下

可以看到生产者每生产一个,消费者就拿到一个。

如果让生产者不休眠

可以看到消费者将阻塞队列填满,消费者取队首元素来执行。

总体流程就是:

生产过程:生产者创建数据项,并尝试将数据放入阻塞队列。如果队列已达到其容量限制,生产者的push()操作将被阻塞,直到队列中有空间可以添加新数据。

消费过程:消费者从阻塞队列中取出数据项进行处理。当队列为空时,消费者的pop()操作也会被阻塞,直到有新的数据被生产者放入队列。

通知与唤醒:一旦队列状态发生变化(例如有数据被放入或移出),阻塞队列会自动唤醒相应等待的线程,实现高效且线程安全的同步。 

三、POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

在传统的基于阻塞队列的生产者消费者模型中,虽然使用阻塞队列自身可以避免一些复杂的同步问题,但确实存在这样一个情况:当生产者试图向满队列添加数据或消费者试图从空队列中取数据时,它们都需要对整个队列进行加锁,这实际上导致了生产者和消费者之间不必要的锁竞争。

比如生产者消费者模型,生产者只需要关注空间是否足够生产,消费者只需要关注资源是否足够消费,所以开始的时候生产者的信号量就是队列的大小,消费者的信号量就是0,当生成者生产一个资源,生产者信号量-1,消费者+1;当消费者消费一个资源, 生产者信号量+1,消费者-1。

使用POSIX信号量确实可以进一步优化这一模型,使得生产者与生产者之间、消费者与消费者之间存在锁竞争,而生产者和消费者之间不存在直接的锁竞争。这是因为信号量可以用来精确控制对资源的访问权限,而不仅仅是简单地锁定整个资源。

信号量的本质是一个计数器。

这个计数器用于跟踪某个资源(如共享内存区域、打印机等)的可用单位数。信号量机制通过这个计数器来控制多个进程或线程对共享资源的访问,确保资源的合理分配和同步。通过这个计数器的增加和减少,信号量不仅能够控制访问权限,还能协调进程间的同步,是解决并发控制问题的一种有效工具。  

 初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量(P操作)
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量(V操作)
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
上一个生产者 - 消费者的例子是基于阻塞队列 , 其空间可以动态分配 , 现在基于固定大小的环形队列重写这个程序

四、基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性
生产者只需要关注空间 spaceSem 是否足够生产,消费者只需要关注资源 dataSem 是否足够消费,所以开始的时候生产者的信号量就是队列的大小,消费者的信号量就是0,当生成者生产一个资源,生产者信号量-1,消费者+1;当消费者消费一个资源, 生产者信号量+1,消费者-1。

如图所示

代码示例,基于环形队列的生产消费模型其中的资源依旧使用Task来模拟

ringqueue.hpp

#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include "LockGuard.hpp"// 定义默认队列大小
const int defaultSize = 5;// 泛型环形队列类模板
template <typename T>
class RingQueue
{
private:// 信号量P操作,减少信号量计数,若计数<0则阻塞当前线程void P(sem_t &sem){sem_wait(&sem);}// 信号量V操作,增加信号量计数,若唤醒等待的线程void V(sem_t &sem){sem_post(&sem);}public:// 构造函数,初始化环形队列RingQueue(int size = defaultSize): _ringQueue(size), _size(size), _prodStep(0), _consStep(0){// 初始化空间信号量,初始值为队列大小,表示初始时所有空间都是空闲的sem_init(&_spaceSem, 0, size);// 初始化数据信号量,初始值为0,表示队列初始无数据sem_init(&_dataSem, 0, 0);// 初始化生产者和消费者的互斥锁,保护各自的操作步骤pthread_mutex_init(&_prodMutex, nullptr);pthread_mutex_init(&_consMutex, nullptr);}// 向队列添加元素void Push(const T &item){// 1. 减少空间信号量,尝试获取生产空间,若无空间则阻塞P(_spaceSem);{// 2. 加生产者锁,确保生产操作的原子性LockGuard lockGuard(&_prodMutex);// 执行实际的入队操作_ringQueue[_prodStep] = item;_prodStep++;        // 移动生产指针_prodStep %= _size; // 环状处理边界}V(_dataSem);}// 从队列移除元素void Pop(T *outItem){// 1. 减少数据信号量,尝试获取数据,若无数据则阻塞P(_dataSem);{// 2. 加消费者锁,确保消费操作的原子性LockGuard lockGuard(&_consMutex);// 执行实际的出队操作*outItem = _ringQueue[_consStep];_consStep++;        // 移动消费指针_consStep %= _size; // 环状处理边界}//消费者V操作时不冲突,可以解锁 信号量的P操作(wait/减)和V操作(signal/增)都是原子操作。V(_spaceSem);}// 析构函数,释放资源~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_prodMutex);pthread_mutex_destroy(&_consMutex);}private:// 环形队列底层使用std::vector存储std::vector<T> _ringQueue;int _size; // 队列大小// 生产者和消费者的步进索引int _prodStep;int _consStep;// 信号量,管理空间和数据的可用性sem_t _spaceSem;sem_t _dataSem;// 互斥锁,分别保护生产者和消费者的步骤更新pthread_mutex_t _prodMutex;pthread_mutex_t _consMutex;
};

Main.cc

#include"ringqueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <pthread.h>
#include <ctime>void *Productor(void *args)
{// sleep(5);RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// 数据怎么来的?// 1. 有数据,从具体场景中来,从网络中拿数据// 生产前,你的任务从哪里来的呢???int data1 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODOusleep(rand() % 123);int data2 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODOusleep(rand() % 123);char oper = opers[rand() % (opers.size())];Task t(data1, data2, oper);std::cout << "productor task: " << t.PrintTask() << std::endl;// rq->push();rq->Push(t);sleep(1);}
}void *Consumer(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// sleep(1);Task t;rq->Pop(&t);t();std::cout << "consumer done, data is : " << t.PrintResult() << std::endl;}
}int main()
{srand((uint64_t)time(nullptr) ^ pthread_self());pthread_t c[3], p[2];// 唤醒队列中只能放置整形???// RingQueue<int> *rq = new RingQueue<int>();RingQueue<Task> *rq = new RingQueue<Task>();pthread_create(&p[0], nullptr, Productor, rq);pthread_create(&p[1], nullptr, Productor, rq);pthread_create(&c[0], nullptr, Consumer, rq);pthread_create(&c[1], nullptr, Consumer, rq);pthread_create(&c[2], nullptr, Consumer, rq);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(c[2], nullptr);return 0;
}

运行如图所示 

环形队列中的生产者和消费者通过同步与互斥机制维持着一种动态平衡,确保数据的连续生产和消费,体现了典型的生产者-消费者问题的解决方案。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/702452.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分体工业读写器的适用场景有哪些?

工业读写器根据设计方式不同&#xff0c;可分为一体式读写器和分体式读写器&#xff0c;不同读写器特点不同&#xff0c;适用场景也不同&#xff0c;下面我们就一起来了解一下超高频分体读写器适用场景有哪些。 超高频分体读写器介绍 超高频分体读写器是一种射频识别(RFID)设…

【ARM】解决Keil MDK报错提示找不到编译器路径的问题

问题场景&#xff1a; 在打开MDK的时候&#xff0c;会跳出提示 Warning: Registered ARM Compiler Version not found in path: ARMCLANG!”&#xff08;如图1&#xff09;这样的提示信息。提示无法找到编译器的路径&#xff0c;这样的问题经常出现在添加旧版本编译器过程中操…

消防物资存储|基于SSM+vue的消防物资存储系统的设计与实现(源码+数据库+文档)

消防物资存储系统 目录 基于SSM&#xff0b;vue的消防物资存储系统的设计与实现 一、前言 二、系统设计 三、系统功能设计 1用户功能模块 2 管理员功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主介…

界面控件Telerik UI for WPF中文教程 - 如何轻松实现日期范围选择?

Telerik UI for WPF拥有超过100个控件来创建美观、高性能的桌面应用程序&#xff0c;同时还能快速构建企业级办公WPF应用程序。UI for WPF支持MVVM、触摸等&#xff0c;创建的应用程序可靠且结构良好&#xff0c;非常容易维护&#xff0c;其直观的API将无缝地集成Visual Studio…

物联网促进信息化——​青创智通工业物联网解决方案​

随着传感器网络&#xff08;WSN)、无线射频识别&#xff08;RFID&#xff09;以及微电子机械系统(MEIVIS&#xff09;等技术的不断成熟,扩展了人们对信息获取和使用的能力&#xff0c;并将提高制造效率、改善产品质量、降低产品成本和资源消耗、为用户提供更加透明和个性化的服…

【漏洞复现】泛微OA E-Cology SignatureDownLoad SQL注入漏洞

漏洞描述&#xff1a; 泛微OA E-Cology是一款面向中大型组织的数字化办公产品&#xff0c;它基于全新的设计理念和管理思想&#xff0c;旨在为中大型组织创建一个全新的高效协同办公环境。泛微OA E-Cology SignatureDownLoad存在SQL注入漏洞&#xff0c;允许攻击者非法访问和操…

unity 学习笔记

一、 事件顺序 gameObjet Instantiate gameObjet.自定义函数 gameObjet.Start 二、预设体使用 例子&#xff1a;Button 点击创建 预设体 BagPanel

常类API(Math,System,Runtime)

1、Math 是帮助我们用于进行数学计算的工具类私有化构造方法&#xff0c;所有的方法都是静态的 方法名 说明public static int abs(int a) 获取参数绝对值 public static double ceil(int a)向上取整public static double floor(int a)向下取…

axios传参方式

params参数通常用于GET请求添加查询参数&#xff0c;POST一般使用data参数传递参数 1、data传参 1-1、表单传参 // 方法定义 export function save(data) {return request({url: /url,headers: { Content-Type: multipart/form-data },method: post,data: data,}) }// 调用函…

Flutter 3.22 发布,快来看看有什么更新吧?

Flutter 3.22 发布&#xff0c;快来看看有什么更新吧&#xff1f; 本次 Flutter 跟随 Google I/O 发布的版本是 3.22 &#xff0c;该版本主要还是带来了 Vulkan backend 和 Wasm Native 的落地&#xff0c;另外还有一个重点就是 Dart macros &#xff0c;但是它更多只是一个预…

Windows11系统配置WSL2网络使它支持LAN访问

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、WSL2安装二、使用步骤1.NAT2.镜像 三、写在最后总结 前言 WSL2的出现感觉真的是一个惊喜&#xff0c;又想玩Linux&#xff0c;又怕日用搞不了的最佳替代方…

二分图及图匹配(图论学习总结部分内容)

文章目录 前言四、二分图及图匹配二分图常见模型二分图例题 e g 1 : eg1: eg1: [ Z J O I 2009 ZJOI2009 ZJOI2009​\][假期的宿舍](https://ac.nowcoder.com/acm/contest/34649/B)(二分图最大匹配板题) e g 2 : eg2: eg2:​​ [C-Going Home](https://ac.nowcoder.com/acm/con…