我希望使用 multiprocessing
加快缓慢的循环。但是,从我所看到的多处理示例中,我不确定这种实现是很好的实践,可行的还是可能的。
循环中有两个部分:data ingestion
和data processing
。我想在处理过程中开始进行数据摄入的下一部分,因此数据可尽快可用。
伪代码:
d = get_data(n)
for n in range(N):
p = process_data(d)
d = get_data(n+1) #prepare data for next process loop
- 多处理是否适合这种功能?
- 一个人将如何做?
预先感谢。
正如您所说,多处理基本上是在派遣和收集工作。正如您所澄清的那样,您基本上希望process_data
和get_data
并行工作。
这是我的解决方案
import multiprocessing as mp
# create pool for dispatching work
pool = mp.Pool()
# call your functions asynchronously
process_data_process = pool.apply_async(process_data, (d,))
get_data_process = pool.apply_async(get_data, (n+1,))
# After your functions are dispatched, wait for results
process_data_result = process_data_process.get()
get_data_result = get_data_process.get()
# Note: get_data_result will not be fetched till process_data_result is ready
# But that should be fine since you can't start the next batch
# till this batch is done
,您可以将其包裹在循环中。希望回答您的问题!
让我们假设您想拥有一个线程/进程摄入数据,因为它将是i/o,而不是cpu绑定。在将数据传递到处理层之前,您仅进行最小的解析和/或验证。
让我们进一步假设您可以完全并行对每个输入项目进行数据处理;这些输入项目之间没有分类或时间/测序依赖性。
在这种情况下,您的任务基本上是"风扇淘汰"处理模型的海报孩子。您创建一个多处理。然后,您创建一个多处理。然后,此初始化代码成为摄入处理任务(队列的"生产者"(,并且流程池都成为消费者,执行处理。
有许多在线示例,第一个链接可能使用了这种模式。
剩下的问题当然是您将如何处理结果。
如果他们需要序列化回到某个单个文件,那么显而易见的方法是创建两个队列对象...一个用于工作队列(摄入过程对其进行馈送,池进程消耗(,另一个是输出队列(池中进料中,一个过程将其消耗从中输入,以将结果连贯地写入您的输出(。请注意,让主人(摄入(过程多路复用是可能的,有时甚至是非常有效的。它可以与输出队列上的民意调查相交以写出结果。但是,当然,您也可以旋转一个专门用于输出处理的过程。
另一方面,您的结果可能可以通过工作过程并行编写。如果您将结果写入许多文件,或将它们作为插入或更新语句发布到某些SQL数据库,或将它们送到Hadoop HDFS或Spark DataSet中。有许多形式的输出可以平行写入。
也可能需要将处理和输出/结果处理层解除。可能是,您的应用程序将在数据处理层中使用大量进程和较小的输出层中最佳调整。(例如,如果每个项目的处理是CPU密集的,并且您有很多核心,那么您可能会在CPU置于CPU时会遇到过多的I/O频道的问题(。
再次使用队列。它们旨在支持多制作和多消费者的连贯性。您摆脱了对并发锁定,死锁和生计问题等的担忧。