Вот код, который реализует небольшой принимающий сервер с использованием conduit
, network-conduit
и stm-conduit
. Он получает данные по сокету и затем передает его через STM-канал в основной поток.кабелепровод и розетки: разрешить несколько соединений
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.Binary as DCB
import Data.Conduit.Extra.Resumable
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)
import System.Directory (removeFile)
import System.IO
type BSChan = TBMChan ByteString
listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
chan <- atomically $ newTBMChan bufSize
forkListener chan
return chan
where
forkListener chan = void . forkIO $ listen soc 2 >> loop where
loop = do
(conn, _) <- accept soc
sourceSocket conn $$ sinkTBMChan chan
close conn
loop
main :: IO()
main = do
soc <- socket AF_UNIX Stream 0
bind soc (SockAddrUnix "mysock")
socChan <- listenSocket soc 8
sourceTBMChan socChan $$ DCB.sinkHandle stdout
removeFile "mysock"
(В реальном приложении, поток данных из сокета получает объединены с некоторыми другими, поэтому я не справиться с этим непосредственно у слушателя).
Проблема в том, что, когда я ожидал, что это останется открытым до тех пор, пока основной поток не будет убит, вместо этого он выйдет после того, как первое сообщение будет получено в сокете. Я не могу понять, почему он делает это, если только это не означает, что приемник (от 2-й до последней строки) выходит, как только он видит конец первого потока данных. Могу ли я убедить его не делать этого? В Conduit
есть кое-что о том, чтобы сделать источник возобновляемым, но не раковиной.
Для будущих вопросов, пожалуйста, укажите весь импорт, чтобы ваш код действительно скомпилировался. Это упрощает тестирование решений. – shang
Несовершеннолетний комментарий, не связанный с аспектом кабельного вещания здесь: реализация здесь заставит соединения быть принятыми по одному, вместо того, чтобы иметь отдельный рабочий поток, выделенный каждому входящему соединению. Это намеренно? –
@shang - справедливая точка, я обновляю импорт. Предназначен для добавления сущности, а затем ссылки на нее, но я забыл об этом! – Impredicative