大数据-玩转数据-Flink窗口函数

一、Flink窗口函数

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。

二、ReduceFunction(增量聚合函数)

输入和输出必须一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Window_s_function {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1,WaterSensor value2) throws Exception {System.out.println("Window_s_function.reduce");value1.setVc ( value1.getVc() + value2.getVc());return (value1);}}).print();env.execute();}
}

运行结果
在这里插入图片描述
在这里插入图片描述

三、AggregateFunction(增量聚合函数)

输入和输出可以不一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;import java.util.List;public class Window_s_function_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<WaterSensor, Avg, Double>() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.couunt++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.couunt;}@Overridepublic Avg merge(Avg avg, Avg acc1) {return null;}},new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}).print();env.execute();}public static class Avg {public Integer sum = 0;public Long couunt = 0L;};
}

运行结果
在这里插入图片描述
在这里插入图片描述

四、ProcessWindowFunction(全窗口函数)

上面例子里已经用到

new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/89920.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于使用远程工具连接mysql数据库时,提示:Public Key Retrieval is not allowed

我在使用DBeaver工具连接 数据库时&#xff0c;提示&#xff1a;Public Key Retrieval is not allowed&#xff0c; 我在前一天还是可以连接的&#xff0c;但是今天突然无法连接了&#xff0c; 但是最后捣鼓了一下又可以了。 具体方法&#xff1a;首先先把mysql服务停了&#x…

git rebase和merge区别

一、概述 merge和rebase 标题上的两个命令&#xff1a;merge和rebase都是用来合并分支的。 这里不解释rebase命令&#xff0c;以及两个命令的原理&#xff0c;详细解释参考这里。 下面的内容主要说的是两者在实际操作中的区别。 1.1 什么是分支 分支就是便于多人在同一项目…

Java基础 数据结构一【栈、队列】

什么是数据结构 数据结构是计算机科学中的一个重要概念&#xff0c;用于组织和存储数据以便有效地进行访问、操作和管理。它涉及了如何在计算机内存中组织数据&#xff0c;以便于在不同操作中进行查找、插入、删除等操作 数据结构可以看作是一种数据的组织方式&#xff0c;不…

性能优化维度

CPU 首先检查 cpu&#xff0c;cpu 使用率要提升而不是降低。其次CPU 空闲并不一定是没事做&#xff0c;也有可能是锁或者外部资源瓶颈。常用top、vmstat命令查看信息。 vmstat 命令: top: 命令 IO iostat 命令&#xff1a; Memory free 命令&#xff1a; 温馨提示&#xff1a…

二叉树中的堆

堆的概念和结构 大堆&#xff1a; 树中的任何一个父亲都大于等于孩子 小堆&#xff1a; 树中的任何一个父亲都小于等于孩子 堆在逻辑上是二叉树来存储的&#xff0c;就是在我们的想象中他是按二叉树来存储的&#xff0c;但是在实际上&#xff0c;它是以数组的形式来存储的&…

渗透测试漏洞原理之---【任意文件上传漏洞】

文章目录 1、任意文件上传概述1.1、漏洞成因1.2、漏洞危害 2、WebShell解析2.1、Shell2.2、WebShell2.2.1、大马2.2.2、小马2.2.3、GetShell 3、任意文件上传攻防3.1、毫无检测3.1.1、源代码3.1.2、代码审计3.1.3、靶场试炼 3.2、黑白名单策略3.2.1、文件检测3.2.2、后缀名黑名…

TCP/IP五层模型、封装和分用

1.网络通信基础2.协议分层OSI七层协议模型TCP/IP五层/四层协议模型【重点】 3. 封装&分用 1.网络通信基础 IP地址&#xff1a;表示计算机的位置&#xff0c;分源IP和目标IP&#xff1b;举个例子&#xff1a;买快递&#xff0c;商家从上海发货&#xff0c;上海就是源IP&…

vue3批量导出文件,打包成压缩包

1.下载插件 npm install jszip npm install file-saver2.封装方法 新建一个exportFileZip.js文件 // 引入实现下载压缩包的两个库 import JSZip from jszip; import FileSaver from file-saver; // 引入请求模块 import axios from axios // 实现下载压缩包按钮的方法 fileA…

【Java】树结构SQL数据的如何去实现搜索

这里写自定义目录标题 需要实现的效果前端需要的json格式&#xff1a;一定是一个完整的树结构错误错误的返回格式错误的返回格式实现的效果 正确正确的返回格式正确的展示画面 后端逻辑分析代码总览 数据库表结构 需要实现的效果 前端需要的json格式&#xff1a;一定是一个完整…

java定位问题工具

一、使用 JDK 自带工具查看 JVM 情况 在我的机器上运行 ls 命令&#xff0c;可以看到 JDK 8 提供了非常多的工具或程序&#xff1a; 接下来&#xff0c;我会与你介绍些常用的监控工具。你也可以先通过下面这张图了解下各种工具的基本作用&#xff1a; 为了测试这些工具&#x…

普通制造型企业,如何成就“链主品牌

“链主品牌”通常掌握产业链主导地位&#xff0c;对于普通制造型企业看起来是遥不可及的事情&#xff0c;事实上并非如此。从洞察穿越周期的“链主品牌”规律来看&#xff0c;做螺丝起家的伍尔特、做宠物牵引绳的福莱希等小企业也可以成为“链主品牌”。另外&#xff0c;由于新…

Ansible学习笔记10

1、在group1的被管理机里的mariadb里创建一个abc库&#xff1b; 1&#xff09; 然后我们到agent主机上进行检查&#xff1a; 可以看到数据库已经创建成功。 再看几个其他命令&#xff1a; #a组主机重启mysql&#xff0c;并设置开机自启 ansible a -m service -a "namemy…