Ответ 1
Вам нужно установить
CELERY_ACKS_LATE = True
Late ack означает, что сообщения задачи будут подтверждены после выполнения задачи, не только раньше, это поведение по умолчанию. Таким образом, если у работника случится сбой кролика, MQ все равно получит сообщение.
Очевидно, что общий крах (Rabbit + workers) в то же время не может восстановить задачу, за исключением случаев, когда вы выполняете ведение журнала при запуске задачи и завершении задачи. Лично я пишу в mongodb строку каждый раз, когда запускается задача, а другая, когда задача заканчивается (независимо формировать результат), таким образом я могу узнать, какая задача была прервана анализом журналов mongo.
Вы можете сделать это легко, переопределив методы __call__
и after_return
базового класса задач сельдерея.
После этого вы увидите часть моего кода, которая использует класс taskLogger в качестве диспетчера контекста (с точкой входа и выхода). Класс taskLogger просто записывает строку, содержащую информацию о задаче в экземпляре mongodb.
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
#Inizialize context managers
self.taskLogger = TaskLogger(args, kwargs)
self.taskLogger.__enter__()
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point for context managers
self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)
Надеюсь, это поможет