Как правильно поставить очередь задач для запуска в С#

У меня есть перечисление элементов (RunData.Demand), каждый из которых представляет собой некоторую работу, связанную с вызовом API через HTTP. Он отлично работает, если я всего лишь foreach через все это и вызываю API во время каждой итерации. Однако каждая итерация занимает секунду или два, поэтому я бы хотел запустить 2-3 потока и разделить работу между ними. Вот что я делаю:

ThreadPool.SetMaxThreads(2, 5); // Trying to limit the amount of threads
var tasks = RunData.Demand
   .Select(service => Task.Run(async delegate
   {
      var availabilityResponse = await client.QueryAvailability(service);
      // Do some other stuff, not really important
   }));

await Task.WhenAll(tasks);

Вызов client.QueryAvailability в основном вызывает API с помощью класса HttpClient:

public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request)
{
   var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request);

   if (response.IsSuccessStatusCode)
   {
      return await response.Content.ReadAsAsync<QueryAvailabilityResponse>();
   }

   throw new HttpException((int) response.StatusCode, response.ReasonPhrase);
}

Это отлично работает некоторое время, но в конечном итоге все начинает отсчитываться. Если я установил HttpClient Timeout на час, тогда я начну получать странные внутренние ошибки сервера.

То, что я начал делать, это установить секундомер в методе QueryAvailability, чтобы узнать, что происходит.

Что происходит, все 1200 элементов в RunData.Demand создаются сразу и все 1200 await client.PostAsJsonAsync методов вызывают. Похоже, что он использует 2 потока для медленной проверки задач, поэтому к концу у меня есть задачи, ожидающие 9 или 10 минут.

Здесь поведение, которое я бы хотел:

Я хотел бы создать 1200 задач, а затем запускать их по 3-4 в то время, когда потоки становятся доступными. Я не хочу сразу поставить в очередь 1200 HTTP-вызовов.

Есть ли хороший способ сделать это?

Ответы

Ответ 1

Как я всегда рекомендую.. вам нужен поток данных TPL (для установки: Install-Package Microsoft.Tpl.Dataflow).

Вы создаете ActionBlock с действием, выполняемым для каждого элемента. Установите MaxDegreeOfParallelism для дросселирования. Начните публикацию с него и дождитесь его завершения:

var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service => 
{
    var availabilityResponse = await client.QueryAvailability(service);
    // ...
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

foreach (var service in RunData.Demand)
{
    block.Post(service);
}

block.Complete();
await block.Completion;

Ответ 2

Вы используете асинхронные HTTP-вызовы, поэтому ограничение количества потоков не поможет (и не будет ParallelOptions.MaxDegreeOfParallelism в Parallel.ForEach, как предлагает один из ответов). Даже один поток может инициировать все запросы и обрабатывать результаты по мере их поступления.

Один из способов решения этой проблемы - использовать поток данных TPL.

Другим приятным решением является разделение источника IEnumerable на разделы и элементы процесса в каждом разделе последовательно, как описано в этом сообщении в блоге:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}

Ответ 3

В то время как библиотека Dataflow отличная, я думаю, что она немного тяжелая, когда не используется состав блоков. Я хотел бы использовать что-то вроде метода расширения ниже.

Кроме того, в отличие от метода Partitioner, это запускает методы async в вызывающем контексте - оговорка заключается в том, что если ваш код не является действительно асинхронным или принимает "быстрый путь", тогда он будет эффективно работать синхронно, поскольку нити не являются явно созданный.

public static async Task RunParallelAsync<T>(this IEnumerable<T> items, Func<T, Task> asyncAction, int maxParallel)
{
    var tasks = new List<Task>();

    foreach (var item in items)
    {
        tasks.Add(asyncAction(item));

        if (tasks.Count < maxParallel)
                continue; 

        var notCompleted = tasks.Where(t => !t.IsCompleted).ToList();

        if (notCompleted.Count >= maxParallel)
            await Task.WhenAny(notCompleted);
    }

    await Task.WhenAll(tasks);
}

Ответ 4

Старый вопрос, но я хотел бы предложить альтернативное облегченное решение, используя класс SemaphoreSlim. Просто ссылку System.Threading.

SemaphoreSlim sem = new SemaphoreSlim(4,4);

foreach (var service in RunData.Demand)
{

    await sem.WaitAsync();
    Task t = Task.Run(async () => 
    {
        var availabilityResponse = await client.QueryAvailability(serviceCopy));    
        // do your other stuff here with the result of QueryAvailability
    }
    t.ContinueWith(sem.Release());
}

Семафор действует как механизм блокировки. Вы можете только ввести семафор, вызвав Wait (WaitAsync), который вычитает один из счетчика. Вызов release добавляет один к счету.