【入门Flink】- 02Flink经典案例-WordCount

WordCount

需求:统计一段文字中,每个单词出现的频次

添加依赖

	<properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies>

1.批处理

基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数。

1.1.数据准备

resources目录下新建一个 input 文件夹,并在下面创建文本文件words.txt

words.txt

hello flink
hello world
hello java

1.2.代码编写

public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)String filePath = Objects.requireNonNull(BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();DataSource<String> lineDS = env.readTextFile(filePath);// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}});// 4. 按照 word 进行分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}

打印结果如下:(结果正确)

image-20231031193224024

上述代码是基于 DataSet API 的,也就是对数据的处理转换,是看作数据集来进行操作的。

事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。从Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2.流处理

DataStreamAPI可以直接处理批处理和流处理的所有场景

2.1读取文件

还是上述words.txt文件

代码实现:

public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取文件String filePath = Objects.requireNonNull(StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();DataStreamSource<String> lineStream = env.readTextFile(filePath);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

与批处理程序BatchWordCount有几点不同:

  • 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment
  • 转换处理之后,得到的数据对象类型不同。
  • 分组操做调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key。
  • 最后执行execute方法,开始执行任务。

2.2读取Socket文件流

实际生产中,真正的数据多是无界的,需要持续地捕获数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。

  1. 简单改动,只需将StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取socket文本流的方法socketTextStream
public class StreamSocketWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取文件DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}
  1. 在 Linux 环境的主机 124.222.253.33 上,执行下列命令,发送数据进行测试
nc -lk 7777

注意:要先启动端口,后启动 StreamSocketWordCount 程序,否则会报超时连接异常。

  1. 从Linux发送数据

1、输入“hello flink”,输出如下内容

image-20231031201232801

2、再输入“hello world”,输出如下内容

image-20231031201316467

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,对于 flatMap 里传入的 Lambda 表达式,系统只能推断出返回的是Tuple2类型,而无法得到 Tuple2<String, Long>。需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/157282.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python小工具分享:优雅地实现进度条和系统通知

shigen坚持日更的博客写手&#xff0c;擅长Java、python、vue、shell等编程语言和各种应用程序、脚本的开发。坚持记录和分享从业两年以来的技术积累和思考&#xff0c;不断沉淀和成长。 趁着休息的空隙&#xff0c;研究了一下两个比较有意思的脚本&#xff0c;在日常的使用中很…

MySQL用户管理和授权

目录 一.用户管理 1.1.新建用户 1.2.查看用户 1.3.重命名用户rename 1.4.删除用户 1.5.修改当前登录用户密码 1.6.修改其他用户密码 1.7.忘记root 密码并找回 二.数据库用户授权 2.1.all privilege包含的权限 2.2.授予权限 ①允许指定用户查询指定数据库表 ②允许…

初识Vue 输出Hello World 及注意事项

在我们还没接触Vue之前&#xff0c;我同学常说我可以直接在元素里输出JS的表达式吗&#xff1f;肯定是不太行。当我们接触vue.js后&#xff0c;这个想法成了现实。 每当我们学习一门新的语言或者框架时&#xff0c;我们都习惯打印一个“hello world”&#xff0c;在我们vue当中…

Node.js基础-三大模块

文章目录 一、概念对比作用总结 二、fs文件模块三、path路径模块四、http模块服务器相关概念IP地址域名和域名服务器端口号 创建基本的web服务器基本步骤req请求对象res响应对象根据不同的url响应不同的html内容 总结 一、概念 借助浏览器引擎&#xff08;提供BOM&#xff0c;…

【机器学习】一、机器学习概述与模型的评估、选择

机器学习简介 由来 阿瑟.萨缪尔Arthur Samuel,1952年研制了一个具有自学习能力的西洋跳棋程序&#xff0c;1956年应约翰.麦卡锡John McCarthy&#xff08;人工智能之父&#xff09;之邀&#xff0c;在标志着人工智能学科诞生的达特茅斯会议上介绍这项工作。他发明了“机器学习…

系列十五、idea全局配置

一、全局Maven配置 IDEA启动页面>Customize>All settings>Build,Execution,Deployment>Build Tools>Maven 二、全局编码配置 IDEA启动页面>Customize>All settings>Editor>File Encodings 三、全局激活DevTools配置 IDEA启动页面>Customize>A…

python连接clickhouse (CK)

Author: tkhywang 2810248865qq.com Date: 2023-11-01 11:28:58 LastEditors: tkhywang 2810248865qq.com LastEditTime: 2023-11-01 11:36:25 FilePath: \PythonProject02\Python读取clickhouse2 数据库数据.py Description: 这是默认设置,请设置customMade, 打开koroFileHead…

61. 旋转链表、Leetcode的Python实现

博客主页&#xff1a;&#x1f3c6;李歘歘的博客 &#x1f3c6; &#x1f33a;每天不定期分享一些包括但不限于计算机基础、算法、后端开发相关的知识点&#xff0c;以及职场小菜鸡的生活。&#x1f33a; &#x1f497;点关注不迷路&#xff0c;总有一些&#x1f4d6;知识点&am…

Android开发知识学习——Kotlin进阶

文章目录 次级构造主构造器init 代码块构造属性data class相等性解构Elvis 操作符when 操作符operatorLambdainfix 函数嵌套函数注解使用处目标函数简化函数参数默认值扩展函数类型内联函数部分禁用用内联具体化的类型参数抽象属性委托属性委托类委托 Kotlin 标准函数课后题 次…

JavaWeb 怎么在servlet向页面输出Html元素?

service()方法里面的方法体&#xff1a; resp.setContentType("text/html;charsetutf-8");//获得输出流PrintWriter对象PrintWriter outresp.getWriter();out.println("<html>");out.println("<head><title>a servlet</title>…

EASYX实现多物体运动

eg1:单个物体运动使用easyx实现单个小球的运动 #include <stdio.h> #include <easyx.h> #include <iostream> #include <math.h> #include <stdlib.h> #include <conio.h> #include <time.h> #define PI 3.14 #define NODE_WIDTH 4…

vs2013/2015/2019扩展-联机提示“未能建立到服务器的连接“/“基础连接已经关闭: 发送时发生错误“/“远程主机强迫关闭了一个现有的连接“

VS2013\VS2015 输入命令 [Net.ServicePointManager]::SecurityProtocol[Net.ServicePointManager]::SecurityProtocol-bOR [Net.SecurityProtocolType]::Tls12 采用上述方法偶尔可以有效&#xff0c;重新启动VS就没用了 VS2019 怎么样都不行 最终解决办法&#xff1a;换一…