我有10个函数,它们都查询数据库(DB)并返回一个DataFrame(DF)。
我不能一个接一个地顺序执行它们,因为我最后要把它们的结果拼接(concat)在一起,如果时间戳不一致,就会得到null值。当查询的数据量很大时,顺序执行很耗时,所以我想并行运行这些函数。
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df1(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df2(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df3(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df4(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df5(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df6(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df7(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df8(domain,durarion):
    do something
    return df
def final_df(domain, duration):
    """Call df1..df8 one after another and combine their results column-wise.

    Args:
        domain: Forwarded unchanged as the first argument of every query
            function.
        duration: Forwarded unchanged as the second argument of every query
            function.

    Returns:
        The frames returned by df1..df8 concatenated along axis=1 (without
        sorting) and indexed by the 'time' column.
    """
    loaders = (df1, df2, df3, df4, df5, df6, df7, df8)

    # Sequential on purpose: each loader runs only after the previous one
    # has returned, in the order listed above.
    frames = [loader(domain, duration) for loader in loaders]

    combined = pd.concat(frames, axis=1, sort=False)
    # The index is turned into a column and 'time' is then set as the index;
    # presumably the frames are indexed by 'time' already - confirm.
    combined = combined.reset_index()
    return combined.set_index('time')
# Build the combined frame. `domain` and `duration` are not defined in this
# snippet and must be provided by the surrounding code.
df = final_df(domain,duration)
我想并行调用final_df函数中的全部8个函数:df1、df2、df3、df4、df5、df6、df7、df8。
附注(P.S.):我熟悉多进程(multiprocessing),但我不只是想并行运行这些函数,还想保存它们的返回结果。
我已经解决了这个问题,代码如下:
import concurrent.futures
import threading
import time

import numpy as np
import pandas as pd
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df1(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df2(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df3(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df4(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df5(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df6(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df7(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df8(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df9(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df10(domain,durarion):
    do something
    return df
# Placeholder query function: the body is elided ("do something") in the post.
# Per the surrounding text, it queries the database and returns a DataFrame.
# NOTE(review): the parameter is spelled `durarion` while callers use `duration` - likely a typo; confirm.
def df11(domain,durarion):
    do something
    return df
def getdf(domain, duration):
    """Run df1..df11 concurrently in threads and join their results on time.

    Args:
        domain: Forwarded unchanged as the first argument of every query
            function.
        duration: Forwarded unchanged as the second argument of every query
            function.

    Returns:
        The frames returned by df1..df11 concatenated along axis=1 (without
        sorting), with the 'time' column parsed by ``pd.to_datetime`` and
        used as the index.

    Raises:
        Exception: Whatever a query function raised; ``Future.result()``
            re-raises it in the calling thread.
    """
    queries = (df1, df2, df3, df4, df5, df6, df7, df8, df9, df10, df11)

    # One worker per query. The executor's default pool size depends on the
    # CPU count and can be smaller than len(queries); some queries would then
    # wait in the queue and start late instead of running alongside the rest.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(queries)) as executor:
        futures = [executor.submit(query, domain, duration) for query in queries]
        # Collect in submission order so the column order matches the order
        # of `queries`, regardless of which query finishes first.
        frames = [future.result() for future in futures]

    df = pd.concat(frames, axis=1, sort=False).reset_index()
    # Assumes reset_index() yields a 'time' column, i.e. the frames are
    # indexed by an index named 'time' - confirm against the query functions.
    df['time'] = pd.to_datetime(df['time'])
    df = df.set_index('time')
    return df
# Build the combined frame. `domain` and `duration` are not defined in this
# snippet and must be provided by the surrounding code.
df = getdf(domain, duration)