Flink DataSource介绍

介绍

Flink的Data Source(数据源、源算子)是Flink作业的起点,它定义了数据输入的来源。Flink可以从各种数据来源获取数据,例如文件系统、消息队列、数据库等。以下是对Flink Data Source的详细介绍:

概述:

  • Flink中的Data Source用于定义数据输入的来源。
  • 将数据源添加到Flink执行环境中,可以创建一个数据流。
  • Flink支持多种类型的数据源,包括内置数据源和自定义数据源。

内置数据源:

  • 基于集合构建:使用Flink的API(如fromCollection、fromElements等)将Java或Scala中的集合数据转化为数据流进行处理。
  • 基于文件构建:从文件系统中读取数据,支持多种文件格式,如CSV、JSON等。
  • 基于Socket构建:从Socket连接中读取数据,适用于实时流数据场景。

自定义数据源:

  • Flink允许用户通过实现SourceFunction接口或扩展RichParallelSourceFunction来自定义数据源。
  • 常见的自定义数据源包括从第三方系统连接器(如Kafka、RabbitMQ、MongoDB等)中读取数据。

添加数据源到Flink执行环境:

  • 使用StreamExecutionEnvironment.addSource(sourceFunction)方法将数据源添加到Flink执行环境中。
  • sourceFunction需要实现SourceFunction接口或扩展RichParallelSourceFunction。

数据流处理:

  • 一旦数据源被添加到Flink执行环境中,就可以创建一个数据流(DataStream)。
  • 接下来,可以使用Flink的各种算子(如map、filter、reduce等)对数据流进行转换处理。

输出结果:

  • 处理后的数据可以写入其他系统,如文件系统、数据库、消息队列等。
  • Flink支持多种输出方式,如使用DataStream的writeAsText、writeAsCsv等方法将数据写入文件,或使用Flink的连接器将数据写入Kafka、HBase等系统。

总之,Flink的Data Source是构建Flink数据流处理应用的重要组成部分。通过选择合适的数据源和输出方式,可以方便地构建高效、可靠的数据流处理应用。

样例

程序中添加数据源

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new SourceMySQL()).print();env.execute("Flink add mysql sourc");

Flink 已经提供了若干实现好了的SourceFunctions也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source,

stream sources

StreamExecutionEnvironment 加载Source

基于集合:

  1. fromCollection(Collection)
  2. fromCollection(Iterator, Class)
  3. fromElements(T …)
  4. fromParallelCollection(SplittableIterator, Class)
  5. generateSequence(from, to)
    例如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.fromElements(new Event(1, "ba", 4.0),new Event(2, "st", 5.0),new Event(3, "fo", 6.0),...
);

文件

  1. readTextFile(String filePath)
  2. readTextFile(String filePath, String charsetName)
  3. readFile(FileInputFormat inputFormat, String filePath)
    样例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/file");

Socket

  1. socketTextStream(String hostname, int port)
  2. socketTextStream(String hostname, int port, String delimiter)
  3. socketTextStream(String hostname, int port, String delimiter, long maxRetry)
    样例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 8888) // 监听 localhost 的 8888端口过来的数据.flatMap(new Splitter()).keyBy(0).timeWindow(Time.seconds(5)).sum(1);

自定义资源

通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。
继承类

SourceFunction 非并行

SourceFunction 是一个用于定义数据源的函数接口。这个接口的实现通常负责从外部系统(如 Kafka、文件系统、数据库等)读取数据,并将这些数据作为 Flink 流处理或批处理作业的输入。

SourceFunction 通常与 Flink 的 DataStream API 一起使用,以定义和构建数据流处理任务。尽管 Flink 的内部实现和 API 可能会随着版本的更新而有所变化,但通常,一个 SourceFunction 会被实现为:

  1. 创建一个可以持续生成数据或等待外部数据到达的组件。
  2. 当有新数据到达时,将数据作为 Flink 的 DataStream 的一部分进行发射(emit)。

在 Flink 的内部,SourceFunction 可能会有多种不同的实现,具体取决于其要处理的数据源类型。例如,对于 Kafka 这样的消息队列,Flink 提供了专门的 Kafka 连接器和相应的 SourceFunction 实现,用于从 Kafka 主题中读取数据。

在实现自定义的 SourceFunction 时,你需要考虑以下几个方面:

  • 数据源的连接和断开连接逻辑。
  • 数据的读取和解析逻辑。
  • 如何在 Flink 运行时环境中优雅地处理可能的错误和失败。
  • 如何将读取的数据转换为 Flink 可以理解的格式(如 Tuple、POJO 或其他自定义类型)。

ParallelSourceFunction 并行

ParallelSourceFunction 是一个接口,用于定义并行数据源的行为。这个接口允许你创建自定义的数据源,这些数据源能够并行地读取数据并传递给 Flink 的数据处理管道。

ParallelSourceFunction 继承自 SourceFunction,但增加了并行处理的能力。当 Flink 任务需要并行处理多个数据流时,你可以通过实现 ParallelSourceFunction 来创建并行数据源。

Flink 还提供了一个 RichParallelSourceFunction 抽象类,它是 ParallelSourceFunction 的子类,并提供了更多的生命周期方法和上下文信息。使用 RichParallelSourceFunction 可以让你更容易地管理你的并行数据源,因为它提供了诸如 open()、close() 和 cancel() 等方法,这些方法可以在数据源的生命周期中的不同阶段被调用。

下面是一个简单的示例,演示了如何使用 RichParallelSourceFunction 创建一个并行数据源,该数据源生成递增的数字:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;  
public class IncrementingNumberSource extends RichParallelSourceFunction<Long> {  private volatile boolean running = true;  private long count = 0L;  @Override  public void run(SourceContext<Long> ctx) throws Exception {  while (running) {  synchronized (ctx.getCheckpointLock()) {  ctx.collect(count++);  // 这里可以添加一些休眠或其他逻辑来控制数据的生成速度  Thread.sleep(100);  }  }  }  @Override  public void cancel() {  running = false;  }  
}

在这个示例中,IncrementingNumberSource 类继承了 RichParallelSourceFunction,并覆盖了 run() 和 cancel() 方法。在 run() 方法中,我们创建了一个无限循环来生成递增的数字,并使用 ctx.collect() 方法将每个数字发送到 Flink 的数据处理管道中。在 cancel() 方法中,我们设置了一个标志来停止 run() 方法中的循环,以便在需要时可以优雅地关闭数据源。

自定义资源DEMO

/*** Desc: 自定义 source mysql 数据*/
public class SourceMySQL extends RichSourceFunction<Map<String, Object>> {PreparedStatement ps;private Connection connection;/*** open 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = MySQLUtil.getConnection("com.mysql.jdbc.Driver","jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8","root","123456");String sql = "select * from ST;";ps = this.connection.prepareStatement(sql);}/*** 关闭连接和释放资源的动作*/@Overridepublic void close() throws Exception {super.close();if (connection != null) {connection.close();}if (ps != null) {ps.close();}}/*** DataStream 从run方法用来获取数据*/@Overridepublic void run(SourceContext<Map<String, Object>> ctx) throws Exception {ResultSet resultSet = ps.executeQuery();while (resultSet.next()) {Map<String, Object> rs = new HashMap<>();rs.put("id", resultSet.getInt("id"));rs.put("name", resultSet.getString("name").trim());ctx.collect(rs);}}@Overridepublic void cancel() {}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/679262.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

内存卡突然罢工?数据恢复有高招!

内存卡作为我们日常生活中常见的存储设备&#xff0c;广泛应用于手机、相机等设备中。然而&#xff0c;有时我们会遇到内存卡损坏打不开的情况&#xff0c;这时该如何应对呢&#xff1f;本文将为您详细解析内存卡损坏的原因&#xff0c;并提供有效的数据恢复方案&#xff0c;帮…

射频无源器件之电桥

一. 电桥的定义及作用 电桥主要用于实现微波大功率功放系统的功率合成分配,信号采集等功能,被广泛应用于中国及全球4G/5G基站、5G网络覆盖、北斗导航天线、车载高精度导航(无人驾驶)天线等。可将信号分成有相位差的两路,90度电桥相位差90,180度电桥相位差180。 常说的3d…

[机器学习-01]一文了解|机器学习简介、工具选择和Python包基础应用

目录 前言 正文 01-机器学习简介 &#xff08;1&#xff09;诞生过程 &#xff08;2&#xff09;人工智能、机器学习和深度学习之间的关系 &#xff08;3&#xff09;机器学习核心 02-机器学习工具 &#xff08;1&#xff09;Anaconda简介 &#xff08;2&#xff09;Jupyte…

2024.05.08作业

登陆部分代码 /登陆槽函数 void Widget::btn_clicked() {if(edit1->text()"Admin" && edit2->text()"123456"){//登陆成功对话框QMessageBox box(QMessageBox::Information,"信息对话框","登陆成功",QMessageBox::Ok,t…

利用Python简单操作MySQL数据库,轻松实现数据读写

PyMySQL是Python编程语言中的一个第三方模块&#xff0c;它可以让Python程序连接到MySQL数据库并进行数据操作。它的使用非常简单&#xff0c;只需要安装PyMySQL模块&#xff0c;然后按照一定的步骤连接到MySQL数据库即 可。本文将介绍PyMySQL的安装、连接MySQL数据库、创建表、…

ZL-0895小动物活动记录仪可同时检测8只动物的活动量

简单介绍&#xff1a; 小动物活动记录仪是一种多用途、宽范围的小动物活动记录仪器&#xff0c;可用于小鼠、大鼠、豚鼠和兔的实验&#xff0c;小动物活动记录仪​具有不需对动物使用特别盛具的特点&#xff0c;可在不改变动物原生活环境的情况下&#xff0c;进行实时监测&…

今日刷三题(day11):不同路径的数目(一)+短距离最小路径和+把数字翻译成字符串

题目一&#xff1a;不同路径的数目&#xff08;一&#xff09; 题目描述&#xff1a; 一个机器人在mn大小的地图的左上角&#xff08;起点&#xff09;。机器人每次可以向下或向右移动。机器人要到达地图的右下角&#xff08;终点&#xff09;。可以有多少种不同的路径从起点…

实战BACnet/IP标准通信网关在楼宇自动化中的应用

智慧楼宇建设实现不同设备间的互联互通是一项巨大挑战&#xff0c;尤其是在那些历史悠久的建筑中&#xff0c;新旧系统并存的情况尤为普遍。某大型商业综合体就面临着这样的困境&#xff1a;老旧的暖通空调系统采用Modbus RTU协议&#xff0c;而新部署的能源管理系统却要求BACn…

物联网实战--平台篇之(三)账户后台数据库

目录 一、账户后台设计 二、账户数据库 三、数据库操作——增 四、数据库操作——改 五、数据库操作——查 本项目的交流QQ群:701889554 物联网实战--入门篇https://blog.csdn.net/ypp240124016/category_12609773.html 物联网实战--驱动篇https://blog.csdn.net/ypp240…

C++反射之检测struct或class是否实现指定函数

目录 1.引言 2.检测结构体或类的静态函数 3.检测结构体或类的成员函数 3.1.方法1 3.2.方法2 1.引言 诸如Java, C#这些语言是设计的时候就有反射支持的。c没有原生的反射支持。并且&#xff0c;c提供给我们的运行时类型信息非常少&#xff0c;只是通过typeinfo提供了有限的…

深入了解 NumPy:深度学习中的数学运算利器

文章目录 1. 导入NumPy2. 创建NumPy数组3. 数组的算术运算4. N维数组4.1 创建和操作多维数组4.2 高维数组 5. NumPy的广播功能5.1 基本广播示例5.2 更复杂的广播示例 6. 访问数组元素6.1 基于索引的访问6.2 遍历数组6.3 基于条件的访问6.4 高级索引6.5 性能考虑 在深度学习和数…

基于java,SpringBoot和Vue的智慧校园在线考试留言讨论系统设计

摘要 基于Java, SpringBoot和Vue的智慧校园在线考试留言讨论系统是一个为现代教育需求定制的Web应用&#xff0c;它结合了最新的前后端技术来提供一个互动性强、用户友好的学习和交流平台。该系统旨在通过提供实时留言和讨论功能&#xff0c;增进学生间的互动以及师生之间的沟…