Ответ 1
map_partitions
Вы можете применить свою функцию ко всем разделам вашего фрейма данных с помощью функции map_partitions
.
df.map_partitions(func, columns=...)
Обратите внимание, что func будет указывать только часть набора данных за раз, а не весь набор данных, например, с помощью pandas apply
(который предположительно вам не нужен, если вы хотите сделать parallelism.)
map
/apply
Вы можете сопоставить функцию по ряду последовательностей с помощью map
df.mycolumn.map(func)
Вы можете сопоставить функцию по-разному по файловому кадру с помощью apply
df.apply(func, axis=1)
Темы против процессов
Начиная с версии 0.6.0 dask.dataframes
распараллеливается с потоками. Пользовательские функции Python не получат большой пользы от parallelism на основе потоков. Вместо этого вы можете попробовать процессы
df = dd.read_csv(...)
from dask.multiprocessing import get
df.map_partitions(func, columns=...).compute(get=get)
Но избегайте apply
Однако вы действительно должны избегать apply
с пользовательскими функциями Python, как в Pandas, так и в Dask. Это часто является источником плохой работы. Может быть, если вы найдете способ сделать свою операцию в векторном виде, то может быть, ваш код Pandas будет на 100 раз быстрее, и вам вообще не понадобится dask.dataframe.
Рассмотрим numba
Для вашей конкретной проблемы вы можете рассмотреть numba
. Это значительно улучшает вашу производительность.
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)
In [4]: %paste
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
## -- End pasted text --
In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms
In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)
In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms
In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
Отказ от ответственности, я работаю в компании, которая делает как numba
, так и dask
и использует многих разработчиков pandas
.