Возвращаемые значения из потоков Java
У меня есть Java-Thread, как показано ниже:
public class MyThread extends Thread {
MyService service;
String id;
public MyThread(String id) {
this.id = node;
}
public void run() {
User user = service.getUser(id)
}
}
У меня около 300 идентификаторов, и каждые пару секунд я запускаю потоки, чтобы сделать вызов для каждого из идентификаторов. Например.
for(String id: ids) {
MyThread thread = new MyThread(id);
thread.start();
}
Теперь я хотел бы собирать результаты из каждого потока и вносить пакетную вставку в базу данных вместо того, чтобы делать 300 вложений в базу каждые 2 секунды.
Любая идея, как я могу это сделать?
Ответы
Ответ 1
Если вы хотите собрать все результаты перед обновлением базы данных, вы можете использовать метод invokeAll
. Это заботится о бухгалтерии, которая потребуется, если вы отправляете задания по одному, например daveb.
private static final ExecutorService workers = Executors.newCachedThreadPool();
...
Collection<Callable<User>> tasks = new ArrayList<Callable<User>>();
for (final String id : ids) {
tasks.add(new Callable<User>()
{
public User call()
throws Exception
{
return svc.getUser(id);
}
});
}
/* invokeAll blocks until all service requests complete,
* or a max of 10 seconds. */
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS);
for (Future<User> f : results) {
User user = f.get();
/* Add user to batch update. */
...
}
/* Commit batch. */
...
Ответ 2
Канонический подход заключается в использовании Callable
и ExecutorService
. submit
ting a Callable
to ExecutorService
возвращает a (typesafe) Future
, из которого вы можете get
получить результат.
class TaskAsCallable implements Callable<Result> {
@Override
public Result call() {
return a new Result() // this is where the work is done.
}
}
ExecutorService executor = Executors.newFixedThreadPool(300);
Future<Result> task = executor.submit(new TaskAsCallable());
Result result = task.get(); // this blocks until result is ready
В вашем случае вы, вероятно, захотите использовать invokeAll
, который возвращает List
из Futures
, или создайте этот список самостоятельно, когда вы добавляете задачи к исполнителю. Чтобы собрать результаты, просто назовите get
на каждом из них.
Ответ 3
Сохраняйте результат в своем объекте. Когда он завершится, запустите его в синхронизированную коллекцию (приходит в голову синхронизированная очередь).
Когда вы хотите собрать свои результаты для отправки, возьмите все из очереди и прочитайте свои результаты с объектов. Возможно, вы даже знаете, что каждый объект может "отправить" свои собственные результаты в базу данных, таким образом могут быть представлены разные классы, и все они будут обрабатываться с помощью той же крошечной, элегантной петли.
В JDK есть много инструментов, чтобы помочь в этом, но это действительно легко, когда вы начинаете думать о своем потоке как об истинном объекте, а не просто о пуге дерьма вокруг метода "запустить". Как только вы начнете думать об объектах таким образом, программирование станет намного проще и более удовлетворительным.
Ответ 4
В Java8 есть лучший способ сделать это, используя CompletableFuture. Скажем, у нас есть класс, который получает идентификатор из базы данных, для простоты мы можем просто вернуть число, как показано ниже,
static class GenerateNumber implements Supplier<Integer>{
private final int number;
GenerateNumber(int number){
this.number = number;
}
@Override
public Integer get() {
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
e.printStackTrace();
}
return this.number;
}
}
Теперь мы можем добавить результат в параллельную коллекцию, когда результаты каждого будущего будут готовы.
Collection<Integer> results = new ConcurrentLinkedQueue<>();
int tasks = 10;
CompletableFuture<?>[] allFutures = new CompletableFuture[tasks];
for (int i = 0; i < tasks; i++) {
int temp = i;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()-> new GenerateNumber(temp).get(), executor);
allFutures[i] = future.thenAccept(results::add);
}
Теперь мы можем добавить обратный вызов, когда все фьючерсы готовы,
CompletableFuture.allOf(allFutures).thenAccept(c->{
System.out.println(results); // do something with result
});
Ответ 5
Вам нужно сохранить результат в виде одного синглтона. Это должно быть правильно синхронизировано.
EDIT. Я знаю, что это не лучший совет, так как это не хорошая идея для обработки raw Threads
. Но с учетом вопроса это будет работать, не так ли? Возможно, меня не поддержали, но почему голосует?
Ответ 6
Вы можете создать очередь или список, которые вы передаете тем потокам, которые вы создаете, потоки добавляют их результат в список, который очищается потребителем, который выполняет пакетную вставку.
Ответ 7
Самый простой способ - передать объект каждому потоку (по одному объекту в потоке), который будет содержать результат позже. Основной поток должен содержать ссылку на каждый объект результата. Когда все потоки соединены, вы можете использовать результаты.
Ответ 8
public class TopClass {
List<User> users = new ArrayList<User>();
void addUser(User user) {
synchronized(users) {
users.add(user);
}
}
void store() throws SQLException {
//storing code goes here
}
class MyThread extends Thread {
MyService service;
String id;
public MyThread(String id) {
this.id = node;
}
public void run() {
User user = service.getUser(id)
addUser(user);
}
}
}
Ответ 9
Вы можете создать класс, который расширяет Observable. Затем ваш поток может вызывать метод в классе Observable, который уведомляет любые классы, зарегистрированные в этом наблюдателе, вызывая Observable.notifyObservers(Object).
Класс наблюдения будет реализовывать Observer и регистрироваться в Observable. Затем вы реализуете метод обновления (Observable, Object), который вызывается при вызове Observerable.notifyObservers(Object).