Отчетность дала результаты многолетней задачи Сельдерея
Проблема
Я сегментировал долговременную задачу в логических подзадачах, поэтому я могу сообщать результаты каждой подзадачи по мере ее завершения. Тем не менее, я пытаюсь сообщить о результатах задачи, которая никогда не будет успешно завершена (вместо этого она даст значения, как она есть), и я изо всех сил стараюсь сделать это с помощью моего существующего решения.
Фон
Я создаю веб-интерфейс для некоторых программ Python, которые я написал. Пользователи могут отправлять задания через веб-формы, а затем проверять их работу.
Скажем, у меня есть две функции, каждая из которых доступна через отдельные формы:
-
med_func
: Выполняется ~ 1 минута, результаты передаются на render()
, который создает дополнительные данные.
-
long_func
: возвращает генератор. Каждый yield
занимает порядка 30 минут и должен сообщаться пользователю. Есть так много уроков, мы можем считать этот итератор бесконечным (завершение только тогда, когда отменено).
Код, текущая реализация
С med_func
я сообщаю результаты следующим образом:
При отправке формы я сохраняю AsyncResult
в сеансе Django:
task_result = med_func.apply_async([form], link=render.s())
request.session["task_result"] = task_result
Представление Django для страницы результатов обращается к этому AsyncResult
. Когда задача завершена, результаты сохраняются в объект, который передается как контекст шаблону Django.
def results(request):
""" Serve (possibly incomplete) results of a session latest run. """
session = request.session
try: # Load most recent task
task_result = session["task_result"]
except KeyError: # Already cleared, or doesn't exist
if "results" not in session:
session["status"] = "No job submitted"
else: # Extract data from Asynchronous Tasks
session["status"] = task_result.status
if task_result.ready():
session["results"] = task_result.get()
render_task = task_result.children[0]
# Decorate with rendering results
session["render_status"] = render_task.status
if render_task.ready():
session["results"].render_output = render_task.get()
del(request.session["task_result"]) # Don't need any more
return render_to_response('results.html', request.session)
Это решение работает только тогда, когда функция фактически завершается. Я не могу объединить логические подзадачи long_func
, потому что есть неизвестное число yield
(каждая итерация цикла long_func
может не дать результата).
Вопрос
Есть ли разумный способ доступа к предоставленным объектам из чрезвычайно долговременной задачи Сельдерея, чтобы они могли отображаться до истощения генератора?
Ответы
Ответ 1
Чтобы сельдерей знал, что такое текущее состояние задачи, он устанавливает некоторые метаданные в любом исходном бэкэнде. Вы можете копировать, чтобы хранить другие виды метаданных.
def yielder():
for i in range(2**100):
yield i
@task
def report_progress():
for progress in yielder():
# set current progress on the task
report_progress.backend.mark_as_started(
report_progress.request.id,
progress=progress)
def view_function(request):
task_id = request.session['task_id']
task = AsyncResult(task_id)
progress = task.info['progress']
# do something with your current progress
Я бы не выбрал тонну данных, но он хорошо работает для отслеживания прогресса долговременной задачи.
Ответ 2
Пол ответ велик. В качестве альтернативы использованию mark_as_started
вы можете использовать метод Task
update_state
. Они в конечном счете делают то же самое, но имя "update_state" немного более подходит для того, что вы пытаетесь сделать. Вы можете опционально определить настраиваемое состояние, которое указывает, что ваша задача выполняется (я назвал свое пользовательское состояние "PROGRESS" ):
def yielder():
for i in range(2**100):
yield i
@task
def report_progress():
for progress in yielder():
# set current progress on the task
report_progress.update_state(state='PROGRESS', meta={'progress': progress})
def view_function(request):
task_id = request.session['task_id']
task = AsyncResult(task_id)
progress = task.info['progress']
# do something with your current progress
Ответ 3
Часть сельдерея:
def long_func(*args, **kwargs):
i = 0
while True:
yield i
do_something_here(*args, **kwargs)
i += 1
@task()
def test_yield_task(task_id=None, **kwargs):
the_progress = 0
for the_progress in long_func(**kwargs):
cache.set('celery-task-%s' % task_id, the_progress)
Сторона Webclient, начальная задача:
r = test_yield_task.apply_async()
request.session['task_id'] = r.task_id
Тестирование последнего полученного значения:
v = cache.get('celery-task-%s' % session.get('task_id'))
if v:
do_someting()
Если вам не нравится использовать кеш, или это невозможно, вы можете использовать db, файл или любое другое место, в котором рабочий и сельдерей будет иметь оба доступа. С кешем это простейшее решение, но рабочие и серверы должны использовать один и тот же кеш.
Ответ 4
Пара вариантов:
1 - группы задач. Если вы можете перечислить все вспомогательные задачи с момента вызова, вы можете применить группу в целом, которая возвращает объект TaskSetResult, который вы можете использовать для мониторинга результатов группы в целом или отдельных задач в группе - запросите это как необходимо, когда вам нужно проверить статус.
2 - обратные вызовы. Если вы не можете перечислить все подзадачи (или даже если можете!), Вы можете определить веб-крючок/обратный вызов, который последний шаг в задаче вызвал, когда остальная часть задачи будет завершена. Крючок будет против URI в вашем приложении, которое поглощает результат и делает его доступным через DB или API-интерфейс приложения.
Некоторые из них могут решить вашу проблему.
Ответ 5
См. также этот отличный PyCon preso от одного из инженеров Instagram.
http://blogs.vmware.com/vfabric/2013/04/how-instagram-feeds-work-celery-and-rabbitmq.html
На отметке 16:00 видео он обсуждает, как они структурируют длинные списки подзадач.
Ответ 6
Лично я хотел бы видеть время начала, продолжительность, прогресс (количество предоставленных элементов), время остановки (или ETA), статус и любую другую полезную информацию. Было бы неплохо, если бы он выглядел похожим на соответствующий дисплей, возможно, как ps
на Linux. Это, в конце концов, статус процесса.
Вы можете включить некоторые опции для приостановки или уничтожения задачи и/или "открыть" ее и отобразить подробную информацию о дочерних элементах или результатах.