触发器实现海豚调度失败企业微信自动告警

原理

触发器监控工作流实例表,当工作流实例表中的状态更新后,针对状态为失败的任务进行企业微信告警。

发送企业微信消息函数

su - postgres
# 必须在pg的主机上线安装requests模块
pip install requests
# 以postgres用户登陆psql客户端到etl数据库
psql etl -U postgres
# 创建插件plpython3u
create extension plpython3u;
# plpython3u为不受信语言,所以只能被超级用户使用
# 在tool模式下建立发送企业微信消息函数tool.sp_send_wechat
CREATE OR REPLACE FUNCTION tool.sp_send_wechat(message json, webhook character varying DEFAULT 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你自己的key'::character varying)RETURNS textLANGUAGE plpython3uSECURITY DEFINER
AS $function$
import requests
import json
"""
/** 作者 : v-yuzhenc* 功能 : 给企业微信发送一条消息* message : 需要发送的消息,json格式* webhook : 企业微信机器人的webhook* */
"""
import requests
import json# 企业微信自定义机器人的webhook地址
p_webhook = webhook
# 要发送的消息内容
p_message = json.loads(message)
# 发送POST请求
response = requests.post(p_webhook, data=json.dumps(p_message), headers={"Content-Type": "application/json"})# 打印响应结果
return response.text
$function$
;
--将函数直接转给tool
ALTER FUNCTION tool.sp_send_wechat(json, varchar) OWNER TO tool;
--公开函数的执行权限
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO public;
--将函数的执行权限授权给tool用户
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO tool;
\q

远程执行命令函数

  • 由于海豚调度的任务日志是以文件的形式存储在操作系统中,所以,必须在数据库中实现这样一个函数,能够读取海豚服务器的日志文件
su - postgres
# 必须在pg的主机上安装paramiko模块
pip install paramiko
# 以postgres用户登陆psql客户端到etl数据库
psql etl -U postgres
# 上面已经创建了plpython3u插件,这里不需要再次建立了
# 创建远程执行命令函数tool.sp_remote_exec_command_nopass
CREATE OR REPLACE FUNCTION tool.sp_remote_exec_command_nopass(remote_command text, remote_host character varying DEFAULT 'dpmaster'::character varying, remote_port integer DEFAULT 22222, remote_username character varying DEFAULT 'dp'::character varying, remote_return_mode character varying DEFAULT 'stdout'::character varying)RETURNS textLANGUAGE plpython3uSECURITY DEFINER
AS $function$
import paramiko
"""
/** 作者 : v-yuzhenc* 功能 : 免密(需要配置ssh免密)在远程服务器执行一条命令* remote_command : 需要执行的命令* remote_host : 远程主机名或ip* remote_port : ssh端口* remote_username : 免密登陆的用户* remote_return_mode : 返回信息的模式,stderr返回标准错误信息,否则返回标准输出* */
"""
# SSH连接信息
host = remote_host
port = remote_port
username = remote_username
private_key_path = '/home/postgres/.ssh/id_rsa'
ssh_command = remote_command# 连接SSH服务器
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
ssh.connect(host, port, username, pkey=private_key)# 通过SSH执行命令
stdin, stdout, stderr = ssh.exec_command(ssh_command)
p_stdout = stdout.read().decode().strip()
p_stderr = stderr.read().decode().strip()
# 关闭SSH连接
ssh.close()
# 打印响应结果
if remote_return_mode == 'stderr':return None if p_stderr == '' else p_stderr
else:return None if p_stdout == '' else p_stdout
$function$
;
-- Permissions
ALTER FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) OWNER TO tool;
GRANT ALL ON FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) TO tool;
GRANT ALL ON FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) TO dp;
\q

企业微信告警触发器

  • 由于企业微信markdown格式的消息艾特指定的人只能通过企业微信中的userid(即用户在企业微信中的账号)调用,所以,我们在海豚调度的元数据表t_ds_user中增加wechat_userid字段,人工将海豚的用户对应的企业微信的userid维护上去
# 以dp用户登录etl数据库
psql etl -U dp
# 增加字段
alter table t_ds_user add wechat_userid varchar(100);
comment on column t_ds_user.wechat_userid is '对应的企业微信的userid';
# 维护wechat_userid中的数据
# 这里根据自己的企业实际情况做
update t_ds_user 
set wechat_userid = 'YuZhenChao'
where user_name = 'yuzhenchao'
;
CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_wechat()RETURNS triggerLANGUAGE plpgsql
AS $function$
/** 作者:v-yuzhenc* 功能:海豚调度工作流失败自动告警* */
declarei record;v_content text;v_message varchar;
beginif new.state in (4,5,6) then for i in (select'<@'||d.wechat_userid||'>\r\n# [DolphinScheduler Job ]\r\n> 实例  id  :  ['||a.id::varchar||'/'||b.id||'](https://dolphin.tclpv.com/dolphinscheduler/ui/projects/'||g.code||'/workflow/instances/'||a.id||'?code='||a.process_definition_code||')\r\n> 项目名称 : <font color=\"comment\">'||g.name||'('||g.code||')</font>'||'\r\n> 工作流名 : <font color=\"comment\">'||e.name||'('||a.process_definition_code||')</font>'||'\r\n> 任务名称 : <font color=\"comment\">'||b.name||'('||b.task_code||')</font>'||'\r\n> 任务类型 : <font color=\"comment\">'||b.task_type||'</font>\r\n> 开始时间 : <font color=\"comment\">'||to_char(b.start_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 结束时间 : <font color=\"comment\">'||to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 所属用户 : <font color=\"comment\">'||d.user_name||'('||c.user_id||')</font>\r\n> 任务状态 : <font color=\"warning\">执行失败</font>'||'\r\n> 报错信息 : <font color=\"warning\">'||tool.sp_remote_exec_command_nopass($remote_command$cat $remote_command$||b.log_path||$remote_command$ | grep "\[ERROR\]\|等表超时" | awk -F' - ' '{print $2!=null ? $2 : $1}' | head -1 | sed 's/\"/\\\"/g'$remote_command$,split_part(b.host,':',1))||'</font>' as wechat_contentfrom t_ds_process_instance a inner join t_ds_task_instance b on (a.id = b.process_instance_id)inner join t_ds_task_definition c on (b.task_code = c.code and b.task_definition_version = c."version")inner join t_ds_user d on (c.user_id = d.id)inner join t_ds_process_definition e on (a.process_definition_code = e.code and a.process_definition_version = e."version")inner join t_ds_project g on (e.project_code = g.code)where c.task_type <> 'SUB_PROCESS'and a.state = 6and b.state = 6and a.id = new.id) loop v_content := i.wechat_content;v_message := $v_message${"msgtype":"markdown","markdown": {"content":"$v_message$||v_content||$v_message$"}
}$v_message$;--告警perform tool.sp_send_wechat(v_message::json);end loop;end if;return new;exception when others then return new;
end;
$function$
;
-- Permissions
ALTER FUNCTION dp.tg_ds_udef_alert_wechat() OWNER TO dp;
GRANT ALL ON FUNCTION dp.tg_ds_udef_alert_wechat() TO dp;
# 创建时候触发器
create trigger tg_state_ds_process_instance after update on dp.t_ds_process_instance for each row execute function dp.tg_ds_udef_alert_wechat();
\q

测试

  • 新建一个工作流,选择SQL组件

在这里插入图片描述

  • 保存工作流
    在这里插入图片描述

  • 上线工作流并运行工作流
    在这里插入图片描述

  • 工作流运行失败
    在这里插入图片描述

  • 随即企业微信来了消息提醒
    在这里插入图片描述
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/54978.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HDFS中的NAMENODE元数据管理(超详细)

元数据管理 元数据是什么元数据管理概述内存元数据元数据文件fsimage内存镜像文件edits log编辑日志 namenode加载元数据文件顺序 元数据管理相关目录文件元数据相关文件VERSIONseen_txid 元数据文件查看&#xff08;OIV,OEV&#xff09;SecondaryNameNode介绍checkpoint机制SN…

vue中显示在页面顶部的进度条插件——NProgress

我们在一些网站中经常见到导航栏上方的进度条显示&#xff0c;大家仔细观察&#xff0c;其实csnd中也有类似的效果&#xff0c;如下图显示效果&#xff0c;我们现在就来一起看看这个功能需求是怎么实现的。 一、功能需求 首先&#xff0c;实现这个功能其实不难&#xff0c;说实…

python 接入GoogleAuth

经常会用到GoogleAuth作为二次验证码&#xff0c;就扒了代码看看这块逻辑如何实现的&#xff0c;做个笔记。 import hmac import struct import time from hashlib import sha1 from urllib.parse import urlencode, quoteif __name__ __main__:# account会作为标识显示在身份…

yolo-v5学习(使用yolo-v5进行安全帽检测错误记录)

常见错误 跑YOLOv5遇到的问题_runtimeerror: a view of a leaf variable that requi_Pysonmi的博客-CSDN博客 python train.py --img 640 --batch 16 --epochs 10 --data ./data/custom_data.yaml --cfg ./models/custom_yolov5.yaml --weights ./weights/yolov5s.pt 1、梯度…

elementui Cascader 级联选择使用心得

相信大家对于elementui并不陌生&#xff0c;作为适配Vue的优秀UI框架之一&#xff0c;一直被所有的开发者痛并快乐着。今天要记录的就是里边的主角之一Cascader。 首先先介绍一下Cascader ---> 当一个数据集合有清晰的层级结构时&#xff0c;可通过级联选择器逐级查看并选择…

解密Redis:应对面试中的缓存相关问题2

面试官&#xff1a;Redis集群有哪些方案&#xff0c;知道嘛&#xff1f; 候选人&#xff1a;嗯~~&#xff0c;在Redis中提供的集群方案总共有三种&#xff1a;主从复制、哨兵模式、Redis分片集群。 面试官&#xff1a;那你来介绍一下主从同步。 候选人&#xff1a;嗯&#xff…

检测文本是否由AI生成,GPT、文心一言等均能被检测

背景 目前很多机构推出了ChatGPT等AI文本检测工具&#xff0c;但是准确率主打一个模棱两可&#xff0c;基本和抛硬币没啥区别。 先说结论&#xff0c;我们对比了常见的几款AI检测工具&#xff0c;copyleaks检测相比较而言最准确。 检测文本 AI文本片段1 来源&#xff1a;G…

Windows上安装 jdk 环境并配置环境变量 (超详细教程)

&#x1f468;‍&#x1f393;博主简介 &#x1f3c5;云计算领域优质创作者   &#x1f3c5;华为云开发者社区专家博主   &#x1f3c5;阿里云开发者社区专家博主 &#x1f48a;交流社区&#xff1a;运维交流社区 欢迎大家的加入&#xff01; &#x1f40b; 希望大家多多支…

Spring IOC

◆ 传统Javaweb开发的困惑 ◆ IoC、DI和AOP思想提出 ◆ Spring框架的诞生 Spring | Home IOC控制反转&#xff1a;BeanFactory 快速入门 package com.xiaolin.service.Impl;import com.xiaolin.dao.UserDao; import com.xiaolin.service.UserService;public class UserServic…

vue2-diff算法

1、diff算法是什么&#xff1f; diff算法是一种通过同层的树节点进行比较的高效算法。 其有两个特点&#xff1a; 比较只会在同层级进行&#xff0c;不会跨层级进行。 在diff比较的过程中&#xff0c;循环从两边向中间比较。 diff算法在很多场景中都有应用&#xff0c;在vue中&…

TabR:检索增强能否让深度学习在表格数据上超过梯度增强模型?

这是一篇7月新发布的论文&#xff0c;他提出了使用自然语言处理的检索增强Retrieval Augmented技术&#xff0c;目的是让深度学习在表格数据上超过梯度增强模型。 检索增强一直是NLP中研究的一个方向&#xff0c;但是引入了检索增强的表格深度学习模型在当前实现与非基于检索的…

react工程化配置

道阻且长&#xff0c;行而不辍&#xff0c;未来可期 1.安装react yarn create react-app demo --template typescript cd demo yarn start2.配置蓝图模版 2.1安装blueprint插件 https://github.com/shredor/blueprint-templates-cli#readme yarn add blueprint-templates-c…