大数据-玩转数据-Flink时间滚动动窗口

一、说明

时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸.
在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp())
时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。

二、思路

滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定.
2.我们传递给window函数的对象叫窗口分配器.

三、数据准备

准备一个WaterSensor类方便演示

package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}

四、代码

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list  = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T>  list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}

五、结果

在hadoop100 服务器
输入nc -lk 999
在这里插入图片描述

消费结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/81102.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用CSS实现一个响应式轮播图?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 使用CSS实现响应式轮播图的示例⭐ HTML 结构⭐ CSS 样式 (styles.css)⭐ JavaScript 代码 (script.js)⭐ 实现说明⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带…

无人机精细化巡检方案制定:提高效率与准确性的关键

在当前技术日新月异的时代&#xff0c;无人机在多个领域的应用已成为行业标配。但如何制定出一套有效、细致的无人机巡检方案&#xff0c;确保其最大效能&#xff0c;成为许多组织与公司的核心议题。其中&#xff0c;复亚智能在此领域已展现出了卓越的实力与深入的见解。 1. 精…

cpolar+JuiceSSH实现手机端远程连接Linux服务器

文章目录 1. Linux安装cpolar2. 创建公网SSH连接地址3. JuiceSSH公网远程连接4. 固定连接SSH公网地址5. SSH固定地址连接测试 处于内网的虚拟机如何被外网访问呢?如何手机就能访问虚拟机呢? cpolarJuiceSSH 实现手机端远程连接Linux虚拟机(内网穿透,手机端连接Linux虚拟机) …

jQuery 在图片和文字中插入内容(多种情况考虑)

昨天接到一个新的需要&#xff0c;在后台文章编辑器中&#xff0c;每一个文章的正文前面&#xff0c;可以单独添加一个电头字段&#xff0c;但是如果在富文本编辑器中最上面就添加图片的话&#xff0c;图片就会把电头和正文中的文字给隔开。需要做的是获取到电头字段&#xff0…

CSS 盒子模型

前言 盒子模型-组成 CSS盒子模型是一种用来描述元素在页面布局中占据空间的模型。它将每个元素看作由内容区域、内边距、边框和外边距组成的一个矩形框。 盒子模型的组成部分包括&#xff1a; 内容区域&#xff08;Content&#xff09;&#xff1a;显示元素的实际内容&#xf…

Unity实现广告滚动播放、循环播放、鼠标切换的效果

效果&#xff1a; 场景结构&#xff1a; 特殊物体&#xff1a;panel下面用排列组件horizent layout group放置多个需要显示的面板&#xff0c;用mask遮罩好。 using System.Collections; using System.Collections.Generic; using DG.Tweening; using UnityEngine; using Unity…

matlab工具箱Filter Designer设计butterworth带通滤波器

1、在matlab控制界面输入fdatool; 2、在显示的界面中选择合适的参数&#xff1b;本实验中采样频率是200&#xff0c;低通30hz&#xff0c;高通60hz,点击butterworth滤波器。 3、点击设计滤波器按钮后&#xff0c;在生成的界面点击红框按钮&#xff0c;可生成simulink模型到当前…

NineData x SelectDB 完成产品兼容互认证

近日&#xff0c;新一代实时数据仓库厂商 SelectDB 与云原生智能数据管理平台 NineData 完成产品兼容互认证。经过严格的联合测试&#xff0c;双方软件完全相互兼容、功能完善、整体运行稳定且性能表现优异。基于本次的合作&#xff0c;双方将进一步为数据管理与大数据分析业务…

Module not found: Error: Can‘t resolve ‘vue-pdf‘ in ‘xxx‘

使用命令npm run serve时vue项目报错&#xff1a; Module not found: Error: Cant resolve vue-pdf in xxx 解决方案&#xff1a; 运行命令&#xff1a; npm install vue-pdf --save --legacy-peer-deps 即可解决。 再次顺利执行npm run serve

Cookie 和 Session 的工作流程

目录 一、Cookie是什么&#xff1f; 二、Session是什么? 三、Cookie的工作流程 四、Session的工作流程 五、Session和Cookie的区别和联系 一、Cookie是什么&#xff1f; Cookie是一种在网站和用户之间交换信息的机制。它是由Web服务器发送给用户浏览器的小型文本文件&#xff…

【填坑向】MySQL常见报错及处理系列(ERROR! The server quit without updating PID file)

本系列其他文章 【填坑向】MySQL常见报错及处理系列&#xff08;Communications link failure & Access denied for user ‘root‘‘localhost‘&#xff09;_AQin1012的博客-CSDN博客翻一下大致的意思就是默认会按照如下的顺序读取配置文件&#xff0c;我上面贴出的配置文…

elementUI moment 年月日转时间戳 时间限制

changeStartTime(val){debuggerthis.startT val// this.startTime parseInt(val.split(-).join())this.startTime moment(val).unix() * 1000 //开始时间毫秒if(this.endTime){this.endTime moment(this.endT).unix() * 1000 //结束时间毫秒if(this.startTime - this.endTi…