У Python/Django опроса базы данных есть утечка памяти

У меня есть Python script, работающий с Django для базы данных и memcache, но он, в частности, запускается как автономный демон (т.е. не отвечает на запросы веб-серверов). Демон проверяет заявку на модель Django для объектов с status=STATUS_NEW, а затем отмечает их STATUS_WORKING и помещает их в очередь.

Ряд процессов (созданных с использованием пакета многопроцессорности) вытащит вещи из очереди и будет работать над реквизитом с pr.id, который был передан в очередь. Я полагаю, что утечка памяти, вероятно, находится в следующем коде (но она может быть в коде "Worker" на другой стороне очереди, хотя это маловероятно, потому что поскольку размер памяти растет, даже когда никаких заявок не возникает, т.е. когда все рабочие блокируются на Queue.get()).

from requisitions.models import Requisition # our Django model
from multiprocessing import Queue

while True:
    # Wait for "N"ew requisitions, then pop them into the queue.
    for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
        pr.set_status(pr.STATUS_WORKING)
        pr.save()
        queue.put(pr.id)

    time.sleep(settings.DAEMON_POLL_WAIT)

Где settings.DAEMON_POLL_WAIT=0.01.

Кажется, если я оставлю это в течение определенного периода времени (т.е. через пару дней), процесс Python будет расти до бесконечного размера и, в конечном итоге, в системе будет нехватка памяти.

Что происходит здесь (или как я могу узнать), и что еще более важно - как вы можете запустить демон, который делает это?

Моя первая мысль - изменить динамику функции, в частности, поместив проверку на новые объекты заявки в django.core.cache cache, т.е.

from django.core.cache import cache

while True:
    time.sleep(settings.DAEMON_POLL_WAIT)
    if cache.get('new_requisitions'):
       # Possible race condition
       cache.clear()
       process_new_requisitions(queue)

 def process_new_requisitions(queue):
    for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
        pr.set_status(pr.STATUS_WORKING)
        pr.save()
        queue.put(pr.id)

Процесс создания Requisitions с status=STATUS_NEW может выполнить cache.set('new_requisitions', 1) (или, альтернативно, мы могли бы поймать сигнал или событие Requisition.save(), когда создается новое Требование, а затем установите флаг в кеше).

Однако я не уверен, что решение, которое я предложил здесь, касается проблем с памятью (которые, вероятно, связаны с сборкой мусора), поэтому обзор с помощью process_new_requisitions может решить проблему).

Я благодарен за любые мысли и отзывы.

Ответы

Ответ 1

Вам необходимо регулярно reset список запросов, которые Django сохраняет для целей отладки. Обычно он очищается после каждого запроса, но поскольку ваше приложение не основано на запросе, вам необходимо сделать это вручную:

from django import db

db.reset_queries()

См. также:

  • "Отладка утечки памяти Django с помощью TrackRefs и Guppy" от Mikko Ohtamaa:

    Django отслеживает все запросы для отладочные цели (connection.queries). Этот список сбрасывается в конце HTTP-запроса. Но в автономном режиме нет Запросы. Поэтому вам нужно вручную reset в список запросов после каждого рабочий цикл

  • "Почему Django пропускает память?" в FAQ Django - он говорит как о настройке DEBUG в False, что всегда важно, и об очистке списка запросов с помощью db.reset_queries(), важна в таких приложениях, как ваша.

Ответ 2

Имеет ли файл settings.py для процесса демона DEBUG = True? Если это так, Django хранит в памяти запись всего SQL, который он запустил до сих пор, что может привести к утечке памяти.

Ответ 3

У меня было много данных, чтобы справиться с этим, поэтому решение этой проблемы заключалось в многопроцессорной обработке и использовании пулов для противодействия разрастанию памяти.

Чтобы это было просто, я просто определил некоторые "глобальные" (верхний уровень, независимо от того, что входит в функции Python), вместо того, чтобы пытаться сделать маринование.

Здесь это в абстрактной форме:

import multiprocessing as mp

WORKERS = 16 # I had 7 cores, allocated 16 because processing was I/O bound

# this is a global function
def worker(params):
  # do stuff
  return something_for_the_callback_to_analyze

# this is a global function
def worker_callback(worker_return_value):
  # report stuff, or pass

# My multiprocess_launch was inside of a class
def multiprocess_launcher(params):
  # somehow define a collection
  while True:
    if len(collection) == 0:
      break
    # Take a slice
    pool_sub_batch = []
    for _ in range(WORKERS):
      if collection: # as long as there still something in the collection
        pool_sub_batch.append( collection.pop() )
    # Start a pool, limited to the slice
    pool_size = WORKERS
    if len(pool_sub_batch) < WORKERS:
      pool_size = len(pool_sub_batch)
    pool = mp.Pool(processes=pool_size)
    for sub_batch in pool_sub_batch:
      pool.apply_async(worker, args = (sub_batch), callback = worker_callback)
    pool.close()
    pool.join()
    # Loop, more slices

Ответ 4

Помимо db.reset_queries() и DEBUG = Ложные трюки, вот еще один подход: Просто создайте другой процесс, который выполняет запрос django и подает очередь. Этот процесс будет работать в собственном контексте памяти, и после выполнения вашей задачи он отпустит вашу память.

Я считаю, что иногда (если не всегда) неизбежно контролировать проблемы с памятью с помощью долгого процесса, который выполняет тяжелые транзакции django.