Задача сельдерея, выполняющая больше задач
Я использую celerybeat, чтобы начать основную задачу, которая выполняет ряд второстепенных задач. У меня уже есть две задачи.
Есть ли способ сделать это? Обеспечивает ли сельдерей выполнение задач из задач?
Мой пример:
@task
def compute(users=None):
if users is None:
users = User.objects.all()
tasks = []
for user in users:
tasks.append(compute_for_user.subtask((user.id,)))
job = TaskSet(tasks)
job.apply_async() # raises a IOError: Socket closed
@task
def compute_for_user(user_id):
#do some stuff
compute
вызывается из celerybeat, но вызывает IOError при попытке запустить apply_async
. Любые идеи?
Ответы
Ответ 1
Чтобы ответить на ваши вступительные вопросы: Начиная с версии 2.0, Celery предоставляет простой способ запускать задачи из других задач. То, что вы называете "вторичными задачами", - это то, что он вызывает "подзадачи". См. Документацию для Наборы задач, подзадачи и обратные вызовы, которые @Paperino были достаточно любезны для ссылки.
Для версии 3.0, сельдерей изменился на группы для этого и других типов поведения.
Ваш код показывает, что вы уже знакомы с этим интерфейсом. Кажется, что ваш фактический вопрос: "Почему я получаю" Socket Closed "IOError
, когда я пытаюсь запустить свой набор подзадач?" Я не думаю, что кто-то может ответить на это, потому что вы не предоставили достаточно информации о своей программе. Ваша выдержка не может быть запущена как есть, поэтому мы не можем исследовать проблему, которую вы имеете для себя. Пожалуйста, разместите stacktrace, снабженный IOError
, и с какой-либо удачей придет тот, кто может помочь вам в вашем crasher.
Ответ 2
И так как версия 3.0 "TaskSet" больше не является термином... Группы, цепочки и аккорды в качестве специального типа подзадачи - это новая вещь, см. http://docs.celeryproject.org/en/3.1/whatsnew-3.0.html#group-chord-chain-are-now-subtasks
Ответ 3
Вы можете использовать что-то вроде этого (поддержка в версии 3.0)
g = group(compute_for_user.s(user.id) for user in users)
g.apply_async()