Как (и если) писать однопользовательскую очередь с помощью TPL?
Недавно я слышал кучу подкастов о TPL в .NET 4.0. Большинство из них описывают фоновые действия, такие как загрузка изображений или выполнение вычислений, использование задач, чтобы работа не мешала потоку графического интерфейса.
Большая часть кода, над которым я работаю, имеет больше разновидностей нескольких производителей/одного потребителя, где рабочие элементы из нескольких источников должны быть поставлены в очередь, а затем обрабатываться по порядку. Одним из примеров является ведение журнала, где строки журнала из нескольких потоков секвенируются в одну очередь для последующей записи в файл или базу данных. Все записи из любого источника должны оставаться в порядке, и записи с одного и того же момента времени должны быть "близки" друг к другу в конечном результате.
Таким образом, несколько потоков или задач или что-то еще вызывают queuer:
lock( _queue ) // or use a lock-free queue!
{
_queue.enqueue( some_work );
_queueSemaphore.Release();
}
И посвященный рабочий поток обрабатывает очередь:
while( _queueSemaphore.WaitOne() )
{
lock( _queue )
{
some_work = _queue.dequeue();
}
deal_with( some_work );
}
Кажется разумным посвятить рабочий поток для потребительской стороны этих задач. Должен ли я писать будущие программы, используя некоторую конструкцию из TPL? Который из? Почему?
Ответы
Ответ 1
Вы можете использовать длинную задачу Task для обработки элементов из BlockingCollection, как предложено Wilka. Вот пример, который в значительной степени соответствует вашим требованиям к приложениям. Вы увидите вывод примерно так:
Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C
Не то, что выходы из A, B, C и D кажутся случайными, поскольку они зависят от времени начала потоков, но B всегда появляется перед B1.
public class LogItem
{
public string Message { get; private set; }
public LogItem (string message)
{
Message = message;
}
}
public void Example()
{
BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();
// Start queue listener...
CancellationTokenSource canceller = new CancellationTokenSource();
Task listener = Task.Factory.StartNew(() =>
{
while (!canceller.Token.IsCancellationRequested)
{
LogItem item;
if (_queue.TryTake(out item))
Console.WriteLine(item.Message);
}
},
canceller.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
// Add some log messages in parallel...
Parallel.Invoke(
() => { _queue.Add(new LogItem("Log from task A")); },
() => {
_queue.Add(new LogItem("Log from task B"));
_queue.Add(new LogItem("Log from task B1"));
},
() => { _queue.Add(new LogItem("Log from task C")); },
() => { _queue.Add(new LogItem("Log from task D")); });
// Pretend to do other things...
Thread.Sleep(1000);
// Shut down the listener...
canceller.Cancel();
listener.Wait();
}
Ответ 2
Я знаю, что этот ответ примерно на год опоздал, но посмотрите MSDN.
который показывает, как создать лимитированный лимитированный калькулятор с помощью класса TaskScheduler. Ограничивая concurrency на одну задачу, это должно затем обрабатывать ваши задачи таким образом, чтобы они были поставлены в очередь через:
LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);
factory.StartNew(()=>
{
// your code
});
Ответ 3
Я не уверен, что TPL является адекватным в вашем случае использования. Насколько я понимаю, основным вариантом использования TPL является разделение одной огромной задачи на несколько меньших задач, которые могут выполняться бок о бок. Например, если у вас есть большой список, и вы хотите применить одно и то же преобразование для каждого элемента. В этом случае у вас может быть несколько задач, применяющих преобразование в подмножестве списка.
Случай, который вы описываете, кажется мне не подходит для меня. В вашем случае у вас нет нескольких задач, которые делают то же самое параллельно. У вас есть несколько различных задач, которые каждый выполняет - это собственная работа (производители) и одна задача, которая потребляет. Возможно, TPL может использоваться для потребительской части, если вы хотите иметь несколько потребителей, потому что в этом случае каждый потребитель выполняет ту же работу (при условии, что вы найдете логику для обеспечения согласованной временной согласованности).
Ну, это, конечно, просто мой личный взгляд на тему
Жить долго и процветать
Ответ 4
Похоже, что BlockingCollection вам будет удобно. Поэтому для вашего кода выше вы можете использовать что-то вроде (предполагая, что _queue
является экземпляром BlockingCollection
):
// for your producers
_queue.Add(some_work);
Назначенный рабочий поток обрабатывает очередь:
foreach (var some_work in _queue.GetConsumingEnumerable())
{
deal_with(some_work);
}
Примечание: когда все ваши продюсеры закончили производство, вам нужно позвонить CompleteAdding()
на _queue
, иначе ваш потребитель будет застревать в ожидании дополнительной работы.