原理
触发器监控工作流实例表,当工作流实例表中的状态更新后,针对状态为失败的任务进行企业微信告警。
发送企业微信消息函数
su - postgres
# 必须在pg的主机上线安装requests模块
pip install requests
# 以postgres用户登陆psql客户端到etl数据库
psql etl -U postgres
# 创建插件plpython3u
create extension plpython3u;
# plpython3u为不受信语言,所以只能被超级用户使用
# 在tool模式下建立发送企业微信消息函数tool.sp_send_wechat
CREATE OR REPLACE FUNCTION tool.sp_send_wechat(message json, webhook character varying DEFAULT 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你自己的key'::character varying)RETURNS textLANGUAGE plpython3uSECURITY DEFINER
AS $function$
import requests
import json
"""
/** 作者 : v-yuzhenc* 功能 : 给企业微信发送一条消息* message : 需要发送的消息,json格式* webhook : 企业微信机器人的webhook* */
"""
import requests
import json# 企业微信自定义机器人的webhook地址
p_webhook = webhook
# 要发送的消息内容
p_message = json.loads(message)
# 发送POST请求
response = requests.post(p_webhook, data=json.dumps(p_message), headers={"Content-Type": "application/json"})# 打印响应结果
return response.text
$function$
;
--将函数直接转给tool
ALTER FUNCTION tool.sp_send_wechat(json, varchar) OWNER TO tool;
--公开函数的执行权限
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO public;
--将函数的执行权限授权给tool用户
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO tool;
\q
远程执行命令函数
- 由于海豚调度的任务日志是以文件的形式存储在操作系统中,所以,必须在数据库中实现这样一个函数,能够读取海豚服务器的日志文件
su - postgres
# 必须在pg的主机上安装paramiko模块
pip install paramiko
# 以postgres用户登陆psql客户端到etl数据库
psql etl -U postgres
# 上面已经创建了plpython3u插件,这里不需要再次建立了
# 创建远程执行命令函数tool.sp_remote_exec_command_nopass
CREATE OR REPLACE FUNCTION tool.sp_remote_exec_command_nopass(remote_command text, remote_host character varying DEFAULT 'dpmaster'::character varying, remote_port integer DEFAULT 22222, remote_username character varying DEFAULT 'dp'::character varying, remote_return_mode character varying DEFAULT 'stdout'::character varying)RETURNS textLANGUAGE plpython3uSECURITY DEFINER
AS $function$
import paramiko
"""
/** 作者 : v-yuzhenc* 功能 : 免密(需要配置ssh免密)在远程服务器执行一条命令* remote_command : 需要执行的命令* remote_host : 远程主机名或ip* remote_port : ssh端口* remote_username : 免密登陆的用户* remote_return_mode : 返回信息的模式,stderr返回标准错误信息,否则返回标准输出* */
"""
# SSH连接信息
host = remote_host
port = remote_port
username = remote_username
private_key_path = '/home/postgres/.ssh/id_rsa'
ssh_command = remote_command# 连接SSH服务器
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
ssh.connect(host, port, username, pkey=private_key)# 通过SSH执行命令
stdin, stdout, stderr = ssh.exec_command(ssh_command)
p_stdout = stdout.read().decode().strip()
p_stderr = stderr.read().decode().strip()
# 关闭SSH连接
ssh.close()
# 打印响应结果
if remote_return_mode == 'stderr':return None if p_stderr == '' else p_stderr
else:return None if p_stdout == '' else p_stdout
$function$
;
-- Permissions
ALTER FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) OWNER TO tool;
GRANT ALL ON FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) TO tool;
GRANT ALL ON FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) TO dp;
\q
企业微信告警触发器
- 由于企业微信markdown格式的消息艾特指定的人只能通过企业微信中的userid(即用户在企业微信中的账号)调用,所以,我们在海豚调度的元数据表t_ds_user中增加wechat_userid字段,人工将海豚的用户对应的企业微信的userid维护上去
# 以dp用户登录etl数据库
psql etl -U dp
# 增加字段
alter table t_ds_user add wechat_userid varchar(100);
comment on column t_ds_user.wechat_userid is '对应的企业微信的userid';
# 维护wechat_userid中的数据
# 这里根据自己的企业实际情况做
update t_ds_user
set wechat_userid = 'YuZhenChao'
where user_name = 'yuzhenchao'
;
CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_wechat()RETURNS triggerLANGUAGE plpgsql
AS $function$
/** 作者:v-yuzhenc* 功能:海豚调度工作流失败自动告警* */
declarei record;v_content text;v_message varchar;
beginif new.state in (4,5,6) then for i in (select'<@'||d.wechat_userid||'>\r\n# [DolphinScheduler Job ]\r\n> 实例 id : ['||a.id::varchar||'/'||b.id||'](https://dolphin.tclpv.com/dolphinscheduler/ui/projects/'||g.code||'/workflow/instances/'||a.id||'?code='||a.process_definition_code||')\r\n> 项目名称 : <font color=\"comment\">'||g.name||'('||g.code||')</font>'||'\r\n> 工作流名 : <font color=\"comment\">'||e.name||'('||a.process_definition_code||')</font>'||'\r\n> 任务名称 : <font color=\"comment\">'||b.name||'('||b.task_code||')</font>'||'\r\n> 任务类型 : <font color=\"comment\">'||b.task_type||'</font>\r\n> 开始时间 : <font color=\"comment\">'||to_char(b.start_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 结束时间 : <font color=\"comment\">'||to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 所属用户 : <font color=\"comment\">'||d.user_name||'('||c.user_id||')</font>\r\n> 任务状态 : <font color=\"warning\">执行失败</font>'||'\r\n> 报错信息 : <font color=\"warning\">'||tool.sp_remote_exec_command_nopass($remote_command$cat $remote_command$||b.log_path||$remote_command$ | grep "\[ERROR\]\|等表超时" | awk -F' - ' '{print $2!=null ? $2 : $1}' | head -1 | sed 's/\"/\\\"/g'$remote_command$,split_part(b.host,':',1))||'</font>' as wechat_contentfrom t_ds_process_instance a inner join t_ds_task_instance b on (a.id = b.process_instance_id)inner join t_ds_task_definition c on (b.task_code = c.code and b.task_definition_version = c."version")inner join t_ds_user d on (c.user_id = d.id)inner join t_ds_process_definition e on (a.process_definition_code = e.code and a.process_definition_version = e."version")inner join t_ds_project g on (e.project_code = g.code)where c.task_type <> 'SUB_PROCESS'and a.state = 6and b.state = 6and a.id = new.id) loop v_content := i.wechat_content;v_message := $v_message${"msgtype":"markdown","markdown": {"content":"$v_message$||v_content||$v_message$"}
}$v_message$;--告警perform tool.sp_send_wechat(v_message::json);end loop;end if;return new;exception when others then return new;
end;
$function$
;
-- Permissions
ALTER FUNCTION dp.tg_ds_udef_alert_wechat() OWNER TO dp;
GRANT ALL ON FUNCTION dp.tg_ds_udef_alert_wechat() TO dp;
# 创建时候触发器
create trigger tg_state_ds_process_instance after update on dp.t_ds_process_instance for each row execute function dp.tg_ds_udef_alert_wechat();
\q
测试
- 新建一个工作流,选择SQL组件
-
保存工作流
-
上线工作流并运行工作流
-
工作流运行失败
-
随即企业微信来了消息提醒