График задач сельдерея (выполнение задачи выполняется только по одному)

У меня есть задача, примерно такая:

@task()
def async_work(info):
    ...

В любой момент я могу вызвать async_work с некоторой информацией. По какой-то причине мне нужно убедиться, что одновременно выполняется только одно async_work, другой запрос на вызов должен ждать.

Итак, я придумал следующий код:

is_locked = False    
@task()
    def async_work(info):
        while is_locked:
            pass
        is_locked = True
        ...
        is_locked = False

Но он говорит, что он недействителен для доступа к локальным переменным... Как его решить?

Ответы

Ответ 1

Недействительно для доступа к локальным переменным, так как у вас может быть несколько рабочих сельдерея, выполняющих задачи. И эти рабочие могут быть даже на разных хозяевах. Таким образом, в основном существует столько переменных is_locked, сколько работает множество работников сельдерея  ваша задача async_work. Таким образом, даже если ваш код не приведет к возникновению ошибок, вы не получите желаемого эффекта.

Чтобы достичь цели, вам нужно настроить сельдерей на запуск только одного работника. Поскольку любой рабочий может обрабатывать одну задачу в любой момент времени, вы получаете то, что вам нужно.

EDIT:

Согласно Руководство для рабочих > Concurrency:

По умолчанию мультипроцессор используется для выполнения параллельного выполнения задач, но вы также можете использовать Eventlet. Число работников процессы/потоки могут быть изменены с помощью аргумента --concurrencyи по умолчанию используется количество процессоров, доступных на аппарате.

Таким образом, вам нужно запустить рабочего следующим образом:

$ celery worker --concurrency=1

ИЗМЕНИТЬ 2:

Удивительно, что есть другое решение, более того, даже в официальных документах, см. Обеспечение выполнения задачи только по одной за раз.

Ответ 2

Вы, вероятно, не хотите использовать concurrency=1 для своих работников из сельдерея - вы хотите, чтобы ваши задачи обрабатывались одновременно. Вместо этого вы можете использовать какой-то механизм блокировки. Просто убедитесь, что тайм-аут для кеша больше, чем время для завершения вашей задачи.

Redis

import redis
from contextlib import contextmanager

redis_client = redis.Redis(host='localhost', port=6378)


@contextmanager
def redis_lock(lock_name):
    """Yield 1 if specified lock_name is not already set in redis. Otherwise returns 0.

    Enables sort of lock functionality.
    """
    status = redis_client.set(lock_name, 'lock', nx=True)
    try:
        yield status
    finally:
        redis_client.delete(lock_name)


@task()
def async_work(info):
    with redis_lock('my_lock_name') as acquired:
        do_some_work()

Memcache

Пример вдохновлен сельдерейской документацией

from contextlib import contextmanager
from django.core.cache import cache

@contextmanager
def memcache_lock(lock_name):
    status = cache.add(lock_name, 'lock')
    try:
        yield status
    finally:
        cache.delete(lock_name)


@task()
def async_work(info):
    with memcache_lock('my_lock_name') as acquired:
        do_some_work()