1 回答

TA贡献1821条经验 获得超6个赞
您的代码中有错误。实际上,queue.task_done()应该只在从队列中取出元素时调用,而不是在将它们放入队列时调用。
但是您的中间件类正在它刚刚使用的队列上调用它.put(),用于self._out列表中的最后一个队列;queue.task_done()从DistributorMiddleware.distribute()以下位置删除呼叫:
async def distribute(self):
while True:
ele = await self._in.get()
count=0
for queue in self._out:
await queue.put(ele)
count+=1
print(f'inserted ele in {count}')
queue.task_done()
# ^^^^^ you didn't take anything from the queue here!
当您删除该行时,您的测试就通过了。
您在测试中看到异常的原因是因为只有这样队列才知道task_done()被调用得太频繁了。该queue.task_done()呼叫DistributorMiddleware.distribute()减1,未完成的任务计数器,但只有当该计数器下降到低于零能的异常进行检测。只有当最后一个任务从 中的队列中取出时,您才会到达那个点test_distribution(),此时未完成的任务计数器至少提前一步达到 0。
也许那是为了改为调用self._in.task_done()?您只是在该while循环中从该队列中获取了一个元素:
async def distribute(self):
while True:
ele = await self._in.get()
# getting an element from self._in
count=0
for queue in self._out:
await queue.put(ele)
count+=1
print(f'inserted ele in {count}')
self._in.task_done()
# done with ele, so decrement the self._in unfinished tasks counter
添加回答
举报