Интерактивные взломы Tornado celery
Поскольку никто не предоставлял решение этого сообщения, а также тот факт, что мне отчаянно нужно обходное решение, вот моя ситуация и некоторые абстрактные решения/идеи для обсуждения.
Мой стек:
- Торнадо
- Сельдерей
- MongoDB
- Redis
- RabbitMQ
Моя проблема: найдите способ для Торнадо, чтобы отправить задачу сельдерея (решена), а затем асинхронно собрать результат (любые идеи?).
Сценарий 1: (запрос/ответ взлома плюс веб-хост)
- Tornado получает (пользовательский) запрос, затем сохраняет в локальной памяти (или в Redis) запрос {jobID: (user)}, чтобы помнить, где распространять ответ, и запускает задачу celery с идентификатором job
- Когда сельдерей завершает задачу, он выполняет webhook на каком-то url и сообщает торнадо, что этот идентификатор job закончил (плюс результаты)
- Tornado получает запрос (пользователь) и перенаправляет ответ на (пользователь)
Может ли это случиться? Есть ли у него логика?
Сценарий 2: (торнадо плюс длинный опрос)
- Tornado отправляет задачу сельдерея и возвращает некоторые первичные данные json клиенту (jQuery)
- jQuery делает некоторый длинный опрос при получении первичного json, скажем, каждые x микросекунд, а торнадо отвечает в соответствии с некоторым флагом базы данных. Когда задача celery завершается, этот флаг базы данных имеет значение True, затем завершается "loop" jQuery.
Является ли это эффективным?
Любые другие идеи/схемы?
Ответы
Ответ 1
Я наткнулся на этот вопрос, и поражение результатов backend неоднократно выглядело не оптимальным для меня. Поэтому я реализовал Mixin, аналогичный вашему сценарию 1, используя Unix Sockets.
Он уведомляет Tornado, как только задача заканчивается (чтобы быть точным, как только будет выполняться следующая задача в цепочке), и только один раз обратится к результатам. Вот ссылка .
Ответ 2
Мое решение включает опрос от торнадо до сельдерея:
class CeleryHandler(tornado.web.RequestHandlerr):
@tornado.web.asynchronous
def get(self):
task = yourCeleryTask.delay(**kwargs)
def check_celery_task():
if task.ready():
self.write({'success':True} )
self.set_header("Content-Type", "application/json")
self.finish()
else:
tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)
tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)
Вот post об этом.
Ответ 3
Вот наше решение проблемы. Поскольку мы ищем результат в нескольких обработчиках в нашем приложении, мы сделали поиск сельдерея классом mixin.
Это также делает код более читаемым с помощью шаблона tornado.gen.
from functools import partial
class CeleryResultMixin(object):
"""
Adds a callback function which could wait for the result asynchronously
"""
def wait_for_result(self, task, callback):
if task.ready():
callback(task.result)
else:
# TODO: Is this going to be too demanding on the result backend ?
# Probably there should be a timeout before each add_callback
tornado.ioloop.IOLoop.instance().add_callback(
partial(self.wait_for_result, task, callback)
)
class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler):
"""Execute a task asynchronously over a celery worker.
Wait for the result without blocking
When the result is available send it back
"""
@tornado.web.asynchronous
@tornado.web.authenticated
@tornado.gen.engine
def post(self):
"""Test the provided Magento connection
"""
task = expensive_task.delay(
self.get_argument('somearg'),
)
result = yield tornado.gen.Task(self.wait_for_result, task)
self.write({
'success': True,
'result': result.some_value
})
self.finish()
Ответ 4
Теперь https://github.com/mher/tornado-celery приходит на помощь...
class GenAsyncHandler(web.RequestHandler):
@asynchronous
@gen.coroutine
def get(self):
response = yield gen.Task(tasks.sleep.apply_async, args=[3])
self.write(str(response.result))
self.finish()