Как очистить все задачи определенной очереди с помощью сельдерея в python?
Как очистить все запланированные и запущенные задачи определенного que с сельдереем в python? Вопросы кажутся довольно напряженными, но для добавления я не ищу код командной строки
У меня есть следующая строка, которая определяет que и хочет очистить этот que для управления задачами:
CELERY_ROUTES = {"socialreport.tasks.twitter_save": {"queue": "twitter_save"}}
В 1 момент времени я хочу очистить все задачи в que twitter_save с помощью кода python, может быть, с функцией широковещания? Я не мог найти документацию об этом. Возможно ли это?
Ответы
Ответ 1
просто обновить ответ @Sam Stoelinga для сельдерея 3.1, теперь это можно сделать так на терминале:
celery amqp queue.purge <QUEUE_NAME>
Для Django обязательно запустите его из файла manage.py:
./manage.py celery amqp queue.purge <QUEUE_NAME>
Если нет, убедитесь, что сельдерей способен правильно указать брокеру, установив флаг --broker=
.
Ответ 2
Lol это довольно легко, надеюсь, что кто-то может мне помочь, хотя.
from celery.bin.camqadm import camqadm
camqadm('queue.purge', queue_name_as_string)
Единственная проблема с этим мне все еще нужно остановить celeryd перед очисткой que, после очистки я должен снова запустить celeryd для обработки задач для очереди. Обновит этот вопрос, если мне удастся.
Мне это удалось, но, пожалуйста, исправьте меня, если это не лучший способ остановить celeryd, очистить que и запустить его снова. Я знаю, что использую термин, потому что на самом деле хочу, чтобы это прекратило задачу.
kill_command = "ps auxww | grep 'celeryd -n twitter_save' | awk '{print $2}' | xargs kill -9"
subprocess.call(kill_command, shell=True)
camqadm('queue.purge', 'twitter_save')
rerun_command = "/home/samos/Software/virt_env/twittersyncv1/bin/python %s/manage.py celeryd -n twitter_save -l info -Q twitter_save" % settings.PROJECT_ROOT
os.popen(rerun_command+' &')
send_task("socialreport.tasks.twitter_save")
Ответ 3
Оригинальный ответ не работает для Celery 3.1. Обновление Hassek - это правильная команда, если вы хотите сделать это из командной строки. Но если вы хотите сделать это программно, сделайте следующее:
Предполагая, что вы использовали приложение Celery как:
celery_app = Celery(...)
Тогда:
import celery.bin.amqp
amqp = celery.bin.amqp.amqp(app = celery_app)
amqp.run('queue.purge', 'name_of_your_queue')
Это удобно для случаев, когда вы ставили в очередь множество задач, и одна задача сталкивается с фатальным условием, которое, как вы знаете, предотвратит выполнение остальных задач.
например. вы ставите в очередь множество задач веб-искателя, а в середине задач ваш IP-адрес сервера блокируется. Нет смысла выполнять остальные задачи. Таким образом, в этом случае ваша задача сама может очистить собственную очередь.