Как совместить сельдерей с асинчио?

Как создать обертку, которая делает задачи сельдерея похожими на asyncio.Task? Или есть лучший способ интегрировать сельдерей с asyncio?

@asksol, создатель сельдерея, сказал следующее:

Очень распространено использование Celery как распределенного слоя поверх асинхронных интерфейсов ввода-вывода (верхний совет: маршрутизация задач, связанных с CPU, для предпрофессионального работника означает, что они не будут блокировать цикл событий).

Но я не мог найти примеры кода специально для asyncio структуры.

Ответы

Ответ 1

Это будет возможно из версии 5 сельдерея, как указано на официальном сайте:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. Следующая крупная версия Celery будет поддерживать только Python 3.5, планировали ли мы использовать новую асинхронную библиотеку.
  2. Отброшенная поддержка Python 2 позволит нам удалить огромное количество кода совместимости, и переход с Python 3.5 позволяет нам использовать функции ввода, async/await, asyncio и аналогичных концепций, которые не могут быть альтернативными в старых версиях.

Вышеуказанные были указаны из предыдущей ссылки.

Поэтому лучше всего подождать, пока версия 5.0 будет распространяться!

Между тем, счастливое кодирование :)

Ответ 2

Вы можете обернуть любой блокирующий вызов в run_in_executor используя run_in_executor как описано в документации, я также добавил в пример пользовательский тайм-аут:

def run_async_task(
    target,
    *args,
    timeout = 60,
    **keywords
) -> Future:
    loop = asyncio.get_event_loop()
    return asyncio.wait_for(
        loop.run_in_executor(
            executor,
            functools.partial(target, *args, **keywords)
        ),
        timeout=timeout,
        loop=loop
    )
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
    run_async_task, your_task.delay, some_arg, some_karg="" 
)
result = loop.run_until_complete(
    run_async_task, async_result.result 
)

Ответ 3

Самый простой способ сделать это - обернуть async функцию в asgiref.sync.async_to_sync (из asgiref):

from asgiref.sync import async_to_sync
from celery.task import periodic_task


async def return_hello():
    await sleep(1)
    return 'hello'


@periodic_task(
    run_every=2,
    name='return_hello',
)
def task_return_hello():
    async_to_sync(return_hello)()

Я взял этот пример из блога, который я написал.