为了账号安全,请及时绑定邮箱和手机立即绑定

Dataflow/apache beam - 传入模式时如何访问当前文件名?

Dataflow/apache beam - 传入模式时如何访问当前文件名?

BIG阳 2021-08-24 13:32:45
我在堆栈溢出之前看到过这个问题的回答(https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow),但不是因为 apache beam 为 python 添加了可拆分的 dofn 功能。将文件模式传递给 gcs 存储桶时,如何访问正在处理的当前文件的文件名?我想将文件名传递到我的转换函数中:with beam.Pipeline(options=pipeline_options) as p:                                  lines = p | ReadFromText('gs://url to file')                                            data = (                                                                            lines                                                                           | 'Jsonify' >> beam.Map(jsonify)                                                | 'Unnest' >> beam.FlatMap(unnest)                                              | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(                              'project_id:dataset_id.table_name', schema=schema,                                 create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)               )                                                   最终,我想要做的是在转换 json 的每一行时将文件名传递到我的转换函数中(请参阅此内容,然后使用文件名在不同的 BQ 表中进行查找以获取值)。我想一旦我设法知道如何获取文件名,我将能够找出侧面输入部分,以便在 bq 表中进行查找并获得唯一值。
查看完整描述

2 回答

  • 2 回答
  • 0 关注
  • 118 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信