Как сохранить несколько независимых сельдерейских очередей?
Я пытаюсь сохранить несколько очередей сельдерея с разными задачами и работниками в той же самой базе данных redis. На самом деле просто удобная проблема только в том, что на моем компьютере есть только один сервер redis, а не два.
Я следил за учебниками по сельдеристу дословно, так как это единственный способ заставить его работать на меня. Теперь, когда я пытаюсь дублировать все с слегка измененными именами/очередями, он продолжает выходить из строя.
Примечание. Я новичок в Python и Celery, что, очевидно, является частью проблемы. Я не уверен, какие части называются "задача/задачи" как имя или специальные слова.
Моя сокращенная версия документов:
Запустите celery -A tasks worker
, чтобы вызвать рабочих.
tasks.py содержит код задачи с celery = Celery('tasks', broker='redis://localhost')
для подключения к Celery и @task()
над моими функциями, которые я хочу отложить.
В моей программе для задач очередей...
from tasks import do_work
do_work.delay()
Итак, учитывая все вышесказанное, какие шаги я должен предпринять, чтобы превратить это в два типа задач, которые выполняются независимо в отдельных очередях и рабочих? Например, blue_tasks и red_tasks?
Я попытался изменить все экземпляры задач на blue_tasks или red_tasks. Однако, когда я ставил очередь blue_tasks, работники red_tasks, с которых я начал, начинают пытаться работать с ними.
Я читал о очередях по умолчанию и тому подобное, поэтому я пробовал этот код, который не работал:
CELERY_DEFAULT_QUEUE = 'red'
CELERY_QUEUES = (
Queue('red', Exchange('red'), routing_key='red'),
)
В качестве побочного примечания я не понимаю, почему ошибки celery worker
устраняются с помощью сельдерея, пытающегося подключиться к экземпляру amqp по умолчанию, а celery -A tasks worker
указывает сельдерею подключиться к Redis. Какой код задачи celery worker
пытается запустить на рабочем месте, если ничего не указано?
Ответы
Ответ 1
По умолчанию все идет в очередь по умолчанию с именем celery
(и это то, что celery worker
будет обрабатывать, если очередь не указана)
Итак, скажите, что у вас есть функция задачи do_work
в django_project_root/myapp/tasks.py
.
Вы можете настроить задачу do_work
для работы в собственной очереди следующим образом:
CELERY_ROUTES = {
'myproject.tasks.do_work': {'queue': 'red'},
}
Затем запустите рабочего, используя celery worker -Q red
, и он будет обрабатывать вещи только в этой очереди (другой рабочий, вызываемый с помощью celery worker
, будет только записывать вещи в очереди по умолчанию)
Раздел маршрутизации задач в документации должен объяснить все.
Ответ 2
Чтобы динамически ссылаться на разные очереди, выполните следующие шаги:
1) Укажите имя очереди с атрибутом 'queue'
celery.send_task('job1', args=[], kwargs={}, queue='queue_name_1')
celery.send_task('job1', args=[], kwargs={}, queue='queue_name_2')
(Здесь конкретное задание использует две очереди)
2) Добавьте в конфигурационный файл следующую запись
CELERY_CREATE_MISSING_QUEUES = True
3) При запуске работника используйте -Q, чтобы указать имя очереди, из которого должны быть заняты задания
celery -A proj worker -l info -Q queue1
celery -A proj worker -l info -Q queue2