Ожидание бесконечно для сообщения, которое никогда не может прибыть
У меня есть Java-типизированный актер, который отвечает за логику фильтра/повтора на внешнем ресурсе, который может быть временно недоступен. Поля актера и общие методы:
public class MyActorImpl implements MyActor {
private static final long MINWAIT = 50;
private static final long MAXWAIT = 1000;
private static final long DEFAULTWAIT = 0;
private static final double BACKOFFMULTIPLIER = 1.5;
private long updateWait(long currentWait) {
return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
}
// mutable
private long opWait = DEFAULTWAIT;
private final Queue<OpInput> opBuffer = new ArrayDeque<>();
// called from external actor
public void operation(OpInput opInput) {
operation(opInput, DEFAULTWAIT);
}
// called internally
public void operation(OpInput opInput, long currentWait);
}
Актер имеет несколько операций, которые имеют более или менее ту же логику повтора/буфера; каждая операция имеет свои поля [op]Wait
и [op]Buffer
.
- Родительский оператор вызывает
void operation(OpInput opInput)
- Предыдущий метод вызывает
void operation(OpInput opInput, long currentWait)
с помощью DEFAULTWAIT
для второго параметра
- Если параметр
currentWait
не равен opWait
, тогда вход сохраняется в opBuffer
, иначе вход отправляется на внешний ресурс.
- Если внешний ресурс возвращает успех, то
opWait
устанавливается на DEFAULTWAIT
, а содержимое opBuffer
отправляется обратно через метод operation(opInput)
. Если внешний ресурс (или, скорее, сеть) возвращает ошибку, то я обновляю opWait = updateWait(opWait)
и планирую operation(opInput, opWait)
в планировщике системы действий с использованием задержки opWait
ms.
т.е. Я использую планировщик системы действий для реализации экспоненциального отсрочки; Я использую параметр currentWait
для идентификации сообщения, которое я пытаюсь выполнить, и буферизую другие сообщения, пока основное сообщение не будет успешно обработано внешним ресурсом.
Проблема заключается в том, что если запланированное сообщение operation(opInput, currentWait)
потеряно, я навсегда буферизую сообщения, потому что защита currentWait == opWait
не будет работать для всех других сообщений. Я мог бы использовать что-то вроде spring-retry для реализации экспоненциального отсрочка, но я не вижу способа объединить петли повторения операций, что означает, что я мог бы использовать один поток за цикл повтора (в то время как использование планировщика системы актеров не ставит гораздо большую нагрузку на систему).
Я ищу более отказоустойчивый способ реализовать буферизацию и экспоненциальную отсрочку на интерфейсе между актером и внешним ресурсом, не выделяя слишком много ресурсов для задачи.
Ответы
Ответ 1
Если я правильно понимаю вас, если единственная проблема заключается в потере запланированного сообщения, почему бы вам просто не использовать что-то вроде Надежного шаблона прокси-сервера для этого конкретное сообщение, а затем, если он терпит неудачу opWait = DEFAULTWAIT;
Есть кое-что о вашем коде, который я получаю, я не понимаю, что вы имеете в виду, когда говорите, что public void operation(OpInput opInput)
вызывается извне. Вы имеете в виду, что этот метод взаимодействует с сетью, которая использует ресурсы, которые иногда недоступны?
Если я могу предложить альтернативу. Из того, что я понимаю, ваша основная проблема заключается в том, что у вас есть ресурсы, которые иногда недоступны, поэтому у вас есть какой-то тип que/buffer, который вы реализуете с помощью какой-то логики ожидания, чтобы сообщение обрабатывалось, когда оно снова доступно, что, к сожалению, включает некоторые сообщения, которые могут быть потеряны и привести к бесконечному ожиданию. Я думаю, вы могли бы достичь того, что хотите, используя фьючерсы с таймаутами. Затем повторите попытку, если будущее не будет завершено за определенное количество времени, до трех повторных попыток. Вы даже можете настроить это время на основе нагрузки на сервер и сколько времени потребуется для завершения сообщения. Надеюсь, что это поможет.