我想汇总许多表记录计数,我希望它并行运行以节省时间。
list_tabl_cn =['TBL_A', 'TBL_B', 'TBL_C', 'TBL_D']
def tblRowCn(p_tbl):
#connDb = pyodbc.connect(f'DSN={nama_db_target}', autocommit =True)
connDb = sqlanydb.connect(uid='dba',
pwd='sql',
host='ip:port',
dbn='blah')
is_tableExists = ego.my_desc(p_tbl,163).shape[0]
if is_tableExists:
proc_name = 'df_'+p_tbl
if p_tbl == 'STG_CFG_SYS':
Q_ = """
SELECT OPENDATE as TGL_POS, COUNT(1) CN FROM {0}
GROUP BY TGL_POS
""".format(p_tbl)
else:
Q_ = """
SELECT TANGGAL_POSISI as TGL_POS, COUNT(1) CN FROM {0}
GROUP BY TGL_POS
""".format(p_tbl)
df_tbl = pd.read_sql_query(Q_, connDb, parse_dates=['TGL_POS'])
df_tbl['THN'],df_tbl['BLN']= df_tbl['TGL_POS'].dt.year, df_tbl['TGL_POS'].dt.month
else:
df_tbl=[]
return df_tbl
def task(table_nm):
print(f"Task Executed with process {mp.current_process().pid}")
tblRowCn(table_nm.upper())
def main():
executor = mp.Pool(mp.cpu_count()-8)
executor.map(task, [n_table_nm for n_table_nm in list_tabl_cn])
executor.close()
if __name__ == "__main__":
main()
可能是这样的
def main():
executor = mp.Pool(mp.cpu_count()-8)
executor.map(task, [n_table_nm for n_table_nm in list_tabl_cn])
append.[task1, task2, task...]
executor.close()
我的整个数据帧都在append.[task1, task2, task...]
我相信我在代码中遗漏了一些东西,但它太模糊了。
如果所有Dataframes
都具有相同的列,并且您希望将所有数据帧的所有行添加到一个dataframe
中,则可以使用 pandasconcat
函数。 将所有单独的数据帧添加到列表中,然后连接所有数据帧以创建主数据帧。
list_of_df =[]
for df in executor.map(task, list_tabl_cn):
if df:
list_of_df.append(df)
main_df = pd.concat(list_of_df)
您可以在tblRowCn
方法中删除 else 条件,它是多余的且不需要的。
在你的代码中,你从list_tabl_cn
生成了一个列表,以将其传递给map函数,你不必这样做,你可以像上面的代码一样直接给映射函数list_tabl_cn
。