1

Скажем, у меня есть исполнитель пула потоков с макс. 10 нитей, и я представить задачу к ней, которая сама по себе создает другую задачу, и в свою очередь, ждет его завершения, рекурсивно, пока не достигают глубины 11Пулы потоков Python - задачи, которые создают подзадачи и ждут от них

Пример кода в Python:

import concurrent.futures 

e = concurrent.futures.ThreadPoolExecutor(max_workers=10) 

def task(depth): 
    print 'started depth %d' % (depth,) 
    if depth > 10: 
     return depth 
    else: 
     f = e.submit(task, depth + 1) 
     concurrent.futures.wait([f]) 


f = e.submit(task, 0) 
print f.result() 

выше выходов кода:

started depth 0 
started depth 1 
started depth 2 
started depth 3 
started depth 4 
started depth 5 
started depth 6 
started depth 7 
started depth 8 
started depth 9 

и взаимоблокировки.

Есть ли способ решить эту проблему без создания дополнительных потоков и исполнителей?

Другими словами, путь для рабочих потоков для работы над другими задачами в ожидании?

+0

http://stackoverflow.com/questions/1239035/asynchronous-method-call-in-python –

+0

Или еще лучше пойти с высоким уровнем решения, как сельдерей HTTP://www.celeryproject.org/ –

+0

Не заходит в тупик, потому что он продолжает вызывать новые потоки, но максимальное количество в пуле - только 10. Ни один из нитей не заканчивает свою задачу. – Alexander

ответ

3

Использование Сопрограммы кода можно переписать так:

import asyncio 

@asyncio.coroutine 
def task(depth): 
    print('started depth %d' % (depth,)) 
    if depth > 10: 
     return depth 
    else: 
     # create new task 
     t = asyncio.async(task(depth + 1)) 
     # wait for task to complete 
     yield from t 
     # get the result of the task 
     return t.result() 

loop = asyncio.get_event_loop() 
result = loop.run_until_complete(task(1)) 
print(result) 
loop.close() 

Однако, я пытаясь понять, зачем вам нужен этот дополнительный код. В вашем примере кода вы всегда ждете непосредственно результата задачи, поэтому ваш код не будет работать без исполнителя. Например, следующий будет тот же результат

def task(depth): 
    print 'started depth %d' % (depth,) 
    if depth > 10: 
     return depth 
    else: 
     task(depth + 1) 

Я думаю, что этот пример из документации лучше показывает, как асинхронная сопрограммы способны parallelise задачи. В этом примере создаются 3 задачи, каждая из которых вычисляет другой факториал. Обратите внимание, как при выполнении каждой задачи другой сопрограммы (в данном случае async.sleep) другой задаче разрешено продолжить ее выполнение.

import asyncio 

@asyncio.coroutine 
def factorial(name, number): 
    f = 1 
    for i in range(2, number+1): 
     print("Task %s: Compute factorial(%s)..." % (name, i)) 
     yield from asyncio.sleep(1) 
     f *= i 
    print("Task %s: factorial(%s) = %s" % (name, number, f)) 

loop = asyncio.get_event_loop() 
tasks = [ 
    asyncio.ensure_future(factorial("A", 2)), 
    asyncio.ensure_future(factorial("B", 3)), 
    asyncio.ensure_future(factorial("C", 4))] 
loop.run_until_complete(asyncio.wait(tasks)) 
loop.close() 

Выход:

Task A: Compute factorial(2)... 
Task B: Compute factorial(2)... 
Task C: Compute factorial(2)... 
Task A: factorial(2) = 2 
Task B: Compute factorial(3)... 
Task C: Compute factorial(3)... 
Task B: factorial(3) = 6 
Task C: Compute factorial(4)... 
Task C: factorial(4) = 24 
0

Нет, если вы хотите избежать тупиковой ситуации, вы не можете ждать в будущем от того же исполнителя в задаче.

Единственное, что можно сделать в данном примере, чтобы вернуться в будущем, а затем рекурсивно обрабатывать результаты:

import concurrent.futures 
import time 

e = concurrent.futures.ThreadPoolExecutor(max_workers=10) 

def task(depth): 
    print 'started depth %d' % (depth,) 
    if depth > 10: 
     return depth 
    else: 
     f = e.submit(task, depth + 1) 
     return f 


f = e.submit(task, 0) 
while isinstance(f.result(), concurrent.futures.Future): 
    f = f.result() 

print f.result() 

Однако было бы лучше, чтобы избежать такого рекурсивного выполнения в первой очереди.

0

Что вы здесь испытываете, это то, что вы уже справедливо назвали deadlock. Первый поток, который запускает следующий поток и ждет его, удерживает lock, который все последующие задачи будут заторможены, ожидая того, что тот же lock будет выпущен (что никогда не будет в вашем случае). Я предлагаю вам начать свои собственные темы в задачах вместо того, чтобы использовать бассейн, что-то вроде:

import concurrent.futures 
import threading 


class TaskWrapper(threading.Thread): 

    def __init__(self, depth, *args, **kwargs): 
     self._depth = depth 
     self._result = None 
     super(TaskWrapper, self).__init__(*args, **kwargs) 

    def run(self): 
     self._result = task(self._depth) 

    def get(self): 
     self.join() 
     return self._result 

e = concurrent.futures.ThreadPoolExecutor(max_workers=10) 


def task(depth): 
    print 'started depth %d' % (depth,) 
    if depth > 10: 
     return depth 
    else: 
     t = TaskWrapper(depth + 1) 
     t.start() 
     return t.get() 

f = e.submit(task, 0) 
print f.result()