3 回答

TA贡献1799条经验 获得超8个赞
让我们看看发生了什么:
此代码安排三个协程执行并返回Future对象group_task(内部类的实例_GatheringFuture)聚合结果。
group_task = asyncio.gather(
keep_printing("First"),
keep_printing("Second"),
keep_printing("Third")
)
此代码等待未来完成超时。如果发生超时,它会取消未来并引发asyncio.TimeoutError.
try:
await asyncio.wait_for(group_task, 3)
except asyncio.TimeoutError:
print("Time's up!")
发生超时。让我们看看 asyncio 库task.py。wait_for执行以下操作:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
...
await waiter
...
await _cancel_and_wait(fut, loop=loop) # _GatheringFuture.cancel() inside
raise exceptions.TimeoutError()
当我们这样做时_GatheringFuture.cancel(),如果实际上取消了任何子任务,CancelledError则传播
class _GatheringFuture(futures.Future):
...
def cancel(self):
...
for child in self._children:
if child.cancel():
ret = True
if ret:
# If any child tasks were actually cancelled, we should
# propagate the cancellation request regardless of
# *return_exceptions* argument. See issue 32684.
self._cancel_requested = True
return ret
然后
...
if outer._cancel_requested:
# If gather is being cancelled we must propagate the
# cancellation regardless of *return_exceptions* argument.
# See issue 32684.
outer.set_exception(exceptions.CancelledError())
else:
outer.set_result(results)
因此,从收集中提取结果或异常更为正确future
async def main():
group_task = asyncio.gather(
keep_printing("First"),
keep_printing("Second"),
keep_printing("Third")
)
try:
await asyncio.wait_for(group_task, 3)
except asyncio.TimeoutError:
print("Time's up!")
try:
result = await group_task
except asyncio.CancelledError:
print("Gather was cancelled")

TA贡献1829条经验 获得超13个赞
我认为你需要await
放在asyncio.gather
. 因此,此调用取自您的代码:
group_task = asyncio.gather( keep_printing("First"), keep_printing("Second"), keep_printing("Third") )
需要改成:
group_task = await asyncio.gather( keep_printing("First"), keep_printing("Second"), keep_printing("Third") )
不知道为什么,我还在学习这些东西。

TA贡献1803条经验 获得超3个赞
当aw因为超时被取消时,wait_for等待aw被取消。如果将 CancelledError 处理到协程中,则会出现超时错误。这在 3.7 版中发生了变化。
例子
import asyncio
import datetime
async def keep_printing(name):
print(datetime.datetime.now())
try:
await asyncio.sleep(3600)
except asyncio.exceptions.CancelledError:
print("done")
async def main():
try:
await asyncio.wait_for(keep_printing("First"), timeout=3)
except asyncio.exceptions.TimeoutError:
print("timeouted")
if __name__ == "__main__":
asyncio.run(main())
用于从 Task 或 Future 检索结果的 gather 方法,你有一个无限循环并且从不返回任何结果。如果 aws 序列中的任何 Task 或 Future 被取消(wait_for 发生了什么),它会被视为引发了 CancelledError——在这种情况下 gather() 调用不会被取消。这是为了防止取消一个已提交的任务/未来导致其他任务/未来被取消。
对于保护聚集法,你可以将它覆盖到盾牌上。
import asyncio
import datetime
async def keep_printing(name):
while True:
print(name, datetime.datetime.now())
try:
await asyncio.sleep(0.5)
except asyncio.exceptions.CancelledError:
print(f"canceled {name}")
return None
async def main():
group_task = asyncio.shield(asyncio.gather(
keep_printing("First"),
keep_printing("Second"),
keep_printing("Third"))
)
try:
await asyncio.wait_for(group_task, 3)
except asyncio.exceptions.TimeoutError:
print("Done")
if __name__ == "__main__":
asyncio.run(main())
添加回答
举报