我们可以等到以前的作业在 Python 或 pyspark 中完成吗?



我正在尝试以并行方式运行作业,但问题是某些作业依赖于其他作业。如果我同时触发它们,那么依赖作业将失败。因此,我认为依赖工作将等到父母的工作完全完成。有什么办法可以做到这一点吗?

例:

Job       Job_Type
A         independent
B         independent
C         A
D         B

在上面的例子中,你可以看到作业A和B是独立的,所以它们将同时运行,但C和D分别依赖于A和B。所以 C 将在 A 完成后立即开始,D 将在 B 完成后立即开始。我们可以做这样的过程吗?

在我的代码中,我是基于序列号的类别-

Job       Job_Type      seq_num
A         independent    1
B         independent    1
C         A              2
D         B              2

将触发第一个 seq_num = 1,seq_num 1 中的所有作业将并行运行。完成后 1 seq_num 2 将开始。因此,假设作业 A 需要 10 分钟。B 需要 15 分钟,所以 2 seq_num将在 15 分钟后开始,但我希望过程 作业 C 将立即开始完成作业 A 意味着 10 分钟后作业 C 将开始,15 分钟后作业 D 将开始。请帮助我实现此逻辑。

我正在使用的代码:

def parallel_Execution():

logging.info("parallel_Execution..................[started]")
par_temp_loc = '/medaff/Temp/'
'''Reading the metadata file and creating as a dataframe'''
df = pd.read_csv(par_temp_loc+'metadata_file_imedical.txt', delimiter='|',error_bad_lines=False)
uni_master_job = df['Master Job Name'].unique().tolist()
print(uni_master_job)
'''getting unique execution sequence'''
logging.info("Getting the unique Execution Sequence Number!")
unique_exec_seq = df['Execution Sequence'].unique().tolist()
unique_exec_seq.sort()
print(unique_exec_seq)
num_unique_seq = len(unique_exec_seq)
logging.info("Total Number of unique sequence Number : %2d" %(num_unique_seq))

p2 = ThreadWithReturnValue(target = partial(parallel_temp2, unique_exec_seq, df ))
p2.start()
r2 = p2.join()
print(r2)
#r1 = r1.append(r2)
mail_df(r2)
'''Parallel Processing Function'''
def parallel_temp2(unique_exec_seq, df):
list_df = []
df_main4 = pd.DataFrame()
for exec_seq in unique_exec_seq:
seq_num = exec_seq
temp_df = df[df['Execution Sequence'] == exec_seq].copy()
unique_master_job = temp_df['Master Job Name'].unique().tolist()
print(unique_master_job)
#logging.info("%s Master Job Started." %(unique_master_job))
if(len(unique_master_job)>0):

num_processes = len(unique_master_job)
pool = ThreadPool(processes=num_processes)
result1 = pool.map(partial(parallel_view_creation, exec_seq, temp_df), unique_master_job)
pool.close()
pool.join()
df_main = pd.DataFrame(result1)
#print("printing df_main")
#print(df_main)

for m_job in df_main.master_job.unique():
temp_df1 = df_main[df_main['master_job'] == m_job]
status = temp_df1.status.unique()[0]
if(status == 0):
unique_master_job.remove(m_job)
pool = ThreadPool(processes=num_processes)
result2 = pool.map(partial(parallel_build_query, exec_seq, temp_df), unique_master_job)
pool.close()
pool.join()
if(result2):                
df_main2 = pd.DataFrame(result2)
df_main3 = pd.concat([df_main,df_main2], sort = False)
status_df_list = df_main3['status'].unique().tolist()
print(status_df_list)
if(0 in status_df_list):
break
if(0 in status_df_list):
break
else:
df_main4 = df_main4.append(df_main3)
if(0 in status_df_list):
df_main4 = df_main4.append(df_main3)

return df_main4

如果您需要任何信息,请告诉我。

python中的线程不是数据的并行处理,因为python有一个称为GIL的概念,它最终仅按顺序执行操作。

一种更好的编码方法是

  1. 分离您的所有四个要求。
  2. 下载并安装气流
  3. 使用 DAG 创建美观且易于维护的工作流。

这也可用于标准化您的流程。

相关内容

  • 没有找到相关文章