Как получить все сообщения в очереди Amazon SQS с помощью библиотеки boto в Python?
Я работаю над приложением, чей рабочий процесс управляется передачей сообщений в SQS, используя boto.
Моя очередь SQS постепенно растет, и я не могу проверить, сколько элементов она должна содержать.
Теперь у меня есть демон, который периодически обследует очередь и проверяет, есть ли у меня набор элементов фиксированного размера. Например, рассмотрим следующую "очередь":
q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]
Теперь я хочу проверить, есть ли у меня "msg1_comp1", "msg2_comp1" и "msg3_comp1" в очереди вместе в какой-то момент времени, но я не знаю размер очереди.
После просмотра API вы можете либо получить только 1 элемент, либо фиксированное количество элементов в очереди, но не все:
>>> rs = q.get_messages()
>>> len(rs)
1
>>> rs = q.get_messages(10)
>>> len(rs)
10
Предложение, предложенное в ответах, состояло в том, чтобы получить, например, 10 сообщений в цикле до тех пор, пока я ничего не получу, но сообщения в SQS имеют тайм-аут видимости, а это означает, что если я опросу элементов из очереди, они не будут действительно удалены, они будут невидимыми только на короткий промежуток времени.
Есть ли простой способ получить все сообщения в очереди, не зная, сколько их есть?
Ответы
Ответ 1
Настройте свой вызов на q.get_messages(n)
внутри цикла while:
all_messages=[]
rs=q.get_messages(10)
while len(rs)>0:
all_messages.extend(rs)
rs=q.get_messages(10)
Кроме того, дамп не будет поддерживать более 10 сообщений:
def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
"""Utility function to dump the messages in a queue to a file
NOTE: Page size must be < 10 else SQS errors"""
Ответ 2
Я работал с очередями AWS SQS для предоставления мгновенных уведомлений, поэтому мне нужно обрабатывать все сообщения в режиме реального времени. Следующий код поможет вам эффективно удалить все сообщения и устранить ошибки при удалении.
Примечание. Для удаления сообщений из очереди их необходимо удалить. Я использую обновленный boto3 AWS python SDK, библиотеку json и следующие значения по умолчанию:
import boto3
import json
region_name = 'us-east-1'
queue_name = 'example-queue-12345'
max_queue_messages = 10
message_bodies = []
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>'
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>'
sqs = boto3.resource('sqs', region_name=region_name,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key)
queue = sqs.get_queue_by_name(QueueName=queue_name)
while True:
messages_to_delete = []
for message in queue.receive_messages(
MaxNumberOfMessages=max_queue_messages):
# process message body
body = json.loads(message.body)
message_bodies.append(body)
# add message to delete
messages_to_delete.append({
'Id': message.message_id,
'ReceiptHandle': message.receipt_handle
})
# if you don't receive any notifications the
# messages_to_delete list will be empty
if len(messages_to_delete) == 0:
break
# delete messages to remove them from SQS queue
# handle any errors
else:
delete_response = queue.delete_messages(
Entries=messages_to_delete)
Ответ 3
Мое понимание заключается в том, что распределенный характер службы SQS в значительной степени делает ваш проект неработоспособным. Каждый раз, когда вы вызываете get_messages, вы говорите с другим набором серверов, на котором будут некоторые, но не все ваши сообщения. Таким образом, невозможно "время от времени проверять", чтобы установить, готова ли определенная группа сообщений, а затем просто принять их.
То, что вам нужно сделать, - это постоянно проводить опрос, принимать все сообщения по мере их поступления и хранить их локально в своих собственных структурах данных. После каждой успешной выборки вы можете проверить свои структуры данных, чтобы узнать, был ли собран полный набор сообщений.
Имейте в виду, что сообщения будут выходить из строя, а некоторые сообщения будут доставлены дважды, так как удаление должно распространяться на все серверы SQS, но последующие запросы иногда выбивают сообщения об удалении.
Ответ 4
Я выполняю это в cronjob
from django.core.mail import EmailMessage
from django.conf import settings
import boto3
import json
sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
region_name=settings.AWS_REGION)
queue = sqs.get_queue_by_name(QueueName='email')
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)
while len(messages) > 0:
for message in messages:
mail_body = json.loads(message.body)
print("E-mail sent to: %s" % mail_body['to'])
email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']])
email.send()
message.delete()
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)
Ответ 5
Что-то вроде кода ниже должно сделать трюк. Извините, что в С#, но преобразовать его в python не сложно. Словарь используется для отсечения дубликатов.
public Dictionary<string, Message> GetAllMessages(int pollSeconds)
{
var msgs = new Dictionary<string, Message>();
var end = DateTime.Now.AddSeconds(pollSeconds);
while (DateTime.Now <= end)
{
var request = new ReceiveMessageRequest(Url);
request.MaxNumberOfMessages = 10;
var response = GetClient().ReceiveMessage(request);
foreach (var msg in response.Messages)
{
if (!msgs.ContainsKey(msg.MessageId))
{
msgs.Add(msg.MessageId, msg);
}
}
}
return msgs;
}
Ответ 6
ПРИМЕЧАНИЕ. Это не является прямым ответом на вопрос.
Скорее это увеличение ответа @TimothyLiu, предполагая, что конечный пользователь использует пакет Boto
(он же Boto2) не Boto3
. Этот код является "Boto-2-ization" вызова delete_messages
, упомянутого в его ответе
A Boto
(2) вызывает delete_message_batch(messages_to_delete)
, где messages_to_delete
- объект dict
с ключом: значение, соответствующее id
: receipt_handle
пары возвращает
AttributeError: объект 'dict' не имеет атрибута 'id'.
Кажется, что delete_message_batch
ожидает объект класса Message
; копируя источник Boto для delete_message_batch
и позволяя ему использовать объект Message
(ala boto3) также терпит неудачу, если вы удаляете более 10 "сообщений" за раз. Итак, мне пришлось использовать следующую рабочую среду.
код eprint из здесь
from __future__ import print_function
import sys
from itertools import islice
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
@static_vars(counter=0)
def take(n, iterable, reset=False):
"Return next n items of the iterable as same type"
if reset: take.counter = 0
take.counter += n
bob = islice(iterable, take.counter-n, take.counter)
if isinstance(iterable, dict): return dict(bob)
elif isinstance(iterable, list): return list(bob)
elif isinstance(iterable, tuple): return tuple(bob)
elif isinstance(iterable, set): return set(bob)
elif isinstance(iterable, file): return file(bob)
else: return bob
def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False
"""
Deletes a list of messages from a queue in a single request.
:param cx: A boto connection object.
:param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted
:param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects.
"""
listof10s = []
asSuc, asErr, acS, acE = "","",0,0
res = []
it = tuple(enumerate(messages))
params = {}
tenmsg = take(10,it,True)
while len(tenmsg)>0:
listof10s.append(tenmsg)
tenmsg = take(10,it)
while len(listof10s)>0:
tenmsg = listof10s.pop()
params.clear()
for i, msg in tenmsg: #enumerate(tenmsg):
prefix = 'DeleteMessageBatchRequestEntry'
numb = (i%10)+1
p_name = '%s.%i.Id' % (prefix, numb)
params[p_name] = msg.get('id')
p_name = '%s.%i.ReceiptHandle' % (prefix, numb)
params[p_name] = msg.get('receipt_handle')
try:
go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST')
(sSuc,cS),(sErr,cE) = tup_result_messages(go)
if cS:
asSuc += ","+sSuc
acS += cS
if cE:
asErr += ","+sErr
acE += cE
except cx.ResponseError:
eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
except:
eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res
def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0):
if sSuc == "": sSuc="None"
if sErr == "": sErr="None"
if cS == expect: sSuc="All"
if cE == expect: sErr="All"
return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr)