导管和插座:允许多个连接



这里有一些代码,它使用conduitnetwork-conduitstm-conduit实现小型接收服务器。它在套接字上接收数据,然后通过 STM 通道将其流式传输到主线程。

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.Binary as DCB
import Data.Conduit.Extra.Resumable
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)
import System.Directory (removeFile)
import System.IO
type BSChan = TBMChan ByteString
listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
chan <- atomically $ newTBMChan bufSize
forkListener chan
return chan
where
forkListener chan = void . forkIO $ listen soc 2 >> loop where 
loop = do
(conn, _) <- accept soc
sourceSocket conn $$ sinkTBMChan chan
close conn
loop
main :: IO ()
main = do
soc <- socket AF_UNIX Stream 0
bind soc (SockAddrUnix "mysock")
socChan <- listenSocket soc 8
sourceTBMChan socChan $$ DCB.sinkHandle stdout
removeFile "mysock"

(在实际应用程序中,来自套接字的数据流与其他一些数据流合并,这就是为什么我不直接在侦听器中处理它的原因)。

问题是,我原本期望它在主线程被杀死之前保持打开状态,而是在套接字上收到第一条消息后退出。我无法弄清楚它为什么这样做,除非接收器(在倒数第二行)在看到第一个数据流的末尾后退出。我能说服它不要这样做吗?Conduit有一些关于使源可恢复但不是接收器的内容。

来自sinkTBMChan的文档:

当水槽关闭时,通道也会关闭。

因此,当第一个插座手柄关闭时,它会导致SourcesourceSocket关闭,关闭连接的接收器,进而关闭传播到sinkHandle停止接收器的TBMChan

解决此问题的最简单方法可能是将loop更改为不会在连接之间关闭的自定义源,并将该源连接到TBMChan

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
chan <- atomically $ newTBMChan bufSize
forkListener chan
return chan
where
forkListener chan = void . forkIO $ do
listen soc 2
loop $$ sinkTBMChan chan
loop = do
(conn, _) <- liftIO $ accept soc
sourceSocket conn
liftIO $ close conn
loop

协调关闭频道中的写入器和读取器是一个不平凡的问题,但您可以重用pipes生态系统中的解决方案来解决此问题,即使用pipes-concurrency库。 此库提供了几个独立于pipes的实用程序,您可以与conduit库重用这些实用程序,以便在读取器和写入器之间进行通信,以便每一端自动正确知道何时清理,您也可以手动清理任一端。

pipes-concurrency库中使用的关键功能是spawn。 其类型为:

spawn :: Buffer a -> IO (Output a, Input a)

Buffer指定要使用的底层 STM 通道抽象。 从您的示例代码来看,听起来您想要一个Bounded缓冲区:

spawn (Bounded 8) :: IO (Output a, Input a)

在这种情况下,a可以是任何东西,因此它可以是ByteString,例如:

spawn (Bounded 8) :: IO (Output ByteString, Input ByteString)

InputOutput的行为类似于邮箱。 通过将邮件sendOutput将邮件添加到邮箱,并通过从Inputrecv数据将邮件从邮箱中取出(按 FIFO 顺序):

-- Returns `False` if the mailbox is sealed
send :: Output a -> a -> STM Bool
-- Returns `Nothing` if the mailbox is sealed
recv :: Input a -> STM (Maybe a)

pipes-concurrency的巧妙功能是,它会检测垃圾回收器,以便在邮箱没有读取器或没有写入器时自动密封邮箱。 这避免了死锁的常见来源。

如果您使用的是pipes生态系统,则通常使用以下两个更高级别的实用程序来读取和写入邮箱。

-- Stream values into the mailbox until it is sealed
toOutput :: Output a -> Consumer a IO ()
-- Stream values from the mailbox until it is sealed
fromInput :: Input a -> Producer a IO ()

但是,由于核心机制pipes独立,因此您可以重写这些函数的等效conduit版本:

import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Pipes.Concurrent
toOutput' :: Output a -> Sink a IO ()
toOutput' o = awaitForever (a -> lift $ atomically $ send o a)
fromInput' :: Input a -> Source IO a
fromInput' i = do
ma <- lift $ atomically $ recv i
case ma of
Nothing -> return ()
Just a  -> do
yield a
fromInput' i

那么你的主函数将看起来像这样:

main :: IO ()
main = do
soc <- socket AF_UNIX Stream 0
bind soc (SockAddrUnix "mysock")
(output, input) <- spawn (Bounded 8)
forkIO $ readFromSocket soc $$ toOutput output
fromInput input $$ DCB.sinkHandle stdout
removeFile "mysock"

readFromSocket的地方将是一些从您的Socket中读取的Source.

然后,您也可以使用其他数据源自由地写入output,而不必担心在完成后必须协调它们或正确处理inputoutput

要了解有关pipes-concurrency的更多信息,我建议您阅读官方教程。

我认为@shang的答案是正确的,我只是走得更远一点,说writeTBMChan的行为看起来像是更好的罪魁祸首。我建议将其更改为不自动关闭TBMChan。这个想法的简单实现是:

sinkTBMChan chan = awaitForever $ liftIO . atomically . writeTBMChan chan

如果您在程序中使用它,它将按预期工作。

因此,这里有一个不涉及创建可恢复接收器的答案。network-conduit中的sourceSocket允许单个连接,但我们可以在sourceSocket内部实现重新连接行为(抱歉代码,我认为它需要清理,但至少它可以工作!

sourceSocket :: (MonadIO m) => Socket -> Producer m ByteString
sourceSocket sock =
loop
where
loop = do
(conn, _) <- lift . liftIO $ accept sock
loop' conn
lift . liftIO $ close conn
loop
loop' conn = do
bs <- lift . liftIO $ recv conn 4096
if B.null bs
then return ()
else yield bs >> loop' conn

这里的一个问题是它永远不会退出(直到程序死亡)。在我的用例中,这不是问题,因为套接字应该在程序的生命周期内保持侦听。

相关内容

  • 没有找到相关文章

最新更新