2014-01-06 7 views
7

Вот код, который реализует небольшой принимающий сервер с использованием conduit, network-conduit и stm-conduit. Он получает данные по сокету и затем передает его через STM-канал в основной поток.кабелепровод и розетки: разрешить несколько соединений

import Control.Concurrent (forkIO) 
import Control.Concurrent.STM (atomically) 
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan()) 
import Control.Monad (void) 
import Control.Monad.IO.Class (MonadIO (liftIO)) 
import Control.Monad.Trans.Class 

import Data.ByteString (ByteString) 
import qualified Data.ByteString as B 
import Data.Conduit 
import qualified Data.Conduit.Binary as DCB 
import Data.Conduit.Extra.Resumable 
import Data.Conduit.Network (sourceSocket) 
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources) 

import System.Directory (removeFile) 
import System.IO 

type BSChan = TBMChan ByteString 

listenSocket :: Socket -> Int -> IO BSChan 
listenSocket soc bufSize = do 
    chan <- atomically $ newTBMChan bufSize 
    forkListener chan 
    return chan 
    where 
    forkListener chan = void . forkIO $ listen soc 2 >> loop where 
     loop = do 
     (conn, _) <- accept soc 
     sourceSocket conn $$ sinkTBMChan chan 
     close conn 
     loop 

main :: IO() 
main = do 
    soc <- socket AF_UNIX Stream 0 
    bind soc (SockAddrUnix "mysock") 
    socChan <- listenSocket soc 8 
    sourceTBMChan socChan $$ DCB.sinkHandle stdout 
    removeFile "mysock" 

(В реальном приложении, поток данных из сокета получает объединены с некоторыми другими, поэтому я не справиться с этим непосредственно у слушателя).

Проблема в том, что, когда я ожидал, что это останется открытым до тех пор, пока основной поток не будет убит, вместо этого он выйдет после того, как первое сообщение будет получено в сокете. Я не могу понять, почему он делает это, если только это не означает, что приемник (от 2-й до последней строки) выходит, как только он видит конец первого потока данных. Могу ли я убедить его не делать этого? В Conduit есть кое-что о том, чтобы сделать источник возобновляемым, но не раковиной.

+2

Для будущих вопросов, пожалуйста, укажите весь импорт, чтобы ваш код действительно скомпилировался. Это упрощает тестирование решений. – shang

+0

Несовершеннолетний комментарий, не связанный с аспектом кабельного вещания здесь: реализация здесь заставит соединения быть принятыми по одному, вместо того, чтобы иметь отдельный рабочий поток, выделенный каждому входящему соединению. Это намеренно? –

+0

@shang - справедливая точка, я обновляю импорт. Предназначен для добавления сущности, а затем ссылки на нее, но я забыл об этом! – Impredicative

ответ

6

Из справки о из sinkTBMChan:

Когда раковина закрыта, канал закроется тоже.

Так что, когда первое гнездо ручки закрывается, это приводит к тому, Source от sourceSocket закрыть, закрыть подключенный слив, который, в свою очередь, закрывает TBMChan, распространяющуюся в sinkHandle остановить слив.

Простейший способ решить это, возможно, изменить ваш loop в пользовательский источник, который не замыкается между соединениями и подключить этот источник к TBMChan.

listenSocket :: Socket -> Int -> IO BSChan 
listenSocket soc bufSize = do 
    chan <- atomically $ newTBMChan bufSize 
    forkListener chan 
    return chan 
    where 
    forkListener chan = void . forkIO $ do 
     listen soc 2 
     loop $$ sinkTBMChan chan 

    loop = do 
     (conn, _) <- liftIO $ accept soc 
     sourceSocket conn 
     liftIO $ close conn 
     loop 
+0

Да, это в значительной степени то, что я сделал (см. Ниже).Я полностью отключил «network-conduit» и только что реализовал источник, который не закрывает соединение. – Impredicative

1

Итак, вот один ответ, который не предполагает создания возобновляемой раковины. sourceSocket в network-conduit позволяет использовать одно соединение, но мы можем реализовать поведение повторного подключения внутри sourceSocket (извинений для кода, я думаю, что это нуждается в очистке, но, по крайней мере, это работает!):

sourceSocket :: (MonadIO m) => Socket -> Producer m ByteString 
sourceSocket sock = 
    loop 
    where 
    loop = do 
     (conn, _) <- lift . liftIO $ accept sock 
     loop' conn 
     lift . liftIO $ close conn 
     loop 
    loop' conn = do 
     bs <- lift . liftIO $ recv conn 4096 
     if B.null bs 
     then return() 
     else yield bs >> loop' conn 

Одна проблема здесь состоит в том, что это никогда не выйдет (пока программа не умрет). Это не проблема в моем случае использования, так как сокет должен продолжать прослушивать жизнь программы.

4

Координирование завершения работы авторов и читателей из канала является нетривиальной задачей, но вы можете использовать решение от pipes экосистемы решить эту проблему, которая заключается в использовании pipes-concurrency библиотеки. Эта библиотека предоставляет несколько ненужных утилит pipes, которые можно повторно использовать с библиотеками conduit для обмена информацией между читателями и писателями, чтобы каждая сторона автоматически правильно узнала, когда нужно очищать, и вы также можете вручную очистить обе стороны.

Ключевая функция, которую вы используете из библиотеки pipes-concurrency, - spawn. Его тип:

spawn :: Buffer a -> IO (Output a, Input a) 

Buffer указывает, что базовая абстракция STM канал использовать.Судя по вашему примеру кода, это звучит, как вы хотите Bounded буфер:

spawn (Bounded 8) :: IO (Output a, Input a) 

a может быть что-то в этом случае, так это может быть ByteString, например:

spawn (Bounded 8) :: IO (Output ByteString, Input ByteString) 

Input и Output ведут себя как почтовый ящик. Вы добавлять сообщения в почтовый ящик с помощью send щих данных в Output с и вы принимаете сообщения выхода из почтового ящика (в порядке их поступления) по recv ИНГ данные Input с:

-- Returns `False` if the mailbox is sealed 
send :: Output a -> a -> STM Bool 

-- Returns `Nothing` if the mailbox is sealed 
recv :: Input a -> STM (Maybe a) 

Аккуратные особенностью pipes-concurrency является то, что ему позволяет сборщику мусора автоматически запечатывать почтовый ящик, если в почтовый ящик нет читателей или нет писателей. Это позволяет избежать общего источника взаимоблокировок.

Если вы используете экосистему pipes, вы обычно используете следующие две утилиты более высокого уровня для чтения и записи в почтовый ящик.

-- Stream values into the mailbox until it is sealed 
toOutput :: Output a -> Consumer a IO() 

-- Stream values from the mailbox until it is sealed 
fromInput :: Input a -> Producer a IO() 

Однако, поскольку основной механизм является pipes -независимой вы можете переписать эквивалентные conduit версии этих функций:

import Control.Monad.Trans.Class (lift) 
import Data.Conduit 
import Pipes.Concurrent 

toOutput' :: Output a -> Sink a IO() 
toOutput' o = awaitForever (\a -> lift $ atomically $ send o a) 

fromInput' :: Input a -> Source IO a 
fromInput' i = do 
    ma <- lift $ atomically $ recv i 
    case ma of 
     Nothing -> return() 
     Just a -> do 
      yield a 
      fromInput' i 

Тогда ваша основная функция будет выглядеть как-то вроде этого:

main :: IO() 
main = do 
    soc <- socket AF_UNIX Stream 0 
    bind soc (SockAddrUnix "mysock") 
    (output, input) <- spawn (Bounded 8) 
    forkIO $ readFromSocket soc $$ toOutput output 
    fromInput input $$ DCB.sinkHandle stdout 
    removeFile "mysock" 

... где readFromSocket будет Source, что читает с вашего Socket.

Вы можете свободно писать в output с использованием других источников данных, также, и не беспокоиться о том, чтобы координировать их или распоряжаться input или output должным образом, когда вы закончите.

Чтобы узнать больше о pipes-concurrency, я рекомендую прочитать official tutorial.

+0

Спасибо за это, выглядит как потенциально более удобный способ заниматься вещами. На данный момент мне удалось решить проблему, но немного поучу через учебник. – Impredicative

1

Я думаю, что ответ Шанга правильный, я бы просто пошел немного дальше и скажу, что поведение writeTBMChan выглядит как лучший преступник здесь. Я бы рекомендовал изменить его, чтобы автоматически не закрыть TBMChan. Простая реализация этой идеи:

sinkTBMChan chan = awaitForever $ liftIO . atomically . writeTBMChan chan 

Если вы используете это в своей программе, оно будет работать должным образом.

 Смежные вопросы

  • Нет связанных вопросов^_^