Apache Beam从Pub/sub Json获取CSV文件



我有CSV文件被推到Google Storage和PubSub订阅中,该订阅通知我到达时。我要完成的工作是编写一个光束程序,该程序将从PubSub订阅中获取JSON数据,然后从GS读取CSV文件,然后处理这些文件。我有一个过程,可以处理PubSub的过程,然后将其处理为PCollection。到目前为止,我都有:

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
final String output = options.getOutput();
Pipeline pipeline = Pipeline.create(options);
PCollection<String> input = pipeline.apply(PubsubIO.readStrings().fromSubscription(StaticValueProvider.of("beamsub")));

  PCollection<String> files = input.apply(ParDo.of(new ParseOutGSFiles()));

现在我需要做这样的事情:

pipeline.apply("ReadLines", TextIO.read().from(FILEsFROMEARLIER).withCompressionType(TextIO.CompressionType.GZIP))

任何想法还是不可能...似乎应该很容易

预先感谢

表达您读取的自然方法是使用textio.readall()方法,该方法从文件名的输入pcollection中读取文本文件。该方法已在Beam Codebase中引入,但目前不在发布版本中。它将包含在Beam 2.2.0版本和相应的DataFlow 2.2.0版本中。

您的结果代码看起来像

Options options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(Options.class);
final String output = options.getOutput();
Pipeline pipeline = Pipeline.create(options);
PCollection<String> files = pipeline
    .apply(PubsubIO.readStrings().fromSubscription("beamsub"))
    .apply(ParDo.of(new ParseOutGSFiles()));
PCollection<String> contents = files
    .apply(TextIO.readAll().withCompressionType(TextIO.CompressionType.GZIP));

最新更新