2016-11-03 6 views
2

My TaskB требует TaskA, а по завершении TaskA записывает в таблицу MySQL, а затем TaskB должен принимать этот вывод в таблицу в качестве своего ввода.Задачи MySQL в рабочем процессе Luigi

Я не могу понять, как это сделать в Луиджи. Может ли кто-нибудь указать мне пример или дать мне быстрый пример здесь?

ответ

5

Существующий MySqlTarget в luigi использует отдельную таблицу маркеров, чтобы указать, когда задача завершена. Вот грубый подход, который я бы взял ... но ваш вопрос очень абстрактный, поэтому в действительности он будет более сложным.

import luigi 
from datetime import datetime 
from luigi.contrib.mysqldb import MySqlTarget 


class TaskA(luigi.Task): 
    rundate = luigi.DateParameter(default=datetime.now().date()) 
    target_table = "table_to_update" 
    host = "localhost:3306" 
    db = "db_to_use" 
    user = "user_to_use" 
    pw = "pw_to_use" 

    def get_target(self): 
     return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table, 
          update_id=str(self.rundate)) 

    def requires(self): 
     return [] 

    def output(self): 
     return self.get_target() 

    def run(self): 
     #update table 
     self.get_target().touch() 


class TaskB(luigi.Task): 
    def requires(self): 
     return [TaskA()] 

    def run(self): 
     # reading from target_table 
+0

Для этого Matt. Это действительно помогает. У меня был один вопрос? означает ли это, что MySqlTarget отслеживает, какая строка обновляется с помощью update_id, которая является основным идентификатором строки. И в этом случае, если мои первичные идентификаторы являются автоинкрементами, что я делаю? –

+0

О, это сложно. Я думаю, вам придется использовать другое уникальное значение, помимо идентификатора autoincrement, как update_id. Это буквально работает ' "" "INSERT INTO {marker_table} (update_id, target_table) ЗНАЧЕНИЯ (% s,% s) ПО DUPLICATE KEY UPDATE update_id = ЗНАЧЕНИЯ (update_id) """ .format (marker_table = self.marker_table), (self.update_id, self.table) ' – MattMcKnight

+0

Итак, решение похоже, есть таблица обновлений, где регистрируются обновления рабочего процесса? Но я не хочу поддерживать таблицу для каждой задачи (и у меня много задач.) Итак, из вашей интерпретации sql того, что происходит, кажется, что я должен сделать это 'return MySqlTarget (host = self.host, database = self.db, user = self.user, password = self.pw, table = self.target_table, update_id = str (self.rundate), update_task_type = TASK_TYPE_A) ' –

 Смежные вопросы

  • Нет связанных вопросов^_^