1 回答
TA贡献1851条经验 获得超3个赞
您可以执行类似的操作,但只需 1 个管道,并在转换中添加一些附加代码。
应该beam.Map(lambda x: somefunction)
有两个输出:一个被写入 GCS,一个被拒绝的元素最终将被写入 BigQuery。
为此,您的转换函数必须返回一个TaggedOutput
.
第二个PCollection
,然后您可以写入 BigQuery。
您不需要Create
在管道的第二个分支中有一个。
管道将是这样的:
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(transform) # Tagged output produced here
pcoll_to_gcs = data.gcs_output
pcoll_to_bq = data.rejected
pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
pcoll_to_bq | "to bq" >> beam.io.WriteToBigQuery(....)
那么transform函数会是这样的
def transform(element):
if something_is_wrong_with_element:
yield pvalue.TaggedOutput('rejected', element)
transformed_element = ....
yield pvalue.TaggedOutput('gcs_output', transformed_element)
添加回答
举报