работник по выгрузке сельдерея после выполнения конкретной задачи
Я использую сельдерей (соло-пул с параллелизмом = 1), и я хочу, чтобы я мог закрыть работника после выполнения конкретной задачи. Оговорка заключается в том, что я хочу избежать возможности того, чтобы работник взял на себя все дальнейшие задачи после этого.
Здесь моя попытка в схеме:
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun
app = Celery()
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
return x + y
@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
raise WorkerShutdown()
Однако, когда я запускаю рабочего
celery -A celeryapp worker --concurrency=1 --pool=solo
и выполнить задачу
add.delay(1,4)
Я получаю следующее:
-------------- [email protected] v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: __main__:0x7f596896ce90
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)
Задача переупорядочивается и снова запускается на другого работника, что приводит к циклу.
Это также происходит, когда я перемещаю исключение WorkerShutdown
внутри самой задачи.
@app.task
def add(x, y):
print(x + y)
raise WorkerShutdown()
Есть ли способ, которым я могу закрыть работника после конкретной задачи, избегая этого неудачного побочного эффекта?
Ответы
Ответ 1
Рекомендуемый процесс для увольнения работника - отправить сигнал TERM
. Это приведет к отключению работника сельдерея после выполнения любых текущих выполняемых задач. Если вы отправите сигнал QUIT
в главный рабочий процесс, рабочий немедленно отключится.
Документы celery, однако, обычно обсуждают это с точки зрения управления celery из командной строки или через systemd/initd, но celery дополнительно предоставляет API удаленного управления работником через celery.app.control
.
Вы можете отозвать задачу, чтобы работники не могли ее выполнить. Это должно предотвратить петлю, которую вы испытываете. Кроме того, управление поддерживает отключение работника таким же образом.
Итак, я думаю, что следующее даст вам желаемое поведение.
@app.task(bind=True)
def shutdown(self):
app.control.revoke(self.id) # prevent this task from being executed again
app.control.shutdown() # send shutdown signal to all workers
Поскольку в настоящее время невозможно подтвердить задачу изнутри задачи, а затем продолжить выполнение указанной задачи, этот метод использования revoke
обходит эту проблему, так что, даже если задача снова ставится в очередь, новый работник просто игнорирует ее.
В качестве альтернативы, следующее также предотвратит повторное выполнение поставленной задачи...
@app.task(bind=True)
def some_task(self):
if self.request.delivery_info['redelivered']:
raise Ignore() # ignore if this task was redelivered
print('This should only execute on first receipt of task')
Также стоит отметить, что AsyncResult
также имеет метод revoke
который вызывает self.app.control.revoke
для вас.
Ответ 2
Если вы завершите работу работника, после завершения задачи он не будет повторно перезагружен.
@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
app.control.broadcast('shutdown')
Это будет изящно завершать работу после завершения заданий.
[2018-04-01 18:44:14,627: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-04-01 18:44:14,656: INFO/MainProcess] mingle: searching for neighbors
[2018-04-01 18:44:15,719: INFO/MainProcess] mingle: all alone
[2018-04-01 18:44:15,742: INFO/MainProcess] [email protected] ready.
[2018-04-01 18:46:28,572: INFO/MainProcess] Received task: celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b]
[2018-04-01 18:46:28,585: INFO/ForkPoolWorker-4] Task celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b] succeeded in 0.005628278013318777s: 3
[2018-04-01 18:46:28,665: WARNING/MainProcess] Got shutdown from remote
Примечание: трансляция завершает работу всех работников. Если вы хотите закрыть задание конкретного работника, начните работу с имени
celery -A celeryapp worker -n self_killing --concurrency=1 --pool=solo
Теперь вы можете отключить его с помощью целевого параметра.
app.control.broadcast('shutdown', destination=['[email protected]_killing'])
Ответ 3
Если вам нужно выключить конкретного работника и заранее не знать его имени, вы можете получить его из свойств задачи. Основываясь на ответах выше, вы можете использовать:
app.control.shutdown(destination=[self.request.hostname])
или же
app.control.broadcast('shutdown', destination=[self.request.hostname])
Замечания:
- Работник должен начинаться с имени (опция
'-n'
); - Задача должна быть определена с параметром
bind=True
.