Симулятор частиц Python: обработка вне ядра

Описание проблемы

При написании симулятора частиц Монте-Карло (движение буриунов и фотонное излучение) в python/numpy. Мне нужно сохранить выходной файл моделирования ( → 10 ГБ) в файл и обработать данные на втором шаге. Совместимость с Windows и Linux важна.

Число частиц (n_particles) составляет 10-100. Количество шагов времени (time_size) составляет ~ 10 ^ 9.

Моделирование имеет 3 шага (код ниже для версии "все-в-ОЗУ" ):

  • Имитировать (и сохранить) массив значений emission (содержит много элементов почти-0):

    • shape (n_particles x time_size), float32, размер 80 ГБ
  • Вычислить массив counts (случайные значения из процесса Пуассона с ранее вычисленными скоростями):

    • shape (n_particles x time_size), uint8, размер 20 ГБ

      counts = np.random.poisson(lam=emission).astype(np.uint8)
      
  • Найти временные метки (или индекс) счетчиков. Графы почти всегда равны 0, поэтому массивы timestamp будут помещаться в ОЗУ.

    # Loop across the particles
    timestamps = [np.nonzero(c) for c in counts]
    

Я делаю шаг 1 один раз, затем повторяю шаг 2-3 много (~ 100) раз. В будущем мне может понадобиться предварительно обработать emission (применить cumsum или другие функции) до вычисления counts.

Вопрос

У меня есть работающая в памяти реализация, и я пытаюсь понять, что лучше всего подходит для реализации устаревшей версии, которая может масштабировать (намного) более длительные симуляции.

Что бы я хотел:

Мне нужно сохранить массивы в файл, и я хотел бы использовать один файл для моделирования. Мне также нужен "простой" способ хранения и вызова словаря параметра моделирования (скаляры).

В идеале мне нужен массив numpy с поддержкой файлов, который я могу предварительно распределить и заполнить куски. Затем я хотел бы, чтобы методы массива numpy (max, cumsum,...) работали прозрачно, требуя только ключевое слово chunksize, чтобы указать, сколько массива загружается на каждой итерации.

Еще лучше, мне нужен Numexpr, который работает не между кешем и оперативной памятью, а между ОЗУ и жестким диском.

Каковы практические варианты

В качестве первого варианта Я начал экспериментировать с pyTables, но я не доволен его сложностью и абстракциями (так отличается от numpy). Более того, мое текущее решение (см. Ниже) является UGLY и не очень эффективным.

Итак, мои варианты, для которых я ищу ответ,

  • реализовать массив numpy с требуемой функциональностью (как?)

  • использовать pytable более умным способом (разные структуры данных/методы)

  • используйте другую библиотеку: h5py, blaze, pandas... (я пока не пробовал ни одного из них).

Предварительное решение (pyTables)

Я сохраняю параметры моделирования в группе '/parameters': каждый параметр преобразуется в скалярный массив numpy. Подробное решение, но оно работает.

Я сохраняю emission как расширяемый массив (EArray), потому что я генерирую данные в кусках, и мне нужно добавить каждый новый кусок (хотя я знаю конечный размер). Сохранение counts более проблематично. Если сохранить его как массива pytable, сложно выполнить такие запросы, как "counts >= 2". Поэтому я сохранил counts как несколько таблиц (по одной на каждую частицу) [UGLY], и я запрашиваю с .get_where_list('counts >= 2'). Я не уверен, что это экономически эффективно, и генерируя все эти таблицы вместо использования одного массива, значительно улучшает файл HDF5. Более того, как ни странно, создание этих таблиц требует создания настраиваемого типа dtype (даже для стандартных типов numpy):

    dt = np.dtype([('counts', 'u1')])        
    for ip in xrange(n_particles):
        name = "particle_%d" % ip
        data_file.create_table(
                    group, name, description=dt, chunkshape=chunksize,
                    expectedrows=time_size,
                    title='Binned timetrace of emitted ph (bin = t_step)'
                        ' - particle_%d' % particle)

Каждая таблица счетчиков с частицами имеет другое имя (name = "particle_%d" % ip) и что мне нужно поместить их в список python для легкой итерации.

EDIT. Результатом этого вопроса является симулятор броуновского движения, называемый PyBroMo.

Ответы

Ответ 1

Решение PyTable

Поскольку функциональность, предоставляемая Pandas, не требуется, а обработка выполняется намного медленнее (см. нижеприведенный блокнот), наилучшим подходом является использование PyTables или h5py напрямую. Я пробовал только подход pytables до сих пор.

Все тесты были выполнены в этом ноутбуке:

Введение в структуры данных pytables

Ссылка: Официальные документы PyTables

Pytables позволяет хранить данные в файлах HDF5 в двух типах форматов: массивы и таблицы.

Массивы

Существует 3 типа массивов Array, CArray и EArray. Все они позволяют хранить и извлекать (многомерные) фрагменты с нотной заметкой, подобной нарезке numpy.

# Write data to store (broadcasting works)
array1[:]  = 3

# Read data from store
in_ram_array = array1[:]

Для оптимизации в некоторых случаях использования CArray сохраняется в "кусках", размер которых можно выбрать с помощью chunk_shape во время создания.

Array и CArray размер фиксируется во время создания. Вы можете заполнить/записать массив chunk-by-chunk после создания. Обратно EArray может быть расширен с помощью метода .append().

Таблицы

table - совсем другой зверь. Это в основном "таблица". У вас есть только 1D-индекс, и каждый элемент является строкой. Внутри каждой строки есть типы данных "столбцы", каждый из столбцов может иметь другой тип. Это вы знакомы с numpy records-arrays, таблица в основном представляет собой 1D-массив записей, причем каждый элемент имеет много полей в качестве столбцов.

1D или 2D массивы numpy могут храниться в таблицах, но это немного сложнее: нам нужно создать тип данных строки. Например, чтобы сохранить массив 1D uint8 numpy, нам нужно сделать:

table_uint8 = np.dtype([('field1', 'u1')])
table_1d = data_file.create_table('/', 'array_1d', description=table_uint8)

Итак, зачем использовать таблицы? Потому что, в отличие от массивов, таблицы могут быть эффективно запрошены. Например, если мы хотим искать элементы > 3 в огромной табличной таблице, мы можем сделать:

index = table_1d.get_where_list('field1 > 3')

Не только это просто (по сравнению с массивами, где нам нужно сканировать весь файл в кусках и строить index в цикле), но это также очень быстро.

Как сохранить параметры моделирования

Лучший способ хранения параметров моделирования - использовать группу (т.е. /parameters), преобразовать каждый скаляр в массив numpy и сохранить его как CArray.

Массив для "emission"

emission - это самый большой массив, который генерируется и читается последовательно. Для этого шаблона использования Хорошая структура данных EArray. На "смоделированных" данных с ~ 50% элементов нулей blosc сжатие (level=5) достигает 2,2x степени сжатия. Я обнаружил, что размер блока 2 ^ 18 (256 тыс.) Имеет минимальное время обработки.

Сохранение "counts"

Сохранение также "counts" увеличит размер файла на 10% и займет 40% больше времени для вычисления временных меток. Сохранение counts не является преимуществом, потому что в конце нужны только отметки времени.

Преимущество заключается в том, что повторная перестройка индекса (временных меток) проще, поскольку мы запрашиваем полную ось времени в одной команде (.get_where_list('counts >= 1')). Напротив, с обработкой с чередованием нам нужно выполнить некоторую арифметику индекса, которая немного сложна и, возможно, может быть обузой.

Однако сложность кода может быть небольшой по сравнению со всеми другими операциями (сортировка и слияние), которые необходимы в обоих случаях.

Сохранение "timestamps"

Временные метки могут накапливаться в ОЗУ. Тем не менее, мы не знаем размер массива перед запуском, и окончательный вызов hstack() необходим для "слияния" разных кусков, хранящихся в списке. Это удваивает требования к памяти, поэтому объем ОЗУ может быть недостаточным.

Мы можем хранить временные метки as-we-go в таблице, используя .append(). В конце мы можем загрузить таблицу в памяти с помощью .read(). Это всего лишь на 10% медленнее, чем вычисления "все в памяти", но избегает требования "двойной RAM". Кроме того, мы можем избежать окончательной полной нагрузки и иметь минимальное использование ОЗУ.

H5Py

H5py - гораздо более простая библиотека, чем pytables. Для этого случая использования (в основном) последовательная обработка кажется более подходящей, чем pytables. Единственной недостающей особенностью является отсутствие сжатия "blosc". Если это приводит к большому штрафу за производительность, еще предстоит проверить.

Ответ 2

Dask.array может выполнять такие операции, как max, cumsum и т.д. в массиве на диске, таком как PyTables или h5py.

import h5py
d = h5py.File('myfile.hdf5')['/data']
import dask.array as da
x = da.from_array(d, chunks=(1000, 1000))

X выглядит и выглядит как массив numpy и копирует большую часть API. Операции с x будут создавать DAG операций с оперативной памятью, которые будут эффективно выполняться с использованием нескольких потоков, передаваемых с диска по мере необходимости.

da.exp(x).mean(axis=0).compute()

http://dask.pydata.org/en/latest/

conda install dask
or 
pip install dask

Ответ 3

См. здесь для хранения ваших параметров в файле HDF5 (он рассолки, поэтому вы можете хранить их, как у вас есть; предел размером 64 кБ на размер рассола).

import pandas as pd                                                                                                                                                                                                                                                                                               
import numpy as np                                                                                                                                                                                                                                                                                                

n_particles = 10                                                                                                                                                                                                                                                                                                  
chunk_size = 1000                                                                                                                                                                                                                                                                                                 

# 1) create a new emission file, compressing as we go                                                                                                                                                                                                                                                             
emission = pd.HDFStore('emission.hdf',mode='w',complib='blosc')                                                                                                                                                                                                                                                   

# generate simulated data                                                                                                                                                                                                                                                                                         
for i in range(10):                                                                                                                                                                                                                                                                                               

    df = pd.DataFrame(np.abs(np.random.randn(chunk_size,n_particles)),dtype='float32')                                                                                                                                                                                                                            

    # create a globally unique index (time)                                                                                                                                                                                                                                                                       
    # http://stackoverflow.com/questions/16997048/how-does-one-append-large-amounts-of-

    data-to-a-pandas-hdfstore-and-get-a-natural/16999397#16999397                                                                                                                                                              
        try:                                                                                                                                                                                                                                                                                                          
            nrows = emission.get_storer('df').nrows                                                                                                                                                                                                                                                                   
        except:                                                                                                                                                                                                                                                                                                       
            nrows = 0                                                                                                                                                                                                                                                                                                 

        df.index = pd.Series(df.index) + nrows                                                                                                                                                                                                                                                                        
        emission.append('df',df)                                                                                                                                                                                                                                                                                      

    emission.close()                                                                                                                                                                                                                                                                                                  

    # 2) create counts                                                                                                                                                                                                                                                                                                
    cs = pd.HDFStore('counts.hdf',mode='w',complib='blosc')                                                                                                                                                                                                                                                           

    # this is an iterator, can be any size                                                                                                                                                                                                                                                                            
    for df in pd.read_hdf('emission.hdf','df',chunksize=200):                                                                                                                                                                                                                                                         

        counts = pd.DataFrame(np.random.poisson(lam=df).astype(np.uint8))                                                                                                                                                                                                                                             

        # set the index as the same                                                                                                                                                                                                                                                                                   
        counts.index = df.index                                                                                                                                                                                                                                                                                       

        # store the sum across all particles (as most are zero this will be a 
        # nice sub-selector                                                                                                                                                                                                                       
        # better maybe to have multiple of these sums that divide the particle space                                                                                                                                                                                                                                  
        # you don't have to do this but prob more efficient                                                                                                                                                                                                                                                           
        # you can do this in another file if you want/need                                                                                                                                                                                                                                                               
        counts['particles_0_4'] = counts.iloc[:,0:4].sum(1)                                                                                                                                                                                                                                                           
        counts['particles_5_9'] = counts.iloc[:,5:9].sum(1)                                                                                                                                                                                                                                                           

        # make the non_zero column indexable                                                                                                                                                                                                                                                                          
        cs.append('df',counts,data_columns=['particles_0_4','particles_5_9'])                                                                                                                                                                                                                                         

    cs.close()                                                                                                                                                                                                                                                                                                        

    # 3) find interesting counts                                                                                                                                                                                                                                                                                      
    print pd.read_hdf('counts.hdf','df',where='particles_0_4>0')                                                                                                                                                                                                                                                      
    print pd.read_hdf('counts.hdf','df',where='particles_5_9>0')         

В качестве альтернативы вы можете сделать каждую частицу data_column и выбрать ее отдельно.

и некоторый вывод (довольно активное излучение в этом случае:)

    0  1  2  3  4  5  6  7  8  9  particles_0_4  particles_5_9
0   2  2  2  3  2  1  0  2  1  0              9              4
1   1  0  0  0  1  0  1  0  3  0              1              4
2   0  2  0  0  2  0  0  1  2  0              2              3
3   0  0  0  1  1  0  0  2  0  3              1              2
4   3  1  0  2  1  0  0  1  0  0              6              1
5   1  0  0  1  0  0  0  3  0  0              2              3
6   0  0  0  1  1  0  1  0  0  0              1              1
7   0  2  0  2  0  0  0  0  2  0              4              2
8   0  0  0  1  3  0  0  0  0  1              1              0
10  1  0  0  0  0  0  0  0  0  1              1              0
11  0  0  1  1  0  2  0  1  2  1              2              5
12  0  2  2  4  0  0  1  1  0  1              8              2
13  0  2  1  0  0  0  0  1  1  0              3              2
14  1  0  0  0  0  3  0  0  0  0              1              3
15  0  0  0  1  1  0  0  0  0  0              1              0
16  0  0  0  4  3  0  3  0  1  0              4              4
17  0  2  2  3  0  0  2  2  0  2              7              4
18  0  1  2  1  0  0  3  2  1  2              4              6
19  1  1  0  0  0  0  1  2  1  1              2              4
20  0  0  2  1  2  2  1  0  0  1              3              3
22  0  1  2  2  0  0  0  0  1  0              5              1
23  0  2  4  1  0  1  2  0  0  2              7              3
24  1  1  1  0  1  0  0  1  2  0              3              3
26  1  3  0  4  1  0  0  0  2  1              8              2
27  0  1  1  4  0  1  2  0  0  0              6              3
28  0  1  0  0  0  0  0  0  0  0              1              0
29  0  2  0  0  1  0  1  0  0  0              2              1
30  0  1  0  2  1  2  0  2  1  1              3              5
31  0  0  1  1  1  1  1  0  1  1              2              3
32  3  0  2  1  0  0  1  0  1  0              6              2
33  1  3  1  0  4  1  1  0  1  4              5              3
34  1  1  0  0  0  0  0  3  0  1              2              3
35  0  1  0  0  1  1  2  0  1  0              1              4
36  1  0  1  0  1  2  1  2  0  1              2              5
37  0  0  0  1  0  0  0  0  3  0              1              3
38  2  5  0  0  0  3  0  1  0  0              7              4
39  1  0  0  2  1  1  3  0  0  1              3              4
40  0  1  0  0  1  0  0  4  2  2              1              6
41  0  3  3  1  1  2  0  0  2  0              7              4
42  0  1  0  2  0  0  0  0  0  1              3              0
43  0  0  2  0  5  0  3  2  1  1              2              6
44  0  2  0  1  0  0  1  0  0  0              3              1
45  1  0  0  2  0  0  0  1  4  0              3              5
46  0  2  0  0  0  0  0  1  1  0              2              2
48  3  0  0  0  0  1  1  0  0  0              3              2
50  0  1  0  1  0  1  0  0  2  1              2              3
51  0  0  2  0  0  0  2  3  1  1              2              6
52  0  0  2  3  2  3  1  0  1  5              5              5
53  0  0  0  2  1  1  0  0  1  1              2              2
54  0  1  2  2  2  0  1  0  2  0              5              3
55  0  2  1  0  0  0  0  0  3  2              3              3
56  0  1  0  0  0  2  2  0  1  1              1              5
57  0  0  0  1  1  0  0  1  0  0              1              1
58  6  1  2  0  2  2  0  0  0  0              9              2
59  0  1  1  0  0  0  0  0  2  0              2              2
60  2  0  0  0  1  0  0  1  0  1              2              1
61  0  0  3  1  1  2  0  0  1  0              4              3
62  2  0  1  0  0  0  0  1  2  1              3              3
63  2  0  1  0  1  0  1  0  0  0              3              1
65  0  0  1  0  0  0  1  5  0  1              1              6
   .. .. .. .. .. .. .. .. .. ..            ...            ...

[9269 rows x 12 columns]

Ответ 5

Тест pytables vs pandas.HDFStore в принятом ответе полностью вводит в заблуждение:

  • Первая критическая ошибка: время не применяется os.fsync после flush, что делает тест скорости нестабильным. Так что когда-нибудь, pytables функция происходит значительно быстрее.

  • Вторая проблема: pytables и pandas версии полностью различные формы из-за непонимания pytables.EArray 's форма arg. Автор пытается добавить столбец в версию pytables, но добавьте строку в версию pandas.

  • Третья проблема заключается в том, что автор использовал разные chunkshape во время сравнение.

  • Автор также забыл отключить генерацию индекса таблицы во время store.append(), что является трудоемким процессом.

В таблице ниже приведены результаты работы его версии и моих исправлений. tbold - его версия pytables, pdold - его версия pandas. tbsync и pdsync - его версия с fsync() после flush(), а также отключить генерацию индекса таблицы во время добавления. tbopt и pdopt - моя оптимизированная версия с blosc:lz4 и дополнением 9.

| name   |   dt |   data size [MB] |   comp ratio % | chunkshape   | shape         | clib            | indexed   |
|:-------|-----:|-----------------:|---------------:|:-------------|:--------------|:----------------|:----------|
| tbold  | 5.11 |           300.00 |          84.63 | (15, 262144) | (15, 5242880) | blosc[5][1]     | False     |
| pdold  | 8.39 |           340.00 |          39.26 | (1927,)      | (5242880,)    | blosc[5][1]     | True      |
| tbsync | 7.47 |           300.00 |          84.63 | (15, 262144) | (15, 5242880) | blosc[5][1]     | False     |
| pdsync | 6.97 |           340.00 |          39.27 | (1927,)      | (5242880,)    | blosc[5][1]     | False     |
| tbopt  | 4.78 |           300.00 |          43.07 | (4369, 15)   | (5242880, 15) | blosc:lz4[9][1] | False     |
| pdopt  | 5.73 |           340.00 |          38.53 | (3855,)      | (5242880,)    | blosc:lz4[9][1] | False     |

pandas.HDFStore использует pytables под капотом. Таким образом, если мы используем их правильно, они не должны иметь никакой разницы.

Мы видим, что версия pandas имеет больший размер данных. Это связано с тем, что pandas использует pytables.Table вместо EArray. И pandas.DataFrame всегда имеет индексный столбец. Первый столбец объекта Table - это индекс DataFrame, для которого требуется дополнительное пространство для сохранения. Это немного влияет на производительность ввода-вывода, но предоставляет больше функций, таких как внекорневой запрос. Поэтому я по-прежнему рекомендую pandas здесь. @MRocklin также упомянул более приятную внешнюю оболочку пакета, если большинство функций, которые вы использовали, - это просто операции с массивами, а не табличные запросы. Но производительность ввода-вывода не будет иметь различимых различий.

h5f.root.emission._v_attrs
Out[82]: 
/emission._v_attrs (AttributeSet), 15 attributes:
   [CLASS := 'GROUP',
    TITLE := '',
    VERSION := '1.0',
    data_columns := [],
    encoding := 'UTF-8',
    index_cols := [(0, 'index')],
    info := {1: {'names': [None], 'type': 'RangeIndex'}, 'index': {}},
    levels := 1,
    metadata := [],
    nan_rep := 'nan',
    non_index_axes := [(1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])],
    pandas_type := 'frame_table',
    pandas_version := '0.15.2',
    table_type := 'appendable_frame',
    values_cols := ['values_block_0']]

Вот функции:

def generate_emission(shape):
    """Generate fake emission."""
    emission = np.random.randn(*shape).astype('float32') - 1
    emission.clip(0, 1e6, out=emission)
    assert (emission >=0).all()
    return emission



def test_puretb_earray(outpath,
                        n_particles = 15,
                        time_chunk_size = 2**18,
                        n_iter = 20,
                       sync = True,
                       clib = 'blosc',
                       clevel = 5,
                       ):

    time_size = n_iter * time_chunk_size
    data_file = pytb.open_file(outpath, mode="w")
    comp_filter = pytb.Filters(complib=clib, complevel=clevel)
    emission = data_file.create_earray('/', 'emission', atom=pytb.Float32Atom(),
                                       shape=(n_particles, 0),
                                       chunkshape=(n_particles, time_chunk_size),
                                       expectedrows=n_iter * time_chunk_size,
                                       filters=comp_filter)

    # generate simulated emission data
    t0 =time()
    for i in range(n_iter):
        emission_chunk = generate_emission((n_particles, time_chunk_size))
        emission.append(emission_chunk)

    emission.flush()
    if sync:
        os.fsync(data_file.fileno())
    data_file.close()
    t1 = time()
    return t1-t0


def test_puretb_earray2(outpath,
                        n_particles = 15,
                        time_chunk_size = 2**18,
                        n_iter = 20,
                       sync = True,
                       clib = 'blosc',
                       clevel = 5,
                       ):

    time_size = n_iter * time_chunk_size
    data_file = pytb.open_file(outpath, mode="w")
    comp_filter = pytb.Filters(complib=clib, complevel=clevel)
    emission = data_file.create_earray('/', 'emission', atom=pytb.Float32Atom(),
                                       shape=(0, n_particles),
                                       expectedrows=time_size,
                                       filters=comp_filter)

    # generate simulated emission data
    t0 =time()
    for i in range(n_iter):
        emission_chunk = generate_emission((time_chunk_size, n_particles))
        emission.append(emission_chunk)

    emission.flush()

    if sync:
        os.fsync(data_file.fileno())
    data_file.close()
    t1 = time()
    return t1-t0


def test_purepd_df(outpath,
                   n_particles = 15,
                   time_chunk_size = 2**18,
                   n_iter = 20,
                   sync = True,
                   clib='blosc',
                   clevel=5,
                   autocshape=False,
                   oldversion=False,
                   ):
    time_size = n_iter * time_chunk_size
    emission = pd.HDFStore(outpath, mode='w', complib=clib, complevel=clevel)
    # generate simulated data
    t0 =time()
    for i in range(n_iter):

        # Generate fake emission
        emission_chunk = generate_emission((time_chunk_size, n_particles))

        df = pd.DataFrame(emission_chunk, dtype='float32')

        # create a globally unique index (time)
        # http://stackoverflow.com/info/16997048/how-does-one-append-large-
        # amounts-of-data-to-a-pandas-hdfstore-and-get-a-natural/16999397#16999397
        try:
            nrows = emission.get_storer('emission').nrows
        except:
            nrows = 0

        df.index = pd.Series(df.index) + nrows

        if autocshape:
            emission.append('emission', df, index=False,
                            expectedrows=time_size
                            )
        else:
            if oldversion:
                emission.append('emission', df)
            else:
                emission.append('emission', df, index=False)

    emission.flush(fsync=sync)
    emission.close()
    t1 = time()
    return t1-t0


def _test_puretb_earray_nosync(outpath):
    return test_puretb_earray(outpath, sync=False)


def _test_purepd_df_nosync(outpath):
    return test_purepd_df(outpath, sync=False,
                          oldversion=True
                          )


def _test_puretb_earray_opt(outpath):
    return test_puretb_earray2(outpath,
                               sync=False,
                               clib='blosc:lz4',
                               clevel=9
                               )


def _test_purepd_df_opt(outpath):
    return test_purepd_df(outpath,
                          sync=False,
                          clib='blosc:lz4',
                          clevel=9,
                          autocshape=True
                          )


testfns = {
    'tbold':_test_puretb_earray_nosync,
    'pdold':_test_purepd_df_nosync,
    'tbsync':test_puretb_earray,
    'pdsync':test_purepd_df,
    'tbopt': _test_puretb_earray_opt,
    'pdopt': _test_purepd_df_opt,
}