Flink+Kafka消费

引入jar

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.8.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.8.0</version>
</dependency>
<!-- flink整合kafka_2.11 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version>
</dependency>

二、处理逻辑

//2、定义环境 => Env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(9);
env.enableCheckpointing(1000);FlinkKafkaConsumer<String> consumer = this.getConsumer();//调用下面的方法获取数据源
consumer.setStartFromLatest();//消费最新数据//2、绑定数据源=> resource
DataStream<String> stream = env.addSource(consumer);//3、批量读取的方法=>
stream.timeWindowAll(Time.milliseconds(500)) //timeWindowAll:时间滚动窗口,滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠.apply(new ReadKafkaFlinkWindowFunction())//使用自己定义的apply来收集.addSink(new KafkaBatchSink());//批量的sink方法
env.execute();

2、定义消费者,并且将消费者consumer转成FlinkKafkaConsumer

public FlinkKafkaConsumer<String> getConsumer(){//定义消费者信息Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.131.147:9092");properties.put("group.id", "flink-consumer-kafka-group");properties.put("auto.offset.reset", "latest");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), properties);return consumer;
}

3、收集数据ReadKafkaFlinkWindowFunction的实现类

4、KafkaBatchSink的实现逻辑

总结

分布式处理引擎Flink使用至少一个【job】调度和至少一个【task】实现分布式处理

有界:就是指flink【消费指定范围内】的数据。例如我定义某个作业间隔时间为0.5秒,则flink已0.5秒为界,进行数据处理。有界数据用在离线数据的处理场景较多

无界:就是指flink始终【监听数据源】里的数据,获取到就处理。无界数据往往用在【实时数据】处理下的场景较多。

我这里结合我们项目的场景来给各位说一下该选那种处理。我们的场景为:

1:尽量支持最多的数据落地
2:数据必须要准确。所以我们最终了有界处理,将flink的界限设置为0.5秒,0.5秒内收集的所有数据整体使用一个算
子消费。保证数据的准确和消费高效性。

1、一定要有抛出异常的机制

我们都知道抛出异常会终止消费,但是为什么要抛出异常呢?这注意是因为如果用户不抛出异常的话,flink会认为当前的数据时正常消费的,这就造成了我们的kafka数据误消费

2、关于并行度parallelism

并行度的配置都是setParallelism,对于env和stream来说,stream的优先级比env高

3、关于checkpoint

我们如果定义程序运行在SPring Boot时,一定要配置检查点这个是flink实现容错的核心配置!

4、关于并行度

我们在设置并行度的时候,将里边的数字设置为多少,最终就会有多少个线程来执行任务。
所以大家一定要清楚对于数据准确性高的数据来说,宁愿牺牲多线程带来的效率提升也要只设置一个线程来执行消费。
可能大家没有注意,如果你不设置flink的并行度为1时。它是以的是系统的线程数来作为并行度!这样顺序是会乱的。

5、saveBatch很好

但是我建议你先封装一下或者改为批量的保存。可能大家都知道或者说都用过mybatis plus的saveBatch,它能将一个列表的inseert封装为一条sql(insert into a values(a1),(a2),(a3),但是我们一条sql的长度过长的话会存在性能问题。建议在批量处理的时候每隔1000条记录saveBatch一次

为什么flink消费kafka比官方的listener都要快

1、并行度和分区处理: Flink 具有高度的并行度支持

可以为每个 Kafka 分区创建独立的消费者实例,以便并行地处理多个分区。这使得 Flink能够更有效地利用资源,并提高整体的消费速度。相比之下,一些官方 Kafka Consumer 实现可能没有明确的并行度配置或并行处理策略。

2、事件时间处理

Flink 强调【事件时间】处理,支持按照事件的实际发生时间进行【有序处理】。这对于一些需要处理时间相关业务逻辑的应用来说很重要。Flink 可以轻松处理乱序事件,并确保事件按照正确的顺序进行处理。官方 Kafka Consumer 也提供了类似的功能,但 Flink在这方面的设计更加深入和专注。

状态管理

Flink 提供了强大的状态管理机制,对于需要在处理过程中保持状态的任务,这一点非常重要。在消费 Kafka消息时,可能需要追踪某些状态,例如记录已处理的偏移量。Flink 的状态管理可以更好地支持这种场景,而官方 Kafka Consumer可能没有提供类似的状态管理机制。

异步处理模型:

Flink 使用异步处理模型,这使得在处理一条消息时,可以同时进行其他处理而无需等待。这有助于提高整体的处理效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/278110.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

迅为RK3568开发板使用OpenCV处理图像-ROI区域-位置提取ROI

在图像处理过程中&#xff0c;我们可能会对图像的某一个特定区域感兴趣&#xff0c;该区域被称为感兴趣区域&#xff08;Region of Interest, ROI&#xff09;。在设定感兴趣区域 ROI 后&#xff0c;就可以对该区域进行整体操作。 位置提取 ROI 本小节代码在配套资料“iTOP-3…

万户 OA OfficeServer.jsp 任意文件上传漏洞复现

0x01 产品简介 万户OA是面向政府组织及企事业单位的FlexOffice自主安全协同办公平台。 0x02 漏洞概述 万户OA OfficeServer.jsp接口存在任意文件上传漏洞,攻击者可通过该漏洞上传任意文件从而控制整个服务器。 0x03 复现环境 FOFA: (banner="OASESSIONID" &a…

机器人与3D视觉 Robotics Toolbox Python 二 空间位姿描述

空间位姿描述 二维空间位姿描述 二维空间位姿表示方法 from spatialmath.base import * from spatialmath import * T1 SE2(x3,y3,theta30,unit"deg") trplot2(T1.A,frame"T1",dims[0, 5, 0, 5]) T2transl2(3, 4) trplot2(T2,frame"T2",dims…

数据持久化与临时存储的对决:localStorage 与 sessionStorage(上)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

ArcGIS Pro SDK文件选择对话框

文件保存对话框 // 获取默认数据库var gdbPath Project.Current.DefaultGeodatabasePath;//设置文件的保存路径SaveItemDialog saveLayerFileDialog new SaveItemDialog(){Title "Save Layer File",OverwritePrompt true,//获取或设置当同名文件已存在时是否出现…

设计模式——桥接模式(结构型)

引言 桥接模式是一种结构型设计模式&#xff0c; 可将一个大类或一系列紧密相关的类拆分为抽象和实现两个独立的层次结构&#xff0c; 从而能在开发时分别使用。 问题 抽象&#xff1f; 实现&#xff1f; 听上去挺吓人&#xff1f; 让我们慢慢来&#xff0c; 先考虑一个简单的…

将 Github token 添加至远程仓库

将 Github token 添加至远程仓库后便于每次 push 重复输入的麻烦 首先,将已生成的 token 记录(注:生成后的 token 确认后便无法查看只能重新生成)并找到对应的项目 git 本地文件路径下 其次,将其与项目所关联,按如下格式配置即可 token 格式类似于 ghp_CAxxxxxxxxxxxxxxxxxGx5j…

C++STL中string详解(零基础/小白,字符串)

目录 1. 基本概念&#xff1a; 1.1 本质&#xff1a; 1.2 string和char*区别&#xff1a; 1.3 特点&#xff1a; 2. 构造函数(初始化) 3. 赋值操作 4. 字符串拼接 5 查找 和 替换 6. 字符串比较 7. 字符存取 8. 插入和删除 9. 子串获取 1. 基本概念&#xff1a; 1.…

字节与位在物联网传输协议中的使用

1个字节(byte) 8个位(bit) 如下例子&#xff0c;是一个上报数据类型的表格&#xff0c;总有48位(6个字节) 假如报文给的数据类型数据是&#xff1a; 0x06 时&#xff0c;06十六进制转为二进制&#xff0c;结果是00000110 那么在图下就是 (bit1 和 bit2 都为 1) &#xff…

[极客大挑战 2019]BuyFlag1

打开网站&#xff1a; 右上角有个菜单 (menu) &#xff0c;先点一下&#xff0c;然后就进入了 pay.php 页面。 其中关键信息如下&#xff1a; ## FlagFlag need your 100000000 money### attentionIf you want to buy the FLAG:You must be a student from CUIT!!!You must…

【离线】牛客小白月赛39 G

登录—专业IT笔试面试备考平台_牛客网 题意 思路 考虑离线Bit做法 这种离线Bit&#xff0c;一般都是去考虑二维数点就能写清楚了 确定好两维&#xff1a;x 轴是1 ~ n&#xff0c; y 轴是 k 的大小 然后去遍历值域&#xff0c;如果值域很大的话需要排序离散化&#xff0c;但…

RT-DETR 图片目标计数 | 特定目标进行计数

全类别计数特定类别计数如何使用 RT-DETR 进行对象计数 有很多同学留言说想学 RT-DETR 目标计数。那么今天这篇博客,我将教大家如何使用 RT-DETR 进行对象计数。RT-DETR 是一种非常强大的对象检测模型,它可以识别图像中的各种对象。我们将学习如何利用这个模型对特定对象进行…