Ограничивающие скорость каналы core.async в Clojure
Я использую Clojure с core.async и имею ситуацию, когда я хочу установить ограничение скорости на количество сообщений, обрабатываемых через канал.
В частности, я хотел бы:
- Определите ограничение скорости, например. 1000 сообщений в секунду
- Обычно обрабатывайте сообщения (и быстро), пока количество сообщений меньше, чем ограничение скорости
- Есть какая-то разумная альтернативная обработка событий, если предел превышен (например, попросить клиента повторить попытку позже)
- Имеют достаточно низкие накладные расходы
Какой лучший способ достичь этого?
Ответы
Ответ 1
Повреждение проблемы:
- Определите ограничение скорости, например. 1000 сообщений в секунду
- Обрабатывать сообщения обычно (и быстро), если количество сообщений меньше
чем предел скорости
- Есть какая-то разумная альтернативная обработка событий, если предел превышения скорости (например, сообщение клиенту
повторите попытку позже)
- Имеют достаточно низкие накладные расходы
Я подхожу к проблеме с решением, которое просто компонует каналы в циклах.
Общий алгоритм ограничения скорости называется токеновым ведром. У вас есть ведро фиксированных размеров токенов, и вы добавляете токены с фиксированной скоростью. Пока у вас есть токен, вы можете отправить сообщение.
Размер ковша определяет "взрывность" (насколько быстро вы можете догнать максимальную скорость), а скорость определяет максимальную среднюю скорость. Это будут параметры для нашего кода.
Позвольте создать канал, который отправляет сообщение (неважно, что) с заданной скоростью. (# 1)
(defn rate-chan [burstiness rate]
(let [c (chan burstiness) ;; bucket size is buffer size
delta (/ 1000 rate)]
(go
(while true
(>! c :go) ;; send a token, will block if bucket is full
(<! (timeout delta)))) ;; wait a little
c))
Теперь нам нужен канал, который ограничивает другой канал по скорости. (# 2)
(defn limit-chan [in rc]
(let [c (chan)]
(go
(while true
(<! rc) ;; wait for token
(>! c (<! in)))) ;; pass message along
c))
Теперь мы можем использовать эти каналы по умолчанию, если нет ожидающего сообщения:
(defn chan-with-default [in]
(let [c (chan)]
(go
(while true
;; take from in, or if not available, pass useful message
(>! c (alts! [in] :default :rate-exceeded))))
c))
Теперь у нас есть все части для решения проблемы.
(def rchan (-> (chan)
(limit-chan (rate-chan 100 1000))
(chan-with-default)))
Что касается №4, это не абсолютное быстрое решение. Но это тот, который использует составные части и, вероятно, будет достаточно быстрым. Если вы хотите это быстрее, вы можете сделать один цикл, чтобы сделать все это (вместо того, чтобы разлагать его на более мелкие функции). Самый быстрый способ - реализовать .
Ответ 2
Здесь один из способов использования атома, чтобы подсчитать, сколько сообщений отправляется и периодически пересылать его на ноль:
(def counter (atom 0))
(def time-period 1000) ;milliseconds
(def max-rate 1000) ;max number of messages per time-period
(def ch (chan))
(defn alert-client []
(println "That enough!"))
(go (while true (<! (timeout time-period)) (reset! counter 0))) ; reset counter periodically
(defn process [msg]
(if (> (swap! counter inc) max-rate) (alert-client) (put! ch msg)))
(doseq [x (range 1001)] (process x)) ; throw some messages at the channel
Вам понадобится еще один код для использования сообщений с канала. Если вы не уверены, что сможете последовательно потреблять сообщения со скоростью, на которую вы их регулируете, вы, вероятно, захотите указать размер буфера канала или тип канала (падение/скользящее).
Ответ 3
Я написал небольшую библиотеку, чтобы решить эту проблему. Его реализация жутко похожа на Eric Normand's, но с некоторыми мерами для каналов с высокой пропускной способностью (таймаут не является точным для почти миллисекундного времени сна).
Он также поддерживает дросселирование группы каналов глобально и функцию дросселирования.
Проверьте здесь.
Ответ 4
То, что вы ищете, называется автоматическим выключателем. Я думаю, что страница в Википедии довольно плохое описание:
http://en.wikipedia.org/wiki/Circuit_breaker_design_pattern
Хотя наши друзья Scala сделали абсолютно фантастически:
http://doc.akka.io/docs/akka/2.2.3/common/circuitbreaker.html
Существует также библиотека clojure, но вам нужно выполнить интеграцию с core.async
самостоятельно:
https://github.com/krukow/clojure-circuit-breaker
https://github.com/josephwilk/circuit-breaker
Сообщение в блоге о выключателях и масштабировании с помощью clojure:
http://blog.josephwilk.net/clojure/building-clojure-services-at-scale.html
Похоже, вы можете рассмотреть что-то вроде netflix Hystrix, который обеспечивает привязки clojure:
https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-clj
НТН