Задача сельдерея и настройка декоратора
Я работаю над проектом, использующим django и сельдерей (django-celery). Наша команда решила обернуть весь код доступа к данным в (app-name)/manager.py
(НЕ переносить в Менеджеров, как способ django
), и разрешить код в (app-name)/task.py только для сборки и выполнения задач с сельдереем мы не имеем зависимости ORM от Django в этом слое).
В моем manager.py
у меня есть что-то вроде этого:
def get_tag(tag_name):
ctype = ContentType.objects.get_for_model(Photo)
try:
tag = Tag.objects.get(name=tag_name)
except ObjectDoesNotExist:
return Tag.objects.none()
return tag
def get_tagged_photos(tag):
ctype = ContentType.objects.get_for_model(Photo)
return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)
def get_tagged_photos_count(tag):
return get_tagged_photos(tag).count()
В моей task.py мне нравится включать их в задачи (тогда, возможно, использовать эти задачи для выполнения более сложных задач), поэтому я пишу этот декоратор:
import manager #the module within same app containing data access functions
class mfunc_to_task(object):
def __init__(mfunc_type='get'):
self.mfunc_type = mfunc_type
def __call__(self, f):
def wrapper_f(*args, **kwargs):
callback = kwargs.pop('callback', None)
mfunc = getattr(manager, f.__name__)
result = mfunc(*args, **kwargs)
if callback:
if self.mfunc_type == 'get':
subtask(callback).delay(result)
elif self.mfunc_type == 'get_or_create':
subtask(callback).delay(result[0])
else:
subtask(callback).delay()
return result
return wrapper_f
затем (все еще в task.py
):
#@task
@mfunc_to_task()
def get_tag():
pass
#@task
@mfunc_to_task()
def get_tagged_photos():
pass
#@task
@mfunc_to_task()
def get_tagged_photos_count():
pass
Все отлично работает без @task
.
Но после применения этого декоратора @task
(вверху, как указано в документации по сельдерину), все начинает разваливаться. По-видимому, каждый раз, когда вызывается mfunc_to_task.__call__
, те же функции task.get_tag
передаются как f
. Таким образом, я получал один и тот же wrapper_f
каждый раз, и теперь единственное, что я делаю, это получить один тег.
Я новичок в декораторах. Любой может помочь мне понять, что здесь не так, или указать другие способы достижения этой задачи? Мне очень жаль, что я написал один и тот же код для каждой функции доступа к данным.
Ответы
Ответ 1
Не совсем понятно, почему передаваемые аргументы не будут работать?
если вы используете этот пример:
@task()
def add(x, y):
return x + y
позволяет добавить некоторые записи в MyCoolTask:
from celery import task
from celery.registry import tasks
import logging
import celery
logger = logging.getLogger(__name__)
class MyCoolTask(celery.Task):
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
logger.info("Starting to run")
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point of the task whatever is the state
logger.info("Ending run")
pass
и создайте расширенный класс (расширяющий MyCoolTask, но теперь с аргументами):
class AddTask(MyCoolTask):
def run(self,x,y):
if x and y:
result=add(x,y)
logger.info('result = %d' % result)
return result
else:
logger.error('No x or y in arguments')
tasks.register(AddTask)
и убедитесь, что вы передаете kwargs как данные json:
{"x":8,"y":9}
Получаю результат:
[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17
Ответ 2
Вместо использования декоратора, почему вы не создаете базовый класс, расширяющий celery.Task
?
Таким образом, все ваши задачи могут расширить ваш настраиваемый класс задач, где вы можете реализовать свое личное поведение с помощью методов __call__
и after_return
,
Вы также можете определить общие методы и объект для всей своей задачи.
class MyCoolTask(celery.Task):
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point of the task whatever is the state
pass