差不多就是标题。我正在使用pool.map_async运行一个程序,这个程序对我来说是一个黑匣子。它基本上分析文件并输出结果。
有时,我需要对同一个文件进行两次分析。当我多处理这个文件时,黑盒程序会生气,因为两个进程试图同时访问同一个文件。
我无法调试或更改这个黑盒程序,但通过命令行进行多处理,不同调用之间的3-5秒等待(调用到不同的内核(解决了这个问题。
有没有办法告诉map_async不要尽可能快地排队,而是在两次调用之间等待一段指定的时间?
正确的处理方法是使用locks
,请参阅https://docs.python.org/3/library/multiprocessing.html#synchronization-进程间
但这需要更改被调用的进程函数,使它们尊重作为arg提供给它们的锁。如果没有,我几乎看不到让进程等待的替代方案,并且您的解决方案足够好(尽管,它使多处理的想法在某种程度上过时了…(
EDIT:
以下是如何使用锁将黑盒程序包装在包装器中并由多处理池执行的想法。工作被划分为块,以便并行地逐步执行包装器函数。锁表示当一个进程执行black_box时,没有其他black_box同时运行。
如果您知道某些工作块没有冲突,那么您可以在锁之外执行那些black_box实例。
import multiprocessing as mp
import time
from functools import partial
# define 4 chunks of work
work = []
work.append(range(1, 5))
work.append(range(6, 10))
work.append(range(11, 15))
work.append(range(16, 20))
def black_box(i: int):
print(i)
time.sleep(1)
def wrapper(lock, work_chunk: list):
for w in work_chunk:
lock.acquire()
black_box(w)
lock.release()
return f"chunk {work_chunk} done"
if __name__ == '__main__':
m = mp.Manager()
lock = m.Lock()
func = partial(wrapper, lock)
with mp.Pool(processes=4) as pool:
print(pool.map(func, work))
另一个版本,每个工作项都有关于是否需要锁的信息:
import multiprocessing as mp
import time
from functools import partial
# define work with lock information
work = [(1, True), (2, False), (3, True), (4, False), (5, True), (6, False), (7, False)]
def black_box(i: int):
print(i)
time.sleep(1)
def wrapper(lock, work_item: tuple):
if work_item[1] is True:
lock.acquire()
black_box(work_item[0])
lock.release()
else:
black_box(work_item[0])
return f"chunk {work_item[0]} done"
if __name__ == '__main__':
m = mp.Manager()
lock = m.Lock()
func = partial(wrapper, lock)
with mp.Pool(processes=4) as pool:
print(pool.map(func, work))
最后是一个没有锁的版本,其中每个工作项都有延迟信息。这里唯一的机制是一些进程延迟指定的秒数(希望没有冲突…(
import multiprocessing as mp
import time
# define work with delay information
work = [(1, 1), (2, 2), (3, 4), (4, 0), (5, 0), (6, 0), (7, 0)]
def black_box(i: int):
print(i)
time.sleep(1)
def wrapper(work_item: tuple):
time.sleep(work_item[1])
black_box(work_item[0])
return f"chunk {work_item[0]} done"
if __name__ == '__main__':
m = mp.Manager()
with mp.Pool(processes=4) as pool:
print(pool.map(wrapper, work))