我有一个令人尴尬的并行问题,但一直想知道如何"设计"功能以使其达到最终结果
所以,这是顺序版本
def train_weights(Xtr, ztr, Xte, zte):
regr = some_model()
regr.fit(Xtr, ztr)
error = np.mean((regr.predict(Xte) - zte) ** 2)
return regr.coef_, error
rnge = range(z_train.shape[0])
weights = []
errors = []
for i in rnge:
z_dim_tr = z_train[:,i]
z_dim_te = z_test[:, i]
weight, error = train_weights(X_train, z_dim_tr, X_test, z_dim_te)
weights.append(wgts)
errors.append(error)
所以,我只是从矩阵(训练和测试矩阵(中切片一列然后将其传递给函数。请注意,输出顺序很重要。权重列表中的权重指数对应于特定的"i",并且错误相同。
如何并行化?
由于这只是一个一般的并行处理问题,因此您可以使用 multiprocessing.dummy
中的Pool
。
由于我没有您的数据集,因此让我们考虑以下示例。
import multiprocessing
from multiprocessing.dummy import Pool
def test(args):
a, b = args
return a
data = [
(1, 2),
(2, 3),
(3, 4),
]
pool = Pool(multiprocessing.cpu_count())
results = pool.map(test, data)
pool.close()
pool.join()
for result in results:
print(result)
池创建一定数量的工作进程(在本例中为 multiprocessing.cpu_count()
(。然后,每个工作线程连续执行一个作业,直到执行完所有作业。换句话说,map()
所有作业都已执行后首先返回。
总而言之,上面的示例在调用map()
时返回的结果列表,这些结果的顺序与给定的顺序相同。所以最后上面的代码打印1
,2
然后3
。
它可以使用concurrents.futures库轻松实现
下面是示例代码:
from concurrent.futures.thread import ThreadPoolExecutor
MAX_WORKERS = 20
def train_weights(Xtr, ztr, Xte, zte):
regr = some_model()
regr.fit(Xtr, ztr)
error = np.mean((regr.predict(Xte) - zte) ** 2)
return regr.coef_, error
def work_done(future):
weights.append(future.result())
rnge = range(z_train.shape[0])
weights = []
errors = []
for i in rnge:
z_dim_tr = z_train[:, i]
z_dim_te = z_test[:, i]
with ThreadPoolExecutor(MAX_WORKERS) as executor:
executor.submit(train_weights, X_train, X_test, Xte, z_dim_te).add_done_callback(work_done)
在这里,执行器为其提交的每个任务返回 future。 请记住,如果您使用已完成的任务add_done_callback()
从线程返回到主线程(这会阻塞您的主线程(,如果您真的想要真正的并行性,那么您应该单独等待未来的对象。 这是代码片段。
futures = []
for i in rnge:
z_dim_tr = z_train[:, i]
z_dim_te = z_test[:, i]
with ThreadPoolExecutor(MAX_WORKERS) as executor:
futures.append(executor.submit(train_weights, X_train, X_test, Xte, z_dim_te))
wait(futures)
for succeded, failed in futures:
# work with your result here
if succeded:
weights.append(succeded.result())
if failed:
errors.append(failed.result())
查看 joblib
https://pythonhosted.org/joblib/parallel.html
Joblib 提供了一个简单的帮助程序类来编写并行 for 循环 使用多处理。核心思想是编写代码 作为生成器表达式执行,并将其转换为并行 计算机科学:
>>> from math import sqrt
>>> [sqrt(i ** 2) for i in range(10)]
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
可以使用以下内容分布在 2 个 CPU 上:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
以下是使用 Ray 并行化代码的一种方法。使用雷的一些优点
- 大型数据将存储在共享内存中,可由多个工作线程(以只读方式(访问,因此工作线程无需创建自己的数据副本。
- 相同的代码将在一台计算机或多台计算机上运行。
Ray是一个用于编写并行和分布式Python的库。
import numpy as np
import ray
ray.init()
z_train = np.random.normal(size=(100, 30))
z_test = np.random.normal(size=(50, 30))
@ray.remote(num_return_vals=2)
def train_weights(ztr, zte):
# Fit model.
predictions = np.random.normal(size=zte.shape[0])
error = np.mean((predictions - zte) ** 2)
coef = np.random.normal()
return coef, error
weight_ids = []
error_ids = []
for i in range(z_train.shape[1]):
z_dim_tr = z_train[:, i]
z_dim_te = z_test[:, i]
weight_id, error_id = train_weights.remote(z_dim_tr, z_dim_te)
weight_ids.append(weight_id)
error_ids.append(error_id)
weights = ray.get(weight_ids)
errors = ray.get(error_ids)
您可以在 Ray 文档中阅读更多内容。请注意,我是 Ray 开发人员之一。