Есть ли лучший способ подождать очереди в очереди?
Есть ли лучший способ подождать очереди в очереди перед выполнением другого процесса?
В настоящее время я делаю:
this.workerLocker = new object(); // Global variable
this.RunningWorkers = arrayStrings.Length; // Global variable
// Initiate process
foreach (string someString in arrayStrings)
{
ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
Thread.Sleep(100);
}
// Waiting execution for all queued threads
lock (this.workerLocker) // Global variable (object)
{
while (this.RunningWorkers > 0)
{
Monitor.Wait(this.workerLocker);
}
}
// Do anything else
Console.WriteLine("END");
// Method DoSomething() definition
public void DoSomething(object data)
{
// Do a slow process...
.
.
.
lock (this.workerLocker)
{
this.RunningWorkers--;
Monitor.Pulse(this.workerLocker);
}
}
Ответы
Ответ 1
Вероятно, вы хотите взглянуть на AutoResetEvent и ManualResetEvent.
Они предназначены именно для этой ситуации (ожидая окончания потока ThreadPool, прежде чем делать что-то).
Вы сделали бы что-то вроде этого:
static void Main(string[] args)
{
List<ManualResetEvent> resetEvents = new List<ManualResetEvent>();
foreach (var x in Enumerable.Range(1, WORKER_COUNT))
{
ManualResetEvent resetEvent = new ManualResetEvent();
ThreadPool.QueueUserWorkItem(DoSomething, resetEvent);
resetEvents.Add(resetEvent);
}
// wait for all ManualResetEvents
WaitHandle.WaitAll(resetEvents.ToArray()); // You probably want to use an array instead of a List, a list was just easier for the example :-)
}
public static void DoSomething(object data)
{
ManualResetEvent resetEvent = data as ManualResetEvent;
// Do something
resetEvent.Set();
}
Изменить: забыл упомянуть, что вы можете подождать один поток, любой поток и так далее.
Кроме того, в зависимости от вашей ситуации, AutoResetEvent может немного упростить ситуацию, поскольку он (как следует из названия) может сигнализировать события автоматически: -)
Ответ 2
Как насчет Fork
и Join
, которые используют только Monitor
;-p
Forker p = new Forker();
foreach (var obj in collection)
{
var tmp = obj;
p.Fork(delegate { DoSomeWork(tmp); });
}
p.Join();
Полный код, указанный в этом более раннем ответе.
Или для очереди производителей/потребителей с ограниченным размером (потокобезопасным и т.д.), здесь.
Ответ 3
В дополнение к Barrier, указанному Хенком Холтерманом (BTW его очень плохое использование Barrier, см. мой комментарий к его ответу),.NET 4.0 предоставляет целую кучу других параметров (чтобы использовать их в .NET 3.5, вы необходимо загрузить дополнительную DLL из Microsoft). Я написал сообщение в котором перечислены все их, но мой любимый, безусловно, Parallel.ForEach:
Parallel.ForEach<string>(arrayStrings, someString =>
{
DoSomething(someString);
});
За кулисами Parallel.ForEach ставит очередь на новый и улучшенный пул потоков и ждет, пока не будут выполнены все потоки.
Ответ 4
Мне действительно нравится Begin-End-Async Pattern, когда мне нужно дождаться завершения задач.
Я бы посоветовал вам обернуть BeginEnd в рабочий класс:
public class StringWorker
{
private string m_someString;
private IAsyncResult m_result;
private Action DoSomethingDelegate;
public StringWorker(string someString)
{
DoSomethingDelegate = DoSomething;
}
private void DoSomething()
{
throw new NotImplementedException();
}
public IAsyncResult BeginDoSomething()
{
if (m_result != null) { throw new InvalidOperationException(); }
m_result = DoSomethingDelegate.BeginInvoke(null, null);
return m_result;
}
public void EndDoSomething()
{
DoSomethingDelegate.EndInvoke(m_result);
}
}
Для начала и работы используйте этот фрагмент кода:
List<StringWorker> workers = new List<StringWorker>();
foreach (var someString in arrayStrings)
{
StringWorker worker = new StringWorker(someString);
worker.BeginDoSomething();
workers.Add(worker);
}
foreach (var worker in workers)
{
worker.EndDoSomething();
}
Console.WriteLine("END");
И что это.
Sidenote: если вы хотите вернуть результат с BeginEnd, измените "Action" на Func и измените EndDoSomething, чтобы вернуть тип.
public class StringWorker
{
private string m_someString;
private IAsyncResult m_result;
private Func<string> DoSomethingDelegate;
public StringWorker(string someString)
{
DoSomethingDelegate = DoSomething;
}
private string DoSomething()
{
throw new NotImplementedException();
}
public IAsyncResult BeginDoSomething()
{
if (m_result != null) { throw new InvalidOperationException(); }
m_result = DoSomethingDelegate.BeginInvoke(null, null);
return m_result;
}
public string EndDoSomething()
{
return DoSomethingDelegate.EndInvoke(m_result);
}
}
Ответ 5
В .NET 4.0 для этого есть новый класс, Barrier.
Кроме того, ваш метод не так уж плох, и вы можете немного оптимизировать только с помощью Pulsing, если RunningWorkers равно 0 после декремента. Это будет выглядеть так:
this.workerLocker = new object(); // Global variable
this.RunningWorkers = arrayStrings.Length; // Global variable
foreach (string someString in arrayStrings)
{
ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
//Thread.Sleep(100);
}
// Waiting execution for all queued threads
Monitor.Wait(this.workerLocker);
// Method DoSomething() definition
public void DoSomething(object data)
{
// Do a slow process...
// ...
lock (this.workerLocker)
{
this.RunningWorkers--;
if (this.RunningWorkers == 0)
Monitor.Pulse(this.workerLocker);
}
}
Вы можете использовать EventWaithandle или AutoResetEvent, но все они обернуты API Win32. Поскольку класс Monitor является чистым управляемым кодом, я предпочел бы монитор для этой ситуации.
Ответ 6
Да, есть.
Предлагаемый подход
1) счетчик и дескриптор ожидания
int ActiveCount = 1; // 1 (!) is important
EventWaitHandle ewhAllDone = new EventWaitHandle(false, ResetMode.Manual);
2) добавление цикла
foreach (string someString in arrayStrings)
{
Interlocked.Increment(ref ActiveCount);
ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
// Thread.Sleep(100); // you really need this sleep ?
}
PostActionCheck();
ewhAllDone.Wait();
3) DoSomething
должен выглядеть как
{
try
{
// some long executing code
}
finally
{
// ....
PostActionCheck();
}
}
4), где PostActionCheck
-
void PostActionCheck()
{
if (Interlocked.Decrement(ref ActiveCount) == 0)
ewhAllDone.Set();
}
Идея
ActiveCount инициализируется с помощью 1
, а затем увеличивается с шагом n
.
PostActionCheck
называется n + 1
раз. Последний инициирует событие.
Преимущество этого решения заключается в том, что используется один объект ядра (который является событием) и 2 * n + 1
вызовы легкого API. (Может ли быть меньше?)
P.S.
Я написал код здесь, я мог бы ошибочно написать некоторые имена классов.
Ответ 7
Я не уверен, что на самом деле я недавно сделал что-то похожее для сканирования каждого IP-адреса подсети для приема определенного порта.
Несколько вещей, которые я могу предложить, могут повысить производительность:
-
Используйте метод SetMaxThreads ThreadPool для настройки производительности (т.е. балансировки с одновременным запуском множества потоков, против времени блокировки на таком большом количестве потоков).
-
Не спать при настройке всех рабочих элементов, нет никакой реальной необходимости (о чем я сразу знаю. Однако не спите внутри метода DoSomething, возможно, только на миллисекунду, чтобы другие потоки могли прыгайте туда, если нужно.
Я уверен, что вы можете реализовать более индивидуальный метод самостоятельно, но я сомневаюсь, что это было бы более эффективно, чем использование ThreadPool.
P.S Я не на 100% не понимаю причину использования монитора, так как вы все равно блокируете? Обратите внимание, что вопрос задается только потому, что я раньше не использовал класс Monitor, а не потому, что я действительно сомневаюсь в его использовании.
Ответ 8
Используйте Spring Threading. Он имеет встроенные барьерные реализации.
http://www.springsource.org/extensions/se-threading-net