这里有一些代码,它使用conduit
、network-conduit
和stm-conduit
实现小型接收服务器。它在套接字上接收数据,然后通过 STM 通道将其流式传输到主线程。
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.Binary as DCB
import Data.Conduit.Extra.Resumable
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)
import System.Directory (removeFile)
import System.IO
type BSChan = TBMChan ByteString
listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
chan <- atomically $ newTBMChan bufSize
forkListener chan
return chan
where
forkListener chan = void . forkIO $ listen soc 2 >> loop where
loop = do
(conn, _) <- accept soc
sourceSocket conn $$ sinkTBMChan chan
close conn
loop
main :: IO ()
main = do
soc <- socket AF_UNIX Stream 0
bind soc (SockAddrUnix "mysock")
socChan <- listenSocket soc 8
sourceTBMChan socChan $$ DCB.sinkHandle stdout
removeFile "mysock"
(在实际应用程序中,来自套接字的数据流与其他一些数据流合并,这就是为什么我不直接在侦听器中处理它的原因)。
问题是,我原本期望它在主线程被杀死之前保持打开状态,而是在套接字上收到第一条消息后退出。我无法弄清楚它为什么这样做,除非接收器(在倒数第二行)在看到第一个数据流的末尾后退出。我能说服它不要这样做吗?Conduit
有一些关于使源可恢复但不是接收器的内容。
来自sinkTBMChan
的文档:
当水槽关闭时,通道也会关闭。
因此,当第一个插座手柄关闭时,它会导致Source
从sourceSocket
关闭,关闭连接的接收器,进而关闭传播到sinkHandle
停止接收器的TBMChan
。
解决此问题的最简单方法可能是将loop
更改为不会在连接之间关闭的自定义源,并将该源连接到TBMChan
。
listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
chan <- atomically $ newTBMChan bufSize
forkListener chan
return chan
where
forkListener chan = void . forkIO $ do
listen soc 2
loop $$ sinkTBMChan chan
loop = do
(conn, _) <- liftIO $ accept soc
sourceSocket conn
liftIO $ close conn
loop
协调关闭频道中的写入器和读取器是一个不平凡的问题,但您可以重用pipes
生态系统中的解决方案来解决此问题,即使用pipes-concurrency
库。 此库提供了几个独立于pipes
的实用程序,您可以与conduit
库重用这些实用程序,以便在读取器和写入器之间进行通信,以便每一端自动正确知道何时清理,您也可以手动清理任一端。
从pipes-concurrency
库中使用的关键功能是spawn
。 其类型为:
spawn :: Buffer a -> IO (Output a, Input a)
Buffer
指定要使用的底层 STM 通道抽象。 从您的示例代码来看,听起来您想要一个Bounded
缓冲区:
spawn (Bounded 8) :: IO (Output a, Input a)
在这种情况下,a
可以是任何东西,因此它可以是ByteString
,例如:
spawn (Bounded 8) :: IO (Output ByteString, Input ByteString)
Input
和Output
的行为类似于邮箱。 通过将邮件send
到Output
将邮件添加到邮箱,并通过从Input
recv
数据将邮件从邮箱中取出(按 FIFO 顺序):
-- Returns `False` if the mailbox is sealed
send :: Output a -> a -> STM Bool
-- Returns `Nothing` if the mailbox is sealed
recv :: Input a -> STM (Maybe a)
pipes-concurrency
的巧妙功能是,它会检测垃圾回收器,以便在邮箱没有读取器或没有写入器时自动密封邮箱。 这避免了死锁的常见来源。
如果您使用的是pipes
生态系统,则通常使用以下两个更高级别的实用程序来读取和写入邮箱。
-- Stream values into the mailbox until it is sealed
toOutput :: Output a -> Consumer a IO ()
-- Stream values from the mailbox until it is sealed
fromInput :: Input a -> Producer a IO ()
但是,由于核心机制pipes
独立,因此您可以重写这些函数的等效conduit
版本:
import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Pipes.Concurrent
toOutput' :: Output a -> Sink a IO ()
toOutput' o = awaitForever (a -> lift $ atomically $ send o a)
fromInput' :: Input a -> Source IO a
fromInput' i = do
ma <- lift $ atomically $ recv i
case ma of
Nothing -> return ()
Just a -> do
yield a
fromInput' i
那么你的主函数将看起来像这样:
main :: IO ()
main = do
soc <- socket AF_UNIX Stream 0
bind soc (SockAddrUnix "mysock")
(output, input) <- spawn (Bounded 8)
forkIO $ readFromSocket soc $$ toOutput output
fromInput input $$ DCB.sinkHandle stdout
removeFile "mysock"
。readFromSocket
的地方将是一些从您的Socket
中读取的Source
.
然后,您也可以使用其他数据源自由地写入output
,而不必担心在完成后必须协调它们或正确处理input
或output
。
要了解有关pipes-concurrency
的更多信息,我建议您阅读官方教程。
我认为@shang的答案是正确的,我只是走得更远一点,说writeTBMChan
的行为看起来像是更好的罪魁祸首。我建议将其更改为不自动关闭TBMChan
。这个想法的简单实现是:
sinkTBMChan chan = awaitForever $ liftIO . atomically . writeTBMChan chan
如果您在程序中使用它,它将按预期工作。
因此,这里有一个不涉及创建可恢复接收器的答案。network-conduit
中的sourceSocket
允许单个连接,但我们可以在sourceSocket
内部实现重新连接行为(抱歉代码,我认为它需要清理,但至少它可以工作!
sourceSocket :: (MonadIO m) => Socket -> Producer m ByteString
sourceSocket sock =
loop
where
loop = do
(conn, _) <- lift . liftIO $ accept sock
loop' conn
lift . liftIO $ close conn
loop
loop' conn = do
bs <- lift . liftIO $ recv conn 4096
if B.null bs
then return ()
else yield bs >> loop' conn
这里的一个问题是它永远不会退出(直到程序死亡)。在我的用例中,这不是问题,因为套接字应该在程序的生命周期内保持侦听。