Параллелизировать применять после pandas groupby
Я использовал rosetta.parallel.pandas_easy для параллелизации, применяемого после группы, например:
from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)
Однако кто-нибудь понял, как распараллелить функцию, которая возвращает фреймворк? Этот код не работает для rosetta, как и ожидалось.
def tmpFunc(df):
df['c'] = df.a + df.b
return df
df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
Ответы
Ответ 1
Кажется, что это работает, хотя он действительно должен быть встроен в pandas
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing
def tmpFunc(df):
df['c'] = df.a + df.b
return df
def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)
if __name__ == '__main__':
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
print 'parallel version: '
print applyParallel(df.groupby(df.index), tmpFunc)
print 'regular version: '
print df.groupby(df.index).apply(tmpFunc)
print 'ideal version (does not work): '
print df.groupby(df.index).applyParallel(tmpFunc)
Ответ 2
Ответ Ivan велик, но похоже, что его можно немного упростить, также устраняя необходимость зависеть от joblib:
from multiprocessing import Pool, cpu_count
def applyParallel(dfGrouped, func):
with Pool(cpu_count()) as p:
ret_list = p.map(func, [group for name, group in dfGrouped])
return pandas.concat(ret_list)
Кстати: это не может заменить какой-либо groupby.apply(), но он будет охватывать типичные случаи: например. он должен охватывать случаи 2 и 3 в документации, в то время как вы должны получить поведение case 1, указав аргумент axis=1
до окончательного вызова pandas.concat()
.
Ответ 3
У меня есть хак, который я использую для получения распараллеливания в Pandas. Я разбиваю свою фреймворк на куски, помещаю каждый фрагмент в элемент списка, а затем использую параллельные биты ipython для параллельной работы в списке данных. Затем я вернул список вместе с помощью функции pandas concat
.
Однако это не применимо. Это работает для меня, потому что функция, которую я хочу применить к каждому фрагменту кадра данных, занимает около минуты. И вытягивание и сбор моих данных не так долго. Таким образом, это, безусловно, куд. С этим сказал, вот пример. Я использую ноутбук Ipython, поэтому в моем коде вы увидите магию %%time
:
## make some example data
import pandas as pd
np.random.seed(1)
n=10000
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n),
'data' : np.random.rand(n)})
grouped = df.groupby('mygroup')
В этом примере я собираюсь сделать "куски" на основе вышеперечисленного groupby, но это не обязательно должно быть так, как данные разбиваются на разделы. Хотя это довольно распространенная картина.
dflist = []
for name, group in grouped:
dflist.append(group)
настроить параллельные биты
from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True
напишите глупую функцию, применимую к нашим данным
def myFunc(inDf):
inDf['newCol'] = inDf.data ** 10
return inDf
теперь можно запустить код последовательно, затем параллельно.
сначала:
%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s
теперь параллельна
%%time
parallel_list = lview.map(myFunc, dflist)
CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s
то требуется всего несколько мс, чтобы объединить их обратно в один фрейм данных
%%time
combinedDf = pd.concat(parallel_list)
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms
На моем MacBook работает 6 IPython-движков, но вы можете увидеть, что он сокращает время выполнения до 2 с от 14 секунд.
Для действительно длительных стохастических симуляций я могу использовать AWS-сервер, создав кластер с StarCluster. Однако большую часть времени я распараллеливаю только 8 процессоров на моем MBP.
Ответ 4
Краткий комментарий для сопровождения ответа JD Long. Я обнаружил, что если количество групп очень велико (скажем, сотни тысяч), и ваша прикладная функция делает что-то довольно простое и быстрое, а затем разбивает ваш блок данных на куски и назначает каждому куску работнику для выполнения groupby-apply (в последовательном порядке) может быть намного быстрее, чем выполнять параллельную групповую подачу заявки, а рабочие считывают очередь, содержащую множество групп. Пример:
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
Итак, наш фреймворк выглядит так:
a
0 3425
1 1016
2 8141
3 9263
4 8018
Обратите внимание, что столбец "a" имеет много групп (думаю, идентификаторы клиентов):
len(df.a.unique())
15000
Функция для работы с нашими группами:
def f1(group):
time.sleep(0.0001)
return group
Запустите пул:
ppe = ProcessPoolExecutor(12)
futures = []
results = []
Сделайте параллельную групповую подачу:
%%time
for name, group in df.groupby('a'):
p = ppe.submit(f1, group)
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
del ppe
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
Теперь добавим столбец, который разбивает df на несколько меньших групп:
df['b'] = np.random.randint(0, 12, nrows)
Теперь вместо 15000 групп есть только 12:
len(df.b.unique())
12
Мы разделим наш df и применим groupby-apply на каждом фрагменте.
ppe = ProcessPoolExecutor(12)
Wrapper fun:
def f2(df):
df.groupby('a').apply(f1)
return df
Отправляйте каждый блок, который будет работать последовательно:
%%time
for i in df.b.unique():
p = ppe.submit(f2, df[df.b==i])
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
Обратите внимание, что количество времени, затрачиваемого на группу, не изменилось. Скорее, что изменилось, это длина очереди, из которой считаются рабочие. Я подозреваю, что происходит то, что рабочие не могут одновременно обращаться к разделяемой памяти и постоянно возвращаются к чтению очереди, и поэтому наступают друг на друга. С более крупными кусками, чтобы работать, рабочие возвращаются реже, и поэтому эта проблема улучшается, а общее выполнение выполняется быстрее.
Ответ 5
Лично я рекомендовал бы использовать dask, согласно этой теме.
Как указал @chrisb, многопроцессорная обработка с использованием панд в python может привести к ненужным накладным расходам. Он также может работать не так хорошо, как многопоточность или даже как один поток.
Dask создан специально для мультипроцессинга.