Flink之迟到的数据

迟到数据的处理

  1. 推迟水位线推进: WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  2. 设置窗口延迟关闭:.allowedLateness(Time.seconds(3))
  3. 使用侧流接收迟到的数据: .sideOutputLateData(lateData)
public class Flink12_LateDataCorrect {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888).map(line -> {String[] fields = line.split(",");return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 水位线延迟2秒.withTimestampAssigner((event, ts) -> event.getTs()));ds.print("input");OutputTag<WordCountWithTs> lateOutputTag = new OutputTag<>("late", Types.POJO(WordCountWithTs.class));//new OutputTag<WordCount>("late"){}SingleOutputStreamOperator<UrlViewCount> urlViewCountDs = ds.map(event -> new WordCountWithTs(event.getUrl(), 1 , event.getTs())).keyBy(WordCountWithTs::getWord).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(5))  // 窗口延迟5秒关闭.sideOutputLateData(lateOutputTag) // 捕获到侧输出流.aggregate(new AggregateFunction<WordCountWithTs, UrlViewCount, UrlViewCount>() {@Overridepublic UrlViewCount createAccumulator() {return new UrlViewCount();}@Overridepublic UrlViewCount add(WordCountWithTs value, UrlViewCount accumulator) {accumulator.setCount((accumulator.getCount() == null ? 0L : accumulator.getCount()) + value.getCount());return accumulator;}@Overridepublic UrlViewCount getResult(UrlViewCount accumulator) {return accumulator;}@Overridepublic UrlViewCount merge(UrlViewCount a, UrlViewCount b) {return null;}},new ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>.Context context, Iterable<UrlViewCount> elements, Collector<UrlViewCount> out) throws Exception {UrlViewCount urlViewCount = elements.iterator().next();//补充urlurlViewCount.setUrl(key);//补充窗口信息urlViewCount.setWindowStart(context.window().getStart());urlViewCount.setWindowEnd(context.window().getEnd());// 写出out.collect(urlViewCount);}});urlViewCountDs.print("window") ;//TODO 将窗口的计算结果写出到Mysql的表中, 有则更新,无则插入/*窗口触发计算输出的结果,该部分数据写出到mysql表中执行插入操作,后续迟到的数据,如果窗口进行了延迟, 窗口还能正常对数据进行计算, 该部分数据写出到mysql执行更新操作。建表语句:CREATE TABLE `url_view_count` (`url` VARCHAR(100) NOT NULL  ,`cnt` BIGINT NOT NULL,`window_start` BIGINT NOT NULL,`window_end` BIGINT NOT NULL,PRIMARY KEY (url, window_start, window_end )  -- 联合主键) ENGINE=INNODB DEFAULT CHARSET=utf8*/SinkFunction<UrlViewCount> jdbcSink = JdbcSink.<UrlViewCount>sink("replace into url_view_count(url, cnt ,window_start ,window_end) value (?,?,?,?)",new JdbcStatementBuilder<UrlViewCount>() {@Overridepublic void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {preparedStatement.setString(1, urlViewCount.getUrl());preparedStatement.setLong(2, urlViewCount.getCount());preparedStatement.setLong(3, urlViewCount.getWindowStart());preparedStatement.setLong(4, urlViewCount.getWindowEnd());}},JdbcExecutionOptions.builder().withBatchSize(2).withMaxRetries(3).withBatchIntervalMs(1000L).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver").withUrl("jdbc:mysql://hadoop102:3306/test").withUsername("root").withPassword("000000").build());urlViewCountDs.addSink(jdbcSink) ;//捕获侧输出流SideOutputDataStream<WordCountWithTs> lateData = urlViewCountDs.getSideOutput(lateOutputTag);lateData.print("late");//TODO 将侧输出流中的数据,写出到mysql中的表中,需要对mysql中已经存在的数据进行修正//转换结构  WordCountWithTs => UrlViewCount//调用flink计算窗口的方式, 基于当前数据的时间计算对应的窗口SingleOutputStreamOperator<UrlViewCount> mapDs = lateData.map(wordCountWithTs -> {Long windowStart = TimeWindow.getWindowStartWithOffset(wordCountWithTs.getTs()/*数据时间*/, 0L/*偏移*/, 10000L/*窗口大小*/);Long windowEnd = windowStart + 10000L;return new UrlViewCount(wordCountWithTs.getWord(), 1L, windowStart, windowEnd);});// 写出到mysql中SinkFunction<UrlViewCount> lateJdbcSink = JdbcSink.<UrlViewCount>sink("insert into url_view_count (url ,cnt , window_start ,window_end) values(?,?,?,?) on duplicate key update cnt = VALUES(cnt) + cnt  ",new JdbcStatementBuilder<UrlViewCount>() {@Overridepublic void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {preparedStatement.setString(1, urlViewCount.getUrl());preparedStatement.setLong(2, urlViewCount.getCount());preparedStatement.setLong(3, urlViewCount.getWindowStart());preparedStatement.setLong(4, urlViewCount.getWindowEnd());}},JdbcExecutionOptions.builder().withBatchSize(2).withMaxRetries(3).withBatchIntervalMs(1000L).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver").withUrl("jdbc:mysql://hadoop102:3306/test").withUsername("root").withPassword("000000").build());mapDs.addSink(lateJdbcSink) ;try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

withIdleness关键字

解决某条流长时间没有数据,不能推进水位线,导致下游窗口的窗口无法正常计算。

public class Flink12_WithIdleness {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);SingleOutputStreamOperator<Event> ds1 = env.socketTextStream("hadoop102", 8888).map(line -> {String[] words = line.split(" ");return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((event, ts) -> event.getTs())//如果超过10秒钟不发送数据,就不等待该数据源的水位线.withIdleness(Duration.ofSeconds(10)));ds1.print("input1");SingleOutputStreamOperator<Event> ds2 = env.socketTextStream("hadoop102", 9999).map(line -> {String[] words = line.split(" ");return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((event, ts) -> event.getTs())//如果超过10秒钟不发送数据,就不等待该数据源的水位线
//                                .withIdleness(Duration.ofSeconds(10)));ds2.print("input2");ds1.union(ds2).map(event->new WordCount(event.getUrl(),1)).keyBy(WordCount::getWord).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum("count").print("window");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

基于时间的合流

窗口联结Window Join

WindowJoin: 在同一个窗口内的相同key的数据才能join成功。

orderDs.join( detailDs ).where( OrderEvent::getOrderId )  // 第一条流用于join的key.equalTo( OrderDetailEvent::getOrderId) // 第二条流用于join的key.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<OrderEvent, OrderDetailEvent, String>() {@Overridepublic String join(OrderEvent first, OrderDetailEvent second) throws Exception {// 处理join成功的数据return  first + " -- " + second ;}}).print("windowJoin");

时间联结intervalJoin

在这里插入图片描述

IntervalJoin : 以一条流中数据的时间为基准, 设定上界和下界, 形成一个时间范围, 另外一条流中相同key的数据如果能落到对应的时间范围内, 即可join成功。

核心代码:

 orderDs.keyBy(OrderEvent::getOrderId).intervalJoin(detailDs.keyBy( OrderDetailEvent::getOrderId)).between(Time.seconds(-2) , Time.seconds(2))//.upperBoundExclusive()  排除上边界值//.lowerBoundExclusive()  排除下边界值.process(new ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>() {@Overridepublic void processElement(OrderEvent left, OrderDetailEvent right, ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>.Context ctx, Collector<String> out) throws Exception {//处理join成功的数据out.collect( left + " -- " + right );}}).print("IntervalJoin");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/266919.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

两线制 4-20mA 隔离变送器(输出回路供电)

两线制 4-20mA 隔离变送器(输出回路供电) 特征&#xff1a; ◆低成本&#xff0c;小体积&#xff0c;符合 UL94-V0 阻燃标准 ◆安装方式采用国际标准 DIN35mm 导轨安装方式 ◆双隔离(信号输入、信号输出相互之间 3000VDC 隔离) ◆4-20mA 电流输入与输出&#xff0c;精度高(失真…

Vue3封装一个轮播图组件

先看效果 编写组件代码 CarouselChart.vue <template><div classimg-box><el-button clickpreviousImages v-ifprops.showBtn>←</el-button><div classimg><div styledisplay: flex;gap: 20px idmove><imgclassimg-item v-for(item…

TrustZone之安全虚拟化

在Armv7-A首次引入虚拟化时,它仅在非安全状态中添加。在Armv8.3之前,Armv8也是如此,如下图所示: 如前所述在切换安全状态时,EL3用于托管固件和安全监视器。安全EL0/1托管受信任的执行环境(TEE),由受信任的服务和内核组成。 在安全状态下,没有对多个虚拟机的需…

云计算 云原生

一、引言 云计算需要终端把信息上传到服务器&#xff0c;服务器处理后再返回给终端。在之前人手一台手机的情况下&#xff0c;云计算还是能handle得过来的。但是随着物联网的发展&#xff0c;什么东西都要联网&#xff0c;那数据可就多了去了&#xff0c;服务器处理不过来&…

【docker】常用命令

启动docker服务 systemctl start docker 停止docker服务 systemctl stop docker 重启docker服务 systemctl restart docker 查看docker服务状态 systemctl status docker 设置开机启动docker服务 systemctl enable docker 设置关闭开机启动docker服务 systemctl disable …

设计模式——建造者模式(创建型)

引言 生成器模式是一种创建型设计模式&#xff0c; 使你能够分步骤创建复杂对象。 该模式允许你使用相同的创建代码生成不同类型和形式的对象。 问题 假设有这样一个复杂对象&#xff0c; 在对其进行构造时需要对诸多成员变量和嵌套对象进行繁复的初始化工作。 这些初始化代码…

修改汽车的控制系统实现自动驾驶,基于一个开源的汽车驾驶辅助系统实现全自动驾驶

修改汽车的控制系统实现自动驾驶,基于一个开源的汽车驾驶辅助系统实现全自动驾驶。 自动驾驶汽车依靠人工智能、视觉计算、雷达、监控装置和全球定位系统协同合作,让电脑可以在没有任何人类主动的操作下,自动安全地操作机动车辆。 演示视频: Openpilot :一个开源的汽车驾…

Mac安装DevEco Studio

下载 首先进入鸿蒙开发者官网&#xff0c;顶部导航栏选择开发->DevEco Studio 根据操作系统下载不同版本&#xff0c;其中Mac(X86)为英特尔芯片&#xff0c;Mac(ARM)为M芯片。 安装 下载完毕后&#xff0c;开始安装。 点击Agree 首次使用&#xff0c;请选择Do not impor…

windows10安装MongoDB的入门简易教程【学习自用】

一、下载安装包 下载地址 https://www.mongodb.com/try/download/community 因为我是在windows上安装&#xff0c;所以下载windows版本的压缩包格式 二、下载后解压到要安装的目录里去 我这里在D盘装&#xff0c;所以就解压到D盘中我自己建的的mongoDB文件夹中。 三、在解…

青少年CTF-Misc(持续更新中)

FLAG&#xff1a;当觉得自己很菜的时候&#xff0c;就静下心来学习 专研方向:Web安全&#xff0c;CTF 每日emo&#xff1a;听一千遍反方向的钟&#xff0c;我们能回到过去吗&#xff1f; 1.StegoTXT&#xff1a; 解压缩文件。发现字母中存在覆盖。使用0宽隐写在线解密得到flag…

打包less

接HTML和css之后对less进行打包 1.在之前的文件夹里的src文件夹创建一个less文件 2.打开webpack——>中文文档——>Loader——>less—loader 3.复制下图代码到终端 4.复制下图内容到webpack.config.js脚本 5.在src里的js文件年引入less文件 6.在终端运行 npm run te…

通用的AGI 安全风险

传统安全风险 平台基础设施安全风险 模型与数据层安全风险 应用层安全风险 平台基础设施安全风险 &#xff08;1&#xff09;物理攻击&#xff1a;机房管控不到位 &#xff08;2&#xff09;网络攻击 &#xff08;3&#xff09;计算环境&#xff1a;自身安全漏洞&#xf…