自定义FileInputFormat始终将一个文件平面分配给一个插槽



我一直在我们的S3存储桶中写下Protobuf记录。我想使用FLINK DATASET API从中读取。因此,我实现了一个自定义的fileInputFormat来实现这一目标。代码如下。

public class ProtobufInputFormat extends FileInputFormat<StandardLog.Pageview> {
    public ProtobufInputFormat() {
    }
    private transient boolean reachedEnd = false;
    @Override
    public boolean reachedEnd() throws IOException {
        return reachedEnd;
    }
    @Override
    public StandardLog.Pageview nextRecord(StandardLog.Pageview reuse) throws IOException {
        StandardLog.Pageview pageview = StandardLog.Pageview.parseDelimitedFrom(stream);
        if (pageview == null) {
            reachedEnd = true;
        }
        return pageview;
    }
    @Override
    public boolean supportsMultiPaths() {
        return true;
    }
}

public class BatchReadJob {
    public static void main(String... args) throws Exception {
        String readPath1 = args[0];
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ProtobufInputFormat inputFormat =  new ProtobufInputFormat();
        inputFormat.setNestedFileEnumeration(true);
        inputFormat.setFilePaths(readPath1);
        DataSet<StandardLog.Pageview> dataSource = env.createInput(inputFormat);
        dataSource.map(new MapFunction<StandardLog.Pageview, String>() {
            @Override
            public String map(StandardLog.Pageview value) throws Exception {
                return value.getId();
            }
        }).writeAsText("s3://xxx", FileSystem.WriteMode.OVERWRITE);
        env.execute();
    }
}

问题在于,Flink总是将一个文件平面分配给一个并行性插槽。换句话说,它总是处理与并行数量相同数量的文件拆分。

我想知道实现自定义fileInputformat的正确方法。

谢谢。

我相信您看到的行为是因为ExecutionJobVertex调用FileInputFormat. createInputSplits()方法,其minNumSplits参数等于顶点(数据源)并行性。因此,如果您想要不同的行为,则必须覆盖createInputSplits方法。

尽管您没有说您实际想要的行为。例如,如果您只需要每个文件一个拆分,则可以覆盖FileInputFormat子类中的testForUnsplittable()方法始终返回true;它还应将(受保护的)unsplittable布尔值设置为true。

最新更新