我正在尝试Store
编码和解码以适应Streaming
。 Store
已经实现了带有一个名为 decodeMessageBS
的函数的流式decode
。
我尝试为Streaming
做store
反序列化的基本实现,如下所示(暂时没有bracket
以保持简单(。但是,popper
的逻辑似乎有问题,因为decodeMessageBS
不断抛出PeekException
:
{-# LANGUAGE RankNTypes #-}
import Streaming.Prelude as S hiding (print,show)
import Data.IORef
import Streaming as S
import qualified Data.ByteString as BS (ByteString,empty,length)
import System.IO.ByteBuffer
import Data.Store
import Data.Store.Streaming
streamDecode :: forall a. (Store a) => ByteBuffer -> Stream (Of BS.ByteString) IO () -> Stream (Of a) IO ()
streamDecode bb inp = do
ref <- lift $ newIORef inp
let popper = do
r <- S.uncons =<< readIORef ref
case r of
Nothing -> return Nothing
Just (a,rest) -> writeIORef ref rest >> return (Just a)
let go = do
r <- lift $ decodeMessageBS bb $ popper
lift $ print "Decoding"
case r of
Nothing -> return ()
Just msg -> (lift $ print "Message found") >> (S.yield . fromMessage $ msg) >> go
go
我可以用decodeIOPortionWith
很好地解码我的测试文件 - 所以,问题似乎出在馈送decodeMessageBS
所需的逻辑上。将不胜感激关于这里popper
逻辑有什么问题的指示。
PeekException
是因为Store
在流模式下保存消息时使用不同的格式,这与Binary
不同。使用函数时,它需要围绕Store
数据Message
类型的包装器decodeMessageBS
。 decodeIOPortionWith
不需要Message
包装器,因此可以很好地处理保存下来的Store
数据。在我修复序列化以将数据保存为Message
编码后,decodeMessageBS
该数据上运行良好。