Многопоточная обработка файлов с помощью .NET

Существует папка, содержащая 1000 маленьких текстовых файлов. Я пытаюсь разобрать и обработать все из них, пока в папку будет занесено больше файлов. Мое намерение состоит в многопоточности этой операции, поскольку один прототип с прототипом занимает шесть минут для обработки 1000 файлов.

Мне нравится иметь потоки чтения и записи как следующие. Пока читающий поток читает файлы, я хотел бы иметь потоки писем для их обработки. Когда читатель начнет чтение файла, я хотел бы отметить его как обработанный, например, переименовав его. После того, как он будет прочитан, переименуйте его в завершенный.

Как мне подойти к многопоточному приложению?

Лучше ли использовать распределенную хеш-таблицу или очередь?

Какую структуру данных я использую, чтобы избежать блокировок?

Есть ли лучший подход к этой схеме?

Ответы

Ответ 1

Так как любопытство в том, как .NET 4 работает с этим в комментариях, вот этот подход. Извините, это скорее не вариант для OP. Отказ от ответственности: это не очень научный анализ, который просто показывает, что есть явное преимущество в производительности. На основе аппаратного обеспечения ваш пробег может сильно различаться.

Здесь быстрый тест (если вы видите большую ошибку в этом простом тесте, это просто пример. Прокомментируйте, и мы можем исправить его, чтобы быть более полезным/точным). Для этого я просто уронил 12 000 ~ 60 файлов KB в каталог в качестве образца (запустите LINQPad, вы можете играть с ним бесплатно, обязательно получите LINQPad 4, хотя):

var files = 
Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories).ToList();

var sw = Stopwatch.StartNew(); //start timer
files.ForEach(f => File.ReadAllBytes(f).GetHashCode()); //do work - serial
sw.Stop(); //stop
sw.ElapsedMilliseconds.Dump("Run MS - Serial"); //display the duration

sw.Restart();
files.AsParallel().ForAll(f => File.ReadAllBytes(f).GetHashCode()); //parallel
sw.Stop();
sw.ElapsedMilliseconds.Dump("Run MS - Parallel");

Слегка изменив ваш цикл, чтобы распараллелить запрос, все, что необходимо в большинство простых ситуаций. Под "простым" я в основном подразумеваю, что результат одного действия не влияет на следующий. Что-то, что нужно помнить чаще всего, состоит в том, что некоторые коллекции, например наш удобный List<T>, небезопасный поток, поэтому использовать его в параллельном сценарии не очень хорошо:) К счастью, были параллельные коллекции, добавленные в .NET 4, которые являются потокобезопасными. Также имейте в виду, если вы используете блокировку, это может быть узким местом, в зависимости от ситуации.

Здесь используется .AsParallel<T>(IEnumeable<T>) и .ForAll<T>(ParallelQuery<T>), доступные в .NET 4.0. .AsParallel() вызывает обертку IEnumerable<T> в ParallelEnumerableWrapper<T> (внутренний класс), который реализует ParallelQuery<T>. Теперь вы можете использовать методы параллельного расширения, в этом случае мы используем .ForAll().

.ForAll() внутренне разбивает a ForAllOperator<T>(query, action) и запускает его синхронно. Это обрабатывает потоки и слияние потоков после его запуска... Там довольно много происходит, я предлагаю начиная здесь, если вы хотите узнать больше, включая дополнительные параметры.


Результаты (компьютер 1 - физический жесткий диск):

  • Последовательный: 1288 - 1333 мс
  • Параллель: 461 - 503 мс

Спецификации компьютера - для сравнения:

Результаты (компьютер 2 - твердотельный накопитель):

  • Последовательный: 545 - 601 ms
  • Параллель: 248 - 278 ms

Технические характеристики компьютера - для сравнения:

  • Quad Core 2 Quad Q9100 @2.26 GHz
  • 8 GB RAM (DDR 1333)
  • 120 > GB OCZ Vertex SSD (стандартная версия - 1.4 Firmware)

На этот раз у меня нет ссылок на CPU/RAM, они были установлены. Это ноутбук Dell M6400 (здесь ссылка на M6500... Dell собственные ссылки на 6400 broken).


Эти цифры относятся к 10 прогонам, принимая минимальные/максимальные внутренние результаты (удаляя исходный минимум/максимум для каждого из них как возможные выбросы). Мы сталкиваемся с узким местом ввода-вывода здесь, особенно на физическом диске, но думаем о том, что делает серийный метод. Он читает, обрабатывает, читает, обрабатывает, повторяет полоскание. При параллельном подходе вы (даже с узким местом ввода-вывода) просматриваете и обрабатываете одновременно. В худшей ситуации с узким местом вы обрабатываете один файл при чтении следующего. Это одно (на любом текущем компьютере!) Должно привести к некоторому повышению производительности. Вы можете видеть, что мы можем получить немного больше, чем один за один раз в результатах выше, что дает нам здоровый импульс.

Другое выражение об отказе: Quad core +.NET 4 parallel не даст вам в четыре раза больше производительности, оно не масштабируется линейно... Есть и другие соображения и узкие места в игре.

Я надеюсь, что это было интересно проявить подход и возможные выгоды. Не стесняйтесь критиковать или улучшать... Этот ответ существует исключительно для тех, кому интересно, как указано в комментариях:)

Ответ 2

Дизайн

Вероятно, модель "Продюсер/потребитель" будет наиболее полезной для этой ситуации. Вы должны создать достаточно потоков, чтобы увеличить пропускную способность.

Вот некоторые вопросы о шаблоне Продюсер/Потребитель, чтобы дать вам представление о том, как это работает:

Вы должны использовать блокирующую очередь, и продюсер должен добавлять файлы в очередь, пока потребители обрабатывают файлы из очереди. Блокирующая очередь не требует блокировки, поэтому это самый эффективный способ решить вашу проблему.

Если вы используете .NET 4.0, существует несколько параллельных коллекций, которые вы можете использовать из коробки:

Threading

Единственный поток производителей, вероятно, будет самым эффективным способом загрузки файлов с диска и нажатия их в очередь; впоследствии несколько потребителей будут вынимать элементы из очереди, и они будут обрабатывать их. Я бы предложил вам попробовать 2-4 потребительских потока на ядро ​​и выполнить некоторые измерения производительности, чтобы определить, какой из них наиболее оптимален (т.е. Количество потоков, обеспечивающих максимальную пропускную способность). Я бы не рекомендую использовать ThreadPool для этого конкретного примера.

P.S. Я не понимаю, что беспокоит одна точка отказа и использование распределенных хеш-таблиц? Я знаю, что DHT звучит как классная вещь для использования, но сначала я попробую использовать обычные методы, если у вас нет конкретной проблемы, которую вы пытаетесь решить.

Ответ 3

Я рекомендую вам ставить в очередь поток для каждого файла и отслеживать текущие потоки в словаре, запуская новый поток, когда поток завершается, до максимального предела. Я предпочитаю создавать свои собственные потоки, когда они могут быть длительными, и использовать обратные вызовы, чтобы сигнализировать, когда они сделаны или столкнулись с исключением. В приведенном ниже примере я использую словарь для отслеживания текущих рабочих экземпляров. Таким образом, я могу позвонить в экземпляр, если я хочу прекратить работу раньше. Обратные вызовы также могут использоваться для обновления пользовательского интерфейса с прогрессом и пропускной способностью. Вы можете также динамически дросселировать текущую границу потока для добавленных точек.

Пример кода - это сокращенный демонстратор, но он выполняется.

class Program
{
    static void Main(string[] args)
    {
        Supervisor super = new Supervisor();
        super.LaunchWaitingThreads();

        while (!super.Done) { Thread.Sleep(200); }
        Console.WriteLine("\nDone");
        Console.ReadKey();
    }
}

public delegate void StartCallbackDelegate(int idArg, Worker workerArg);
public delegate void DoneCallbackDelegate(int idArg);

public class Supervisor
{
    Queue<Thread> waitingThreads = new Queue<Thread>();
    Dictionary<int, Worker> runningThreads = new Dictionary<int, Worker>();
    int maxThreads = 20;
    object locker = new object();

    public bool Done { 
        get { 
            lock (locker) {
                return ((waitingThreads.Count == 0) && (runningThreads.Count == 0)); 
            } 
        } 
    }

    public Supervisor()
    {
        // queue up a thread for each file
        Directory.GetFiles("C:\\folder").ToList().ForEach(n => waitingThreads.Enqueue(CreateThread(n)));
    }

    Thread CreateThread(string fileNameArg)
    {
        Thread thread = new Thread(new Worker(fileNameArg, WorkerStart, WorkerDone).ProcessFile);
        thread.IsBackground = true;
        return thread;
    }

    // called when a worker starts
    public void WorkerStart(int threadIdArg, Worker workerArg)
    {
        lock (locker)
        {
            // update with worker instance
            runningThreads[threadIdArg] = workerArg;
        }
    }

    // called when a worker finishes
    public void WorkerDone(int threadIdArg)
    {
        lock (locker)
        {
            runningThreads.Remove(threadIdArg);
        }
        Console.WriteLine(string.Format("  Thread {0} done", threadIdArg.ToString()));
        LaunchWaitingThreads();
    }

    // launches workers until max is reached
    public void LaunchWaitingThreads()
    {
        lock (locker)
        {
            while ((runningThreads.Count < maxThreads) && (waitingThreads.Count > 0))
            {
                Thread thread = waitingThreads.Dequeue();
                runningThreads.Add(thread.ManagedThreadId, null); // place holder so count is accurate
                thread.Start();
            }
        }
    }
}

public class Worker
{
    string fileName;
    StartCallbackDelegate startCallback;
    DoneCallbackDelegate doneCallback;
    public Worker(string fileNameArg, StartCallbackDelegate startCallbackArg, DoneCallbackDelegate doneCallbackArg)
    {
        fileName = fileNameArg;
        startCallback = startCallbackArg;
        doneCallback = doneCallbackArg;
    }

    public void ProcessFile()
    {
        startCallback(Thread.CurrentThread.ManagedThreadId, this);
        Console.WriteLine(string.Format("Reading file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId.ToString()));
        File.ReadAllBytes(fileName);
        doneCallback(Thread.CurrentThread.ManagedThreadId);
    }
}

Ответ 4

У вас может быть центральная очередь, потоки чтения потребуют доступа на запись во время нажатия содержимого в памяти в очередь. Для потоков обработки потребуется доступ на чтение к этой центральной очереди, чтобы вывести следующий поток данных, подлежащий обработке. Таким образом, вы минимизируете время, затрачиваемое на блокировки, и не должны иметь дело со сложностями кода, свободного от блокировки.

EDIT: в идеале вы будете обрабатывать все исключения/условия ошибки (если они есть) изящно, поэтому у вас нет точек сбоя.

В качестве альтернативы вы можете иметь несколько потоков, каждый из которых "утверждает" файл, переименовывая его перед обработкой, таким образом, файловая система становится реализацией для заблокированного доступа. Нет подсказки, если это будет более результативным, чем мой первоначальный ответ, сообщит только тестирование.

Ответ 5

Вообще говоря, 1000 маленьких файлов (как маленькие, кстати?) не должны обрабатывать шесть минут. В качестве быстрого теста сделайте find "foobar" * в каталоге, содержащем файлы (первый аргумент в кавычках не имеет значения, он может быть любым) и посмотреть, сколько времени требуется для обработки каждого файла. Если это займет больше одной секунды, я буду разочарован.

Предполагая, что этот тест подтверждает мое подозрение, процесс связан с ЦП, и вы не сможете улучшить разделение чтения на собственный поток. Вы должны:

  • Выясните, почему в среднем требуется более 350 мс для обработки небольшого ввода и, надеюсь, улучшить алгоритм.
  • Если нет возможности ускорить работу алгоритма, и у вас есть многоядерная машина (почти каждый делает это в наши дни), используйте пул потоков, чтобы назначить 1000 задач, каждая из которых выполняет чтение одного файла.

Ответ 6

Вы можете рассматривать очередь файлов для обработки. Заполняйте очередь один раз, сканируя каталог при запуске и обновляя очередь FileSystemWatcher, чтобы эффективно добавлять новые файлы в очередь без постоянное повторное сканирование каталога.

Если это вообще возможно, прочитайте и напишите на разные физические диски. Это даст вам максимальную производительность ввода-вывода.

Если у вас есть начальный пакет многих файлов для обработки, а затем неравномерный темп добавления новых файлов, и все это происходит на одном диске (чтение/запись), вы можете рассмотреть возможность буферизации обработанных файлов в памяти до тех пор, пока один из применяются два условия:

  • Есть (временно) нет новых файлов
  • Буферизировано столько файлов, что вы не хотите использовать больше памяти для буферизация (в идеале конфигурируемая пороговое значение)

Если ваша фактическая обработка файлов имеет интенсивность процессора, вы можете подумать о наличии одного потока обработки для ядра процессора. Однако для "нормальной" обработки процессорное время будет тривиальным по сравнению с временем ввода-вывода, и сложность не будет стоить каких-либо незначительных выигрышей.