Ответ 1
Каждый объект Task
имеет свойство .request
, которое содержит объект AsyncRequest
. Соответственно, следующая строка дает состояние задачи Task
:
task.AsyncResult(task.request.id).state
Как проверить, выполняется ли задача в сельдерее (в частности, я использую сельдерей-джанго)?
Я прочитал документацию, и я googled, но я не вижу вызова вроде:
my_example_task.state() == RUNNING
Мой вариант использования заключается в том, что у меня есть внешняя (java) служба для перекодирования. Когда я отправляю документ, подлежащий перекодировке, я хочу проверить, запущена ли работа, выполняющая эту службу, и, если нет, (перезапустить).
Я использую текущие стабильные версии - 2.4, я считаю.
Каждый объект Task
имеет свойство .request
, которое содержит объект AsyncRequest
. Соответственно, следующая строка дает состояние задачи Task
:
task.AsyncResult(task.request.id).state
Верните task_id (который указан из .delay()) и затем спросите экземпляр сельдерея о состоянии:
x = method.delay(1,2)
print x.task_id
При запросе получите новый AsyncResult, используя этот task_id:
from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Создание объекта AsyncResult
из идентификатора задачи - это рекомендуемый в FAQ способ получения статуса задачи, когда у вас есть только идентификатор задачи.
Тем не менее, начиная с Celery 3.x, существуют значительные предостережения, которые могут кусать людей, если они не обращают на них внимания. Это действительно зависит от конкретного сценария использования.
Чтобы Celery мог записывать, что задание выполняется, необходимо установить для task_track_started
значение True
. Вот простая задача, которая проверяет это:
@app.task(bind=True)
def test(self):
print self.AsyncResult(self.request.id).state
Когда task_track_started
равен False
, что является значением по умолчанию, показ состояния отображается как PENDING
, даже если задание запущено. Если вы установите task_track_started
на True
, то состояние будет STARTED
.
PENDING
означает "я не знаю".AsyncResult
с состоянием PENDING
не означает ничего, кроме того, что Celery не знает статуса задачи. Это может быть по ряду причин.
С одной стороны, AsyncResult
может быть создан с неверными идентификаторами задач. Такие "задачи" будут считаться ожидающими выполнения сельдереем:
>>> task.AsyncResult("invalid").status
'PENDING'
Хорошо, поэтому никто не собирается передавать явно недействительные идентификаторы в AsyncResult
. Справедливо, но это также дает эффект, что AsyncResult
также будет считать задачу, которая успешно выполнена, но что Celery забыл как PENDING
. Опять же, в некоторых сценариях использования это может быть проблема. Часть проблемы зависит от того, как настроен Celery для сохранения результатов задач, потому что это зависит от наличия "надгробий" в бэкенде результатов. ("Надгробия" - это термин, используемый в документации Celery для блоков данных, в которых записывается, как завершилось задание.) Использование AsyncResult
не будет работать вообще, если task_ignore_result
равно True
. Еще более неприятная проблема заключается в том, что Celery по умолчанию выбрасывает надгробия. Параметр result_expires
по умолчанию установлен на 24 часа. Поэтому, если вы запускаете задачу и записываете идентификатор в долговременное хранилище, и еще через 24 часа, вы создаете AsyncResult
с ним, статус будет PENDING
.
Все "реальные задачи" запускаются в состоянии PENDING
. Таким образом, получение PENDING
задания может означать, что задание было запрошено, но никогда не продвигалось дальше этого (по какой-либо причине). Или это может означать, что задание выполнено, но Сельдери забыл свое состояние.
AsyncResult
не будет работать для меня. Что еще я могу сделать?Я предпочитаю отслеживать цели, а не сами задачи. Я храню некоторую информацию о задачах, но она действительно вторична для отслеживания целей. Цели хранятся в хранилище независимо от сельдерея. Когда запрос должен выполнить вычисление, зависит от того, какая цель была достигнута, он проверяет, была ли цель уже достигнута, если да, то он использует эту кэшированную цель, в противном случае он запускает задачу, которая повлияет на цель, и отправляет клиент, который сделал HTTP-запрос ответом, который указывает, что он должен ждать результата.
Названия переменных и гиперссылки выше предназначены для Celery 4.x. В 3.x соответствующие переменные и гиперссылки: CELERY_TRACK_STARTED
, CELERY_IGNORE_RESULT
, CELERY_TASK_RESULT_EXPIRES
.
Вы также можете создавать пользовательские состояния и обновлять выполнение задания на выполнение задания значения. Этот пример из документов:
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})
http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
Старый вопрос, но я недавно столкнулся с этой проблемой.
Если вы пытаетесь получить task_id, вы можете сделать это следующим образом:
import celery
from celery_app import add
from celery import uuid
task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)
Теперь вы точно знаете, что такое task_id, и теперь можете использовать его для получения AsyncResult:
# grab the AsyncResult
result = celery.result.AsyncResult(task_id)
# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf
# print the AsyncResult status
print result.status
SUCCESS
# print the result returned
print result.result
4
Просто используйте этот API из FAQ по сельдерею
result = app.AsyncResult(task_id)
Это отлично работает.
Try:
task.AsyncResult(task.request.id).state
это обеспечит статус Задачи Сельдерея. Если Задача Сельдерея уже находится в состоянии FAILURE, она выкинет исключение:
raised unexpected: KeyError('exc_type',)
для простых задач мы можем использовать http://flower.readthedocs.io/en/latest/screenshots.html и http://policystat.github.io/jobtastic/, чтобы выполнить мониторинг.
и для сложных задач, скажем, задача, которая имеет дело с большим количеством других модулей. Мы рекомендуем вручную записывать прогресс и сообщение на конкретном блоке задач.
Я нашел полезную информацию в
Руководство по работе с работниками проекта по сельдерей
В моем случае я проверяю, работает ли Сельдерей.
inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
state = 'FAILURE'
else:
state = str(task.state)
Вы можете играть с проверкой, чтобы удовлетворить ваши потребности.
Помимо вышесказанного Программный подход Использование статуса "Цветочная задача" можно легко увидеть.
Мониторинг в режиме реального времени с помощью Celery Events. Flower - это веб-инструмент для мониторинга и администрирования кластеров сельдерея.
Официальный документ: Flower - Инструмент мониторинга сельдерея
Установка:
$ pip install flower
Использование:
http://localhost:5555
vi my_celery_apps/app1.py
app = Celery(worker_name)
vi tasks/task1.py
from my_celery_apps.app1 import app
app.AsyncResult(taskid)
try:
if task.state.lower() != "success":
return
except:
""" do something """