我正在根据序列号以并行方式运行作业。我正在采取每一项工作的状态, 如 - 成功或失败。然后,在获得每个作业的状态后,我正在发送带有每个作业状态的邮件。但是邮件是在整个过程完成后生成的。但我希望如果任何作业失败,该过程将停止在那里,邮件将生成。你能帮我怎么做吗?我正在运行的代码: df_mail_final = pd.DataFrame() df_mail_final1 = pd.DataFrame() '''Getting the status of every job''' for m_job in df_main4.master_job.unique(): list_df = [] dict_mail = OrderedDict() temp_df1 = df_main4[df_main4['master_job'] == m_job].copy() temp_df1['duration'] = pd.to_datetime(temp_df1['end_time'].unique()[-1]) - pd.to_datetime(temp_df1['start_time'].unique()[0]) temp_df1['duration'] = temp_df1['duration'].replace('0 days' ,'') status_list = temp_df1.status.unique() if(0 in status_list): dict_mail['Master Job Name'] = m_job idx = temp_df1['status'] == 0 dict_mail['Execution_Seq'] = temp_df1.loc[idx]["exec_seq"].unique()[0] dict_mail['Start_time'] = temp_df1.loc[idx]["start_time"].unique()[0] dict_mail['End_time'] = temp_df1.loc[idx]["end_time"].unique()[-1] dict_mail['Status'] = 'Failed' dict_mail['Duration'] = temp_df1.loc[idx]["duration"].unique()[-1] dict_mail['Reason'] = temp_df1.loc[idx]["error_msg"].unique()[0] dict_mail['Function_Name'] = temp_df1.loc[idx]["error_func"].unique()[0] list_df.append(dict_mail) df_mail = pd.DataFrame(list_df) if(0 not in status_list): print(m_job) dict_mail['Master Job Name'] = m_job dict_mail['Execution_Seq'] = temp_df1.exec_seq.unique()[0] dict_mail['Start_time'] = temp_df1.start_time.unique()[0] dict_mail['End_time'] = temp_df1.end_time.unique()[-1] dict_mail['Status'] = 'Success'
1 回答
慕标5832272
TA贡献1966条经验 获得超4个赞
分享如何实现此作业的要点/系统设计逻辑
def my_parallel_job(*args, **kwargs):
# do your stuff here
pass
def parallel_job_wrapper(*args, **kwargs):
try:
my_parallel_job(*args, **kwargs)
# if errors following will not run
return "success"
except:
# if errors comes
return "fail"
def main(*args, **kwargs):
# call you parallel jobs from here
p1 = parallel_job_wrapper(*args, **kwargs)
# preferably you are using something like python's multithreading pool methods
在上面的代码中,第二个函数是充当缓冲,以防第一个函数发生任何故障。这可确保即使任何并行作业失败,您也不会停止。main
添加回答
举报
0/150
提交
取消