我有多个测试文件,每个文件都有一个异步夹具,如下所示:@pytest.fixture(scope="module")def event_loop(request): loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close()@pytest.fixture(scope="module")async def some_fixture(): return await make_fixture()我正在使用 xdist 进行并行化。另外我有这个装饰器:@toolz.currydef throttle(limit, f): semaphore = asyncio.Semaphore(limit) @functools.wraps(f) async def wrapped(*args, **kwargs): async with semaphore: return await f(*args, **kwargs) return wrapped我有一个函数使用它:@throttle(10)def f(): ...现在f正在从多个测试文件调用,并且我收到一个异常,告诉我无法使用不同事件循环中的信号量。我尝试转向会话级事件循环装置:@pytest.fixture(scope="session", autouse=True)def event_loop(request): loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close()但这只给了我:ScopeMismatch:您尝试使用“模块”范围请求对象访问“函数”范围固定装置“event_loop”,涉及工厂是否有可能让 xdist + 异步装置 + 信号量一起工作?
1 回答
胡子哥哥
TA贡献1825条经验 获得超6个赞
最终使用以下方法让它工作conftest.py:
import asyncio
import pytest
@pytest.fixture(scope="session")
def event_loop():
return asyncio.get_event_loop()
添加回答
举报
0/150
提交
取消