Использование высокой памяти с использованием многопроцессорности Python
Я видел пару сообщений об использовании памяти, используя модуль Pipon Multiprocessing. Однако вопросы, кажется, не отвечают на проблему, которую я здесь. Я отправляю свой анализ с надеждой, что кто-то может мне помочь.
Проблема
Я использую многопроцессорную обработку для выполнения задач параллельно, и я заметил, что потребление памяти рабочими процессами растет бесконечно. У меня есть небольшой отдельный пример, который должен повторить то, что я замечаю.
import multiprocessing as mp
import time
def calculate(num):
l = [num*num for num in range(num)]
s = sum(l)
del l # delete lists as an option
return s
if __name__ == "__main__":
pool = mp.Pool(processes=2)
time.sleep(5)
print "launching calculation"
num_tasks = 1000
tasks = [pool.apply_async(calculate,(i,)) for i in range(num_tasks)]
for f in tasks:
print f.get(5)
print "calculation finished"
time.sleep(10)
print "closing pool"
pool.close()
print "closed pool"
print "joining pool"
pool.join()
print "joined pool"
time.sleep(5)
Система
Я запускаю Windows, и я использую диспетчер задач для контроля использования памяти. Я запускаю Python 2.7.6.
Наблюдение
Я обобщил потребление памяти двумя рабочими процессами ниже.
+---------------+----------------------+----------------------+
| num_tasks | memory with del | memory without del |
| | proc_1 | proc_2 | proc_1 | proc_2 |
+---------------+----------------------+----------------------+
| 1000 | 4884 | 4694 | 4892 | 4952 |
| 5000 | 5588 | 5596 | 6140 | 6268 |
| 10000 | 6528 | 6580 | 6640 | 6644 |
+---------------+----------------------+----------------------+
В приведенной выше таблице я попытался изменить количество задач и понаблюдать за памятью, потребляемой в конце всех вычислений, и до join
-в <<23 > . Параметры "del" и "без del" - это то, что я не комментирую или комментирую строку del l
внутри функции calculate(num)
, соответственно. Перед вычислением потребление памяти составляет около 4400.
- Похоже на то, что вручную очистка списков приводит к снижению использования памяти для рабочих процессов. Я думал, что сборщик мусора позаботился об этом. Есть ли способ принудительного сбора мусора?
- Непонятно, что с увеличением числа задач использование памяти в обоих случаях растет. Есть ли способ ограничить использование памяти?
У меня есть процесс, основанный на этом примере и предназначенный для долгосрочного выполнения. Я наблюдаю, что эти рабочие процессы забивают много памяти (~ 4 ГБ) после ночного прогона. Выполнение join
для освобождения памяти не является опцией, и я пытаюсь найти способ без join
-ing.
Это кажется немного загадочным. Кто-нибудь сталкивался с чем-то похожим? Как я могу исправить эту проблему?
Ответы
Ответ 1
Я провел много исследований и не смог найти решение, чтобы решить проблему как таковую. Но есть приличная работа вокруг, которая предотвращает выброс памяти за небольшую стоимость, особенно на длинном рабочем коде на стороне сервера.
Решением по существу было перезапустить отдельные рабочие процессы после определенного количества задач. Класс Pool
в python принимает maxtasksperchild
как аргумент. Вы можете указать maxtasksperchild=1000
, таким образом ограничивая 1000 задач, которые будут выполняться на каждом дочернем процессе. После достижения номера maxtasksperchild
пул обновляет свои дочерние процессы. Используя разумное число для максимальных задач, можно сбалансировать максимальную память, которая потребляется, с начальными затратами, связанными с перезапуском внутреннего процесса. Конструкция Pool
выполняется как:
pool = mp.Pool(processes=2,maxtasksperchild=1000)
Я размещаю свое полное решение здесь, чтобы оно могло быть полезным для других!
import multiprocessing as mp
import time
def calculate(num):
l = [num*num for num in range(num)]
s = sum(l)
del l # delete lists as an option
return s
if __name__ == "__main__":
# fix is in the following line #
pool = mp.Pool(processes=2,maxtasksperchild=1000)
time.sleep(5)
print "launching calculation"
num_tasks = 1000
tasks = [pool.apply_async(calculate,(i,)) for i in range(num_tasks)]
for f in tasks:
print f.get(5)
print "calculation finished"
time.sleep(10)
print "closing pool"
pool.close()
print "closed pool"
print "joining pool"
pool.join()
print "joined pool"
time.sleep(5)