为了账号安全,请及时绑定邮箱和手机立即绑定

在芹菜中对AsyncResult进行单元测试

在芹菜中对AsyncResult进行单元测试

慕妹3242003 2021-03-29 13:11:14
我正在尝试在Django的单元测试框架中测试一些celery功能,但是每当我尝试检查AsyncResult时,测试的行为就好像从未启动过一样。我知道这段代码可以在RabbitMQ的真实环境中工作,所以我只是想知道为什么在使用测试框架时它不起作用。这是一个例子:@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS = True,                   CELERY_ALWAYS_EAGER = True,                   BROKER_BACKEND = 'memory',)def test_celery_do_work(self):    result = myapp.tasks.celery_do_work.AsyncResult('blat')    applied_task = myapp.tasks.celery_do_work.apply_async((), task_id='blat')    applied_task.wait()    # THIS SUCCEEDS    self.assertTrue(applied_task.successful())    # THIS FAILS    self.assertTrue(result.successful())使用ALWAYS_EAGER选项是否会因为立即执行而禁用AsyncResult功能?如果是这样,有什么方法可以对AsyncResult状态检查进行单元测试?如果我尝试取出ALWAYS_EAGER选项,则测试将永远不会运行,因此我一头雾水。
查看完整描述

1 回答

?
哈士奇WWW

TA贡献1799条经验 获得超6个赞

如果CELERY_ALWAYS_EAGER为is True,则对的呼叫apply_async()实际上被替换为apply()。返回的结果是EagerResult,其中已经包含任务的结果。

因此,是的,设置ALWAYS_EAGER = True将禁用整个AsyncResult功能。整个异步过程将被绕过,并且实际上没有任何任务发送到代理,这就是为什么您无法通过检索结果的原因AsyncResult

CELERY_ALWAYS_EAGER = True在测试仅需要Celery结果的代码路径时使用,并与EagerResult或一起使用相同的方式AsyncResult

如果需要,还有运行与测试的方式AsyncResult也与CELERY_ALWAYS_EAGER = False,但对于这一点,你需要调用任务在你的测试用例之前启动工作。然后,工作人员将能够执行您的任务,并且AsyncResult可以正常工作。您可以看一下django-celery-testworker,它似乎就是这样做的,尽管我尚未对其进行测试。


查看完整回答
反对 回复 2021-04-02
  • 1 回答
  • 0 关注
  • 186 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号