使用PyODPS获取MaxCompute数据仓库中业务表的最新更新时间并输出
1.准备两张数据表:
1.1 需要统计的库表中间表:
-- Configuration table: one row per business table to be inspected, naming the
-- column whose maximum value is taken as that table's latest update time.
CREATE TABLE IF NOT EXISTS dim_river_system_business_time
(
    `id`            STRING COMMENT '序号',
    `dept_name`     STRING COMMENT '',
    `system_name`   STRING COMMENT '所属系统',
    `table_name`    STRING COMMENT '数据仓表名称',
    `table_comment` STRING COMMENT '表中文名称',
    `column_name`   STRING COMMENT '根据那个字段取最大值',
    `load_time`     STRING COMMENT '数据录入时间'
)
COMMENT '业务数据表最新更新时间(配置表)';
1.2 最终结果存储表:
-- Result table: one row per inspected business table (row count and latest
-- business timestamp), partitioned by the day the values were collected.
CREATE TABLE IF NOT EXISTS dwd_river_system_business_time
(
    `dept_name`        STRING COMMENT '',
    `system_name`      STRING COMMENT '系统名称',
    `table_name`       STRING COMMENT '库表名称',
    `table_comment`    STRING COMMENT '表中文名称',
    `table_rows`       BIGINT COMMENT '表数据量',
    `business_maxtime` STRING COMMENT '表业务最新更新时间',
    `etl_time`         STRING COMMENT '取值时间(etl_time)'
)
COMMENT '业务数据表最新更新时间'
PARTITIONED BY
(
    `dt` STRING COMMENT '按天分区字段'
);
2. 创建PyODPS脚本:
# -*- coding: utf-8 -*-
"""Collect row counts and latest business timestamps for configured tables.

Reads the table list from dim_river_system_business_time, builds one aggregate
SELECT per configured table, runs them in UNION ALL batches, and writes the
collected rows into the current day's `dt` partition of
dwd_river_system_business_time.
"""
from odps import ODPS
import datetime
import sys

# Work around garbled console output. setdefaultencoding only exists on
# Python 2, so the call is guarded to keep the script importable on Python 3.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding('utf-8')

# Create the connection. The arguments are placeholders to be replaced with
# the real access id, access key, project name and endpoint.
o = ODPS('id', 'key', 'project', endpoint='')


def getDate(format_string="%Y%m%d"):
    """Return the current local time formatted with the given strftime pattern.

    The default used to be "%Y%m%d%"; the stray trailing '%' is an incomplete
    directive, so it has been removed.
    """
    return datetime.datetime.now().strftime(format_string)


def timeReplaceIsNUll(time):
    """Return `time`, or a fixed initial timestamp when it is None or empty.

    The emptiness test must not call len() first: a NULL MAX() result arrives
    as None, and len(None) raises TypeError before the fallback can apply.
    """
    if not time:
        return "1998-01-01 23:59:59"
    return time


def strjq(strData):
    """Cut the trailing "UNION ALL " (10 characters) off a batch and add ';'.

    Every SELECT is appended to its batch followed by " UNION ALL ", so the
    last 10 characters of a non-empty batch are always that dangling keyword.
    """
    return strData[0:len(strData) - 10] + ";"


# Collection timestamps.
# 1. Partition value (day granularity).
timeHH = getDate("%Y%m%d")
# 2. Time the data is written (currently not referenced further down).
times = getDate("%Y-%m-%d %H:%M:%S")


def _sql_literal(value):
    """Escape a config value for use inside a single-quoted SQL string.

    None becomes an empty string; backslashes and single quotes are
    backslash-escaped so that a quote in, e.g., a table comment cannot
    terminate the literal early.
    """
    if value is None:
        return ""
    return value.replace("\\", "\\\\").replace("'", "\\'")


def SQLCreate(table_name, table_comment, timeRex, system_name, dept_name):
    """Build the statistics SELECT for one configured table.

    The statement returns the table name/comment/system/department as
    constants, count(1) as table_rows, the formatted max(`timeRex`) as
    Business_maxtime, and getDATE() as etl_time.

    `table_name` and `timeRex` are spliced in as identifiers and cannot be
    escaped; they must come from a trusted configuration table.
    """
    # NOTE(review): the pattern uses 'hh'; if DATE_FORMAT follows Java-style
    # patterns this is the 12-hour field and 'HH' may have been intended.
    # Left unchanged pending confirmation.
    sql = (
        "SELECT '" + _sql_literal(table_name) + "' AS table_name,'"
        + _sql_literal(table_comment) + "' as table_comment,count(1) as table_rows,'"
        + _sql_literal(system_name) + "' as system_name,'"
        + _sql_literal(dept_name) + "' as dept_name,DATE_FORMAT(max(" + timeRex
        + "),'yyyy-MM-dd hh:mm:ss','false') as Business_maxtime,getDATE() as etl_time from "
        + table_name
    )
    return sql


# Staging area for the per-table SELECT statements.
SQLTemplateBath = []

# Read the configuration table and build one SELECT per record.
with o.execute_sql(
    'select table_name,table_comment,column_name,system_name,dept_name '
    'from dim_river_system_business_time'
).open_reader() as reader:
    for record in reader:
        SQLTemplateBath.append(
            SQLCreate(
                record.table_name,
                record.table_comment,
                record.column_name,
                record.system_name,
                record.dept_name,
            )
        )

# Rows collected from all batches, ready to be written to the target table.
Datas = []

# Inclusive upper index of each batch plus the ordinal used in its progress
# message. Indexes above the last bound are not processed.
BATCH_LIMITS = (
    (300, "一"),
    (600, "二"),
    (1000, "三"),
    (1300, "四"),
    (1600, "五"),
    (1900, "六"),
    (2200, "七"),
)

# One accumulated "SELECT ... UNION ALL SELECT ... UNION ALL " string per batch.
baths = ["" for _ in BATCH_LIMITS]

# Handle of the destination table.
table = o.get_table('dwd_river_system_business_time', project='FGDN_odps')

for i in range(len(SQLTemplateBath)):
    if i > BATCH_LIMITS[-1][0]:
        # Report before stopping; the warning used to sit after `break`
        # and could never be printed.
        print(">>> 配置批量数据超出限制:{}".format(i))
        break
    for slot, (upper, ordinal) in enumerate(BATCH_LIMITS):
        if i <= upper:
            baths[slot] += SQLTemplateBath[i] + " UNION ALL "
            print(">>> 组装第{}批次SQL当前下标:{}".format(ordinal, i))
            break


def _collect_batch(batch_sql):
    """Run one UNION ALL batch and append its rows to `Datas`."""
    with o.execute_sql(strjq(batch_sql)).open_reader() as reader:
        for record in reader:
            Datas.append([
                record.dept_name,
                record.system_name,
                record.table_name,
                record.table_comment,
                record.table_rows,
                timeReplaceIsNUll(record.business_maxtime),
                record.etl_time,
                timeHH,
            ])


def startBaths():
    """Execute every non-empty batch in order, collecting rows into `Datas`."""
    for number, bath in enumerate(baths, 1):
        if len(bath) > 1:
            print("<<< 开始启动批量脚本 {}.....".format(number))
            _collect_batch(bath)


# Run the batches.
startBaths()

# Write the collected rows into today's partition. The old test
# `range(len(Datas)) >= 1` compared a list with an int, so it never reflected
# whether any rows were collected.
if len(Datas) >= 1:
    with table.open_writer(partition="dt='%s'" % timeHH, create_partition=True) as writer:
        writer.write(Datas)
    print("<<< 数据写入结束!!")