2

Я написал простой TimeManager: менеджер контекста, который запускает threading.Timer при вводе контекста и отменяет его при выходе из него. Если таймер выключается до выхода из контекста, он вызывает исключение:Захват исключения, вызванный threading.Timer в менеджере контекста

import threading 

class TimeManager(object): 
    def __init__(self): 
     self._timeout = 1 

    def _timeoutHandler(self): 
     raise Exception("Timeout!") 

    def __enter__(self): 
     self.timer = threading.Timer(self._timeout, self._timeoutHandler) 
     self.timer.start() 
     return self 

    def __exit__(self, exc_type, exc_val, exc_tb): 
     self.timer.cancel() 
     return False 

Очевидно, что я не могу поймать исключение в основном потоке, так как он принадлежит к отдельной теме:

>>> with TimeManager() as t: 
... try: 
...  time.sleep(5) 
... except Exception: 
...  print "caught" 
... 
Exception in thread Thread-3: 
Traceback (most recent call last): 
    File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner 
    self.run() 
    File "/usr/lib64/python2.6/threading.py", line 736, in run 
    self.function(*self.args, **self.kwargs) 
    File "<stdin>", line 5, in _timeoutHandler 
Exception: Timeout! 

Итак, как я могу поймать исключение в основном потоке? Должен ли я отказаться от идеи менеджера контекста?

Обратите внимание, что проблема не совпадает с описанием here, там не существует нескольких потоков. I думаю также отличается от this, где передача сообщений будет отрицать цель таймаута.

+1

Там есть модуль, который называется [ 'stopit'] (https://github.com/glenfant/stopit), что обеспечивает прерываемые менеджер контекста, используя потоки или сигналы. Однако каждый подход имеет свои ограничения. Например, используя потоки, вы фактически не можете прерывать блокирующий вызов (например, 'time.sleep'). Сигналы могут, но доступны только в Unix и не безопасны для нас в многопоточных приложениях. – dano

+0

Благодарим за то, что вы указали модуль 'stopit': это действительно то, что я искал. Если вы включите свой комментарий в ответ, я буду более чем счастлив отметить его как окончательный :) – Jir

ответ

2

Существует модуль под названием stopit, который предоставляет прерывистые контекстные менеджеры, используя потоки или сигналы. Однако каждый подход имеет свои ограничения. Например, используя потоки, вы фактически не можете прерывать блокирующий вызов (например, time.sleep). Сигналы могут, но доступны только в Unix и небезопасны для использования в многопоточных приложениях.

Похоже, что он использует функцию C-API PyThreadState_SetAsyncExc для асинхронного повышения исключения в нужном потоке.

Вот пример использование (взято из их документов):

>>> import time 
>>> def variable_duration_func(duration): 
...  t0 = time.time() 
...  while True: 
...   dummy = 0 
...   if time.time() - t0 > duration: 
...    break 
>>> 
>>> start_time = time.time() 
>>> with Timeout(2.0) as timeout_ctx: 
...  variable_duration_func(5.0) 
>>> time.time() - start_time < 2.2 
True 
>>> timeout_ctx.state == timeout_ctx.TIMED_OUT 
True