Flink(56):Flink高级特性之文件写入(File Sink) |
您所在的位置:网站首页 › 抖音推广报价表格 › Flink(56):Flink高级特性之文件写入(File Sink) |
目录 0. 相关文章链接 1. 文件写入介绍 2. 案例演示 0. 相关文章链接Flink文章汇总 1. 文件写入介绍官网介绍:Apache Flink 1.12 Documentation: File Sink 新的 Data Sink API (Beta) 之前发布的 Flink 版本中[1],已经支持了 source connector 工作在流批两种模式下,因此在 Flink 1.12 中,社区着重实现了统一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 协议和一个更加模块化的接口。Sink 的实现者只需要定义 what 和 how:SinkWriter,用于写数据,并输出需要 commit 的内容(例如,committables);Committer 和 GlobalCommitter,封装了如何处理 committables。框架会负责 when 和 where:即在什么时间,以及在哪些机器或进程中commit。 这种模块化的抽象允许为 BATCH 和 STREAMING 两种执行模式,实现不同的运行时策略,以达到仅使用一种 sink 实现,也可以使两种模式都可以高效执行。Flink 1.12 中,提供了统一的 FileSink connector,以替换现有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也将逐步迁移到新的接口。 Flink 1.12的 FileSink 为批处理和流式处理提供了一个统一的接收器,它将分区文件写入Flink文件系统抽象所支持的文件系统。这个文件系统连接器为批处理和流式处理提供了相同的保证,它是现有流式文件接收器的一种改进。 2. 案例演示 import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import java.util.concurrent.TimeUnit; /** * Desc */ public class FileSinkDemo { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10)); env.setStateBackend(new FsStateBackend("file:///D:/ckp")); //2.source DataStreamSource lines = env.socketTextStream("node1", 9999); //3.sink //设置sink的前缀和后缀 //文件的头和文件扩展名 //prefix-xxx-.txt OutputFileConfig config = OutputFileConfig .builder() .withPartPrefix("prefix") .withPartSuffix(".txt") .build(); //设置sink的路径 String outputPath = "hdfs://node1:8020/FlinkFileSink/parquet"; final FileSink sink = FileSink .forRowFormat(new Path(outputPath), new SimpleStringEncoder("UTF-8")) .withBucketAssigner(new DateTimeBucketAssigner()) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) .withMaxPartSize(1024 * 1024 * 1024) .build()) .withOutputFileConfig(config) .build(); lines.sinkTo(sink).setParallelism(1); env.execute(); } }注:此博客根据某马2020年贺岁视频改编而来 -> B站网址 注:其他相关文章链接由此进 -> Flink文章汇总 |
今日新闻 |
点击排行 |
|
推荐新闻 |
图片新闻 |
|
专题文章 |
CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭 |