Ответ 1
EDIT: Оказывается, я был очень неправ. TransformBlock
возвращает элементы в том же порядке, в каком они вошли, даже если он настроен для parallelism. Из-за этого код в моем исходном ответе полностью бесполезен, и вместо него можно использовать обычный TransformBlock
.
Оригинальный ответ:
Насколько мне известно, только одна конструкция parallelism в .Net поддерживает возврат обработанных элементов в том порядке, в котором они вошли: PLINQ с AsOrdered()
. Но мне кажется, что PLINQ не соответствует тому, что вам нужно.
TPL Dataflow, с другой стороны, подходит хорошо, я думаю, но у него нет блока, который поддерживал бы parallelism и возвращал элементы в порядке в одно и то же время (TransformBlock
поддерживает их оба, но не в то же время). К счастью, блоки потока данных были разработаны с учетом возможности компоновки, поэтому мы можем создать собственный блок, который делает это.
Но сначала мы должны выяснить, как упорядочить результаты. Использование параллельного словаря, как вы и предполагали, наряду с некоторым механизмом синхронизации, безусловно, будет работать. Но я думаю, что есть более простое решение: используйте очередь Task
s. В выходной задаче вы удаляете Task
, дожидаясь ее завершения (асинхронно), и когда это произойдет, вы отправляете ее результат. Нам по-прежнему нужна некоторая синхронизация для случая, когда очередь пуста, но мы можем получить это бесплатно, если мы выберем, какую очередь использовать умнее.
Итак, общая идея такова: мы пишем IPropagatorBlock
с некоторым вводом и некоторым выходом. Самый простой способ создать пользовательский IPropagatorBlock
- создать один блок, обрабатывающий вход, другой блок, который производит результаты, и обрабатывает их как один с помощью DataflowBlock.Encapsulate()
.
Входной блок должен будет обрабатывать входящие элементы в правильном порядке, поэтому там нет распараллеливания. Он создаст новый Task
(фактически, TaskCompletionSource
, чтобы мы могли позже установить результат Task
), добавьте его в очередь, а затем отправьте элемент для обработки вместе с каким-то образом, чтобы установить результат правильного Task
. Поскольку нам не нужно связывать этот блок с чем-либо, мы можем использовать ActionBlock
.
Выходной блок должен будет принимать Task
из очереди, асинхронно ждать их, а затем отправлять их. Но так как у всех блоков есть встроенная в них очередь, а блоки, которые принимают делегаты, имеют асинхронное ожидание, это будет очень просто: new TransformBlock<Task<TOutput>, TOutput>(t => t)
. Этот блок будет работать как в очереди, так и в качестве выходного блока. Из-за этого нам не нужно иметь дело с какой-либо синхронизацией.
Последняя часть головоломки фактически обрабатывает элементы параллельно. Для этого мы можем использовать еще один ActionBlock
, на этот раз с MaxDegreeOfParallelism
. Он будет принимать входные данные, обрабатывать их и задавать результат правильного Task
в очереди.
Совместим, это может выглядеть так:
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
После стольких разговоров, я думаю, это довольно небольшой код.
Кажется, вы очень заботитесь о производительности, поэтому вам может потребоваться тонкая настройка этого кода. Например, имеет смысл установить MaxDegreeOfParallelism
блока processor
на что-то вроде Environment.ProcessorCount
, чтобы избежать переподписки. Кроме того, если задержка более важна, чем пропускная способность для вас, имеет смысл установить MaxMessagesPerTask
того же блока на 1 (или другое небольшое число), чтобы при завершении обработки элемента он немедленно отправлялся на выход.
Кроме того, если вы хотите дросселировать входящие элементы, вы можете установить BoundedCapacity
в enqueuer
.