我正在尝试找出一种方法来使用多处理包来减少处理我拥有的一些代码所需的时间。
从本质上讲,我有一个使用多个嵌套 for 循环完成的匹配计算,我想充分利用我可用的 12 核处理器。我已经找到了一些关于循环和多处理的文档和答案,但由于某种原因,它似乎没有在我的脑海中点击。无论如何。。。
我有两个大型数据帧,我已将其转换为列表列表,以便能够更轻松地迭代它们。
它们都遵循相同的格式,但具有不同的值 - 因此作为示例,DF/列表如下所示
热电室和电脑断层扫描:
|user_id| hour1_avg | hour2_avg |....| hour24_avg| hour1_stdev | ... | hour24_stdev |
|-------|-----------|-----------|----|-----------|-------------|-----|--------------|
| 12345 | 1.34 | 2.14 |----| 3.24 | .942 |-----| .834 |
| 54321 | 2.14 | 3.10 |----| 6.26 | .826 |-----| .018 |
然后使用.values.to_list()
将其转换为列表列表。
TTL 和 CTL:
[[12345, 1.34, 2.14,...3.24,.942,....834],[54321, 2.14, 3.10,...6.26, .826,....018], [etc]]
我的代码遍历两个列表列表,计算每个小时值的计算,然后如果所有 24 小时都满足if
语句中的条件,则将配对结果吐到pairs
列表中。那些不符合标准的人可能会被踢出去。
pairs = [] #output for for loops
start_time = time.time()
for idx, a in enumerate(ttl): # iterate through primary list of list
if idx % 12 != 0: #used to separate for 12 processors (0-11 to split processes manually)
continue
for b in ctl: # iterate through second list of list
i = 0
tval_avg = [] # used to calculate average between computed variables in the loop
for c in range(1,31): # iterate through hour avg and stdev
i += 1
tval = np.absolute((a[c] - b[c])/np.sqrt((a[c+24]**2/31)+(b[c+24]**2/31)))
if math.isnan(tval) or tval > 2.04:
break
else:
tval_avg.append(tval)
if i == 24: # checks to make sure each hour matches criteria to before being returned
pairs.append([a[0], b[0], a[2], a[3], np.mean(tval_avg)])
if idx % 10 == 0 :
print(idx) # check progress of loop
print("--- %s seconds ---" % (time.time() - start_time)) # show total time at the end
如果我在 spyder 中手动打开 12 个内核并将 0-11 分配给if idx %
语句并运行它们(允许我使用更多处理器(,这将起作用。我的目标是在一个内核中运行所有内容,使用多处理来分配 12 个(或任何有效的(不同"作业"——每个处理器一个,并将结果吐到单个数据帧中。这种类型的代码可以做到这一点吗?如果是这样,我需要进行哪些类型的更改?
抱歉,如果这很复杂。如果需要,我很乐意进一步解释。
我已经在SO周围搜索了与我的特定问题类似的东西,但找不到任何东西。我也很难理解多处理,以及如何将其应用于此特定场景,因此非常感谢任何帮助!
这在我的带有大型 DF 的笔记本上运行不到 1.5 分钟。不过,非多处理变体的速度并不慢。
编辑:实际上,只有当阈值如此之高以至于找不到(或很少(对时,才是正确的。如果您获得许多对,则 ipc 开销非常大,以至于非多处理变体要快得多。至少对我来说。
我通过将过滤器从>2.04
更改为>20
来验证结果,这更适合我创建的统一样本。
我们的两种算法似乎都会生成相同的对列表(一旦我修复了范围并删除了idx % 12
部分(。
顺便说一句,我使用 tqdm 来可视化进度,这是一个非常方便的库。
import math
import pandas as pd
import numpy as np
import tqdm
import multiprocessing
avg_cols = [f"hour{i}_avg" for i in range(1, 25)]
stdev_cols = [f"hour{i}_stdev" for i in range(1, 25)]
columns = ["userid"] + avg_cols + stdev_cols
np.random.seed(23)
# threshod = 2.04
# rands_tt = np.random.rand(3000, 49)
# rands_ct = np.random.rand(112000, 49)
threshold = 20
rands_tt = np.random.rand(2, 49)
rands_ct = np.random.rand(10, 49)
multipliers = np.repeat([1000000, 5, 2], [1, 24, 24])[None, :]
TT = pd.DataFrame(data=rands_tt * multipliers, columns=columns)
CT = pd.DataFrame(data=rands_ct * multipliers, columns=columns)
pairs = []
tt_complete = TT.loc[:, columns].to_numpy()
ct_complete = CT.loc[:, columns].to_numpy()
avg = slice(1, 25)
stdev = slice(25, 49)
# do the **2/31 calculations only once
tt_complete[:, stdev] **= 2
tt_complete[:, stdev] /= 31
ct_complete[:, stdev] **= 2
ct_complete[:, stdev] /= 31
def find_pairs(tt_row):
tvals = np.absolute(
(tt_row[None, avg] - ct_complete[:, avg]) / np.sqrt(tt_row[None, stdev] + ct_complete[:, stdev])
)
# nan will propagate itself as max and when compared to 2.04 will return False
valid_tval_idxs = np.where(tvals.max(axis=1) <= threshold)[0]
mean_tvals = tvals.mean(axis=1)
return [[tt_row[0], ct_complete[i, 0], tt_row[2], tt_row[3], mean_tvals[i]] for i in valid_tval_idxs]
# for tt_row in tqdm.tqdm(tt_complete):
# pairs.extend(find_pairs(tt_row))
with multiprocessing.Pool(6) as pool:
pairlist_iterable = pool.imap_unordered(find_pairs, tt_complete, chunksize=200)
for pairlist in tqdm.tqdm(pairlist_iterable, total=len(tt_complete)):
pairs.extend(pairlist)
ttl = TT.to_numpy().tolist()
ctl = CT.to_numpy().tolist()
pairs2 = [] # output for for loops
for idx, a in enumerate(ttl): # iterate through primary list of list
for b in ctl: # iterate through second list of list
i = 0
tval_avg = [] # used to calculate average between computed variables in the loop
for c in range(1, 25): # iterate through hour avg and stdev
i += 1
tval = np.absolute((a[c] - b[c]) / np.sqrt((a[c + 24] ** 2 / 31) + (b[c + 24] ** 2 / 31)))
if math.isnan(tval) or tval > threshold:
break
else:
tval_avg.append(tval)
if i == 24: # checks to make sure each hour matches criteria to before being returned
pairs2.append([a[0], b[0], a[2], a[3], np.mean(tval_avg)])
print(pairs)
print(pairs2)
print(pairs == pairs2)
输出为
100%|██████████| 2/2 [00:00<00:00, 2150.93it/s]
[[517297.88384658925, 878265.8552092713, 3.8272987969845347, 1.4119792198355636, 6.95265573421445]]
[[517297.88384658925, 878265.8552092713, 3.8272987969845347, 1.4119792198355636, 6.95265573421445]]
True
你的外循环已经过ttl
了。 将该循环主体中的代码移动到帮助程序函数中 接受a
作为输入,并且 返回(tval_avg, pairs)
.
然后使用map
重复调用该帮助程序。
返回元组将被序列化并发送回父进程。 您需要合并单个工蜂的结果 以获得与原始代码计算相同的结果。
或者,您可能更愿意序列化来自帮助程序的结果 到唯一命名的文件中。