是否可以在不使用侧输入的情况下在不同的输出汇点写入单个Pcollection



我有一个写管道数据的特定用例。我想做一个Pub/Sub订阅,并想从单个源读取这些数据,并在多个汇点写入Pcollection,而不需要再为其订阅Pub/Sub。我一直想做一条管道,使我在一个数据流中有多个管道并行工作,并写入相同的管道数据,首先是在谷歌云存储,其次是在Bigquery,只需使用一个订阅。相同的代码或参考资料会很有帮助,并为我的工作方向带来光明。

提前感谢!!

您只需要在Beam工作中执行多个接收器即可满足您的需求。

Beam中,您可以构建一个PCollection,然后将该PCollection汇到多个位置:

光束Python:示例

result_pcollection = (inputs | 'Read from pub sub' >> ReadFromPubSub(
subscription=subscription_path) 
| 'Map 1' >> beam.Map(your_map1) 
| 'Map 2' >> beam.Map(your_map2)
)
# Sink to Bigquery
(result_pcollection | 'Map 3' >> beam.Map(apply_transform_logic_bq)
| 'Write to BQ' >> beam.io.WriteToBigQuery(
project=project_id,
dataset=dataset,
table=table,
method='YOUR_WRITE_METHOD',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
)
# Sink to GCS
(result_pcollection | 'Map 4' >> beam.Map(apply_transform_logic_gcs)
| 'Windowing logic' >> WindowInto(FixedWindows(10*60))
|  fileio.WriteToFiles(path=known_args.output)
)

为了能够将流式传输流写入GCS,您需要应用窗口化并为每个窗口生成一个文件。

是的,这肯定是可能的。在Java中,您可以执行以下操作:

PCollection<PubsubMessage> messages = p.apply(PubsubIO.read()...);
// Write messages to GCS
messages.apply(TextIO.write()...);
// Write messages to BQ
messages.apply(BigQueryIO.write()...);

这些消息将只从pubsub消费一次。您可以定义管道的多个分支,这些分支都从同一个PCollection读取。

这里的缺点实际上是错误处理。如果您的BigQuery接收器存在导致管道失败的错误,它也会降低您的GCS输出。当一个管道中有多个汇点时,很难对这些失败场景进行推理。

你提到";首先在谷歌云存储中,其次在Bigquery";;如果写入顺序很重要(如果数据不在GCS中,则不希望数据显示在BQ中(,则表示起来要困难得多,最好创建第二个管道,从第一个管道的GCS输出中读取数据并写入BQ。

最新更新