为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用 While 循环执行 Airflow 运算符

如何使用 While 循环执行 Airflow 运算符

呼啦一阵风 2021-06-01 21:21:50
要求:使用 while 循环为每个日期运行 SQL 查询。例如:开始日期选择为 8 月 25 日,结束日期选择为 8 月 28 日。然后 BigQueryOperator 首先运行 8 月 25 日,然后是 8 月 26 日,依此类推,直到我们到达 28 日。问题:在下面的 DAG 中,它只执行开始日期的查询,然后完成作业。它甚至不会执行/迭代 BigQueryOperator 到下一个日期等等。from airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom airflow.contrib.operators.bigquery_operator import BigQueryOperatorfrom datetime import date, datetime, timedeltaimport datetimedefault_args = {    'owner': 'airflow',    'start_date': datetime.datetime(2018, 8, 31),    'email': ['xyz@xyz.com'],    'email_on_failure': True,    'retries': 1,    'retry_delay': timedelta(minutes=10),    'depends_on_past': False}dag = DAG('his_temp',default_args=default_args,schedule_interval=None)date1 = datetime.date(2018, 8, 25)date2 = datetime.date(2018, 8, 28)day = datetime.timedelta(days=1)while date1 <= date2:    parameter = {        'dataset': "projectname.finance",        'historical_date': date1.strftime('%Y%m%d')    }    sqlpartition = BigQueryOperator(    task_id='execute_sqlpartition',    use_legacy_sql=False,    write_disposition='WRITE_TRUNCATE',    allow_large_results=True,    bql="sqlqueries/sqlpartition.sql",    destination_dataset_table=parameter.get('dataset') + "." + "date_partition_" + parameter.get('historical_date'),    params=parameter,    bigquery_conn_id='bigquery',    dag=dag)    print "data loaded for "+ parameter.get('historical_date')    date1 = date1 + day   
查看完整描述

2 回答

?
慕姐4208626

TA贡献1852条经验 获得超7个赞

Airflow scheduler 的整个概念是它会调度任务,你只需要正确配置它。难怪它会在提到的开始日期运行一次,因为将选择 dag 开始日期,并且由于没有安排每日任务,它将运行一次并停止。您必须在 dag 级别而不是操作员级别进行配置。



查看完整回答
反对 回复 2021-06-08
?
互换的青春

TA贡献1797条经验 获得超6个赞

您可以在依赖项的末尾添加自触发运算符。类似于以下内容:


def trigger_check(context, dag_run_obj):

    if date1 <= date2:

        return dag_run_obj

trigger = TriggerDagRunOperator(

    task_id="test_trigger_dagrun",

    trigger_dag_id="his_temp",

    python_callable=trigger_check,

    ... more arguments

)

op1 >> op2 >> ... >> trigger

第一次触发它后,它会循环遍历日期,直到达到 date2 阈值。您必须更加小心地通过将其设为有序的 PythonOperator 或类似的东西来更新日期


查看完整回答
反对 回复 2021-06-08
  • 2 回答
  • 0 关注
  • 263 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信