【Linux】生产者消费者模型

目录

一、生产者消费者模型的优点

二、基于BlockingQueue的生产者消费者模型

1、BlockingQueue

2、C++ queue模拟阻塞队列的生产消费模型

3、POSIX信号量

3.1、信号量接口

三、基于环形队列的生产消费模型

1、模型说明

2、代码实现

3、互斥锁与信号量


一、生产者消费者模型的优点

 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

 生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点:

  • 解耦
  • 支持并发
  • 支持忙闲不均
  • 高效:体现在生产者获取数据时,消费者可以从“仓库”中读取数据。消费者在处理数据时,生产者可以向“仓库”存放数据。互不影响。

 在生产者消费者模型中,存在如下几个概念:

  • 3种关系:
    生产者与生产者之间:互斥关系
    消费者与消费者之间:互斥关系
    生产者与消费者之间:同步关系和互斥关系
  • 2种角色:
    生产者和消费者
  • 1个交易场所:
    通常是缓冲区

二、基于BlockingQueue的生产者消费者模型

1、BlockingQueue

 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

2、C++ queue模拟阻塞队列的生产消费模型

模拟单消费者单生产者模型的C++代码如下:

//BlockQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;const int gcap = 5;template<class T>
class BlockQueue
{
public:BlockQueue(const int cap = gcap):_cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_consumerCond, nullptr);pthread_cond_init(&_productorCond, nullptr);}bool isFull() { return _q.size() == _cap; }bool isEmpty() { return _q.size() == 0; }void push(const T& in){pthread_mutex_lock(&_mutex);//细节1:一定要保证,在任何时候,都是符合条件,才进行生产//为了防止误唤醒的操作,这里选择使用while判断,而不是if判断while(isFull()) //1.我们只能在临界区内部,判断临界资源是否就绪,这注定了我们当前一定是持有锁的{            //2.要让线程休眠等待,就不能持有锁等待//  所以phread_cond_wait函数要有锁的释放的能力pthread_cond_wait(&_productorCond, &_mutex);//3.休眠后,再醒来时,需要从临界区内部继续运行,因为线程是在临界区被切走的//4.所以当线程被唤醒时,继续从pthread_cond_wait函数向后运行,又要重新申请锁,申请成功才会彻底返回}// 没满_q.push(in);// 唤醒函数放在释放锁的前后都可以。因为唤醒另一个线程时,另一个线程会在申请锁的地方阻塞住等待pthread_cond_signal(&_consumerCond);pthread_mutex_unlock(&_mutex);}void pop(T* out){pthread_mutex_lock(&_mutex);//为了防止误唤醒,使用while进行判断while(isEmpty()){pthread_cond_wait(&_consumerCond, &_mutex);}*out = _q.front();_q.pop();pthread_cond_signal(&_productorCond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumerCond);pthread_cond_destroy(&_productorCond);}
private:queue<T> _q;int _cap;pthread_mutex_t _mutex; //访问同一个队列,所以只需要一把锁pthread_cond_t _consumerCond;  //消费者对应的条件变量,空,waitpthread_cond_t _productorCond; //生产者对应的条件变量,满,wait
}; //main.cc
#include "blockQueue.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>void* consumer(void* args)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);while(1){sleep(1);//1.从blockqueue中获取数据int data = 0;bq->pop(&data);//2.结合某种业务逻辑,处理数据cout << "consumer data: " << data << endl;}
}void* productor(void* args)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);while(1){//1.先通过某种渠道获取数据int data = rand() % 10 + 1;//2.将数据推送到blockqueue,完成生产过程bq->push(data);cout << "productor data: " << data << endl;}
}int main()
{srand((uint64_t)time(nullptr) ^ getpid());BlockQueue<int>* bq = new BlockQueue<int>();//单生产单消费pthread_t c, p;pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

阻塞队列不是只能放整数、字符串等数据类型的,也可以放对象。

下面是阻塞队列中存放任务的模拟实现代码:

//task.hpp
#pragma once
#include <iostream>
#include <string>class Task
{
public:Task(){}Task(int x, int y, int op): _x(x), _y(y), _op(op), _result(0), _exitCode(0){}void operator()(){switch (_op){case '+':_result = _x + _y;break;case '-':_result = _x - _y;break;case '*':_result = _x * _y;break;case '/':{if (_y == 0)_exitCode = -1;else_result = _x / _y;}break;case '%':{if (_y == 0)_exitCode = -2;else_result = _x % _y;}break;default:break;}}std::string formatArg(){return std::to_string(_x) + _op + std::to_string(_y) + '=';}std::string formatRes(){return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";}~Task(){}private:int _x;int _y;char _op;int _result;int _exitCode;
};//main.cc
#include "blockQueue.hpp"
#include "task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>void* consumer(void* args)
{BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);while(1){//1.从blockqueue中获取数据Task t;bq->pop(&t);t();//2.结合某种业务逻辑,处理数据cout << "consumer data: " << t.formatArg() << t.formatRes() << endl;}
}void* productor(void* args)
{BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);string opers = "+-*/%";while(1){sleep(1);//1.先通过某种渠道获取数据int x = rand() % 20 + 1;int y = rand() % 10 + 1;char op = opers[rand() % opers.size()];//2.将数据推送到blockqueue,完成生产过程Task t(x, y, op);bq->push(t);cout << "productor Task: " << t.formatArg() << " ? " << endl;}
}int main()
{srand((uint64_t)time(nullptr) ^ getpid());BlockQueue<Task>* bq = new BlockQueue<Task>();//单生产单消费pthread_t c, p;pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

 实际上,虽然我们模拟实现的是单生产者,单消费者的模型,但是上面的代码是可以直接承载多生产者多消费者的模型的,因为所有的生产和消费使用的都是同一把锁,根本原因在于,生产和消费访问的是同一个queue,queue被当作整体使用。

3、POSIX信号量

 POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

 信号量本质是一个计数器,用于描述临界资源中资源数目。每一个线程,在访问对应的资源时,先申请信号量,申请成功,表示该线程在以后的某个时刻被允许使用该资源,否则不允许。信号量的工作机制是一种资源预定机制

3.1、信号量接口

初始化信号量:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值

销毁信号量:

int sem_destroy(sem_t *sem);

等待信号量:

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量:

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()

 下面基于环形队列的生产消费模型,使用POSIX信号量重写上面的模型代码。

三、基于环形队列的生产消费模型

1、模型说明

环形队列采用数组模拟,用模运算来模拟环状特性:

 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态:

 我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。

需要注意的问题:

  • 生产者与消费者关心的“资源”不同。生产者关心空间,消费者关心数据。
  • 只要信号量不为0,就表示资源可用,表示线程可访问
  • 只要访问环形队列不同的区域,生产和消费就可以同时进行。
  • 只有当队列为空或者为满时,生产者与消费者才会访问同一个位置。
    当队列为空时,让生产者先动。
    当队列为满时,让消费者先动。

2、代码实现

 使用POSIX信号量来实现该模型。生产者关心的是空间资源,设置信号量初始值为 N ,消费者关心的资源是数据,设置信号量初始值为 0

下面是单生产单消费模型代码:

//RingQueue.hpp
#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>using namespace std;static const int N = 5;template<class T>
class RingQueue
{
public:RingQueue(int num = N):_ring(num),_cap(num),_c_step(0),_p_step(0){sem_init(&_data_sem, 0, 0);sem_init(&_space_sem, 0, num);}void push(const T& in){//生产P(_space_sem);//一定拿到了对应的空间资源!不用再做判断//确定拿到了哪一个资源_ring[_p_step++] = in;_p_step %= _cap;V(_data_sem);}void pop(T* out){//消费P(_data_sem);*out = _ring[_c_step++];_c_step %= _cap;V(_space_sem);}~RingQueue(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);}private:void P(sem_t& s){sem_wait(&s);}void V(sem_t& s){sem_post(&s);}private:std::vector<T> _ring;int _cap; //环形队列的容量sem_t _data_sem; //只有消费者关心sem_t _space_sem; //只有生产者关心int _c_step;     //消费位置int _p_step;     //生产位置
};//task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>class Task
{
public:Task(){}Task(int x, int y, int op): _x(x), _y(y), _op(op), _result(0), _exitCode(0){}void operator()(){switch (_op){case '+':_result = _x + _y;break;case '-':_result = _x - _y;break;case '*':_result = _x * _y;break;case '/':{if (_y == 0)_exitCode = -1;else_result = _x / _y;}break;case '%':{if (_y == 0)_exitCode = -2;else_result = _x % _y;}break;default:break;}usleep(100000);}std::string formatArg(){return std::to_string(_x) + _op + std::to_string(_y) + "= ?";}std::string formatRes(){return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";}~Task(){}private:int _x;int _y;char _op;int _result;int _exitCode;
};//main.cc
#include "RingQueue.hpp"
#include "task.hpp"
#include <time.h>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>const char* ops = "+-*/%";void* consumerRoutine(void* args)
{RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);while(true){Task t;rq->pop(&t);t();cout << "consumer done: " << t.formatRes() << endl;sleep(1); }
}void* productorRoutine(void* args)
{RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);while(true){int x = rand() % 100;int y = rand() % 100;char op = ops[(x + y) % strlen(ops)];Task t(x, y, op);rq->push(t);cout << "productor done: " << t.formatArg() << endl;}
}int main()
{srand(time(nullptr) ^ getpid());RingQueue<Task>* rq = new RingQueue<Task>();//单生产,单消费pthread_t c, p;pthread_create(&c, nullptr, consumerRoutine, rq);pthread_create(&p, nullptr, productorRoutine, rq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete rq;return 0;
}

运行观察结果:

 如果要实现多生产、多消费的模型,需要修改代码。其中修改的本质是维护多生产与多消费之间的关系。

 为了保证消费者与消费者之间、生产者与生产者之间的原子性,我们需要进行加锁操作。

 注意:这里是信号量与锁配合使用,应该先申请信号量,再申请锁。这是因为:

  •  如果先申请锁,那么其他线程就无法进入函数,没办法提前处理分配数据,会降低效率。
  •  不用在临界区内部做判断,就可以知道临界资源的使用情况,如果已经没有资源了,那么申请锁没有任何意义,浪费资源。
//RingQueue.hpp
#pragma once#include <iostream>
#include <vector>
#include <mutex>
#include <semaphore.h>using namespace std;static const int N = 5;template<class T>
class RingQueue
{
public:RingQueue(int num = N):_ring(num),_cap(num),_c_step(0),_p_step(0){sem_init(&_data_sem, 0, 0);sem_init(&_space_sem, 0, num);pthread_mutex_init(&_c_mtx, nullptr);pthread_mutex_init(&_p_mtx, nullptr);}//生产void push(const T& in){P(_space_sem);Lock(_p_mtx);//一定拿到了对应的空间资源!不用再做判断//确定拿到了哪一个资源_ring[_p_step++] = in;_p_step %= _cap;Unlock(_p_mtx);V(_data_sem);}//消费void pop(T* out){P(_data_sem);Lock(_c_mtx);*out = _ring[_c_step++];_c_step %= _cap;Unlock(_c_mtx);V(_space_sem);}~RingQueue(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);pthread_mutex_destroy(&_c_mtx);pthread_mutex_destroy(&_p_mtx);}private:void P(sem_t& s){sem_wait(&s);}void V(sem_t& s){sem_post(&s);}void Lock(pthread_mutex_t& m){pthread_mutex_lock(&m);}void Unlock(pthread_mutex_t& m){pthread_mutex_unlock(&m);}private:std::vector<T> _ring;int _cap; //环形队列的容量sem_t _data_sem; //只有消费者关心sem_t _space_sem; //只有生产者关心int _c_step;     //消费位置int _p_step;     //生产位置pthread_mutex_t _c_mtx;  //消费者的锁pthread_mutex_t _p_mtx;  //生产者的锁
};//task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>class Task
{
public:Task(){}Task(int x, int y, int op): _x(x), _y(y), _op(op), _result(0), _exitCode(0){}void operator()(){switch (_op){case '+':_result = _x + _y;break;case '-':_result = _x - _y;break;case '*':_result = _x * _y;break;case '/':{if (_y == 0)_exitCode = -1;else_result = _x / _y;}break;case '%':{if (_y == 0)_exitCode = -2;else_result = _x % _y;}break;default:break;}usleep(100000);}std::string formatArg(){return std::to_string(_x) + _op + std::to_string(_y) + "= ?";}std::string formatRes(){return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";}~Task(){}private:int _x;int _y;char _op;int _result;int _exitCode;
};//main.cc
#include "RingQueue.hpp"
#include "task.hpp"
#include <time.h>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>const char* ops = "+-*/%";void* consumerRoutine(void* args)
{RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);while(true){Task t;rq->pop(&t);t();cout << "consumer done: " << t.formatRes() << endl;sleep(1); }
}void* productorRoutine(void* args)
{RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);while(true){int x = rand() % 100;int y = rand() % 100;char op = ops[(x + y) % strlen(ops)];Task t(x, y, op);rq->push(t);cout << "productor done: " << t.formatArg() << endl;}
}int main()
{srand(time(nullptr) ^ getpid());RingQueue<Task>* rq = new RingQueue<Task>();// //单生产,单消费// pthread_t c, p;// pthread_create(&c, nullptr, consumerRoutine, rq);// pthread_create(&p, nullptr, productorRoutine, rq);// pthread_join(c, nullptr);// pthread_join(p, nullptr);//多生产,多消费pthread_t c[3], p[2];for(int i = 0; i < 3; i++)pthread_create(c + i, nullptr, consumerRoutine, rq);for(int i = 0; i < 2; i++)pthread_create(p + i, nullptr, productorRoutine, rq);for(int i = 0; i < 3; i++)pthread_join(c[i], nullptr);for(int i = 0; i < 2; i++)pthread_join(p[i], nullptr);delete rq;return 0;
}

 多生产、多消费的意义不在于从缓冲区中放入和拿取,而在于放入前并发构建Task,获取后多线程可以并发处理Task,因为这些操作没有加锁。

3、互斥锁与信号量

 什么时候使用锁,什么时候使用信号量,这取决于对应的临界资源是否被整体使用。如果被整体使用,则使用锁。如果被分开使用,则使用信号量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/17566.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows+IDEA+Nginx反向代理本机实现简单集群

先简单创建一个项目&#xff0c;可以是Maven也可以是Spring Initializr&#xff0c;如果是 Maven则需要自己配置启动类 按照目录路径创建controller类 package com.cloud.SR.controller;import org.springframework.beans.factory.annotation.Autowired; import org.springfram…

动作捕捉技术在发布会中的应用,虚拟数字人如何实现实时动作捕捉

动作捕捉又称为动态捕捉&#xff0c;是一种将真人动作转化为数字数据的技术&#xff0c;通过传感器等设备记录真人的运动轨迹&#xff0c;再将这些动作捕捉数据转化为计算机可以识别的数字数据&#xff0c;近年来在发布会中得到了广泛应用。 素材源于网络 在2023海丝之路文化和…

Linux:项目自动化构建工具——make/Makefile

文章目录 一.make与Makefile的关系1.Makefile2.make 二.项目清理1.clean2. .PHONY 前言&#xff1a; 本章主要内容有认识与学习Linux环境下如何使用项目自动化构建工具——make/makefile。 一.make与Makefile的关系 当我们编写一个较大的软件项目时&#xff0c;通常需要将多个…

集成学习-BaggingVoting和多个模型的混淆矩阵

当涉及到集成学习时&#xff0c;投票法和袋装法是两种常见的技术&#xff0c;用于将多个基学习器&#xff08;base learner&#xff09;组合成一个强大的集成模型。 投票法&#xff08;Voting&#xff09;&#xff1a;投票法是一种简单且常用的集成学习方法。在投票法中&#…

Graalvm编译spring boot 3 + jpa 的原生镜像

编译spring boot 3 native jpa的原生镜像 其中涉及版本&#xff1a; maven: 3.5.4 jdk: 17 graalvm: 22.3 springboot jpa: 3.0.8 一、Windows 1、graalvm安装 GraalVM22.3.0安装地址 解压到任意目录后添加JAVA_HOME环境变量 新增path&#xff1a;%JAVA_HOME%与%JAVA_H…

MySQL数据库(三)

前言 聚合查询、分组查询、联合查询是数据库知识中最重要的一部分&#xff0c;是将表的行与行之间进行运算。 目录 前言 一、聚合查询 &#xff08;一&#xff09;聚合函数 1、count 2、sum 3、avg 4、max 5、min 二、分组查询 &#xff08;一&#xff09;group by …

【python手写算法】利用梯度下降实现线性拟合

利用梯度下降实现线性拟合&#xff0c;效果和sklearn LinearRegression()差不多。 学习率初始设置为0.1结果算高的&#xff0c;直接让我的参数变成了nan。&#xff08;体会到了飞出去的感觉&#xff09; 把学习率调小了之后就正常了 # codingutf-8 import matplotlib.pyplot a…

<数据结构>NO9.选择类排序|直接选择排序|堆排序

文章目录 选择排序1.直接选择排序优化直接选择排序 2. 堆排序 选择排序 基本思想 选组排序是从待排序数据中选出最大/最小的元素放入到序列的起始位置&#xff0c;直到待排序数据全部有序。 直接选择排序和堆排序的基本思想均符合选择排序。 1.直接选择排序 假设数据按升序…

时间序列预测 | Matlab基于粒子群算法(PSO)优化径向基神经网络(PSO-RBF)的时间序列预测

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 时间序列预测| Matlab基于粒子群算法(PSO)优化径向基神经网络(PSO-RBF)的时间序列预测 评价指标包括:MAE、MBE和R2等,代码质量极高,方便学习和替换数据。要求2018版本及以上。 部分源码 %% 清空环境变量 warni…

改进二进制粒子群算法在配电网重构中的应用(Matlab实现)【论文复现】

目录 0 概述 1 配电网重构的目标函数 2 算例 3 matlab代码实现 0 概述 配电系统中存在大量的分段开关和联络开关&#xff0c;配电网重构正是通过调整分段开关和联络升大的组合状态来变换网络结构,用于优化配电网某些指标&#xff0c;使其达到最优状态。正常运行时,则通过两…

高效工作:ADManager Plus的利器

在当今快节奏的商业环境中&#xff0c;高效的工作是成功的关键。对于IT管理员来说&#xff0c;管理和维护Active Directory&#xff08;AD&#xff09;环境是一项重要的任务。然而&#xff0c;手动处理繁琐的AD管理任务可能会占用大量的时间和精力&#xff0c;影响工作效率。为…

RLHF-基于人类反馈的强化学习

RLHF 文章目录 RLHF强化学习基础回顾为什么要使用基于人类反馈的强化学习大纲RLHF的起源大预言模型中的RLHF案例 ChatGPT RLHF中的技术细节预训练语言模型训练奖励模型基于RL进行微调 RLHF 的未来 强化学习基础回顾 智能体通过采取行动与环境进行交互&#xff0c;并返回状态和奖…