2016-07-03 4 views
1

У меня длинный конвейер, выполненный с сопрограммами. Обычно это выборка потоков журналов, выполнение некоторого обогащения (threaded) и запись их в хранилище данных.Как запускать несколько параллельных сопроцессов ввода-вывода параллельно

Вот небольшой пример для имитации трубопровода:

import time 
import random 
from concurrent import futures 

def coroutine(func): 
    def start(*args, **kwargs): 
     cr = func(*args, **kwargs) 
     next(cr) 
     return cr 
    return start 

@coroutine 
def foo(): 
    pool = futures.ThreadPoolExecutor(max_workers=10) 
    while True: 
     i = (yield) 
     fut = pool.submit(enrich, i) 
     fut.add_done_callback(result_handler) 
     time.sleep(random.random()*10) 

def enrich(i): 
    enriched = 'foo' + str(i) 
    time.sleep(random.random()) 
    return enriched 

def source(name, target): 
    while True: 
     time.sleep(random.random()) 
     i = random.randint(0,10) 
     target.send(name + str(i)) 

Единого трубопровод вызывается следующим образом работает отлично.

source('task one ', foo()) 

Теперь я хотел бы запустить несколько конвейеров для разных журналов в фоновом потоке. Одна попытка - снова использовать ThreadPoolExecutor для управления несколькими конвейерами.

def run(): 
    pool = futures.ThreadPoolExecutor(max_workers=10) 
    tasks = [source('task one ', foo()), 
      source('task two ', foo())] 
    for task in tasks: 
     fut = pool.submit(task) 
     fut.add_done_callback(result_handler) 

Однако трубопровод блокирует первую задачу и никогда не выполняет вторую задачу. Каков правильный способ запуска таких длинных (возможно, вечно) конвейеров в фоновом потоке?

+1

В 'run' функции вы передаете результат' источника ('задача одного', Foo() 'в' pool.submit (задача) '. Он должен быть' pool.submit (источник, «задача одна ', foo()) ' –

+0

Или,' tasks = [(source,' task one ', foo()), (source,' task two ', foo())] '. Затем' pool.submit ('* task) ' –

+0

Отлично! Это решает проблему моделирования, которую я представил здесь. Фактический конвейер довольно запутан, и я все еще получаю исключение обратного вызова, которое мне нужно будет проверить. Спасибо за быстрый ответ! – maiaini

ответ

2

Потому что функция source бесконечна, список tasks = [source('task one ', foo()), source('task two ', foo())] не создан. Вот почему запускается первая задача и блоки pipelne.
Решение должно пройти source и его аргументы pool.submit.

tasks = [(source, 'task one', foo()), (source, 'task two', foo())] 
for task in tasks: 
    fut = pool.submit(*task) 
    fut.add_done_callback(result_handler)