时间语义与窗口

时间语义

在Flink中,时间语义分为两种 : 处理时间和事件时间。时间语义与窗口函数是密不可分的。以窗口为单位进行某一段时间内指标统计,例如想要统计8点-9点的某个页面的访问量,此时就需要用到了窗口函数,这里的关键时间点是8点到9点,而这个时间点指的是事件时间,不是处理时间,也就是某个时间发生的时间。

一条日志数据发生的时间是8:59,而经过一系列转换到达系统开始处理的时间可能是9:01,很显然,如果想要统计8点到9点的数据,应该是根据事件的发生时间,而不是处理时间。

  • WaterMark

上面说到,在窗口函数开窗以及关窗中,不应该以当前系统的处理时间,而应该取当前的事件时间,这个事件时间就是WaterMark 用于标识当前流数据的时间到什么时候了,这个时间不是由系统去推进的,而是基于事件的到来去推进的。

  • WaterMark的延时

在流处理中,数据并不是一批一批到达的而是流式到达,且到达分布式系统处理的时间可能是乱序的

8:57 、 9:01 、 8:59 、 9:02 …

如果 想要统计8点到9点的数据统计,按照事件时间,当 9:01的数据到达之后,便认为,8点到9点的窗口可以关闭进行运算了,但是8:59 这条数据就 不在统计范围内了,这样就存在一定意义上的数据丢失。要解决这个问题,就是让Watermark的时间进行延时,也就是说,本来是计划9点关窗计算的,让Watermark的时间减少2分钟,那9:01这条数据到来之后,WaterMark的时间其实是8:59,9:02的数据来到以后,WaterMark才会到9:00,此时8点-9点才会关窗计算。这个等多久的时间就是WaterMark的延时。

窗口

窗口函数的统计基于时间语义的watermark,当窗口接收到数据以后,首先需要根据数据身上的时间戳来判断此条数据属于哪个窗口,将属于某个窗口的事件分配给窗口,而关窗计算的时间取决于WaterMark。在Flink中,窗口在同一时刻是可以存在多个,而不是只有一个。

在这里插入图片描述

数据流的WaterMark定义

  • 在DataStream上游数据流通过assignTimestampsAndWatermarks 分配watermark
new WatermarkStrategy<Event>() {@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return null;}@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return WatermarkStrategy.super.createTimestampAssigner(context);}}

createWatermarkGenerator 方法用于返回 WatermarkGenerator

@Public
public interface WatermarkGenerator<T> {void onEvent(T var1, long var2, WatermarkOutput var4);void onPeriodicEmit(WatermarkOutput var1);
}

WatermarkGenerator 两个方法分别对基于事件的WaterMark以及周期性的生成watermark两种生成策略发射watermark。

而每条数据上的时间戳该如何提前,是通过 TimestampAssigner 这个时间戳分配器提取的。

@FunctionalInterface
@Public
public interface TimestampAssigner<T> {long NO_TIMESTAMP = -9223372036854775808L;long extractTimestamp(T var1, long var2);
}

extractTimestamp 返回了一个时间戳,获取到这个时间以后,便可以根据这个时间去推进WaterMark的前进。

  • WaterMark时间分配器定义
//时间戳分配器,获取到事件时间后用于推进WaterMark的前进
class MyTimeStampAss implements TimestampAssigner<Event>{@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}
}
  • WaterMark生成器定义
//水位线提取器,基于事件或者周期性的生成水位线
class MyWaterMarkGenerator implements WatermarkGenerator<Event>{private long maxTimeStamp = 0;@Overridepublic void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {maxTimeStamp = Math.max(event.timestamp,maxTimeStamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {watermarkOutput.emitWatermark(new Watermark(maxTimeStamp)); //发射Watermark,让下游窗口感知}
}
  • 给DataStream设置WaterMark
public class WaterMarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = Env.getEnv();DataStream<Event> dataStream = env.addSource(new ClickSource());//设置数据流的watermarkSingleOutputStreamOperator<Event> stream = dataStream.assignTimestampsAndWatermarks(new MyWaterMarkStrategy());env.execute();}}class MyWaterMarkStrategy implements WatermarkStrategy<Event>{@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyWaterMarkGenerator();}@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new MyTimeStampAss();}
}

默认的周期性生成WaterMark为200ms

 env.getConfig().setAutoWatermarkInterval(100);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/95842.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

githubPage部署Vue项目

github中新建项目 my-web &#xff08;编写vue项目代码&#xff09; myWebOnline(存放Vue打包后的dist包里面的文件) 发布流程 &#xff08;假设my-web项目已经编写完成&#xff09;Vue-cli my-web vue.config.js文件中 const { defineConfig } require(vue/cli-service)…

【LeetCode】3. 无重复字符的最长子串

3. 无重复字符的最长子串&#xff08;中等&#xff09; 方法&#xff1a;滑动窗口 哈希表 思路 这道题主要用到思路是&#xff1a;滑动窗口 什么是滑动窗口&#xff1f; 其实就是一个队列,比如例题中的 abcabcbb&#xff0c;进入这个队列&#xff08;窗口&#xff09;为 ab…

Python Opencv实践 - 轮廓检测

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/map.jpg") print(img.shape) plt.imshow(img[:,:,::-1])#Canny边缘检测 edges cv.Canny(img, 127, 255, 0) plt.imshow(edges, cmapplt.cm.gray)#查找轮廓 #c…

DHorse v1.3.2 发布,基于 k8s 的发布平台

版本说明 新增特性 构建版本、部署应用时的线程池可配置化&#xff1b; 优化特性 构建版本跳过单元测试&#xff1b; 解决问题 解决Vue应用详情页面报错的问题&#xff1b;解决Linux环境下脚本运行失败的问题&#xff1b;解决下载Maven安装文件失败的问题&#xff1b; 升…

大学物理 之 安培环路定理

文章目录 前言什么是安培环路定理安培环路定理有什么作用 深入了解深入学习 前言 什么是安培环路定理 安培环路定理的物理意义在于描述了电流和磁场之间的相互作用&#xff0c;以及如何在一个封闭的回路中分析这种相互作用。 简单的来说 , 用环路定理来解决在磁场中B对任意封…

【操作系统】一文快速入门,很适合JAVA后端看

作者简介&#xff1a; 目录 1.概述 2.CPU管理 3.内存管理 4.IO管理 1.概述 操作系统可以看作一个计算机的管理系统&#xff0c;对计算机的硬件资源提供了一套完整的管理解决方案。计算机的硬件组成有五大模块&#xff1a;运算器、控制器、存储器、输入设备、输出设备。操作…

2022年下半年系统架构设计师真题(下午带答案)

试题一 (25分) 某电子商务公司拟升级其会员与促销管理系统&#xff0c;向用户提供个性化服务&#xff0c;提高用户的粘性。在项目立项之初&#xff0c;公司领导层一致认为本次升级的主要目标是提升会员管理方式的灵活性&#xff0c;由于当前用户规模不大&#xff0c;业务也相对…

前端基础4——jQuery

文章目录 一、基本了解1.1 导入jQuery库1.2 基本语法1.3 选择器 二、操作HTML2.1 隐藏和显示元素2.2 获取与设置内容2.3 获取、设置和删除属性2.4 添加元素2.5 删除元素2.6 设置CSS样式 三、jQuery Ajax3.1 基本语法3.2 回调函数3.3 常用HTTP方法3.4 案例一3.4.1 准备工作3.4.2…

Vue2023 面试归纳及复习

目录 1. Vue 3中的Composition API&#xff08;Hooks&#xff09;是什么&#xff1f;它与Options API有何不同&#xff1f; Composition API 的优势 2 Options API 语法格式 3 setup 语法糖 4 Vue3的生命周期 5. 请解释一下Vue 3中的Teleport&#xff08;传送&#xf…

设计模式-迭代器

文章目录 1. 引言1.1 概述1.2 设计模式1.3 迭代器模式的应用场景1.4 迭代器模式的作用 2. 基本概念2.1 迭代器 Iterator2.2 聚合 Aggregate2.3 具体聚合 ConcreteAggregate 3. Java 实现迭代器模式3.1 Java 集合框架3.2 Java 迭代器接口3.3 Java 迭代器模式实现示例 4. 迭代器模…

ElasticSearch(一)数据类型

ElasticSearch&#xff08;一&#xff09;数据类型 1.简述 Es数据类型分为基础数据类型和复杂类型数据&#xff0c;掌握ES数据类型才能进一步使用ES检索数据内容。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot…

【python爬虫】10.指挥浏览器自动工作(selenium)

文章目录 前言selenium是什么怎么用设置浏览器引擎获取数据解析与提取数据自动操作浏览器 实操运用确认目标分析过程代码实现 本关总结 前言 上一关&#xff0c;我们认识了cookies和session。 分别学习了它们的用法&#xff0c;以及区别。 还做了一个项目&#xff1a;带着小…