Является ли этот клиент JAX-WS безопасным потоком?

Поскольку инициализация клиентской службы WS и порта занимает много времени, мне нравится инициализировать их один раз при запуске и повторно использовать один и тот же экземпляр порта. Инициатизация будет выглядеть примерно так:

private static RequestContext requestContext = null;

static
{
    MyService service = new MyService(); 
    MyPort myPort = service.getMyServicePort(); 

    Map<String, Object> requestContextMap = ((BindingProvider) myPort).getRequestContext();
    requestContextMap = ((BindingProvider)myPort).getRequestContext(); 
    requestContextMap.put(BindingProvider.USERNAME_PROPERTY, uName); 
    requestContextMap.put(BindingProvider.PASSWORD_PROPERTY, pWord); 

    rc = new RequestContext();
    rc.setApplication("test");
    rc.setUserId("test");
}

Вызов где-то в моем классе:

myPort.someFunctionCall(requestContext, "someValue");

Мой вопрос: будет ли этот вызов быть потокобезопасным?

Джонни

Ответы

Ответ 1

В соответствии с часто задаваемые вопросы CXF:

Являются ли прокси-серверы JAX-WS потоками безопасными?

Официальный ответ JAX-WS: Нет. Согласно спецификации JAX-WS, клиентские прокси НЕ НЕВОЗМОЖНЫ. Чтобы написать переносимый код, вы должны рассматривать их как непоточные и синхронизировать доступ или использовать пул экземпляров или аналогичный.

Ответ CXF: Прокси CXF являются потокобезопасными для МНОГО вариантов использования. исключения:

  • Использование ((BindingProvider)proxy).getRequestContext() - для спецификации JAX-WS, контекст запроса - PER INSTANCE. Таким образом, все, что там установлено, будет влиять на запросы других потоков. С помощью CXF вы можете:

    ((BindingProvider)proxy).getRequestContext().put("thread.local.request.context","true");
    

    и будущие вызовы getRequestContext() будут использовать поток локальный контекст запроса. Это позволяет использовать контекст запроса поточно. (Примечание: контекст ответа всегда является локальным потоком в CXF)

  • Настройки на канале - если вы используете код или конфигурацию напрямую манипулировать каналом (например, устанавливать настройки TLS или аналогичные), те не являются потокобезопасными. Канал предназначен для каждого экземпляра, и, следовательно, эти настройки будут разделены. Кроме того, если вы используете FailoverFeature и LoadBalanceFeatures, трубопровод заменяется на лету. Таким образом, настройки, установленные на кабелепроводе, могут потеряться, прежде чем использоваться на установочный поток.

  • Поддержка сеанса - если вы включите поддержку сеансов (см. jaxws spec), файл cookie сеанса хранится в кабелепроводе. Таким образом, это будут попадать в вышеуказанные правила по настройкам кабелепровода и, таким образом, будут использоваться совместно через потоки.
  • Тоны WS-Security - если используется WS-SecureConversation или WS-Trust, извлеченный токен кэшируется в конечной точке/прокси, чтобы избежать дополнительные (и дорогие) звонки в STS для получения жетонов. Таким образом, несколько потоков будут разделять токен. Если каждый поток имеет разные учетных данных или требований безопасности, вам необходимо использовать отдельный прокси-сервер экземпляров.

Для проблем с кабелепроводом вы МОЖЕТЕ установить новую ConduitSelector, который использует поток локальный или похожий. Это немного сложный, хотя.

Для большинства "простых" случаев использования вы можете использовать прокси CXF на нескольких потоки. Вышеописанные обходные пути для других.

Ответ 2

В общем, нет.

В соответствии с CXF FAQ http://cxf.apache.org/faq.html#FAQ-AreJAX-WSclientproxiesthreadsafe?

Официальный ответ JAX-WS: Нет. Согласно спецификации JAX-WS, клиент прокси не являются потокобезопасными. Чтобы написать переносимый код, вы должны лечить их как не-потокобезопасные и синхронизировать доступ или использовать пул экземпляры или аналогичные.

Ответ CXF: Прокси CXF являются потокобезопасными для МНОГО вариантов использования.

Список исключений см. в разделе "Вопросы и ответы".

Ответ 3

Как видно из вышеприведенных ответов, прокси-серверы JAX-WS не являются потокобезопасными, поэтому я просто хотел поделиться своей реализацией, чтобы другие кэшировали клиентские прокси. Я действительно столкнулся с той же проблемой и решил создать spring bean, который выполняет кэширование прокси-серверов JAX-WS Client. Вы можете увидеть более подробную информацию http://programtalk.com/java/using-spring-and-scheduler-to-store/

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Component;

/**
 * This keeps the cache of MAX_CUNCURRENT_THREADS number of
 * appConnections and tries to shares them equally amongst the threads. All the
 * connections are created right at the start and if an error occurs then the
 * cache is created again.
 *
 */
/*
 *
 * Are JAX-WS client proxies thread safe? <br/> According to the JAX-WS spec,
 * the client proxies are NOT thread safe. To write portable code, you should
 * treat them as non-thread safe and synchronize access or use a pool of
 * instances or similar.
 *
 */
@Component
public class AppConnectionCache {

 private static final Logger logger = org.apache.logging.log4j.LogManager.getLogger(AppConnectionCache.class);

 private final Map<Integer, MyService> connectionCache = new ConcurrentHashMap<Integer, MyService>();

 private int cachedConnectionId = 1;

 private static final int MAX_CUNCURRENT_THREADS = 20;

 private ScheduledExecutorService scheduler;

 private boolean forceRecaching = true; // first time cache

 @PostConstruct
 public void init() {
  logger.info("starting appConnectionCache");
  logger.info("start caching connections"); ;;
  BasicThreadFactory factory = new BasicThreadFactory.Builder()
    .namingPattern("appconnectioncache-scheduler-thread-%d").build();
  scheduler = Executors.newScheduledThreadPool(1, factory);

  scheduler.scheduleAtFixedRate(new Runnable() {
   @Override
   public void run() {
    initializeCache();
   }

  }, 0, 10, TimeUnit.MINUTES);

 }

 public void destroy() {
  scheduler.shutdownNow();
 }

 private void initializeCache() {
  if (!forceRecaching) {
   return;
  }
  try {
   loadCache();
   forceRecaching = false; // this flag is used for initializing
   logger.info("connections creation finished successfully!");
  } catch (MyAppException e) {
   logger.error("error while initializing the cache");
  }
 }

 private void loadCache() throws MyAppException {
  logger.info("create and cache appservice connections");
  for (int i = 0; i < MAX_CUNCURRENT_THREADS; i++) {
   tryConnect(i, true);
  }
 }

 public MyPort getMyPort() throws MyAppException {
  if (cachedConnectionId++ == MAX_CUNCURRENT_THREADS) {
   cachedConnectionId = 1;
  }
  return tryConnect(cachedConnectionId, forceRecaching);
 }

 private MyPort tryConnect(int threadNum, boolean forceConnect) throws MyAppException {
  boolean connect = true;
  int tryNum = 0;
  MyPort app = null;
  while (connect && !Thread.currentThread().isInterrupted()) {
   try {
    app = doConnect(threadNum, forceConnect);
    connect = false;
   } catch (Exception e) {
    tryNum = tryReconnect(tryNum, e);
   }
  }
  return app;
 }

 private int tryReconnect(int tryNum, Exception e) throws MyAppException {
  logger.warn(Thread.currentThread().getName() + " appservice service not available! : " + e);
  // try 10 times, if
  if (tryNum++ < 10) {
   try {
    logger.warn(Thread.currentThread().getName() + " wait 1 second");
    Thread.sleep(1000);
   } catch (InterruptedException f) {
    // restore interrupt
    Thread.currentThread().interrupt();
   }
  } else {
   logger.warn(" appservice could not connect, number of times tried: " + (tryNum - 1));
   this.forceRecaching = true;
   throw new MyAppException(e);
  }
  logger.info(" try reconnect number: " + tryNum);
  return tryNum;
 }

 private MyPort doConnect(int threadNum, boolean forceConnect) throws InterruptedException {
  MyService service = connectionCache.get(threadNum);
  if (service == null || forceConnect) {
   logger.info("app service connects : " + (threadNum + 1) );
   service = new MyService();
   connectionCache.put(threadNum, service);
   logger.info("connect done for " + (threadNum + 1));
  }
  return service.getAppPort();
 }
}