У меня длинный конвейер, выполненный с сопрограммами. Обычно это выборка потоков журналов, выполнение некоторого обогащения (threaded) и запись их в хранилище данных.Как запускать несколько параллельных сопроцессов ввода-вывода параллельно
Вот небольшой пример для имитации трубопровода:
import time
import random
from concurrent import futures
def coroutine(func):
def start(*args, **kwargs):
cr = func(*args, **kwargs)
next(cr)
return cr
return start
@coroutine
def foo():
pool = futures.ThreadPoolExecutor(max_workers=10)
while True:
i = (yield)
fut = pool.submit(enrich, i)
fut.add_done_callback(result_handler)
time.sleep(random.random()*10)
def enrich(i):
enriched = 'foo' + str(i)
time.sleep(random.random())
return enriched
def source(name, target):
while True:
time.sleep(random.random())
i = random.randint(0,10)
target.send(name + str(i))
Единого трубопровод вызывается следующим образом работает отлично.
source('task one ', foo())
Теперь я хотел бы запустить несколько конвейеров для разных журналов в фоновом потоке. Одна попытка - снова использовать ThreadPoolExecutor для управления несколькими конвейерами.
def run():
pool = futures.ThreadPoolExecutor(max_workers=10)
tasks = [source('task one ', foo()),
source('task two ', foo())]
for task in tasks:
fut = pool.submit(task)
fut.add_done_callback(result_handler)
Однако трубопровод блокирует первую задачу и никогда не выполняет вторую задачу. Каков правильный способ запуска таких длинных (возможно, вечно) конвейеров в фоновом потоке?
В 'run' функции вы передаете результат' источника ('задача одного', Foo() 'в' pool.submit (задача) '. Он должен быть' pool.submit (источник, «задача одна ', foo()) ' –
Или,' tasks = [(source,' task one ', foo()), (source,' task two ', foo())] '. Затем' pool.submit ('* task) ' –
Отлично! Это решает проблему моделирования, которую я представил здесь. Фактический конвейер довольно запутан, и я все еще получаю исключение обратного вызова, которое мне нужно будет проверить. Спасибо за быстрый ответ! – maiaini