Flink DataStream之输出数据到File中

  • 新建类
package test01;import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration;public class TestOutputFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());executionEnvironment.setParallelism(1);//监听数据端口DataStreamSource<String> dataSource = executionEnvironment.socketTextStream("localhost", 9999);//开启checkpoint,这样到了一定节点就会关闭文件,否则文件一直都是inprogress,此处设置的检查点是2秒。executionEnvironment.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);//输出至文件FileSink<String> fileSink = FileSink//设置按行输出,指定输出的路径及编码格式,这里的泛型指定的是字符串类型。.<String>forRowFormat(new Path("D:/IT/testfilnk"), new SimpleStringEncoder<>("UTF-8"))//设置输出文件名的前缀和后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("test-flink-output-").withPartSuffix(".log").build())//设置文件滚动策略,这里设置的是20s和1024B(1KB),滚动策略满足其一就会重新写新文件。.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(20)).withMaxPartSize(new MemorySize(1024)).build()).build();dataSource.sinkTo(fileSink);executionEnvironment.execute();}
}
  • 启动程序并启动nc -lp

输入数据:

正在写入的文件会有inprogress的标识(在指定的目录下生成文件时会按照日期的年月日时进行分目录,因为我在执行时的时间是2023/7/12 22点,所以它就会自动生成一个2023-07-12--22目录,分桶策略也可以自己在代码中配置。):

 当满足滚动策略时,会结束当前文件,然后重新写入新文件:

查看文件内容:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/26842.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023年Q2京东厨卫大电市场分析报告(京东运营数据分析)

随着新产品推广和消费需求升级&#xff0c;今年Q2&#xff0c;京东厨卫大电市场的销售额突破百亿&#xff0c;从同环比来看均呈增长趋势。百亿市场中&#xff0c;油烟机、电热水器、燃气热水器这三大品类占据较大份额&#xff0c;这一期&#xff0c;我们重点来看一下京东厨卫大…

金蝶云星空大福利:奥威BI金蝶云星空SaaS版,开箱即用!

奥威BI金蝶云星空SaaS 版是一种基于金蝶云星空平台的BI大数据分析解决方案&#xff0c;主要特点是不用下载安装软件&#xff0c;开箱即用&#xff0c;普遍适用于零售、快消、餐饮、服务连锁、制造等行业。 不需准备环境、不需下载软件&#xff0c;简单四步&#xff0c;完成系统…

什么企业适合建设数据中台?

从历史脉络中&#xff0c;看到数据中台凸显价值&#xff0c;数据中台是大数据下一站。所有企业都适合建设数据中台吗&#xff1f;什么样应该建数据中台&#xff1f; 2018年我们在建数据中台前面临的窘境&#xff0c;通过了解我们建数据中台的背景&#xff0c;你也可以对照着看…

微信小程序活动抽奖简单实现,包教包会

问题&#xff1a; 针对用户使用活动抽奖&#xff0c;获得抽奖得到商品的成就以及满足感&#xff0c;那么我们应该怎么去实现小程序去转盘抽奖活动呢 例如&#xff1a;项目需要抽奖实现相应的奖品奖励 实现方法 实现的效果如下&#xff1a; 实现的主要代码&#xff1a; Page…

上手vue2的学习笔记5之在vue2项目中调用elment-ui

前言 上手vue2的学习笔记4之搭建vue环境 参考链接&#xff1a;vue2.0项目引入element-ui 一、安装elment-ui 进入搭建的vue项目中 cd vue_bing_test 安装 element npm i element-ui二、引入elment-ui elment官方教程 将main.js改成如下内容&#xff1a; import Vue fro…

行业报告 | AI 赋能,人形机器人产业提速,把握产业链受益机会(上)

文 | BFT机器人 01 核心观点 核心观点: 人形机器人产业发展仍处于 0-1 阶段&#xff0c;当前行业投资逻辑偏向事件驱动型的主题投资&#xff0c;但可落地服务场景的人形机器人成长空间非常广阔&#xff0c;值得长期关注。本文将围绕以下热点问题作出讨论: D当前节点人形机器人产…

常用API学习03(Java)

String char charAt(int index) 返回char指定索引处的值 char[] toCharArray() 将此字符串转换为新的字符数组 int compareTo(String anotherString) 按字典顺序比较两个字符串 boolean contains(CharSequence s) 当且仅当此字符串包含指定的char值序列才返…

C语言程序设计——字符、字符串、内存函数

一、长度不受限的字符串函数 1. strlen size_t strlen(const char* str); 功能&#xff1a;求字符串长度 &#xff08;1&#xff09;字符串以\0作为结束标志&#xff0c;strlen函数返回的是在字符串中\0之前出现的字符个数&#xff08;不包含\0&#xff09;。 &#xff08…

字符串相加(力扣)

Problem: 415. 字符串相加 文章目录 思路Code复杂度运行结果 思路 创建一个StringBuilder对象使用append方法追加每位数字相加&#xff0c;使用双指针的方式&#xff0c;指针i&#xff0c;j分别指向num1和num2的每位数字&#xff0c;从后往前&#xff0c;进位用carry存储着。 …

感知网络安全态势是什么?感知网络安全态势如何实施

网络安全是当今社会中一个非常重要的话题。随着互联网的普及和信息技术的发展&#xff0c;网络安全问题日益突出。为了有效应对各种网络威胁和攻击&#xff0c;网络安全态势感知成为了一种关键的技术手段。 网络安全态势感知的定义 网络安全态势感知是指通过对网络环境中的各种…

微服务保护

一、初识 Sentinel 1. 雪崩问题及解决方案 微服务调用链路中的某个服务故障&#xff0c; 引起整个链路中的所有微服务都不 可用&#xff0c;这就是雪崩 常见解决方式有四种&#xff1a; ① 超时处理&#xff1a;设定超时时间&#xff0c;请求超 过一定时间没有响应就返回错误信…

同步和异步的区别

同步&#xff0c;可以理解为在执行完一个函数或方法之后&#xff0c;一直等待系统返回值或消息&#xff0c;这时程序是处于阻塞的&#xff0c;只有接收到返回的值或消息后才往下执行其他的命令&#xff1b; 异步&#xff0c;执行完函数或方法后&#xff0c;不必阻塞性地等待返回…