假设我有一个非常大的列表,我正在执行这样的操作:
for item in items:
try:
api.my_operation(item)
except:
print 'error with item'
我的问题有两个:
- 有很多项目
- api。my_operation永远返回
我想使用多线程来启动一堆api。my_operations,这样我可以一次处理5个,10个,甚至100个项目。
如果my_operation()返回一个异常(因为可能我已经处理了那个项目)-没关系。它不会破坏任何东西。循环可以继续到下一项。
注意:这是Python 2.7.3
首先,在Python中,如果你的代码是cpu限制的,多线程不会有帮助,因为只有一个线程可以持有全局解释器锁,因此一次只能运行Python代码。所以,你需要使用进程,而不是线程。
如果你的操作"需要永远返回",因为它是io绑定的,也就是说,等待网络或磁盘副本或类似的东西,这是不正确的。我以后再讲。
接下来,一次处理5个、10个或100个项目的方法是创建一个5个、10个或100个工人的池,并将这些项目放入工人服务的队列中。幸运的是,标准库multiprocessing
和concurrent.futures
库都为您提供了大部分细节。
前者对于传统编程更强大、更灵活;如果需要编写future-waiting,后者更简单;对于简单的情况,你选择哪一个并不重要。(在这种情况下,最明显的实现是使用futures
使用3行,使用multiprocessing
使用4行。)
如果你使用的是2.6-2.7或3.0-3.1,futures
不是内置的,但你可以从PyPI (pip install futures
)安装。
最后,如果你能把整个循环迭代变成一个函数调用(例如,你可以把它传递给map
),那么并行化事情通常会简单得多,所以让我们先这样做:
def try_my_operation(item):
try:
api.my_operation(item)
except:
print('error with item')
把它们放在一起:
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)
如果您有许多相对较小的作业,那么多处理的开销可能会淹没收益。解决这个问题的方法是将工作分批分成更大的任务。例如(使用itertools
食谱中的grouper
,您可以将其复制并粘贴到您的代码中,或者从PyPI上的more-itertools
项目中获取):
def try_multiple_operations(items):
for item in items:
try:
api.my_operation(item)
except:
print('error with item')
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group)
for group in grouper(5, items)]
concurrent.futures.wait(futures)
最后,如果你的代码是IO绑定的呢?那么线程就和进程一样好,而且开销更少(限制也更少,但在这种情况下,这些限制通常不会影响到您)。有时候,"更少的开销"足以意味着你不需要对线程进行批处理,但你需要对进程进行批处理,这是一个很好的胜利。
那么,如何使用线程而不是进程呢?只需将ProcessPoolExecutor
更改为ThreadPoolExecutor
。
如果你不确定你的代码是cpu限制的还是io限制的,就试试这两种方式。
我可以在我的python脚本中为多个函数这样做吗?例如,如果我在代码的其他地方有另一个For循环,我想并行化。是否有可能在同一个脚本中做两个多线程函数?
是的。事实上,有两种不同的方法。
首先,您可以共享同一个(线程或进程)执行器,并在多个地方使用它,没有问题。任务和未来的重点在于它们是独立的;你不关心它们在哪里运行,只关心你将它们排队并最终得到答案。
或者,您可以在同一个程序中有两个执行器,这没有问题。这是有性能代价的——如果您同时使用两个执行器,那么您最终将尝试在8个内核上运行(例如)16个繁忙线程,这意味着将会有一些上下文切换。但有时这样做是值得的,因为,比如说,两个执行器很少同时忙碌,而且这会使您的代码简单得多。或者一个执行者正在运行非常大的任务,可能需要一段时间才能完成,而另一个执行者正在运行非常小的任务,需要尽快完成,因为响应性比程序的吞吐量更重要。
如果你不知道哪个适合你的程序,通常是第一个。
有多处理。池,下面的示例演示了如何使用其中一个:
from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool
pool_size = 5 # your "parallelness"
# define worker function before a Pool is instantiated
def worker(item):
try:
api.my_operation(item)
except:
print('error with item')
pool = Pool(pool_size)
for item in items:
pool.apply_async(worker, (item,))
pool.close()
pool.join()
现在,如果你确实像@abarnert提到的那样确定你的进程是CPU绑定的,将ThreadPool更改为进程池实现(在ThreadPool导入下注释)。您可以在这里找到更多详细信息:http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
您可以使用如下方法将处理拆分为指定数量的线程:
import threading
def process(items, start, end):
for item in items[start:end]:
try:
api.my_operation(item)
except Exception:
print('error with item')
def split_processing(items, num_splits=4):
split_size = len(items) // num_splits
threads = []
for i in range(num_splits):
# determine the indices of the list this thread will handle
start = i * split_size
# special case on the last chunk to account for uneven splits
end = None if i+1 == num_splits else (i+1) * split_size
# create the thread
threads.append(
threading.Thread(target=process, args=(items, start, end)))
threads[-1].start() # start the thread we just created
# wait for all threads to finish
for t in threads:
t.join()
split_processing(items)
import numpy as np
import threading
def threaded_process(items_chunk):
""" Your main process which runs in thread for each chunk"""
for item in items_chunk:
try:
api.my_operation(item)
except Exception:
print('error with item')
n_threads = 20
# Splitting the items into chunks equal to number of threads
array_chunk = np.array_split(input_image_list, n_threads)
thread_list = []
for thr in range(n_threads):
thread = threading.Thread(target=threaded_process, args=(array_chunk[thr]),)
thread_list.append(thread)
thread_list[thr].start()
for thread in thread_list:
thread.join()