我创建了几个自定义类来转换数据帧,如下所示(版本简化(:
class class1():
def _init_(self, a):
self.dataframe = a
def __getitem__(self, key):
return self.dataframe[key]
def transform(self):
self.dataframe = (some dataframe computations)
return self.dataframe
class class2():
def _init_(self, b):
self.dataframe = b
def __getitem__(self, key):
return self.dataframe[key]
def transform(self, arg0):
self.dataframe = (some dataframe computations)
return self.dataframe
我现在要做的是并行执行这两个类方法。 我想它应该是这样的:
import multiprocessing
df1 = class1(a)
pr1 = multiprocessing.Process(target=df1.transform)
df2 = class2(b)
pr1 = multiprocessing.Process(target=df2.transform, kwargs={'arg0' : x})
pr1.start()
pr2.start()
pr1.join()
pr2.join()
但是,在我执行此代码后,并执行
print df1.dataframe
print df2.dataframe
我观察到根本没有进行任何转换。(尽管根据计算时间,当调用pr1.start((和pr2.start((时,某些东西确实很高兴(
有谁知道可能是什么原因?解决方案是什么?
非常感谢:)
P.s下一步将是"合并 df1.dataframe 和 df2.dataframe"。所以我也想知道我是否需要另一个函数fun_wait_until_all_process_finished((然后做 pd.merge((df1.dataframe,df2.dataframe((
在我看来,使用线程模块是解决此问题的最佳方法。
为了便于使用,您可以使用装饰器:
import threading
def threaded(fn):
def wrapper(*args, **kwargs):
threading.Thread(target=fn, args=args, kwargs=kwargs).start()
return wrapper
定义后,将装饰器添加到要线程化的函数/类方法中,如下所示:
@threaded
def transform(self):
self.dataframe = (some dataframe computations)
return self.dataframe
然后只需调用函数,就会自动创建一个线程:
c1 = class1(a)
c2 = class2(b)
c1.transform()
c2.transform()