为了账号安全,请及时绑定邮箱和手机立即绑定

Google Cloud Composer DAG 没有被触发

Google Cloud Composer DAG 没有被触发

慕雪6442864 2023-05-16 14:58:15
我计划从今天 2020/08/11 开始在东部标准时间 (NY) 周二至周六凌晨 04:00 运行 DAG。编写代码并部署后,我预计 DAG 会被触发。我刷新了我的 Airflow UI 页面几次,但它仍然没有触发。我正在使用带有 python 3 的 Airflow 版本 v1.10.9-composer。这是我的 DAG 代码:"""This DAG executes a retrieval job"""# Required packages to execute DAGfrom __future__ import print_functionimport pendulumfrom airflow.models import DAGfrom airflow.models import Variablefrom datetime import datetime, timedeltafrom airflow.contrib.operators.ssh_operator import SSHOperatorfrom airflow.operators.dummy_operator import DummyOperatorfrom airflow.utils.trigger_rule import TriggerRulelocal_tz = pendulum.timezone("America/New_York")# DAG parametersdefault_args = {    'owner': 'Me',    'depends_on_past': False,    'start_date': datetime(2020, 8, 10, 4, tzinfo=local_tz),    'dagrun_timeout': None,    'email': Variable.get('email'),    'email_on_failure': True,    'email_on_retry': False,    'provide_context': True,    'retries': None,    'retry_delay': timedelta(minutes=5)}# create DAG object with Name and default_argswith DAG(        'retrieve_files',        schedule_interval='0 4 * * 2-6',        description='Retrieves files from sftp',        max_active_runs=1,        catchup=True,        default_args=default_args) as dag:    # Define tasks - below are dummy tasks and a task instantiated by SSHOperator- calling methods written in other py class    start_dummy = DummyOperator(        task_id='start',        dag=dag    )    end_dummy = DummyOperator(        task_id='end',        trigger_rule=TriggerRule.NONE_FAILED,        dag=dag    )    retrieve_file = SSHOperator(        ssh_conn_id="my_conn",        task_id='retrieve_file',        command='/usr/bin/python3  /path_to_file/getFile.py',        dag=dag)    dag.doc_md = __doc__    retrieve_file.doc_md = """\    #### Task Documentation    Connects to sftp and retrieves files.    """    start_dummy >> retrieve_file >> end_dummy
查看完整描述

1 回答

?
郎朗坤

TA贡献1921条经验 获得超9个赞

参考官方文档:

调度程序会在开始日期之后的一个 schedule_interval 运行您的作业。

如果您的 start_date 是 2020-01-01 并且 schedule_interval 是@daily,则第一次运行将在 2020-01-02 上创建,即在您的开始日期过后。

为了在每天的特定时间(包括今天)运行 DAG,start_date需要将时间设置为过去的时间,并且schedule_interval需要具有所需的时间格式cron。正确设置昨天的日期时间非常重要,否则触发器将不起作用。

在这种情况下,我们应该将 设置start_date为上一周的星期二,即:(2020, 8, 4)。由于每周运行一次,因此从开始日期起应该有 1 周的间隔。

让我们看一下以下示例,它展示了如何在美国东部标准时间周二至周六凌晨 04:00 运行作业:

from datetime import datetime, timedelta

from airflow import models

import pendulum

from airflow.operators import bash_operator


local_tz = pendulum.timezone("America/New_York")


default_dag_args = {

    'start_date': datetime(2020, 8, 4, 4, tzinfo=local_tz),

    'retries': 0,

}


with models.DAG(

        'Test',

        default_args=default_dag_args,

        schedule_interval='00 04 * * 2-6') as dag:

       # DAG code

    print_dag_run_conf = bash_operator.BashOperator(

        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

我建议您查看start_date 文档的处理方式。



查看完整回答
反对 回复 2023-05-16
  • 1 回答
  • 0 关注
  • 118 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信