Совместное использование большого массива с непрерывной записью только для чтения между процессами многопроцессорности
У меня есть 60-килобайтный массив SciPy (матрица), который я должен использовать между объектами 5+ multiprocessing
Process
. Я видел numpy-sharedmem и читал эту дискуссию в списке SciPy. Кажется, что существует два подхода - numpy-sharedmem
и использование multiprocessing.RawArray()
и отображение NumPy dtype
в ctype
s. Теперь, numpy-sharedmem
, кажется, путь, но я еще не видел хороший ссылочный пример. Мне не нужны никакие блокировки, так как массив (на самом деле матрица) будет доступен только для чтения. Теперь, из-за его размера, я бы хотел избежать копирования. Похоже, правильный метод состоит в том, чтобы создать единственную копию массива как массив sharedmem
, а затем передать его объектам Process
? Несколько конкретных вопросов:
-
Каков наилучший способ передачи дескриптора sharedmem суб- Process()
es? Нужна ли мне очередь для прохождения одного массива? Будет ли труба лучше? Могу ли я просто передать его в качестве аргумента для подкласса Process()
init (где я предполагаю, что он маринован)?
-
В обсуждении, которое я упомянул выше, упоминается numpy-sharedmem
, не обладающее 64-битной безопасностью? Я определенно использую некоторые структуры, которые не являются 32-разрядными адресуемыми.
-
Есть ли компромисс с подходом RawArray()
? Медленно, buggier?
-
Нужно ли мне сопоставление ctype-to-dtype для метода numpy-sharedmem?
-
Есть ли у кого-нибудь пример кода OpenSource? Я очень практикован, и мне трудно заставить это работать без какого-либо хорошего примера, чтобы посмотреть.
Если есть дополнительная информация, которую я могу предоставить, чтобы помочь прояснить это для других, прокомментируйте, и я добавлю. Спасибо!
Это нужно запускать на Ubuntu Linux и, возможно, в Mac OS, но переносимость не вызывает большого беспокойства.
Ответы
Ответ 1
@Велимир Млакер дал отличный ответ. Я думал, что могу добавить несколько комментариев и крошечный пример.
(Я не мог найти много документации по sharedmem - это результаты моих собственных экспериментов.)
- Вам нужно передать дескрипторы при запуске подпроцесса или после его запуска? Если это только первый, вы можете просто использовать аргументы
target
и args
для Process
. Это потенциально лучше, чем использование глобальной переменной.
- На странице обсуждения, которую вы связали, похоже, что поддержка 64-разрядного Linux была добавлена в sharedmem некоторое время назад, поэтому это может быть проблема без проблем.
- Я не знаю об этом.
- Нет. См. Пример ниже.
Пример
#!/usr/bin/env python
from multiprocessing import Process
import sharedmem
import numpy
def do_work(data, start):
data[start] = 0;
def split_work(num):
n = 20
width = n/num
shared = sharedmem.empty(n)
shared[:] = numpy.random.rand(1, n)[0]
print "values are %s" % shared
processes = [Process(target=do_work, args=(shared, i*width)) for i in xrange(num)]
for p in processes:
p.start()
for p in processes:
p.join()
print "values are %s" % shared
print "type is %s" % type(shared[0])
if __name__ == '__main__':
split_work(4)
Выход
values are [ 0.81397784 0.59667692 0.10761908 0.6736734 0.46349645 0.98340718
0.44056863 0.10701816 0.67167752 0.29158274 0.22242552 0.14273156
0.34912309 0.43812636 0.58484507 0.81697513 0.57758441 0.4284959
0.7292129 0.06063283]
values are [ 0. 0.59667692 0.10761908 0.6736734 0.46349645 0.
0.44056863 0.10701816 0.67167752 0.29158274 0. 0.14273156
0.34912309 0.43812636 0.58484507 0. 0.57758441 0.4284959
0.7292129 0.06063283]
type is <type 'numpy.float64'>
Этот связанный вопрос может быть полезен.
Ответ 2
Если вы находитесь в Linux (или любой POSIX-совместимой системе), вы можете определить этот массив как глобальную переменную. multiprocessing
использует fork()
для Linux при запуске нового дочернего процесса. Недавно созданный дочерний процесс автоматически разделяет память с родителем, пока он не меняет его (copy-on-write).
Поскольку вы говорите: "Мне не нужны какие-либо блокировки, так как массив (на самом деле матрица) будет доступен только для чтения", использование этого поведения было бы очень простым и в то же время чрезвычайно эффективным подходом: все дочерние процессы будут обращаться к тем же данным в физической памяти при чтении этого большого массива numpy.
Не передавайте свой массив конструктору Process()
, он проинструктирует multiprocessing
to pickle
данные для ребенка, что было бы крайне неэффективно или невозможно в вашем случае. В Linux сразу после fork()
ребенок является точной копией родителя с использованием той же физической памяти, поэтому все, что вам нужно сделать, это убедиться, что переменная Python, содержащая "матрицу", доступна из функции target
что вы передаете Process()
. Это обычно можно достичь с помощью "глобальной" переменной.
Пример кода:
from multiprocessing import Process
from numpy import random
global_array = random.random(10**4)
def child():
print sum(global_array)
def main():
processes = [Process(target=child) for _ in xrange(10)]
for p in processes:
p.start()
for p in processes:
p.join()
if __name__ == "__main__":
main()
В Windows, который не поддерживает fork()
- multiprocessing
, используется вызов API win32 CreateProcess
. Он создает совершенно новый процесс из любого исполняемого файла. Поэтому для Windows требуется рассортировать данные для ребенка, если нужны данные, которые были созданы во время выполнения родителя.
Ответ 3
Вам может быть интересен небольшой фрагмент кода, который я написал: github.com/vmlaker/benchmark-sharedmem
Единственный интересующий файл main.py
. Это эталон numpy-sharedmem - код просто передает массивы (либо numpy
или sharedmem
) в порожденные процессы через Pipe. Рабочие просто вызывают sum()
по данным. Меня интересовало сравнение времени передачи данных между двумя реализациями.
Я также написал еще один, более сложный код: github.com/vmlaker/sherlock.
Здесь я использую модуль numpy-sharedmem для обработки изображений в реальном времени с помощью OpenCV - изображения представляют собой массивы NumPy, как и OpenCV newer cv2
API. Изображения, на самом деле их ссылки, разделяются между процессами через объект словаря, созданный из multiprocessing.Manager
(в отличие от использования Queue или Pipe.) я "Получаем большие улучшения производительности по сравнению с использованием простых массивов NumPy.
Труба против очереди:
По моему опыту, IPC с Pipe быстрее, чем Queue. И это имеет смысл, поскольку Queue добавляет блокировку, чтобы сделать ее безопасной для нескольких производителей/потребителей. Труба нет. Но если у вас есть только два процесса, говорящих "назад и назад", безопасно использовать Pipe или, как гласят документы:
... нет риска коррупции от процессов, использующих разные концы трубы одновременно.
sharedmem
безопасность:
Основная проблема с модулем sharedmem
- это возможность утечки памяти при неудачном выходе программы. Это описано в длинном обсуждении здесь. Хотя 10 апреля 2011 года Sturla упоминает об исправлении утечки памяти, с тех пор я все еще испытываю утечки, используя оба репозитория, Sturla Molden принадлежит GitHub (github.com/sturlamolden/sharedmem-numpy) и Chris Lee-Messer на Bitbucket (bitbucket.org/cleemesser/numpy-sharedmem).
Ответ 4
Если ваш массив такой большой, вы можете использовать numpy.memmap
. Например, если у вас есть массив, хранящийся на диске, скажем 'test.array'
, вы можете использовать одновременные процессы для доступа к данным в нем даже в режиме "записи", но ваш случай проще, поскольку вам нужен только режим чтения.
Создание массива:
a = np.memmap('test.array', dtype='float32', mode='w+', shape=(100000,1000))
Затем вы можете заполнить этот массив так же, как и обычный массив. Например:
a[:10,:100]=1.
a[10:,100:]=2.
Данные сохраняются на диск при удалении переменной a
.
Позже вы можете использовать несколько процессов, которые будут обращаться к данным в test.array
:
# read-only mode
b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000,1000))
# read and writing mode
c = np.memmap('test.array', dtype='float32', mode='r+', shape=(100000,1000))
Похожие ответы:
Ответ 5
Вам также может быть полезно взглянуть на документацию для pyro, как если бы вы могли соответствующим образом разбить свою задачу, вы можете использовать он выполняет различные разделы на разных машинах, а также на разных ядрах на одной машине.