Ответ 1
Вы не используете какие-либо API-интерфейсы асинхронного ввода-вывода в любом из ваших кодов. Все, что вы делаете, связано с процессором, и все ваши операции ввода-вывода будут уничтожать блокирование ресурсов процессора. AsParallel
предназначен для задач, связанных с вычислением, если вы хотите воспользоваться преимуществами асинхронного ввода-вывода, вам необходимо использовать API-интерфейсы асинхронной программируемой модели (APM) сегодня в <= v4.0. Это делается путем поиска методов BeginXXX/EndXXX
на используемых вами классах ввода-вывода и использования их, когда они доступны.
Прочитайте это сообщение для стартеров: TPL TaskFactory.FromAsync vs Задачи с методами блокировки
Затем вы не хотите использовать AsParallel
в этом случае. AsParallel
включает потоковое вещание, которое приведет к немедленному планированию новой задачи для каждого элемента, но здесь вам не нужно/не нужно. Вам будет гораздо лучше обслуживать разделение работы с помощью Parallel::ForEach
.
Посмотрите, как вы можете использовать это знание для достижения max concurrency в вашем конкретном случае:
var refs = GetReferencesFromDB();
// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
refs,
ref =>
{
string filePath = GetFilePath(ref);
byte[] fileDataBuffer = new byte[1048576];
// Need to use FileStream API directly so we can enable async I/O
FileStream sourceFileStream = new FileStream(
filePath,
FileMode.Open,
FileAccess.Read,
FileShare.Read,
8192,
true);
// Use FromAsync to read the data from the file
Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
sourceFileStream.BeginRead
sourceFileStream.EndRead
fileDataBuffer,
fileDataBuffer.Length,
null);
// Add a continuation that will fire when the async read is completed
readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
{
int soureFileStreamBytesRead;
try
{
// Determine exactly how many bytes were read
// NOTE: this will propagate any potential exception that may have occurred in EndRead
sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
}
finally
{
// Always clean up the source stream
sourceFileStream.Close();
sourceFileStream = null;
}
// This is here to make sure you don't end up trying to read files larger than this sample code can handle
if(sourceFileStreamBytesRead == fileDataBuffer.Length)
{
throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
}
// Convert the file data to a string
string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);
// Parse the HTML
string convertedHtml = ParseHtml(html);
// This is here to make sure you don't end up trying to write files larger than this sample code can handle
if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
{
throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
}
// Convert the file data back to bytes for writing
Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);
// Need to use FileStream API directly so we can enable async I/O
FileStream destinationFileStream = new FileStream(
destinationFilePath,
FileMode.OpenOrCreate,
FileAccess.Write,
FileShare.None,
8192,
true);
// Use FromAsync to read the data from the file
Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
destinationFileStream.BeginWrite,
destinationFileStream.EndWrite,
fileDataBuffer,
0,
fileDataBuffer.Length,
null);
// Add a continuation that will fire when the async write is completed
destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
{
try
{
// NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
destinationFileStreamWriteAntecedent.Wait();
}
finally
{
// Always close the destination file stream
destinationFileStream.Close();
destinationFileStream = null;
}
},
TaskContinuationOptions.AttachedToParent);
// Send to external system **concurrent** to writing to destination file system above
SendToWs(ref, convertedHtml);
},
TaskContinuationOptions.AttachedToParent);
});
Теперь, здесь несколько примечаний:
- Это пример кода, поэтому я использую буфер 1 МБ для чтения/записи файлов. Это чрезмерно для файлов HTML и расточительство системных ресурсов. Вы можете либо опустить его в соответствии с вашими максимальными потребностями, либо реализовать цепные чтения/записи в StringBuilder, который является упражнением, которое я оставляю вам, так как я буду писать более 500 строк кода, чтобы делать асинхронные чтения/записи.: P
- Вы заметите, что в продолжении для задач чтения/записи у меня есть
TaskContinuationOptions.AttachedToParent
. Это очень важно, так как это предотвратит рабочий поток, которыйParallel::ForEach
запускает работу с момента завершения, до тех пор, пока все базовые вызовы async не будут завершены. Если бы этого не было, вы могли бы начать работу по всем 5000 элементам одновременно, что загрязняло бы подсистему TPL тысячами запланированных задач и вообще не масштабировалось. - Я вызываю SendToWs одновременно с записью файла в общий ресурс файла. Я не знаю, что является основой реализации SendToWs, но это тоже звучит как хороший кандидат для создания async. Прямо сейчас он принял на себя чистую вычислительную работу и, как таковой, собирается записывать поток ЦП при выполнении. Я оставляю это как упражнение для вас, чтобы выяснить, как лучше всего использовать то, что я показал вам, чтобы улучшить пропускную способность там.
- Это все напечатанная свободная форма, и мой мозг был единственным компилятором здесь, а синтаксис синтаксиса SO - это все, что я использовал, чтобы убедиться, что синтаксис хорош. Поэтому, пожалуйста, простите любые ошибки синтаксиса и дайте мне знать, если я слишком сильно напортачил, что вы не можете сделать головы или хвосты, и я продолжу.