Apache Beam Pipeline写到多个sink



我有一个场景,我需要做以下事情:

  1. 从存储在GCS中的文件中读取数据
  2. 对数据应用多个转换
  3. 在Google大查询中持久化PCollection。
  4. 当步骤3。成功,写PCollection到卡夫卡的主题。

我的问题是我怎样才能实现它在Apache梁。我的主要要求是,如果插入大查询成功,我只能写Kafka。

我发现了类似的东西:https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample/blob/HEAD/BeamAvro/src/main/java/com/google/cloud/solutions/beamavro/AvroToBigQuery.java

,数据被写入GCS和BQ。但是我的要求是写BQ成功后才写Kafka。有没有人知道这是可能的,我如何才能实现它?

您最好的选择是使用由BigQueryIO.Write转换产生的WriteResult。它生成成功插入的PCollections,然后你可以将其写入Kafka(通过一些类型转换)。以下是要点:

WriteResult writeResult = pcol.apply(
"InsertIntoBigQuery",
BigQueryIO.writeTableRows());
PCollection<MyType> myType = writeResult
.getSuccessfulInserts()
.apply(
"ConvertFromTableRowToMyType",
/* Transform goes here. */);

从那里你只输出元素到Kafka,这样你应该只有成功写入BigQuery的元素才会写入Kafka。

最新更新