I am trying to send Google Pub/Sub data into Apache Beam. Here is my basic code.
p.begin()
.apply("Input", PubsubIO.readAvros(CmgData.class).fromTopic("topicname"))
.apply("Transform", ParDo.of(new TransformData()))
.apply("Write", BigQueryIO.writeTableRows()
.to(table)
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run().waitUntilFinish();
Obviously Apache Beam considers the data unbounded because it comes from a subscription, but I want to batch the data up and send it. There are a number of different options, for example:
PCollection.IsBounded - (https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/values/PCollection.IsBounded.html) - seems to have no effect on the write.
BoundedReadFromUnboundedSource - (https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.html) - couldn't work out how to convert the PCollection into a bounded source, or vice versa.
BoundedWindow - (https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/windowing/BoundedWindow.html) - couldn't find a working usage.
Write.Method - (https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html) - throws an IllegalArgumentException when I try to use it.
Can someone point me in the direction of how to declare that the data is bounded, so that I can batch-process it rather than just stream it?
For more details you can see my other question: BigQuery writeTableRows always writing to buffer.
Adding the following three lines to the write means the data will be written via batch load jobs instead of streaming inserts: -
.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
.withNumFileShards(1000)
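For context, here is a sketch of the full pipeline with those three lines applied to the write from the question (it reuses the question's `p`, `table`, `schema`, `CmgData` and `TransformData`; the two-minute frequency and 1000 shards are just the example values above, not required settings):

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.joda.time.Duration;

// Same pipeline as the question, with the write switched to batch loads.
p.begin()
    .apply("Input", PubsubIO.readAvros(CmgData.class).fromTopic("topicname"))
    .apply("Transform", ParDo.of(new TransformData()))
    .apply("Write", BigQueryIO.writeTableRows()
        .to(table)
        .withSchema(schema)
        // Use periodic BigQuery load jobs instead of the default
        // streaming inserts for an unbounded input.
        .withMethod(Method.FILE_LOADS)
        // Start a load job for the buffered rows every two minutes.
        .withTriggeringFrequency(Duration.standardMinutes(2))
        // FILE_LOADS on an unbounded input also requires an explicit
        // number of file shards.
        .withNumFileShards(1000)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run().waitUntilFinish();
```

Note that when `FILE_LOADS` is used on an unbounded collection, `withTriggeringFrequency` and `withNumFileShards` must both be set; leaving them out is one way to get the IllegalArgumentException mentioned above.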