为了账号安全,请及时绑定邮箱和手机立即绑定

用于迁移现有代码库的线程中的 Python asyncio

用于迁移现有代码库的线程中的 Python asyncio

偶然的你 2023-05-09 15:36:55
我们有一个相当大的项目,它正在做大量的网络(API 调用、Websocket 消息),并且还有很多内部作业在线程中间隔运行。我们当前的架构涉及生成大量线程,当系统负载很大时,应用程序无法正常运行,因此我们决定尝试使用 asyncio。我知道最好的方法是将整个代码库迁移到异步代码,但由于代码库的大小和有限的开发资源,这在不久的将来是不现实的。但是,我们希望开始迁移部分代码库以使用 asyncio 事件循环,并希望我们能够在某个时候转换整个项目。到目前为止我们遇到的问题是整个代码库都有同步代码,为了在里面添加非阻塞异步代码,代码需要在不同的线程中运行,因为你不能真正在同一个线程中运行异步和同步代码线。为了结合异步和同步代码,我想出了这种在应用程序启动时创建的单独线程中运行异步代码的方法。代码的其他部分只需调用 add_asyncio_task 即可将作业添加到此循环中。import threadingimport asyncio_tasks = []def threaded_loop(loop):    asyncio.set_event_loop(loop)    global _tasks    while True:        if len(_tasks) > 0:            # create a copy of needed tasks            needed_tasks = _tasks.copy()            # flush current tasks so that next tasks can be easily added            _tasks = []            # run tasks            task_group = asyncio.gather(*needed_tasks)            loop.run_until_complete(task_group)def add_asyncio_task(task):    _tasks.append(task)def start_asyncio_loop():    loop = asyncio.get_event_loop()    t = threading.Thread(target=threaded_loop, args=(loop,))    t.start()在 app.py 的某处:start_asyncio_loop()以及代码中的其他任何地方:add_asyncio_task(some_coroutine)由于我是 asyncio 的新手,我想知道在我们的情况下这是否是一种好方法,或者这种方法是否被认为是一种反模式并且有一些问题会在以后的道路上打击我们?或者也许 asyncio 已经有了一些解决方案,而我只是想在这里发明轮子?感谢您的投入!
查看完整描述

1 回答

?
慕后森

TA贡献1802条经验 获得超5个赞

这种方法总体上很好。你有一些问题:

(1)几乎所有的asyncio对象都不是线程安全的

(2) 您的代码本身不是线程安全的。如果任务出现在之后needed_tasks = _tasks.copy()但之前怎么办_tasks = []?这里需要一把锁。顺便说一句,复制是没有意义的。简单needed_tasks = _tasks就行了。

(3) 一些 asyncio 结构是线程安全的。使用它们:

import threading

import asyncio


# asyncio.get_event_loop() creates a new loop per thread. Keep

# a single reference to the main loop. You can even try

#   _loop = asyncio.new_event_loop()

_loop = asyncio.get_event_loop()


def get_app_loop():

    return _loop


def asyncio_thread():

    loop = get_app_loop()

    asyncio.set_event_loop(loop)

    loop.run_forever()


def add_asyncio_task(task):

    asyncio.run_coroutine_threadsafe(task, get_app_loop())


def start_asyncio_loop():

    t = threading.Thread(target=asyncio_thread)

    t.start()


查看完整回答
反对 回复 2023-05-09
  • 1 回答
  • 0 关注
  • 98 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信