Преимущества использования BufferBlock <T> в сетях потоков данных

Мне было интересно, есть ли преимущества, связанные с использованием BufferBlock, связанного с одним или несколькими ActionBlocks, кроме дросселирования (с использованием BoundedCapacity), вместо того, чтобы просто публиковать непосредственно в ActionBlock (s) (пока дросселирование не требуется).

Ответы

Ответ 1

Если все, что вы хотите сделать, - пересылать элементы из одного блока нескольким другим, вам не нужно BufferBlock.

Но есть, конечно, случаи, когда это полезно. Например, если у вас есть сложная сеть потоков данных, вы можете создать ее из небольших подсетей, каждый из которых создается по своему собственному методу. И для этого вам нужно каким-то образом представить группу блоков. В случае, о котором вы упомянули, возвращение этого единственного BufferBlock (возможно, как ITargetBlock) из метода было бы простым решением.

Другим примером, где BufferBlock было бы полезно, является то, что вы хотите отправить элементы из нескольких исходных блоков в несколько целевых блоков. Если вы использовали BufferBlock в качестве посредника, вам не нужно подключать каждый блок источника к каждому целевому блоку.

Я уверен, что есть много других примеров, где вы могли бы использовать BufferBlock. Конечно, если вы не видите причин использовать его в своем случае, то не делайте этого.

Ответ 2

Чтобы добавить в svick ответ, есть еще одно преимущество буферных блоков. Если у вас есть блок с несколькими выходными ссылками и вы хотите балансировать между ними, вы должны превратить выходные блоки в не жадные и добавить буферный блок для обработки очередей. Я нашел следующий пример полезным:

Цитируется по ссылке, которая сейчас мертва:

Вот что мы планируем сделать:

  • Некоторые блоки кода отправляют данные в BufferBlock, используя метод Post (T t).
  • Этот BufferBlock связан с 3 экземплярами ActionBlock с помощью метода LinkTo t) BufferBlock.

Обратите внимание, что BufferBlock не передает копии входных данных во все целевые блоки, с которыми он связан. Вместо этого он делает это только для одного целевого блока. Здесь мы ожидаем, что когда одна цель занята обработкой запроса. Она будет передана. перейдем к другой цели. Теперь давайте обратимся к приведенному ниже коду:

static void Main(string[] args)
{
    BufferBlock<int> bb = new BufferBlock<int>();

    ActionBlock<int> a1 = new ActionBlock<int>(a =>
    {
        Thread.Sleep(100);
        Console.WriteLine("Action A1 executing with value {0}", a);
    });

    ActionBlock<int> a2 = new ActionBlock<int>(a =>
    {
        Thread.Sleep(50);
        Console.WriteLine("Action A2 executing with value {0}", a);
    });

    ActionBlock<int> a3 = new ActionBlock<int>(a =>
    {
        Thread.Sleep(50);
        Console.WriteLine("Action A3 executing with value {0}", a);
    });

    bb.LinkTo(a1);
    bb.LinkTo(a2);
    bb.LinkTo(a3);

    Task t = new Task(() =>
        {
            int i = 0;
            while (i < 10)
            {
                Thread.Sleep(50);
                i++;
                bb.Post(i);
            }
        }
    );

    t.Start();
    Console.Read();
}

При выполнении он выдает следующий вывод:

  • Действие А1 выполняется со значением 1
  • Действие А1 выполняется со значением 2
  • Действие А1 выполняется со значением 3
  • Действие А1 выполняется со значением 4
  • Действие А1 выполняется со значением 5
  • Действие А1 выполняется со значением 6
  • Действие А1 выполняется со значением 7
  • Действие А1 выполняется со значением 8
  • Действие А1 выполняется со значением 9
  • Действие А1 выполняется со значением 10

Это показывает, что только одна цель фактически выполняет все данные, даже когда они заняты (из-за целенаправленного добавления Thread.Sleep(100)). Почему?

Это связано с тем, что по умолчанию все целевые блоки являются жадными по природе и буферизуют входные данные, даже если они не могут обрабатывать данные. Чтобы изменить это поведение, мы установили для свойства Greedy значение false в DataFlowBlockOptions при инициализации ActionBlock, как показано ниже,

static void Main(string[] args)
{
    BufferBlock<int> bb = new BufferBlock<int>();
    ActionBlock<int> a1 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(100);
            Console.WriteLine("Action A1 executing with value {0}", a);
        }
        , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
            maxDegreeOfParallelism: 1, maxMessagesPerTask: 1,
            cancellationToken: CancellationToken.None,
            //Not Greedy
            greedy: false));

    ActionBlock<int> a2 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(50);
            Console.WriteLine("Action A2 executing with value {0}", a);
        }
        , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
            maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
            cancellationToken: CancellationToken.None,
            greedy: false));
    ActionBlock<int> a3 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(50);
            Console.WriteLine("Action A3 executing with value {0}", a);
        }
        , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
            maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
            cancellationToken: CancellationToken.None,
            greedy: false));

    bb.LinkTo(a1);
    bb.LinkTo(a2);
    bb.LinkTo(a3);

    Task t = new Task(() =>
    {
        int i = 0;
        while (i < 10)
        {
            Thread.Sleep(50);
            i++;
            bb.Post(i);
        }
    });

    t.Start();
    Console.Read();
}

Выход этой программы:

  • Действие А1 выполняется со значением 1
  • Действие А2 выполняется со значением 3
  • Действие А1 выполняется со значением 2
  • Действие А3 выполняется со значением 6
  • Действие А3 выполняется со значением 7
  • Действие A3 выполняется со значением 8
  • Действие А2 выполняется со значением 5
  • Действие А3 выполняется со значением 9
  • Действие А1 выполняется со значением 4
  • Действие А2 выполняется со значением 10

Это явно распределение данных по трем ActionBlock (s), как и ожидалось.

Ответ 3

Нет, второй пример не будет компилироваться по ряду причин: для блока группировки "группировать" поток данных можно установить greedy = false, а не для блока исполнения; и затем он должен быть установлен через GroupingDataflowBlockOptions - not DataflowBlockOptions; а затем он устанавливается как значение свойства "{Greedy = false}", а не параметр конструктора.

Если вы хотите дросселировать емкость блока действий, сделайте это, установив значение свойства BoundedCapacity для DataflowBlockOptions (хотя, как указано OP, они уже знают об этой опции). Вот так:

var a1 = new ActionBlock<int>(
            i => doSomeWork(i), 
            new ExecutionDataflowBlockOptions {BoundedCapacity = 1}
        );