Подождите, пока пустые потоки завершатся
Прошу прощения за излишний вопрос. Тем не менее, я нашел много решений для своей проблемы, но ни один из них не очень хорошо объяснил. Я надеюсь, что здесь будет ясно.
Мой основной поток приложения С# порождает 1..n фоновых работников, использующих ThreadPool. Я хочу, чтобы исходный поток был заблокирован, пока все рабочие не завершили работу. Я изучил ManualResetEvent, в частности, но я не понимаю его использования.
В псевдо:
foreach( var o in collection )
{
queue new worker(o);
}
while( workers not completed ) { continue; }
При необходимости я буду знать количество рабочих, которые должны быть поставлены в очередь перед началом работы.
Ответы
Ответ 1
Попробуйте это. Функция принимает список делегатов Action. Он добавит запись рабочего потока ThreadPool для каждого элемента в списке. Он будет ожидать завершения каждого действия перед возвратом.
public static void SpawnAndWait(IEnumerable<Action> actions)
{
var list = actions.ToList();
var handles = new ManualResetEvent[actions.Count()];
for (var i = 0; i < list.Count; i++)
{
handles[i] = new ManualResetEvent(false);
var currentAction = list[i];
var currentHandle = handles[i];
Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };
ThreadPool.QueueUserWorkItem(x => wrappedAction());
}
WaitHandle.WaitAll(handles);
}
Ответ 2
Здесь другой подход - инкапсуляция; поэтому ваш код может быть таким же простым, как:
Forker p = new Forker();
foreach (var obj in collection)
{
var tmp = obj;
p.Fork(delegate { DoSomeWork(tmp); });
}
p.Join();
Если класс Forker
приведен ниже (мне стало скучно в поезде; -p)... опять же, это позволяет избежать объектов ОС, но обертывает вещи довольно аккуратно (IMO):
using System;
using System.Threading;
/// <summary>Event arguments representing the completion of a parallel action.</summary>
public class ParallelEventArgs : EventArgs
{
private readonly object state;
private readonly Exception exception;
internal ParallelEventArgs(object state, Exception exception)
{
this.state = state;
this.exception = exception;
}
/// <summary>The opaque state object that identifies the action (null otherwise).</summary>
public object State { get { return state; } }
/// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>
public Exception Exception { get { return exception; } }
}
/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary>
public sealed class Forker
{
int running;
private readonly object joinLock = new object(), eventLock = new object();
/// <summary>Raised when all operations have completed.</summary>
public event EventHandler AllComplete
{
add { lock (eventLock) { allComplete += value; } }
remove { lock (eventLock) { allComplete -= value; } }
}
private EventHandler allComplete;
/// <summary>Raised when each operation completes.</summary>
public event EventHandler<ParallelEventArgs> ItemComplete
{
add { lock (eventLock) { itemComplete += value; } }
remove { lock (eventLock) { itemComplete -= value; } }
}
private EventHandler<ParallelEventArgs> itemComplete;
private void OnItemComplete(object state, Exception exception)
{
EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock
if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
if (Interlocked.Decrement(ref running) == 0)
{
EventHandler allHandler = allComplete; // don't need to lock
if (allHandler != null) allHandler(this, EventArgs.Empty);
lock (joinLock)
{
Monitor.PulseAll(joinLock);
}
}
}
/// <summary>Adds a callback to invoke when each operation completes.</summary>
/// <returns>Current instance (for fluent API).</returns>
public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException("handler");
ItemComplete += handler;
return this;
}
/// <summary>Adds a callback to invoke when all operations are complete.</summary>
/// <returns>Current instance (for fluent API).</returns>
public Forker OnAllComplete(EventHandler handler)
{
if (handler == null) throw new ArgumentNullException("handler");
AllComplete += handler;
return this;
}
/// <summary>Waits for all operations to complete.</summary>
public void Join()
{
Join(-1);
}
/// <summary>Waits (with timeout) for all operations to complete.</summary>
/// <returns>Whether all operations had completed before the timeout.</returns>
public bool Join(int millisecondsTimeout)
{
lock (joinLock)
{
if (CountRunning() == 0) return true;
Thread.SpinWait(1); // try our luck...
return (CountRunning() == 0) ||
Monitor.Wait(joinLock, millisecondsTimeout);
}
}
/// <summary>Indicates the number of incomplete operations.</summary>
/// <returns>The number of incomplete operations.</returns>
public int CountRunning()
{
return Interlocked.CompareExchange(ref running, 0, 0);
}
/// <summary>Enqueues an operation.</summary>
/// <param name="action">The operation to perform.</param>
/// <returns>The current instance (for fluent API).</returns>
public Forker Fork(ThreadStart action) { return Fork(action, null); }
/// <summary>Enqueues an operation.</summary>
/// <param name="action">The operation to perform.</param>
/// <param name="state">An opaque object, allowing the caller to identify operations.</param>
/// <returns>The current instance (for fluent API).</returns>
public Forker Fork(ThreadStart action, object state)
{
if (action == null) throw new ArgumentNullException("action");
Interlocked.Increment(ref running);
ThreadPool.QueueUserWorkItem(delegate
{
Exception exception = null;
try { action(); }
catch (Exception ex) { exception = ex;}
OnItemComplete(state, exception);
});
return this;
}
}
Ответ 3
Во-первых, как долго выполняются работники? потоки пулов обычно должны использоваться для краткосрочных задач - если они будут работать некоторое время, рассмотрите ручные потоки.
Возьмем проблему; вам действительно нужно блокировать основной поток? Можете ли вы использовать обратный вызов? Если да, то что-то вроде:
int running = 1; // start at 1 to prevent multiple callbacks if
// tasks finish faster than they are started
Action endOfThread = delegate {
if(Interlocked.Decrement(ref running) == 0) {
// ****run callback method****
}
};
foreach(var o in collection)
{
var tmp = o; // avoid "capture" issue
Interlocked.Increment(ref running);
ThreadPool.QueueUserWorkItem(delegate {
DoSomeWork(tmp); // [A] should handle exceptions internally
endOfThread();
});
}
endOfThread(); // opposite of "start at 1"
Это довольно легкий (без примитивов ОС) способ отслеживания рабочих.
Если вы нуждаетесь для блокировки, вы можете сделать то же самое с помощью Monitor
(опять же, избегая объекта ОС):
object syncLock = new object();
int running = 1;
Action endOfThread = delegate {
if (Interlocked.Decrement(ref running) == 0) {
lock (syncLock) {
Monitor.Pulse(syncLock);
}
}
};
lock (syncLock) {
foreach (var o in collection) {
var tmp = o; // avoid "capture" issue
ThreadPool.QueueUserWorkItem(delegate
{
DoSomeWork(tmp); // [A] should handle exceptions internally
endOfThread();
});
}
endOfThread();
Monitor.Wait(syncLock);
}
Console.WriteLine("all done");
Ответ 4
Я использовал новую библиотеку параллельных задач в CTP здесь:
Parallel.ForEach(collection, o =>
{
DoSomeWork(o);
});
Ответ 5
Вот решение, использующее класс CountdownEvent
.
var complete = new CountdownEvent(1);
foreach (var o in collection)
{
var capture = o;
ThreadPool.QueueUserWorkItem((state) =>
{
try
{
DoSomething(capture);
}
finally
{
complete.Signal();
}
}, null);
}
complete.Signal();
complete.Wait();
Конечно, если у вас есть доступ к классу CountdownEvent
, то у вас есть весь TPL для работы. Класс Parallel
заботится о вас.
Parallel.ForEach(collection, o =>
{
DoSomething(o);
});
Ответ 6
Я думаю, что вы были на правильном пути с помощью ManualResetEvent. Эта ссылка содержит пример кода, который точно соответствует тому, что вы пытаетесь сделать. Ключ должен использовать WaitHandle.WaitAll и передать массив событий ожидания. Каждый поток должен установить одно из этих событий ожидания.
// Simultaneously calculate the terms.
ThreadPool.QueueUserWorkItem(
new WaitCallback(CalculateBase));
ThreadPool.QueueUserWorkItem(
new WaitCallback(CalculateFirstTerm));
ThreadPool.QueueUserWorkItem(
new WaitCallback(CalculateSecondTerm));
ThreadPool.QueueUserWorkItem(
new WaitCallback(CalculateThirdTerm));
// Wait for all of the terms to be calculated.
WaitHandle.WaitAll(autoEvents);
// Reset the wait handle for the next calculation.
manualEvent.Reset();
Edit:
Убедитесь, что на пути кода вашего рабочего потока вы установили событие (то есть autoEvents 1.Set();). Как только все они будут сигнализированы, waitAll вернется.
void CalculateSecondTerm(object stateInfo)
{
double preCalc = randomGenerator.NextDouble();
manualEvent.WaitOne();
secondTerm = preCalc * baseNumber *
randomGenerator.NextDouble();
autoEvents[1].Set();
}
Ответ 7
Я нашел хорошее решение здесь:
http://msdn.microsoft.com/en-us/magazine/cc163914.aspx
Может оказаться полезным для других с той же проблемой
Ответ 8
Использование .NET 4.0 Barrie r class:
Barrier sync = new Barrier(1);
foreach(var o in collection)
{
WaitCallback worker = (state) =>
{
// do work
sync.SignalAndWait();
};
sync.AddParticipant();
ThreadPool.QueueUserWorkItem(worker, o);
}
sync.SignalAndWait();