Как правильно поставить очередь задач для запуска в С#
У меня есть перечисление элементов (RunData.Demand
), каждый из которых представляет собой некоторую работу, связанную с вызовом API через HTTP. Он отлично работает, если я всего лишь foreach
через все это и вызываю API во время каждой итерации. Однако каждая итерация занимает секунду или два, поэтому я бы хотел запустить 2-3 потока и разделить работу между ними. Вот что я делаю:
ThreadPool.SetMaxThreads(2, 5); // Trying to limit the amount of threads
var tasks = RunData.Demand
.Select(service => Task.Run(async delegate
{
var availabilityResponse = await client.QueryAvailability(service);
// Do some other stuff, not really important
}));
await Task.WhenAll(tasks);
Вызов client.QueryAvailability
в основном вызывает API с помощью класса HttpClient
:
public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request)
{
var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request);
if (response.IsSuccessStatusCode)
{
return await response.Content.ReadAsAsync<QueryAvailabilityResponse>();
}
throw new HttpException((int) response.StatusCode, response.ReasonPhrase);
}
Это отлично работает некоторое время, но в конечном итоге все начинает отсчитываться. Если я установил HttpClient Timeout на час, тогда я начну получать странные внутренние ошибки сервера.
То, что я начал делать, это установить секундомер в методе QueryAvailability
, чтобы узнать, что происходит.
Что происходит, все 1200 элементов в RunData.Demand создаются сразу и все 1200 await client.PostAsJsonAsync
методов вызывают. Похоже, что он использует 2 потока для медленной проверки задач, поэтому к концу у меня есть задачи, ожидающие 9 или 10 минут.
Здесь поведение, которое я бы хотел:
Я хотел бы создать 1200 задач, а затем запускать их по 3-4 в то время, когда потоки становятся доступными. Я не хочу сразу поставить в очередь 1200 HTTP-вызовов.
Есть ли хороший способ сделать это?
Ответы
Ответ 1
Как я всегда рекомендую.. вам нужен поток данных TPL (для установки: Install-Package Microsoft.Tpl.Dataflow
).
Вы создаете ActionBlock
с действием, выполняемым для каждого элемента. Установите MaxDegreeOfParallelism
для дросселирования. Начните публикацию с него и дождитесь его завершения:
var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service =>
{
var availabilityResponse = await client.QueryAvailability(service);
// ...
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
foreach (var service in RunData.Demand)
{
block.Post(service);
}
block.Complete();
await block.Completion;
Ответ 2
Вы используете асинхронные HTTP-вызовы, поэтому ограничение количества потоков не поможет (и не будет ParallelOptions.MaxDegreeOfParallelism
в Parallel.ForEach
, как предлагает один из ответов). Даже один поток может инициировать все запросы и обрабатывать результаты по мере их поступления.
Один из способов решения этой проблемы - использовать поток данных TPL.
Другим приятным решением является разделение источника IEnumerable
на разделы и элементы процесса в каждом разделе последовательно, как описано в этом сообщении в блоге:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
Ответ 3
В то время как библиотека Dataflow отличная, я думаю, что она немного тяжелая, когда не используется состав блоков. Я хотел бы использовать что-то вроде метода расширения ниже.
Кроме того, в отличие от метода Partitioner, это запускает методы async в вызывающем контексте - оговорка заключается в том, что если ваш код не является действительно асинхронным или принимает "быстрый путь", тогда он будет эффективно работать синхронно, поскольку нити не являются явно созданный.
public static async Task RunParallelAsync<T>(this IEnumerable<T> items, Func<T, Task> asyncAction, int maxParallel)
{
var tasks = new List<Task>();
foreach (var item in items)
{
tasks.Add(asyncAction(item));
if (tasks.Count < maxParallel)
continue;
var notCompleted = tasks.Where(t => !t.IsCompleted).ToList();
if (notCompleted.Count >= maxParallel)
await Task.WhenAny(notCompleted);
}
await Task.WhenAll(tasks);
}
Ответ 4
Старый вопрос, но я хотел бы предложить альтернативное облегченное решение, используя класс SemaphoreSlim. Просто ссылку System.Threading.
SemaphoreSlim sem = new SemaphoreSlim(4,4);
foreach (var service in RunData.Demand)
{
await sem.WaitAsync();
Task t = Task.Run(async () =>
{
var availabilityResponse = await client.QueryAvailability(serviceCopy));
// do your other stuff here with the result of QueryAvailability
}
t.ContinueWith(sem.Release());
}
Семафор действует как механизм блокировки. Вы можете только ввести семафор, вызвав Wait (WaitAsync), который вычитает один из счетчика. Вызов release добавляет один к счету.