我有CSV文件被推到Google Storage和PubSub订阅中,该订阅通知我到达时。我要完成的工作是编写一个光束程序,该程序将从PubSub订阅中获取JSON数据,然后从GS读取CSV文件,然后处理这些文件。我有一个过程,可以处理PubSub的过程,然后将其处理为PCollection。到目前为止,我都有:
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
final String output = options.getOutput();
Pipeline pipeline = Pipeline.create(options);
PCollection<String> input = pipeline.apply(PubsubIO.readStrings().fromSubscription(StaticValueProvider.of("beamsub")));
PCollection<String> files = input.apply(ParDo.of(new ParseOutGSFiles()));
现在我需要做这样的事情:
pipeline.apply("ReadLines", TextIO.read().from(FILEsFROMEARLIER).withCompressionType(TextIO.CompressionType.GZIP))
任何想法还是不可能...似乎应该很容易
预先感谢
表达您读取的自然方法是使用textio.readall()方法,该方法从文件名的输入pcollection中读取文本文件。该方法已在Beam Codebase中引入,但目前不在发布版本中。它将包含在Beam 2.2.0版本和相应的DataFlow 2.2.0版本中。
您的结果代码看起来像
Options options = PipelineOptionsFactory.fromArgs(args)
.withValidation().as(Options.class);
final String output = options.getOutput();
Pipeline pipeline = Pipeline.create(options);
PCollection<String> files = pipeline
.apply(PubsubIO.readStrings().fromSubscription("beamsub"))
.apply(ParDo.of(new ParseOutGSFiles()));
PCollection<String> contents = files
.apply(TextIO.readAll().withCompressionType(TextIO.CompressionType.GZIP));