Параллельная доза q для Clojure
Я вообще не использовал многопоточность в Clojure, поэтому не знаю, с чего начать.
У меня есть doseq
, тело которого может работать параллельно. Я бы хотел, чтобы всегда было 3 потока (оставляя 1 ядро бесплатно), которые оценивают тело параллельно, пока диапазон не исчерпан. Нет никакого общего состояния, ничего сложного - эквивалент многопроцессорности Python был бы прекрасен.
Так что-то вроде:
(dopar 3 [i (range 100)]
; repeated 100 times in 3 parallel threads...
...)
С чего начать? Есть ли команда для этого? Стандартный пакет? Хорошая ссылка?
До сих пор я нашел pmap
и мог бы использовать это (как мне ограничить 3 за раз? похоже, что он использует 32 за раз - нет, источник говорит 2 + количество процессоров), но похоже, что это базовый примитив, который уже должен существовать где-то.
пояснение: я действительно хотел бы контролировать количество потоков. У меня есть процессы, которые долгое время работают и используют достаточное количество памяти, поэтому создание большого количества и надеяться, что все будет хорошо, - это не хороший подход (пример, который использует значительный фрагмент доступный mem).
update: начиная писать макрос, который делает это, и мне нужен семафор (или мьютекс, или атом, который я могу ждать). Существуют ли семафоры в Clojure? Или я должен использовать ThreadPoolExecutor? Кажется странным, что приходится так много тянуть с Java - я думал, что параллельное программирование в Clojure должно быть легким... Может быть, я думаю об этом совершенно неправильно? Хммм. Агенты?
Ответы
Ответ 1
pmap
на самом деле отлично работает в большинстве случаев - он использует пул потоков с разумным количеством потоков для вашей машины. Я бы не стал пытаться создать свои собственные механизмы для управления количеством потоков, если у вас нет реальных контрольных доказательств того, что по умолчанию возникают проблемы.
Сказав, что если вы действительно хотите ограничить максимум тремя потоками, простой подход состоит в том, чтобы просто использовать pmap для трех подмножеств диапазона:
(defn split-equally [num coll]
"Split a collection into a vector of (as close as possible) equally sized parts"
(loop [num num
parts []
coll coll
c (count coll)]
(if (<= num 0)
parts
(let [t (quot (+ c num -1) num)]
(recur (dec num) (conj parts (take t coll)) (drop t coll) (- c t))))))
(defmacro dopar [thread-count [sym coll] & body]
`(doall (pmap
(fn [vals#]
(doseq [~sym vals#]
[email protected]))
(split-equally ~thread-count ~coll))))
Обратите внимание на использование doall
, которое необходимо для принудительной оценки pmap
(что является ленивым).
Ответ 2
ОК, я думаю, что я хочу иметь agent
для каждого цикла, с данными, отправленными агенту с помощью send
. Агенты, запускаемые с использованием send
, запускаются из пула потоков, поэтому число ограничено каким-то образом (он не дает мелкозернистого управления ровно тремя потоками, но теперь это нужно будет сделать).
[Дэйв Рэй объясняет в комментариях: для управления размером пула мне нужно будет написать свой собственный)
(defmacro dopar [seq-expr & body]
(assert (= 2 (count seq-expr)) "single pair of forms in sequence expression")
(let [[k v] seq-expr]
`(apply await
(for [k# ~v]
(let [a# (agent k#)]
(send a# (fn [~k] [email protected]))
a#)))))
который можно использовать как:
(deftest test-dump
(dopar [n (range 7 11)]
(time (do-dump-single "/tmp/single" "a" n 10000000))))
Ура! Работает! Я рок! (ОК, Clojure скалы немного). Связанный пост в блоге.
Ответ 3
Почему бы вам просто не использовать pmap? Вы по-прежнему не можете контролировать threadpool, но это намного меньше, чем писать настраиваемый макрос, который использует агенты (почему бы не фьючерсы?).
Ответ 4
У меня была аналогичная проблема со следующими требованиями:
- Контролировать количество используемых потоков;
- Быть агностиком относительно управления пулом потоков;
- Порядок выполнения задач не требуется;
- Время обработки задач может быть разным, поэтому упорядочение задач не должно, но задача, которая заканчивается раньше, должна быть возвращена ранее;
- Оцените и введите последовательность ввода лениво;
- Элементы во входной последовательности не должны считываться за пределы, но должны буферизироваться и считываться в соответствии с возвращаемыми результатами, чтобы избежать проблем с отсутствием памяти.
Функция ядра pmap
удовлетворяет только двум последним предположениям.
Вот реализация, которая удовлетворяет этим предположениям, используя стандартный пул потоков Java ExecutorService
вместе с CompletionService
и некоторым разделением входного потока:
(require '[clojure.tools.logging :as log])
(import [java.util.concurrent ExecutorService ExecutorCompletionService
CompletionService Future])
(defn take-seq
[^CompletionService pool]
(lazy-seq
(let [^Future result (.take pool)]
(cons (.get result)
(take-seq pool)))))
(defn qmap
[^ExecutorService pool chunk-size f coll]
(let [worker (ExecutorCompletionService. pool)]
(mapcat
(fn [chunk]
(let [actual-size (atom 0)]
(log/debug "Submitting payload for processing")
(doseq [item chunk]
(.submit worker #(f item))
(swap! actual-size inc))
(log/debug "Outputting completed results for" @actual-size "trades")
(take @actual-size (take-seq worker))))
(partition-all chunk-size coll))))
Как видно, qmap
не создает экземпляр самого пула потоков, а только ExecutorCompletionService
. Это позволяет, например, передавать фиксированный размер ThreadPoolExecutorService
. Кроме того, поскольку qmap
возвращает ленивую последовательность, он не может и не должен сам управлять ресурсом пула потоков. Наконец, chunk-size
позволяет ограничить, сколько элементов входной последовательности реализовано и представлено как задачи сразу.
Нижеприведенный код демонстрирует правильное использование:
(import [java.util.concurrent Executors])
(let [thread-pool (Executors/newFixedThreadPool 3)]
(try
(doseq [result (qmap thread-pool
;; submit no more than 500 tasks at once
500
long-running-resource-intensive-fn
unboundedly-large-lazy-input-coll)]
(println result))
(finally
;; (.shutdown) only prohibits submitting new tasks,
;; (.shutdownNow) will even cancel already submitted tasks.
(.shutdownNow thread-pool))))
Вот документация для некоторых из используемых классов Java concurrency:
Ответ 5
Не уверен, что это идиоматично, поскольку я все еще довольно новичок с Clojure, но для меня работает следующее решение, и оно также выглядит довольно кратким:
(let [number-of-threads 3
await-timeout 1000]
(doseq [p-items (partition number-of-threads items)]
(let [agents (map agent p-items)]
(doseq [a agents] (send-off a process))
(apply await-for await-timeout agents)
(map deref agents))))
Ответ 6
На самом деле библиотека теперь делает именно это. Из их github
:
Библиотека claypoole предоставляет параллельные версии на основе пула Clojure, такие как pmap
, future
и for
.
Он предоставляет как упорядоченные, так и неупорядоченные версии для того же самого.