Spark Structured Streaming使用教程

文章目录

    • 1、输入数据源
    • 2、输出模式
    • 3、sink输出结果
    • 4、时间窗口
      • 4.1、时间窗口
      • 4.2、时间水印(Watermarking)
    • 5、使用例子

Structured Streaming是一个基于Spark SQL引擎的可扩展和容错流处理引擎,Spark SQL引擎将负责增量和连续地运行它,并在流数据继续到达时更新最终结果。
Structured Streaming把持续不断的流式数据当做一个不断追加的表,这使得新的流处理模型与批处理模型非常相似。您将把流计算表示为在静态表上的标准批处理查询,Spark将其作为无界输入表上的增量查询运行。

1、输入数据源

  • File source - 以数据流的形式读取写入目录中的文件。文件将按照文件修改时间的先后顺序进行处理。如果设置了latestFirst,则顺序将相反。支持的文件格式为text, CSV, JSON, ORC, Parquet。请参阅DataStreamReader接口的文档,了解最新的列表,以及每种文件格式支持的选项。注意,监视目录的文件改变,只能是原子性的改变,比如把文件放入该目录,而不是持续写入该目录中的某个文件。
  • Kafka source - 从Kafka读取数据。它兼容Kafka代理版本0.10.0或更高版本。查看Kafka集成指南了解更多细节。
  • Socket source (用于测试) - 从套接字连接读取UTF8文本数据。监听服务器套接字位于驱动程序。请注意,这应该仅用于测试,因为它不提供端到端的容错保证。
  • Rate source (用于测试) - 以每秒指定的行数生成数据,每个输出行包含一个时间戳和值。其中timestamp为包含消息发送时间的timestamp类型,value为包含消息计数的Long类型,从0开始作为第一行。该源代码用于测试和基准测试。

2、输出模式

  • 我们可以定义每次结果表中的数据更新时,以何种方式,将哪些数据写入外部存储。有3种模式:
  • complete mode:所有数据都会被写入外部存储。具体如何写入,是根据不同的外部存储自身来决定的。
  • append mode:只有新的数据,以追加的方式写入外部存储。只有当我们确定,result table中已有的数据是肯定不会被改变时,才应该使用append mode。
  • update mode:只有被更新的数据,包括增加的和修改的,会被写入外部存储中。
aggDF.writeStream().outputMode("complete").format("console").start();

3、sink输出结果

  • File sink - 将输出存储到一个目录。
    输出模式支持Append
writeStream.format("parquet")        // can be "orc", "json", "csv", etc..option("path", "path/to/destination/dir").start()
  • Kafka sink - 将输出存储到Kafka中的一个或多个主题。
    输出模式支持Append, Update, Complete
writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "updates").start()
  • Console sink (用于测试) - 每次有触发器时,将输出打印到控制台/标准输出。支持两种输出模式:Append和Complete。这应该用于在低数据量上进行调试,因为在每次触发后都会收集整个输出并将其存储在驱动程序的内存中。
    输出模式支持Append, Update, Complete
writeStream.format("console").start()
  • Memory sink (用于测试) - 输出以内存表的形式存储在内存中。支持两种输出模式:Append和Complete。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。因此,请谨慎使用。
    输出模式支持Append, Complete
输出以内存表的形式存储在内存中。支持两种输出模式:Append和Complete。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。因此,请谨慎使用。
  • 自定义输出Foreach和ForeachBatch - Foreach:针对每条数据的输出;ForeachBatch:针对每批次的数据输出。
    输出模式支持Append, Update, Complete
// Foreach
streamingDatasetOfString.writeStream().foreach(new ForeachWriter<String>() {@Override public boolean open(long partitionId, long version) {// Open connection}@Override public void process(String record) {// Write string to connection}@Override public void close(Throwable errorOrNull) {// Close the connection}}
).start();// ForeachBatch
streamingDatasetOfString.writeStream().foreachBatch(new VoidFunction2<Dataset<String>, Long>() {public void call(Dataset<String> dataset, Long batchId) {// Transform and write batchDF}    }
).start();

4、时间窗口

4.1、时间窗口

在业务场景中,经常会遇到按时间段进行聚合操作,Spark提供了基于滑动窗口的事件时间集合操作,每个时间段作为一个分组,并对每个组内的每行数据进行聚合操作。
在这里插入图片描述

可以使用groupBy()和window()操作来表示窗口聚合。

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),words.col("word")
).count();

4.2、时间水印(Watermarking)

WaterMarking的作用主要是为了解决:延迟到达的数据是否丢弃,系统可以删除过期的数据。
在这里插入图片描述

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.withWatermark("timestamp", "10 minutes") // 延迟10分钟后到达的数据将会被丢弃.groupBy(window(col("timestamp"), "10 minutes", "5 minutes"),col("word")).count();

5、使用例子

package com.penngo.spark;import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.art.Art;import java.io.Serializable;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;public class SparkStructStream {private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");public static class DataTxt implements Serializable {private String text;private Timestamp time;public DataTxt(String text, LocalDateTime time) {this.text = text;this.time = Timestamp.valueOf(time);}public String getText() {return text;}public void setText(String text) {this.text = text;}public Timestamp getTime() {return time;}public void setTime(Timestamp time) {this.time = time;}}public static void socket(SparkSession spark) throws Exception{// 运行:nc -lk 9999Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 9999).load();Dataset<DataTxt> words = lines.as(Encoders.STRING()).map((MapFunction<String, DataTxt>) x -> {String[] strs = x.split(",");LocalDateTime date = LocalDateTime.parse(strs[1],formatter);Arrays.asList(x.split(",")).iterator();DataTxt data = new DataTxt(strs[0], date);return data;}, Encoders.bean(DataTxt.class));Dataset<Row> wordCounts = words.toDF().withWatermark("time", "10 minutes") // 延迟10分钟后到达的数据将会被丢弃.groupBy(window(col("time"), "10 minutes", "5 minutes"),col("text")).count();wordCounts.writeStream().outputMode("append").foreach(new ForeachWriter<Row>() {@Override public boolean open(long partitionId, long version) {
//                        System.out.println("open==========partitionId:" + partitionId + ",version:" + version);return true;}@Override public void process(Row record) {// Write string to connectionSystem.out.println("recordxxxxxxxxxxxxxxxxxx:======" + record);}@Override public void close(Throwable errorOrNull) {// Close the connection
//                        System.out.println("close==========errorOrNull:" + errorOrNull);}})
//                .format("console").start().awaitTermination();}public static void kafka(SparkSession spark) throws Exception{// Subscribe to 1 topicDataset<Row> df = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "192.168.245.1:9092").option("subscribe", "topic-news").option("startingOffsets","latest").option("maxOffsetsPerTrigger",1000).load();df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");df.printSchema();df.writeStream().outputMode("append").format("console").start().awaitTermination();}public static void main(String[] args) throws Exception{Logger.getLogger("org.apache.spark").setLevel(Level.WARN);Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF);Logger.getLogger("org.apache.kafka").setLevel(Level.WARN);System.setProperty("hadoop.home.dir", "/usr/local/hadoop-3.3.6");System.setProperty("HADOOP_USER_NAME", "root");SparkSession spark = SparkSession.builder().appName("SparkStructStream").master("local[*]").getOrCreate();//        socket(spark);kafka(spark);}
}

参考自官方文档:https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/256241.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 使用FluentScheduler触发定时任务

写在前面 FluentScheduler是.Net平台下的一个自动任务调度组件&#xff0c;以前经常用的是Quarz.Net&#xff0c;相对而言FluentScheduler的定时配置更为直观&#xff0c;可直接用接口进行参数化设置&#xff0c;对Cron表达式有恐惧症的人来说简直就是福音&#xff0c;使用起来…

推荐一个FL Studio最适配的midi键盘?

Hello大家好&#xff01;好消息&#xff01;好消息&#xff01;特大好消息&#xff01; 水果党们&#xff01;终于有属于自己的专用MIDI键盘啦&#xff01; 万众期待的Novation FLKEY系列 正式出炉&#xff01; 做编曲和音乐制作的朋友们&#xff0c;对水果软件FLSTUDIO应该…

msvcp140_1.dll是什么东西?找不到msvcp140_1.dll文件的5种解决方法

关于msvcp140_1.dll丢失的问题。相信很多电脑用户都遇到过这个问题&#xff0c;但是不知道该如何解决。那么&#xff0c;接下来我将从三个方面为大家介绍&#xff1a;msvcp140_1.dll文件属性介绍、msvcp140_1.dll丢失原因以及msvcp140_1.dll丢失的5个解决方法。 首先&#xff…

C语言课程设计

内容与设计思想 1、系统功能与分析&#xff08;填写你所设计的菜单及流程图&#xff09;。 菜单&#xff1a; 日历打印 日历推算 日历间隔倒计时牌 退出程序 模块设计 根据功能需要&#xff1a; 源文件&#xff1a; #include<stdio.h> #include&…

SAP ABAP 通过右键菜单完成Tree Control 节点的增删改功能

通过右键菜单完成Tree Control 节点的增删改功能 Tree 节点的增删改是很重要的功能&#xff0c;包括&#xff1a;增加本级节点&#xff0c;增加子节点&#xff0c;修改节点&#xff0c;删 除节点。完成后效果如下&#xff1a; 选择根节点&#xff0c;单击右键&#xff0c;弹…

随笔-这都是命吗

我与鹏哥、小付有个小群&#xff0c;前几天&#xff0c;鹏哥在群里发了一个图&#xff0c;是他那个城市准备扶持的高新产业&#xff0c;有元宇宙、量子信息、生物制药、人工智能什么的。 先前的时候鹏哥给我说过&#xff0c;当地准备了六百多亩地&#xff0c;准备发展高新产业…

秒懂设计模式笔记

秒懂设计模式笔记 为了让软件架构良好、稳固&#xff0c;设计模式针对各种场景提供了适合的代码模块的复用及扩展解决方案。 面向对象的三大特性&#xff1a;继承、封装、多态。 封装 现实中&#xff0c;计算机主机机箱对于主板、CPU及内存等配件的封装等。 饮料是被装在杯…

ZKP Understanding Nova (2) Relaxed R1CS

Understanding Nova Kothapalli, Abhiram, Srinath Setty, and Ioanna Tzialla. “Nova: Recursive zero-knowledge arguments from folding schemes.” Annual International Cryptology Conference. Cham: Springer Nature Switzerland, 2022. Nova: Paper Code 2. Unders…

软件测试面试题解析--什么题是必问的?

设计测试用例的主要方法有哪些&#xff1f; 简述一下缺陷的生命周期&#xff1f; 测试流程&#xff1f; 项目流程&#xff1f; 验收测试中和β测试区别&#xff1f; 如何维护测试用例&#xff1f; 每天测多少用例 怎么分配的测试的 一天能找多少bug 你在上一家公司&#xff0c;…

el-select的多选multible带全选组件二次封装(vue2,elementUI)

1.需求 Select 选择器 多选需要增加 全选 和 取消全选 功能&#xff0c;前端框架为vue2&#xff0c;UI组件为elementUI。 2. 代码 html部分 <template><el-tooltip effect"dark" :disabled"defaultValue.length < 0" :content"defaul…

信号完整性分析

目录 前言一、信号完整性SI1.1 信号失真1.2 串扰1.3 衰减 二、电源完整性PI2.1 地弹2.2 电源轨道塌陷 三、电磁兼容EMC3.1 电磁辐射3.2 抗干扰 前言 本篇介绍信号完整性分析的知识体系&#xff0c;以及部分分析方法。   什么是信号完整性?通俗来讲&#xff0c;信号在互连线的…

MySQL数据库从小白到入门(二)

多表关系&#xff1a; 项目开发中&#xff0c;在进行数据库表结构设计时&#xff0c;会根据业务需求及业务模块之间的关系&#xff0c;分析并设计表结构。由于业务之间相互关联&#xff0c;所以各个表结构之间也存在着各种联系&#xff0c;基本上分为三种。 外键&#xff1a; 创…