为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Pandas 界面恢复上传到 BigQuery

使用 Pandas 界面恢复上传到 BigQuery

婷婷同学_ 2021-12-16 15:52:20
我正在使用BigQuery python API以及用于 Pandas的BigQuery 连接器。每次append访问 BigQuery 中的数据集时,我都想确保从上次离开的位置开始,以防止数据重复和丢失。Load Job Config或其他地方是否有设置可以自动执行此操作?如果没有,您建议我如何处理连接错误并上传 reties,同时防止数据重复?我知道我可以查询最后一行并根据它附加数据,但我不想查询,因为 BigQuery 会收取查询费用。以下是我目前上传到 BigQuery 的内容:import pandas as pdfrom google.cloud import bigquery, exceptionstest_df = pd.DataFrame({    'num_legs': [2, 4, 8, 0],    'num_wings': [2, 0, 0, 0],    'num_specimen_seen': [10, 2, 1, 8],    'names': ['falcon', 'dog', 'spider', 'fish']})project = "test-project"dataset_id = "test-dataset"table_id = "test-table"client = bigquery.Client()try:    dataset_ref = client.dataset(dataset_id=dataset_id, project=project)    dataset = client.get_dataset(dataset_ref)except exceptions.NotFound:    print("specified dataset not found! -- creating a new dataset.")    dataset = client.create_dataset(dataset_id)table_ref = dataset.table(table_id)load_job = client.load_table_from_dataframe(    dataframe=test_df, destination=table_ref, project=project)load_job.result()
查看完整描述

1 回答

?
慕莱坞森

TA贡献1810条经验 获得超4个赞

假设您可以在源数据上生成唯一的 load-batch-id

实现目标的一种方法是:

  1. 使用 load_batch_id 标记您的加载作业

    Client.load_table_from_dataframe有说法:job_config(google.cloud.bigquery.job LoadJobConfig,可选)

    将 load_batch_id 注入LoadJobConfig.labels (Dict[str, str] – 作业标签。)

  2. 当您需要确认加载作业是否成功时,使用job.list api 搜索带有标签 load_batch_id 的作业。

一种简化的情况是,您每天仅将数据加载到 BQ,并且您的源数据可按日期分组。然后,您的 20190325 python 脚本首先检查标记为 20190324(或更深入过去)的作业,以查看它是否需要重试。


查看完整回答
反对 回复 2021-12-16
  • 1 回答
  • 0 关注
  • 141 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信