Как вы распараллеливаете apply() на Pandas Dataframes, используя все ядра на одной машине?
По состоянию на август 2017 года Pandas DataFame.apply(), к сожалению, по-прежнему ограничивается работой с одним ядром, что означает, что мульти -core машина будет тратить большую часть своего времени вычисления при запуске df.apply(myfunc, axis=1)
.
Как вы можете использовать все свои ядра для запуска приложения параллельно на базе данных?
Ответы
Ответ 1
Самый простой способ - использовать Dask map_partitions. Им нужен этот импорт (вам нужно pip install dask
):
import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get
и синтаксис
data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y,z, ...): return <whatever>
res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)
(я считаю, что 30 - подходящее количество разделов, если у вас 16 ядер). Просто для полноты я подсчитал разницу на моей машине (16 ядер):
data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)
ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)
def vectorized(): return myfunc(data['col1'], data['col2'] )
t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))
+28,16970546543598
t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))
2,708152851089835
t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))
+0,010668013244867325
Предоставление коэффициента 10 ускорений, идущих от pandas, применимо к применению dask на разделах. Конечно, если у вас есть функция, которую вы можете векторизовать, вам нужно - в этом случае функция (y*(x**2+1)
) тривиально векторизована, но есть много вещей, которые невозможно векторизовать.
Ответ 2
Вы можете использовать пакет swifter
:
pip install swifter
Он работает как плагин для панд, позволяя вам повторно использовать функцию apply
:
import swifter
def some_function(data):
return data * 10
data['out'] = data['in'].swifter.apply(some_function)
Он автоматически определит наиболее эффективный способ распараллеливания функции, независимо от того, векторизована ли она (как в приведенном выше примере) или нет.
Дополнительные примеры и сравнения производительности доступны на GitHub. Обратите внимание, что пакет находится в активной разработке, поэтому API может измениться.
Также обратите внимание, что это не будет работать автоматически для строковых столбцов. При использовании строк Swifter будет использовать "простые" панды apply
, которые не будут параллельными. В этом случае даже принудительное использование dask
не приведет к повышению производительности, и вам будет лучше просто разделить ваш набор данных вручную и распараллелить, используя multiprocessing
.
Ответ 3
вместо этого вы можете попробовать pandarallel
: простой и эффективный инструмент для распараллеливания ваших операций pandas на всех ваших процессорах (в Linux и macOS)
- Распараллеливание сопряжено с затратами (создание новых процессов, отправка данных через общую память и т.д.), Поэтому распараллеливание эффективно только в том случае, если объем параллелизации вычислений достаточно высок. Для очень небольшого количества данных, использование параллелизации не всегда того стоит.
- Применяемые функции НЕ должны быть лямбда-функциями.
from pandarallel import pandarallel
from math import sin
pandarallel.initialize()
# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)
# ALLOWED
def func(x):
return sin(x**2)
df.parallel_apply(func, axis=1)
см. https://github.com/nalepae/pandarallel
Ответ 4
Если вы хотите остаться в нативном питоне:
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
df['newcol'] = pool.map(f, df['col'])
будет параллельно применять функцию f
к столбцу col
кадра данных df
Ответ 5
Вот пример базового трансформатора sklearn, в котором применяются панды распараллелены
import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator
class ParllelTransformer(BaseEstimator, TransformerMixin):
def __init__(self,
n_jobs=1):
"""
n_jobs - parallel jobs to run
"""
self.variety = variety
self.user_abbrevs = user_abbrevs
self.n_jobs = n_jobs
def fit(self, X, y=None):
return self
def transform(self, X, *_):
X_copy = X.copy()
cores = mp.cpu_count()
partitions = 1
if self.n_jobs <= -1:
partitions = cores
elif self.n_jobs <= 0:
partitions = 1
else:
partitions = min(self.n_jobs, cores)
if partitions == 1:
# transform sequentially
return X_copy.apply(self._transform_one)
# splitting data into batches
data_split = np.array_split(X_copy, partitions)
pool = mp.Pool(cores)
# Here reduce function - concationation of transformed batches
data = pd.concat(
pool.map(self._preprocess_part, data_split)
)
pool.close()
pool.join()
return data
def _transform_part(self, df_part):
return df_part.apply(self._transform_one)
def _transform_one(self, line):
# some kind of transformations here
return line
для получения дополнительной информации см. https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8