3 回答
TA贡献1827条经验 获得超4个赞
我知道我迟到了,但我会选择第二种模式:“2 dags。一个用 TriggerDagOperator 感知和触发,一个进程”,因为:
每个文件都可以并行执行
第一个 DAG 可以选择要处理的文件,重命名它(添加后缀 '_processing' 或将其移动到处理文件夹)
如果我是贵公司的新开发人员,我打开工作流,我想了解工作流的逻辑是什么,而不是上次处理了哪些文件是动态构建的
如果 dag 2 发现文件存在问题,则将其重命名(使用 '_error' 后缀或将其移动到错误文件夹)
这是一种处理文件的标准方法,无需创建任何额外的运算符
它使 de DAG 幂等且更易于测试。本文中的更多信息
重命名和/或移动文件是在每个 ETL 中处理文件的一种非常标准的方法。
顺便说一句,我总是推荐这篇文章https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753。它没有
TA贡献1811条经验 获得超5个赞
似乎您应该能够使用 bash 运算符运行批处理器 dag 来清除文件夹,只需确保depends_on_past=True
在 dag 上进行设置以确保在下次安排 dag 之前成功清除文件夹。
TA贡献1856条经验 获得超11个赞
我发现这篇文章:https ://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
其中使用了一个新的运算符,即 TriggerMultiDagRunOperator。我认为这符合我的需要。
添加回答
举报