在DataflowRunner内部的实现细节方面,很多人可能并不关心是否BigQuerySink使用WriteToBigQuery。但是,在我的情况下,我试图将我的代码部署到带有 RunTimeValueProvider 的参数的数据流模板。支持此行为WriteToBigQuery:class WriteToBigQuery(PTransform):.... table (str, callable, ValueProvider): The ID of the table, or a callable that returns it. The ID must contain only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If dataset argument is :data:`None` then the table argument must contain the entire table reference specified as: ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one argument representing an element to be written to BigQuery, and return a TableReference, or a string table name as specified above. Multiple destinations are only supported on Batch pipelines at the moment.BigQuerySink不支持它:class BigQuerySink(dataflow_io.NativeSink): table (str): The ID of the table. The ID must contain only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If **dataset** argument is :data:`None` then the table argument must contain the entire table reference specified as: ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``.更有趣的是,BigQuerySink在代码中从 2.11.0 开始就被弃用了。@deprecated(since='2.11.0', current="WriteToBigQuery")但是在 DataFlowRunner 中,当前的代码和注释似乎完全不符合预期WriteToBigQuery使用 over 的默认类BigQuerySink: def apply_WriteToBigQuery(self, transform, pcoll, options): # Make sure this is the WriteToBigQuery class that we expected, and that # users did not specifically request the new BQ sink by passing experiment # flag.我的问题是双重的:DataflowRunner为什么类和类之间的合同/期望存在差异io.BigQuery?无需等待错误修复,有人对如何强制使用DataflowRunneroverWriteToBigQuery有建议BigQuerySink吗?
1 回答
偶然的你
TA贡献1841条经验 获得超3个赞
转换有两种不同的WriteToBigQuery
写入 BigQuery 的策略:
将插入流式传输到 BigQuery 端点
定期触发文件加载作业(或批处理管道一次)
对于 Python SDK,我们最初只支持流式插入,并且我们有一个只在 Dataflow 上工作的文件加载的 runner-native 实现(这是BigQuerySink
.
对于在 Dataflow 上运行的批处理管道,BigQuerySink
替换为 - 如您正确发现的那样。对于所有其他情况,使用流式插入。
在最新版本的 Beam 中,我们在 SDK 中原生添加了对文件加载的支持 - 实现在BigQueryBatchFileLoads
.
因为我们不想破坏依赖旧行为的用户,所以我们隐藏BigQueryBatchFileLoads
了一个实验标志。(标志是use_beam_bq_sink
)。
所以:
在未来的版本中,我们将自动使用
BigQueryBatchFileLoads
,但目前,您有两个选项可以访问它:直接在您的管道中使用它(例如
input | BigQueryBatchFileLoads(...)
)。通过选项
--experiments use_beam_bq_sink
,同时使用WriteToBigQuery
。
我希望这会有所帮助!
添加回答
举报
0/150
提交
取消