Java修仙传之Flink篇

大道三千:最近我修Flink

目前个人理解:

处理有界,无界流的工具

FLINK:

FLINK定义:

Flink特点

Flink分层API

流的定义

有界数据流(批处理):

有界流:数据结束了,程序也就结束了

知道数据开始以及结束的地方

无界数据流:

特征:读一条,计算一条,输出一次结果

知道数据开始的地方,却不知道结束的地方

(好似长江大河,会一直一直一直产生数据)

流的状态

个人理解:(有状态流会基于内存保存之前的数据)

如果后续流的操作需要用到之前的数据,这个流时有状态的

如果后续流的操作不需要用到之前的数据,这个流是无状态的

DataSet API:有界流批处理( 已淘汰)

1:创建执行环境

2:读取流(数据)

3:将读取到的数据,转换为方便处理的格式

4:将收集到的数据进行(分组,求和,最大,最小等....)操作

//批处理方式(有界流,因为很明确的知道这个文件在哪里结束)
public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据  按行读取(存储的元素就是每行的文本)DataSource<String> lineDS = env.readTextFile("input/words.txt");// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Override         //一行数据       // 数据收集器     out:相当于是一个按照 下面格式收集数据的收集器  格式=out.collect(Tuple2.of(word,1L));public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");  //一行数据按照" "拆分for (String word : words) {   //word = 一行中的每一个字段    如果1改成2,则统计时数目会成2Tuple2<String, Long> of = Tuple2.of(word, 1L);//每个的那次都转为这种格式out.collect(of);  // 收集器添加数据 (转换格式为 (循环到的字段,1L))}}});// 4. 按照 word 进行分组    按照第一个字段分组.(字段,1L),就是按照第一个字段分组(A,1),(b,1),(c,1),(d,1),(d,1) 就是按照abcd分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计   根据第二个字段求和,即将每个分组的第二个字段相加,得到该分组的总和AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}

DataStream API:流、批一体处理

转换(flatMap)、

分组(keyBy)、

求和(sum)、

执行(execute)、

读取文本(readTextFile,有界流)

1:创建流式执行环境(基于StreamExecutionEnvironment)

2:读取文件

3:转换、分组、求和,得到统计结果

4:打印输出

5:执行

//流处理方式 (有界流,因为很明确的知道这个文件在哪里结束),如果不是本地而是网络则是无界流
public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");// 3. 转换、分组、求和,得到统计结果                                                                          SingleOutputStreamOperator<Tuple2<String, Integer>> resultList = lineStream.flatM输入类型,输出类型ap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Override           //当前行数据   //要返回的类型public void flatMap(String line, Collector<Tuple2<String, Integer>> list) throws Exception {String[] fields = line.split(" ");for (String field : fields) {Tuple2<String, Integer> result = Tuple2.of(field, 1);list.collect(result);}}});//分组                                                                                    // 传入的数据类型()           要分组的数据类型KeyedStream<Tuple2<String, Integer>, String> gropbyDate = resultList.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0; //这里是类型的第一位。如(hello,1),则是根据hello进行分组}});//求和。      以上一个为例子:(hello,1)分组之后,根据1索引即第二位(hello,1)的1进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> sum = gropbyDate.sum(1);//打印输出sum.print();//执行env.execute();}}
        // 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);

结果:

读取socket(无界流)

事件监听(环境对象.socketTextStream(IP,端口号))

备注:先启动linux 输入命令nc -lk 7777

然后启动代码监听 7777

此时linux输入的数据会被代码抓取到

备注2:跟前两个的区别就是这个是调用的socketTextStream。其他无任何区别

//监听7777端口的数据流
// 这里代码监听了  IP地址192.168.200.130  端口号7777 的操作   。ip地址那里写主机名也行
public class SocketStreamWordCount {public static void main(String[] args) throws Exception {//构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//拿到数据DataStreamSource<String> lineStream = env.socketTextStream("192.168.200.130", 7777);// 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> convert = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] fields = line.split(" ");for (String field : fields) {Tuple2<String, Long> of = Tuple2.of(field, 1L);out.collect(of);}}});//分组KeyedStream<Tuple2<String, Long>, Object> gropBy = convert.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> value) throws Exception {return value.f0;}});//求和SingleOutputStreamOperator<Tuple2<String, Long>> sum = gropBy.sum(1);//输出sum.print();//执行env.execute();}
}
 SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(data -> data.f0).sum(1);

LMD存在泛型擦除,解决方案看这里

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/155441.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从红队视角看AWD攻击

AWD的权限维持 攻防兼备AWD模式是一种综合考核参赛团队攻击、防御技术能力、即时策略的比赛模式。在攻防模式中&#xff0c;参赛队伍分别防守同样配置的虚拟靶机&#xff0c;并在有限的博弈时间内&#xff0c;找到其他战队的薄弱环节进行攻击&#xff0c;同时要对自己的靶机环…

VNC图形化远程连接Ubuntu服务器

我的Ubuntu版本22.04.3&#xff0c;带有gnome图形桌面。配置过程参考了几篇博客&#xff0c;大致流程如下。因为是配置完之后才整理的流程&#xff0c;可能有疏漏。 Ubuntu服务器上的配置 1.先在服务器上下载vnc server&#xff08;任何一种版本均可&#xff09; vncserver有…

从小白到精通:揭秘perf工具的全部功能与操作技巧

揭秘perf工具的全部功能与操作技巧 一、引言二、理解perf工具的基本概念三、安装与配置perf工具3.1、不同操作系统的perf工具安装3.2、perf工具的配置选项和环境设置 四、perf工具的常用命令和功能4.1、perf工具的基本命令结构和常用参数4.2、perf工具的常见用法和功能4.3、per…

cmake多目录构建初步成功

目录和代码和 首次cmake 多目录构建失败 此文一样&#xff1b; 只有一个CMakeLists.txt&#xff1b; cmake_minimum_required(VERSION 3.10) project(mytest3 VERSION 1.0) include_directories("${PROJECT_SOURCE_DIR}/include") add_executable(mytest3 src/main…

MySQL - 索引详解以及优化;Explain执行计划

官网文档 MySQL :: MySQL 5.7 Reference Manual :: 8.3 Optimization and Indexes Mysql优化&#xff08;出自官方文档&#xff09; - 第八篇&#xff08;索引优化系列&#xff09; 目录 Mysql优化&#xff08;出自官方文档&#xff09; - 第八篇&#xff08;索引优化系列&…

【ChatGPT瀑布到水母】AI 在驱动软件研发的革新与实践

这里写目录标题 前言内容简介作者简介专家推荐读者对象目录直播预告 前言 计算机技术的发展和互联网的普及&#xff0c;使信息处理和传输变得更加高效&#xff0c;极大地改变了金融、商业、教育、娱乐等领域的运作方式。数据分析、人工智能和云计算等新兴技术&#xff0c;也在不…

pytorch 笔记:KLDivLoss

1 介绍 对于具有相同形状的张量 ypred​ 和 ytrue&#xff08;ypred​ 是输入&#xff0c;ytrue​ 是目标&#xff09;&#xff0c;定义逐点KL散度为&#xff1a; 为了在计算时避免下溢问题&#xff0c;此KLDivLoss期望输入在对数空间中。如果log_targetTrue&#xff0c;则目标…

预安装win11的电脑怎么退回正版win10?

对于新购的笔记本 通常来讲预装的系统是全新安装的&#xff0c;是没有之前Windows10系统文件的&#xff0c;无法回退。 可以打开设置-----系统----恢复-----看下是否有该选项。 ------------------------------------------------------------------------------- 若是在上述…

Yakit工具篇:WebFuzzer模块之重放和爆破

简介 Yakit的Web Fuzzer模块支持用户自定义HTTP原文发送请求。为了让用户使用简单&#xff0c;符合直觉&#xff0c;只需要关心数据相关信息&#xff0c;Yakit后端(yaklang)做了很多工作。 首先我们先来学习重放请求的操作&#xff0c;在日常工作中可以使用 Web Fuzzer进行请…

MWeb Pro for Mac:博客生成编辑器,助力你的创作之旅

在当今数字化时代&#xff0c;博客已经成为了许多人记录生活、分享知识和表达观点的重要渠道。而要打造一个专业、美观且易于管理的博客&#xff0c;选择一款强大的博客生成编辑器至关重要。今天&#xff0c;我向大家推荐一款备受好评的Mac软件——MWeb Pro。 MWeb Pro是一款专…

C++核心编程---友元

目录 友元 友元的关键字 friend 友元的三种实现方式 1. 全局函数做友元 2. 类做友元 3. 成员函数做友元 友元 生活中你的家有客厅(Public)&#xff0c;有你的卧室(Private) 客厅所有来的客人都可以进去&#xff0c;但是你的卧室是私有的&#xff0c;也就是说只有你能进…

基因家族扩张与收缩分析-CAFE5

CAFE(Computational Analysis of gene Family Evolution)是一款以解释系统发育历史的方式分析基因家族大小变化的软件&#xff0c;这种分析常被称为基因家族收缩扩张(Gene family expansions and contractions)分析。 CAFE使用出生和死亡过程来模拟用户指定的系统发育树中的基…