Эффективные потоки битов в Haskell
В постоянном стремлении эффективно сражаться с битами (например, см. Этот вопрос SO) самой новой задачей является эффективная потоковая передача и потребление бит.
В качестве первой простой задачи я выбираю найти самую длинную последовательность идентичных бит в битовом потоке, генерируемом /dev/urandom
. Типичным заклинанием будет head -c 1000000 </dev/urandom | my-exe
head -c 1000000 </dev/urandom | my-exe
. Фактическая цель состоит в том, чтобы потоковые биты и декодировать гамма-код Elias, например, то есть коды, которые не являются кусками байтов или их кратных.
Для таких кодов переменной длины приятно иметь язык take
, takeWhile
, group
и т.д. Для манипулирования списками. Так как BitStream.take
самом деле потребляет часть биста, возможно, вступает в игру какая-то монада.
Очевидной отправной точкой является ленивая байтовая строка из Data.ByteString.Lazy
.
A. Подсчет байтов
Эта очень простая программа Haskell выполняет наравне с программой C, как и следовало ожидать.
import qualified Data.ByteString.Lazy as BSL
main :: IO ()
main = do
bs <- BSL.getContents
print $ BSL.length bs
B. Добавление байтов
Как только я начну использовать unpack
все должно стать хуже.
main = do
bs <- BSL.getContents
print $ sum $ BSL.unpack bs
Удивительно, что Haskell и C показывают почти такую же производительность.
C. Самая длинная последовательность идентичных бит
В качестве первой нетривиальной задачи самая длинная последовательность идентичных бит может быть найдена следующим образом:
module Main where
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import Data.List (group)
import Data.Word8 (Word8)
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w 'shiftR' i) .&. 1 == 1) [0..7]
bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)
main :: IO ()
main = do
bs <- BSL.getContents
print $ maximum $ length <$> (group $ bitStream bs)
Лесткая байтовая трансформация преобразуется в список [Word8]
а затем, используя сдвиги, каждое Word
разбивается на биты, в результате чего появляется список [Bool]
. Этот список списков затем сглаживается с помощью concat
. Получив (ленивый) список Bool
, используйте group
чтобы разбить список на последовательности одинаковых битов, а затем отобразить length
над ним. Наконец, maximum
дает желаемый результат. Довольно просто, но не очень быстро:
# C
real 0m0.606s
# Haskell
real 0m6.062s
Эта наивная реализация на порядок медленнее.
Профилирование показывает, что выделяется довольно много памяти (около 3 ГБ для разбора 1 МБ ввода). Тем не менее, нет большой утечки пространства.
Отсюда я начинаю ковырять:
- Существует
bitstream
пакет, который обещает "быстро, упакованы, жесткие битовые потоки (т.е. список из BOOLS) с полуавтоматическим слиянием потока.". К сожалению, он не обновляется с текущим пакетом vector
, см. Здесь подробности. - Затем я исследую
streaming
. Я не совсем понимаю, зачем мне нужно "эффектное" потоковое вещание, которое приносит какую-нибудь монаду в игру - по крайней мере, до тех пор, пока я не начну с обратной задачи поставленной задачи, то есть кодирования и записи битовых потоков в файл. - Как просто
fold
-ing над ByteString
? Я должен был бы ввести состояние, чтобы отслеживать потребляемые биты. Это не совсем приятное take
, takeWhile
, group
и т.д. Язык, который желателен.
И теперь я не совсем уверен, куда идти.
Обновление:
Я понял, как это сделать с streaming
и streaming-bytestring
. Я, вероятно, не делаю этого правильно, потому что результат катастрофически плох.
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import Data.Word8 (Word8)
import qualified Streaming as S
import Streaming.Prelude (Of, Stream)
import qualified Streaming.Prelude as S
splitByte :: Word8 -> [Bool]
splitByte w = (\i-> (w 'shiftR' i) .&. 1 == 1) <$> [0..7]
bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s
main :: IO ()
main = do
let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
gs = S.group $ bitStream bs :: Stream (Stream (Of Bool) IO) IO ()
maxLen <- S.maximum $ S.mapped S.length gs
print $ S.fst' maxLen
Это проверит ваше терпение с чем-то, кроме нескольких тысяч байтов ввода от stdin. Профайлер говорит, что он проводит безумное количество времени (квадратичное по размеру ввода) в Streaming.Internal.>>=.loop
и Data.Functor.Of.fmap
. Я не совсем уверен, что такое первый, но fmap
указывает (?), Что жонглирование этих Of ab
не приносит нам никакой пользы, и поскольку мы находимся в монаде IO, его нельзя оптимизировать.
У меня также есть потоковый эквивалент байтового сумматора : SumBytesStream.hs
, который немного медленнее, чем простая ленивая реализация ByteString
, но по-прежнему приличная. Поскольку streaming-bytestring
провозглашается как "bytestring io done right", я ожидал лучшего. Тогда я, вероятно, не сделаю это правильно.
В любом случае, все эти бит -c не должны происходить в монаде IO. Но BSS.getContents
заставляет меня в монаду IO, потому что getContents :: MonadIO m => ByteString m()
и нет выхода.
Обновление 2
Следуя совету @dfeuer, я использовал streaming
пакет в master @HEAD. Вот результат.
longest-seq-c 0m0.747s (C)
longest-seq 0m8.190s (Haskell ByteString)
longest-seq-stream 0m13.946s (Haskell streaming-bytestring)
Проблема O (n ^ 2) Streaming.concat
решена, но мы все еще не приближаемся к эталону C.
Обновление 3
Решение Cirdec дает производительность по аналогии с C. Используемая конструкция называется "Церковно-кодированные списки", см. Этот ответ SO или Haskell Wiki по типам ранга-N.
Исходные файлы:
Все исходные файлы можно найти в github. В Makefile
есть все различные цели для запуска экспериментов и профилирования. По умолчанию make
будет просто создавать все (сначала создайте каталог bin/
!), А затем make time
для выполнения самых longest-seq
исполняемых файлов. В исполняемых файлах C добавляется -c
чтобы различать их.
Ответы
Ответ 1
Промежуточные распределения и их соответствующие накладные расходы могут быть удалены, когда операции с потоками сливаются вместе. Прелюдия GHC обеспечивает фальсификацию fold/build для ленивых потоков в виде правил перезаписи. Общая идея состоит в том, что если одна функция создает результат, который выглядит как foldr (у него есть тип (a → b → b) → b → b
примененный к (:)
и []
), а другая функция расходует список который выглядит как foldr, можно создать промежуточный список.
Для вашей проблемы я собираюсь построить что-то подобное, но используя строгие левые складки (foldl'
) вместо foldr. Вместо использования правил перезаписи, которые пытаются обнаружить, когда что-то похоже на foldl
, я буду использовать тип данных, который заставит списки выглядеть как левые складки.
-- A list encoded as a strict left fold.
newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}
Поскольку я начал с отказа от списков, мы будем повторно внедрять часть прелюдии для списков.
Строгие левые складки могут быть созданы из функций foldl'
как списков, так и байтов.
{-# INLINE fromList #-}
fromList :: [a] -> ListS a
fromList l = ListS (\c z -> foldl' c z l)
{-# INLINE fromBS #-}
fromBS :: BSL.ByteString -> ListS Word8
fromBS l = ListS (\c z -> BSL.foldl' c z l)
Простейшим примером его использования является поиск длины списка.
{-# INLINE length' #-}
length' :: ListS a -> Int
length' l = build l (\z a -> z+1) 0
Мы также можем отображать и объединять левые складки.
{-# INLINE map' #-}
-- fmap renamed so it can be inlined
map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z)
{-# INLINE concat' #-}
concat' :: ListS (ListS a) -> ListS a
concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)
Для вашей проблемы мы должны иметь возможность разбить слово на биты.
{-# INLINE splitByte #-}
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w 'shiftR' i) .&. 1 == 1) [0..7]
{-# INLINE splitByte' #-}
splitByte' :: Word8 -> ListS Bool
splitByte' = fromList . splitByte
И ByteString
в биты
{-# INLINE bitStream' #-}
bitStream' :: BSL.ByteString -> ListS Bool
bitStream' = concat' . map' splitByte' . fromBS
Чтобы найти самый длинный прогон, мы будем отслеживать предыдущее значение, длину текущего прогона и длину самого длинного пробега. Мы создаем поля строгими, чтобы строгость складки препятствовала накоплению накопленных в памяти цепей. Создание строгого типа данных для состояния - это простой способ получить контроль над представлением своей памяти и при оценке его полей.
data LongestRun = LongestRun !Bool !Int !Int
{-# INLINE extendRun #-}
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
where
current = if x == previous then run + 1 else 1
{-# INLINE longestRun #-}
longestRun :: ListS Bool -> Int
longestRun l = longest
where
(LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)
И мы закончили
main :: IO ()
main = do
bs <- BSL.getContents
print $ longestRun $ bitStream' bs
Это намного быстрее, но не совсем производительность c.
longest-seq-c 0m00.12s (C)
longest-seq 0m08.65s (Haskell ByteString)
longest-seq-fuse 0m00.81s (Haskell ByteString fused)
Программа выделяет около 1 Мб для чтения 1000000 байт с ввода.
total alloc = 1,173,104 bytes (excludes profiling overheads)
Обновлен код github
Ответ 2
Я нашел другое решение, которое находится на одном уровне с C. Data.Vector.Fusion.Stream.Monadic
имеет реализацию потока, основанную на этой статье Coutts, Leshchinskiy, Stewart 2007. Идея этого заключается в использовании слияния с потоком destroy/unfoldr.
Напомним, что список unfoldr :: (b → Maybe (a, b)) → b → [a]
создает список, повторно применяя (разворачивая) функцию шага вперед, начиная с начального значения. Stream
- это просто функция unfoldr
с начальным состоянием. (Библиотека Data.Vector.Fusion.Stream.Monadic
использует GADT для создания конструкторов для Step
которые могут быть удобно сопоставлены с образцами. Этого можно было бы сделать и без GADT, я думаю.)
Центральным элементом решения является mkBitstream :: BSL.ByteString → Stream Bool
которая превращает BytesString
в поток Bool
. В основном, мы отслеживаем текущий ByteString
, текущий байт и сколько текущего байта все еще не загружено. Всякий раз, когда байт используется, другой байт прерывается ByteString
. Когда Nothing
не осталось, поток будет Done
.
Функция longestRun
берется прямо из решения @Cirdec.
Здесь этюд:
{-# LANGUAGE CPP #-}
#define PHASE_FUSED [1]
#define PHASE_INNER [0]
#define INLINE_FUSED INLINE PHASE_FUSED
#define INLINE_INNER INLINE PHASE_INNER
module Main where
import Control.Monad.Identity (Identity)
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import Data.Functor.Identity (runIdentity)
import qualified Data.Vector.Fusion.Stream.Monadic as S
import Data.Word8 (Word8)
type Stream a = S.Stream Identity a -- no need for any monad, really
data Step = Step BSL.ByteString !Word8 !Word8 -- could use tuples, but this is faster
mkBitstream :: BSL.ByteString -> Stream Bool
mkBitstream bs' = S.Stream step (Step bs' 0 0) where
{-# INLINE_INNER step #-}
step (Step bs w n) | n==0 = case (BSL.uncons bs) of
Nothing -> return S.Done
Just (w', bs') -> return $
S.Yield (w' .&. 1 == 1) (Step bs' (w' 'shiftR' 1) 7)
| otherwise = return $
S.Yield (w .&. 1 == 1) (Step bs (w 'shiftR' 1) (n-1))
data LongestRun = LongestRun !Bool !Int !Int
{-# INLINE extendRun #-}
extendRun :: LongestRun -> Bool -> LongestRun
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
where current = if x == previous then run + 1 else 1
{-# INLINE longestRun #-}
longestRun :: Stream Bool -> Int
longestRun s = runIdentity $ do
(LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s
return longest
main :: IO ()
main = do
bs <- BSL.getContents
print $ longestRun (mkBitstream bs)