Ответ 1
Один из способов сделать это - использовать другую тему, в которой вы будете нажимать все сообщения, которые нужно отложить. Если все задержанные сообщения должны обрабатываться после одной и той же временной задержки, это будет довольно прямолинейно:
while(it.hasNext()) {
val message = it.next().message()
if(shouldBeDelayed(message)) {
val delay = 24 hours
val delayTo = getCurrentTime() + delay
putMessageOnDelayedQueue(message, delay, delayTo)
}
else {
process(message)
}
consumer.commitOffset()
}
Теперь все обычные сообщения будут обработаны как можно скорее, а те, кому требуется задержка, будут добавлены в другую тему.
Хорошо, что мы знаем, что сообщение во главе отложенной темы - это тот, который должен быть обработан первым, так как значение delayTo будет самым маленьким. Поэтому мы можем настроить другого потребителя, который читает головное сообщение, проверяет, находится ли метка времени в прошлом и если она обрабатывает сообщение и фиксирует смещение. Если нет, он не фиксирует смещение и вместо этого просто спит до этого времени:
while(it.hasNext()) {
val delayedMessage = it.peek().message()
if(delayedMessage.delayTo < getCurrentTime()) {
val readMessage = it.next().message
process(readMessage.originalMessage)
consumer.commitOffset()
} else {
delayProcessingUntil(delayedMessage.delayTo)
}
}
В случае разного времени задержки вы можете разделить тему на задержку (например, 24 часа, 12 часов, 6 часов). Если время задержки более динамично, чем это становится немного сложнее. Вы можете решить эту проблему, представив две темы задержки. Прочитайте все сообщения с задержкой A
и обработайте все сообщения, значения которых delayTo
в прошлом. Среди других вы просто найдете ту, которая находится ближе всего к delayTo
, а затем помещайте их в тему B
. Сон, пока ближайший не будет обработан, и сделайте все наоборот, т.е. Обработайте сообщения из темы B
и поставьте один раз, который еще не должен быть возвращен на тему A
.
Чтобы ответить на ваши конкретные вопросы (некоторые из них были рассмотрены в комментариях к вашему вопросу)
- фиксация каждого смещения может замедлить ZK вниз
Вы можете рассмотреть возможность переключения на сохранение смещения в Kafka (функция, доступная из 0.8.2, проверить свойство offsets.storage
в потребительской конфигурации)
- Может ли user.commitOffset выдавать исключение? если да, я буду использовать одно и то же сообщение дважды (можно решить с помощью идемпотентных сообщений)
Я полагаю, что он может, если он не может связываться со смещенным хранилищем, например. Использование идемпотентных сообщений решает эту проблему, как вы говорите.
- проблема, ожидающая долгого времени без фиксации смещения, например, период задержки составляет 24 часа, будет дальше от итератора, спать в течение 24 часов, обрабатывать и совершать (тайм-аут сеанса ZK?)
Это не будет проблемой с описанным выше решением, если обработка самого сообщения занимает больше времени ожидания сеанса.
- Как может ZK-сеанс оставаться в живых без каких-либо новых смещений? (установка улья zookeeper.session.timeout.ms может разрешить в мертвом потребителе, не распознавая его).
Снова с вышесказанным вам не нужно устанавливать длительный тайм-аут сеанса.
- какие-либо другие проблемы отсутствуют?
Всегда есть:)