复杂对象的多处理远程管理器



我想在N个进程之间共享一个熊猫数据帧。 我尝试使用多处理库实现此工作。 我使用带有命名空间的远程同步管理器。 如果我以几秒钟的间隔并行执行此代码两次,则第二个 Namespace 对象没有 df_parquet 属性。

我想知道如何获得第一次执行的df_parquet实例化。

import time
from multiprocessing import Process, Lock
from multiprocessing.managers import SyncManager
from threading import Thread
import pandas as pd
class ParquetDataframe(object):
def __init__(self,manager,global_val):
self.manager = manager
self.Global = global_val
if not hasattr(self.Global, 'df_parquet'):
print("Global.df_parquet - Init")
lock = self.manager.Lock()
lock.acquire()
data = [['tom', 10], ['nick', 15], ['juli', 14]]
df = pd.DataFrame(data, columns=['Name', 'Age'])
self.Global.df_parquet = df
lock.release()
print("Global.df_parquet : "+str(self.Global.df_parquet))
def get_value(self):
return self.Global.df_parquet
def func(parquet):
for i in range(50):
time.sleep(1)
parquet.get_value()
def serve(manager):
print("Manager creation - Begin")
manager.get_server().serve_forever()
print("Manager creation - End")
class Manager(SyncManager):
pass
if __name__ == '__main__':
print("Initialisation step - Begin")
Global = None
manager = Manager(address=('127.0.0.1', 50000),authkey=bytes("12345", encoding='utf8'))
print("Initialisation step - End")
try:
print("Try connect to the manager")
manager.connect()
Global = manager.Namespace()
except:
print("If exception we create a manager")
background_thread = Thread(target=serve, args=(manager,))
background_thread.start()
manager.connect()
print("Get the Namespace")
Global = manager.Namespace()
parquet = ParquetDataframe(manager,Global)
parquet = ParquetDataframe(manager, Global)
#procs = [Process(target=func, args=(parquet,)) for i in range(10)]
#for p in procs: p.start()
#for p in procs: p.join()

我们最终决定在这个库中使用 POSIX 进程间信号量

最新更新