Flink 实战之 Real-Time DateHistogram

系列文章
  • Flink 实战之 Real-Time DateHistogram
  • Flink 实战之从 Kafka 到 ES

DateHistogram 用于根据日期或时间数据进行分桶聚合统计。它允许你将时间序列数据按照指定的时间间隔进行分组,从而生成统计信息,例如每小时、每天、每周或每月的数据分布情况。

Elasticsearch 就支持 DateHistogram 聚合,在关系型数据库中,可以使用 GROUP BY 配合日期函数来实现时间分桶。但是当数据基数特别大时,或者时间分桶较多时,这个聚合速度就非常慢了。如果前端想呈现一个时间分桶的 Panel,这个后端接口的响应速度将非常感人。

我决定用 Flink 做一个实时的 DateHistogram。

实验设计

场景就设定为从 Kafka 消费数据,由 Flink 做实时的时间分桶聚合,将聚合结果写入到 MySQL。

源端-数据准备

Kafka 中的数据格式如下,为简化程序,测试数据做了尽可能的精简:

testdata.json

{"gid" : "001254500828905","timestamp" : 1620981709790
}

KafkaDataProducer 源端数据生成程序,要模拟数据乱序到达的情况:

package org.example.test.kafka;import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.Random;
import java.util.Scanner;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import com.alibaba.fastjson.JSONObject;public class KafkaDataProducer {private static final String TEST_DATA = "testdata.json";private static final String GID = "gid";private static final String TIMESTAMP = "timestamp";// 定义日志颜色public static final String reset = "\u001B[0m";public static final String red = "\u001B[31m";public static final String green = "\u001B[32m";public static void main(String[] args) throws InterruptedException {Properties props = new Properties();String topic = "trace-2024";props.put("bootstrap.servers", "127.0.0.1:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<String, String>(props);InputStream inputStream = KafkaDataProducer.class.getClassLoader().getResourceAsStream(TEST_DATA);Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8.name());String content = scanner.useDelimiter("\\A").next();scanner.close();JSONObject jsonContent = JSONObject.parseObject(content);int totalNum = 2000;Random r = new Random();for (int i = 0; i < totalNum; i++) {// 对时间进行随机扰动,模拟数据乱序到达long current = System.currentTimeMillis() - r.nextInt(60) * 1000;jsonContent.put(TIMESTAMP, current);producer.send(new ProducerRecord<String, String>(topic, jsonContent.toString()));// wait some timeThread.sleep(2 * r.nextInt(10));System.out.print("\r" + "Send " + green + (i + 1) + "/" + totalNum + reset + " records to Kafka");}Thread.sleep(2000);producer.close();System.out.println("发送记录总数: " + totalNum);}
}

目标端-表结构设计

MySQL 的表结构设计:

CREATE TABLE `flink`.`datehistogram` (`bucket` varchar(255) PRIMARY KEY,`count` bigint
);

bucket 列用于存储时间分桶,形如 [09:50:55 - 09:51:00],count 列用于存储对应的聚合值。

实现

maven 依赖:

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.2-1.17</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.72</version></dependency>
</dependencies>

BucketCount 类用于转换 Kafka 中的数据为时间分桶格式,并便于聚合:

package org.example.flink.data;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;public class BucketCount {private long timestamp;private String bucket;private long count;public BucketCount(long timestamp) {this.timestamp = timestamp;this.bucket = formatTimeInterval(timestamp);this.count = 1;}public BucketCount(String bucket, long count) {this.bucket = bucket;this.count = count;}/*** 将时间戳格式化为时间区间格式* * @param time* @return 例如 [11:28:00 — 11:28:05]*/private String formatTimeInterval(long time) {// 定义输出的日期时间格式DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = Instant.ofEpochMilli(time).atZone(ZoneId.systemDefault()).toLocalDateTime();// 提取秒数并计算区间开始时间int seconds = dateTime.getSecond();int intervalStartSeconds = (seconds / 5) * 5;// 创建区间开始和结束时间的 LocalDateTime 对象LocalDateTime intervalStartTime = dateTime.withSecond(intervalStartSeconds);LocalDateTime intervalEndTime = intervalStartTime.plusSeconds(5);// 格式化区间开始和结束时间为字符串String startTimeString = intervalStartTime.format(outputFormatter);String endTimeString = intervalEndTime.format(outputFormatter);// 返回格式化后的时间区间字符串return startTimeString + "-" + endTimeString;}// 省略Getter, Setter
}

RealTimeDateHistogram 类完成流计算:

package org.example.flink;import java.time.Duration;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.example.flink.data.BucketCount;import com.alibaba.fastjson.JSONObject;public class RealTimeDateHistogram {public static void main(String[] args) throws Exception {// 1. prepareConfiguration configuration = new Configuration();configuration.setString("rest.port", "9091");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.enableCheckpointing(2 * 60 * 1000);// 使用rocksDB作为状态后端env.setStateBackend(new EmbeddedRocksDBStateBackend());// 2. Kafka SourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("127.0.0.1:9092").setTopics("trace-2024").setGroupId("group-01").setStartingOffsets(OffsetsInitializer.latest()).setProperty("commit.offsets.on.checkpoint", "true").setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(),"Kafka Source");sourceStream.setParallelism(2);	// 设置source算子的并行度为2// 3. 转换为易于统计的BucketCount对象结构{ bucket: 00:00, count: 200 }SingleOutputStreamOperator<BucketCount> mapStream = sourceStream.map(new MapFunction<String, BucketCount>() {@Overridepublic BucketCount map(String value) throws Exception {JSONObject jsonObject = JSONObject.parseObject(value);long timestamp = jsonObject.getLongValue("timestamp");return new BucketCount(timestamp);}});mapStream.name("Map to BucketCount");mapStream.setParallelism(2);	// 设置map算子的并行度为2// 4. 设置eventTime字段作为watermark,要考虑数据乱序到达的情况SingleOutputStreamOperator<BucketCount> mapStreamWithWatermark = mapStream.assignTimestampsAndWatermarks(WatermarkStrategy.<BucketCount>forBoundedOutOfOrderness(Duration.ofSeconds(60)).withIdleness(Duration.ofSeconds(60)).withTimestampAssigner(new SerializableTimestampAssigner<BucketCount>() {@Overridepublic long extractTimestamp(BucketCount bucketCount, long recordTimestamp) {// 提取eventTime字段作为watermarkreturn bucketCount.getTimestamp();}}));mapStreamWithWatermark.name("Assign EventTime as Watermark");// 5. 滚动时间窗口聚合SingleOutputStreamOperator<BucketCount> windowReducedStream = mapStreamWithWatermark.windowAll(TumblingEventTimeWindows.of(Time.seconds(5L)))	// 滚动时间窗口.trigger(ProcessingTimeTrigger.create()) // ProcessingTime触发器.allowedLateness(Time.seconds(120))  	 // 数据延迟容忍度, 允许数据延迟乱序到达.reduce(new ReduceFunction<BucketCount>() {@Overridepublic BucketCount reduce(BucketCount bucket1, BucketCount bucket2) throws Exception {// 将两个bucket合并,count相加return new BucketCount(bucket1.getBucket(), bucket1.getCount() + bucket2.getCount());}});windowReducedStream.name("Window Reduce");windowReducedStream.setParallelism(1);		// reduce算子的并行度只能是1// 6. 将结果写入到数据库DataStreamSink<BucketCount> sinkStream = windowReducedStream.addSink(JdbcSink.sink("insert into flink.datehistogram(bucket, count) values (?, ?) "+ "on duplicate key update count = VALUES(count);",(statement, bucketCount) -> {statement.setString(1, bucketCount.getBucket());statement.setLong(2, bucketCount.getCount());},JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(200).withMaxRetries(5).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://127.0.0.1:3306/flink").withUsername("username").withPassword("password").build()));sinkStream.name("Sink DB");sinkStream.setParallelism(1);				// sink算子的并行度只能是1// 执行env.execute("Real-Time DateHistogram");}
}

几个关键点

window

Flink 的 window 将数据源沿着时间边界,切分成有界的数据块,然后对各个数据块进行处理。下图表示了三种窗口类型:

窗口划分策略比较
  • 固定窗口(又名滚动窗口)
    固定窗口在时间维度上,按照固定长度将无界数据流切片,是一种对齐窗口。窗口紧密排布,首尾无缝衔接,均匀地对数据流进行切分。

  • 滑动窗口
    滑动时间窗口是固定时间窗口的推广,由窗口大小和窗口间隔两个参数共同决定。当窗口间隔小于窗口大小时,窗口之间会出现重叠;当窗口间隔等于窗口大小时,滑动窗口蜕化为固定窗口;当窗口间隔大于窗口大小时,得到的是一个采样窗口。与固定窗口一样,滑动窗口也是一种对齐窗口。

  • 会话窗口
    会话窗口是典型的非对齐窗口。会话由一系列连续发生的事件组成,当事件发生的间隔超过某个超时时间时,意味着一个会话的结束。会话很有趣,例如,我们可以通过将一系列时间相关的事件组合在一起来分析用户的行为。会话的长度不能先验地定义,因为会话长度在不同的数据集之间永远不会相同。

EventTime

数据处理系统中,通常有两个时间域:

  • 事件时间:事件发生的时间,即业务时间。
  • 处理时间:系统发现事件,开始对事件进行处理的时间。

根据事件时间划分窗口 的方式在事件本身的发生时间备受关注时显得格外重要。下图所示为将无界数据根据事件时间切分成 1 小时固定时间窗口:

根据事件时间划分固定窗口

要特别注意箭头中所示的两个事件,两个事件根据处理时间所在的窗口,跟事件时间发生的窗口不是同一个。如果基于处理时间划分窗口的话,结果就是错的。只有基于事件时间进行计算,才能保证数据的正确性。

当然,天下没有免费的午餐。事件时间窗口功能很强大,但由于迟到数据的原因,窗口的存在时间比窗口本身的大小要长很多,导致的两个明显的问题是:

  • 缓存:事件时间窗口需要存储更长时间内的数据。
  • 完整性:基于事件时间的窗口,我们也不能判断什么时候窗口的数据都到齐了。Flink 通过 watermark,能够推断一个相对精确的窗口结束时间。但是这种方式并不能得到完全正确的结果。因此,Flink 还支持让用户能定义何时输出窗口结果,并且定义当迟到数据到来时,如何更新之前窗口计算的结果。

reduce

Reduce 算子基于 ReduceFunction 对集合中的元素进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。常用的聚合方法如 average, sum, min, max, count 都可以使用 reduce 实现。

效果预览

从效果图中可以看出,Sum Panel 中的 stat value(为 DateHistogram 中每个 Bucket 对应值的加和)和 Kafka 端的数据跟进的非常紧,代表 Flink 的处理延迟非常低。向 Kafka 中总计压入的数据量和 Flink 输出的数据总数一致,代表数据的统计结果是准确的。此外,最近一段时间的柱状图都在实时变化,代表 Flink 对迟到的数据按照 EventTime 进行了准确处理,把数据放到了准确的 date bucket 中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/836755.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软路由 + NAS 实现日常生活办公

组网拓扑设备监控指标设备主要用途或部署服务 1. OpenWrtWireGuard VPN 组网从而实现内网穿透便于访问家庭局域网络; 懂得都懂; 运行一些 docker 小玩意。2. QNAP NASQuObjects 对象存储服务器:Typora 图床功能、Joplin 笔记远程同步; Plex Media Server:搭建个人的影音库…

数据采集实践4

课程链接 https://edu.cnblogs.com/campus/fzu/2024DataCollectionandFusiontechnology作业链接 https://edu.cnblogs.com/campus/fzu/2024DataCollectionandFusiontechnology/homework/13288gitee仓库链接 https://gitee.com/wd_b/party-soldier-data-collection/tree/master/…

违规生产检测视频分析服务器安全帽安全服检测批量操作功能教程

在工业自动化和智能化的浪潮中,视频监控系统正经历着从传统监控向智能监控的转变。视频分析服务器,作为这一转变的核心,正以其独特的优势在安全管理领域扮演着越来越重要的角色。本文将详细介绍视频分析服务器的技术特点、优势以及如何通过批量操作来提高监控效率和安全性。…

使用WebRTC技术搭建小型的视频聊天页面

目录目录 参考资料 什么是WebRTC? 能做什么? 架构图 个人理解(类比)核心知识点 核心知识点类比ICE框架 STUN(协议) NAT(网络地址转换) TURN SDP(会话描述协议) WebRTC的核心API现在开始做饭 准备阶段环境准备 服务器搭建 Coturn TURN server(开源服务) 部署 Signal Server信令…

HarmonyOS-Chat聊天室|纯血鸿蒙Next5 api12聊天app|ArkUI仿微信

自研原生鸿蒙NEXT5.0 API12 ArkTS仿微信app聊天模板HarmonyOSChat。 harmony-wechat原创重磅实战纯血鸿蒙OS ArkUI+ArkTs仿微信App聊天实例。包括聊天、通讯录、我、朋友圈等模块,实现类似微信消息UI布局、编辑器光标处输入文字+emo表情图片/GIF动图、图片预览、红包、语音/位…

Apache Dolphinscheduler数据质量源码分析

Apache DolphinScheduler 是一个分布式、易扩展的可视化数据工作流任务调度系统,广泛应用于数据调度和处理领域。 在大规模数据工程项目中,数据质量的管理至关重要,而 DolphinScheduler 也提供了数据质量检查的计算能力。本文将对 Apache DolphinScheduler 的数据质量模块进…

通过域名访问内网服务器

cloudflare优选ip访问家用服务器 前言 由于一直有使用markdown写笔记的需求,但是每次处理图片的时候总是很头疼。突然,我瞥见了还在角落里面吃灰小主机,因此萌生了废物利用想法,搭建一个外网可访问的图床。图床直接使用lsky-pro就可以,关键还是在外网访问上。 于是在网上看…

【算法】KMP 与 Z 函数

1. KMP 1.1 算法简介 可以做到线性匹配的快速匹配字符串的算法,并可以维护字符串最长公共前后缀,扩展出计算字符串周期。 在 OI 界 KMP 算法是字符串板块中很经典的算法,可以扩展出很多巧妙的解题技巧。 1.2 算法流程 1.2.1 字符串匹配 考虑 \(O(n^2)\) 暴力的匹配,瓶颈在于…

apifox使用小记

1.copy as cURL(cmd)之后在apifox里直接import cURL 2.调用时发生301错误 通常情况下是因为有session校验存在(用户校验)。解决方案: F12里将cookie里的session取到,在apifox里全局配置 这里踩了一个坑 第一次我是import了一个get请求,发送后发生301,所以我去设置了co…

毕业实习总结报告

毕业实习总结报告这既是毕业实习要求的总结报告,也是我对AutoSAR的一点理解,更是个人对未来生活的一点思考。我不希望把这份报告草草水过,而是希望把现在的感受记录下来,给以后的自己看一看,好记性不如记下来。涉及到工作细节和隐私的部分不在此展示了。时间像一头野驴呀,…

IDEA 2024 最新激活码,激活至2099(附有效idea激活码+激活工具)

IDEA 2024最新激活码,激活至2099(附有效idea激活码+激活工具)若提示We could not validate your license ff83b7bd51f5460ca43aabd7a96863a0.信息,idea激活时提示激活码失效解决方法: IDEA 2024 解决 We could not validate your license ff83b7bd51f5460ca43aabd7a96863a…

大学物理上册

质点作曲线运动时,质点在某一点的速度方向就是沿该点曲线的切线方向。