HiJobQueue:一个简单的线程安全任务队列
概述
HiJobQueue
是一个线程安全的任务队列,用于在多线程环境中管理和执行异步任务。它的设计参考了 Cobalt 项目中的 JobQueue,并做了适当的简化。HiJobQueue
提供了任务推送(push
)、任务弹出(pop
)、队列退出(quit
)等功能,适用于需要异步任务调度的场景。
核心功能
-
线程安全:
- 使用
std::mutex
和std::condition_variable
实现线程安全的任务队列。
- 使用
-
任务调度:
- 支持任务的异步推送和弹出。
-
退出机制:
- 提供
quit()
方法,用于安全地停止任务队列。
- 提供
-
跨平台:
- 使用 C++ 标准库实现,不依赖平台特定的 API。
实现代码
以下是 HiJobQueue
的实现代码:
#pragma once#include <mutex>
#include <functional>
#include <queue>
#include <condition_variable>/*** @brief 线程安全的任务队列,用于管理和执行异步任务。*/
class HiJobQueue final {
public:using Job = std::function<void()>; // 任务类型public:HiJobQueue() : is_exit_(false) {}/*** @brief 推送任务到队列。* @param job 要执行的任务。* @return 如果队列已退出,返回 false;否则返回 true。*/bool push(Job job);/*** @brief 从队列中弹出任务。* @param job 用于存储弹出的任务。* @return 如果队列为空且已退出,返回 false;否则返回 true。*/bool pop(Job& job);/*** @brief 获取队列中的任务数量。* @return 队列中的任务数量。*/size_t size();/*** @brief 退出队列,停止任务处理。*/void quit();/*** @brief 检查队列是否已退出。* @return 如果队列已退出,返回 true;否则返回 false。*/bool is_quited();// 禁用拷贝构造函数和赋值运算符HiJobQueue(HiJobQueue&) = delete;HiJobQueue(const HiJobQueue&) = delete;private:bool is_exit_; // 队列退出标志std::mutex mutex_; // 互斥锁,保护队列访问std::condition_variable cond_; // 条件变量,用于任务通知std::queue<Job> queue_; // 任务队列
};// 实现bool HiJobQueue::push(Job job) {std::lock_guard<std::mutex> locker(mutex_);if (is_exit_) {return false;}queue_.push(std::move(job));cond_.notify_one();return true;
}bool HiJobQueue::pop(Job& job) {std::unique_lock<std::mutex> locker(mutex_);cond_.wait(locker, [this]() { return is_exit_ || !queue_.empty(); });if (is_exit_ && queue_.empty()) {return false;}job = std::move(queue_.front());queue_.pop();return true;
}size_t HiJobQueue::size() {std::lock_guard<std::mutex> locker(mutex_);return queue_.size();
}void HiJobQueue::quit() {std::lock_guard<std::mutex> locker(mutex_);is_exit_ = true;cond_.notify_all();
}bool HiJobQueue::is_quited() {std::lock_guard<std::mutex> locker(mutex_);return is_exit_;
}
测试用例
为了验证 HiJobQueue
的正确性和线程安全性,我们设计了以下测试用例:
测试代码
#include <gtest/gtest.h>
#include <future>
#include <atomic>
#include <thread>
#include <chrono>
#include "hi_job_queue.h"class TestCls {
public:void test(const char* text, int i) {printf("%s-%d\n", text, i);}
};TEST(HiJobQueueTest, ConcurrentPushPop) {HiJobQueue queue;TestCls cls;std::atomic<int> job_count{0}; // 用于统计执行的任务数量// 启动两个线程消费任务auto f1 = std::async(std::launch::async, [&] {HiJobQueue::Job job;while (queue.pop(job)) {job();job_count++;}});auto f2 = std::async(std::launch::async, [&] {HiJobQueue::Job job;while (queue.pop(job)) {job();job_count++;}});// 启动两个线程生产任务auto f3 = std::async(std::launch::async, [&] {for (int i = 0; i < 200; i++) {queue.push(std::bind(&TestCls::test, &cls, "test1", i));std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 跨平台休眠}});auto f4 = std::async(std::launch::async, [&] {for (int i = 0; i < 200; i++) {queue.push(std::bind(&TestCls::test, &cls, "test2", i));std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 跨平台休眠}});// 等待生产任务完成f3.wait();f4.wait();// 退出队列queue.quit();// 等待消费任务完成f1.wait();f2.wait();// 验证所有任务被执行EXPECT_EQ(job_count.load(), 400); // 200 (test1) + 200 (test2)
}TEST(HiJobQueueTest, QuitBehavior) {HiJobQueue queue;// 启动一个线程消费任务auto consumer = std::async(std::launch::async, [&] {HiJobQueue::Job job;while (queue.pop(job)) {job();}});// 推送一些任务for (int i = 0; i < 10; i++) {queue.push([]() {});}// 退出队列queue.quit();// 等待消费线程结束consumer.wait();// 验证队列已退出EXPECT_TRUE(queue.is_quited());// 验证退出后不能再 push 任务EXPECT_FALSE(queue.push([]() {}));
}TEST(HiJobQueueTest, EmptyQueueBehavior) {HiJobQueue queue;// 验证队列为空时的 pop 行为HiJobQueue::Job job;EXPECT_FALSE(queue.pop(job));// 退出队列queue.quit();// 验证退出后 pop 行为EXPECT_FALSE(queue.pop(job));
}
测试用例说明
-
ConcurrentPushPop
:- 测试多线程环境下
push
和pop
的并发行为。 - 验证所有任务是否被正确执行。
- 测试多线程环境下
-
QuitBehavior
:- 测试队列退出时的行为。
- 验证退出后是否不再接受新任务。
-
EmptyQueueBehavior
:- 测试队列为空时的行为。
- 验证退出后
pop
的行为。
适用场景
HiJobQueue
适用于以下场景:
-
多线程任务调度:
- 在需要将任务分发到多个工作线程执行的场景中,
HiJobQueue
可以作为任务调度器使用。 - 例如:线程池中的任务队列。
- 在需要将任务分发到多个工作线程执行的场景中,
-
事件驱动架构:
- 在事件驱动的系统中,
HiJobQueue
可以用于存储和处理事件。 - 例如:GUI 应用中的事件队列。
- 在事件驱动的系统中,
-
异步任务处理:
- 在需要异步执行任务的场景中,
HiJobQueue
可以用于存储任务并由后台线程处理。 - 例如:日志系统的异步写入。
- 在需要异步执行任务的场景中,
-
生产者-消费者模型:
- 在生产者-消费者模型中,
HiJobQueue
可以作为共享的任务缓冲区。 - 例如:多线程下载任务的分发。
- 在生产者-消费者模型中,
优缺点分析
优点
-
线程安全:
- 使用
std::mutex
和std::condition_variable
确保多线程环境下的安全性。
- 使用
-
简单易用:
- 提供了简洁的接口(
push
、pop
、quit
),易于集成到现有项目中。
- 提供了简洁的接口(
-
跨平台:
- 基于 C++ 标准库实现,不依赖平台特定的 API,具有良好的可移植性。
-
退出机制:
- 提供
quit()
方法,可以安全地停止任务队列,避免资源泄漏。
- 提供
-
轻量级:
- 代码简洁,性能开销小,适合对性能要求较高的场景。
缺点
-
功能单一:
- 仅支持基本的任务队列功能,不支持优先级调度或任务取消。
-
性能瓶颈:
- 在高并发场景下,
std::mutex
可能成为性能瓶颈。 - 如果需要更高的性能,可以考虑无锁队列(如
boost::lockfree::queue
)。
- 在高并发场景下,
-
任务类型限制:
- 任务类型为
std::function<void()>
,不支持返回值或参数传递。 - 如果需要更复杂的任务类型,需要自行扩展。
- 任务类型为
-
缺乏任务状态管理:
- 不支持任务的状态管理(如任务完成通知或错误处理)。
总结
HiJobQueue
是一个简单但功能强大的线程安全任务队列,适用于多线程环境中的异步任务调度。通过参考 Cobalt 项目中的 JobQueue,我们实现了一个更轻量级的版本,并通过单元测试验证了其正确性和线程安全性。希望这篇文章能帮助你理解和使用 HiJobQueue
。