2012-04-17 3 views
37

Если я функция определена следующим образом:Как динамически добавлять/удалять периодические задачи сельдерея (celerybeat)

def add(x,y): 
    return x+y 

Есть ли способ, чтобы динамически добавлять эту функцию в качестве сельдерея PeriodicTask и пнуть его в во время выполнения? Я хотел бы быть в состоянии сделать что-то вроде (псевдокод):

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30")) 
celery.beat.start(some_unique_task_id) 

Я также хочу, чтобы остановить или удалить эту задачу динамически чем-то вроде (псевдокод):

celery.beat.remove_task(some_unique_task_id) 

или

celery.beat.stop(some_unique_task_id) 

FYI Я не использую djcelery, который позволяет вам управлять периодическими задачами с помощью администратора django.

ответ

18

Нет, извините, это невозможно с обычным сельдереем.

Но он легко расширяется, чтобы делать то, что вы хотите, например. django-celery scheduler - это просто подкласс, считывающий и записывающий расписание в базу данных (с некоторыми оптимизациями сверху).

Также вы можете использовать планировщик django-celery даже для проектов, отличных от Django.

Что-то вроде этого:

  • Установите Джанго + Джанго-сельдерей:

    $ ГПГ, установить -U-Джанго Джанго сельдерей

  • Добавьте следующие параметры в celeryconfig:

    DATABASES = { 
        'default': { 
         'NAME': 'celerybeat.db', 
         'ENGINE': 'django.db.backends.sqlite3', 
        }, 
    } 
    INSTALLED_APPS = ('djcelery',) 
    
  • Создать таблицы базы данных :

    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig 
    
  • Start celerybeat с планировщиком базы данных:

    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \ 
        -S djcelery.schedulers.DatabaseScheduler 
    

Также есть еще djcelerymon команда, которая может быть использована для не-Django проектов начать celerycam и веб-сервер Django администратора в в том же процессе вы можете использовать , чтобы также редактировать периодические задачи в хорошем веб-интерфейсе:

$ djcelerymon 

(Примечание для какой-то причине djcelerymon не может быть остановлена ​​с помощью Ctrl + C, вы должны использовать Ctrl + Z + убить% 1)

+1

Не могли бы вы упомянуть код, чтобы добавить задачу и удалить? Извините, я не понимаю. –

+6

Любые изменения в этом с 2012 по 2016 год? – Tanay

32

Этот вопрос был на google groups.

Я НЕ АВТОР, вся заслуга Жан Марк

Вот правильное решение для этого.Подтвержденная работа. В моем сценарии Я подклассифицировал Periodic Task и создал из нее модель, так как я могу добавить другие поля в модель, поскольку мне нужно, а также я мог бы добавить метод «прекращения» . Вы должны установить для свойства периодической задачи значение значение False и сохранить его перед удалением. Весь подкласс не является обязательным, метод schedule_every - тот, который действительно выполняет эту работу. Когда вы готовы прекратить выполнение задачи (если вы не указали ), вы можете просто использовать PeriodicTask.objects.filter (name = ...) для поиска вашей задачи, отключить , а затем удалить его.

Надеюсь, это поможет!

from djcelery.models import PeriodicTask, IntervalSchedule 
from datetime import datetime 

class TaskScheduler(models.Model): 

    periodic_task = models.ForeignKey(PeriodicTask) 

    @staticmethod 
    def schedule_every(task_name, period, every, args=None, kwargs=None): 
    """ schedules a task by name every "every" "period". So an example call would be: 
     TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
     that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """ 
     permissible_periods = ['days', 'hours', 'minutes', 'seconds'] 
     if period not in permissible_periods: 
      raise Exception('Invalid period specified') 
     # create the periodic task and the interval 
     ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task 
     interval_schedules = IntervalSchedule.objects.filter(period=period, every=every) 
     if interval_schedules: # just check if interval schedules exist like that already and reuse em 
      interval_schedule = interval_schedules[0] 
     else: # create a brand new interval schedule 
      interval_schedule = IntervalSchedule() 
      interval_schedule.every = every # should check to make sure this is a positive int 
      interval_schedule.period = period 
      interval_schedule.save() 
     ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule) 
     if args: 
      ptask.args = args 
     if kwargs: 
      ptask.kwargs = kwargs 
     ptask.save() 
     return TaskScheduler.objects.create(periodic_task=ptask) 

    def stop(self): 
     """pauses the task""" 
     ptask = self.periodic_task 
     ptask.enabled = False 
     ptask.save() 

    def start(self): 
     """starts the task""" 
     ptask = self.periodic_task 
     ptask.enabled = True 
     ptask.save() 

    def terminate(self): 
     self.stop() 
     ptask = self.periodic_task 
     self.delete() 
     ptask.delete() 
+1

Это должен быть принятый ответ. – kai

+1

@kai 'IntervalSchedule',' PeriodicTask' и т. Д. Являются классами 'djcelery', а OP говорит, что он не использует' djcelery'. Определенно полезно, тем не менее. – Chris

2

Вы можете проверить это flask-djcelery, который конфигурирует флягу и djcelery, а также обеспечивает просматриваться отдых апи

2

Существует библиотека называется Джанго-сельдерей-бит, который обеспечивает модели нужно. Чтобы динамически загружать новые периодические задачи, нужно создать собственный планировщик.

from django_celery_beat.schedulers import DatabaseScheduler 


class AutoUpdateScheduler(DatabaseScheduler): 

    def tick(self, *args, **kwargs): 
     if self.schedule_changed(): 
      print('resetting heap') 
      self.sync() 
      self._heap = None 
      new_schedule = self.all_as_schedule() 

      if new_schedule: 
       to_add = new_schedule.keys() - self.schedule.keys() 
       to_remove = self.schedule.keys() - new_schedule.keys() 
       for key in to_add: 
        self.schedule[key] = new_schedule[key] 
       for key in to_remove: 
        del self.schedule[key] 

     super(AutoUpdateScheduler, self).tick(*args, **kwargs) 

    @property 
    def schedule(self): 
     if not self._initial_read and not self._schedule: 
      self._initial_read = True 
      self._schedule = self.all_as_schedule() 

     return self._schedule 
+0

Спасибо. Не работает сразу, но используя 'to_add = [ключ для ключа в new_schedule.keys(), если ключ не в self.schedule.keys()]' и аналогичный для 'to_remove' сделал трюк. Почему это не стандартный вариант? До сих пор мне приходилось сталкиваться с задачами Сельдери, которые обращаются к другим задачам сельдерея с обратным отсчетом. Это звучит не очень хорошо для меня. – freethebees