Эффективные потоки битов в Haskell

В постоянном стремлении эффективно сражаться с битами (например, см. Этот вопрос SO) самой новой задачей является эффективная потоковая передача и потребление бит.

В качестве первой простой задачи я выбираю найти самую длинную последовательность идентичных бит в битовом потоке, генерируемом /dev/urandom. Типичным заклинанием будет head -c 1000000 </dev/urandom | my-exe head -c 1000000 </dev/urandom | my-exe. Фактическая цель состоит в том, чтобы потоковые биты и декодировать гамма-код Elias, например, то есть коды, которые не являются кусками байтов или их кратных.

Для таких кодов переменной длины приятно иметь язык take, takeWhile, group и т.д. Для манипулирования списками. Так как BitStream.take самом деле потребляет часть биста, возможно, вступает в игру какая-то монада.

Очевидной отправной точкой является ленивая байтовая строка из Data.ByteString.Lazy.

A. Подсчет байтов

Эта очень простая программа Haskell выполняет наравне с программой C, как и следовало ожидать.

import qualified Data.ByteString.Lazy as BSL

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ BSL.length bs

B. Добавление байтов

Как только я начну использовать unpack все должно стать хуже.

main = do
    bs <- BSL.getContents
    print $ sum $ BSL.unpack bs

Удивительно, что Haskell и C показывают почти такую же производительность.

C. Самая длинная последовательность идентичных бит

В качестве первой нетривиальной задачи самая длинная последовательность идентичных бит может быть найдена следующим образом:

module Main where

import           Data.Bits            (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import           Data.List            (group)
import           Data.Word8           (Word8)

splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w 'shiftR' i) .&. 1 == 1) [0..7]

bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ maximum $ length <$> (group $ bitStream bs)

Лесткая байтовая трансформация преобразуется в список [Word8] а затем, используя сдвиги, каждое Word разбивается на биты, в результате чего появляется список [Bool]. Этот список списков затем сглаживается с помощью concat. Получив (ленивый) список Bool, используйте group чтобы разбить список на последовательности одинаковых битов, а затем отобразить length над ним. Наконец, maximum дает желаемый результат. Довольно просто, но не очень быстро:

# C
real    0m0.606s

# Haskell
real    0m6.062s

Эта наивная реализация на порядок медленнее.

Профилирование показывает, что выделяется довольно много памяти (около 3 ГБ для разбора 1 МБ ввода). Тем не менее, нет большой утечки пространства.

Отсюда я начинаю ковырять:

  • Существует bitstream пакет, который обещает "быстро, упакованы, жесткие битовые потоки (т.е. список из BOOLS) с полуавтоматическим слиянием потока.". К сожалению, он не обновляется с текущим пакетом vector, см. Здесь подробности.
  • Затем я исследую streaming. Я не совсем понимаю, зачем мне нужно "эффектное" потоковое вещание, которое приносит какую-нибудь монаду в игру - по крайней мере, до тех пор, пока я не начну с обратной задачи поставленной задачи, то есть кодирования и записи битовых потоков в файл.
  • Как просто fold -ing над ByteString? Я должен был бы ввести состояние, чтобы отслеживать потребляемые биты. Это не совсем приятное take, takeWhile, group и т.д. Язык, который желателен.

И теперь я не совсем уверен, куда идти.

Обновление:

Я понял, как это сделать с streaming и streaming-bytestring. Я, вероятно, не делаю этого правильно, потому что результат катастрофически плох.

import           Data.Bits                 (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import           Data.Word8                (Word8)
import qualified Streaming                 as S
import           Streaming.Prelude         (Of, Stream)
import qualified Streaming.Prelude         as S

splitByte :: Word8 -> [Bool]
splitByte w = (\i-> (w 'shiftR' i) .&. 1 == 1) <$> [0..7]

bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s

main :: IO ()
main = do
    let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
        gs = S.group $ bitStream bs ::  Stream (Stream (Of Bool) IO) IO ()
    maxLen <- S.maximum $ S.mapped S.length gs
    print $ S.fst' maxLen

Это проверит ваше терпение с чем-то, кроме нескольких тысяч байтов ввода от stdin. Профайлер говорит, что он проводит безумное количество времени (квадратичное по размеру ввода) в Streaming.Internal.>>=.loop и Data.Functor.Of.fmap. Я не совсем уверен, что такое первый, но fmap указывает (?), Что жонглирование этих Of ab не приносит нам никакой пользы, и поскольку мы находимся в монаде IO, его нельзя оптимизировать.

У меня также есть потоковый эквивалент байтового сумматора : SumBytesStream.hs, который немного медленнее, чем простая ленивая реализация ByteString, но по-прежнему приличная. Поскольку streaming-bytestring провозглашается как "bytestring io done right", я ожидал лучшего. Тогда я, вероятно, не сделаю это правильно.

В любом случае, все эти бит -c не должны происходить в монаде IO. Но BSS.getContents заставляет меня в монаду IO, потому что getContents :: MonadIO m => ByteString m() и нет выхода.

Обновление 2

Следуя совету @dfeuer, я использовал streaming пакет в master @HEAD. Вот результат.

longest-seq-c       0m0.747s    (C)
longest-seq         0m8.190s    (Haskell ByteString)
longest-seq-stream  0m13.946s   (Haskell streaming-bytestring)

Проблема O (n ^ 2) Streaming.concat решена, но мы все еще не приближаемся к эталону C.

Обновление 3

Решение Cirdec дает производительность по аналогии с C. Используемая конструкция называется "Церковно-кодированные списки", см. Этот ответ SO или Haskell Wiki по типам ранга-N.

Исходные файлы:

Все исходные файлы можно найти в github. В Makefile есть все различные цели для запуска экспериментов и профилирования. По умолчанию make будет просто создавать все (сначала создайте каталог bin/ !), А затем make time для выполнения самых longest-seq исполняемых файлов. В исполняемых файлах C добавляется -c чтобы различать их.

Ответы

Ответ 1

Промежуточные распределения и их соответствующие накладные расходы могут быть удалены, когда операции с потоками сливаются вместе. Прелюдия GHC обеспечивает фальсификацию fold/build для ленивых потоков в виде правил перезаписи. Общая идея состоит в том, что если одна функция создает результат, который выглядит как foldr (у него есть тип (a → b → b) → b → b примененный к (:) и []), а другая функция расходует список который выглядит как foldr, можно создать промежуточный список.

Для вашей проблемы я собираюсь построить что-то подобное, но используя строгие левые складки (foldl') вместо foldr. Вместо использования правил перезаписи, которые пытаются обнаружить, когда что-то похоже на foldl, я буду использовать тип данных, который заставит списки выглядеть как левые складки.

-- A list encoded as a strict left fold.
newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}

Поскольку я начал с отказа от списков, мы будем повторно внедрять часть прелюдии для списков.

Строгие левые складки могут быть созданы из функций foldl' как списков, так и байтов.

{-# INLINE fromList #-}
fromList :: [a] -> ListS a
fromList l = ListS (\c z -> foldl' c z l)

{-# INLINE fromBS #-}
fromBS :: BSL.ByteString -> ListS Word8
fromBS l = ListS (\c z -> BSL.foldl' c z l)

Простейшим примером его использования является поиск длины списка.

{-# INLINE length' #-}
length' :: ListS a -> Int
length' l = build l (\z a -> z+1) 0

Мы также можем отображать и объединять левые складки.

{-# INLINE map' #-}
-- fmap renamed so it can be inlined
map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z)

{-# INLINE concat' #-}
concat' :: ListS (ListS a) -> ListS a
concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)

Для вашей проблемы мы должны иметь возможность разбить слово на биты.

{-# INLINE splitByte #-}
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w 'shiftR' i) .&. 1 == 1) [0..7]

{-# INLINE splitByte' #-}
splitByte' :: Word8 -> ListS Bool
splitByte' = fromList . splitByte

И ByteString в биты

{-# INLINE bitStream' #-}
bitStream' :: BSL.ByteString -> ListS Bool
bitStream' = concat' . map' splitByte' . fromBS

Чтобы найти самый длинный прогон, мы будем отслеживать предыдущее значение, длину текущего прогона и длину самого длинного пробега. Мы создаем поля строгими, чтобы строгость складки препятствовала накоплению накопленных в памяти цепей. Создание строгого типа данных для состояния - это простой способ получить контроль над представлением своей памяти и при оценке его полей.

data LongestRun = LongestRun !Bool !Int !Int

{-# INLINE extendRun #-}
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
  where
    current = if x == previous then run + 1 else 1

{-# INLINE longestRun #-}
longestRun :: ListS Bool -> Int
longestRun l = longest
 where
   (LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)

И мы закончили

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ longestRun $ bitStream' bs

Это намного быстрее, но не совсем производительность c.

longest-seq-c       0m00.12s    (C)
longest-seq         0m08.65s    (Haskell ByteString)
longest-seq-fuse    0m00.81s    (Haskell ByteString fused)

Программа выделяет около 1 Мб для чтения 1000000 байт с ввода.

total alloc =   1,173,104 bytes  (excludes profiling overheads)

Обновлен код github

Ответ 2

Я нашел другое решение, которое находится на одном уровне с C. Data.Vector.Fusion.Stream.Monadic имеет реализацию потока, основанную на этой статье Coutts, Leshchinskiy, Stewart 2007. Идея этого заключается в использовании слияния с потоком destroy/unfoldr.

Напомним, что список unfoldr :: (b → Maybe (a, b)) → b → [a] создает список, повторно применяя (разворачивая) функцию шага вперед, начиная с начального значения. Stream - это просто функция unfoldr с начальным состоянием. (Библиотека Data.Vector.Fusion.Stream.Monadic использует GADT для создания конструкторов для Step которые могут быть удобно сопоставлены с образцами. Этого можно было бы сделать и без GADT, я думаю.)

Центральным элементом решения является mkBitstream :: BSL.ByteString → Stream Bool которая превращает BytesString в поток Bool. В основном, мы отслеживаем текущий ByteString, текущий байт и сколько текущего байта все еще не загружено. Всякий раз, когда байт используется, другой байт прерывается ByteString. Когда Nothing не осталось, поток будет Done.

Функция longestRun берется прямо из решения @Cirdec.

Здесь этюд:

{-# LANGUAGE CPP #-}
#define PHASE_FUSED [1]
#define PHASE_INNER [0]
#define INLINE_FUSED INLINE PHASE_FUSED
#define INLINE_INNER INLINE PHASE_INNER
module Main where

import           Control.Monad.Identity            (Identity)
import           Data.Bits                         (shiftR, (.&.))
import qualified Data.ByteString.Lazy              as BSL
import           Data.Functor.Identity             (runIdentity)
import qualified Data.Vector.Fusion.Stream.Monadic as S
import           Data.Word8                        (Word8)

type Stream a = S.Stream Identity a   -- no need for any monad, really

data Step = Step BSL.ByteString !Word8 !Word8   -- could use tuples, but this is faster

mkBitstream :: BSL.ByteString -> Stream Bool
mkBitstream bs' = S.Stream step (Step bs' 0 0) where
    {-# INLINE_INNER step #-}
    step (Step bs w n) | n==0 = case (BSL.uncons bs) of
                            Nothing        -> return S.Done
                            Just (w', bs') -> return $ 
                                S.Yield (w' .&. 1 == 1) (Step bs' (w' 'shiftR' 1) 7)
                       | otherwise = return $ 
                                S.Yield (w .&. 1 == 1) (Step bs (w 'shiftR' 1) (n-1))


data LongestRun = LongestRun !Bool !Int !Int

{-# INLINE extendRun #-}
extendRun :: LongestRun -> Bool -> LongestRun
extendRun (LongestRun previous run longest) x  = LongestRun x current (max current longest)
    where current = if x == previous then run + 1 else 1

{-# INLINE longestRun #-}
longestRun :: Stream Bool -> Int
longestRun s = runIdentity $ do
    (LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s
    return longest

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ longestRun (mkBitstream bs)