0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

大纲

  • map
  • reduce
  • 完整代码
  • 参考资料

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分
在这里插入图片描述
那么有没有办法让上图中(B,2)和(D,5)也会被计算呢?
这就可以使用本节介绍的时间滚动窗口。它不依赖于窗口中元素的个数,而是窗口的时间,即窗口时间到了,计算就会进行。
我们稍微修改下《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》的例子,让元素集中在“A”上。

map

class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

reduce

    # reducingreduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

这儿我们的Window使用的是滚动时间窗口,其中参数Time.milliseconds(2)是指窗口时长,即2毫秒一个窗口。
我们运行多次代码可以得到不同的结果

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698771761164, end=1698771761166)
(A,12)
(‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771761166, end=1698771761168)
(A,8)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) TimeWindow(start=1698771731386, end=1698771731388)
(A,16)
(‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771731388, end=1698771731390)
(A,4)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771714992, end=1698771714994)
(A,20)

在这里插入图片描述

可以发现结果并不稳定。但是可以发现,每个元素都参与了计算,而不像个数滚动窗口那样部分数据没有被触发计算。

完整代码

from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TimeWindow, TumblingProcessingTimeWindowsclass SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.TumblingProcessingTimeWindows.html#pyflink.datastream.window.TumblingProcessingTimeWindows

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/157873.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【蓝桥杯基础题】星期一

👑专栏内容:蓝桥杯刷题⛪个人主页:子夜的星的主页💕座右铭:前路未远,步履不停目录 一、题目描述二、题目分析三、代码汇总1、C++代码2、Java代码四、总结求解闰年一、题目描述 题目链接:星期一 整个20世纪(1901年1月1日至2000年12月31日之间),一共有多少个星期一…

docker-compose 简单部署MySQL Database

docker-compose 简单部署MySQL Database 本博文部署MySQL 并与上篇部署的 Flask进行关联 主博客目录:《从零开始学习搭建量化平台笔记》 文章目录 docker-compose 简单部署MySQL Database部署 MySQLMySQL 开放端口与权限 主项目计划需要搭建一个MySQL 数据库为其他部…

[ThinkPHP]源码阅读:Model的获取器

目录 1、ThinkPHP组件版本 2、业务Model代码 3、阅读框架源码 4、跳过获取器获取原始数据写法 1、ThinkPHP组件版本 topthink/think-orm v2.0.58 topthink/think-helper v3.1.6 2、业务Model代码 原理:Model通过调用toArray方法使用自定义的获取器 3、阅读框架…

Java学习笔记(六)——面向对象编程(基础)

一、类与对象 (一)类与对象的概念 (二)对象内存布局 ​编辑 对象分配机制 ​编辑 (三)属性/成员变量 (四)创建对象与访问属性 二、成员方法 (一)方法…

springboot2.x使用@RestControllerAdvice实现通用异常捕获

文章目录 demo地址实现效果引入基础类准备1.通用枚举与错误状态枚举2.定义通用返回结果3.自定义业务异常 统一异常捕获测试 demo地址 demo工程地址 实现效果 当我们输入1时,正常的返回通用的响应结果当我们输入2时,抛出异常,被捕获然后返回…

WoShop跨境电商源码:告别繁琐,一键实现批量发货

随着全球电子商务的飞速发展,越来越多的商家开始进军跨境电商领域。然而,搭建一个成功的跨境电商平台需要考虑众多因素,如订单处理、物流配送、支付结算等。在这个过程中,WoShop跨境电商源码因其强大的功能和简易的操作&#xff0…

深度学习之基于ResNet18的神经网络水果分类系统

欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介二、功能三、神经网络水果分类系统四. 总结 一项目简介 基于ResNet18神经网络的水果分类系统是一个利用深度学习技术进行水果图像分类的系统。下面是该系统…

应急响应—日志分析工具

应急响应—日志分析工具 1. 工具提供2. 日志提取工具2.1. 七牛Logkit2.1.1. 优点2.1.2. 支持类型2.1.3. 下载地址2.1.4. 使用方式2.1.4.1. 修改配置2.1.4.2. 启动工具2.1.4.3. 使用测试 2.2. 观星应急工具 3. 日志分析工具3.1. 360星图3.1.1. 使用方式3.1.1.1. 开始运行3.1.1.2…

hugetlb核心组件

1 概述 hugetlb机制是一种使用大页的方法,与THP(transparent huge page)是两种完全不同的机制,它需要: 管理员通过系统接口reserve一定量的大页,用户通过hugetlbfs申请使用大页, 核心组件如下图: 围绕着…

无需数据搬迁,10倍性能提升!携程的统一分析之旅

作者:携程技术中心大数据总监 许鹏 携程自 2022 年起引入了 StarRocks,目前已经成为了集团内部的主要技术栈,应用到酒店、机票、商旅、度假、市场、火车票等多个关键业务线。目前,携程内部已经拥有超过 10 个 StarRocks 集群&…

Linux下使用vscode编写Python项目

我此处是使用VScode远程连接的服务器,具体方法可看如下: 1、vscode中安装Python插件 按上面步骤安装好Python插件后,重启vscode; 2、选择Python解释器 创建Python项目结构: 按下F1,打开vscode命令栏&am…

ViT Vision Transformer超详细解析,网络构建,可视化,数据预处理,全流程实例教程

关于ViT的分析和教程,网上又虚又空的东西比较多,本文通过一个实例,将ViT全解析。 包括三部分内容,网络构建;orchview.draw_graph 将网络每一层的结构与输入输出可视化;数据预处理。附完整代码 网络构建 …