- 新建类
package test01;import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration;public class TestOutputFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());executionEnvironment.setParallelism(1);//监听数据端口DataStreamSource<String> dataSource = executionEnvironment.socketTextStream("localhost", 9999);//开启checkpoint,这样到了一定节点就会关闭文件,否则文件一直都是inprogress,此处设置的检查点是2秒。executionEnvironment.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);//输出至文件FileSink<String> fileSink = FileSink//设置按行输出,指定输出的路径及编码格式,这里的泛型指定的是字符串类型。.<String>forRowFormat(new Path("D:/IT/testfilnk"), new SimpleStringEncoder<>("UTF-8"))//设置输出文件名的前缀和后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("test-flink-output-").withPartSuffix(".log").build())//设置文件滚动策略,这里设置的是20s和1024B(1KB),滚动策略满足其一就会重新写新文件。.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(20)).withMaxPartSize(new MemorySize(1024)).build()).build();dataSource.sinkTo(fileSink);executionEnvironment.execute();}
}
- 启动程序并启动 nc -lp 9999(端口需与代码中 socketTextStream 监听的端口一致)
输入数据:
正在写入的文件会有inprogress的标识(在指定的目录下生成文件时会按照日期的年月日时进行分目录,因为我在执行时的时间是2023/7/12 22点,所以它就会自动生成一个2023-07-12--22目录,分桶策略也可以自己在代码中配置。):
当满足滚动策略时,会结束当前文件,然后重新写入新文件:
查看文件内容: