Ответ 1
Здесь я беру на себя это. Я решил использовать структуры данных Clojure, чтобы понять, как это будет работать. Обратите внимание, что было бы совершенно обычным и идиоматичным взять блокирующую очередь из панели инструментов Java и использовать ее здесь; Я думаю, что код будет легко адаптироваться. Обновление: я действительно адаптировал его к java.util.concurrent.LinkedBlockingQueue
, см. Ниже.
clojure.lang.PersistentQueue
Вызовите (pro-con)
, чтобы запустить пробный запуск; затем просмотрите содержимое output
, чтобы узнать, произошло ли что-либо, и queue-lengths
, чтобы увидеть, остались ли они в пределах данной границы.
Обновление. Чтобы объяснить, почему я почувствовал необходимость использовать ensure
ниже (об этом я спрашивал об этом в IRC), это необходимо для предотвращения перекоса записи (см. статью в Википедии о Изоляция снимков для определения). Если бы я заменил @queue
на (ensure queue)
, для двух или более производителей можно было бы проверить длину очереди, найти, что она меньше 4, а затем поместить дополнительные элементы в очередь и, возможно, привести общую длину очередь выше 4, нарушая ограничение. Аналогично, два пользователя, делающие @queue
, могут принять один и тот же элемент для обработки, а затем поместить два элемента из очереди. ensure
предотвращает любой из этих сценариев.
(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)
(defn overseer
([] (overseer 20000))
([timeout]
(Thread/sleep timeout)
(swap! go-on? not)))
(defn queue-length-watch [_ _ _ new-queue-state]
(dosync (alter queue-lengths conj (count new-queue-state))))
(add-watch queue :queue-length-watch queue-length-watch)
(defn producer [tag]
(future
(while @go-on?
(if (dosync (let [l (count (ensure queue))]
(when (< l *max-queue-length*)
(alter queue conj tag)
true)))
(Thread/sleep (rand-int 2000))))))
(defn consumer []
(future
(while @go-on?
(Thread/sleep 100) ; don't look at the queue too often
(when-let [item (dosync (let [item (first (ensure queue))]
(alter queue pop)
item))]
(Thread/sleep (rand-int 500)) ; do stuff
(dosync (alter output conj item)))))) ; and let us know
(defn pro-con []
(reset! go-on? true)
(dorun (map #(%1 %2)
(repeat 5 producer)
(iterate inc 0)))
(dorun (repeatedly 2 consumer))
(overseer))
java.util.concurrent.LinkedBlockingQueue
Версия выше написана с использованием LinkedBlockingQueue
. Обратите внимание, как общий контур кода в основном тот же, причем некоторые детали на самом деле немного чище. Я удалил queue-lengths
из этой версии, так как LBQ
позаботится об этом ограничении для нас.
(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))
(defn overseer
([] (overseer 20000))
([timeout]
(Thread/sleep timeout)
(swap! go-on? not)))
(defn producer [tag]
(future
(while @go-on?
(.put queue tag)
(Thread/sleep (rand-int 2000)))))
(defn consumer []
(future
(while @go-on?
;; I'm using .poll on the next line so as not to block
;; indefinitely if we're done; note that this has the
;; side effect that nulls = nils on the queue will not
;; be handled; there a number of other ways to go about
;; this if this is a problem, see docs on LinkedBlockingQueue
(when-let [item (.poll queue)]
(Thread/sleep (rand-int 500)) ; do stuff
(dosync (alter output conj item)))))) ; and let us know
(defn pro-con []
(reset! go-on? true)
(dorun (map #(%1 %2)
(repeat 5 producer)
(iterate inc 0)))
(dorun (repeatedly 2 consumer))
(overseer))