我正在使用BigQuery python API以及用于 Pandas的BigQuery 连接器。每次append访问 BigQuery 中的数据集时,我都想确保从上次离开的位置开始,以防止数据重复和丢失。Load Job Config或其他地方是否有设置可以自动执行此操作?如果没有,您建议我如何处理连接错误并上传 reties,同时防止数据重复?我知道我可以查询最后一行并根据它附加数据,但我不想查询,因为 BigQuery 会收取查询费用。以下是我目前上传到 BigQuery 的内容:import pandas as pdfrom google.cloud import bigquery, exceptionstest_df = pd.DataFrame({ 'num_legs': [2, 4, 8, 0], 'num_wings': [2, 0, 0, 0], 'num_specimen_seen': [10, 2, 1, 8], 'names': ['falcon', 'dog', 'spider', 'fish']})project = "test-project"dataset_id = "test-dataset"table_id = "test-table"client = bigquery.Client()try: dataset_ref = client.dataset(dataset_id=dataset_id, project=project) dataset = client.get_dataset(dataset_ref)except exceptions.NotFound: print("specified dataset not found! -- creating a new dataset.") dataset = client.create_dataset(dataset_id)table_ref = dataset.table(table_id)load_job = client.load_table_from_dataframe( dataframe=test_df, destination=table_ref, project=project)load_job.result()
1 回答
慕莱坞森
TA贡献1810条经验 获得超4个赞
假设您可以在源数据上生成唯一的 load-batch-id。
实现目标的一种方法是:
使用 load_batch_id 标记您的加载作业
Client.load_table_from_dataframe有说法:job_config(google.cloud.bigquery.job LoadJobConfig,可选)
将 load_batch_id 注入LoadJobConfig.labels (Dict[str, str] – 作业标签。)
当您需要确认加载作业是否成功时,使用job.list api 搜索带有标签 load_batch_id 的作业。
一种简化的情况是,您每天仅将数据加载到 BQ,并且您的源数据可按日期分组。然后,您的 20190325 python 脚本首先检查标记为 20190324(或更深入过去)的作业,以查看它是否需要重试。
添加回答
举报
0/150
提交
取消