2016-11-06 8 views
6

Я использую многопроцессорную библиотеку для создания двух дочерних процессов. Я хотел бы убедиться, что до тех пор, пока родительский процесс жив, если дочерние процессы умирают (получают SIGKILL или SIGTERM), они автоматически перезапускаются. С другой стороны, если родительский процесс получает SIGTERM/SIGINT, я хочу, чтобы он завершил все дочерние процессы, а затем вышел из него.Многопроцессорность Python - Захват сигналов для перезапуска дочерних процессов или закрытия родительского процесса

Это, как я подошел к этой проблеме:

import sys 
import time 
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN 
from functools import partial 
import multiprocessing 
import setproctitle 

class HelloWorld(multiprocessing.Process): 
    def __init__(self): 
     super(HelloWorld, self).__init__() 

     # ignore, let parent handle it 
     signal(SIGTERM, SIG_IGN) 

    def run(self): 

     setproctitle.setproctitle("helloProcess") 

     while True: 
      print "Hello World" 
      time.sleep(1) 

class Counter(multiprocessing.Process): 
    def __init__(self): 
     super(Counter, self).__init__() 

     self.counter = 1 

     # ignore, let parent handle it 
     signal(SIGTERM, SIG_IGN) 

    def run(self): 

     setproctitle.setproctitle("counterProcess") 

     while True: 
      print self.counter 
      time.sleep(1) 
      self.counter += 1 


def signal_handler(helloProcess, counterProcess, signum, frame): 

    print multiprocessing.active_children() 
    print "helloProcess: ", helloProcess 
    print "counterProcess: ", counterProcess 

    if signum == 17: 

     print "helloProcess: ", helloProcess.is_alive() 

     if not helloProcess.is_alive(): 
      print "Restarting helloProcess" 

      helloProcess = HelloWorld() 
      helloProcess.start() 

     print "counterProcess: ", counterProcess.is_alive() 

     if not counterProcess.is_alive(): 
      print "Restarting counterProcess" 

      counterProcess = Counter() 
      counterProcess.start() 

    else: 

     if helloProcess.is_alive(): 
      print "Stopping helloProcess" 
      helloProcess.terminate() 

     if counterProcess.is_alive(): 
      print "Stopping counterProcess" 
      counterProcess.terminate() 

     sys.exit(0) 



if __name__ == '__main__': 

    helloProcess = HelloWorld() 
    helloProcess.start() 

    counterProcess = Counter() 
    counterProcess.start() 

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]: 
     signal(signame, partial(signal_handler, helloProcess, counterProcess)) 

    multiprocessing.active_children() 

Если я отправить SIGKILL на встречную, он будет правильно перезагрузиться. Однако отправка SIGKILL в helloProcess также перезапускает counterProcess вместо helloProcess?

Если я отправлю SIGTERM родительскому процессу, родительский выход будет завершен, но дочерние процессы станут сиротами и продолжат работу. Как исправить это поведение?

ответ

1

Чтобы воссоздать мертвые ребенок из signal.SIGCHLD обработчика, мать должна вызвать один из os.wait функций, потому что Process.is_alive здесь не работает.
Хотя возможно, это сложно, потому что signal.SIGCHLD доставлен матери, когда один из его детей изменил статус f.e. signal.SIGSTOP, signal.SIGCONT или любые другие завершающие сигналы принимаются ребенком.
Таким образом, обработчик signal.SIGCHLD должен различать тезисы состояния ребенка. Просто просто воссоздание детей, когда signal.SIGCHLD доставлено, может создать больше детей, чем необходимо.

Следующий код использует os.waitpid с os.WNOHANG, чтобы сделать это без блокировки и os.WUNTRACED и os.WCONTINUED для обучения, если signal.SIGCHLD от signal.SIGSTOP или signal.SIGCONT.
os.waitpid не работает, то есть возвращает (0, 0), если какой-либо из Process экземпляр print ed, i.e str(Process()), прежде чем вы позвоните os.waitpid.

import sys 
import time 
from signal import signal, pause, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_DFL 
import multiprocessing 
import os 

class HelloWorld(multiprocessing.Process): 
    def run(self): 
     # reset SIGTERM to default for Process.terminate to work 
     signal(SIGTERM, SIG_DFL) 
     while True: 
      print "Hello World" 
      time.sleep(1) 

class Counter(multiprocessing.Process): 
    def __init__(self): 
     super(Counter, self).__init__() 
     self.counter = 1 

    def run(self): 
     # reset SIGTERM to default for Process.terminate to work 
     signal(SIGTERM, SIG_DFL) 
     while True: 
      print self.counter 
      time.sleep(1) 
      self.counter += 1 


def signal_handler(signum, _): 
    global helloProcess, counterProcess 

    if signum == SIGCHLD: 
     pid, status = os.waitpid(-1, os.WNOHANG|os.WUNTRACED|os.WCONTINUED) 
     if os.WIFCONTINUED(status) or os.WIFSTOPPED(status): 
      return 
     if os.WIFSIGNALED(status) or os.WIFEXITED(status): 
      if helloProcess.pid == pid: 
       print("Restarting helloProcess") 
       helloProcess = HelloWorld() 
       helloProcess.start() 

      elif counterProcess.pid == pid: 
       print("Restarting counterProcess") 
       counterProcess = Counter() 
       counterProcess.start() 

    else: 
     # mother shouldn't be notified when it terminates children 
     signal(SIGCHLD, SIG_DFL) 
     if helloProcess.is_alive(): 
      print("Stopping helloProcess") 
      helloProcess.terminate() 

     if counterProcess.is_alive(): 
      print("Stopping counterProcess") 
      counterProcess.terminate() 

     sys.exit(0) 

if __name__ == '__main__': 

    helloProcess = HelloWorld() 
    helloProcess.start() 

    counterProcess = Counter() 
    counterProcess.start() 

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]: 
     signal(signame, signal_handler) 

    while True: 
     pause() 

Следующий код воссоздает мертвых детей без использования signal.SIGCHLD. Так что это проще чем первый.
Создав двух детей, материнский процесс устанавливает обработчик сигнала с именем term_child для SIGINT, SIGTERM, SIGQUIT. term_child завершает и присоединяет каждого ребенка после вызова.

Материнский процесс продолжает проверять, живы ли дети, и воссоздает их, если необходимо, в петле while.

Поскольку каждый ребенок наследует обработчик сигналов от матери, SIGINT обработчика должен быть сброшен в значение по умолчанию для Process.terminate работать

import sys 
import time 
from signal import signal, SIGINT, SIGTERM, SIGQUIT 
import multiprocessing 

class HelloWorld(multiprocessing.Process):  
    def run(self): 
     signal(SIGTERM, SIG_DFL) 
     while True: 
      print "Hello World" 
      time.sleep(1) 

class Counter(multiprocessing.Process): 
    def __init__(self): 
     super(Counter, self).__init__() 
     self.counter = 1 

    def run(self): 
     signal(SIGTERM, SIG_DFL) 
     while True: 
      print self.counter 
      time.sleep(1) 
      self.counter += 1 

def term_child(_, __): 
    for child in children: 
     child.terminate() 
     child.join() 
    sys.exit(0) 

if __name__ == '__main__': 

    children = [HelloWorld(), Counter()] 
    for child in children: 
     child.start() 

    for signame in (SIGINT, SIGTERM, SIGQUIT): 
     signal(signame, term_child) 

    while True: 
     for i, child in enumerate(children): 
      if not child.is_alive(): 
       children[i] = type(child)() 
       children[i].start() 
     time.sleep(1) 
5

Есть несколько проблем с кодом, поэтому я собираюсь переходить через них в последовательности.

Если я отправил SIGKILL в counterProcess, он перезапустится правильно. Однако отправка SIGKILL в helloProcess также перезапускает counterProcess вместо helloProcess?

Это своеобразное поведение, скорее всего, связано с отсутствием блокирующего вызова в вашем основном процессе, поскольку multiprocessing.active_children() на самом деле не действует. Я не могу объяснить, почему именно программа ведет себя так, как это делает, но добавляет блокирующий вызов в функцию __main__, например.

while True: 
    time.sleep(1) 

решает проблему.

Другой довольно серьезной проблемой является то, как вы передаете объекты в обработчик:

helloProcess = HelloWorld() 
... 
partial(signal_handler, helloProcess, counterProcess) 

который obsolate, учитывая, что вы создавать новые объекты внутри:

if not helloProcess.is_alive(): 
    print "Restarting helloProcess" 

    helloProcess = HelloWorld() 
    helloProcess.start() 

Обратите внимание, что оба объекта используют различные псевдонимы HelloWorld() объектов. Частичный объект связан с псевдонимом в функции __main__, а объект в обратном вызове связан с его локальным псевдонимом области. Поэтому, назначая новый объект локальному псевдониму области, вы на самом деле не влияете на объект, к которому привязан обратный вызов (он все еще привязан к объекту, созданному в области __main__).

Вы можете исправить это подмены своего обратного вызова сигнала с новыми объектами так же, как в рамках обратного вызова:

def signal_handler(...): 
    ... 
    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]: 
     signal(signame, partial(signal_handler, helloProcess, counterProcess)) 
    ... 

Однако это приводит к другой ловушке, потому что теперь каждый дочерний процесс наследует функцию обратного вызова от родительского и доступа это каждый раз, когда он принимает сигналы.Чтобы исправить это, вы можете временно установить обработчик сигналов к невыполнению прямо перед созданием дочернего процесса:

for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]: 
    signal(signame, SIG_DFL) 

Наконец, вы можете подавить любой сигнал, поступающим от ваших дочерних процессов перед завершением их, в противном случае они будут вызывать обратный вызов снова:

signal(SIGCHLD, SIG_IGN) 

Обратите внимание, что вы мы хотите перестроить архитектуру приложения и использовать некоторые из функций multiprocessing предоставляет.

Окончательный код:

import sys 
import time 
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN, SIG_DFL 
from functools import partial 
import multiprocessing 
#import setproctitle 

class HelloWorld(multiprocessing.Process): 
    def __init__(self): 
     super(HelloWorld, self).__init__() 

     # ignore, let parent handle it 
     #signal(SIGTERM, SIG_IGN) 

    def run(self): 

     #setproctitle.setproctitle("helloProcess") 

     while True: 
      print "Hello World" 
      time.sleep(1) 

class Counter(multiprocessing.Process): 
    def __init__(self): 
     super(Counter, self).__init__() 

     self.counter = 1 

     # ignore, let parent handle it 
     #signal(SIGTERM, SIG_IGN) 

    def run(self): 

     #setproctitle.setproctitle("counterProcess") 

     while True: 
      print self.counter 
      time.sleep(1) 
      self.counter += 1 


def signal_handler(helloProcess, counterProcess, signum, frame): 

    print multiprocessing.active_children() 
    print "helloProcess: ", helloProcess 
    print "counterProcess: ", counterProcess 

    print "current_process: ", multiprocessing.current_process() 

    if signum == 17: 

     # Since each new child inherits current signal handler, 
     # temporarily set it to default before spawning new child. 
     for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]: 
      signal(signame, SIG_DFL) 

     print "helloProcess: ", helloProcess.is_alive() 

     if not helloProcess.is_alive(): 
      print "Restarting helloProcess" 

      helloProcess = HelloWorld() 
      helloProcess.start() 

     print "counterProcess: ", counterProcess.is_alive() 

     if not counterProcess.is_alive(): 
      print "Restarting counterProcess" 

      counterProcess = Counter() 
      counterProcess.start() 

     # After new children are spawned, revert to old signal handling policy. 
     for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]: 
      signal(signame, partial(signal_handler, helloProcess, counterProcess)) 


    else: 

     # Ignore any signal that child communicates before quit 
     signal(SIGCHLD, SIG_IGN) 

     if helloProcess.is_alive(): 
      print "Stopping helloProcess" 
      helloProcess.terminate() 

     if counterProcess.is_alive(): 
      print "Stopping counterProcess" 
      counterProcess.terminate() 

     sys.exit(0) 



if __name__ == '__main__': 

    helloProcess = HelloWorld() 
    helloProcess.start() 

    counterProcess = Counter() 
    counterProcess.start() 

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]: 
     signal(signame, partial(signal_handler, helloProcess, counterProcess)) 

    while True: 
     print multiprocessing.active_children() 
     time.sleep(1) 
+0

'обработчика signal.SIGCHLD' и' multiprocessing.Process' не работает вместе. В обработчике 'signal.SIGCHLD'' Process.is_alive' возвращает 'True' даже после того, как дочерний элемент завершился. –

 Смежные вопросы

  • Нет связанных вопросов^_^