Ответ 1
Я бы, вероятно, обратился к этому, используя dask для загрузки ваших данных потоковым способом. Например, вы можете создать файл данных dask следующим образом:
import dask.dataframe as ddf
data = ddf.read_csv('test.csv')
Этот объект data
на самом деле ничего не сделал в этот момент; он просто содержит "рецепт" сортов для чтения фрейма данных с диска в управляемых кусках. Если вы хотите материализовать данные, вы можете вызвать compute()
:
df = data.compute().reset_index(drop=True)
На этом этапе у вас есть стандартный pandas dataframe (мы вызываем reset_index
, потому что по умолчанию каждый раздел независимо индексируется). Результат эквивалентен тому, что вы получаете, вызывая непосредственно pd.read_csv
:
df.equals(pd.read_csv('test.csv'))
# True
Преимущество dask заключается в том, что вы можете добавлять инструкции к этому "рецепту" для построения вашего фрейма данных; например, вы можете сделать каждый раздел данных разреженным следующим образом:
data = data.map_partitions(lambda part: part.to_sparse(fill_value=0))
В этот момент вызов compute()
будет содержать разреженный массив:
df = data.compute().reset_index(drop=True)
type(df)
# pandas.core.sparse.frame.SparseDataFrame
Профилирование
Чтобы проверить, как подход dask сравнивается с исходным подходом pandas, сделайте некоторое профилирование строк. Я буду использовать lprun
и mprun
, как описано здесь (полное раскрытие: это раздел моей собственной книги).
Предполагая, что вы работаете в ноутбуке Jupyter, вы можете запустить его следующим образом:
Сначала создайте отдельный файл с основными задачами, которые мы хотим сделать:
%%file dask_load.py
import numpy as np
import pandas as pd
import dask.dataframe as ddf
def compare_loads():
df = pd.read_csv('test.csv')
df_sparse = df.to_sparse(fill_value=0)
df_dask = ddf.read_csv('test.csv', blocksize=10E6)
df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
df_dask = df_dask.compute().reset_index(drop=True)
Далее сделайте линейное профилирование для времени вычисления:
%load_ext line_profiler
from dask_load import compare_loads
%lprun -f compare_loads compare_loads()
Получаю следующий результат:
Timer unit: 1e-06 s
Total time: 13.9061 s
File: /Users/jakevdp/dask_load.py
Function: compare_loads at line 6
Line # Hits Time Per Hit % Time Line Contents
==============================================================
6 def compare_loads():
7 1 4746788 4746788.0 34.1 df = pd.read_csv('test.csv')
8 1 769303 769303.0 5.5 df_sparse = df.to_sparse(fill_value=0)
9
10 1 33992 33992.0 0.2 df_dask = ddf.read_csv('test.csv', blocksize=10E6)
11 1 7848 7848.0 0.1 df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
12 1 8348217 8348217.0 60.0 df_dask = df_dask.compute().reset_index(drop=True)
Мы видим, что около 60% времени тратится на вызов dask, тогда как около 40% времени тратится на вызов pandas для массива примеров выше. Это говорит нам о том, что для этой задачи dask примерно на 50% медленнее, чем pandas: этого следует ожидать, поскольку фрагментация и рекомбинация разделов данных приводит к некоторым дополнительным издержкам.
Где dask shines используется в памяти: используйте mprun
для создания профиля строки за строкой:
%load_ext memory_profiler
%mprun -f compare_loads compare_loads()
Результат на моей машине таков:
Filename: /Users/jakevdp/dask_load.py
Line # Mem usage Increment Line Contents
================================================
6 70.9 MiB 70.9 MiB def compare_loads():
7 691.5 MiB 620.6 MiB df = pd.read_csv('test.csv')
8 828.8 MiB 137.3 MiB df_sparse = df.to_sparse(fill_value=0)
9
10 806.3 MiB -22.5 MiB df_dask = ddf.read_csv('test.csv', blocksize=10E6)
11 806.4 MiB 0.1 MiB df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
12 947.9 MiB 141.5 MiB df_dask = df_dask.compute().reset_index(drop=True)
Мы видим, что окончательный размер фрейма pandas составляет около ~ 140 МБ, но pandas использует ~ 620 МБ по пути, поскольку он считывает данные во временный плотный объект.
С другой стороны, dask использует только ~ 140 Мбайт всего при загрузке массива и создании окончательного разреженного результата. В случае, если вы читаете данные, плотный размер которых сопоставим с памятью, доступной в вашей системе, у dask есть явное преимущество, несмотря на более медленное вычислительное время на 50%.
Но для работы с большими данными вы не должны останавливаться здесь. Предположительно, вы выполняете некоторые операции над вашими данными, а абстракция данных dask позволяет вам выполнять эти операции (т.е. Добавлять их в "рецепт" ), прежде чем материализовать данные. Поэтому, если вы делаете с данными арифметику, агрегации, группировку и т.д., Вам даже не нужно беспокоиться о разреженном хранилище: просто выполните эти операции с объектом dask, вызовите compute()
в конце и dask позаботится о применении их в памяти эффективным способом.
Так, например, я мог вычислить max()
каждого столбца, используя фрейм данных dask, без необходимости сразу загружать все это в память:
>>> data.max().compute()
x 5.38114
y 5.33796
z 5.25661
txt j
dtype: object
Работа с файловыми кадрами dask напрямую позволит вам обойти заботы о представлении данных, потому что вам, вероятно, никогда не придется сразу загружать все данные в память.
Удачи!