实例属性不会使用多处理



我有一个问题,即不保留属性更改,甚至保留创建的新属性。我认为我已经将其范围缩小到了我的脚本利用多处理的事实,并且我认为当脚本返回主线程时,在单独的过程线程中发生的更改不会"记住"。

基本上,我有几组数据需要并行处理。数据存储为属性,并通过类中的几种方法更改。在处理结束时,我希望返回主线程并从每个对象实例中加入数据。但是,如上所述,当我尝试访问并行处理位后使用数据的实例属性时,那里什么也没有。好像在多处理位期间实施的任何更改被"忘记"。

是否有一个明显的解决方案可以解决此问题?还是我需要重建代码以返回处理后的数据,而不仅仅是将其作为实例属性更改/存储?我想另一种解决方案是将数据序列化,然后在必要时重新阅读,而不仅仅是将其保存在内存中。

可能值得注意的是,我使用的是pathos模块而不是Python的multiprocessing模块。我遇到了一些与腌制有关的错误,类似于这里:python多处理腌制:无法腌制'type'function'>。我的代码在几个模块上被打破,如前所述,数据处理方法包含在类中。

对不起文字墙。

编辑这是我的代码:

import importlib
import pandas as pd
from pathos.helpers import mp
from provider import Provider
# list of data providers ... length is arbitrary
operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']

# create provider objects for each operating provider
provider_obj_list = []
for name in operating_providers:
    loc     = 'providers.%s' % name
    module  = importlib.import_module(loc)
    provider_obj = Provider(module)
    provider_obj_list.append(provider_obj)
processes = []
for instance in provider_obj_list:
    process = mp.Process(target = instance.data_processing_func)
    process.daemon = True
    process.start()
    processes.append(process)
for process in processes:
    process.join()
# now that data_processing_func is complete for each set of data, 
# stack all the data
stack = pd.concat((instance.data for instance in provider_obj_list))

我有许多模块(operating_providers中列出的名称),其中包含特定于其数据源的属性。这些模块是迭代导入的,并传递给了我在单独的模块(provider)中创建的提供商类的新实例。我将每个提供商实例附加到列表(provider_obj_list),然后迭代创建单独的进程,以调用实例方法instance.data_processing_func。此功能可以执行一些数据处理(每个实例都访问了完全不同的数据文件),并在途中创建新的实例属性,当并行处理完成时,我需要访问这些属性。

我尝试使用多线程,而不是使用多线程 - 在这种情况下,我的实例属性持续存在,这就是我想要的。但是,我不确定为什么会发生这种情况 - 我必须研究线程与多处理之间的差异。

感谢您的任何帮助!

这是一些示例代码,显示了如何做我在评论中概述的事情。我无法测试它,因为我没有安装providerpathos,但是它应该让您对我的建议一个很好的了解。

import importlib
from pathos.helpers import mp
from provider import Provider
def process_data(loc):
    module  = importlib.import_module(loc)
    provider_obj = Provider(module)
    provider_obj.data_processing_func()

if __name__ == '__main__':
    # list of data providers ... length is arbitrary
    operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']
    # create list of provider locations for each operating provider
    provider_loc_list = []
    for name in operating_providers:
        loc = 'providers.%s' % name
        provider_loc_list.append(loc)
    processes = []
    for loc in provider_loc_list:
        process = mp.Process(target=process_data, args=(loc,))
        process.daemon = True
        process.start()
        processes.append(process)
    for process in processes:
        process.join()

相关内容

  • 没有找到相关文章

最新更新