我需要在输入列表上运行并行化过程,但在过程中使用代码中上面定义的所有变量和函数。但是这个过程本身是可以并行化的,因为它只依赖于一个变量,即列表的输入。 所以我有两种可能性,但我不知道如何实现这两种可能性:
1(使用一个类,并有一个应该使用该类的所有函数和属性并行化的方法。也就是说:在并行循环中运行该方法,但有机会读取对象的属性而不创建它的副本。
2(在运行并行化过程之前,只需有一个大的main并定义全局变量。
前任:
from joblib import Parallel, delayed
def func(x,y,z):
#do something
a = func0(x,y) #whatever function
a = func1(a,z) #whatever function
return a
if name==“__main__””:
#a lot of stuff in which you create y and z
global y,z
result = Parallel(n_jobs=2)(delayed(func)(i,y,z)for i in range(10))
所以问题是,当我到达并行函数时,y 和 z 已经定义,它们只是查找数据,我的问题是我如何将这些值传递给并行函数,而无需 python 为每个作业创建副本?
如果你只需要将列表传递给一些并行进程,我会使用内置的线程模块。从我能告诉你的问题来看,这就是你所需要的,你可以将参数传递给线程。
下面是一个基本的线程设置:
import threading
def func(x, y):
print(x, y) # random example
x, y = "foo", "bar"
threads = []
for _ in range(10): # create 10 threads
t = threading.Thread(target=func, args=(x, y,))
threads.append(t)
t.start()
for t in threads:
t.join() # waits for the thread to complete
但是,如果您需要以线程安全的方式跟踪该列表,则需要使用队列:
import threading, queue
# build a thread-safe list
my_q = queue.Queue()
for i in range(1000):
my_q.put(i)
# here is your worker function
def worker(queue):
while not queue.empty():
task = queue.get() # get the next value from the queue
print(task)
queue.task_done() # when you are done tell the queue that this task is complete
# spin up some threads
threads = []
for _ in range(10):
t = threading.Thread(target=worker, args=(my_q,))
threads.append(t)
t.start()
my_q.join() # joining the queue means your code will wait here until the queue is empty
现在,要回答有关共享状态的问题,您可以创建一个对象来保存变量。这样,您可以传递对象本身,而不是将变量的副本传递给每个线程(我相信这称为 Borg,但我可能有点错误(。执行此操作时,如果您计划对共享变量进行任何更改,则会导入该变量以确保它们是线程安全的。例如,如果两个线程尝试同时递增一个数字,则可能会丢失该更改,因为一个线程会覆盖另一个线程。为了防止这种情况,我们使用threading.Lock
对象。(如果你不关心这个,就忽略下面的所有锁的东西(。
还有其他方法可以做到这一点,但我发现这种方法很容易理解并且非常灵活:
import threading
# worker function
def worker(vars, lock):
with lock:
vars.counter += 1
print(f"{threading.current_thread().name}: counter = {vars.counter}")
# this holds your variables to be referenced by threads
class Vars(object):
counter = 0
vars = Vars()
lock = threading.Lock()
# spin up some threads
threads = []
for _ in range(10):
t = threading.Thread(target=worker, args=(vars, lock, ))
threads.append(t)
t.start()
for t in threads:
t.join()