Почему итерация над GetConsumingEnumerable() не полностью опустошает базовую блокирующую коллекцию

У меня есть количественная и повторяемая проблема с использованием параллельной библиотеки задач BlockingCollection<T>, ConcurrentQueue<T> и GetConsumingEnumerable при попытке создать простой конвейер.

Вкратце, добавление записей в значение по умолчанию BlockingCollection<T> (которое под капотом опирается на ConcurrentQueue<T>) из одного потока, не гарантирует, что они будут удалены из BlockingCollection<T> из другого потока, вызывающего GetConsumingEnumerable() Метод.

Я создал очень простое приложение Winforms для воспроизведения/имитации этого, которое просто печатает целые числа на экране.

  • Timer1 отвечает за очередность рабочих элементов... Он использует параллельный словарь под названием _tracker, чтобы он знал, что он уже добавил в блокирующую коллекцию.
  • Timer2 просто регистрирует состояние счета как BlockingCollection, так и _tracker
  • Кнопка START запускает Paralell.ForEach, которая просто выполняет итерации по блокирующим коллекциям GetConsumingEnumerable() и начинает печатать их во втором списке.
  • Кнопка STOP останавливает Timer1, предотвращая добавление дополнительных записей в коллекцию блокировки.
public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for(var i = 0; i < 3; i++,Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
            _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2

        var options = new ParallelOptions {
                                CancellationToken = _tokenSource.Token,
                                MaxDegreeOfParallelism = 1
                            };

        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }

Здесь последовательность событий:

  • Нажмите "Пуск"
  • Timer1 ticks и ListBox1 сразу обновляются 3 сообщениями (добавление 0, 1, 2)
  • ListBox2 затем обновляется с 3 сообщениями, на 1 секунду
    • Обработка 0
    • Обработка 1
    • Обработка 2
  • Timer1 ticks и ListBox1 сразу обновляются 3 сообщениями (добавление 3, 4, 5)
  • ListBox2 обновляется с 2 сообщениями, 1 секунда
    • Обработка 3
    • Обработка 4
    • Обработка 5 не печатается... казалось бы, пропала без вести.
  • Нажмите STOP, чтобы предотвратить добавление дополнительных сообщений по таймеру 1
  • Подождите... "Обработка 5" все еще не отображается

Missing Entry

Вы можете видеть, что параллельный словарь по-прежнему отслеживает, что 1 элемент еще не обработан и впоследствии удален из _tracker

Если я снова нажму "Старт", тогда таймер 1 начнет добавлять еще 3 записи, а цикл "Параллельный" вернется к пожизненной печати 5, 6, 7 и 8.

Entry returned after subsequent items shoved in behind it

У меня полная потеря, почему это происходит. Очевидно, что при вызове start явно вызывает newtask, который вызывает Paralell foreach и повторно выполняет GetConsumingEnumerable(), который волшебным образом обнаруживает недостающую запись... I

Почему BlockingCollection.GetConsumingEnumerable() не гарантирует итерацию по каждому элементу, добавленному в коллекцию.

Почему добавление большего количества записей впоследствии приводит к тому, что он "отклеивается" и продолжает обрабатывать?

Ответы

Ответ 1

Вы не можете использовать GetConsumingEnumerable() в Parallel.ForEach().

Используйте GetConsumingPartitioner из дополнительных возможностей TPL

В сообщении в блоге вы также получите объяснение, почему нельзя использовать GetConsumingEnumerable()

Алгоритм разбиения, используемый по умолчанию как Parallel.ForEach, так и PLINQ, использует chunking, чтобы минимизировать затраты на синхронизацию: вместо того, чтобы блокировать один раз для элемента, он будет блокировать, захватить группу элементов (кусок), а затем отпустите блокировку.

то есть. Parallel.ForEach до тех пор, пока не получит группу рабочих элементов, прежде чем продолжить. Именно то, что показывает ваш эксперимент.

Ответ 2

Начиная с .net 4.5, вы можете создать разделитель, который будет принимать только по одному элементу за раз:

var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = (currentTask.ParallelLevel > 0 ? currentTask.ParallelLevel : 1) }, (batch, state) => {//do stuff}

https://msdn.microsoft.com/en-us/library/system.collections.concurrent.enumerablepartitioneroptions(v=vs.110).aspx

Ответ 3

Я не мог воспроизвести ваше поведение с помощью простого консольного приложения, делающего в основном то же самое (работа на бета-версии .Net 4.5, которая может изменить ситуацию). Но я думаю, причина в том, что Parallel.ForEach() пытается оптимизировать выполнение, разбивая входную коллекцию на куски. И с вашим перечислимым, кусок не может быть создан, пока вы не добавите больше предметов в коллекцию. Для получения дополнительной информации см. Пользовательские разделители для PLINQ и TPL в MSDN.

Чтобы исправить это, не используйте Parallel.ForEach(). Если вы все еще хотите обрабатывать элементы параллельно, вы можете запустить Task на каждой итерации.

Ответ 4

Мне кажется, что я должен только для ясности отметить, что в тех случаях, когда вы можете вызвать метод BlockingCollection.CompleteAdding() до выполнения Parallel.foreach, проблема, описанная выше, не будет проблемой. Я использовал эти два объекта много раз с отличными результатами.

Кроме того, вы всегда можете повторно установить свой BlockingCollection после вызова функции CompleteAdding(), чтобы добавить дополнительные элементы при необходимости (_entries = new BlockingCollection();)

Изменение кода события клика выше, как описано ниже, позволит решить вашу проблему с отсутствующей записью и заставить ее работать должным образом, если вы нажимаете кнопки запуска и остановки несколько раз:

private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
    timer1.Stop();
    timer1.Enabled = false;
>>>>_entries.CompleteAdding();
>>>>_entries = new BlockingCollection<int>();
}