2016-08-24 5 views
0

Я использую python multiprocessing, чтобы развернуть некоторые дочерние процессы для выполнения моих заданий. Существует два требования:Как отличить процессы в Multiprocessing.Pool?

  1. Мне нужно знать pid дочернего процесса в случае его убийства, если захочу.
  2. Мне нужен обратный вызов, чтобы сделать некоторые вещи после окончания работы. Поскольку эти материалы используют блокировку в родительском процессе, это невозможно сделать в дочернем процессе.

Но я получаю:

  1. Процесс генерироваться by multiprocessing.Process() имеет атрибут "PID", чтобы получить идентификатор процесса. Но я не могу добавить свой асинхронный обратный вызов, конечно, я тоже не могу ждать синхронно.
  2. Пул процессов, созданный multiprocessing.Pool(), обеспечивает интерфейс обратного вызова. Но я не могу определить, какой процесс в пуле соответствует моей задаче, так как мне может понадобиться убить процесс в соответствии с конкретным заданием.

Задача дешева, здесь показана код:

import random, time 
import multiprocessing 
import os 

class Job(object): 
    def __init__(self, jobid, jobname, command): 
     self.jobid, self.jobname, self.command = jobid, jobname, command 

    def __str__(self): 
     return "Job <{0:05d}>".format(self.jobid) 

    def __repr__(self): 
     return self.__str__() 

def _run_job(job): 
    time.sleep(1) 
    print "{} done".format(job) 
    return job, random.choice([True, False]) # the second argument indicates whether job has finished successfully 

class Test(object): 
    def __init__(self): 
     self._loc = multiprocessing.Lock() 
     self._process_pool = multiprocessing.Pool() 

    def submit_job(self, job): 
     with self._loc: 
      self._process_pool.apply_async(_run_job, (job,), callback=self.job_done) 
      print "submitting {} successfully".format(job) 

    def job_done(self, result): 
     with self._loc: 
      # stuffs after job has finished is related to some cleanning work, so it needs the lock of the parent process 
      job, success = result 
      if success: 
       print "{} success".format(job) 
      else: 
       print "{} failure".format(job) 


j1 = Job(1, "test1", "command1") 
j2 = Job(2, "test2", "command2") 
t = Test() 
t.submit_job(j1) 
t.submit_job(j2) 
time.sleep(3.1) # wait for all jobs finishing 

Но теперь я не могу получить идентификатор процесса, соответствующий каждую работу. Например, мне нужно убить задание < 1>, но я не могу найти, какой процесс в пуле процессов связан с заданием < 1>, поэтому я не могу убивать работу всякий раз, когда захочу.

Если я использую multiprocessing.Process, то я могу записать pid каждого процесса с соответствующим заданием. Но теперь я не могу добавить метод обратного вызова.

Так есть способ как получить pid дочернего процесса, так и добавить метод обратного вызова?

ответ

0

И наконец, я нахожу решение: вместо этого используйте multiprocessing.Event.

С multiprocessing.Pool не может сказать, какой процесс назначен, я не могу записать его, чтобы я мог убить его в соответствии с идентификатором задания всякий раз, когда захочу.

К счастью, multiprocessing предоставляет объект Event в качестве альтернативы методу обратного вызова. Вспомните, что делает метод обратного вызова: он обеспечивает асинхронный ответ на дочерний процесс. Как только дочерний процесс завершится, родительский процесс может обнаружить его и вызвать метод обратного вызова. Таким образом, основная проблема заключается в том, как родительский процесс определяет, завершился ли дочерний процесс или нет. Это Event объект для.

Итак, решение прост: передайте объект Event объекту дочернему процессу. Как только дочерний процесс завершится, он установит объект Event. В родительском процессе он запускает поток демона, чтобы контролировать, установлено ли событие. Если это так, он может вызвать метод, который выполняет эти функции обратного вызова. Более того, поскольку я создал процессы с multiprocessing.Process вместо multiprocessing.Pool, я могу легко получить его PID, что позволяет мне его убить.

Код решения:

import time 
import multiprocessing 
import threading 

class Job(object): 
    def __init__(self, jobid, jobname, command): 
     self.jobid, self.jobname, self.command = jobid, jobname, command 
     self.lifetime = 0 

    def __str__(self): 
     return "Job <{0:05d}>".format(self.jobid) 

    def __repr__(self): 
     return self.__str__() 

def _run_job(job, done_event): 
    time.sleep(1) 
    print "{} done".format(job) 
    done_event.set() 

class Test(object): 
    def __init__(self): 
     self._loc = multiprocessing.Lock() 
     self._process_pool = {} 
     t = threading.Thread(target=self.scan_jobs) 
     t.daemon = True 
     t.start() 

    def scan_jobs(self): 
     while True: 
      with self._loc: 
       done_jobid = [] 
       for jobid in self._process_pool: 
        process, event = self._process_pool[jobid] 
        if event.is_set(): 
         print "Job<{}> is done in process <{}>".format(jobid, process.pid) 
         done_jobid.append(jobid) 
       map(self._process_pool.pop, done_jobid) 
      time.sleep(1) 


    def submit_job(self, job): 
     with self._loc: 
      done_event = multiprocessing.Event() 
      new_process = multiprocessing.Process(target=_run_host_job, args=(job, done_event)) 
      new_process.daemon = True 
      self._process_pool[job.jobid] = (new_process, done_event) 
      new_process.start() 
      print "submitting {} successfully".format(job) 


j1 = Job(1, "test1", "command1") 
j2 = Job(2, "test2", "command2") 
t = Test() 
t.submit_job(j1) 
t.submit_job(j2) 
time.sleep(5) # wait for job to finish 
+0

Интересно. У меня есть аналогичная проблема - почему бы просто не использовать список процессов? – comodoro

 Смежные вопросы

  • Нет связанных вопросов^_^