我正在编写一个服务器,其中一个要求是它需要能够将数据推送到客户端,而无需客户端直接请求数据。我正在使用导管,但感觉这超出了导管的能力。我遇到的问题是,似乎没有一种方法可以告诉套接字是否有可用的数据,并且await将阻塞执行,直到有可用的数据。假设我有以下函数
getPacket :: Conduit ByteString IO ClientPacket --take a bytestring and yield a ClientPacket i.e. the ByteString deserialized into a sensible form
processPacket :: Conduit ClientPacket IO ServerPacket --take a ClientPacket and yield a ServerPacket i.e. a response to the client's request
putPacket :: Conduit ServerPacket IO ByteString --serialize the ServerPacket
然后我将管道与管道的源和汇连接在一起。网络图书馆
appSource appData $$ getPacket =$= processPacket =$= putPacket $= appSink appData
现在,我从导管外部引入一个数据源,我想将该数据合并到导管中。例如,如果这是一个聊天服务器,外部数据将是由其他客户机发送的消息。问题是,无论我在哪里尝试引入这个外部数据,它都会被await调用阻塞。基本上,我将得到如下所示的代码:
yield processOutsideData --deal with the outside data
data <- await --await data from upstream
更多的外部数据将被处理的唯一方法是,如果上游组件产生一些东西,但上游只会产生,如果它从客户端获得数据,这正是我要避免的。我已经尝试使用多个线程和TChan来解决这个问题,但似乎appSource和appSink必须在同一线程中使用,否则我从recv(这是有意义的)得到无效的文件描述符异常。
然而,如果套接字源和接收器在同一个线程中运行,我再次遇到等待阻塞的问题,我无法检查套接字是否有数据可用。在这一点上,我似乎遇到了一堵管道墙。
但我真的很喜欢使用导管,并希望继续使用它们。所以我的问题是:有没有一种方法可以实现我试图用导管实现的目标?
Michael Snoyman的管道网络示例使用并发性。telnet客户端示例运行一个线程用于发送输入,另一个线程用于显示接收到的内容。我已经调整它来发送和接收整行
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Monad (liftM, void)
import Data.ByteString (ByteString)
import Data.ByteString.Char8 (unpack)
import Data.Conduit.Network
import Data.String (IsString, fromString)
import Network (withSocketsDo)
getLines :: (IsString a, MonadIO m) => Producer m a
getLines = repeatMC . liftM fromString $ liftIO getLine
putLines :: (MonadIO m) => Consumer ByteString m ()
putLines = mapM_C $ liftIO . putStrLn . unpack
main :: IO ()
main = withSocketsDo $
runTCPClient (clientSettings 4000 "localhost") $ server ->
void $ concurrently
(getLines $$ appSink server)
(appSource server $$ putLines)
我们可以在服务器上做同样的事情。创建STM通道,将接收到的数据写入通道,并将数据从通道发送到客户端。它使用STM -conduit包的简单包装器来封装STM通道sourceTBMChan
和sinkTBMChan
。
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Concurrent.STM.TBMChan (newTBMChan)
import Control.Monad (void)
import Control.Monad.STM (atomically)
import Data.Conduit.Network
import Data.Conduit.TMChan (sourceTBMChan, sinkTBMChan)
import Network (withSocketsDo)
main :: IO ()
main = withSocketsDo $ do
channel <- atomically $ newTBMChan 10
runTCPServer (serverSettings 4000 "*") $ server ->
void $ concurrently
(appSource server $$ sinkTBMChan channel False)
(sourceTBMChan channel $$ appSink server)
如果我们运行的服务器只有一个客户端连接,它回显客户端发送的内容。
----------
| a | (sent)
| a | (received)
| b | (sent)
| b | (received)
| c | (sent)
| c | (received)
----------
如果我们在多个客户端连接的情况下运行服务器,则消息在客户端之间分布,每个消息由一个客户端获取。
---------- ----------
| 1 | (sent) | 1 | (received)
| 2 | (sent) | 3 | (received)
| 2 | (received) | |
| 3 | (sent) | |
| | | |
| | | |
---------- ----------
这个例子没有处理客户端关闭连接时该怎么做