Преимущества использования BufferBlock <T> в сетях потоков данных
Мне было интересно, есть ли преимущества, связанные с использованием BufferBlock, связанного с одним или несколькими ActionBlocks, кроме дросселирования (с использованием BoundedCapacity), вместо того, чтобы просто публиковать непосредственно в ActionBlock (s) (пока дросселирование не требуется).
Ответы
Ответ 1
Если все, что вы хотите сделать, - пересылать элементы из одного блока нескольким другим, вам не нужно BufferBlock
.
Но есть, конечно, случаи, когда это полезно. Например, если у вас есть сложная сеть потоков данных, вы можете создать ее из небольших подсетей, каждый из которых создается по своему собственному методу. И для этого вам нужно каким-то образом представить группу блоков. В случае, о котором вы упомянули, возвращение этого единственного BufferBlock
(возможно, как ITargetBlock
) из метода было бы простым решением.
Другим примером, где BufferBlock
было бы полезно, является то, что вы хотите отправить элементы из нескольких исходных блоков в несколько целевых блоков. Если вы использовали BufferBlock
в качестве посредника, вам не нужно подключать каждый блок источника к каждому целевому блоку.
Я уверен, что есть много других примеров, где вы могли бы использовать BufferBlock
. Конечно, если вы не видите причин использовать его в своем случае, то не делайте этого.
Ответ 2
Чтобы добавить в svick ответ, есть еще одно преимущество буферных блоков. Если у вас есть блок с несколькими выходными ссылками и вы хотите балансировать между ними, вы должны превратить выходные блоки в не жадные и добавить буферный блок для обработки очередей. Я нашел следующий пример полезным:
Цитируется по ссылке, которая сейчас мертва:
Вот что мы планируем сделать:
- Некоторые блоки кода отправляют данные в BufferBlock, используя метод Post (T t).
- Этот BufferBlock связан с 3 экземплярами ActionBlock с помощью метода LinkTo t) BufferBlock.
Обратите внимание, что BufferBlock не передает копии входных данных во все целевые блоки, с которыми он связан. Вместо этого он делает это только для одного целевого блока. Здесь мы ожидаем, что когда одна цель занята обработкой запроса. Она будет передана. перейдем к другой цели. Теперь давайте обратимся к приведенному ниже коду:
static void Main(string[] args)
{
BufferBlock<int> bb = new BufferBlock<int>();
ActionBlock<int> a1 = new ActionBlock<int>(a =>
{
Thread.Sleep(100);
Console.WriteLine("Action A1 executing with value {0}", a);
});
ActionBlock<int> a2 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A2 executing with value {0}", a);
});
ActionBlock<int> a3 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A3 executing with value {0}", a);
});
bb.LinkTo(a1);
bb.LinkTo(a2);
bb.LinkTo(a3);
Task t = new Task(() =>
{
int i = 0;
while (i < 10)
{
Thread.Sleep(50);
i++;
bb.Post(i);
}
}
);
t.Start();
Console.Read();
}
При выполнении он выдает следующий вывод:
- Действие А1 выполняется со значением 1
- Действие А1 выполняется со значением 2
- Действие А1 выполняется со значением 3
- Действие А1 выполняется со значением 4
- Действие А1 выполняется со значением 5
- Действие А1 выполняется со значением 6
- Действие А1 выполняется со значением 7
- Действие А1 выполняется со значением 8
- Действие А1 выполняется со значением 9
- Действие А1 выполняется со значением 10
Это показывает, что только одна цель фактически выполняет все данные, даже когда они заняты (из-за целенаправленного добавления Thread.Sleep(100)). Почему?
Это связано с тем, что по умолчанию все целевые блоки являются жадными по природе и буферизуют входные данные, даже если они не могут обрабатывать данные. Чтобы изменить это поведение, мы установили для свойства Greedy значение false в DataFlowBlockOptions при инициализации ActionBlock, как показано ниже,
static void Main(string[] args)
{
BufferBlock<int> bb = new BufferBlock<int>();
ActionBlock<int> a1 = new ActionBlock<int>(a =>
{
Thread.Sleep(100);
Console.WriteLine("Action A1 executing with value {0}", a);
}
, new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
maxDegreeOfParallelism: 1, maxMessagesPerTask: 1,
cancellationToken: CancellationToken.None,
//Not Greedy
greedy: false));
ActionBlock<int> a2 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A2 executing with value {0}", a);
}
, new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
cancellationToken: CancellationToken.None,
greedy: false));
ActionBlock<int> a3 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A3 executing with value {0}", a);
}
, new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
cancellationToken: CancellationToken.None,
greedy: false));
bb.LinkTo(a1);
bb.LinkTo(a2);
bb.LinkTo(a3);
Task t = new Task(() =>
{
int i = 0;
while (i < 10)
{
Thread.Sleep(50);
i++;
bb.Post(i);
}
});
t.Start();
Console.Read();
}
Выход этой программы:
- Действие А1 выполняется со значением 1
- Действие А2 выполняется со значением 3
- Действие А1 выполняется со значением 2
- Действие А3 выполняется со значением 6
- Действие А3 выполняется со значением 7
- Действие A3 выполняется со значением 8
- Действие А2 выполняется со значением 5
- Действие А3 выполняется со значением 9
- Действие А1 выполняется со значением 4
- Действие А2 выполняется со значением 10
Это явно распределение данных по трем ActionBlock (s), как и ожидалось.
Ответ 3
Нет, второй пример не будет компилироваться по ряду причин: для блока группировки "группировать" поток данных можно установить greedy = false, а не для блока исполнения; и затем он должен быть установлен через GroupingDataflowBlockOptions - not DataflowBlockOptions; а затем он устанавливается как значение свойства "{Greedy = false}", а не параметр конструктора.
Если вы хотите дросселировать емкость блока действий, сделайте это, установив значение свойства BoundedCapacity для DataflowBlockOptions (хотя, как указано OP, они уже знают об этой опции). Вот так:
var a1 = new ActionBlock<int>(
i => doSomeWork(i),
new ExecutionDataflowBlockOptions {BoundedCapacity = 1}
);