Вызов сопрограммы в asyncio.Protocol.data_received
У меня возникла проблема с асинхронным элементом в обратном вызове asyncio.Protocol.data_received
нового модуля Python asyncio.
Рассмотрим следующий сервер:
class MathServer(asyncio.Protocol):
@asyncio.coroutine
def slow_sqrt(self, x):
yield from asyncio.sleep(1)
return math.sqrt(x)
def fast_sqrt(self, x):
return math.sqrt(x)
def connection_made(self, transport):
self.transport = transport
#@asyncio.coroutine
def data_received(self, data):
print('data received: {}'.format(data.decode()))
x = json.loads(data.decode())
#res = self.fast_sqrt(x)
res = yield from self.slow_sqrt(x)
self.transport.write(json.dumps(res).encode('utf8'))
self.transport.close()
используется со следующим клиентом:
class MathClient(asyncio.Protocol):
def connection_made(self, transport):
transport.write(json.dumps(2.).encode('utf8'))
def data_received(self, data):
print('data received: {}'.format(data.decode()))
def connection_lost(self, exc):
asyncio.get_event_loop().stop()
При вызове self.fast_sqrt
все работает так, как ожидалось.
С self.slow_sqrt
он не работает.
Он также не работает с self.fast_sqrt
и декоратором @asyncio.coroutine
на data_received
.
Я чувствую, что мне не хватает чего-то фундаментального здесь.
Полный код находится здесь:
Протестировано с помощью
- Python 3.4.0b1 (Windows)
- Python 3.3.3 + asyncio-0.2.1 (FreeBSD)
Вопрос о том же: с slow_sqrt
, клиент/сервер будет просто ничего не делать.
Ответы
Ответ 1
Кажется, это нужно развязать с помощью Future
- хотя я все еще не уверен, что это правильный путь.
class MathServer(asyncio.Protocol):
@asyncio.coroutine
def slow_sqrt(self, x):
yield from asyncio.sleep(2)
return math.sqrt(x)
def fast_sqrt(self, x):
return math.sqrt(x)
def consume(self):
while True:
self.waiter = asyncio.Future()
yield from self.waiter
while len(self.receive_queue):
data = self.receive_queue.popleft()
if self.transport:
try:
res = self.process(data)
if isinstance(res, asyncio.Future) or \
inspect.isgenerator(res):
res = yield from res
except Exception as e:
print(e)
def connection_made(self, transport):
self.transport = transport
self.receive_queue = deque()
asyncio.Task(self.consume())
def data_received(self, data):
self.receive_queue.append(data)
if not self.waiter.done():
self.waiter.set_result(None)
print("data_received {} {}".format(len(data), len(self.receive_queue)))
def process(self, data):
x = json.loads(data.decode())
#res = self.fast_sqrt(x)
res = yield from self.slow_sqrt(x)
self.transport.write(json.dumps(res).encode('utf8'))
#self.transport.close()
def connection_lost(self, exc):
self.transport = None
Вот ответ от Guido van Rossum:
Решение прост: напишите эту логику как отдельный метод, отмеченный с помощью @coroutine
и отпустите его в data_received()
, используя async()
(== Task()
, в этом случае). Причина, почему это не построено в протокол это то, что если бы это было, для этого потребовалось бы альтернативное событие для реализации сопрограмм.
def data_received(self, data):
asyncio.ensure_future(self.process_data(data))
@asyncio.coroutine
def process_data(self, data):
# ...stuff using yield from...
Полный код находится здесь:
- Client
- Server
Ответ 2
У меня была похожая проблема, когда я хотел запустить сопрограмму, когда был вызван мой MyProtocol.connection_made
. Мое решение было довольно похоже, за исключением того, что мой протокол имел доступ к циклу. Для тех, кто использует более позднюю версию python, у меня работает следующее (я использую python 3.6.8):
class MyProtocol(asyncio.Protocol):
def __init__(self, loop):
self.loop = loop
async def do_async_thing(self):
await asyncio.sleep(1)
def connection_made(self, transport):
self.transport = transport
self.loop.create_task(self.do_async_thing())
# Other member functions left out for brevity.
Имеет смысл, что это работает - цикл должен планировать задачу, которая должна иметь независимый контекст, то есть может выполняться независимо от любого другого стека вызовов. Вот почему вы даете циклу сопрограмму, которую он может запустить, do_async_thing()
вместе с экземпляром класса в этом случае, который он будет вызывать, когда сможет. Когда он вызывается, больше не имеет ничего общего с функцией-членом connection_made
.
Стоит отметить, что этого также можно достичь, используя asyncio.ensure_future(coro, loop=None)
вместо self.loop.create_task(coro)
, но последний предположительно будет использовать цикл по умолчанию. На самом деле, это так - я только что проверил исходный код.