Как я могу обернуть синхронную функцию в async coroutine?
Я использую aiohttp для создания сервера API, который отправляет запросы TCP на отдельный сервер. Модуль, который отправляет запросы TCP, является синхронным и является черным ящиком для моих целей. Поэтому моя проблема заключается в том, что эти запросы блокируют весь API. Мне нужен способ обернуть запросы модуля в асинхронной сопрограмме, которая не будет блокировать остальную часть API.
Итак, просто используя sleep
в качестве простого примера, есть ли способ каким-то образом обернуть временный синхронный код в неблокирующей сопрограмме, что-то вроде этого:
async def sleep_async(delay):
# After calling sleep, loop should be released until sleep is done
yield sleep(delay)
return 'I slept asynchronously'
Ответы
Ответ 1
В конце концов я нашел ответ в этой теме. Метод, который я искал, это run_in_executor. Это позволяет асинхронно запускать синхронную функцию без блокировки цикла событий.
В примере sleep
я разместил выше, это может выглядеть так:
import asyncio
from time import sleep
from concurrent.futures import ProcessPoolExecutor
async def sleep_async(loop, delay):
# Can set executor to None if a default has been set for loop
await loop.run_in_executor(ProcessPoolExecutor(), sleep, delay)
return 'I slept asynchronously'
Также см. Следующий ответ → Как мы вызываем нормальную функцию, где ожидается сопрограмма?
Ответ 2
Вы можете использовать декоратор для переноса версии синхронизации в асинхронную версию.
import time
from functools import wraps, partial
def wrap(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run
@wrap
def sleep_async(delay):
time.sleep(delay)
return 'I slept asynchronously'
или используйте aioify
% pip install aioify
затем
@aioify
def sleep_async(delay):
pass
Ответ 3
Не уверен, что слишком поздно, но вы также можете использовать декоратор для выполнения своей функции в потоке. ХОТИТЕ, обратите внимание, что это все еще будет не-кооперативная блокировка в отличие от асинхронной, которая является кооперативной блокировкой.
def wrap(func):
from concurrent.futures import ThreadPoolExecutor
pool=ThreadPoolExecutor()
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
future=pool.submit(func, *args, **kwargs)
return asyncio.wrap_future(future)
return run