Добавьте n задач в очередь сельдерея и дождитесь результатов
Я бы добавил несколько задач в очередь сельдерея и дождался результатов. У меня есть разные идеи, как я мог бы достичь этого, используя некоторую форму общего хранилища (memcached, redis, db и т.д.), Однако я бы подумал, что Celery может обрабатывать автоматически, но я не могу найти какие-либо ресурсы в Интернете.
Пример кода
def do_tasks(b):
for a in b:
c.delay(a)
return c.all_results_some_how()
Ответы
Ответ 1
Для Celery> = 3.0, TaskSet не рекомендуется в пользу группы.
from celery import group
from tasks import add
job = group([
add.s(2, 2),
add.s(4, 4),
add.s(8, 8),
add.s(16, 16),
add.s(32, 32),
])
Начать группу в фоновом режиме:
result = job.apply_async()
Wait:
result.join()
Ответ 2
Task.delay
возвращает AsyncResult
. Используйте AsyncResult.get
, чтобы получить результат каждого задания.
Для этого необходимо сохранить ссылки на задачи.
def do_tasks(b):
tasks = []
for a in b:
tasks.append(c.delay(a))
return [t.get() for t in tasks]
Или вы можете использовать ResultSet
:
ОБНОВЛЕНИЕ: ResultSet
устарело, см. @laffuste ответ.
def do_tasks(b):
rs = ResultSet([])
for a in b:
rs.add(c.delay(a))
return rs.get()
Ответ 3
У меня есть подозрение, что вы действительно не хотите задержки, но асинхронную особенность Celery.
Я думаю, вы действительно хотите TaskSet:
from celery.task.sets import TaskSet
from someapp.tasks import sometask
def do_tasks(b):
job = TaskSet([sometask.subtask((a,)) for a in b])
result = job.apply_async()
# might want to handle result.successful() == False
return result.join()