【Linux线程(三)】生产者消费者模型

目录

前言:

一、什么是生产者消费者模型

(一)概念

(二)321原则

1.三个关系

2. 两种角色

3.一个场所

(三)生产者消费者模型的优缺点

二、基于阻塞队列实现生产者消费者模型

(一)介绍

(二)代码实现

BlockQueue.hpp

LockGuard.hpp

Task.hpp

 Main.cc

三、POSIX信号量

四、基于环形队列实现生产者消费者模型

(一)原理

(二)代码实现

RingQueue.hpp

Task.hpp

Main.cc

 (五)总结


前言:

在之前的学习中,我们了解到在多线程中往往会存在很多问题,比如竞态条件、死锁、数据竞争等问题,我们可以用互斥和同步来解决这些问题。

互斥和同步是一种具体的实现方法,它侧重于线程之间对共享资源的安全访问,今天我们学习一种抽象模型:生产者消费者模型。生产者消费者模型更关注于解决整体的生产者和消费者之间的协作问题。

一、什么是生产者消费者模型

(一)概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过这个容器来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给容器,消费者不找生产者要数据,而是直接从容器里取,容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个容器就是用来给生产者和消费者解耦的。

生产者的主要任务是生成一些数据,并将其放入共享的缓冲区中。消费者的任务则是从缓冲区中取出数据并进行处理。这种模型的目标是确保生产者和消费者之间的协调,以避免出现数据竞争或缓冲区溢出等问题。

在实现生产者-消费者模型时,通常会使用一些同步机制,比如信号量、互斥锁或条件变量等,来确保生产者和消费者之间的正确协作。

(二)321原则

将生产者消费者模型抽象成一个超市、顾客和供应商

供应商和顾客之间不需要直接接触,通过超市就能实现资源的共享。

1.三个关系

供应商和供应商之间需要维持互斥:

向超市中放入资源时,会有多个供应商同时放入,这会导致资源覆盖或丢失、缓冲区溢出等问题,因此需要保证同一时刻只能有一个供应商向超市中放入资源。

顾客和顾客之间需要维持互斥:

跟供应商之间的关系相同,可能会有多个顾客同时从超市中取出资源。

供应商和顾客之间需要维持互斥和同步:

  • 因为供应商和顾客都需要进入超市,所以需要互斥机制使得同一时刻只能有一方进入。
  • 由于顾客必须等待供应商先向超市中放入资源才能取出资源,所以供应商和顾客之间形成了先后的顺序关系,这种关系需要利用同步机制来维持。

2. 两种角色

两种角色指的是生产者和消费者这两个角色,在我们的超市中就是供应商和顾客。

3.一个场所

一个场所指的是存放共享资源的缓冲区,超市就是这个场所。

生产者消费者模型可以用321原则来描述,通过321原则我们可以更好的理解生产者消费者模型并更容易对其编程。

(三)生产者消费者模型的优缺点

优点:

  1. 解耦合:生产者和消费者之间通过共享缓冲区进行通信,使得它们的实现相对独立。这种解耦合使得系统更易于扩展和维护。

  2. 资源利用率高:生产者和消费者可以并行执行,从而充分利用系统资源,提高系统的效率。

  3. 平衡生产和消费速率:通过合理设计缓冲区的大小和同步机制,可以平衡生产者和消费者之间的速率,避免生产者速度过快导致消费者无法处理所有数据,或者消费者速度过快导致资源浪费的问题。

  4. 简化并发编程:生产者-消费者模型提供了一种结构化的方法来管理并发编程中的数据共享与同步,简化了并发编程的复杂性。

缺点:

  1. 同步开销:实现生产者-消费者模型通常需要使用同步机制来保证生产者和消费者之间的正确协作,这可能引入额外的同步开销,影响系统的性能。

  2. 死锁和饥饿:如果同步机制设计不当,可能会导致死锁或者某些线程长时间得不到执行的饥饿问题。

  3. 缓冲区大小限制:生产者-消费者模型中的缓冲区大小需要事先确定,如果缓冲区过小,可能会导致生产者或者消费者无法及时处理数据;如果缓冲区过大,可能会占用过多的系统资源。

  4. 复杂性增加:虽然生产者-消费者模型简化了并发编程中的一些问题,但是对于初学者来说,理解和正确实现该模型仍然是有一定挑战的。

总的来说,生产者-消费者模型在适当的场景下是非常有用的,并且可以有效地提高系统的性能和可维护性,但是在使用时需要注意合理设计同步机制和缓冲区大小,以及避免死锁和饥饿等问题。

二、基于阻塞队列实现生产者消费者模型

(一)介绍

我们今天利用阻塞队列来充当生产者和消费者之间的缓冲区。阻塞队列是一种特殊类型的队列,它支持当队列为空时阻塞消费者队列,当队列为满时阻塞生产者队列。利用这个队列,我们可以有效的管理生产者和消费者之间的数据共享和同步,简化并发编程。

(二)代码实现

由于阻塞队列需要我们自己创建,我们就创建一个hpp文件用来编写阻塞队列,用vector实现其主体,利用信号量来实现阻塞。

BlockQueue.hpp

#pragma once
#include<queue>
#include<iostream>
#include<pthread.h>
#include"LockGuard.hpp"
#include"Task.hpp"const int defaultcap = 5;template<class T>
class BlockQueue
{
public:BlockQueue(int cap = defaultcap):_capacity(cap){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_p_cond,nullptr);pthread_cond_init(&_c_cond,nullptr);}bool IsFull(){return _q.size() == _capacity;}bool IsEmpty(){return _q.size() == 0;}void Push(const T &in)  //给生产者的{LockGurad lock(&_mutex);//pthread_mutex_lock(&_mutex);while(IsFull()){//阻塞等待pthread_cond_wait(&_p_cond,&_mutex);}_q.push(in);pthread_cond_signal(&_c_cond);//pthread_mutex_unlock(&_mutex);        }void Pop(T *out)  // 给消费者的{LockGurad lock(&_mutex);//pthread_mutex_lock(&_mutex);while(IsEmpty()){//阻塞等待pthread_cond_wait(&_c_cond,&_mutex);}*out = _q.front();_q.pop();pthread_cond_signal(&_p_cond);//pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_p_cond);pthread_cond_destroy(&_c_cond);}
private:std::queue<T> _q;int _capacity;pthread_mutex_t _mutex;pthread_cond_t _p_cond; // 给生产者的pthread_cond_t _c_cond; // 给消费者的// int consumer_water_line = _capacity / 3 * 2;  // 消费者水位线,当水位线上升到三分之二时就开始消费// int productor_water_line = _capacity / 3;  // 生产者水位线,当水位线下降到三分之一时就开始生产
};

在阻塞队列中,我们将锁封装为LockGuard,用互斥锁实现互斥机制。

LockGuard.hpp

#pragma once
#include<pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t *lock):_lock(lock){}void Lock(){pthread_mutex_lock(_lock);}void Unlock(){pthread_mutex_unlock(_lock);}~Mutex(){}
private:pthread_mutex_t *_lock;
};class LockGurad
{
public:LockGurad(pthread_mutex_t *lock):_mutex(lock){_mutex.Lock();}~LockGurad(){_mutex.Unlock();}
private:Mutex _mutex;
};

我们让生产者生产任务并放入队列,让消费者拿取任务并实行,任务内容为一个算术表达式。

Task.hpp

#pragma once
#include <iostream>const int defaultvalue = 0;
std::string opers = "+-*/%)(";enum
{ok = 0,div_zero,mod_zero,unknow
};
class Task
{
public:Task(){}Task(int x, int y, char op): data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok){}void Run(){switch (oper){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':{if (data_y == 0)code = div_zero;elseresult = data_x / data_y;}break;case '%':{if (data_y == 0)code = div_zero;elseresult = data_x / data_y;}break;default:code = unknow;break;}}std::string PrintTask(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=?";return s;}std::string PrintResult(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=";s += std::to_string(result);s += "[";s += std::to_string(code);s += "]";return s;}~Task(){}private:int data_x;int data_y;char oper;int result;int code;
};

 Main.cc

#include "BlockQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "Task.hpp"class ThreadData
{
public:BlockQueue<Task> *bq;std::string name;
};void *consumer(void *args)
{ThreadData *td = (ThreadData *)args;// BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while (true){// sleep(1);Task t;// 1.消费数据  bq->Pop(&data)td->bq->Pop(&t);t.Run();// 2.进行处理std::cout << "consumer data:" << t.PrintResult() << ", " << td->name << std::endl;}return nullptr;
}
void *productor(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while (true){// 1.有数据int data1 = rand() % 10;usleep(rand() % 123);int data2 = rand() % 10;usleep(rand() % 123);char oper = opers[rand() % (opers.size())];Task t(data1, data2, oper);std::cout << "productor data:" << t.PrintTask() << std::endl;// 2.进行生产// bq->push(&data);bq->Push(t);sleep(1);}return nullptr;
}
int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());BlockQueue<Task> *bq = new BlockQueue<Task>();pthread_t c[3], p[2]; // 消费者和生产者ThreadData *td1 = new ThreadData();td1->bq = bq;td1->name = "thread-1";pthread_create(&c[0], nullptr, consumer, td1);ThreadData *td2 = new ThreadData();td2->bq = bq;td2->name = "thread-2";pthread_create(&c[1], nullptr, consumer, td2);ThreadData *td3 = new ThreadData();td3->bq = bq;td3->name = "thread-3";pthread_create(&c[2], nullptr, consumer, td3);pthread_create(&p[0], nullptr, productor, bq);pthread_create(&p[1], nullptr, productor, bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(c[2], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);return 0;
}

三、POSIX信号量

http://t.csdnimg.cn/mhTLS

在上面这篇博客中,我已经介绍了System V信号量,今天要讲的是POSIX信号量。

POSIX(Portable Operating System Interface)信号量是一种同步原语,用于在多线程或多进程环境中实现进程之间的同步与互斥。POSIX标准定义了一组API,使得程序员可以在各种UNIX-like操作系统上使用相同的信号量接口。


 

       #include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);

作用:

sem_init是POSIX信号量的初始化函数,用于初始化一个新的信号量对象。它的作用是创建或者初始化一个新的POSIX信号量,并设置信号量的初始值。

参数:

  • sem: 指向要初始化的信号量对象的指针。
  • pshared: 指定信号量的类型,可以是 0 表示信号量在进程内部共享,也可以是 1 表示信号量在进程间共享。
  • value: 指定信号量的初始值,即初始时信号量的计数值。

返回值:

  • 如果初始化成功,则返回 0
  • 如果出现错误,则返回 -1,并设置 errno 来指示具体的错误原因。
       #include <semaphore.h>int sem_destroy(sem_t *sem);

作用:

sem_destroy函数用于销毁一个已经初始化的POSIX信号量对象。它的作用是释放由sem_init创建的信号量对象所占用的资源,并将该信号量对象恢复到未初始化状态。

参数:

  • sem: 指向要销毁的信号量对象的指针。

返回值:

  • 如果销毁成功,则返回 0
  • 如果出现错误,则返回 -1,并设置 errno 来指示具体的错误原因。
       #include <semaphore.h>int sem_wait(sem_t *sem);

作用:

sem_wait函数用于等待一个信号量,并在信号量的值大于0时将其递减。如果信号量的值大于0,则sem_wait会将信号量的值减1,并立即返回;如果信号量的值为0,则sem_wait会阻塞当前线程,直到信号量的值大于0为止。

参数:

  • sem: 指向要等待的信号量对象的指针。

返回值:

  • 如果等待成功(即信号量的值大于0),则返回 0
  • 如果出现错误,则返回 -1,并设置 errno 来指示具体的错误原因。

 

       #include <semaphore.h>int sem_post(sem_t *sem);

作用:

sem_post函数用于增加(释放)一个信号量的值。它的作用是增加信号量的值,并唤醒一个等待该信号量的线程(如果有的话),以通知其继续执行

参数:

  • sem: 指向要增加值的信号量对象的指针。

返回值:

  • 如果增加成功,则返回 0
  • 如果出现错误,则返回 -1,并设置 errno 来指示具体的错误原因。

四、基于环形队列实现生产者消费者模型

(一)原理

在之前对共享资源的理解中,我们潜意识把共享资源看成了一个整体,那可不可以把共享资源拆成很多部分,让信号量去管理这些共享资源块,每有一个线程来申请信号量,就是将这个共享资源块分给这个线程,当所有的共享资源块都被使用了,信号量也就为0,不允许再次申请了。

这种方式称为信号量的计数模式。每个信号量计数的单位可以被看作是一个共享资源块。当一个线程需要访问共享资源时,它会申请一个信号量计数单位,如果所有的计数单位都被使用了,信号量的值就会为0,进而阻塞其他线程的申请操作,直到有其他线程释放了信号量。

利用这个理解,我们可以利用环形队列和信号量来实现生产者消费者模式。

(二)代码实现

首先我们需要创建环形队列,我们需要分别记录生产者和消费者在队列中的位置以及队列的大小,而生产者需要的资源是空间资源,消费者需要数据资源。生产者需要先对空间信号量进行P操作,然后写入数据后,对数据信号量进行V操作;而消费者则相反。所以还需要两个信号量,一个是空间信号量,一个是数据信号量。

RingQueue.hpp

#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<semaphore.h>using namespace std;const int defaultsize = 5;template<class T>
class RingQueue
{
private:void P(sem_t &sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}
public:RingQueue(int size = defaultsize):_ringqueue(size),_size(size),_p_step(0),_c_step(0){sem_init(&_space_sem,0,size);sem_init(&_data_sem,0,0);pthread_mutex_init(&_c_mutex);pthread_mutex_init(&_p_mutex);}void Push(const T &in){// 生产// 先加锁,还是先申请信号量?pthread_mutex_lock(&_p_mutex);P(_space_sem);_ringqueue[_p_step] = in;_p_step++;_p_step %= _size;V(_data_sem);pthread_mutex_unlock(&_p_mutex);}void Pop(T *out){// 消费pthread_mutex_lock(&_c_mutex);P(_data_sem);*out = _ringqueue[_c_step];_c_step++;_c_step %= _size;V(_space_sem);pthread_mutex_lock(&_c_mutex);}~RingQueue(){sem_destroy(&_space_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(_c_mutex);pthread_mutex_destroy(_p_mutex);}
private:vector<T> _ringqueue;    int _size;int _p_step;  // 生产者的生产位置int _c_step;  // 消费者的消费位置sem_t _space_sem;  // 生产者sem_t _data_sem;   // 消费者pthread_mutex_t _c_mutex;pthread_mutex_t _p_mutex;
};

生产者向队列中写入的数据我们就用之前阻塞队列中的任务

Task.hpp

#pragma once
#include <iostream>const int defaultvalue = 0;
std::string opers = "+-*/%)(";enum
{ok = 0,div_zero,mod_zero,unknow
};
class Task
{
public:Task(){}Task(int x, int y, char op): data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok){}void Run(){switch (oper){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':{if (data_y == 0)code = div_zero;elseresult = data_x / data_y;}break;case '%':{if (data_y == 0)code = div_zero;elseresult = data_x / data_y;}break;default:code = unknow;break;}}std::string PrintTask(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=?";return s;}std::string PrintResult(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=";s += std::to_string(result);s += "[";s += std::to_string(code);s += "]";return s;}~Task(){}private:int data_x;int data_y;char oper;int result;int code;
};

Main.cc

#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>void *Productor(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// 数据怎么来的?// 1.有数据int data1 = rand() % 10;//usleep(rand() % 123);int data2 = rand() % 10;//usleep(rand() % 123);char oper = opers[rand() % (opers.size())];Task t(data1, data2, oper);std::cout << "productor data:" << t.PrintTask() << std::endl;rq->Push(t);//sleep(1);}
}
void *Consumer(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){Task t;rq->Pop(&t);t.Run();cout<<"consumer done,data is :"<<t.PrintResult()<<endl;}
}
int main()
{srand((uint64_t)time(nullptr)^pthread_self());pthread_t c, p;RingQueue<Task> *rq = new RingQueue<Task>();pthread_create(&c, nullptr, Consumer, rq);pthread_create(&c, nullptr, Productor, rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

 (五)总结

这样我们基于阻塞队列和环形队列实现了生产者消费者模型,它们的基本思想就是将共享资源分成多个部分的方式,也就是信号量的计数模式,该模式有以下优点:

  1. 更细粒度的控制:可以更灵活地控制对共享资源的访问,避免了对整个共享资源的串行访问。
  2. 提高并发性:允许多个线程同时访问不同的共享资源块,提高了并发性和吞吐量。
  3. 减小竞争:由于资源被分割成多个部分,不同的线程可以独立地访问不同的资源块,减小了竞争的可能性。

环形队列并不一定会优于阻塞队列,我们需要根据不同的环境选择不同的实现方式,才能提高程序的效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/705982.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

yolov8 模型架构轻量化 | 极致降参数量

模型轻量化加速是深度学习领域的重要研究方向&#xff0c;旨在减小模型的体积和计算复杂度&#xff0c;从而提高在资源受限设备上的运行效率&#xff0c;模型参数量在轻量化加速中扮演着至关重要的角色。 首先&#xff0c;模型参数量直接决定了模型的复杂度和存储空间需求。随…

HNU-算法设计与分析-作业6

第六次作业【分支限界法】 文章目录 第六次作业【分支限界法】<1> 算法实现题6-2 最小权顶点覆盖问题<2> 算法实现题6-6 n后问题<3> 算法实现题6-7 布线问题 <1> 算法实现题6-2 最小权顶点覆盖问题 ▲问题重述 问题描述&#xff1a; 给定一个赋权无向…

【SQL】SQL常见面试题总结(3)

目录 1、聚合函数1.1、SQL 类别高难度试卷得分的截断平均值&#xff08;较难&#xff09;1.2、统计作答次数1.3、得分不小于平均分的最低分 2、分组查询2.1、平均活跃天数和月活人数2.2、月总刷题数和日均刷题数2.3、未完成试卷数大于 1 的有效用户&#xff08;较难&#xff09…

C++ I/O流(一)——输出流

一、IO流概念 IO流可分为输入流和输出流,用于从设备(如键盘、文件、网络等)读取数据或向设备写入数据。C++标准库提供了丰富的IO流类,包括iostream、fstream、stringstream等,分别用于处理控制台输入输出、文件输入输出和字符串流操作。 读操作:输入流中读取数据到程序中…

Spring Boot | SpringBoot 中 自定义 “用户授权管理“ : 自定义“用户访问控制“、自定义“用户登录控制“

目录: 一、SpringBoot 中 自定义 "用户授权管理" ( 总体内容介绍 ) :二、 自定义 "用户访问控制" ( 通过 "HttpSecurity类" 的 authorizeRequests( )方法来实现 "自定义用户访问控制" ) :1.基础项目文件准备2.实现 "自定义身份认…

论文阅读-《MHFormer: Multi-Hypothesis Transformer for 3D Human Pose Estimation》

目录 1 摘要 2 介绍 3 相关工作 3.1 3D HPE 3.2 ViT 3.3 多假设方法 4 MHFormer 4.1 概述 4.2 准备阶段 4.2.1 多头自注意力机制&#xff08;MSA&#xff09; 4.2.2 多层感知器&#xff08;MLP&#xff09; 4.3 MHG-多假设生成 4.3.1 概述 4.3.2 详细解释&#x…

镭速助力企业加密上传大文件

在当代的数字化社会中&#xff0c;海量数据已成为我们日常生活的一个不可分割的组成部分。尤其是对于企业来说&#xff0c;如何在互联网上安全地传输庞大数据文件&#xff0c;是一个至关重要的问题。本文将深入探讨镭速技术如何利用加密手段&#xff0c;安全地将大型数据文件上…

无需MAC,也能打开Sketch文件:多平台兼容软件介绍

Sketch是专门为Macos开发的矢量图形绘制软件&#xff0c;帮助很多设计师创作了很多优秀的作品&#xff0c;其强大的功能受到很多设计师的喜爱。但是Sketch受到Macos系统的限制&#xff0c;这也让很多设计师非常苦恼。有时候他们不能在Mac打开Sketch文件&#xff0c;那么他们能在…

Spring Boot | Spring Boot 中 自定义“用户退出控制“、获取“登录用户信息“

目录: 一、SpringBoot 中 自定义 "用户授权管理" ( 总体内容介绍 ) :二、 自定义 "用户退出控制" ( 通过 "HttpSecurity类" 的 logout( )方法来实现 "自定义用户用户登录控制" ) :1.基础项目文件准备2.实现 "自定义身份认证"…

基于WTVxxx语音芯片方案在智能小家电领域的应用介绍

一、产品市场&#xff1a; WTVxxx系列语音芯片凭借其出色的性价比&#xff0c;在小家电制造业中脱颖而出&#xff0c;它在确保优异音质及全面功能的基础上&#xff0c;大幅度削减了生产成本&#xff0c;为产品在激烈的市场竞争中赢得了价格优势&#xff0c;并为制造商拓宽了盈利…

[Algorithm][回溯][字母大小写全排列][优美的排列][N皇后]详细讲解

目录 1.字母大小写全排列1.题目链接2.算法原理详解3.代码实现 2.优美的排列1.题目链接2.算法原理详解3.代码实现 3.N 皇后1.题目链接2.算法原理详解3.代码实现 1.字母大小写全排列 1.题目链接 字母大小写全排列 2.算法原理详解 本题逻辑与子集大致相同 思路一&#xff1a;每…

爬虫界的“闪电侠”:异步爬虫与分布式系统的实战秘籍

Hi&#xff0c;我是阿佑&#xff0c;前文给大家讲了&#xff0c;如何做一个合法“采蜜”的蜜蜂&#xff0c;有了这么个自保的能力后&#xff0c;阿佑今天就将和大家踏入 —— 异步爬虫 的大门&#xff01; 异步爬虫大法 1. 引言1.1 爬虫框架的价值&#xff1a;效率与复杂度管理…