Flink窗口分类简介及示例代码

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 1. 流式计算
      • 2. 窗口
      • 3. 窗口的分类
        • ◆ 基于时间的窗口(时间驱动)
          • 1) 滚动窗口(Tumbling Windows)
          • 2) 滑动窗口(Sliding Windows)
          • 3) 会话窗口(Session Windows)
        • ◆ 基于元素个数的(数据驱动)
          • 1) 滚动窗口(Tumbling Windows)
          • 2) 滑动窗口(Sliding Windows)

1. 流式计算

  Flink作为一个流式处理引擎,被设计用来处理无限数据集,理论上来说,无限数据集是一种不断产生,源源不断的数据集,说白了就是你不知道这个数据流它啥时候结束,这就是无限数据集。

  流式计算的思想是每来一个数据我就直接处理,而不用等,因此他非常适合在实时性要求比较高的场景下使用。

2. 窗口

  在流处理的场景下,如果我们想要统计过去某个时间段或过去多少条数据的指标时,就需要用到窗口,在Flink中,窗口(window)可以将流划分为有限块进行处理,Flink将这些有限的块抽象为“存储桶(bucket)”,我们可以在这些所谓的桶上做计算,也就实现了无限数据的有限计算。

  窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。

  窗口的声明周期是:一个窗口在第一个属于它的元素到达时就会被创建,然后在时间(event 或 processing time) 超过窗口的“结束时间戳 + 用户定义的 allowed lateness (可容忍的迟到时间)”时 被完全删除。Flink 仅保证删除基于时间的窗口,其他类型的窗口不做保证, 比如全局窗口(Global Windows)。 例如,对于一个基于 event time 且范围互不重合(滚动)的窗口策略, 如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1 分钟, 那么第一个元素落入 12:00 至 12:05 这个区间时,Flink 就会为这个区间创建一个新的窗口。 当 watermark 越过 12:06 时,这个窗口将被摧毁。关于窗口的详细介绍查看->官方对于窗口的介绍

3. 窗口的分类

◆ 基于时间的窗口(时间驱动)

1) 滚动窗口(Tumbling Windows)

window(TumblingProcessingTimeWindows.of(Time.seconds(10))),参数是时间滚动窗口大小。10秒滚动一个窗口

  滚动窗口将元素分发到指定大小的窗口。滚动窗口的大小是固定的,且个窗口之间没有空隙,不会重叠。比如说,如果你指定了滚动窗口的大小为5分钟,那么每5分钟就会有一个窗口被计算,且一个新的窗口被创建。如下图所示:
在这里插入图片描述
示例代码:

public class Flink01_Window_Time {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line->{String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).keyBy(WaterSensor::getId)// 定义一个长度为10s的滚动窗口 每隔10s滚动一次.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(// 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {// 在窗口关闭的时候触发一次@Overridepublic void process(String s, // key ,keyBy之后的keyContext context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素Collector<Object> out) throws Exception {// 把Iterable中所有的元素取出并存入到 list 集合中List<WaterSensor> list = AnqclnUtil.toList(elements);// 获取窗口的相关信息,窗口开始时间和结束时间String startTime = AnqclnUtil.toDateTime(context.window().getStart());String endTime = AnqclnUtil.toDateTime(context.window().getEnd());out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

自定义的工具类:

public class AnqclnUtil {// 要先声明泛型public static <T>List<T> toList(Iterable<T> elements) {List<T> list = new ArrayList<>();for (T t : elements) {list.add(t);}return list;}// 将long类型的时间转换为时间字符串public static String toDateTime(long ts) {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(ts);}
}

运行结果:

在这里插入图片描述

2) 滑动窗口(Sliding Windows)

window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))),参数是时间滑动窗口大小和滑动距离。5秒滑动一个窗口,每个窗口最多放10个元素

  与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

  比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

在这里插入图片描述

示例代码:

public class Flink01_Window_Time {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line->{String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).keyBy(WaterSensor::getId)// 定义一个长度为10s的滚动窗口 每隔10s滚动一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 定义一个滑动窗口:窗口长度是10s,滑动间隔5s   这种情况一个元素可能出现在多个窗口,因为有滑动.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).process(// 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {// 在窗口关闭的时候触发一次@Overridepublic void process(String s, // key ,keyBy之后的keyContext context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素Collector<Object> out) throws Exception {// 把Iterable中所有的元素取出并存入到 list 集合中List<WaterSensor> list = AnqclnUtil.toList(elements);// 获取窗口的相关信息,窗口开始时间和结束时间String startTime = AnqclnUtil.toDateTime(context.window().getStart());String endTime = AnqclnUtil.toDateTime(context.window().getEnd());out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

3) 会话窗口(Session Windows)

window(ProcessingTimeSessionWindows.withGap(Time.seconds(4))),参数是会话间隔,也就是多久没有活跃就关闭当前会话。4秒不活跃就关闭窗口。

  会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

在这里插入图片描述

示例代码:

public class Flink01_Window_Time {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line->{String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).keyBy(WaterSensor::getId)// 定义一个长度为10s的滚动窗口 每隔10s滚动一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 定义一个滑动窗口:窗口长度是10s,滑动间隔5s   这种情况一个元素可能出现在多个窗口,因为有滑动
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))// 定义一个session窗口,时间间隔为4s  对于session窗口来说,不同的key出发时间不同,每个key都维护自己的session.window(ProcessingTimeSessionWindows.withGap(Time.seconds(4))).process(// 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {// 在窗口关闭的时候触发一次@Overridepublic void process(String s, // key ,keyBy之后的keyContext context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素Collector<Object> out) throws Exception {// 把Iterable中所有的元素取出并存入到 list 集合中List<WaterSensor> list = AnqclnUtil.toList(elements);// 获取窗口的相关信息,窗口开始时间和结束时间String startTime = AnqclnUtil.toDateTime(context.window().getStart());String endTime = AnqclnUtil.toDateTime(context.window().getEnd());out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

◆ 基于元素个数的(数据驱动)

1) 滚动窗口(Tumbling Windows)

countWindow(3),参数是个数滚动窗口大小。3个元素滚动一个窗口

  每来多少个元素就滚动一次

示例代码:

public class Flink02_Window_Count {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)// 定义长度为3的基于个数的滚动窗口 因为是keyBy之后的,所以key一样的不到三条不会触发.countWindow(3).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String s,Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = AnqclnUtil.toList(elements);out.collect(" key: "+s+" "+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

2) 滑动窗口(Sliding Windows)

countWindow(3,2),参数是个数滑动窗口大小和滑动步长。每两个元素产生一个新的窗口,每个窗口最多放3个元素。

  就比滚动的多了个参数,滑动步长。步长是生成新窗口的条件,而窗口大小是指这个窗口最多能放多少个元素

示例代码:

public class Flink02_Window_Count {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)// 定义长度为3的基于个数的滚动窗口 因为是keyBy之后的,所以key一样的不到三条不会触发
//                .countWindow(3)// 定义一个长度为3(窗口内元素的最大个数)  每来两个2个元素滑动一次,.countWindow(3,2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String s,Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = AnqclnUtil.toList(elements);out.collect(" key: "+s+" "+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/65353.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ArcGIS Maps SDK for JavaScript系列之三:在Vue3中使用ArcGIS API加载三维地球

目录 SceneView类的常用属性SceneView类的常用方法vue3中使用SceneView类创建三维地球项目准备引入ArcGIS API创建Vue组件在OnMounted中调用初始化函数initArcGisMap创建Camera对象Camera的常用属性Camera的常用方法 要在Vue 3中使用ArcGIS API for JavaScript加载和展示三维地…

太牛了!国内版ChatDoc企业知识库,直接操作Doc、Docx、PDF、txt等文件

自ChatGPT问世以来&#xff0c;国外就有ChatPDF、ChatDOC等基于文档问答的项目&#xff0c;但是国内还一直处于对话类产品的研发中。 贵州猿创科技研发了基于本地向量模型的ChatDoc知识库系统&#xff0c;可以直接上传Doc、Docx、PDF、txt、网页链接等进行问答。 体验地址&…

【资讯速递】AI与人类思维的融合;OpenAI在中国申请注册“GPT-5”商标;移动大模型主要面向to B 智能算力是未来方向

2023年8月11日 星期五 癸卯年六月廿五 第000001号 欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 本文收录于IT资讯速递专栏,本专栏主要用于发布各种IT资讯&#xff0c;为大家可以省时省力的就能阅读和了解到行业的一些新资讯 资…

redis的基础命令01

1、操作库的指令 1、清除当前库---flushdb 2、清除所有库---flushAll 2、操作key的指令 最常用的指令get、set 1&#xff09;set key value 2&#xff09;get key 基础指令 1、del 删除单个&#xff1a;del key 、批量删除&#xff1a;del key1 key2 key3 2、exists 判断key是否…

1.作用域

1.1局部作用域 局部作用域分为函数作用域和块作用域。 1.函数作用域: 在函数内部声明的变量只能在函数内部被访问&#xff0c;外部无法直接访问。 总结&#xff1a; (1)函数内部声明的变量&#xff0c;在函数外部无法被访问 (2)函数的参数也是函数内部的局部变量 (3)不同函数…

Eclipse-配置彩色输出打印

文章目录 前言配置下载查看是否安装 前言 这是一篇古老的文章&#xff0c;那个时候还在用Eclipse &#xff0c;现在已经换 IDEA 了… 这是一篇 2018 年的文章&#xff0c;我只是将文章从个人比较挪到了CSDN 中 配置 配置完然后下载下面插件即可生成彩色代码。 下载 ANSI …

章节5:Burp 扫描功能

章节5&#xff1a;Burp 扫描功能 参考资料 https://portswigger.net/burp/documentation/scanner https://portswigger.net/burp/documentation/desktop/scanning 模块总体介绍&#xff1a; https://portswigger.net/burp/vulnerability-scanner 扫描功能的使用&#xff…

MEC | 条款3 绝对不要以多态(polymorphically)方式处理数组

条款3 绝对不要以多态&#xff08;polymorphically&#xff09;方式处理数组 文章目录 条款3 绝对不要以多态&#xff08;polymorphically&#xff09;方式处理数组继承Example 打印基类-派生类数组传入BalencedBST 数组到函数 删除基类-派生类数组>>>>> 欢迎关…

06 为什么需要多线程;多线程的优缺点;程序 进程 线程之间的关系;进程和线程之间的区别

为什么需要多线程 CPU、内存、IO之间的性能差异巨大多核心CPU的发展线程的本质是增加一个可以执行代码工人 多线程的优点 多个执行流&#xff0c;并行执行。&#xff08;多个工人&#xff0c;干不一样的活&#xff09; 多线程的缺点 上下文切换慢&#xff0c;切换上下文典型值…

MySQL入门学习教程(三)

上一章给大家说的是数据库的视图&#xff0c;存储过程等等操作&#xff0c;这章主要讲索引&#xff0c;以及索引注意事项&#xff0c;如果想看前面的文章&#xff0c;url如下&#xff1a; MYSQL入门全套(第一部)MYSQL入门全套(第二部) 索引简介 索引是对数据库表中一个或多个…

Java面试——一分钟搞懂限流算法

为什么限流 运营网站&#xff0c;经常会遇到各种挑战&#xff1a;某黑客发起DoS攻击、网络爬虫网页抓取、商品秒杀活动、双十一与618等场景&#xff0c;会使流量突然激增&#xff0c;如果不限制流量的访问就会使系统宕机。 常见的限流算法 1.漏桶算法&#xff08; LEAKY BUC…

Qt扫盲-Qt Model/View 理论总结 [下篇]

Qt Model/View 理论总结 [下篇] 一、处理Item view 中的选择1. 概念1. 当前 item 和已选 item 2. 使用选择 model1. 选择 item2. 读取选区状态3. 更新选区4. 选择 model 中的所有项 二、创建新 model1. 设计一个 model2. 只读示例 model1. model 的尺寸2. model 头和数据 3. 可…