0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)

大纲

  • Tumbling Count Windows
    • map
    • reduce
      • Window Size为2
      • Window Size为3
      • Window Size为4
      • Window Size为5
      • Window Size为6
  • 完整代码
  • 参考资料

之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是数据会源源不断输入。
在这里插入图片描述
对于这种数据,我们称之为无界流,即没有“终止的界限”。但是程序在底层一定不能等着无止境的数据都传递结束再处理,因为“无止境”就意味着“终止的界限”触发计算的条件是不存在的。那么我们可以人为的给它设置一个“界”,这就是我们本节介绍的窗口。

Tumbling Count Windows

Tumbling Count Windows是指按元素个数计数的滚动窗口。
滚动窗口是指没有元素重叠的窗口,比如下面图是个数为2的窗口。(元素重叠的窗口我们会在《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》介绍)
在这里插入图片描述
个数为3的窗口
在这里插入图片描述
我们用代码探索下这个概念

map

word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

这段代码构造了一个KeyedStream,用于存储word_count_data中的数据。
我们并没有让Source是流的形式,是因为为了降低例子复杂度。但是我们将runntime mode设置为流(STREAMING)模式。
在这里插入图片描述

reduce

我们需要定义一个Reduce类,用于对元组中的数据进行计算。这个类需要继承于WindowFunction,并实现相应方法(本例中是apply)。
apply会计算一个相同key的元素个数。比如key是“E”的元组个数是6。

class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]

Window Size为2

    # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

(A,2)
(B,2)
(C,2)
(C,2)
(D,2)
(D,2)
(E,2)
(E,2)
(E,2)

  • A的个数是2是因为A的确只有两个元组,而一个Size为2的Window正好承载了这两个元素。于是有(A,2)这个结果;
  • B的个数是3。但是会产生两个窗口,第一个窗口承载了前两个元素,第二个窗口当前只有一个元素。于是第一个窗口进行了Reduce计算,得出一个(B,2);第二个窗口还没进行reduce计算,就没有展现出结果;
  • C有4个,正好可以被2个窗口承载。这样我们就看到2个(C,2)。
  • D有5个,情况和B类似。它被分成了3个窗口,只有2个窗口满足个数条件,于是就输出2个(D,2);最后一个窗口因为元素不够,就没尽兴reduce计算了。
  • E有6个,正好被3个窗口承载。我们就看到3个(E,2)。
    在这里插入图片描述

Window Size为3

    # reducingreduced=keyed.count_window(3) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(B,3)
(C,3)
(D,3)
(E,3)
(E,3)

在这里插入图片描述

Window Size为4

    # reducingreduced=keyed.count_window(4) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(C,4)
(D,4)
(E,4)

在这里插入图片描述

Window Size为5

    # reducingreduced=keyed.count_window(5) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(D,5)
(E,5)

在这里插入图片描述

Window Size为6

    # reducingreduced=keyed.count_window(6) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,6)

在这里插入图片描述

完整代码

from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/157112.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于Kubernetes——cka认证含金量怎么样?

2019年和2020年,Rancher分别对近1,000名专业人员展开了调查。调查结果表明,Kubernetes在不同行业连续两年保持了90%以上的采用率,而生产环境中的容器采用率从2019年的85%增长至2020年的87%。 SUSE大中华区总裁秦小康表示:“从调研…

进阶|HDR-ISP支持ROS2以及GPU实时处理啦!

引言 之前我们开源了一份HDR-ISP代码供大家入门学习,但很多后台同学反馈CPU版本是实时性不够、对于相机无法实时处理。没关系,今天Cuda加速、支持ROS2可以实时处理的的HDR-ISP GPU版本来啦! 此次GPU版本开源版本只提供lib供学习测试&#x…

使用LWP::UserAgent库程序

使用LWP::UserAgent库的下载器程序,它使用Perl下载图片。以下是代码: #!/usr/bin/perl -w use strict; use LWP::UserAgent; ​ # 创建对象 my $proxy LWP::UserAgent->new(proxies > {http > ""}); ​ # 加载网页 my $response $…

arcpy.message实现探索

arcpy 位置D:\Program Files\GeoScene\Pro\Resources\ArcPy\arcpy\__init__.py ”““AddMessage(消息) 创建可以使用任何GetMessages函数访问的地理处理信息消息(Severity0)。 message(字符串):要添加的消息。”“ arcpy.geoprocessing D:\Program Files\GeoScene\Pro\Re…

容斥原理,多步容斥

容斥意义法 设计状态表示容斥的过程。比较简单的容斥题目一般可以容斥意义。 如果我们要求方案数的话,通常情况下我们的把限制视为两个方面,一方面是总限制,一方面是对于每个物品的限制,这样设集合 S i S_i Si​表示满足总限制以及…

十八、模型构建器(ModelBuilder)快速提取城市建成区——批量掩膜提取夜光数据、夜光数据转面、面数据融合、要素转Excel(基于参考比较法)

一、前言 前文实现批量投影栅格、转为整型,接下来重点实现批量提取夜光数据,夜光数据转面、夜光数据面数据融合、要素转Excel。将相关结果转为Excel,接下来就是在Excel中进行阈值的确定,阈值确定无法通过批量操作,除非采用其他方式,但是那样的学习成本较高,对于参考比较…

《Generic Dynamic Graph Convolutional Network for traffic flow forecasting》阅读笔记

论文标题 《Generic Dynamic Graph Convolutional Network for traffic flow forecasting》 干什么活:交通流预测(traffic flow forecasting )方法:动态图卷积网络(Dynamic Graph Convolutional Network)…

2. 网络之网络编程

网络编程 文章目录 网络编程1. UDP1.1 DatagramSocket1.1.1 DatagramSocket 构造方法1.1.2 DatagramSocket 方法: 1.2 DatagramPacket1.2.1 DatagramPacket构造方法1.2.2 DaragramPacket方法1.2.3InetSocketAddress API 1.3 UDP回显服务器1.3.1 框架结构1.3.2 读取请…

miniconda快速安装

目录 一、Linux下miniconda安装 1.1、安装 1.2、miniconda初始化 二、Windows下miniconda安装 三、maOS下miniconda安装 3.1、安装 3.2、miniconda初始化 四、参考: 本文给出windows、macos、linux下快速安装miniconda方法。 对比conda,minicond…

Maven Repository使用

1.Maven Repository网站 https://mvnrepository.com/https://mvnrepository.com/ 2.查询需要的依赖 3.参考例子 <!-- https://mvnrepository.com/artifact/org.freeswitch.esl.client/org.freeswitch.esl.client --> <dependency> <groupId>org.freesw…

第22期 | GPTSecurity周报

GPTSecurity是一个涵盖了前沿学术研究和实践经验分享的社区&#xff0c;集成了生成预训练 Transformer&#xff08;GPT&#xff09;、人工智能生成内容&#xff08;AIGC&#xff09;以及大型语言模型&#xff08;LLM&#xff09;等安全领域应用的知识。在这里&#xff0c;您可以…

浅谈安科瑞无线测温产品在埃及某房建配电项目中的应用

1.电气接点测温的必要性 电力系统的一次系统一般由供电线路&#xff08;包括架空线路和电缆&#xff09;、变压器、母线、开关柜等电气设备组成。其相互之间存在大量的电气连接点&#xff0c;由于电流流过产生热量&#xff0c;所以几乎所有的电气故障都会导致故障点温度的变化…