海豚调度清理:使用 API 轻松清理工作流历史版本记录,一键减少关系日志和任务定义日志表的数据量

💡  本系列文章是 DolphinScheduler 由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。

推荐阅读:

  • 海豚调度监控:使用图关系解决核心链路告警问题,减轻任务运维负担,用户五星好评!

  • 海豚调度异常处理 | 使用 arthas 在内存中删除启动失败的工作流,无需修改代码!

  • 海豚调度监控:新增依赖缺失巡检,上游改动再也不用担心了!

祝开卷有益 😃

大家好,我是小陶,今天是清理调度数据的第二篇文章,之前分享过如何使用API清理工作流实例和任务实例,可以看这篇文章:海豚调度清理:使用 API 轻松清理历史工作流实例以及日志文件

我们知道 DolphinScheduler 的工作流是有版本控制的,每一次更新任务、添加任务、修改任务等等操作,都会生成一个新的版本号,同时 process_definition_log 和 process_task_relation_log 的数据也会增加,久而久之,会积累大量的"无用数据",MySQL 的记录越来越多,会影响调度的服务,进而影响用户使用体验和 MySQL 服务。

来看一个例子,往下看。👇👇👇

如下图所示,该工作流随着迭代,已经积累了 600 多个版本,我们用了这么长时间的调度,没有发生过需要切换历史版本的情况,历史的版本数据基本都算做“无用”数据了,同时为了保持稳定性,和数仓同学协商,只保留最近 20 个版本

file

所以,需要清理以上历史版本记录,保证页面影响速度和 MySQL 服务。

清理调度任务历史版本记录,依然是使用API的方式,直接操作数据库风险比较高。

本文的内容也比较简单,先是说明 API 的逻辑,最后再介绍如何使用一个 Python 脚本来调用 API 删除历史版本记录。

1.API 逻辑介绍

DolphinScheduler 本身提供了删除版本记录的接口,请求类型:DELETE,接口地址:process-definition/{dag_code}/versions/{version} ,接口逻辑比较简单,这里就不赘述了。

2.使用 Python 脚本调用API

Python脚本的逻辑比较简单,使用了4个API,按照顺序是:

1.获取项目列表
2.获取工作流列表
3.获取当前工作流版本信息列表
4.删除历史版本

第三步,需要注意的是,获取版本信息列表的时候,指定了分页大小是 20 ,从第二页开始。因为我们要保留最近的 20 个版本记录。

入参:无

Python 环境 2.7

具体的代码如下:

#!/usr/bin/python
# -*- coding: utf8 -*-
## 清理调度任务历史版本记录,依然是使用API的方式,直接操作数据库风险比较高。
## 会减少 process_definition_log 和 process_task_relation_log 的数据。import io
import subprocess
import requests
import json
import time
import datetime# 配置信息: ip 端口 token自行修改
base_url = 'http://xxxx:xxxx'
token = 'xxxxx'
# 获取项目列表
def get_project_list():
    url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=&_t=0.3741042528841678".format(base_url=base_url)
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    totalList = response_data['data']['totalList']
    return totalList# 获取工作定义列表
def get_definition_detail(project_code):
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }
    all_data = []
    pageNo = 1
    while True:
        url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition?searchVal=&pageSize=50&pageNo={pageNo}".format(project_code=project_code,pageNo=pageNo,base_url=base_url)
        response = requests.request("GET", url, headers=headers, data=payload)
        response_data = json.loads(response.text)
        page_data = response_data['data']['totalList']
        totalPage = response_data['data']['totalPage']        if len(page_data) == 0:
            print('工作定义列表为空,退出循环...')
            break
        all_data.extend(page_data)        if pageNo >= totalPage:
            print('工作定义列表到头了,退出循环...')
            break
        pageNo += 1
    # 返回全部数据
    return all_data# 获取工作定义的版本信息列表,注意,这里从第二页开始!!!size是 20
def get_version_detail(project_code,dag_code,current_version):
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }    all_version = []
    pageNo = 2    while True:
        if pageNo <= 1:
            print('获取工作定义的版本信息列表,pageNo 必须大于1!!!')
            break        url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition/{dag_code}/versions?searchVal=&pageSize=20&pageNo={pageNo}".format(project_code=project_code,dag_code=dag_code,pageNo=pageNo,base_url=base_url)
        response = requests.request("GET", url, headers=headers, data=payload)
        response_data = json.loads(response.text)
        page_data = response_data['data']['totalList']
        totalPage = response_data['data']['totalPage']        if len(page_data) == 0:
            print('version列表为空,退出循环...')
            break        for page in page_data:
            version = int(page['version'])
            # 保留近20个版本
            if version + 20 <= current_version:
                all_version.append(version)        if pageNo >= totalPage:
            print('version列表到头了,退出循环...')
            break        pageNo += 1    # TODO 分析all_data里面是否包含 current_version    # 返回正常的数据
    return all_versiondef delete(project_code,dag_code,version):
    print('即将删除的项目,工作流以及版本')
    print(project_code)
    print(dag_code)
    print(version)
    url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition/{dag_code}/versions/{version}".format(project_code=project_code,dag_code=dag_code,version=version,base_url=base_url)
    # 'processInstanceIds=89767'
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Content-Type': 'application/x-www-form-urlencoded',
      'Origin': 'http://10.1.19.150:7080',
      'Referer': 'http://10.1.19.150:7080/dolphinscheduler/ui/',
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token,
      'Cookie': 'sessionId=680b2a0e-624c-4804-9e9e-58c7d4a0b44c; language=zh_CN; userName=admin; HERA_Token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzc29JZCI6Ii0xIiwic3NvX25hbWUiOiJhZG1pbiIsImF1ZCI6IjJkZmlyZSIsImlzcyI6ImhlcmEiLCJleHAiOjE2NDYwMjk3MDYsInVzZXJJZCI6IjEiLCJpYXQiOjE2NDU3NzA1MDYsInVzZXJuYW1lIjoiYWRtaW4ifQ.YEhr9Mi7FDsQIAn5GJorB0U3lL92KQA8YvP26QMhh9g; sessionId=680b2a0e-624c-4804-9e9e-58c7d4a0b44c'
    }
    response = requests.request("DELETE", url, headers=headers, data=payload)
    print('执行结果如下:')
    print(response.text)if __name__ == '__main__':
    # # 需要处理的项目
    projects = get_project_list()
    # 依次处理项目
    for project in projects:
        project_code = project['code']
        print('正在处理项目:'+ str(project_code))
        all_dags = get_definition_detail(project_code)
        for dag in all_dags:
            # 工作流code和当前版本
            dag_code = dag['code']
            current_version = dag['version']
            print(dag_code)
            print(current_version)
            # 获取该工作流历史版本记录...
            all_data = get_version_detail(project_code,dag_code,current_version)
            # TODO 删除
            print(all_data)
            for v in all_data:
                delete(project_code,dag_code,v)

使用示例:dolphin_clean_version.py 是上面的脚本。

python  dolphin_clean_version.py

脚本在 GitHub 也维护了一份,欢迎 star 图片

https://github.com/aikuyun/dolphin_practices/blob/main/dolphin_clean_version.py

3.注意事项

1.token 获取的方式
file

以上就使用 API 一键减少关系日志表和任务定义日志表的数据量的过程,如果有任何疑问,都可以与我交流,希望可以帮到你,下次见。

本文由 白鲸开源 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/793846.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

吴恩达 AI 完整课程资源2020汇总

风度78 于 2020-02-19 11:30:00 发布https://blog.csdn.net/fengdu78/article/details/104403851编辑 | Will 出品 | 字节AI 吴恩达(Andrew Ng),毫无疑问,是全球人工智能(AI)领域的大 IP!随着近些年来 AI 越来越火的大趋势下,吴恩达一直致力于普及、宣传、推广 AI 教育…

509迷宫

想法还是太过于巧妙了。 首先有一个很简单的容斥 \(n^2\) 做法。 然后我们能发现 \(mod\) 很小,注意:\(\forall_{1 \le i < mod}\) \(C_{mod}^{i} = 0\)。 所以就有个天才的做法,将矩阵沿着对角线切开,类似这样:如果我们每隔 \(mod\) 进行一次切割,那么我们就会发现如…

尿素

大周期看在走2浪的回调

零基础学习地平线 征程6 QAT 量化感知训练

1. 背景 首先感谢地平线工具链用户手册和官方提供的示例,给了我很大的帮助,特别是代码注释写了很多的知识点,超赞!要是注释能再详细点,就是超超赞了!下面开始正文。 最近想着学 QAT(量化感知训练)玩玩,大体看了一下地平线的用户手册,不说精度调优之类比较复杂的,光一个…

征程 6E/M 快速上手实战 Sample-PYM

01 IPC模块简述 1.1 硬件数据流 PYM(Pyramid)作为一个硬件加速模块(图像缩小及 ROI 提取),对输入的图像按照金字塔图层的方式处理,并输出到 DDR:PYM 模块在 Camsys 子系统的数量和位置如下,总共 3 个 PYM 硬件,PYM0、PYM1、PYM4(只支持 offline)。1.2 PYM-Sample 软…

#CAMA | 以视觉为中心的静态地图元素标注方法

01 现有标注方法的局限性 在自动驾驶领域,静态地图元素的精确标注是实现高精度环境感知的关键。然而,现有的公共数据集在一致性和准确性方面存在局限,无法满足日益增长的高精度训练数据需求。图一展示了 nuScenes 数据集中的默认高清地图无法在一致性和准确性两个方面提供准…

巧手打字通-在线打字练习网站功能大全

巧手打字通 Hello,大家好,今天来给大家介绍一个实用的在线打字练习网站。就是我们看到的“巧手打字通”。一个专为打字初学者,特别是中小学生量身设计的专业性平台。 打开网站,我们可以看到,整个网站的课程设计主要分为:入门课程,学拼音,说英语,读诗词,弹钢琴,玩游戏…

大模型API实战-console.bce.baidu.com/qianfan/

百度千帆大模型平台API调用实战 需要注册并实名制,然后到模型服务-->模型推理,选择可以免费开通的模型开通(其他都是收费的有坑)ACCESS_KEY、SECRET_KEY 和 AK、SK的获取 ACCESS_KEY、SECRET_KEYpython调用 # 安装包(Python >= 3.7):pip install qianfan import os …

main() 方法

根据 Java 语言规范,main() 方法必须被申明为 public。在 Java 1.4 及之后的版本中,Java 解释器强制要求 main() 方法必须是 public。 Java 语言规范(Java Language and Virtual Machine Specifications) public:被 JVM 调用,访问权限足够大。 static:被 JVM 调用,不用…

敏捷开发中的类型关系

​​ 欢迎来到我的博客:计算机软件技术总结 ‍

Linux(centos)安装安全狗

Step1:下载linux安全狗 在安全狗官网直接下载软件安装包(.tar.gz 格式:safedog_linux64.tar.gz)使用finalshell将文件发送到centos指定文件夹 [root@localhost ~]# ls anaconda-ks.cfg original-ks.cfg safedog_linux64.tar.gz vulhubStep2:解压并安装 解压缩safedog_lin…

小程序授权登录前后端对接及用户信息完善

对接后台登录流程 微信官方早都已经禁止开发者直接通过 api 获取用户信息数据了,大家拿个用户的 openid 注册好,剩下的让用户填写就行了。 先上官方的经典登录流程图:步骤拆分解析:前端通过 调用官方 API wx.login,将回调中的 code 临时登陆凭证传递给(请求)后台 后台去…