Flink中FileSink的使用

在Flink中提供了StreamingFileSink用以将数据流输出到文件系统.
这里结合代码介绍如何使用FileSink.
首先FileSink有两种模式forRowFormatforBulkFormat

    public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(final Path basePath, final Encoder<IN> encoder) {return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());}public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory) {return new DefaultBulkFormatBuilder<>(basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());}

二者的区别是forRowFormat是一行一行的处理数据,而forBulkFormat则是可以一次处理多条数据,而多条处理的好处就是可以帮助生成列式存储的文件如ParquetFileORCFile,而forRowFormat则做不到这点,关于列式存储和行式存储的区别可通过数据存储格式这篇文章简单做一个了解.

下面以forRowFormat作为示例演示一下代码

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/6/27* @Description: 测试**/
public class FlinkFileSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1env.setParallelism(1);// 这里是生成数据流,CustomizeSource这个类是自定义数据源(为了方便测试)DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(new CustomizeSource());// 现将数据转换成字符串形式SingleOutputStreamOperator<String> map = dataStreamSource.map(bean -> bean.toString());// 构造FileSink对象,这里使用的RowFormat,即行处理类型的FileSink<String> fileSink = FileSink// 配置文件输出路径及编码格式.forRowFormat(new Path("/Users/xxx/data/testData/"), new SimpleStringEncoder<String>("UTF-8"))// 设置文件滚动策略(文件切换).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(180)) // 设置间隔时长180秒进行文件切换.withInactivityInterval(Duration.ofSeconds(20)) // 文件20秒没有数据写入进行文件切换.withMaxPartSize(MemorySize.ofMebiBytes(1)) // 设置文件大小1MB进行文件切换.build())// 分桶策略(划分子文件夹).withBucketAssigner(new DateTimeBucketAssigner<String>()) // 按照yyyy-mm-dd--h进行分桶//设置分桶检查时间间隔为100毫秒.withBucketCheckInterval(100)// 输出文件文件名相关配置.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("test_") // 文件前缀.withPartSuffix(".txt") // 文件后缀.build()).build();// 输出到文件map.print();map.sinkTo(fileSink);env.execute();}
}

代码内容这里就不详细说明了,注释已经写得很清楚了.有一点要注意使用FileSink的时候我们要加上对应的pom依赖.我这里使用Flink版本是1.15.3

        <!-- File connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>

这里我们先看一下生成的结果文件

-rw-r--r--  1 xxx  staff   1.0M  6 27 14:43 .test_-eb905337-488d-46f1-8177-86fbb46f778f-0.txt.inprogress.91e49c89-cc79-44f5-940d-ded2770b61a1
-rw-r--r--  1 xxx  staff   1.0M  6 27 14:44 .test_-eb905337-488d-46f1-8177-86fbb46f778f-1.txt.inprogress.c548bd30-8583-48d5-91d2-2e11a7dca2cd
-rw-r--r--  1 xxx  staff   1.0M  6 27 14:45 .test_-eb905337-488d-46f1-8177-86fbb46f778f-2.txt.inprogress.a041dba1-8f37-4307-82da-682c48b0796b
-rw-r--r--  1 xxx  staff   280K  6 27 14:45 .test_-eb905337-488d-46f1-8177-86fbb46f778f-3.txt.inprogress.e05d1759-0a38-4a25-bcd0-1216ce6dda59

这里有必要说明一下由于我使用的是Mac在生成文件的时候会出现一个小问题,上面的那种文件会隐藏起来,直接点开文件夹是看不到的可以通过command + shift + .来显示隐藏文件,或者像我这种直接通过终端ll -a来查看,windows没有发现这个问题.
可以看到除了最后一个文件,其他的文件大小基本都是1MB,最后一个是因为写入的数据大小还没有满足1MB,并且写入时间也没有满足滚动条件,所以还在持续写入中.
而且通过文件名我们可以看到所有文件中都带有inprogress这个状态,这是因为我们没有开启checkpoint,这里先说一下FileSink写入文件时的三个文件状态,官网原图如下:
在这里插入图片描述
这三种状态分别是inprogresspendingfinished,对应的就是处理中、挂起和完成,官网中同时也说明了FileSink必须和checkpoint配合使用,不然文件的状态只会出现inprogresspending,原文内容如下:
在这里插入图片描述
下面我们在看一下加入checkpoint的代码和结果文件
代码如下

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/6/27* @Description: 测试**/
public class FlinkFileSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1env.setParallelism(1);// 这里是生成数据流,CustomizeSource这个类是自定义数据源(为了方便测试)DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(new CustomizeSource());// 现将数据转换成字符串形式SingleOutputStreamOperator<String> map = dataStreamSource.map(bean -> bean.toString());// 每20秒作为checkpoint的一个周期env.enableCheckpointing(20000);// 两次checkpoint间隔最少是10秒env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);// 程序取消或者停止时不删除checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// checkpoint必须在60秒结束,否则将丢弃env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只能有一个checkpointenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置EXACTLY_ONCE语义,默认就是这个env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// checkpoint存储位置env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 设置执行模型为Streaming方式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 构造FileSink对象,这里使用的RowFormat,即行处理类型的FileSink<String> fileSink = FileSink// 配置文件输出路径及编码格式.forRowFormat(new Path("/Users/xxx/data/testData/"), new SimpleStringEncoder<String>("UTF-8"))// 设置文件滚动策略(文件切换).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(180)) // 设置间隔时长180秒进行文件切换.withInactivityInterval(Duration.ofSeconds(20)) // 文件20秒没有数据写入进行文件切换.withMaxPartSize(MemorySize.ofMebiBytes(1)) // 设置文件大小1MB进行文件切换.build())// 分桶策略(划分子文件夹).withBucketAssigner(new DateTimeBucketAssigner<String>()) // 按照yyyy-mm-dd--h进行分桶//设置分桶检查时间间隔为100毫秒.withBucketCheckInterval(100)// 输出文件文件名相关配置.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("test_") // 文件前缀.withPartSuffix(".txt") // 文件后缀.build()).build();// 输出到文件map.print();map.sinkTo(fileSink);env.execute();}
}

看一下结果文件:

-rw-r--r--  1 xxx  staff   761K  6 27 15:13 .test_-96ccd42e-716d-4ee0-835e-342618914e7d-2.txt.inprogress.aa5fccaa-f99f-4059-93e7-6d3c548a66b3
-rw-r--r--  1 xxx  staff   1.0M  6 27 15:11 test_-96ccd42e-716d-4ee0-835e-342618914e7d-0.txt
-rw-r--r--  1 xxx  staff   1.0M  6 27 15:12 test_-96ccd42e-716d-4ee0-835e-342618914e7d-1.txt

可以看到已经完成的文件状态中已经没有inprogress和其他的后缀了,而正在写入的文件则是处于inprogress状态.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/3665.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu的USB相关操作

这里写目录标题 0.信息查看1. 串口设备设置2. 串口调试助手 0.信息查看 指令lsusb输出Bus 004 Device 002: ID 05e3:0620 Genesys Logic, Inc. USB3.2 Hub Bus 004 Device 001: ID 1d6b:0003 Linux Foundation 3.0 root hub Bus 003 Device 006: ID 5986:115f Acer, Inc Integ…

ShaderGraph制作“红旗迎风飘扬”效果(Unity2019版)

文章目录 零、准备“旗面”游戏物体一、核心1 模仿旗面的“起伏”二、核心2 让旗面的“吹动”起来三、几点改进A、 “旗面的摆动幅度”改进01&#xff1a;前后对称B、 “旗面的摆动幅度”改进02&#xff1a;从左往右逐渐增大C、 “旗面的飘动方向”改进01&#xff1a;只让在X轴…

webassembly简单Demo——hello world

参考官网 Emscripten Tutorial 一、创建C/C文件 hello.c #include <stdio.h>int main() {printf("hello, world!\n");return 0; } 二、编译成html 命令行切到hello.c目录下&#xff0c;执行如下命令(注意需要em的环境变量&#xff0c;参考&#xff1a;emsr…

CVE-2023-34541 LangChain 任意命令执行

漏洞简介 LangChain是一个用于开发由语言模型驱动的应用程序的框架。 在LangChain受影响版本中&#xff0c;由于load_prompt函数加载提示文件时未对加载内容进行安全过滤&#xff0c;攻击者可通过构造包含恶意命令的提示文件&#xff0c;诱导用户加载该文件&#xff0c;即可造成…

续篇-docker篇: 优势与安装方式 及基础指令整合

目录 1. 前言简介: 1.1 docker的优势: 1.2 docker的简易理解 2. 指令安装 2.1 安装yum的插件 ps: 提示没权限加上sudo即可 root用户不用 2.2 设置yum仓库地址 ps: 设置多个镜像仓库, 不设置可能会下载变慢 ps: 如图所示 2.3 更新缓存 2.4 安装docker 2.5 查看do…

Learn Mongodb DB功能命令索引等搜索 ⑤

作者 : SYFStrive 博客首页 : HomePage &#x1f4dc;&#xff1a; PHP MYSQL &#x1f4cc;&#xff1a;个人社区&#xff08;欢迎大佬们加入&#xff09; &#x1f449;&#xff1a;社区链接&#x1f517; &#x1f4cc;&#xff1a;觉得文章不错可以点点关注 &#x1f44…

JAVA 初识序列化与反序列化

JAVA 初识序列化与反序列化 目录 JAVA 初识序列化与反序列化初识序列化与反序列化1 概述2 特点/应用场景3 涉及到的流对象4 代码实现序列化与反序列化4.1 步骤1&#xff1a;创建学生类Student4.2 步骤2&#xff1a;创建序列化测试类 5 测试报错NotSerializableException:6 测试…

leetcode 26.删除有序数组中的重复项

⭐️ 题目描述 &#x1f31f; leetcode链接&#xff1a;删除有序数组中的重复项 代码&#xff1a; /*思路&#xff1a;双指针问题[1,1,2]src-> [ 1 , 1 , 2 ]destnums[src] nums[dest] > src;src-> [ 1 , 1 , 2 ]destnums[src]…

计算机视觉:多相机硬件同步拍摄

计算机视觉&#xff1a;多相机硬件同步拍摄 传感器同步硬件同步信号FSYNC信号STROBE信号 硬件接线硬件设备接线步骤&#xff1a; 软件驱动参考文献 传感器同步 目前主要有两种方法来同步不同传感器的信息&#xff08;帧、IMU数据包、ToF等&#xff09;&#xff1a; 硬件同步&…

7.用python写网络爬虫,验证码处理

前言 验证码&#xff08;CAPTCHA&#xff09;的全称为全自动区分计算机和人类的公开图灵测试&#xff08;Completely Automated Public Turing testtotellComputersand Humans Apart&#xff09;从其全称可以看出&#xff0c;验证码用 于测试用户是否为真实人类。一个典型的验证…

基于Web的小学学科数字教学资源管理系统

摘要 小学学科数字教学资源管理是一个典型的学习项目&#xff0c;从教学资源、教材信息的统计和分析&#xff0c;在过程中会产生大量的、各种各样的数据。本文以小学学科数字教学资源管理系统为目标&#xff0c;采用B/S模式&#xff0c;以Springboot为开发框架&#xff0c;java…

STM32单片机(五)第二节:EXTI外部中断练习2(旋转编码器计次)

❤️ 专栏简介&#xff1a;本专栏记录了从零学习单片机的过程&#xff0c;其中包括51单片机和STM32单片机两部分&#xff1b;建议先学习51单片机&#xff0c;其是STM32等高级单片机的基础&#xff1b;这样再学习STM32时才能融会贯通。 ☀️ 专栏适用人群 &#xff1a;适用于想要…