Почему Clojure зависает после выполнения моих расчетов?

Я экспериментирую с фильтрацией через элементы параллельно. Для каждого элемента мне нужно выполнить вычисление расстояния, чтобы увидеть, достаточно ли близко к целевой точке. Неважно, что структуры данных уже существуют для этого, я сейчас только начинаю эксперименты.

Во всяком случае, я хотел запустить некоторые очень простые эксперименты, где я генерирую случайные векторы и фильтрую их. Здесь моя реализация, которая делает все это

(defn pfilter [pred coll]
  (map second
    (filter first
      (pmap (fn [item] [(pred item) item]) coll))))

(defn random-n-vector [n]
  (take n (repeatedly rand)))

(defn distance [u v]
  (Math/sqrt (reduce + (map #(Math/pow (- %1 %2) 2) u v))))

(defn -main [& args]
  (let [[n-str vectors-str threshold-str] args
        n (Integer/parseInt n-str)
        vectors (Integer/parseInt vectors-str)
        threshold (Double/parseDouble threshold-str)
        random-vector (partial random-n-vector n)
        u (random-vector)]
    (time (println n vectors 
      (count 
        (pfilter 
          (fn [v] (< (distance u v) threshold))
          (take vectors (repeatedly random-vector))))))))

Код выполняет и возвращает то, что я ожидаю, это параметр n (длина векторов), векторы (число векторов) и число векторов, которые ближе, чем порог к целевому вектору. Я не понимаю, почему программы зависают в течение дополнительной минуты перед завершением.

Вот результат запуска, который демонстрирует ошибку

$ time lein run 10 100000 1.0
     [null] 10 100000 12283
     [null] "Elapsed time: 3300.856 msecs"

real    1m6.336s
user    0m7.204s
sys 0m1.495s

Любые комментарии о том, как фильтровать параллельно в целом, также более чем приветствуются, поскольку я еще не подтвердил, что pfilter действительно работает.

Ответы

Ответ 1

Вам нужно вызвать shutdown-agents, чтобы убить потоки, поддерживающие threadpool, используемые pmap.

О pfilter, он должен работать, но работать медленнее, чем filter, так как ваш предикат прост. Параллелизация не является бесплатной, поэтому вам нужно дать каждому потоку умеренно-интенсивные задачи, чтобы компенсировать многопоточные служебные данные. Перед тем, как фильтровать их, пакетьте свои элементы.