【Flink入门修炼】1-3 Flink WordCount 入门实现

本篇文章将带大家运行 Flink 最简单的程序 WordCount。先实践后理论,对其基本输入输出、编程代码有初步了解,后续篇章再对 Flink 的各种概念和架构进行介绍。
下面将从创建项目开始,介绍如何创建出一个 Flink 项目;然后从 DataStream 流处理和 FlinkSQL 执行两种方式来带大家学习 WordCount 程序的开发。
Flink 各版本之间变化较多,之前版本的函数在后续版本可能不再支持。跟随学习时,请尽量选择和笔者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。

一、创建项目

在很多其他教程中,会看到如下来创建 Flink 程序的方式。虽然简单方便,但对初学者来说,不知道初始化项目的时候做了什么,如果报错了也不知道该如何排查。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通过指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 来创建一个新的工程。同时 Flink 给我提供了更为方便的创建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2

因此,我们手动来创建一个 Maven 项目,看看到底如何创建出一个 Flink 项目。
1、通过 IDEA 创建一个 Maven 项目
image.png

2、pom.xml 添加:
这里我们选择的是 Flink 1.13.2 版本(Flink 1.14 之后部分类和函数有变化,可自行探索)。

    <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.2</flink.version> <!-- 1.14 之后部分类和函数有变化,可自行探索 --><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency></dependencies>

二、DataStream WordCount

一)编写程序

基础项目环境已经搞好了,接下来我们模仿一个流式环境,监听本地的 Socket 端口,使用 Flink 统计流入的不同单词个数。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {//参数检查if (args.length != 2) {// System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");// return;args = new String[]{"127.0.0.1", "9000"};}String hostname = args[0];Integer port = Integer.parseInt(args[1]);// 创建 streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据DataStreamSource<String> stream = env.socketTextStream(hostname, port);// 计数SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);sum.print();env.execute("Java WordCount from SocketTextStream Example");}public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {String[] tokens = s.toLowerCase().split("\\W+");for (String token: tokens) {if (token.length() > 0) {collector.collect(new Tuple2<String, Integer>(token, 1));}}}}
}

二)测试

接下来我们进行程序测试。
我们在本地使用 netcat 命令启动一个端口:

nc -l 9000

然后启动程序,能看到控制台一些输出:
image.png

接下来,在 nc 中输入:

$ nc -l 9000
hello world
flink flink flink

回到我们的程序,能看到统计的输出:

3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)

image.png

三)如果有报错

如果出现执行报错:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormatat com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormatat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.lang.ClassLoader.loadClass(ClassLoader.java:419)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)at java.lang.ClassLoader.loadClass(ClassLoader.java:352)... 1 more

在 IDE 中把 「Add dependencies with “Provided” scope to classpath」勾选上:
image.png

三、Flink Table & SQL WordCount

一)介绍 FlinkSQL

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
上面单词统计的逻辑可以转化为下面的 SQL。
直接来看这个 SQL:

select word as word, sum(frequency) as frequency from WordCount group by word
  • WordCount 是要进行单词统计的表,我们会先做一些处理,将输入的单词都存放到这个表中
  • 表我们定义为两列(word, frequency),初始转化输入每个单词占一行,frequency 都是 1
  • 然后,就可以按照 SQL 的逻辑来进行统计聚合了。

其中,WordCount 表数据如下:

wordfrequency
hello1
world1
flink1
flink1
flink1

那么接下来我们看,如何写一个 FlinkSQL 的程序。

二)环境和程序

首先,添加 FlinkSQL 需要的依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>${flink.version}</version></dependency>

程序如下:

public class SQLWordCount {public static void main(String[] args) throws Exception {// 创建上下文环境ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);// 读取一行模拟数据作为输入String words = "hello world flink flink flink";String[] split = words.split("\\W+");ArrayList<WC> list = new ArrayList<>();for (String word : split) {WC wc = new WC(word, 1);list.add(wc);}DataSource<WC> input = fbEnv.fromCollection(list);// DataSet 转 SQL,指定字段名Table table = fbTableEnv.fromDataSet(input, "word,frequency");table.printSchema();// 注册为一个表fbTableEnv.createTemporaryView("WordCount", table);Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);ds1.printToErr();}public static class WC {public String word;public long frequency;public WC() {}public WC(String word, long frequency) {this.word = word;this.frequency = frequency;}@Overridepublic String toString() {return  word + ", " + frequency;}}
}

执行,结果输出:

(`word` STRING,`frequency` BIGINT
)
flink, 3
world, 1
hello, 1

image.png

四、小结

本篇手把手的带大家搭建起 Flink Maven 项目,然后使用 DataStream 和 FlinkSQL 两种方式来学习 WordCount 单词计数这一最简单最经典的 Flink 程序开发。跟着步骤一步步执行下来,大家应该对 Flink 程序基本执行流程有个初步的了解,为后续的学习打下了基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/459553.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flutter开发实战-ijkplayer视频播放器功能

flutter开发实战-ijkplayer视频播放器功能 使用better_player播放器进行播放视频时候&#xff0c;在Android上会出现解码失败的问题&#xff0c;better_player使用的是video_player&#xff0c;video_player很多视频无法解码。最终采用ijkplayer播放器插件&#xff0c;在flutt…

[机器学习]K-means——聚类算法

一.K-means算法概念 二.代码实现 # 0. 引入依赖 import numpy as np import matplotlib.pyplot as plt # 画图依赖 from sklearn.datasets import make_blobs # 从sklearn中直接生成聚类数据# 1. 数据加载 # 生成&#xff08;n_samples&#xff1a;样本点&#xff0c;centers&…

06-OpenFeign-使用HtppClient连接池

默认下OpenFeign使用URLConnection 请求连接&#xff0c;每次都需要创建、销毁连接 1、添加ApacheHttpClient依赖 <!-- 使用Apache HttpClient替换Feign原生httpclient--><dependency><groupId>org.apache.httpcomponents</groupId><artifact…

【精选】java继承进阶——构造方法的访问特点 this、super使用

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏…

相机图像质量研究(7)常见问题总结:光学结构对成像的影响--镜片固化

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

Excel——分类汇总

1.一级分类汇总 Q&#xff1a;请根据各销售地区统计销售额总数。 第一步&#xff1a;排序&#xff0c;我们需要根据销售地区汇总数据&#xff0c;我们就要对【销售地区】的内容进行排序。点击【销售地区】列中任意一个单元格&#xff0c;选择【数据】——【排序】&#xff0c…

【快速上手QT】02-学会查看QT自带的手册QT助手

QT助手 为什么大家都说QT简单&#xff0c;第一点就是确实简单&#xff08;bushi&#xff09;。 我个人觉得最关键的点就是人家QT官方就给你准备好了文档&#xff0c;甚至还有专门的IDE——QtCreator&#xff0c;在QTCreator里面还有很多示例代码&#xff0c;只要你会C的语法以…

1.CVAT建项目步骤

文章目录 1. 创建project2. 创建task2.1. label 标签详解2.2.高级配置 Advanced configuration 3. 分配任务4. 注释者规范 CVAT的标注最小单位是Task&#xff0c;每个Task为一个标注任务。 1. 创建project 假设你并不熟悉cvat的标注流程&#xff0c;这里以图像2D目标检测为例进…

夜天之书 #95 GreptimeDB 社群观察报告

GreptimeDB 是格睿科技&#xff08;Greptime&#xff09;公司研发的一款开源时序数据库&#xff0c;其源代码[1]在 GitHub 平台公开发布。 https://github.com/GreptimeTeam/greptimedb 我从 2022 年开始知道有 GreptimeDB 这个项目。2023 年&#xff0c;我注意到他们的 Commun…

Hive 主要内容一览

Hive架构 用户接口&#xff1a;Client CLI&#xff08;command-line interface&#xff09;、JDBC/ODBC(jdbc访问hive) 元数据&#xff1a;Metastore 元数据包括&#xff1a;表名、表所属的数据库&#xff08;默认是default&#xff09;、表的拥有者、列/分区字段、表的类型&am…

项目02《游戏-11-开发》Unity3D

基于 项目02《游戏-10-开发》Unity3D &#xff0c; 任务&#xff1a;飞行坐骑 效果&#xff1a; 首先创建脚本&#xff0c; 绑定脚本&#xff0c; using UnityEngine; public class Dragon : MonoBehaviour{ [SerializeField] private float speed 10f; …

PCIE Order Set

1 Training Sequence Training Sequence是由Order Set(OS) 组成&#xff0c;它们主要是用于bit aligment&#xff0c;symbol aligment&#xff0c;交换物理层的参数。当data_rate 2.5GT or 5GT 它们不会被扰码(scramble)&#xff0c;当date_rate 8GT or higher 根据特殊的规…