为了账号安全,请及时绑定邮箱和手机立即绑定

Python:如何实现顺序执行的异步调度程序?

Python:如何实现顺序执行的异步调度程序?

大话西游666 2023-10-31 19:23:32
我想要一个用于“动作”执行的异步调度程序,它满足某些属性:操作是一次性的,并且按照精确的时间戳进行安排。操作应严格按顺序执行,即调度程序在前一个操作完成执行之前无法启动下一个操作。在执行动作之间,当调度程序等待下一个时间戳时,调度程序必须处于 状态,asyncio.sleep()以便让其他协程轮流执行。当调度新的动作时,调度器应立即重新调整其等待时间,以便调度器始终等待最快的动作。当没有调度任何动作时,调度程序应该处于永久状态asyncio.sleep(),直到添加新动作。我的尝试:import asyncioimport timeclass Action:    def __init__(self, timestamp):        self.timestamp = timestamp    async def do(self):        print("Doing action...")class Scheduler:    def __init__(self):        self._actions = []        self._sleep_future = None    def add(self, action):        self._actions.append(action)        self._actions.sort(key=lambda x: x.timestamp)        if self._sleep_future:            self._sleep_future.cancel()    def pop(self):        return self._actions.pop(0)    async def start(self):        asyncio.create_task(self.loop())    async def loop(self):        while True:            now = time.time()                        while self._actions:                action = self._actions[0]                if action.timestamp <= now:                    action = self.pop()                                    await action.do()                                   else:                    break            self._sleep_future = asyncio.ensure_future(                asyncio.sleep(self._actions[0].timestamp - now)            )            try:                await self._sleep_future            except asyncio.CancelledError:                continue            finally:                self._sleep_future = None这个实现不可靠,并且没有考虑我寻求的条件(5)!你能给我推荐一些东西吗?
查看完整描述

1 回答

?
动漫人物

TA贡献1815条经验 获得超10个赞

asyncio 事件循环已经包含您尝试实现的代码 - 排序超时并等待任务提交。您需要使接口适应Scheduler底层 asyncio 功能,例如如下所示:


class Scheduler:

    def __init__(self):

        self._running = asyncio.Lock()


    async def start(self):

        pass  # asyncio event loop will do the actual work


    def add(self, action):

        loop = asyncio.get_event_loop()

        # Can't use `call_at()` because event loop time uses a

        # different timer than time.time().

        loop.call_later(

            action.timestamp - time.time(),

            loop.create_task, self._execute(action)

        )


    async def _execute(self, action):

        # Use a lock to ensure that no two actions run at

        # the same time.

        async with self._running:

            await action.do()


查看完整回答
反对 回复 2023-10-31
  • 1 回答
  • 0 关注
  • 156 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信