我正在尝试在Django的单元测试框架中测试一些celery功能,但是每当我尝试检查AsyncResult时,测试的行为就好像从未启动过一样。我知道这段代码可以在RabbitMQ的真实环境中工作,所以我只是想知道为什么在使用测试框架时它不起作用。这是一个例子:@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS = True, CELERY_ALWAYS_EAGER = True, BROKER_BACKEND = 'memory',)def test_celery_do_work(self): result = myapp.tasks.celery_do_work.AsyncResult('blat') applied_task = myapp.tasks.celery_do_work.apply_async((), task_id='blat') applied_task.wait() # THIS SUCCEEDS self.assertTrue(applied_task.successful()) # THIS FAILS self.assertTrue(result.successful())使用ALWAYS_EAGER选项是否会因为立即执行而禁用AsyncResult功能?如果是这样,有什么方法可以对AsyncResult状态检查进行单元测试?如果我尝试取出ALWAYS_EAGER选项,则测试将永远不会运行,因此我一头雾水。
1 回答

哈士奇WWW
TA贡献1799条经验 获得超6个赞
如果CELERY_ALWAYS_EAGER
为is True
,则对的呼叫apply_async()
实际上被替换为apply()
。返回的结果是EagerResult
,其中已经包含任务的结果。
因此,是的,设置ALWAYS_EAGER = True
将禁用整个AsyncResult
功能。整个异步过程将被绕过,并且实际上没有任何任务发送到代理,这就是为什么您无法通过检索结果的原因AsyncResult
。
CELERY_ALWAYS_EAGER = True
在测试仅需要Celery结果的代码路径时使用,并与EagerResult
或一起使用相同的方式AsyncResult
。
如果需要,还有运行与测试的方式AsyncResult
也与CELERY_ALWAYS_EAGER = False
,但对于这一点,你需要调用任务在你的测试用例之前启动工作。然后,工作人员将能够执行您的任务,并且AsyncResult
可以正常工作。您可以看一下django-celery-testworker,它似乎就是这样做的,尽管我尚未对其进行测试。
添加回答
举报
0/150
提交
取消