要求:使用 while 循环为每个日期运行 SQL 查询。例如:开始日期选择为 8 月 25 日,结束日期选择为 8 月 28 日。然后 BigQueryOperator 首先运行 8 月 25 日,然后是 8 月 26 日,依此类推,直到我们到达 28 日。问题:在下面的 DAG 中,它只执行开始日期的查询,然后完成作业。它甚至不会执行/迭代 BigQueryOperator 到下一个日期等等。from airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom airflow.contrib.operators.bigquery_operator import BigQueryOperatorfrom datetime import date, datetime, timedeltaimport datetimedefault_args = { 'owner': 'airflow', 'start_date': datetime.datetime(2018, 8, 31), 'email': ['xyz@xyz.com'], 'email_on_failure': True, 'retries': 1, 'retry_delay': timedelta(minutes=10), 'depends_on_past': False}dag = DAG('his_temp',default_args=default_args,schedule_interval=None)date1 = datetime.date(2018, 8, 25)date2 = datetime.date(2018, 8, 28)day = datetime.timedelta(days=1)while date1 <= date2: parameter = { 'dataset': "projectname.finance", 'historical_date': date1.strftime('%Y%m%d') } sqlpartition = BigQueryOperator( task_id='execute_sqlpartition', use_legacy_sql=False, write_disposition='WRITE_TRUNCATE', allow_large_results=True, bql="sqlqueries/sqlpartition.sql", destination_dataset_table=parameter.get('dataset') + "." + "date_partition_" + parameter.get('historical_date'), params=parameter, bigquery_conn_id='bigquery', dag=dag) print "data loaded for "+ parameter.get('historical_date') date1 = date1 + day
2 回答
慕姐4208626
TA贡献1852条经验 获得超7个赞
Airflow scheduler 的整个概念是它会调度任务,你只需要正确配置它。难怪它会在提到的开始日期运行一次,因为将选择 dag 开始日期,并且由于没有安排每日任务,它将运行一次并停止。您必须在 dag 级别而不是操作员级别进行配置。
互换的青春
TA贡献1797条经验 获得超6个赞
您可以在依赖项的末尾添加自触发运算符。类似于以下内容:
def trigger_check(context, dag_run_obj):
if date1 <= date2:
return dag_run_obj
trigger = TriggerDagRunOperator(
task_id="test_trigger_dagrun",
trigger_dag_id="his_temp",
python_callable=trigger_check,
... more arguments
)
op1 >> op2 >> ... >> trigger
第一次触发它后,它会循环遍历日期,直到达到 date2 阈值。您必须更加小心地通过将其设为有序的 PythonOperator 或类似的东西来更新日期
添加回答
举报
0/150
提交
取消