生产消费者模型

生产消费者模型概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而 通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者 要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队 列就是用来给生产者和消费者解耦的。

321原则

  • 三种关系:生产者之间互斥,消费者之间互斥,生产者和消费者之间互斥和同步关系
  • 两种角色:生产者线程和消费者线程
  • 一个交易场所:一个特定结构的缓冲区

生产者消费者模型优点

解耦  支持并发  支持忙闲不均 

基于阻塞队列的生产消费模型

实现一个demo:

创建3个线程,线程1当作队列1的生产者,线程2既作为队列1的消费者又是队列2的生产者,线程3则是队列2的消费者。给每个线程分配任务实现生产消费模型。

        
BlockQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>const int gmaxcap = 5;template <class T>
class BlockQueue
{
public:BlockQueue(const int &maxcap = gmaxcap) : _maxcap(maxcap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}void push(const T &in) // 输入型参数,const &{pthread_mutex_lock(&_mutex);// 充当条件判断的语法必须是while,不能用ifwhile (is_full()){// 细节:pthread_cond_wait这个函数的第二个参数,必须是我们正在使用的互斥锁!// a. pthread_cond_wait: 该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起// b. pthread_cond_wait: 该函数在被唤醒返回的时候,会自动的重新获取你传入的锁pthread_cond_wait(&_pcond, &_mutex); // 因为生产条件不满足,无法生产,此时我们的生产者进行等待}_q.push(in);pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_mutex);}void pop(T *out) // 输出型参数:*, // 输入输出型:&{pthread_mutex_lock(&_mutex);while (is_empty()){pthread_cond_wait(&_ccond, &_mutex);}*out = _q.front();_q.pop();pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}private:bool is_empty(){return _q.empty();}bool is_full(){return _q.size() == _maxcap;}private:std::queue<T> _q;int _maxcap; // 队列中元素的上限pthread_mutex_t _mutex;pthread_cond_t _pcond; // 生产者对应的条件变量pthread_cond_t _ccond; // 消费者对应的条件变量
};
MainCp.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <sys/types.h>
#include <unistd.h>
#include <ctime>//C:计算
//S: 存储
template<class C, class S>
class BlockQueues
{
public:BlockQueue<C> *c_bq;BlockQueue<S> *s_bq;
};void *productor(void *bqs_)
{BlockQueue<CalTask> *bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->c_bq;while (true){//生产活动int x = rand() % 100 + 1; int y = rand() % 10;int operCode = rand() % oper.size();CalTask t(x, y, oper[operCode], mymath);bq->push(t);std::cout << "productor thread, 生产计算任务: " << t.toTaskString() << std::endl;sleep(1);}return nullptr;
}void *consumer(void *bqs_)
{BlockQueue<CalTask> *bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->c_bq;BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;while (true){// 消费活动CalTask t;bq->pop(&t);std::string result = t(); std::cout << "cal thread,完成计算任务: " << result << " ... done"<< std::endl;SaveTask save(result, Save);save_bq->push(save);std::cout << "cal thread,推送存储任务完成..." << std::endl; //sleep(1);}return nullptr;
}void *saver(void *bqs_)
{BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;while(true){SaveTask t;save_bq->pop(&t);t();std::cout << "save thread,保存任务完成..." << std::endl; }return nullptr;
}int main()
{srand((unsigned long)time(nullptr) ^ getpid());BlockQueues<CalTask, SaveTask> bqs;bqs.c_bq = new BlockQueue<CalTask>();bqs.s_bq = new BlockQueue<SaveTask>();pthread_t c, p, s;pthread_create(&p, nullptr, productor, &bqs);pthread_create(&c, nullptr, consumer, &bqs);pthread_create(&s, nullptr, saver, &bqs);pthread_join(p, nullptr);pthread_join(c, nullptr);pthread_join(s, nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0;
}

Task.hpp
#pragma once#include <iostream>
#include <string>
#include <cstdio>
#include <functional>class CalTask
{using func_t = std::function<int(int,int,char)>;// typedef std::function<int(int,int)> func_t;
public:CalTask(){}CalTask(int x, int y, char op, func_t func):_x(x), _y(y), _op(op), _callback(func){}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);return buffer;}std::string toTaskString(){char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);return buffer;}
private:int _x;int _y;char _op;func_t _callback;
};const std::string oper = "+-*/%";int mymath(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}elseresult = x / y;}break;case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}elseresult = x % y;}break;default:break;}return result;
}class SaveTask
{typedef std::function<void(const std::string&)> func_t;
public:SaveTask(){}SaveTask(const std::string &message, func_t func): _message(message), _func(func){}void operator()(){_func(_message);}
private:std::string _message;func_t _func;
};void Save(const std::string &message)
{const std::string target = "./log.txt";FILE *fp = fopen(target.c_str(), "a+");if(!fp){std::cerr << "fopen error" << std::endl;return;}fputs(message.c_str(), fp);fputs("\n", fp);fclose(fp);
}

生产消费模型高效在哪

可以在生产之前和消费之后,让线程并行执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/411153.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pygame学习(三)——支持多种类型的事件

大家好&#xff01;我是码银&#x1f970; 欢迎关注&#x1f970;&#xff1a; CSDN&#xff1a;码银 公众号&#xff1a;码银学编程 实时事件循环 为了保证程序的持续刷新、保持打开的状态&#xff0c;我们会创建一个无限循环&#xff0c;通常使用的是while语句&#xff0c;w…

Apache POI 导出Excel报表

大家好我是苏麟 , 今天聊聊Apache POI . Apache POI 介绍 Apache POI 是一个处理Miscrosoft Office各种文件格式的开源项目。简单来说就是&#xff0c;我们可以使用 POI 在 Java 程序中对Miscrosoft Office各种文件进行读写操作。 一般情况下&#xff0c;POI 都是用于操作 E…

实时云渲染服务:流式传输 VR 和 AR 内容

想象一下无需专用的物理计算机&#xff0c;甚至无需实物连接&#xff0c;就能获得高质量的 AR/VR 体验是种什么样的体验&#xff1f; 过去&#xff0c;与 VR 交互需要专用的高端工作站&#xff0c;并且根据头显、壁挂式传感器和专用的物理空间。VR 中的复杂任务会突破传感器范…

路由器初始化配置、功能配置

实验环境 拓扑图 Ip规划表&#xff08;各组使用自己的IP规划表&#xff09; 部门 主机数量 网络地址 子网掩码 网关 可用ip Vlan 市场部 38 192.168.131.0 255.255.255.0 192.168.131.1 2-254 11 研发部 53 192.168.132.0 255.255.255.0 192.168.132.1 2-2…

appium之联动pycharm

前置条件&#xff1a; 1.java环境安装好了 2.android-sdk安装好&#xff08;uiautomatorviewer 也可以把这个启动起来&#xff09; 3.appium安装好 4.adb devices查看下设备是否连接 pycharm入门代码--固定写法 from appium import webdriver# 定义字典变量 desired_caps …

【机器学习300问】9、梯度下降是用来干嘛的?

当你和我一样对自己问出这个问题后&#xff0c;分析一下&#xff01;其实我首先得知道梯度下降是什么&#xff0c;也就它的定义。其次我得了解它具体用在什么地方&#xff0c;也就是使用场景。最后才是这个问题&#xff0c;梯度下降有什么用&#xff1f;怎么用&#xff1f; 所以…

QT 原生布局和QML的区别

一、QML 与 Qt Quick的区别 1.1 从概念上区分 为了更精确地对两者进行说明&#xff0c;先看助手对 QML 的描述&#xff1a; QML is a user interface specification and programming language. QML 是一种用户界面规范和标记语言&#xff0c;允许开发人员和设计师创建高性能、流…

[go语言]输入输出

目录 知识结构 输入 1.Scan ​编辑 2.Scanf 3.Scanln 4.os.Stdin --标准输入&#xff0c;从键盘输入 输出 1.Print 2.Printf 3.Println 知识结构 输入 为了展示集中输入的区别&#xff0c;将直接进行代码演示。 三者区别的结论&#xff1a;Scanf格式化输入&#x…

Django项目中的默认文件都有什么用

manager.py&#xff1a; 是django用于管理本项目的命令行工具&#xff0c;之后进行站点运行&#xff0c;数据库自动生成等都是通过本文件完成。 djangoStudy/__init__.py&#xff1a; 告诉python该目录是一个python包&#xff0c;暂无内容&#xff0c;后期一些工具的初始化可…

从零开始的源码搭建:详解连锁餐饮行业中的点餐小程序开发

时下&#xff0c;点餐小程序成为了许多餐饮企业引入的一种创新工具&#xff0c;不仅方便了顾客的用餐体验&#xff0c;同时也提高了餐厅的运营效率。本文将详细探讨如何从零开始搭建一个源码&#xff0c;并深入解析连锁餐饮行业中的点餐小程序开发过程。 一、需求分析与规划 在…

Kafka-消费者-Consumer Group Rebalance设计

在同一个Consumer Group中&#xff0c;同一个Topic的不同分区会分配给不同的消费者进行消费&#xff0c;那么为消费者分配分区的操作是在Kafka服务端完成的吗?分区是如何进行分配呢?下面来分析Rebalance操作的原理。 方案一 Kafka最开始的解决方案是通过ZooKeeper的Watcher…

一条sql是如何运行的

在我们平时使用sql的时候&#xff0c;基本是基于黑盒的使用方式&#xff0c;在客户端输入一条sql语句&#xff0c;然后回显想要的数据&#xff0c;对于mysql server端内部如何运行的以及与存储引擎如何交互的不得而知。 通过下面一幅图&#xff0c;大致描述客户端和服务端交互…

利用IP应用场景API识别真实用户

引言 在当今数字化时代&#xff0c;随着互联网的普及和应用的广泛&#xff0c;验证用户身份的重要性变得越来越突出。在许多场景中&#xff0c;特别是在涉及安全性、用户体验以及个人隐私保护方面&#xff0c;确定用户的真实身份至关重要。而IP应用场景API则是一种强大的工具&…

FreeRTOS学习第7篇--周期性延迟和相对性延迟函数

目录 FreeRTOS学习第7篇--周期性延迟和相对性延迟函数时间延迟vTaskDelay函数原型vTaskDelayUntil函数原型PrintTask_Task任务相关代码片段实验现象本文中使用的测试工程 FreeRTOS学习第7篇–周期性延迟和相对性延迟函数 本文目标&#xff1a;学习与使用FreeRTOS中的延迟函数&…

Spring Boot - Application Events 的发布顺序_ApplicationFailedEvent

文章目录 Pre概述Code源码分析 Pre Spring Boot - Application Events 的发布顺序_ApplicationEnvironmentPreparedEvent 概述 Spring Boot 的广播机制是基于观察者模式实现的&#xff0c;它允许在 Spring 应用程序中发布和监听事件。这种机制的主要目的是为了实现解耦&#…

选择安全数据交换系统时 要考虑哪些因素?

安全数据交换系统是一种专门设计用于在不同的网络环境&#xff08;如内部不同网络&#xff0c;内部网络和外部网络&#xff09;之间安全传输数据的解决方案。它通常包括一系列的技术和流程&#xff0c;旨在确保数据在传输过程中的完整性、机密性和可用性。 安全数据交换系统可以…

Maven工程 — 继承与聚合 相关知识点详解

简介&#xff1a;这篇帖子主要讲解Maven工程中的继承与聚合的相关知识点&#xff0c;用简洁的语言和小编自己的理解&#xff0c;深入浅出的说明Maven工程的继承与聚合。 目录 1、继承 1.1 继承关系的实现 1.2 版本锁定 2、聚合 2.1 聚合方法 3、总结 1、继承 图 1-1 继承…

springBoot项目打包发布

打包 项目代码编写完成后&#xff0c;在pom.xml文件中引用打包的插件&#xff1a; <!-- 打包插件坐标--><build><plugins><!--打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-mave…

vue 指定区域可拖拽的限定拖拽区域的div(如仅弹窗标题可拖拽的弹窗)

<template><div class"container" ref"container"><div class"drag-box" v-drag><div class"win_head">弹窗标题</div><div class"win_content">弹窗内容</div></div><…

探索设计模式的魅力:抽象工厂模式的艺术

抽象工厂模式&#xff08;Abstract Factory Pattern&#xff09;是一种创建型设计模式&#xff0c;用于在不指定具体类的情况下创建一系列相关或相互依赖的对象。它提供了一个接口&#xff0c;用于创建一系列“家族”或相关依赖对象&#xff0c;而无需指定它们的具体类。 主要参…