Доступ к файлу через несколько потоков

Я хочу получить доступ к большому файлу (размер файла может варьироваться от 30 МБ до 1 ГБ) через 10 потоков, а затем обрабатывать каждую строку в файле и записывать их в другой файл через 10 потоков. Если я использую только один поток для доступа к IO, другие потоки блокируются. Обработка занимает некоторое время, почти эквивалентное чтению строки кода из файловой системы. Существует еще одно ограничение, данные в выходном файле должны быть в том же порядке, что и входной файл.

Я хочу, чтобы вы думали о дизайне этой системы. Существует ли какой-либо существующий API для поддержки одновременного доступа к файлам?

Также запись в один файл может привести к тупиковой ситуации.

Просьба предложить, как достичь этого, если я заинтересован в ограничении времени.

Ответы

Ответ 1

  • Вы должны абстрактный из файла читать. Создайте класс, который читает файл и отправляет содержимое в различное количество потоков.

Класс не должен отправлять строки, он должен обернуть их в класс Line, который содержит метаинформацию, e. г. номер строки, так как вы хотите сохранить исходную последовательность.

  • Вам нужен класс обработки, который выполняет фактическую работу по собранным данным. В вашем случае нет работы. Класс просто хранит информацию, вы можете продлить ее когда-нибудь, чтобы сделать дополнительные вещи (например, отменить строку. Добавить некоторые другие строки,...)

  • Затем вам понадобится класс слияния, который делает некий вид multiway merge sort для потоков обработки и собирает все ссылки на Line в последовательности.

Класс слияния также может записать данные обратно в файл, но чтобы код был чистым...

  • Я бы рекомендовал создать класс вывода, который снова будет abstracts из всех файлов и файлов.

Конечно, для этого подхода вам понадобится много памяти, если у вас недостаточно основной памяти. Вам понадобится подход, основанный на потоке, который работает inplace, чтобы уменьшить накладные расходы памяти.


ОБНОВЛЕНИЕ Потоковый подход

Everthing остается неизменным, кроме:

Поток Reader загружает данные чтения в Balloon. Этот воздушный шар имеет определенное количество экземпляров Line, которые он может удерживать (чем больше число, тем больше потребляемой основной памяти).

Поверхности обработки берут Line с воздушного шара, читатель набрасывает больше линий в баллон, когда он становится более пустым.

Класс слияния берет строки из потоков обработки, как указано выше, и автор записывает данные обратно в файл.

Может быть, вы должны использовать FileChannel в потоках ввода-вывода, поскольку он больше подходит для чтения больших файлов и, вероятно, потребляет меньше памяти при обработке файла (но это всего лишь предполагаемое предположение).

Ответ 2

Я бы начал с трех потоков.

  • поток читателя, который считывает данные, разбивает его на "строки" и помещает их в ограниченную очередь блокировки (Q1),
  • поток обработки, который читает из Q1, обрабатывает и помещает их во вторую ограниченную блокирующую очередь (Q2) и
  • поток писем, который читает из Q2 и записывает на диск.

Конечно, я также гарантирую, что выходной файл находится на физически другом диске, чем входной файл.

Если обработка, как правило, быстрее медленнее, чем ввод/вывод (отслеживает размеры очереди), вы можете начать экспериментировать с двумя или более параллельными "процессорами", которые синхронизируются в том, как они читают и напишите их данные.

Ответ 3

Любой тип ввода-вывода, будь то диск, сеть и т.д., как правило, является узким местом.

Используя несколько потоков, вы усугубляете проблему, так как очень вероятно, что только один поток может иметь доступ к ресурсу ввода-вывода за один раз.

Лучше всего использовать один поток для чтения, передавать информацию в рабочий пул потоков, а затем писать прямо оттуда. Но опять же, если работники пишут на том же месте, будут узкие места, так как только один может иметь замок. Легко фиксируется путем передачи данных в один поток сообщений.

В "коротком":

Один поток читателя записывает в BlockingQueue или тому подобное, это дает ему естественную упорядоченную последовательность.

Затем потоки рабочего пула ожидают в очереди данных, записывая порядковый номер.

Затем рабочие потоки записывают обработанные данные в другой BlockingQueue с его исходным порядковым номером, чтобы

Нить записи может принимать данные и записывать их последовательно.

Это, скорее всего, даст самую быструю реализацию.

Ответ 4

Вы можете сделать это с помощью FileChannel в java, который позволяет нескольким потокам обращаться к одному файлу. FileChannel позволяет читать и писать, начиная с позиции. См. Пример кода ниже:

import java.io.*;
import java.nio.*;
import java.nio.channels.*;

public class OpenFile implements Runnable
{
    private FileChannel _channel;
    private FileChannel _writeChannel;
    private int _startLocation;
    private int _size;

    public OpenFile(int loc, int sz, FileChannel chnl, FileChannel write)
    {
        _startLocation = loc;
        _size = sz;
        _channel = chnl;
        _writeChannel = write;
    }

    public void run()
    {
        try
        {
            System.out.println("Reading the channel: " + _startLocation + ":" + _size);
            ByteBuffer buff = ByteBuffer.allocate(_size);
            if (_startLocation == 0)
                Thread.sleep(100);
            _channel.read(buff, _startLocation);
            ByteBuffer wbuff = ByteBuffer.wrap(buff.array());
            int written = _writeChannel.write(wbuff, _startLocation);
            System.out.println("Read the channel: " + buff + ":" + new String(buff.array()) + ":Written:" + written);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
        throws Exception
    {
        FileOutputStream ostr = new FileOutputStream("OutBigFile.dat");
        FileInputStream str = new FileInputStream("BigFile.dat");
        String b = "Is this written";
        //ostr.write(b.getBytes());
        FileChannel chnl = str.getChannel();
        FileChannel write = ostr.getChannel();
        ByteBuffer buff = ByteBuffer.wrap(b.getBytes());
        write.write(buff);
        Thread t1 = new Thread(new OpenFile(0, 10000, chnl, write));
        Thread t2 = new Thread(new OpenFile(10000, 10000, chnl, write));
        Thread t3 = new Thread(new OpenFile(20000, 10000, chnl, write));
        t1.start();
        t2.start();
        t3.start();
        t1.join();
        t2.join();
        t3.join();
        write.force(false);
        str.close();
        ostr.close();
    }
}

В этом примере есть три потока, которые читают один и тот же файл и записываются в один и тот же файл и не конфликтуют. Эта логика в этом примере не учитывала, что назначенные размеры не должны заканчиваться на конце линии и т.д. Вы найдете правильную логику на основе ваших данных.

Ответ 5

Я столкнулся с подобной ситуацией раньше и тем, как я справился с этим:

Прочитайте файл в основном потоке по строкам и отправьте обработку строки исполнителю. Разумная отправная точка в ExecutorService здесь. Если вы планируете использовать фиксированное отсутствие потоков, вас может заинтересовать Executors.newFixedThreadPool(10) factory в классе Executors. Невозможно также использовать javadocs в этом разделе.

В принципе, я бы отправил все задания, вызвонил, а затем в основном потоке продолжал записывать в выходной файл в порядке для всех возвращаемых Future. Вы можете использовать метод Future class 'get(), блокирующий природу, чтобы обеспечить порядок, но вы действительно не должны использовать многопоточность для записи, так же как вы не будете использовать ее для чтения. Имеет смысл?

Однако, 1 GB файлы данных? Если бы я был вами, я был бы сначала заинтересован в значительном разрушении этих файлов.

PS. Я сознательно избегал кода в ответе, так как я бы хотел, чтобы OP сам это пробовал. Предоставлены достаточные указатели на определенные классы, методы API и пример.

Ответ 6

Помните, что идеальное количество потоков ограничено аппаратной архитектурой и другими материалами (вы могли бы подумать о том, чтобы обратиться к пулу потоков, чтобы рассчитать наилучшее количество потоков). Предполагая, что "10" - хорошее число, мы продолжаем. =)

Если вы ищете производительность, вы можете сделать следующее:

  • Прочитайте файл, используя потоки, которые у вас есть, и обработайте их согласно вашему бизнес-правилу. Сохраните одну управляющую переменную, которая указывает следующую ожидаемую строку, которая будет вставлена ​​в выходной файл.

  • Если следующая ожидаемая строка будет обработана, добавьте ее в буфер (очередь) (было бы идеально, если бы вы могли найти способ вставки прямого в выходной файл, но у вас были бы проблемы с блокировкой), В противном случае сохраните эту "будущую" строку внутри двоично-поискового дерева, упорядочив позицию по строке. Binary-search-tree дает вам временную сложность "O (log n)" для поиска и вставки, что очень быстро для вашего контекста. Продолжайте заполнять дерево до тех пор, пока следующая "ожидаемая" строка не будет обработана.

Активирует поток, который будет отвечать за открытие выходного файла, периодически израсходует буфер и записывает строки в файл.

Кроме того, следует отслеживать ожидаемый "младший" node BST для вставки в файл. Вы можете использовать его, чтобы проверить, находится ли строка будущего внутри BST перед началом поиска на нем.

  • Когда следующая ожидаемая строка завершается обработкой, вставьте ее в очередь и проверьте, находится ли следующий элемент внутри дерева двоичного поиска. В случае, если следующая строка находится в дереве, удалите node из дерева и добавьте содержимое node в очередь и повторите поиск, если следующая строка уже находится внутри дерева.
  • Повторяйте эту процедуру до тех пор, пока все файлы не будут обработаны, дерево пуст, а очередь будет пустой.

В этом подходе используется - O (n), чтобы прочитать файл (но распараллелен) - O (1), чтобы вставить упорядоченные строки в очередь - O (Logn) * 2 для чтения и записи двоичного дерева поиска - O (n) для записи нового файла

плюс стоимость вашего бизнес-правила и операций ввода-вывода.

Надеюсь, что это поможет.

Ответ 7

Одним из возможных способов будет создание одного потока, который будет считывать входной файл и помещать строки чтения в очередь блокировки. Несколько потоков будут ждать данные из этой очереди, обрабатывать данные.

Другим возможным решением может быть разделение файла на куски и назначение каждого фрагмента отдельному потоку.

Чтобы избежать блокировки, вы можете использовать асинхронный ввод-вывод. Вы также можете взглянуть на шаблон Proactor из Pattern-Oriented Software Architecture Volume 2

Ответ 8

Spring Пакет приходит в голову.

Для поддержания порядка потребуется шаг после процесса, т.е. сохранить индекс чтения/ключ, упорядоченный в контексте обработки. Логика обработки также должна хранить обработанную информацию в контексте. После завершения обработки вы можете после этого обработать список и пишите в файл.

Остерегайтесь проблем с OOM.

Ответ 9

Так как порядок нужно поддерживать, поэтому сама проблема говорит о том, что чтение и запись не могут выполняться параллельно, поскольку это последовательный процесс, единственное, что вы можете делать параллельно, - это обработка записей, но это также не очень помогает только один писатель.

Вот проектное предложение:

  • Использовать один поток t1 для чтения файла и хранения данных в LinkedBlockingQueue Q1
  • Используйте другой поток t2 для чтения данных из Q1 и введите другой LinkedBlockingQueue Q2
  • Thread t3 считывает данные из Q2 и записывает их в файл.
  • Чтобы убедиться, что вы не сталкиваетесь с OutofMemoryError, вы должны инициализировать очереди с соответствующим размером
  • Вы можете использовать CyclicBarrier, чтобы гарантировать, что все потоки завершили свою работу.
  • Кроме того, вы можете установить действие в CyclicBarrier, где вы можете выполнять свои задачи после обработки.

Удачи, надеясь, что вы получите лучший дизайн.

Приветствия!!

Ответ 10

Я столкнулся с аналогичной проблемой в прошлом. Где я должен читать данные из одного файла, обрабатывать его и записывать результат в другой файл. Поскольку часть обработки была очень тяжелой. Поэтому я попытался использовать несколько потоков. Вот проект, который я выполнил для решения моей проблемы:

  • Используйте основную программу как master, прочитайте весь файл за один раз (но не начинайте обработку). Создайте один объект данных для каждой строки с порядком последовательности.
  • Используйте одну очередь приоритетных blocksqueue say в главном, добавьте в нее эти объекты данных. Поделиться ссылкой этой очереди в конструкторе каждого потока.
  • Создайте различные процессоры, то есть потоки, которые будут прослушиваться в этой очереди. Когда мы добавляем объекты данных в эту очередь, мы вызываем метод notifyall. Все потоки будут обрабатываться индивидуально.
  • После обработки поместите все результаты в одну карту и поместите результаты с ключом в качестве его порядкового номера.
  • Когда очередь пуста и все потоки неактивны, значит, обработка выполнена. Остановите потоки. Итерация по карте и запись результатов в файл