linux入门---用匿名管道实现一个功能

前言

在之前的学习中我们知道通信的概念和匿名管道的使用,那么接下来我们就要用匿名管道来实现一个功能,首先我们有很多的函数需要被执行,然后创建一些子进程通过匿名管道方式给子进程传递一些信息,然后子进程就根据这些信息来确定要执行的函数并执行,比如说下面的图片:
在这里插入图片描述
那么接下来我们就要一步一步的实现这个功能。

第一步:创建子进程并连接管道

首先我们要定义一个宏用来表示要创建多少个子进程来执行不同的任务,然后在main函数里面首先创建一个for循环用来不停的创建子进程,在循环的开始先创建一个数组用来记录管道的写端和读端,然后使用pipe函数创建一个匿名管道,因为管道可能会创建失败,所以创建一个变量用来记录pipe函数的返回值,如果变量的值等于0的话就断言一下,然后就可以使用fork函数来创建子进程,并根据fork函数的返回值来将父子进程执行的代码进行分开,那么这里的代码如下:

#include<iostream>
#include<unistd.h>
#include<cassert>
#define PROCSS_NUM 3
using namespace std;
int main()
{for(int i=0;i<PROCSS_NUM;i++){int fds[2]={0};int n=pipe(fds);assert(n==0);(void)n;pid_t id=fork();if(id==0){//这里是子进程执行的代码}//这里是父进程执行的代码}return 0;
}

然后在子进程里面干的第一件事就是关闭管道文件的写端,并且创建一个while循环,在循环里面要不停的读取管道里面的内容并且根据读取的内容执行对应的函数,那么这里的代码如下:

 if(id==0){//这里是子进程执行的代码close(fds[1]);while(true){//读取管道里面的内容//根据管道的内容执行对应的函数}}

因为这里存在多个子进程每个子进程都要有对应的写端 pid 和名字,所以为了方便描述这些子进程我们可以创建一个结构体来对其进行描述:

class subEp // 描述子进程
{
public:subEp(pid_t subId, int writeFd): subId_(subId), writeFd_(writeFd){char nameBuffer[1024];snprintf(nameBuffer, sizeof nameBuffer, "process-%d[pid(%d)-fd(%d)]", num++, subId_, writeFd_);//给子进程创建名字name_ = nameBuffer;}public:static int num;//用于区别每个子进程std::string name_;//记录子进程的名字pid_t subId_;//子进程的idint writeFd_;//子进程的写端
};

那么创建完子进程之后父进程就得关闭管道的读端并且创建一个描述子进程的结构体对象,因为子进程存在多个所以描述子进程的对象也会存在多个,所以为了方便管理我们就得创建一个vector对象来管理结构体对象,那么目前的代码就如下:

#include<iostream>
#include<unistd.h>
#include<cassert>
#include<string>
#include<vector>
#define PROCSS_NUM 10
using namespace std;
class subEp // 描述子进程
{
public:subEp(pid_t subId, int writeFd): subId_(subId), writeFd_(writeFd){char nameBuffer[1024];snprintf(nameBuffer, sizeof nameBuffer, "process-%d[pid(%d)-fd(%d)]", num++, subId_, writeFd_);//给子进程创建名字name_ = nameBuffer;}public:static int num;//用于区别每个子进程std::string name_;//记录子进程的名字pid_t subId_;//子进程的idint writeFd_;//子进程的写端
};int subEp::num = 0;
int main()
{vector<subEP> subs;for(int i=0;i<PROCSS_NUM;i++){int fds[2]={0};int n=pipe(fds);assert(n==0);(void)n;pid_t id=fork();if(id==0){//这里是子进程执行的代码close(fds[1]);while(true){//读取管道里面的内容//根据管道的内容执行对应的函数}}//这里是父进程执行的代码close[fds[0]];subs.push_back(subEP(id,fds[1]));}return 0;
}

我们上面写的是子进程和管道的创建过程,那么我们可以把这个模块写成一个函数,因为这个函数要对subs对象进行修改,并且子进程还有执行对应的函数,所以这个函数需要两个参数,一个是指向vector对象的指针,一个是指向函数集合的引用,那么这里的代码如下:

void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{for(int i=0;i<PROCSS_NUM;i++){int fds[2]={0};int n=pipe(fds);assert(n==0);(void)n;pid_t id=fork();if(id==0){//这里是子进程执行的代码close(fds[1]);while(true){//读取管道里面的内容//根据管道的内容执行对应的函数}}//这里是父进程执行的代码close[fds[0]];subs.push_back(subEP(id,fds[1]));}
}

第二步:建立被执行函数的集合

这里我们就可以创建很多函数,并且每个函数里面我们都可以添加一个打印一些话用来表示当前函数已经被执行了,那么这里的代码如下:

void downLoadTask()
{std::cout << getpid() << ": 下载任务\n"<< std::endl;sleep(1);
}void ioTask()
{std::cout << getpid() << ": IO任务\n"<< std::endl;sleep(1);
}void flushTask()
{std::cout << getpid() << ": 刷新任务\n"<< std::endl;sleep(1);
}

然后我们可以对函数指针进行重名,并在main函数里面创建一个函数指针的vector对象,然后创建一个函数用于将这些函数加载进vector对象,那么这里的代码如下:

typedef void (*func_t)();
void loadTaskFunc(std::vector<func_t> *out)
{assert(out);out->push_back(downLoadTask);out->push_back(ioTask);out->push_back(flushTask);
}
int main()
{vector<func_t> funcMap;loadTaskFunc(&funcMap);vector<subEP> subs;//创建子进程createSubProcess(subs,funcmap);return 0;
}

到这里我们的子进程创建完了,管道也和进程之间连接成功了,那么接下来我们就要让父进程往管道里面发送信息。

第三步:父进程发送信息

函数创建完成之后父进程就可以给子进程发送信息,那么这里我们可以创建一个专门给子进程发送信息的函数,但是在发送信息之前得先选择一个子进程进行发送吗,因为我们要多次发送信号,所以在发送信号的时候我们得保证子进程选着的随机性和公平性,保证每个进程接收到的信号数目是差不多,所以这里我们这里采用随机数的方式来选择子进程,首先设计一个时间戳:

srand((unsigned long)time(nullptr) ^ getpid() ^ 0x171237 ^ rand() % 1234);

然后创建一个子进程的控制函数,这个函数需要三个参数:子进程的合集,函数的合集,信号的个数,那么函数的声明就如下:

void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{}

首先得到子进程和预处理函数的个数,然后这里我们做一个约定了,如果count的值一开始就被设计为0话就表明这里要不停的发送信号并处理函数,那么这里的代码如下:

void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{int processnum=subs.num();int tasknum=funcmap.num();bool forver =(count==0? true:false);while(true){}
}

在while循环里面我们就要获得一个随机数,然后通过模上processnum值形式来限制其范围,得到随机数之后就可以通过subs对象找到对应的子进程并获得该进程对应管道的写端,然后就可以往该管道发送数据,管道随机的同时还得保证信号的随机,所以我们还得再创建一个随机数用来保证信号的随机,然后选着进程发送信号,那么这里我们可以创建一个函数来完成发送信号的过程,该函数需要一个描述子进程的结构体和一个表示函数集的下标,那么这里的代码如下:

void sendTask(const subEp &process, int taskNum)
{}
void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{int processnum=subs.num();int tasknum=funcmap.num();bool forever =(count==0? true:false);while(true){int subIdx=rand()%processnum;int taskIdx=rand()%tasknum;sendTask(subs[subIdx], taskIdx);}
}

那么在sendTask函数里面我们就可以打印一句话用来表示我们已经调用过该函数,然后就调用write函数往管道里面写入函数的下标,因为我们每次往管道里面就写入一个整型,write函数的返回值为写入数据的大小,所以我们这里可以拿一个参数接收write函数的返回值,并用assert函数进行一下判断,那么这里的代码如下:

void sendTask(const subEp &process, int taskNum)
{cout << "send task num: " << taskNum << " send to -> " << process.name_ << endl;int n=write(process.writeFd_,tasknum);assert(n==sizeof(int));(void)n;
}

将信号发送完之后我们就得对count的值进行一下判断,如果forever的值为fasle的话就对count的值进行减一,如果减完后count的值为0的话就用break结束循环,循环结束之后就表明信号全部发送完了,那么这时我们就可以通过for循环关闭父进程对应的写端,那么这里的代码如下:

void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{int processnum=subs.num();int tasknum=funcmap.num();bool forever =(count==0? true:false);while(true){int subIdx=rand()%processnum;int taskIdx=rand()%tasknum;sendTask(subs[subIdx], taskIdx);}if(!forever){--count;if(count==0){break;}}for(int i=0;i<process;i++){close(subs[i].writeFd_);}
}

那么这就是信号发送的有关功能的实现。

第四步:信号的接收和实行函数

信号发送之后我们就可以创建一个函数来接收函数,在之前创建子进程的时候我们还没有将函数实现完:

void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{for(int i=0;i<PROCSS_NUM;i++){int fds[2]={0};int n=pipe(fds);assert(n==0);(void)n;pid_t id=fork();if(id==0){//这里是子进程执行的代码close(fds[1]);while(true){//读取管道里面的内容//根据管道的内容执行对应的函数}}//这里是父进程执行的代码close[fds[0]];subs.push_back(subEP(id,fds[1]));}
}

那么这里我们就可以再创建一个函数用来接收管道里面的信号,这个函数需要一个参数用来表示子进程的读端,返回值表示要执行的函数下标,那么函数的声明如下:

int recvTask(int readFd)
{}

函数的开始创建一个code变量用来记录信号的值,然后使用read函数读取信号的值,read的第一个参数为对应管道的读端,第二个参数传递code变量的地址,第三个参数表示读取数据的大小那么这里就是4,因为读取的结果可能会出问题,所以我们创建一个变量用来记录该函数的返回值,如果等于4就直接返回code,如果小于等于0就返回-1其他情况就返回0,那么这里的代码如下:

int recvTask(int readFd)
{int code=0;ssize_t  s=read(fds[0],&code,sizeof(int));if(s==4)    {return code;}else if(s<=0)   {return -1;}else    {return 0}
}

然后在createSubProcess函数里面我们就先创建一个变量接收recvTask函数的返回值,然后进行判断如果返回值为-1的话就直接退出,其他情况就正常执行:

 while(true){//读取管道里面的内容int commondCode=recvTask(fds[0]);//根据管道的内容执行对应的函数if(commondCode>=0&&commondCode<funcMap.size()){funcMap[commandCode]();}else if(commandCode == -1) {break;}}

第五步:子进程等待函数

我们先完善一下main函数的内容:

int main()
{vector<func_t> funcMap;loadTaskFunc(&funcMap);vector<subEP> subs;//创建子进程createSubProcess(subs,funcmap);return 0;
}

首先加载函数然后创建子进程,子进程创建之后读端就已经开始等待写端信号传递过来了,那么这时我们就得控制一下子进程往里面发送信号,所以这个时候就得调用loadBlanceContrl函数发送信号:

int main()
{vector<func_t> funcMap;loadTaskFunc(&funcMap);vector<subEP> subs;//创建子进程createSubProcess(subs,funcmap);int count=10;loadBlanceContrl(subs,funcMap,count);return 0;
}

等loadBlanceContrl函数执行完成之后,读端也就已经关闭了,读端关闭了也就不会再有信号的产生,所以到这里我们还只需干一件事就是使用waitpid函数回收子进程的信息,那么我们可以再创建一个函数进行回收:

void waitProcess(std::vector<subEp> processes)
{int processnum = processes.size();for(int i = 0; i < processnum; i++){waitpid(processes[i].subId_, nullptr, 0);std::cout << "wait sub process success ...: " << processes[i].subId_ << std::endl;}
}
int main()
{vector<func_t> funcMap;loadTaskFunc(&funcMap);vector<subEP> subs;//创建子进程createSubProcess(subs,funcmap);int count=10;loadBlanceContrl(subs,funcMap,count);waitProcess(subs);return 0;
}

程序的测试

PROCSS_NUM的值为3,并且count的值为10,所以当前程序运行起来之后会创建出3个子进程然后一共执行10个任务,那么运行的现象就是这样:
在这里插入图片描述
可以看到这里创建了三个子进程来处理不同的函数任务,等函数的执行个数达到10个的时候就停止了执行函数,最后对创建的三个子进程进行回收:
在这里插入图片描述

程序的bug

程序的运行符合我们的预期但是这里存在一个隐藏的bug,我们创建管道的时候是以读和写的方式来打开一个管道,后来我们会关闭父进程的读端和子进程的写段,然后子进程就可以只从管道读取数据,父进程从管道里面写入数据,当我们创建子进程的时候子进程会继承父进程的文件描述符表,可是这里会出现问题,当我们创建第一个子进程的时候父进程会指向一号管道的写段,子进程会指向1号管道的读端,当我们创建第二个子进程的时候,该子进程会继承父进程的文件描述符表,所以第二个子进程也会指向1号管道的写端,同样的道理当我们创建第三个子进程的时候,他还会继承父进程的文件描述符表,所以也会指向1号管道和2号管道的写端,又因为我们在子程序的开始过程中只会关闭进程对应管道的写段,不会管其他管道的,这就会导致当我们想要单独关闭某个主进程对管道的写端,从而结束通信时这个管道是无法通过返回0来直接关闭子进程的,所以这里依然会出现数据的损失,和子进程无法关闭的情况,那么为了解决这个问题我们就可以创建一个vector对象用来记录当前已经创建的管道,每次创建子进程时,子进程里面就一一关闭该子进程对这些管道的写段,然后在父进程里面就将新创建的管道的写段尾差到vector里面,那么这一功能的实现就放到createSubProcess函数里面,那么这里的代码如下:

void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{std::vector<int> deleteFd;for(int i=0;i<PROCSS_NUM;i++){int fds[2]={0};int n=pipe(fds);assert(n==0);(void)n;pid_t id=fork();if(id==0){cout<<"创建了子进程"<< endl;for(int i = 0; i < deleteFd.size(); i++) close(deleteFd[i]);//这里是子进程执行的代码close(fds[1]);while(true){//读取管道里面的内容int commondCode=recvTask(fds[0]);//根据管道的内容执行对应的函数if(commondCode>=0&&commondCode<funcMap.size()){funcMap[commondCode]();}else if(commondCode == -1) {break;}}exit(0);}//这里是父进程执行的代码close(fds[0]);subEp sub(id,fds[1]);subs->push_back(sub);deleteFd.push_back(fds[1]);}
}

那么这就是本篇文章的全部内容完整的代码如下:

#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <cassert>
#include <ctime>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#define PROCSS_NUM 3
using namespace std;    
typedef void (*func_t)();
class subEp // 描述子进程
{
public:subEp(pid_t subId, int writeFd): subId_(subId), writeFd_(writeFd){char nameBuffer[1024];snprintf(nameBuffer, sizeof nameBuffer, "process-%d[pid(%d)-fd(%d)]", num++, subId_, writeFd_);//给子进程创建名字name_ = nameBuffer;}
public:static int num;//用于区别每个子进程std::string name_;//记录子进程的名字pid_t subId_;//子进程的idint writeFd_;//子进程的写端
};
int subEp ::num =0;
void downLoadTask()
{std::cout << getpid() << ": 下载任务\n"<< std::endl;sleep(1);
}void ioTask()
{std::cout << getpid() << ": IO任务\n"<< std::endl;sleep(1);
}void flushTask()
{std::cout << getpid() << ": 刷新任务\n"<< std::endl;sleep(1);
}
int recvTask(int readFd)
{int code=0;ssize_t  s=read(readFd,&code,sizeof(int));if(s==4)    {return code;}else if(s<=0)   {return -1;}else    {return 0;}
}
void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{std::vector<int> deleteFd;for(int i=0;i<PROCSS_NUM;i++){int fds[2]={0};int n=pipe(fds);assert(n==0);(void)n;pid_t id=fork();if(id==0){cout<<"创建了子进程"<< endl;for(int i = 0; i < deleteFd.size(); i++) close(deleteFd[i]);//这里是子进程执行的代码close(fds[1]);while(true){//读取管道里面的内容int commondCode=recvTask(fds[0]);//根据管道的内容执行对应的函数if(commondCode>=0&&commondCode<funcMap.size()){funcMap[commondCode]();}else if(commondCode == -1) {break;}}exit(0);}//这里是父进程执行的代码close(fds[0]);subEp sub(id,fds[1]);subs->push_back(sub);deleteFd.push_back(fds[1]);}
}void loadTaskFunc(std::vector<func_t> *out){assert(out);out->push_back(downLoadTask);out->push_back(ioTask);out->push_back(flushTask);}
void sendTask(const subEp &process, int taskNum)
{cout << "send task num: " << taskNum << " send to -> " << process.name_ << endl;int n=write(process.writeFd_,&taskNum,sizeof(taskNum));assert(n==sizeof(int));(void)n;
}
void loadBlanceContrl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{int processnum=subs.size();int tasknum=funcMap.size();bool forever =(count==0? true:false);while(true){int subIdx=rand()%processnum;int taskIdx=rand()%tasknum;sendTask(subs[subIdx], taskIdx);sleep(1);if(!forever){--count;if(count==0){break;}}}for(int i=0;i<processnum;i++){close(subs[i].writeFd_);}
}
void waitProcess(std::vector<subEp> processes)
{int processnum = processes.size();for(int i = 0; i < processnum; i++){waitpid(processes[i].subId_, nullptr, 0);std::cout << "wait sub process success ...: " << processes[i].subId_ << std::endl;}
}
int main()
{srand((unsigned long)time(nullptr) ^ getpid() ^ 0x171237 ^ rand() % 1234);vector<func_t> funcMap;loadTaskFunc(&funcMap); vector<subEp> subs;//创建子进程createSubProcess(&subs,funcMap);int count=10;loadBlanceContrl(subs,funcMap,count);waitProcess(subs);return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/106376.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Grafana配置邮件告警

1、创建一个监控图 2、grafana邮件配置 vim /etc/grafana/grafana.ini [smtp] enabled true host smtp.163.com:465 user qinziteng05163.com password xxxxx # 授权码 from_address qinziteng05163.com from_name Grafanasystemctl restart grafana-serv…

优化SOCKS5的方法

在今天的互联网世界中&#xff0c;保护个人隐私和提升网络速度至关重要。作为一种常用的代理协议&#xff0c;SOCKS5代理服务器不仅可以保护您的隐私&#xff0c;还可以实现更快速的网络访问。本文将为您介绍一些优化SOCKS5代理服务器的方法&#xff0c;以提高网络速度和安全性…

工作不好找,普通打工人如何破局

大家好&#xff0c;我是苍何&#xff0c;我的一位阿里朋友被裁后&#xff0c;找工作找了一个月都没结果&#xff0c;很多到最后一面被pass了&#xff0c;不由得做一下感慨&#xff0c;即使是大厂背景又如何&#xff0c;面对经济环境和大环境市场&#xff0c;每个人都不容易。 …

【Redis】Redis 的学习教程(九)之 发布 Pub、订阅 Sub

1. Pub/Sub 介绍 Redis 的发布订阅&#xff08;Pub/Sub&#xff09;模式是一种消息传递机制&#xff0c;它允许在发送者和接收者之间建立松耦合的通信关系。在这种模式中&#xff0c;发送者&#xff08;发布者&#xff09;将消息发布到一个指定的频道或模式&#xff0c;而接收…

数据结构--二叉树-堆(1)

文章目录 树概念相关的基本概念树的表示 二叉树概念特殊二叉树性质 堆二叉树的顺序结构堆的概念 堆的实现初始化数组初始化为堆向上调整向下调整插入删除打印、摧毁、判空、获取堆顶数据验证 堆的应用堆排序TopK问题 树 概念 树是一种常见的非线性的数据结构&#xff0c;&…

Linux 6.6 初步支持AMD 新一代 Zen 5 处理器

AMD 下一代 Zen 5 CPU 现已开始为 Linux 6.6 支持提交相关代码&#xff0c;最新补丁包括提供温度监控和 EDAC 报告等。 最新的 Linux 6.6 代码中已经加入了包括支持硬件监视器温度监控和 EDAC 报告的补丁。此外&#xff0c;新版本还加入了 x86 / misc 补丁&#xff0c;Phoronix…

简单5步骤搞定windows server2019 配置IIS支持PHP

测试成功&#xff0c;记录一笔&#xff0c;感谢网上各位大佬的技术支持。 一、安装vcredist_x64.exe 否则可能会出现 FastCGI进程意外退出 二、IIS开启CGI&#xff08;可能需要重启&#xff09; 控制面板&#xff0c;启用或关闭windows程序&#xff0c;IIS--应用程序开发--CGI…

Android12之/proc/pid/status参数含义(一百六十五)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

TypeScript 类

类在面向对象编程中起着创建对象的蓝图&#xff0c;描述所创建的对象共同的属性和方法的作用。 创建类 与JS差不多&#xff0c;通过 Class 关键字来定义一个类&#xff1a; class Greeter {// 静态属性static cname: string "Greeter";// 成员属性greeting: strin…

23062C++QTday4

仿照string类&#xff0c;完成myString 类 代码&#xff1a; #include <iostream> #include <cstring> using namespace std; class myString {private:char *str; //记录c风格的字符串int size; //记录字符串的实际长度public://无参构造my…

SAP CO之内部订单简单介绍

目录 介绍 一、内部订单类别 二、订单类别和订单类型的定义及区别: 三、常见的内部订单类型分类&#xff1a; 介绍 在SAP里面&#xff0c;内部订单是一种成本对象&#xff0c;它和成本中心可以视为 CO 成本管理中的最常用的两种成本费用流出流入的“容器”。我们可以对内部…

Linux内核分析与应用5-中断

本系列是对 陈莉君 老师 Linux 内核分析与应用[1] 的学习与记录。讲的非常之好&#xff0c;推荐观看 留此记录&#xff0c;蜻蜓点水,可作抛砖引玉 中断机制概述 中断是CPU对系统发生的某个事件作出的一种反应, 当中断发生时,CPU暂停正在执行的程序,保留现场后,自动转去执行相应…