flink 写入数据到 kafka 后,数据过一段时间自动删除

版本

  • flink 1.16.0
  • kafka 2.3

流程描述:

flink利用KafkaSource,读取kafka的数据,然后经过一系列的处理,通过KafkaSink,采用 EXACTLY_ONCE 的模式,将处理后的数据再写入到新的topic中。

问题描述:

数据写入到新的topic后,过上几分钟的时间,利用工具offset explorer观察对应topic的数据量,显示为0。
刚写入没多久的数据消失了 ???大写的懵 ???

定位问题:

  • 首先查看kafka的日志:
  • 阅读flink 官方文档 kafkaSink的介绍:

DeliveryGuarantee.EXACTLY_ONCE: In this mode, the KafkaSink will write
all messages in a Kafka transaction that will be committed to Kafka on
a checkpoint. Thus, if the consumer reads only committed data (see
Kafka consumer config isolation.level), no duplicates will be seen in
case of a Flink restart. However, this delays record visibility
effectively until a checkpoint is written, so adjust the checkpoint
duration accordingly. Please ensure that you use unique
transactionalIdPrefix across your applications running on the same
Kafka cluster such that multiple running jobs do not interfere in
their transactions! Additionally, it is highly recommended to tweak
Kafka transaction timeout (see Kafka producer transaction.timeout.ms)»
maximum checkpoint duration + maximum restart duration or data loss
may happen when Kafka expires an uncommitted transaction.

  • 翻译过来的意思大概就是:

在EXACTLY_ONCE这种模式下,KafkaSink在事务中写入所有的消息,这些消息在checkpoint上提交给kafka。因此,在flink重启的情况下,如果消费者值读取提交的数据,不会看到重复的数据。缺点就是延迟记录可见性,知道写入检查点为止。强烈建议调整kafka的事务超时时间(见Kafka producer transaction.timeout.ms),超时时间要大于【最大检查点持续时间+最大重启持续时间】,否则当Kafka过期未提交的事务时可能会发生数据丢失。

  • 阅读kafka的官网介绍:

Producer Configs:
transaction.timeout.ms:60000(默认值)

参数描述:
The maximum amount of time in ms that the transaction coordinator will
wait for a transaction status update from the producer before
proactively aborting the ongoing transaction.If this value is larger
than the transaction.max.timeout.ms setting in the broker, the request
will fail with a InvalidTransactionTimeout error.

Broker Configs
transaction.max.timeout.ms:900000(默认值)

参数描述:
The maximum allowed timeout for transactions. If a client’s requested
transaction time exceed this, then the broker will return an error in
InitProducerIdRequest. This prevents a client from too large of a
timeout, which can stall consumers reading from topics included in the
transaction.

  • 最后排查
    在flink中设置的超时时间违反了kafka producer对应的参数规定。

解决问题

在kafkaSink的配置中,加入

Properties properties = new Properties();
// 根据上面的介绍自己计算这边的超时时间,满足条件即可
properties.setProperty("transaction.timeout.ms","900000");KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic(sinkTopic).setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();

总结

在使用现有框架和工具的时候,往往只是懂得怎么用,具体底层的逻辑、原理,了解的很少。往往只有真正理解了原理,遇到了问题,才会更快、更准确的定位问题、解决问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/109152.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习jQuery库的第一天

简介 什么是 jQuery &#xff1f; jQuery 是一个广泛使用的 JavaScript 库。它简化了网页开发中常见的许多任务&#xff0c;例如 HTML 文档遍历、操作 HTML 元素、处理事件、动画效果、Ajax 网络请求等。通过使用 jQuery&#xff0c;开发人员可以更加高效地编写跨浏览器兼容的…

竞赛 基于机器视觉的火车票识别系统

文章目录 0 前言1 课题意义课题难点&#xff1a; 2 实现方法2.1 图像预处理2.2 字符分割2.3 字符识别部分实现代码 3 实现效果最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于机器视觉的火车票识别系统 该项目较为新颖&#xff0c;适合作为竞赛…

C++模版基础

代码地址 gitgithub.com:CHENLitterWhite/CPPWheel.git 专栏介绍 本专栏会持续更新关于STL中的一些概念&#xff0c;会先带大家补充一些基本的概念&#xff0c;再慢慢去阅读STL源码中的需要用到的一些思想&#xff0c;有了一些基础之后&#xff0c;再手写一些STL代码。 (如果你…

RabbitMQ常见问题

一、RabbitMQ如何保证消息不丢失&#xff1f; 这是面试时最喜欢问的问题&#xff0c;其实这是个所有MQ的一个共性的问题&#xff0c;大致的解 决思路也是差不多的&#xff0c;但是针对不同的MQ产品会有不同的解决方案。而RabbitMQ 设计之处就是针对企业内部系统之间进行调用设…

视频监控系统/安防监控/视频AI智能分析:小动物识别算法场景汇总

随着人们对生态环境的关注日益提升&#xff0c;大家对动物保护意识也逐渐增强。旭帆科技智能分析网关小动物识别算法应运而生。除了对保护动物的识别以外&#xff0c;旭帆科技AI智能分析网关还可以识别常见的老鼠等动物&#xff0c;助力明厨亮灶监管&#xff0c;保卫食品安全。…

基于python解决鸡兔同笼问题

一、什么是鸡兔同笼问题&#xff1f; 鸡兔同笼问题是一个经典的数学问题。问题描述&#xff1a;鸡和兔子共有头数a和脚数b&#xff0c;求鸡和兔子的数量。 解析&#xff1a;设鸡的数量为x&#xff0c;兔子的数量为y&#xff0c;那么可以得到以下两个方程&#xff1a; 1. x y…

Hadoop-Hive

1. hive安装部署 2. hive基础 3. hive高级查询 4. Hive函数及性能优化 1.hive安装部署 解压tar -xvf ./apache-hive-3.1.2-bin.tar.gz -C /opt/soft/ 改名mv apache-hive-3.1.2-bin/ hive312 配置环境变量&#xff1a;vim /etc/profile #hive export HIVE_HOME/opt/soft/hive…

motif(基序)

1、motif 是什么&#xff1f; “高出现频率的分子片段”&#xff0c;它与fragment的区别可能就是一个是高频出现一个不是高频出现的吧 维基百科&#xff1a;经常出现的统计学上非常重要的子图或子结构 下面我们给出例子&#xff0c;分子图通过一些分解手段来构造一些子结构&…

无涯教程-JavaScript - IFERROR函数

描述 如果公式的计算输出为错误,则IFERROR函数将返回您指定的值。否则,返回公式的输出。使用IFERROR函数可以捕获和处理公式中的错误。 语法 IFERROR (value, value_if_error) 争论 Argument描述Required/OptionalvalueThe argument that is checked for an error.Required…

生成式人工智能在高等教育 IT 中的作用

作者&#xff1a;Jared Pane 通过将你大学的数据与公共 LLMs 和 Elasticsearch 安全集成来找到你需要的答案。 根据 2023 年 4 月 EDUCAUSE 的一项调查&#xff0c;83% 的受访者表示&#xff0c;生成式人工智能将在未来三到五年内深刻改变高等教育。 学术界很快就询问和想象生…

数学实验-圆周率π的计算(Mathematica实现)

一、实验名称&#xff1a;圆周率π的计算 二、实验环境&#xff1a;机房、Mathematica 10.3软件 三、实验目的&#xff1a;通过各种方法在Mathematica中计算圆周率π的值&#xff0c; 四、实验内容及结果 1 数值积分法计算π 计算定积分的数值&#xff0c;就得到了的值&am…

参议员和科技巨头的私人人工智能峰会引发争议

周三&#xff0c;美国参议员查克舒默(D-NY)在参议院办公楼举办了一场关于潜在人工智能监管的“人工智能洞察论坛”。与会者包括亿万富翁和现代行业巨头&#xff0c;如埃隆马斯克、比尔盖茨、马克扎克伯格、OpenAI的萨姆奥特曼和英伟达的黄仁勋。但是这份公司客人名单22个中的14…