Asyncio + aiohttp - redis Паб /Sub и websocket чтение/запись в одном обработчике
Сейчас я играю с aiohttp, чтобы узнать, как он будет работать в качестве серверного приложения для мобильного приложения с подключением к сети.
Вот простой пример "Hello world" (как смысл здесь):
import asyncio
import aiohttp
from aiohttp import web
class WebsocketEchoHandler:
@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)
print('Connection opened')
try:
while True:
msg = yield from ws.receive()
ws.send_str(msg.data + '/answer')
except:
pass
finally:
print('Connection closed')
return ws
if __name__ == "__main__":
app = aiohttp.web.Application()
app.router.add_route('GET', '/ws', WebsocketEchoHandler())
loop = asyncio.get_event_loop()
handler = app.make_handler()
f = loop.create_server(
handler,
'127.0.0.1',
8080,
)
srv = loop.run_until_complete(f)
print("Server started at {sock[0]}:{sock[1]}".format(
sock=srv.sockets[0].getsockname()
))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(handler.finish_connections(1.0))
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.finish())
loop.close()
Проблема
Теперь я хотел бы использовать структуру, описанную ниже (node server = python aiohttp). Чтобы быть более конкретным, используйте механизм Redis Pub/Sub с asyncio-redis читать и записывать как подключение к websocket, так и Redis в моем WebsocketEchoHandler.
WebsocketEchoHandler - это мертвый простой цикл, поэтому я не уверен, как это сделать. Используя Tornado и brükva Я бы просто использовал обратные вызовы.
![http://goldfirestudios.com/blog/136/Horizontally-Scaling- Node. JS-и-WebSockets-с-Redis]()
Дополнительный (внебиржевой вопрос)
Поскольку я уже использую Redis, какой из двух подходов я должен взять:
- Как в "классическом" веб-приложении, у вас есть контроллер/просмотр для всего, используйте Redis только для обмена сообщениями и т.д.
- Веб-приложение должно быть просто слоем между клиентом и Redis, используемым также как очередь задач (простейшая Python RQ). Каждый запрос должен быть делегирован работникам.
ИЗМЕНИТЬ
Изображение из http://goldfirestudios.com/blog/136/Horizontally-Scaling-Node.js-and-WebSockets-with-Redis
EDIT 2
Кажется, что мне нужно уточнить.
- Обработчик только для Websocket показан выше
-
Обработчик Redis Pub/Sub может выглядеть так:
class WebsocketEchoHandler:
@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)
connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
subscriber = yield from connection.start_subscribe()
yield from subscriber.subscribe(['ch1', 'ch2'])
print('Connection opened')
try:
while True:
msg = yield from subscriber.next_published()
ws.send_str(msg.value + '/answer')
except:
pass
finally:
print('Connection closed')
return ws
Этот обработчик только подписывается на канал Redis ch1 и ch2 и отправляет каждое полученное сообщение из этих каналов в websocket.
-
Я хочу иметь этот обработчик:
class WebsocketEchoHandler:
@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)
connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
subscriber = yield from connection.start_subscribe()
yield from subscriber.subscribe(['ch1', 'ch2'])
print('Connection opened')
try:
while True:
# If message recived from redis OR from websocket
msg_ws = yield from ws.receive()
msg_redis = yield from subscriber.next_published()
if msg_ws:
# push to redis / do something else
self.on_msg_from_ws(msg_ws)
if msg_redis:
self.on_msg_from_redis(msg_redis)
except:
pass
finally:
print('Connection closed')
return ws
Но следующий код всегда называется последовательным, поэтому чтение из блоков websocket, читаемых из Redis:
msg_ws = yield from ws.receive()
msg_redis = yield from subscriber.next_published()
Я хочу, чтобы чтение выполнялось на событии, где событие является сообщением, полученным от одного из двух источников.
Ответы
Ответ 1
Вы должны использовать две петли while
- одну, которая обрабатывает сообщения из websocket, и одну, которая обрабатывает сообщения из redis. Ваш основной обработчик может просто запускать две сопрограммы, одну обработку каждого цикла, а затем ждать их обоих:
class WebsocketEchoHandler:
@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)
connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
subscriber = yield from connection.start_subscribe()
yield from subscriber.subscribe(['ch1', 'ch2'])
print('Connection opened')
try:
# Kick off both coroutines in parallel, and then block
# until both are completed.
yield from asyncio.gather(self.handle_ws(ws), self.handle_redis(subscriber))
except Exception as e: # Don't do except: pass
import traceback
traceback.print_exc()
finally:
print('Connection closed')
return ws
@asyncio.coroutine
def handle_ws(self, ws):
while True:
msg_ws = yield from ws.receive()
if msg_ws:
self.on_msg_from_ws(msg_ws)
@asyncio.coroutine
def handle_redis(self, subscriber):
while True:
msg_redis = yield from subscriber.next_published()
if msg_redis:
self.on_msg_from_redis(msg_redis)
Таким образом, вы можете читать любой из двух потенциальных источников, не заботясь о другом.