2017-02-19 32 views
0

В соответствии с этим вопросом проблема решена, но похоже, что это не так. Setting Time Limit on specific task with celeryНастройка атрибутов сельдерея Задачи (i.e time_limit и soft_time_limit) не работают

Текущая версия сельдерея 3.1.18 (Cipater).

Я пытаюсь перезаписать настройки по умолчанию для задачи. Цель состоит в том, чтобы изменить softtimelimit и жесткий временной предел задачи, потому что одна и та же задача используется для нескольких целей.

Передача soft_time_limit и time_limit конструктору MyTask для изменения настроек по умолчанию.

///celery/app/ task.py 
class MyTask(task.Task): 
    time_limit = 100 
    soft_time_limit = 110 
    max_retries = 0 

def __init__(self, time_limit=None, soft_time_limit=None, 
      max_retries=None, *args, **kwargs): 
    if time_limit: 
     self.time_limit = time_limit 
    if soft_time_limit: 
     self.soft_time_limit = soft_time_limit 
    if max_retries: 
     self.max_retries = max_retries 
    task.Task.__init__(self, *args, **kwargs) 


t1 = MyTask(time_limit=30, soft_time_limit=20, 
     max_retries=5) 
or 

t1 = MyTask() 
t1.time_limit = 30 
t1.soft_time_limit = 20 

Затем пройти t1.si() к task.RetryableChain (...)

job = task.RetryableChain(...) 
job.delay() 

Когда метод запуска вызывается работником, он по-прежнему получает старое значение (time_limit = 100), где, как я установил time_limit = 30.

Пожалуйста, дайте мне знать, если проблема все еще существует в версии 3.1.18.

ответ

0

Мне пришлось исправить код сельдерея, чтобы он работал. Это, безусловно, временное решение, но оно работает. Я не уверен, когда атрибуты установлены с новыми значениями, то почему они не переносятся на worker.job. Я могу понять, что когда мы вызываем task.si или s(), он создает экземпляр Signature, который не содержит эти атрибуты time_limit, поэтому он берет из исходных значений, хранящихся в классе. Просто мысль.

t1 = MyTask() 
kwargs = {} 
kwargs['time_limit'] = 30 
kwargs['soft_time_limit'] = 40 

t.s(kwargs) 

---- >>> /celery/worker/job.py

def execute_using_pool(self, pool, **kwargs): 
    """Used by the worker to send this task to the pool. 

    :param pool: A :class:`celery.concurrency.base.TaskPool` instance. 

    :raises celery.exceptions.TaskRevokedError: if the task was revoked 
     and ignored. 

    """ 
    uuid = self.id 
    task = self.task 
    if self.revoked(): 
     raise TaskRevokedError(uuid) 

    hostname = self.hostname 
    kwargs = self.kwargs 
    if task.accept_magic_kwargs: 
     kwargs = self.extend_with_default_kwargs() 
    request = self.request_dict 
    request.update({'hostname': hostname, 'is_eager': False, 
        'delivery_info': self.delivery_info, 
        'group': self.request_dict.get('taskset')}) 
    timeout, soft_timeout = request.get('timelimit', (None, None)) 
    # timeout = timeout or task.time_limit 
    # soft_timeout = soft_timeout or task.soft_time_limit 
    **# SKAR request.get(‘time limit’) always returns the original value stored in Task. 
    timeout = kwargs.get('time_limit', task.time_limit) 
    soft_timeout = kwargs.get('soft_time_limit', task.soft_time_limit)** 
    result = pool.apply_async(
     trace_task_ret, 
     args=(self.name, uuid, self.args, kwargs, request), 
     accept_callback=self.on_accepted,