Как отслеживать события от работников в приложении Celery-Django?
В соответствии с учебным пособием по сельдеру о мониторинге работы сельдерей в режиме реального времени можно также программно фиксировать события, производимые рабочими, и предпринимать соответствующие действия.
Мой вопрос заключается в том, как интегрировать монитор в качестве примера в этом в приложении Celery-Django?
EDIT:
Пример кода в учебнике выглядит так:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
task_id = event['uuid']
print('TASK FAILED: %s[%s] %s' % (
event['name'], task_id, state[task_id].info(), ))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'worker-heartbeat': announce_dead_workers,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
celery = Celery(broker='amqp://[email protected]//')
my_monitor(celery)
Итак, я хочу захватить событие task_failed, отправленное рабочим, и получить его task_id, как показано в уроке, чтобы получить результат для этой задачи из базы данных результатов, которая была настроена для моего приложения, и обработать ее дальше. Моя проблема в том, что для меня не очевидно, как получить приложение, так как в проекте django-celery я не прозрачен для экземпляра библиотеки Celery.
Я также открыт для любой другой идеи о том, как обрабатывать результаты, когда работник закончил выполнение задачи.
Ответы
Ответ 1
Хорошо, я нашел способ сделать это, хотя я не уверен, что это решение, но оно работает для меня. Функция монитора в основном подключается непосредственно к брокеру и прислушивается к различным типам событий. Мой код выглядит следующим образом:
from celery.events import EventReceiver
from kombu import Connection as BrokerConnection
def my_monitor:
connection = BrokerConnection('amqp://guest:[email protected]:5672//')
def on_event(event):
print "EVENT HAPPENED: ", event
def on_task_failed(event):
exception = event['exception']
print "TASK FAILED!", event, " EXCEPTION: ", exception
while True:
try:
with connection as conn:
recv = EventReceiver(conn,
handlers={'task-failed' : on_task_failed,
'task-succeeded' : on_event,
'task-sent' : on_event,
'task-received' : on_event,
'task-revoked' : on_event,
'task-started' : on_event,
# OR: '*' : on_event
})
recv.capture(limit=None, timeout=None)
except (KeyboardInterrupt, SystemExit):
print "EXCEPTION KEYBOARD INTERRUPT"
sys.exit()
Это все. И я запускаю это в другом процессе, чем обычное приложение, а это значит, что я создаю дочерний процесс моего приложения celery, в котором работает только эта функция.
НТН
Ответ 2
Остерегайтесь нескольких ошибок:
- Вам нужно установить флаг
CELERY_SEND_EVENTS
как true в вашей конфигурации celery.
- Вы также можете установить монитор событий в новом потоке от своего рабочего.
Вот моя реализация:
class MonitorThread(object):
def __init__(self, celery_app, interval=1):
self.celery_app = celery_app
self.interval = interval
self.state = self.celery_app.events.State()
self.thread = threading.Thread(target=self.run, args=())
self.thread.daemon = True
self.thread.start()
def catchall(self, event):
if event['type'] != 'worker-heartbeat':
self.state.event(event)
# logic here
def run(self):
while True:
try:
with self.celery_app.connection() as connection:
recv = self.celery_app.events.Receiver(connection, handlers={
'*': self.catchall
})
recv.capture(limit=None, timeout=None, wakeup=True)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
# unable to capture
pass
time.sleep(self.interval)
if __name__ == '__main__':
app = get_celery_app() # returns app
MonitorThread(app)
app.start()