Простая структура Java Map/Reduce
Может ли кто-нибудь указать мне на простой, с открытым исходным кодом Map/Reduce framework/API для Java? Кажется, что не существует много доказательств существования такой вещи, но кто-то другой может знать разные.
Лучшее, что я могу найти, это, конечно же, Hadoop MapReduce, но это не соответствует "простым" критериям. Мне не нужна возможность запуска распределенных заданий, просто что-то, позволяющее мне запускать задания на карте/сокращении на многоядерном компьютере в одном JVM, используя стандартный стиль Java concurrency.
Не сложно писать, но мне бы этого не хотелось.
Ответы
Ответ 1
Я думаю, что стоит упомянуть, что эти проблемы - это история с Java 8. Пример:
int heaviestBlueBlock =
blocks.filter(b -> b.getColor() == BLUE)
.map(Block::getWeight)
.reduce(0, Integer::max);
Другими словами: single-node MapReduce доступен в Java 8.
Подробнее см. Презентация Брайана Гетца о проекте лямбда
Ответ 2
Вы проверили Akka? Хотя akka - действительно распределенная модель на основе concurrency на основе Actor, вы можете реализовать много вещей просто с небольшим кодом. Просто так легко разделить работу на куски с ней, и она автоматически использует все преимущества многоядерной машины, а также позволяет использовать несколько машин для обработки работы. В отличие от использования потоков, мне кажется более естественным.
У меня есть Java пример сокращения карты с помощью akka. Это не самая простая карта, сводящая пример, поскольку она использует фьючерсы; но это должно дать вам общее представление о том, что связано с этим. Есть несколько важных моментов, которые демонстрирует пример моей карты:
- Как разделить работу.
- Как назначить работу: akka имеет действительно простую систему обмена сообщениями, а также работу partioner, график которой вы можете настроить. Как только я научился использовать его, я не мог остановиться. Это просто так просто и гибко. Я использовал все четыре ядра процессора в кратчайшие сроки. Это действительно отлично подходит для реализации сервисов.
- Как узнать, когда работа выполнена, и результат готов к обработке: на самом деле это часть, которая может быть самой сложной и запутанной для понимания, если вы уже не знакомы с Futures. Вам не нужно использовать Futures, так как есть другие варианты. Я просто использовал их, потому что мне нужно было что-то более короткое, чтобы люди могли заглянуть.
Если у вас есть какие-либо вопросы, у StackOverflow действительно есть класс Akka akka.
Ответ 3
Я использую следующую структуру
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(procs);
List<Future<TaskResult>> results = new ArrayList();
for(int i=0;i<tasks;i++)
results.add(es.submit(new Task(i)));
for(Future<TaskResult> future:results)
reduce(future);
Ответ 4
Я понимаю, что это может быть немного после факта, но вы можете взглянуть на классы JSR166y ForkJoin из JDK7.
Существует обратная портированная библиотека, которая работает под JDK6 без каких-либо проблем, поэтому вам не нужно ждать, пока следующее тысячелетие не пойдет с ней. Он сидит где-то между сырым исполнителем и хауопом, дающим основу для работы на карте, уменьшает работу в текущей JVM.
Ответ 5
Я создал один-для себя пару лет назад, когда получил 8-ядерную машину, но я был не очень доволен этим. Мне никогда не приходилось так просто использовать, как я надеялся, а задачи с интенсивной памятью не очень хорошо масштабировались.
Если у вас нет реальных ответов, я могу поделиться больше, но ядро этого:
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Iterator<TMapInput> inputIterator) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
while (inputIterator.hasNext()) {
TMapInput m = inputIterator.next();
Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
futureSet.add(f);
Thread.sleep(10);
}
while (!futureSet.isEmpty()) {
Thread.sleep(5);
for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
Future<TMapOutput> f = fit.next();
if (f.isDone()) {
fit.remove();
TMapOutput x = f.get();
m_reducer.reduce(x);
}
}
}
return m_reducer.getResult();
}
}
EDIT: на основе комментария ниже представлена версия без sleep
. Хитрость заключается в использовании CompletionService
, который по существу обеспечивает блокирующую очередь завершенных Future
s.
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Collection<TMapInput> input) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
CompletionService<TMapOutput> futurePool =
new ExecutorCompletionService<TMapOutput>(pool);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
for (TMapInput m : input) {
futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
}
pool.shutdown();
int n = futureSet.size();
for (int i = 0; i < n; i++) {
m_reducer.reduce(futurePool.take().get());
}
return m_reducer.getResult();
}
Я также отмечу, что это очень дистиллированный алгоритм сокращения карты, в том числе один сокращающий рабочий, который выполняет операцию сокращения и слияния.
Ответ 6
Мне нравится использовать Skandium для parallelism в Java. Рамка реализует определенные модели parallelism (а именно Master-Slave, Map/Reduce, Pipe, Fork and Divide и Conquer) для многоядерных машин с общей памятью. Этот метод называется "алгоритмическими скелетами". Шаблоны могут быть вложенными.
В деталях есть скелеты и мышцы. Мышцы выполняют фактическую работу (разделение, слияние, выполнение и условие). Скелеты представляют шаблоны parallelism, за исключением "While", "For" и "If", которые могут быть полезны при шаблонах вложенности.
Примеры можно найти внутри рамки. Мне нужно было немного понять, как использовать мышцы и скелеты, но, преодолев это препятствие, мне очень нравится эта структура.:)
Ответ 7
Вы посмотрели GridGain?
Ответ 8
Возможно, вы захотите взглянуть на веб-сайт проекта Functionals 4 Java: http://f4j.rethab.ch/ Он вводит фильтр, карту и сводится к java версии до 8.
Ответ 9
API-интерфейс MapReduce был введен в v3.2 Hazelcast (см. раздел MapReduce API в документах). Хотя Hazelcast предназначен для использования в распределенной системе, он отлично работает в одной установке node, и он довольно легкий.
Ответ 10
Вы можете попробовать LeoTask: работа с параллельной задачей и структура агрегации результатов
Это бесплатно и с открытым исходным кодом: https://github.com/mleoking/leotask
Вот краткое введение, показывающее его API: https://github.com/mleoking/leotask/blob/master/leotask/introduction.pdf?raw=true
Это легкая фреймворк, работающий на одном компьютере с использованием всех доступных процессорных ядер.
Он имеет следующие функции:
- Автоматическое и параллельное исследование пространства параметров
- Гибкая агрегирование результатов на основе конфигурации
- Модель программирования, ориентированная только на логику ключа
- Надежное и автоматическое восстановление прерывания.
и утилиты:
- Динамические и клонируемые структуры сетей.
- Интеграция с Gnuplot
- Создание сети в соответствии с общими сетевыми моделями
- DelimitedReader: сложный читатель, который исследует CSV (значения, разделенные запятыми), такие как база данных
- Генератор быстрых случайных чисел, основанный на алгоритме Мерсенн Твистер
- Интегрированный CurveFitter из проекта ImageJ