Эффективная очередь FIFO для произвольных размеров кусков байтов в Python

Как я могу реализовать буфер FIFO, к которому я могу эффективно добавлять произвольно размерные куски байтов в голову и из которых я могу эффективно вырезать куски произвольного размера байтов из хвоста?

Фон:

У меня есть класс, который читает байты из файловых объектов в кусках произвольного размера и сам является файлоподобным объектом, из которого клиенты могут читать байты в кусках произвольного размера.

Способ, которым я реализован, заключается в том, что всякий раз, когда клиент хочет прочитать кусок байтов, класс будет многократно читать из базовых файловых объектов (с размерами блоков, соответствующих этим объектам) и добавлять байты в голову очереди FIFO до тех пор, пока в очереди не будет достаточно байтов, чтобы обслуживать кусок запрашиваемого размера клиенту. Затем он выталкивает эти байты из хвоста очереди и возвращает их клиенту.

У меня проблема с производительностью, которая возникает, когда размер блока для основных файловых объектов намного больше размера блока, который клиенты используют при чтении из класса.

Скажем, размер блока для основных файловых объектов - 1 MiB, а размер куска, который клиент читает, равен 1 KiB. В первый раз, когда клиент запрашивает 1 KiB, класс должен прочитать 1 MiB и добавить его в очередь FIFO. Затем для этого запроса и последующих запросов 1023 класс должен вывести 1 KiB из хвоста очереди FIFO, который постепенно уменьшается в размере от 1 MiB до 0 байтов, и в это время цикл начинается снова.

В настоящее время я реализовал это с помощью объекта StringIO. Запись новых байтов в конец объекта StringIO выполняется быстро, но удаление байтов с начала происходит очень медленно, потому что должен быть создан новый объект StringIO, который содержит копию всего предыдущего буфера минус первый кусок байтов.

Вопросы SO, которые касаются похожих вопросов, как правило, указывают на контейнер deque. Тем не менее, deque реализуется как двойной список. Написание фрагмента для дека потребует разделения куска на объекты, каждый из которых содержит один байт. Затем Deque добавит два указателя на каждый объект для хранения, что, вероятно, увеличит требования к памяти, по крайней мере, на порядок по сравнению с байтами. Кроме того, потребуется пройти много времени, чтобы пересечь связанный список и иметь дело с каждым объектом как для разделения кусков на объекты, так и для объединения объектов в куски.

Ответы

Ответ 1

  В настоящее время я реализовал это с помощью объекта StringIO. Написание нового байтов до конца объекта StringIO быстро, но удаление байтов с самого начала очень медленно, потому что новый объект StringIO, который содержит копию всего предыдущего буфера минус первый кусок байты, должны быть созданы.

На самом деле наиболее типичный способ реализации FIFO - это двухкратное использование буфера с двумя указателями как таковыми:

enter image description here источник изображения

Теперь вы можете реализовать это с помощью StringIO(), используя .seek() для чтения/записи из соответствующего места.

Ответ 2

Обновление: здесь реализована технология кругового буфера из ответа vartec (основываясь на моем первоначальном ответе, сохраненном ниже для тех, кто интересуется)

from cStringIO import StringIO

class FifoFileBuffer(object):
    def __init__(self):
        self.buf = StringIO()
        self.available = 0    # Bytes available for reading
        self.size = 0
        self.write_fp = 0

    def read(self, size = None):
        """Reads size bytes from buffer"""
        if size is None or size > self.available:
            size = self.available
        size = max(size, 0)

        result = self.buf.read(size)
        self.available -= size

        if len(result) < size:
            self.buf.seek(0)
            result += self.buf.read(size - len(result))

        return result


    def write(self, data):
        """Appends data to buffer"""
        if self.size < self.available + len(data):
            # Expand buffer
            new_buf = StringIO()
            new_buf.write(self.read())
            self.write_fp = self.available = new_buf.tell()
            read_fp = 0
            while self.size <= self.available + len(data):
                self.size = max(self.size, 1024) * 2
            new_buf.write('0' * (self.size - self.write_fp))
            self.buf = new_buf
        else:
            read_fp = self.buf.tell()

        self.buf.seek(self.write_fp)
        written = self.size - self.write_fp
        self.buf.write(data[:written])
        self.write_fp += len(data)
        self.available += len(data)
        if written < len(data):
            self.write_fp -= self.size
            self.buf.seek(0)
            self.buf.write(data[written:])
        self.buf.seek(read_fp)

Оригинальный ответ (замененный выше):

Вы можете использовать буфер и отслеживать начальный индекс (читать указатель на файл), иногда уплотняя его, когда он становится слишком большим (это должно давать довольно хорошую амортизацию).

Например, оберните объект StringIO следующим образом:

from cStringIO import StringIO
class FifoBuffer(object):
    def __init__(self):
        self.buf = StringIO()

    def read(self, *args, **kwargs):
        """Reads data from buffer"""
        self.buf.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        """Appends data to buffer"""
        current_read_fp = self.buf.tell()
        if current_read_fp > 10 * 1024 * 1024:
            # Buffer is holding 10MB of used data, time to compact
            new_buf = StringIO()
            new_buf.write(self.buf.read())
            self.buf = new_buf
            current_read_fp = 0

        self.buf.seek(0, 2)    # Seek to end
        self.buf.write(*args, **kwargs)

        self.buf.seek(current_read_fp)

Ответ 3

Можете ли вы предположить что-либо о ожидаемых количествах чтения/записи?

В этом случае обработка данных в, например, 1024 байтовых фрагментах и ​​использование deque [1] может быть лучше; вы могли бы просто прочитать N полных фрагментов, затем один последний фрагмент для разделения и поместить остаток обратно в начало очереди.

1) collections.deque

class collections.deque([iterable[, maxlen]])

Возвращает новый объект deque, инициализированный слева направо (с помощью append()) с данными из итерабельного. Если iterable не указан, новый deque пуст.

Deques - это обобщение стеков и очередей (название произносится как "колода" и сокращается для "двойной очереди" ). Deques поддерживает поточно-безопасную, эффективную по объему память добавляет и выскакивает с обеих сторон deque примерно с той же производительностью O (1) в любом направлении....

Ответ 4

  ... но удаление байтов с самого начала происходит очень медленно, потому что должен быть создан новый объект StringIO, содержащий копию всего предыдущего буфера за вычетом первого фрагмента байтов.

Этот тип медлительности можно преодолеть с помощью bytearray в Python> = v3.4. Смотрите обсуждение в этой проблеме, а патч здесь.

Ключом является: удаление заголовка (байтов) из bytearray с помощью

a[:1] = b''   # O(1) (amortized)

намного быстрее чем

a = a[1:]     # O(len(a))

когда len(a) огромен (скажем, 10 ** 6).

bytearray также предоставляет вам удобный способ предварительного просмотра всего набора данных в виде массива (т.е. самого себя), в отличие от контейнера deque, который должен объединять объекты в чанк.

Теперь эффективный FIFO может быть реализован следующим образом:

class byteFIFO:
    """ byte FIFO buffer """
    def __init__(self):
        self._buf = bytearray()

    def put(self, data):
        self._buf.extend(data)

    def get(self, size):
        data = self._buf[:size]
        # The fast delete syntax
        self._buf[:size] = b''
        return data

    def peek(self, size):
        return self._buf[:size]

    def getvalue(self):
        # peek with no copy
        return self._buf

    def __len__(self):
        return len(self._buf)

Benchmark

import time
bfifo = byteFIFO()
bfifo.put(b'a'*1000000)        # a very long array
t0 = time.time()
for k in range(1000000):
    d = bfifo.get(4)           # "pop" from head
    bfifo.put(d)               # "push" in tail
print('t = ', time.time()-t0)  # t = 0.897 on my machine

Реализация циклического/кольцевого буфера в ответе Кэмерона требует 2,378 секунды, а его первоначальной реализации - 1,108 секунды.