С# производитель/потребитель
Недавно я столкнулся с реализацией реализации С# производителя/потребителя. это очень просто и (по крайней мере для меня) очень элегантно.
похоже, он был разработан в 2006 году, поэтому мне было интересно, является ли эта реализация
- безопасный
- все еще применимо
Код ниже (исходный код ссылался на http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)
using System;
using System.Collections;
using System.Threading;
public class Test
{
static ProducerConsumer queue;
static void Main()
{
queue = new ProducerConsumer();
new Thread(new ThreadStart(ConsumerJob)).Start();
Random rng = new Random(0);
for (int i=0; i < 10; i++)
{
Console.WriteLine ("Producing {0}", i);
queue.Produce(i);
Thread.Sleep(rng.Next(1000));
}
}
static void ConsumerJob()
{
// Make sure we get a different random seed from the
// first thread
Random rng = new Random(1);
// We happen to know we've only got 10
// items to receive
for (int i=0; i < 10; i++)
{
object o = queue.Consume();
Console.WriteLine ("\t\t\t\tConsuming {0}", o);
Thread.Sleep(rng.Next(1000));
}
}
}
public class ProducerConsumer
{
readonly object listLock = new object();
Queue queue = new Queue();
public void Produce(object o)
{
lock (listLock)
{
queue.Enqueue(o);
// We always need to pulse, even if the queue wasn't
// empty before. Otherwise, if we add several items
// in quick succession, we may only pulse once, waking
// a single thread up, even if there are multiple threads
// waiting for items.
Monitor.Pulse(listLock);
}
}
public object Consume()
{
lock (listLock)
{
// If the queue is empty, wait for an item to be added
// Note that this is a while loop, as we may be pulsed
// but not wake up before another thread has come in and
// consumed the newly added object. In that case, we'll
// have to wait for another pulse.
while (queue.Count==0)
{
// This releases listLock, only reacquiring it
// after being woken up by a call to Pulse
Monitor.Wait(listLock);
}
return queue.Dequeue();
}
}
}
Ответы
Ответ 1
Код старше этого - я написал его некоторое время до выхода .NET 2.0. Концепция очереди производителя/потребителя намного старше, чем это:)
Да, этот код безопасен, насколько мне известно, но он имеет некоторые недостатки:
- Это не общий. Современная версия, безусловно, будет общей.
- Он не может остановить очередь. Один простой способ остановить очередь (так что все потребительские потоки удаляются) состоит в том, чтобы иметь токен "stop work", который можно поместить в очередь. Затем вы добавляете столько жетонов, сколько у вас есть потоки. Кроме того, у вас есть отдельный флаг, указывающий, что вы хотите остановить. (Это позволяет останавливать другие потоки, прежде чем завершить всю текущую работу в очереди.)
- Если задания очень маленькие, использование одного задания за раз может быть не самым эффективным.
Идеи, лежащие в основе кода, более важны, чем сам код, если честно.
Ответ 2
Вы можете сделать что-то вроде следующего фрагмента кода. Он является общим и имеет метод присвоения нулевых значений (или любого другого флага, который вы хотите использовать), чтобы сообщить рабочим потокам о выходе.
Код взят отсюда: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ConsoleApplication1
{
public class TaskQueue<T> : IDisposable where T : class
{
object locker = new object();
Thread[] workers;
Queue<T> taskQ = new Queue<T>();
public TaskQueue(int workerCount)
{
workers = new Thread[workerCount];
// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
(workers[i] = new Thread(Consume)).Start();
}
public void Dispose()
{
// Enqueue one null task per worker to make each exit.
foreach (Thread worker in workers) EnqueueTask(null);
foreach (Thread worker in workers) worker.Join();
}
public void EnqueueTask(T task)
{
lock (locker)
{
taskQ.Enqueue(task);
Monitor.PulseAll(locker);
}
}
void Consume()
{
while (true)
{
T task;
lock (locker)
{
while (taskQ.Count == 0) Monitor.Wait(locker);
task = taskQ.Dequeue();
}
if (task == null) return; // This signals our exit
Console.Write(task);
Thread.Sleep(1000); // Simulate time-consuming task
}
}
}
}
Ответ 3
В тот же день я узнал, как Monitor.Wait/Pulse работает (и много о потоках вообще) из вышеприведенного фрагмента кода и серии статей из. Так как Джон говорит, он имеет большую ценность для него и действительно безопасен и применим.
Однако, с .NET 4, существует реализация очереди производителей-потребителей в рамках. Я только что нашел его сам, но до этого момента он делает все, что мне нужно.
Ответ 4
Предупреждение: Если вы прочтете комментарии, вы поймете, что мой ответ неверен:)
В коде есть возможный тупик.
Представьте себе следующий случай, для ясности я использовал однопоточный подход, но его легко преобразовать в многопоточность со сном:
// We create some actions...
object locker = new object();
Action action1 = () => {
lock (locker)
{
System.Threading.Monitor.Wait(locker);
Console.WriteLine("This is action1");
}
};
Action action2 = () => {
lock (locker)
{
System.Threading.Monitor.Wait(locker);
Console.WriteLine("This is action2");
}
};
// ... (stuff happens, etc.)
// Imagine both actions were running
// and there 0 items in the queue
// And now the producer kicks in...
lock (locker)
{
// This would add a job to the queue
Console.WriteLine("Pulse now!");
System.Threading.Monitor.Pulse(locker);
}
// ... (more stuff)
// and the actions finish now!
Console.WriteLine("Consume action!");
action1(); // Oops... they're locked...
action2();
Пожалуйста, дайте мне знать, если это не имеет никакого смысла.
Если это подтвердится, тогда ответ на ваш вопрос: "Нет, это не безопасно";)
Надеюсь, это поможет.