我使用的代码发布在下面。我在 Ubuntu 16.04 上运行,我的笔记本电脑有一个 i7 四核处理器。"data"是一个有 ~100,000 行和 4 列的矩阵。"EEMD"是一个计算成本很高的函数。在我的机器上,处理所有列需要 5 分钟,无论我是并行执行每一列还是使用 Pool.map((,如下所示。
我在这个网站上看到了其他示例,其中包含我已经能够运行并成功演示 Pool.map(( 将运行代码所需的时间缩短了进程数的倍数,但这在这里对我不起作用,我不知道为什么。
无论我使用 Pool.map(( 还是 Pool.imap((,结果都是一样的。
#!/usr/bin/python
import time
from pyeemd import eemd
import numpy as np
import linecache
data = np.loadtxt("test_data.txt")
idx = range(4)
def eemd_sans_multi():
t = time.time()
for i in idx:
eemd(data[:,i])
print("Without multiprocessing...")
print time.time()-t
def eemd_wrapper(idx):
imfs = eemd(data[:,idx])
return imfs
def eemd_with_multi():
import multiprocessing as mp
pool = mp.Pool(processes=4)
t = time.time()
for x in pool.map(eemd_wrapper, idx):
print(x)
print("With multiprocessing...")
print time.time()-t
if __name__ == "__main__":
eemd_sans_multi()
eemd_with_multi()
基于沙丘回复的新代码
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import ctypes
from time import time
from pyeemd import eemd
import numpy as np
import re
import linecache
data = np.loadtxt("test_data.txt",skiprows=8)
headers = re.split(r't+',linecache.getline("test_data.txt", 8))
idx = [i for i, x in enumerate(headers) if x.endswith("Z")]
idx = idx[0:2]
print(idx)
def eemd_wrapper(idx):
imfs = eemd(data[:,idx])
return imfs
def main():
print("serial")
start = time()
for i in idx:
eemd_wrapper(i)
end = time()
print("took {} secondsn".format(end-start))
for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
print(executor_class.__name__)
start = time()
# we'll only be using two workers so as to make time comparisons simple
with executor_class(max_workers=2) as executor:
executor.map(eemd_wrapper, idx)
end = time()
print("took {} secondsn".format(end-start))
if __name__ == '__main__':
main()
在python 3中,你可以尝试ProcessPoolExecutor
concurrent.futures
模块,这里有一个例子:
from time import time
from concurrent.futures import ProcessPoolExecutor
def gcd(pair):
a, b = pair
low = min(a, b)
for i in range(low, 0, -1):
if a % i == 0 and b % i == 0:
return i
numbers = [(1963309, 2265973), (2030677, 3814172),
(1551645, 2229620), (2039045, 2020802), (6532541, 9865412)]
start = time()
results = list(map(gcd, numbers))
end = time()
print('1st Took %.3f seconds' % (end - start))
start = time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time()
print('2nd Took %.3f seconds' % (end - start))
主要编辑
看起来libeemd
已经是多线程的了。在 Python 中并行执行不会显著提高性能。您已经声明您正在使用 Ubuntu 16.04,这意味着您将使用 gcc 5.4(支持 OpenMP(编译 libeemd。libeemd 的 Makefile 显示它是用-fopenmp
编译的。所以是的,它已经是多线程的。
该库已经是多线程的,这也解释了为什么ProcessPoolExecutor
在示例代码中遇到问题。也就是说,在调用进程池之前,该库已经被使用,Unix系统创建新进程(分叉(的默认方式是创建进程的伪副本。因此,子工作者只剩下一个引用父进程中线程的库。如果您只自己执行ProcessPoolExecutor
,您将看到它工作正常。
原始答案
鉴于pyeemd
是使用ctypes
作为胶水的libeemd
包装器,您不需要使用多处理 - 多线程解决方案应该足以获得速度提升(以及最好的速度提升(。
为什么是线程?
当任务受 CPU 限制时,Python 中通常使用多处理代替多线程。这是因为全局解释器锁(GIL(,这对于单线程Python的性能至关重要。但是,GIL 使多线程纯Python代码像单线程一样运行。
但是,当线程通过ctypes
模块进入 C 函数时,它会释放 GIL,因为该函数不需要执行 Python 代码。Python 类型被转换为 C 类型用于调用,numpy
数组是围绕 C 缓冲区的包装器(保证在函数持续时间内存在(。所以不需要 Python 解释器及其 GIL。
如果使用纯 Python,多处理是获得速度提升的好方法,但它的缺陷之一是需要将数据发送给童工并将结果返回给父级。如果其中任何一个占用大量内存,则会增加前后推送数据的大量开销。那么,如果不需要,为什么要使用多处理。
例
在这里,我们将测试完成长时间运行的 C 函数需要多长时间 4 次。这将串行完成一次,一次使用两个工作线程,一次使用两个工作进程。这将表明,当大部分工作在 C 库中完成时,多线程与多处理一样好(如果不是更好的话(。lengthy.c
只是一个例子,任何使用相同参数调用的确定性但昂贵的函数都可以。
冗长.c
#include <stdint.h>
double lengthy(uint64_t n) {
double total = 0;
for (uint64_t i = 0; i < n; ++i) {
total += i;
}
return total;
}
将代码转换为可由ctypes
加载的库
dunes@dunes-VM:~/src$ gcc -c -Wall -Werror -fpic lengthy.c
dunes@dunes-VM:~/src$ gcc -shared -Wl,-soname,liblengthy.so -o liblengthy.so lengthy.o -lc
time_lengthy.py
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import ctypes
from time import time
# create a handle to the C function lengthy
liblengthy = ctypes.cdll.LoadLibrary('./liblengthy.so')
lengthy = liblengthy.lengthy
lengthy.argtypes = ctypes.c_uint64,
lengthy.restype = ctypes.c_double
def job(arg):
"""This function is only necessary as lengthy itself cannot be pickled, and
therefore cannot be directly used with a ProcessPoolExecutor.
"""
return lengthy(arg)
def main():
n = 1 << 28
# i << 28 was chosen because it takes approximately 1 second on my machine
# Feel free to choose any value where 0 <= n < (1 << 64)
items = [n] * 4 # 4 jobs to do
print("serial")
start = time()
for i in items:
job(i)
end = time()
print("took {} secondsn".format(end-start))
for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
print(executor_class.__name__)
start = time()
# we'll only be using two workers so as to make time comparisons simple
with executor_class(max_workers=2) as executor:
executor.map(job, items)
end = time()
print("took {} secondsn".format(end-start))
if __name__ == '__main__':
main()
其中,当运行时给出:
dunes@dunes-VM:~/src$ python3 multi.py
serial
took 4.936346530914307 seconds
ThreadPoolExecutor
took 2.59773850440979 seconds
ProcessPoolExecutor
took 2.611887216567993 seconds
我们可以看到并行运行的两个线程/进程的速度几乎是串行运行的单个线程的两倍。但是,线程不会承受在父工作线程和子辅助角色之间来回发送数据的开销。所以,你不妨使用线程,因为pyeemd源代码表明它在纯Python中没有任何重要的工作。