У Python/Django опроса базы данных есть утечка памяти
У меня есть Python script, работающий с Django для базы данных и memcache, но он, в частности, запускается как автономный демон (т.е. не отвечает на запросы веб-серверов). Демон проверяет заявку на модель Django для объектов с status=STATUS_NEW
, а затем отмечает их STATUS_WORKING и помещает их в очередь.
Ряд процессов (созданных с использованием пакета многопроцессорности) вытащит вещи из очереди и будет работать над реквизитом с pr.id
, который был передан в очередь. Я полагаю, что утечка памяти, вероятно, находится в следующем коде (но она может быть в коде "Worker" на другой стороне очереди, хотя это маловероятно, потому что поскольку размер памяти растет, даже когда никаких заявок не возникает, т.е. когда все рабочие блокируются на Queue.get()).
from requisitions.models import Requisition # our Django model
from multiprocessing import Queue
while True:
# Wait for "N"ew requisitions, then pop them into the queue.
for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
pr.set_status(pr.STATUS_WORKING)
pr.save()
queue.put(pr.id)
time.sleep(settings.DAEMON_POLL_WAIT)
Где settings.DAEMON_POLL_WAIT=0.01
.
Кажется, если я оставлю это в течение определенного периода времени (т.е. через пару дней), процесс Python будет расти до бесконечного размера и, в конечном итоге, в системе будет нехватка памяти.
Что происходит здесь (или как я могу узнать), и что еще более важно - как вы можете запустить демон, который делает это?
Моя первая мысль - изменить динамику функции, в частности, поместив проверку на новые объекты заявки в django.core.cache cache
, т.е.
from django.core.cache import cache
while True:
time.sleep(settings.DAEMON_POLL_WAIT)
if cache.get('new_requisitions'):
# Possible race condition
cache.clear()
process_new_requisitions(queue)
def process_new_requisitions(queue):
for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
pr.set_status(pr.STATUS_WORKING)
pr.save()
queue.put(pr.id)
Процесс создания Requisitions с status=STATUS_NEW
может выполнить cache.set('new_requisitions', 1)
(или, альтернативно, мы могли бы поймать сигнал или событие Requisition.save(), когда создается новое Требование, а затем установите флаг в кеше).
Однако я не уверен, что решение, которое я предложил здесь, касается проблем с памятью (которые, вероятно, связаны с сборкой мусора), поэтому обзор с помощью process_new_requisitions
может решить проблему).
Я благодарен за любые мысли и отзывы.
Ответы
Ответ 1
Вам необходимо регулярно reset список запросов, которые Django сохраняет для целей отладки. Обычно он очищается после каждого запроса, но поскольку ваше приложение не основано на запросе, вам необходимо сделать это вручную:
from django import db
db.reset_queries()
См. также:
Ответ 2
Имеет ли файл settings.py для процесса демона DEBUG = True
? Если это так, Django хранит в памяти запись всего SQL, который он запустил до сих пор, что может привести к утечке памяти.
Ответ 3
У меня было много данных, чтобы справиться с этим, поэтому решение этой проблемы заключалось в многопроцессорной обработке и использовании пулов для противодействия разрастанию памяти.
Чтобы это было просто, я просто определил некоторые "глобальные" (верхний уровень, независимо от того, что входит в функции Python), вместо того, чтобы пытаться сделать маринование.
Здесь это в абстрактной форме:
import multiprocessing as mp
WORKERS = 16 # I had 7 cores, allocated 16 because processing was I/O bound
# this is a global function
def worker(params):
# do stuff
return something_for_the_callback_to_analyze
# this is a global function
def worker_callback(worker_return_value):
# report stuff, or pass
# My multiprocess_launch was inside of a class
def multiprocess_launcher(params):
# somehow define a collection
while True:
if len(collection) == 0:
break
# Take a slice
pool_sub_batch = []
for _ in range(WORKERS):
if collection: # as long as there still something in the collection
pool_sub_batch.append( collection.pop() )
# Start a pool, limited to the slice
pool_size = WORKERS
if len(pool_sub_batch) < WORKERS:
pool_size = len(pool_sub_batch)
pool = mp.Pool(processes=pool_size)
for sub_batch in pool_sub_batch:
pool.apply_async(worker, args = (sub_batch), callback = worker_callback)
pool.close()
pool.join()
# Loop, more slices
Ответ 4
Помимо db.reset_queries() и DEBUG = Ложные трюки, вот еще один подход:
Просто создайте другой процесс, который выполняет запрос django и подает очередь. Этот процесс будет работать в собственном контексте памяти, и после выполнения вашей задачи он отпустит вашу память.
Я считаю, что иногда (если не всегда) неизбежно контролировать проблемы с памятью с помощью долгого процесса, который выполняет тяжелые транзакции django.