ActiveMQ(05):JMS的API結構、開發步驟與Topic

一、JMS的API結構

ActiveMQ(05):JMS的API結構、開發步驟與Topic

二、一個JMS應用的基本步驟

1:創建一個JMS connection factory

2:通過connection factory來創建JMS connection

3:啟動JMS connection

4:通過connection創建JMS session

5:創建JMS destination

6:創建JMS producer,或者創建JMS message,並設置destination

7:創建JMS consumer,或者是註冊一個JMS message listener

8:發送或者接受JMS message(s)

9:關閉所有的JMS資源(connection, session, producer, consumer等)

三、topic

3.1 關於持久化和非持久化消息

持久化消息:

這是ActiveMQ的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對於這些消息,可靠性是優先考慮的因素。

可靠性的另一個重要方面是確保持久性消息傳送至目標後,消息服務在向消費者傳送它們之前不會丟失這些消息。

這意味著在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。如果消息服務由於某種原因導致失敗,它可以恢復

此消息並將此消息傳送至相應的消費者。雖然這樣增加了消息傳送的開銷,但卻增加了可靠性。

非持久化消息:

保證這些消息最多被傳送一次。對於這些消息,可靠性並非主要的考慮因素。 此模式並不要求持久性的數據存儲,也不保證

消息服務由於某種原因導致失敗後消息不會丟失。 有兩種方法指定傳送模式:

1.使用setDeliveryMode 方法,這樣所有的消息都採用此傳送模式; 如:

<code>    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);/<code>

2.使用send 方法為每一條消息設置傳送模式

3.2 非持久的Topic消息

3.2.1 消息的發送

基本跟前面發送隊列信息是一樣的,只是把創建Destination的地方,由創建隊列替換成創建Topic,例如:

<code>Destination destination = session.createTopic("my-topic");/<code>

3.2.1 消息的接收

1:必須要接收方在線,然後客戶端再發送信息,接收方才能接收到消息

2:同樣把創建Destination的地方,由創建隊列替換成創建Topic,例如:

<code>Destination destination = session.createTopic("my-topic");/<code>

3:由於不知道客戶端發送多少信息,因此改成while循環的方式了,例如:

<code>Message message = consumer.receive();
while(message!=null) {
TextMessage txtMsg = (TextMessage)message;
System.out.println("收到消 息:" + txtMsg.getText());
message = consumer.receive(1000L);
}/<code>

3.3 持久的Topic消息

3.3.1 消息的發送

<code>/**持久的*/ 

public void test2() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.91.8:61616");
\t\t
Connection connection = connectionFactory.createConnection();
\t\t
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
\t\t
Destination destination = session.createTopic("my-topic");
\t\t
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 設置DeliveryMode.PERSISTENT模式
\t\t
connection.start();
\t\t
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("message22--" + i);
\tproducer.send(message);
}
session.commit();
session.close();
connection.close();
}/<code>

1:要用持久化訂閱,發送消息者要用DeliveryMode.PERSISTENT模式發現,在連接之前設定

2:一定要設置完成後,再start這個connection

3.3.2 消息的接收

<code>public void test2() throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.91.8:61616");
Connection connection = cf.createConnection();
connection.setClientID("cc1");
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("my-topic");
TopicSubscriber ts = session.createDurableSubscriber(topic, "T1");
connection.start();
\t\t
Message message = ts.receive();
while(message!=null) {
TextMessage txtMsg = (TextMessage)message;
\tSystem.out.println("收到消息:" + txtMsg.getText());
\tmessage = ts.receive(1000L);

}
session.commit();
session.close();
connection.close();
}/<code>


1:需要在連接上設置消費者id,用來識別消費者

2:需要創建TopicSubscriber來訂閱

3:要設置好了過後再start 這個 connection

4:一定要先運行一次,等於向消息服務中間件註冊這個消費者,然後再運行客戶端發送信息,這個時候,無論消費者是否在線,

都會接收到,不在線的話,下次連接的時候,會把沒有收過的消息都接收下來。

關注我瞭解更多程序員資訊技術,領取豐富架構資料


分享到:


相關文章: