Parallel.Foreach нереста слишком много потоков
Проблема
Хотя код, о котором я буду говорить здесь, я написал в F #, он основан на платформе .NET 4, но не в зависимости от какой-либо особенности F # (по крайней мере, кажется!).
У меня есть некоторые данные на моем диске, которые я должен обновить из сети, сохраняя последнюю версию на диске:
type MyData =
{ field1 : int;
field2 : float }
type MyDataGroup =
{ Data : MyData[];
Id : int }
// load : int -> MyDataGroup
let load dataId =
let data = ... // reads from disk
{ Data = data;
Id = dataId }
// update : MyDataGroup -> MyDataGroup
let update dg =
let newData = ... // reads from the network and process
// newData : MyData[]
{ dg with Data = dg.Data
|> Seq.ofArray
|> Seq.append newData
|> processDataSomehow
|> Seq.toArray }
// save : MyDataGroup -> unit
let save dg = ... // writes to the disk
let loadAndSaveAndUpdate = load >> update >> save
Проблема в том, что для loadAndSaveAndUpdate
всех моих данных мне пришлось бы выполнять эту функцию много раз:
{1 .. 5000} |> loadAndSaveAndUpdate
Каждый шаг будет делать
- некоторый диск IO,
- некоторые данные хрустят,
- некоторая сеть IO (с возможностью большого количества задержек),
- больше хрустов данных,
- и некоторый диск IO.
Было бы неплохо, если бы это было сделано параллельно, в некоторой степени? К сожалению, ни одна из моих функций чтения и разбора не является "асинхронным рабочим процессом".
Первые (не очень хорошие) решения я придумал
Задачи
Первое, что я сделал, это настроить Task[]
и запустить их все:
let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
|> Seq.map createTask
|> Seq.toArray
tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)
Затем я нажал CTRL + ESC, чтобы увидеть, сколько потоков оно использовало. 15, 17,..., 35,..., 170,... до тех пор, пока не будет убито приложение! Что-то не так.
Parallel
Я сделал почти то же самое, но используя Parallel.ForEach(...)
, и результаты были одинаковыми: много и много и много потоков.
Решение, которое работает... вид
Затем я решил начать только теги n
, Task.WaitAll(of them)
, затем другие n
, пока не было больше доступных задач.
Это работает, но проблема в том, что, когда он завершил обработку, скажем, n-1
задач, он будет ждать, ждать, ждать последней прошивки, которая настаивает на блокировке из-за большого количества латентности сети. Это нехорошо!
Итак, как вы могли бы атаковать эту проблему? Я был бы признателен за рассмотрение различных решений, включая Async Workflows (и в этом случае как адаптировать мои неасинхронные функции), Parallel Extensions, странные параллельные шаблоны и т.д.
Спасибо.
Ответы
Ответ 1
Вы уверены, что ваши индивидуальные задачи выполняются своевременно? Я считаю, что оба класса Parallel.ForEach
и Task
уже используют .NET threadpool. Задачи обычно должны быть недолговечными рабочими элементами, и в этом случае threadpool будет порождать только небольшое количество реальных потоков, но если ваши задачи не достигнут прогресса, а есть другие задачи в очереди, то количество используемых потоков будет постоянно увеличиваться до максимум (который по умолчанию равен 250/processor в .NET 2.0 SP1, но отличается от разных версий фреймворка). Также стоит отметить, что (по крайней мере, в .NET 2.0 SP1) создание новых потоков дросселируется до 2 новых потоков в секунду, поэтому получение количества потоков, которые вы видите, указывает на то, что задачи не заканчиваются в коротком количестве (поэтому не может быть полностью точным, чтобы обвинить Parallel.ForEach
).
Я думаю, что предложение Брайана использовать рабочие процессы async
является хорошим, особенно если источником долгоживущих задач является IO, так как async
вернет ваши потоки в threadpool до тех пор, пока IO не завершится. Другой вариант заключается в том, чтобы просто принять, что ваши задачи не завершаются быстро и позволяют нереститься из многих потоков (которые можно в некоторой степени контролировать с помощью System.Threading.ThreadPool.SetMaxThreads
) - в зависимости от вашей ситуации не может быть большой проблемой, re используя много потоков.
Ответ 2
Использование "async" позволит вам выполнять работу с привязкой к вводу/выводу без записи потоков, в то время как различные вызовы ввода-вывода "на море", поэтому это будет мое первое предложение. Для преобразования кода в async должно быть просто, обычно по линиям
- оберните каждое тело функции в
async{...}
, добавьте return
где необходимо
- создавать асинхронные версии любых примитивов ввода-вывода, которые еще не находятся в библиотеке через
Async.FromBeginEnd
- Переключение вызовов формы
let r = Foo()
на let! r = AsyncFoo()
- Используйте
Async.Parallel
для преобразования 5000 асинхронных объектов в единую Async, которая работает параллельно
Для этого существуют различные учебники; одна такая веб-трансляция здесь.
Ответ 3
ParallelOptions.MaxDegreeOfParallelism ограничивает количество одновременных операций, выполняемых вызовами метода Parallel
Ответ 4
Вы всегда можете использовать ThreadPool
.
http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx
в основном
- Создать пул потоков
- Установить максимальное количество потоков
- Очередь всех задач с помощью
QueueUserWorkItem(WaitCallback)