ThreadPool解析

news/2025/1/17 13:42:23/文章来源:https://www.cnblogs.com/haruu/p/18676776

Thread_Pool 项目解析

简介

ThreadPool 是一个轻量级的 C++ 线程池实现,旨在简化多线程编程。

项目分析

我们首先上github的项目地址:https://github.com/progschj/ThreadPool,然后克隆项目到本地。

点开项目的ThrealPool.h文件,查看源码:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>class ThreadPool {
public:ThreadPool(size_t);template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();
private:// need to keep track of threads so we can join themstd::vector< std::thread > workers;// the task queuestd::queue< std::function<void()> > tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;bool stop;
};// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads):   stop(false)
{for(size_t i = 0;i<threads;++i)workers.emplace_back([this]{for(;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });if(this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif(stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;
}// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for(std::thread &worker: workers)worker.join();
}#endif

类成员分析

接下来,我们一步一步分析源代码。

在整个文件中只定义一个类ThreadPool,它的类成员有:

    std::vector< std::thread > workers;//存储处理任务的线程std::queue< std::function<void()> > tasks;//存储任务的队列std::mutex queue_mutex; // 互斥锁std::condition_variable condition; // 条件变量,和上面的互斥锁保证多线程的同步和互斥bool stop; // 线程池的是否停止的标志

ThreadPool初始化

先上代码:

inline ThreadPool::ThreadPool(size_t threads):   stop(false)
{for(size_t i = 0;i<threads;++i)workers.emplace_back([this]{for(;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });if(this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}

ThreadPool 的初始化需传入一个参数threads,且将stop赋值为0.

接着往workers里加入threads个线程,每个线程都执行死循环:

            for(;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });if(this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}

在循环中,先定义锁,再调用condition.wait()方法,当线程池运行且任务队列为空时,线程堵塞,否则线程继续运行,然后当线程池停止且任务队列为空时,跳出循环,结束线程。否则从取出任务队列的第一个任务,执行任务。

ThreadPool enqueue 加入队列

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif(stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;
}

enqueue 方法是模板函数,传入可调用对象F和任意数量的的参数args,,返回一个future对象,返回线程异步操作的结果。

using return_type = typename std::result_of<F(Args...)>::type;

首先,定义返回类型return_type,表示传入的可调用对象的返回值的类型。

auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

程序创建智能指针task,其指向了一个使用bind绑定的可调用对象(该对象调用f,并传入参数args),再使用packaged_task包装成可调用对象。创建智能指针的目的是为了其他线程的使用。

std::future<return_type> res = task->get_future();
{std::unique_lock<std::mutex> lock(queue_mutex);if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });
}

使用res保存任务线程的异步结果,并作为返回值。
然后在代码块中使用互斥锁加锁,然后将任务加入任务队列中。
最后通知线程池中的一个线程处理任务并返回res。

ThreadPool 析构函数

看注释就可以了:

inline ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex);        stop = true;//表示线程池停止。}condition.notify_all();                 // 通知所有线程for(std::thread &worker: workers)worker.join();                      // 等待所有线程结束
}

总结:

ThreadPool 的运行步骤可以分为以下几步:

  1. 创建ThreadPool对象,传入线程池工作线程数量。在线程池中填加工作线程,并堵塞等待任务线程的通知。
  2. 调用enqueue方法,传入可调用对象和参数。在该方法中,enqueue先通过一系列操作调整传入的参数,再将其加入任务队列。
  3. 以上操作完成后,通知线程池中的一个线程处理任务。在线程池中取出任务队列的当前最先进来的任务处理。
  4. 处理完任务将结果保存到enqueue里的异步返回结果的future对象中,并通过enqueue返回。
  5. ThreadPool对象被销毁时,将标志stop设置为true,并会通知所有堵塞线程,等待线程池中的所有线程结束。
    ThreadPool 实现简单的线程池,使用简单的先进先出策略调度任务,如果可以使用更加复杂的策略,我们可以自己修改代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/870754.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[docker逃逸] 使用DirtyPipe漏洞逃逸

本文作者CVE-柠檬i CSDN:https://blog.csdn.net/weixin_49125123 博客园:https://www.cnblogs.com/CVE-Lemon 微信公众号:Lemon安全 前言 本文使用代码下载链接:利用CVE-2022-0847 (Dirty Pipe) 实现容器逃逸 (github.com) 由于本人才疏学浅,本文不涉及漏洞原理,仅有复现…

RestAPI实现聚合

API语法 聚合条件与query条件同级别,因此需要使用request.source()来指定聚合条件。聚合的结果解析:@Override public Map<String, List<String>> filters(RequestParams params) {try {// 1.准备RequestSearchRequest request = new SearchRequest("hotel&…

elasticsearch之数据聚合

**聚合(aggregations)**可以让我们极其方便的实现对数据的统计、分析、运算。例如:什么品牌的手机最受欢迎? 这些手机的平均价格、最高价格、最低价格? 这些手机每月的销售情况如何?实现这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现近实时搜索效…

【通讯协议】OPC协议

OPC通讯协议 特点:支持多种数据结构和负责数据类型,需要多的硬件和软件资源,成本较高,安全性较高。 应用场景:连接多个不同工业自动化设备 什么是OPC通讯协议 OPC是英文“OLE for Process Control”的缩写,是工业自动化领域中的一种工业通信标准。它通过定义一些在不同平…

海外泼天流量|浅谈全球化技术架构

本文对海外泼天流量现状做了快速整理,旨在抛砖引玉,促进国内企业在出海过程中,交流如何构建全球化技术架构的落地经验,相信会有越来越多资深人士分享更深层次的实践。 登陆小红书,搜索 refugee,你就能看到一个不一样的小红书。随机点击几个,让大数据记住你,就能持续看到…

绿联网卡

目录1: 安装2:检查3:常见问题网络连接有网卡,状态为已禁用 1: 安装插入电脑 弹窗“Setup.exe”,安装驱动, 如果没有驱动,则找到 Ugreen Wireless进行驱动安装。驱动安装成功后效果2:检查驱动安装好后,u盘插拔一下,观察确定是哪个WLAN3:常见问题 网络连接有网卡,状态为…

kali安装教程

kali和GNOME桌面安装教程 kali下载 https://www.kali.org/get-kali/ 到kali官网,下载镜像安装下载完应该是:kali-linux-2024.4-installer-amd64.iso 然后新建虚拟机选择稍后安装操作系统:选择如图所示操作系统 后面的,我都给的挺多,主要不想它卡,哈哈哈网络选择NAT就行,…

车辆拥堵交通事故识别系统

车辆拥堵交通事故识别系统通过在关键路段部署监控摄像机,车辆拥堵交通事故识别系统借助 YOLOv8 算法的强大目标检测能力,能够精准识别出车辆、行人等交通参与者。一旦发生车相撞、车辆倾翻、骑车倾翻、路面有人摔倒或打架等异常事件,系统便迅速触发抓拍预警。系统还具备灵活…

[rustGUI][iced]基于rust的GUI库iced(0.13)的部件学习(04):实现窗口主题(颜色)变换(暨menu菜单的使用)

前言 本文是关于iced库的部件介绍,iced库是基于rust的GUI库,作者自述是受Elm启发。 iced目前的版本是0.13.1,相较于此前的0.12版本,有较大改动。 本合集是基于新版本的关于分部件(widget)的使用介绍,包括源代码介绍、实例使用等。 环境配置 系统:window10 平台:visual…

网络认证

网络认证 网络认证概述 网络认证:Windows网络认证是指在Windows操作系统中进行网络通信和资源访问时,验证用户身份和授权权限的过程。它确保只有经过身份验证的用户能够访问网络资源,并根据其权限级别进行授权操作。 网络认证有哪些? 1.用户名和密码认证:这是最常见的认证…

ffmpeg简易播放器(1)--了解视频格式

视频帧 对于一份视频,实质上是多张图片高速播放形成的。每一张图片即为该视频的一帧。而每秒钟播放的图片张数便为所谓的帧率(Frame Rate/Frame Per Second)。常见的帧率有24fps(即一秒播放24张图片),60fps(一秒播放60张图片)等。也就是说,对于一个时长60秒的图片,如果帧率…

Issac Gym出现error: subprocess-exited-with-error报错

1. 前言 一方面便于日后自己的温故学习,另一方面也便于大家的学习和交流。 如有不对之处,欢迎评论区指出错误,你我共同进步学习! 2. 正文 我在安装humanoid gym pip install -e .的时候,出现下列问题:解决方法: pip install --upgrade setuptools没解决就先卸载setupt…