为了账号安全,请及时绑定邮箱和手机立即绑定

Dataflow BigQuery 插入作业因大数据集而立即失败

Dataflow BigQuery 插入作业因大数据集而立即失败

倚天杖 2022-01-18 16:37:45
我使用梁 python 库设计了一个梁/数据流管道。管道大致执行以下操作:ParDo:从 API 收集 JSON 数据ParDo:转换 JSON 数据I/O:将转换后的数据写入 BigQuery 表一般来说,代码会做它应该做的事情。但是,当从 API 收集一个大数据集(大约 500.000 个 JSON 文件)时,bigquery 插入作业在使用 DataflowRunner(它与在我的计算机)。使用较小的数据集时,一切正常。数据流日志如下:2019-04-22 (00:41:29) Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the...Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the bq tool: "bq show -j --project_id=X dataflow_job_14675275193414385105". 2019-04-22 (00:41:29) Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /Wr...Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /WriteToBigQuery/NativeWrite failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: beamapp-X-04212005-04211305-sf4k-harness-lqjg,beamapp-X-04212005-04211305-sf4k-harness-lgg2,beamapp-X-04212005-04211305-sf4k-harness-qn55,beamapp-X-04212005-04211305-sf4k-harness-hcsn按照建议使用 bq cli 工具来获取有关 BQ 加载作业的更多信息不起作用。找不到工作(我怀疑它是否已经创建,因为即时失败)。我想我遇到了某种配额/bq 限制,甚至是内存不足问题(请参阅:https ://beam.apache.org/documentation/io/built-in/google-bigquery/ )限制 BigQueryIO 目前有以下限制。您无法使用 >您的管道的其他步骤对 BigQuery 写入的完成进行排序。如果您使用的是 Beam SDK for Python,如果您编写了一个非常大的数据集,您可能会遇到导入大小配额>问题。作为一种解决方法,您可以对数据集进行分区(例如,使用 Beam 的分区转换)并写入多个 BigQuery 表。Beam SDK for Java 没有这个>限制,因为它会为您划分数据集。对于如何缩小此问题的根本原因,我将不胜感激。我还想尝试分区 Fn,但没有找到任何 python 源代码示例如何将分区 pcollection 写入 BigQuery 表。
查看完整描述

2 回答

?
宝慕林4294392

TA贡献2021条经验 获得超8个赞

可能有助于调试的一件事是查看 Stackdriver 日志。

如果您在 Google控制台中打开 Dataflow 作业并单击LOGS图形面板右上角的 ,则应该会打开底部的日志面板。LOGS 面板的右上角有一个指向 Stackdriver 的链接。这将为您提供有关您的工人/洗牌/等的大量日志信息。对于这个特定的工作。

其中有很多内容,并且很难过滤掉相关的内容,但希望您能够找到比A work item was attempted 4 times without success. 例如,每个工作人员偶尔会记录它正在使用的内存量,这可以与每个工作人员拥有的内存量(基于机器类型)进行比较,以查看它们是否确实内存不足,或者您的错误是否正在发生别处。

祝你好运!


查看完整回答
反对 回复 2022-01-18
?
富国沪深

TA贡献1790条经验 获得超9个赞

据我所知,在 Cloud Dataflow 和 Apache Beam 的 Python SDK 中没有可用的选项来诊断 OOM(Java SDK 可以)。我建议您在Cloud Dataflow 问题跟踪器中打开功能请求,以获取有关此类问题的更多详细信息。


除了检查 Dataflow 作业日志文件之外,我建议您使用Stackdriver Monitoring 工具监控您的管道,该工具提供每个作业的资源使用情况(作为总内存使用时间)。


关于 Python SDK 中的 Partition 函数使用,以下代码(基于 Apache Beam文档中提供的示例)将数据拆分为 3 个 BigQuery 加载作业:


def partition_fn(input_data, num_partitions):

      return int(get_percentile(lines) * num_partitions / 100)


    partition = input_data | beam.Partition(partition_fn, 3)


    for x in range(3):

      partition[x] | 'WritePartition %s' % x >> beam.io.WriteToBigQuery(

        table_spec,

        schema=table_schema,

        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,

        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)


查看完整回答
反对 回复 2022-01-18
  • 2 回答
  • 0 关注
  • 172 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信