Вложенность в Parallel.ForEach
В приложении metro мне нужно выполнить несколько вызовов WCF. Существует значительное количество вызовов, поэтому мне нужно сделать их в параллельном цикле. Проблема в том, что параллельный цикл завершается до того, как вызовы WCF завершены.
Как бы вы реорганизовали это для работы, как ожидалось?
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();
Parallel.ForEach(ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
foreach ( var customer in customers )
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
Ответы
Ответ 1
Вся идея Parallel.ForEach()
заключается в том, что у вас есть набор потоков, и каждый поток обрабатывает часть коллекции. Как вы заметили, это не работает с async
- await
, где вы хотите выпустить поток на время вызова async.
Вы можете "исправить" это, заблокировав потоки ForEach()
, но это победит всю точку async
- await
.
Что вы можете сделать, это использовать поток данных TPL вместо Parallel.ForEach()
, который хорошо поддерживает асинхронный Task
.
В частности, ваш код можно записать с помощью TransformBlock
, который преобразует каждый идентификатор в Customer
с помощью async
lambda. Этот блок может быть настроен для выполнения параллельно. Вы связали бы этот блок с ActionBlock
, который записывает каждый Customer
на консоль.
После настройки блочной сети вы можете Post()
каждый идентификатор TransformBlock
.
В коде:
var ids = new List <string> {1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
var getCustomerBlock = new TransformBlock < string, Customer > ( async я = > { ICustomerRepo repo = new CustomerRepo(); return wait repo.GetCustomer(i); }, новый ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
var writeCustomerBlock = new ActionBlock <Customer> (c = > Console.WriteLine(c.ID));
getCustomerBlock.LinkTo( writeCustomerBlock, новый DataflowLinkOptions { PropagateCompletion = true });
foreach (var id в идентификаторах) getCustomerBlock.Post(ID);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Код>
Хотя вы, вероятно, хотите ограничить parallelism TransformBlock
некоторой небольшой константой. Кроме того, вы можете ограничить емкость TransformBlock
и добавить элементы к ней асинхронно, используя SendAsync()
, например, если коллекция слишком большая.
В качестве дополнительного преимущества по сравнению с вашим кодом (если он работает) заключается в том, что запись начнется сразу после завершения одного элемента и не дождитесь завершения всей обработки.
Ответ 2
ответ svick (как обычно) отлично.
Тем не менее, я считаю, что Dataflow будет более полезным, когда у вас действительно есть большие объемы данных для передачи. Или, когда вам нужна async
-сложная очередь.
В вашем случае более простым решением является просто использовать async
-style parallelism:
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customerTasks = ids.Select(i =>
{
ICustomerRepo repo = new CustomerRepo();
return repo.GetCustomer(i);
});
var customers = await Task.WhenAll(customerTasks);
foreach (var customer in customers)
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
Ответ 3
Использование DataFlow как предложенного svick может быть излишним, и ответ Стивена не предоставляет средства для управления concurrency операции. Однако это можно сделать довольно просто:
public static async Task RunWithMaxDegreeOfConcurrency<T>(
int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
var activeTasks = new List<Task>(maxDegreeOfConcurrency);
foreach (var task in collection.Select(taskFactory))
{
activeTasks.Add(task);
if (activeTasks.Count == maxDegreeOfConcurrency)
{
await Task.WhenAny(activeTasks.ToArray());
//observe exceptions here
activeTasks.RemoveAll(t => t.IsCompleted);
}
}
await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
{
//observe exceptions in a manner consistent with the above
});
}
Вызовы ToArray()
можно оптимизировать, используя массив вместо списка и заменяя завершенные задачи, но я сомневаюсь, что это сильно повлияет на большинство сценариев. Использование примера на вопрос OP:
RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
РЕДАКТИРОВАТЬ Пользователь SO и TPL wiz Eli Arbel указал мне на статьи из Стивена Тууба. Как обычно, его реализация элегантна и эффективна:
public static Task ForEachAsync<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current).ContinueWith(t =>
{
//observe exceptions
});
}));
}
Ответ 4
Вы можете сэкономить усилия с помощью нового пакета AsyncEnumerator NuGet, которого не было 4 года назад, когда вопрос был изначально опубликован. Он позволяет вам контролировать степень parallelism:
, используя System.Collections.Async;
...
ожидание ids.ParallelForEachAsync(async я = >
{ ICustomerRepo repo = new CustomerRepo(); var cust = ожидание repo.GetCustomer(i); customers.Add(касты);
},
maxDegreeOfParallelism: 10);
Код>
Отказ от ответственности: я являюсь автором библиотеки AsyncEnumerator, которая является открытым исходным кодом и лицензирована в рамках MIT, и я публикую это сообщение, чтобы помочь сообществу.
Ответ 5
Оберните Parallel.Foreach
в Task.Run()
а вместо ключевого слова await
используйте [yourasyncmethod].Result
(вам нужно сделать Task.Run, чтобы не блокировать поток пользовательского интерфейса)
Что-то вроде этого:
var yourForeachTask = Task.Run(() =>
{
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = repo.GetCustomer(i).Result;
customers.Add(cust);
});
});
await yourForeachTask;
Ответ 6
Это должно быть довольно эффективно и проще, чем работать с полным потоком данных TPL:
var customers = await ids.SelectAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
});
...
public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
var results = new List<TResult>();
var activeTasks = new HashSet<Task<TResult>>();
foreach (var item in source)
{
activeTasks.Add(selector(item));
if (activeTasks.Count >= maxDegreesOfParallelism)
{
var completed = await Task.WhenAny(activeTasks);
activeTasks.Remove(completed);
results.Add(completed.Result);
}
}
results.AddRange(await Task.WhenAll(activeTasks));
return results;
}
Ответ 7
После введения кучи вспомогательных методов вы сможете запускать параллельные запросы с помощью этого простого sintax:
const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
.Split(DegreeOfParallelism)
.SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
.ConfigureAwait(false);
Что происходит, мы разделим исходную коллекцию на 10 фрагментов (.Split(DegreeOfParallelism)
), затем запустим 10 задач, каждый из которых обрабатывает свои элементы по очереди (.SelectManyAsync(...)
) и объединяет их обратно в один список.
Стоит упомянуть, что существует более простой подход:
double[] result2 = await Enumerable.Range(0, 1000000)
.Select(async i => await CalculateAsync(i).ConfigureAwait(false))
.WhenAll()
.ConfigureAwait(false);
Но для этого требуется предосторожность: если у вас слишком большой исходный набор, он сразу отправит Task
для каждого элемента, что может привести к значительным ударам производительности.
Методы расширения, используемые в приведенных выше примерах, выглядят следующим образом:
public static class CollectionExtensions
{
/// <summary>
/// Splits collection into number of collections of nearly equal size.
/// </summary>
public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
{
if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));
List<T> source = src.ToList();
var sourceIndex = 0;
for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
{
var list = new List<T>();
int itemsLeft = source.Count - targetIndex;
while (slicesCount * list.Count < itemsLeft)
{
list.Add(source[sourceIndex++]);
}
yield return list;
}
}
/// <summary>
/// Takes collection of collections, projects those in parallel and merges results.
/// </summary>
public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
this IEnumerable<IEnumerable<T>> source,
Func<T, Task<TResult>> func)
{
List<TResult>[] slices = await source
.Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
.WhenAll()
.ConfigureAwait(false);
return slices.SelectMany(s => s);
}
/// <summary>Runs selector and awaits results.</summary>
public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
{
List<TResult> result = new List<TResult>();
foreach (TSource source1 in source)
{
TResult result1 = await selector(source1).ConfigureAwait(false);
result.Add(result1);
}
return result;
}
/// <summary>Wraps tasks with Task.WhenAll.</summary>
public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
{
return Task.WhenAll<TResult>(source);
}
}
Ответ 8
Я немного опаздываю на вечеринку, но вы можете захотеть использовать GetAwaiter.GetResult() для запуска вашего асинхронного кода в контексте синхронизации, но, как указано ниже,
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
// Run this in thread which Parallel library occupied.
var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
customers.Add(cust);
});
Ответ 9
Метод расширения для этого, который использует SemaphoreSlim, а также позволяет установить максимальную степень параллелизма
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item).ContinueWith(res =>
{
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
});
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
Пример использования:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);