在下一步中使用结果的流式处理管道



我使用的是streaming包。我想通过保留常量内存,使用S.store定义的一个步骤的结果作为管道中后续步骤的参数。myStream是从一个文件中加载和解析的。

我有一个下面的例子进行类型检查:

import qualified Streaming.Prelude as S
import qualified Data.Map.Strict as M
data A = MkA deriving (Show)
insertA :: MonadIO m => S.Stream (S.Of A) m r -> m (M.Map String Int)
insertA = undefined
insertB :: MonadIO m => M.Map String Int -> S.Stream (S.Of A) m r -> m Int
insertB = undefined
myStream :: S.Stream (S.Of A) IO r
myStream = undefined
run :: IO ()
run =
myStream
& S.store insertA
& insertB M.empty
& print

然而,行& insertB M.empty采用了一个空映射,但我想使用上一步中的映射,即insertA函数中的映射。insertB函数随后使用此Map进行查找。

我能想到的解决方案如下:

run :: IO ()
run =
myStream
& S.store insertA
& ( e -> do
resultMap <- S.effects e
insertB resultMap e
)
& print

问题

这是否保留了像在恒定内存中运行一样的流式传输优势?它如何在后台解决这个问题,因为流需要作为一个整体进行处理才能获得Map?它多次传递同一个流-从文件中加载它两次以保留恒定内存?

如果是这种情况(加载文件2次(,如果流的来源不是解析文件,而是来自某个只能读取一次的数据流,该怎么办?

对于这个问题,有没有其他优雅的解决方案也具有流式传输的好处,管道中的下一步需要使用上一步的结果?

这里提出的代码有问题:

resultMap <- S.effects e
insertB resultMap e

问题是你是";运行";相同的流两次,这对于基于IO的流来说通常是有问题的。

例如,假设myStream从文件句柄中读取。当我们第二次调用insertB时,effects已经到达文件末尾!对句柄的任何进一步读取都不会返回任何数据。

当然,我们可以用两个不同的流读取同一个文件两次。这样可以保留流,但需要两次通过。


但应该注意的是,对于某些具有内置资源管理的基本monad,如resourcet,您可以运行两次相同的Stream值,因为流代码是";"聪明";足以在每次运行流时分配和释放底层资源。

例如,存在于线性基中的Stream类型的版本支持函数readFile:

readFile :: FilePath -> Stream (Of Text) RIO ()

返回在资源感知CCD_ 16中工作的CCD_。

也就是说,我不喜欢在流媒体管道中隐藏对文件的重复读取,这对我来说似乎很困惑

最新更新