Как обрабатывать ошибки в сельдерее Task.map
Скажем, у меня есть две задачи сельдерея:
@celery.task
def run_flakey_things(*args, **kwargs):
return run_flakey_and_synchronous_thing.map(
xrange(10)
).apply_async()
@celery.task
def run_flakey_and_synchronous_thing(a):
if a % 5:
return a
raise RuntimeError(a)
Итак, когда вы идете запустить run_flakey_things
, он сразу же упадет, потому что первый элемент в последовательности вызывает исключение. Я хотел бы запустить задачу для всех элементов в последовательности в порядке, как это делает карта, но продолжить работу над исключением, создавая новое исключение, когда все это завершено.
Идеал был бы, если бы я мог добавить on_failure к объекту xmap
перед его применением, но xmap
не является полным объектом задачи.
Ответы
Ответ 1
Вы можете изменить возвращаемое значение для указания и распространения ошибок. Например:
import traceback
@celery.task
def run_flakey_things(*args, **kwargs):
return run_flakey_and_synchronous_thing.map(
xrange(10)
).apply_async()
@celery.task
def run_flakey_and_synchronous_thing(a):
d = {'value': None, 'error': None}
try:
if a % 5:
d['value'] = a
except:
d['error'] = traceback.format_exc()
return d
Затем вы можете сделать несколько вещей:
1) Измените run_flakey_things, чтобы группировать вещи с ошибками и без них, возвращать их без ошибок и сообщать об ошибках.
2) Обрабатывайте это поведение во всем, что вызывает run_flakey_things
Ответ 2
Я не использовал сельдерей. Однако вы можете добавить метод __call__
в подзадачу? Что-то вроде:
class MyTask:
def __call__(self, *args, **kwargs):
try:
super(self, MyTask).__call__(*args, **kwargs)
except Exception, exc:
# store exc to be raised later in the main task
@celery.task(base=MyTask):
def run_flakey_and_synchronous_thing(a):
# ...
Затем для основной задачи run_flakey_things
, возможно, переопределите apply_async()
, чтобы получить сохраненное исключение, как в Как переопределить __call__ в сельдерее на главном?.