如何在每个工作线程上对延迟的 dask 进行排队以允许按顺序执行进程?



我需要worker一次处理一个任务,并在开始新任务之前完成当前流程。我无法做到:(1)每个工人在任何时候最多运行一项任务,(2)让工人在开始新程序之前完成一个程序;原子事务。

我在具有40个节点的集群上使用dask.distributed Client;每个节点有4个内核和15GB RAM。我处理的管道的任务约为 8-10GB,因此在一个工作上有两个任务将导致应用程序失败。

我试图用dask-worker scheduler-ip:port --nprocs 1 --resources process=1futures = [client.submit(func, f, resources={'process': 1}) for f in futures]为我的工人分配资源和任务分配,但没有成功。

我的代码如下:

import dask
from dask.distributed import Client

@dask.delayed
def load():
...

@dask.delayed
def foo():
...

@dask.delayed
def save():
...
client = Client(scheduler-ip:port)
# Process file from a given path
paths = ['list', 'of', 'path']
results = []
for path in paths:
img = load(path)
for _ in range(n):
img = foo(img)
results.append(save(output-filename))
client.scatter(results)
futures = client.compute(results)
def identity(x):
return x
client.scatter(futures)
futures = [client.submit(same, f, resources={'process': 1}) for f in futures]
client.gather(futures)

截至目前,我有两种情况:

1-我运行所有输入,应用程序以MemoryError终止

2-我运行一个子样本,但它运行如下:

load(img-1)->load(img-2)->foo(img-1)->load(img-3)->...->save(img-1)->save(img-2)->...

TLDR:这就是我想对每个工人做的事情:

加载(img-1)->foo(img-1)->save(img-1)->load(img-7)->...

这里最简单的事情可能是只用一个线程启动你的工作线程

dask-worker ... --nthreads 1

然后该工人一次只会开始一件事

最新更新