I tried to pass a shared counter to my multiprocessing tasks via apply_async, but it fails with an error like "RuntimeError: Synchronized objects should only be shared between processes through inheritance". What is happening here?
def processLine(lines, counter, mutex):
    pass

counter = multiprocessing.Value('i', 0)
mutex = multiprocessing.Lock()
pool = Pool(processes=8)
lines = []
for line in inputStream:
    lines.append(line)
    if len(lines) >= 5000:
        # don't queue more than 1'000'000 lines
        while counter.value > 1000000:
            time.sleep(0.05)
        mutex.acquire()
        counter.value += len(lines)
        mutex.release()
        pool.apply_async(processLine, args=(lines, counter, ), callback=collectResults)
        lines = []
Let the pool handle the scheduling:
for result in pool.imap(process_single_line, input_stream):
    pass
If the order doesn't matter:
for result in pool.imap_unordered(process_single_line, input_stream):
    pass
The pool.*map*() functions have a chunksize parameter that you can tune to see whether it affects performance in your case.
If your code expects to receive multiple lines in a single call:
from itertools import izip_longest

chunks = izip_longest(*[iter(inputStream)]*5000, fillvalue='')  # grouper recipe
for result in pool.imap(process_lines, chunks):
    pass
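Note that izip_longest is Python 2 only; under Python 3 the same grouper recipe uses itertools.zip_longest:

```python
from itertools import zip_longest

def grouper(iterable, n, fillvalue=None):
    # collect data into fixed-length chunks, padding the last one with fillvalue
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

chunks = list(grouper(['a', 'b', 'c', 'd', 'e'], 2, fillvalue=''))
# chunks == [('a', 'b'), ('c', 'd'), ('e', '')]
```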
Some alternatives for limiting the number of queued items:
- multiprocessing.Queue with a maximum size set (you don't need a Pool in that case). queue.put() blocks when the maximum size is reached, until some other process calls queue.get().
- Implement a producer/consumer pattern manually, using multiprocessing primitives such as Condition or BoundedSemaphore.
Note: every Value has an associated lock, so you don't need a separate Lock.
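That built-in lock is reachable via get_lock(), which makes the explicit mutex in the question redundant:

```python
import multiprocessing

counter = multiprocessing.Value('i', 0)

# every Value carries its own lock; no separate multiprocessing.Lock() is needed
with counter.get_lock():
    counter.value += 5
```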
I solved it in a not very elegant way:
def processLine(lines):
    pass

def collectResults(result):
    global counter
    counter -= len(result)

counter = 0
pool = Pool(processes=8)
lines = []
for line in inputStream:
    lines.append(line)
    if len(lines) >= 5000:
        # don't queue more than 1'000'000 lines
        while counter > 1000000:
            time.sleep(0.05)
        counter += len(lines)
        pool.apply_async(processLine, args=(lines,), callback=collectResults)
        lines = []
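For reference, a self-contained runnable version of this pattern. processLine here is a hypothetical stand-in that just returns its batch so the callback has something to count; the callback runs in a thread of the parent process, which is why a plain int counter works at all:

```python
import multiprocessing
import time

def processLine(lines):
    # hypothetical work: return the batch so the callback can count it
    return lines

counter = 0  # plain int is fine: callbacks run in the parent process

def collectResults(result):
    global counter
    counter -= len(result)

if __name__ == '__main__':
    inputStream = ('line %d\n' % i for i in range(20000))  # stand-in stream
    pool = multiprocessing.Pool(processes=4)
    lines = []
    for line in inputStream:
        lines.append(line)
        if len(lines) >= 5000:
            # don't queue more than 1'000'000 lines
            while counter > 1000000:
                time.sleep(0.05)
            counter += len(lines)
            pool.apply_async(processLine, args=(lines,), callback=collectResults)
            lines = []
    if lines:  # flush any leftover partial batch
        counter += len(lines)
        pool.apply_async(processLine, args=(lines,), callback=collectResults)
    pool.close()
    pool.join()  # after join, every callback has run, so counter is back to 0
```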