Apache Beam DirectRunner支持对不同ParDo/DoFn进行多线程处理



我正在将Apache Beam与DirectRunner一起使用。我有5个用ParDos包裹的DoFns,它们被一个接一个地应用。当Pipeline运行((时,首先DoFn处理它的所有工作,然后第二个,然后第三个。我希望第二个DoFn在第一个发出输出后立即开始工作,以便并行处理,因为目前在任何给定时间最多有一个DoFn在工作(相关:Apache Beam中DoFn的线程同步(。

val pipelineOptions = PipelineOptionsFactory
.fromArgs("--streaming", "--experiments=use_runner_v2")
.withValidation()
.create()
.`as`(DirectOptions::class.java)
pipelineOptions.isBlockOnRun = true
pipelineOptions.isEnforceEncodability = true
pipelineOptions.isEnforceImmutability = true
pipelineOptions.targetParallelism = Runtime.getRuntime().availableProcessors() * 2
pipelineOptions.appName = name
val pipeline = Pipeline.create(pipelineOptions)
pipeline
.apply(...)
.apply(...)
.apply(...)
.apply(...)
.apply(...)
.apply(...)
pipeline
.run()
.waitUntilFinish()

Java DirectRunner已经独立异步地执行每个阶段。它有固定数量的工作线程。你所经历的是由你的输入都在一个捆绑包中引起的,所以它们都是一起提交的。DirectRunner不会重新绑定输入,因此绑定将一起通过管道。

如果您有一个真正的流式无边界数据源,DirectRunner将处理从源发出的每个束。

最新更新