ACTIVEMQ - список подписчиков издателя hello world example
Есть две программы: абонент и издатель...
Подписчик может помещать сообщение в тему, и сообщение отправляется успешно.
Когда я проверяю сервер activemq в моем браузере, он показывает 1 msg в очереди. Но когда я запускаю потребительский код, он не получает сообщение
Вот код производителя:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class producer {
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
// JMS messages are sent and received using a Session. We will
// create here a non-transactional session object. If you want
// to use transactions you should set the first parameter to 'true'
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("testt");
MessageProducer producer = session.createProducer(topic);
// We will send a small text message saying 'Hello'
TextMessage message = session.createTextMessage();
message.setText("HELLO JMS WORLD");
// Here we are sending the message!
producer.send(message);
System.out.println("Sent message '" + message.getText() + "'");
connection.close();
}
}
После запуска этого кода вывод на консоли:
26 Jan, 2012 2:30:04 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
Sent message 'HELLO JMS WORLD'
И вот потребительский код:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class consumer {
// URL of the JMS server
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
// Name of the topic from which we will receive messages from = " testt"
public static void main(String[] args) throws JMSException {
// Getting JMS connection from the server
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("testt");
MessageConsumer consumer = session.createConsumer(topic);
MessageListener listner = new MessageListener() {
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message"
+ textMessage.getText() + "'");
}
} catch (JMSException e) {
System.out.println("Caught:" + e);
e.printStackTrace();
}
}
};
consumer.setMessageListener(listner);
connection.close();
}
}
После запуска этого кода он ничего не показывает.
Может ли кто-нибудь помочь мне преодолеть эту проблему?
Ответы
Ответ 1
Ваша проблема заключается в том, что ваш потребитель работает, а затем немедленно отключается.
Попробуйте добавить это в своего потребителя:
consumer.setMessageListener(listner);
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
connection.close();
Это будет ждать, пока вы не нажмете клавишу до остановки.
Другие вещи, которые следует учитывать:
- Используйте блок finally для закрытия
- Соглашения о присвоении имен Java поощряют использование прописных букв для первой буквы класса
Ответ 2
Основная проблема (помимо быстрого закрытия приложения) заключается в том, что вы отправляете тему. Темы не сохраняют сообщения, поэтому, если вы запускаете приложение, которое производит и затем запускает пользователя, потребитель ничего не получит, потому что он не был подписан на эту тему в момент отправки сообщения. Если вы исправите проблему завершения работы, а затем запустите пользователя в одном терминале и затем запустите продюсер, вы должны увидеть сообщение, полученное вашим потребителем. Если вам требуется сохранение сообщения, вам необходимо использовать очередь, которая будет удерживаться в сообщении до тех пор, пока кто-то не будет ее использовать.
Ответ 3
только некоторые:
- работа с очередью не является темой. сообщения в темах будут отброшены, когда потребитель не будет доступен, они НЕ сохраняются.
- добавьте connection.start() после настройки прослушивателя сообщений. вы должны начать соединение, когда все потребители/производители настроены правильно.
- Подождите некоторое время перед закрытием соединения.
тема, вероятно, станет вашим самым важным источником сбоев.
Ответ 4
Ваш класс производителя верен. Он работает плавно.
Но ваш потребитель неверен, и вам нужно его изменить.
-
Сначала добавьте setClientID ( "any_string_value" ) после создания соединения объекта;
например: Connection connection = connectionFactory.createConnection();
// need to setClientID value, any string value you wish
connection.setClientID("12345");
-
во-вторых, используйте метод createDurableSubscriber() вместо createConsumer() для передачи сообщения через тему.
MessageConsumer consumer = session.createDurableSubscriber(topic,"SUB1234");
Ниже приведен модифицированный класс comsumer:
package mq.test;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class consumer {
// URL of the JMS server
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
// Name of the topic from which we will receive messages from = " testt"
public static void main(String[] args) throws JMSException {
// Getting JMS connection from the server
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
// need to setClientID value, any string value you wish
connection.setClientID("12345");
try{
connection.start();
}catch(Exception e){
System.err.println("NOT CONNECTED!!!");
}
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test_data");
//need to use createDurableSubscriber() method instead of createConsumer() for topic
// MessageConsumer consumer = session.createConsumer(topic);
MessageConsumer consumer = session.createDurableSubscriber(topic,
"SUB1234");
MessageListener listner = new MessageListener() {
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message"
+ textMessage.getText() + "'");
}
} catch (JMSException e) {
System.out.println("Caught:" + e);
e.printStackTrace();
}
}
};
consumer.setMessageListener(listner);
//connection.close();
}
}
Теперь ваш код будет успешно выполнен.