Автоматическая перезагрузка сельдерея при любых изменениях
Я мог бы автоматически перезагрузить сельдерей, когда есть изменения в модулях в CELERY_IMPORTS
в settings.py
.
Я пытался предоставить материнским модулям возможность обнаруживать изменения даже в дочерних модулях, но не обнаруживал изменений в дочерних модулях. Это заставляет меня понять, что обнаружение не производится рекурсивно сельдереем. Я просмотрел его в документации, но я не отвечал ни на один ответ на мою проблему.
Мне действительно мешает добавить все связанные части сельдерея моего проекта в CELERY_IMPORTS
, чтобы обнаружить изменения.
Есть ли способ сказать сельдерей, что "автоматически перезагружайтесь, когда есть какие-либо изменения в любом месте проекта".
Спасибо!
Ответы
Ответ 1
Вы можете вручную включить дополнительные модули с помощью -I|--include
. Объедините это с инструментами GNU, такими как find
и awk
, и вы сможете найти все файлы .py
и включить их.
$ celery -A app worker --autoreload --include=$(find . -name "*.py" -type f | awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=',' | sed 's/.$//')
Давайте объясним это:
find . -name "*.py" -type f
find
ищет рекурсивно для всех файлов, содержащих .py
. Результат выглядит примерно так:
./app.py
./some_package/foopy
./some_package/bar.py
Тогда:
awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=','
Эта строка выводит значение find
как входной сигнал и удаляет все вхождения ./
. Затем он заменяет все /
на .
. В последнем sub()
удаляется замена .py
пустой строкой. ORS
заменяет все символы новой строки на ,
. Эти выходы:
app,some_package.foo,some_package.bar,
Последняя команда sed
удаляет последний ,
.
Итак, выполняемая команда выглядит так:
$ celery -A app worker --autoreload --include=app,some_package.foo,some_package.bar
Если у вас есть virtualenv
внутри вашего источника, вы можете исключить его, добавив -path .path_to_your_env -prune -o
:
$ celery -A app worker --autoreload --include=$(find . -path .path_to_your_env -prune -o -name "*.py" -type f | awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=',' | sed 's/.$//')
Ответ 2
Сельдерей --autoreload
не работает и считается устаревшим.
Поскольку вы используете django, вы можете написать для этого команду управления. В Django есть утилита автозагрузки, которая используется сервером запуска для перезапуска сервера WSGI при изменении кода.
Та же функциональность может быть использована для перезагрузки сельдерея. Создайте отдельную команду управления под названием сельдерей. Напишите функцию, чтобы убить существующего работника и начать нового работника. Теперь подключите эту функцию для автоматической перезагрузки следующим образом.
import shlex
import subprocess
from django.core.management.base import BaseCommand
from django.utils import autoreload
def restart_celery():
cmd = 'pkill celery'
subprocess.call(shlex.split(cmd))
cmd = 'celery worker -l info -A foo'
subprocess.call(shlex.split(cmd))
class Command(BaseCommand):
def handle(self, *args, **options):
print('Starting celery worker with autoreload...')
# For Django>=2.2
autoreload.run_with_reloader(restart_celery)
# For django<2.1
# autoreload.main(restart_celery)
Теперь вы можете запустить celery worker с помощью python manage.py celery
который будет автоматически перезагружаться при изменении кодовой базы.
Это только для целей разработки и не использовать его в производстве. Код взят из моего другого ответа здесь.
Ответ 3
Решение OrangeTux не сработало для меня, поэтому я написал немного Python script для достижения более или менее того же. Он отслеживает изменения файлов с помощью inotify и запускает перезапуск сельдерея, если он обнаруживает IN_MODIFY
, IN_ATTRIB
или IN_DELETE
.
#!/usr/bin/env python
"""Runs a celery worker, and reloads on a file change. Run as ./run_celery [directory]. If
directory is not given, default to cwd."""
import os
import sys
import signal
import time
import multiprocessing
import subprocess
import threading
import inotify.adapters
CELERY_CMD = tuple("celery -A amcat.amcatcelery worker -l info -Q amcat".split())
CHANGE_EVENTS = ("IN_MODIFY", "IN_ATTRIB", "IN_DELETE")
WATCH_EXTENSIONS = (".py",)
def watch_tree(stop, path, event):
"""
@type stop: multiprocessing.Event
@type event: multiprocessing.Event
"""
path = os.path.abspath(path)
for e in inotify.adapters.InotifyTree(path).event_gen():
if stop.is_set():
break
if e is not None:
_, attrs, path, filename = e
if filename is None:
continue
if any(filename.endswith(ename) for ename in WATCH_EXTENSIONS):
continue
if any(ename in attrs for ename in CHANGE_EVENTS):
event.set()
class Watcher(threading.Thread):
def __init__(self, path):
super(Watcher, self).__init__()
self.celery = subprocess.Popen(CELERY_CMD)
self.stop_event_wtree = multiprocessing.Event()
self.event_triggered_wtree = multiprocessing.Event()
self.wtree = multiprocessing.Process(target=watch_tree, args=(self.stop_event_wtree, path, self.event_triggered_wtree))
self.wtree.start()
self.running = True
def run(self):
while self.running:
if self.event_triggered_wtree.is_set():
self.event_triggered_wtree.clear()
self.restart_celery()
time.sleep(1)
def join(self, timeout=None):
self.running = False
self.stop_event_wtree.set()
self.celery.terminate()
self.wtree.join()
self.celery.wait()
super(Watcher, self).join(timeout=timeout)
def restart_celery(self):
self.celery.terminate()
self.celery.wait()
self.celery = subprocess.Popen(CELERY_CMD)
if __name__ == '__main__':
watcher = Watcher(sys.argv[1] if len(sys.argv) > 1 else ".")
watcher.start()
signal.signal(signal.SIGINT, lambda signal, frame: watcher.join())
signal.pause()
Вероятно, вы должны изменить CELERY_CMD
или любые другие глобальные переменные.