Ответ 1
Хороший вопрос, это касается нескольких моментов, когда данные перемещаются к кластеру и возвращаются к клиенту (ваш сеанс python). Давайте рассмотрим несколько этапов вашей работы.
Загрузка данных с помощью Pandas
Это фреймворк Pandas в вашем сеансе python, поэтому он явно остается в вашем локальном процессе.
log = pd.read_csv('800000test', sep='\t') # on client
Преобразовать в ленивый Dask.dataframe
Это разбивает ваш фреймворк Pandas на двадцать фреймов Pandas, однако они все еще находятся на клиенте. Датские кадры данных не отправляют данные до кластера.
logd = dd.from_pandas(log,npartitions=20) # still on client
Вычислить len
Вызов len
на самом деле вызывает вычисление здесь (обычно вы используете df.some_aggregation().compute()
. Итак, теперь Dask начинает работать. Сначала он перемещает ваши данные в кластер (медленный), тогда он вызывает len на всех из 20 разделов (быстрая), он объединяет эти (быстрые) и затем переносит результат на ваш клиент, чтобы он мог печатать.
print(len(logd)) # costly roundtrip client -> cluster -> client
Анализ
Итак, проблема в том, что наш dask.dataframe все еще имел все свои данные в локальном сеансе python.
Было бы гораздо быстрее использовать, скажем, локальный планировщик, а не распределенный планировщик. Это должно вычисляться в миллисекундах
with dask.set_options(get=dask.threaded.get): # no cluster, just local threads
print(len(logd)) # stays on client
Но, предположительно, вы хотите знать, как масштабировать до более крупных наборов данных, поэтому давайте сделаем это правильно.
Загрузите данные о рабочих
Вместо загрузки с помощью Pandas на вашем клиентском/локальном сеансе пусть рабочие Dask загружают биты csv файла. Таким образом, связь между клиентом и работником не требуется.
# log = pd.read_csv('800000test', sep='\t') # on client
log = dd.read_csv('800000test', sep='\t') # on cluster workers
Однако, в отличие от pd.read_csv
, dd.read_csv
является ленивым, поэтому это должно возвращаться почти сразу. Мы можем заставить Dask фактически выполнять вычисления с помощью метода persist
log = client.persist(log) # triggers computation asynchronously
Теперь кластер начинает действовать и загружает ваши данные непосредственно в рабочих. Это относительно быстро. Обратите внимание, что этот метод немедленно возвращается, когда работа выполняется в фоновом режиме. Если вы хотите подождать, пока он закончится, вызовите wait
.
from dask.distributed import wait
wait(log) # blocks until read is done
Если вы тестируете небольшой набор данных и хотите получить больше разделов, попробуйте изменить размер блока.
log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks
Несмотря на это, операции с log
должны быть быстрыми
len(log) # fast
Изменить
В ответ на вопрос о этот blogpost здесь приведены предположения о том, где мы живем.
Обычно, когда вы указываете имя файла на dd.read_csv
, он предполагает, что этот файл отображается со всех рабочих. Это верно, если вы используете сетевую файловую систему или глобальный магазин, такой как S3 или HDFS. Если вы используете сетевую файловую систему, вам нужно либо использовать абсолютные пути (например, /path/to/myfile.*.csv
), либо убедиться, что ваши рабочие и клиенты имеют один и тот же рабочий каталог.
Если это не так, и ваши данные находятся только на вашей клиентской машине, вам придется загружать и рассылать их.
Простой, но неоптимальный
Простым способом является только то, что вы делали изначально, но сохраняйте свой dask.dataframe
log = pd.read_csv('800000test', sep='\t') # on client
logd = dd.from_pandas(log,npartitions=20) # still on client
logd = client.persist(logd) # moves to workers
Это хорошо, но приводит к чуть-чуть-менее идеальному общению.
Комплексный, но оптимальный
Вместо этого вы можете явно разбросать свои данные в кластере
[future] = client.scatter([log])
Это более сложный API, поэтому я просто укажу вам на документы
http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/delayed-collections.html