Как совместить сельдерей с асинчио?
Как создать обертку, которая делает задачи сельдерея похожими на asyncio.Task
? Или есть лучший способ интегрировать сельдерей с asyncio
?
@asksol, создатель сельдерея, сказал следующее:
Очень распространено использование Celery как распределенного слоя поверх асинхронных интерфейсов ввода-вывода (верхний совет: маршрутизация задач, связанных с CPU, для предпрофессионального работника означает, что они не будут блокировать цикл событий).
Но я не мог найти примеры кода специально для asyncio
структуры.
Ответы
Ответ 1
Это будет возможно из версии 5 сельдерея, как указано на официальном сайте:
http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface
- Следующая крупная версия Celery будет поддерживать только Python 3.5, планировали ли мы использовать новую асинхронную библиотеку.
- Отброшенная поддержка Python 2 позволит нам удалить огромное количество кода совместимости, и переход с Python 3.5 позволяет нам использовать функции ввода, async/await, asyncio и аналогичных концепций, которые не могут быть альтернативными в старых версиях.
Вышеуказанные были указаны из предыдущей ссылки.
Поэтому лучше всего подождать, пока версия 5.0 будет распространяться!
Между тем, счастливое кодирование :)
Ответ 2
Вы можете обернуть любой блокирующий вызов в run_in_executor
используя run_in_executor
как описано в документации, я также добавил в пример пользовательский тайм-аут:
def run_async_task(
target,
*args,
timeout = 60,
**keywords
) -> Future:
loop = asyncio.get_event_loop()
return asyncio.wait_for(
loop.run_in_executor(
executor,
functools.partial(target, *args, **keywords)
),
timeout=timeout,
loop=loop
)
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
run_async_task, your_task.delay, some_arg, some_karg=""
)
result = loop.run_until_complete(
run_async_task, async_result.result
)
Ответ 3
Самый простой способ сделать это - обернуть async
функцию в asgiref.sync.async_to_sync
(из asgiref
):
from asgiref.sync import async_to_sync
from celery.task import periodic_task
async def return_hello():
await sleep(1)
return 'hello'
@periodic_task(
run_every=2,
name='return_hello',
)
def task_return_hello():
async_to_sync(return_hello)()
Я взял этот пример из блога, который я написал.