什么样的python模式可以用于并行化



cmd是一个处理参数x并将输出打印到stdout的函数。例如,它可能是

def cmd(x):
print(x)

调用cmd()的串行程序如下所示。

for x in array:
cmd(x)

为了加快程序的速度,我希望它并行运行。stdout输出可能是无序的,但单个x的输出不能被另一个x的输出破坏。

在python中可以有各种方法来实现这一点。我想出了这样的办法。

from joblib import Parallel, delayed
Parallel(n_jobs=100)(delayed(cmd)(i) for i in range(100))

就代码的简单性/可读性和效率而言,这是在python中实现这一点的最佳方式吗?

此外,上面的代码在python3上运行正常。但不是在python2上,我得到了以下错误。这是一个可能导致错误的问题吗?

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site packages/joblib/externals/loky/backend/semlock.py:217:运行时间警告:OSX上的信号量已损坏,发布可能会增加其最大值"增加其最大值",运行时间警告(

谢谢。

标准库中的

https://docs.python.org/3/library/threading.html

import threading
def cmd(x):
lock.acquire(blocking=True)
print(x)
lock.release()
lock = threading.Lock()
for i in range(100):
t = threading.Thread(target=cmd, args=(i,))
t.start()

使用锁可以保证lock.acquire()lock.release()之间的代码一次只能由一个线程执行。print方法在python3中已经是线程安全的,因此即使没有锁,输出也不会中断。但是,如果在线程之间共享任何状态(它们修改的对象(,则需要一个锁。

如果您使用的是python3,那么您可以使用标准库中的concurrent.futures而不是

考虑以下用法:

with concurrent.futures.ProcessPoolExecutor(100) as executor:
for x in array:
executor.submit(cmd, x)

我会用以下代码来处理问题中的问题(假设我们谈论的是CPU绑定操作(:

import multiprocessing as mp
import random

def cmd(value):
# some CPU heavy calculation
for dummy in range(10 ** 8):
random.random()
# result
return "result for {}".format(value)

if __name__ == '__main__':
data = [val for val in range(10)]
pool = mp.Pool(4)  # 4 - is the number of processes (the number of CPU cores used)
# result is obtained after the process of all the data
result = pool.map(cmd, data)
print(result)

输出:

['result for 0', 'result for 1', 'result for 2', 'result for 3', 'result for 4', 'result for 5', 'result for 6', 'result for 7', 'result for 8', 'result for 9']

EDIT-另一种在计算后立即获得结果的实现-processesqueues而不是poolmap:

import multiprocessing
import random

def cmd(value, result_queue):
# some CPU heavy calculation
for dummy in range(10 ** 8):
random.random()
# result
result_queue.put("result for {}".format(value))

if __name__ == '__main__':
data = [val for val in range(10)]
results = multiprocessing.Queue()
LIMIT = 3  # 3 - is the number of processes (the number of CPU cores used)
counter = 0
for val in data:
counter += 1
multiprocessing.Process(
target=cmd,
kwargs={'value': val, 'result_queue': results}
).start()
if counter >= LIMIT:
print(results.get())
counter -= 1
for dummy in range(LIMIT - 1):
print(results.get())

输出:

result for 0
result for 1
result for 2
result for 3
result for 4
result for 5
result for 7
result for 6
result for 8
result for 9

最新更新