Создание динамических очередей с помощью Сельдерея
Здесь мой сценарий:
Когда пользователь входит в систему на моем веб-сайте, я помещаю в очередь множество задач для данного пользователя (обычно каждая задача занимает 100 с мсек, а для каждого пользователя - 100 задач). Эти задачи помещаются в очередь по умолчанию в Celery Queue, и у меня работает 100 человек. Я использую websockets, чтобы показать прогресс пользователя в режиме реального времени по мере выполнения задач на бэкэнд. Жизнь хороша, если у меня только 1 или 2 пользователя.
Теперь, если я несколько одновременных пользователей войдут на мой сайт, последние пользователи поставлены в очередь за начальными пользователями и их задачи голодают (так как все задачи переходят в одну очередь). Мои мысли - создать динамическую очередь для каждого пользователя, чтобы обеспечить справедливость. Однако, согласно документации по сельдеву (http://docs.celeryproject.org/en/latest/userguide/routing.html#defining-queues), мне нужно определить статики статически.
Любые предложения по лучшим методам использования сельдерея для моего сценария?
Ответы
Ответ 1
http://docs.celeryproject.org/en/latest/userguide/workers.html#queues
celery -A proj control add_consumer foo -d worker1.local
То же самое можно выполнить динамически с помощью метода app.control.add_consumer():
app.control.add_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]
app.control.add_consumer('foo', reply=True,
destination=['[email protected]'])
Ответ 2
Вы можете динамически назначать задачу в очередь во время выполнения при вызове, ссылаясь на calling.html#routing-options.
Это будет работать, если вы включили CELERY_CREATE_MISSING_QUEUES
.