如何在 Python 中并行化函数



我有一个令人尴尬的并行问题,但一直想知道如何"设计"功能以使其达到最终结果

所以,这是顺序版本

def train_weights(Xtr, ztr, Xte, zte):
    regr = some_model()
    regr.fit(Xtr, ztr)
    error = np.mean((regr.predict(Xte) - zte) ** 2)
    return regr.coef_, error
rnge = range(z_train.shape[0])
weights = []
errors = []
for i in rnge:
    z_dim_tr = z_train[:,i]
    z_dim_te = z_test[:, i]
    weight, error = train_weights(X_train, z_dim_tr, X_test, z_dim_te)
    weights.append(wgts)
    errors.append(error)

所以,我只是从矩阵(训练和测试矩阵(中切片一列然后将其传递给函数。请注意,输出顺序很重要。权重列表中的权重指数对应于特定的"i",并且错误相同。

如何并行化?

由于这只是一个一般的并行处理问题,因此您可以使用 multiprocessing.dummy 中的Pool

由于我没有您的数据集,因此让我们考虑以下示例。

import multiprocessing
from multiprocessing.dummy import Pool
def test(args):
    a, b = args
    return a
data = [
    (1, 2),
    (2, 3),
    (3, 4),
]
pool = Pool(multiprocessing.cpu_count())
results = pool.map(test, data)
pool.close()
pool.join()
for result in results:
    print(result)

池创建一定数量的工作进程(在本例中为 multiprocessing.cpu_count() (。然后,每个工作线程连续执行一个作业,直到执行完所有作业。换句话说,map()所有作业都已执行后首先返回。

总而言之,上面的示例在调用map()时返回的结果列表,这些结果的顺序与给定的顺序相同。所以最后上面的代码打印12然后3

它可以使用concurrents.futures库轻松实现

下面是示例代码:

from concurrent.futures.thread import ThreadPoolExecutor
MAX_WORKERS = 20
def train_weights(Xtr, ztr, Xte, zte):
    regr = some_model()
    regr.fit(Xtr, ztr)
    error = np.mean((regr.predict(Xte) - zte) ** 2)
    return regr.coef_, error
def work_done(future):
    weights.append(future.result())
rnge = range(z_train.shape[0])
weights = []
errors = []
for i in rnge:
    z_dim_tr = z_train[:, i]
    z_dim_te = z_test[:, i]
    with ThreadPoolExecutor(MAX_WORKERS) as executor:
        executor.submit(train_weights, X_train, X_test, Xte, z_dim_te).add_done_callback(work_done)

在这里,执行器为其提交的每个任务返回 future。 请记住,如果您使用已完成的任务add_done_callback()从线程返回到主线程(这会阻塞您的主线程(,如果您真的想要真正的并行性,那么您应该单独等待未来的对象。 这是代码片段。

futures = []
for i in rnge:
    z_dim_tr = z_train[:, i]
    z_dim_te = z_test[:, i]
    with ThreadPoolExecutor(MAX_WORKERS) as executor:
        futures.append(executor.submit(train_weights, X_train, X_test, Xte, z_dim_te))
wait(futures)
for succeded, failed in futures:
    # work with your result here
    if succeded:
        weights.append(succeded.result())
    if failed:
        errors.append(failed.result())

查看 joblib

https://pythonhosted.org/joblib/parallel.html

Joblib 提供了一个简单的帮助程序类来编写并行 for 循环 使用多处理。核心思想是编写代码 作为生成器表达式执行,并将其转换为并行 计算机科学:

>>> from math import sqrt
>>> [sqrt(i ** 2) for i in range(10)]
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

可以使用以下内容分布在 2 个 CPU 上:

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

以下是使用 Ray 并行化代码的一种方法。使用雷的一些优点

  • 大型数据将存储在共享内存中,可由多个工作线程(以只读方式(访问,因此工作线程无需创建自己的数据副本。
  • 相同的代码将在一台计算机或多台计算机上运行。

Ray是一个用于编写并行和分布式Python的库。

import numpy as np
import ray
ray.init()
z_train = np.random.normal(size=(100, 30))
z_test = np.random.normal(size=(50, 30))

@ray.remote(num_return_vals=2)
def train_weights(ztr, zte):
    # Fit model.
    predictions = np.random.normal(size=zte.shape[0])
    error = np.mean((predictions - zte) ** 2)
    coef = np.random.normal()
    return coef, error

weight_ids = []
error_ids = []
for i in range(z_train.shape[1]):
    z_dim_tr = z_train[:, i]
    z_dim_te = z_test[:, i]
    weight_id, error_id = train_weights.remote(z_dim_tr, z_dim_te)
    weight_ids.append(weight_id)
    error_ids.append(error_id)
weights = ray.get(weight_ids)
errors = ray.get(error_ids)

您可以在 Ray 文档中阅读更多内容。请注意,我是 Ray 开发人员之一。

相关内容

  • 没有找到相关文章

最新更新