Ответ 1
Wrap numpy ndarray
вокруг многопроцессорной обработки RawArray()
Существует несколько способов совместного использования массивов numpy в памяти через процессы. Давайте посмотрим, как вы можете это сделать, используя модуль многопроцессорности.
Первое важное замечание состоит в том, что numpy предоставляет функцию np.frombuffer()
для переноса интерфейса ndarray вокруг существующего объекта, который поддерживает протокол буфера (например, bytes()
, bytearray()
, array()
и т.д.). Это создает массивы только для чтения из объектов только для чтения и записываемых массивов из записываемых объектов.
Мы можем объединить это с разделяемой памятью RawArray()
, что обеспечивает многопроцессорность. Обратите внимание, что array()
не работает для этой цели, поскольку это прокси-объект с блокировкой и напрямую не отображает интерфейс буфера. Конечно, это означает, что мы должны обеспечить правильную синхронизацию наших нумерованных RawArrays.
Существует одна сложная проблема, связанная с RawArrays, обернутыми ndarray: когда многопроцессорность отправляет такой массив между процессами - и действительно, он должен будет отправить наши массивы, как только они созданы, для обоих рабочих - он рассохнет, а затем раскроет их. К сожалению, это приводит к созданию копий ndarrays вместо совместного использования их в памяти.
Решение, хотя и немного уродливое, должно сохранять RawArrays как, пока они не будут переданы рабочим, а только обернуть их в ndarrays после запуска каждого рабочего процесса > .
Кроме того, было бы предпочтительнее передавать массивы, будь то простой RawArray или обернутый ndarray, непосредственно через multiprocessing.Queue
, но это тоже не работает. RawArray не может быть помещен в такую очередь, и обернутый ndarray был бы маринован и рассыпан, поэтому фактически копируется.
Обходной путь - отправить список всех предварительно выделенных массивов рабочим процессам и сообщить индексы в этот список по очереди. Это очень похоже на прохождение маркеров (индексов), и тот, кто имеет токен, может работать с соответствующим массивом.
Структура основной программы может выглядеть так:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
import queue
from multiprocessing import freeze_support, set_start_method
from multiprocessing import Event, Process, Queue
from multiprocessing.sharedctypes import RawArray
def create_shared_arrays(size, dtype=np.int32, num=2):
dtype = np.dtype(dtype)
if dtype.isbuiltin and dtype.char in 'bBhHiIlLfd':
typecode = dtype.char
else:
typecode, size = 'B', size * dtype.itemsize
return [RawArray(typecode, size) for _ in range(num)]
def main():
my_dtype = np.float32
# 125000000 (size) * 4 (dtype) * 2 (num) ~= 1 GB memory usage
arrays = create_shared_arrays(125000000, dtype=my_dtype)
q_free = Queue()
q_used = Queue()
bail = Event()
for arr_id in range(len(arrays)):
q_free.put(arr_id) # pre-fill free queue with allocated array indices
pr1 = MyDataLoader(arrays, q_free, q_used, bail,
dtype=my_dtype, step=1024)
pr2 = MyDataProcessor(arrays, q_free, q_used, bail,
dtype=my_dtype, step=1024)
pr1.start()
pr2.start()
pr2.join()
print("\n{} joined.".format(pr2.name))
pr1.join()
print("{} joined.".format(pr1.name))
if __name__ == '__main__':
freeze_support()
# On Windows, only "spawn" is available.
# Also, this tests proper sharing of the arrays without "cheating".
set_start_method('spawn')
main()
Это готовит список из двух массивов, две очереди - "свободную" очередь, в которой MyDataProcessor помещает индексы массивов, с которыми она выполняется, и из нее извлекает MyDataLoader, а также "использованную" очередь, в которой MyDataLoader помещает индексы легко заполненных массивов и MyDataProcessor извлекает их из - и multiprocessing.Event
, чтобы начать согласованное спасение всех работников. На данный момент мы можем покончить с последним, поскольку у нас есть только один производитель и один потребитель массивов, но не больно быть готовым к большему количеству работников.
Затем мы предварительно заполняем "пустую" очередь всеми индексами наших RawArrays в списке и создаем экземпляр одного из каждого типа работников, передавая им необходимые объекты связи. Мы запускаем оба из них и просто ждем их до join()
.
Вот как выглядит MyDataProcessor, который потребляет индексы массива из "использованной" очереди и отправляет данные в какой-либо внешний черный ящик (debugio.output
в примере):
class MyDataProcessor(Process):
def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
super().__init__()
self.arrays = arrays
self.q_free = q_free
self.q_used = q_used
self.bail = bail
self.dtype = dtype
self.step = step
def run(self):
# wrap RawArrays inside ndarrays
arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]
from debugio import output as writer
while True:
arr_id = self.q_used.get()
if arr_id is None:
break
arr = arrays[arr_id]
print('(', end='', flush=True) # just visualizing activity
for j in range(0, len(arr), self.step):
writer.write(str(arr[j]) + '\n')
print(')', end='', flush=True) # just visualizing activity
self.q_free.put(arr_id)
writer.flush()
self.bail.set() # tell loaders to bail out ASAP
self.q_free.put(None, timeout=1) # wake up loader blocking on get()
try:
while True:
self.q_used.get_nowait() # wake up loader blocking on put()
except queue.Empty:
pass
Первое, что он делает, - это обернуть полученные RawArrays в ndarrays с помощью "np.frombuffer()" и сохранить новый список, поэтому они могут использоваться как массивы numpy во время выполнения процесса, и не нужно их обертывать и снова.
Обратите внимание, что MyDataProcessor только записывает событие self.bail
, он никогда не проверяет его. Вместо этого, если нужно сказать, что нужно выйти, он найдет знак None
в очереди, а не индекс массива. Это делается для того, когда MyDataLoader не имеет больше доступных данных и запускает процедуру слежения, MyDataProcessor все равно может обрабатывать все допустимые массивы, находящиеся в очереди, без преждевременного выхода.
Вот как выглядит MyDataLoader:
class MyDataLoader(Process):
def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
super().__init__()
self.arrays = arrays
self.q_free = q_free
self.q_used = q_used
self.bail = bail
self.dtype = dtype
self.step = step
def run(self):
# wrap RawArrays inside ndarrays
arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]
from debugio import input as reader
for _ in range(10): # for testing we end after a set amount of passes
if self.bail.is_set():
# we were asked to bail out while waiting on put()
return
arr_id = self.q_free.get()
if arr_id is None:
# we were asked to bail out while waiting on get()
self.q_free.put(None, timeout=1) # put it back for next loader
return
if self.bail.is_set():
# we were asked to bail out while we got a normal array
return
arr = arrays[arr_id]
eof = False
print('<', end='', flush=True) # just visualizing activity
for j in range(0, len(arr), self.step):
line = reader.readline()
if not line:
eof = True
break
arr[j] = np.fromstring(line, dtype=self.dtype, sep='\n')
if eof:
print('EOF>', end='', flush=True) # just visualizing activity
break
print('>', end='', flush=True) # just visualizing activity
if self.bail.is_set():
# we were asked to bail out while we filled the array
return
self.q_used.put(arr_id) # tell processor an array is filled
if not self.bail.is_set():
self.bail.set() # tell other loaders to bail out ASAP
# mark end of data for processor as we are the first to bail out
self.q_used.put(None)
Он очень похож по структуре на другого работника. Причина, по которой она немного раздута, заключается в том, что она проверяет событие self.bail
во многих точках, чтобы уменьшить вероятность застревания. (Он не является полностью надежным, так как существует небольшая вероятность того, что событие может быть установлено между проверкой и доступом к очереди. Если это проблема, нужно использовать некоторый примитив примитива синхронизации, который обеспечивает арбитражный доступ как к объединению Событий, так и к очереди.)
Он также обматывает полученные RawArrays в ndarrays в самом начале и считывает данные из внешнего черного ящика (debugio.input
в примере).
Обратите внимание, что, играя с аргументами step=
для обоих работников в функции main()
, мы можем изменить соотношение количества чтения и записи (строго для целей тестирования) в производственной среде step=
будет 1
, чтение и запись всех членов массива numpy).
Увеличение обоих значений приводит к тому, что работники получают доступ только к нескольким значениям в массивах numpy, тем самым значительно ускоряя все, что показывает, что производительность не ограничена связью между рабочими процессами. Если бы мы помещали массивы numpy непосредственно в Очереди, копируя их вперед и назад между процессами целиком, увеличение размера шага не привело бы к значительному повышению производительности - оно оставалось бы медленным.
Для справки, вот модуль debugio
, который я использовал для тестирования:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from ast import literal_eval
from io import RawIOBase, BufferedReader, BufferedWriter, TextIOWrapper
class DebugInput(RawIOBase):
def __init__(self, end=None):
if end is not None and end < 0:
raise ValueError("end must be non-negative")
super().__init__()
self.pos = 0
self.end = end
def readable(self):
return True
def read(self, size=-1):
if self.end is None:
if size < 0:
raise NotImplementedError("size must be non-negative")
end = self.pos + size
elif size < 0:
end = self.end
else:
end = min(self.pos + size, self.end)
lines = []
while self.pos < end:
offset = self.pos % 400
pos = self.pos - offset
if offset < 18:
i = (offset + 2) // 2
pos += i * 2 - 2
elif offset < 288:
i = (offset + 12) // 3
pos += i * 3 - 12
else:
i = (offset + 112) // 4
pos += i * 4 - 112
line = str(i).encode('ascii') + b'\n'
line = line[self.pos - pos:end - pos]
self.pos += len(line)
size -= len(line)
lines.append(line)
return b''.join(lines)
def readinto(self, b):
data = self.read(len(b))
b[:len(data)] = data
return len(data)
def seekable(self):
return True
def seek(self, offset, whence=0):
if whence == 0:
pos = offset
elif whence == 1:
pos = self.pos + offset
elif whence == 2:
if self.end is None:
raise ValueError("cannot seek to end of infinite stream")
pos = self.end + offset
else:
raise NotImplementedError("unknown whence value")
self.pos = max((pos if self.end is None else min(pos, self.end)), 0)
return self.pos
class DebugOutput(RawIOBase):
def __init__(self):
super().__init__()
self.buf = b''
self.num = 1
def writable(self):
return True
def write(self, b):
*lines, self.buf = (self.buf + b).split(b'\n')
for line in lines:
value = literal_eval(line.decode('ascii'))
if value != int(value) or int(value) & 255 != self.num:
raise ValueError("expected {}, got {}".format(self.num, value))
self.num = self.num % 127 + 1
return len(b)
input = TextIOWrapper(BufferedReader(DebugInput()), encoding='ascii')
output = TextIOWrapper(BufferedWriter(DebugOutput()), encoding='ascii')