PyODPS获取MaxComputer数据仓中业务表的最新更新时间并输出至表中

PyODPS获取MaxComputer数据仓中业务表的最新更新时间并输出

 1.准备两张数据表:

1.1 需要统计的库表中间表:

CREATE TABLE IF NOT EXISTS dim_river_system_business_time (`id`  STRING  COMMENT '序号',`dept_name`  STRING  COMMENT '',`system_name`  STRING  COMMENT '所属系统',`table_name`  STRING  COMMENT '数据仓表名称',`table_comment`  STRING  COMMENT '表中文名称',`column_name`  STRING  COMMENT '根据那个字段取最大值',`load_time`  STRING  COMMENT '数据录入时间'
)
COMMENT '业务数据表最新更新时间(配置表)';

1.2 最终结果存储表:

CREATE TABLE IF NOT EXISTS dwd_river_system_business_time (`dept_name`  STRING  COMMENT '',`system_name`  STRING  COMMENT '系统名称',`table_name`  STRING  COMMENT '库表名称',`table_comment`  STRING  COMMENT '表中文名称',`table_rows`  BIGINT  COMMENT '表数据量',`business_maxtime`  STRING  COMMENT '表业务最新更新时间',`etl_time`  STRING  COMMENT '取值时间(etl_time)'
)
COMMENT '业务数据表最新更新时间'
PARTITIONED BY (`dt`  STRING  COMMENT '按天分区字段'
);

1.3 PyODPS脚本创建:

 

from odps import ODPS
import datetime
import sys# 解决控制台输出乱码
reload(sys);
sys.setdefaultencoding('utf-8');# 创建连接
o = ODPS('id', 'key', 'project',endpoint='');# 获取时间函数
def getDate(format_string="%Y%m%d%"):return datetime.datetime.now().strftime(format_string)# 判断时间是否为空,为空赋值初始值
def timeReplaceIsNUll(time):if len(time)==0 or not time :return "1998-01-01 23:59:59";else :return time;# 字符串截取,处理SQL拼接最后一个UNION ALL
def strjq(strData):return strData[0:len(strData)-10]+";";# 获取分区时间和吸入数据时间
# 1.分区时间
timeHH=getDate("%Y%m%d");
# 2.写入数据时间
times=getDate("%Y-%m-%d %H:%M:%S");# 组装统计SQL
def SQLCreate(table_name,table_comment,timeRex,system_name,dept_name):sql="SELECT '"+ table_name +"' AS table_name,'"+table_comment+"' as table_comment,count(1) as table_rows,'"+system_name+"' as system_name,'"+dept_name+"' as  dept_name,DATE_FORMAT(max("+timeRex+"),'yyyy-MM-dd hh:mm:ss','false') as Business_maxtime,getDATE() as etl_time from "+table_name;return sql;# 批量SQL模板暂存区
SQLTemplateBath=[]
# 读取数据表
with o.execute_sql('select table_name,table_comment,column_name,system_name,dept_name from dim_river_system_business_time').open_reader() as reader:for record in reader:   # 处理每一个record。
            SQLTemplateBath.append(SQLCreate(record.table_name,record.table_comment,record.column_name,record.system_name,record.dept_name));# 存储批量脚本运行结果
Datas=[]
# 运行脚本存储过程数据
bath1="";
bath2="";
bath3="";
bath4="";
bath5="";
bath6="";
bath7="";
# 获取表输入输出流【目的表】
table = o.get_table('dwd_river_system_business_time', project='FGDN_odps')
for i in range(len(SQLTemplateBath)):if i <= 300 :bath1 += SQLTemplateBath[i]+" UNION ALL ";print(">>> 组装第一批次SQL当前下标:{}".format(i));if i > 300 and i <= 600 :bath2 += SQLTemplateBath[i]+" UNION ALL ";print(">>> 组装第二批次SQL当前下标:{}".format(i));if i > 600 and i <= 1000 :bath3 += SQLTemplateBath[i]+" UNION ALL ";print(">>> 组装第三批次SQL当前下标:{}".format(i));if i > 1000 and i <= 1300 :bath4 += SQLTemplateBath[i]+" UNION ALL ";print(">>> 组装第四批次SQL当前下标:{}".format(i));if i > 1300 and i <= 1600 :bath5 += SQLTemplateBath[i]+" UNION ALL ";print(">>> 组装第五批次SQL当前下标:{}".format(i));if i > 1600 and i<=1900 :bath6 += SQLTemplateBath[i]+" UNION ALL ";print(">>> 组装第六批次SQL当前下标:{}".format(i));if i > 1900 and i <= 2200 :bath7 += SQLTemplateBath[i]+" UNION ALL ";print(">>> 组装第七批次SQL当前下标:{}".format(i));if i > 2200break;print(">>> 配置批量数据超出限制:{}".format(i));# 配置启动脚本
def startBaths():if len(bath1) > 1 :print("<<< 开始启动批量脚本 1.....");with o.execute_sql('%s'%strjq(bath1)).open_reader() as reader:for record in reader:re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];Datas.append(re);if len(bath2) > 1 :print("<<< 开始启动批量脚本 2.....");with o.execute_sql('%s'%strjq(bath2)).open_reader() as reader:for record in reader:re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];Datas.append(re);if len(bath3) > 1 :print("<<< 开始启动批量脚本 3.....");with o.execute_sql('%s'%strjq(bath3)).open_reader() as reader:for record in reader:re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];Datas.append(re);if len(bath4) > 1 :print("<<< 开始启动批量脚本 4.....");with o.execute_sql('%s'%strjq(bath4)).open_reader() as reader:for record in reader:re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];Datas.append(re);if len(bath5) > 1 :print("<<< 开始启动批量脚本 5.....");with o.execute_sql('%s'%strjq(bath5)).open_reader() as reader:for record in reader:re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];Datas.append(re);if len(bath6) > 1 :print("<<< 开始启动批量脚本 6.....");with o.execute_sql('%s'%strjq(bath6)).open_reader() as reader:for record in reader:re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];Datas.append(re);if len(bath7) > 1 :print("<<< 开始启动批量脚本 7.....");with o.execute_sql('%s'%strjq(bath7)).open_reader() as reader:for record in reader:re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];Datas.append(re);# 启动脚本
startBaths();# 写数据到库表
if range(len(Datas))>=1:with table.open_writer(partition="dt='%s'"%timeHH,create_partition=True) as writer:writer.write(Datas)print("<<< 数据写入结束!!")

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/837209.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

powerdesigner同时显示列的name和code

参考来源:数据库设计工具-PowerDesigner中table视图显示name与code 1、Tools-->Display Preferences2、Table->Advanced3、Columns-->放大镜图标4、勾选【code】并调整顺序到datatype之前5、单击ok,成功

RedisTemplate RedisConfig 序列化方式 fastjson2

Spring Data Redis 为我们提供了下面的Serializer:GenericToStringSerializer、Jackson2JsonRedisSerializer、JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。序列化方式对比:JdkSerializationRedisSerializer: 使用…

实验 20:备忘录模式

本次实验属于模仿型实验,通过本次实验学生将掌握以下内容: 1、理解备忘录模式的动机,掌握该模式的结构; 2、能够利用备忘录模式解决实际问题。[实验任务一]:多次撤销 改进课堂上的“用户信息操作撤销”实例,使得系统可以实现多次撤销(可以使用HashMap、ArrayList等集合…

实验 21:观察者模式

本次实验属于模仿型实验,通过本次实验学生将掌握以下内容: 1、理解观察者模式的动机,掌握该模式的结构; 2、能够利用观察者模式解决实际问题。[实验任务一]:股票提醒 当股票的价格上涨或下降5%时,会通知持有该股票的股民,当股民听到价格上涨的消息时会买股票,当价格下…

MaskLLM:英伟达出品,用于大模型的可学习`N:M`稀疏化 | NeurIPS24

来源:晓飞的算法工程笔记 公众号,转载请注明出处论文: MaskLLM: Learnable Semi-Structured Sparsity for Large Language Models论文地址:https://arxiv.org/abs/2409.17481 论文代码:https://github.com/NVlabs/MaskLLM创新性提出一种可学习的LLM半结构化剪枝方法MaskLLM…

Codeforces 1145 题目分析

本文将分析 Codeforces 1145 (April Fools Day Contest 2019)。 题目分析 A 题目描述:使用“灭霸排序”算法可以得到的最长子串的长度。 解法:暴力。 B 题目描述:输入一个整数 \(\in [1,99]\),若它的英文形式含有 knba 四个字母输出 NO 否则输出 YES。 解法:打表。 C 题目…

3.2点对点ppp

001ppp协议特点 ppp协议能承载不同网络层分组,ip数据报里有其他网络层协议存在。 能在多种类型的链路上运行 和广播方式相比,需要一个网络地址的协商,传送前有个协商过程,确认资源。地址002ppp协议组成 将ip数据报封装到串行链路的方法 链路控制协议LCP--建立链路,身份验证…

.net8报HTTP Error 500.30 - ASP.NET Core app failed to start

.net8调试正常,发布到IIS报HTTP Error 500.30 - ASP.NET Core app failed to start。 解决办法: 停止IIS站点,运行发布项目的“XXX.exe”,在运行窗口就可以看到具体的错误信息。 在根据具体错误信息解决。

设计和训练人工智能模型的意义是什么?

前一篇:《人工智能模型训练技术,正则化!》 序言:人工智能模型的真正价值在于其实际应用,而不仅仅停留在理论阶段。本节将通过一个简单而常见的应用场景,展示如何将前面几节所设计和训练的模型应用于实际问题。我们将使用训练好的模型对句子进行分类,具体来说,识别社交平…

【分享】数据传输新挑战:内外网文件如何实现安全高效摆渡?

随着信息化的不断深入,越来越多的数据被电子化,这使得数据安全问题变得更加突出。全球42%的企业在过去一年中至少经历过一次数据泄露事故,30%的企业IT人员在数据泄露事故中被开除,50%的企业IT认为文件管控难度大、成本高。 为了保护核心数据,大部分企业都实施了内外网隔离…

画图-利用画图工具修改聊天记录时间

1、用Windows自带的画图工具 打开要修改的图片 选择“矩形”用矩形选中要修改的时间即可对其进行 移动 或者覆盖的操作

Redis中常见的数据类型及其应用场景

五种常见数据类型 Redis中的数据类型指的是 value存储的数据类型,key都是以String类型存储的,value根据场景需要,可以以String、List等类型进行存储。各数据类型介绍:Redis数据类型对应的底层数据结构String 类型的应用场景 常用命令存放键值:set key value [EX seconds] …