Как одновременно обрабатывать элементы в коллекции в Java
Мне нужно обрабатывать элементы в экземпляре Collection одновременно.
Другими словами, вместо повторения экземпляра Collection
for (Someclass elem : coll){
process(elem);
}
Мне нравится обрабатывать эти элементы одновременно. Скажем, что-то вроде ConcurrentCollectionExecutor(coll, new Callable{…}, numberOfThreads)
. Кроме того, необходимо установить ряд одновременных потоков.
Любой гибкий шаблон уже существует?
Ответы
Ответ 1
Сделайте метод process методом run() в классе MyRunnable, который реализует Runnable и конструктор которого принимает elem в качестве входных данных и сохраняет его как переменную экземпляра. Затем используйте:
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
for (Someclass elem : coll){
Runnable worker = new MyRunnable(elem);
executor.execute(worker);
}
Ответ 2
Хорошим решением будет:
- создать ArrayBlockingQueue, содержащий элементы для обработки
- создать ExecutorService, чтобы выполнить вашу обработку одновременно
- создайте экземпляр
Runnable
, указав ArrayBlockingQueue как параметр
- Реализуйте метод
run
: пока в очереди есть элементы, опросите их и обработайте их
- Отправьте
Runnable
в ExecutorService
Код:
BlockingQueue<Someclass> toProcess =
new ArrayBlockingQueue<Someclass>(coll.size(), false, coll);
ExecutorService es = Executors.newFixedThreadPool(numberOfThreads);
for(int count = 0 ; count < numberOfThreads ; ++c) {
es.submit(new MyRunnable(toProcess));
}
private static class MyRunnable() implements Runnable {
private final BlockingQueue<Someclass> toProcess;
public MyRunnable(BlockingQueue<Someclass> toProcess) {
this.toProcess = toProcess;
}
@Override
public void run() {
Someclass element = null;
while((element = toProcess.poll()) != null) {
process(element);
}
}
}
Ответ 3
Ниже "ручной" версии такого класса исполнителя. Обратите внимание, что вы должны передать туда не экземпляр Callable
(или Runnable
), а имя класса такого класса.
public class ConcurrentCollectionExecutor<T> {
private Collection<T> collection;
private Class<Runnable> processor;
private int numberOfThreads;
private Executor executor;
public ConcurrentCollectionExecutor(Collection<T> collection, Class<Runnable> processor, int numberOfThreads) {
this.collection = collection;
this.processor = processor;
this.numberOfThreads = numberOfThreads;
this.executor = Executors.newFixedThreadPool(numberOfThreads);
}
public void run() {
try {
Constructor<Runnable> constructor = null;
for (T t : collection) {
if (constructor == null) {
constructor = processor.getConstructor(t.getClass());
}
executor.execute(constructor.newInstance(t));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Ответ 4
Я не знаю никаких шаблонов для этого, но в качестве идеи вы можете делить свои элементы коллекции на количество потоков, поэтому каждый поток обрабатывает X-элементы, например:
Коллекция содержит 20 элементов, каждая ваша функция обеспечивает 4 потока, а затем стажер вы начинаете их:
thread1 gets the elements [0 .. 4]
thread2 gets the elements [5 .. 9]
thread3 gets the elements [10 .. 14]
thread1 gets the elements [15 .. 19]
Обратите внимание, что удаление элементов из коллекции может вызвать проблемы, тогда специально поток 4 пытается получить доступ к элементу [19], в то время как в вашей коллекции меньше 20 элементов.
ИЗМЕНИТЬ
Как упоминалось в мозге в зависимости от времени процесса элементов, эта идея может быть неэффективной, как если бы обработка одного из первых 5 элементов заняла 10 секунд, а остальные элементы заняли только 0,5 секунды, тогда thread1 будет занят, но другие потоки будут в конечном итоге не работает параллельно очень долго.