Как использовать инициализатор для настройки моего многопроцессорного пула?
Я пытаюсь использовать объект многопроцессного пула. Я хотел бы, чтобы каждый процесс открыл соединение с базой данных при его запуске, а затем использовал это соединение для обработки передаваемых данных. (Вместо того, чтобы открывать и закрывать соединение для каждого бита данных.) Кажется, что инициализатор для, но я не могу оборачивать голову тем, как взаимодействует рабочий и инициализатор. Поэтому у меня есть что-то вроде этого:
def get_cursor():
return psycopg2.connect(...).cursor()
def process_data(data):
# here I'd like to have the cursor so that I can do things with the data
if __name__ == "__main__":
pool = Pool(initializer=get_cursor, initargs=())
pool.map(process_data, get_some_data_iterator())
как мне (или сделать I) вернуть курсор из get_cursor() в process_data()?
Ответы
Ответ 1
Функция инициализации вызывается так:
def worker(...):
...
if initializer is not None:
initializer(*args)
поэтому никакого возвращаемого значения не сохраняется. Вы можете подумать, что это обрекает вас, но нет! Каждый работник находится в отдельном процессе. Таким образом, вы можете использовать обычную переменную global
.
Это не совсем красиво, но работает:
cursor = None
def set_global_cursor(...):
global cursor
cursor = ...
Теперь вы можете просто использовать cursor
в своей функции process_data
. Переменная cursor
внутри каждого отдельного процесса отделена от всех других процессов, поэтому они не наступают друг на друга.
(я понятия не имею, имеет ли psycopg2
другой способ борьбы с этим, который не включает в себя прежде всего использование multiprocessing
, это подразумевается как общий ответ на общую проблему с модулем multiprocessing
.)
Ответ 2
torek уже дал хорошее объяснение, почему инициализатор не работает в этом случае. Тем не менее, я не поклонник Глобальной переменной лично, поэтому я хотел бы вставить другое решение здесь.
Идея состоит в том, чтобы использовать класс для переноса функции и инициализировать класс с помощью "глобальной" переменной.
class Processor(object):
"""Process the data and save it to database."""
def __init__(self, credentials):
"""Initialize the class with 'global' variables"""
self.cursor = psycopg2.connect(credentials).cursor()
def __call__(self, data):
"""Do something with the cursor and data"""
self.cursor.find(data.key)
И затем вызовите с помощью
p = Pool(5)
p.map(Processor(credentials), list_of_data)
Итак, первый параметр инициализировал класс с учетными данными, возвращает экземпляр класса и карты, вызывающий экземпляр с данными.
Хотя это не так просто, как решение глобальной переменной, я настоятельно рекомендую избегать глобальной переменной и инкапсулировать переменные каким-то безопасным способом. (И я действительно хочу, чтобы они могли поддерживать выражение лямбда в один прекрасный день, это значительно упростит...)
Ответ 3
Вы также можете отправить функцию вместе с инициализатором и создать в нем соединение. После этого вы добавляете курсор к функции.
def init_worker(function):
function.cursor = db.conn()
Теперь вы можете получить доступ к db через function.cursor, не используя глобальные переменные, например:
def use_db(i):
print(use_db.cursor) #process local
pool = Pool(initializer=init_worker, initargs=(use_db,))
pool.map(use_db, range(10))
Ответ 4
Учитывая, что определение глобальных переменных в инициализаторе обычно нежелательно, мы можем избежать их использования, а также избежать повторной дорогостоящей инициализации в каждом вызове с простым кэшированием в каждом подпроцессе:
from functools import lru_cache
from multiprocessing.pool import Pool
from time import sleep
@lru_cache(maxsize=None)
def _initializer(a, b):
print(f'Initialized with {a}, {b}')
def _pool_func(a, b, i):
_initializer(a, b)
sleep(1)
print(f'got {i}')
arg_a = 1
arg_b = 2
with Pool(processes=10) as pool:
pool.starmap(_pool_func, ((arg_a, arg_b, i) for i in range(0, 100)))