1 Answer
Your code is creating two DAGs, one per table, but the second one overwrites the first.
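For illustration, a minimal sketch of that failure mode (the loop shape and names here are assumptions, not taken from your file):

# Each iteration rebinds the same module-level variable, so when Airflow
# scans the file's globals it only ever sees the DAG from the last iteration.
for table_name in ['tbl1', 'tbl2']:
    dag = DAG(
        dag_id='upsert_{}'.format(table_name),
        default_args=default_args,
        schedule_interval='*/10 * * * *'
    )
# After the loop, only 'upsert_tbl2' is reachable through the `dag` global.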
My suggestion is to change the format of the JSON file to:
{
    "2mins": {
        "tbl1": ["update_timestamp", "stg"],
        "tbl2": ["update_timestamp", "stg"]
    },
    "10mins": {
        "tbl3": ["update_timestamp", "stg"],
        "tbl4": ["update_timestamp", "stg"]
    }
}
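The DAG file can then parse this JSON into the config dict that the loop below iterates over; a minimal sketch, assuming the file is saved as tables_config.json next to the DAG file (the filename is an assumption):

import json

# Hypothetical filename: point this at wherever the JSON config actually lives.
with open('tables_config.json') as f:
    config = json.load(f)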
Have your code iterate over the schedules and create the required tasks for each table (you will need two loops):
from airflow import DAG
# Airflow 1.x import path, matching the provide_context usage below
from airflow.operators.python_operator import PythonOperator

# default_args, docs, and the callables (get_max_ts, pgexport,
# stg_bqimport, prd_merge) are assumed to be defined as in your DAG file.

# Looping on the schedules to create two dags
for schedule, tables in config.items():
    # Default to every 10 minutes; the '2mins' schedule runs every 2 minutes
    cron_time = '*/10 * * * *'
    if schedule == '2mins':
        cron_time = '*/2 * * * *'
    dag_id = 'upsert_every_{}'.format(schedule)
    dag = DAG(
        dag_id,
        default_args=default_args,
        description='Incremental load - Every {}'.format(schedule),
        schedule_interval=cron_time,
        catchup=False,
        max_active_runs=1,
        doc_md=docs
    )
    # Looping over the tables to create the tasks for
    # each table in the current schedule
    for table_name, table_config in tables.items():
        max_ts = PythonOperator(
            task_id='get_maxts_{}'.format(table_name),
            python_callable=get_max_ts,
            op_kwargs={'tablename': table_name, 'dag': dag},
            provide_context=True,
            dag=dag
        )
        export_gcs = PythonOperator(
            task_id='export_gcs_{}'.format(table_name),
            python_callable=pgexport,
            op_kwargs={'tablename': table_name, 'dag': dag},
            provide_context=True,
            dag=dag
        )
        stg_load = PythonOperator(
            task_id='stg_load_{}'.format(table_name),
            python_callable=stg_bqimport,
            op_kwargs={'tablename': table_name, 'dag': dag},
            provide_context=True,
            dag=dag
        )
        merge = PythonOperator(
            task_id='merge_{}'.format(table_name),
            python_callable=prd_merge,
            op_kwargs={'tablename': table_name, 'dag': dag},
            provide_context=True,
            dag=dag
        )
        # Tasks for the same table are chained
        max_ts >> export_gcs >> stg_load >> merge
    # DAG is registered among the module's global objects
    globals()[dag_id] = dag
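The globals()[dag_id] = dag line is what makes this pattern work: Airflow discovers DAGs by scanning the module's global namespace, so each DAG built in the loop must be bound to its own distinct global name rather than repeatedly rebinding a single variable.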