多处理 - 取消池中的剩余作业,而不销毁池



我正在使用map_async创建一个由 4 个工作人员组成的池。并为其提供要处理的图像文件列表 [Set 1]。
有时,我需要取消中间的处理,以便我可以处理一组不同的文件 [Set 2]。

举个例子,我给了map_async 1000 个文件来处理。然后希望在处理了大约 200 个文件后取消对剩余作业的处理。
此外,我想在不破坏/终止池的情况下进行此取消。这可能吗?

我不想终止池,因为在 Windows 上重新创建池是一个缓慢的过程(因为它使用"spawn"而不是"fork"(。我需要使用相同的池来处理一组不同的图像文件 [Set 2]。

# Putting job_set1 through processing. It may consist of 1000 images
cpu = multiprocessing.cpu_count()
pool = Pool(processes=cpu)
result = pool.map_async(job_set1, thumb_ts_list, chunksize=chunksize)

现在在两者之间,我需要取消此集合 1 的处理。并转到不同的集合(等待所有 1000 张图像完成处理不是一种选择,但我可以等待当前正在处理的图像完成(

<Somehow cancel processing of job_set1>
result = pool.map_async(job_set2, thumb_ts_list, chunksize=chunksize)

现在是软件工程基本定理的时候了:虽然multiprocessing.Pool不提供取消作为一项功能,但我们可以通过从精心设计的可迭代对象中读取Pool来添加它。 然而,仅仅有一个生成器从列表中yields值,但在某些信号上停止是不够的,因为Pool急切地耗尽提供给它的任何生成器。 因此,我们需要一个非常精心制作的迭代对象。

懒惰的Pool

我们需要的通用工具是一种仅在工作人员可用时为Pool构建任务的方法(或者最多提前一个任务,以防构建它们需要大量时间(。 基本思想是减慢Pool的线程收集工作,仅在任务完成时才增加信号量。 (我们从imap_unordered的可观察行为中知道存在这样的线程。

import multiprocessing
from threading import Semaphore
size=multiprocessing.cpu_count()  # or whatever Pool size to use
# How many workers are waiting for work?  Add one to buffer one task.
work=Semaphore(size)
def feed0(it):
it=iter(it)
try:
while True:
# Don't ask the iterable until we have a customer, in case better
# instructions become available:
work.acquire()
yield next(it)
except StopIteration: pass
work.release()
def feed(p,f,it):
import sys,traceback
iu=p.imap_unordered(f,feed0(it))
while True:
try: x=next(iu)
except StopIteration: return
except Exception: traceback.print_exception(*sys.exc_info())
work.release()
yield x

feed中的try可防止子项中的故障破坏信号量的计数,但请注意,它不能防止父项中的故障。

可取消迭代器

现在,我们可以实时控制Pool输入,使任何调度策略都变得简单明了。 例如,下面是类似于itertools.chain但能够异步丢弃其中一个输入序列中的任何剩余元素:

import collections,queue
class Cancel:
closed=False
cur=()
def __init__(self): self.data=queue.Queue() # of deques
def add(self,d):
d=collections.deque(d)
self.data.put(d)
return d
def __iter__(self):
while True:
try: yield self.cur.popleft()
except IndexError:
self.cur=self.data.get()
if self.cur is None: break
@staticmethod
def cancel(d): d.clear()
def close(self): self.data.put(None)

尽管没有锁定,但这是线程安全的(至少在 CPython 中(,因为像deque.clear这样的操作对于 Python 检查来说是原子的(我们不会单独检查self.cur是否为空(。

用法

使其中一个看起来像

pool=mp.Pool(size)
can=Cancel()
many=can.add(range(1000))
few=can.add(["some","words"])
can.close()
for x in feed(pool,assess_happiness,can):
if happy_with(x): can.cancel(many)  # straight onto few, then out

当然,addS 和close本身可以在循环中。

multiprocessing模块似乎没有取消的概念。您可以使用concurrent.futures.ProcessPoolExecutor包装器,并在获得足够的结果时取消待处理期货。

下面是一个示例,它从一组路径中挑选出 10 个 JPEG,并取消待处理的未来,同时让流程池在之后可用:

import concurrent.futures

def interesting_path(path):
"""Gives path if is a JPEG else ``None``."""
with open(path, 'rb') as f:
if f.read(3) == b'xffxd8xff':
return path
return None

def find_interesting(paths, count=10):
"""Yields count from paths which are 'interesting' by multiprocess task."""
with concurrent.futures.ProcessPoolExecutor() as pool:
futures = {pool.submit(interesting_path, p) for p in paths}
print ('Started {}'.format(len(futures)))
for future in concurrent.futures.as_completed(futures):
res = future.result()
futures.remove(future)
if res is not None:
yield res
count -= 1
if count == 0:
break
cancelled = 0
for future in futures:
cancelled += future.cancel()
print ('Cancelled {}'.format(cancelled))
concurrent.futures.wait(futures)
# Can still use pool here for more processing as needed

请注意,选择如何将工作分解为期货仍然很棘手,更大的集合开销更大,但也意味着更少的浪费工作。这也可以很容易地适应 Python 3.6 异步语法。

相关内容

  • 没有找到相关文章