In [5]: def fn(x):
...: os.environ["var_{}".format(x)] = x
...:
...:
[PYFLYBY] import os
In [6]: def gn(x):
...: return os.environ["var_{}".format(x)]
...:
...:
...:
a = ["1", "2", "3"]
In [8]: with concurrent.futures.ProcessPoolExecutor(max_workers=3, initializer=fn, initargs=a) as e:
...: r = e.map(gn, a)
...:
Exception in initializer:
Traceback (most recent call last):
File "/opt/python/python-3.7/lib64/python3.7/concurrent/futures/process.py", line 226, in _process_worker
initializer(*initargs)
TypeError: fn() takes 1 positional argument but 3 were given
Exception in initializer:
Traceback (most recent call last):
File "/opt/python/python-3.7/lib64/python3.7/concurrent/futures/process.py", line 226, in _process_worker
initializer(*initargs)
TypeError: fn() takes 1 positional argument but 3 were given
Exception in initializer:
Traceback (most recent call last):
File "/opt/python/python-3.7/lib64/python3.7/concurrent/futures/process.py", line 226, in _process_worker
initializer(*initargs)
TypeError: fn() takes 1 positional argument but 3 were given
所以,基本上,我想把一个[0]传给第一个工人,把一个[1]传给第二个工人,以此类推……有什么方法可以用这种方式实现这一点吗?现在,整个a被传递给fn,这导致了这个错误。
您的例子并不完全正确,但关于问题:
您可以将multiprocessing.Queue
传递给初始值设定器函数,将特定于工作进程的数据放入其中,并在每个工作进程中执行一个queue.get()
:
import os
import concurrent.futures
import multiprocessing
import time
def fn(q):
x = q.get()
os.environ["var_x"] = x
def gn(i):
time.sleep(0.5)
return f"pid={os.getpid()} var_x={os.environ['var_x']}n"
q = multiprocessing.Queue()
a = ["1", "2", "3"]
with concurrent.futures.ProcessPoolExecutor(max_workers=3, initializer=fn, initargs=(q,)) as e:
[q.put(i) for i in a]
print(*e.map(gn, a))
输出:
pid=1218 var_x=1
pid=1219 var_x=2
pid=1220 var_x=3