Concurrent.futures使用指南-一个同时使用线程和处理的简单示例



我想使用concurrent.futures模块启用程序的并行处理/线程处理。

不幸的是,我似乎找不到任何使用concurrent.futures模块的好的、简单的、防白痴的例子。它们通常需要更高级的python知识或处理/线程概念和术语。

下面是一个基于我的程序的简化的、自包含的示例:有一个纯粹的CPU绑定任务非常适合多强制执行,还有一个单独的IO绑定任务插入数据库(SQLite)。在我的程序中,我已经将其转换为使用多处理池类,但由于CPU绑定任务的结果都是在等待任务完成时收集的,因此它使用了大量内存。因此,我希望使用线程/处理的组合,我相信concurrent.futures可以为我做得相当简单。

那么,我该如何将以下内容转换为使用该模块的内容呢?

import sqlite3
#Stand in CPU intensive task
def calculate(value):
return value * 10
#Stand in Thread I/O intensive task
def output(value):
global db
if (value % 1000) == 0:
db.execute('delete from test_table')
db.execute('insert into test_table (result) values (?)', (value,))
def main():
global db
results = []
db  = sqlite3.connect('e:\z_dev\test.sqlite')
db.cursor()
#=========
#Perform CPU intensive task
for i in range(1000):
results.append( calculate(i))
#Perform Threading intensive task
for a in results:
output(a)
#=========
db.commit()
db.close()
if __name__ == '__main__':
main()

我正在寻找一个不使用任何花哨/复杂python的答案。或者一个简单明了的解释,或者两者兼而有之!

感谢

编辑:我当前的"多处理器"实现。也许是错的,但它似乎奏效了。没有任何螺纹。这是在上面的"#============"部分。

#Multiprocessing
pool = multiprocessing.Pool(None)
for i in range(1000):
results.append( pool.apply_async(calculate(i)))
pool.close()
pool.join()
for i in results:
results[i] = results[i].get()
#Complete lack of threading; but if I had it, it'd be here:     
for a in results:
output(a)

concurrent.futures有一个最小化的API。它很容易用于非常直接的问题,但你没有一个非常简单的问题。如果你这样做了,你就已经解决了;-)

你没有展示任何你编写的multiprocessing.Pool代码,但这将是一个更有希望的开始-假设你更想解决问题,而不是确认你的希望,如果你只切换到较弱的API,这一定很容易做到;-)

使用multiprocessing的"一个明显"方法是使用Pool.apply_async()方法,将异步结果对象放在有界的Queue.Queue上,并让主程序中的线程从Queue中取出这些对象,然后等待结果显示。这很简单,但不是魔术。它解决了您的问题,因为有界Queues在以不同速度运行的生产者和消费者之间进行调解的规范方式。concurrent.futures中没有任何内容直接解决这个问题,它是"大量内存"问题的核心。

# Define global result_queue only in the main program.
import Queue
result_queue = Queue.Queue(100)  # pick a reasonable max size based on your problem
# Run this in as many threads as you like.
def consume_results():
while True:
a = result_queue.get()
if a is None:
break
output(a.get())  # `output()` is your function
...
# main program passes out work, after starting threads
for i in range(1000):
# the .put() will block so long as the queue is at its max size
result_queue.put(pool.apply_async(calculate, args=(i,)))
# add sentinels to let threads know they're done
for i in range(number_of_threads_you_started):
result_queue.put(None)

这正是需要来保持生产者和消费者大致平衡的东西,而且在任何标准库中都没有任何东西可以神奇地为你做到这一点。

编辑-充实它

这里有一个完整的、可执行的例子,任何使用Python3的人都可以运行。注:

  • 它不使用您的代码片段,因为这些片段依赖于外部数据库模块,不是每个人都可以运行
  • 它坚持使用concurrent.futures来管理进程和线程。使用multiprocessingthreading并不是很难,事实上,这里使用方式线程会更容易直接使用threading。但这种方式已经足够清楚了
  • concurrent.futuresFuture对象与multiprocessing异步结果对象基本相同——API功能的拼写不同
  • 你的问题并不简单,因为它有多个阶段,可以以不同的速度运行。同样,任何标准库中的任何东西都无法神奇地隐藏其潜在的不良后果。创建自己的有界队列仍然是最好的解决方案。对于MAX_QUEUE_SIZE的任何正常值,这里的内存使用将保持适度
  • 您通常不希望创建比可用内核数量少一个的CPU绑定工作进程。主程序也需要循环才能运行,操作系统也是如此
  • 一旦你习惯了这些东西,这个代码中的所有注释都会很烦人,比如看到代码行i += 1上的注释"incremental by 1";-)

这是代码:

import concurrent.futures as cf
import threading
import queue
NUM_CPUS = 3
NUM_THREADS = 4
MAX_QUEUE_SIZE = 20
# Runs in worker processes.
def producer(i):
return i + 10
def consumer(i):
global total
# We need to protect this with a lock because
# multiple threads in the main program can
# execute this function simultaneously.
with sumlock:
total += i
# Runs in threads in main program.
def consume_results(q):
while True:
future = q.get()
if future is None:
break
else:
consumer(future.result())
if __name__ == "__main__":
sumlock = threading.Lock()
result_queue = queue.Queue(MAX_QUEUE_SIZE)
total = 0
NUM_TO_DO = 1000
with cf.ThreadPoolExecutor(NUM_THREADS) as tp:
# start the threads running `consume_results`
for _ in range(NUM_THREADS):
tp.submit(consume_results, result_queue)
# start the worker processes
with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
for i in range(NUM_TO_DO):
# blocks until the queue size <= MAX_QUEUE_SIZE
result_queue.put(pp.submit(producer, i))
# tell threads we're done
for _ in range(NUM_THREADS):
result_queue.put(None)
print("got", total, "expected", (10 + NUM_TO_DO + 9) * NUM_TO_DO // 2)

如果一切顺利,这就是预期的输出:

got 509500 expected 509500

最新更新