文章目录
- 0.认识IPC
- 1.什么是进程间通信?
- 2.IPC的手段
- 3.进程间通信的必要性
- 4.进程间通信的技术背景
- 5.进程间通信的本质理解
- 6.IPC的标准
- 1.学习管道
- 1.1.管道的认识
- 1.2管道的工作原理
- 1.3管道的特点
- 2.模拟匿名管道
- 3.模拟进程池
- 3.1task.hpp
- 3.2processpool.cc
0.认识IPC
1.什么是进程间通信?
知乎好文
Linux 内核提供的常见的进程通信机制:
管道(也称作共享文件)
消息队列(也称作消息传递)
共享内存(也称作共享存储)
信号量和 PV 操作
信号
套接字(Socket)
2.IPC的手段
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变
3.进程间通信的必要性
- 单进程无法使用并发能力,无法实现多进程协同
- IPC通常是为了传输数据,同步执行流,消息通知等
- IPC不是目的而是手段,通过IPC这种手段实现多进程协同===》实现多进程协同才是目的!
4.进程间通信的技术背景
- 进程具有独立性。虚拟地址空间+页表==>保证进程运行的独立性(进程内核数据结构+进程的代码和数据
- 通信成本会比较高
5.进程间通信的本质理解
进程间通信的前提:
- 让不同的进程看到同一块资源(可以理解为“内存”)(特定的结构组织的)
- 这份资源不能隶属于任何一个进程,而应该更强调共享。
6.IPC的标准
标准更多在我们使用者看来,都是接口上具有一定的规律
- Linux原生能提供 ==》匿名/命名管道
- SystemV—多进程–单机通信
共享内存
信号量(主要学原理)
消息队列(不常用) - posix–多线程—网络通信
1.学习管道
1.1.管道的认识
管道是Unix中最古老的进程间通信的形式。把从一个进程连接到另一个进程的一个数据流称为“管道”
- 有一个入口,有一个出口
- 单向传输内容
- 传输的都是"资源“,数据
计算机通信领域的设计者,设计了一种单向通信的方式 — 起名为管道
1.2管道的工作原理
管道通信的本质: 进程间通过管道(文件)通信, 文件属于内核,即管道通信需要内核(OS)提供技术支持。
- 分别以读写方式打开同一个文件(OS有能力打开一个只存在于内存而磁盘上无记录的文件,且该文件无需向磁盘刷新数据,因为压根没必要)
- fork()创建子进程
- 双方进程各自关闭白己不需要的文件描述符
复习文件结构体
管道工作原理
管道函数
int pipefd[2]: 输出型参数,期望通过调用它,得到被打开的文件fd
-DDEBUG:调试版本 #表示取消该选项即恢复release版本
int snprintf(char *str, size_t size, const char *format, ...);
1.3管道的特点
- 管道是用来进行具有血缘关系的进程进行IPC的-- 常用于父子通信
- 管道具有通过让进程间协同,提供了访问控制!内核会对管道操作进行同步与互斥
管道是一个文件 — 读取 ----- 具有访问控制
显示器是文件,父子同时往显示器写入的时候,直接刷屏 — 缺乏访问控制 - 管道提供的是面向流式的通信服务 ---- 面向字节流 ---- 协议(线程讲)
- 管道是基于文件的,文件的生命周期是随进程的 ==> 管道的生命周期是随进程(与这个文件/管道相关的所有进程,相关的所有进程都不再使用该文件/管道,管道关闭)的[通信双方退出,管道自动释放]
- 管道是单向通信的 ==> 半双工通信的一种特殊情况,管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
什么是半双工通信?
一端只能进行发/收的一个动作(并不是只能进行收/发,还可以有其他动作,只是这两个动作不能同时进行)==> 同时进行为全双工
2.模拟匿名管道
读和写的4种情况
a. 写快,读慢,写满不能在写了,只有被读了之后才能继续写
b. 写慢,读快,管道没有数据的时候,读必须等待
c. 写关,读返0,⇒ 读到了文件结尾
d. 读关,写可以继续写,0S会终止写进程(写的内容无人读,无意义,OS会去终止写进程)
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>using namespace std;// 全局buffer无法进行通信:有写时拷贝的存在 无法更改通信int main()
{// 1. 创建管道 0子进程读 1父进程写int pipefd[2] = {0}; // pipefd[0]: 读端 pipefd[1]: 写端int n = pipe(pipefd);assert(n != -1);(void)n; // assert只在断言下生效 在release下无用 若注释该行代码 release下会报错#ifdef DEBUGcout << "pipefd[0]: " << pipefd[0] << endl; // 3cout << "pipefd[1]: " << pipefd[1] << endl; // 4
#endif// 2. 创建子进程pid_t id = fork();assert(id != -1);if (id == 0){// 3.1 构建单向通信的信道 关闭子进程不需要的fdclose(pipefd[1]); // 子进程 - 读 关闭写端char receive_buffer[1024 * 8];while (true){// sleep(20); debug: 父写的快 子读的慢 父写满了无法再写只有子读了之后才能继续写// 写端fd未关闭 有数据就读 没数据就等// 写端fd关闭 读端 read返回0 ==> 读到了文件的结尾// ssize_t read(int fd, void *buf, size_t count);ssize_t s = read(pipefd[0], receive_buffer, sizeof(receive_buffer) - 1);if (s > 0){receive_buffer[s] = 0; // 字符串自定义约定cout << "child[" << getpid() << "] get a message: Father[" << getppid() << "]# " << receive_buffer << endl;}else if (s == 0){cout << "child: father(writer) exit, me(reader) quit!" << endl<< endl;break;}}// close(pipefd[0]); 进程退出 fd自动被关掉exit(0);}// 3.1 构建单向通信的信道 关闭父进程不需要的fdclose(pipefd[0]); // 父进程 - 写 关闭读端string message = "Father is sending messages";int count = 0;char send_buffer[1024 * 8];while (true){// 3.2 构建一个变化的字符串// int snprintf(char *str, size_t size, const char *format, ...);snprintf(send_buffer, sizeof(send_buffer), "%s: %d", message.c_str(), count++);// 3.3 写入// ssize_t write(int fd, const void *buf, size_t count);write(pipefd[1], send_buffer, strlen(send_buffer)); // 文件无需遵守Cstr的规定 不用将\0传入sleep(1);cout << count << endl;if (count == 5){cout << endl<< "father: me(writer) quit!" << endl<< endl;break;}}close(pipefd[1]);// pid_t waitpid(pid_t pid, int *status, int options);pid_t ret = waitpid(id, nullptr, 0);cout << "child_id : " << id << " waitpid(): " << ret << endl;assert(ret > 0);(void)ret;return 0;
}
3.模拟进程池
复习waitpid():
pid_t waitpid(pid_t pid, int *status, int options);
3.1task.hpp
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <unistd.h>
#include <functional>// c++11包装器 using func = std::function<void()>;
typedef std::function<void()> func; std::vector<func> cmdSet; //指令集 存储指令函数
std::unordered_map<int, std::string> cmdContent; //指令内容 指令Id:指令信息void accessSQL()
{std::cout << "sub process[" << getpid() << "] 正在执行访问数据库的任务..." << std::endl<< std::endl;
}void AnalyzeUrl()
{std::cout << "sub process[" << getpid() << "] 正在执行解析网址任务..." << std::endl<< std::endl;
}void calResult()
{std::cout << "sub process[" << getpid() << "] 正在执行计算结果任务..." << std::endl<< std::endl;
}void saveData()
{std::cout << "sub process[" << getpid() << "] 正在执行保存数据任务..." << std::endl<< std::endl;
}void Load()
{size_t cmdId = 0;cmdContent.insert({cmdId++, "accessSQL: 访问数据库"});cmdSet.push_back(accessSQL);cmdContent.insert({cmdId++, "AnalyzeUrl: 解析网址"});cmdSet.push_back(AnalyzeUrl);cmdContent.insert({cmdId++, "calResult: 计算结果"});cmdSet.push_back(calResult);cmdContent.insert({cmdId, "saveData: 保存数据"});cmdSet.push_back(saveData);
}void showCmdset()
{ for (const auto &cmd : cmdContent){std::cout << cmd.first << "\t" << cmd.second << std::endl;}
}int cmdSetsize()
{return cmdSet.size();
}
3.2processpool.cc
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"
using namespace std;#define PROCESS_NUM 5/*
typedef struct slot //插槽
{pid_t subid;int fd;
}slot;
*/
typedef pair<pid_t, int> slot;// 父进程随机生成一个命令编号 通过father_pipefd[1]写到管道
// 子进程调用通过child_pipefd[0]获取父进程传输的命令编号存入command_ref并返回
// 若command_ref合法 则执行相关操作// 父进程把命令编号通过father_pipefd[1]写到管道
void sendCommand(int fatherWriter, uint32_t command_ref)
{// ssize_t write(int __fd, const void *__buf, size_t __n)write(fatherWriter, &command_ref, sizeof(command_ref));
}// 子进程通过child_pipefd[0]获取父进程传输的命令编号存入command_ref并返回
int getCommand(int childReader, bool &quit)
{uint32_t command_ref = 0; // u:无符号 32:32bit ==> 无符号4byte// ssize_t read(int __fd, void *__buf, size_t __nbytes)ssize_t s = read(childReader, &command_ref, sizeof(command_ref));if (s == 0){quit = true;return -1;}assert(s == sizeof(uint32_t));return command_ref;
}int main()
{Load();vector<pair<pid_t, int>> slots; // 新创建的child_pid : fatherWriter// 创建PROCESS_NUM个进程for (int i = 0; i < PROCESS_NUM; i++){// 每创建一个新子进程 执行如下操作// fork一直是father执行的 fork之后的每一个child都掉入while中直到满足某条件使该子进程退出// 每创建一个新子进程 把该子进程pid 和 父进程写端 存入slots// 创建管道int pipefd[2] = {0};int n = pipe(pipefd);assert(n == 0);(void)n;pid_t id = fork();assert(id != -1);// child -- 读 关闭写端if (id == 0){close(pipefd[1]);while (true){// 阻塞等待获取命令bool quit = false;int command_ref = getCommand(pipefd[0], quit);if (quit == true)break;// 执行对应的命令if (command_ref >= 0 && command_ref < cmdSetsize()){cmdSet[command_ref]();}else{cout << "非法command: " << command_ref << endl;}}exit(1);}// father -- 写 关闭读端close(pipefd[0]);slots.push_back(pair<pid_t, int>(id, pipefd[1]));}// 父进程派发任务// void srand(unsigned int __seed) 时间戳^父进程^乱码 ==> 让数据源/初始值更随机//异或操作:位运算,速度快srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); while (true){// int rand(); rand会先调用srand 将其返回值作为初始值int command_ref = rand() % cmdSetsize(); // 随机生成命令编号 -- 调用哪一个命令// 随机选择进程来完成任务 ==> 随机数方式的负载均衡int child_ref = rand() % slots.size(); // 随机生成子进程编号 -- 调用哪一个子进程// void sendCommand(pid_t execChild, int fatherWriter, uint32_t command_ref)sendCommand(slots[child_ref].second, command_ref);cout << "father[" << getpid() << "] call child[" << slots[child_ref].first<< "] execute " << cmdContent[command_ref]<< " through fatherWriter " << slots[child_ref].second << endl;sleep(1);// while循环未设置结束操作 下面的for无法执行// 致使fd写端未完全关闭(测试程序 ctrl+C结束该主进程 对应fd全部自动关闭)}// close father对每一个管道的pipefd[1] 之后每一个child_read读到文件尾-->退出for (const auto &slot : slots){close(slot.second);}// 阻塞等待创建的每一个child 获取退出状态 回收信息for (const auto &slot : slots){waitpid(slot.first, nullptr, WNOHANG);}
}