Pub/Sub to BigQuery (Batch) with Dataflow (Python)



I've created a streaming Dataflow pipeline in Python and just want to confirm whether the code below does what I expect. Here's what I'm trying to do:

  1. Consume continuously from Pub/Sub
  2. Batch-load into BigQuery every 1 minute, rather than using streaming inserts, to reduce cost

Here is the code snippet in Python:
import json
from datetime import datetime

import apache_beam as beam
from apache_beam import Map
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner

options = PipelineOptions(
    subnetwork=SUBNETWORK,
    service_account_email=SERVICE_ACCOUNT_EMAIL,
    use_public_ips=False,
    streaming=True,
    project=project,
    region=REGION,
    staging_location=STAGING_LOCATION,
    temp_location=TEMP_LOCATION,
    job_name=f"pub-sub-to-big-query-xxx-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
)
p = beam.Pipeline(DataflowRunner(), options=options)

pubsub = (
    p
    | "Read Topic" >> ReadFromPubSub(topic=INPUT_TOPIC)
    | "To Dict" >> Map(json.loads)
    | "Write To BigQuery" >> WriteToBigQuery(
        table=TABLE,
        schema=schema,
        method='FILE_LOADS',
        triggering_frequency=60,  # trigger a load job roughly every 60 seconds
        max_files_per_bundle=1,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=BigQueryDisposition.WRITE_APPEND))

Could someone confirm whether the code above does what I intend: stream from Pub/Sub and, every 60 seconds, batch-load the messages into BigQuery? I intentionally set max_files_per_bundle=1 to prevent more than one shard from being created, so that only one file gets loaded per minute, but I'm not sure I've done it correctly. The Java SDK has a withNumFileShards option, but I couldn't find an equivalent in Python. I referred to the documentation below:

https://beam.apache.org/releases/pydoc/2.31.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery

https://cloud.google.com/blog/products/data-analytics/how-to-efficiently-process-both-real-time-and-aggregate-data-with-dataflow
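
One way to double-check which options a given Beam release actually exposes, rather than relying on the docs for one version, is to inspect the WriteToBigQuery constructor in the installed SDK; a minimal sketch (run against whichever apache-beam version the pipeline uses):

import inspect

from apache_beam.io.gcp.bigquery import WriteToBigQuery

# Print the supported constructor parameters, e.g. to see whether anything
# comparable to Java's withNumFileShards is available in this release.
print(inspect.signature(WriteToBigQuery.__init__))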

Just curious whether I should be using windowing to achieve what I want, like this:

# In addition to the imports above:
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime

options = PipelineOptions(
    subnetwork=SUBNETWORK,
    service_account_email=SERVICE_ACCOUNT_EMAIL,
    use_public_ips=False,
    streaming=True,
    project=project,
    region=REGION,
    staging_location=STAGING_LOCATION,
    temp_location=TEMP_LOCATION,
    job_name=f"pub-sub-to-big-query-xxx-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
)
p = beam.Pipeline(DataflowRunner(), options=options)

pubsub = (
    p
    | "Read Topic" >> ReadFromPubSub(topic=INPUT_TOPIC)
    | "To Dict" >> Map(json.loads)
    | 'Window' >> beam.WindowInto(window.FixedWindows(60),
                                  trigger=AfterProcessingTime(60),
                                  accumulation_mode=AccumulationMode.DISCARDING)
    | "Write To BigQuery" >> WriteToBigQuery(
        table=TABLE,
        schema=schema,
        method='FILE_LOADS',
        triggering_frequency=60,
        max_files_per_bundle=1,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=BigQueryDisposition.WRITE_APPEND))

Is the first approach good enough without the windowing in the second approach? I'm using the first approach right now, but I'm not sure whether, every minute, it runs multiple load jobs from multiple files, or whether it actually combines all the Pub/Sub messages into one file and performs a single batch load.
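
One way to verify this is to list the load jobs the pipeline actually creates while it runs; a minimal sketch using the google-cloud-bigquery client library (PROJECT_ID is a placeholder for the project the load jobs run in):

from datetime import datetime, timedelta, timezone

from google.cloud import bigquery

client = bigquery.Client(project=PROJECT_ID)  # PROJECT_ID: placeholder

# List jobs created in the last few minutes and print the BigQuery load jobs.
# Roughly one load job per triggering_frequency suggests the messages are
# being batched into a single load; many more suggests multiple files/shards.
since = datetime.now(timezone.utc) - timedelta(minutes=5)
for job in client.list_jobs(min_creation_time=since, all_users=True):
    if job.job_type == "load":
        print(job.job_id, job.created, getattr(job, "output_rows", None))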

Thanks!

Not a Python solution, but I ended up falling back to the Java version:

public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() {
    return new JsonToTableRow();
}

// Decodes each JSON string into a TableRow using the TableRow JSON coder.
private static class JsonToTableRow
        extends PTransform<PCollection<String>, PCollection<TableRow>> {
    @Override
    public PCollection<TableRow> expand(PCollection<String> stringPCollection) {
        return stringPCollection.apply("JsonToTableRow", MapElements.via(
                new SimpleFunction<String, TableRow>() {
                    @Override
                    public TableRow apply(String json) {
                        try {
                            InputStream inputStream =
                                    new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
                            return TableRowJsonCoder.of().decode(inputStream, Context.OUTER);
                        } catch (IOException e) {
                            throw new RuntimeException("Unable to parse input", e);
                        }
                    }
                }));
    }
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);
    options.setDiskSizeGb(10);

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("Read from PubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
            .apply(jsonToTableRow())
            // Batch (FILE_LOADS) writes: one load job per minute, one file shard.
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows().to(options.getOutputTable())
                    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                    .withTriggeringFrequency(Duration.standardMinutes(1))
                    .withNumFileShards(1)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    pipeline.run();
}
