Azure: Как перенести сообщения из очереди ядов обратно в основную очередь?
Мне интересно, есть ли инструмент или библиотека, которая может перемещать сообщения между очередями? В настоящее время я делаю что-то вроде ниже
public static void ProcessQueueMessage([QueueTrigger("myqueue-poison")] string message, TextWriter log)
{
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connString);
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("myqueue");
queue.CreateIfNotExists();
var messageData = JsonConvert.SerializeObject(data, new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver() });
queue.AddMessage(new CloudQueueMessage(messageData));
}
Ответы
Ответ 1
По сути, хранилище Azure не поддерживает перемещение сообщений из одной очереди в другую. Вам нужно будет сделать это самостоятельно.
Одним из способов реализации перемещения сообщений из одной очереди в другую является удаление сообщений из исходной очереди (путем вызова GetMessages
), чтение содержимого сообщения и создание нового сообщения в целевой очереди. Это вы можете сделать с помощью Storage Client Library.
Одним из инструментов, который приходит мне в голову для перемещения сообщений, является Cerebrata Azure Management Studio
. У него есть этот функционал.
По состоянию на (2018-09-11) версия 1.4.1 обозревателя хранилищ Microsoft Azure не поддерживает перемещение сообщений очереди.
Ответ 2
По состоянию на (2018-09-11) версия 1.4.1 обозревателя хранилищ Microsoft Azure не имеет возможности перемещать сообщения из одной очереди Azure в другую.
Я написал в блоге простое решение для передачи ядовитых сообщений обратно в исходную очередь и подумал, что это может сэкономить кому-то несколько минут. Очевидно, вам нужно исправить ошибку, из-за которой сообщения попадали в очередь подозрительных сообщений!
Вам нужно добавить ссылку на пакет NuGet в Microsoft.NET.Sdk.Functions:
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
void Main()
{
const string queuename = "MyQueueName";
string storageAccountString = "xxxxxx";
RetryPoisonMesssages(storageAccountString, queuename);
}
private static int RetryPoisonMesssages(string storageAccountString, string queuename)
{
CloudQueue targetqueue = GetCloudQueueRef(storageAccountString, queuename);
CloudQueue poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");
int count = 0;
while (true)
{
var msg = poisonqueue.GetMessage();
if (msg == null)
break;
poisonqueue.DeleteMessage(msg);
targetqueue.AddMessage(msg);
count++;
}
return count;
}
private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
{
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountString);
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference(queuename);
return queue;
}
Ответ 3
Здесь скрипт Python вы можете найти полезным. Вам нужно будет установить azure-storage-queue
queueService = QueueService(connection_string = "YOUR CONNECTION STRING")
for queue in queueService.list_queues():
if "poison" in queue.name:
print(queue.name)
targetQueueName = queue.name.replace("-poison", "")
while queueService.peek_messages(queue.name):
for message in queueService.get_messages(queue.name, 32):
print(".", end="", flush=True)
queueService.put_message(targetQueueName, message.content)
queueService.delete_message(queue.name, message.id, message.pop_receipt)
Ответ 4
Любой, кто приходит сюда, ищет Node-эквивалент ответа @MitchWheats с помощью функции Azure.
import AzureStorage from 'azure-storage'
import { Context, HttpRequest } from '@azure/functions'
import util from 'util'
const queueService = AzureStorage.createQueueService()
queueService.messageEncoder = new AzureStorage.QueueMessageEncoder.TextBase64QueueMessageEncoder()
const deleteMessage = util.promisify(queueService.deleteMessage).bind(queueService)
const createMessage = util.promisify(queueService.createMessage).bind(queueService)
const getMessage = util.promisify(queueService.getMessage).bind(queueService)
export async function run (context: Context, req: HttpRequest): Promise<void> {
try {
const poisonQueue = (req.query.queue || (req.body && req.body.queue));
const targetQueue = poisonQueue.split('-')[0]
let count = 0
while (true) {
const message = await getMessage(poisonQueue)
if (!message) { break; }
if (message.messageText && message.messageId && message.popReceipt) {
await createMessage(targetQueue, message.messageText)
await deleteMessage(poisonQueue, message.messageId, message.popReceipt)
}
count++
}
context.res = {
body: 'Replayed ${count} messages from ${poisonQueue} on ${targetQueue}'
};
} catch (e) {
context.res = { status: 500 }
}
}
Чтобы использовать эту функцию, вам необходимо предоставить информацию о соединении для учетной записи хранения, используемой для очередей хранения. Это предоставляется в качестве переменных среды. Либо вы предоставляете AZURE_STORAGE_ACCOUNT
и AZURE_STORAGE_ACCESS_KEY
, либо AZURE_STORAGE_CONNECTION_STRING
. Дополнительные сведения об этом доступны в документации по Azure Storage SDK.
Также написал несколько строк об этом в этой статье среднего
Ответ 5
Здесь обновленная версия ответа Митча, используя последний пакет Microsoft.Azure.Storage.Queue. Просто создайте новое консольное приложение .NET, добавьте в него вышеупомянутый пакет и замените содержимое Program.cs следующим:
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using System.Threading.Tasks;
namespace PoisonMessageDequeuer
{
class Program
{
static async Task Main(string[] args)
{
const string queuename = "MyQueueName";
string storageAccountString = "xxx";
await RetryPoisonMesssages(storageAccountString, queuename);
}
private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename)
{
var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
var poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");
var count = 0;
while (true)
{
var msg = await poisonqueue.GetMessageAsync();
if (msg == null)
break;
await poisonqueue.DeleteMessageAsync(msg);
await targetqueue.AddMessageAsync(msg);
count++;
}
return count;
}
private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
{
var storageAccount = CloudStorageAccount.Parse(storageAccountString);
var queueClient = storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(queuename);
return queue;
}
}
}
Это все еще довольно медленно, если вы работаете с> 1000 сообщениями, поэтому я рекомендую изучить пакетные API для больших количеств.