为了账号安全,请及时绑定邮箱和手机立即绑定

气流:为每个文件运行 DAG 的正确方法

气流:为每个文件运行 DAG 的正确方法

有只小跳蛙 2022-07-19 15:18:18
我有以下任务要解决:文件不定期通过端点发送并存储在本地。我需要为这些文件中的每一个触发 DAG 运行。对于每个文件,将执行相同的任务总体而言,流程如下所示:对于每个文件,运行任务 A->B->C->D正在批量处理文件。虽然这项任务对我来说似乎微不足道,但我已经找到了几种方法来做到这一点,我对哪一种是“正确的”(如果有的话)感到困惑。第一种模式:使用实验性 REST API 触发 dag。也就是说,公开一个 Web 服务,它摄取请求和文件,将其存储到文件夹中,并使用实验性 REST api通过将 file_id 作为 conf 来触发 DAG缺点:REST api 仍处于实验阶段,不确定 Airflow 如何处理同时出现许多请求的负载测试(这不应该发生,但是,如果发生了怎么办?)第二种模式:2 dags。一个用 TriggerDagOperator 感知和触发,一个进程。始终使用与之前描述的相同的 ws,但这次它只是存储文件。然后我们有:第一个 dag:使用 FileSensor 和 TriggerDagOperator 来触发 N 个给定 N 个文件的 dag第二天:任务A->B->C缺点:需要避免将相同的文件发送到两个不同的 DAG 运行。例子:文件夹 x.json 中的文件 传感器找到 x,触发 DAG (1)传感器返回并再次安排。如果 DAG (1) 未处理/移动文件,则传感器 DAG 可能会重新安排使用相同文件运行的新 DAG。这是不需要的。第三种模式:对于文件中的文件,任务 A->B->C正如在这个问题中看到的那样。缺点:这可能有效,但我不喜欢 UI 可能会变得混乱,因为每次 DAG 运行看起来都不一样,但它会随着正在处理的文件数量而改变。此外,如果要处理 1000 个文件,则运行可能会非常难以阅读第四种模式:使用 subdags我还不确定它们是如何完全工作的,因为我看到 它们不被鼓励(最后),但是应该可以为每个文件生成一个 subdag 并让它运行。类似于这个问题。缺点:似乎 subdags 只能与顺序执行器一起使用。我是否遗漏了什么并且过度思考了一些应该(在我看来)非常直截了当的事情?谢谢
查看完整描述

3 回答

?
GCT1015

TA贡献1827条经验 获得超4个赞

我知道我迟到了,但我会选择第二种模式:“2 dags。一个用 TriggerDagOperator 感知和触发,一个进程”,因为:

  • 每个文件都可以并行执行

  • 第一个 DAG 可以选择要处理的文件,重命名它(添加后缀 '_processing' 或将其移动到处理文件夹)

  • 如果我是贵公司的新开发人员,我打开工作流,我想了解工作流的逻辑是什么,而不是上次处理了哪些文件是动态构建的

  • 如果 dag 2 发现文件存在问题,则将其重命名(使用 '_error' 后缀或将其移动到错误文件夹)

  • 这是一种处理文件的标准方法,无需创建任何额外的运算符

  • 它使 de DAG 幂等且更易于测试。本文中的更多信息

重命名和/或移动文件是在每个 ETL 中处理文件的一种非常标准的方法。

顺便说一句,我总是推荐这篇文章https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753。它没有


查看完整回答
反对 回复 2022-07-19
?
四季花海

TA贡献1811条经验 获得超5个赞

似乎您应该能够使用 bash 运算符运行批处理器 dag 来清除文件夹,只需确保depends_on_past=True在 dag 上进行设置以确保在下次安排 dag 之前成功清除文件夹。



查看完整回答
反对 回复 2022-07-19
?
呼唤远方

TA贡献1856条经验 获得超11个赞

我发现这篇文章:https ://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13

其中使用了一个新的运算符,即 TriggerMultiDagRunOperator。我认为这符合我的需要。


查看完整回答
反对 回复 2022-07-19
  • 3 回答
  • 0 关注
  • 78 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信