Ответ 1
Поэтому я могу немного рассказать об одном из анализов Chan
и TQueue
(который pipes-concurrency
используется внутри здесь), что мотивировало некоторые дизайнерские решения, которые вошли в unagi-chan
. Я не уверен, ответит ли он на ваш вопрос. Я рекомендую разворачивать разные очереди и играть с вариациями во время бенчмаркинга, чтобы получить реальное представление о том, что происходит.
Chan
Chan
выглядит следующим образом:
data Chan a
= Chan (MVar (Stream a)) -- pointer to "head", where we read from
(MVar (Stream a)) -- pointer to "tail", where values written to
type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)
Это связанный список MVar
s. Два MVar
в типе Chan
действуют как указатели на текущую головку и хвост списка соответственно. Вот как выглядит запись:
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
new_hole <- newEmptyMVar mask_ $ do
old_hole <- takeMVar writeVar -- [1]
putMVar old_hole (ChItem val new_hole) -- [2]
putMVar writeVar new_hole -- [3]
В 1 автор берет блокировку на конец записи, по 2 наш элемент a
предоставляется читателю, а в 3 конец записи разблокируется для других авторов.
Это действительно хорошо работает в сценарии с одним потребителем/с одним продюсером (см. график здесь), потому что чтения и записи не бороться. Но как только у вас будет несколько одновременных авторов, вы можете начать с проблем:
-
писатель, который нажимает 1, когда другой писатель находится в 2, блокирует и будет назначаться (самый быстрый, который я смог измерить контекстным переключателем, составляет ~ 150 нс (довольно быстро), возможно, ситуации, когда это намного медленнее). Поэтому, когда вы получаете много писателей, утверждающих вы в основном делаете большой раунд через планировщик, в очередь ожидания для
MVar
, а затем, наконец, запись может завершиться. -
Когда писатель получает запланированный (по истечении времени ожидания), а в 2, он удерживается на блокировке, и никакие записи не могут быть завершены до тех пор, пока они не будут повторно перенесены; это становится больше проблемой, когда мы переубеждены, то есть когда соотношение потоков/ядер высокое.
Наконец, использование MVar
-per-item требует некоторых накладных расходов с точки зрения распределения, и что более важно, когда мы накапливаем много изменяемых объектов, мы можем вызвать много давления GC.
TQUEUE
TQueue
отлично, потому что STM
делает его очень простым, чтобы рассуждать о его правильности. Это функциональная очередь в стиле очереди, а write
состоит в простом чтении стека писателей, в том, что касается нашего элемента и записи его:
data TQueue a = TQueue (TVar [a])
(TVar [a])
writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do
listend <- readTVar write -- a transaction with a consistent
writeTVar write (a:listend) -- view of memory
Если после того, как writeTQueue
напишет свой новый стек обратно, другая чередующаяся запись сделает то же самое, одна из записей будет повторена. Чем больше writeTQueue
чередуется, тем усиливается эффект раздора. Однако производительность ухудшается гораздо медленнее, чем в Chan
, потому что существует только одна операция writeTVar
, которая может лишить конкурирующих writeTQueue
s, а транзакция очень мала (просто чтение и a (:)
).
Чтение работает путем "декутации" стека со стороны записи, его реверсирования и сохранения перевернутого стека в своей переменной для легкого "выскакивания" (в целом это дает нам амортизацию O (1) push и pop)
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> case reverse ys of
[] -> error "readTQueue"
(z:zs) -> do writeTVar write []
writeTVar read zs
return z
У читателей есть симметричная умеренная проблема, связанная с писателями. В общем случае читатели и писатели не спорят, однако, когда стек читателя исчерпан, читатели борются с другими читателями и писателями. Я подозреваю, что если вы предварительно загрузили TQueue
с достаточными значениями, а затем запустили 4 читателя и 4 автора, вы могли бы вызвать оживление, поскольку реверс должен был завершиться до следующей записи. Интересно также отметить, что в отличие от MVar
запись в TVar
, на которую ожидают многие читатели, разбуждает их все одновременно (это может быть более или менее эффективно, в зависимости от сценария).
Я подозреваю, что вы не видите значительных недостатков TQueue
в своем тесте; прежде всего, вы видите умеренные эффекты конкуренции за запись и накладные расходы на много выделения и GC'ing много изменяемых объектов.
унаги-чан
unagi-chan
был разработан, во-первых, для корректной обработки конфликта. Это концептуально очень просто, но реализация имеет некоторые сложности
data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))
data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))
data Cell a = Empty | Written a | Blocking (MVar a)
Для чтения и записи сторон очереди Stream
, на которой они координируют передаваемые значения (от писателя к считывателю) и указания на блокировку (от читателя к записи), а стороны чтения и записи имеют независимый атомный счетчик. Запись работает как:
-
писатель называет атомный
incrCounter
на счетчике записи, чтобы получить свой уникальный индекс, на котором координировать его (одно) считыватель -
писатель находит свою ячейку и выполняет CAS
Written a
-
если он успешно завершен, иначе он видит, что читатель побил его и блокирует (или переходит к блокировке), поэтому он выполняет
(\Blocking v)-> putMVar v a)
и выходит.
Чтение работает аналогичным и очевидным способом.
Первое нововведение состоит в том, чтобы превратить точку соприкосновения в атомарную операцию, которая не деградирует под конфликтом (как цикл CAS/retry или Chan-like lock). На основе простого бенчмаркинга и экспериментов лучше всего подходит примап-выборка и добавление, представленный библиотекой atomic-primops
.
Затем в 2 и читателю, и писателю необходимо выполнить только один сравнительный обмен и обмен (быстрый путь для чтения - простое неатомное чтение) для завершения координации.
Чтобы попытаться сделать unagi-chan
хорошим, мы
-
использовать fetch-and-add для обработки точки раздора
-
используйте методы безблокировки, так что, когда мы перенаправляем поток, который выполняется в неподходящее время, не блокирует прогресс для других потоков (заблокированный писатель может заблокировать не более, чем читатель, "назначенный" ему счетчиком; прочитайте предупреждения async исключений в
unagi-chan
docs и обратите внимание, чтоChan
имеет более приятную семантику здесь) -
используйте массив для хранения наших элементов, который имеет лучшую локальность (но ниже) ниже накладных расходов на элемент и оказывает меньшее давление на GC
Последняя заметка re. использование массива: одновременная запись в массив, как правило, является плохой идеей для масштабирования, потому что вы вызываете много трафика кеш-когерентности, поскольку кэш-линии недействительны в ваших потоках-писателях. Общий термин - "ложное разделение". Но есть и проблемы, связанные с кешем и минусами, к альтернативным проектам, которые я могу думать о том, что это будет полосатые записи или что-то еще; Я экспериментировал с этим немного, но на данный момент у меня нет ничего убедительного.
Одно место, где мы имеем законное отношение к ложному совместному использованию, находится в нашем счетчике, который мы выравниваем и помещаем в 64 байта; это действительно проявилось в тестах, и единственным недостатком является увеличение использования памяти.