Parallel.Foreach нереста слишком много потоков

Проблема

Хотя код, о котором я буду говорить здесь, я написал в F #, он основан на платформе .NET 4, но не в зависимости от какой-либо особенности F # (по крайней мере, кажется!).

У меня есть некоторые данные на моем диске, которые я должен обновить из сети, сохраняя последнюю версию на диске:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

Проблема в том, что для loadAndSaveAndUpdate всех моих данных мне пришлось бы выполнять эту функцию много раз:

{1 .. 5000} |> loadAndSaveAndUpdate

Каждый шаг будет делать

  • некоторый диск IO,
  • некоторые данные хрустят,
  • некоторая сеть IO (с возможностью большого количества задержек),
  • больше хрустов данных,
  • и некоторый диск IO.

Было бы неплохо, если бы это было сделано параллельно, в некоторой степени? К сожалению, ни одна из моих функций чтения и разбора не является "асинхронным рабочим процессом".

Первые (не очень хорошие) решения я придумал

Задачи

Первое, что я сделал, это настроить Task[] и запустить их все:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

Затем я нажал CTRL + ESC, чтобы увидеть, сколько потоков оно использовало. 15, 17,..., 35,..., 170,... до тех пор, пока не будет убито приложение! Что-то не так.

Parallel

Я сделал почти то же самое, но используя Parallel.ForEach(...), и результаты были одинаковыми: много и много и много потоков.

Решение, которое работает... вид

Затем я решил начать только теги n, Task.WaitAll(of them), затем другие n, пока не было больше доступных задач.

Это работает, но проблема в том, что, когда он завершил обработку, скажем, n-1 задач, он будет ждать, ждать, ждать последней прошивки, которая настаивает на блокировке из-за большого количества латентности сети. Это нехорошо!

Итак, как вы могли бы атаковать эту проблему? Я был бы признателен за рассмотрение различных решений, включая Async Workflows (и в этом случае как адаптировать мои неасинхронные функции), Parallel Extensions, странные параллельные шаблоны и т.д.

Спасибо.

Ответы

Ответ 1

Вы уверены, что ваши индивидуальные задачи выполняются своевременно? Я считаю, что оба класса Parallel.ForEach и Task уже используют .NET threadpool. Задачи обычно должны быть недолговечными рабочими элементами, и в этом случае threadpool будет порождать только небольшое количество реальных потоков, но если ваши задачи не достигнут прогресса, а есть другие задачи в очереди, то количество используемых потоков будет постоянно увеличиваться до максимум (который по умолчанию равен 250/processor в .NET 2.0 SP1, но отличается от разных версий фреймворка). Также стоит отметить, что (по крайней мере, в .NET 2.0 SP1) создание новых потоков дросселируется до 2 новых потоков в секунду, поэтому получение количества потоков, которые вы видите, указывает на то, что задачи не заканчиваются в коротком количестве (поэтому не может быть полностью точным, чтобы обвинить Parallel.ForEach).

Я думаю, что предложение Брайана использовать рабочие процессы async является хорошим, особенно если источником долгоживущих задач является IO, так как async вернет ваши потоки в threadpool до тех пор, пока IO не завершится. Другой вариант заключается в том, чтобы просто принять, что ваши задачи не завершаются быстро и позволяют нереститься из многих потоков (которые можно в некоторой степени контролировать с помощью System.Threading.ThreadPool.SetMaxThreads) - в зависимости от вашей ситуации не может быть большой проблемой, re используя много потоков.

Ответ 2

Использование "async" позволит вам выполнять работу с привязкой к вводу/выводу без записи потоков, в то время как различные вызовы ввода-вывода "на море", поэтому это будет мое первое предложение. Для преобразования кода в async должно быть просто, обычно по линиям

  • оберните каждое тело функции в async{...}, добавьте return где необходимо
  • создавать асинхронные версии любых примитивов ввода-вывода, которые еще не находятся в библиотеке через Async.FromBeginEnd
  • Переключение вызовов формы let r = Foo() на let! r = AsyncFoo()
  • Используйте Async.Parallel для преобразования 5000 асинхронных объектов в единую Async, которая работает параллельно

Для этого существуют различные учебники; одна такая веб-трансляция здесь.

Ответ 3

ParallelOptions.MaxDegreeOfParallelism ограничивает количество одновременных операций, выполняемых вызовами метода Parallel

Ответ 4

Вы всегда можете использовать ThreadPool.

http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

в основном

  • Создать пул потоков
  • Установить максимальное количество потоков
  • Очередь всех задач с помощью QueueUserWorkItem(WaitCallback)