Синхронизация загрузки многопоточных файлов
В настоящее время я работаю над клиентским/серверным приложением Delphi XE3 для передачи файлов (с помощью компонентов Indy FTP). Клиентская часть контролирует папку, получает список файлов внутри, загружает их на сервер и удаляет оригиналы. Загрузка выполняется отдельным потоком, который обрабатывает файлы один за другим. Файлы могут варьироваться от 0 до нескольких тысяч, а их размеры также сильно различаются.
Это приложение Firemonkey, скомпилированное как для OSX, так и для Windows, поэтому мне пришлось использовать TThread вместо OmniThreadLibrary, который я предпочитал. Мой клиент сообщает, что приложение случайно зависает. Я не мог его дублировать, но поскольку у меня не так много опыта работы с TThread, я мог бы где-то поставить тупик. Я читал довольно много примеров, но я до сих пор не уверен в некоторых многопоточных особенностях.
Структура приложения проста:
Таймер в основном потоке проверяет папку и получает информацию о каждом файле в запись, которая переходит в общий TList. В этом списке хранится информация об именах файлов, размере, прогрессе, полностью ли загружен файл или его нужно повторить. Все это отображается в сетке с индикаторами выполнения и т.д. Этот список доступен только по основному потоку.
После этого элементы из списка отправляются в поток, вызывая метод AddFile (код ниже). Поток хранит все файлы в потокобезопасной очереди, подобной этой http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
Когда файл загружается, поток uploader уведомляет основной поток о вызове Synchronize.
Основной поток периодически вызывает метод Uploader.GetProgress, чтобы проверить текущий ход файла и отобразить его. Эта функция не является поточно-безопасной, но может ли она вызвать тупик или вернутся только неверные данные?
Каким будет безопасный и эффективный способ проверки прогресса?
Итак, такой подход, или я что-то пропустил? Как вы это сделаете?
Например, я хочу создать новый поток только для чтения содержимого папки. Это означает, что TList, который я использую, должен быть потокобезопасным, но он должен быть доступен все время, чтобы обновить отображаемую информацию в сетке графического интерфейса. Не все ли синхронизация просто замедляют работу графического интерфейса?
Я опубликовал упрощенный код ниже, если кто-то захочет посмотреть на него. Если нет, я был бы рад услышать некоторые мнения о том, что я должен использовать в целом. Основные задачи - работать как с ОС, так и с ОС Windows; чтобы иметь возможность отображать информацию обо всех файлах и о текущем ходе; и реагировать независимо от количества и размера файлов.
Это код потока пользователя. Я удалил некоторые из них для удобства чтения:
type
TFileStatus = (fsToBeQueued, fsUploaded, fsQueued);
TFileInfo = record
ID: Integer;
Path: String;
Size: Int64;
UploadedSize: Int64;
Status: TFileStatus;
end;
TUploader = class(TThread)
private
FTP: TIdFTP;
fQueue: TThreadedQueue<TFileInfo>;
fCurrentFile: TFileInfo;
FUploading: Boolean;
procedure ConnectFTP;
function UploadFile(aFileInfo: TFileInfo): String;
procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
procedure SignalComplete;
procedure SignalError(aError: String);
protected
procedure Execute; override;
public
property Uploading: Boolean read FUploading;
constructor Create;
destructor Destroy; override;
procedure Terminate;
procedure AddFile(const aFileInfo: TFileInfo);
function GetProgress: TFileInfo;
end;
procedure TUploader.AddFile(const aFileInfo: TFileInfo);
begin
fQueue.Enqueue(aFileInfo);
end;
procedure TUploader.ConnectFTP;
begin
...
FTP.Connect;
end;
constructor TUploader.Create;
begin
inherited Create(false);
FreeOnTerminate := false;
fQueue := TThreadedQueue<TFileInfo>.Create;
// Create the TIdFTP and set ports and other params
...
end;
destructor TUploader.Destroy;
begin
fQueue.Close;
fQueue.Free;
FTP.Free;
inherited;
end;
// Process the whole queue and inform the main thread of the progress
procedure TUploader.Execute;
var
Temp: TFileInfo;
begin
try
ConnectFTP;
except
on E: Exception do
SignalError(E.Message);
end;
// Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
while fQueue.Peek(fCurrentFile) = wrSignaled do
try
if UploadFile(fCurrentFile) = '' then
begin
fQueue.Dequeue(Temp); // Delete the item from the queue if succesful
SignalComplete;
end;
except
on E: Exception do
SignalError(E.Message);
end;
end;
// Return the current file info to the main thread. Used to update the progress indicators
function TUploader.GetProgress: TFileInfo;
begin
Result := fCurrentFile;
end;
// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
begin
fCurrentFile.UploadedSize := AWorkCount;
end;
procedure TUploader.SignalComplete;
begin
Synchronize(
procedure
begin
frmClientMain.OnCompleteFile(fCurrentFile);
end);
end;
procedure TUploader.SignalError(aError: String);
begin
try
FTP.Disconnect;
except
end;
if fQueue.Closed then
Exit;
Synchronize(
procedure
begin
frmClientMain.OnUploadError(aError);
end);
end;
// Clear the queue and terminate the thread
procedure TUploader.Terminate;
begin
fQueue.Close;
inherited;
end;
function TUploader.UploadFile(aFileInfo: TFileInfo): String;
begin
Result := 'Error';
try
if not FTP.Connected then
ConnectFTP;
FUploading := true;
FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));
Result := '';
finally
FUploading := false;
end;
end;
И части основного потока, которые взаимодействуют с загрузчиком:
......
// Main form
fUniqueID: Integer; // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted)
fUploader: TUploader; // The uploader thread
fFiles: TList<TFileInfo>;
fCurrentFileName: String; // Used to display the progress
function IndexOfFile(aID: Integer): Integer; //Return the index of the record inside the fFiles given the file ID
public
procedure OnCompleteFile(aFileInfo: TFileInfo);
procedure OnUploadError(aError: String);
end;
// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnUploadError(aError: String);
begin
// show and log the error
end;
// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo);
var
I: Integer;
begin
I := IndexOfFile(aFileInfo.ID);
if (I >= 0) and (I < fFiles.Count) then
begin
aFileInfo.Status := fsUploaded;
aFileInfo.UploadedSize := aFileInfo.Size;
FFiles.Items[I] := aFileInfo;
Inc(FFilesUploaded);
TFile.Delete(aFileInfo.Path);
colProgressImg.UpdateCell(I);
end;
end;
procedure TfrmClientMain.ProcessFolder;
var
NewFiles: TStringDynArray;
I, J: Integer;
FileInfo: TFileInfo;
begin
// Remove completed files from the list if it contains more than XX files
while FFiles.Count > 1000 do
if FFiles[0].Status = fsUploaded then
begin
Dec(FFilesUploaded);
FFiles.Delete(0);
end else
Break;
NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories);
for I := 0 to Length(NewFiles) - 1 do
begin
FileInfo.ID := FUniqueID;
Inc(FUniqueID);
FileInfo.Path := NewFiles[I];
FileInfo.Size := GetFileSizeByName(NewFiles[I]);
FileInfo.UploadedSize := 0;
FileInfo.Status := fsToBeQueued;
FFiles.Add(FileInfo);
if (I mod 100) = 0 then
begin
UpdateStatusLabel;
grFiles.RowCount := FFiles.Count;
Application.ProcessMessages;
if fUploader = nil then
break;
end;
end;
// Send the new files and resend failed to the uploader thread
for I := 0 to FFiles.Count - 1 do
if (FFiles[I].Status = fsToBeQueued) then
begin
if fUploader = nil then
Break;
FileInfo := FFiles[I];
FileInfo.Status := fsQueued;
FFiles[I] := FileInfo;
SaveDebug(1, 'Add: ' + ExtractFileName(FFiles[I].Path));
FUploader.AddFile(FFiles[I]);
end;
end;
procedure TfrmClientMain.tmrGUITimer(Sender: TObject);
var
FileInfo: TFileInfo;
I: Integer;
begin
if (fUploader = nil) or not fUploader.Uploading then
Exit;
FileInfo := fUploader.GetProgress;
I := IndexOfFile(FileInfo.ID);
if (I >= 0) and (I < fFiles.Count) then
begin
fFiles.Items[I] := FileInfo;
fCurrentFileName := ExtractFileName(FileInfo.Path);
colProgressImg.UpdateCell(I);
end;
end;
function TfrmClientMain.IndexOfFile(aID: Integer): Integer;
var
I: Integer;
begin
Result := -1;
for I := 0 to FFiles.Count - 1 do
if FFiles[I].ID = aID then
Exit(I);
end;
Ответы
Ответ 1
Возможно, это не проблема, но TFileInfo - это запись.
Это означает, что при передаче в качестве параметра (non const/var) он копируется. Это может привести к проблемам с такими вещами, как строки в записи, которые не получают количество ссылок, обновляемое при копировании записи.
Можно попытаться сделать это классом и передать экземпляр в качестве параметра (т.е. указателя на данные в куче).
Что-то еще, на что можно обратить внимание, - это совлокальный Int64 (например, ваши значения размера) в поточных 32-битных системах.
Обновление/чтение не выполняется атомарно, и у вас нет каких-либо конкретных защит, поэтому можно прочитать значение, чтобы получить несоответствие верхних и нижних 32-бит из-за потоковой передачи. (например, прочитать верхние 32 бита, записать верхние 32 биты, записать нижние 32 бита, прочитать нижние 32биты, читать и записывать в разных потоках). Вероятно, это не вызывает проблем, которые вы видите, и если вы не работаете с передачами файлов размером > 4 ГБ, вряд ли когда-нибудь возникнут какие-либо проблемы.
Ответ 2
Тупики определенно трудно обнаружить, но это может быть проблемой.
В вашем коде я не видел, чтобы вы добавили тайм-аут в очередь, заглядывать или деактивировать - что означает, что он примет значение по умолчанию для Infinite.
В этой очереди есть эта строка - это означает, что любой объект синхронизации будет блокироваться до тех пор, пока не будет завершен ввод (он блокирует монитор) или время ожидания (поскольку у вас нет тайм-аута, он будет ждать вечно )
TSimpleThreadedQueue.Enqueue(const Item: T; Timeout: LongWord): TWaitResult;
...
if not TMonitor.Enter(FQueue, Timeout)
Я также сделаю предположение, что вы внедрили PEEK самостоятельно на основе Dequeue - только вы фактически не удаляете элемент.
Кажется, что он реализует свой собственный тайм-аут - однако у вас все еще есть следующее:
function TSimpleThreadedQueue.Peek/Dequeue(var Item: T; Timeout: LongWord): TWaitResult;
...
if not TMonitor.Enter(FQueue, Timeout)
Если тайм-аут Infinite - поэтому, если вы находитесь в режиме peek, ожидая, что он будет сигнализирован с бесконечным тайм-аутом, вы не сможете переписать что-то из второго потока без блокировки этого потока, ожидающего, что метод peek станет завершена с бесконечным таймаутом.
Вот фрагмент комментария от TMonitor
Enter locks the monitor object with an optional timeout (in ms) value.
Enter without a timeout will wait until the lock is obtained.
If the procedure returns it can be assumed that the lock was acquired.
Enter with a timeout will return a boolean status indicating whether or
not the lock was obtained (True) or the attempt timed out prior to
acquire the lock (False). Calling Enter with an INFINITE timeout
is the same as calling Enter without a timeout.
Поскольку реализация по умолчанию использует Infinite, а значение TMonitor.Spinlock не предусмотрено, это блокирует поток, пока он не сможет получить объект FQueue.
Мое предложение было бы изменить ваш код следующим образом:
// Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
while true do
case fQueue.Peek(fCurrentFile,10)
wrSignaled:
try
if UploadFile(fCurrentFile) = '' then
begin
fQueue.Dequeue(Temp); // Delete the item from the queue if succesful
SignalComplete;
end;
except
on E: Exception do
SignalError(E.Message);
end;
wrTimeout: sleep(10);
wrIOCompletion,
wrAbandoned,
wrError: break;
end; //case
Таким образом, peek не будет удерживать блокировку FQueue неограниченно, оставляя окно для Enqueue для его получения и добавления файла из основного потока (UI).
Ответ 3
Это может быть длинный выстрел, но здесь есть еще одна возможность [первый ответ может быть более вероятным] (что-то, с чем я только что столкнулся, но знал раньше): использование Synchronize может вызвать тупик. Вот блог о том, почему это происходит:
Delphi-Workaround-for-TThread-SynchronizeWaitFor-.aspx
Соответствующая точка статьи:
Thread A вызывает синхронизацию (MethodA)
Запросы Thread B Синхронизировать (MethodB)
Затем в контексте основного потока:
Вызов основного потока CheckSynchronize() при обработке сообщений
CheckSynchronize реализуется для пакетной обработки всех ожидающих вызовов (*). Поэтому он поднимает очередь ожидающих вызовов (содержащих MethodA и MethodB) и циклов через них один за другим.
MethodA выполняется в основном потоке контекст. Предположим, что методA вызывает ThreadB.WaitFor
Ожидание на вызовы CheckSynchronize для обработки любых ожидающих вызовов для синхронизации
В теории это должно затем обрабатывать ThreadB Synchronize (MethodB), позволяя Thread B завершить. Однако MethodB уже обладание первым вызовом CheckSynchronize, поэтому он никогда не получает называется.
ТУПИК!
статья QC Embarcadero, описывающая проблему более подробно.
Пока я не вижу вызовы ProcessMessages в вышеуказанном коде или, если на то пошло, WaitFor, которые будут вызываться во время Synchronize, все равно может быть проблема, которая в момент вызова synchronize вызывает другой поток синхронизация также - но основной поток уже синхронизирован и блокируется.
Сначала это не касалось меня, потому что я стараюсь избегать синхронных вызовов, таких как чума, и обычно разрабатывать обновления пользовательского интерфейса из потоков, используя другие методы, такие как передача сообщений и потокобезопасные списки с уведомлением о сообщении вместо синхронизации вызовов.