workflow源码解析:ThreadTask

1、使用程序,一个简单的加法运算程序

#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。// 定义INPUT
struct AddInput
{int x;int y;
};// 定义OUTPUT
struct AddOutput
{int res;
};// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{output->res = input->x + input->y;
}using AddTask = WFThreadTask<AddInput, AddOutput>;void callback(AddTask *task)
{auto *input = task->get_input();auto *output = task->get_output();assert(task->get_state() == WFT_STATE_SUCCESS);fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}int main()
{using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;AddTask *task = AddFactory::create_thread_task("add_task",add_routine,callback);AddInput *input = task->get_input();input->x = 1;input->y = 2;task->start();getchar();return 0;
}

2、类继承关系

WFThreadTaskFactory代码

// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:using T = WFThreadTask<INPUT, OUTPUT>;...
public:static T *create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (T *)> callback);...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),WFGlobal::get_compute_executor(),std::move(routine),std::move(callback));
}

__WFThreadTask代码

// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:virtual void execute()  //实现ExecSession的纯虚函数{this->routine(&this->input, &this->output); //执行用户程序的routine}protected:std::function<void (INPUT *, OUTPUT *)> routine;public:__WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (INPUT *, OUTPUT *)>&& rt,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),routine(std::move(rt)){}
};

WFThreadTask代码

// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:void start();void dismiss();INPUT *get_input() { return &this->input; }OUTPUT *get_output() { return &this->output; }void *user_data;int get_state() const { return this->state; }int get_error() const { return this->error; }void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:virtual SubTask *done();protected:INPUT input;OUTPUT output;std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;public:WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :ExecRequest(queue, executor),callback(std::move(cb)){// 初始化}protected:virtual ~WFThreadTask() { }
};

ExecRequest代码

// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:ExecRequest(ExecQueue *queue, Executor *executor);ExecQueue *get_request_queue() const { return this->queue; }void set_request_queue(ExecQueue *queue) { this->queue = queue; }virtual void dispatch()  // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口{this->executor->request(this, this->queue);...}protected:int state;int error;ExecQueue *queue;Executor *executor;protected:virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};

SubTask代码

class SubTask
{// 子任务被调起的时机virtual void dispatch() = 0;// 子任务执行完成的时机virtual SubTask *done() = 0;// 内部实现,决定了任务流走向void subtask_done();...
};

ExecSession代码

/src/kernel/Executor.h
class ExecSession
{
private:virtual void execute() = 0;virtual void handle(int state, int error) = 0;protected:ExecQueue *get_queue() { return this->queue; }private:ExecQueue *queue;...
};

继承关系图

__WFThreadTask__目前还未用到,暂不清楚

在这里插入图片描述

3、两个重要成员: ExecQueue, Executor

ExecQueue代码

/src/kernel/Executor.h
class ExecQueue
{...
private:struct list_head task_list;pthread_mutex_t mutex;
};

Executor代码

/src/kernel/Executor.h
class Executor
{
public:// 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中int request(ExecSession *session, ExecQueue *queue);private:// 执行器和系统资源,是一个包含关系thrdpool_t *thrdpool;
};

request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine

// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{ExecSessionEntry *entry = new ExecSessionEntry;session->queue = queue;entry->session = session;entry->thrdpool = this->thrdpool;queue->mutex.lock();list_add_tail(&entry->list, &queue->session_list);if (queue->session_list.next == &entry->list){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/if (thrdpool_schedule(&task, this->thrdpool) < 0){list_del(&entry->list);delete entry;entry = NULL;}}queue->mutex.unlock();return -!entry;
}
struct ExecSessionEntry
{struct list_head list;ExecSession *session;thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{ExecQueue *queue = (ExecQueue *)context;ExecSessionEntry *entry;ExecSession *session;queue->mutex.lock();entry = list_entry(queue->session_list.next, ExecSessionEntry, list);list_del(&entry->list);session = entry->session;if (!list_empty(&queue->session_list)){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/__thrdpool_schedule(&task, entry, entry->thrdpool);}elsedelete entry;queue->mutex.unlock();session->execute(); //这里会执行到用户routinesession->handle(ES_STATE_FINISHED, 0);
}

4、参考链接

https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/410073.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

提升线上会议效率,解决Teams会议中常见网络问题

在企业组网场景中&#xff0c;在线会议是混合办公、跨地区办公模式下很重要的协作沟通手段&#xff0c;而在线会议如Teams这类应用对网络的实时性和即时性要求非常高&#xff0c;网络频繁中断、接入速度慢、登不进去等问题分分钟加剧用户的不满&#xff0c;导致汇报失败或者是交…

将图片添加到 PDF 的 5 种方法

需要一种称为 PDF 编辑器的特定工具才能将图片添加到 PDF。尽管大多数浏览器在查看和注释 PDF 文件方面都非常出色&#xff0c;但如果您使用图像到 PDF 技术&#xff0c;则只能将照片放入 PDF 中。无需修改即可将 PDF 文件恢复为原始格式的能力是使用此类软件程序甚至在线服务的…

Java字符串替换方法:替换指定字符串之前的内容

Java字符串替换方法&#xff1a;替换指定字符串之前的内容 在开发过程中&#xff0c;有时我们需要在字符串中找到指定的子字符串&#xff0c;然后替换该子字符串之前的内容。在这篇博客中&#xff0c;我们将演示如何使用Java编写一个方法来实现这个需求。 1. 编写替换方法 首先…

C语言总结十一:自定义类型:结构体、枚举、联合(共用体)

本篇博客详细介绍C语言最后的三种自定义类型&#xff0c;它们分别有着各自的特点和应用场景&#xff0c;重点在于理解这三种自定义类型的声明方式和使用&#xff0c;以及各自的特点&#xff0c;最后重点掌握该章节常考的考点&#xff0c;如&#xff1a;结构体内存对齐问题&…

Git版本控制——分支

分支 几乎所有的版本控制系统都以某种形式支持分支。 使用分支意味着可以把工作从开发主线上分离开来进行重大的Bug修改、开发新的功能&#xff0c;以免影响开发主线。 查看本地分支 git branch创建本地分支 git branch 分支名切换分支(checkout) git checkout 分支名创建…

项目架构之Zabbix部署

1 项目架构 1.1 项目架构的组成 业务架构&#xff1a;客户端 → 防火墙 → 负载均衡&#xff08;四层、七层&#xff09; → web缓存/应用 → 业务逻辑&#xff08;动态应用&#xff09; → 数据缓存 → 数据持久层 运维架构&#xff1a;运维客户端 → 跳板机/堡垒机&#x…

centos7配置时间同步网络时间

centos7配置时间同步网络时间 1、安装 NTP 工具。 sudo yum install -y ntp2启动 NTP 服务。 sudo systemctl start ntpd3、将 NTP 服务设置为开机自启动。 sudo systemctl enable ntpd4、验证 date

序列到序列模型

一.序列到序列模型的简介 序列到序列&#xff08;Sequence-to-Sequence&#xff0c;Seq2Seq&#xff09;模型是一类用于处理序列数据的深度学习模型。该模型最初被设计用于机器翻译&#xff0c;但后来在各种自然语言处理和其他领域的任务中得到了广泛应用。 Seq2Seq模型的核…

C++(1) —— 基础语法入门

目录 一、C初识 1.1 第一个C程序 1.2 注释 1.3 变量 1.4 常量 1.5 关键字 1.6 标识符命名规则 二、数据类型 2.1 整型 2.2 sizeof 关键字 2.3 实型&#xff08;浮点型&#xff09; 2.4 字符型 2.5 转义字符 2.6 字符串型 2.7 布尔类型 bool 2.8 数据的输入 三…

SQL进阶3

二、多表连结 1、什么叫联结 下面&#xff0c;我们举个例子来说明&#xff1a; 学校的安排的课程信息&#xff0c;我们平时都会为主要人员负责的对应课程信息创建表格&#xff0c;让其更好地检索得到对应数据信息。学生可以查到自己本身的课程信息&#xff0c;而老师也可以查…

游戏素材永不缺,免费在线AI工具Scenario功能齐全,简单易用

Scenario是一个在线的AI驱动的工具&#xff0c;主要用于游戏艺术创作。它提供了一套全面的功能&#xff0c;旨在帮助游戏开发者创建与其独特风格和艺术方向相符的独特、高质量的游戏艺术。Scenario的突出特点之一是它的微调能力&#xff0c;允许用户根据独特的风格和艺术方向训…

GitLab Runner 实现项目 CI/CD 发布

Gitlab Runner简介 Gitlab实现CICD的方式有很多&#xff0c;比如通过Jenkins&#xff0c;通过Gitlab Runner等&#xff0c;今天主要介绍后者。Gitlab在安装的时候&#xff0c;就默认包含了Gitlab CI的能力&#xff0c;但是该能力只是用于协调作业&#xff0c;并不能真的去执行…