【入门Flink】- 10基于时间的双流联合(join)

统计固定时间内两条流数据的匹配情况,需要自定义来实现——可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了内置的 join 算子。

窗口联结(Window Join)

一段时间的双流合并

定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

stream1.join(stream2).where(<KeySelector>) // stream1 的 keyBy.equalTo(<KeySelector>) // stream2 的 keyBy.window(<WindowAssigner>).apply(<JoinFunction>)
public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 11, 1),Tuple3.of("b", 2, 1),Tuple3.of("b", 12, 1),Tuple3.of("c", 14, 1),Tuple3.of("d", 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String,Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));DataStream<String> join = ds1.join(ds2).where(r1 -> r1.f0) // ds1 的keyby.equalTo(r2 -> r2.f0) // ds2 的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 关联上的数据,调用 join 方法* @param first ds1 的数据* @param second ds2 的数据*/@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "<----->" + second;}});join.print();env.execute();}
}

输出:

image-20231112153403293

window join:

  1. 两条流落在同一个时间窗口范围内才能匹配
  2. 根据 keyBy 的 key,来进行匹配关联
  3. 只能拿到匹配上的数据,类似有固定时间范围的inner join

间隔联结(Interval Join)

存在如下场景:两条流匹配的两个数据有可能刚好“卡在”窗口边缘两侧,窗口内就都没有匹配了,可以使用“间隔联结”(interval join)来解决。

原理

给定两个时间点,分别叫作间隔的“上界”(upperBound)“下界”(lowerBound);可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp +upperBound], 即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:这段时间作为可以匹配另一条流数据的“窗口”范围。

匹配的条件为:

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

image-20231112154002415

stream1
.keyBy(<KeySelector>)// KeyedStream 调用   
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right,Context ctx, Collector<String> out){out.collect(left + "," + right);}
});

处理迟到数据,可以使用左右侧输出流

完整代码:

public class IntervalJoinWithLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.socketTextStream("hadoop102", 7777).map((MapFunction<String, Tuple2<String, Integer>>) value -> {String[] datas = value.split(",");return Tuple2.of(datas[0], Integer.valueOf(datas[1]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.socketTextStream("hadoop102", 8888).map((MapFunction<String, Tuple3<String, Integer, Integer>>) value -> {String[] datas = value.split(",");return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));/*** 【Interval join】* 1、只支持事件时间* 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后* 3、process 中,只能处理 join 上的数据* 4、两条流关联后的 watermark,以两条流中最小的为准* 5、如果 当前数据的事件时间 < 当前的 watermark,就是迟到数据,主流的 process 不处理* => between 后,可以指定将 左流 或 右流的迟到数据放入侧输出流* *///1. 分别做 keyby,key 其实就是关联条件KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);//2. 调用 interval join// 左右测输出流迟到标签OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)) // 指定上下界.sideOutputLeftLateData(ks1LateTag) // 将ks1的迟到数据,放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将ks2的迟到数据,放入侧输出流.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 两条流的数据匹配上,才会调用这个方法* @param left ks1 的数据* @param right ks2 的数据* @param ctx 上下文* @param out 采集器*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,是关联上的数据out.collect(left + "<------>" + right);}});process.print("主流");process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/169933.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vmware workstation 与 device/credential guard 不兼容

VM虚拟机报错 vmware虚拟机启动时报错&#xff1a;vmware workstation 与 device/credential guard 不兼容&#xff1a; 系统是win10专业版&#xff0c;导致报错原因最终发现是安装了docker&#xff0c;docker自带下载虚拟机Hyper-V&#xff0c;而导致vmware workstation 与 …

git分支与tag标签的介绍与使用)

git分支与tag标签的介绍与使用 一.什么是分支与标签1.2.开发环境分层 二git分支介绍2.1分支操作2.2.IDEA中操作分支 三、Git标签的讲解3.1.GitBashHere操作标签3.2. IDEA中操作标签 一.什么是分支与标签 分支&#xff08;Branches&#xff09;&#xff1a; 功能开发&#xff…

Ubuntu取消sudo的输入密码

Ubuntu最近要安装软件&#xff0c;每次sudo都要输入一次密码&#xff0c;感觉很麻烦&#xff0c;于是想能不能设置为不输入密码&#xff0c;在网上找了一下解决办法。 主要参考这篇文章&#xff1a; Ubuntu取消sudo时输入密码 上面这篇文章使用的是vim&#xff0c;但是按照博…

github私有仓库开发,公开仓库发布版本

文章目录 github私有仓库开发,公开仓库发布版本需求背景实现思路GitHub Releases具体步骤广告 github私有仓库开发,公开仓库发布版本 需求背景 github私有仓库开发,公开仓库发布版本&#xff0c;既可以保护源代码,又可以发布版本给用户使用。许多知名软件项目都采用了这样的开…

mysql主从复制-使用心得

文章目录 前言环境配置主库从库 STATEMENTbinloggtidlog-errorDistSQL总结 前言 mysql 主从复制使用感受&#xff0c;遇到一些问题的整理&#xff0c;也总结了一些排查问题技巧。 环境 mysql5.7 配置 附&#xff1a;千万级数据快速插入配置可以参考&#xff1a;mysql千万数…

【Liunx】服务器解决 跨域问题

首先打开后端的站点 在站点设置内打开 "配置文件" 然后在 “server_name 本机ip ”下方添加跨域配置,添加成功后重启nginx即可 add_header Access-Control-Allow-Origin *; add_header Access-Control-Allow-Methods GET, POST, OPTIONS, PUT, DELETE; add_header A…

业务出海之服务器探秘

这几年随着国内互联网市场的逐渐饱和&#xff0c;越来越多的公司加入到出海的行列&#xff0c;很多领域都取得了很不错的成就。虽然出海可以获得更加广阔的市场&#xff0c;但也需要面对很多之前在国内可能没有重视的一些问题。集中在海外服务器的选择维度上就有很大的变化。例…

数据分析实战 | 泊松回归——航班数据分析

目录 一、数据及分析对象 二、目的及分析任务 三、方法及工具 四、数据读入 五、数据理解 六、数据准备 七、模型训练 八、模型评价 一、数据及分析对象 CSV文件&#xff1a;o-ring-erosion-only.csv 数据集链接&#xff1a;https://download.csdn.net/download/m0_7…

从0到0.01入门React | 001.精选 React 面试题

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

小样本目标检测(Few-Shot Object Detection)综述

背景 前言:我的未来研究方向就是这个,所以会更新一系列的文章,就关于FSOD,如果有相同研究方向的同学欢迎沟通交流,我目前研一,希望能在研一发文,目前也有一些想法,但是具体能不能实现还要在做的过程中慢慢评估和实现.写文的主要目的还是记录,避免重复劳动,我想用尽量简洁的语言…

【机器学习】八、规则学习

知识图谱与基本概念 基本概念 规则学习定义&#xff1a;从训练数据中学习出一组能用于对未见示例进行判别的规则。 规则定义&#xff1a;规则一般是&#xff1a;语义明确、能描述数据分布所隐含的客观规律或领域概念。 逻辑规则定义&#xff1a;⊕←?1⋀?2⋀?3…⋀??⊕…

登顶request模块

华子目录 Requests介绍安装requests模块常用方法常用属性实例引入各种请求方式基于get请求带参数的get请求推荐写法 基于post请求添加headers信息content获取二进制数据bytes类型获取json数据第一种方式第二种方式 response响应状态码判断 高级操作会话维持通过cookie维持会话通…