Amazon Kinesis & AWS Lambda Retries
Я очень новичок в Amazon Kinesis, поэтому, возможно, это просто проблема в моем понимании, но в FAQ AWS Lambda FAQ он говорит:
Записи Amazon Kinesis и DynamoDB Streams, отправленные на вашу функцию AWMS Lambda, строго сериализованы за каждый осколок. Это означает, что если вы поместите две записи в один и тот же осколок, Lambda гарантирует, что ваша функция Lambda будет успешно вызвана с первой записью, прежде чем она будет вызвана со второй записью. Если вызов для одной записи истекает, дросселируется или встречается с любой другой ошибкой, Lambda будет повторять попытку, пока она не удастся (или запись не достигнет 24-часового срока действия), прежде чем перейти к следующей записи. Порядок записи записей по различным осколкам не гарантируется, и обработка каждого осколка происходит параллельно.
Мой вопрос: что произойдет, если по какой-то причине некоторые некорректные данные попадут на осколок производителя и когда функция лямбда забирает его из-за ошибок, а затем просто продолжает повторять попытку? Это означает, что обработка этого конкретного осколка будет заблокирована в течение 24 часов по ошибке.
Лучше всего обрабатывать такие ошибки приложения, обертывая проблему в пользовательской ошибке и отправляя эту ошибку вниз по течению вместе со всеми успешно обработанными записями и позволяя ее обрабатывать? Конечно, это все равно не помогло бы в случае неустранимой ошибки, которая разбила бы программу как нулевой указатель: снова мы вернемся к циклу повторной блокировки в течение следующих 24 часов.
Ответы
Ответ 1
Не переусердствуйте, Кинезис - это просто очередь. Вы должны успешно использовать запись (то есть, поп из очереди), чтобы перейти к следующей. Также как стек FIFO.
Соответствующий подход должен быть:
- Получить запись из потока.
- Обработать его в блоке try-catch-finally.
- Если запись успешно обработана, не проблема. < - TRY
- Но если это не удается, обратите внимание на другое место, чтобы исследовать
причина, почему это не удалось. < - CATCH
- И в конце ваших логических блоков всегда сохраняйте позицию
DynamoDB. < - FINALLY
- Если в вашей системе происходит внутреннее (ошибка памяти, аппаратная ошибка)
и т.д.), это еще одна история; поскольку это может повлиять на обработку всех
записи, а не только один.
Кстати, если обработка записи занимает более 1 минуты, очевидно, что вы делаете что-то неправильно. Поскольку Kinesis предназначен для обработки тысяч записей в секунду, у вас не должно быть роскоши обработки таких длинных заданий для каждого из них.
Вопрос, который вы задаете, является общей проблемой систем очередей, иногда называемой "ядовитым сообщением". Вы должны обращаться с ними в своей бизнес-логике, чтобы быть в безопасности.
http://www.cogin.com/articles/SurvivingPoisonMessages.php#PoisonMessages
Ответ 2
Это общий вопрос по обработке событий в Kinesis, и я попытаюсь дать вам несколько моментов для создания вашей лямбда-функции для обработки таких проблем с "поврежденными" данными. Поскольку наилучшей практикой является разделение отдельных частей вашей системы на поток Kinesis и другие части, читаемые из потока Kinesis, часто бывает, что у вас будут такие проблемы.
Во-первых, почему у вас возникают такие проблемные события?
Использование Kinesis для обработки ваших событий - хороший способ разбить сложную систему, которая выполняет как обработку переднего плана (обслуживая конечных пользователей), так и в то же время/обработку кода (анализ событий), в две независимые компоненты вашей системы. Первичные люди могут сосредоточиться на своем бизнесе, в то время как сторонним людям не нужно вводить изменения кода в интерфейсный модуль, если они хотят добавить функциональность для обслуживания своих аналитических вариантов использования. Kinesis - это буфер событий, который как разрывает необходимость синхронизации, так и упрощает код бизнес-логики.
Поэтому мы хотели бы, чтобы события, написанные в потоке, были гибкими в своей "схеме", и если команды переднего плана хотят изменить формат события, добавить поля, удалить поля, изменить протокол или ключи шифрования, они должны быть способны делать это так часто, как хотят.
Теперь все команды, которые читают из потока, могут эффективно обрабатывать такие гибкие события, а не прерывать их обработку каждый раз, когда происходят такие изменения. Поэтому должно быть распространено, что ваша функция лямбда увидит события, которые она не может обработать, а " ядовитая таблетка" - это не редкое событие, как вы могли ожидать.
Во-вторых, как вы справляетесь с такими проблемными событиями?
Функция Lambda получит пакет событий для обработки. Обратите внимание, что вы не должны получать события один за другим, но в больших партиях событий. Если ваши партии слишком малы, вы быстро получите большие задержки в потоке.
Для каждой партии вы будете перебирать события, обрабатывать их, а затем контрольно-пропускную точку в DynamoDB - последний идентификатор последовательности пакета. Lambda выполняет большую часть этих шагов автоматически (см. Здесь: http://docs.aws.amazon.com/lambda/latest/dg/walkthrough-kinesis-events-adminuser-create-test-function.html):
console.log('Loading function');
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(function(record) {
// Kinesis data is base64 encoded so decode here
payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
context.succeed();
};
Это то, что происходит в "счастливом пути", если все события обрабатываются без каких-либо проблем. Но если вы столкнулись с какой-либо проблемой в пакете, и вы не "зафиксируете" события с уведомлением об успешном завершении, пакет выйдет из строя, и вы снова получите все события в пакете.
Теперь вам нужно решить, в чем причина сбоя в обработке.
-
Временная проблема (дросселирование, проблема с сетью...) - все в порядке, чтобы подождать секунду и повторите попытку пару раз. Во многих случаях проблема будет разрешаться сама.
-
Особая проблема (из памяти...) - лучше увеличить распределение памяти функции лямбда или уменьшить размер партии. Во многих случаях такое изменение решит проблему.
-
Ошибка констант - это означает, что вам нужно либо игнорировать проблемное событие (поместить его в DLQ - мертвую букву), либо изменить свой код для его обработки.
Проблема заключается в том, чтобы идентифицировать тип сбоя в коде и обрабатывать его по-разному. Вы должны написать свой Лямбда-код, чтобы идентифицировать его (например, тип исключения) и реагировать по-разному.
Вы можете использовать интеграцию с CloudWatch для записи таких сбоев в консоли и создания соответствующих аварийных сигналов. Вы также можете использовать журналы CloudWatch, чтобы зарегистрировать свою "мертвую букву" и посмотреть, что является источником проблемы.