基于循环队列和信号量的生产和消费者模型

这一节为什么要基于信号量来实现同一个模型,原因:

 void push(const T& in){pthread_mutex_lock(&_lock);while(is_Full()){//这里说明阻塞队列是满的,需要让生产者等待pthread_cond_wait(&_pcond,&_lock);}//这里说明阻塞队列至少有一个空位可以插入_queue.push(in);//唤醒消费者去消费pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_lock);}

在我们访问公共资源的时候,消费者需要竞争同一把锁,然后还要继续判断是否能在临界区中生产数据。如果竞争到了锁然后判断不能生产数据,则需要继续等待。竞争锁需要消耗时间,判断等待也需要,这就导致了程序效率的低下。因此信号量的存在就解决了这一个问题:在竞争锁之前就提前预知了临界资源是否就绪。

POSIX信号量

信号量就相当于一个计数器,计数了临界资源中资源的数目,接下来我们学习使用它的接口:

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0 表示线程间共享,非零表示进程间共享
value :信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减 1
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1
int sem_post(sem_t *sem);//V()

基队于环形列的生产消型费模型

引入循环队列

循环队列是一个逻辑结构,是由数组演变而成。通过对数组的下标访问进行模运算变成一个循环访问,这就是循环队列。当消费者和生产者在同一位置时,可能循环队列为空,另一种可能就是生产者消费满了,消费者来不及消费。因此其余时间生产消费者都不在同一位置生产和消费数据。

实现模型的三个必要条件

1.生产者不能套消费者一圈。

2.消费者不能超越生产者。

3.如果生产者和消费者在同一个位置,队列为空让生产者先走,队列为满让消费者先走。

引入信号量

通过信号量我们可以预知知道队列中资源的数目,如果生产者套了消费者一圈,这时生产者就申请不到信号量,只有消费者能够申请,因此只有消费者可以消费,生产者必须等待。同理消费者把数据消费光和生产者在同一位置时,消费者申请不到信号量只有生产者可以,因此只有生产者可以生产而消费者必须等待,通过引入信号量满足了该模型的三个必要条件。

代码实现

简易版的代码实现:

main.cc

#include <iostream>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
#include <unistd.h>
#include "RingQueue.hpp"using namespace std;void *consumer(void *queue)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(queue);while (true){int result = 0;rq->pop(&result);cout << "消费者消费了一个数据:" << result << endl;sleep(1);}
}void *product(void *queue)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(queue);while (true){//(1);int data = rand() % 10 + 1;rq->push(data);cout << "生产者生产了一个数据:" << data << endl;}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123456);RingQueue<int> *rq = new RingQueue<int>();pthread_t c, p;pthread_create(&c, nullptr, consumer, (void *)rq);pthread_create(&p, nullptr, product, (void *)rq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete rq;return 0;
}

RingQueue.hpp 

#include <iostream>
#include <semaphore.h>
#include <vector>
#include <cassert>using namespace std;static const int gcap = 6;template <class T>
class RingQueue
{private:void P(sem_t *sem) // 申请信号量 --{int n = sem_wait(sem);assert(n == 0);(void)n;}void V(sem_t *sem) // 释放信号量 ++{int n = sem_post(sem);assert(n == 0);(void)n;}public:RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap){int n = sem_init(&_spaceSem, 0, cap);assert(n == 0);int m = sem_init(&_dataSem, 0, 0);assert(m == 0);_productStep = _consumerStep = 0;}void push(const T &in){P(&_spaceSem); // 申请空间信号量_queue[_productStep++] = in;_productStep %= _cap;V(&_dataSem); // 释放数据信号量}void pop(T *out){P(&_dataSem);*out = _queue[_consumerStep++];_consumerStep %= _cap;V(&_spaceSem);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);}private:std::vector<T> _queue;int _cap;sem_t _spaceSem;sem_t _dataSem;int _productStep;int _consumerStep;
};

引入计算任务:

Task.hpp

#include <iostream>
#include <functional>
#include <string>class CalTask
{
public:using func_t = std::function<int(int, int, char)>;CalTask() {}CalTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func){}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[64];snprintf(buffer, sizeof buffer, "%d %c %d =%d", _x, _op, _y, result);return buffer;}std::string to_string(){char buffer[64];snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);return buffer;}~CalTask(){}private:int _x;int _y;char _op;func_t _callback;
};int mymath(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':if (y == 0){std::cerr << "div zero error" << std::endl;break;}result = x / y;break;case '%':if (y == 0){std::cerr << "mod zero error" << std::endl;break;}result = x % y;break;}return result;
}

RingQueue.hpp

#include <iostream>
#include <semaphore.h>
#include <vector>
#include <string>
#include <cassert>using namespace std;static const int gcap = 6;
static const string symbol ="+-*/%";template <class T>
class RingQueue
{private:void P(sem_t *sem) // 申请信号量 --{int n = sem_wait(sem);assert(n == 0);(void)n;}void V(sem_t *sem) // 释放信号量 ++{int n = sem_post(sem);assert(n == 0);(void)n;}public:RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap){int n = sem_init(&_spaceSem, 0, cap);assert(n == 0);int m = sem_init(&_dataSem, 0, 0);assert(m == 0);_productStep = _consumerStep = 0;}void push(const T &in){P(&_spaceSem); // 申请空间信号量_queue[_productStep++] = in;_productStep %= _cap;V(&_dataSem); // 释放数据信号量}void pop(T *out){P(&_dataSem);*out = _queue[_consumerStep++];_consumerStep %= _cap;V(&_spaceSem);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);}private:std::vector<T> _queue;int _cap;sem_t _spaceSem;sem_t _dataSem;int _productStep;int _consumerStep;
};

main.cc

#include <iostream>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
#include <unistd.h>
#include "RingQueue.hpp"
#include "Task.hpp"using namespace std;void *consumer(void *queue)
{RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(queue);while (true){CalTask result ;rq->pop(&result);cout << "消费者处理了一个任务:" << result() << endl;sleep(1);}
}void *product(void *queue)
{RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(queue);while (true){int x =rand()%10;int y =rand()%10;char op =symbol[rand()%symbol.size()];CalTask t(x,y,op, mymath);rq->push(t);cout << "生产者生产了一个任务:" << t.to_string() << endl;}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123456);RingQueue<CalTask> *rq = new RingQueue<CalTask>();pthread_t c, p;pthread_create(&c, nullptr, consumer, (void *)rq);pthread_create(&p, nullptr, product, (void *)rq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete rq;return 0;
}

多线程多并发执行:

想要实现多线程并发的进行消费和生产,我们只需要保证临界资源的访问安全,因此我们可以在访问临界资源的时候加上锁:

但是这么加锁有一个问题:当我们申请锁成功后才能申请信号量,锁只有一把需要多个线程竞争后才能交付并且申请信号量也需要花费时间。因此我们可以先申请信号量,让多个线程先预定好共享资源中的资源,然后再申请锁进行临界资源的访问,这样做提高了程序的效率。

最终代码:

//RingQueue.hpp#include <iostream>
#include <semaphore.h>
#include <vector>
#include <string>
#include <cassert>using namespace std;static const int gcap = 6;
static const string symbol ="+-*/%";template <class T>
class RingQueue
{private:void P(sem_t *sem) // 申请信号量 --{int n = sem_wait(sem);assert(n == 0);(void)n;}void V(sem_t *sem) // 释放信号量 ++{int n = sem_post(sem);assert(n == 0);(void)n;}public:RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap){int n = sem_init(&_spaceSem, 0, cap);assert(n == 0);int m = sem_init(&_dataSem, 0, 0);assert(m == 0);_productStep = _consumerStep = 0;pthread_mutex_init(&_pmutex,nullptr);pthread_mutex_init(&_cmutex,nullptr);}void push(const T &in){P(&_spaceSem); // 申请空间信号量pthread_mutex_lock(&_pmutex);_queue[_productStep++] = in;_productStep %= _cap;pthread_mutex_unlock(&_pmutex);V(&_dataSem); // 释放数据信号量}void pop(T *out){P(&_dataSem);pthread_mutex_lock(&_cmutex);*out = _queue[_consumerStep++];_consumerStep %= _cap;pthread_mutex_unlock(&_cmutex);V(&_spaceSem);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_pmutex);pthread_mutex_destroy(&_cmutex);}private:std::vector<T> _queue;int _cap;sem_t _spaceSem;sem_t _dataSem;int _productStep;int _consumerStep;pthread_mutex_t _pmutex;pthread_mutex_t _cmutex;
};//Task.hpp#include <iostream>
#include <functional>
#include <string>class CalTask
{
public:using func_t = std::function<int(int, int, char)>;CalTask() {}CalTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func){}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[64];snprintf(buffer, sizeof buffer, "%d %c %d =%d", _x, _op, _y, result);return buffer;}std::string to_string(){char buffer[64];snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);return buffer;}~CalTask(){}private:int _x;int _y;char _op;func_t _callback;
};int mymath(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':if (y == 0){std::cerr << "div zero error" << std::endl;break;}result = x / y;break;case '%':if (y == 0){std::cerr << "mod zero error" << std::endl;break;}result = x % y;break;}return result;
}//main.cc#include <iostream>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
#include <unistd.h>
#include "RingQueue.hpp"
#include "Task.hpp"using namespace std;string SelfName()
{char name[64];snprintf(name,sizeof name,"thread[0x%x]",pthread_self());return name;
}void *consumer(void *queue)
{RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(queue);while (true){CalTask result ;rq->pop(&result);cout << SelfName()<<"消费者处理了一个任务:" << result() << endl;sleep(1);}
}void *product(void *queue)
{RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(queue);while (true){int x =rand()%10;int y =rand()%10;char op =symbol[rand()%symbol.size()];CalTask t(x,y,op, mymath);rq->push(t);cout << SelfName()<<"生产者生产了一个任务:" << t.to_string() << endl;}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123456);RingQueue<CalTask> *rq = new RingQueue<CalTask>();pthread_t c[10], p[6];for(int i =0;i<10;++i)pthread_create(c+i, nullptr, consumer, (void *)rq);for(int i =0;i<6;++i)pthread_create(p+i, nullptr, product, (void *)rq);for(int i =0;i<10;++i)pthread_join(c[i], nullptr);for(int i =0;i<6;++i)pthread_join(p[i], nullptr);delete rq;return 0;
}

这里所说的消费者和生产者模型的高效性和上一节基于阻塞队列的是一样的,如果不清楚的话可以点击《基于阻塞队列的生产和消费模型》进行复习!!!到这里生产者和消费者模型讲解就结束了,下一节我们学习线程池,敬请期待!!!!!记得点赞多多支持喔!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/100894.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云备份——实用类工具实现

一&#xff0c;文件实用类设计实现 不管是客户端还是服务端&#xff0c;文件的传输备份都涉及到文件的读写&#xff0c;包括数据管理信息的持久化也是如此&#xff0c;因此首先设计封装文件操作类&#xff0c;这个类封装完毕之后&#xff0c;则在任意模块中对文件进行操作时都将…

[机器学习]分类算法系列①:初识概念

目录 1、概念 2、数据集介绍与划分 2.1、数据集的划分 2.2、sklearn数据集介绍 2.2.1、API 2.2.2、分类和回归数据集 分类数据集 回归数据集 返回类型 3、sklearn转换器和估计器 3.1、转换器 三种方法的区别 3.2、估计器 3.2.1、简介 3.2.2、API 3.3、工作流程 …

static关键字

static 是Java中的一个关键字&#xff0c;它可以用于修饰类的成员变量和方法&#xff0c;具有特殊的含义和用途。下面是关于static关键字的主要用法和含义&#xff1a; 静态变量&#xff08;Static Variables&#xff09;&#xff1a; 静态变量也称为类变量&#xff0c;它们属于…

ZDH-权限模块

本次介绍基于ZDH v5.1.2版本 目录 项目源码 预览地址 安装包下载地址 ZDH权限模块 ZDH权限模块-重要名词划分 ZDH权限模块-菜单管理 ZDH权限模块-角色管理 ZDH权限模块-用户配置 ZDH权限模块-权限申请 项目源码 zdh_web: GitHub - zhaoyachao/zdh_web: 大数据采集,抽…

【Apollo】Apollo的入门介绍

阿波罗是百度发布的名为“Apollo&#xff08;阿波罗&#xff09;”的向汽车行业及自动驾驶领域的合作伙伴提供的软件平台。 帮助汽车行业及自动驾驶领域的合作伙伴结合车辆和硬件系统&#xff0c;快速搭建一套属于自己的自动驾驶系统。 百度开放此项计划旨在建立一个以合作为中…

idea VCS配置多个远程仓库

Idea VCS配置多个远程仓库 首先要有连个远程仓库地址 idea 添加数据源 查看推送记录 添加数据源 ok之后填写账号密码 推送本地项目 选择不同远程地址 push 查看不同远程地址的 不同分支的 推送记录 不期而遇的温柔&#xff1a; 应用开源架构进行项目开发&#xff0c;特别是那…

Unity3D开发流程及注意事项

使用Unity3D开发游戏需要遵循一定的流程和注意事项&#xff0c;以确保项目的顺利进行并获得良好的结果。以下是一般的游戏开发流程以及一些注意事项&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 游…

RabbitMQ高级特性

目录 消息的可靠投递confirm和return Consumer Ack 消费端限流 TTL Time To Live&#xff08;存活时间/过期时间&#xff09; 死信队列&#xff08;死信交换机&#xff09; 延迟队列 日志与监控 rabbitmqctl管理和监控 消息追踪 消息的可靠投递confirm和return 持久…

2023全国大学生数学建模A题B题C题D题E题竞赛选题建议,思路模型

目录 国赛数学建模思路模型代码&#xff1a;9.7开赛后第一时间更新&#xff0c;完整思路获取见文末名片 一、题目选择 二、国赛摘要及论文写作技巧 1、国赛摘要 2、论文写作技巧 三、历年国赛真题及对应算法模型 完整国赛题思路模型获取见此 国赛数学建模思路模型代码&am…

[machine learning]误差分析,模型分析

1.目的是什么 当我们找到一个算法去计算某些东西的时候,我们通常要对这个算法进行一定的分析,比如时间复杂度,空间复杂度(前者更加重要),来进行比较,判断一个算法的优劣性. 对于一个训练的模型来说,同样需要某种模型来进行分析,例如代价函数等等,通过比较拟合程度,正确精度等…

AR眼镜: 与人并行的智能伙伴

AR眼镜&#xff1a; 与人并行的智能伙伴 增强现实&#xff08;Augmented Reality&#xff0c;AR&#xff09;眼镜是一种将虚拟信息与真实世界进行融合的设备&#xff0c;通过眼镜或头戴设备让用户能够看到真实世界&#xff0c;并在其上叠加数字内容和图像。目前工业级AR眼镜已…

vue表格不显示列号123456

我在网上找了半天&#xff0c;都是如何添加列号123456的&#xff0c;没有找到不显示列号的参考&#xff0c;现在把这个解决了&#xff0c;特此记录一下。 没有加右边的就会显示&#xff0c;加上右边的就隐藏了