久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

activemq特性是什么

193次閱讀
沒有評論

共計 10362 個字符,預計需要花費 26 分鐘才能閱讀完成。

這篇文章主要介紹“activemq 特性是什么”,在日常操作中,相信很多人在 activemq 特性是什么問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”activemq 特性是什么”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

activemq 特點:用通配符訂閱多個 destination, 用組合發布多重 destionation
activemq 支持 destination 的層次結構【topic 和 queen】便于歸類和管理。
通配符有三個:
.  用來分隔路徑
* 用來匹配路徑中的一節
用來匹配任意節的路徑
opics: sport League . team  
。例如:football.division.leeds。如果 leeds 參加兩種運動 –Scccer 和 Rugby, 為了方便,我們希望通過一個消息消費者而看到 Leeds 兩種運動的最新戰績,這個時候,通配符就有用武之地了
.  : used to separate elements in the destination name
*  : used to match one element   
  : match one or all trailing elements
所以,對于上面的例子,你可以訂閱這樣的主題:*.*.Leeds
如果你想知道 division1 這個賽區的所有分數,你可以訂閱這個:soccer.division1.*
如果你想知道 Rugby 的分數:你可以訂閱這個:rugby. .
然而,通配符中是為消費者服務的,如果你發送了這樣的一個主題:rugby. .,這個消息僅會發送到命名了 rugby. . 的主題,并不是所有的主題都是以 rugby 開頭的。
這里有一種   方法,使消息生產者能將一條
消息發送到多個目的地。通過使用   composite destination。
將同一條消息發送到不同的目的地是很有用的。比如一個用來存儲信息的應用,會發送一條消息給隊列
同時也要將這條消息廣播給監控的所有系統。通常,你會通過用兩個 producer 發送兩次消息來達到這個目的。composite destination 就是用來解決這種情況的
例如,如果你創建了名子為:store.order.backoffice,store.order.warehouse 的 Queue, 這樣 就會發送同時兩個 Queue。
訂閱信息     解釋
PRICE.    Any price for any product on any exchange
PRICE.STOCK.    Any price for a stock on any exchange
PRICE.STOCK.NASDAQ.*    Any stock price on NASDAQ
PRICE.STOCK.*.IBM    Any IBM stock price on any exchange
從 5.5 版本以后,可以自定義路徑分隔符:

  plugins
  …..
  destinationPathSeparatorPlugin/
  /plugins

此時 FOO.BAR.* 可以表示為 FOO/BAR/*
也可以通過 pathSeparator 屬性定義其他符號位路徑分隔符。
  public void subscribeToLeeds() throws JMSException {
  String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection connection = connectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Topic allLeeds = session.createTopic(*.*.Leeds
  MessageConsumer consumer = session.createConsumer(allLeeds);
  Message result = consumer.receive();
  }
   11.1.2 發送一個 message 到多重 destinations
    發送相同的 message 到不同的 destination 上:案列發送一個 [queen,opic] 組合模式,默認的組合 destination 用,分隔
    列如 store.order.backoffice,store.order.warehouse
    String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection connection = connectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Queue ordersDestination = session.createQueue(store.orders, topic://store.orders
  MessageProducer producer = session.createProducer(ordersDestination);
  Message order = session.createObjectMessage();
  producer.send(order);
11.2 通知消息
單的說就是實現了 ActiveMQ 的 broker 上各種操作的記錄跟蹤和通知。

使用這個功能,你可以實時的知道 broker 上

  創建或銷毀了連接,
  添加或刪除了生存者或消費者,
  添加或刪除了主題或隊列,
  有消息發送和接收,
  什么時候有慢消費者,
  什么時候有快生產者
  什么時候什么消息被丟棄
  什么時候 broker 被添加到集群(主從或是網絡連接)

這個機制是 ActiveMQ 對 JMS 協議的重要補充,也是基于 JMS 實現的 ActiveMQ 的可管理性的一部分。多個 ActiveMQ 的相互協調和互操作的基礎設置。
 String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection connection = connectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Topic connectionAdvisory = org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
  MessageConsumer consumer = session.createConsumer(connectionAdvisory);
  ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
  DataStructure data = (DataStructure) message.getDataStructure();
  if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
  ConnectionInfo connectionInfo = (ConnectionInfo) data;
  System.out.println(Connection started: + connectionInfo);
  } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
  RemoveInfo removeInfo = (RemoveInfo) data;
  System.out.println(Connection stopped: + removeInfo.getObjectId());
  } else {
  System.err.println(Unknown message + data);
  }
      大多數 advisor 消息都是完整的對于 destiation,但是呢 advisorysupport 類有一些方法來決定監聽哪個 advisorytopic,你也能使用通配符 -
      String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection connection = connectionFactory.createConnection();
  connection.start();
  // Lets first create a Consumer to listen too
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  // Lets first create a Consumer to listen too
  Queue queue = session.createQueue(test.Queue
  MessageConsumer testConsumer = session.createConsumer(queue);
  // so lets listen for the Consumer starting/stoping
  Topic advisoryTopic = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(queue);
  MessageConsumer consumer = session.createConsumer(advisoryTopic);
  consumer.setMessageListener(new MessageListener() {
  public void onMessage(Message m) {
  ActiveMQMessage message = (ActiveMQMessage) m;
  try {
  System.out.println(Consumer Count = + m.getStringProperty( consumerCount));
  DataStructure data = (DataStructure) message.getDataStructure();
  if (data.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
  ConsumerInfo consumerInfo = (ConsumerInfo) data;
  System.out.println(Consumer started: + consumerInfo);
  } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
  RemoveInfo removeInfo = (RemoveInfo) data;
  System.out.println(Consumer stopped: + removeInfo.getObjectId());
  } else {
  System.err.println(Unknown message + data);
  }
  } catch (JMSException e) {
  e.printStackTrace();
  }
  }
  });
  testConsumer.close();
      destinationPolicy
  policyMap
  policyEntries
  policyEntry topic= advisoryForSlowConsumers= true
  /policyEntry
  /policyEntries
  /policyMap
  /destinationPolicy

ActiveMQ 中,topic 只有在持久訂閱(durablesubscription)下是持久化的。存在持久訂閱時,每個持久訂閱者,都相當于一個持久化的 queue 的客戶端,它會收取所有消息。這種情況下存在兩個問題:

1.  同一應用內 consumer 端負載均衡的問題:同一個應用上的一個持久訂閱不能使用多個 consumer 來共同承擔消息處理功能。因為每個都會獲取所有消息。queue 模式可以解決這個問題,broker
端又不能將消息發送到多個應用端。所以,既要發布訂閱,又要讓消費者分組,這個功能 jms 規范本身是沒有的。
2.  同一應用內 consumer 端 failover 的問題:由于只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理消息了,系統的健壯性不高, 為了解決這兩個問題,ActiveMQ 中實現了虛擬
Topic 的功能。使用起來非常簡單。對于消息發布者來說,就是一個正常的 Topic,名稱以 VirtualTopic. 開頭。例如 VirtualTopic.TEST。對于消息接收端來說,是個隊列,不同應用里使用不同的前綴作為
隊列的名稱,即可表明自己的身份即可實現消費端應用分組。例如 Consumer.A.VirtualTopic.TEST,說明它是名稱為 A 的消費端,同理 Consumer.B.VirtualTopic.TEST 說明是一個名稱為 B 的客戶端。
可以在同一個應用里使用多個 consumer 消費此 queue,則可以實現上面兩個功能。又因為不同應用使用的 queue 名稱不同(前綴不同),所以不同的應用中都可以接收到全部的消息。每個客戶端相當于一個持久訂
閱者,而且這個客戶端可以使用多個消費者共同來承擔消費任務。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
   
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection consumerConnection = connectionFactory.createConnection();
  consumerConnection.start();
   
  Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
  Queue consumerAQueue = consumerSessionA.createQueue(Consumer.A.VirtualTopic.orders
  MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue);
   
  Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
  Queue consumerBQueue = consumerSessionB.createQueue(Consumer.B.VirtualTopic.orders
  MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);
   
  //setup the sender
  Connection senderConnection = connectionFactory.createConnection();
  senderConnection.start();
  Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Topic ordersDestination = senerSession.createTopic(VirtualTopic.orders
  MessageProducer producer = senerSession.createProducer(ordersDestination);
同樣 queue 名稱的消費者會平分所有消息。
從 queue 接收到的消息,message.getJMSDestination().toString()為 topic://VirtualTopic.TEST,即原始的 destination。消息的 persistent 屬性為 true,即每個相當于一個持久訂閱。
Virtual Topic 這個功能特性在 broker 上有個總開關,useVirtualTopics 屬性,默認為 true,設置為 false 即可關閉此功能。
當此功能開啟,并且使用了持久化的存儲時,broker 啟動的時候會從持久化存儲里拿到所有的 destinations 的名稱,如果名稱模式與 Virtual Topics 匹配,則把它們添加到系統的 Virtual Topics 列表中去。
當然,沒有顯式定義的 Virtual Topics,也可以直接使用的,系統會自動創建對應的實際 topic。
當有 consumer 訪問此 VirtualTopics 時,系統會自動創建持久化的 queue,并在每次 Topic 收到消息時,分發到具體的 queue。

可追溯”消費者,只對 Topic 有效,如果 consumer 是可追溯的,那么它可以獲取實例創建之前的消息。通常而言,訂閱者不可能獲取實例創建之前的消息,因為 broker 根本不知道它的存在。對于 broker 而言,如果
一個 Topic 通道創建,且有發布者發布消息 (Publisher), 那么 broker 將會在內存中(非持久化) 或者磁盤中 (持久化) 保存已經發布的消息,直到所有的訂閱者都消費者,才會清除原始消息內容。那么 retroactive
類型的訂閱者,就可以獲取這些原本不屬于自己但 broker 上還保存的舊消息,就像我們訂閱一種 Feed,可以立即獲取舊的內容列表一樣。如果此訂閱者不是 durable(耐久的),它可以獲取最近發布的一些消息;如果是 durable,它可以獲取存儲器中尚未刪除的所有的舊消息。[下文會詳細介紹 Topic 的數據轉發模型]
// 在 destinationUrl 中設置, 默認為 false
feedTopic?consumer.retroactive=true
在 broker 端,可以配置當前 Topic 默認為“可追溯的”,不過 Topic 并不會在此種情況下額外的保存消息,只不過表示訂閱者默認都是可追溯的而已。
!– 只對 topic 有效,默認為 false —
policyEntry topic= feedTopic alwaysRetroactive= true /
  String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
  Connection connection = connectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Topic topic = session.createTopic(soccer.division1.leeds?consumer.retroactive=true
  MessageConsumer consumer = session.createConsumer(topic);
  Message result = consumer.receive();

 redeliveryPolicy
   consumer 使用的重發策略,當消息在 client 端處理失敗 (比如 onMessage 方法拋出異常,事務回滾等),將會觸發消息重發。對于 Broker 端,需要重發的消息將會被立即發送(如果 broker 端使用異步發送,
且發送隊列中還有其他消息,那么重發的消息可能不會被立即到達 Consumer)。我們通過此 Policy 配置最大重發次數、重發頻率等, 如果你的 Consumer 客戶端處于不良網絡環境中,可以適當調整相關參數。參數列表,
請參見(RedeliveryPolicy)
// 在 brokerUrl 中設置
tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=6
 . redeliveryPolicy
RedelieveryPolicy policy=connection.getRedelieveryPolicy();
policy.setInitialRedelieveryDelay(500);
policy.setBackOffMultiplier(2)
policy.setUseExponentialBackOff(true)
policy.setMaximumRedelieveries(2)

DLQ- 死信隊列 (Dead Letter Queue) 用來保存處理失敗或者過期的消息。
出現以下情況時,消息會被 redelivered
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.

當一個消息被 redelivered 超過 maximumRedeliveries(缺省為 6 次,具體設置請參考后面的鏈接)次數時,會給 broker 發送一個 Poison ack,這個消息被認為是 a poison pill,這時 broker 會將這
消息發送到 DLQ,以便后續處理。缺省的死信隊列是 ActiveMQ.DLQ,如果沒有特別指定,死信都會被發送到這個隊列。缺省持久消息過期,會被送到 DLQ,非持久消息不會送到 DLQ 可以通過配置文件 (activemq.xml)
來調整死信發送策略
destinationPolicy

  policyMap

  policyEntries

  !— 設置所有隊列,使用,否則用隊列名稱 —

  policyEntry queue=

  deadLetterStrategy

  !–

  queuePrefix: 設置死信隊列前綴

  useQueueForQueueMessages: 設置使用隊列保存死信,還可以設置 useQueueForTopicMessages,使用 Topic 來保存死信

  —

  individualDeadLetterStrategy  queuePrefix= DLQ. useQueueForQueueMessages= true   processExpired= false processNonPersistent= false /

  /deadLetterStrategy

  /policyEntry

  /policyEntries

  /policyMap

  /destinationPolicy

  …

/broker

  在一個電子系統中可能接受來自不同供應商的各種訂單信息,不同類型的訂單走的流程不盡相同,為了快速處理各種不同的訂單完成不同的業務。特定義不同的路由 信息。根據路由信息的不同,將消息進行不同的處理。如果采用 ActiveMQ 那么最好采用 apache-camel 整合,使不同的消息根據不同的流程自動 處理到不同的隊列中去。

beans

broker brokerName= testBroker

transportConnectors

transportConnector uri= tcp://localhos:61616

/transportConnectors

import resource= camel.xml

/beans

到此,關于“activemq 特性是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計10362字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 张家界市| 微博| 绥芬河市| 乌兰浩特市| 万载县| 盖州市| 南川市| 措勤县| 温宿县| 古交市| 平武县| 张掖市| 永济市| 琼海市| 城口县| 岳阳县| 新余市| 隆德县| 桦甸市| 日照市| 平潭县| 来宾市| 临安市| 乐山市| 锦州市| 阜康市| 广东省| 乌鲁木齐县| 武胜县| 三门峡市| 平湖市| 新乡县| 东山县| 天峨县| 陇南市| 屏南县| 温宿县| 涪陵区| 龙口市| 赤水市| 若羌县|