Отпустите разъем обратно в зависимости от того, живут ли они или нет в своем собственном пуле?

Я использую ниже класс для отправки данных в нашу очередь сообщений, используя сокет либо синхронно, либо асинхронно, как показано ниже. Это зависит от требования, хочу ли я называть синхронный или асинхронный метод для отправки данных в сокет. В большинстве случаев мы будем отправлять данные асинхронно, но иногда мне может понадобиться синхронно отправлять данные.

  • sendAsync - Он отправляет данные асинхронно, и мы не блокируем поток, который отправляет данные. Если подтверждение не получено, оно будет снова повторяться из фонового потока, который запускается только в конструкторе SendToQueue.
  • send - Он отправляет данные синхронно в сокете. Он внутренне вызывает метод doSendAsync, а затем выполняет спящий режим для определенного периода ожидания и, если подтверждение не получено, оно удаляет из ведра cache, чтобы мы не повторили попытку.

Таким образом, единственное различие между этими двумя вышеописанными методами: - Для случая async мне нужно выполнить любую попытку, если подтверждение не получено, но для синхронизации мне вообще не нужно повторять попытку, и поэтому я сохраняю больше состояний в классе PendingMessage.

ResponsePoller - это класс, который получает подтверждение для данных, которые были отправлены в нашу очередь сообщений в конкретном сокете, а затем вызывает метод handleAckReceived ниже, чтобы удалить адрес, чтобы мы не повторили попытку после получения подтверждения, Если подтверждение получено, то сокет является живым, иначе он мертв.

public class SendToQueue {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder()
          .maximumSize(1000000)
          .concurrencyLevel(100)
          .build();

  private static class PendingMessage {
    private final long _address;
    private final byte[] _encodedRecords;
    private final boolean _retryEnabled;
    private final Object _monitor = new Object();
    private long _sendTimeMillis;
    private volatile boolean _acknowledged;

    public PendingMessage(long address, byte[] encodedRecords, boolean retryEnabled) {
      _address = address;
      _sendTimeMillis = System.currentTimeMillis();
      _encodedRecords = encodedRecords;
      _retryEnabled = retryEnabled;
    }

    public synchronized boolean hasExpired() {
      return System.currentTimeMillis() - _sendTimeMillis > 500L;
    }

    public synchronized void markResent() {
      _sendTimeMillis = System.currentTimeMillis();
    }

    public boolean shouldRetry() {
      return _retryEnabled && !_acknowledged;
    }

    public boolean waitForAck() {
      try {
        synchronized (_monitor) {
          _monitor.wait(500L);
        }
        return _acknowledged;
      } catch (InterruptedException ie) {
        return false;
      }
    }

    public void ackReceived() {
      _acknowledged = true;
      synchronized (_monitor) {
        _monitor.notifyAll();
      }
    }

    public long getAddress() {
      return _address;
    }

    public byte[] getEncodedRecords() {
      return _encodedRecords;
    }
  }

  private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
  }

  public static SendToQueue getInstance() {
    return Holder.INSTANCE;
  }

  private void handleRetries() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage m : messages) {
      if (m.hasExpired()) {
        if (m.shouldRetry()) {
          m.markResent();
          doSendAsync(m, Optional.<Socket>absent());
        } else {
          cache.invalidate(m.getAddress());
        }
      }
    }
  }

  private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgment
                                                  // and then delete entry from the cache
                                                  // accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetries();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m, Optional.<Socket>absent());
  }

  private boolean doSendAsync(final PendingMessage pendingMessage, final Optional<Socket> socket) {
    Optional<Socket> actualSocket = socket;
    if (!actualSocket.isPresent()) {
      SocketHolder liveSocket = SocketManager.getInstance().getSocket();
      actualSocket = Optional.of(liveSocket.getSocket());
    }

    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      return msg.send(actualSocket.get());
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords) {
    return send(address, encodedRecords, Optional.<Socket>absent());
  }

  public boolean send(final long address, final byte[] encodedRecords,
      final Optional<Socket> socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }   

  // called by acknowledgment thread which is in "ResponsePoller" class
  public void handleAckReceived(final long address) {
    PendingMessage m = cache.getIfPresent(address);
    if (m != null) {
      m.ackReceived();
      cache.invalidate(address);
    }
  }
}

Поскольку я отправляю данные по сокету и , если я получаю подтверждение подтверждения для тех же данных, значит, Socket жив, но , если данные не подтверждаются, тогда это означает, что сокет мертв (но я буду продолжать повторную попытку отправки данных).

Итак, с моим дизайном (или если есть лучший способ), как я могу выяснить, мертв или жив какой-либо сокет, потому что либо подтверждение не было получено, либо получено от этого сокета и основы, на котором мне нужно отпустите сокет обратно в свой пул (независимо от того, жив он или нет), вызывая метод ниже в зависимости от того, получено подтверждение или нет для синхронизации или асинхронного случая.

Мне также нужно настроить счетчик, если подтверждение не получено в конкретном сокете для x (где x - число > 0, значение по умолчанию должно быть 2) раза, а затем только отметьте сокет мертвым. Каков наилучший и эффективный способ сделать это?

SocketManager.getInstance().releaseSocket(socket, SocketState.LIVE);
SocketManager.getInstance().releaseSocket(socket, SocketState.DEAD);

Ответы

Ответ 1

Скажите, что вы дома, у вас есть кабель, подключенный к вашему ноутбуку, и маршрутизатор, а за маршрутизатором - кабельный модем. Если вы выключите маршрутизатор - ваш ноутбук будет знать, что - нет напряжения. Если вы отключите свой модем... это сложно. Вы просто не можете этого знать. Одна из потенциальных проблем - это не путь к размещению. Но даже если вы подключены, это может быть любая другая проблема. Некоторые протоколы, такие как ssh, имеют встроенный ping, поэтому они поддерживают связь. Если ваше приложение ничего не делает с каждым интервалом, есть пинг-понг между клиентом и сервером, поэтому вы знаете, жив ли он.

Если у вас есть полный контроль над протоколом, одним из вариантов является keep-alive.

Ваш клиент находится на одном конце, но в целом очень сложно, чтобы две стороны были уверены, что у них есть соглашение. Проблема византийских генералов описывает то, что является общей сетевой моделью, где каждый node не знает ни о каком другом, и может доверять только тому, что известно из.

В общем, я не стал бы писать распределенную систему сам. Я использую Hystrix. Ссылка на их конфигурацию, поэтому вы можете видеть, насколько она велика. Вы можете отслеживать, работает ли сервер, или нет. Также, когда вы вернетесь назад, вы можете подготовить политику, чтобы понять ее, а не наводнять ее, отменить устаревшие сообщения и многое другое - графики, статистику, интеграцию с другими решениями. Существует большое сообщество, использующее его, и решение проблем. Это намного лучший вариант, чем делать это самостоятельно.

Не уверен, что у вас есть только одна служба, с которой можно поговорить, или если Hystrix для вас хорошо. В Java люди склонны использовать слои, рамки, если они имеют проблемы... Надеюсь, что это поможет.