3 回答

TA贡献1830条经验 获得超9个赞
如果您在 a 内部工作coroutine并希望在不阻塞的情况下运行不同的查询,event_loop那么您可以使用run_in_executor基本上在后台线程中运行查询而不阻塞循环的函数。这是如何使用它的一个很好的例子。
确保这正是您所需要的;为在 Python API 中运行查询而创建的作业已经是异步的,它们仅在您调用job.result(). 这意味着asyncio除非您在协程内,否则您不需要使用。
这是在作业完成后立即检索结果的快速示例:
from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq
client = bq.Client.from_service_account_json('path/to/key.json')
query1 = 'SELECT 1'
query2 = 'SELECT 2'
threads = []
results = []
executor = ThreadPoolExecutor(5)
for job in [client.query(query1), client.query(query2)]:
threads.append(executor.submit(job.result))
# Here you can run any code you like. The interpreter is free
for future in as_completed(threads):
results.append(list(future.result()))
results 将:
[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]

TA贡献1801条经验 获得超16个赞
事实上,由于该asyncio.create_task()函数,我找到了一种方法可以很容易地将我的查询包装在一个 asyinc 调用中。我只需要将它包装job.result()在一个协程中;这是实现。它现在异步运行。
class BQApi(object):
def __init__(self):
self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])
async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
job = self.api.query(query, **kwargs)
task = asyncio.create_task(self.coroutine_job(job))
return await task
@staticmethod
async def coroutine_job(job):
return job.result()
添加回答
举报