为了账号安全,请及时绑定邮箱和手机立即绑定

如何按名称从 Celery 队列中删除任务

如何按名称从 Celery 队列中删除任务

杨__羊羊 2023-06-27 17:31:57
我正在尝试找到一种方法来从 Celery 队列中删除所有当前排队的具有特定给定名称的任务。从官方文件中,我知道我可以通过查找工人的姓名然后获取他们的 ID 来检查工人并撤销任务,例如:def drop_celery_task(options):    def _get_tasks_id(workers: list, tasks_ids: list, task_name: str):        """        Get task ids with the given name included inside the given `workers` tasks.        {'worker1.example.com': [             {'name': 'tasks.sleeptask', 'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',              'args': '(8,)', 'kwargs': '{}'}]        }        """        for worker in workers:            if not workers[worker]:                continue            for _task in workers[worker]:                if _task["name"].split(".")[-1] == task_name:                    tasks_ids.append(_task["id"])    task_name = options.drop_celery_task["name"]    i = Inspect(app=celery_app)  # Inspect all nodes.    registered = i.registered()    if not registered:        raise Exception("No registered tasks found")    if not any(task_name == worker.split(".")[-1] for worker in chain(*list(registered.values()))):        raise Exception(f"Task not registered: {task_name}")    tasks_ids = []    _get_tasks_id(i.active(), tasks_ids, task_name)    _get_tasks_id(i.scheduled(), tasks_ids, task_name)    _get_tasks_id(i.reserved(), tasks_ids, task_name)    if tasks_ids:        for task_id in tasks_ids:            Control(app=celery_app).revoke(task_id)    else:        logging.info(f"No active/scheduled/registered task found with the name {task_name}")但此代码仅撤销 celery 工作人员获取或预取的任务,而不是仍在队列中的任务(使用 Redis 作为后端)。关于如何使用 celery 命令删除 Redis 中的任务,或阻止工作人员接受具有给定名称的任务,有什么建议吗?
查看完整描述

1 回答

?
长风秋雁

TA贡献1757条经验 获得超7个赞

我最终在 Redis 中使用我想要的名称标识了任务的 ID(使用 Redis 客户端,而不是 celery 命令),然后通过命令撤销这些 ID Control(app=celery_app).revoke(task_id)。在 Redis 中,队列是带有队列名称的键下的列表对象。



查看完整回答
反对 回复 2023-06-27
  • 1 回答
  • 0 关注
  • 336 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信