Ожидание многократных потоков с использованием WaitForMultipleObjects
Я использую функцию WaitForMultipleObjects
, чтобы дождаться завершения нескольких потоков, но я делаю что-то неправильно, потому что результат не является ожидаемым
см. этот пример кода
type
TForm1 = class(TForm)
Memo1: TMemo;
Button1: TButton;
procedure Button1Click(Sender: TObject);
private
end;
TFoo = class(TThread)
private
Factor: Double;
procedure ShowData;
protected
procedure Execute; override;
constructor Create(AFactor : Double);
end;
var
Form1: TForm1;
implementation
Uses
Math;
{$R *.dfm}
{ TFoo }
constructor TFoo.Create(AFactor: Double);
begin
inherited Create(False);
Factor := AFactor;
FreeOnTerminate := True;
end;
procedure TFoo.Execute;
const
Max=100000000;
var
i : Integer;
begin
inherited;
for i:=1 to Max do
Factor:=Sqrt(Factor);
Synchronize(ShowData);
end;
procedure TFoo.ShowData;
begin
Form1.Memo1.Lines.Add(FloatToStr(Factor));
end;
procedure TForm1.Button1Click(Sender: TObject);
const
nThreads=5;
Var
tArr : Array[1..nThreads] of TFoo;
hArr : Array[1..nThreads] of THandle;
i : Integer;
rWait : Cardinal;
begin
for i:=1 to nThreads do
begin
tArr[i]:=TFoo.Create(Pi*i);
hArr[i]:=tArr[i].Handle;
end;
repeat
rWait:= WaitForMultipleObjects(nThreads, @hArr, True, 100);
Application.ProcessMessages;
until rWait<>WAIT_TIMEOUT;
//here I want to show this message when all the threads are terminated
Memo1.Lines.Add('Wait done');
end;
end.
это текущий вывод демонстрационного приложения
1
Wait done
1
1
1
1
но я хочу что-то вроде этого
1
1
1
1
1
Wait done
Как я должен использовать функцию WaitForMultipleObjects
, чтобы ждать завершения всего потока?
Ответы
Ответ 1
Исправить: Удалить FreeOnTerminate.
Ваш код заставляет потоки освобождаться, когда вам все еще нужны дескрипторы. Это большая ошибка, и вы можете получить доступ к нарушениям где-то еще в вашем коде, или коды возврата ошибок, возвращающиеся из ваших объектов WaitFormMultipleObjects.
TThread.handle становится недействительным, когда TThread освобождается, и это завершает цикл ожидания раньше, потому что дескриптор больше недействителен. Вы также можете столкнуться с нарушением прав доступа, если попытаетесь получить доступ к TThread после его освобождения в фоновом режиме, поэтому я считаю, что лучше освободить их намеренно и в известное время.
Использование дескриптора потока в качестве дескриптора события отлично работает, но вы не должны использовать FreeOnTerminate для освобождения потока, когда он его завершает, так как это слишком быстро уничтожает дескрипторы.
Я также согласен с людьми, которые заявили, что выполнение цикла ожидания ожидания с Application.Processmessages довольно уродливо. Есть и другие способы сделать это.
unit threadUnit2;
interface
uses Classes, SyncObjs,Windows, SysUtils;
type
TFoo = class(TThread)
private
FFactor: Double;
procedure ShowData;
protected
procedure Execute; override;
constructor Create(AFactor : Double);
destructor Destroy; override;
end;
procedure WaitForThreads;
implementation
Uses
Forms,
Math;
procedure Trace(msg:String);
begin
if Assigned(Form1) then
Form1.Memo1.Lines.Add(msg);
end;
{ TFoo }
constructor TFoo.Create(AFactor: Double);
begin
inherited Create(False);
FFactor := AFactor;
// FreeOnTerminate := True;
end;
destructor TFoo.Destroy;
begin
inherited;
end;
procedure TFoo.Execute;
const
Max=100000000;
var
i : Integer;
begin
inherited;
for i:=1 to Max do
FFactor:=Sqrt(FFactor);
Synchronize(ShowData);
end;
procedure TFoo.ShowData;
begin
Trace(FloatToStr(FFactor));
end;
procedure WaitForThreads;
const
nThreads=5;
Var
tArr : Array[1..nThreads] of TFoo;
hArr : Array[1..nThreads] of THandle;
i : Integer;
rWait : Cardinal;
begin
for i:=1 to nThreads do
begin
tArr[i]:=TFoo.Create(Pi*i);
hArr[i]:=tArr[i].handle; // Event.Handle;
end;
repeat
rWait:= WaitForMultipleObjects(nThreads, @hArr[1],{waitAll} True, 150);
Application.ProcessMessages;
until rWait<>WAIT_TIMEOUT;
Sleep(0);
//here I want to show this message when all the threads are terminated
Trace('Wait done');
for i:=1 to nThreads do
begin
tArr[i].Free;
end;
end;
end.
Ответ 2
Если вы действительно хотите узнать, как работает многопоточность, вы находитесь на правильном пути - изучите код и задайте вопросы, как вы это делали. Если, однако, вы просто хотите использовать многопоточность в своем приложении, вы можете сделать это гораздо проще с помощью OmniThreadLibrary, если вы используете хотя бы Delphi 2009.
uses
Math,
OtlTask,
OtlParallel;
function Calculate(factor: real): real;
const
Max = 100000000;
var
i: integer;
begin
Result := factor;
for i := 1 to Max do
Result := Sqrt(Result);
end;
procedure TForm35.btnClick(Sender: TObject);
const
nThreads = 5;
begin
Parallel.ForEach(1, nThreads).Execute(
procedure (const task: IOmniTask; const value: integer)
var
res: real;
begin
res := Calculate(Pi*value);
task.Invoke(
procedure begin
Form35.Memo1.Lines.Add(FloatToStr(res));
end
);
end
);
Memo1.Lines.Add('All done');
end;
Ответ 3
Вот что происходит.
- Ваш код возвращает
WAIT_FAILED
из WaitForMultipleObjects
.
- Вызов
GetLastError
приводит к ошибке кода 6, дескриптор недействителен.
- Единственными дескрипторами, которые вы передаете в
WaitForMultipleObjects
, являются дескрипторы потоков, а один из дескрипторов потоков недействителен.
- Единственный способ, по которому один из дескрипторов потоков может стать недействительным, - это закрыть его.
- Как указывали другие, вы закрываете ручки, установив
FreeOnTerminate
.
Мораль истории состоит в том, чтобы правильно проверить возвращаемые значения со всех функций, и пусть GetLastError
приведет вас к основной причине проблемы.
Ответ 4
Не пропускайте такой короткий период ожидания, как последний параметр.
Согласно MSDN
dwMilliseconds [in] Интервал тайм-аута, в миллисекундах. Функция возвращается, если интервал истекает, даже если условия, заданные параметром bWaitAll, не выполняются. Если dwMilliseconds равно нулю, функция проверяет состояния указанных объектов и немедленно возвращается. Если dwMilliseconds является INFINITE, интервал тайм-аута функции никогда не истекает.
Обратите особое внимание на второе предложение. Вы говорите ему ждать все ручки, но тайм-аут через 100 мс. Итак, передайте INFINITE в качестве последнего параметра вместо этого и используйте WAIT_OBJECT_0
вместо WAIT_TIMEOUT
в качестве теста выхода.
Ответ 5
Всякий раз, когда вы ждете и включаете сообщение, вы должны использовать MsgWait... и указать маску для обработки ожидаемого сообщения
repeat
rWait:= MsgWaitForMultipleObjects(nThreads, @hArr[1], True, INFINITE, QS_ALLEVENTS);
Application.ProcessMessages;
until (rWait<>WAIT_TIMEOUT) and (rWait <> (WAIT_OBJECT_0 + nThreads));
nThreads
Ответ 6
Я не мог передать эту возможность, чтобы создать рабочий пример запуска нескольких потоков и использования обмена сообщениями, чтобы сообщить результаты обратно в графический интерфейс.
Нити, которые будут запущены, объявляются как:
type
TWorker = class(TThread)
private
FFactor: Double;
FResult: Double;
FReportTo: THandle;
protected
procedure Execute; override;
public
constructor Create(const aFactor: Double; const aReportTo: THandle);
property Factor: Double read FFactor;
property Result: Double read FResult;
end;
Конструктор просто устанавливает частные члены и устанавливает FreeOnTerminate в False. Это необходимо, так как это позволит основному потоку запрашивать экземпляр для результата. Метод execute выполняет его расчет, а затем публикует сообщение с дескриптором, которое он получил в своем конструкторе, чтобы сказать, что оно сделано.
procedure TWorker.Execute;
const
Max = 100000000;
var
i : Integer;
begin
inherited;
FResult := FFactor;
for i := 1 to Max do
FResult := Sqrt(FResult);
PostMessage(FReportTo, UM_WORKERDONE, Self.Handle, 0);
end;
Объявления для пользовательского сообщения UM_WORKERDONE объявляются как:
const
UM_WORKERDONE = WM_USER + 1;
type
TUMWorkerDone = packed record
Msg: Cardinal;
ThreadHandle: Integer;
unused: Integer;
Result: LRESULT;
end;
В форме, начинающейся с потоков, это добавлено к ее объявлению:
private
FRunning: Boolean;
FThreads: array of record
Instance: TThread;
Handle: THandle;
end;
procedure StartThreads(const aNumber: Integer);
procedure HandleThreadResult(var Message: TUMWorkerDone); message UM_WORKERDONE;
Функция FRunning используется для предотвращения нажатия кнопки во время работы. FThreads используется для хранения указателя экземпляра и дескриптора созданных потоков.
Процедура запуска потоков имеет довольно простую реализацию:
procedure TForm1.StartThreads(const aNumber: Integer);
var
i: Integer;
begin
Memo1.Lines.Add(Format('Starting %d worker threads', [aNumber]));
SetLength(FThreads, aNumber);
for i := 0 to aNumber - 1 do
begin
FThreads[i].Instance := TWorker.Create(pi * (i+1), Self.Handle);
FThreads[i].Handle := FThreads[i].Instance.Handle;
end;
end;
Веселье в реализации HandleThreadResult:
procedure TForm1.HandleThreadResult(var Message: TUMWorkerDone);
var
i: Integer;
ThreadIdx: Integer;
Thread: TWorker;
Done: Boolean;
begin
// Find thread in array
ThreadIdx := -1;
for i := Low(FThreads) to High(FThreads) do
if FThreads[i].Handle = Cardinal(Message.ThreadHandle) then
begin
ThreadIdx := i;
Break;
end;
// Report results and free the thread, nilling its pointer so we can detect
// when all threads are done.
if ThreadIdx > -1 then
begin
Thread := TWorker(FThreads[i].Instance);
Memo1.Lines.Add(Format('Thread %d returned %f', [ThreadIdx, Thread.Result]));
FreeAndNil(FThreads[i].Instance);
end;
// See whether all threads have finished.
Done := True;
for i := Low(FThreads) to High(FThreads) do
if Assigned(FThreads[i].Instance) then
begin
Done := False;
Break;
end;
if Done then
Memo1.Lines.Add('Work done');
end;
Enjoy...
Ответ 7
Существует одно условие, которое удовлетворяет вашему условию "до" в цикле повтора, которое вы игнорируете, WAIT_FAILED
:
until rWait<>WAIT_TIMEOUT;
Memo1.Lines.Add('Wait done');
Поскольку ваш тайм-аут несколько туго, один (или более) из потоков заканчивается и освобождает себя, делая один (или более) дескриптор недействительным для следующего WaitForMultipleObjects
, что заставляет его возвращать 'WAIT_FAILED', что приводит к 'Wait сделанное '.
Для каждой итерации цикла повторения вы должны удалить дескрипторы готовых потоков из hArr
. Затем снова не забудьте проверить "WAIT_FAILED" в любом случае.
изменить:
Ниже приведен пример кода, показывающего, как это можно сделать. Разница в этом подходе, вместо того, чтобы сохранять потоки живыми, заключается в том, что она не оставляет неиспользуемых объектов ядра и RTL. Это не имело бы значения для образца, но для многих потоков, занимающихся длительным бизнесом, это может быть предпочтительным.
В коде WaitForMultipleObjects
вызывается с передачей "false" для параметра "bWaitAll", чтобы иметь возможность удалить дескриптор потока без использования дополнительного вызова API, чтобы узнать, является ли он недопустимым или нет. Но это позволяет иначе, так как код также должен иметь возможность обрабатывать потоки, отделенные вне вызова ожидания.
procedure TForm1.Button1Click(Sender: TObject);
const
nThreads=5;
Var
tArr : Array[1..nThreads] of TFoo;
hArr : Array[1..nThreads] of THandle;
i : Integer;
rWait : Cardinal;
hCount: Integer; // total number of supposedly running threads
Flags: DWORD; // dummy variable used in a call to find out if a thread handle is valid
procedure RemoveHandle(Index: Integer); // Decrement valid handle count and leave invalid handle out of range
begin
if Index <> hCount then
hArr[Index] := hArr[hCount];
Dec(hCount);
end;
begin
Memo1.Clear;
for i:=1 to nThreads do
begin
tArr[i]:=TFoo.Create(Pi*i);
hArr[i]:=tArr[i].Handle;
end;
hCount := nThreads;
repeat
rWait:= WaitForMultipleObjects(hCount, @hArr, False, 100);
case rWait of
// one of the threads satisfied the wait, remove its handle
WAIT_OBJECT_0..WAIT_OBJECT_0 + nThreads - 1: RemoveHandle(rWait + 1);
// at least one handle has become invalid outside the wait call,
// or more than one thread finished during the previous wait,
// find and remove them
WAIT_FAILED:
begin
if GetLastError = ERROR_INVALID_HANDLE then
begin
for i := hCount downto 1 do
if not GetHandleInformation(hArr[i], Flags) then // is handle valid?
RemoveHandle(i);
end
else
// the wait failed because of something other than an invalid handle
RaiseLastOSError;
end;
// all remaining threads continue running, process messages and loop.
// don't process messages if the wait returned WAIT_FAILED since we didn't wait at all
// likewise WAIT_OBJECT_... may return soon
WAIT_TIMEOUT: Application.ProcessMessages;
end;
until hCount = 0; // no more valid thread handles, we're done
Memo1.Lines.Add('Wait done');
end;
Обратите внимание, что это ответ на вопрос, как его спрашивают. Я предпочел бы использовать событие TThreads OnTerminate
для уменьшения счетчика и вывода сообщения" Ожидание ", когда оно достигнет" 0". Это или, как другие рекомендовали, переместил ожидание на собственный поток, было бы проще и, возможно, более чистым, и избежать необходимости Application.ProcessMessages
.
Ответ 8
Я добавил следующие строки в конец процедуры:
memo1.Lines.add(intToHex(rWait, 2));
if rWait = $FFFFFFFF then
RaiseLastOSError;
Оказывается, что WaitForMultipleObjects
не работает с ошибкой Access Denied, скорее всего, потому что некоторые, но не все потоки заканчиваются и очищаются между итерациями.
У тебя здесь липкая проблема. Вам нужно, чтобы насос сообщений работал, или вызовы Synchronize не будут работать, поэтому вы не можете передавать INFINITE, как предположил Кен. Но если вы делаете то, что делаете сейчас, вы сталкиваетесь с этой проблемой.
Решение состоит в том, чтобы переместить вызов WaitForMultipleObjects
и код вокруг него в собственный поток. Он должен дождаться INFINITE, а затем, когда он закончит, он должен каким-то образом сигнализировать о потоке пользовательского интерфейса, чтобы сообщить об этом. (Например, когда вы нажимаете кнопку, отключите кнопку, а затем, когда поток монитора закончится, он снова включит кнопку.)
Ответ 9
Вы можете реорганизовать свой код, чтобы ждать только одного объекта вместо многих.
Я хотел бы познакомить вас с маленьким помощником, который обычно помогает мне в таких случаях. На этот раз его имя IFooMonitor:
IFooMonitor = interface
function WaitForAll(ATimeOut: Cardinal): Boolean;
procedure ImDone;
end;
TFoo и IFooMonitor будут друзьями:
TFoo = class(TThread)
strict private
FFactor: Double;
FMonitor: IFooMonitor;
procedure ShowData;
protected
procedure Execute; override;
public
constructor Create(const AMonitor: IFooMonitor; AFactor: Double);
end;
constructor TFoo.Create(const ACountDown: ICountDown; AFactor: Double);
begin
FCountDown := ACountDown;
FFactor := AFactor;
FreeOnTerminate := True;
inherited Create(False);// <- call inherited constructor at the end!
end;
Когда TFoo выполняется со своей работой, он расскажет об этом своему новому другу:
procedure TFoo.Execute;
const
Max = 100000000;
var
i: Integer;
begin
for i := 1 to Max do
FFactor := Sqrt(FFactor);
Synchronize(ShowData);
FMonitor.ImDone();
end;
Теперь мы можем реорганизовать обработчик событий таким образом:
procedure TForm1.Button1Click(Sender: TObject);
const
nThreads = 5;
var
i: Integer;
monitor: IFooMonitor;
begin
monitor := TFooMonitor.Create(nThreads); // see below for the implementation.
for i := 1 to nThreads do
TFoo.Create(monitor, Pi*i);
while not monitor.WaitForAll(100) do
Application.ProcessMessages;
Memo1.Lines.Add('Wait done');
end;
И вот как мы можем реализовать IFooMonitor:
uses
SyncObjs;
TFooMonitor = class(TInterfacedObject, IFooMonitor)
strict private
FCounter: Integer;
FEvent: TEvent;
FLock: TCriticalSection;
private
{ IFooMonitor }
function WaitForAll(ATimeOut: Cardinal): Boolean;
procedure ImDone;
public
constructor Create(ACount: Integer);
destructor Destroy; override;
end;
constructor TFooMonitor.Create(ACount: Integer);
begin
inherited Create;
FCounter := ACount;
FEvent := TEvent.Create(nil, False, False, '');
FLock := TCriticalSection.Create;
end;
procedure TFooMonitor.ImDone;
begin
FLock.Enter;
try
Assert(FCounter > 0);
Dec(FCounter);
if FCounter = 0 then
FEvent.SetEvent;
finally
FLock.Leave
end;
end;
destructor TFooMonitor.Destroy;
begin
FLock.Free;
FEvent.Free;
inherited;
end;
function TFooMonitor.WaitForAll(ATimeOut: Cardinal): Boolean;
begin
Result := FEvent.WaitFor(ATimeOut) = wrSignaled
end;