Как реализовать авторесурсы для задач Сельдерея
В Celery, вы можете retry
выполнить любую задачу в случае исключения. Вы можете сделать это так:
@task(max_retries=5)
def div(a, b):
try:
return a / b
except ZeroDivisionError, exc:
raise div.retry(exc=exc)
В этом случае, если вы хотите разделить на ноль, задача будет выполняться пять раз. Но вы должны явно проверить наличие ошибок в вашем коде. Задание не будет выполнено, если вы пропустите блок try-except
.
Я хочу, чтобы мои функции выглядели следующим образом:
@celery.task(autoretry_on=ZeroDivisionError, max_retries=5)
def div(a, b):
return a / b
Ответы
Ответ 1
Я искал эту проблему некоторое время, но нашел только этот запрос функции.
Я решил написать свой собственный декоратор для выполнения повторных попыток:
def task_autoretry(*args_task, **kwargs_task):
def real_decorator(func):
@task(*args_task, **kwargs_task)
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except kwargs_task.get('autoretry_on', Exception), exc:
wrapper.retry(exc=exc)
return wrapper
return real_decorator
С помощью этого декоратора я могу перезаписать свою предыдущую задачу:
@task_autoretry(autoretry_on=ZeroDivisionError, max_retries=5)
def div(a, b):
return a / b
Ответ 2
Сельдерей (начиная с версии 4.0) имеет именно то, что вы искали:
@app.task(autoretry_for=(SomeException,))
def my_task():
...
Смотрите: http://docs.celeryproject.org/en/latest/userguide/tasks.html#automatic-retry-for-known-exceptions
Ответ 3
Я изменил ваш ответ для работы с существующим API Celery (в настоящее время 3.1.17)
class MyCelery(Celery):
def task(self, *args_task, **opts_task):
def real_decorator(func):
sup = super(MyCelery, self).task
@sup(*args_task, **opts_task)
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except opts_task.get('autoretry_on', Exception) as exc:
logger.info('Yo! We did it!')
wrapper.retry(exc=exc, args=args, kwargs=kwargs)
return wrapper
return real_decorator
Затем в ваших задачах
app = MyCelery()
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(autoretry_on=Exception)
def mytask():
raise Exception('Retrying!')
Это позволяет добавлять функции autoretry_on к вашим задачам без необходимости использовать отдельный декоратор для определения задач.