我试图将几个任务链接在一起,例如
import ipyparallel
client = ipyparallel.Client()
view = client.load_balanced_view()
def task1(x):
## Do some work.
return x * 2
def task2(x):
## Do some work.
return x * 3
def task3(x):
## Do some work.
return x * 4
results1 = view.map_async(task1, [1, 2, 3])
results2 = view.map_async(task2, results1.get())
results3 = view.map_async(task3, results2.get())
但是,除非完成任务1,否则该代码不会提交任何任务2。我的任务可能需要不同的时间,而且效率很低。有一种简单的方法可以有效地链接这些步骤,并且引擎可以从前一步获得结果?类似:
def task2(x):
## Do some work.
return x.get() * 3 ## Get AsyncResult out.
def task3(x):
## Do some work.
return x.get() * 4 ## Get AsyncResult out.
results1 = [view.apply_async(task1, x) for x in [1, 2, 3]]
results2 = []
for x in result1:
view.set_flags(after=x.msg_ids)
results2.append(view.apply_async(task2, x))
results3 = []
for x in result2:
view.set_flags(after=x.msg_ids)
results3.append(view.apply_async(task3, x))
显然,这将失败,因为异步无法挑剔。
我正在考虑一些解决方案:
使用view.map_async(ordered = false)。
results1 = view.map_async(task1, [1, 2, 3], ordered=False) for x in results1: results2.append(view.apply_async(task2, x.get()))
,这必须等待所有任务1完成,然后才能提交任何任务3。它仍在阻挡。
使用asyncio。
@asyncio.coroutine def submitter(x): result1 = yield from asyncio.wrap_future(view.apply_async(task1, x)) result2 = yield from asyncio.wrap_future(view.apply_async(task2, result1) result3 = yield from asyncio.wrap_future(view.apply_async(task3, result2) yield result3 @asyncio.coroutine def submit_all(ls): jobs = [submitter(x) for x in ls] results = [] for async_r in asyncio.as_completed(jobs): r = yield from async_r results.append(r) ## Do some work, like analysing results.
它正在工作,但是当引入更复杂的任务时,代码很快就会变得凌乱和不直觉。
谢谢您的帮助。
选项1:连锁期货
ipython并行并不是最好的做法,因为连接必须在客户端级别完成。在提交结果之前,您必须等待结果完成并返回客户。从本质上讲,您的Asyncio submit_all是与iPython并行这样做的正确方法。您可以通过编写使用add_done_callback
的chain
函数来获得一些更通用的东西,该功能在上一项完成时提交新任务:
from concurrent.futures import Future
from functools import partial
def chain_apply(view, func, future):
"""Chain a call to view.apply(func, future.result()) when future is ready.
Returns a Future for the subsequent result.
"""
f2 = Future()
# when f1 is ready, submit a new task for func on its result
def apply_func(f):
if f.exception():
f2.set_exception(f.exception())
return
print('submitting %s(%s)' % (func.__name__, f.result()))
ar = view.apply_async(func, f.result())
# when ar is done, pass through the result to f2
ar.add_done_callback(lambda ar: f2.set_result(ar.get()))
future.add_done_callback(apply_func)
return f2
def chain_map(view, func, list_of_futures):
"""Chain a new callback on a list of futures."""
return [ chain_apply(view, func, f) for f in list_of_futures ]
# use builtin map with apply, since we want one Future per item
results1 = map(partial(view.apply, task1), [1, 2, 3])
results2 = chain_map(view, task2, results1)
results3 = chain_map(view, task3, results2)
print("Waiting for results")
[ r.result() for r in results3 ]
与add_done_callback
的任何示例一样,它可以用Coroutines编写,但是在这种情况下,我发现回调很好。这至少应该是一个相当通用的实用程序,您可以用来组合管道。
选项2:dask.distributed
全面披露:我是Ipython Parallel的主要作者,即将建议您使用其他工具。
可以通过引擎名称空间和ipython并行的DAG依赖项将结果传递到另一个任务,但老实说,如果您的工作流程看起来像这样,则应考虑使用Dask分布式,该分布式是专门为此类计算而设计的图形。如果您已经感到舒适且熟悉Ipython的平行线,那么开始Dask的负担就不那么多了。
ipython 5.1提供了一个方便的命令,用于将您的ipython并行群集变成一个dask分布式群集:
import ipyparallel as ipp
client = ipp.Client()
executor = client.become_distributed(ncores=1)
然后,DASK的关键相关功能是您可以将期货作为参数提交给后续地图电话,并且在结果准备就绪时,调度程序可以照顾它,而不是在客户端中明确进行。P>
results1 = executor.map(task1, [1, 2, 3])
results2 = executor.map(task2, results1)
results3 = executor.map(task3, results2)
executor.gather(results3)
基本上,dask分布式有效,您希望ipython并行的负载平衡能够在需要链接这样的东西时起作用。
本笔记本说明了两个示例。