Flink之DataStream API开发Flink程序过程与Flink常见数据类型

开发Flink程序过程与Flink常见数据类型

  • DataStream API
    • Flink三层API
    • DataStream API概述
  • 开发Flink程序过程
    • 添加依赖
    • 创建执行环境
    • 执行模式
    • 创建Data Source
    • 应用转换算子
    • 创建Data Sink
    • 触发程序执行
    • 示例
  • Flink常见数据类型
    • 基本数据类型
    • 字符串类型
    • 时间和日期类型
    • 数组类型
    • 元组类型
    • 列表类型
    • 映射类型
    • POJO类型
    • Row类型
    • 可序列化类型
    • 类型提示

DataStream API

Flink三层API

在这里插入图片描述

SQL & TableAPI

SQL & TableAPI同时适用于批处理和流处理,意味着可以对有界数据流和无界数据流以相同的语义进行查询,并产生相同的结果。除了基本查询外,它还支持自定义的标量函数,聚合函数以及表值函数,可以满足多样化的查询需求。

DataStream & DataSetAPI

DataStream & DataSetAPI是Flink数据处理的核心API,支持使用Java语言或Scala语言进行调用,提供了数据读取,数据转换和数据输出等一系列常用操作的封装。

StatefulStreamProcessing

StatefulStreamProcessing是最低级别的抽象,它通过ProcessFunction函数内嵌到DataStreamAPI中。ProcessFunction是Flink提供的最底层API,具有最大的灵活性,允许开发者对于时间和状态进行细粒度的控制。

DataStream API概述

Flink的DataStream API是Flink中最主要的API之一,它用于处理无限流数据。DataStreamAPI支持高级的流处理操作,例如窗口计算、状态管理、流分区等,并且在处理大规模数据时表现出色。

由于Flink DataSet和DataStream API的高度相似,并且DataStream API提供流批一体处理的能力,官方也推荐直接使用DataStream API,因此学习DataStream API如何使用即可。

流(STREAMING)执行模式适用于需要连续增量处理,而且预计无限期保持在线的无边界作业。

批(BATCH)执行模式适用于有一个已知的固定输入,而且不会连续运行的有边界作业。

开发Flink程序过程

确定需求:明确想要解决的问题或实现的功能。导入依赖:在项目中导入Apache Flink相关的依赖,可以使用Maven、Gradle或其他构建工具来管理依赖关系。创建StreamExecutionEnvironment:使用StreamExecutionEnvironment.getExecutionEnvironment()创建Flink的执行环境对象,它用于配置和执行流处理作业。读取数据:从适合的数据源(例如文件、Kafka、Socket等)读取数据,可以使用readTextFile()、addSource()等方法来读取数据并转换为DataStream。转换操作:对读取到的数据进行处理和转换操作,可以使用诸如map、flatmap、filter等方法来进行各种转换和处理。窗口操作(可选):如果需要对数据进行窗口操作(例如滚动窗口、滑动窗口等),可以使用Flink提供的窗口操作方法。结果处理:将转换后的数据写入文件、数据库、消息队列或其他输出源,或者使用print()、collect()等方法将数据打印到控制台。设置作业配置和调优(可选):根据需求和性能要求,可以设置作业的并行度、时间特性、状态后端、容错机制、资源配置等。执行作业:通过调用env.execute()方法来执行流处理作业。作业将提交到Flink集群或本地运行。监控和调试(可选):可以通过Flink的监控界面查看作业的状态和指标,并使用日志和调试工具追踪和解决问题。

添加依赖

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency>

创建执行环境

Flink程序可以在各种上下文环境中运行:

可以在本地 JVM 中执行程序可以提交到远程集群上运行

创建执行环境是使用StreamExecutionEnvironment类,调用这个类的静态方法来创建执行环境。

在这里插入图片描述

获取到程序执行环境后,还可以对执行环境进行灵活的设置。

可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

1.本地执行环境

使用createLocalEnvironment()方法创建一个本地执行环境

可以在调用时传入一个参数,指定默认的并行度,默认并行度是电脑CPU核心数

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

2.集群执行环境
使用createRemoteEnvironment("node01", 8888,"/root/demo.jar")方法创建一个集群执行环境

需要在调用时指定JobManager的主机名和端口号,以及在集群中运行的Jar包

/*** JobManager 主机名* JobManager 进程端口号* 提交给JobManager的JAR包*/
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("node01", 8888,"/root/demo.jar");

3.自适应执行环境

使用getExecutionEnvironment()方法根据当前运行的上下文直接得到正确的执行环境

如果程序独立运行,则返回一个本地执行环境。如果创建了jar包,然后在命令行调用它并提交到集群执行,则返回集群的执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

4.本地执行环境+Web UI
使用createLocalEnvironmentWithWebUI(conf)方法创建一个本地执行环境,同时启动Web监控UI。

需要创建一个配置文件,设置相关参数,如设置Web UI端口,默认使用端口8081

Configuration conf = new Configuration();
conf.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

执行模式

DataStream API执行模式包括:流执行模式、批执行模式和自动模式。

流执行模式Streaming

流执行模式是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。

批执行模式Batch

批执行模式是专门用于批处理的执行模式

自动模式AutoMatic

在自动模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

配置批执行模式

执行模式可以通过 execute.runtime-mode 设置来配置。有三种可选的值:

STREAMING: 经典 DataStream 执行模式(默认)BATCH:DataStream API 上进行批量式执行AUTOMATIC: 让系统根据数据源的边界性来决定

1.通过命令行配置

提交作业时,增加execution.runtime-mode参数,指定值为BATCH

bin/flink run -Dexecution.runtime-mode=BATCH

2.通过代码配置

// 创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 基于执行环境调用setRuntimeMode方法,传入BATCH模式。不建议,推荐通过命令行传递参数
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

创建Data Source

创建执行环境后,可以使用其提供的一些方法,通过这些方法可以创建Data Source

例如:从文件中读取数据:可以直接逐行读取数据,像读CSV文件一样,或使用任何第三方提供的source

String filePath = "data/test.text";final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.readTextFile(filePath);

应用转换算子

这将生成一个DataStream,然后可以在上面应用转换算子transformation来创建新的派生DataStream。可以调用DataStream上具有转换功能的方法来应用转换。

例如: 应用一个map的转换算子,它将通过把原始集合中的每一个字符串转换为一个整数来创建一个新的DataStream。

DataStream<String> text = ...;DataStream<Integer> parsed = text.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) {return Integer.parseInt(value);}
});

创建Data Sink

一旦有了包含最终结果的DataStream,就可以通过创建sink把它写到外部系统。

// 简单skin:将DataStream以文本格式写入path指定的文件
parsed.writeAsText("data/out");// 控制台打印
parsed.print();

触发程序执行

需要调用StreamExecutionEnvironment 的execute()、executeAsync()方法来触发程序执行

execute()方法将等待作业完成,然后返回一个JobExecutionResult,其中包含执行时间和累加器结果。

JobExecutionResult result = env.execute();

如果不想等待作业完成,使用executeAsync() 方法来触发作业异步执行。它会返回一个 JobClient,可以通过它与刚刚提交的作业进行通信。

JobClient jobClient = env.executeAsync();
JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

示例

public static void main(String[] args) throws Exception {// 获取运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从文件中读取数据DataStreamSource<String> text = env.readTextFile("data/test.text");// 应用转换算子DataStream<Integer> parsed = text.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) {int number = Integer.parseInt(value);System.out.println("number = " + number);return number;}});// 简单skin:将DataStream以文本格式写入path指定的文件parsed.writeAsText("data/out");// 控制台打印parsed.print();// 触发执行env.execute();}

Flink常见数据类型

原始数据类型:例如布尔值、整数(byte、short、int、long)、浮点数(float、double)和字符(char)字符串类型:表示为 Java 类型 String 或 scala 类型 String时间和日期类型:包括 Timestamp 和 Date,以及 Interval 类型,用于表示时间间隔数组类型:数组是同一类型的元素的有序集合元组类型:元组是不同类型的元素的有序集合列表类型:列表是具有相同元素类型的有序元素集合映射类型:映射是键值对的无序集合,键和值可以是任何类型POJO类型:POJO 是普通的 Java 对象,它们包含字段或属性,可以通过名称或 getter 和 setter 方法进行访问Row类型:Row 是一个有序的、命名的字段集合。与POJO类型类似,但没有setter 和getter方法可序列化类型:即实现 java.io.Serializable 接口的类型

基本数据类型

Flink支持Java中的所有基本数据类型,例如布尔值、整数(byte、short、int、long)、浮点数(float、double)和字符(char)。

在Flink中定义一个int类型的流

DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);

字符串类型

字符串类型在Flink中也很常见,可以使用Java或Scala中的String类型表示。

DataStream<String> stream = env.fromElements("hello", "world");

时间和日期类型

时间和日期类型包括DATE、TIME、TIMESTAMP类型,用于表示时间间隔。

DataStream<Tuple2<String, Timestamp>> stream = env.fromElements(Tuple2.of("event-1", new Timestamp(System.currentTimeMillis())),Tuple2.of("event-2", new Timestamp(System.currentTimeMillis() - 1000))
);

数组类型

数组是同一类型的元素的有序集合。包括基本数据类型数组(PRIMITIVE_ARRAY)和复杂数据类型数组(OBJECT_ARRAY。其中,基本数据类型数组可以是任意基本数据类型的数组,而复杂数据类型数组则可以是结构体或者嵌套的数组。

DataStream<int[]> stream = env.fromElements(new int[]{1, 2, 3}, new int[]{4, 5, 6});

元组类型

元组是复合类型,包含固定数量的各种类型的字段。Java API提供了从Tuple1到Tuple25,不支持空字段

元组是不同类型的元素的有序集合。也就是说元组的每个字段都可以是任意Flink 类型,包括更多元组,从而产生嵌套元组

DataStream<Tuple3<String, Integer, Double>> stream = env.fromElements(Tuple3.of("a", 1, 1.1),Tuple3.of("b", 2, 2.2)
);

列表类型

列表是具有相同元素类型的有序元素集合。

DataStream<List<String>> stream = env.fromElements(Arrays.asList("hello", "world"), Arrays.asList("foo", "bar"));

映射类型

映射是键值对的无序集合,键和值可以是任何类型。

Map<String, Integer> map1 = new HashMap<>();
map1.put("a", 1);
map1.put("b", 2);Map<String, Integer> map2 = new HashMap<>();
map2.put("c", 3);
map2.put("d", 4);DataStream<Map<String, Integer>> stream = env.fromElements(map1, map2);

POJO类型

POJO是普通的Java对象,它们包含字段或属性,可以通过名称或getter和setter方法进行访问

Flink对POJO 类型的要求如下:

类是公有public的
有一个无参的构造方法
所有属性都是公有public的,要么必须可通过 getter 和 setter 函数访问
所有属性的类型都是可以序列化的
public class Person {public String name;public int age;public String getName() { return name; }public void setName(String name) { this.name = name; }public int getAge() { return age; }public void setAge(int age) { this.age = age; }
}DataStream<Person> stream = env.fromElements(new Person("Alice", 25),new Person("Bob", 30)
);

Row类型

Row是一个有序的、命名的字段集合。与 POJO类型类似,但没有setter 和 getter 方法。可以认为是具有任意个字段的元组,并支持空字段。

可序列化类型

即实现 java.io.Serializable 接口的类型。

public class MySerializableClass implements Serializable {private int value;public MySerializableClass(int value)

类型提示

Flink的类型提示Type Hints机制,它可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。也就是说可以帮助Flink更好地理解数据集中元素的类型,从而提高程序的性能。

使用TypeHint或Types类来指定数据集元素的类型

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> input = env.fromCollection(Arrays.asList("a b", "b c", "c d"));SingleOutputStreamOperator<Tuple2<String, Integer>> sum = input.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}})// 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式设置系统当前返回类型,才能正确解析出完整数据.returns(new TypeHint<Tuple2<String, Integer>>() {})
//                .returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(value -> value.f0).sum(1);sum.print();env.execute();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/133956.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java 进阶篇】JavaScript Array数组详解

当我们编写JavaScript代码时&#xff0c;经常需要处理一组数据。JavaScript中的数组&#xff08;Array&#xff09;是一种用于存储多个值的数据结构&#xff0c;它提供了许多方法和功能&#xff0c;使我们能够方便地操作这些数据。在本篇博客中&#xff0c;我们将详细探讨JavaS…

“Jwt认证在前后端分离架构中的应用与优化“

目录 引言1. JWT的简介1.1 什么是JWT1.2 JWT的优势 2. JWT工具类2.1 JWT生成与解析2.2 JWT与安全性 3. JWT案例演示后台改动前台改动 总结 引言 在当今互联网应用开发中&#xff0c;前后端分离架构已经成为一种主流的开发模式。而身份验证和授权是保证系统安全性的重要环节之一…

内存空间的分配与回收之连续分配管理方式

1.连续分配管理方式 连续分配:指为用户进程分配的必须是一个连续的内存空间。 1.单一连续分配 在单一连续分配方式中&#xff0c;内存被分为系统区和用户区。系统区通常位于内存的低地址部分&#xff0c;用于存放操作系统相关数据;用户区用于存放用户进程相关数据。内存中只…

Windows下DataGrip连接Hive

DataGrip连接Hive 1. 启动Hadoop2. 启动hiveserver2服务3. 启动元数据服务4. 启动DG 1. 启动Hadoop 在控制台中输入start-all.cmd后&#xff0c;弹出下图4个终端&#xff08;注意终端的名字&#xff09;2. 启动hiveserver2服务 单独开一个窗口启动hiveserver2服务&#xff0c;…

HarmonyOS 远端状态订阅开发实例

IPC/RPC 提供对远端 Stub 对象状态的订阅机制&#xff0c; 在远端 Stub 对象消亡时&#xff0c;可触发消亡通知告诉本地 Proxy 对象。这种状态通知订阅需要调用特定接口完成&#xff0c;当不再需要订阅时也需要调用特定接口取消。使用这种订阅机制的用户&#xff0c;需要实现消…

clone()方法使用时遇到的问题解决方法(JAVA)

我们平时在自定义类型中使用这个方法时会连续遇到 4 个问题。 基础代码如下&#xff1a; class A {int[] a {1,2,3}; }public class Test {public static void main(String[] args) {} } 第一个&#xff1a; 当我们直接调用时报错原因是Object类中的clone方法是被protecte…

竞赛 深度学习 YOLO 实现车牌识别算法

文章目录 0 前言1 课题介绍2 算法简介2.1网络架构 3 数据准备4 模型训练5 实现效果5.1 图片识别效果5.2视频识别效果 6 部分关键代码7 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于yolov5的深度学习车牌识别系统实现 该项目较…

支付宝企业转账到个人账号[新接口版](php源码,亲测)

前言 之前专栏写过一篇企业支付宝转账到个人的文章,里面用的是老接口,官方已经不再维护。最近有人找到帮忙使用新接口实现这个功能,看了下文档以及官方的sdk,为了这一个接口,我还要去下载官方庞大的sdk,而且php低版本的还不支持composer,就很离谱,经过一天的研究,把单…

Pytorch之ConvNeXt图像分类

文章目录 前言一、ConvNeXt设计决策1.设计方案2.Training Techniques3.Macro Design&#x1f947;Changing stage compute ratio&#x1f948;Change stem to "Patchify" 4.ResNeXt-ify5. Inverted Bottleneck6.Large Kernel Size7.Micro Design✨Replacing ReLU wit…

Diffusion Model论文/DALL E 2

B站视频学习记录 Ho J, Jain A, Abbeel P. Denoising diffusion probabilistic models[J]. Advances in neural information processing systems, 2020, 33: 6840-6851. Atwood J, Towsley D. Diffusion-convolutional neural networks[J]. Advances in neural information pro…

基于PLC的机械手控制系统设计

目录 摘 要......................................................................................................................... 1 第一章 绪论.............................................................................................................…

接口

一、认识接口 public interface A {//成员变量&#xff08;常量&#xff09;String NAME"啦啦啦";//成员方法 抽象方法void test();//不能有代码块 // static {}//不能有构造器 // public A{ // // }// 接口不能创建对象 }二、接口的好处 三、JDK8 接口新增…