Как выполнить параллельные задачи в .NET 4.5
Я хотел бы использовать .NET iterator с параллельными задачами/ждут?. Что-то вроде этого:
IEnumerable<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
Parallel.ForEach(
source,
s=>
{
// Ordering is NOT important
// items can be yielded as soon as they are done
yield return ExecuteOrDownloadSomething(s);
}
}
К сожалению,.NET не может справиться с этим. Лучший ответ до сих пор от @svick - используйте AsParallel().
BONUS: Любой простой код async/await, который реализует несколько издателей и одного абонента? Абонент уступит, и пабы будут обрабатываться. (только для основных библиотек)
Ответы
Ответ 1
Это похоже на работу для PLINQ:
return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s));
Это будет выполнять делегат параллельно, используя ограниченное количество потоков, возвращая каждый результат сразу после его завершения.
Если метод ExecuteOrDownloadSomething()
привязан к IO (например, он фактически загружает что-то), и вы не хотите тратить потоки, то использование async
- await
может иметь смысл, но это будет сложнее.
Если вы хотите полностью воспользоваться преимуществами async
, вы не должны возвращать IEnumerable
, потому что он синхронный (т.е. блокирует, если элементы не доступны). Вам нужна какая-то асинхронная коллекция, и вы можете использовать ISourceBlock
(в частности, TransformBlock
) из потока данных TPL для этого:
ISourceBlock<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
var block = new TransformBlock<TSrc, TDest>(
async s => await ExecuteOrDownloadSomethingAsync(s),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
foreach (var item in source)
block.Post(item);
block.Complete();
return block;
}
Если источник "медленный" (т.е. вы хотите начать обработку результатов из Foo()
до завершения итерации source
), вам может потребоваться переместить вызов foreach
и Complete()
в отдельный Task
. Еще лучше было бы сделать source
в ISourceBlock<TSrc>
тоже.
Ответ 2
Итак, кажется, что вы действительно хотите сделать, это упорядочить последовательность задач на основе их завершения. Это не очень сложно:
public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks)
{
var input = tasks.ToList();
var output = input.Select(task => new TaskCompletionSource<T>());
var collection = new BlockingCollection<TaskCompletionSource<T>>();
foreach (var tcs in output)
collection.Add(tcs);
foreach (var task in input)
{
task.ContinueWith(t =>
{
var tcs = collection.Take();
switch (task.Status)
{
case TaskStatus.Canceled:
tcs.TrySetCanceled();
break;
case TaskStatus.Faulted:
tcs.TrySetException(task.Exception.InnerExceptions);
break;
case TaskStatus.RanToCompletion:
tcs.TrySetResult(task.Result);
break;
}
}
, CancellationToken.None
, TaskContinuationOptions.ExecuteSynchronously
, TaskScheduler.Default);
}
return output.Select(tcs => tcs.Task);
}
Итак, здесь мы создаем TaskCompletionSource
для каждой задачи ввода, затем просматриваем каждую из задач и устанавливаем продолжение, которое захватывает следующий источник завершения из BlockingCollection
и задает его результат. Первая выполненная задача захватывает первые tcs, которые были возвращены, вторая завершенная задача получает второй tcs, который был возвращен, и так далее.
Теперь ваш код становится довольно простым:
var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item))
.Order();
foreach(var task in tasks)
{
var result = task.Result;//or you could `await` each result
//....
}
Ответ 3
В асинхронной библиотеке, созданной командой робототехники MS, у них были примитивы concurrency, которые позволяли использовать итератор для получения асинхронного кода.
Библиотека (CCR) бесплатна (она не использовалась, чтобы быть бесплатной). Хорошую вводную статью можно найти здесь: Параллельные вопросы
Возможно, вы можете использовать эту библиотеку вместе с библиотекой заданий .Net, или она заставит вас "сворачивать свои собственные"