Я использую Luigi для выполнения нескольких задач, а затем мне нужно перенаправить вывод в стандартизованное расположение файла. Я написал WrapperTask с перегруженной complete()
способом, чтобы сделать это:Retry .complete() для WrapperTask
from luigi.task import flatten
class TaskX(luigi.WrapperTask):
date = luigi.DateParameter()
client = luigi.s3.S3Client()
def requires(self):
yield TaskA(date=self.date)
yield TaskB(date=self.date)
def complete(self):
tasks_complete = all(r.complete() for r in flatten(self.requires()))
## at the end of everything, batch copy the files
if tasks_complete:
self.client.copy('current-old', 'current')
return True
else:
return False
if __name__ == "__main__":
luigi.run()
но у меня возникают проблемы при получении условную часть complete()
будет вызвана, когда процесс фактически завершен.
Я предполагаю, что это из-за asynchronous behavior, на что указывают другие, но я не уверен, как это исправить.
Я попытался запустить Луиджи с этими параметрами командной строки:
$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task
Но не кажется, что работает правильно. Правильно ли это подходит для решения этой задачи?
Кроме того, мне любопытно - кто-нибудь имел опыт работы с командой --worker-retry-external-task
? У меня проблемы с пониманием.
В source code,
def _is_external(task):
return task.run is None or task.run == NotImplemented
вызывается, чтобы определить, имеет ли или нет LuigiTask в run()
метод, которым WrapperTask
не делает. Таким образом, я ожидал бы, что флаг --retry-external-task
повторит complete()
для этого, пока он не будет завершен, тем самым выполняя действие. Тем не менее, просто играя в интерпретатором приводит меня к мысли, что:
>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
<bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True
Этот фрагмент кода не делает то, что он думает, что это.
Я здесь вне базы?