Почему итерация над GetConsumingEnumerable() не полностью опустошает базовую блокирующую коллекцию
У меня есть количественная и повторяемая проблема с использованием параллельной библиотеки задач BlockingCollection<T>
, ConcurrentQueue<T>
и GetConsumingEnumerable
при попытке создать простой конвейер.
Вкратце, добавление записей в значение по умолчанию BlockingCollection<T>
(которое под капотом опирается на ConcurrentQueue<T>
) из одного потока, не гарантирует, что они будут удалены из BlockingCollection<T>
из другого потока, вызывающего GetConsumingEnumerable()
Метод.
Я создал очень простое приложение Winforms для воспроизведения/имитации этого, которое просто печатает целые числа на экране.
-
Timer1
отвечает за очередность рабочих элементов... Он использует параллельный словарь под названием _tracker
, чтобы он знал, что он уже добавил в блокирующую коллекцию.
-
Timer2
просто регистрирует состояние счета как BlockingCollection
, так и _tracker
- Кнопка START запускает
Paralell.ForEach
, которая просто выполняет итерации по блокирующим коллекциям GetConsumingEnumerable()
и начинает печатать их во втором списке.
- Кнопка STOP останавливает
Timer1
, предотвращая добавление дополнительных записей в коллекцию блокировки.
public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
Здесь последовательность событий:
- Нажмите "Пуск"
- Timer1 ticks и ListBox1 сразу обновляются 3 сообщениями (добавление 0, 1, 2)
- ListBox2 затем обновляется с 3 сообщениями, на 1 секунду
- Обработка 0
- Обработка 1
- Обработка 2
- Timer1 ticks и ListBox1 сразу обновляются 3 сообщениями (добавление 3, 4, 5)
- ListBox2 обновляется с 2 сообщениями, 1 секунда
- Обработка 3
- Обработка 4
- Обработка 5 не печатается... казалось бы, пропала без вести.
- Нажмите STOP, чтобы предотвратить добавление дополнительных сообщений по таймеру 1
- Подождите... "Обработка 5" все еще не отображается
![Missing Entry]()
Вы можете видеть, что параллельный словарь по-прежнему отслеживает, что 1 элемент еще не обработан и впоследствии удален из _tracker
Если я снова нажму "Старт", тогда таймер 1 начнет добавлять еще 3 записи, а цикл "Параллельный" вернется к пожизненной печати 5, 6, 7 и 8.
![Entry returned after subsequent items shoved in behind it]()
У меня полная потеря, почему это происходит. Очевидно, что при вызове start явно вызывает newtask, который вызывает Paralell foreach и повторно выполняет GetConsumingEnumerable(), который волшебным образом обнаруживает недостающую запись... I
Почему BlockingCollection.GetConsumingEnumerable()
не гарантирует итерацию по каждому элементу, добавленному в коллекцию.
Почему добавление большего количества записей впоследствии приводит к тому, что он "отклеивается" и продолжает обрабатывать?
Ответы
Ответ 1
Вы не можете использовать GetConsumingEnumerable()
в Parallel.ForEach()
.
Используйте GetConsumingPartitioner
из дополнительных возможностей TPL
В сообщении в блоге вы также получите объяснение, почему нельзя использовать GetConsumingEnumerable()
Алгоритм разбиения, используемый по умолчанию как Parallel.ForEach, так и PLINQ, использует chunking, чтобы минимизировать затраты на синхронизацию: вместо того, чтобы блокировать один раз для элемента, он будет блокировать, захватить группу элементов (кусок), а затем отпустите блокировку.
то есть. Parallel.ForEach до тех пор, пока не получит группу рабочих элементов, прежде чем продолжить. Именно то, что показывает ваш эксперимент.
Ответ 2
Начиная с .net 4.5, вы можете создать разделитель, который будет принимать только по одному элементу за раз:
var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = (currentTask.ParallelLevel > 0 ? currentTask.ParallelLevel : 1) }, (batch, state) => {//do stuff}
https://msdn.microsoft.com/en-us/library/system.collections.concurrent.enumerablepartitioneroptions(v=vs.110).aspx
Ответ 3
Я не мог воспроизвести ваше поведение с помощью простого консольного приложения, делающего в основном то же самое (работа на бета-версии .Net 4.5, которая может изменить ситуацию). Но я думаю, причина в том, что Parallel.ForEach()
пытается оптимизировать выполнение, разбивая входную коллекцию на куски. И с вашим перечислимым, кусок не может быть создан, пока вы не добавите больше предметов в коллекцию. Для получения дополнительной информации см. Пользовательские разделители для PLINQ и TPL в MSDN.
Чтобы исправить это, не используйте Parallel.ForEach()
. Если вы все еще хотите обрабатывать элементы параллельно, вы можете запустить Task
на каждой итерации.
Ответ 4
Мне кажется, что я должен только для ясности отметить, что в тех случаях, когда вы можете вызвать метод BlockingCollection.CompleteAdding() до выполнения Parallel.foreach, проблема, описанная выше, не будет проблемой. Я использовал эти два объекта много раз с отличными результатами.
Кроме того, вы всегда можете повторно установить свой BlockingCollection после вызова функции CompleteAdding(), чтобы добавить дополнительные элементы при необходимости (_entries = new BlockingCollection();)
Изменение кода события клика выше, как описано ниже, позволит решить вашу проблему с отсутствующей записью и заставить ее работать должным образом, если вы нажимаете кнопки запуска и остановки несколько раз:
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
>>>>_entries.CompleteAdding();
>>>>_entries = new BlockingCollection<int>();
}