Flink双流(join)

 一、介绍

Join大体分类只有两种:Window Join和Interval Join

Window Join有可以根据Window的类型细分出3种:Tumbling(滚动) Window Join、Sliding(滑动) Window Join、Session(会话) Widnow Join。

        🌸Window 类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作。

        🌸Interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理,目前Stream join的结果是数据的卡尔积。

二、Window Join

✨Tumbling Window Join

        执行翻滚窗口联接时,具有公共键和公告翻滚窗口的所有元素将成对组合联接,并传递JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射。

        如图所示,我们定义了一个为2毫秒的翻滚窗口,结果窗口的形式为[0,1]、[2,3]..............该图显示了每个窗口中所以元素的成对组合,这些元素将传递给JoinFunction。注意在翻滚窗口[6,7]中没有发射任何东西,因为绿色流中不存在与橙色元素⑥和⑦结合的元素。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

✨Sliding Window Join

        在执行滑动窗口联接时,具有公共键和公共滑动窗口的所以元素将作为成对组合联接,并传递JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!请注意,某些元素可能会联接到一个滑动窗口中,但不会联接到另一个滑动窗口中!

        在本例中,我们使用大小为2毫秒的滑动窗口,并将其滑动1毫秒,从而产生滑动窗口[-1,0],[1,2],[2,3]...........x轴下方的连续元素时传递给每个滑动窗口的Join Function的元素。在这里,你还可以看到,例如在窗口[2,3]中,橙色②和绿色③连接,但在窗口[1,2]中没有与任何对象连接。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

✨Session Window Join

        在执行会话窗口联接时,具有相同键(当“组合”满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出

        这里,我们定义一个会话窗口连接,其中每个会话被至少1毫秒的时间分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三个会话中,绿色流中没有元素,所以⑧和⑨没有连接!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

三、Interval Join

        前面学习的Window Join必须要在一个Window中进行Join,那如果没有Window如何处理呢?interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

也就是:流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且,流B的元素的时间戳 ≤ 流A的元素时间戳

 

在上面的示例中,我们将两个流“orange”和“green”连接起来,其下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是可以应用.lowerBoundExclusive()和.upperBoundExclusive来更改行为orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/484738.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringIOC之support模块StaticApplicationContext

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

数据库事物复习

事务 比如说将张三的银行账户拿出一千给李四&#xff0c;首先需要查询张三的账户余额&#xff0c;扣除1000&#xff0c;然后如果给李四加上1000的过程中出现异常会回滚事务&#xff0c;临时修改的数据会回复回去。 -- 1. 查询张三账户余额 select * from account where name …

Leetcode535(设计短网址)

例题&#xff1a; https://leetcode.cn/problems/encode-and-decode-tinyurl/description/ 分析&#xff1a; 题目要求可以将一个长网址变成一个短网址&#xff08;encode&#xff09;&#xff0c;也可以通过短网址找到原来的长网址&#xff0c;我们可以使用两个hashMap集合来实…

Flutter插件开发指南01: 通道Channel的编写与实现

Flutter插件开发指南01: 通道Channel的编写与实现 视频 https://www.bilibili.com/video/BV1ih4y1E7E3/ 前言 本文将会通过一个加法计算&#xff0c;来实现 Channel 的双向通讯&#xff0c;让大家有个一个体会。 Flutter插件 Flutter插件是Flutter应用程序与原生平台之间的桥…

【Vuforia+Unity】AR05-实物3D模型识别功能实现

对于3D物体的识别&#xff0c;可以是虚拟的也可以是实物的&#xff0c;但是对于虚拟的三维模型意义不大&#xff0c;我们完全可以把三维模型放在屏幕上截一张图&#xff0c;以图片识别的方式召唤数字内容&#xff0c;不过在虚拟现实中或许有用。 因此本文探讨的技术路线主要是…

新疆营盘古城及古墓群安防舱体实施方案

3 总体布局 3.1设计原则 3.1.1执行有效的国家标准、国家军用标准和行业标准&#xff1b; 3.1.2满足指标要求&#xff1b; 3.1.3采用通用化、模块化设计&#xff0c;提高设备可维修性&#xff1b; 3.1.4采用人机工程学知识进行设计&#xff0c;充分考虑安全性。 3.2 总体…

Flutter 3.19.0 版本新特性

其实在每个版本的更新中呢&#xff0c;都会合并很多很多的这个合并请求、还有开发建议&#xff0c;那么本版本的也不例外&#xff0c;社区官方发布的公告是合并了168个社区成员的1429个拉请求。 当然&#xff0c;如果你的时间允许的话&#xff0c;你可以去查看一下这些请求&am…

算法-搜索二维矩阵 II

1、题目来源 240. 搜索二维矩阵 II - 力扣&#xff08;LeetCode&#xff09; 2、题目描述 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 示例 1&#x…

【力扣hot100】刷题笔记Day9

前言 阴天睡得还挺舒服&#xff0c;9点半才醒&#xff0c;用刷题开启美好新一天&#xff01; 141. 环形链表 - 力扣&#xff08;LeetCode&#xff09; 哈希表 class Solution:def hasCycle(self, head: Optional[ListNode]) -> bool:seen set() # 哈希集合# seen {} #…

【Java 面试题】MySQL与Redis 如何保证双写一致性

目录 方案一:延时双删方案二: 删除缓存重试机制方案三:读取biglog异步删除缓存系列文章版本记录方案一:延时双删 延时双删流程 先删除缓存再更新数据库休眠一会(比如1秒),再次删除缓存。这个休眠一会,一般多久呢?都是1秒? 这个休眠时间 = 读业务逻辑数据

dell戴尔电脑灵越系列Inspiron 15 3520原厂Win11系统中文版/英文版

Dell戴尔笔记本灵越3520原装出厂Windows11系统包&#xff0c;恢复出厂开箱预装OEM系统 链接&#xff1a;https://pan.baidu.com/s/1mMOAnvXz5NCDO_KImHR5gQ?pwd3nvw 提取码&#xff1a;3nvw 原厂系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、Office办公软件、MyD…

Leo赠书活动-18期 《高效使用Redis》

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; 赠书活动专栏 ✨特色专栏&#xff1a;…