2008-12-10 4 views
11

(я использую pyprocessing модуль в этом примере, но заменяя обработку с многопроцессорной, вероятно, следует работать, если вы запустите python 2.6 или использовать multiprocessing backport)Правильный способ отмены принимать и закрытия соединения Python обработки/многопроцессорная Приёмника

В настоящее время у меня есть программа, которая слушает Unix-сокет (используя process.connection.Listener), принимает соединения и порождает поток, обрабатывающий запрос. В какой-то момент я хочу выйти из процесса изящно, но так как accept() - вызов блокируется, и я не вижу способа отменить его красивым способом. У меня есть один способ, который работает здесь (OS X), по крайней мере, установка обработчика сигналу и сигнализацию процесса из другого потока следующим образом:

import processing 
from processing.connection import Listener 
import threading 
import time 
import os 
import signal 
import socket 
import errno 

# This is actually called by the connection handler. 
def closeme(): 
    time.sleep(1) 
    print 'Closing socket...' 
    listener.close() 
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE) 

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None) 

listener = Listener('/tmp/asdf', 'AF_UNIX') 
# This is a thread that handles one already accepted connection, left out for brevity 
threading.Thread(target=closeme).start() 
print 'Accepting...' 
try: 
    listener.accept() 
except socket.error, e: 
    if e.args[0] != errno.EINTR: 
     raise 
# Cleanup here... 
print 'Done...' 

Единственный способ, которым я думал о том, проникающей глубоко в связь (listener._listener._socket) и установление параметра без блокировки ... но это, вероятно, имеет некоторые побочные эффекты и, как правило, очень страшно.

У кого-нибудь есть более элегантный (и, возможно, даже правильный!) Способ выполнения этого? Он должен быть портативным для OS X, Linux и BSD, но переносимость Windows и т. Д. Не требуется.

Уточнение: Спасибо всем! Как обычно, обнаруживаются двусмысленности в моем первоначальном вопросе :)

  • Мне нужно выполнить очистку после того, как я отменил прослушивание, и я не всегда хочу, чтобы на самом деле выйти из этого процесса.
  • мне нужно, чтобы иметь возможность получить доступ к этому процессу других процессов не порожден из того же родителя, что делает очереди громоздкие
  • Причин потоков таково:
    • Они доступ к общему государству. На самом деле более или менее общая база данных в памяти, поэтому я полагаю, что это можно сделать по-другому.
    • Я должен иметь возможность одновременного подключения нескольких соединений, но фактические потоки блокируют что-то большую часть времени. Каждое принятое соединение создает новый поток; это, чтобы не блокировать всех клиентов в операциях ввода-вывода.

Что касается темы против процессов, я использую темы для создания моих Блокирующих OPS неблокирующих и процессов для того, чтобы мультипроцессирования.

ответ

3

Это не тот, который выбирает для?

Вызывайте принять на сокете, если выбрать указывает, что он не будет блокировать ...

Выбор: есть тайм-аут, так что вы можете время от времени вспыхивают время от времени проверять , если его время, чтобы закрыть ... .

0

Возможно, это не идеальный вариант, но вы можете освободить блок, отправив сокет некоторые данные из обработчика сигнала или поток, который завершает процесс.

EDIT: Еще один способ реализовать это может быть использовать Connection Queues, так как они, кажется, поддерживают таймаут (Перефразировав, я неправильно понял ваш код в моем первом чтении).

+0

Поскольку я не получаю операцию, я пытаюсь отменить Я не могу просто отправить данные в соединение (их нет), и есть также состояние гонки там. Во-вторых, это сработало бы, если бы я на самом деле был опросом данных, я не так; Я жду нового подключения. Или я неправильно понял вас? – 2008-12-10 22:24:37

+0

Поскольку приемник принимает соединения в указанном сокете, не следует подключаться к нему из другого потока release accept() (используя multiprocessing.connection.Client)? Извиняюсь за двусмысленность во второй части моего ответа, я исправлю это. – codelogic 2008-12-10 22:59:36

+0

Очереди не открываются из внешних инструментов и т. Д. Подключение к соке освободит accepy(), но я думаю, что создаст гонку. – 2008-12-12 23:38:58

1

Я новичок в модуле многопроцессорности, но мне кажется, что смешивание модуля обработки и поточного модуля противоречит интуиции, не нацелены ли они на решение одной и той же проблемы?

В любом случае, как насчет обертывания ваших функций прослушивания в самом процессе? Я не понимаю, как это влияет на остальную часть вашего кода, но это может быть более чистая альтернатива.

from multiprocessing import Process 
from multiprocessing.connection import Listener 


class ListenForConn(Process): 

    def run(self): 
     listener = Listener('/tmp/asdf', 'AF_UNIX') 
     listener.accept() 

     # do your other handling here 


listen_process = ListenForConn() 
listen_process.start() 

print listen_process.is_alive() 

listen_process.terminate() 
listen_process.join() 

print listen_process.is_alive() 
print 'No more listen process.' 
3

Я думал, что я мог бы избежать этого, но мне кажется, что я должен сделать что-то вроде этого:

from processing import connection 
connection.Listener.fileno = lambda self: self._listener._socket.fileno() 

import select 

l = connection.Listener('/tmp/x', 'AF_UNIX') 
r, w, e = select.select((l,),(),()) 
if l in r: 
    print "Accepting..." 
    c = l.accept() 
    # ... 

Я знаю, что это нарушает закон о Деметре и вводят некоторые злую обезьяну-заплатки, но похоже, это было бы самым большим sy-to-port способ выполнения этого. Если у кого-то есть более элегантное решение, я был бы рад его услышать :)