为了账号安全,请及时绑定邮箱和手机立即绑定

使用队列会导致 asyncio 异常“将 Future <Future pending>

使用队列会导致 asyncio 异常“将 Future <Future pending>

当年话下 2021-09-14 15:13:48
我试图用 asyncio 队列运行这个简单的代码,但捕获异常,甚至嵌套异常。我想获得一些帮助,使 asyncio 中的队列正常工作:import asyncio, logginglogging.basicConfig(level=logging.DEBUG)logging.getLogger("asyncio").setLevel(logging.WARNING)num_workers = 1in_queue = asyncio.Queue()out_queue = asyncio.Queue()tasks = []async def run():    for request in range(1):        await in_queue.put(request)    # each task consumes from 'input_queue' and produces to 'output_queue':    for i in range(num_workers):        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))    # tasks.append(asyncio.create_task(saver()))    print('waiting for queues...')    await in_queue.join()    # await out_queue.join()    print('all queues done')    for task in tasks:        task.cancel()    print('waiting until all tasks cancelled')    await asyncio.gather(*tasks, return_exceptions=True)    print('done')async def worker(name):    while True:        try:            print(f"{name} started")            num = await in_queue.get()            print(f'{name} got {num}')            await asyncio.sleep(0)            # await out_queue.put(num)        except Exception as e:            print(f"{name} exception {e}")        finally:            print(f"{name} ended")            in_queue.task_done()async def saver():    while True:        try:            print("saver started")            num = await out_queue.get()            print(f'saver got {num}')            await asyncio.sleep(0)            print("saver ended")        except Exception as e:            print(f"saver exception {e}")        finally:            out_queue.task_done()asyncio.run(run(), debug=True)print('Done!')输出:waiting for queues...worker-0 startedworker-0 got 0worker-0 endedworker-0 startedworker-0 exception worker-0 ended这是基本流程,我稍后想做的是在更多工作人员上运行更多请求,其中每个工作人员将把数字从 移动in_queue到out_queue,然后保护程序将从out_queue.
查看完整描述

2 回答

  • 2 回答
  • 0 关注
  • 209 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信