我正在探索Apache Airflow。我正在使用一种在 MySQL 中插入记录的方法。我已经安排DAG在每 5 分钟后运行一次,但它似乎没有发生,因为 MYSQL 时间戳告诉 MySQL 任务在 5 分钟内被执行多次。如您所见,它正在几分钟内插入记录。下面是我的代码:import datetime as dtfrom airflow import DAGfrom airflow.hooks.mysql_hook import MySqlHookfrom airflow.operators.bash_operator import BashOperatorfrom airflow.operators.python_operator import PythonOperatordef fetch_data_mysql(): mysql_hook = MySqlHook(mysql_conn_id='mysql_default') sql = 'SELECT * from random_table' sql = "INSERT INTO random_table(text) VALUES ('Hi Adnan')" print('INSERT MYSQL RESULT') # results = mysql_hook.get_records(sql) # results = mysql_hook.run(sql, autocommit=True, parameters=('Hi Addu',)) mysql_hook.run(sql, autocommit=True)def print_world(): print('world') return 'WORLD IN SEPTEMBER'default_args = { 'owner': 'me', 'start_date': dt.datetime(2018, 9, 11), 'retries': 1, 'retry_delay': dt.timedelta(minutes=2),}with DAG('airflow_tutorial_v01', default_args=default_args, schedule_interval='0/5 * * * *', ) as dag: print_hello = BashOperator(task_id='print_hello', bash_command='echo "hello"') sleep = BashOperator(task_id='sleep', bash_command='sleep 5') print_world = PythonOperator(task_id='print_world', python_callable=print_world) mysql_task = PythonOperator(task_id='mysql_tut', python_callable=fetch_data_mysql)print_hello >> sleep >> print_world >> mysql_task我正在使用v1.10.0.日志链接在这里:- https://www.dropbox.com/s/f0g64mhi8sgzlvw/my_simple_dag.py.log?dl=0
2 回答
MYYA
TA贡献1868条经验 获得超4个赞
你在回填。如果您检查日志,其执行日期为2018-09-20 00:15:00+00:00、2018-09-20 00:20:00+00:00、2018-09-20 00:25:00+00:00等。
将以下内容添加到您的default_args:
'catchup_by_default': False
你default_args应该看起来像:
default_args = {
'owner': 'me',
'start_date': dt.datetime(2018, 9, 11),
'retries': 1,
'retry_delay': dt.timedelta(minutes=2),
'catchup_by_default': False,
}
添加回答
举报
0/150
提交
取消