哈斯克尔中的高效比特流



在不断努力有效地摆弄比特(例如,请参阅这个SO问题)中,最新的挑战是比特的高效流式传输和消费。

作为第一个简单的任务,我选择在/dev/urandom生成的比特流中找到最长的相同位序列。典型的咒语是head -c 1000000 </dev/urandom | my-exe。实际目标是流式传输位并解码Elias gamma码,例如,不是字节块或其倍数的代码。

对于这种可变长度的代码,最好有taketakeWhilegroup等语言来操作列表。由于BitStream.take实际上会消耗部分 bistream 一些 monad 可能会发挥作用。

显而易见的起点是Data.ByteString.Lazy的懒惰字节。

A. 计算字节数

正如预期的那样,这个非常简单的Haskell程序的性能与C程序相当。

import qualified Data.ByteString.Lazy as BSL
main :: IO ()
main = do
bs <- BSL.getContents
print $ BSL.length bs

B. 添加字节数

一旦我开始使用unpack事情应该会变得更糟。

main = do
bs <- BSL.getContents
print $ sum $ BSL.unpack bs

令人惊讶的是,Haskell和C表现出几乎相同的表现。

C. 相同位的最长序列

作为第一个非平凡任务,可以像这样找到最长的相同位序列:

module Main where
import           Data.Bits            (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import           Data.List            (group)
import           Data.Word8           (Word8)
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (i-> (w `shiftR` i) .&. 1 == 1) [0..7]
bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)
main :: IO ()
main = do
bs <- BSL.getContents
print $ maximum $ length <$> (group $ bitStream bs)

延迟字节串转换为列表[Word8]然后,使用移位,将每个Word拆分为位,从而产生列表[Bool]。然后,此列表列表将用concat扁平化。获得Bool的(惰性)列表后,使用group将列表拆分为相同位的序列,然后在其上映射length。最后maximum给出了所需的结果。很简单,但不是很快:

# C
real    0m0.606s
# Haskell
real    0m6.062s

这种幼稚的实现正好慢了一个数量级。

分析显示分配了相当多的内存(大约 3GB 用于解析 1MB 的输入)。不过,没有观察到大规模的空间泄漏。

从这里我开始四处闲逛:

  • 有一个bitstream包承诺">快速、打包、严格的比特流(即布尔列表)与半自动流融合"。不幸的是,它不是最新的当前vector包,有关详细信息,请参阅此处。
  • 接下来,我调查streaming.我不太明白为什么我需要"有效"的流来发挥一些monad的作用 - 至少在我开始与所提出的任务相反,即编码和写入比特流到文件之前。
  • 只是foldByteString怎么样?我必须引入状态来跟踪消耗的位。这不是taketakeWhilegroup等语言的好选择。

现在我不太确定该去哪里。

更新

我想出了如何用streamingstreaming-bytestring做到这一点.我可能做得不对,因为结果是灾难性的糟糕。

import           Data.Bits                 (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import           Data.Word8                (Word8)
import qualified Streaming                 as S
import           Streaming.Prelude         (Of, Stream)
import qualified Streaming.Prelude         as S
splitByte :: Word8 -> [Bool]
splitByte w = (i-> (w `shiftR` i) .&. 1 == 1) <$> [0..7]
bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s
main :: IO ()
main = do
let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
gs = S.group $ bitStream bs ::  Stream (Stream (Of Bool) IO) IO ()
maxLen <- S.maximum $ S.mapped S.length gs
print $ S.fst' maxLen

这将测试您对来自 stdin(标准)输入的几千字节以外的任何内容的耐心。分析器表示,它在Streaming.Internal.>>=.loopData.Functor.Of.fmap上花费了大量的时间(输入大小的二次)。我不太确定第一个是什么,但fmap表明(?)这些Of a b的杂耍对我们没有任何好处,因为我们在IO monad中,所以它无法优化。

我这里还有字节加法器的流等效物:SumBytesStream.hs,它比简单的懒惰ByteString实现略慢,但仍然不错。由于streaming-bytestring被宣称为">字节完成正确",我期望更好。那么,我可能做得不对。

无论如何,所有这些位计算都不应该发生在IO monad中。但是BSS.getContents迫使我进入IO monad,因为getContents :: MonadIO m => ByteString m ()并且没有出路。

更新 2

按照@dfeuer的建议,我在master@HEAD使用了streaming包。这是结果。

longest-seq-c       0m0.747s    (C)
longest-seq         0m8.190s    (Haskell ByteString)
longest-seq-stream  0m13.946s   (Haskell streaming-bytestring)

Streaming.concat的 O(n^2) 问题解决了,但我们仍然没有接近 C 基准。

更新 3

Cirdec的解决方案产生了与C相媲美的性能。使用的构造称为"教会编码列表",请参阅此SO答案或Haskell维基关于rank-N类型的内容。

源文件:

所有源文件都可以在github上找到。Makefile具有运行实验和分析的所有各种目标。默认make将只构建所有内容(首先创建一个bin/目录!),然后make time将对longest-seq可执行文件进行计时。C 可执行文件附加一个-c以区分它们。

当流上的操作融合在一起时,可以删除中间分配及其相应的开销。GHC 前奏以重写规则的形式为惰性流提供折叠/构建融合。一般的想法是,如果一个函数产生一个看起来像折叠器的结果(它有应用于(:)[]的类型(a -> b -> b) -> b -> b),而另一个函数使用一个看起来像折叠器的列表,则可以删除构造中间列表。

对于您的问题,我将构建类似的东西,但使用严格的左折叠(foldl')而不是折叠器。我将使用强制列表看起来像左折的数据类型,而不是使用重写规则来尝试检测某些内容何时看起来像foldl

-- A list encoded as a strict left fold.
newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}

由于我从放弃列表开始,我们将重新实现列表前奏的一部分。

可以从列表和字节串的foldl'函数创建严格的左折叠。

{-# INLINE fromList #-}
fromList :: [a] -> ListS a
fromList l = ListS (c z -> foldl' c z l)
{-# INLINE fromBS #-}
fromBS :: BSL.ByteString -> ListS Word8
fromBS l = ListS (c z -> BSL.foldl' c z l)

使用一个最简单的示例是查找列表的长度。

{-# INLINE length' #-}
length' :: ListS a -> Int
length' l = build l (z a -> z+1) 0

我们还可以映射和连接左褶皱。

{-# INLINE map' #-}
-- fmap renamed so it can be inlined
map' f l = ListS (c z -> build l (z a -> c z (f a)) z)
{-# INLINE concat' #-}
concat' :: ListS (ListS a) -> ListS a
concat' ll = ListS (c z -> build ll (z l -> build l c z) z)

对于您的问题,我们需要能够将单词拆分为位。

{-# INLINE splitByte #-}
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (i-> (w `shiftR` i) .&. 1 == 1) [0..7]
{-# INLINE splitByte' #-}
splitByte' :: Word8 -> ListS Bool
splitByte' = fromList . splitByte

ByteString

{-# INLINE bitStream' #-}
bitStream' :: BSL.ByteString -> ListS Bool
bitStream' = concat' . map' splitByte' . fromBS

为了找到最长的运行,我们将跟踪以前的值、当前运行的长度和最长的运行长度。我们使字段严格,以便折叠的严格性防止在内存中累积。为状态创建严格的数据类型是控制其内存表示形式和计算其字段时间的简单方法。

data LongestRun = LongestRun !Bool !Int !Int
{-# INLINE extendRun #-}
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
where
current = if x == previous then run + 1 else 1
{-# INLINE longestRun #-}
longestRun :: ListS Bool -> Int
longestRun l = longest
where
(LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)

我们完成了

main :: IO ()
main = do
bs <- BSL.getContents
print $ longestRun $ bitStream' bs

这要快得多,但不是c的性能。

longest-seq-c       0m00.12s    (C)
longest-seq         0m08.65s    (Haskell ByteString)
longest-seq-fuse    0m00.81s    (Haskell ByteString fused)

程序分配大约 1 Mb 从输入中读取 1000000 字节。

total alloc =   1,173,104 bytes  (excludes profiling overheads)

更新了 github 代码

我找到了另一种与C相提并论的解决方案。该Data.Vector.Fusion.Stream.Monadic有一个基于Coutts,Leshchinskiy,Stewart 2007论文的流实现。其背后的想法是使用破坏/展开流融合。

回想一下,列表的展开器:: (b -> Maybe (a, b)) -> b -> [a]通过重复应用(展开)前步函数(从初始值开始)来创建列表。Stream只是一个具有启动状态的unfoldr函数。 (Data.Vector.Fusion.Stream.Monadic库使用 GADT 为Step创建构造函数,这些构造函数可以方便地进行模式匹配。我认为,在没有GADT的情况下也可以做到这一点。

解决方案的核心部分是将BytesString转换为Bool流的mkBitstream :: BSL.ByteString -> Stream Bool函数。基本上,我们跟踪当前ByteString,当前字节以及当前字节中有多少尚未消耗。每当一个字节用完时,另一个字节就会被砍掉ByteString。剩下Nothing时,流Done

longestRun函数直接取自@Cirdec的解决方案。

这是练习曲:

{-# LANGUAGE CPP #-}
#define PHASE_FUSED [1]
#define PHASE_INNER [0]
#define INLINE_FUSED INLINE PHASE_FUSED
#define INLINE_INNER INLINE PHASE_INNER
module Main where
import           Control.Monad.Identity            (Identity)
import           Data.Bits                         (shiftR, (.&.))
import qualified Data.ByteString.Lazy              as BSL
import           Data.Functor.Identity             (runIdentity)
import qualified Data.Vector.Fusion.Stream.Monadic as S
import           Data.Word8                        (Word8)
type Stream a = S.Stream Identity a   -- no need for any monad, really
data Step = Step BSL.ByteString !Word8 !Word8   -- could use tuples, but this is faster
mkBitstream :: BSL.ByteString -> Stream Bool
mkBitstream bs' = S.Stream step (Step bs' 0 0) where
{-# INLINE_INNER step #-}
step (Step bs w n) | n==0 = case (BSL.uncons bs) of
Nothing        -> return S.Done
Just (w', bs') -> return $ 
S.Yield (w' .&. 1 == 1) (Step bs' (w' `shiftR` 1) 7)
| otherwise = return $ 
S.Yield (w .&. 1 == 1) (Step bs (w `shiftR` 1) (n-1))

data LongestRun = LongestRun !Bool !Int !Int
{-# INLINE extendRun #-}
extendRun :: LongestRun -> Bool -> LongestRun
extendRun (LongestRun previous run longest) x  = LongestRun x current (max current longest)
where current = if x == previous then run + 1 else 1
{-# INLINE longestRun #-}
longestRun :: Stream Bool -> Int
longestRun s = runIdentity $ do
(LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s
return longest
main :: IO ()
main = do
bs <- BSL.getContents
print $ longestRun (mkBitstream bs)