Ответ 1
Я думаю, что у вас есть несколько проблем с начальным пониманием. Честно говоря, я немного удивлен, увидев следующее: both need 5 threads to handle the volume
. Как вы определили, что вам нужен этот точный номер? У вас есть гарантии, что 5 потоков будет достаточно?
RabbitMQ настроен и проверен временем, так что это все о правильном дизайне и эффективная обработка сообщений.
Попробуйте проанализировать проблему и найти правильное решение. BTW, сама очередь сообщений не будет предоставлять никаких гарантий, что у вас действительно хорошее решение. Вам нужно понять, что вы делаете, а также провести дополнительное тестирование.
Как вы определенно знаете, есть много вариантов:
Я использую макет B
как самый простой способ проиллюстрировать проблему 1
производителя N
потребителей. Так как вы так обеспокоены пропускной способностью. Кстати, как вы могли бы ожидать, RabbitMQ ведет себя неплохо (источник). Обратите внимание на prefetchCount
, я расскажу об этом позже:
Таким образом, логика обработки сообщений, вероятно, является правильным местом для обеспечения достаточной пропускной способности. Естественно, вы можете охватить новый поток каждый раз, когда вам нужно обработать сообщение, но в конечном итоге такой подход убьет вашу систему. В основном, больше потоков у вас больше латентности вы получите (вы можете проверить закон Амдаля, если хотите).
(см. Закон Амдаля, иллюстрированный)
Совет # 1: Будьте осторожны с потоками, используйте ThreadPools (подробнее)
Пул потоков можно описать как набор объектов Runnable (рабочая очередь) и соединения запущенных потоков. Эти потоки постоянно работают и проверяют рабочий запрос на новую работу. Если есть новая работа, которую нужно выполнить, они выполняют этот Runnable. Нить сам класс предоставляет способ, например. выполнить (Runnable r), чтобы добавить новый Runnable object to work queue.
public class Main {
private static final int NTHREDS = 10;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
Runnable worker = new MyRunnable(10000000L + i);
executor.execute(worker);
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
executor.awaitTermination();
System.out.println("Finished all threads");
}
}
Совет № 2: Будьте осторожны с накладными сообщениями
Я бы сказал, что это очевидная техника оптимизации. Скорее всего, вы будете отправлять небольшие и простые в обработке сообщения. Весь подход заключен в том, что небольшие сообщения должны постоянно устанавливаться и обрабатываться. Большие сообщения в конце концов будут играть плохую шутку, поэтому лучше избегать этого.
Так что лучше отправить крошечные фрагменты информации, но как насчет обработки? Каждый раз, когда вы отправляете задание, накладные расходы накладные. Пакетная обработка может быть очень полезна в случае высокой скорости входящего сообщения.
Например, допустим, что у нас есть простая логика обработки сообщений, и мы не хотим иметь потоки с конкретными потоками каждый раз, когда сообщение обрабатывается. Чтобы оптимизировать этот очень простой CompositeRunnable can be introduced
:
class CompositeRunnable implements Runnable {
protected Queue<Runnable> queue = new LinkedList<>();
public void add(Runnable a) {
queue.add(a);
}
@Override
public void run() {
for(Runnable r: queue) {
r.run();
}
}
}
Или сделайте то же самое несколько иначе, собрав сообщения для обработки:
class CompositeMessageWorker<T> implements Runnable {
protected Queue<T> queue = new LinkedList<>();
public void add(T message) {
queue.add(message);
}
@Override
public void run() {
for(T message: queue) {
// process a message
}
}
}
Таким образом, вы можете обрабатывать сообщения более эффективно.
Совет № 3: оптимизация обработки сообщений
Несмотря на то, что вы знаете, что можно обрабатывать сообщения параллельно (Tip #1
) и уменьшать накладные расходы на обработку (Tip #2
), вам нужно делать все быстро. Избыточные этапы обработки, тяжелые циклы и т.д. Могут сильно повлиять на производительность. См. Интересные тематические исследования:
Совет № 4: Управление соединениями и каналами
- Запуск нового канала в существующем соединении включает в себя одну сеть round trip - запуск нового соединения занимает несколько.
- Каждое соединение использует файловый дескриптор на сервере. Каналов нет.
- Публикация большого сообщения на одном канале блокирует соединение в то время как он гаснет. Кроме того, мультиплексирование довольно прозрачно.
- Связи, которые публикуются, могут блокироваться, если сервер перегружен - это хорошая идея отделить публикацию и потребление соединения
- Будьте готовы к обработке пакетов сообщений
(источник)
Обратите внимание, что все советы прекрасно работают вместе. Не стесняйтесь, дайте мне знать, если вам нужны дополнительные данные.
Полный пример потребителя (источник)
Обратите внимание на следующее:
- channel.basicQos(упреждающий)- Как вы видели ранее
prefetchCount
, может оказаться очень полезным:Эта команда позволяет потребителю выбрать окно предварительной выборки, которое определяет количество неподтвержденных сообщений, которые он готов Получать. Установив счетчик предварительной выборки на ненулевое значение, брокер не будет доставлять потребителю никаких сообщений, которые предел. Чтобы переместить окно вперед, потребитель должен признать получение сообщения (или группы сообщений).
- ExecutorService threadExecutor - вы можете указать правильно настроенную службу-исполнитель.
Пример:
static class Worker extends DefaultConsumer {
String name;
Channel channel;
String queue;
int processed;
ExecutorService executorService;
public Worker(int prefetch, ExecutorService threadExecutor,
, Channel c, String q) throws Exception {
super(c);
channel = c;
queue = q;
channel.basicQos(prefetch);
channel.basicConsume(queue, false, this);
executorService = threadExecutor;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
Runnable task = new VariableLengthTask(this,
envelope.getDeliveryTag(),
channel);
executorService.submit(task);
}
}
Вы также можете проверить следующее: