为了账号安全,请及时绑定邮箱和手机立即绑定

异步收集的顺序版本

异步收集的顺序版本

DIEA 2022-09-27 16:45:08
我试图创建一个类似于asyncio.gather的方法,但它将按顺序而不是异步执行任务列表:async def in_sequence(*tasks):    """Executes tasks in sequence"""    for task in tasks:        await task接下来,此方法应该按如下方式使用:async def some_work(work_name):    """Do some work"""    print(f"Start {work_name}")    await asyncio.sleep(1)    if raise_exception:        raise RuntimeError(f"{work_name} raise an exception")    print(f"Finish {work_name}")async def main():    try:        await asyncio.gather(            some_work("work1"),         # work1, work2, in_sequence and work5 executed in concurrently            some_work("work2"),            in_sequence(                some_work("work3"),     # work3 and work4 executed in sequence                some_work("work4")            ),            some_work("work5"),    except RuntimeError as error:        print(error)                    # raise an exception at any point to terminate一切都很好,直到我试图在some_work中抛出一个例外:async def main():    try:        await asyncio.gather(            some_work("work1"),            some_work("work2"),            in_sequence(                some_work("work3", raise_exception=True),       # raise an exception here                some_work("work4")            ),            some_work("work5"),    except RuntimeError as error:        print(error)之后,我立即收到以下错误消息:RuntimeWarning: coroutine 'some_work' was never awaited我阅读了文档并继续实验:async def in_sequence(*tasks):    """Executes tasks in sequence"""    _tasks = []    for task in tasks:        _tasks.append(asyncio.create_task(task))    for _task in _tasks:        await _task这个版本按预期工作!在这方面,我有下一个问题:为什么第二个版本有效,而第一个版本不起作用?asyncio是否已经拥有按顺序执行任务列表的工具?我是否选择了正确的实现方法,或者是否有更好的选择?
查看完整描述

4 回答

?
慕桂英4014372

TA贡献1871条经验 获得超13个赞

  1. 第一个版本不起作用,因为没有捕获可在 上引发的异常。第二个方法之所以有效,是因为创建了一个运行协程的类似未来的 Task 对象。该对象不会返回/传播包装协程的结果。当您使用该对象时,它将挂起,直到有结果异常,或者直到它被取消in_sequenceawait taskcreate_taskawait

  2. 似乎没有。

  3. 第二个版本将同时执行传递的协程,因此它是不正确的实现。如果你真的想使用一些功能,你可以:in_sequence

    • 以某种方式延迟协程的创建。

    • 对函数中的顺序执行进行分组async

例如:

async def in_sequence(*fn_and_args):

    for fn, args, kwargs in fn_and_args:

        await fn(*args, **kwargs)  # create a coro and await it in place


in_sequence(

    (some_work, ("work3",), {'raise_exception': True}),

    (some_work, ("work4",), {}),

)

async def in_sequence():

    await some_work("work3", raise_exception=True)

    await some_work("work4")


查看完整回答
反对 回复 2022-09-27
?
当年话下

TA贡献1890条经验 获得超9个赞

这个版本按预期工作!


第二个版本的问题在于,它实际上并没有按顺序运行协程,而是并行运行它们。这是因为调度协程与当前协程并行运行。因此,当您在循环中等待任务时,您实际上允许所有任务在等待第一个任务时运行。尽管外观如此,但整个循环的运行时间与最长的任务一样长。(有关更多详细信息,请参阅此处。asyncio.create_task()


第一个版本显示的警告旨在防止您意外创建从未等待过的协程,例如,仅编写 而不是 。就 asyncio 而言,是实例化协程对象并将它们传递给“忘记”等待其中一些对象。asyncio.sleep(1)await asyncio.sleep(1)mainin_sequence


禁止显示警告消息的一种方法是允许协程旋转,但立即取消它。例如:


async def in_sequence(*coros):

    remaining = iter(coros)

    for coro in remaining:

        try:

            await coro

        except Exception:

            for c in remaining:

                asyncio.create_task(c).cancel()

            raise

请注意,以下划线开头的变量名称标记未使用的变量,因此,如果您实际使用它们,则不应命名变量。


查看完整回答
反对 回复 2022-09-27
?
精慕HU

TA贡献1845条经验 获得超8个赞

从 user4815162342 和安东·波米申科的解决方案中汲取灵感,我想出了它的以下变体:


async def in_sequence(*storm):

    twister = iter(storm)

    for task in twister:

        task = task() # if it's a regular function, it's done here.

        if inspect.isawaitable(task):

            try:

                await task # if it's also awaitable, await it

            except BaseException as e:

                task.throw(e) # if an error occurs, throw it into the coroutine

            finally:

                task.close() # to ensure coroutine closer


    assert not any(twister) # optionally verify that the iterator is now empty


通过这种方式,您可以将常规函数与协程与此组合在一起。但一定要这样称呼它:in_sequence


await in_sequence(*[b.despawn, b.release])

请注意缺少 (),因为否则将立即调用常规函数,并且协程将引发 for 从未等待过。(是一个协程,不是我的例子)()__call__()RuntimeWarningb.despawnb.release


您也可以在调用 之前进行额外的检查,但这取决于您。callable(task)task()


查看完整回答
反对 回复 2022-09-27
?
白板的微信

TA贡献1883条经验 获得超3个赞

你说in_sequence的版本有效(asyncio.create_task),但我认为它没有。来自文档


将 coro 协程包装到任务中并计划其执行。返回任务对象。


它似乎并行运行协程,但您需要按顺序运行它们。


所以实验并找到了两种方法来解决这个问题


使用原始in_sequence函数并添加以下代码,以隐藏该错误:


import warnings

warnings.filterwarnings(

    'ignore',

    message=r'^coroutine .* was never awaited$',

    category=RuntimeWarning

)

修复in_sequence函数,如下所示:


async def in_sequence(*tasks):

    for index, task in enumerate(tasks):

        try:

            await task

        except Exception as e:

            for task in tasks[index + 1:]:

                task.close()

            raise e

其他问题的答案:


当您在协程上没有链接时,C++代码会触发该警告。只是简单的代码可以告诉你这个想法(在终端):

async def test():

    return 1


f = test()

f = None # after that you will get that error

我不知道

见上文


查看完整回答
反对 回复 2022-09-27
  • 4 回答
  • 0 关注
  • 91 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信