Блокировка обработки очереди Azure в приложении Azure Function
Я создал приложение Azure Function с триггером очереди Azure Storage, которое обрабатывает очередь, в которой каждый элемент очереди является URL-адресом. Функция просто загружает содержимое URL-адреса. У меня есть другая функция, которая загружает и анализирует XML файл сайта и добавляет все URL-адреса страниц в очередь. Проблема заключается в том, что приложение "Функции" запускается слишком быстро, и оно забивает веб-сайт, чтобы он возвращал "Ошибки сервера". Есть ли способ ограничить/уменьшить скорость, с которой работает приложение "Функции"?
Я мог бы, конечно, написать простое веб-задание, которое обрабатывало их серийно (или с некоторым асинхронным, но ограничивало количество одновременных запросов), но мне очень нравилась простота Azure Functions и хотелось попробовать "безсерверные" вычисления.
Ответы
Ответ 1
Есть несколько вариантов, которые вы можете рассмотреть.
Во-первых, есть некоторые кнопки, которые вы можете настроить в host.json
, которые обрабатывают обработку очереди (документировано здесь). Регулятор queues.batchSize
- сколько сообщений очереди выставляется за раз. Если установлено значение 1, время выполнения будет извлекать по одному сообщению за раз, и только для получения следующего, когда обработка этого сообщения будет завершена. Это может привести к некоторому уровню сериализации в одном экземпляре.
Другим вариантом может быть установка NextVisibleTime в сообщениях, которые вы вставляете в очередь, чтобы они были разнесены - по умолчанию сообщения, помещенные в очередь, становятся видимыми и готовы к немедленной обработке.
Последний вариант может заключаться в том, чтобы вы могли помещать сообщение с коллекцией всех URL-адресов для сайта, а не по одному за раз, поэтому, когда сообщение обрабатывается, вы можете последовательно обрабатывать URL-адреса в своей функции и ограничьте parallelism таким образом.
Ответ 2
NextVisibleTime может запутаться, если в очередь добавлено несколько параллельных функций. Еще один простой вариант для тех, у кого возникла эта проблема: создайте другую очередь, "throttled-items", и попросите исходную функцию следовать ей для триггеров очереди. Затем добавьте простую функцию таймера, которая каждую минуту перемещает сообщения из исходной очереди, соответственно помещая интервал NextVisibleTime.
[FunctionName("ThrottleQueueItems")]
public static async Task Run([TimerTrigger("0 * * * * *")] TimerInfo timer, ILogger logger)
{
var originalQueue = // get original queue here;
var throttledQueue = // get throttled queue here;
var itemsPerMinute = 60; // get from app settings
var individualDelay = 60.0 / itemsPerMinute;
var totalRetrieved = 0;
var maxItemsInBatch = 32; // change if you modify the default queue config
do
{
var pending = (await originalQueue.GetMessagesAsync(Math.Min(maxItemsInBatch, htmlPerMinute - totalRetrieved))).ToArray();
if (!pending.Any())
break;
foreach (var message in pending)
{
await throttledQueue.AddMessageAsync(new CloudQueueMessage(message.AsString), null,
TimeSpan.FromSeconds(individualDelay * ++totalRetrieved), null, null);
await originalQueue.DeleteMessageAsync(message);
}
} while (htmlPerMinute > totalRetrieved);
}