Как ограничить одновременное потребление сообщений на основе критериев

Сценарий (я упростил):

  • Многие конечные пользователи могут запускать задания (тяжелые задания, например рендеринг большого PDF файла), из внешнего веб-приложения (производителя).
  • Задачи отправляются в одну простую очередь RabbitMQ.
  • Многие рабочие приложения (потребители) обрабатывают эти задания и записывают результаты обратно в хранилище данных.

Этот довольно стандартный шаблон работает нормально.

Проблема: если пользователь запустил 10 заданий за одну минуту, и в это время дня работает только 10 рабочих приложений, этот конечный пользователь эффективно берет на себя все время вычисления.

Вопрос. Как я могу убедиться, что только одно задание на конечного пользователя обрабатывается в любое время? ( Бонус: некоторые конечные пользователи (например, админы) не должны быть дросселированы)

Кроме того, я не хочу, чтобы приложение переднего конца блокировало конечных пользователей от запуска параллельных заданий. Я просто хочу, чтобы конечные пользователи дождались, что их параллельные задания будут завершены по одному за раз.

Решение?. Должен ли я динамически создавать одну автоматическую удаление эксклюзивной очереди для конечных пользователей? Если да, как я могу сообщить рабочим приложениям начать использовать эту очередь? Как обеспечить, чтобы один (и только один) рабочий из этой очереди потреблял?

Ответы

Ответ 1

Такая особенность не предусмотрена кроликомMQ. Однако вы можете реализовать его следующим образом. Вам придется использовать опрос, хотя это не так эффективно (по сравнению с подпиской/публикацией). Вам также придется использовать Zookeeper для координации между разными работниками.

Вы создадите 2 очереди: 1 очередь с высоким приоритетом (для заданий администратора) и 1 очередь с низким приоритетом (для обычных заданий пользователя). 10 рабочих будут получать сообщения из обеих очередей. Каждый работник будет выполнять бесконечный цикл (с идеальными интервалами сна, когда очереди пусты), где он попытается получить сообщение из каждой очереди взаимозаменяемо:

  • Для очереди с высоким приоритетом рабочий просто получает сообщение, обрабатывает его и подтверждает очередь.
  • Для очереди с низким приоритетом рабочий пытается зафиксировать блокировку в Zookeeper (путем записи в определенный файл-znode), а в случае успеха - считывает сообщение, обрабатывает его и подтверждает. Если запись zookeeper не удалась, кто-то еще удерживает блокировку, поэтому этот рабочий пропускает этот шаг и повторяет цикл.

Ответ 2

Вам нужно будет что-то создать самостоятельно, чтобы реализовать это, как говорит Димос. Вот альтернативная реализация, которая требует дополнительной очереди и некоторого постоянного хранилища.

  • Как и существующая очередь для заданий, создайте "обрабатываемую очередь заданий". В эту очередь добавляются только те задания, которые соответствуют вашим бизнес-правилам.
  • Создайте пользователя (с именем "Ограничитель" ) для очереди заданий. Ограничитель также нуждается в постоянном хранилище (например, Redis или реляционной базе данных) для записи того, какие задания в настоящее время обрабатываются. Ограничитель читает из очереди заданий и записывает в обрабатываемую очередь заданий.
  • Когда рабочее приложение завершает обработку задания, оно добавляет к заданию очередь событие "законченное задание".

    ------------     ------------     ----------- 
    | Producer | -> () job queue ) -> | Limiter | 
    ------------     ------------     ----------- 
                         ^                |                    
                         |                V                    
                         |     ------------------------       
                         |    () processable job queue )  
           job finished  |     ------------------------       
                         |                |
                         |                V
                         |     ------------------------
                         \-----| Job Processors (x10) |
                               ------------------------
    

Логика ограничителя такова:

  • Когда получено сообщение о задании, проверьте постоянное хранилище, чтобы узнать, запущено ли задание для текущего пользователя:
    • Если нет, запишите задание в хранилище как запущенное и добавьте сообщение задания в очередь обрабатываемых заданий.
    • Если существующее задание выполняется, запишите задание в хранилище в качестве незавершенного задания.
    • Если задание для пользователя-администратора, всегда добавляйте его в очередь обрабатываемых заданий.
  • Когда получено сообщение "законченное задание", удалите это задание из списка "выполняемые задания" в постоянном хранилище. Затем проверьте хранилище на ожидающее задание для этого пользователя:
    • Если задание найдено, измените статус этого задания в ожидании работы и добавьте его в очередь обработки.
    • В противном случае ничего не делать.
  • Только один экземпляр процесса ограничения может запускаться одновременно. Это может быть достигнуто либо за счет запуска только одного экземпляра процесса лимитера, либо путем использования механизмов блокировки в постоянном хранилище.

Это довольно тяжелый вес, но вы всегда можете проверить постоянное хранилище, если вам нужно посмотреть, что происходит.