Каким образом Луиджи выполняет методы (запускать, выводить, требует). Я понимаю, что требование запускается как первая проверка для проверки действительности DAG задачи, но не следует выводить вывод после запуска()?Порядок выполнения задач задачи Luigi
Я на самом деле пытаюсь дождаться сообщения кафки в запуске и на основе этого запуска кучу других задач и вернуть LocalTarget. Как это:
def run(self):
for message in self.consumer:
self.metadata_key = str(message.value, 'utf-8')
self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
if not os.path.exists(self.path):
os.mkdir(self.path)
with self.conn.cursor() as cursor:
all_accounts = cursor.execute('select domainname from tblaccountinfo;')
for each in all_accounts:
open(os.path.join(self.path,each)).close()
def output(self):
return LocalTarget(self.path)
Однако, я получаю сообщение об ошибке сказав:
Исключение: путь или is_tmp должен быть установлен
В возвратного LocalTarget (self.path) линии , Почему luigi пытается выполнить метод def output() до выполнения def run()?