Flink学习(七)-单词统计

前言

Flink是流批一体的框架。因此既可以处理以流的方式处理,也可以按批次处理。

一、代码基础格式

//1st 设置执行环境
xxxEnvironment env = xxxEnvironment.getEnvironment;//2nd 设置流
DataSource xxxDS=env.xxxx();//3rd 设置转换
Xxx transformation =xxxDS.xxxx();//4th 设置sink
transformation.print();//5th 可能需要
env.execute();

二、Demo1 批处理

  • 源码

 public static void main(String[] args) throws Exception {//1,创建一个执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2,获取输入流DataSource<String> lineDS = env.readTextFile("input/word.txt");//3,处理数据FlatMapOperator<String, Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {//3.1 分隔字符串String[] values = value.split(" ");//3.2 汇总统计for (String word : values) {Tuple2<String, Integer> wordTuple = Tuple2.of(word, 1);collector.collect(wordTuple);}}});//4,按单词聚合UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordDS.groupBy(0);//5,分组内聚合AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);//6,输出结果sum.print();}
  • 效果展示

三、Demo2 流处理

  • 源码

   public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] words = value.split(" ");for (String word : words) {Tuple2<String, Integer> temp = Tuple2.of(word, 1);collector.collect(temp);}}});KeyedStream<Tuple2<String, Integer>, Tuple> wordCountKeyBy = wordDS.keyBy(0);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);sum.print();env.execute();}
  • 效果展示

四、Demo3 无边界流处理

  • 源码

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> lineDS = env.socketTextStream("192.168.3.11", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);sum.print();env.execute();}
  • 效果展示 

往192.168.3.11的9999端口上持续输送数据流,程序端会出现如下统计

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/637628.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux环境变量深度解析

文章目录 一、引言二、环境变量的基本概念1、环境变量的定义2、环境变量的作用与意义 三、环境变量的导入1、导入所需文件2、登陆时的导入 四、环境变量的设置方法1、查看环境变量的方式2、使用export命令临时设置环境变量3、修改配置文件以永久设置环境变量 五、命令行参数与环…

编写函数fun,它的功能是:利用以下所示的简单迭代方法求方程COS(X)-X=0的一个实根。

本文收录于专栏:算法之翼 https://blog.csdn.net/weixin_52908342/category_10943144.html 订阅后本专栏全部文章可见。 本文含有题目的题干、解题思路、解题思路、解题代码、代码解析。本文分别包含C语言、C++、Java、Python四种语言的解法完整代码和详细的解析。 题干 编写…

一个简单的记工tkinter窗口

代码分享: 导入datetime模块&#xff0c;用于获取当前日期 import datetime as da 导入csv模块&#xff0c;用于读写csv文件 import csv 导入tkinter模块&#xff0c;用于创建窗口和按钮 from tkinter import * 创建主窗口 appTk() 设置窗口大小为1048x2048&#xff0…

学习亚马逊云科技AWS云计算技术的三款官方免费3A游戏大作

玩3A大作免费电脑游戏&#xff0c;就能成为AWS云架构师、云开发大&#x1f42e;&#xff1f;这么好的事尊的假的&#xff1f;小李哥今天就来给大家介绍&#xff0c;如何通过玩AWS官方的定制版虚拟人生、炉石传说和密室逃脱游戏学习AWS。这三个游戏完全免费&#xff0c;没有任何…

webpack-babel2

浏览器的兼容性问题 浏览器的兼容性问题不知包括随屏幕大小而变化&#xff0c;还包括针对浏览器支持的特性&#xff08;如css特性&#xff0c;js特性&#xff09; 做处理。 目前市场上有很多浏览器&#xff1a;Chrome,Safari,IE,Edge等&#xff0c;要根据它们的市场占有率来决…

安全狗云眼的主要功能有哪些?

"安全狗云眼"是一款综合性的网络安全产品&#xff0c;主要用于实时监控和保护企业的网络安全。其核心功能包括威胁检测、漏洞扫描、日志管理和合规性检查等。 以下是安全狗云眼的主要功能详细介绍&#xff1a; 1、资产管理 定期获取并记录主机上的Web站点、Web容器、…

Redis底层数据结构之SDS

目录 一、概述二、SDS结构三、为什么使用SDS 下一篇 redis底层数据结构之ziplist 一、概述 Redis 中的 SDS&#xff08;Simple Dynamic String&#xff0c;简单动态字符串&#xff09;是 Redis 用于存储字符串值的底层实现&#xff0c;是对 C 语言传统字符串&#xff08;以 nu…

【信号处理】基于CNN自编码器的心电信号异常检测识别(tensorflow)

关于 本项目主要实现卷积自编码器对于异常心电ECG信号的检测和识别&#xff0c;属于无监督学习中的生理信号检测的典型方法之一。 工具 方法实现 读取心电信号 normal_df pd.read_csv("/heartbeat/ptbdb_normal.csv").iloc[:, :-1] anomaly_df pd.read_csv(&quo…

7.Eureka注册中心

将user-service服务注册到eureka 将order-service服务注册到eureka eureka:client:service-url:defaultZone: http://localhost:10086/eureka/ <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix…

[转载] 在IIS上启用https的免费ssl证书使用教程

一、申请证书 数字证书管理服务&#xff08;原SSL证书&#xff09;_SSL数字证书_HTTPS加密_服务器证书_CA认证-阿里云 二、添加证书 1、在控制台上做如下操作&#xff1a;文件》添加/删除管理单元》可用的管理单元》证书》添加》确定。 2、在证书管理单元中选择&#xff1a;…

基于spark进行数据分析的心力衰竭可视化大屏项目

基于spark进行数据分析的心力衰竭可视化大屏项目 项目背景 在当今的医疗领域&#xff0c;数据驱动的决策变得日益重要。心力衰竭作为常见的心血管疾病&#xff0c;其临床数据的分析对于改善患者治疗结果至关重要。本文将介绍如何利用Apache Spark进行大规模心力衰竭临床数据的…

IOS 32位调试环境搭建

一、背景 调试IOS程序经常使用gdb&#xff0c;目前gdb只支持32位程序调试&#xff0c;暂不支持IOS 64位程序调试。IOS 32位程序使用GDB调试之前&#xff0c;必须确保手机已越狱&#xff0c;否则无法安装和使用GDB调试软件。下面详细介绍GDB调试IOS 32位程序的环境搭建。 二、I…