Как отправить несколько асинхронных запросов на различные веб-службы?
Мне нужно отправить несколько запросов на множество различных веб-сервисов и получить результаты. Проблема в том, что если я отправляю запросы один за другим, это занимает столько времени, сколько мне нужно отправлять и обрабатывать отдельно.
Мне интересно, как я могу сразу отправить все запросы и получить результаты.
Как показано в следующем коде, у меня есть три основных метода, каждый из которых имеет свои собственные вспомогательные методы.
Каждый вспомогательный метод отправляет запрос на связанный с ним веб-сервис и получает результаты, поэтому, например, для получения результатов веб-службы 9 я должен дождаться завершения всех веб-сервисов от 1 до 8, для отправки требуется много времени все запросы один за другим и получать их результаты.
Как показалось, ни один из методов и под-методов не связан друг с другом, поэтому я могу вызывать их всех и получать их результаты в любом порядке, единственное, что важно, - это получать результаты каждого под-метода и заполнить связанные списки.
private List<StudentsResults> studentsResults = new ArrayList();
private List<DoctorsResults> doctorsResults = new ArrayList();
private List<PatientsResults> patientsResults = new ArrayList();
main (){
retrieveAllLists();
}
retrieveAllLists(){
retrieveStudents();
retrieveDoctors();
retrievePatients();
}
retrieveStudents(){
this.studentsResults = retrieveStdWS1(); //send request to Web Service 1 to receive its list of students
this.studentsResults = retrieveStdWS2(); //send request to Web Service 2 to receive its list of students
this.studentsResults = retrieveStdWS3(); //send request to Web Service 3 to receive its list of students
}
retrieveDoctors(){
this.doctorsResults = retrieveDocWS4(); //send request to Web Service 4 to receive its list of doctors
this.doctorsResults = retrieveDocWS5(); //send request to Web Service 5 to receive its list of doctors
this.doctorsResults = retrieveDocWS6(); //send request to Web Service 6 to receive its list of doctors
}
retrievePatients(){
this.patientsResults = retrievePtWS7(); //send request to Web Service 7 to receive its list of patients
this.patientsResults = retrievePtWS8(); //send request to Web Service 8 to receive its list of patients
this.patientsResults = retrievePtWS9(); //send request to Web Service 9 to receive its list of patients
}
Ответы
Ответ 1
Это простой подход, основанный на вилке, но для ясности вы можете начать любое количество потоков и получать результаты позже, когда они доступны, например, этот подход.
ExecutorService pool = Executors.newFixedThreadPool(10);
List<Callable<String>> tasks = new ArrayList<>();
tasks.add(new Callable<String>() {
public String call() throws Exception {
Thread.sleep((new Random().nextInt(5000)) + 500);
return "Hello world";
}
});
List<Future<String>> results = pool.invokeAll(tasks);
for (Future<String> future : results) {
System.out.println(future.get());
}
pool.shutdown();
UPDATE, COMPLETE:
Здесь многословное, но работоспособное решение. Я написал его ad hoc и не скомпилировал его.
Учитывая, что три списка имеют разные типы, а методы WS индивидуальны, это не
действительно модульные, но старайтесь использовать свои лучшие навыки программирования и посмотреть, сможете ли вы его немного улучшить.
ExecutorService pool = Executors.newFixedThreadPool(10);
List<Callable<List<StudentsResults>>> stasks = new ArrayList<>();
List<Callable<List<DoctorsResults>>> dtasks = new ArrayList<>();
List<Callable<List<PatientsResults>>> ptasks = new ArrayList<>();
stasks.add(new Callable<List<StudentsResults>>() {
public List<StudentsResults> call() throws Exception {
return retrieveStdWS1();
}
});
stasks.add(new Callable<List<StudentsResults>>() {
public List<StudentsResults> call() throws Exception {
return retrieveStdWS2();
}
});
stasks.add(new Callable<List<StudentsResults>>() {
public List<StudentsResults> call() throws Exception {
return retrieveStdWS3();
}
});
dtasks.add(new Callable<List<DoctorsResults>>() {
public List<DoctorsResults> call() throws Exception {
return retrieveDocWS4();
}
});
dtasks.add(new Callable<List<DoctorsResults>>() {
public List<DoctorsResults> call() throws Exception {
return retrieveDocWS5();
}
});
dtasks.add(new Callable<List<DoctorsResults>>() {
public List<DoctorsResults> call() throws Exception {
return retrieveDocWS6();
}
});
ptasks.add(new Callable<List<PatientsResults>>() {
public List<PatientsResults> call() throws Exception {
return retrievePtWS7();
}
});
ptasks.add(new Callable<List<PatientsResults>>() {
public List<PatientsResults> call() throws Exception {
return retrievePtWS8();
}
});
ptasks.add(new Callable<List<PatientsResults>>() {
public List<PatientsResults> call() throws Exception {
return retrievePtWS9();
}
});
List<Future<List<StudentsResults>>> sresults = pool.invokeAll(stasks);
List<Future<List<DoctorsResults>>> dresults = pool.invokeAll(dtasks);
List<Future<List<PatientsResults>>> presults = pool.invokeAll(ptasks);
for (Future<List<StudentsResults>> future : sresults) {
this.studentsResults.addAll(future.get());
}
for (Future<List<DoctorsResults>> future : dresults) {
this.doctorsResults.addAll(future.get());
}
for (Future<List<PatientsResults>> future : presults) {
this.patientsResults.addAll(future.get());
}
pool.shutdown();
Каждый Callable
возвращает список результатов и вызывается в отдельном потоке.
Когда вы вызываете метод Future.get()
, вы возвращаете результат обратно в основной поток.
Результатом является НЕ до тех пор, пока Callable
не закончится, поэтому проблем с concurrency нет.
Ответ 2
Итак, просто для удовольствия я предоставляю два рабочих примера. Первый показывает старую школьную возможность сделать это до java 1.5. Второй показывает гораздо более чистый способ использования инструментов, доступных в java 1.5:
import java.util.ArrayList;
public class ThreadingExample
{
private ArrayList <MyThread> myThreads;
public static class MyRunnable implements Runnable
{
private String data;
public String getData()
{
return data;
}
public void setData(String data)
{
this.data = data;
}
@Override
public void run()
{
}
}
public static class MyThread extends Thread
{
private MyRunnable myRunnable;
MyThread(MyRunnable runnable)
{
super(runnable);
setMyRunnable(runnable);
}
/**
* @return the myRunnable
*/
public MyRunnable getMyRunnable()
{
return myRunnable;
}
/**
* @param myRunnable the myRunnable to set
*/
public void setMyRunnable(MyRunnable myRunnable)
{
this.myRunnable = myRunnable;
}
}
public ThreadingExample()
{
myThreads = new ArrayList <MyThread> ();
}
public ArrayList <String> retrieveMyData ()
{
ArrayList <String> allmyData = new ArrayList <String> ();
if (isComplete() == false)
{
// Sadly we aren't done
return (null);
}
for (MyThread myThread : myThreads)
{
allmyData.add(myThread.getMyRunnable().getData());
}
return (allmyData);
}
private boolean isComplete()
{
boolean complete = true;
// wait for all of them to finish
for (MyThread x : myThreads)
{
if (x.isAlive())
{
complete = false;
break;
}
}
return (complete);
}
public void kickOffQueries()
{
myThreads.clear();
MyThread a = new MyThread(new MyRunnable()
{
@Override
public void run()
{
// This is where you make the call to external services
// giving the results to setData("");
setData("Data from list A");
}
});
myThreads.add(a);
MyThread b = new MyThread (new MyRunnable()
{
@Override
public void run()
{
// This is where you make the call to external services
// giving the results to setData("");
setData("Data from list B");
}
});
myThreads.add(b);
for (MyThread x : myThreads)
{
x.start();
}
boolean done = false;
while (done == false)
{
if (isComplete())
{
done = true;
}
else
{
// Sleep for 10 milliseconds
try
{
Thread.sleep(10);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}
public static void main(String [] args)
{
ThreadingExample example = new ThreadingExample();
example.kickOffQueries();
ArrayList <String> data = example.retrieveMyData();
if (data != null)
{
for (String s : data)
{
System.out.println (s);
}
}
}
}
Это гораздо более простая рабочая версия:
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ThreadingExample
{
public static void main(String [] args)
{
ExecutorService service = Executors.newCachedThreadPool();
Set <Callable<String>> callables = new HashSet <Callable<String>> ();
callables.add(new Callable<String>()
{
@Override
public String call() throws Exception
{
return "This is where I make the call to web service A, and put its results here";
}
});
callables.add(new Callable<String>()
{
@Override
public String call() throws Exception
{
return "This is where I make the call to web service B, and put its results here";
}
});
callables.add(new Callable<String>()
{
@Override
public String call() throws Exception
{
return "This is where I make the call to web service C, and put its results here";
}
});
try
{
List<Future<String>> futures = service.invokeAll(callables);
for (Future<String> future : futures)
{
System.out.println (future.get());
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
catch (ExecutionException e)
{
e.printStackTrace();
}
}
}
Ответ 3
Изучив проблему, вам необходимо интегрировать ваше приложение в 10+ различных веб-сервисов. В то же время, делая все вызовы асинхронными. Это можно сделать легко с Apache Camel. Это важная структура интеграции с предприятиями, а также поддерживает асинхронную обработку. Вы можете использовать свой компонент CXF для вызова веб-сервисов и механизма маршрутизации для получения результатов обработки и обработки. Посмотрите на страницу относительно возможности асинхронной маршрутизации верблюдов. Они также предоставили полный пример для вызова web-сервисов async с использованием CXF, он доступен на своем maven repo. Подробнее см. Ниже страница.
Ответ 4
Вы можете запросить реализацию jax-ws
для создания асинхронных привязок для веб-службы.
У этого есть два преимущества, которые я вижу:
- Как обсуждалось в Асинхронные вызовы веб-сервисов с JAX-WS: используйте поддержку wsimport для асинхронности или сворачивайте мои собственные?,
jax-ws
будет генерировать хорошо протестированные (и возможно, fancier) для вас, вам не нужно создавать экземпляр ExecutorService самостоятельно. Настолько меньше работы для вас! (но и меньший контроль над деталями реализации потоковой передачи).
- Сгенерированные привязки включают метод, в котором вы указываете обработчик обратного вызова, который может удовлетворить ваши потребности лучше, чем синхронно
get()
отображать все списки ответов в потоке, вызывающем retrieveAllLists()
. Он позволяет обрабатывать ошибки для каждого вызова и обрабатывать результаты параллельно, что хорошо, если обработка является нетривиальной.
Пример для Metro можно найти на сайте Metro. Обратите внимание на содержимое файла настраиваемых привязок custom-client.xml:
<bindings ...>
<bindings node="wsdl:definitions">
<enableAsyncMapping>true</enableAsyncMapping>
</bindings>
</bindings>
Когда вы укажете этот файл привязок на wsimport
, он будет генерировать клиент, который возвращает объект, который реализует javax.xml.ws.Response<T>
. Response
расширяет интерфейс Future
, который другие также рекомендуют использовать при развертывании собственной реализации.
Итак, неудивительно, что если вы идете без обратных вызовов, код будет похож на другие ответы:
public void retrieveAllLists() throws ExecutionException{
// first fire all requests
Response<List<StudentsResults>> students1 = ws1.getStudents();
Response<List<StudentsResults>> students2 = ws2.getStudents();
Response<List<StudentsResults>> students3 = ws3.getStudents();
Response<List<DoctorsResults>> doctors1 = ws4.getDoctors();
Response<List<DoctorsResults>> doctors2 = ws5.getDoctors();
Response<List<DoctorsResults>> doctors3 = ws6.getDoctors();
Response<List<PatientsResults>> patients1 = ws7.getPatients();
Response<List<PatientsResults>> patients2 = ws8.getPatients();
Response<List<PatientsResults>> patients3 = ws9.getPatients();
// then await and collect all the responses
studentsResults.addAll(students1.get());
studentsResults.addAll(students2.get());
studentsResults.addAll(students3.get());
doctorsResults.addAll(doctors1.get());
doctorsResults.addAll(doctors2.get());
doctorsResults.addAll(doctors3.get());
patientsResults.addAll(patients1.get());
patientsResults.addAll(patients2.get());
patientsResults.addAll(patients3.get());
}
Если вы создаете обработчиков обратного вызова, например
private class StudentsCallbackHandler
implements AsyncHandler<Response<List<StudentsResults>>> {
public void handleResponse(List<StudentsResults> response) {
try {
studentsResults.addAll(response.get());
} catch (ExecutionException e) {
errors.add(new CustomError("Failed to retrieve Students.", e.getCause()));
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
вы можете использовать их следующим образом:
public void retrieveAllLists() {
List<Future<?>> responses = new ArrayList<Future<?>>();
// fire all requests, specifying callback handlers
responses.add(ws1.getStudents(new StudentsCallbackHandler()));
responses.add(ws2.getStudents(new StudentsCallbackHandler()));
responses.add(ws3.getStudents(new StudentsCallbackHandler()));
...
// await completion
for( Future<?> response: responses ) {
response.get();
}
// or do some other work, and poll response.isDone()
}
Обратите внимание, что коллекция studentResults теперь должна быть потокобезопасной, так как результаты будут добавляться одновременно!
Ответ 5
Вы можете рассмотреть следующую парадигму, в которой вы создаете работу (последовательно), но фактическая работа выполняется параллельно. Один из способов сделать это: 1) создать "основное" создание очереди рабочих элементов; 2) создать объект "doWork", который запрашивает очередь для работы; 3) имеют "основное" начало некоторого количества потоков "doWork" (может быть того же числа, что и количество различных сервисов, или меньшее число); есть объекты "doWork", которые добавляют свои результаты в список объектов (любая конструкция работает Vector, list...).
Каждый объект "doWork" должен пометить свой элемент очереди, поместить все результаты в переданный контейнер и проверить новую работу (если не больше в очереди, он будет спать и повторить попытку).
Конечно, вам захочется увидеть, насколько хорошо вы можете построить свою модель класса. Если каждый из веб-сервисов сильно отличается для синтаксического анализа, тогда вы можете создать интерфейс, который будет реализован каждым из классов "retrieveinfo" promises.
Ответ 6
У этого есть различные варианты, чтобы развить это.
- JMS: качество обслуживания и управления, например. попытка повторной доставки, очередь мертвых сообщений, управление нагрузкой, масштабируемость, кластеризация, мониторинг и т.д.
- Просто используйте шаблон Observer для этого. Для более подробной информации OODesign и как решить проблему с продуктом и потребителем следуйте за этим Kodelog **