为了账号安全,请及时绑定邮箱和手机立即绑定

通过相关管道处理 Dataflow/Apache Beam 中的拒绝

通过相关管道处理 Dataflow/Apache Beam 中的拒绝

繁花不似锦 2023-08-22 15:03:49
我有一个从 BigQuery 获取数据并将其写入 GCS 的管道,但是,如果我发现任何拒绝,我想将它们纠正到 Bigquery 表。我将拒绝收集到全局列表变量中,然后将该列表加载到 BigQuery 表中。当我在本地运行该过程时,该过程运行良好,因为管道以正确的顺序运行。当我使用 dataflowrunner 运行它时,它不能保证顺序(我希望 pipeline1 在 pipeline2 之前运行。有没有办法使用 python 在 Dataflow 中拥有依赖的管道?或者还请建议是否可以用更好的方法解决这个问题。提前致谢。with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:     data = (pipeline1               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))               | 'combine output to list' >> beam.combiners.ToList()               | 'tranform' >> beam.Map(lambda x: somefunction)  # Collecting rejects in the except block of this method to a global list variable               ....etc               | 'to gcs' >> beam.io.WriteToText(output)               )# Loading the rejects gathered in the above pipeline to Biquerywith beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:    rejects = (pipeline2                    | 'create pipeline' >> beam.Create(reject_list)                    | 'to json format' >> beam.Map(lambda data: {.....})                    | 'to bq' >> beam.io.WriteToBigQuery(....)                    )
查看完整描述

1 回答

?
皈依舞

TA贡献1851条经验 获得超3个赞

您可以执行类似的操作,但只需 1 个管道,并在转换中添加一些附加代码。

应该beam.Map(lambda x: somefunction)有两个输出:一个被写入 GCS,一个被拒绝的元素最终将被写入 BigQuery。

为此,您的转换函数必须返回一个TaggedOutput.

第二个PCollection,然后您可以写入 BigQuery。

您不需要Create在管道的第二个分支中有一个。

管道将是这样的:

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:

 

    data = (pipeline1

               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))

               | 'combine output to list' >> beam.combiners.ToList()

               | 'tranform' >> beam.Map(transform)  # Tagged output produced here


    pcoll_to_gcs = data.gcs_output

    pcoll_to_bq  = data.rejected


    pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)

    pcoll_to_bq  | "to bq" >> beam.io.WriteToBigQuery(....)

那么transform函数会是这样的


def transform(element):

  if something_is_wrong_with_element:

    yield pvalue.TaggedOutput('rejected', element)


  transformed_element = ....


  yield pvalue.TaggedOutput('gcs_output', transformed_element)


查看完整回答
反对 回复 2023-08-22
  • 1 回答
  • 0 关注
  • 1588 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信