Лучший способ в .NET управлять очередью задач на отдельном (одном) потоке
Я знаю, что за эти годы асинхронное программирование сильно изменилось. Я несколько смущен, что позволил себе получить этот ржавый всего лишь 34 года, но я рассчитываю на StackOverflow, чтобы довести меня до скорости.
То, что я пытаюсь сделать, это управлять очередью "работы" в отдельном потоке, но таким образом, что обрабатывается только один элемент за раз. Я хочу опубликовать работу над этим потоком, и ему не нужно передавать что-либо обратно вызывающему. Конечно, я мог бы просто развернуть новый объект Thread
и пропустить его через общий объект Queue
, используя спящие, прерывания, дескрипторы ожидания и т.д. Но я знаю, что с тех пор все стало лучше. Мы имеем BlockingCollection
, Task
, async
/await
, не говоря уже о пакетах NuGet, которые, вероятно, абстрагируются от этого.
Я знаю, что вопросы "Какие лучшие...", как правило, неодобрились, поэтому я буду перефразировать его, сказав "Что такое рекомендуемый..." способ сделать что-то подобное с помощью встроенных механизмов .NET предпочтительно. Но если сторонний пакет NuGet упрощает вещание, это тоже хорошо.
Я рассмотрел экземпляр TaskScheduler
с фиксированным максимумом concurrency из 1, но, похоже, на данный момент существует, вероятно, гораздо менее неуклюжий способ сделать это.
Фон
В частности, то, что я пытаюсь сделать в этом случае, - это очередь задачи геолокации IP во время веб-запроса. Один и тот же IP-адрес может несколько раз оказаться в очереди на геолокацию, но задача будет знать, как его обнаружить и ускорить, если он уже был разрешен. Но обработчик запроса просто собирается отправить эти вызовы () => LocateAddress(context.Request.UserHostAddress)
в очередь и позволить методу LocateAddress
обрабатывать двойное обнаружение работы. API-интерфейс геолокации, который я использую, не любит бомбардировать запросы, поэтому я хочу ограничить его одной одновременной задачей одновременно. Тем не менее, было бы неплохо, если бы подход позволил легко масштабировать до более параллельных задач с простым изменением параметров.
Ответы
Ответ 1
Чтобы создать асинхронную одиночную степень очереди parallelism, вы можете просто создать SemaphoreSlim
, инициализированный одним, а затем применить метод enqueing await
при приобретении этого семафора до начала запрошенной работы.
public class TaskQueue
{
private SemaphoreSlim semaphore;
public TaskQueue()
{
semaphore = new SemaphoreSlim(1);
}
public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
{
await semaphore.WaitAsync();
try
{
return await taskGenerator();
}
finally
{
semaphore.Release();
}
}
public async Task Enqueue(Func<Task> taskGenerator)
{
await semaphore.WaitAsync();
try
{
await taskGenerator();
}
finally
{
semaphore.Release();
}
}
}
Конечно, чтобы иметь фиксированную степень parallelism, кроме одной, просто инициализировать семафор другому числу.
Ответ 2
Ваш лучший вариант, как я вижу, это использовать TPL Dataflow
ActionBlock
:
var actionBlock = new ActionBlock<string>(address =>
{
if (!IsDuplicate(address))
{
LocateAddress(address);
}
});
actionBlock.Post(context.Request.UserHostAddress);
TPL Dataflow
- надежная, потокобезопасная, async
- уже и очень настраиваемая структура, основанная на актерах (доступна как nuget)
Вот простой пример для более сложного случая. Предположим, вы хотите:
- Включить concurrency (ограничен доступными ядрами).
- Ограничьте размер очереди (чтобы у вас не хватило памяти).
- Имейте как
LocateAddress
, так и вставку очереди async
.
- Отмените все через час.
var actionBlock = new ActionBlock<string>(async address =>
{
if (!IsDuplicate(address))
{
await LocateAddressAsync(address);
}
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});
await actionBlock.SendAsync(context.Request.UserHostAddress);
Ответ 3
Используйте BlockingCollection<Action>
, чтобы создать шаблон производителя/потребителя с одним потребителем (только одна вещь, работающая одновременно, как вы хотите) и один или несколько производителей.
Сначала определите общую очередь:
BlockingCollection<Action> queue = new BlockingCollection<Action>();
В вашем потребителе Thread
или Task
вы берете из него:
//This will block until there an item available
Action itemToRun = queue.Take()
Затем из любого числа производителей на другие потоки просто добавьте в очередь:
queue.Add(() => LocateAddress(context.Request.UserHostAddress));
Ответ 4
На самом деле вам не нужно запускать задачи в одном потоке, вам нужно, чтобы они запускались последовательно (один за другим) и FIFO. У TPL нет класса для этого, но вот моя реализация с тестами. https://github.com/Gentlee/SerialQueue
Также есть реализация @Servy, тесты показывают, что он в два раза медленнее, чем мой, и он не гарантирует FIFO.