有没有一种方法可以让一个管道从多个来源获取数据而不阻塞它们中的任何一个



我正在编写一个服务器,其中一个要求是它需要能够将数据推送到客户端,而无需客户端直接请求数据。我正在使用导管,但感觉这超出了导管的能力。我遇到的问题是,似乎没有一种方法可以告诉套接字是否有可用的数据,并且await将阻塞执行,直到有可用的数据。假设我有以下函数

getPacket :: Conduit ByteString IO ClientPacket --take a bytestring and yield a ClientPacket i.e. the ByteString deserialized into a sensible form
processPacket :: Conduit ClientPacket IO ServerPacket --take a ClientPacket and yield a ServerPacket i.e. a response to the client's request
putPacket :: Conduit ServerPacket IO ByteString --serialize the ServerPacket

然后我将管道与管道的源和汇连接在一起。网络图书馆

appSource appData $$ getPacket =$= processPacket =$= putPacket $= appSink appData

现在,我从导管外部引入一个数据源,我想将该数据合并到导管中。例如,如果这是一个聊天服务器,外部数据将是由其他客户机发送的消息。问题是,无论我在哪里尝试引入这个外部数据,它都会被await调用阻塞。基本上,我将得到如下所示的代码:

yield processOutsideData --deal with the outside data
data <- await            --await data from upstream

更多的外部数据将被处理的唯一方法是,如果上游组件产生一些东西,但上游只会产生,如果它从客户端获得数据,这正是我要避免的。我已经尝试使用多个线程和TChan来解决这个问题,但似乎appSource和appSink必须在同一线程中使用,否则我从recv(这是有意义的)得到无效的文件描述符异常。

然而,如果套接字源和接收器在同一个线程中运行,我再次遇到等待阻塞的问题,我无法检查套接字是否有数据可用。在这一点上,我似乎遇到了一堵管道墙。

但我真的很喜欢使用导管,并希望继续使用它们。所以我的问题是:有没有一种方法可以实现我试图用导管实现的目标?

Michael Snoyman的管道网络示例使用并发性。telnet客户端示例运行一个线程用于发送输入,另一个线程用于显示接收到的内容。我已经调整它来发送和接收整行

{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Monad            (liftM, void)
import Data.ByteString          (ByteString)
import Data.ByteString.Char8    (unpack)
import Data.Conduit.Network
import Data.String              (IsString, fromString)
import Network                  (withSocketsDo)
getLines :: (IsString a, MonadIO m) => Producer m a
getLines = repeatMC . liftM fromString $ liftIO getLine
putLines :: (MonadIO m) => Consumer ByteString m ()
putLines = mapM_C $ liftIO . putStrLn . unpack
main :: IO ()
main = withSocketsDo $
    runTCPClient (clientSettings 4000 "localhost") $ server ->
        void $ concurrently
            (getLines $$ appSink server)
            (appSource server $$ putLines)

我们可以在服务器上做同样的事情。创建STM通道,将接收到的数据写入通道,并将数据从通道发送到客户端。它使用STM -conduit包的简单包装器来封装STM通道sourceTBMChansinkTBMChan

{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async       (concurrently)
import Control.Concurrent.STM.TBMChan (newTBMChan)
import Control.Monad                  (void)
import Control.Monad.STM              (atomically)
import Data.Conduit.Network
import Data.Conduit.TMChan            (sourceTBMChan, sinkTBMChan)
import Network                        (withSocketsDo)
main :: IO ()
main = withSocketsDo $ do
    channel <- atomically $ newTBMChan 10
    runTCPServer (serverSettings 4000 "*") $ server ->
        void $ concurrently
            (appSource server $$ sinkTBMChan channel False)
            (sourceTBMChan channel $$ appSink server)

如果我们运行的服务器只有一个客户端连接,它回显客户端发送的内容。

----------
| a      | (sent)
| a      | (received)
| b      | (sent)
| b      | (received)
| c      | (sent)
| c      | (received) 
----------

如果我们在多个客户端连接的情况下运行服务器,则消息在客户端之间分布,每个消息由一个客户端获取。

----------             ----------
| 1      | (sent)      | 1      | (received)
| 2      | (sent)      | 3      | (received)
| 2      | (received)  |        |
| 3      | (sent)      |        |
|        |             |        |
|        |             |        |
----------             ----------

这个例子没有处理客户端关闭连接时该怎么做

相关内容

  • 没有找到相关文章

最新更新