Pandas применяется многопроцессорная обработка
Я пытаюсь использовать многопроцессорную обработку с pandas dataframe, который разделяет dataframe на 8 частей. примените некоторую функцию к каждой части, используя apply (каждая часть обрабатывается в другом процессе).
EDIT:
Здесь я наконец нашел решение:
import multiprocessing as mp
import pandas.util.testing as pdt
def process_apply(x):
# do some stuff to data here
def process(df):
res = df.apply(process_apply, axis=1)
return res
if __name__ == '__main__':
p = mp.Pool(processes=8)
split_dfs = np.array_split(big_df,8)
pool_results = p.map(aoi_proc, split_dfs)
p.close()
p.join()
# merging parts processed by different processes
parts = pd.concat(pool_results, axis=0)
# merging newly calculated parts to big_df
big_df = pd.concat([big_df, parts], axis=1)
# checking if the dfs were merged correctly
pdt.assert_series_equal(parts['id'], big_df['id'])
Ответы
Ответ 1
Более общая версия, основанная на авторском решении, которая позволяет запускать ее для каждой функции и фрейма данных:
from multiprocessing import Pool
from functools import partial
import numpy as np
def parallelize(data, func, num_of_processes=8):
data_split = np.array_split(data, num_of_processes)
pool = Pool(num_of_processes)
data = pd.concat(pool.map(func, data_split))
pool.close()
pool.join()
return data
def run_on_subset(func, data_subset):
return data_subset.apply(func, axis=1)
def parallelize_on_rows(data, func, num_of_processes=8):
return parallelize(data, partial(run_on_subset, func), num_of_processes)
Итак, следующая строка:
df.apply(some_func, axis=1)
Станет:
parallelize_on_rows(df, some_func)
Ответ 2
Так как у меня не так много данных script, это предположение, но я бы предложил использовать p.map
вместо apply_async
с обратным вызовом.
p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df,8))
p.close()
p.join()
results = []
for result in pool_results:
results.extend(result)
Ответ 3
Вы можете использовать https://github.com/nalepae/pandarallel, как в следующем примере:
from pandarallel import pandarallel
from math import sin
pandarallel.initialize()
def func(x):
return sin(x**2)
df.parallel_apply(func, axis=1)
Ответ 4
Это хорошо сработало для меня:
rows_iter = (row for _, row in df.iterrows())
with multiprocessing.Pool() as pool:
df['new_column'] = pool.map(process_apply, rows_iter)
Ответ 5
Я также сталкиваюсь с той же проблемой, когда я использую multiprocessing.map()
для применения функции к другому фрагменту большого блока данных.
Я просто хочу добавить несколько точек на всякий случай, если другие люди столкнутся с той же проблемой, что и я.
- не забудьте добавить
if __name__ == '__main__':
- выполните файл в файле
.py
, если вы используете ipython/jupyter notebook
, тогда вы не можете запустить multiprocessing
(это верно для моего случая, хотя я не знаю)
Ответ 6
Установите Pyxtension, который упрощает использование параллельной карты, и используйте так:
from pyxtension.streams import stream
big_df = pd.concat(stream(np.array_split(df, multiprocessing.cpu_count())).mpmap(process))