Python:多处理中的全局对象锁定



考虑对由某个类对象的元素组成的巨大数据运行多处理。数据在内存中作为全局变量,每个并行运行的函数都以数据索引为参数,访问全局数据对象,读取该索引中的文本,运行计算并返回结果,而不修改全局对象。

import concurrent
import numpy as np
data_size = 1_000_000
class DataClass:
def __init__(self, text):
self.text = text
def process_text(dataset_idx):
return dataset[dataset_idx].text.lower()
dataset = [DataClass('SOME TEXT') for _ in range(data_size)]
dataset_indices_to_process = range(data_size)
results = []
with concurrent.futures.ProcessPoolExecutor() as executor:
for result in executor.map(process_text, dataset_indices_to_process ):
results.append(result)

由于全局对象在其中一个子流程的读取过程中被锁定,这是否会带来任何开销?将这种对全局数据的只读访问并行化的最佳方式是什么?

由于您只是在读取数据,因此可以使用以下代码。但是池中的每个进程都有自己的dataset列表副本。即使您的平台使用fork方法来创建继承变量的新进程,这也是正确的,因为每个进程都将增加继承列表的引用计数,这将导致生成新的副本:

data_size = 1_000_000
class DataClass:
def __init__(self, text):
self.text = text
def process_text(dataset_idx):
return dataset[dataset_idx].text.lower()
# We don't need a list comprehension:
dataset = [DataClass('SOME TEXT')] * data_size
# Required if the platform is Windows:
if __name__ == '__main__':
import concurrent.futures # Not: import concurrent
dataset_indices_to_process = range(data_size)
with concurrent.futures.ProcessPoolExecutor() as executor:
# No need to explicitly loop:
results = list(executor.map(process_text, dataset_indices_to_process))

理想情况下,您将拥有由主进程在共享内存中创建的字符串列表的单个副本。不幸的是,与例如整数不同,列表中包含的字符串引用在其他进程中无效。我相信,你能做的最好的事情就是使用管理的列表。请注意,列表上的每个操作类似于对另一个进程的远程方法调用,因此与对"进程"的访问相比执行得更慢;常规的";字典:

class DataClass:
def __init__(self, text):
self.text = text
def initialize_pool_processes(arr):
global dataset
dataset = arr
def process_text(dataset_idx):
return dataset[dataset_idx].text.lower()
# Required if the platform is Windows:
if __name__ == '__main__':
import concurrent.futures # Not: import concurrent
from multiprocessing import Manager
data_size = 1_000_000
dataset_indices_to_process = range(data_size)
dataset = Manager().list([DataClass('SOME TEXT')] * data_size)
with concurrent.futures.ProcessPoolExecutor(initializer=initialize_pool_processes, initargs=(dataset,)) as executor:
# No need to explicitly loop:
results = list(executor.map(process_text, dataset_indices_to_process))

相关内容

  • 没有找到相关文章

最新更新