Java concurrency: выполнение множества "бесконечных" задач с несколькими потоками
Я создаю (параллельный) симулятор для набора из N частиц, движущихся в пространстве согласно законам Ньютона.
Моя идея - это модель каждой частицы как задача, которая взаимодействует с другими частицами (задачами), чтобы получить свои позиции и массы, чтобы вычислить силу, на которую она подвержена.
Каждая задача частицы - это что-то вроде
while(true){
force = thisParticle.calculateNetForce(allTheParticles);
thisParticle.waitForAllTheParticlesToCalculateNetForce(); // synchronization
thisParticle.updatePosition(force);
thisParticle.waitForAllTheParticlesToUpdateTheirState(); // synchronization
}
У меня может быть много частиц (100 или более), поэтому я не могу создать такое количество потоков Java (которые отображаются на физические потоки).
Моя идея - использовать потоки Runtime.getRuntime().availableProcessors()+1
, на которые могут выполняться многие задачи.
Однако я не могу использовать FixedThreadExecutor, потому что задачи-частицы не заканчиваются. Я хотел бы использовать FixedThreadExecutor, который также может выполнять своеобразное планирование внутри. Знаете ли вы что-то для этой цели?
Или вы могли бы предложить мне более подходящие подходы для моделирования такой системы с точки зрения concurrency (например, разной декомпозиции задачи)?
P.s.: Я ограничен "классическими" concurrency механизмами, не считая актеров или аналогичных архитектур.
Ответы
Ответ 1
Самый большой убийца для производительности, скорее всего, будет проверкой безопасности потока, которую вы выполняете, чтобы гарантировать, что все частицы взаимодействуют безопасным потоком. Я предлагаю вам использовать один поток на ядро и попытаться минимизировать взаимодействие между потоками. Это можно сделать, разделив пространство на потоки, например. половина X, половина Y, половина Z делит пространство на 8. Вы смотрите на все взаимодействия в каждом пространстве одновременно и независимо друг от друга, и вам нужно только беспокоиться, когда частица переходит из одного пространства/потока в другой.
Ответ 2
Я бы предположил, что вы храните все ваши частицы, возможно, в массиве из 2-мерного массива? Это был бы отличный кандидат для Fork-Join Framework.
Вы разделили бы вычисления частей массива на меньшие части. Вы продолжаете разделение до определенного размера. Наконец, вы вычисляете и возвращаете. Возвращаемое значение затем будет объединено и рассчитано с другой стороной дерева.
Ответ 3
Вместо потока на частицу я создам ExecutorService с соответствующим количеством потоков. Я бы сохранил частицы в списке (или какой-либо другой коллекции). Я бы создал отдельные части работы, которые будут выполняться (как Runnable или Callable) для каждого этапа вычисления и обновления частиц. Когда вы отправляете часть работы исполнителю, вы возвращаетесь в Будущее. Поместите эти фьючерсы в коллекцию. После того, как вы отправили все части работы, которые вы хотите запустить параллельно, вы перебираете свой список фьючерсов и вызываете get()
на каждом из них, чтобы реализовать свои шаги синхронизации.
Вы можете создать небольшое POJO, чтобы связать частицу и вычислить силу (или спрятать расчетную силу в экземпляре частицы).
Ответ 4
Почему вы не делаете вычисления в дискретных шагах?
while(true){
for(Particle p : allParticles){
force = p.calculateNetForce(allParticles);
p.setNextPosition(force); //Remembers, but doesn't change the current position
}
for(Particle p : allParticles){
p.nextState(); //Change the position
}
}
Сначала вычислите силу для каждой частицы, но не измените ее текущее состояние. После того, как вы рассчитали его для каждой частицы, обновите его внутреннее состояние в соответствии с вашими предыдущими вычислениями. Таким образом, даже одного потока будет достаточно, и, конечно, вы можете разделить вычисления на несколько потоков, но вам потребуется дополнительная синхронизация.
ОБНОВЛЕНИЕ JAVA 8
Используя Java 8, вы можете использовать многоядерные системы, не заботясь о потоках, синхронизации и т.д.
while(true){
allParticles.parallelStream().forEach(p -> {
double force = p.calculateNetForce(allParticles);
p.setNextPosition(force)
});
allParticles.parallelStream().forEach(p -> p.nextState());
}
Ответ 5
Для каждой частицы вы вызываете calculateNetForce(allTheParticles)
, что, я полагаю, делает ваши вычисления пропорциональными O (N ^ 2) (квадрат числа всех частиц). Это главный убийца производительности, и вам лучше найти алгоритм со сложностью O (N), и только затем попытайтесь распараллелить. С головы до ног я могу предложить сначала вычислить суммарную массу и центр тяжести для всех частиц. Затем для каждой частицы вычислите массу и центр остальных частиц. Это можно сделать, взяв общую массу и центр и добавив "дыру" с отрицательной массой вместо текущей частицы. Затем вычислите силу между частицей и остальными. Расчеты для каждой частицы независимы и могут быть распараллелены любым из способов, предложенных другими комментаторами.
Ответ 6
Частица должна быть самой Runnable и Callable, это позволит вам избежать создания большого количества дополнительных объектов и синхронизировать различные шаги. Вот SSCCEE:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Particle implements Callable<Void> {
private enum ParticleState {
POSITION_UPDATED, FORCE_CALCULATED
}
private int id;
private int calculatedForce;
private ParticleState particleState = ParticleState.POSITION_UPDATED;
private List<Particle> allTheParticles;
public Particle(int id, List<Particle> allTheParticles) {
this.id = id;
this.allTheParticles = allTheParticles;
}
private void calculateNetForce() {
System.out.println("calculation in " + id);
String someIntenseOperation = "";
for (int i = 0; i < 10000; i++) {
someIntenseOperation += allTheParticles.size();
}
calculatedForce = 0;
particleState = ParticleState.FORCE_CALCULATED;
}
private void updatePosition() {
System.out.println("updating position of " + id);
particleState = ParticleState.POSITION_UPDATED;
}
@Override
public Void call() throws Exception {
switch (particleState) {
case FORCE_CALCULATED:
updatePosition();
break;
case POSITION_UPDATED:
calculateNetForce();
break;
}
return null;
}
public static void main(String[] args) throws InterruptedException {
final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
final List<Particle> allTheParticles = new ArrayList<>();
for (int i = 0; i < 20; i++) {
allTheParticles.add(new Particle(i, allTheParticles));
}
while (true) {
executor.invokeAll(allTheParticles);
executor.invokeAll(allTheParticles);
}
}
}
Ответ 7
Так как при проверке столкновений обычно принимают n ^ 2 вычислений, деление пространства - хорошая идея. хотя это будет по существу проблемой O (n ^ 2).
Эта проблема плохо претендует на приближающуюся матрицу (но посмотрите Parallel computing, чтобы узнать лучшие идеи, чтобы справиться с ней)
Вы можете использовать некоторые методы, обозначенные здесь:
Эффективный способ моделирования многих столкновений частиц?
Пожалуйста, обратите внимание, что должна быть плохая идея использовать модель актера, потому что потоки будут проблематичными после определенного номера.
В настоящее время Java OpenCL lib (ex: Aparapi), а Java 9 должна открывать openCL с проектом Sumatra. Таким образом, вы можете использовать Fork и Join lib, а JVM будет использовать OpenCL под капотом.