Apache Flink(五):Apache Flink快速入门 - 环境准备及入门案例

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. Flink开发环境准备

2. Flink入门案例

2.1 IDEA Project创建及配置

​​​​​​​2.2 案例数据准备


​​​​​​​1. Flink开发环境准备

学习一门新的编程语言时,往往会从“hello world”程序开始,而接触一套新的大数据计算框架时,则一般会从WordCount案例入手,下面以大数据中最经典入门案例WordCount为例,来编写Flink代码,Flink底层源码是基于Java代码进行开发,在Flink编程中我们除了可以使用Java语言来进行编写Flink程序外,还可以使用Scala、Python语言来进行编写Flink程序,在后续章节中我们将会主要使用Java和Scala来编写Flink程序。下面来准备下Flink开发环境。

  • Flink版本

本套课程中我们采用Flink最新版本1.16.0,Flink1.16.0版本官方文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/

  • JDK环境

Flink核心模块均采用Java开发,所以运行环境需要依赖JDK,Flink可以基于类UNIX 环境中运行,例如:Linux、Max OS、Windows等,在这些系统上运行Flink时都需要配置JDK环境,Flink 1.16.0版本需要JDK版本为JDK11,目前版本也支持使用JDK8,后续版本对JDK8的支持将会移除。

考虑到Flink后期与一些大数据框架进行整合,这些大数据框架对JDK11的支持并不完善,例如:Hive3.1.3版本还不支持JDK11,所以本课程采用JDK8来开发Flink。对JDK8安装及配置不再详述。

附:JDK11 下载地址如下:

https://www.oracle.com/java/technologies/javase-jdk11-downloads.html。

  • 开发工具

我们可以选择IntelliJ IDEA或者Eclipse作为Flink应用的开发IDE,Flink开发官方建议使用IntelliJ IDEA,因为它默认集成了Scala和Maven环境,使用更加方便,我们这门课使用IntelliJ IDEA开发工具,具体安装步骤不再详述。

  • Maven环境

通过IntelliJ IDEA进行开发Flink Application时,可以使用Maven来作为项目jar包管理工具,需要在本地安装Maven及配置Maven的环境变量,需要注意的是,Maven版本需要使用3.0.4及以上,否则编译或开发过程中会有问题。这里使用Maven 3.2.5版本。

  • Scala环境

Flink开发语言可以选择Java、Scala、Python,如果用户选择使用Scala作为Flink应用开发语言,则需要安装Scala执行环境。

在Flink1.15之前版本,如果只是使用Flink的Java api ,对于一些没有Scala模块的包和表相关模块的包需要在Maven引入对应的包中加入scala后缀,例如:flink-table-planner_2.11,后缀2.11代表的就是Scala版本。在Flink1.15.0版本后,Flink添加对opting-out(排除) Scala的支持,如果你只使用Flink的Java api,导入包也不必包含scala后缀,你可以使用任何Scala版本。如果使用Flink的Scala api,需要选择匹配的Scala版本。

从Flink1.7版本往后支持Scala 2.11和2.12版本,目前Flink1.15.0版本后只支持Scala 2.12,不再支持Scala 2.11。Scala环境可以通过本地安装Scala执行环境,也可以通过Maven依赖Scala-lib引入,如果本地安装了Scala某个版本,建议在Maven中添加Scala-lib依赖。Scala2.12.8之后的版本与之前的2.12.x版本不兼容,建议使用Scala2.12.8之后版本。

  • Hadoop环境

Flink可以操作HDFS中的数据及基于Yarn进行资源调度,所以需要对应的Hadoop环境,Flink1.16.0版本支持的Hadoop最低版本为2.8.5,本课程中我们使用Hadoop3.3.4版本。关于Hadoop3.3.4版本搭建,参照第三章节。

​​​​​​​2. Flink入门案例

需求:读取本地数据文件,统计文件中每个单词出现的次数。​​​​​​​

2.1 IDEA Project创建及配置

本课程编写Flink代码选择语言为Java和Scala,所以这里我们通过IntelliJ IDEA创建一个目录,其中包括Java项目模块和Scala项目模块,将Flink Java api和Flink Scala api分别在不同项目模块中实现。步骤如下:

1) 打开IDEA,创建空项目

2) 在IntelliJ IDEA中安装Scala插件

使用IntelliJ IDEA开发Flink,如果使用Scala api 那么还需在IntelliJ IDEA中安装Scala的插件,如果已经安装可以忽略此步骤,下图为以安装Scala插件。

3) 打开Structure,创建项目新模块

创建Java模块:

继续点击“+”,创建Scala模块:

创建好“FlinkScalaCode”模块后,右键该模块添加Scala框架支持,并修改该模块中的“java”src源为“scala”:

在“FlinkScalaCode”模块Maven pom.xml中引入Scala依赖包,这里使用的Scala版本为2.12.10。

<!-- Scala包 -->
<dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.10</version>
</dependency>
<dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.12.10</version>
</dependency>
<dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>2.12.10</version>
</dependency>

4) Log4j日志配置

为了方便查看项目运行过程中的日志,需要在两个项目模块中配置log4j.properties配置文件,并放在各自项目src/main/resources资源目录下,没有resources资源目录需要手动创建并设置成资源目录。log4j.properties配置文件内容如下:

log4j.rootLogger=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n

并在两个项目中的Maven pom.xml中添加对应的log4j需要的依赖包,使代码运行时能正常打印结果:

<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.36</version>
</dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.17.2</version>
</dependency>

5) 分别在两个项目模块中导入Flink Maven依赖

“FlinkJavaCode”模块导入Flink Maven依赖如下:

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink.version>1.16.0</flink.version><slf4j.version>1.7.36</slf4j.version><log4j.version>2.17.2</log4j.version>
</properties><dependencies><!-- Flink DataStream 依赖包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!-- Flink批和流开发依赖包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- slf4j&log4j 日志相关包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>${log4j.version}</version></dependency></dependencies>

“FlinkScalaCode”模块导入Flink Maven依赖如下:

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink.version>1.16.0</flink.version><slf4j.version>1.7.31</slf4j.version><log4j.version>2.17.1</log4j.version><scala.version>2.12.10</scala.version><scala.binary.version>2.12</scala.binary.version>
</properties><dependencies><!-- Flink批和流开发依赖包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Scala包 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><!-- slf4j&log4j 日志相关包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>${log4j.version}</version></dependency></dependencies>

注意:在后续实现WordCount需求时,Flink Java Api只需要在Maven中导入“flink-clients”依赖包即可,而Flink Scala Api 需要导入以下三个依赖包:

flink-scala_${scala.binary.version}flink-streaming-scala_${scala.binary.version}flink-clients

主要是因为在Flink1.15版本后,Flink添加对opting-out(排除)Scala的支持,如果你只使用Flink的Java api,导入包不必包含scala后缀,如果使用Flink的Scala api,需要选择匹配的Scala版本。

​​​​​​​​​​​​​​2.2 案例数据准备

在项目“MyFlinkCode”中创建“data”目录,在目录中创建“words.txt”文件,向文件中写入以下内容,方便后续使用Flink编写WordCount实现代码。

hello Flink
hello MapReduce
hello Spark
hello Flink
hello Flink
hello Flink
hello Flink
hello Java
hello Scala
hello Flink
hello Java
hello Flink
hello Scala
hello Flink
hello Flink

 更多Flink内容参考下个博文:Apache Flink(六):Apache Flink快速入门 - Flink案例实现


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/238876.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Yocto版本信息查询

文章目录 yocto官方发布版本当前版本完整版本信息yocto与内核版本对应 Yocto工程查找版本Yocto镜像查找版本启动串口打印系统配置 参考 yocto官方发布版本 当前版本 如下图所示&#xff0c;当前yocto的主要维护版本&#xff0c;几乎每年一年版本&#xff0c;当前为5.0版本 …

C语言之联合和枚举

C语言之联合和枚举 文章目录 C语言之联合和枚举1. 联合体1.1 联合体的声明1.2 联合体的特点1.3 结构体和联合体对比1.4 联合体大小的计算1.5 联合体小练习 2. 枚举2.1 枚举类型的声明2.2 枚举类型的优点2.3 枚举类型的使用 1. 联合体 1.1 联合体的声明 像结构体⼀样&#xff…

TZOJ 1429 小明A+B

答案&#xff1a; #include <stdio.h> int main() {int T0, A0, B0, sum0;scanf("%d", &T); //输入测试数据的组数while (T--) //循环T次{scanf("%d %d", &A, &B); //输入AB的值sum A B;if (sum > 100) //如果是三位数{…

java原子类型

AtomicBoolean AtomicInteger AtomicLong AtomicReference<V> StringBuilder - 不是原子类型。StringBuilder 是 java.lang 包下的类 用法&#xff1a;无需回调改变数值

k8s ingress 无法找到端点

文章目录 ingress rule无法找到端点这个注解是什么意思呢&#xff1f;为何不生效呢&#xff1f;端点无法更新&#xff1f;如何确认ingressclass呢&#xff1f;修复端点无法发现的问题多个ingress controller 架构 ingress rule无法找到端点 在vnnox-cn集群创建ingress&#xf…

SCA技术进阶系列(四):DSDX SBOM供应链安全应用实践

一、SBOM的发展趋势 数字时代&#xff0c;软件已经成为维持生产生活正常运行的必备要素之一。随着容器、中间件、微服务、 DevOps等技术理念的演进&#xff0c;软件行业快速发展&#xff0c;但同时带来软件设计开发复杂度不断提升&#xff0c;软件供应链愈发复杂&#xff0c;软…

六、三台主机免密登录和时钟同步

目录 1、免密登录 1.1 为什么要免密登录 1.2 免密 SSH 登录的原理

WordPress采集器自动采集发布的工具

WordPress作为最受欢迎的内容管理系统之一&#xff0c;其强大的功能和灵活性使其成为许多网站、博客和电子商务平台的首选。WordPress采集器自动采集发布内置采集规则是一项备受关注的功能&#xff0c;让用户可以轻松收集并发布内容。WordPress采集器自动采集发布内置采集规则的…

Unittest(1):unittest单元测试框架简介setup前置初始化和teardown后置操作

unittest单元测试框架简介 unittest是python内置的单元测试框架&#xff0c;具备编写用例、组 织用例、执行用例、功能&#xff0c;可以结合selenium进行UI自动化测 试&#xff0c;也可以结合appium、requests等模块做其它自动化测试 官方文档&#xff1a;https://docs.pytho…

Beta冲刺总结随笔

这个作业属于哪个课程软件工程A这个作业要求在哪里beta冲刺事后诸葛亮作业目标Beta冲刺总结随笔团队名称橘色肥猫团队置顶集合随笔链接Beta冲刺笔记-置顶-橘色肥猫-CSDN博客 文章目录 一、Beta冲刺完成情况二、改进计划完成情况2.1 需要改进的团队分工2.2 需要改进的工具流程 三…

【实验】配置设备作为DHCP Client,动态获取IP地址案例

热门IT课程介绍及视频教程文章浏览阅读56次。认证课程介绍&#xff1a;华为HCIA试听课程 &#xff1a; 华为HCIA试听课程&#xff1a;华为HCIA试听课程&#xff1a;华为HCIP试听课程&#xff1a;思科CCNA试听课程&#xff1a;思科CCNA试听课程&#xff1a;思科CCNA试听课程&…

MSSQL注入

目录 基本的UNION注入&#xff1a; 错误基于的注入&#xff1a; 时间基于的盲注入&#xff1a; 堆叠查询&#xff1a; 理解MSSQL注入是学习网络安全的一部分&#xff0c;前提是您在合法、授权的环境中进行&#xff0c;用于了解如何保护您的应用程序免受此类攻击。以下是有关…