Как я могу использовать Tornado и Redis асинхронно?
Я пытаюсь найти, как я могу использовать Redis и Tornado асинхронно. Я нашел tornado-redis, но мне нужно больше, чем просто добавить yield
в код.
У меня есть следующий код:
import redis
import tornado.web
class WaiterHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
client = redis.StrictRedis(port=6279)
pubsub = client.pubsub()
pubsub.subscribe('test_channel')
for item in pubsub.listen():
if item['type'] == 'message':
print item['channel']
print item['data']
self.write(item['data'])
self.finish()
class GetHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello world")
application = tornado.web.Application([
(r"/", GetHandler),
(r"/wait", WaiterHandler),
])
if __name__ == '__main__':
application.listen(8888)
print 'running'
tornado.ioloop.IOLoop.instance().start()
Мне нужно получить доступ к URL-адресу /
и получить "Hello World", в то время как запрос находится в /wait
.
Как я могу это сделать?
Ответы
Ответ 1
Вы не должны использовать Redis pub/sub в основном потоке Tornado, так как он блокирует цикл ввода-вывода. Вы можете обрабатывать длинный опрос от веб-клиентов в основном потоке, но вы должны создать отдельный поток для прослушивания Redis. Затем вы можете использовать ioloop.add_callback()
и/или threading.Queue
для связи с основным потоком при получении сообщений.
Ответ 2
Вам необходимо использовать клиент Redis для Tornado IOLoop.
Есть несколько из них, toredis, brukva и т.д.
Здесь пример pubsub в toredis: https://github.com/mrjoes/toredis/blob/master/tests/test_handler.py
Ответ 3
Для Python >= 3.3 я бы посоветовал вам использовать aioredis.
Я не тестировал код ниже, но он должен быть примерно таким:
import redis
import tornado.web
from tornado.web import RequestHandler
import aioredis
import asyncio
from aioredis.pubsub import Receiver
class WaiterHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
client = await aioredis.create_redis((host, 6279), encoding="utf-8", loop=IOLoop.instance().asyncio_loop)
ch = redis.channels['test_channel']
result = None
while await ch.wait_message():
item = await ch.get()
if item['type'] == 'message':
print item['channel']
print item['data']
result = item['data']
self.write(result)
self.finish()
class GetHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello world")
application = tornado.web.Application([
(r"/", GetHandler),
(r"/wait", WaiterHandler),
])
if __name__ == '__main__':
print 'running'
tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
server = tornado.httpserver.HTTPServer(application)
server.bind(8888)
# zero means creating as many processes as there are cores.
server.start(0)
tornado.ioloop.IOLoop.instance().start()
Ответ 4
Хорошо, так вот мой пример того, как я буду делать это с получением запросов.
Я добавил два основных компонента:
Первый - это простой поточный прослушиватель pubsub, который добавляет новые сообщения в локальный объект списка.
Я также добавил в список аксессоров списка, чтобы вы могли читать из потока слушателей, как если бы вы читали из обычного списка. Что касается вашего WebRequest
, вы просто читаете данные из локального объекта списка. Это немедленно возвращается и не блокирует текущий запрос от завершения или будущих запросов от принятия и обработки.
class OpenChannel(threading.Thread):
def __init__(self, channel, host = None, port = None):
threading.Thread.__init__(self)
self.lock = threading.Lock()
self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
self.pubsub = self.redis.pubsub()
self.pubsub.subscribe(channel)
self.output = []
# lets implement basic getter methods on self.output, so you can access it like a regular list
def __getitem__(self, item):
with self.lock:
return self.output[item]
def __getslice__(self, start, stop = None, step = None):
with self.lock:
return self.output[start:stop:step]
def __str__(self):
with self.lock:
return self.output.__str__()
# thread loop
def run(self):
for message in self.pubsub.listen():
with self.lock:
self.output.append(message['data'])
def stop(self):
self._Thread__stop()
Второй класс ApplicationMixin. Это дополнительный объект, на который вы наследуете свой класс веб-запросов, чтобы добавить функциональность и атрибуты. В этом случае он проверяет, существует ли уже прослушиватель каналов для запрошенного канала, создается один, если ни один не найден, и возвращает дескриптор прослушивателя в WebRequest.
# add a method to the application that will return existing channels
# or create non-existing ones and then return them
class ApplicationMixin(object):
def GetChannel(self, channel, host = None, port = None):
if channel not in self.application.channels:
self.application.channels[channel] = OpenChannel(channel, host, port)
self.application.channels[channel].start()
return self.application.channels[channel]
Класс WebRequest теперь рассматривает слушателя как статический список (имея в виду, что вам нужно дать строку self.write
)
class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
@tornado.web.asynchronous
def get(self, channel):
# get the channel
channel = self.GetChannel(channel)
# write out its entire contents as a list
self.write('{}'.format(channel[:]))
self.finish() # not necessary?
Наконец, после создания приложения я добавил пустой словарь в качестве атрибута
# add a dictionary containing channels to your application
application.channels = {}
Как и некоторая очистка работающих потоков, после выхода из приложения
# clean up the subscribed channels
for channel in application.channels:
application.channels[channel].stop()
application.channels[channel].join()
Полный код:
import threading
import redis
import tornado.web
class OpenChannel(threading.Thread):
def __init__(self, channel, host = None, port = None):
threading.Thread.__init__(self)
self.lock = threading.Lock()
self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
self.pubsub = self.redis.pubsub()
self.pubsub.subscribe(channel)
self.output = []
# lets implement basic getter methods on self.output, so you can access it like a regular list
def __getitem__(self, item):
with self.lock:
return self.output[item]
def __getslice__(self, start, stop = None, step = None):
with self.lock:
return self.output[start:stop:step]
def __str__(self):
with self.lock:
return self.output.__str__()
# thread loop
def run(self):
for message in self.pubsub.listen():
with self.lock:
self.output.append(message['data'])
def stop(self):
self._Thread__stop()
# add a method to the application that will return existing channels
# or create non-existing ones and then return them
class ApplicationMixin(object):
def GetChannel(self, channel, host = None, port = None):
if channel not in self.application.channels:
self.application.channels[channel] = OpenChannel(channel, host, port)
self.application.channels[channel].start()
return self.application.channels[channel]
class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
@tornado.web.asynchronous
def get(self, channel):
# get the channel
channel = self.GetChannel(channel)
# write out its entire contents as a list
self.write('{}'.format(channel[:]))
self.finish() # not necessary?
class GetHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello world")
application = tornado.web.Application([
(r"/", GetHandler),
(r"/channel/(?P<channel>\S+)", ReadChannel),
])
# add a dictionary containing channels to your application
application.channels = {}
if __name__ == '__main__':
application.listen(8888)
print 'running'
try:
tornado.ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
pass
# clean up the subscribed channels
for channel in application.channels:
application.channels[channel].stop()
application.channels[channel].join()