多处理映射的替代方法,它不会存储函数的返回值



我使用multiprocessing.imap_unordered同时运行一个函数,但我的RAM使用量不断增加。

问题如下:我有数百万个数据组合(使用itertools.product创建(需要传递给函数。然后,该函数将使用SVM来计算分数,然后存储分数和当前组合此函数不会返回任何值它只会计算一个分数并将其存储在共享值中。我不需要所有其他的组合,只需要最好的。

通过使用imap_unordered,RAM的使用量一直在增加,直到它因缺少RAM而崩溃。我想发生这种情况是因为imap将存储函数的结果,不会返回任何值,但可能会保留NoneNull值?

下面是一个示例代码:

from functools import partial
import itertools
import multiprocessing
import time

def svm(input_data, params):
# Copy the data to avoid changing the original data
# as input_data is a reference to a pandas dataframe
# and I need to shift columns up and down.
dataset = input_data.copy()
# Use svm here to analyse data
score = sum(dataset) + sum(params)  # simulate score of svm
# Simulate a process that takes a bit of time
time.sleep(0.5)
return (score, params)

if __name__ == "__main__":

# Without this, multiprocessing gives error
multiprocessing.freeze_support()
# Set the number of worker processes
# Empty for all the cores
# Int for number of processes
pool = multiprocessing.Pool()
# iterable settings
total_combinations = 2
total_features = 45
# Keep track of best score
best_score = -1000
best_param = [0 for _ in range(total_features)]
input_data = [x * x for x in range(10000)]
# Create a partial function with the necessary args
func = partial(svm, input_data)
params = itertools.product(range(total_combinations), repeat=total_features)
# Calculate scores concurrently
# As the iterable is in the order of millions, this value
# will get continuously large until it uses all available
# memory as the map stores the results, that in this case
# it's not needed.
for score, param in pool.imap_unordered(func, iterable=params, chunksize=100):
if score > best_score:
best_score = score
best_param = param
# Wait for all the processes to terminate their tasks
pool.close()
pool.join()
print(best_score)
print(best_param)

在本例中,您会注意到RAM的使用量会随着时间的推移而增加。尽管在这种情况下它并不多,但如果您单独呆一天或其他时间(通过增加可迭代的范围(,它将达到GB的RAM。正如我所说,我有数百万种组合。

我应该如何解决此问题?有没有imap的替代方案根本不会存储任何关于函数的信息?我应该只创建Processes而不是使用Pool吗?可能是因为我正在复制数据集,后来垃圾收集器没有清理它吗?

您可以使用apply或apply_async

我已经跟踪了import objgraph和打印objgraph.show_most_common_types(limit=20)的内存使用情况。我注意到元组和列表的数量在子进程的持续时间内不断增加。为了解决这个问题,我更改了Pool中的maxtasksperchild,以便在一段时间后强制关闭进程,从而释放内存。

from functools import partial
import itertools
import multiprocessing
import random
import time
# Tracing memory leaks
import objgraph

def svm(input_data, params):
# Copy the data to avoid changing the original data
# as input_data is a reference to a pandas dataframe.
dataset = input_data.copy()
# Use svm here to analyse data
score = sum(dataset) + sum(params)  # simulate score of svm
# Simulate a process that takes a bit of time
time.sleep(0.5)
return (score, params)

if __name__ == "__main__":
# iterable settings
total_combinations = 2
total_features = 12
# Keep track of best score
best_score = -1000
best_param = [0 for _ in range(total_features)]
# Simulate a dataframe with random data
input_data = [random.random() for _ in range(100000)]
# Create a partial function with the necessary args
func = partial(svm, input_data)
params = itertools.product(range(total_combinations), repeat=total_features)
# Without this, multiprocessing gives error
multiprocessing.freeze_support()
# Set the number of worker processes
# Empty for all the cores
# Int for number of processes
with multiprocessing.Pool(maxtasksperchild=5) as pool:
# Calculate scores concurrently
# As the iterable is in the order of millions, this value
# will get continuously large until it uses all available
# memory as the map stores the results, that in this case
# it's not needed.
for score, param in pool.imap_unordered(func, iterable=params, chunksize=10):
if score > best_score:
best_score = score
best_param = param
# print(best_score)
# Count the number of objects in the memory
# If the number of objects keep increasing, it's a memory leak
print(objgraph.show_most_common_types(limit=20))
# Wait for all the processes to terminate their tasks
pool.close()
pool.join()
print(best_score)
print(best_param)

相关内容

  • 没有找到相关文章

最新更新