Лучшая практика для опроса очереди AWS SQS и удаления полученных сообщений из очереди?
У меня есть очередь SQS, которая постоянно заполняется потребителем данных, и теперь я пытаюсь создать службу, которая вытащит эти данные из SQS с помощью Python boto.
То, как я это разработал, заключается в том, что у меня будет 10-20 потоков, которые пытаются читать сообщения из очереди SQS, а затем делать то, что они должны делать с данными (бизнес-логикой), прежде чем вернуться в очередь, чтобы получить следующую партию данных, как только они будут сделаны. Если нет данных, они будут просто ждать, пока не будут доступны некоторые данные.
У меня есть две области, о которых я не уверен в этом дизайне
- Это вопрос вызова метода receive_message() с длинным значением time_out, и если ничего не возвращается за 20 секунд (максимально допустимое), просто повторите попытку? Или существует метод блокировки, который возвращается только после того, как доступны данные?
- Я заметил, что как только я получаю сообщение, он не удаляется из очереди, мне нужно получить сообщение, а затем отправить другой запрос после его получения, чтобы удалить его из очереди? похоже, немного перебор.
Спасибо
Ответы
Ответ 1
Функция long-poll метода receive_message()
является наиболее эффективным способом опроса SQS. Если это произойдет без каких-либо сообщений, я бы порекомендовал короткую задержку перед повторной попыткой, особенно если у вас несколько читателей. Возможно, вы захотите даже выполнить дополнительную задержку, чтобы каждое последующее пустое чтение немного задержалось, так что вы не получите дросселирования AWS.
И да, вам нужно удалить сообщение после того, как прочитали или оно появится снова в очереди. Это может быть очень полезно в случае, если рабочий читает сообщение, а затем проваливается, прежде чем он сможет полностью обработать сообщение. В этом случае он будет перезагружен и прочитан другим работником. Вы также хотите убедиться, что тайм-аут невидимости сообщений установлен достаточно долго, чтобы у рабочего было достаточно времени для обработки сообщения, прежде чем оно автоматически появится в очереди. Если это необходимо, ваши работники могут настроить таймаут по мере его обработки, если он занимает больше времени, чем ожидалось.
Ответ 2
Если вам нужен простой способ настроить прослушиватель, который включает автоматическое удаление сообщений по окончании их обработки и автоматическую отправку исключений в указанную очередь, вы можете использовать пакет pySqsListener.
Вы можете настроить слушателя так:
from sqs_listener import SqsListener
class MyListener(SqsListener):
def handle_message(self, body, attributes, messages_attributes):
run_my_function(body['param1'], body['param2']
listener = MyListener('my-message-queue', 'my-error-queue')
listener.listen()
Существует флаг для переключения с короткого опроса на длинный - все это задокументировано в файле README.
Отказ от ответственности: я являюсь автором указанного пакета.
Ответ 3
Другой вариант - настроить приложение-работник с использованием AWS Beanstalk, как описано в этот блогпост.
Вместо длительного опроса с использованием boto3, ваше приложение флешки получает сообщение как объект json в сообщении HTTP. HTTP-путь и тип отправляемого сообщения настраиваются на вкладке конфигурации AWS Elastic Beanstalk Configuration:
![введите описание изображения здесь]()
AWS Elastic Beanstalk обладает дополнительным преимуществом, позволяя динамически масштабировать количество рабочих в зависимости от размера вашей очереди SQS и преимуществ управления развертыванием.
Это - пример приложения, которое я нашел полезным в качестве шаблона.