Flink代码单词统计 ---批处理

  • flatMap:一对多转换操作,输入句子,输出分词后的每个词
  • groupBy:按Key分组,0代表选择第1列作为Key
  • sum:求和,1代表按照第2列进行累加
  • print:打印最终结果

1.WordCount代码编写

需求:统计一段文字中,每个单词出现的频次。

环境准备:在src/main/java目录下,新建一个包,命名为com.atguigu.wc。

1.1 批处理

批处理基本思路:

①.先逐行读入文件数据

②.然后将每一行文字拆分成单词

③.接着按照单词分组

④.统计每组数据的个数

⑤.就是对应单词的频次。

1.2 创建项目

1)创建工程

(1)打开IntelliJ IDEA,创建一个Maven工程。

(2)将这个Maven工程命名为Flinkdemo。

2)添加项目依赖

在项目的pom文件中,添加Flink的依赖,包括flink-java、flink-streaming-java,以及flink-clients(客户端,也可以省略)。

<properties><flink.version>1.17.0</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
</dependencies>

3)数据准备

(1)在工程根目录下新建一个Data文件夹,并在下面创建文本文件words.txt

(2)在words.txt中输入一些文字,例如:

hello flink
hello world
hello java

4)代码编写

(1)在com.atguigu.wc包下新建Java类Demo01_BatchProcess,在静态main方法中编写代码。具体代码实现如下:

package com.atguigu.wordcount;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/**as* Created by Smexy on 2023/9/3**  计算的套路:*      ①计算的环境*          spark:SparkContext*          mr:   Driver*          flink:ExecutionEnvironment*      ②把要计算的数据封装为计算模型*         spark:  RDD(spark core)*                 DataFrame|DataSet(sparksql)*                 DStream(sparkstreaming)*         mr:     K-V*         flink:  DataSource**      ③调用计算api*          RDD.转换算子()*          mr: 自己去编写Mapper,Reducer*          flink: DataSource.算子()**  使用的是DataSetAPI(批处理)**  -------------------------*      了解。后续不用了!***/
public class Demo01_BatchProcess
{public static void main(String[] args) throws Exception {//创建支持flink计算的环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//使用环境去读数据,封装为计算模型DataSource<String> source = env.readTextFile("data/words.txt");//调用计算apisource/*hello hi hi hi变为 (hello,1)(h1,1)(h1,1)(h1,1)输出到下游*/.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>(){/*String value: 输入的一行内容Collector<String> out: 输出结果的收集器。帮你把结果自动收集,输出到下游。单词,1: 输出的数据是多列,此时就应该使用集合或Bean来封装。flink提供了Tuple的集合。用于封装多个列。Tuple2: 用来封装2列Tuple3: 用来封装3列....Tuple25: 用来封装25列*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {//Tuple2<String, Integer> data = new Tuple2<>(word, 1);Tuple2<String, Integer> data = Tuple2.of(word, 1);//收集要输出的数据out.collect(data);}}})/*收到的是 (单词,1) 格式计算: 得到 (单词,N)groupBy(int fileds): 适用于 对Tuple类型的数据进行聚合。传入int N,N代表Tuple中的列的索引。groupBy(String fileds): 适用于对Bean类型的数据进行聚合,传入的String就是Bean中的属性名。*/.groupBy(0)// 对tuple2分组后的第二列进行sum运算.sum(1)//在控制台打印输出.print();}
}

5).输出

(flink,1)
(world,1)
(hello,3)
(java,1)

1.3  常见问题

问题1.

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

解决方式:maven项目的 pom.xml安装依赖:

        <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.32</version></dependency>

问题2.

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

解决办法:log4j没有配置日志记录的位置,需要配置log4j.properties,在src目录main目录resources文件夹下下新建log4j.properties

log4j.properties配置文件:

log4j.rootLogger=warn,CONSOLE,File#Console
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n#File  DailyRollingFileAppender
log4j.logger.File=info
log4j.appender.File=org.apache.log4j.DailyRollingFileAppender
log4j.appender.File.layout=org.apache.log4j.PatternLayout
log4j.appender.File.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n
log4j.appender.File.datePattern='.'yyyy-MM-dd
log4j.appender.File.Threshold = info
log4j.appender.File.append=true
log4j.appender.File.File=d://code/logs/flink/disk.log

 此时再次执行成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/492906.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker基础(一)

文章目录 1. 基础概念2. 安装docker3. docker常用命令3.1 帮助命令3.2 镜像命令3.3 容器命令3.4 其他命令 4. 使用案例 1. 基础概念 镜像&#xff08;Image&#xff09;&#xff1a;Docker 镜像&#xff08;Image&#xff09;&#xff0c;就相当于是一个 root 文件系统。比如官…

精酿啤酒:酵母的选择与发酵时间的影响

酵母是啤酒酿造过程中不可或缺的微生物&#xff0c;其选择和发酵时间对啤酒的口感、品质和风格产生着重要影响。Fendi Club啤酒在酵母的选择与发酵时间的控制方面有着与众不同的技巧和经验。 首先&#xff0c;酵母的选择对啤酒的发酵过程和品质重要。Fendi Club啤酒选用品质、高…

kubectl 命令行管理K8S

目录 陈述式资源管理方法 1.基本信息查看 查看版本信息 查看资源对象简写 查看集群信息 配置kubectl自动补全 node节点查看日志 查看 master 节点状态 查看命名空间 查看default命名空间的所有资源 创建命名空间yy 删除命名空间yy 在命名空间kube-public 创建副本…

vue2实现无感刷新token

&#x1f3ac; 江城开朗的豌豆&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 &#x1f4dd; 个人网站 :《 江城开朗的豌豆&#x1fadb; 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! 目录 &#x1f4d8; 引言&#xff1a; &#x1f4…

力扣用例题:2的幂

此题的解题方法在于根据用例调整代码 bool isPowerOfTwo(int n) {if(n1){return true;}if(n<0){return false;}while(n>2){if(n%21){return false;}nn/2; }if(n1){return false;}return true;}

Kubernetes基础(二十五)-Kubernetes GC原理

1 K8s 的垃圾回收策略 当给k8s一个资源对象设置OwnerReference的时候&#xff0c;删除该资源对象的owner, 该对象也会被连带删除。这个时候用的就是k8s的垃圾回收机制。 k8s目前支持三种回收策略&#xff1a; 1&#xff09;前台级联删除&#xff08;Foreground Cascading De…

129 Linux 系统编程7 ,make 的编写和解析

前文中&#xff0c;我们有多少个.c文件&#xff0c;就需要build 出来多少个.o文件 假设我们的项目很大&#xff0c;怎么管理这些 .c文件呢&#xff1f; 这里就要学习一个make文件的编写了。 makefile 本质上是一个脚本语言 脚本语言实际上就是将一系列命令放在一起执行 mak…

Java 存图方式

一、邻接矩阵 二、邻接表 以点为基本单位,尾插。 三、链式前向星 chain forward star 模拟链表,采用 头插 方法,以边为单位,记录每一条边的目标点。 head[i]:存储以结点 i 为起点的所有边的起始位置 edge[i]:存储第 i 条边的信息 1、初始化 定义三个数组: int m…

【前端素材】推荐优质后台管理系统Skydash平台模板(附源码)

一、需求分析 后台管理系统&#xff08;或称作管理后台、管理系统、后台管理平台&#xff09;是一种专门用于管理网站、应用程序或系统后台运营的软件系统。它通常由一系列功能模块组成&#xff0c;为管理员提供了管理、监控和控制网站或应用程序的各个方面的工具和界面。以下…

web安全学习笔记【17】——信息打点(7)

信息打点-APP资产&知识产权&应用监控&静态提取&动态抓包&动态调试 #知识点&#xff1a; 1、业务资产-应用类型分类 2、Web单域名获取-接口查询 3、Web子域名获取-解析枚举 4、Web架构资产-平台指纹识别 ------------------------------------ 1、开源-CMS指…

lv20 QT入门与基础控件 1

1 QT简介 QT是挪威Trolltech开发的多平台C图形用户界面应用程序框架 典型应用 2 工程搭建 2.1 新建ui工程 不要写中文路径 2.1 不勾选UI&#xff08;主讲&#xff09; 3 QT信号与槽机制 语法&#xff1a;Connect&#xff08;A, SIGNLA(aaa()), B, SLOT(bbb())&#xff09;…

【leetcode热题】杨辉三角 II

难度&#xff1a; 简单通过率&#xff1a; 41.1%题目链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 题目描述 给定一个非负索引 k&#xff0c;其中 k ≤ 33&#xff0c;返回杨辉三角的第 k 行。 在杨辉三角中&#xff0c;每个数是它左上方和右上方的数的和。 示…