多处理平均查询为熊猫 df 然后将 df 合并为单个 df



我想汇总许多表记录计数,我希望它并行运行以节省时间

list_tabl_cn =['TBL_A', 'TBL_B', 'TBL_C', 'TBL_D']
def tblRowCn(p_tbl):
#connDb = pyodbc.connect(f'DSN={nama_db_target}', autocommit =True)
connDb = sqlanydb.connect(uid='dba', 
pwd='sql',
host='ip:port', 
dbn='blah')    
is_tableExists = ego.my_desc(p_tbl,163).shape[0] 
if is_tableExists:
proc_name = 'df_'+p_tbl
if p_tbl == 'STG_CFG_SYS':
Q_ = """
SELECT OPENDATE as TGL_POS, COUNT(1) CN FROM {0}
GROUP BY TGL_POS
""".format(p_tbl)
else:
Q_ = """
SELECT TANGGAL_POSISI as TGL_POS, COUNT(1) CN FROM {0}
GROUP BY TGL_POS
""".format(p_tbl)
df_tbl = pd.read_sql_query(Q_, connDb, parse_dates=['TGL_POS'])
df_tbl['THN'],df_tbl['BLN']= df_tbl['TGL_POS'].dt.year, df_tbl['TGL_POS'].dt.month    
else:
df_tbl=[]
return df_tbl
def task(table_nm):
print(f"Task Executed with process {mp.current_process().pid}")
tblRowCn(table_nm.upper())
def main():
executor = mp.Pool(mp.cpu_count()-8)
executor.map(task, [n_table_nm for n_table_nm in list_tabl_cn])
executor.close()
if __name__ == "__main__":
main()  

可能是这样的

def main():
executor = mp.Pool(mp.cpu_count()-8)
executor.map(task, [n_table_nm for n_table_nm in list_tabl_cn])
append.[task1, task2, task...]   
executor.close()

我的整个数据帧都在append.[task1, task2, task...]

我相信我在代码中遗漏了一些东西,但它太模糊了。

如果所有Dataframes都具有相同的列,并且您希望将所有数据帧的所有行添加到一个dataframe中,则可以使用 pandasconcat函数。 将所有单独的数据帧添加到列表中,然后连接所有数据帧以创建主数据帧。

list_of_df =[] 
for df in executor.map(task, list_tabl_cn):
if df:
list_of_df.append(df)
main_df = pd.concat(list_of_df)

您可以在tblRowCn方法中删除 else 条件,它是多余的且不需要的。

在你的代码中,你从list_tabl_cn生成了一个列表,以将其传递给map函数,你不必这样做,你可以像上面的代码一样直接给映射函数list_tabl_cn

相关内容

  • 没有找到相关文章

最新更新