StreamingFileSink not ingesting data to S3



I have created a simple ingestion service that picks up local files and ingests them into S3 using StreamingFileSink.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

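I have set everything up as described in the documentation, but it is not working. As a test, I pointed the sink at another local path instead, and the files do arrive there (but hidden, as .part files).

Does this mean the part files are also being sent to S3 but are not visible?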

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
String path = "/tmp/component_test";
MyFileInputFormat myFileInputFormat = new MyFileInputFormat(new Path(path));
myFileInputFormat.setNumSplits(1);
ContinuousFileMonitoringFunction<String> monitoringFunction =
    new ContinuousFileMonitoringFunction<>(myFileInputFormat,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        env.getParallelism(), 1000);

// the monitor has always DOP 1
DataStream<TimestampedFileInputSplit> splits = env.addSource(monitoringFunction);
ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(myFileInputFormat);
TypeInformation<String> typeInfo = new SimpleStringSchema().getProducedType();
// the readers can be multiple
DataStream<String> content = splits.transform("FileSplitReader", typeInfo, reader);
SingleOutputStreamOperator<Tuple2<String, String>> ds = content.flatMap(new XMLSplitter());

//new Path("s3://<bucket_name>/raw/")
//new Path("file:///tmp/raw/")
StreamingFileSink<Tuple2<String, String>> sink = StreamingFileSink
    .forRowFormat(new Path("s3a://<bucket-name>/raw/"),
        (Tuple2<String, String> element, OutputStream stream) -> {
            PrintStream out = new PrintStream(stream);
            out.println(element.f1);
        })
    // Determine component type for each record
    .withBucketAssigner(new ComponentBucketAssigner())
    .withRollingPolicy(DefaultRollingPolicy.create()
        .withMaxPartSize(100)
        .withRolloverInterval(1000)
        .build())
    .withBucketCheckInterval(100)
    .build();
ds.addSink(sink);
FileSystem.initialize(GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR")));
env.execute();
...

Should I expect to see part files in S3, or do I need to change something in the StreamingFileSink so that it rolls part files of minimal size?

09:37:39,387 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 for job 34d46d2671c996d6150d88a2f74b4218 (7558 bytes in 38 ms).
09:37:39,388 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 received completion notification for checkpoint with id=1.
09:37:39,389 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 1 received completion notification for checkpoint with id=1.
09:37:39,390 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 2 received completion notification for checkpoint with id=1.
09:37:39,391 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 3 received completion notification for checkpoint with id=1.
09:37:39,391 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER1>/part-1-0 with MPU ID CEYMmUslgCnA2KcD5pslz.7dpaQuCAqmTJo6oDPv7P.Rj45O4tHrVTfDQMABxrRvdWSTwO2RoIR.r9VP2s4IMxlPtHz9r6CP_iQ7.DcP9yGDLjIN1gaLPTunAhVGuGen
09:37:39,391 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER2>/part-0-0 with MPU ID ExM_.cfOZVvXHHGNakUeshSQrkLFtm3HytooPAxDet1MoXBEJYhxlEJBYyXFmeSpk7b.ElmoydrMgotnpZAgmsh6lGhQgMYoS2hFJtOZLtPCOLyJvOt3TKRecc8YqSAJ
09:37:39,391 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER3>/part-2-0 with MPU ID 64._ocicEwPAwrMrI_LXcKyEfqYtISKsLsheAjgXwGdpf3qTH0qvOM2C3k8s2L6UDJ8yZfm9YEJhopgQIrL0hmFokCyMa49bzUbhgm3KQmiCVe9CoNiTEb4ETnEJCZFA
09:37:39,393 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 4 received completion notification for checkpoint with id=1.
09:37:39,394 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER4>/part-3-0 with MPU ID yuFGGVfh9YOL36mUUTIAyyLehCMyQGrYoabdv0BBe.e3uCIkLYLI6S4RfnCGtFsT2pjiEJq97bfftMycp4wGW5KKX4jsrmZAfK.kqiYnMUeWWcolXKmWOktVvwHvmSpB
09:37:39,394 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 5 received completion notification for checkpoint with id=1.
09:37:39,395 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 6 received completion notification for checkpoint with id=1.
09:37:39,394 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER5>/part-4-0 with MPU ID Ab7sTpLJp3fNCCYVXe2nUO5qWmYxMeYQlOssRpeawoY2LDV.a58eShdp.Anfe6YxTnVIewCmReKiYSguJS2SlBxwNRPh2ax50nCXuSdfkyVazgiNMZYMUQJjbzTxgdYW
09:37:39,395 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER6>/part-5-0 with MPU ID xDbouvLhpX7q9rFrs9y93lc7wWO20L5mxKTCWFBAmAVkTWzEiGEu2bU5H2nnCrZWbcPDMePSdpOBK64lVoS8txuhLFtq_nkBfXIs2K6OY6NuTtiSDGWi4SrWwnedC6RM
09:37:39,395 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 7 received completion notification for checkpoint with id=1.
09:37:39,397 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER7>/part-6-0 with MPU ID 0uZ35XrL2ShWxZL5nlY3Z1KHTSHBsQhiaJ6HZ9CbzfgxFIf7bwRNjdGHQHWPs9N0WfcpQXBM12XbNENjfILXQ6CLCx0XZrgvGHakUgeWhfeBiOURrO8xUVMT1ot7gxIY

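StreamingFileSink only works if checkpointing is enabled. Part files are finalized as part of the checkpointing process.

The documentation was recently updated to explain this, but for now the explanation is only available in the nightly version of the docs: https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html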
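
To make the point about finalization concrete, here is a minimal sketch in plain Java of how a part file might move from in-progress to pending to finished. It is a toy model under stated assumptions, not Flink's actual implementation: the class `PartFileLifecycleSketch`, its `Bucket` type, and all method names are hypothetical, and the rolling rule (roll once the open part reaches a character limit) is a simplification. In the job from the question, the corresponding step would presumably be a call to `env.enableCheckpointing(...)` on the `StreamExecutionEnvironment` before `env.execute()`, with an interval of your choosing. The log in the question fits this picture: each `S3Committer` line appears right after a checkpoint completion notification.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of part-file states. Hypothetical names; not Flink's implementation. */
public class PartFileLifecycleSketch {

    /** One bucket of an imaginary sink with a size-based rolling rule. */
    public static class Bucket {
        private final int maxPartChars;  // roll once the open part holds this many characters
        private StringBuilder inProgress = new StringBuilder();
        private final List<String> pending = new ArrayList<>();   // rolled, waiting for a checkpoint
        private final List<String> finished = new ArrayList<>();  // committed, visible to readers

        public Bucket(int maxPartChars) {
            this.maxPartChars = maxPartChars;
        }

        /** Appends a record to the open part and rolls it if it is large enough. */
        public void write(String record) {
            inProgress.append(record).append('\n');
            if (inProgress.length() >= maxPartChars) {
                pending.add(inProgress.toString());
                inProgress = new StringBuilder();
            }
        }

        /** Only a completed checkpoint turns pending parts into finished ones. */
        public void onCheckpointCompleted() {
            finished.addAll(pending);
            pending.clear();
        }

        public List<String> visibleFiles() {
            return new ArrayList<>(finished);
        }

        public int pendingCount() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        Bucket bucket = new Bucket(10);
        bucket.write("first record");   // 13 characters with the newline, so the part rolls
        bucket.write("second record");  // rolls as well
        System.out.println("visible before checkpoint: " + bucket.visibleFiles().size()); // 0
        bucket.onCheckpointCompleted();
        System.out.println("visible after checkpoint: " + bucket.visibleFiles().size());  // 2
    }
}
```

In this model, a job that never completes a checkpoint never calls `onCheckpointCompleted()`, so rolled parts stay pending indefinitely. That would match the symptom described in the question, where data leaves the job but no finished files show up in the bucket.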
