2016-12-29 8 views
1

Каким образом Луиджи выполняет методы (запускать, выводить, требует). Я понимаю, что требование запускается как первая проверка для проверки действительности DAG задачи, но не следует выводить вывод после запуска()?Порядок выполнения задач задачи Luigi

Я на самом деле пытаюсь дождаться сообщения кафки в запуске и на основе этого запуска кучу других задач и вернуть LocalTarget. Как это:

def run(self): 
    for message in self.consumer: 
     self.metadata_key = str(message.value, 'utf-8') 
     self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id) 
     if not os.path.exists(self.path): 
      os.mkdir(self.path) 

     with self.conn.cursor() as cursor: 
       all_accounts = cursor.execute('select domainname from tblaccountinfo;') 
     for each in all_accounts: 
      open(os.path.join(self.path,each)).close() 

def output(self): 
    return LocalTarget(self.path) 

Однако, я получаю сообщение об ошибке сказав:

Исключение: путь или is_tmp должен быть установлен

В возвратного LocalTarget (self.path) линии , Почему luigi пытается выполнить метод def output() до выполнения def run()?

ответ

2

Когда вы запускаете конвейер (то есть одну или несколько задач), Луиджи сначала проверяет, существуют ли его выходные цели, а если нет, запланировано выполнение задачи.

Как Луиджи знает, какие цели он должен проверить? Это просто вызывает вызов метода output() вашей задачи.