为了账号安全,请及时绑定邮箱和手机立即绑定

在 Apache Airflow 中保存算子的结果

在 Apache Airflow 中保存算子的结果

弑天下 2021-09-28 13:26:57
几个运算符允许提取数据,但我从未设法使用结果。例如:https : //github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/bigquery_get_data.py该运算符可以按如下方式调用:get_data = BigQueryGetDataOperator(      task_id='get_data_from_bq',      dataset_id='test_dataset',      table_id='Transaction_partitions',      max_results='100',      selected_fields='DATE',      bigquery_conn_id='airflow-service-account'      )然而,get_data 是 DAG 类型,但第 116 行说“返回 table_data”。需要明确的是,操作员工作并检索数据,我只是不明白如何使用数据检索/数据所在的位置。如何使用上面的“get_data”获取数据?
查看完整描述

2 回答

?
当年话下

TA贡献1890条经验 获得超9个赞

您将get_data在下一个任务中使用的方式可以是一个PythonOperator,然后您可以使用它来处理数据。


get_data = BigQueryGetDataOperator(

      task_id='get_data_from_bq',

      dataset_id='test_dataset',

      table_id='Transaction_partitions',

      max_results='100',

      selected_fields='DATE',

      bigquery_conn_id='airflow-service-account'

      )


def process_data_from_bq(**kwargs):

      ti = kwargs['ti']

      bq_data = ti.xcom_pull(task_ids='get_data_from_bq')

      # Now bq_data here would have your data in Python list

      print(bq_data)


process_data = PythonOperator(

      task_id='process_data_from_bq',

      python_callable=process_bq_data,

      provide_context=True

      )


get_data >> process_data

查看完整回答
反对 回复 2021-09-28
?
Qyouu

TA贡献1786条经验 获得超11个赞

返回值保存在Xcom 中。您可以从另一个操作员访问它,如示例所示。

data = ti.xcom_pull(task_ids='get_data_from_bq')


查看完整回答
反对 回复 2021-09-28
  • 2 回答
  • 0 关注
  • 159 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信