与 multiprocessing.pool 共享大型数据帧



我有一个函数,我想使用多处理并行计算。该函数接受一个参数,但也从两个已经加载到内存中的非常大的数据帧加载子集(其中一个大约是 1G,另一个刚刚超过 6G(。

largeDF1 = pd.read_csv(directory + 'name1.csv')
largeDF2 = pd.read_csv(directory + 'name2.csv')
def f(x):
load_content1 = largeDF1.loc[largeDF1['FirstRow'] == x]
load_content2 = largeDF1.loc[largeDF1['FirstRow'] == x]
#some computation happens here
new_data.to_csv(directory + 'output.csv', index = False)
def main():
multiprocessing.set_start_method('spawn', force = True)
pool = multiprocessing.Pool(processes = multiprocessing.cpu_count())
input = input_data['col']
pool.map_async(f, input)
pool.close()
pool.join()

问题是文件太大,当我在多个内核上运行它们时,我遇到了内存问题。我想知道是否有一种方法可以在所有进程中共享加载的文件。

我试过管理器((,但无法让它工作。任何帮助,不胜感激。谢谢。

如果您在类 UNIX 系统(默认情况下使用forkstartmethod(上运行它,数据将开箱即用地共享。大多数操作系统对内存页使用写入时复制。因此,即使您多次fork进程,它们也会共享包含数据帧的大部分内存页,只要您不修改这些数据帧。

但是,在使用spawnstart 方法时,每个工作进程都必须加载数据帧。我不确定在这种情况下操作系统是否足够智能以共享内存页面。或者事实上,这些生成的进程都将具有相同的内存布局。

我能想到的唯一便携式解决方案是将数据保留在磁盘上,并在工作线程中使用mmap将其以只读方式映射到内存中。这样,操作系统就会注意到多个进程正在映射同一个文件,并且它只会加载一个副本。

缺点是数据将以磁盘csv格式存储在内存中,这使得从中读取数据(无需复制!(不太方便。因此,您可能希望事先将数据准备为更易于使用的形式。例如,将数据从'FirstRow'转换为floatdouble的二进制文件,您可以使用struct.iter_unpack进行迭代。

下面的函数(来自我的状态行脚本(使用mmap来计算邮箱文件中的邮件数量。

def mail(storage, mboxname):
"""
Report unread mail.
Arguments:
storage: a dict with keys (unread, time, size) from the previous call or an empty dict.
This dict will be *modified* by this function.
mboxname (str): name of the mailbox to read.
Returns: A string to display.
"""
stats = os.stat(mboxname)
if stats.st_size == 0:
return 'Mail: 0'
# When mutt modifies the mailbox, it seems to only change the
# ctime, not the mtime! This is probably releated to how mutt saves the
# file. See also stat(2).
newtime = stats.st_ctime
newsize = stats.st_size
if not storage or newtime > storage['time'] or newsize != storage['size']:
with open(mboxname) as mbox:
with mmap.mmap(mbox.fileno(), 0, prot=mmap.PROT_READ) as mm:
start, total = 0, 1  # First mail is not found; it starts on first line...
while True:
rv = mm.find(b'nnFrom ', start)
if rv == -1:
break
else:
total += 1
start = rv + 7
start, read = 0, 0
while True:
rv = mm.find(b'nStatus: R', start)
if rv == -1:
break
else:
read += 1
start = rv + 10
unread = total - read
# Save values for the next run.
storage['unread'], storage['time'], storage['size'] = unread, newtime, newsize
else:
unread = storage['unread']
return f'Mail: {unread}'

在这种情况下,我使用了mmap因为它比读取文件快 4 倍。查看正常阅读与使用 mmap。

相关内容

  • 没有找到相关文章

最新更新