Симулятор частиц Python: обработка вне ядра
Описание проблемы
При написании симулятора частиц Монте-Карло (движение буриунов и фотонное излучение) в python/numpy. Мне нужно сохранить выходной файл моделирования ( → 10 ГБ) в файл и обработать данные на втором шаге. Совместимость с Windows и Linux важна.
Число частиц (n_particles
) составляет 10-100. Количество шагов времени (time_size
) составляет ~ 10 ^ 9.
Моделирование имеет 3 шага (код ниже для версии "все-в-ОЗУ" ):
-
Имитировать (и сохранить) массив значений emission
(содержит много элементов почти-0):
- shape (
n_particles
x time_size
), float32, размер 80 ГБ
-
Вычислить массив counts
(случайные значения из процесса Пуассона с ранее вычисленными скоростями):
-
shape (n_particles
x time_size
), uint8, размер 20 ГБ
counts = np.random.poisson(lam=emission).astype(np.uint8)
-
Найти временные метки (или индекс) счетчиков. Графы почти всегда равны 0, поэтому массивы timestamp будут помещаться в ОЗУ.
# Loop across the particles
timestamps = [np.nonzero(c) for c in counts]
Я делаю шаг 1 один раз, затем повторяю шаг 2-3 много (~ 100) раз. В будущем мне может понадобиться предварительно обработать emission
(применить cumsum
или другие функции) до вычисления counts
.
Вопрос
У меня есть работающая в памяти реализация, и я пытаюсь понять, что лучше всего подходит для реализации устаревшей версии, которая может масштабировать (намного) более длительные симуляции.
Что бы я хотел:
Мне нужно сохранить массивы в файл, и я хотел бы использовать один файл для моделирования. Мне также нужен "простой" способ хранения и вызова словаря параметра моделирования (скаляры).
В идеале мне нужен массив numpy с поддержкой файлов, который я могу предварительно распределить и заполнить куски. Затем я хотел бы, чтобы методы массива numpy (max
, cumsum
,...) работали прозрачно, требуя только ключевое слово chunksize
, чтобы указать, сколько массива загружается на каждой итерации.
Еще лучше, мне нужен Numexpr, который работает не между кешем и оперативной памятью, а между ОЗУ и жестким диском.
Каковы практические варианты
В качестве первого варианта
Я начал экспериментировать с pyTables, но я не доволен его сложностью и абстракциями (так отличается от numpy). Более того, мое текущее решение (см. Ниже) является UGLY и не очень эффективным.
Итак, мои варианты, для которых я ищу ответ,
-
реализовать массив numpy с требуемой функциональностью (как?)
-
использовать pytable более умным способом (разные структуры данных/методы)
-
используйте другую библиотеку: h5py, blaze, pandas... (я пока не пробовал ни одного из них).
Предварительное решение (pyTables)
Я сохраняю параметры моделирования в группе '/parameters'
: каждый параметр преобразуется в скалярный массив numpy. Подробное решение, но оно работает.
Я сохраняю emission
как расширяемый массив (EArray
), потому что я генерирую данные в кусках, и мне нужно добавить каждый новый кусок (хотя я знаю конечный размер). Сохранение counts
более проблематично. Если сохранить его как массива pytable, сложно выполнить такие запросы, как "counts >= 2". Поэтому я сохранил counts как несколько таблиц (по одной на каждую частицу) [UGLY], и я запрашиваю с .get_where_list('counts >= 2')
. Я не уверен, что это экономически эффективно, и
генерируя все эти таблицы вместо использования одного массива, значительно улучшает файл HDF5. Более того, как ни странно, создание этих таблиц требует создания настраиваемого типа dtype (даже для стандартных типов numpy):
dt = np.dtype([('counts', 'u1')])
for ip in xrange(n_particles):
name = "particle_%d" % ip
data_file.create_table(
group, name, description=dt, chunkshape=chunksize,
expectedrows=time_size,
title='Binned timetrace of emitted ph (bin = t_step)'
' - particle_%d' % particle)
Каждая таблица счетчиков с частицами имеет другое имя (name = "particle_%d" % ip
) и что мне нужно поместить их в список python для легкой итерации.
EDIT. Результатом этого вопроса является симулятор броуновского движения, называемый PyBroMo.
Ответы
Ответ 1
Решение PyTable
Поскольку функциональность, предоставляемая Pandas, не требуется, а обработка выполняется намного медленнее (см. нижеприведенный блокнот), наилучшим подходом является использование PyTables или h5py напрямую. Я пробовал только подход pytables до сих пор.
Все тесты были выполнены в этом ноутбуке:
Введение в структуры данных pytables
Ссылка: Официальные документы PyTables
Pytables позволяет хранить данные в файлах HDF5 в двух типах форматов: массивы и таблицы.
Массивы
Существует 3 типа массивов Array
, CArray
и EArray
. Все они позволяют хранить и извлекать (многомерные) фрагменты с нотной заметкой, подобной нарезке numpy.
# Write data to store (broadcasting works)
array1[:] = 3
# Read data from store
in_ram_array = array1[:]
Для оптимизации в некоторых случаях использования CArray
сохраняется в "кусках", размер которых можно выбрать с помощью chunk_shape
во время создания.
Array
и CArray
размер фиксируется во время создания. Вы можете заполнить/записать массив chunk-by-chunk после создания. Обратно EArray
может быть расширен с помощью метода .append()
.
Таблицы
table
- совсем другой зверь. Это в основном "таблица". У вас есть только 1D-индекс, и каждый элемент является строкой. Внутри каждой строки есть типы данных "столбцы", каждый из столбцов может иметь другой тип. Это вы знакомы с numpy records-arrays, таблица в основном представляет собой 1D-массив записей, причем каждый элемент имеет много полей в качестве столбцов.
1D или 2D массивы numpy могут храниться в таблицах, но это немного сложнее: нам нужно создать тип данных строки. Например, чтобы сохранить массив 1D uint8 numpy, нам нужно сделать:
table_uint8 = np.dtype([('field1', 'u1')])
table_1d = data_file.create_table('/', 'array_1d', description=table_uint8)
Итак, зачем использовать таблицы? Потому что, в отличие от массивов, таблицы могут быть эффективно запрошены. Например, если мы хотим искать элементы > 3 в огромной табличной таблице, мы можем сделать:
index = table_1d.get_where_list('field1 > 3')
Не только это просто (по сравнению с массивами, где нам нужно сканировать весь файл в кусках и строить index
в цикле), но это также очень быстро.
Как сохранить параметры моделирования
Лучший способ хранения параметров моделирования - использовать группу (т.е. /parameters
), преобразовать каждый скаляр в массив numpy и сохранить его как CArray
.
Массив для "emission
"
emission
- это самый большой массив, который генерируется и читается последовательно. Для этого шаблона использования Хорошая структура данных EArray
. На "смоделированных" данных с ~ 50% элементов нулей blosc сжатие (level=5
) достигает 2,2x степени сжатия. Я обнаружил, что размер блока 2 ^ 18 (256 тыс.) Имеет минимальное время обработки.
Сохранение "counts
"
Сохранение также "counts
" увеличит размер файла на 10% и займет 40% больше времени для вычисления временных меток. Сохранение counts
не является преимуществом, потому что в конце нужны только отметки времени.
Преимущество заключается в том, что повторная перестройка индекса (временных меток) проще, поскольку мы запрашиваем полную ось времени в одной команде (.get_where_list('counts >= 1')
). Напротив, с обработкой с чередованием нам нужно выполнить некоторую арифметику индекса, которая немного сложна и, возможно, может быть обузой.
Однако сложность кода может быть небольшой по сравнению со всеми другими операциями (сортировка и слияние), которые необходимы в обоих случаях.
Сохранение "timestamps
"
Временные метки могут накапливаться в ОЗУ. Тем не менее, мы не знаем размер массива перед запуском, и окончательный вызов hstack()
необходим для "слияния" разных кусков, хранящихся в списке. Это удваивает требования к памяти, поэтому объем ОЗУ может быть недостаточным.
Мы можем хранить временные метки as-we-go в таблице, используя .append()
. В конце мы можем загрузить таблицу в памяти с помощью .read()
. Это всего лишь на 10% медленнее, чем вычисления "все в памяти", но избегает требования "двойной RAM". Кроме того, мы можем избежать окончательной полной нагрузки и иметь минимальное использование ОЗУ.
H5Py
H5py - гораздо более простая библиотека, чем pytables. Для этого случая использования (в основном) последовательная обработка кажется более подходящей, чем pytables. Единственной недостающей особенностью является отсутствие сжатия "blosc". Если это приводит к большому штрафу за производительность, еще предстоит проверить.
Ответ 2
Dask.array может выполнять такие операции, как max
, cumsum
и т.д. в массиве на диске, таком как PyTables или h5py.
import h5py
d = h5py.File('myfile.hdf5')['/data']
import dask.array as da
x = da.from_array(d, chunks=(1000, 1000))
X выглядит и выглядит как массив numpy и копирует большую часть API. Операции с x будут создавать DAG операций с оперативной памятью, которые будут эффективно выполняться с использованием нескольких потоков, передаваемых с диска по мере необходимости.
da.exp(x).mean(axis=0).compute()
http://dask.pydata.org/en/latest/
conda install dask
or
pip install dask
Ответ 3
См. здесь для хранения ваших параметров в файле HDF5 (он рассолки, поэтому вы можете хранить их, как у вас есть; предел размером 64 кБ на размер рассола).
import pandas as pd
import numpy as np
n_particles = 10
chunk_size = 1000
# 1) create a new emission file, compressing as we go
emission = pd.HDFStore('emission.hdf',mode='w',complib='blosc')
# generate simulated data
for i in range(10):
df = pd.DataFrame(np.abs(np.random.randn(chunk_size,n_particles)),dtype='float32')
# create a globally unique index (time)
# http://stackoverflow.com/questions/16997048/how-does-one-append-large-amounts-of-
data-to-a-pandas-hdfstore-and-get-a-natural/16999397#16999397
try:
nrows = emission.get_storer('df').nrows
except:
nrows = 0
df.index = pd.Series(df.index) + nrows
emission.append('df',df)
emission.close()
# 2) create counts
cs = pd.HDFStore('counts.hdf',mode='w',complib='blosc')
# this is an iterator, can be any size
for df in pd.read_hdf('emission.hdf','df',chunksize=200):
counts = pd.DataFrame(np.random.poisson(lam=df).astype(np.uint8))
# set the index as the same
counts.index = df.index
# store the sum across all particles (as most are zero this will be a
# nice sub-selector
# better maybe to have multiple of these sums that divide the particle space
# you don't have to do this but prob more efficient
# you can do this in another file if you want/need
counts['particles_0_4'] = counts.iloc[:,0:4].sum(1)
counts['particles_5_9'] = counts.iloc[:,5:9].sum(1)
# make the non_zero column indexable
cs.append('df',counts,data_columns=['particles_0_4','particles_5_9'])
cs.close()
# 3) find interesting counts
print pd.read_hdf('counts.hdf','df',where='particles_0_4>0')
print pd.read_hdf('counts.hdf','df',where='particles_5_9>0')
В качестве альтернативы вы можете сделать каждую частицу data_column и выбрать ее отдельно.
и некоторый вывод (довольно активное излучение в этом случае:)
0 1 2 3 4 5 6 7 8 9 particles_0_4 particles_5_9
0 2 2 2 3 2 1 0 2 1 0 9 4
1 1 0 0 0 1 0 1 0 3 0 1 4
2 0 2 0 0 2 0 0 1 2 0 2 3
3 0 0 0 1 1 0 0 2 0 3 1 2
4 3 1 0 2 1 0 0 1 0 0 6 1
5 1 0 0 1 0 0 0 3 0 0 2 3
6 0 0 0 1 1 0 1 0 0 0 1 1
7 0 2 0 2 0 0 0 0 2 0 4 2
8 0 0 0 1 3 0 0 0 0 1 1 0
10 1 0 0 0 0 0 0 0 0 1 1 0
11 0 0 1 1 0 2 0 1 2 1 2 5
12 0 2 2 4 0 0 1 1 0 1 8 2
13 0 2 1 0 0 0 0 1 1 0 3 2
14 1 0 0 0 0 3 0 0 0 0 1 3
15 0 0 0 1 1 0 0 0 0 0 1 0
16 0 0 0 4 3 0 3 0 1 0 4 4
17 0 2 2 3 0 0 2 2 0 2 7 4
18 0 1 2 1 0 0 3 2 1 2 4 6
19 1 1 0 0 0 0 1 2 1 1 2 4
20 0 0 2 1 2 2 1 0 0 1 3 3
22 0 1 2 2 0 0 0 0 1 0 5 1
23 0 2 4 1 0 1 2 0 0 2 7 3
24 1 1 1 0 1 0 0 1 2 0 3 3
26 1 3 0 4 1 0 0 0 2 1 8 2
27 0 1 1 4 0 1 2 0 0 0 6 3
28 0 1 0 0 0 0 0 0 0 0 1 0
29 0 2 0 0 1 0 1 0 0 0 2 1
30 0 1 0 2 1 2 0 2 1 1 3 5
31 0 0 1 1 1 1 1 0 1 1 2 3
32 3 0 2 1 0 0 1 0 1 0 6 2
33 1 3 1 0 4 1 1 0 1 4 5 3
34 1 1 0 0 0 0 0 3 0 1 2 3
35 0 1 0 0 1 1 2 0 1 0 1 4
36 1 0 1 0 1 2 1 2 0 1 2 5
37 0 0 0 1 0 0 0 0 3 0 1 3
38 2 5 0 0 0 3 0 1 0 0 7 4
39 1 0 0 2 1 1 3 0 0 1 3 4
40 0 1 0 0 1 0 0 4 2 2 1 6
41 0 3 3 1 1 2 0 0 2 0 7 4
42 0 1 0 2 0 0 0 0 0 1 3 0
43 0 0 2 0 5 0 3 2 1 1 2 6
44 0 2 0 1 0 0 1 0 0 0 3 1
45 1 0 0 2 0 0 0 1 4 0 3 5
46 0 2 0 0 0 0 0 1 1 0 2 2
48 3 0 0 0 0 1 1 0 0 0 3 2
50 0 1 0 1 0 1 0 0 2 1 2 3
51 0 0 2 0 0 0 2 3 1 1 2 6
52 0 0 2 3 2 3 1 0 1 5 5 5
53 0 0 0 2 1 1 0 0 1 1 2 2
54 0 1 2 2 2 0 1 0 2 0 5 3
55 0 2 1 0 0 0 0 0 3 2 3 3
56 0 1 0 0 0 2 2 0 1 1 1 5
57 0 0 0 1 1 0 0 1 0 0 1 1
58 6 1 2 0 2 2 0 0 0 0 9 2
59 0 1 1 0 0 0 0 0 2 0 2 2
60 2 0 0 0 1 0 0 1 0 1 2 1
61 0 0 3 1 1 2 0 0 1 0 4 3
62 2 0 1 0 0 0 0 1 2 1 3 3
63 2 0 1 0 1 0 1 0 0 0 3 1
65 0 0 1 0 0 0 1 5 0 1 1 6
.. .. .. .. .. .. .. .. .. .. ... ...
[9269 rows x 12 columns]
Ответ 4
Используйте OpenMM для имитации частиц (https://github.com/SimTk/openmm) и MDTraj (https://github.com/rmcgibbo/mdtraj) для обработки траектории IO.
Ответ 5
Тест pytables vs pandas.HDFStore
в принятом ответе полностью вводит в заблуждение:
-
Первая критическая ошибка: время не применяется os.fsync
после
flush, что делает тест скорости нестабильным. Так что когда-нибудь, pytables
функция происходит значительно быстрее.
-
Вторая проблема: pytables и pandas версии полностью
различные формы из-за непонимания pytables.EArray
's
форма arg. Автор пытается добавить столбец в версию pytables, но
добавьте строку в версию pandas.
-
Третья проблема заключается в том, что автор использовал разные chunkshape
во время
сравнение.
-
Автор также забыл отключить генерацию индекса таблицы во время store.append()
, что является трудоемким процессом.
В таблице ниже приведены результаты работы его версии и моих исправлений.
tbold
- его версия pytables, pdold
- его версия pandas. tbsync
и pdsync
- его версия с fsync()
после flush()
, а также отключить генерацию индекса таблицы во время добавления. tbopt
и pdopt
- моя оптимизированная версия с blosc:lz4
и дополнением 9.
| name | dt | data size [MB] | comp ratio % | chunkshape | shape | clib | indexed |
|:-------|-----:|-----------------:|---------------:|:-------------|:--------------|:----------------|:----------|
| tbold | 5.11 | 300.00 | 84.63 | (15, 262144) | (15, 5242880) | blosc[5][1] | False |
| pdold | 8.39 | 340.00 | 39.26 | (1927,) | (5242880,) | blosc[5][1] | True |
| tbsync | 7.47 | 300.00 | 84.63 | (15, 262144) | (15, 5242880) | blosc[5][1] | False |
| pdsync | 6.97 | 340.00 | 39.27 | (1927,) | (5242880,) | blosc[5][1] | False |
| tbopt | 4.78 | 300.00 | 43.07 | (4369, 15) | (5242880, 15) | blosc:lz4[9][1] | False |
| pdopt | 5.73 | 340.00 | 38.53 | (3855,) | (5242880,) | blosc:lz4[9][1] | False |
pandas.HDFStore
использует pytables
под капотом. Таким образом, если мы используем их правильно, они не должны иметь никакой разницы.
Мы видим, что версия pandas имеет больший размер данных. Это связано с тем, что pandas использует pytables.Table вместо EArray. И pandas.DataFrame всегда имеет индексный столбец. Первый столбец объекта Table - это индекс DataFrame, для которого требуется дополнительное пространство для сохранения. Это немного влияет на производительность ввода-вывода, но предоставляет больше функций, таких как внекорневой запрос. Поэтому я по-прежнему рекомендую pandas здесь. @MRocklin также упомянул более приятную внешнюю оболочку пакета, если большинство функций, которые вы использовали, - это просто операции с массивами, а не табличные запросы. Но производительность ввода-вывода не будет иметь различимых различий.
h5f.root.emission._v_attrs
Out[82]:
/emission._v_attrs (AttributeSet), 15 attributes:
[CLASS := 'GROUP',
TITLE := '',
VERSION := '1.0',
data_columns := [],
encoding := 'UTF-8',
index_cols := [(0, 'index')],
info := {1: {'names': [None], 'type': 'RangeIndex'}, 'index': {}},
levels := 1,
metadata := [],
nan_rep := 'nan',
non_index_axes := [(1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])],
pandas_type := 'frame_table',
pandas_version := '0.15.2',
table_type := 'appendable_frame',
values_cols := ['values_block_0']]
Вот функции:
def generate_emission(shape):
"""Generate fake emission."""
emission = np.random.randn(*shape).astype('float32') - 1
emission.clip(0, 1e6, out=emission)
assert (emission >=0).all()
return emission
def test_puretb_earray(outpath,
n_particles = 15,
time_chunk_size = 2**18,
n_iter = 20,
sync = True,
clib = 'blosc',
clevel = 5,
):
time_size = n_iter * time_chunk_size
data_file = pytb.open_file(outpath, mode="w")
comp_filter = pytb.Filters(complib=clib, complevel=clevel)
emission = data_file.create_earray('/', 'emission', atom=pytb.Float32Atom(),
shape=(n_particles, 0),
chunkshape=(n_particles, time_chunk_size),
expectedrows=n_iter * time_chunk_size,
filters=comp_filter)
# generate simulated emission data
t0 =time()
for i in range(n_iter):
emission_chunk = generate_emission((n_particles, time_chunk_size))
emission.append(emission_chunk)
emission.flush()
if sync:
os.fsync(data_file.fileno())
data_file.close()
t1 = time()
return t1-t0
def test_puretb_earray2(outpath,
n_particles = 15,
time_chunk_size = 2**18,
n_iter = 20,
sync = True,
clib = 'blosc',
clevel = 5,
):
time_size = n_iter * time_chunk_size
data_file = pytb.open_file(outpath, mode="w")
comp_filter = pytb.Filters(complib=clib, complevel=clevel)
emission = data_file.create_earray('/', 'emission', atom=pytb.Float32Atom(),
shape=(0, n_particles),
expectedrows=time_size,
filters=comp_filter)
# generate simulated emission data
t0 =time()
for i in range(n_iter):
emission_chunk = generate_emission((time_chunk_size, n_particles))
emission.append(emission_chunk)
emission.flush()
if sync:
os.fsync(data_file.fileno())
data_file.close()
t1 = time()
return t1-t0
def test_purepd_df(outpath,
n_particles = 15,
time_chunk_size = 2**18,
n_iter = 20,
sync = True,
clib='blosc',
clevel=5,
autocshape=False,
oldversion=False,
):
time_size = n_iter * time_chunk_size
emission = pd.HDFStore(outpath, mode='w', complib=clib, complevel=clevel)
# generate simulated data
t0 =time()
for i in range(n_iter):
# Generate fake emission
emission_chunk = generate_emission((time_chunk_size, n_particles))
df = pd.DataFrame(emission_chunk, dtype='float32')
# create a globally unique index (time)
# http://stackoverflow.com/info/16997048/how-does-one-append-large-
# amounts-of-data-to-a-pandas-hdfstore-and-get-a-natural/16999397#16999397
try:
nrows = emission.get_storer('emission').nrows
except:
nrows = 0
df.index = pd.Series(df.index) + nrows
if autocshape:
emission.append('emission', df, index=False,
expectedrows=time_size
)
else:
if oldversion:
emission.append('emission', df)
else:
emission.append('emission', df, index=False)
emission.flush(fsync=sync)
emission.close()
t1 = time()
return t1-t0
def _test_puretb_earray_nosync(outpath):
return test_puretb_earray(outpath, sync=False)
def _test_purepd_df_nosync(outpath):
return test_purepd_df(outpath, sync=False,
oldversion=True
)
def _test_puretb_earray_opt(outpath):
return test_puretb_earray2(outpath,
sync=False,
clib='blosc:lz4',
clevel=9
)
def _test_purepd_df_opt(outpath):
return test_purepd_df(outpath,
sync=False,
clib='blosc:lz4',
clevel=9,
autocshape=True
)
testfns = {
'tbold':_test_puretb_earray_nosync,
'pdold':_test_purepd_df_nosync,
'tbsync':test_puretb_earray,
'pdsync':test_purepd_df,
'tbopt': _test_puretb_earray_opt,
'pdopt': _test_purepd_df_opt,
}