Как проверить, работает ли Celery/Supervisor с помощью Python

Как написать script в Python, который выводится, если сельдерей работает на машине (Ubuntu)?

Мой прецедент. У меня есть простой файл python с некоторыми задачами. Я не использую Django или Flask. Я использую диспетчер для запуска очереди задач. Например,

tasks.py

from celery import Celery, task
app = Celery('tasks')
@app.task()
def add_together(a, b):
    return a + b

Руководитель:

[program:celery_worker]
directory = /var/app/
command=celery -A tasks worker info

Все это работает, теперь я хочу, чтобы страница проверяла, запущен ли процесс сельдерея/супервизора. то есть что-то вроде этого, возможно, используя флажок, позволяющий мне размещать страницу, дающую статус 200, позволяющий мне загрузить баланс.

Например...

check_status.py

from flask import Flask

app = Flask(__name__)

@app.route('/')
def status_check():

    #check supervisor is running
    if supervisor:
         return render_template('up.html')
    else:
        return render_template('down.html')

if __name__ == '__main__':
    app.run()

Ответы

Ответ 1

Вы можете запустить команду celery status через код, импортировав пакет celery.bin.celery:

import celery
import celery.bin.base
import celery.bin.celery
import celery.platforms

app = celery.Celery('tasks', broker='redis://')

status = celery.bin.celery.CeleryCommand.commands['status']()
status.app = status.get_app()

def celery_is_up():
    try:
        status.run()
        return True
    except celery.bin.base.Error as e:
        if e.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise e

if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')

Ответ 2

Как насчет использования подпроцесса, не уверен, что это хорошая идея:

>>> import subprocess
>>> output = subprocess.check_output('ps aux'.split())
>>> 'supervisord' in output
True

Ответ 3

вы можете разобрать состояние процесса из supervisorctl status вывода

import subprocess

def is_celery_worker_running():
    ctl_output = subprocess.check_output('supervisorctl status celery_worker'.split()).strip()
    if ctl_output == 'unix:///var/run/supervisor.sock no such file':
        # supervisord not running
        return False
    elif ctl_output == 'No such process celery_worker':
        return False
    else:
        state = ctl_output.split()[1]
        return state == 'RUNNING'

Ответ 4

Редкий веб-интерфейс пользователя поставляется с супервизором. Может быть, вы могли бы это использовать. Его можно включить в конфигурации супервизора. Ключ для поиска - [inet_http_server]

Вы даже можете посмотреть исходный код этой части, чтобы идеи могли реализовать свои собственные.

Ответ 5

Кажется, что эта строка в Ответ Rotten194:

status.app = status.get_app()

должен быть

status.app = status.get_app(app)

Ответ 6

Это не применимо для сельдерея, но для всех, кто оказался здесь, чтобы проверить, работает ли супервизор, проверьте, существует ли файл pidfile, определенный для супервизора в вашем файле конфигурации supervisord.conf. Если это так, он работает; если нет, это не так. Файл pidfile по умолчанию -/tmp/supervisord.pid, что я использовал ниже.

import os
import sys

if os.path.isfile("/tmp/supervisord.pid"):
    print "supervisord is running."
    sys.exit()

Ответ 7

Вдохновлен @vgel answer, используя Celery 4.3.0.

import celery
import celery.bin.base
import celery.bin.control
import celery.platforms

# Importing Celery app from my own application
from my_app.celery import app as celery_app


def celery_running():
    """Test Celery server is available

    Inspired by https://stackoverflow.com/a/33545849
    """
    status = celery.bin.control.status(celery_app)
    try:
        status.run()
        return True
    except celery.bin.base.Error as exc:
        if exc.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise


if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')

Ответ 8

По моему опыту, я установил сообщение для отслеживания того, было ли оно полным или нет, чтобы очереди несли ответственность за повторные задачи.