代码结构
- 任务:这里用一个int类型的taskNumber代替任务
- 任务队列类:封装了任务队列,存,取等操作。
- 生产者工作函数:生产者执行的函数,向任务队列中添加任务,每个生产者生产3个任务
- 消费者工作函数:消费者执行的函数,从任务队列中拿任务,如果5秒内一直没有任务,则销毁
C++实现代码
#include <stdio.h>
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <unistd.h>
#include <chrono>
using namespace std;int taskNumber = 0; //任务class task_queue{
public:// 构造函数task_queue(int maxNum=10){this->maxNum = maxNum;}// 添加任务void add_task(int i){ //i为生产者编号myMutex.lock(); // 访问临界资源,加锁// 这里必须使用while而不是if,考虑以下情况:// A B两个线程同时因为任务队列满而阻塞,现在来了一个空位置,AB同时解除阻塞(因为我们用的是notify_all()函数唤醒所有线程)// 这时假设A抢到了互斥锁myMutex并添加任务,之后释放myMutex。这时B又抢到锁,但是还没有空位置,如果用if就会出错。所以需要while循环判断是否有任务while(q.size() == maxNum){ //如果任务队列已经满了condFull.wait(myMutex); //条件锁阻塞}q.push(++taskNumber); //添加任务condEmpty.notify_all(); //告知因为没有任务而被阻塞的消费者线程解除阻塞cout<<"【任务"<<taskNumber<<"】已经被【生产者"<<i<<"】添加到任务队列中"<<endl;myMutex.unlock(); //解锁sleep(1);}// 弹出任务bool get_task(int i){ //i为消费者编号myMutex.lock(); // 访问临界资源,加锁// 使用while而不是if的原因同上,如果条件锁的唤醒函数使用的是notify_one()函数,理论上可以使用ifwhile(q.empty()){ //任务队列为空,则等待cv_status flag =condEmpty.wait_for( myMutex, chrono::seconds(5)); //等待5秒if(flag == std::__1::cv_status::timeout){ //timeout表示5秒都没有任务cout<<"【子线程"<<i<<"】退出"<<endl;myMutex.unlock(); //这里要解锁,不然退出的线程会一直占用互斥锁导致死锁return false; // 等待5秒都没有任务要执行,则退出线程}}int x = q.front(); //取任务q.pop();condEmpty.notify_all(); //告知因为任务队列满而阻塞的生产者解除阻塞cout<<"【消费者"<<i<<"】正在执行【任务"<<x<<"】......"<<endl;myMutex.unlock(); //互斥锁解锁sleep(1);return true;}// 获得当前任务数目int get_task_num(){lock_guard<mutex>my_lock_guard(myMutex); //使用lockguard自动释放锁return (int)q.size();}// 获得最大任务数据int get_task_max_num(){lock_guard<mutex>my_lock_guard(myMutex); //使用lockguard自动释放锁return maxNum;}private:queue<int>q; //假设一个int代表一个任务taskint maxNum; //最大任务数mutex myMutex; // 临界资源互斥锁condition_variable_any condFull; // 任务队列满条件锁condition_variable_any condEmpty; // 任务队列空条件锁
};void producer_task(task_queue &q, int i){ // 生产者工作函数,每个生产者生产3个任务, i(0~4)代表生产者编号int num=3;while(num--){q.add_task(i);
// sleep(1);}
}
void consumer_task(task_queue &q, int i){ // 消费者工作函数,消费者循环消费任务,如果5秒内没有任务则停止工作while(1){bool flag = q.get_task(i);
// sleep(1);if(flag==false)break;}
}int main(){task_queue q(5); //最大任务数为5thread producer[5]; // 5个生产者对象thread consumer[5]; // 5个消费者对象for(int i=0;i<5;++i){
// producer[i] = thread(&task_queue::add_task, &q, i,i); //给生产者指定任务
// consumer[i] = thread(&task_queue::get_task, &q,i); //给消费者指定任务producer[i] = thread(producer_task, ref(q),i); //给生产者指定任务consumer[i] = thread(consumer_task, ref(q),i); //给消费者指定任务}for(int i=0;i<5;++i){ //主线程等待子线程执行完毕producer[i].join();consumer[i].join();}return 0;
}
运行结果:
尾部的运行结果,可以看出任务是按照添加的顺序执行的,在等待5秒之后,线程依次退出