Очистка очереди служебной шины azure за один раз
Мы используем очередь служебных шин в нашем проекте. Мы нуждаемся в функциональности для удаления всех сообщений из очереди, когда администратор выбирает очистить очередь. Я искал в сети, но не смог найти какую-либо функцию, которая делает это внутри класса QueueClient
.
Должен ли я вызывать все сообщения один за другим, а затем пометить их полностью, чтобы очистить очередь или есть лучший способ?
QueueClient queueClient = _messagingFactory.CreateQueueClient(
queueName, ReceiveMode.PeekLock);
BrokeredMessage brokeredMessage = queueClient.Receive();
while (brokeredMessage != null )
{
brokeredMessage.Complete();
brokeredMessage = queueClient.Receive();
}
Ответы
Ответ 1
Использование метода Receive()
в цикле while, как и вы, приведет к тому, что ваш код будет запущен неограниченно, как только очередь будет пустой, поскольку метод Receive()
будет ожидать появления другого сообщения в очереди.
Если вы хотите, чтобы это запускалось автоматически, попробуйте использовать метод Peek()
.
Например:
while (queueClient.Peek() != null)
{
var brokeredMessage = queueClient.Receive();
brokeredMessage.Complete();
}
Вы можете сделать это проще с помощью ReceiveMode.ReceiveAndDelete
, как было упомянуто hocho.
Ответ 2
Использование:
вы можете написать метод, который пуст очереди служебной шины или темы/подписки:
MessageReceiver messageReceiver = ...
while (messageReceiver.Peek() != null)
{
// Batch the receive operation
var brokeredMessages = messageReceiver.ReceiveBatch(300);
// Complete the messages
var completeTasks = brokeredMessages.Select(m => Task.Run(() => m.Complete())).ToArray();
// Wait for the tasks to complete.
Task.WaitAll(completeTasks);
}
Ответ 3
Для Azure-ServiceBus-Queues существует ReceiveBatch
-метод, который позволяет вам получать пакет n-сообщений в то время, В сочетании с ReceiveMode.ReceiveAndDelete
вы можете очистить очередь более эффективно.
Caveat Число n сообщений может быть возвращено, но оно не гарантируется. Также существует ограничение на размер пакета сообщений 256K.
Ответ 4
Существует простой метод Clear()
для очистки всей очереди, если вы используете библиотеку WindowsAzure.Storage из nuget. Я использую класс Microsoft.Windows.Azure.Queue
из этой библиотеки для управления очередью. В противном случае вы можете получить доступ через API за свою документацию. Я не знаю, как долго этот метод использовался в библиотеке Azure, и, вероятно, этого не было, когда вопрос был первоначально задан, но API REST проистекает с по крайней мере 2014 на это сообщение обратной связи Azure
Полный код .NET для очистки очереди с помощью библиотеки Azure:
string connectionString = "YourAzureConnectionStringHere";
string queueName = "YourWebJobQueueName";
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
// Create the queue client, then get a reference to queue
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
queue = queueClient.GetQueueReference(GetQueueName(queueName));
// Clear the entire queue
queue.Clear();
Ответ 5
У меня хорошие результаты, используя комбинацию ReceiveAndDelete, PrefetchCount, ReceiveBatchAsync и простой цикл истины вместо использования Peek. Пример с MessagingFactory ниже:
var receiverFactory = MessagingFactory.CreateFromConnectionString("ConnString");
var receiver = receiverFactory.CreateMessageReceiver("QName", ReceiveMode.ReceiveAndDelete);
receiver.PrefetchCount = 300;
bool loop = true;
while (loop)
{
var messages = await receiver.ReceiveBatchAsync(300, TimeSpan.FromSeconds(1));
loop = messages.Any();
}
Требуется только пакет Nuget для WindowsAzure.ServiceBus.
Ответ 6
Самый быстрый способ очистки Azure ServiceBus Queue - установить очень короткий DefaultMessageTimeToLive
, подождать несколько секунд, попробовать получить из очереди, чтобы он обновил, а затем восстановить начальный DefaultMessageTimeToLive
.
Вы можете сделать это с портала или из кода:
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
var queueDescription = _namespaceManager.GetQueue(queueName);
var queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.ReceiveAndDelete);
var dl = queueDescription.EnableDeadLetteringOnMessageExpiration;
var ttl = queueDescription.DefaultMessageTimeToLive;
queueDescription.EnableDeadLetteringOnMessageExpiration = false;
queueDescription.DefaultMessageTimeToLive = TimeSpan.FromSeconds(1);
Thread.Sleep(5000);
var dumy = queueClient.ReceiveBatch(200, TimeSpan.FromSeconds(1)).ToArray();
queueDescription.EnableDeadLetteringOnMessageExpiration = dl;
queueDescription.DefaultMessageTimeToLive = ttl;