TPL Data Flow, завершение гарантии только после завершения всех блоков данных источника
Как я могу переписать код, который код завершает, когда BOTH transformblocks завершен? Я думал, что завершение означает, что он отмечен как полный И "очередь в очереди" пуста?
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("2 input count: " + transformBlock1.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processorBlock = new ActionBlock<string>(i =>
{
Console.WriteLine(i);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numElements = 100;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.SendAsync(i);
}
//mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
Я редактировал код, добавляя количество входных буферов для каждого блока преобразования. Очевидно, что все 100 элементов передаются в каждый из блоков преобразования. Но как только один из трансформационных блоков заканчивается, процессорный блок не принимает больше элементов, и вместо этого входной буфер неполного трансформационного блока просто очищает входной буфер.
Ответы
Ответ 1
Вопрос в том, что сказал casperOne в своем ответе. Как только первый блок преобразования завершается, процессорный блок переходит в "режим завершения": он будет обрабатывать оставшиеся элементы в своей очереди ввода, но он не примет никаких новых элементов.
Существует более простое решение, чем разделение блока процессора на два: не устанавливайте PropagateCompletion
, а вместо этого устанавливайте завершение блока процессора вручную, когда оба блока преобразования завершены:
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
.ContinueWith(_ => processorBlock.Complete());
Ответ 2
Проблема заключается в том, что вы устанавливаете свойство PropagateCompletion
каждый раз, когда вы вызываете метод LinkTo
связывать блоки и разные времена ожидания в блоках преобразования.
Из документации для метода Complete
в интерфейсе IDataflowBlock
(выделено мной)
Сигналы к IDataflowBlock, которые он не должен принимать, и не создают никаких сообщений и не тратят больше отложенных сообщений.
Поскольку вы разыгрываете время ожидания в каждом из экземпляров TransformBlock<TInput, TOutput>
, transformBlock2
(ожидание 20 мс) заканчивается до transformBlock1
(ожидание в течение 50 мс). transformBlock2
завершает сначала, а затем посылает сигнал на processorBlock
, который затем говорит: "Я ничего не принимаю" (и transformBlock1
пока не выпустил все свои сообщения).
Обратите внимание, что обработка transformBlock1
до transformBlock1
не гарантируется абсолютно; возможно, что пул потоков (при условии, что вы используете планировщик по умолчанию) будет обрабатывать задачи в другом порядке (но, скорее всего, не будет, поскольку он будет красть работу из очередей после выполнения 20 мс элементов).
Конвейер выглядит следующим образом:
broadcastBlock
/ \
transformBlock1 transformBlock2
\ /
processorBlock
Чтобы обойти это, вы хотите иметь конвейер, который выглядит так:
broadcastBlock
/ \
transformBlock1 transformBlock2
| |
processorBlock1 processorBlock2
Это выполняется путем создания двух отдельных ActionBlock<TInput>
экземпляров:
// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);
// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);
// Linking
broadCastBlock.LinkTo(transformBlock1,
new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2,
new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1,
new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2,
new DataflowLinkOptions { PropagateCompletion = true });
Затем вам нужно подождать оба процессорных блока вместо одного:
Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();
Очень важное замечание здесь; при создании ActionBlock<TInput>
по умолчанию используется свойство MaxDegreeOfParallelism
на ExecutionDataflowBlockOptions
экземпляр, переданный ему, установлен на один.
Это означает, что вызовы делегата Action<T>
, которые вы передаете в ActionBlock<TInput>
, являются потокобезопасными, только один будет выполняться одновременно.
Поскольку у вас теперь есть два экземпляра ActionBlock<TInput>
, указывающие на один и тот же делегат Action<T>
, вам не гарантируется безопасность потоков.
Если ваш метод является потокобезопасным, вам не нужно ничего делать (что позволит вам установить свойство MaxDegreeOfParallelism
в DataflowBlockOptions.Unbounded
, так как нет причин блокировать).
Если это не потокобезопасно, и вам нужно это гарантировать, вам нужно прибегнуть к традиционным примитивам синхронизации, например, к lock
.
В этом случае вы сделали бы это так (хотя это явно не нужно, как метод WriteLine
на Console
класс является потокобезопасным):
// The lock.
var l = new object();
// The action, can be a method, makes it easier to share.
Action<string> a = i => {
// Ensure one call at a time.
lock (l) Console.WriteLine(i);
};
// And so on...
Ответ 3
В дополнение к ответу svick: чтобы быть совместимым с поведением, которое вы получаете с опцией PropagateCompletion, вам также необходимо пересылать исключения в случае, если предыдущий блок был поврежден. Этот метод продолжения, как и следующий, также заботится об этом:
public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) {
if (target == null) return;
if (sources.Length == 0) { target.Complete(); return; }
Task.Factory.ContinueWhenAll(
sources.Select(b => b.Completion).ToArray(),
tasks => {
var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList();
if (exceptions.Count != 0) {
target.Fault(new AggregateException(exceptions));
} else {
target.Complete();
}
}
);
}
Ответ 4
В других ответах достаточно ясно, почему PropagateCompletion = true mess things up, когда блок имеет более двух источников.
Чтобы обеспечить простое решение проблемы, вы можете захотеть взглянуть на библиотеку с открытым исходным кодом DataflowEx, которая решает эту проблему с более разумными правилами завершения. (Он использует привязку потока данных TPL внутри, но поддерживает сложное распространение завершения. Реализация похожа на WhenAll, но также обрабатывает добавление динамической ссылки. Проверьте Dataflow.RegisterDependency() и TaskEx.AwaitableWhenAll() для подробностей.)
Я немного изменил свой код, чтобы все работало с помощью DataflowEx:
public CompletionDemo1()
{
broadCaster = new BroadcastBlock<int>(
i =>
{
return i;
}).ToDataflow();
transformBlock1 = new TransformBlock<int, string>(
i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(
i =>
{
Console.WriteLine("2 input count: " + transformBlock2.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processor = new ActionBlock<string>(
i =>
{
Console.WriteLine(i);
}).ToDataflow();
/** rather than TPL linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
**/
//Use DataflowEx linking
var transform1 = transformBlock1.ToDataflow();
var transform2 = transformBlock2.ToDataflow();
broadCaster.LinkTo(transform1);
broadCaster.LinkTo(transform2);
transform1.LinkTo(processor);
transform2.LinkTo(processor);
}
Полный код здесь.
Отказ от ответственности: я являюсь автором DataflowEx, который публикуется под лицензией MIT.