我们可以使用Windows + GroupBy或State和及时在apache-beam的批处理管道中打破融合黑白ParDo吗?



上下文:我有N个请求,我需要为其放置提取请求(FetchData()ParDo),该请求反过来返回一个结果,我可以使用该结果下载数据(DownloadData()Paredo)。现在这些ParDo正在被融合,因为单个工作放置获取请求&下载数据,然后再次发出请求&下载数据等等。

因此,我想将这些步骤并行化,以便在下一步下载一些数据时,一旦我从fetch步骤+fetch步骤获得结果,就开始下载数据,以放置另一个请求。

尝试打破融合:

request
| 'Fetch' >> beam.ParDo(FetchData())
| "GlobalWindow" >> beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(
trigger.AfterAny(
trigger.AfterProcessingTime(int(1.0 * 1)),
trigger.AfterCount(1)
)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| 'GroupBy' >> beam.GroupBy()
| 'Download' >> beam.ParDo(DownloadData())

事实上,我想打破w.r.t.FetchData()&DownloadData()ParDo,所以我想到了这种方法来拥有一个GlobalWindows()&然后使用GroupBy()对每个窗口元素进行分组,并将其进一步发送到DownloadData()ParDo,而FetchData()则并行工作。

但我在这里观察到的是,GroupBy()在将其进一步发送到DownloadData()ParDo之前,会累积所有元素(在其步骤首先处理之前等待所有元素)。

我做的事对吗?无论如何都要让GroupBy()提前返回?或者有人有其他方法来实现我的目标吗?

更新:

尝试-2使用states&及时:

request
| 'Fetch' >> beam.ParDo())
| "SetRequestKey" >> beam.ParDo(SetRequestKeyFn())
| 'RequestBucket' >> beam.ParDo(RequestBucket())
| 'Download' >> beam.ParDo(DownloadData())
#Sets the request_id as the key
class SetRequestKeyFn(beam.DoFn):
def process(self, element):
return element[2]['href'], element

class RequestBucket(beam.DoFn):
"""Stateful ParDo for storing requests."""
REQUEST_STATE = userstate.BagStateSpec('requests', DillCoder())
EXPIRY_TIMER = userstate.TimerSpec('expiry_timer', userstate.TimeDomain.REAL_TIME)
def process(self,
element,
request_state=beam.DoFn.StateParam(REQUEST_STATE),
timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
logger.info(f"Adding new state {element[0]}.")
request_state.add(element)
# Set a timer to go off 0 seconds in the future.
timer.set(Timestamp.now() + Duration(seconds=0))
@userstate.on_timer(EXPIRY_TIMER)
def expiry_callback(self, request_state=beam.DoFn.StateParam(REQUEST_STATE)):
""""""
requests = list(request_state.read())
request_state.clear()
logger.info(f'Yielding for {requests!r}...')
yield requests[0]

在这里,该SetRequestKeyFn() ParDo还在其步骤之前等待所有元素首先被处理,然后将其进一步发送到RequestBucketParDo。

在批处理中,所有的融合屏障(包括GroupByKey)都是全局屏障,即上游的一切都在下游的一切开始之前完成。

如果问题是FetchData()具有高扇出,那么可以做的一件事是尝试提前拆分廉价扇出,然后添加一个重组,即

request
| ComputeFetchOperations()
| beam.Reshuffle()
| FetchOne()
| DownloadData()
...

这仍然会融合FetchOneDownloadData操作,但其他获取可以由其他线程(或工作线程)并行处理。

您还可以考虑在apachebeam中执行异步API调用中描述的多线程DoFn。

另一种选择可能是尝试将其写成流媒体管道,尽管这可能会带来额外的复杂性。

相关内容

最新更新