График задач сельдерея (выполнение задачи выполняется только по одному)
У меня есть задача, примерно такая:
@task()
def async_work(info):
...
В любой момент я могу вызвать async_work с некоторой информацией. По какой-то причине мне нужно убедиться, что одновременно выполняется только одно async_work, другой запрос на вызов должен ждать.
Итак, я придумал следующий код:
is_locked = False
@task()
def async_work(info):
while is_locked:
pass
is_locked = True
...
is_locked = False
Но он говорит, что он недействителен для доступа к локальным переменным...
Как его решить?
Ответы
Ответ 1
Недействительно для доступа к локальным переменным, так как у вас может быть несколько рабочих сельдерея, выполняющих задачи. И эти рабочие могут быть даже на разных хозяевах. Таким образом, в основном существует столько переменных is_locked
, сколько работает множество работников сельдерея
ваша задача async_work
. Таким образом, даже если ваш код не приведет к возникновению ошибок, вы не получите желаемого эффекта.
Чтобы достичь цели, вам нужно настроить сельдерей на запуск только одного работника. Поскольку любой рабочий может обрабатывать одну задачу в любой момент времени, вы получаете то, что вам нужно.
EDIT:
Согласно Руководство для рабочих > Concurrency:
По умолчанию мультипроцессор используется для выполнения параллельного выполнения задач, но вы также можете использовать Eventlet. Число работников процессы/потоки могут быть изменены с помощью аргумента --concurrency
и по умолчанию используется количество процессоров, доступных на аппарате.
Таким образом, вам нужно запустить рабочего следующим образом:
$ celery worker --concurrency=1
ИЗМЕНИТЬ 2:
Удивительно, что есть другое решение, более того, даже в официальных документах, см. Обеспечение выполнения задачи только по одной за раз.
Ответ 2
Вы, вероятно, не хотите использовать concurrency=1
для своих работников из сельдерея - вы хотите, чтобы ваши задачи обрабатывались одновременно. Вместо этого вы можете использовать какой-то механизм блокировки. Просто убедитесь, что тайм-аут для кеша больше, чем время для завершения вашей задачи.
Redis
import redis
from contextlib import contextmanager
redis_client = redis.Redis(host='localhost', port=6378)
@contextmanager
def redis_lock(lock_name):
"""Yield 1 if specified lock_name is not already set in redis. Otherwise returns 0.
Enables sort of lock functionality.
"""
status = redis_client.set(lock_name, 'lock', nx=True)
try:
yield status
finally:
redis_client.delete(lock_name)
@task()
def async_work(info):
with redis_lock('my_lock_name') as acquired:
do_some_work()
Memcache
Пример вдохновлен сельдерейской документацией
from contextlib import contextmanager
from django.core.cache import cache
@contextmanager
def memcache_lock(lock_name):
status = cache.add(lock_name, 'lock')
try:
yield status
finally:
cache.delete(lock_name)
@task()
def async_work(info):
with memcache_lock('my_lock_name') as acquired:
do_some_work()