Как написать общую переменную в python joblib
Следующий код распараллеливает цикл for.
import networkx as nx;
import numpy as np;
from joblib import Parallel, delayed;
import multiprocessing;
def core_func(repeat_index, G, numpy_arrary_2D):
for u in G.nodes():
numpy_arrary_2D[repeat_index][u] = 2;
return;
if __name__ == "__main__":
G = nx.erdos_renyi_graph(100000,0.99);
nRepeat = 5000;
numpy_array = np.zeros([nRepeat,G.number_of_nodes()]);
Parallel(n_jobs=4)(delayed(core_func)(repeat_index, G, numpy_array) for repeat_index in range(nRepeat));
print(np.mean(numpy_array));
Как видно, ожидаемое значение для печати равно 2. Однако, когда я запускаю свой код в кластере (многоядерная, разделяемая память), он возвращает 0.0.
Я думаю, проблема в том, что каждый рабочий создает свою собственную копию объекта numpy_array
, а созданный в основной функции не обновляется. Как изменить код, чтобы можно было обновить массив numpy numpy_array
?
Ответы
Ответ 1
joblib
по умолчанию использует многопроцессорный пул процессов, как говорится в его руководстве:
Под капотом объект Parallel создает многопроцессорный пул, который разветвляет интерпретатор Python в нескольких процессах для выполнения каждого из элементов списка. Функция с задержкой - это простой трюк, позволяющий создать кортеж (function, args, kwargs) с синтаксисом вызова функции.
Это означает, что каждый процесс наследует исходное состояние массива, но все, что он записывает в него, теряется при выходе из процесса. Только результат функции возвращается обратно в вызывающий (основной) процесс. Но вы ничего не возвращаете, поэтому None
не возвращается.
Чтобы сделать общий массив изменяемым, у вас есть два способа: использование потоков и использование общей памяти.
Потоки, в отличие от процессов, разделяют память. Таким образом, вы можете записать в массив, и каждая работа увидит это изменение. Согласно руководству joblib
, это делается следующим образом:
Parallel(n_jobs=4, backend="threading")(delayed(core_func)(repeat_index, G, numpy_array) for repeat_index in range(nRepeat));
Когда вы запустите это:
$ python r1.py
2.0
Однако, когда вы будете записывать сложные вещи в массив, убедитесь, что вы правильно обрабатываете блокировки вокруг данных или фрагментов данных, иначе вы попадете в условия гонки (Google google).
Также внимательно прочитайте о GIL, так как вычислительная многопоточность в Python ограничена (в отличие от многопоточности ввода/вывода).
Если вам все еще нужны процессы (например, из-за GIL), вы можете поместить этот массив в общую память.
Это немного более сложная тема, но пример joblib + numpy совместно используемой памяти также показан в руководстве joblib
.
Ответ 2
Как писал Сергей в своем ответе, процессы не разделяют состояние и память. Вот почему вы не видите ожидаемого ответа.
Темы разделяют состояние и пространство памяти, поскольку они работают под одним и тем же процессом. Это полезно, если у вас много операций ввода-вывода. Это не даст вам больше вычислительной мощности (больше процессоров) из-за GIL
Один способ взаимодействия между процессами - Прокси-объекты с помощью Менеджера. Вы создаете объект-менеджер, который синхронизирует ресурсы между процессами.
Объект менеджера, возвращаемый менеджером(), управляет процессом сервера, который содержит объекты Python и позволяет другим процессам манипулировать ими с помощью прокси.
Я не тестировал этот код (у меня нет всех модулей, которые вы используете), и для этого может потребоваться больше изменений в коде, но с использованием объекта диспетчера он должен выглядеть так:
if __name__ == "__main__":
G = nx.erdos_renyi_graph(100000,0.99);
nRepeat = 5000;
manager = multiprocessing.Manager()
numpys = manager.list(np.zeros([nRepeat, G.number_of_nodes()])
Parallel(n_jobs=4)(delayed(core_func)(repeat_index, G, numpys, que) for repeat_index in range(nRepeat));
print(np.mean(numpys));