Ответ 1
TL;DR: Единственный надежный способ переигрывать неудачную партию событий в IEventProcessor.ProcessEventsAsync
является - Shutdown
EventProcessorHost
(ака EPH
) сразу - либо с помощью eph.UnregisterEventProcessorAsync()
или завершения процесса - в зависимости от ситуации, Это позволит другим экземплярам EPH
получить аренду для этого раздела и начать с предыдущей контрольной точки.
Прежде чем объяснить это - я хочу сказать, что это отличный вопрос, и действительно, это был один из самых сложных вариантов дизайна, который мы должны были сделать для EPH
. На мой взгляд, это был компромисс ч/б: usability
/supportability
структуры EPH
, а не Technical-Correctness
.
Идеальная ситуация была бы такой: когда пользовательский код в IEventProcessorImpl.ProcessEventsAsync
создает исключение - библиотека EPH
не должна его перехватывать. Это должно было позволить это Exception
- сбой процесса, и crash-dump
ясно показывает, что callstack
. Я до сих пор верю - это самое technically-correct
решение.
Текущая ситуация: контракт IEventProcessorImpl.ProcessEventsAsync
API & EPH
:
- до тех пор, пока
EventData
может быть получен от службы EventHubs - продолжайте вызывать пользовательский обратный вызов (IEventProcessorImplementation.ProcessEventsAsync
) сEventData's
и если пользовательский обратный вызов выдает ошибки при вызове, уведомитеEventProcessorOptions.ExceptionReceived
. - Код пользователя внутри
IEventProcessorImpl.ProcessEventsAsync
должен обрабатывать все ошибки и включать в себяRetry's
мере необходимости.EPH
не устанавливает никакого тайм-аута для этого обратного вызова, чтобы предоставить пользователям полный контроль над временем обработки. - Если конкретное событие является причиной проблемы - пометьте
EventData
специальным свойством - для ex: type =poison-event
и повторно отправьте в тот жеEventHub
(EventHub
указатель на фактическое событие, скопируйте этиEventData.Offset
иSequenceNumber
в NewEventData.ApplicationProperties
) или перенаправьте его в очередь SERVICEBUS или сохраните в другом месте, в основном, идентифицируйте и отложите обработку ядовитого события. - если вы обработали все возможные случаи и по-прежнему сталкиваетесь с
Exceptions
- catch'em & shutdownEPH
илиfailfast
процесса с этим исключением. КогдаEPH
возвращается - он начнёт с того места, где его оставили.
Почему проверка "старого события" НЕ работает (прочитайте это, чтобы понять EPH
в целом):
За кулисами EPH
запускает насос для каждого получателя раздела EventHub Consumergroup - работа которого заключается в том, чтобы запустить получатель с заданной checkpoint
(если он есть) и создать выделенный экземпляр реализации IEventProcessor
а затем receive
из назначенного раздела EventHub из указанного Offset
в контрольной точке (если не присутствует - EventProcessorOptions.initialOffsetProvider
) и в конечном итоге вызвать IEventProcessorImpl.ProcessEventsAsync
. Цель Checkpoint
- обеспечить надежный запуск обработки сообщений, когда процесс EPH
завершает работу и владелец раздела перемещается в другие экземпляры EPH
. Таким образом, checkpoint
будет потребляться только при запуске НАСОСА и НЕ будет считываться после запуска насоса.
Пока я пишу это, EPH
находится на версии 2.2.10.