为了账号安全,请及时绑定邮箱和手机立即绑定

如何向Celery动态添加/删除定期任务(celerybeat)

如何向Celery动态添加/删除定期任务(celerybeat)

牛魔王的故事 2019-11-27 14:37:38
如果我有一个定义如下的函数:def add(x,y):  return x+y有没有办法将该函数动态添加为芹菜PeriodicTask并在运行时启动它?我希望能够做类似(伪代码)的事情:some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))celery.beat.start(some_unique_task_id)我还想用(pseudocode)之类的东西动态地停止或删除该任务:celery.beat.remove_task(some_unique_task_id)要么celery.beat.stop(some_unique_task_id)仅供参考,我没有使用djcelery,它使您可以通过django管理员管理定期任务
查看完整描述

3 回答

?
达令说

TA贡献1821条经验 获得超6个赞

这个问题在google网上论坛上得到了回答。


我不是作者,所有功劳归Jean Jean


这是一个适当的解决方案。确认可以正常工作,在我的场景中,我将“定期任务”子类化,并根据它创建了一个模型,因为我可以根据需要向模型中添加其他字段,也可以添加“终止”方法。您必须将定期任务的enabled属性设置为False并保存,然后再将其删除。整个子类不是必须的,schedule_every方法是真正起作用的方法。当您准备终止任务时(如果您没有将其子类化),您可以使用PeriodicTask.objects.filter(name = ...)搜索任务,将其禁用,然后将其删除。


希望这可以帮助!


from djcelery.models import PeriodicTask, IntervalSchedule

from datetime import datetime


class TaskScheduler(models.Model):


    periodic_task = models.ForeignKey(PeriodicTask)


    @staticmethod

    def schedule_every(task_name, period, every, args=None, kwargs=None):

    """ schedules a task by name every "every" "period". So an example call would be:

         TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 

         that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 

    """

        permissible_periods = ['days', 'hours', 'minutes', 'seconds']

        if period not in permissible_periods:

            raise Exception('Invalid period specified')

        # create the periodic task and the interval

        ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task

        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)

        if interval_schedules: # just check if interval schedules exist like that already and reuse em

            interval_schedule = interval_schedules[0]

        else: # create a brand new interval schedule

            interval_schedule = IntervalSchedule()

            interval_schedule.every = every # should check to make sure this is a positive int

            interval_schedule.period = period 

            interval_schedule.save()

        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)

        if args:

            ptask.args = args

        if kwargs:

            ptask.kwargs = kwargs

        ptask.save()

        return TaskScheduler.objects.create(periodic_task=ptask)


    def stop(self):

        """pauses the task"""

        ptask = self.periodic_task

        ptask.enabled = False

        ptask.save()


    def start(self):

        """starts the task"""

        ptask = self.periodic_task

        ptask.enabled = True

        ptask.save()


    def terminate(self):

        self.stop()

        ptask = self.periodic_task

        self.delete()

        ptask.delete()


查看完整回答
反对 回复 2019-11-27
?
至尊宝的传说

TA贡献1789条经验 获得超10个赞

有一个名为django-celery-beat的库,它提供了所需的模型。为了使其能够动态加载新的定期任务,必须创建自己的Scheduler。


from django_celery_beat.schedulers import DatabaseScheduler



class AutoUpdateScheduler(DatabaseScheduler):


    def tick(self, *args, **kwargs):

        if self.schedule_changed():

            print('resetting heap')

            self.sync()

            self._heap = None

            new_schedule = self.all_as_schedule()


            if new_schedule:

                to_add = new_schedule.keys() - self.schedule.keys()

                to_remove = self.schedule.keys() - new_schedule.keys()

                for key in to_add:

                    self.schedule[key] = new_schedule[key]

                for key in to_remove:

                    del self.schedule[key]


        super(AutoUpdateScheduler, self).tick(*args, **kwargs)


    @property

    def schedule(self):

        if not self._initial_read and not self._schedule:

            self._initial_read = True

            self._schedule = self.all_as_schedule()


        return self._s


查看完整回答
反对 回复 2019-11-27
  • 3 回答
  • 0 关注
  • 4128 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信