将dask数组提交到分布式客户端,同时使用结果



我有代表视频帧的dask数组,并希望创建多个视频文件。我使用的是imageio库,它允许我将帧"附加"到ffmpeg子进程中。所以我可能有这样的东西:

my_frames = [[arr1f1, arr1f2, arr1f3], [arr2f1, arr2f2, arr2f3], ...]

因此,每个内部列表代表一个视频(或产品)的帧。我正在寻找发送/提交要计算的帧的最佳方式,同时在它们完成时(按顺序)将帧写入imageio。更复杂的是,上面的内部列表实际上是生成器,可以是100秒或1000秒的帧。还要记住,由于imageio的工作方式,我认为它需要存在于一个单独的过程中。以下是我迄今为止所做工作的简化版本:

for frame_arrays in frames_to_write:
# 'frame_arrays' is [arr1f1, arr2f1, arr3f1, ...]
future_list = _client.compute(frame_arrays)
# key -> future
future_dict = dict(zip(frame_keys, future_list))
# write the current frame
# future -> key
rev_future_dict = {v: k for k, v in future_dict.items()}
result_iter = as_completed(future_dict.values(), with_results=True)
for future, result in result_iter:
frame_key = rev_future_dict[future]
# get the writer for this specific video and add a new frame
w = writers[frame_key]
w.append_data(result)

这是可行的,我的实际代码是从上面重新组织的,以便在编写当前框架的同时提交下一个框架,所以我认为这有一些好处。我正在考虑一种解决方案,用户说"我想一次处理X帧",所以我发送50帧,写入50帧,再发送50帧、写入50帧等。

我在这方面工作了一段时间后的问题:

  1. result的数据何时存在于本地内存中?迭代器何时返回或何时完成
  2. 有没有可能用dask核心线程调度程序做这样的事情,这样用户就不必分布式安装了
  3. 是否可以根据工作人员的数量调整发送的帧数
  4. 有没有办法发送一个dask数组的字典和/或使用包含"frame_key"的as_completed
  5. 如果我加载整个系列的帧并将它们提交给客户端/集群,我可能会杀死调度器,对吧
  6. ValueError上使用get_client()然后使用Client()是获得客户端的首选方式吗(如果用户不提供)
  7. 有没有可能给dask/distributed一个或多个迭代器,让它在工作者可用时从中提取迭代器
  8. 我是在装聋作哑吗?克服这一点

注意:这是我不久前对这个问题的扩展,但略有不同。

在下面的许多例子之后,我得到了以下内容:

try:
# python 3
from queue import Queue
except ImportError:
# python 2
from Queue import Queue
from threading import Thread
def load_data(frame_gen, q):
for frame_arrays in frame_gen:
future_list = client.compute(frame_arrays)
for frame_key, arr_future in zip(frame_keys, future_list):
q.put({frame_key: arr_future})
q.put(None)
input_q = Queue(batch_size if batch_size is not None else 1)
load_thread = Thread(target=load_data, args=(frames_to_write, input_q,))
remote_q = client.gather(input_q)
load_thread.start()
while True:
future_dict = remote_q.get()
if future_dict is None:
break
# write the current frame
# this should only be one element in the dictionary, but this is
# also the easiest way to get access to the data
for frame_key, result in future_dict.items():
# frame_key = rev_future_dict[future]
w = writers[frame_key]
w.append_data(result)
input_q.task_done()
load_thread.join()

这回答了我的大部分问题,并且似乎以我想要的方式工作。

相关内容

最新更新