大数据-玩转数据-双流JOIN

一、双流JOIN

在Flink中, 支持两种方式的流的Join: Window Join和Interval Join

二、Window Join

窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素.
注意:
1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会处理(就是忽略掉了)
2.join成功后的元素的会以所在窗口的最大时间作为其时间戳. 例如窗口[5,10), 则元素会以9作为自己的时间戳。
Window join 仍然可分为 滚动窗口、滑动窗口Join、会话窗口Join

滚动窗口Join代码段示例
在这里插入图片描述

package com.lyh.flink12;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/24 22:09*/
public class Flink01_Join_Window_Tumbling {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));s1.join(s2).where(WaterSensor::getId).equalTo(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 必须使用窗口.apply(new JoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic String join(WaterSensor first, WaterSensor second) throws Exception {return "first: " + first + ", second: " + second;}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

三、Interval Join

间隔流join(Interval Join), 是指使用一个流的数据按照key去join另外一条流的指定范围的数据.
如下图: 橙色的流去join绿色的流.范围是由橙色流的event-time + lower bound和event-time + upper bound来决定的.
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

在这里插入图片描述
Interval Join只支持event-time
必须是keyBy之后的流才可以interval join

package com.lyh.flink12;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;

import java.time.Duration;public class  Sql_Join_Windows_Interval{public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));s1.keyBy(WaterSensor::getId).intervalJoin(s2.keyBy(WaterSensor::getId)).between(Time.seconds(-2),Time.seconds(3)).process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor left,WaterSensor right,Context ctx,Collector<String> out) throws Exception {out.collect(left + "," + right);}}).print();try{env.execute();} catch (Exception e){e.printStackTrace();}}}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/123806.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spark SQL 任务参数调优1

1.背景 要了解spark参数调优&#xff0c;首先需要清楚一部分背景资料Spark SQL的执行原理&#xff0c;方便理解各种参数对任务的具体影响。 一条SQL语句生成执行引擎可识别的程序&#xff0c;解析&#xff08;Parser&#xff09;、优化&#xff08;Optimizer&#xff09;、执行…

106.从中序与后序遍历序列构造二叉树

力扣题目链接(opens new window) 根据一棵树的中序遍历与后序遍历构造二叉树。 注意: 你可以假设树中没有重复的元素。 例如&#xff0c;给出 中序遍历 inorder [9,3,15,20,7]后序遍历 postorder [9,15,7,20,3] 返回如下的二叉树&#xff1a; class Solution { public:Tr…

大语言模型之十四-PEFT的LoRA

在《大语言模型之七- Llama-2单GPU微调SFT》和《大语言模型之十三 LLama2中文推理》中我们都提到了LoRA&#xff08;低秩分解&#xff09;方法&#xff0c;之所以用低秩分解进行参数的优化的原因是为了减少计算资源。 我们以《大语言模型之四-LlaMA-2从模型到应用》一文中的图…

AJAX--Express速成

一、基本概念 1、AJAX(Asynchronous JavaScript And XML)&#xff0c;即为异步的JavaScript 和 XML。 2、异步的JavaScript 它可以异步地向服务器发送请求&#xff0c;在等待响应的过程中&#xff0c;不会阻塞当前页面。浏览器可以做自己的事情。直到成功获取响应后&#xf…

【Spring Cloud】深入探索统一网关 Gateway 的搭建,断言工厂,过滤器工厂,全局过滤器以及跨域问题

文章目录 前言为什么需要网关以及网关的作用网关的技术实现 一、Gateway 网关的搭建1.1 创建 Gateway 模块1.2 引入依赖1.3 配置网关1.4 验证网关是否搭建成功1.5 微服务结构分析 二、Gateway 断言工厂2.1 Spring 提供的断言工厂2.2 示例&#xff1a;设置断言工厂 三、Gateway …

RabbitMQ安装与简单使用

安装 下载资源 可以访问官网查看下载信息rabbitmq官网 选择合适的版本&#xff0c;注意&#xff1a;rabbitmq需要下载一个Erlang才能使用 我自己是在一下两个连接中下载的 rabbitmq 3.8.8 erlang 21.3.8.15 需要下载其他版本的同学注意erlang版本是否匹配&#xff0c;可以访…

Java Stream的基本使用

Stream特点 Stream的一系列操作组成了Stream的流水线, Stream流水线包含: 数据源: 这里的数据源可能是集合/数组, 可能是生成器, 甚至可能是IO通道(Files.lines)零个或多个中间操作: 中间操作会导致流之间的转化, 如filter(Predicate)一个终端操作: 终端操作会产生最终所需要的…

【考研英语】2011 年英语(一)排序题思路复盘(费曼学习法)

文章目录 引言一、找语段特征词二、确定位置写在最后 引言 英语一中的新题型之一 —— 排序题&#xff0c;我是看的刘琦老师的方法课&#xff0c;她用的 2011 年的真题来讲解方法。讲完让我们回去用“费曼学习法”复盘以下&#xff0c;我个人感觉是一个不错的方法&#xff0c;…

Altium Designer 批量添加元器件后缀

Altium Designer 批量添加元器件后缀 方法一方法二可能出现的问题要注意 方法一 您可以使用 Altium Designer 中的“批量修改元器件名称”功能来批量添加元器件后缀。具体步骤如下&#xff1a; 1.为了方便显示 操作流程&#xff0c;我这里复制了几个原理图的文件&#xff0c;粘…

8.3Jmeter使用json提取器提取数组值并循环(循环控制器)遍历使用

Jmeter使用json提取器提取数组值并循环遍历使用 响应返回值例如&#xff1a; {"code":0,"data":{"totalCount":11,"pageSize":100,"totalPage":1,"currPage":1,"list":[{"structuredId":&q…

“构建高效的前端表单验证与增删改功能实现“

目录 引言1. 基础增删改功能代码展示后端代码 2. 表单验证基本表单验证自定义验证规则 总结 引言 在现代Web开发中&#xff0c;前端表单验证和增删改功能是不可或缺的核心要素。本文将介绍如何使用ElementUI库来实现高效的前端表单验证和增删改功能。我们将从基础开始&#xf…

Flutter+SpringBoot实现ChatGPT流实输出

FlutterSpringBoot实现ChatGPT流式输出、上下文了连续对话 最终实现Flutter的流式输出上下文连续对话。 这里就是提供一个简单版的工具类和使用案例&#xff0c;此处页面仅参考。 服务端 这里直接封装提供工具类&#xff0c;修改自己的apiKey即可使用&#xff0c;支持连续…