Linux使用匿名管道实现进程池得以高效通信

                                               🎬慕斯主页修仙—别有洞天

                                              ♈️今日夜电波:Nonsense—Sabrina Carpenter

                                                                0:50━━━━━━️💟──────── 2:43
                                                                    🔄   ◀️   ⏸   ▶️    ☰  

                                      💗关注👍点赞🙌收藏您的每一次鼓励都是对我莫大的支持😍


目录

思路梳理

匿名管道知识回忆

匿名管道实现进程池思路

池化技术怎么提高效率?

具体实现

进程以及管道的创建操作

分发任务操作

回收资源操作

解决上述所提到Bug

总体代码及代码效果

Makefile

Task.hpp

mulpipe.cpp

实现效果


思路梳理

匿名管道知识回忆

        上一篇的文章中,详细介绍了管道的知识点,下面还是复习一下对于匿名管道相关的知识点。如下是匿名管道的一个示例图,这表现了两个“有血缘关系”的两个进程之间通过匿名管道进行通信的过程,我们通过控制struct files *fd_array[]中读或者写的struct files的开关来实现两个进程间的通信:

        我们主要通过如下的接口创建匿名管道来实现以上的匿名管道通信:

#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码

匿名管道实现进程池思路

        说大白话(●—●):我们使用一个父进程创建很多的管道,再创建对应数量的进程,然后这些管道分别与其他的进程进行直接的连接。这样我们就提前创建和联系好了一定数量的进程。我们想让父进程向其中一个子进程发消息就可通过选择管道直接发消息,而如果不发消息其它进程只会等待。通过信息的发送,我们可以就通过选择管道从而让子进程分别执行对应的任务。大致的图解如下:

池化技术怎么提高效率?

        池化技术通过资源共享和优化来提高效率,具体表现在以下几个方面:

  • 资源复用:池化技术通过重用已创建的资源,减少了频繁创建和销毁资源的开销。例如,线程池中的线程可以被多个任务重复使用,这样可以避免每次任务执行时都创建新线程的开销。
  • 减少等待时间:池化技术可以减少请求的等待时间。因为资源是预先分配好的,当有新的请求到来时,可以立即使用池中的资源,而不需要等待资源的创建过程。
  • 提高响应速度:由于资源已经准备好,池化技术可以快速响应请求,提高了处理速度。这对于需要快速响应的系统来说尤其重要。
  • 统一管理:池化技术提供了对资源的集中管理,这有助于监控系统资源的使用情况,及时回收不再使用的资源,避免资源浪费。
  • 优化性能:在大数据处理等场景中,池化技术可以通过合并多个请求来减少数据处理的时间和空间复杂度,从而提高数据处理的性能。

具体实现

进程以及管道的创建操作

        创建一个channel类用来来存储管道的读文件描述符ctrlfd以及进程描述符workerid,完成初始化操作以及后续的销毁操作。

static int number = 1;//标识对应的进程和管道class channel
{
public:channel(int fd, pid_t id) : ctrlfd(fd), workerid(id){name = "channel-" + std::to_string(number++);}public:int ctrlfd;pid_t workerid;std::string name;
};

        再根据先描述在组织的原则,我们在主函数使用一个std::vector<channel> channels来管理上面的结构体,可以根据需求进行增删查改等等操作。在完成这些预备操作后,创建对应数量的进程以及管道。

        特别注意:如下函数中的std::vector<int> old;以及如下代码是为了解决父子进程继承而产生的一些bug,这个将在最后解释:

        	std::vector<int> old;if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}old.push_back(pipefd[1]);

        如下函数的操作为创建管道以及进程,可以先忽略上述所提到的解决bug的代码,通过循环创建对应的子进程、关闭父子进程对应的读写文件,并且存储到vector<channel> *c(也就是主函数的中channels),下面的work()函数为子进程要做的工作,可以理解了后面的分发任务操作再来理解。

const int num = 5;//全局定义创建管道以及进程数
void CreateChannels(std::vector<channel> *c)
{std::vector<int> old;for (int i = 0; i < num; i++){// 1. 定义并创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);(void)n;// 2. 创建进程pid_t id = fork();assert(id != -1);// 3. 构建单向通信信道if (id == 0) // child{if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}close(pipefd[1]);//关闭写dup2(pipefd[0], 0);//重定向写入Work();//子进程工作exit(0); // 会自动关闭自己打开的所有的fd}// fatherclose(pipefd[0]);c->push_back(channel(pipefd[1], id));old.push_back(pipefd[1]);// childid, pipefd[1]}
}

        因为前面我们已经重定向了管道的写入作为子进程的写入,接下来通过read就会读取对应的操作,需要注意的是:我们是通过一个int类型的变量来控制要完成的任务,后续再task.hpp中会定义对应要完成的任务。通过read的返回值来判断是要执行任务还是退出,我们在管道的知识中知道,read会等待write,也就是等待数据的输入。当写进程退出,读进程会跟着退出,我们根据以上特性来判断是要执行任务还是等待任务还是退出进程:

void Work()
{while (true){int code = 0;ssize_t n = read(0, &code, sizeof(code));if (n == sizeof(code)){if (!init.CheckSafe(code))continue;init.RunTask(code);}else if (n == 0){break;}else{// do nothing}}std::cout << "child quit" << std::endl;
}

分发任务操作

        当我们创建好管道以及进程后,可以注意到接下来的都是没有经过 if (id == 0) 控制下的程序,也就是说接下来的程序都是父进程运行的程序,因此我们就可以让父进程来分配任务使得子进程完成任务。接下来,我们创建一个task.hpp来模拟创建的任务以及对应的封装、任务分发等等操作如下:

#pragma once#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <stdlib.h>// using task_t = std::function<void()>;
typedef std::function<void()> task_t;void Download()
{std::cout << "我是一个下载任务"<< " 处理者: " << getpid() << std::endl;
}void PrintLog()
{std::cout << "我是一个打印日志的任务"<< " 处理者: " << getpid() << std::endl;
}void PushVideoStream()
{std::cout << "这是一个推送视频流的任务"<< " 处理者: " << getpid() << std::endl;
}// void ProcessExit()
// {
//     exit(0);
// }class Init
{
public:// 任务码const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合std::vector<task_t> tasks;public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr) ^ getpid());}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code){return tasks[code]();}int SelectTask(){return rand() % tasks.size();}std::string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init; // 定义对象

        传入主函数用于管理的channels,flag用于控制进程执行完任务后是否需要退出(1表示要退出,0表示不退出),num为要执行任务的次数,需要注意的是num是要在flag为1的前提下才能有效的,如果不传则程序只会执行一次。在选择完任务以及进程后,通过write来向指定的管道写入。具体读写操作可看:Linux进程间通信(IPC)机制之一:管道(Pipes)详解:匿名管道的特性与情况

void SendCommand(const std::vector<channel> &c, bool flag, int num = -1)
{int pos = 0;while (true){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = c[pos++];pos %= c.size();// debugstd::cout << "send command " << init.ToDesc(command) << "[" << command << "]"<< " in "<< channel.name << " worker is : " << channel.workerid << std::endl;// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));// 4. 判断是否要退出if (!flag){num--;if (num <= 0)break;}sleep(1);}std::cout << "SendCommand done..." << std::endl;
}

回收资源操作

        通过close关闭对应的管道,waitpid等待子进程的结束。

const int num = 5;//全局定义创建管道以及进程数
void ReleaseChannels(std::vector<channel> c)
{// version 2// int num = c.size() - 1;// for (; num >= 0; num--)// {//     close(c[num].ctrlfd);//     waitpid(c[num].workerid, nullptr, 0);// }// version 1for (const auto &channel : c){close(channel.ctrlfd);waitpid(channel.workerid, nullptr, 0);}// for (const auto &channel : c)// {//     pid_t rid = waitpid(channel.workerid, nullptr, 0);//     if (rid == channel.workerid)//     {//         std::cout << "wait child: " << channel.workerid << " success" << std::endl;//     }// }
}

解决上述所提到Bug

        如下代码:

        	std::vector<int> old;if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}old.push_back(pipefd[1]);

        这是个什么Bug呢?如果不加上上这段代码,那么关闭进程及管道就必须从最后面生成的进程和管道向前关闭。为啥呢?这是因为当我们父进程依次创建子进程,其中的对于管道的读写操作struct file也被继承了下来,也就是说有着上一个生成的子进程会被下一个子进程的写操作struct file指向,而下下个生成的子进程会指向前面两个子进程,以此类推...因此,我们需要在创建的时候关闭新创建子进程对应的写操作。

        具体的子进程继承写操作的struct file例子如下:

总体代码及代码效果

Makefile

processpool:mulpipe.cppg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f processpool

Task.hpp

#pragma once#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <stdlib.h>// using task_t = std::function<void()>;
typedef std::function<void()> task_t;void Download()
{std::cout << "我是一个下载任务"<< " 处理者: " << getpid() << std::endl;
}void PrintLog()
{std::cout << "我是一个打印日志的任务"<< " 处理者: " << getpid() << std::endl;
}void PushVideoStream()
{std::cout << "这是一个推送视频流的任务"<< " 处理者: " << getpid() << std::endl;
}// void ProcessExit()
// {
//     exit(0);
// }class Init
{
public:// 任务码const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合std::vector<task_t> tasks;public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr) ^ getpid());}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code){return tasks[code]();}int SelectTask(){return rand() % tasks.size();}std::string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init; // 定义对象

mulpipe.cpp

#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"const int num = 5;
static int number = 1;class channel
{
public:channel(int fd, pid_t id) : ctrlfd(fd), workerid(id){name = "channel-" + std::to_string(number++);}public:int ctrlfd;pid_t workerid;std::string name;
};void Work()
{while (true){int code = 0;ssize_t n = read(0, &code, sizeof(code));if (n == sizeof(code)){if (!init.CheckSafe(code))continue;init.RunTask(code);}else if (n == 0){break;}else{// do nothing}}std::cout << "child quit" << std::endl;
}void PrintFd(const std::vector<int> &fds)
{std::cout << getpid() << " close fds: ";for(auto fd : fds){std::cout << fd << " ";}std::cout << std::endl;
}// 传参形式:
// 1. 输入参数:const &
// 2. 输出参数:*
// 3. 输入输出参数:&
void CreateChannels(std::vector<channel> *c)
{// bugstd::vector<int> old;for (int i = 0; i < num; i++){// 1. 定义并创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);(void)n;// 2. 创建进程pid_t id = fork();assert(id != -1);// 3. 构建单向通信信道if (id == 0) // child{if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}close(pipefd[1]);//关闭写dup2(pipefd[0], 0);//重定向写入Work();exit(0); // 会自动关闭自己打开的所有的fd}// fatherclose(pipefd[0]);c->push_back(channel(pipefd[1], id));old.push_back(pipefd[1]);// childid, pipefd[1]}
}void PrintDebug(const std::vector<channel> &c)
{for (const auto &channel : c){std::cout << channel.name << ", " << channel.ctrlfd << ", " << channel.workerid << std::endl;}
}void SendCommand(const std::vector<channel> &c, bool flag, int num = -1)
{int pos = 0;while (true){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = c[pos++];pos %= c.size();// debugstd::cout << "send command " << init.ToDesc(command) << "[" << command << "]"<< " in "<< channel.name << " worker is : " << channel.workerid << std::endl;// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));// 4. 判断是否要退出if (!flag){num--;if (num <= 0)break;}sleep(1);}std::cout << "SendCommand done..." << std::endl;
}
void ReleaseChannels(std::vector<channel> c)
{// version 2// int num = c.size() - 1;// for (; num >= 0; num--)// {//     close(c[num].ctrlfd);//     waitpid(c[num].workerid, nullptr, 0);// }// version 1for (const auto &channel : c){close(channel.ctrlfd);waitpid(channel.workerid, nullptr, 0);}// for (const auto &channel : c)// {//     pid_t rid = waitpid(channel.workerid, nullptr, 0);//     if (rid == channel.workerid)//     {//         std::cout << "wait child: " << channel.workerid << " success" << std::endl;//     }// }
}
int main()
{std::vector<channel> channels;// 1. 创建信道,创建进程CreateChannels(&channels);// 2. 开始发送任务const bool g_always_loop = true;// SendCommand(channels, g_always_loop);SendCommand(channels, !g_always_loop, 10);// 3. 回收资源,想让子进程退出,并且释放管道,只要关闭写端ReleaseChannels(channels);return 0;
}

实现效果


                   感谢你耐心的看到这里ღ( ´・ᴗ・` )比心,如有哪里有错误请踢一脚作者o(╥﹏╥)o! 

                                       

                                                                        给个三连再走嘛~  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/438461.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Harmony的自定义组件和Page的数据同步

在开发过程中会经常使用自定义组件,就会遇到一个问题,在页面中引入组件后,如何把改变的值传递到自定义组件中呢,这就用到了装饰器,在这是单向传递的,用的装饰器是@State和@Prop @State在page页面中监听数据的变化 @Prop在自定义组件中监听page页面传递过来的变化值,并赋…

探索设计模式的魅力:深入了解适配器模式-优雅地解决接口不匹配问题

设计模式专栏&#xff1a;http://t.csdnimg.cn/nolNS 目录 一、引言 1. 概述 2. 为什么需要适配器模式 3. 本文的目的和结构 二、简价 1. 适配器模式的定义和特点 定义 特点 2. 适配器模式的作用和适用场景 作用 适用场景 3. 适配器模式与其他设计模式的比较 三、适配…

【图形学】双三次贝塞尔曲线绘制方法

双三次贝塞尔曲线的定义 双三次贝塞尔曲面是由16个控制点定义的曲面&#xff0c;通常表示为4x4矩阵。 曲面的公式如下&#xff1a; p ( u , v ) ∑ i 0 3 ∑ j 0 3 P i , j B i , 3 ( u ) B j , 3 ( v ) , ( u , v ) ∈ [ 0 , 1 ] [ 0 , 1 ] p(u,v)\sum_{i0}^3\sum_{j0}…

Vue-40、Vue中TodoList案例

1、MyHeader.vue <template><div class"todo-header"><input type"text" placeholder"请输入你的任务名称&#xff0c;按回车键确认" v-model"title" keyup.enter"add"></div> </template>&…

类加载子系统

类加载子系统 文章目录 类加载子系统1. 内存结构概述2. 类加载器与类的加载过程2.1 类加载器ClassLoader角色2.2 类的加载过程2.2.1 加载2.2.2 链接2.2.3 初始化2.2.4 补充 3. 类加载器分类3.1 虚拟机自带的加载器3.2 用户自定义类加载器 4. 关于ClassLoader5. 双亲委派机制5.1…

Mysql-存储引擎-InnoDB

数据文件 下面这条SQL语句执行的时候指定了ENGINE InnoDB存储引擎为InnoDB: CREATE TABLE tb_album (id bigint(20) NOT NULL AUTO_INCREMENT COMMENT 编号,title varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 相册名称,image varc…

ORB-SLAM策略思考之RANSAC

ORB-SLAM策略思考之RANSAC 1. 初始化器的RANSAC ORB-SLAM中的初始化器是一个端到端的地图初始化策略&#xff0c;即不需要人的参与双线程同时计算本质矩阵和单应性矩阵使用基于RANSAC和卡方检验的评价方法 为了保证两种算法评价的一致性&#xff0c;计算本质矩阵F和单应性矩阵…

shell脚本基础之循环语句

目录 一、循环语句的概念 二、for循环语句 1、列表循环 2、列表for循环案例大全 案例一 案例二 案例三 案例四 案例五 案例六 案例七 案例八 3、不带列表循环 4、类似C语言风格的for循环 5、for循环总结 三、while循环语句 1、while循环语句格式 2、while死循…

FreeRTOS

1.新建一个无FreeRTOS的工程&#xff0c;取名为Motor&#xff0c;根据风扇模块PDF原理图和操作文档让风扇转动 2.新建一个包含FreeRTOS的工程&#xff0c;取名为Semaphore 具体步骤&#xff1a;创建两个任务和一个共享资源&#xff0c;在两个任务中使用信号量来同时访问共享资源…

GPT store和Assistants API横空出世,AI Agent创业公司将何去何从?

Look&#xff01;&#x1f440;我们的大模型商业化落地产品&#x1f4d6;更多AI资讯请&#x1f449;&#x1f3fe;关注Free三天集训营助教在线为您火热答疑&#x1f469;&#x1f3fc;‍&#x1f3eb; 根据OpenAI发布的产品时间线&#xff0c;我们可以看到OpenAI在短短一年内迅…

2024年数学建模美赛C题(预测 Wordle)——思路、程序总结分享

1: 问题描述与要求 《纽约时报》要求您对本文件中的结果进行分析&#xff0c;以回答几个问题。 问题1&#xff1a;报告结果的数量每天都在变化。开发一个模型来解释这种变化&#xff0c;并使用您的模型为2023年3月1日报告的结果数量创建一个预测区间。这个词的任何属性是否会…

鸿蒙原生应用开发已全面启动,你还在等什么?

2019年&#xff0c;鸿蒙系统首次公开亮相&#xff0c;你们说&#xff0c;等等看&#xff0c;还不成熟&#xff1b; 2021年&#xff0c;鸿蒙系统首次在手机端升级&#xff0c;你们说&#xff0c;等等看&#xff0c;还不完善&#xff1b; 2024年&#xff0c;鸿飞计划发布&#…