Ответ 1
Вы можете либо написать пользовательский загрузчик, либо использовать сигналы.
У загрузчиков есть метод on_task_init
, который вызывается, когда задача должна быть выполнена,
и on_worker_init
, который вызывается основным процессом производства сельдерея и сельдерея.
Использование сигналов, вероятно, является самым простым, доступны следующие сигналы:
0.8.x:
-
task_prerun(task_id, task, args, kwargs)
Отправляется, когда задача должна быть выполнена рабочим (или локально если используется
apply
/или еслиCELERY_ALWAYS_EAGER
установлено). -
task_postrun(task_id, task, args, kwargs, retval)
Отправляется после выполнения задачи в тех же условиях, что и выше. -
task_sent(task_id, task, args, kwargs, eta, taskset)
Вызывается при выполнении задачи (не подходит для длительных операций)
Дополнительные сигналы, доступные в 0.9.x(текущая ведущая ветвь на github):
-
worker_init()
Вызывается при запуске celeryd (перед инициализацией задачи, поэтому, если на система, поддерживающая
fork
, любые изменения памяти будут скопированы в дочерний рабочие процессы). -
worker_ready()
Вызывается, когда celeryd может получать задания.
-
worker_shutdown()
Вызывается при закрытии celeryd.
Здесь пример, который предварительно вычисляет что-то при первом запуске задачи:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
Если вы хотите, чтобы функция выполнялась для всех задач, просто пропустите аргумент sender
.