多线程数据读写顺序处理
一个典型的生产者-消费者模型,在这个模型中,多个工作线程并行处理从共享队列中获取的数据,并将处理结果以保持原始顺序的方式放入另一个队列。
多线程处理模型,具体细节如下:
1.数据:数据里必须有个递增的标识符
和一个结束标识(ending)
2. 读队列(安全队列):用于存放待处理的数据。
-
处理线程:每个线程都是一个死循环
读数据-处理数据-写数据
,它们被编号为1、2、3、4等。这些线程负责从读队列
中取出数据进行处理。线程的结束:判断数据里的ending为true
. -
结果聚合:处理完成后,判断数据的
递增的标识符
,是否为全局的递增的标识符
,如果相等 继续执行。以保持数据的一致性。 -
写队列(安全队列):用于处理好的数据按照读的顺序写入,写入数据到输出队列的顺序是保持一致的。
自定义设计多线程模版:
#include "queuestable.h"
#ifndef QUEUESTABLE_H
#define QUEUESTABLE_H#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>//定义一个线程安全的队列template <typename T>
class QueueStable
{
public:QueueStable() = default;QueueStable(const QueueStable<T>&) = delete;QueueStable& operator=(const QueueStable<T>&) = delete;QueueStable(unsigned int max_size){m_max_size = max_size;}//设置存放数据的最大容量void set_max_size(unsigned int max_size){m_max_size = max_size;}void push(T value){std::unique_lock<std::mutex> lock(m_mutex);//队列大于20就阻塞m_cv.wait(lock, [&] { return m_queue.size() < m_max_size; }); //为 true 继续执行,否则,解锁-》等待。唤醒后,加锁//入队m_queue.push(std::move(value));//m_queue.push(value);//解锁lock.unlock();m_cv.notify_one(); //唤醒另一个}T pop(){//加锁std::unique_lock<std::mutex> lock(m_mutex);m_cv.wait(lock, [&] { return !m_queue.empty(); }); //为 true 继续执行,否则,解锁-》等待。唤醒后,加锁//出队T data = std::move(m_queue.front());//T data = m_queue.front();m_queue.pop();//解锁lock.unlock();m_cv.notify_one(); //唤醒另一个return data;}T front() const{std::lock_guard<std::mutex> lock(m_mutex);return m_queue.front();}// bool empty() const// {// std::lock_guard<std::mutex> lock(m_mutex);// return m_queue.empty();// }// int size()// {// //加锁// std::lock_guard<std::mutex> lock(m_mutex);// return m_queue.size();// }private:std::queue<T> m_queue;std::mutex m_mutex;std::condition_variable m_cv;unsigned int m_max_size = 1;
};//包数据
struct BaseData
{//`递增的标识符` sequence_numberint64_t sequence_number ; //记录当前位置,保证数据顺序一致bool ending = false; //true 代表结尾};//多线程处理
template <typename T,typename P>
class MultiThreadProcessing
{static_assert(std::is_base_of<BaseData, T>::value, "T must be derived from BaseData!");public:enum class ThreadMode{Detach, // 分离Join // 阻塞};MultiThreadProcessing(){m_ending = false;m_thread_mode = ThreadMode::Join;}//设置上下文结构使用的数据 - 必须设置void set_contexts(const std::vector<std::shared_ptr<P>>& contexts){//生成用于线程的结构for(int i=0; i<contexts.size(); ++i){auto& context = m_contexts.emplace_back();context.id = i;context.ext_context = contexts[i];}}//设置运行的处理函数 - 必须设置void set_execute_function(const std::function<void(const std::shared_ptr<T>&,const std::shared_ptr<P>&)>& function){m_execute_function = function;}//设置线程模式void set_thread_mode(MultiThreadProcessing::ThreadMode thread_mode = ThreadMode::Join){m_thread_mode = thread_mode;}//多线程同步检测void start(const std::shared_ptr<QueueStable<std::shared_ptr<T>>>& read_queue,const std::shared_ptr<QueueStable<std::shared_ptr<T>>>& write_queue){//检查处理函数是否可调用if (!m_execute_function){// 打印异常信息std::cerr << "MultiThreadProcessing: Invalid to execute function." << std::endl;throw std::runtime_error("Invalid to execute function.");return;}std::shared_ptr<DataPacket> data_packer = read_queue->pop();//取出第一个包,查看序号 不处理m_sequence_number = data_packer->sequence_number;//先找到第一个包的序号m_ckeck_sequence_number = data_packer->sequence_number;write_queue->push(data_packer);m_sequence_number++;m_ckeck_sequence_number++;//循环开启线程for(auto& context : m_contexts){//开启线程context.thread = std::make_shared<std::thread>([this,&context,&read_queue,&write_queue](){std::cout << "context.id: " << context.id<<" Started." << std::endl;for(;;){// 读数据 逻辑//加锁m_read_mutex.lock();//为true,末尾标志,不在继续,直接结束if(m_ending){m_read_mutex.unlock();break;}context.temp_data = read_queue->pop();//检测是否为递增1if(m_ckeck_sequence_number.load() == context.temp_data->sequence_number){m_ckeck_sequence_number++;}else{m_ending = true;// 打印异常信息std::cerr << "MultiThreadProcessing: The sequence number must be incremented by one." << std::endl;throw std::runtime_error("The sequence number must be incremented by one.");break;//异常}//true 包的末尾,結束if(context.temp_data->ending){m_ending = true;std::unique_lock<std::mutex> lock(m_write_mutex);m_write_cond_var.wait(lock, [&] { return m_sequence_number.load() == context.temp_data->sequence_number; }); //为 true 继续执行,否则,解锁-》等待。唤醒后,加锁//写入包write_queue->push(context.temp_data);//解锁lock.unlock();m_write_cond_var.notify_all(); //唤醒所有break;}m_read_mutex.unlock();//自定义函数处理数据m_execute_function(context.temp_data,context.ext_context);//写数据逻辑std::unique_lock<std::mutex> lock(m_write_mutex);m_write_cond_var.wait(lock, [&] { return m_sequence_number.load() == context.temp_data->sequence_number; }); //为 true 继续执行,否则,解锁-》等待。唤醒后,加锁//写入包write_queue->push(context.temp_data);m_sequence_number++;//解锁lock.unlock();m_write_cond_var.notify_all(); //唤醒所有}std::cout << "context.id: " << context.id<<" Finished." << std::endl;});}//阻塞线程for(auto& context : m_contexts){if (context.thread->joinable()){if(m_thread_mode == ThreadMode::Join){context.thread->join();}else if (m_thread_mode == ThreadMode::Detach){context.thread->detach();}}}}private:struct S{std::shared_ptr<T> temp_data;std::shared_ptr<std::thread> thread;uint32_t id;std::shared_ptr<P> ext_context; //外部的自定义数据};std::vector<S> m_contexts;//每个线程的临时数据,用于多线程临时//结束标志std::atomic<bool> m_ending = false;std::mutex m_read_mutex;//读锁std::mutex m_write_mutex;//写锁std::condition_variable m_write_cond_var;//写条件变量 同步数据顺序std::atomic<int64_t> m_sequence_number;//保证数据顺序std::atomic<int64_t> m_ckeck_sequence_number;//检查数据序号,若数据序号不是递增为1,抛出异常std::function<void(const std::shared_ptr<T>&,const std::shared_ptr<P>&)> m_execute_function; //每个数据单次处理函数MultiThreadProcessing::ThreadMode m_thread_mode = ThreadMode::Join;
};#endif // QUEUESTABLE_H
QueueStable
安全队列类
BaseData
基本数据
MultiThreadProcessing
多线程模版类 ,处理继承基本数据BaseData
的结构体
使用 QueueStable
类确保数据的读写正常,MultiThreadProcessing
多线程处理数据,保证数据先读的先写。
举例:下面实现openVINO + yolov8
推理代码,使用上面的多线程模版:
yolov8_2.h
#ifndef YOLOV8_2_H
#define YOLOV8_2_H#include "filterbase.h"
#include "queuestable.h"
//包数据
struct DataPacket : BaseData
{AVMediaType av_media_type = AVMEDIA_TYPE_UNKNOWN;//记录当前的索引,分辨是音频还是视频,字幕,等std::shared_ptr<AVPacketPtr> packet; //存放包//记录av_media_type类型数据,如果数据没解码,数据在packet中std::vector<std::shared_ptr<AVFramePtr>> frame_vector;
};class YOLOV8_2 : public FilterBase
{
public:struct Config{float nms_threshold;float score_threshold;std::string model_path;std::string bin_path = {};std::string properties = "GPU.0";uint32_t image_interval = 2; // 处理图像的间隔,每 image_interval处理一次};struct Detection{int class_id;std::string class_name; //类型名float confidence;//置信度cv::Rect box; // 矩形框位置};struct InferContext{ov::InferRequest request;uint32_t image_interval; // 处理图像的间隔,每 image_interval处理一次};YOLOV8_2(){}YOLOV8_2(const Config& config);~YOLOV8_2(){}//实现基类的纯虚函数 start 是个接口,用于实现多态,基类不做实现void start(const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& read_queue,const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& write_queue) override;//多线程同步检测void detect(const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& read_queue,const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& write_queue);protected:void initial();//预处理ov::Tensor preprocess(const cv::Mat& frame, cv::Mat& pre_frame);//后处理void postprocess(cv::Mat& frame, const ov::Tensor& output_tensor);cv::Mat letterbox(const cv::Mat& input_image, const cv::Size& target_size, const cv::Scalar& fill_color = cv::Scalar(0, 0, 0), float* m_ratio = nullptr,int* m_top_offset = nullptr,int* m_left_offset = nullptr);private:Config m_config; // 参数float m_ratio; // 原图与模型输入图 缩放比例int m_top_offset;int m_left_offset;ov::CompiledModel m_compiled_model;MultiThreadProcessing<DataPacket,InferContext> m_multi_thread;std::vector<std::shared_ptr<InferContext>> m_infer_request_vector; //推理列表,用于多路推理uint32_t m_infer_request_size;
};
#endif // YOLOV8_2_H
#include "yolov8_2.h"
const std::vector<std::string> coconame = { "person","bicycle","car","motorcycle","airplane","bus","train","truck","boat","traffic light","fire hydrant","stop sign","parking meter","bench","bird","cat","dog","horse","sheep","cow","elephant","bear","zebra","giraffe","backpack","umbrella","handbag","tie","suitcase","frisbee","skis","snowboard","sports ball","kite","baseball bat","baseball glove","skateboard","surfboard","tennis racket","bottle","wine glass","cup","fork","knife","spoon","bowl","banana","apple","sandwich","orange","broccoli","carrot","hot dog","pizza","donut","cake","chair","couch","potted plant","bed","dining table","toilet","tv","laptop","mouse","remote","keyboard","cell phone","microwave","oven","toaster","sink","refrigerator","book","clock","vase","scissors","teddy bear","hair drier","toothbrush" };YOLOV8_2::YOLOV8_2(const YOLOV8_2::Config& config)
{this->m_infer_request_size = 12;m_config = config;initial();
}//多线程同步检测
void YOLOV8_2::detect(const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& read_queue,const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& write_queue)
{std::function<void(const std::shared_ptr<DataPacket>&,const std::shared_ptr<InferContext>&)> lambda = [this](const std::shared_ptr<DataPacket>& data_packer,const std::shared_ptr<InferContext>& context){if(data_packer->av_media_type == AVMEDIA_TYPE_VIDEO && !data_packer->frame_vector.empty()){for(const auto& frame : data_packer->frame_vector){if(frame->get_number() % context->image_interval != 0){continue;}// 创建 cv::Mat 对象,注意这里直接使用 AVFrame 的数据cv::Mat image = cv::Mat(frame->get()->height, frame->get()->width, CV_8UC3, frame->get()->data[0], frame->get()->linesize[0]);if (!image.empty()){cv::Mat pre_image;//临时存储//预处理ov::Tensor input_tensor = preprocess(image,pre_image);//开始推理context->request.set_input_tensor(input_tensor);context->request.infer();//等待完成处理结果const ov::Tensor& output_tensor = context->request.get_output_tensor();this->postprocess(image, output_tensor);}}}};m_multi_thread.set_contexts(m_infer_request_vector);m_multi_thread.set_execute_function(lambda);m_multi_thread.start(read_queue,write_queue);
}
void YOLOV8_2::start(const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& read_queue,const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& write_queue)
{detect(read_queue,write_queue);
}
void YOLOV8_2::initial()
{//创建推理引擎 ieov::Core core;//读取模型/** std::shared_ptr<ov::Model> model = core.read_model(this->onnx_path);* @brief从IR / ONNX / PDPD / TF / TFLite文件格式读取模型。* @param model_path模型的路径。* @param bin_path数据文件的路径。* 对于IR格式(*.bin) :* 如果`bin_path`为空,将尝试读取与XML同名的bin文件* *如果没有找到同名的bin文件,将加载无权重的IR。* 对于以下文件格式,不使用`bin_path`参数:* ONNX格式(*.onnx)* *PDPD(*.pdmodel)* *TF(*.pb)* *TFLite(*.tflite)* @返回一个模型。*/std::shared_ptr<ov::Model> model = core.read_model(m_config.model_path,m_config.bin_path);ov::preprocess::PrePostProcessor ppp = ov::preprocess::PrePostProcessor(model);ppp.input().tensor().set_element_type(ov::element::u8).set_layout("NHWC").set_color_format(ov::preprocess::ColorFormat::BGR)/*.set_spatial_static_shape(640, 640) //640*640 yolov8输入大小*/;//ppp.input().tensor().set_shape(ov::PartialShape({ 1,640,640,3 }));//自定义输入大小,确保和模型大小同样ppp.input().preprocess().convert_layout("NCHW").convert_element_type(ov::element::f32).convert_color(ov::preprocess::ColorFormat::RGB).scale({ 255, 255, 255 });// .scale({ 112, 112, 112 });//ppp.input().preprocess().resize(ov::preprocess::ResizeAlgorithm::RESIZE_NEAREST, 640, 640);//ppp.input().model().set_layout("NCHW");ppp.output().postprocess().convert_element_type(ov::element::f32);//ppp.output().tensor().set_element_type(ov::element::f32);model = ppp.build();this->m_compiled_model = core.compile_model(model,m_config.properties);//创建推理请求for(size_t i=0;i<m_infer_request_size;++i){std::shared_ptr<InferContext> sh = std::make_shared<InferContext>();sh->request = m_compiled_model.create_infer_request();sh->image_interval = m_config.image_interval;m_infer_request_vector.push_back(sh);}
}// Letterbox 缩放函数
/** input_image 输入原图像* target_size 目标图像大小* fill_color 填充颜色* m_ratio 缩放比例* m_top_offset 缩放的图像 在目标图像中的 y 位置* m_left_offset 缩放的图像 在目标图像中的 x 位置*/
cv::Mat YOLOV8_2::letterbox(const cv::Mat& input_image, const cv::Size& target_size, const cv::Scalar& fill_color, float* m_ratio,int* m_top_offset,int* m_left_offset)
{//输出图像cv::Mat output_image(target_size, input_image.type(), fill_color);//输入图像和输出图像 高度和宽度 都相等,直接复制返回if(input_image.cols == output_image.cols && input_image.rows == output_image.rows){input_image.copyTo(output_image);//获取比例if (m_ratio){*m_ratio = 1.0;}if (m_top_offset){*m_top_offset = 0;}if (m_left_offset){*m_left_offset = 0;}return output_image;}float r = 0.0;cv::Rect dest_rect;//输入图像宽 > 图像高,宽对齐,高至中if (input_image.cols > input_image.rows){// 宽缩放 m_ratio ,那么高也要缩放 m_ratior = static_cast<float>(input_image.cols) / output_image.cols;int new_rows = static_cast<int>(input_image.rows / r);dest_rect = cv::Rect(0, (output_image.rows - new_rows) / 2, output_image.cols, new_rows);//dest_rect = cv::Rect(0, 0, output_image.cols, new_rows);}else{// 高缩放 m_ratio ,那么宽也要缩放 m_ratior = static_cast<float>(input_image.rows) / output_image.rows;int new_cols = static_cast<int>(input_image.cols / r);dest_rect = cv::Rect((output_image.cols - new_cols) / 2, 0, new_cols, output_image.rows);}//获取比例if (m_ratio){*m_ratio = r;}if (m_top_offset){*m_top_offset = dest_rect.y;}if (m_left_offset){*m_left_offset = dest_rect.x;}cv::resize(input_image, output_image(dest_rect), dest_rect.size(), cv::INTER_LINEAR);return output_image;
}
//预处理
ov::Tensor YOLOV8_2::preprocess(const cv::Mat& frame, cv::Mat& pre_frame)
{//预处理const ov::Shape& shape = m_compiled_model.input().get_shape();//shape 对应 ppp.input().tensor().set_element_type(ov::element::u8).set_layout("NHWC") 中 NHWCpre_frame = letterbox(frame, cv::Size(shape.at(2), shape.at(1)), cv::Scalar(100, 100, 100), &m_ratio, &m_top_offset, &m_left_offset);uchar* input_data = pre_frame.data;return ov::Tensor(m_compiled_model.input().get_element_type(), m_compiled_model.input().get_shape(), input_data);
}
//后处理
void YOLOV8_2::postprocess(cv::Mat& frame, const ov::Tensor& output_tensor)
{std::vector<cv::Rect> boxes;std::vector<int> class_ids;std::vector<float> confidences;const ov::Shape& output_shape = output_tensor.get_shape();const int& out_rows = output_shape.at(1);const int& out_cols = output_shape.at(2);const cv::Mat det_output(out_rows, out_cols, CV_32F, (float*)output_tensor.data<float>());CHECK(det_output.cols == 8400)CHECK(det_output.rows == 84)//找到所有符合的 类别 矩形,置信度,for (int i = 0; i < det_output.cols; ++i){const cv::Mat& classes_scores = det_output.col(i).rowRange(4, 84);cv::Point class_id_point;double score;cv::minMaxLoc(classes_scores, nullptr, &score, nullptr, &class_id_point);//阈值大于0.25 认为检测出结果//if (score > 0.3){//坐标const float& x = det_output.at<float>(0, i);const float& y = det_output.at<float>(1, i);const float& w = det_output.at<float>(2, i);const float& h = det_output.at<float>(3, i);cv::Rect box;box.x = static_cast<int>(x);box.y = static_cast<int>(y);box.width = static_cast<int>(w);box.height = static_cast<int>(h);boxes.push_back(box);class_ids.push_back(class_id_point.y);confidences.push_back(score);}}std::vector<int> nms_result;//nms 去重,找到最优数据cv::dnn::NMSBoxes(boxes, confidences, m_config.score_threshold, m_config.nms_threshold, nms_result);std::vector<Detection> output;for (int i = 0; i < nms_result.size(); ++i){Detection result;int idx = nms_result.at(i);result.class_id = class_ids.at(idx);result.confidence = confidences.at(idx);result.class_name = coconame.at(result.class_id) + ' ' + std::to_string(result.confidence).substr(0, 4);result.box.width = boxes.at(idx).width * this->m_ratio;result.box.height = boxes.at(idx).height * this->m_ratio;result.box.x = (boxes.at(idx).x - 0.5 * boxes.at(idx).width - this->m_left_offset) * this->m_ratio ;result.box.y = (boxes.at(idx).y - 0.5 * boxes.at(idx).height - this->m_top_offset) * this->m_ratio ;output.push_back(result);}//绘制for (int i = 0; i < output.size(); ++i){auto detection = output.at(i);auto box = detection.box;auto class_string = detection.class_name;float xmax = box.x + box.width;float ymax = box.y + box.height;//生成随机颜色// 获取当前系统时间作为种子auto current_time = std::chrono::system_clock::now().time_since_epoch().count();// 使用随机种子创建 RNG 对象cv::RNG rng(current_time);cv::Scalar color= cv::Scalar(rng.uniform(100, 256),rng.uniform(100, 256),rng.uniform(100, 256));// Detection boxcv::rectangle(frame, cv::Point(box.x, box.y), cv::Point(xmax, ymax), color, 2);// Detection box textcv::Size textSize = cv::getTextSize(class_string, cv::FONT_HERSHEY_DUPLEX, 1, 2, 0);cv::Rect textBox(box.x, box.y - 40, textSize.width + 10, textSize.height + 20);cv::rectangle(frame, textBox, color, cv::FILLED);cv::putText(frame, class_string, cv::Point(box.x + 5, box.y - 10), cv::FONT_HERSHEY_DUPLEX, 1, cv::Scalar(0, 0, 0));}
}
这个例子是ffmpeg读取的数据,用openVINO推理实现对每张图片进行实时推理,推理后的数据,按照顺序写入显示,保证数据顺序一致性。