为了账号安全,请及时绑定邮箱和手机立即绑定

Python - 如何 - Big Query 异步任务

Python - 如何 - Big Query 异步任务

慕沐林林 2021-07-30 14:15:34
这可能是一个愚蠢的问题,但我似乎无法异步运行 python google-clood-bigquery。我的目标是同时运行多个查询并等待所有asyncio.wait()查询在查询收集器中完成。我正在使用asyncio.create_tast()来启动查询。问题是每个查询在开始之前都等待前一个查询完成。这是我的查询功能(很简单):async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:  job = self.api.query(query, **kwargs)  return job.result()既然我不能等待job.result(),我应该等待别的东西吗?
查看完整描述

3 回答

?
慕标琳琳

TA贡献1830条经验 获得超9个赞

如果您在 a 内部工作coroutine并希望在不阻塞的情况下运行不同的查询,event_loop那么您可以使用run_in_executor基本上在后台线程中运行查询而不阻塞循环的函数。这是如何使用它的一个很好的例子。


确保这正是您所需要的;为在 Python API 中运行查询而创建的作业已经是异步的,它们仅在您调用job.result(). 这意味着asyncio除非您在协程内,否则您不需要使用。


这是在作业完成后立即检索结果的快速示例:


from concurrent.futures import ThreadPoolExecutor, as_completed

import google.cloud.bigquery as bq



client = bq.Client.from_service_account_json('path/to/key.json')

query1 = 'SELECT 1'

query2 = 'SELECT 2'


threads = []

results = []


executor = ThreadPoolExecutor(5)


for job in [client.query(query1), client.query(query2)]:

    threads.append(executor.submit(job.result))


# Here you can run any code you like. The interpreter is free


for future in as_completed(threads):

    results.append(list(future.result()))

results 将:


[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]


查看完整回答
反对 回复 2021-08-03
?
侃侃尔雅

TA贡献1801条经验 获得超16个赞

事实上,由于该asyncio.create_task()函数,我找到了一种方法可以很容易地将我的查询包装在一个 asyinc 调用中。我只需要将它包装job.result()在一个协程中;这是实现。它现在异步运行。


class BQApi(object):                                                                                                 

    def __init__(self):                                                                                              

        self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])                               


    async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:                                       

        job = self.api.query(query, **kwargs)                                                                        

        task = asyncio.create_task(self.coroutine_job(job))                                                          

        return await task                                                                                            


    @staticmethod                                                                                                    

    async def coroutine_job(job):                                                                                    

        return job.result()   


查看完整回答
反对 回复 2021-08-03
  • 3 回答
  • 0 关注
  • 108 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号