为了账号安全,请及时绑定邮箱和手机立即绑定

Python - 在 asyncio 中取消任务?

Python - 在 asyncio 中取消任务?

慕无忌1623718 2023-06-20 16:51:11
我在下面为异步池编写了代码。在__aexit__任务完成后我取消了 _worker 任务。但是当我运行代码时,工作任务不会被取消并且代码会永远运行。这就是任务的样子:<Task pending coro=<AsyncPool._worker() running at \async_pool.py:17> wait_for=<Future cancelled>>。正在asyncio.wait_for取消但不是工作任务。class AsyncPool:    def __init__(self,coroutine,no_of_workers,timeout):        self._loop           = asyncio.get_event_loop()        self._queue          = asyncio.Queue()        self._no_of_workers  = no_of_workers        self._coroutine      = coroutine        self._timeout        = timeout        self._workers        = None    async def _worker(self):         while True:            try:                ret = False                queue_item           = await self._queue.get()                ret = True                result               = await asyncio.wait_for(self._coroutine(queue_item), timeout = self._timeout,loop= self._loop)            except Exception as e:                print(e)            finally:                if ret:                    self._queue.task_done()    async def push_to_queue(self,item):        self._queue.put_nowait(item)        async def __aenter__(self):        assert self._workers == None        self._workers = [asyncio.create_task(self._worker()) for _ in range(self._no_of_workers)]        return self        async def __aexit__(self,type,value,traceback):        await self._queue.join()        for worker in self._workers:            worker.cancel()        await asyncio.gather(*self._workers, loop=self._loop, return_exceptions =True)要使用异步池:async def something(item):    print("got", item)    await asyncio.sleep(item) async def main():    async with AsyncPool(something, 5, 2) as pool:        for i in range(10):            await pool.push_to_queue(i) asyncio.run(main())我终端的输出:
查看完整描述

3 回答

?
侃侃尔雅

TA贡献1801条经验 获得超15个赞

问题是您的except Exception例外条款也会捕获取消并忽略它。更令人困惑的是,print(e)在 a 的情况下只打印一个空行CancelledError,这是输出中空行的来源。(将其更改为print(type(e))显示发生了什么。)

要更正此问题,请更改except Exception为更具体的内容,例如except asyncio.TimeoutError. Python 3.8 中不需要此更改,它asyncio.CancelledError不再派生自Exception,而是派生自BaseException,因此except Exception不会捕获它。


查看完整回答
反对 回复 2023-06-20
?
慕慕森

TA贡献1856条经验 获得超17个赞

这似乎有效。这event是一个计数计时器,当它到期时它的cancels任务。


import asyncio

from datetime import datetime as dt

from datetime import timedelta as td

import random

import time


class Program:

    def __init__(self):

        self.duration_in_seconds = 20

        self.program_start = dt.now()

        self.event_has_expired = False

        self.canceled_success = False


        


    async def on_start(self):

        print("On Start Event Start! Applying Overrides!!!")

        await asyncio.sleep(random.randint(3, 9))



    async def on_end(self):

        print("On End Releasing All Overrides!")

        await asyncio.sleep(random.randint(3, 9))

        


    async def get_sensor_readings(self):

        print("getting sensor readings!!!")

        await asyncio.sleep(random.randint(3, 9))   


 

    async def evauluate_data(self):

        print("checking data!!!")

        await asyncio.sleep(random.randint(3, 9))   



    async def check_time(self):

        if (dt.now() - self.program_start > td(seconds = self.duration_in_seconds)):

            self.event_has_expired = True

            print("Event is DONE!!!")


        else:

            print("Event is not done! ",dt.now() - self.program_start)




    async def main(self):

        # script starts, do only once self.on_start()

        await self.on_start()

        print("On Start Done!")


        while not self.canceled_success:


            readings = asyncio.ensure_future(self.get_sensor_readings())

            analysis = asyncio.ensure_future(self.evauluate_data())

            checker = asyncio.ensure_future(self.check_time())

            

            if not self.event_has_expired:

                await readings   

                await analysis           

                await checker

                

            else:

                # close other tasks before final shutdown

                readings.cancel()

                analysis.cancel()

                checker.cancel()

                self.canceled_success = True

                print("cancelled hit!")



        # script ends, do only once self.on_end() when even is done

        await self.on_end()

        print('Done Deal!')



async def main():

    program = Program()

    await program.main()



查看完整回答
反对 回复 2023-06-20
?
红颜莎娜

TA贡献1842条经验 获得超12个赞

当您asyncio创建然后取消任务时,您仍然有需要“回收”的任务。所以你想要await worker它。然而,一旦你await取消了这样的任务,因为它永远不会给你返回预期的返回值,就会asyncio.CancelledError被提高,你需要在某个地方抓住它。


由于这种行为,我认为您不应该为每个取消的任务gather执行它们await,因为它们应该立即返回:


async def __aexit__(self,type,value,traceback):

    await self._queue.join()


    for worker in self._workers:

        worker.cancel()

    for worker in self._workers:

        try:

           await worker

        except asyncio.CancelledError:

           print("worker cancelled:", worker)


查看完整回答
反对 回复 2023-06-20
  • 3 回答
  • 0 关注
  • 236 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信