我想处理通过 MQTT 接收的事件流。我正在使用的库使用回调来提供结果。我正在做的处理取决于以前的状态,而不仅仅是最新的事件。此外,将来可能会从其他来源收集事件。
起初,我决定将其组合到列表中,这听起来是个好主意。我遇到了一个小问题,因为 IO 可以防止延迟评估并且等待无限流可能会很长,但我通过交错 IO 解决了它。
stream :: IO [Event]
允许我做一些好事,比如foldl
、foldM
map
、mapM
等......不幸的是,使用这种方法,我宁愿无法合并两个流,因为那里没有更多的锁定功能。
我正在挖掘许多库,例如,我发现了带有 TQueue 的 STM。不幸的是,这不是我真正想要的。
我决定创建自定义类型并使其Foldable
以便能够折叠它。由于IO,我失败了。
import Control.Concurrent.STM
newtype Stream a = Stream (STM a)
runStream
:: ((a -> IO ()) -> IO i)
-> IO (Stream a)
runStream block = do
queue <- newTQueueIO
block (atomically . writeTQueue queue)
return $ Stream (readTQueue queue)
foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
n <- atomically read
m <- f n s
foldStream f m (Stream read)
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read
zipStream :: [Stream a] -> Stream a
zipStream = undefined
呜呜main = foldStream (x _ -> print x) () =<< events
呜��
是否可以像常规 List 一样实现一些基类的基类来处理此流?
在这些情况下,通常的技巧是使回调写入队列,然后从队列的另一端读取。
使用 stm-chans 包中的有界、可关闭的队列,我们可以定义这个函数:
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done =
let go state =
do m <- atomically (readTBMQueue queue)
case m of
Nothing -> done state
Just a -> step state a >>= go
in start >>= go
它采用通道、一个阶跃函数(类似于foldM
所需的函数(、一个获取初始状态的动作和一个返回最终结果的"done"动作,然后从通道馈送数据,直到它关闭。请注意,折叠状态x
由foldQueue
的调用方选择。
如果以后我们想从 foldl 包升级到 monadic folds(它有一个非常有用的Applicative
实例(,我们可以这样做:
import qualified Control.Foldl as L
foldQueue' :: TBMQueue a -> L.FoldM IO a b -> IO b
foldQueue' queue = L.impurely (foldQueue queue)
使用"折叠"包中的impurely
。
有时(例如在解析、分组或解码时(,使用基于拉取的使用者会更容易。我们可以使用流式处理包来做到这一点:
import Streaming
import qualified Streaming.Prelude as S
foldQueue' :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r
foldQueue' queue consume = consume (S.untilRight (do
m <- atomically (readTBMQueue queue)
return (case m of
Nothing -> Right ()
Just a -> Left a)))
给定一个使用流的函数,我们向它提供从队列中读取的值流。
通常,从通道读取和写入通道必须在不同的线程中进行。我们可以使用异步concurrently
等函数来干净地处理它。