Python 多处理模块



编辑:使用环境信息进行了更新(请参阅第一部分)

环境

我正在使用Python 2.7

乌班图16.04

问题

我有一个应用程序,我已将其简化为三个阶段的过程:

  1. 从多个数据源(HTTP 请求、系统信息等)收集数据
  2. 基于此数据计算指标
  3. 以各种格式输出这些指标

这些阶段中的每一个都必须在进入下一阶段之前完成,但是每个阶段都由多个可以并行运行的子任务组成(我可以发送 3 个 HTTP 请求并在等待它们返回的同时读取系统日志)

我已将阶段划分为模块,将子任务划分为子模块,因此我的项目层次结构如下所示:

+ datasources
|-- __init__.py
|-- data_one.py
|-- data_two.py
|-- data_three.py
+ metrics
|-- __init__.py
|-- metric_one.py
|-- metric_two.py
+ outputs
|-- output_one.py
|-- output_two.py
- app.py

app.py看起来大致是这样(为简洁起见,伪代码):

import datasources
import metrics
import outputs
for datasource in dir(datasources):
datasource.refresh()
for metric in dir(metrics):
metric.calculate()
for output in dir(outputs):
output.dump()

(有额外的代码包装dir调用以忽略系统模块,有异常处理等 - 但这是它的要点)

每个数据源子模块大致如下所示:

data = []
def refresh():
# Populate the "data" member somehow
data = [1, 2, 3]
return

每个指标子模块大致如下所示:

import datasources.data_one as data_one
import datasources.data_two as data_two
data = []
def calculate():
# Use the datasources to compute the metric
data = [sum(x) for x in zip(data_one, data_two)]
return

为了并行化第一阶段(数据源),我写了一些简单的东西,如下所示:

def run_thread(datasource):
datasource.refresh()
threads = []
for datasource in dir(datasources):
thread = threading.Thread(target=run_thread, args=(datasource))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()

这有效,之后我可以计算任何指标并填充datasources.x.data属性

为了并行化第二阶段(指标),因为它对 I/O 的依赖更少,而更多地依赖于 CPU,我觉得简单的线程实际上不会加快速度,我需要多处理模块来利用多个内核。我写了以下内容:

def run_pool(calculate):
calculate()
pool = multiprocessing.Pool()
pool.map(run_pool, [m.calculate for m in dir(metrics)]
pool.close()
pool.join()

这段代码运行了几秒钟(所以我认为它正在工作?),但是当我尝试时:

metrics.metric_one.data

它返回[],就像模块从未运行过

一样不知何故,通过使用多处理模块,它似乎正在限定线程的范围,以便它们不再共享数据属性。我应该如何重写它,以便我可以并行计算每个指标,利用多个内核,但在完成后仍然可以访问数据?

再次更新,每条评论:由于您使用的是 2.7,并且您正在处理模块而不是对象,因此您在挑选所需内容时遇到问题。 解决方法并不漂亮。 它涉及将每个模块的名称传递给您的操作功能。 我更新了partial部分,并更新了删除with语法。

几件事:

首先,一般来说,多核比线程更好。 使用线程时,您始终面临处理全局解释器锁的风险,这可能效率极低。 如果您使用多核,这将不再是问题。

其次,您有正确的概念,但是通过拥有全局到模块的数据成员,您使它变得奇怪。 让您的源返回您感兴趣的数据,并使您的指标(和输出)将数据列表作为输入并输出结果列表。

这会将您的伪代码变成这样的东西:

app.py:

import datasources
import metrics
import outputs
pool = multiprocessing.Pool()
data_list = pool.map(lambda o: o.refresh, list(dir(datasources)))
pool.close()
pool.join()
pool = multiprocessing.Pool()
metrics_funcs = [(m, data_list) for m in dir(metrics)]
metrics_list = pool.map(lambda m: m[0].calculate(m[1]), metrics_funcs)
pool.close()
pool.join()
pool = multiprocessing.Pool()
output_funcs = [(o, data_list, metrics_list) for o in dir(outputs)]
output_list = pool.map(lambda o: o[0].dump(o[1], o[2]), output_funcs)
pool.close()
pool.join()

执行此操作后,数据源将如下所示

def refresh():
# Populate the "data" member somehow
return [1, 2, 3]

您的指标如下所示:

def calculate(data_list):
# Use the datasources to compute the metric
return [sum(x) for x in zip(data_list)]

最后,您的输出可能如下所示:

def dump(data_list, metrics_list):
# do whatever; you now have all the information

删除"全局"数据并传递它会使每个部分都更清晰(并且更容易测试)。 这突出了使每件作品完全独立。 如您所见,我所做的只是更改传递给map的列表中的内容,在这种情况下,我通过将它们作为元组传递并在函数中解压缩来注入所有以前的计算。 当然,您不必使用 lambda。 您可以单独定义每个函数,但实际上没有太多要定义的函数。 但是,如果确实定义了每个函数,则可以使用分部函数来减少传递的参数数。 我经常使用这种模式,在更复杂的代码中,您可能需要这样做。 下面是一个例子:

from functools import partial
do_dump(module_name, data_list, metrics_list):
globals()[module_name].dump(data_list, metrics_list)
invoke = partial(do_dump, data_list=data_list, metrics_list=metrics_list)
with multiprocessing.Pool() as pool:
output_list = pool.map(invoke, [o.__name__ for o in dir(outputs)])
pool.close()
pool.join()

更新,每条评论:

当您使用 map 时,您可以保证输入的顺序与输出的顺序相匹配,即data_list[i]是运行dir(datasources)[i].refresh()的输出。 我不会将数据源模块导入指标,而是将以下更改更改为app.py

data_list = ...
pool.close()
pool.join()
data_map = {name: data_list[i] for i, name in enumerate(dir(datasources))}

然后将data_map传递到每个指标中。 然后,指标通过名称获取所需的数据,例如

d1 = data_map['data_one']
d2 = data_map['data_two']
return [sum(x) for x in zip([d1, d2])]

ProcessThread在python中的行为完全不同。如果要使用多处理,则需要使用同步数据类型来传递信息。

例如,您可以使用multiprocessing.Array,它可以在进程之间共享。

有关详细信息,请参阅文档:https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes

相关内容

  • 没有找到相关文章

最新更新