Retry Lost или Failed Tasks (Сельдерей, Django и RabbitMQ)

Есть ли способ определить, потеряна ли какая-либо задача и повторить ее?

Я думаю, что причиной потери может быть ошибка диспетчера или авария рабочего потока.

Я планировал повторить их, но я не уверен, как определить, какие задачи нужно повторить?

И как сделать этот процесс автоматически? Могу ли я использовать свой собственный планировщик, который будет создавать новые задачи?

Изменить: я нашел из документации, что RabbitMQ никогда не потерял задачи, но что происходит, когда рабочий поток падает в середине выполнения задачи?

Ответы

Ответ 1

Вам нужно установить

CELERY_ACKS_LATE = True

Late ack означает, что сообщения задачи будут подтверждены после выполнения задачи, не только раньше, это поведение по умолчанию. Таким образом, если у работника случится сбой кролика, MQ все равно получит сообщение.

Очевидно, что общий крах (Rabbit + workers) в то же время не может восстановить задачу, за исключением случаев, когда вы выполняете ведение журнала при запуске задачи и завершении задачи. Лично я пишу в mongodb строку каждый раз, когда запускается задача, а другая, когда задача заканчивается (независимо формировать результат), таким образом я могу узнать, какая задача была прервана анализом журналов mongo.

Вы можете сделать это легко, переопределив методы __call__ и after_return базового класса задач сельдерея.

После этого вы увидите часть моего кода, которая использует класс taskLogger в качестве диспетчера контекста (с точкой входа и выхода). Класс taskLogger просто записывает строку, содержащую информацию о задаче в экземпляре mongodb.

def __call__(self, *args, **kwargs):
    """In celery task this function call the run method, here you can
    set some environment variable before the run of the task"""

    #Inizialize context managers    

    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    #exit point for context managers
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)

Надеюсь, это поможет