进程间通信方式
管道
单向传输,先进先出,容量较小,效率低
匿名管道
特点
- 作用于父子进程间
- 只能保持一个读端一个写端
- 生命周期应当保持一致
管道只剩下读端时会进入非阻塞状态,只剩下写端的时候写满然后阻塞等待读端读数据
示例代码
#include <iostream>
#include <unistd.h>
#include <cstring>
int main() {int pipefd[2]; // 文件描述符数组,用于存放管道的读写端pid_t pid; // 进程ID// 创建管道if (pipe(pipefd) == -1) {static_assert(false,"pipe");}// 创建子进程pid = fork();if (pid == -1) {static_assert(false,"fork");}if (pid == 0) { // 子进程close(pipefd[1]); // 关闭子进程的写端// 读取父进程发送的消息while (1){char buffer[100];read(pipefd[0], buffer, sizeof(buffer));std::cout << "子进程收到消息:" << std::string(buffer) << std::endl;}//close(pipefd[0]); // 关闭读端} else { // 父进程close(pipefd[0]); // 关闭父进程的读端const char* message = "你好,这是父进程发送的消息。\n";write(pipefd[1], message, strlen(message) + 1);while(1){}}
}
命名管道
特点
- 作用不局限于父子进程间
- 可以理解为一个文件
mkfifo ./fifo
写进程
int main(){int fd = open("./fifo", O_WRONLY); // 打开管道的读取端const char* message = "Hello";write(fd, message, strlen(message) + 1);
}
读进程
int main(){int fd = open("./fifo", O_RDONLY); // 打开管道的读取端char ch[1024]={0};read(fd, ch, strlen(ch)-1);cout<<ch;
}
消息队列
保存在内核中的消息链表,按照消息类型进行传递,具有较高的可靠性和稳定性,消息体有最大长度限制,存在用户态和内核态之间的拷贝开销
用户态消息队列,使用互斥锁保证线程安全
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
// 消息队列类
template<typename T>
class MessageQueue {
public:// 添加消息到队列void push(const T& msg) {std::lock_guard<std::mutex> lock(mutex_);queue_.push(msg);condition_.notify_one(); // 通知等待中的线程}// 从队列中获取消息T pop() {std::unique_lock<std::mutex> lock(mutex_);// 使用while循环等待直到队列非空while (queue_.empty()) {condition_.wait(lock);}T msg = queue_.front();queue_.pop();return msg;}
private:std::queue<T> queue_;std::mutex mutex_;std::condition_variable condition_;
};
// 生产者函数,向队列中添加消息
void producer(MessageQueue<int>& mq) {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 模拟生产消息的延迟mq.push(i);std::cout << "Produced: " << i << std::endl;}
}
// 消费者函数,从队列中获取消息并处理
void consumer(MessageQueue<int>& mq) {for (int i = 0; i < 10; ++i) {int msg = mq.pop();std::cout << "Consumed: " << msg << std::endl;}
}int main() {MessageQueue<int> mq;// 创建生产者和消费者线程std::thread producerThread(producer, std::ref(mq));std::thread consumerThread(consumer, std::ref(mq));// 等待线程执行完毕producerThread.join();consumerThread.join();return 0;
}
内核态消息队列(POSIX 消息队列)
#include <iostream>
#include <fcntl.h>
#include <mqueue.h>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#define QUEUE_NAME "/my_queue"
#define MAX_MESSAGES 10
#define MAX_MSG_SIZE 256
void producer() {mqd_t mq;struct mq_attr attr;char buffer[MAX_MSG_SIZE];// 设置消息队列属性attr.mq_flags = 0;attr.mq_maxmsg = MAX_MESSAGES;attr.mq_msgsize = MAX_MSG_SIZE;attr.mq_curmsgs = 0;// 创建或打开消息队列mq = mq_open(QUEUE_NAME, O_CREAT | O_WRONLY, 0644, &attr);if (mq == (mqd_t)-1) {perror("mq_open");exit(1);}for (int i = 0; i < 10; ++i) {snprintf(buffer, MAX_MSG_SIZE, "Message %d", i);// 发送消息到消息队列if (mq_send(mq, buffer, strlen(buffer) + 1, 0) == -1) {perror("mq_send");continue;}std::cout << "Produced: " << buffer << std::endl;sleep(1); // 模拟生产消息的延迟}// 关闭消息队列mq_close(mq);
}void consumer() {mqd_t mq;char buffer[MAX_MSG_SIZE];// 打开消息队列mq = mq_open(QUEUE_NAME, O_RDONLY);if (mq == (mqd_t)-1) {perror("mq_open");exit(1);}while (true) {// 接收消息if (mq_receive(mq, buffer, MAX_MSG_SIZE, NULL) == -1) {perror("mq_receive");continue;}std::cout << "Consumed: " << buffer << std::endl;}// 关闭消息队列mq_close(mq);
}
int main() {// 创建生产者和消费者进程pid_t producer_pid = fork();if (producer_pid == 0) {producer();} else if (producer_pid > 0) {consumer();} else {perror("fork");return 1;}return 0;
}
共享内存
映射一段能被其它进程所访问的内存,由一个进程创建多个进程可以访问,共享内存不需要进入内核态,是最快的进程间通信方式。
当多个进程同时使用共享内存会产生数据紊乱的情况,所以常常搭配信号量一起使用。
简单的来讲就是通过系统调用由内核开辟一块共享内存,然后使用mmap进行映射,malloc底层mmap系统调用可以理解为申请了一块匿名共享内存。
#include <iostream>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
int main() {// 共享内存大小const size_t SHM_SIZE = 4096;// 创建或打开共享内存对象int shm_fd = shm_open("/my_shared_memory", O_CREAT | O_RDWR, 0666);if (shm_fd == -1) {std::cerr << "Failed to create shared memory" << std::endl;return 1;}// 调整共享内存对象的大小if (ftruncate(shm_fd, SHM_SIZE) == -1) {std::cerr << "Failed to set size of shared memory" << std::endl;return 1;}// 映射共享内存到进程的地址空间void* ptr = mmap(0, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);if (ptr == MAP_FAILED) {std::cerr << "Failed to map shared memory" << std::endl;return 1;}// 写入数据到共享内存const char* message = "Hello, shared memory!";std::memcpy(ptr, message, std::strlen(message) + 1);std::cout << "Data written to shared memory: " << message << std::endl;// 解除映射并关闭共享内存对象if (munmap(ptr, SHM_SIZE) == -1) {std::cerr << "Failed to unmap shared memory" << std::endl;}if (close(shm_fd) == -1) {std::cerr << "Failed to close shared memory file descriptor" << std::endl;}// 删除共享内存对象if (shm_unlink("/my_shared_memory") == -1) {std::cerr << "Failed to unlink shared memory" << std::endl;}return 0;
}
信号量
是一个计数器,用于控制对资源的访问,可以理解为一种锁机制,包含P,V操作,P是-1当信号量为0时会等待直到信号量大于0可以理解为加锁,V是+1当资源使用完的话信号量执行V操作,当信号量大于0时表示资源可以使用。
一般结合共享内存使用,可以作为共享内存的加锁方式。
#include <iostream>
#include <fcntl.h>
#include <sys/mman.h>
#include <semaphore.h>
#include <cstring>
#include <unistd.h>const char* SHARED_MEMORY_NAME = "/my_shared_memory";
const char* SEMAPHORE_NAME = "/my_semaphore";struct SharedData {int counter;char message[256];
};int main() {// 创建或打开共享内存对象int shm_fd = shm_open(SHARED_MEMORY_NAME, O_CREAT | O_RDWR, 0666);if (shm_fd == -1) {perror("shm_open");return 1;}// 调整共享内存对象的大小if (ftruncate(shm_fd, sizeof(SharedData)) == -1) {perror("ftruncate");return 1;}// 映射共享内存到进程的地址空间SharedData* shared_data = static_cast<SharedData*>(mmap(nullptr, sizeof(SharedData), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0));if (shared_data == MAP_FAILED) {perror("mmap");return 1;}// 创建或打开信号量sem_t* semaphore = sem_open(SEMAPHORE_NAME, O_CREAT, 0666, 1);if (semaphore == SEM_FAILED) {perror("sem_open");return 1;}// 初始化共享数据shared_data->counter = 0;strcpy(shared_data->message, "Hello, shared memory!");// 启动子进程pid_t pid = fork();if (pid == -1) {perror("fork");return 1;} else if (pid == 0) {// 子进程for (int i = 0; i < 5; ++i) {sem_wait(semaphore); // 等待信号量std::cout << "Child: Counter = " << ++(shared_data->counter) << ", Message = " << shared_data->message << std::endl;sem_post(semaphore); // 释放信号量sleep(1); // 模拟子进程操作}} else {// 父进程for (int i = 0; i < 5; ++i) {sem_wait(semaphore); // 等待信号量std::cout << "Parent: Counter = " << ++(shared_data->counter) << ", Message = " << shared_data->message << std::endl;sem_post(semaphore); // 释放信号量sleep(1); // 模拟父进程操作}}// 关闭和删除共享内存对象munmap(shared_data, sizeof(SharedData));close(shm_fd);shm_unlink(SHARED_MEMORY_NAME);// 关闭和删除信号量sem_close(semaphore);sem_unlink(SEMAPHORE_NAME);return 0;
}
信号
异步通信机制,通知某个进程某个事件发生了,比如kill杀死进程就是告知对应的信号你的任务完成了你需要调用信号处理函数下班了。
套接字
可以用于不同主机间的进程通信,这里就不赘述了