Как отправить периодические задания в определенную очередь в Сельдерей
По умолчанию сельдерей отправляет все задачи в очередь "сельдерей", но вы можете изменить это поведение, добавив дополнительный параметр:
@task(queue='celery_periodic')
def recalc_last_hour():
log.debug('sending new task')
recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example
Настройки планировщика:
CELERYBEAT_SCHEDULE = {
'installer_recalc_hour': {
'task': 'stats.installer.tasks.recalc_last_hour',
'schedule': 15 # every 15 sec for test
},
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
Запустить сотрудника:
python manage.py celery worker -c 1 -Q celery_periodic -B -E
Эта схема работает не так, как ожидалось: эти рабочие отправляют периодические задачи в очередь "сельдерей", а не "celery_periodic". Как я могу это исправить?
P.S. сельдерея == 3.0.16
Ответы
Ответ 1
Я нашел решение этой проблемы:
1) Прежде всего я изменил способ настройки периодических задач. Я использовал @periodic_task декоратор:
@periodic_task(run_every=crontab(minute='5'),
queue='celery_periodic',
options={'queue': 'celery_periodic'})
def recalc_last_hour():
dt = datetime.utcnow()
prev_hour = datetime(dt.year, dt.month, dt.day, dt.hour) \
- timedelta(hours=1)
log.debug('Generating task for hour %s', str(prev_hour))
recalc_hour.delay(prev_hour)
2) Я написал celery_periodic дважды в параметрах @periodic_task:
-
queue = 'celery_periodic' используется, когда вы вызываете задачу из кода (.delay или .apply_async)
-
options = {'queue': 'celery_periodic'} опция используется, когда бит celery вызывает ее.
Я уверен, то же самое возможно, если вы будете настраивать периодические задачи с помощью переменной CELERYBEAT_SCHEDULE.
UPD. Это решение подходит как для баз данных, так и для файлов на основе CELERYBEAT_SCHEDULER.
Ответ 2
Периодичность отправляется в очереди с помощью celerybeat. Вы можете делать все, что мы делаем с помощью Celery api. Вот список конфигураций поставляется с celerybeat.
http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields
В вашем случае
CELERYBEAT_SCHEDULE = {
'installer_recalc_hour': {
'task': 'stats.installer.tasks.recalc_last_hour',
'schedule': 15 # every 15 sec for test,
'options': {'queue' : 'celery_periodic'} ##options are mapped to apply_async options
},
}
Ответ 3
И если вы используете планировщик базы данных djcelery, вы можете указать очередь в поле "Параметры выполнения → очередь"