Python 多处理回调



我有一个函数列表,可以完成一些工作,例如从URL下载html(每个函数都非常不同,所以我不能使单个函数接受URL和下载)。 我使用多处理来加快任务速度。 下面是我的代码

def runInParallel(list_of_functions):
for fn in list_of_functions:
proc = [Process(target=fn[1]).start() for fn in list_of_functions]
for p in proc:
p.join()

我想要的是如何存储每个函数返回的结果? 每个函数都返回一个字典,我需要解析并存储在数据库中,我不想在每个函数中重复这些步骤,所以我想要的是某种回调,可以用功能返回的结果传递。 我怎样才能做到这一点?

编辑:使用pool但抛出错误。 我有以下list_of_functions

[('f1', <function f1 at 0x7f34c11c9ed8>), ('f2', <function f2 at 0x7f34c11c9f50>)]

def runInParallel(list_of_functions):
import multiprocessing
pool = multiprocessing.Pool(processes = 3)
x = pool.map(lambda f: f(), list_of_functions)
print x


File "main.py", line 31, in <module>
runInParallel(all_functions)
File "main.py", line 11, in runInParallel
x = pool.map(lambda f: f(), list_of_functions)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

如上面的注释中所述:如果您直接使用Process,则需要设置一个队列,其中进程put,以便您可以从父进程get

from multiprocessing import Process, Queue
from time import sleep
def f1(queue):
sleep(1) # get url, "simulated" by sleep
queue.put(dict(iam="type 1"))
def f2(queue):
sleep(1.5)
queue.put(dict(iam="type 2"))
def f3(queue):
sleep(0.5)
queue.put(dict(iam="type 3"))

def runInParallel(list_of_functions):
queue = Queue()
proc = [Process(target=fn[1], args=(queue,)) for fn in list_of_functions]
for p in proc:
p.start()
res = []
for p in proc:
p.join()
res.append(queue.get())
return res
if __name__ == '__main__':
list_of_functions = [("f1", f1), ("f2", f2), ("f3", f3)]
for d in runInParallel(list_of_functions):
print d

指纹:

{'iam': 'type 3'}
{'iam': 'type f1'}
{'iam': 'type f2'}

如果你的函数基本上都做同样的事情(获取 url 并以某种方式处理 html),那么将你的函数合并为一个具有一些if/elif逻辑的函数允许你使用map并且你不需要任何队列:

from multiprocessing import Pool
from time import sleep
def f(arg):
url, typ = arg
if typ == 'a':
sleep(1) # instead you would do something with `url` here
return dict(iam="type 1", url=url)
elif typ == 'b':
sleep(1.5)
return dict(iam="type 2", url=url)
elif typ == 'c':
sleep(0.5)
return dict(iam="type 3", url=url)
def runInParallel(work):
p = Pool(3)
return p.map(f, work)
if __name__ == '__main__':
work = [('http://url1', 'a'),
('http://url2', 'b'),
('http://url3', 'c'),
]
for d in runInParallel(work):
print d

指纹:

{'url': 'http://url1', 'iam': 'type 1'}
{'url': 'http://url2', 'iam': 'type 2'}
{'url': 'http://url3', 'iam': 'type 3'}

这两个脚本都可以在Windows上工作,也可以在Unix环境中工作(在OSX上尝试过)

相关内容

  • 没有找到相关文章

最新更新