生产环境使用boost::fiber

简介

boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。
同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对一些组件进行简单封装。

fiber封装

boost::fiber支持设置pthread和fiber的比例是1:n还是m:n,同时也支持设置调度方式是随机调度还是抢占调度。
本文中选择使用抢占式调度,并且是m:n的比例,这种选择适用面更加广。
既然pthread和fiber比例是m:n,那么这个m一般等于逻辑核数量,也就是需要设置fiber调度的线程控制在大小为固定的线程池中。fiber中抢占式调度方式也要求固定的线程池数量,外部前程加入时,可能会影响抢占式调度,即不能在外部线程中调用fiber,不然这个线程就加入到了fiber调度的pthread中了。

这时,需要一个设置一个队列,外部线程往这个队列中添加任务;内部线程池从队列中取任务,同时触发fiber,在fiber中可以继续触发fiber。触发队列、内部队列、工作线程、外部线程的关系如下图所示:

图片

运行逻辑被装箱到一个任务中,然后被添加到任务队列,这一步利用模板和上转型实现,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class IFiberTask {public:IFiberTask() = default;virtual ~IFiberTask() = default;IFiberTask(const IFiberTask& rhs) = delete;IFiberTask& operator=(const IFiberTask& rhs) = delete;IFiberTask(IFiberTask&& other) = default;IFiberTask& operator=(IFiberTask&& other) = default;virtual void execute() = 0;public:inline static std::atomic_size_t fibers_size {0};
};template <typename Func>
class FiberTask: public IFiberTask {public:explicit FiberTask(Func&& func) :func_{std::move(func)} { }~FiberTask() override = default;FiberTask(const FiberTask& rhs) = delete;FiberTask& operator=(const FiberTask& rhs) = delete;FiberTask(FiberTask&& other)  noexcept = default;FiberTask& operator=(FiberTask&& other)  noexcept = default;void execute() override {fibers_size.fetch_add(1);func_();fibers_size.fetch_sub(1);}private:Func func_;
};

IFiberTask是任务基类,不可拷贝;FiberTask是模板类,成员变量func_存储算子。使用IFiberTask类指针指向特化后的FiberTask对象,这时就实现的装箱操作,调用execute时,实际调用了子类的execute,触发封装的func_对象。

外部队列基于boost::fibers::buffered_channel实现,这是一个支持并发的队列,队列的元素类型为std::tuple<boost::fibers::launch, std::unique_ptr>,其中tuple第一元素存储任务的触发形式,进入队列还是立即触发。

接着是任务装箱,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template<typename Func, typename... Args>
auto Submit(boost::fibers::launch launch_policy, Func&& func, Args&&... args) {// 捕获lambda极其参数auto capture = [func = std::forward<Func>(func),args = std::make_tuple(std::forward<Args>(args)...)]() mutable {return std::apply(std::move(func), std::move(args));};// 任务的返回值类型using task_result_t = std::invoke_result_t<decltype(capture)>;// 该任务packaged_task的using packaged_task_t = boost::fibers::packaged_task<task_result_t()>;// 创建任务对象packaged_task_t task {std::move(capture)};// 装箱到FiberTask中using task_t = fiber::FiberTask<packaged_task_t>;// 获取packaged_task的futureauto result_future = task.get_future();// 添加到buffered_channel中auto status = work_queue_.push(std::make_tuple(launch_policy, std::make_unique<task_t>(std::move(task))));if (status != boost::fibers::channel_op_status::success) {return std::optional<std::decay_t<decltype(result_future)>> {};}return std::make_optional(std::move(result_future));
}

代码中,先捕获lambda表达式及其参数,获取返回值类型并添加到packaged_task中,然后装箱到FiberTask中,使用packaged_task获取future并返回,FiberTask对象添加到队列中,使用IFiberTask的指针指向这个对象,实现装箱操作。

接着是内部任务触发的逻辑,首先创建一个线程池,每个线程注册调度器,接着从队列中获取任务,触发fiber。
工作线程的执行函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 注册调度算法为抢占式调度
boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(threads_size_, true);
// 创建task类型
auto task_tuple = typename decltype(work_queue_)::value_type {};// 从队列中获取任务
while(boost::fibers::channel_op_status::success == work_queue_.pop(task_tuple)) {// 解包auto& [launch_policy, task_to_run] = task_tuple;// 触发 fiber并detachboost::fibers::fiber(launch_policy, [task = std::move(task_to_run)]() {task->execute();}).detach();
}

抢占式调度在注册时需要指定线程池大小,这时不能在外部线程中调用fiber,因为调用fiber的时候会把该线程添加到fiber调度的线程中,也就调整了fiber的worker线程数量。

以上代码实现了fiber触发器、任务队列、工作线程池等逻辑。
理论上可以创建多个fiber调度组件对象,每个组件根据自己的需要设置资源情况。
但实际应用中,还是建议使用一个全局调度组件,因为当A调度器中的任务依赖B调度器的任务的同时,就会出现阻塞工作线程,影响实际性能。

下面封装一个全局调度器,提供递交任务的接口和结束调度的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DefaultPool {private:static auto* Pool() {const static size_t size = std::thread::hardware_concurrency();static fiber::FiberPool pool(size, size*8);return &pool;}public:template<typename Func, typename... Args>static auto SubmitJob(boost::fibers::launch launch_policy, Func &&func, Args &&... args) {return Pool()->Submit(launch_policy, std::forward<Func>(func), std::forward<Args>(args)...);}template<typename Func, typename... Args>static auto SubmitJob(Func &&func, Args &&... args) {return Pool()->Submit(std::forward<Func>(func), std::forward<Args>(args)...);}static void Close() {Pool()->CloseQueue();}private:DefaultPool() = default;
};

其他组件封装

上面对boost::fiber进行封装,得到一个能投入生产环境的调度器。
但是仅仅是这些是不够的,毕竟对于生产环境中的服务而言,外部服务、中间件的依赖是不能少的。
纤程是具有传染性的,对于外部组件提供的sdk,发送请求并进行同步等待会阻塞纤程对应的工作线程,影响整套机制。
为此,需要对现有的组件进行封装,对于同步接口,需要使用线程池配合fiber::promise;对于异步接口,可以改造成fiber::promise、future机制。下面介绍几种常用组件的fiber封装。

redis客户端封装

同步接口加线程池的方式将同步接口改造成异步接口的方案,存在较大的安全隐患。
线程池的容量不可控,当流量突然增加时,需要大量线程去等待,从而耗尽线程池资源,造成任务大量积压,服务崩溃。
而扩大线程池数量,又消耗了大量的资源。

综上,对于fiber化封装,还是建议采用异步接口。hiredis库支持异步接口,redis_plus_plus库对hiredis进行了c++封装,同时也提供了异步接口,本节将面向这个接口进行改造。

redis提供了挺多的接口,这里只对del、get、set三个接口做个示范:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
template<typename Type>
using Promise = boost::fibers::promise<Type>;template<typename Type>
using Future = boost::fibers::future<Type>;Future<long long > Del(const StringView &key) {auto promise = std::make_unique<Promise<long long >>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.del(key, [promise =promise.release()](sw::redis::Future<long long > &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}Future<OptionalString> Get(const StringView &key) {auto promise = std::make_unique<Promise<OptionalString>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.get(key, [promise = promise.release()](sw::redis::Future<OptionalString> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}Future<bool> Set(const StringView &key, const StringView &val) {auto promise = std::make_unique<Promise<bool>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.set(key, val, [promise = promise.release()](sw::redis::Future<bool> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}

注意,redis_plus_plus对每个回调函数通过模板进行判断,因此无法使用mutable+移动捕获promise,只能使用指针赋值的方式实现。redis_plus_plus在1.3.6以后的版本才有回调函数机制,之前的版本不支持。
上面原理是,创建fiber的promise和future,然后让redis的回调函数中捕获promise,并在promise中对数据进行赋值。而外部使用fiber的future进行等待,并不会阻塞工作线程。

grpc客户端封装

跟上面的redis客户端类似,这里也建议对grpc的异步客户端进行改造,支持fiber的promise、future机制。
grpc的异步客户端需要牵扯到grpc::CompletionQueue,里面实现了一套poll engine,需要绑定一个线程去进行epoll_wait操作。首先定义一个GrpcClient类,包含四个成员变量、两个成员函数,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class GrpcClient {public:explicit GrpcClient(const ClientOption& option);~GrpcClient();// 对外提供的接口Future<meta::HelloResponse> Call(const meta::HelloRequest& request);private:// worker线程执行的逻辑void Work();private:std::unique_ptr<grpc::CompletionQueue> completion_queue_;std::thread worker_;std::shared_ptr<grpc::Channel> channel_;gpr_timespec timespec_{};
};

异步客户端分为三个部分逻辑,第一个是请求发送(Call函数),第二个是io线程批量处理,第三个是外部等待Future。
为了能够让io线程里给Promise进行赋值,需要Call函数中将Promise及其相关上下文传递到io线程中,这里定义一个上下文结构体:

1
2
3
4
5
6
struct CallData {grpc::ClientContext context;          // grpc上下文Promise<meta::HelloResponse> promise; // Promise对象grpc::Status status;                  // grpc调用状态meta::HelloResponse response;         // 相应包
};

Call函数中的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
// 创建上下文对象
auto data = new CallData;
// 设置超时时间
data->context.set_deadline(timespec_);
// 创建桩
meta::HelloService::Stub stub(channel_);
auto future = data->promise.get_future();
// 异步调用,添加到完成队列中
auto rpc = stub.Asynchello(&data->context, request, completion_queue_.get());
// 绑定response、status,并将上下文对象作为tag传下去
rpc->Finish(&data->response, &data->status, reinterpret_cast<void*>(data));
return future;

data对象在该函数中创建,在Work函数中释放,不存在内存泄漏问题。
grpc的异步稍微有点麻烦,发送之后,还要绑定数据。
接着是Work线程中的逻辑了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CallData* data = nullptr;
bool ok = false;
// 获取状态完毕的数据
while (completion_queue_->Next((void**)&data, &ok)) {// 判断队列是否已经结束if (!ok) {break;}// 如果grpc状态ok,则赋值if (data->status.ok()) {data->promise.set_value(std::move(data->response));} else {// 否则设置异常data->promise.set_exception(std::make_exception_ptr(std::runtime_error(data->status.error_message())));}// 删除数据delete data;data = nullptr;
}

调用完成队列的Next函数会阻塞,如果队列中存在状态达到最终状态的数据,则返回一条。从完成对于中取到的数据的顺序与入队顺序不同。

上面两个函数组合实现了Future获取和Promise赋值的操作,使得grpc客户端能在fiber中使用。

参考

  • fiberpool代码
  • 生产环境使用fiber
  • grpc异步客户端
  • hiredis
  • 生产环境使用boost::fiber

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/158274.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据库深入浅出,数据库介绍,SQL介绍,DDL、DML、DQL、TCL介绍

一、基础知识&#xff1a; 1.数据库基础知识 数据(Data)&#xff1a;文本信息(字母、数字、符号等)、音频、视频、图片等&#xff1b; 数据库(DataBase)&#xff1a;存储数据的仓库&#xff0c;本质文件&#xff0c;以文件的形式将数据保存到电脑磁盘中 数据库管理系统(DBMS)&…

postgresql 触发器如何生成递增序列号,从1开始,并且每天重置

大家好&#xff0c;我是三叔&#xff0c;许久不见&#xff0c;这期给大家介绍一下笔者在开发中遇到的业务处理&#xff1a;pgsql 创建触发器生成每日递增序列&#xff0c;并且第二天重置&#xff0c;根据不同的用户进行不同的控制。 1.创建生成递增序列的 table 表 -- 创建us…

springboot之拦截器、servlet过滤器

一 使用maven新建Spring Boot项目 1. File --> New --> Project... --> Maven &#xff0c;如下图所示 Project SDK下拉列表框中选择前面安装的 Java1.8&#xff0c;如果下拉列表框中不存在Java 1.8&#xff0c;可以单击New按钮&#xff0c;找到安装Java的位置&…

第02章-变量与运算符

1 关键字 关键字&#xff1a;被Java语言赋予了特殊含义&#xff0c;用作专门用途的字符串&#xff08;或单词&#xff09;。如class、public、static、void等&#xff0c;这些单词都被Java定义好了&#xff0c;称为关键字。 特点&#xff1a;关键字都是小写字母&#xff1b;官…

CSS与基本选择器

<div class"c1" id"d1"></div> CSS基本知识 什么是css&#xff1a;CSS&#xff08;Cascading Style Sheet&#xff0c;层叠样式表)定义如何显示HTML元素。 当浏览器读到一个样式表&#xff0c;他就会按照这个样式l来进行渲染。其实就是让HT…

Langchain-Chatchat项目:4.1-P-Tuning v2实现过程

常见参数高效微调方法(Parameter-Efficient Fine-Tuning&#xff0c;PEFT)有哪些呢&#xff1f;主要是Prompt系列和LoRA系列。本文主要介绍P-Tuning v2微调方法。如下所示&#xff1a; Prompt系列比如&#xff0c;Prefix Tuning(2021.01-Stanford)、Prompt Tuning(2021.09-Goo…

音视频rtsp rtmp gb28181在浏览器上的按需拉流

按需拉流是从客户视角来看待音视频的产品功能&#xff0c;直观&#xff0c;好用&#xff0c;为啥hls flv大行其道也是这个原因&#xff0c;不过上述存在的问题是延迟没法降到实时毫秒级延迟&#xff0c;也不能随心所欲的控制。通过一段时间的努力&#xff0c;结合自己闭环技术栈…

基于Java(SpringBoot框架)毕业设计作品成品(35)AI人工智能毕业设计AI图像卡通动漫化图像风格迁移系统设计与实现

博主介绍&#xff1a;《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者&#xff0c;CSDN博客专家&#xff0c;在线教育专家&#xff0c;CSDN钻石讲师&#xff1b;专注大学生毕业设计教育和辅导。 所有项目都配有从入门到精通的基础知识视频课程&#xff0c;免费 项…

Rust 语言常见的一些概念(下)

目录 1、函数 参数 语句和表达式 具有返回值的函数 2、注释 文档注释 多行注释 3、控制流 3.1 if 表达式 3.2 使用esle if 处理多重条件 3.3 在 let 语句中使用 if 3.4 使用循环重复执行 使用 loop 重复执行代码 从循环中返回值 循环标签&#xff1a;在多个循环…

【PyQt学习篇 · ⑨】:QWidget -控件交互

文章目录 是否可用是否显示/隐藏是否编辑是否为活跃窗口关闭综合案例信息提示状态提示工具提示“这是什么”提示 焦点控制单个控件角度父控件角度 是否可用 setEnabled(bool)&#xff1a;该函数用于设置QWidget控件的可用性&#xff0c;参数bool为True表示该控件为可用状态&…

08-Docker-网络管理

Docker 在网络管理这块提供了多种的网络选择方式&#xff0c;他们分别是桥接网络、主机网络、覆盖网络、MACLAN 网络、无桥接网络、自定义网络。 1-无桥接网络&#xff08;None Network&#xff09; 当使用无桥接网络时&#xff0c;容器不会分配 IP 地址&#xff0c;也不会连…

Day18力扣打卡

打卡记录 寻找重复数&#xff08;双指针&#xff09; 链接 Floyd判圈法&#xff0c;先用快慢指针以不同速率进行移动&#xff0c;最终一定会出现相遇点&#xff0c;然后在使一指针从初始开始&#xff0c;两指针再以同步调移动&#xff0c;再次相遇的点一定为循环开始的点位。 …