如何从多进程共享数据到主进程?



这是我现在使用的第二个版本编码(它来自Booboo),它需要大约17分钟返回查询结果,数据可以传输到父进程。

from multiprocessing.pool import Pool
def do_query(query):
conn = cx_Oracle.connect("*", "*", "***", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute(query)
table = cursor.fetchall()
col = [x[0] for x in cursor.description]
return pd.DataFrame(table, columns=col)
if __name__ == '__main__':
print('start:',datetime.datetime.now())
queries = [
"select aaa from AAA",
"select bbb from BBB",
"select ccc from CCC",
"select ddd from DDD", 
]
pool = Pool(len(queries))
dataframes = pool.map(do_query, queries)
pool.close()
pool.join()
print('end:',datetime.datetime.now())

下面的脚本是我的第一个版本编码(原始版本),我用4个子进程来查询,需要4~6min才能完成查询,但没有将查询数据从子进程传输到父进程。但是我可以在子进程中保存这些数据。

def aquery():
conn = cx_Oracle.connect("*", "*", "***", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute("select aaa from AAA")
aaa = cursor.fetchall()
col = [x[0] for x in cursor.description]
df_aaa = pd.DataFrame(aaa, columns=col)

def bquery():
conn = cx_Oracle.connect("*", "*", "***", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute("select bbb from BBB")
bbb = cursor.fetchall()
col = [x[0] for x in cursor.description]
df_bbb = pd.DataFrame(bbb, columns=col)

if __name__ == '__main__':
print('start:', datetime.datetime.now())
aaa_process = multiprocessing.Process(target=aaa)
bbb_process = multiprocessing.Process(target=bbb)
aaa_process.start()
bbb_process.start()
aaa_process.join()
bbb_process.join()
print('end:', datetime.datetime.now())

所以我想知道是否第二个版本的编码花费更多的时间只是因为它需要时间将数据从子进程传输到父进程?

您的示例,我认为(至少我希望)只是简化了您真正想要完成的内容,所呈现的内容无法实现任何性能改进,因为您有效地执行任何多处理。这是因为尽管主进程创建了子进程,但它会立即阻塞,直到子进程完成并返回其结果,因此没有并行计算发生。你所做的只是通过创建一个新进程和必须将数据从一个地址空间传输到另一个地址空间来增加额外的开销。

因此,例如,多处理/多线程的实际用例涉及必须执行多个查询的情况,每个查询都需要从结果中生成一个数据帧。如果我们要分析在这种情况下所涉及的内容,那么就是查询本身,然后是数据框架的创建。由于在创建新进程和跨进程地址空间返回数据时存在上述开销,因此我的方法是为查询使用多线程,这应该可以很好地工作,因为线程之间争夺全局解释器锁的问题不应该出现,因为查询任务将主要处于网络等待状态。然后,您可以将查询结果返回给主线程(现在不需要进程间传输),并让主线程执行从返回的数据创建数据框的CPU工作,或者更简单地使用相同的工作函数从结果构建数据框并返回完成的数据框。在后一种情况下,取决于pandas是否释放了全局解释器锁,在构建数据框架时可能无法实现任何级别的并行处理,但您不会比让主进程构建数据框架做得更差。

pandas不释放全局解释器锁的最坏情况下,您将不会做更多的cpu密集型工作,即并行创建数据帧,而多处理将建议自己作为解决方案。但是,只有当每个任务的CPU处理量足够大,通过并行计算获得的收益抵消了额外开销所损失的收益时,多处理的开销才值得。我不相信在这种特殊情况下会是这样。

下面是使用多线程池的示例代码,在这种情况下,您有两个查询来创建两个数据帧:

from multiprocessing.pool import ThreadPool
def do_query(query):
conn = cx_Oracle.connect("*", "*","*", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute(query)
table = cursor.fetchall()
col = [x[0] for x in cursor.description] 
return pd.DataFrame(table, columns=col)

if __name__ == '__main__':
queries = [
"select * from I where I.DATE between sysdate - 4 and sysdate - 3",
"select * from I where I.DATE between sysdate - 5 and sysdate - 4"
]
pool = ThreadPool(len(queries))
dataframes = pool.map(do_query, queries)
pool.close()
pool.join()

目前还不清楚上述是否会提高运行两个连续查询的串行代码的性能;这取决于查询本身的复杂度(运行时间)。

相关内容

  • 没有找到相关文章

最新更新