Отправить запись и дождаться ее подтверждения
Я использую ниже класс для отправки данных в нашу очередь обмена сообщениями, используя сокет либо синхронно, либо асинхронно, как показано ниже.
-
sendAsync
- Он отправляет данные асинхронно без какого-либо таймаута. После отправки (on LINE A)
он добавляет в ведро retryHolder
так, что, если подтверждение не получено, оно снова повторится из фонового потока, который запускается в конструкторе.
-
send
- он внутренне вызывает метод sendAsync
, а затем выполняет спящий режим для определенного периода ожидания и, если подтверждение не получено, оно удаляется из ведра retryHolder
, чтобы мы не повторили попытку.
Таким образом, единственное различие между этими двумя вышеописанными методами: - Для async мне нужно выполнить любую попытку, но для синхронизации мне не нужно повторять попытку, но похоже, что это может быть повторено, поскольку мы используем один и тот же кеш кеша повтора и поток повторений выполняется каждые 1 секунду.
ResponsePoller
- это класс, который получает подтверждение для данных, которые были отправлены в нашу очередь сообщений, а затем вызывает метод removeFromretryHolder
ниже, чтобы удалить адрес, чтобы мы не повторили попытку после получения подтверждения.
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
private final Cache<Long, byte[]> retryHolder =
CacheBuilder
.newBuilder()
.maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(
RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the `retryHolder` cache accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// retry again
for (Entry<Long, byte[]> entry : retryHolder.asMap().entrySet()) {
sendAsync(entry.getKey(), entry.getValue());
}
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(encodedRecords);
// send data on a socket LINE A
boolean sent = msg.send(socket);
msg.destroy();
retryHolder.put(address, encodedRecords);
return sent;
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean sent = sendAsync(address, encodedRecords, socket);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// if key is not present, then acknowledgement was received successfully
sent = !retryHolder.asMap().containsKey(address);
// and key is still present in the cache, then it means acknowledgment was not received after
// waiting for timeout period, so we will remove it from cache.
if (!sent)
removeFromretryHolder(address);
return sent;
}
public void removeFromretryHolder(final long address) {
retryHolder.invalidate(address);
}
}
Каков наилучший способ, с помощью которого мы не пытаемся повторить попытку, если кто-либо вызывает метод send
, но нам все равно нужно знать, было ли получено подтверждение или нет. Единственное, что мне не нужно вообще повторять.
Нужна ли нам отдельная ведро для всех вызовов синхронизации только для подтверждения, и мы не возвращаемся из этого ведра?
Ответы
Ответ 1
Код имеет ряд потенциальных проблем:
- Ответ может быть получен до вызова
retryHolder#put
.
- Возможно, есть условие гонки, когда сообщения также повторяются.
- Если два сообщения отправляются на один и тот же адрес, второй перезаписывает первый?
- Отправить всегда тратит время на сон, вместо этого используйте
wait
+ notify
.
Я бы сохранил класс с большим количеством состояний. Он может содержать флаг (retryIfNoAnswer
yes/no), который может проверить обработчик повтора. Он может предоставить методы waitForAnswer
/markAnswerReceived
, используя wait
/notify
, чтобы отправить не нужно спать в течение фиксированного времени. Метод waitForAnswer
может возвращать значение true, если был получен ответ и false при тайм-ауте. Поместите объект в обработчик повтора перед отправкой и используйте временную метку, чтобы повторить только сообщения старше определенного возраста. Это фиксирует первое состояние гонки.
EDIT: обновленный пример кода ниже, компилируется с кодом, а не тестируется:
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
// Not sure why you are using a cache rather than a standard ConcurrentHashMap?
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class PendingMessage {
private final long _address;
private final byte[] _encodedRecords;
private final Socket _socket;
private final boolean _retryEnabled;
private final Object _monitor = new Object();
private long _sendTimeMillis;
private volatile boolean _acknowledged;
public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
_address = address;
_sendTimeMillis = System.currentTimeMillis();
_encodedRecords = encodedRecords;
_socket = socket;
_retryEnabled = retryEnabled;
}
public synchronized boolean hasExpired() {
return System.currentTimeMillis() - _sendTimeMillis > 500L;
}
public synchronized void markResent() {
_sendTimeMillis = System.currentTimeMillis();
}
public boolean shouldRetry() {
return _retryEnabled && !_acknowledged;
}
public boolean waitForAck() {
try {
synchronized(_monitor) {
_monitor.wait(500L);
}
return _acknowledged;
}
catch (InterruptedException e) {
return false;
}
}
public void ackReceived() {
_acknowledged = true;
synchronized(_monitor) {
_monitor.notifyAll();
}
}
public long getAddress() {
return _address;
}
public byte[] getEncodedRecords() {
return _encodedRecords;
}
public Socket getSocket() {
return _socket;
}
}
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private void handleRetries() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage m : messages) {
if (m.hasExpired()) {
if (m.shouldRetry()) {
m.markResent();
doSendAsync(m, m.getSocket());
}
else {
// Or leave the message and let send remove it
cache.invalidate(m.getAddress());
}
}
}
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the cache accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetries();
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
cache.put(address, m);
return doSendAsync(m, socket);
}
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(socket);
}
finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
}
finally {
// Alternatively (checks that address points to m):
// cache.asMap().remove(address, m);
cache.invalidate(address);
}
}
public void handleAckReceived(final long address) {
PendingMessage m = cache.getIfPresent(address);
if (m != null) {
m.ackReceived();
cache.invalidate(address);
}
}
}
И вызывается из ResponsePoller
:
SendToQueue.getInstance().handleAckReceived(addressFrom);
Ответ 2
Дизайн: я чувствую, что вы пытаетесь написать потокобезопасный и несколько эффективный отправитель/получатель сообщений NIO, но (оба) кода, которые я вижу здесь, не в порядке и не будут иметь существенных изменений. Лучше всего сделать это:
- полностью использовать структуру
0MQ
. Здесь я вижу вещи и ожидания, которые действительно доступны из коробки в ZMQ
и java.util.concurrent
API.
- или посмотрите
Netty
(https://netty.io/index.html), если это применимо к вашему проекту. "Netty - это асинхронная инфраструктура сетевого приложения, управляемая событиями
для быстрой разработки поддерживаемых высокопроизводительных серверов протоколов и клиентов ". Это сэкономит ваше время, если ваш проект станет сложным, иначе он может быть переполнен для начала (но потом ожидать проблем...).
Однако, если вы думаете, что находитесь почти у вас с кодом или кодом @john, я просто дам советы:
- не используйте
wait()
и notify()
. Не используйте sleep()
.
- используйте один поток для вашего "отслеживателя потока" (т.е. ~ кэша ожидающего сообщения).
На самом деле вам не нужны 3 потока для обработки ожидающих сообщений, кроме случаев, когда эта обработка выполняется медленно (или делает тяжелые вещи), что здесь не так, поскольку вы в основном делаете асинхронный вызов (насколько это реально асинхронно). это?).
То же самое для обратного пути: используйте службу-исполнитель (несколько потоков) для обработки полученных пакетов, только если фактическая обработка медленная/блокирующая или тяжелая.
Я не эксперт в 0MQ
, но насколько socket.send(...)
является потокобезопасным и неблокирующим (что я не уверен лично - скажите мне), приведенные выше советы должны быть правильными и сделать вещи проще.
Тем не менее, чтобы строго ответить на ваш вопрос:
Нужна ли нам отдельная ведро для всех вызовов синхронизации только для подтверждения, и мы не возвращаемся из этого ведра?
Я бы сказал нет, следовательно, что вы думаете о следующем? Основываясь на вашем коде и независимо от моих собственных чувств, это кажется приемлемым:
public class SendToQueue {
// ...
private final Map<Long, Boolean> transactions = new ConcurrentHashMap<>();
// ...
private void startTransaction(long address) {
this.transactions.put(address, Boolean.FALSE);
}
public void updateTransaction(long address) {
Boolean state = this.transactions.get(address);
if (state != null) {
this.transactions.put(address, Boolean.TRUE);
}
}
private void clearTransaction(long address) {
this.transactions.remove(address);
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean success = false;
// If address is enough randomized or atomically counted (then ok for parallel send())
startTransaction(address);
try {
boolean sent = sendAsync(address, encodedRecords, socket);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
// wait for acknowledgement
success = waitDoneUntil(new DoneCondition() {
@Override
public boolean isDone() {
return SendToQueue.this.transactions.get(address); // no NPE
}
}, 500, TimeUnit.MILLISECONDS);
if (success) {
// Message acknowledged!
}
}
} finally {
clearTransaction(address);
}
return success;
}
public static interface DoneCondition {
public boolean isDone();
}
/**
* WaitDoneUntil(Future f, int duration, TimeUnit unit). Note: includes a
* sleep(50).
*
* @param f Will block for this future done until maxWaitMillis
* @param waitTime Duration expressed in (time) unit.
* @param unit Time unit.
* @return DoneCondition finally met or not
*/
public static boolean waitDoneUntil(DoneCondition f, int waitTime, TimeUnit unit) {
long curMillis = 0;
long maxWaitMillis = unit.toMillis(waitTime);
while (!f.isDone() && curMillis < maxWaitMillis) {
try {
Thread.sleep(50); // define your step here accordingly or set as parameter
} catch (InterruptedException ex1) {
//logger.debug("waitDoneUntil() interrupted.");
break;
}
curMillis += 50L;
}
return f.isDone();
}
//...
}
public class ResponsePoller {
//...
public void onReceive(long address) { // sample prototype
// ...
SendToQueue.getInstance().updateTransaction(address);
// The interested sender will know that its transaction is complete.
// While subsequent (late) calls will have no effect.
}
}