Правильная настройка сельдерея django redis и сельдерея
Я пытаюсь настроить django + celery + redis + celery_beats, но это дает мне проблемы. Документация довольно проста, но когда я запускаю сервер django, redis, сельдерей и сельдерей, ничто не печатается или не регистрируется (вся моя тестовая задача делает свой журнал что-то).
Это моя структура папок:
- aenima
- aenima
- __init__.py
- celery.py
- criptoball
- tasks.py
celery.py выглядит так:
from __future__ import absolute_import, unicode_literals
import os
from django.conf import settings
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aenima.settings')
app = Celery("criptoball")
app.conf.broker_url = 'redis://localhost:6379/0'
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a 'CELERY_' prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.timezone = 'UTC'
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
app.conf.beat_schedule = {
'test-every-30-seconds': {
'task': 'tasks.test_celery',
'schedule': 30.0,
'args': (16, 16)
}, }
и tasks.py выглядит так:
from __future__ import absolute_import, unicode_literals
from datetime import datetime, timedelta
from celery import shared_task
import logging
from django_celery_beat.models import PeriodicTask, IntervalSchedule
cada_10_seg = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
test_celery_periodic = PeriodicTask.objects.create(interval=cada_10_seg, name='test_celery', task='criptoball.tasks.test_celery',
expires=datetime.utcnow()+timedelta(seconds=30))
@shared_task
def test_celery(x, y):
logger = logging.getLogger("AENIMA")
print("EUREKA")
logger.debug("EUREKA")
Это django_celery_beat docs
Не уверен, что мне не хватает. Когда я бегу
сельдерей -A aenima beat -l debug --scheduler django_celery_beat.schedulers: DatabaseScheduler
сельдерей -A рабочий aenima -l отладка
redis-cli ping PONG
django runningerver и redis server, я ничего не печатаю.
settings.py
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE
CELERY_IMPORTS = ('criptoball.tasks',)
До сих пор не найдено никакого официального ответа на эту тему.
Я хотел бы решить все это, эта ошибка может быть только одним из многих. Большое спасибо за вашу помощь!
Редактировать:
Добавлены настройки для redis, объявлены задачи по-разному и увеличен уровень отладки. Теперь ошибка:
Получена незарегистрированная задача типа u'tasks.test_celery '. Сообщение было проигнорировано и отброшено.
Помните ли вы импортировать модуль, содержащий эту задачу? Или, может быть, вы используете относительный импорт? KeyError: u'aenima.criptoball.tasks.test_celery '
Я считаю, что документация на сельдерей плохая.
EDIT 2 После того, как все было проработано, оно сработало, когда я поставил все задачи внутри одного файла celery.py. @shared_task не работает, пришлось использовать @app.task.
Ответы
Ответ 1
Раньше у меня были эти проблемы. Это не ваш код. Обычно это проблема с окружающей средой. Вы должны запустить все под virtualenv
, добавив файл requirements.txt
с конкретными версиями пакетов.
Существует проблема с информацией о celery 4.x
и django 1.x
, поэтому вы должны рассмотреть пакеты, которые используете.
В этом руководстве подробно объясняется, как построить virtualenv
с сельдереем.
Если вы скажете мне свои версии пакетов, я могу попробовать и помочь по-другому.
Редактировать:
Я думаю, что это что-то о том, как вы управляете сельдереем. Если мы исправим первую проблему, попробуйте сыграть с этим:
celery -A aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler
или же
celery -A aenima.aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler
Последняя ошибка, которую вы получаете, связана с обнаружением вашего модуля. Сначала попробуйте.
Ответ 2
Использование virtualenv
для этого было бы удобно.
Сначала, как @Gal сказал, что вам нужно убедиться, что у вас есть celery 4.x
Вы можете установить это, выполнив это через pip
:
пипетка сельдерея
Конечно, вы также можете установить версию 4.x
добавив ее в свой файл requirements.txt
следующим образом:
сельдерей == 4.1.0
Или более высокие версии, если они доступны в будущем.
Затем вы можете переустановить все свои пакеты, используя:
-
pip install -r requirements.txt
Который гарантирует, что у вас есть определенный пакет сельдерея.
Теперь часть сельдерея, хотя ваш код может и не ошибаться, но я напишу так, как я получил приложение Celery для работы.
__init __.py:
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery_conf import app as celery_app
__all__ = ['celery_app']
celery_conf.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from datetime import timedelta
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<PATH.TO.YOUR.SETTINGS>')
app = Celery('tasks')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a 'CELERY_' prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
# Set a beat schedule to update every hour.
app.conf.beat_schedule = {
'update-every-hour': {
'task': 'tasks.update',
'schedule': timedelta(minutes=60),
'args': (16, 16),
},
}
# The default task that Celery runs.
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
tasks.py:
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import requests
from django.conf import settings
from django.http import HttpResponse
from celery.task import Task
from celery.five import python_2_unicode_compatible
from celery import Celery
app = Celery()
@python_2_unicode_compatible
class Update(Task):
name = 'tasks.update'
def run(self, *args, **kwargs):
# Run the task you want to do.
""" For me the regular TaskRegistry didn't work to register classes,
so I found this handy TaskRegistry demo and made use of it to
register tasks as classes."""
class TaskRegistry(Task):
def NotRegistered_str(self):
self.assertTrue(repr(TaskRegistry.NotRegistered('tasks.add')))
def assertRegisterUnregisterCls(self, r, task):
with self.assertRaises(r.NotRegistered):
r.unregister(task)
r.register(task)
self.assertIn(task.name, r)
def assertRegisterUnregisterFunc(self, r, task, task_name):
with self.assertRaises(r.NotRegistered):
r.unregister(task_name)
r.register(task, task_name)
self.assertIn(task_name, r)
def task_registry(self):
r = TaskRegistry()
self.assertIsInstance(r, dict, 'TaskRegistry is mapping')
self.assertRegisterUnregisterCls(r, Update)
r.register(Update)
r.unregister(Update.name)
self.assertNotIn(Update, r)
r.register(Update)
tasks = dict(r)
self.assertIsInstance(
tasks.get(Update.name), Update)
self.assertIsInstance(
r[Update.name], Update)
r.unregister(Update)
self.assertNotIn(Update.name, r)
self.assertTrue(Update().run())
def compat(self):
r = TaskRegistry()
r.regular()
r.periodic()
Как я уже объяснял в коде, регулярная taskregistry
не работала, что было встроено в Celery 4.x, поэтому я использовал демо-задачу. Конечно, вы также не можете использовать классы для выполнения задач, но я предпочитаю использовать класс.
settings.py:
# Broker settings for redis
CELERY_BROKER_HOST = '<YOUR_HOST>'
CELERY_BROKER_PORT = 6379
CELERY_BROKER_URL = 'redis://'
CELERY_DEFAULT_QUEUE = 'default'
# Celery routes
CELERY_IMPORTS = (
'PATH.TO.tasks' # The path to your tasks.py
)
CELERY_DATABASE_URL = {
'default': '<CELERY_DATABASE>', # You can also use your already being used database here
}
INSTALLED_APPS = [
...
'PATH.TO.TASKS' # But exclude the tasks.py from this path
]
LOGGING = {
...
'loggers': {
'celery': {
'level': 'DEBUG',
'handlers': ['console'],
'propagate': True,
},
}
}
Я запускаю своего работника со следующими командами:
redis-server --daemonize да
celery multi start worker -A PATH.TO.TASKS -l info --beat # Но исключить task.py из пути
Надеюсь, эта информация поможет вам или кому-либо, кто борется с сельдереем.
РЕДАКТИРОВАТЬ:
Обратите внимание, что я запускаю рабочего как демона, поэтому вы не сможете увидеть журналы в консоли. Для меня он зарегистрирован в .txt
файле.
Плюс обратите внимание также на пути к использованию, например, для некоторых из них вам необходимо включить путь к вашему приложению:
project.apps.app
А для других случаев вам нужно включить task.py без .py
а также я записал, когда следует исключить этот файл и когда этого не делать.
EDIT 2:
Декодер @shared_task возвращает прокси-сервер, который всегда использует экземпляр задачи в current_app. Это делает декоратор @shared_task полезным для библиотек и приложений многократного использования, поскольку у них не будет доступа к приложению пользователя.
Обратите внимание, что @shared_task
не имеет доступа к приложению пользователя. Приложение, которое вы сейчас пытаетесь зарегистрировать, не имеет доступа к вашему приложению. Метод, который вы на самом деле хотите использовать для регистрации задачи:
from celery import Celery
app = Celery()
@app.task
def test_celery(x, y):
logger = logging.getLogger("AENIMA")
print("EUREKA")
logger.debug("EUREKA")
Ответ 3
Получена незарегистрированная задача типа u'tasks.test_celery '. Сообщение было проигнорировано и отброшено.
Помните ли вы импортировать модуль, содержащий эту задачу? Или, может быть, вы используете относительный импорт?
Возможно, неправильный путь к вашей задаче:
app.conf.beat_schedule = {
'test-every-30-seconds': {
'task': 'criptoball.tasks.test_celery',
'schedule': 30.0,
'args': (16, 16)
},
}
tasks.test_celery
должен быть полным путем: criptoball.tasks.test_celery
Ответ 4
Есть одна вещь, которую вы должны исправить: используйте:
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
чтобы рассказать Celery, какие задачи приложений вы хотите, чтобы узнать, используете ли вы Celery 3.x.