Лучший способ переместить сообщения с DLQ в Amazon SQS?
Какова наилучшая практика перемещения сообщений из очереди с мертвой буквой обратно в исходную очередь в Amazon SQS?
Будет ли это
- Получить сообщение из DLQ
- Запись сообщения в очередь
- Удалить сообщение из DLQ
Или есть более простой способ?
Кроме того, будет ли AWS в конечном итоге иметь инструмент в консоли для перемещения сообщений с DLQ?
Ответы
Ответ 1
Это похоже на ваш лучший вариант. Существует вероятность того, что ваш процесс завершится с ошибкой после шага 2. В этом случае вы в конечном итоге скопируете сообщение дважды, но вы все равно должны обрабатывать повторную доставку сообщений (или не заботьтесь).
Ответ 2
Вот быстрый хак. Это определенно не лучший или рекомендуемый вариант.
- Задайте основную очередь SQS как DLQ для фактического DLQ с максимальным приемом как 1.
- Просмотр содержимого в DLQ (это переместит сообщения в основную очередь, так как это DLQ для фактического DLQ)
- Удалите настройку, чтобы главная очередь больше не была DLQ фактического DLQ
Ответ 3
Не нужно перемещать сообщение, потому что оно будет сопряжено со многими другими проблемами, такими как дублирование сообщений, сценарии восстановления, потерянное сообщение, проверка дедупликации и т.д.
Вот решение, которое мы реализовали -
Обычно мы используем DLQ для временных ошибок, а не для постоянных ошибок. Так взял ниже подход -
-
Прочитайте сообщение из DLQ как обычную очередь
Выгоды - Чтобы избежать дублирования обработки сообщений
- Лучший контроль над DLQ- Как я поставил проверку, чтобы обрабатывать только тогда, когда обычная очередь полностью обработана.
- Масштабировать процесс на основе сообщения на DLQ
-
Затем следуйте тому же коду, который следует обычной очереди.
-
Более надежный в случае прерывания работы или если процесс был прерван во время обработки (например, экземпляр остановлен или процесс остановлен)
Выгоды - Повторное использование кода
- Обработка ошибок
- Восстановление и воспроизведение сообщений
-
Расширьте видимость сообщения, чтобы никакой другой поток не обрабатывал их.
Выгода - Избегайте обработки одной и той же записи несколькими потоками.
-
Удалите сообщение только в том случае, если произошла постоянная ошибка или успешно.
Выгода - Продолжайте обработку, пока мы не получим временную ошибку.
Ответ 4
здесь:
import boto3
import sys
import Queue
import threading
work_queue = Queue.Queue()
sqs = boto3.resource('sqs')
from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)
from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)
def process_queue():
while True:
messages = work_queue.get()
bodies = list()
for i in range(0, len(messages)):
bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})
to_q.send_messages(Entries=bodies)
for message in messages:
print("Coppied " + str(message.body))
message.delete()
for i in range(10):
t = threading.Thread(target=process_queue)
t.daemon = True
t.start()
while True:
messages = list()
for message in from_q.receive_messages(
MaxNumberOfMessages=10,
VisibilityTimeout=123,
WaitTimeSeconds=20):
messages.append(message)
work_queue.put(messages)
work_queue.join()
Ответ 5
Для этого я написал небольшой скрипт на python, используя boto3 lib:
conf = {
"sqs-access-key": "",
"sqs-secret-key": "",
"reader-sqs-queue": "",
"writer-sqs-queue": "",
"message-group-id": ""
}
import boto3
client = boto3.client(
'sqs',
aws_access_key_id = conf.get('sqs-access-key'),
aws_secret_access_key = conf.get('sqs-secret-key')
)
while True:
messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)
if 'Messages' in messages:
for m in messages['Messages']:
print(m['Body'])
ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
print(ret)
client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
else:
print('Queue is currently empty or messages are invisible')
break
Вы можете получить этот скрипт по этой ссылке
этот скрипт в основном может перемещать сообщения между произвольными очередями. и он поддерживает очереди fifo, а также вы можете указать поле message_group_id
.
Ответ 6
Есть еще один способ добиться этого без написания одной строки кода. Учтите, что ваше действительное имя очереди - SQS_Queue, а DLQ для него - SQS_DLQ. Теперь выполните следующие действия:
- Установите SQS_Queue как dlq для SQS_DLQ. Поскольку SQS_DLQ уже является dlq SQS_Queue. Теперь оба выступают в роли других.
- Установите максимальный счетчик приема вашего SQS_DLQ равным 1.
- Теперь читайте сообщения из консоли SQS_DLQ. Так как количество полученных сообщений равно 1, оно отправит все сообщения в свой собственный dlq, который является вашей действительной очередью SQS_Queue.
Ответ 7
Мы используем следующий скрипт для перенаправления сообщения из очереди src в очередь tgt:
имя файла: redrive.py
использование: python redrive.py -s {source queue name} -t {target queue name}
'''
This script is used to redrive message in (src) queue to (tgt) queue
The solution is to set the Target Queue as the Source Queue Dead Letter Queue.
Also set Source Queue redrive policy, Maximum Receives to 1.
Also set Source Queue VisibilityTimeout to 5 seconds (a small period)
Then read data from the Source Queue.
Source Queue Redrive Policy will copy the message to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--src', required=True,
help='Name of source SQS')
parser.add_argument('-t', '--tgt', required=True,
help='Name of targeted SQS')
args = parser.parse_args()
return args
def verify_queue(queue_name):
queue_url = sqs.get_queue_url(QueueName=queue_name)
return True if queue_url.get('QueueUrl') else False
def get_queue_attribute(queue_url):
queue_attributes = sqs.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=['All'])['Attributes']
print(queue_attributes)
return queue_attributes
def main():
args = parse_args()
for q in [args.src, args.tgt]:
if not verify_queue(q):
print(f"Cannot find {q} in AWS SQS")
src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']
target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
target_queue_attributes = get_queue_attribute(target_queue_url)
# Set the Source Queue Redrive policy
redrive_policy = {
'deadLetterTargetArn': target_queue_attributes['QueueArn'],
'maxReceiveCount': '1'
}
sqs.set_queue_attributes(
QueueUrl=src_queue_url,
Attributes={
'VisibilityTimeout': '5',
'RedrivePolicy': json.dumps(redrive_policy)
}
)
get_queue_attribute(src_queue_url)
# read all messages
num_received = 0
while True:
try:
resp = sqs.receive_message(
QueueUrl=src_queue_url,
MaxNumberOfMessages=10,
AttributeNames=['All'],
WaitTimeSeconds=5)
num_message = len(resp.get('Messages', []))
if not num_message:
break
num_received += num_message
except Exception:
break
print(f"Redrive {num_received} messages")
# Reset the Source Queue Redrive policy
sqs.set_queue_attributes(
QueueUrl=src_queue_url,
Attributes={
'VisibilityTimeout': '30',
'RedrivePolicy': ''
}
)
get_queue_attribute(src_queue_url)
if __name__ == "__main__":
main()