为了账号安全,请及时绑定邮箱和手机立即绑定

Queue.asyncio ValueError: task_done() 调用了太多次

Queue.asyncio ValueError: task_done() 调用了太多次

qq_笑_17 2021-08-11 22:09:41
我实现了一段代码,它从一个队列中获取一个元素,并将相同的对象从队列列表中放入每个队列中。问题是,当我运行特定测试时,出现ValueError: task_done() called too many times异常。这个错误发生在测试代码中,而不是在被测试的代码中。我正在asyncio.Queue使用协程和编程。我将每Queue.get一个都与一个准确的Queue.task_done电话相匹配。我正在用pytest测试代码。我正在使用以下库:蟒蛇 3.7pytest==3.10.0pytest-asyncio==0.9.0我有两个文件:middleware.py包含我的类实现和test_middleware.py实现pytest测试。文件middlware.py:import asyncioclass DistributorMiddleware:    def __init__(self, in_queue, out_list_queue):        self._in = in_queue        self._out = out_list_queue    async def distribute(self):        while True:            ele = await self._in.get()            count=0            for queue in self._out:                await queue.put(ele)                count+=1                print(f'inserted ele in {count}')            queue.task_done()            if ele == None:                break        for queue in self._out:            await queue.join()文件test_middleware.py:import pytestimport asyncio                from asyncio import Queuefrom middleware import DistributorMiddlewareimport randomimport os@pytest.mark.asyncio                                                                                     async def test_distribution(request, event_loop):                                                            q_list = [ Queue() for _ in range(10) ]                                                                  _in = Queue()    distrib = DistributorMiddleware(_in, q_list)                                                             event_loop.create_task(distrib.distribute())                                                             num_ele = random.randint(1, 10)    ele_set = set()    for _ in range(num_ele):                                                                                     ele = os.urandom(4)                                                                                      ele_set.add(ele)        await _in.put(ele)    await _in.put(None)                                                                                  
查看完整描述

1 回答

?
达令说

TA贡献1821条经验 获得超6个赞

您的代码中有错误。实际上,queue.task_done()应该只在从队列中取出元素时调用,而不是在将它们放入队列时调用。


但是您的中间件类正在它刚刚使用的队列上调用它.put(),用于self._out列表中的最后一个队列;queue.task_done()从DistributorMiddleware.distribute()以下位置删除呼叫:


async def distribute(self):


    while True:

        ele = await self._in.get()

        count=0

        for queue in self._out:

            await queue.put(ele)

            count+=1

            print(f'inserted ele in {count}')

        queue.task_done()

        # ^^^^^ you didn't take anything from the queue here!

当您删除该行时,您的测试就通过了。


您在测试中看到异常的原因是因为只有这样队列才知道task_done()被调用得太频繁了。该queue.task_done()呼叫DistributorMiddleware.distribute()减1,未完成的任务计数器,但只有当该计数器下降到低于零能的异常进行检测。只有当最后一个任务从 中的队列中取出时,您才会到达那个点test_distribution(),此时未完成的任务计数器至少提前一步达到 0。


也许那是为了改为调用self._in.task_done()?您只是在该while循环中从该队列中获取了一个元素:


async def distribute(self):


    while True:

        ele = await self._in.get()

        # getting an element from self._in

        count=0

        for queue in self._out:

            await queue.put(ele)

            count+=1

            print(f'inserted ele in {count}')

        self._in.task_done()

        # done with ele, so decrement the self._in unfinished tasks counter


查看完整回答
反对 回复 2021-08-11
  • 1 回答
  • 0 关注
  • 785 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号