1 Answer
You can look at the previous execution date (the prev_ds macro) and compare it with the current execution date (the ds macro) inside a BranchPythonOperator. Example:
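# Import paths are for Airflow 1.10.x; `dag` is assumed to be defined earlier.
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

start = DummyOperator(task_id='start_task', dag=dag)
# One branch is always skipped, so the join task must not require every
# upstream task to succeed. This assumes an Airflow version that supports
# the 'none_failed' trigger rule.
end = DummyOperator(task_id='end_task', trigger_rule='none_failed', dag=dag)
once = DummyOperator(task_id='once_task', dag=dag)
dummy_task_id_that_does_nothing = DummyOperator(task_id='dummy_task_id_that_does_nothing', dag=dag)


def check_if_task_already_ran(**context):
    ds = context.get('ds')            # current execution date, YYYY-MM-DD
    prev_ds = context.get('prev_ds')  # previous execution date, YYYY-MM-DD
    print(context)
    print(ds)
    print(prev_ds)
    if prev_ds == ds:
        # The previous run was on the same day: take the no-op branch
        return 'dummy_task_id_that_does_nothing'  # task_id
    else:
        return 'once_task'  # Task that should run only once per day


compare_ds = BranchPythonOperator(
    task_id='compare_ds',
    provide_context=True,  # needed on Airflow 1.10.x to receive the context
    python_callable=check_if_task_already_ran,
    dag=dag)

start >> compare_ds
compare_ds >> once >> end
compare_ds >> dummy_task_id_that_does_nothing >> end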
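
To see which branch gets picked without running a scheduler, the callable can be exercised as a plain Python function. This is only a minimal sketch: the date strings are made-up sample values, and it assumes the first run of a DAG has no prev_ds in the context (treated here as None).

```python
def check_if_task_already_ran(**context):
    """Return the task_id that the branch should follow."""
    ds = context.get('ds')            # current execution date
    prev_ds = context.get('prev_ds')  # previous execution date, may be absent
    if prev_ds == ds:
        return 'dummy_task_id_that_does_nothing'
    return 'once_task'


# Hypothetical context values, not taken from a real DAG run.
same_day = check_if_task_already_ran(ds='2020-01-02', prev_ds='2020-01-02')
new_day = check_if_task_already_ran(ds='2020-01-02', prev_ds='2020-01-01')
first_run = check_if_task_already_ran(ds='2020-01-02')  # no prev_ds supplied

print(same_day)
print(new_day)
print(first_run)
```

Only the same-day call should return the no-op task id; the other two should fall through to once_task.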