2014-09-03 5 views
2

У меня есть unittest, который использует многопроцессорность.python 3.4 многопроцессорность не работает с unittest

После обновления с Python 3.2 до Python 3.4 я получаю следующую ошибку. Я не могу найти подсказку, что было изменено внутри Python и что мне нужно изменить, чтобы мой код работал.

Заранее спасибо.

Traceback (most recent call last): 
    File "<string>", line 1, in <module> 
    File "C:\Python341_64\lib\multiprocessing\spawn.py", line 106, in spawn_main 
    exitcode = _main(fd) 
    File "C:\Python341_64\lib\multiprocessing\spawn.py", line 116, in _main 
    self = pickle.load(from_parent) 
EOFError: Ran out of input 

Error 
Traceback (most recent call last): 
    File "D:\test_multiproc.py", line 46, in testSmallWorkflow 
    p.start() 
    File "C:\Python341_64\lib\multiprocessing\process.py", line 105, in start 
    self._popen = self._Popen(self) 
    File "C:\Python341_64\lib\multiprocessing\context.py", line 212, in _Popen 
    return _default_context.get_context().Process._Popen(process_obj) 
    File "C:\Python341_64\lib\multiprocessing\context.py", line 313, in _Popen 
    return Popen(process_obj) 
    File "C:\Python341_64\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__ 
    reduction.dump(process_obj, to_child) 
    File "C:\Python341_64\lib\multiprocessing\reduction.py", line 59, in dump 
    ForkingPickler(file, protocol).dump(obj) 
TypeError: cannot serialize '_io.TextIOWrapper' object 

После кода образца, как я могу воспроизвести ошибку:

import shutil 
import traceback 
import unittest 
import time 
from multiprocessing import Process 
import os 


class MyTest(unittest.TestCase): 

    #--------------------------------------------------------------------------- 
    def setUp(self): 
     self.working_dir = os.path.join(os.environ["TEMP"], "Testing") 
     os.mkdir(self.working_dir) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def tearDown(self): 
     try: 
      time.sleep(5) 
      shutil.rmtree(self.working_dir, ignore_errors=True) 
     except OSError as err: 
      traceback.print_tb(err) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def info(self, title): 
     print(title) 
     print('module name:', __name__) 
     if hasattr(os, 'getppid'): # only available on Unix 
      print('parent process:', os.getppid()) 
     print('process id:', os.getpid()) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def f(self, name): 
     self.info('function f') 
     print('hello', name) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def testSmallWorkflow(self): 
     self.info('main line') 
     p = Process(target=self.f, args=('bob',)) 
     p.start() 
     p.join() 
    #--------------------------------------------------------------------------- 

ответ

4

Проблема заключается в том, что сам unittest.TestCase класс больше не pickleable, и вы должны мариновать его для того, чтобы замариновать один его связанных методов (self.f). Простое решение было бы создать отдельный класс для методов, которые вы должны позвонить в дочернем процессе:

class Tester: 
    def info(self, title=None): 
     print("title {}".format(title)) 
     print('module name:', __name__) 
     if hasattr(os, 'getppid'): # only available on Unix 
      print('parent process:', os.getppid()) 
     print('process id:', os.getpid()) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def f(self, name): 
     self.info('function f') 
     print('hello', name) 
    #------------------------------- 


class MyTest(unittest.TestCase): 

    #--------------------------------------------------------------------------- 
    def setUp(self): 
     self.working_dir = os.path.join(os.environ["TEMP"], "Testing") 
     os.mkdir(self.working_dir) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def tearDown(self): 
     try: 
      time.sleep(5) 
      shutil.rmtree(self.working_dir, ignore_errors=True) 
     except OSError as err: 
      traceback.print_tb(err) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def testSmallWorkflow(self): 
     t = Tester() 
     self.info('main line') 
     p = Process(target=t.f, args=('bob',)) 
     p.start() 
     p.join() 

В качестве альтернативы, можно использовать __setstate__/__getstate__, чтобы удалить объект из TestCase, что это unpickleable. В этом случае это внутренний класс, называемый _Outcome. Мы не заботимся об этом у ребенка, поэтому мы можем просто удалить его из маринованного состояния:

class MyTest(unittest.TestCase): 

    #--------------------------------------------------------------------------- 
    def setUp(self): 
     self.working_dir = os.path.join(os.environ["TEMP"], "Testing") 
     os.mkdir(self.working_dir) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def tearDown(self): 
     try: 
      time.sleep(2) 
      shutil.rmtree(self.working_dir, ignore_errors=True) 
     except OSError as err: 
      traceback.print_tb(err) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def info(self, title=None): 
     print("title {}".format(title)) 
     print('module name:', __name__) 
     if hasattr(os, 'getppid'): # only available on Unix 
      print('parent process:', os.getppid()) 
     print('process id:', os.getpid()) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def f(self, name): 
     self.info('function f') 
     print('hello', name) 
    #--------------------------------------------------------------------------- 

    #--------------------------------------------------------------------------- 
    def testSmallWorkflow(self): 
     t = Tester() 
     self.info('main line') 
     p = Process(target=self.f, args=('bob',)) 
     p.start() 
     p.join() 

    def __getstate__(self): 
     self_dict = self.__dict__.copy() 
     del self_dict['_outcome'] 
     return self_dict 

    def __setstate(self, state): 
     self.__dict__.update(self_dict) 
+0

благодарит за очень полезный и быстрый ответ. Вы сказали, что unittest.Testcase _no longer_ pickable. У вас есть дополнительная информация для меня об этом и почему? Может быть, какая-то ссылка, где я могу прочитать об этом? – knumskull

+0

@knumskull Я не знаю, было ли это документировано где угодно; Я просто вычислил это, посмотрев на код. Проблема в том, что объект '_Outcome', который' TestCase' использует внутри, содержит атрибут 'result', который является экземпляром' unittest.runner.TextTestResult'. Этот класс отвечает за запись результатов каждого теста на экран. Он содержит ссылки на объекты '_io.TextIoWrapper', которые не могут быть маринованными. Если я найду какое-то время на этой неделе, я могу вникать в это, чтобы посмотреть, что именно изменилось между 3.2 и 3.4, и, возможно, предоставить патч, чтобы снова попробовать TestCase. – dano

+0

Спасибо за объяснение. Это мне очень помогло. – knumskull