RabbitMQ и каналы безопасности потоков Java
в этом руководстве https://www.rabbitmq.com/api-guide.html Ребята из RabbitMQ заявляют:
Каналы и Concurrency Соображения (безопасность потока)
Канальные экземпляры не должны делиться между потоками. Приложения должны предпочесть использовать канал для потока, а не использовать один и тот же канал для нескольких потоков. Хотя некоторые операции над каналами безопасны для одновременного вызова, некоторые из них не приводят к некорректному чередованию кадров на проводе. Совместное использование каналов между потоками также будет мешать работе * Publisher Confirms.
Безопасность потоков очень важна, поэтому я старался быть настолько старательным, насколько это возможно, но здесь проблема:
У меня есть это приложение, которое получает сообщения от Rabbit. Когда сообщение получено, оно обрабатывает его, а затем запускается, когда оно выполняется. Приложение может обрабатывать всего 2 элемента одновременно в фиксированном пуле потоков с двумя потоками. Предварительная выборка QOS для Rabbit равна 2, потому что я не хочу, чтобы приложение было больше, чем может обрабатывать в течение периода времени.
Теперь моя потребительская доставкаДоставка:
Task run = new Task(JSON.parse(message));
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));
На этом этапе вы уже поняли, что TestWrapperThread выполняет вызов channel.basicAck(deliveryTag, false);
как последнюю операцию.
По моему пониманию документации, это неверно и потенциально вредно, потому что канал не является потокобезопасным, и это поведение может повредить. Но как я должен это делать? Я имею в виду, у меня есть несколько идей, но они будут делать все более сложным, и я хотел бы понять, действительно ли это необходимо или нет.
Заранее спасибо
Ответы
Ответ 1
Я полагаю, что вы используете Channel
только для своего потребителя, а не для других операций, таких как публикация и т.д.
В вашем случае единственная потенциальная проблема здесь:
channel.basicAck(deliveryTag, false);
потому что вы вызываете это через два потока, btw эта операция безопасна, если вы видите Java-код:
класс ChannelN.java
вызывает:
public void basicAck(long deliveryTag, boolean multiple)
throws IOException
{
transmit(new Basic.Ack(deliveryTag, multiple));
}
см. код github для ChannelN.java
Метод transmit
внутри AMQChannel использует:
public void transmit(Method m) throws IOException {
synchronized (_channelMutex) {
transmit(new AMQCommand(m));
}
}
_channelMutex
является protected final Object _channelMutex = new Object();
созданный с помощью класса.
см. код github для AMQChannel.java
ИЗМЕНИТЬ
Как вы можете прочитать в официальной документации, "некоторые" операции потокобезопасны, теперь неясно, какие из них.
Я изучил код, и я думаю, что нет проблем с вызовом ACK для большего количества потоков.
Надеюсь, это поможет.
EDIT2
Добавляю также комментарий Николаса:
Обратите внимание, что потребление (basicConsume) и acking из более чем одного потока - это общий шаблон rabbitmq, который уже используется java-клиентом.
Таким образом, вы можете использовать его в безопасности.