Spark Structured Streaming 分流或双写多表 / 多数据源(Multi Sinks / Writes)

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

在 Spark Structured Streaming 中,我们有时候需要将最后的处理结果分流或双写到多张表或多个数据源(Multi Sinks / Writes),一个典型的例子是:在 CDC 数据入湖场景里,一个 Kafka Topic 上存放着整库或多张表的 CDC 消息,使用 Spark 从 Kafka 中摄取这些消息后,需要根据消息中提供的数据库名和数据表名对 CDC 消息分流,然后写到数据湖上对应的 ODS 表中,这就是一种典型的“数据分流”场景。在 Spark Structured Streaming 中,实现多表 / 多数据源的分流或双写主要依赖 foreachBatchforeach 这两个方法,本文就围绕它们介绍一下分流或双写多表 / 多数据源的具体实现。

首先,要明确 foreachforeachBatch 都是 action,也就意味着使用它们时已经到了流的末端,绝大数情况下,就是要将记录写入目标数据源了,这也是foreachforeachBatch 这两个方法绝大多数的应用场景。通常,在 Spark 中将数据写入一个数据源是这样做的(以写 parquet 文件为例):

writeStream.format("parquet").option("path", "path/to/destination/dir").start()

由于 Spark 内置了 parquet 格式的 data writer, 所以我们只需填写一些相应的配置,就可以直接把 DF 按对应的格式写到目标位置了,那什么情况下我们要使用 foreachforeachBatch 呢?下面展开来介绍一下。

1. foreachBatch 的应用场景


大多数情况下,一条的流处理的 pipeline 都是从一个 Source 开始,中间经历各种处理后,最终写入了一个 Sink,但是,在某些场景下,我们流的重点可能需要写入的并不是一个 Sink,而是多个,典型的情形有:

  • 数据分流:需要将数据“分流”写入不同的数据源或数据表( 简单说就是 dispatch )

  • 数据多写:需要同时向多个下游数据源相同相同数据( 简单说就是 duplicate )

虽然我们可以非常“粗暴”地通过 for 循环构建多个 writer 实现上述两种典型的写入需求,但是这种做法会让每一个 sink 变成独立的 streaming query(作业),是代价很高的应对方法,并不实用。最好的做法就是通过 foreachBatch 来实现,实际上上面两种需求正是 foreachBatch 的典型应用场景。我们看一下 foreachBatch 的接口声明:

def foreachBatch(function: (Dataset[T], Long) ⇒ Unit): DataStreamWriter[T]

我们需要为 foreachBatch 传入一个函数字面量,它有两参数,第一个对应一个 micro-batch 的 DataFrame, 第二个是这个 micro-batch 的 ID,拿到 micro-batch 的 DataFrame 后,我们可以在这个 DataFrame 上作相应的转换处理,最后调用现成的 writer 写入目标端。这里涉及到 Spark Streaming 的 Micro-Batch,也就是上述参数列表中的 Dataset[T] 类型的那个 DataFrame ,关于 Micro-Batch 在流上运行方式,下图给出了非常形象的描绘:

在这里插入图片描述

简单地说,Micro-Batch 模式下需要收集齐一定量(或一小段时间范围内)的数据,整理成一个 DataFrame 去处理,它的延迟是在秒级。上图下方时间轴上的每一小撮数据就是 foreachBatch 中传入的那个 DataFrame。

这里,我们特别澄清一个容易误解的地方: foreachBatch 是没有“循环”语义的,这里的 foreach 其实是意在针对每一个 micro-batch 的,不是空间维度上迭代多个 micro-batch, 而是时间维度上针对每一个流经的 micro-batch 进行处理。这里也能提现从 source 构建出的 DF 和这个方法里的 micro-batch 的 DF 的差异,前者是一个无界的 DF,本质上是一个流,更加“实体”的 DF 其实是 foreachBatch 中的这个 DF,它是较短时间内聚齐的“一小撮”数据,边界是确定的!

下面,我们针对分流和双写两种典型场景给出详细的示例代码。

1.1. 通过 foreachBatch 实现数据“分流”


我们以向两种不同的 Hudi 表写入数据为例,先将数据过滤,得到分流后的 DF,然后向对应的 Hudi 表中写入:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>// 分流 table_1 的数据并写入filteredDF1 = batchDF.filter(...)filteredDF1.write.format("hudi").option(TABLE_NAME, "table_1").mode(SaveMode.Append).save("/path/1")// 分流 table_2 的数据并写入filteredDF2 = batchDF.filter(...)filteredDF2.write.format("hudi").option(TABLE_NAME, "table_2").mode(SaveMode.Append).save("/path/2")
}

1.2. 通过 foreachBatch 实现数据“双写”


我们以向两种不同的数据源写入数据为例,可以调用多次 write 操作,但是,由于每次写入都会导致数据被 recomputed,流本身可能不再存在或状态发生了改变,所以,必须要在写入前使用 persist, 保证向下游多次写入的数据是完全一样,最后记得再执行一遍 unpersist 即可。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>batchDF.persist()batchDF.write.format("csv").save(...)  // location 1, 对应数据源的 data writer 是已存在的batchDF.write.format("hudi").save(...)  // location 2, 对应数据源的 data writer 是已存在的batchDF.unpersist()
}

2. foreach 的应用场景


foreachBatch 很实用,但是在如下两种场景下无法工作的:

  • 没有现成的支持目标数据源的 data writer;
  • 当前流运行于 continuous processing 模式,不支持 micro-batch

如果是上述情形,我们就得使用 foreach 了,因为 foreach 要自行实现对目标数据源的链接和读写,同时,它的自定义处理逻辑又是作用到每一行上的,所以它能解决上述两种场景的问题。某种程度上,foreach 相比 foreachBatch 是一种更底层的 API。使用 foreach 需要提供一个 ForeachWriter,实现 open, process, 和close 三个方法,不过要注意的是这三个方案的调用时机是不同的,open / close 显然是 per-partition 要调用一次的, proess 则是要针对每条记录进行处理的。以下是 一个自行实现 foreach 的代码模板:

streamingDatasetOfString.writeStream.foreach(// 没有现成的 DataStreamWriter,需要自行实现行级别的存储逻辑。new ForeachWriter[String] {// 在 partition 这个粒度上创建针对目标数据源的连接,这比较符合常规def open(partitionId: Long, version: Long): Boolean = {// Open connection}// 数据梳理逻辑会作用到记录级别,而不是 miro-batch 的 df 级别。def process(record: String): Unit = {// Write string to connection}// 关闭连接,释放资源def close(errorOrNull: Throwable): Unit = {// Close the connection}}
).start()

关于 foreach 更多信息可以参考官方文档,这里就不再深究了,大多数情况,我们更多使用的还是 foreachBatch

3. 小结


foreachforeachBatch 都能在向目标数据源写入数据时实现定制化的逻辑,它们之间的差别在于:

  • foreachBatch多应用于数据分流或双写场景,目标数据源往往是已经有线程的 data writer 了
  • foreach 则要自行实现对目标数据的连接和读写处理
  • 两者操纵数据的颗粒度不同,foreach 对数据的梳理逻辑(process 方法)作用到 DF 中的每一行上,而 foreachBatch 则直接操纵的是每一个 micro-batch 对应的 DF。

参考资料

  • Spark 关于 foreach 和 foreachBatch 的官方文档
  • Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/661228.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTML:认识HTML及基本语法

目录 1. HTML介绍 2. 关于软件选择和安装 3. HTML的基本语法 1. HTML介绍 HyperText Markup Language 简称HTML,意为:超文本标记语言 超文本:是指页面内可以包含的图片,链接,声音,视频等内容 标记&am…

网站升级提示:我用react+go重构了网站并记录了部署项目简要步骤

先贴出来地址,这是我网站的地址易查网 可能有细心的小伙伴们已经看到了,原来我的网站是这样式的 妥妥的phph5 改造 前端react框架 前段时间学习了react,正愁无处练手,就有人说我的网站很low,我感觉这正是一个好的机会&#xff…

uniapp + uView动态表单校验

项目需求&#xff1a;动态循环表单&#xff0c;并实现动态表单校验 页面&#xff1a; <u--form label-position"top" :model"tmForm" ref"tmForm" label-width"0px" :rulesrules><div v-for"(element, index) in tmForm…

洞察Agent AI智能体的未来:机遇与挑战并存

&#x1f512;文章目录&#xff1a; &#x1f6f4;什么是Agent AI智能体 &#x1f4a5;Agent AI智能体的技术组成 ☂️Agent AI智能体的应用场景 &#x1f4a3;Agent AI智能体的挑战与问题 &#x1f6b2; Agent AI智能体在未来社会中的角色和影响 ❤️对Agent AI智能体未来的期…

2024五一杯数学建模B题思路分析 - 未来新城背景下的交通需求规划与可达率问题

文章目录 1 赛题选题分析 2 解题思路详细的思路过程放在文档中 ! ! &#xff01;&#xff01;&#xff01;&#xff01;&#xff01;3 最新思路更新 1 赛题 B题 未来新城背景下的交通需求规划与可达率问题 随着城市化的持续发展&#xff0c;交通规划在新兴城市建设中显得尤为关…

【Linux】进程创建

思维导图 学习内容 在这一篇博客的主要内容是学习fork函数&#xff0c;了解fork函数的功能、返回值等。我们需要学会使用fork函数创建子进程。 学习目标 进程的概念fork函数的初始fork函数的返回值写时拷贝fork函数的常规用法fork函数调用失败的原因 零、进程的概念 进程&am…

【C++】封装哈希表 unordered_map和unordered_set容器

目录​​​​​​​ 一、unordered系列关联式容器 1、unordered_map 2、unordered_map的接口 3、unordered_set 二、哈希表的改造 三、哈希表的迭代器 1、const 迭代器 2、 operator 3、begin()/end() ​ 4、实现map[]运算符重载 四、封装 unordered_map 和 unordered_se…

Python+PYGObject/PYGtk+CSS样式--2024python示例

隔久点不用老是会忘&#xff0c;留个笔记。。 PythonPYGObject/PYGtk&#xff0c;加载 CSS 样式的演示代码 demo 运行的效果截图&#xff1a; #!/usr/bin/env python3 import sys import gigi.require_version("Gtk", "3.0") from gi.repository import …

C语言(操作符)1

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸各位能阅读我的文章&#xff0c;诚请评论指点&#xff0c;关注收藏&#xff0c;欢迎欢迎~~ &#x1f4a5;个人主页&#xff1a;小羊在奋斗 &#x1f4a5;所属专栏&#xff1a;C语言 本系列文章为个人学习笔记&#x…

基于SSM的志愿者管理系统(含源码+sql+视频导入教程+文档+PPT)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1 、功能描述 基于SSM的志愿者管理系统3拥有三个角色&#xff1a; 管理员&#xff1a;用户管理、志愿组织管理、注册申请观看、活动管理、报名管理、打卡管理、公告管理等 用户&#xff1a;登录注册、…

动态规划-子序列问题2

文章目录 1. 最长定差子序列&#xff08;1218&#xff09;2. 最长的斐波那契子序列的长度&#xff08;873&#xff09;3. 最长等差数列&#xff08;1027&#xff09;4. 等差数列划分 II - 子序列&#xff08;446&#xff09; 1. 最长定差子序列&#xff08;1218&#xff09; 题…

制定语音芯片的语音识别指令时需要关注的内容

背景 最近定义设备识别的语音指令以及对应的语音反馈。虽然语音控制在软件里只是很小的一块功能&#xff0c;但也不能太马虎。新人入坑就要学习&#xff0c;学习前人的经验规避问题&#xff0c;最后总结经验给后人&#xff0c;给未来的自己。好记性不如烂笔头~ 下面一些问题是…