为了账号安全,请及时绑定邮箱和手机立即绑定

在数据流管道中动态设置bigquery数据集

在数据流管道中动态设置bigquery数据集

至尊宝的传说 2023-10-11 15:40:59
我可以根据在上一个数据流步骤中处理的数据将数据插入到不同的 bigQuery 数据集吗?我正在创建一个数据流管道,它从 PubSub 订阅中读取数据并写入大查询表。其定义如下:def run(argv=None, save_main_session=True):    options: PipelineOptions = PipelineOptions(        project='project-id',        runner='DataflowRunner',        region='region',        streaming=True,        setup_file='dataflow/setup.py',        autoscaling_algorithm='THROUGHPUT_BASED',        job_name='telemetry-processing'    )    with beam.Pipeline(options=options) as p:        status = (                p                 | 'Get Status PubSub' >> beam.io.ReadFromPubSub(            subscription='projects/project-id/subscriptions/subscription-id',            with_attributes=True))        status_records = (status| 'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, nozzle_status_proto.NozzleStatus)) )        status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project- id:dataset-id.table-id')         bytes_status = (status | 'Get Bytes Result' >> beam.ParDo(GetBytes()))         bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery(        'project-id:dataset-id.backup-table-id')对于给定的输入和输出,它完全按照预期工作。我想要的是,关于 PubSubMessage 中的特定属性,定义我的消息应该发送到哪个数据集。所以我需要改变的部分是:status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:dataset-id.table-id')我已经尝试提取所需的数据并像这样使用它:status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:{data-from-previous-step}.table-id')但我们无法直接从 PCollection 获取数据。我尝试像这篇文章中那样覆盖 WriteToBigQuery(How can I write to Big Query using a runtime valueprovider in Apache Beam?),但我没有收到错误,也没有插入任何内容。我不知道如何实现这一点。你知道我应该从哪里开始做这件事吗?我是否必须为 n 个数据集创建 n 个管道?
查看完整描述

1 回答

?
婷婷同学_

TA贡献1844条经验 获得超8个赞

WriteToBigQuery 的“table”参数可以是从元素到应写入的表的函数。例如:

status_records | 'Write' >> beam.io.WriteToBigQuery(
  lambda e: 'dataset1.invalid_records' if is_invalid(e) else 'dataset2.good_records')


查看完整回答
反对 回复 2023-10-11
  • 1 回答
  • 0 关注
  • 67 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信