我有一个函数说fun()
哪个yields
生成器。我想检查生成器是否为空,并且由于我想节省尽可能多的运行时间,因此我 不要将其转换为列表并检查其是否为空。 相反,我这样做:
def peek(iterable):
try:
first = next(iterable)
except StopIteration:
return None
return first, itertools.chain([first], iterable)
我像这样使用多处理:
def call_generator_obj(ret_val):
next = peek(ret_val)
if next is not None:
return False
else:
return True
def main():
import multiprocessing as mp
pool = mp.Pool(processes=mp.cpu_count()-1)
# for loop over here
ret_val = fun(args, kwargs)
results.append(pool.apply(call_generator_obj, args=(ret_val,))
# the above line throws me the error
据我所知,酸洗是在将内存中的某个对象转换为字节流时,我正在我的任何函数中做类似的事情。
回溯:(在尖线之后(
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 253, in apply
return self.apply_async(func, args, kwds).get()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 608, in get
raise self._value
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 385, in _handle_tasks
put(task)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle generator objects
据我所知,酸洗是在将内存中的某些对象转换为字节流时,我认为我在这里没有做这样的事情。
好吧,你正在这样做。
不能直接在进程之间传递 Python 值。即使是最简单的变量也包含指向进程内存空间中某处结构的指针,仅将该指针复制到其他进程就会给您带来段错误或垃圾,具体取决于另一个进程中的相同内存空间是未映射还是映射到完全不同的内容。像生成器这样复杂的东西 - 基本上是一个实时堆栈帧 - 将更加不可能。
multiprocessing
解决这个问题的方法是透明地腌制你给它传递的所有东西。 函数、它们的参数和它们的返回值都需要被腌制。
如果你想知道它是如何在幕后工作的:Pool
本质上是通过有一个Queue
来工作的,父级put
的任务——其中的任务基本上是(func, args)
对的——而子级get
任务。Queue
本质上是通过调用pickle.dumps(value)
然后将结果写入管道或其他进程间通信机制来工作的。