Flink SQL 基于Update流出现空值无法过滤问题

问题背景

  • 问题描述
基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入,
超过失败次数失败的情况
  • 问题报错
	Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dws_table_name (op_date, school_year, campus_name, school_name, depart_name, total_opfare, ids, update_time) VALUES ('2024-03-11 00:00:00+08', '2023', 'xxxx', 'xxxx学校', 'xxxx小学部', '203333300000', '57', '2024-03-21 09:31:08.47+08') ON DUPLICATE KEY UPDATE school_year=VALUES(school_year), total_opfare=VALUES(total_opfare), ids=VALUES(ids), update_time=VALUES(update_time) was aborted: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraint  Call getNextException to see other errors in the batch.at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332]... 1 moreCaused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraintat com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332

在这里插入图片描述

定位

定位思路

1.方向一:怀疑数据库插入存在数据处理时,造成数据处理出现空值的情况,即数据本身不为空,但是数据插入却出现了空
2.方向二:Flink-SQL在消费kafka数据时存在了空值,故加工的数据计算结果存在空值

定位过程

  • 因插入数据库定位比较麻烦,且数据库已经设置该字段为主属性,故出现插入时处理为空值的概率较小。故先从较为简单的Flink SQL查询数据
  • 定位方法一,查询该字段为空的记录,待作业执行完成后,未查询到空值对应记录
 select  select * from table_name where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
  • 因考虑到使用Flink-CDC进行变更数据捕获,故对应的update流存在-U,+U,-D,+I记录,因此随着插入记录存在空值被记录进去的情况,故采用view的方式,先将宽表的加工、关联方式创建为view,然后进行空值的过滤。实施如下
create view view_prd as 
select a.* ,b.*  from a join b on a.id = b.idselect * from view_prd where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
  • 通过查询结果,发现存在最后一条记录存在空值的原因,往源头定位,发现该字段之前为空,后面进行更新填充到值出现-U记录,导致数据插入持续失败
    在这里插入图片描述

原因

  • 因为flink-SQL消费的数据时kafka topic,flink以upsert-kafka形式的connector进行写入,故存在changelog 流中数据更新存在-U,+U的记录(按照Key进行区分唯一条记录),value 为空(-U)的记录kafka也,导致出现空值,
    在这里插入图片描述

解决

通过在DWS宽表创建一层View(如上),在写入DWS宽表的kafka topic之前,现将该字段空值过滤,即可排除空值涉及记录被纳入结果指标计算的范围中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/573114.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于大语言模型的云故障根因分析|顶会EuroSys24论文

*马明华 微软主管研究员 2021年CCF国际AIOps挑战赛程序委员会主席(第四届) 2021年博士毕业于清华大学,2020年在佐治亚理工学院做访问学者。主要研究方向是智能运维(AIOps)、软件可靠性。近年来在ICSE、FSE、ATC、EuroS…

151 shell编程,正则表达式,在C语言中如何使用正则表达式

零,坑点记录:bash 和 dash 的区别,导致的坑点 查看当前用的shell 是啥,用的是/bin/bash hunandedehunandede-virtual-machine:~$ echo $SHELL /bin/bash 当shell 脚本运行的时候(后面会学到方法,这里是最…

SQLite中的动态内存分配(五)

返回:SQLite—系列文章目录 上一篇:SQLite中的原子提交(四) 下一篇:SQLite使用的临时文件(二) ​概述 SQLite使用动态内存分配来获得 用于存储各种对象的内存 (例如&#xff1a…

现货黄金投资锁仓是什么意思?

在金融投资领域,特别是黄金市场中,“锁仓”是一个常见的策略。那么,现货黄金投资中所说的“锁仓”是什么意思呢?简单来说,锁仓是投资者为了保护自己的投资不受未来价格波动的影响,同时锁定当前的价格水平&a…

SpringBoot整合Redis:缓存击穿--互斥锁解决

🎉🎉欢迎光临,终于等到你啦🎉🎉 🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀 🌟持续更新的专栏Redis实战与进阶 本专栏讲解Redis从原理到实践 …

flutter Got socket error trying to find package nested at

flutter Got socket error trying to find package nested at xxx 报错信息:“Got socket error trying to find package nested at” 通常出现在Flutter尝试从pub.dev获取依赖包时,由于网络问题导致无法连接到pub.dev或者无法正确解析包的路径。 例如&…

遥感卫星影像质量评价指标汇总

1. 主观评价方法 以人为图像的评价者,根据自己的评价尺度和经验对图像质量进行评价。 2. 客观评价方法 1)均方差 2)信噪比 主要用来评价影像经压缩、传输、增强等处理前后的质量变化情况,其本质与均方差类似。 3)方差 反映了图像各个像元灰度相对…

【前端面试3+1】03深拷贝浅拷贝、let和var、css盒模型、【有效括号】

一、深拷贝浅拷贝 深拷贝和浅拷贝都是用于复制对象或数组的概念,但它们之间有着重要的区别: 1. 浅拷贝: 浅拷贝是指在拷贝对象或数组时,只会复制一层对象的属性或元素,而不会递归地复制嵌套的对象或数组。因此&#xf…

Kubeflow文档1:介绍与架构

Kubeflow 2024/3/19版本的文档 此专栏用来展示相关的内容翻译,重点关注本地部署,关于运营商的方案,请自行查阅 文档地址https://www.kubeflow.org/docs/ 开始编辑时间:2024/3/27;最后编辑时间2024/3/27 Kubeflow文…

记录关于智能家居的路程的一个bug___Segmentation fault(段错误)

前言 其实发生段错误的情况有很多: 其实在项目的开发中最有可能的错误就是①和②,考虑到本项目数组用的比较少,所以主要是考虑错误①指针的误用。 有时候错误就是那么离谱,声音也算是一种设备??&#xff…

论文笔记:Retrieval-Augmented Generation forAI-Generated Content: A Survey

北大202402的RAG综述 1 intro 1.1 AICG 近年来,人们对人工智能生成内容(AIGC)的兴趣激增。各种内容生成工具已经精心设计,用于生产各种模态下的多样化对象 文本&代码:大型语言模型(LLM)…

抖音短视频矩阵系统源代码开发部署/SaaS贴牌/源码开源

抖音短视频矩阵系统的源代码开发部署可以分为以下几个步骤: 确定需求:首先,你需要确定你想要开发的具体功能和特性,比如用户注册、上传短视频、评论等。 开发源代码:根据需求,你可以选择使用合适的编程语言…