Как эффективно обрабатывать последовательные куски Pandas dataframe
У меня есть большой фрейм данных (несколько миллионов строк).
Я хочу иметь возможность выполнять операцию groupby на нем, а просто группировать произвольные последовательные (предпочтительно равные) подмножества строк, а не использовать какое-либо конкретное свойство отдельных строк, чтобы решить, к какой группе они идут.
Вариант использования: я хочу применить функцию к каждой строке через параллельную карту в IPython. Не имеет значения, какие строки попадают в какой-то back-end движок, поскольку функция вычисляет результат на основе одной строки за раз. (Концептуально, по крайней мере, в действительности он векторизован.)
Я придумал что-то вроде этого:
# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)
# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]
# Process chunks in parallel
results = dview.map_sync(my_function, groups)
Но это кажется очень длинным, и не гарантирует равных размеров кусков. Особенно, если индекс разрежен или нецелый или что-то еще.
Любые предложения для лучшего способа?
Спасибо!
Ответы
Ответ 1
На практике вы не можете гарантировать равные размеры блоков: количество строк может быть простым, в конце концов, в этом случае ваши единственные параметры chunking будут кусками размера 1 или одного большого фрагмента. Я склонен передавать массив в groupby
. Начиная с:
>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
0 1 2 3 4
0 0 0.746300 0.346277 0.220362 0.172680
0 1 0.657324 0.687169 0.384196 0.214118
0 2 0.016062 0.858784 0.236364 0.963389
[...]
0 13 0.510273 0.051608 0.230402 0.756921
0 14 0.950544 0.576539 0.642602 0.907850
[15 rows x 5 columns]
где я намеренно сделал индекс неинформативным, установив его в 0, мы просто определяем наш размер (здесь 10) и целочисленное разделение массива на него:
>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
... print(k,g)
...
0 0 1 2 3 4
0 0 0.746300 0.346277 0.220362 0.172680
0 1 0.657324 0.687169 0.384196 0.214118
0 2 0.016062 0.858784 0.236364 0.963389
[...]
0 8 0.241049 0.246149 0.241935 0.563428
0 9 0.493819 0.918858 0.193236 0.266257
[10 rows x 5 columns]
1 0 1 2 3 4
0 10 0.037693 0.370789 0.369117 0.401041
0 11 0.721843 0.862295 0.671733 0.605006
[...]
0 14 0.950544 0.576539 0.642602 0.907850
[5 rows x 5 columns]
Методы, основанные на разрезе DataFrame, могут завершиться неудачно, если индекс несовместим с этим, хотя вы всегда можете использовать .iloc[a:b]
для игнорирования значений индекса и доступа к данным по положению.
Ответ 2
Используйте numpy array_split():
import numpy as np
import pandas as pd
data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
assert len(chunk) == len(data) / 5
Ответ 3
Я не уверен, что это именно то, что вы хотите, но я нашел эти функции grouper на другом потоке SO довольно полезным для создания многопроцессорного пула.
Вот краткий пример из этого потока, который может сделать что-то вроде того, что вы хотите:
import numpy as np
import pandas as pds
df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd'])
def chunker(seq, size):
return (seq[pos:pos + size] for pos in xrange(0, len(seq), size))
for i in chunker(df,5):
print i
Что дает вам что-то вроде этого:
a b c d
0 0.860574 0.059326 0.339192 0.786399
1 0.029196 0.395613 0.524240 0.380265
2 0.235759 0.164282 0.350042 0.877004
3 0.545394 0.881960 0.994079 0.721279
4 0.584504 0.648308 0.655147 0.511390
a b c d
5 0.276160 0.982803 0.451825 0.845363
6 0.728453 0.246870 0.515770 0.343479
7 0.971947 0.278430 0.006910 0.888512
8 0.044888 0.875791 0.842361 0.890675
9 0.200563 0.246080 0.333202 0.574488
a b c d
10 0.971125 0.106790 0.274001 0.960579
11 0.722224 0.575325 0.465267 0.258976
12 0.574039 0.258625 0.469209 0.886768
13 0.915423 0.713076 0.073338 0.622967
Я надеюсь, что это поможет.
ИЗМЕНИТЬ
В этом случае я использовал эту функцию с пулом процессоров в (приблизительно) следующим образом:
from multiprocessing import Pool
nprocs = 4
pool = Pool(nprocs)
for chunk in chunker(df, nprocs):
data = pool.map(myfunction, chunk)
data.domorestuff()
Я предполагаю, что это должно быть очень похоже на использование распределенного механизма IPython, но я его не пробовал.
Ответ 4
Знак хорошей среды - это много вариантов, поэтому я добавлю это из Anaconda Blaze, используя Odo
import blaze as bz
import pandas as pd
df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]})
for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
# Do stuff with chunked dataframe