Dask: Как я буду распараллелить свой код с задержкой dask?
Это мое первое предприятие в параллельной обработке, и я смотрел на Dask, но у меня проблемы с его кодированием.
Я посмотрел их примеры и документацию, и я думаю, что dask.delayed будет работать лучше всего. Я попытался скрыть свои функции с задержкой (имя_функции) или добавить декоратор @delayed, но я не могу заставить его работать исправно. Я предпочел Dask над другими методами, поскольку он сделан в python и для его (предполагаемой) простоты. Я знаю, что dask не работает в цикле for, но они говорят, что он может работать внутри цикла.
Мой код передает файлы через функцию, содержащую входные данные для других функций, и выглядит так:
from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
name = name.split('.')[0]
....
затем выполните некоторую предварительную обработку ex:
preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)
то я вызываю конструктор и передаю pre_results в вызовы функции:
fc = FunctionCalls()
Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
input_data=pre_result1, model1=pre_result2)
Что я здесь делаю, я передаю файл в цикл for, делаю предварительную обработку, а затем передаю файл в две модели.
Мысли или советы о том, как это сделать? Я начал получать нечетные ошибки, и я понятия не имел, как исправить код. Код действительно работает так, как есть. Я использую кучу массивов данных pandas, рядов и numpy, и я бы предпочел не возвращаться и не менять все, чтобы работать с dask.dataframes и т.д.
Код в моем комментарии может быть трудночитаемым. Здесь он более форматирован.
В приведенном ниже коде, когда я печатаю print (mean_squared_error), я просто получаю: Delayed ('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')
from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']
for count, name in enumerate(filenames):
file1 = pd.read_csv(name)
df = pd.DataFrame(file1)
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = delayed(mse)(observed, prediction)
Ответы
Ответ 1
Вам нужно вызвать dask.compute, чтобы в конечном итоге вычислить результат. См. Документацию dask.delayed.
Последовательный код
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]
results = []
for count, name in enumerate(filenames):
file1 = pd.read_csv(name)
df = pd.DataFrame(file1) # isn't this already a dataframe?
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = mse(observed, prediction)
results.append(mean_squared_error)
Параллельный код
import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]
delayed_results = []
for count, name in enumerate(filenames):
df = dask.delayed(pd.read_csv)(name)
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = dask.delayed(mse)(observed, prediction)
delayed_results.append(mean_squared_error)
results = dask.compute(*delayed_results)
Ответ 2
ИМО, гораздо более ясное решение, чем принятый ответ, - этот фрагмент.
import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]
def compute_mse(file_name):
df = pd.read_csv(file_name)
prediction = df['Close'][:-1]
observed = df['Close'][1:]
return mse(observed, prediction)
delayed_results = [delayed(compute_mse)(file_name) for file_name in filenames]
mean_squared_errors = compute(*delayed_results, scheduler="processes")