为了账号安全,请及时绑定邮箱和手机立即绑定

调用 thread.join() 是否会在异步上下文中阻止事件循环?

调用 thread.join() 是否会在异步上下文中阻止事件循环?

波斯汪 2021-08-05 18:22:36
我正在使用aiohttp实现一个 Web API ,使用启用了 UVloop 的 gunicorn部署--worker-class aiohttp.GunicornUVLoopWebWorker。因此,我的代码总是在异步上下文中运行。我有在处理请求时实现并行作业以获得更好性能的想法。我没有使用,asyncio因为我想要Parallelism,而不是Concurrency。我知道python 中的多处理和GIL 问题。但加入流程也适用于我的问题。下面是一个例子:from aiohttp.web import middleware@middlewareasync def context_init(request, handler):    request.context = {}    request.context['threads'] = []    ret = await handler(request)    for thread in request.context['threads']:        thread.join()    return ret考虑到thread.join()或process.join()阻塞当前线程,这将阻塞事件循环(据我所知)。如何异步加入?我想要的可以形象地表示为:await thread.join()或await process.join()。更新:感谢@user4815162342,我能够为我的项目编写正确的代码:中间件:from aiohttp.web import middlewarefrom util.process_session import ProcessSession@middlewareasync def context_init(request, handler):    request.context = {}    request.context['process_session'] = ProcessSession()    request.context['processes'] = {}    ret = await handler(request)    await request.context['process_session'].wait_for_all()    return ret用途:import asyncioimport concurrent.futuresfrom functools import partialclass ProcessSession():    def __init__(self):        self.loop = asyncio.get_running_loop()        self.pool = concurrent.futures.ProcessPoolExecutor()        self.futures = []    async def wait_for_all(self):        await asyncio.wait(self.futures)    def add_process(self, f, *args, **kwargs):        ret = self.loop.run_in_executor(self.pool, partial(f, *args, **kwargs))        self.futures.append(ret)        return retclass ProcessBase():    def __init__(self, process_session, f, *args, **kwargs):        self.future = process_session.add_process(f, *args, **kwargs)    async def wait(self):        await asyncio.wait([self.future])        return self.future.result()
查看完整描述

2 回答

?
HUWWW

TA贡献1874条经验 获得超12个赞

回答您的问题:是的,它确实阻止了事件循环。


我发现这ThreadPoolExecutor在这种情况下效果很好。


from util.process_session import ProcessSession

from concurrent.futures.thread import ThreadPoolExecutor

import asyncio


from aiohttp.web import middleware


@middleware

async def context_init(request, handler):

    request.context = {}

    request.context['threads'] = []

    ret = await handler(request)

    with ThreadPoolExecutor(1) as executor:

           await asyncio.get_event_loop().run_in_executor(executor, 

           functools.partial(join_threads, data={

             'threads': request.context['threads']

           }))

    return ret


def join_threads(threads):

    for t in threads:

        t.join()


查看完整回答
反对 回复 2021-08-05
  • 2 回答
  • 0 关注
  • 155 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信