2017-01-31 7 views
4

Сценарий. У меня есть пул процессов, в котором я выполняю задания. Однако, если подпроцесс убит во время выполнения задачи, объект AsyncResult никогда не будет отмечен как готовый. Я бы надеялся, что это будет отмечено как готовое и неудачное.Как я могу узнать, будет ли AsyncResult никогда не быть готовым?

Чтобы воспроизвести это:

>>> import multiprocessing 
>>> import time 
>>> p = multiprocessing.Pool(processes=1) 
>>> result = p.apply_async(time.sleep, args=(1000,)) 
>>> result.ready() 
False 

В другой оболочке, найти идентификатор процесса и убить его.

>>> result.ready() 
False 
>>> result.wait(5) # Waits 5 seconds even though subprocess is dead 

Это проблема, потому что у меня есть поток, ожидающий выполнения задания, и он обычно имеет довольно длительный тайм-аут. Как мне получить вызов result.wait(timeout), чтобы не дождаться таймаута? Кроме того, как я могу сказать, что он был оставлен, а не только, что задача все еще работает, но мы достигли таймаута?

ответ

1

Библиотека Pebble уведомляет вас, если процесс неожиданно замирает. Он также поддерживает таймауты и обратные вызовы.

Вот ваш пример.

from pebble import ProcessPool 
from concurrent.futures import TimeoutError 

with ProcessPool() as pool: 
    future = pool.schedule(time.sleep, args=(1000,), timeout=100) 

    try: 
     results = future.result() 
     print(results) 
    except TimeoutError as error: 
     print("Function took longer than %d seconds" % error.args[1]) 
    except ProcessExpired as error: 
     print("%s. Exit code: %d" % (error, error.exitcode)) 
    except Exception as error: 
     print("function raised %s" % error) 
     print(error.traceback) # Python's traceback of remote process 

Другие примеры в documentation.

+0

https://github.com/noxdafox/pebble - Предполагаю, вы написали библиотеку? –

+0

Да, да. Пул был добавлен в библиотеку из-за отсутствия реализации, способной справляться с таймаутами и сбоями. Это особенно важно при запуске приложений с интенсивным использованием процессора, которые не могут быть ограничены. Другим примером использования примера были привязки Python библиотек C, страдающих случайными segfaults, сбой всего приложения. – noxdafox

1

Вызов result.wait() будет ждать, пока не достигнет timeout, если только он не получит сигнал от пула. Однако, если вы сделаете kill -9 [pid], тогда пул немедленно запустит следующее задание в очереди.

По существу, это проще использовать, а затем вручную «опросить» и проверить ready(). Проблема с этим, как вы заявили, заключается в том, что ready() остается False, когда работа убита.

Чтобы исправить это, вы можете проверить, жив ли pid или нет. Поскольку ApplyResult не несет pid, вам нужны другие средства для его получения. Вы могли бы сделать что-то вроде этого:

def test(identifier): 
    pid = os.getpid() 

    f = open("pids/" + str(pid), "w") 
    f.write(str(identifier)) 
    f.close() 

    # do stuff 
    time.sleep(1000) 

Затем создать работу, как это (jobs = [] Учитывая,).

job = (identifier, pool.apply_async(test, (identifier,))) 
jobs.append(job) 

Идентификатор не требуется, но полезно, если позже вы хотите, чтобы выяснить, какие ApplyResult принадлежит к какой идентификатор процесса.

Вы можете получить все рабочие места и проверить, если каждое задание (ИДП) живо:

def is_alive(pid): 
    return os.path.exists("/proc/" + str(pid)) 

for pid in os.listdir("pids"): 
    if is_alive(pid): 
     ... 
    else: 
     ... 

Учитывая содержание каждого PID-имя-файл. Затем, используя identifier, найденный в jobs, вы можете теперь связать, какой ApplyResult принадлежит к тому, какой pid и конкретно проверить, какая работа мертва или ready() или, если ни одно из перечисленных выше не работает.


Вы также можете создать трубку и вилку дочернего процесса.

r, w = os.pipe() 

def child(): 
    global r, w 

    data = ... 
    time.sleep(100) 

    os.close(r) 
    w = os.fdopen(w, "w") 
    w.write(data) 
    w.close() 

Затем вы просто записываете данные обратно в родительский процесс.

def parent(child_pid): 
    global r, w 

    os.close(w) 

    r = os.fdopen(r) 
    data = r.read() 
    r.close() 

    status = os.waitpid(child_pid, 0) 
    if status == 0: 
     # Everything is fine 
    elif status == 9: 
     # kill -9 [pid] 

    # Process data 

Вы можете использовать status и data получил решить, что случилось с ребенком процесса.

Вы начинаете все, делая.

if __name__ == "__main__": 
    child_pid = os.fork() 
    if child_pid: 
     parent(child_pid) 
    else: 
     child() 

Из вашего вопроса я предполагаю, что Unix. Если не стесняйтесь поправлять меня. Также, если какой-либо не-Python 2.7 проскользнул в ответ, я приношу свои извинения.