"""Minimal asyncio producer/worker pipeline built on ``asyncio.Queue``.

Original question: "I am trying to run this simple code with asyncio queues,
but I keep catching exceptions, even nested exceptions. I would like some
help getting queues in asyncio to work properly."

Fixes applied relative to the code as originally posted:

* ``task_done()`` is only called for an item that was actually retrieved.
  Previously it lived in a ``finally`` that also covered ``queue.get()``, so
  cancelling an idle worker called ``task_done()`` one extra time and raised
  ``ValueError: task_done() called too many times`` while the cancellation
  was being handled (the "nested" exception).
* Cancellation is re-raised instead of being swallowed by ``except Exception``
  (on Python 3.7 ``CancelledError`` derives from ``Exception``).
* The queues are created inside the running event loop, because on
  Python < 3.10 a queue built at import time binds to a different loop than
  the one ``asyncio.run()`` creates.
* Worker failures collected by ``gather(..., return_exceptions=True)`` are
  logged instead of being silently discarded.
"""
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)

num_workers = 1

# Created inside run() so they belong to the loop started by asyncio.run().
in_queue = None
out_queue = None

# Tasks spawned by the most recent run() call.
tasks = []


async def run(num_requests: int = 1) -> None:
    """Feed requests into ``in_queue``, wait for the workers, then stop them.

    Args:
        num_requests: How many integer requests (``0 .. num_requests - 1``)
            to enqueue. Defaults to 1, matching the original script.
    """
    global in_queue, out_queue
    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()
    tasks.clear()  # drop stale tasks from any previous run()

    for request in range(num_requests):
        await in_queue.put(request)

    # Each worker consumes from in_queue (and will later produce to out_queue).
    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    # Next step (not enabled yet): tasks.append(asyncio.create_task(saver()))

    print('waiting for queues...')
    await in_queue.join()
    # Next step (not enabled yet): await out_queue.join()
    print('all queues done')

    # Workers loop forever, so they have to be cancelled once the work is done.
    for task in tasks:
        task.cancel()
    print('waiting until all tasks cancelled')
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        # CancelledError is the expected outcome; anything else is a real bug.
        if isinstance(result, Exception) and not isinstance(
            result, asyncio.CancelledError
        ):
            logging.error("task failed: %r", result)
    print('done')


async def worker(name: str) -> None:
    """Consume numbers from ``in_queue`` forever, until the task is cancelled.

    Args:
        name: Label used as a prefix in the progress messages.
    """
    while True:
        print(f"{name} started")
        # Kept outside the try block: if we are cancelled while waiting here,
        # no item was taken, so no task_done() is owed.
        num = await in_queue.get()
        try:
            print(f'{name} got {num}')
            await asyncio.sleep(0)
            # Next step (not enabled yet): await out_queue.put(num)
        except asyncio.CancelledError:
            # Never swallow cancellation (it is an Exception on Python 3.7).
            raise
        except Exception as e:
            # repr() so that exceptions without a message are still visible.
            print(f"{name} exception {e!r}")
        finally:
            print(f"{name} ended")
            in_queue.task_done()  # exactly one call per retrieved item


async def saver() -> None:
    """Consume numbers from ``out_queue`` forever, until the task is cancelled."""
    while True:
        print("saver started")
        # Same rule as in worker(): only acknowledge items actually received.
        num = await out_queue.get()
        try:
            print(f'saver got {num}')
            await asyncio.sleep(0)
            print("saver ended")
        except asyncio.CancelledError:
            raise
        except Exception as e:
            print(f"saver exception {e!r}")
        finally:
            out_queue.task_done()


if __name__ == "__main__":
    asyncio.run(run(), debug=True)
    print('Done!')

# Output reported for the original (broken) version:
#   waiting for queues...
#   worker-0 started
#   worker-0 got 0
#   worker-0 ended
#   worker-0 started
#   worker-0 exception
#   worker-0 ended
#
# This is the basic flow. The intended next step is to run more requests on
# more workers, where each worker moves a number from in_queue to out_queue
# and the saver then consumes from out_queue.
添加回答
举报
0/150
提交
取消