Запуск фоновой задачи async в Tornado
Читая документацию Tornado, очень ясно, как вызвать функцию async для ответа:
class GenAsyncHandler(RequestHandler):
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")
Не хватает того, как вызов должен выполняться асинхронно для фоновой задачи, которая не имеет отношения к текущему запросу:
class GenAsyncHandler(RequestHandler):
@gen.coroutine
def _background_task():
pass # do lots of background stuff
@gen.coroutine
def get(self):
_dont_care = yield self._background_task()
self.render("template.html")
Этот код должен работать, за исключением того, что он выполняется синхронно, и запрос ожидает его до его завершения.
Каков правильный способ асинхронного вызова этой задачи, сразу же возвращая текущий запрос?
Ответы
Ответ 1
Обновление: Начиная с Tornado 4.0 (июль 2014 г.), нижеприведенная функциональность доступна в методе IOLoop.spawn_callback.
К сожалению, это довольно сложно. Вам нужно как отсоединить фоновую задачу от текущего запроса (чтобы сбой в фоновой задаче не привел к случайному исключению, выданному в запросе), так и убедиться, что что-то прослушивает результат фоновой задачи (для регистрации ошибок). если ничего другого). Это означает что-то вроде этого:
from tornado.ioloop import IOLoop
from tornado.stack_context import run_in_stack_context, NullContext
IOLoop.current().add_future(run_in_stack_context(NullContext(), self._background_task),
lambda f: f.result())
Нечто подобное может быть добавлено к самому торнадо в будущем.
Ответ 2
Я рекомендую использовать toro. Он обеспечивает относительно простой механизм настройки фоновой очереди задач.
Следующий код (например, в queue.py) запускает простой "worker()", который просто ждет, пока в его очереди не будет чего-то. Если вы вызываете queue.add(function,async,*args,**kwargs)
, это добавляет элемент в очередь, который будет пробуждать рабочий(), который затем запускает задачу.
Я добавил параметр async, чтобы он мог поддерживать фоновые задачи, завернутые в @gen.coroutine, и те, у кого нет.
import toro,tornado.gen
queue = toro.Queue()
@tornado.gen.coroutine
def add(function,async,*args,**kwargs):
item = dict(function=function,async=async,args=args,kwargs=kwargs)
yield queue.put(item)
@tornado.gen.coroutine
def worker():
while True:
print("worker() sleeping until I get next item")
item = yield queue.get()
print("worker() waking up to process: %s" % item)
try:
if item['async']:
yield item['function'](*item['args'],**item['kwargs'])
else:
item['function'](*item['args'],**item['kwargs'])
except Exception as e:
print("worker() failed to run item: %s, received exception:\n%s" % (item,e))
@tornado.gen.coroutine
def start():
yield worker()
В главном приложении торнадо:
import queue
queue.start()
И теперь вы можете просто запланировать заднюю задачу:
def my_func(arg1,somekwarg=None):
print("in my_func() with %s %s" % (arg1,somekwarg))
queue.add(my_func,False,somearg,somekwarg=someval)
Ответ 3
У меня есть трудоемкая задача в почтовом запросе, возможно, больше 30 минут, но клиент должен немедленно вернуть результат.
Во-первых, я использовал IOLoop.current(). spawn_callback. Оно работает! но! Если первая задача запроса запущена, вторая задача запроса заблокирована! Поскольку все задачи находятся в главном цикле событий при использовании spawn_callback, поэтому одна задача является синхронным исполнением, другие задачи блокированы.
Наконец, я использую tornado.concurrent. Пример:
import datetime
import time
from tornado.ioloop import IOLoop
import tornado.web
from tornado import concurrent
executor = concurrent.futures.ThreadPoolExecutor(8)
class Handler(tornado.web.RequestHandler):
def get(self):
def task(arg):
for i in range(10):
time.sleep(1)
print(arg, i)
executor.submit(task, datetime.datetime.now())
self.write('request accepted')
def make_app():
return tornado.web.Application([
(r"/", Handler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8000, '0.0.0.0')
IOLoop.current().start()
и посетите http://127.0.0.1:8000, вы можете видеть, что он работает нормально:
2017-01-17 22:42:10.983632 0
2017-01-17 22:42:10.983632 1
2017-01-17 22:42:10.983632 2
2017-01-17 22:42:13.710145 0
2017-01-17 22:42:10.983632 3
2017-01-17 22:42:13.710145 1
2017-01-17 22:42:10.983632 4
2017-01-17 22:42:13.710145 2
2017-01-17 22:42:10.983632 5
2017-01-17 22:42:16.694966 0
2017-01-17 22:42:13.710145 3
2017-01-17 22:42:10.983632 6
2017-01-17 22:42:16.694966 1
2017-01-17 22:42:13.710145 4
2017-01-17 22:42:10.983632 7
2017-01-17 22:42:16.694966 2
2017-01-17 22:42:13.710145 5
2017-01-17 22:42:10.983632 8
2017-01-17 22:42:16.694966 3
2017-01-17 22:42:13.710145 6
2017-01-17 22:42:19.790646 0
2017-01-17 22:42:10.983632 9
2017-01-17 22:42:16.694966 4
2017-01-17 22:42:13.710145 7
2017-01-17 22:42:19.790646 1
2017-01-17 22:42:16.694966 5
2017-01-17 22:42:13.710145 8
2017-01-17 22:42:19.790646 2
2017-01-17 22:42:16.694966 6
2017-01-17 22:42:13.710145 9
2017-01-17 22:42:19.790646 3
2017-01-17 22:42:16.694966 7
2017-01-17 22:42:19.790646 4
2017-01-17 22:42:16.694966 8
2017-01-17 22:42:19.790646 5
2017-01-17 22:42:16.694966 9
2017-01-17 22:42:19.790646 6
2017-01-17 22:42:19.790646 7
2017-01-17 22:42:19.790646 8
2017-01-17 22:42:19.790646 9
Хотите помочь всем!
Ответ 4
Просто выполните:
self._background_task()
Строка _background_task
возвращает a Future
, которая не разрешена до завершения сопрограммы. Если вы не получаете Future
и вместо этого просто выполняете следующую строку сразу, то get()
не ждет завершения _background_task
.
Интересная деталь заключается в том, что до завершения _background_task
она поддерживает ссылку на self
. (Не забудьте добавить self
в качестве параметра, кстати.) Ваш RequestHandler не будет собираться мусором, пока не завершится _background_task
.