2016-06-22 3 views
0

Я использую Luigi для выполнения нескольких задач, а затем мне нужно перенаправить вывод в стандартизованное расположение файла. Я написал WrapperTask с перегруженной complete() способом, чтобы сделать это:Retry .complete() для WrapperTask

from luigi.task import flatten 

class TaskX(luigi.WrapperTask): 
    date = luigi.DateParameter() 
    client = luigi.s3.S3Client() 

    def requires(self): 
     yield TaskA(date=self.date)  
     yield TaskB(date=self.date) 

    def complete(self): 
     tasks_complete = all(r.complete() for r in flatten(self.requires())) 

     ## at the end of everything, batch copy the files 
     if tasks_complete: 
      self.client.copy('current-old', 'current') 
      return True 
     else: 
      return False 


if __name__ == "__main__": 
    luigi.run() 

но у меня возникают проблемы при получении условную часть complete() будет вызвана, когда процесс фактически завершен.

Я предполагаю, что это из-за asynchronous behavior, на что указывают другие, но я не уверен, как это исправить.

Я попытался запустить Луиджи с этими параметрами командной строки:

$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task 

Но не кажется, что работает правильно. Правильно ли это подходит для решения этой задачи?

Кроме того, мне любопытно - кто-нибудь имел опыт работы с командой --worker-retry-external-task? У меня проблемы с пониманием.

В source code,

def _is_external(task): 
    return task.run is None or task.run == NotImplemented 

вызывается, чтобы определить, имеет ли или нет LuigiTask в run() метод, которым WrapperTask не делает. Таким образом, я ожидал бы, что флаг --retry-external-task повторит complete() для этого, пока он не будет завершен, тем самым выполняя действие. Тем не менее, просто играя в интерпретатором приводит меня к мысли, что:

>>> import luigi_newsletter_process 
>>> task = luigi_newsletter_process.Newsletter() 
>>> task.run 
    <bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)> 
>>> task.run() 
>>> task.run == None 
False 
>>> task.run() == None 
True 

Этот фрагмент кода не делает то, что он думает, что это.

Я здесь вне базы?

ответ

0

Я до сих пор считаю, что переопределение .complete() теоретически могло бы это сделать, и я до сих пор не уверен, почему это не так, но если вы просто ищете способ передачи больших файлов после запуска процесса , практическое решение состоит в том, чтобы передача осуществлялась в рамках метода .run():

def run(self): 
    logger.info('transferring into current directory') 
    self.client.copy('current-old','current')