控制Apache Beam数据流管道中的并行性



我们正在试验Apache Beam(使用Go SDK(和Dataflow来并行化我们的一项耗时任务。对于更多的上下文,我们有一个缓存作业,它接受一些查询,在数据库中运行它并缓存它们。每个数据库查询可能需要几秒钟到几分钟的时间,我们希望并行运行这些查询以更快地完成任务。

创建了一个看起来像这样的简单管道:

// Create initial PCollection.
startLoad := beam.Create(s, "InitialLoadToStartPipeline")
// Emits a unit of work along with query and date range.
cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)
// Emits a cache response which includes errCode, errMsg, time etc.
cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)
...

getCachePayloadsFn发射的数量单位并不多,在生产中大多为数百个,最多为数千个。

现在的问题是cacheQueryDoFn不是并行执行的,而是逐个按顺序执行查询。我们通过在StartBundleProcessElement中放入日志来确认这一点,并在缓存函数中记录goroutine id、进程id、开始和结束时间等,以确认执行中没有重叠。

即使只有10个查询,我们也希望始终并行运行查询。根据我们的理解和文档,它从整体输入创建捆绑包,这些捆绑包并行运行,在捆绑包中按顺序运行。有没有办法从负载中控制捆绑包的数量,或者有没有办法提高并行性?

我们尝试过的东西:

  1. 保留num_workers=2autoscaling_algorithm=None。它启动两个虚拟机,但运行Setup方法仅在一个虚拟机上初始化DoFn,并将其用于整个负载
  2. 在此处找到sdk_worker_parallelism选项。但不知道如何正确设置。尝试用beam.PipelineOptions.Set("sdk_worker_parallelism", "50")设置。没有效果

默认情况下,Create不是并行的,所有DoFn都与Create融合到同一个阶段,因此它们也没有并行性。看见https://beam.apache.org/documentation/runtime/model/#dependent-平行主义的一些更多信息。

可以使用Reshuffle变换显式强制进行融合中断。

最新更新