Добавьте n задач в очередь сельдерея и дождитесь результатов

Я бы добавил несколько задач в очередь сельдерея и дождался результатов. У меня есть разные идеи, как я мог бы достичь этого, используя некоторую форму общего хранилища (memcached, redis, db и т.д.), Однако я бы подумал, что Celery может обрабатывать автоматически, но я не могу найти какие-либо ресурсы в Интернете.

Пример кода

def do_tasks(b):
    for a in b:
        c.delay(a)

    return c.all_results_some_how()

Ответы

Ответ 1

Для Celery> = 3.0, TaskSet не рекомендуется в пользу группы.

from celery import group
from tasks import add

job = group([
             add.s(2, 2),
             add.s(4, 4),
             add.s(8, 8),
             add.s(16, 16),
             add.s(32, 32),
])

Начать группу в фоновом режиме:

result = job.apply_async()

Wait:

result.join()

Ответ 2

Task.delay возвращает AsyncResult. Используйте AsyncResult.get, чтобы получить результат каждого задания.

Для этого необходимо сохранить ссылки на задачи.

def do_tasks(b):
    tasks = []
    for a in b:
        tasks.append(c.delay(a))
    return [t.get() for t in tasks]

Или вы можете использовать ResultSet:

ОБНОВЛЕНИЕ: ResultSet устарело, см. @laffuste ответ.

def do_tasks(b):
    rs = ResultSet([])
    for a in b:
        rs.add(c.delay(a))
    return rs.get()

Ответ 3

У меня есть подозрение, что вы действительно не хотите задержки, но асинхронную особенность Celery.

Я думаю, вы действительно хотите TaskSet:

from celery.task.sets import TaskSet
from someapp.tasks import sometask

def do_tasks(b):
    job = TaskSet([sometask.subtask((a,)) for a in b])
    result = job.apply_async()
    # might want to handle result.successful() == False
    return result.join()