久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Spring Cloud Bus消息總線舉例分析

210次閱讀
沒有評論

共計 11131 個字符,預計需要花費 28 分鐘才能閱讀完成。

這篇文章主要介紹“Spring Cloud Bus 消息總線舉例分析”,在日常操作中,相信很多人在 Spring Cloud Bus 消息總線舉例分析問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spring Cloud Bus 消息總線舉例分析”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

Bus 實例演示

在分析 Bus 的實現之前,我們先來看兩個使用 Spring Cloud Bus 的簡單例子。

1. 所有節點的配置新增

Bus 的例子比較簡單,因為 Bus 的 AutoConfiguration 層都有了默認的配置,只需要引入消息中間件對應的 Spring Cloud Stream 以及 Spring Cloud Bus 依賴即可,之后所有啟動的應用都會使用同一個 Topic 進行消息的接收和發送。

Bus 對應的 Demo 已經放到了 github 上,  該 Demo 會模擬啟動 5 個節點,只需要對其中任意的一個實例新增配置項,所有節點都會新增該配置項。

訪問任意節點提供的 Controller 提供的獲取配置的地址(key 為 hangzhou):

curl -X GET  http://localhost:10001/bus/env?key=hangzhou

所有節點返回的結果都是 unknown,因為所有節點的配置中沒有 hangzhou 這個 key。

Bus 內部提供了 EnvironmentBusEndpoint 這個 Endpoint 通過 message broker 用來新增 / 更新配置。

訪問任意節點該 Endpoint  對應的  url: /actuator/bus-env?name=hangzhou value=alibaba 進行配置項的新增(比如訪問 node1 的 url):

curl -X POST  http://localhost:10001/actuator/bus-env?name=hangzhou value=alibaba  -H  content-type: application/json

然后再次訪問所有節點 /bus/env 獲取配置:

curl -X GET  http://localhost:10001/bus/env?key=hangzhou
unknown%
~ ?
$ curl -X GET  http://localhost:10002/bus/env?key=hangzhou 
unknown%
~ ?
$ curl -X GET  http://localhost:10003/bus/env?key=hangzhou 
unknown%
~ ?
$ curl -X GET  http://localhost:10004/bus/env?key=hangzhou 
unknown%
~ ?
$ curl -X GET  http://localhost:10005/bus/env?key=hangzhou 
unknown%
~ ?
$ curl -X POST  http://localhost:10001/actuator/bus-env?name=hangzhou value=alibaba  -H  content-type: application/json
~ ?
$ curl -X GET  http://localhost:10005/bus/env?key=hangzhou 
alibaba%
~ ?
$ curl -X GET  http://localhost:10004/bus/env?key=hangzhou 
alibaba%
~ ?
$ curl -X GET  http://localhost:10003/bus/env?key=hangzhou 
alibaba%
~ ?
$ curl -X GET  http://localhost:10002/bus/env?key=hangzhou 
alibaba%
~ ?
$ curl -X GET  http://localhost:10001/bus/env?key=hangzhou
alibaba%

可以看到,所有節點都新增了一個 key 為 hangzhou 的配置,且對應的 value 是 alibaba。這個配置項是通過 Bus 提供的  EnvironmentBusEndpoint 完成的。

這里引用 程序猿 DD 畫的一張圖片,Spring Cloud Config 配合 Bus 完成所有節點配置的刷新來描述之前的實例(本文實例不是刷新,而是新增配置,但是流程是一樣的):

Spring Cloud Bus 消息總線舉例分析

2. 部分節點的配置修改

比如在 node1 上指定 destination 為 rocketmq-bus-node2 (node2 配置了 spring.cloud.bus.id 為 rocketmq-bus-node2:10002,可以匹配上) 進行配置的修改:

curl -X POST  http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou value=xihu  -H  content-type: application/json

訪問 /bus/env 獲取配置(由于在 node1 上發送消息,Bus 也會對發送方的節點 node1 進行配置修改):

~ ?
$ curl -X POST  http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou value=xihu  -H  content-type: application/json
~ ?
$ curl -X GET  http://localhost:10005/bus/env?key=hangzhou 
alibaba%
~ ?
$ curl -X GET  http://localhost:10004/bus/env?key=hangzhou 
alibaba%
~ ?
$ curl -X GET  http://localhost:10003/bus/env?key=hangzhou 
alibaba%
~ ?
$ curl -X GET  http://localhost:10002/bus/env?key=hangzhou 
xihu%
~ ?
$ curl -X GET  http://localhost:10001/bus/env?key=hangzhou
xihu%

可以看到,只有 node1 和 node2 修改了配置,其余的 3 個節點配置未改變。

Bus 的實現 1. Bus 概念介紹 1)事件

Bus 中定義了遠程事件 RemoteApplicationEvent,該事件繼承了 Spring 的事件 ApplicationEvent,而且它目前有 4 個具體的實現:

Spring Cloud Bus 消息總線舉例分析

EnvironmentChangeRemoteApplicationEvent:遠程環境變更事件。主要用于接收一個 Map String,String 類型的數據并更新到 Spring 上下文中  Environment 中的事件。文中的實例就是使用這個事件并配合  EnvironmentBusEndpoint 和  EnvironmentChangeListener 完成的。

AckRemoteApplicationEvent:遠程確認事件。Bus 內部成功接收到遠程事件后會發送回 AckRemoteApplicationEvent 確認事件進行確認。

RefreshRemoteApplicationEvent: 遠程配置刷新事件。配合  @RefreshScope 以及所有的  @ConfigurationProperties 注解修飾的配置類的動態刷新。

UnknownRemoteApplicationEvent:遠程未知事件。Bus 內部消息體進行轉換遠程事件的時候如果發生異常會統一包裝成該事件。

Bus 內部還存在一個非 RemoteApplicationEvent 事件 -SentApplicationEvent 消息發送事件,配合 Trace 進行遠程消息發送的記錄。

這些事件會配合 ApplicationListener 進行操作,比如 EnvironmentChangeRemoteApplicationEvent 配了 EnvironmentChangeListener 進行配置的新增 / 修改:

public class EnvironmentChangeListener
 implements ApplicationListener EnvironmentChangeRemoteApplicationEvent  { private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);
 @Autowired
 private EnvironmentManager env;
 @Override
 public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) { Map String, String  values = event.getValues();
 log.info( Received remote environment change request. Keys/values to update  
 + values);
 for (Map.Entry String, String  entry : values.entrySet()) { env.setProperty(entry.getKey(), entry.getValue());
 }
 }
}

收到其它節點發送來 EnvironmentChangeRemoteApplicationEven 事件之后調用 EnvironmentManager#setProperty 進行配置的設置,該方法內部針對每一個配置項都會發送一個 EnvironmentChangeEvent 事件,然后被 ConfigurationPropertiesRebinder 所監聽,進行 rebind 操作新增 / 更新配置。

2)Actuator Endpoint

Bus 內部暴露了 2 個 Endpoint,分別是 EnvironmentBusEndpoint 和 RefreshBusEndpoint,進行配置的新增 / 修改以及全局配置刷新。它們對應的 Endpoint id 即 url 是  bus-env 和 bus-refresh。

3)配置

Bus 對于消息的發送必定涉及到 Topic、Group 之類的信息,這些內容都被封裝到了 BusProperties 中,其默認的配置前綴為 spring.cloud.bus,比如:

spring.cloud.bus.refresh.enabled 用于開啟 / 關閉全局刷新的 Listener。

spring.cloud.bus.env.enabled 用于開啟 / 關閉配置新增 / 修改的 Endpoint。

spring.cloud.bus.ack.enabled 用于開啟開啟 / 關閉 AckRemoteApplicationEvent 事件的發送。

spring.cloud.bus.trace.enabled 用于開啟 / 關閉息記錄 Trace 的 Listener。

消息發送涉及到的 Topic 默認用的是 springCloudBus,可以配置進行修改,Group 可以設置成廣播模式或使用 UUID 配合 offset 為 lastest 的模式。

每個 Bus 應用都有一個對應的 Bus id,官方取值方式較復雜:

${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}

建議手動配置 Bus id,因為 Bus 遠程事件中的 destination 會根據 Bus id 進行匹配:

spring.cloud.bus.id=${spring.application.name}-${server.port}

2. Bus 底層分析

Bus 的底層分析無非牽扯到這幾個方面:

消息是如何發送的

消息是如何接收的

destination 是如何匹配的

遠程事件收到后如何觸發下一個 action

BusAutoConfiguration 自動化配置類被 @EnableBinding(SpringCloudBusClient.class)所修飾。

@EnableBinding 的用法在文章《Spring Cloud Stream 體系及原理介紹》中已經說明,且它的 value 為 SpringCloudBusClient.class,會在 SpringCloudBusClient 中基于代理創建出 input 和 output 的 DirectChannel:

public interface SpringCloudBusClient {
 String INPUT =  springCloudBusInput 
 String OUTPUT =  springCloudBusOutput 
 @Output(SpringCloudBusClient.OUTPUT)
 MessageChannel springCloudBusOutput();
 @Input(SpringCloudBusClient.INPUT)
 SubscribableChannel springCloudBusInput();}

springCloudBusInput 和 springCloudBusOutput 這兩個 Binding 的屬性可以通過配置文件進行修改(比如修改 topic):

spring.cloud.stream.bindings:
 springCloudBusInput:
 destination: my-bus-topic
 springCloudBusOutput:
 destination: my-bus-topic

消息的接收和發送:

// BusAutoConfiguration
@EventListener(classes = RemoteApplicationEvent.class) // 1
public void acceptLocal(RemoteApplicationEvent event) { if (this.serviceMatcher.isFromSelf(event)
   !(event instanceof AckRemoteApplicationEvent)) { // 2
 this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3
 }
@StreamListener(SpringCloudBusClient.INPUT) // 4
public void acceptRemote(RemoteApplicationEvent event) { if (event instanceof AckRemoteApplicationEvent) { if (this.bus.getTrace().isEnabled()   !this.serviceMatcher.isFromSelf(event)
   this.applicationEventPublisher != null) { // 5
 this.applicationEventPublisher.publishEvent(event);
 }
 // If it s an ACK we are finished processing at this point
 return;
 }
 if (this.serviceMatcher.isForSelf(event)
   this.applicationEventPublisher != null) { // 6
 if (!this.serviceMatcher.isFromSelf(event)) { // 7
 this.applicationEventPublisher.publishEvent(event);
 }
 if (this.bus.getAck().isEnabled()) { // 8
 AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
 this.serviceMatcher.getServiceId(),
 this.bus.getAck().getDestinationService(),
 event.getDestinationService(), event.getId(), event.getClass());
 this.cloudBusOutboundChannel
 .send(MessageBuilder.withPayload(ack).build());
 this.applicationEventPublisher.publishEvent(ack);
 }
 }
 if (this.bus.getTrace().isEnabled()   this.applicationEventPublisher != null) { // 9
 // We are set to register sent events so publish it for local consumption,
 // irrespective of the origin
 this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
 event.getOriginService(), event.getDestinationService(),
 event.getId(), event.getClass()));
 }
}

利用 Spring 事件的監聽機制監聽本地所有的 RemoteApplicationEvent 遠程事件(比如 bus-env 會在本地發送 EnvironmentChangeRemoteApplicationEvent 事件,bus-refresh 會在本地發送 RefreshRemoteApplicationEvent 事件,這些事件在這里都會被監聽到)。

判斷本地接收到的事件不是 AckRemoteApplicationEvent 遠程確認事件 (不然會死循環,一直接收消息,發送消息 …) 以及該事件是應用自身發送出去的(事件發送方是應用自身),如果都滿足執行步驟 3。

構造 Message 并將該遠程事件作為 payload,然后使用 Spring Cloud Stream 構造的 Binding name 為 springCloudBusOutput 的 MessageChannel 將消息發送到 broker。

4.@StreamListener 注解消費 Spring Cloud Stream 構造的 Binding name 為 springCloudBusInput 的 MessageChannel,接收的消息為遠程消息。

如果該遠程事件是 AckRemoteApplicationEvent 遠程確認事件并且應用開啟了消息追蹤 trace 開關,同時該遠程事件不是應用自身發送的(事件發送方不是應用自身,表示事件是其它應用發送過來的),那么本地發送 AckRemoteApplicationEvent 遠程確認事件表示應用確認收到了其它應用發送過來的遠程事件,流程結束。

如果該遠程事件是其它應用發送給應用自身的(事件的接收方是應用自身),那么進行步驟 7 和 8,否則執行步驟 9。

該遠程事件不是應用自身發送 (事件發送方不是應用自身) 的話,將該事件以本地的方式發送出去。應用自身一開始已經在本地被對應的消息接收方處理了,無需再次發送。

如果開啟了 AckRemoteApplicationEvent 遠程確認事件的開關,構造 AckRemoteApplicationEvent 事件并在遠程和本地都發送該事件(本地發送是因為步驟 5 沒有進行本地 AckRemoteApplicationEvent 事件的發送,也就是自身應用對自身應用確認; 遠程發送是為了告訴其它應用,自身應用收到了消息)。

如果開啟了消息記錄 Trace 的開關,本地構造并發送 SentApplicationEvent 事件。

Spring Cloud Bus 消息總線舉例分析

bus-env 觸發后所有節點的 EnvironmentChangeListener 監聽到了配置的變化,控制臺都會打印出以下信息:

o.s.c.b.event.EnvironmentChangeListener : Received remote environment change request. Keys/values to update {hangzhou=alibaba}

如果在本地監聽遠程確認事件 AckRemoteApplicationEvent,都會收到所有節點的信息,比如 node5 節點的控制臺監聽到的 AckRemoteApplicationEvent 事件如下:

ServiceId [rocketmq-bus-node5:10005] listeners on {type : AckRemoteApplicationEvent , timestamp :1554124670484, originService : rocketmq-bus-node5:10005 , destinationService : ** , id : 375f0426-c24e-4904-bce1-5e09371fc9bc , ackId : 750d033f-356a-4aad-8cf0-3481ace8698c , ackDestinationService : ** , event : org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent}
ServiceId [rocketmq-bus-node5:10005] listeners on {type : AckRemoteApplicationEvent , timestamp :1554124670184, originService : rocketmq-bus-node1:10001 , destinationService : ** , id : 91f06cf1-4bd9-4dd8-9526-9299a35bb7cc , ackId : 750d033f-356a-4aad-8cf0-3481ace8698c , ackDestinationService : ** , event : org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent}
ServiceId [rocketmq-bus-node5:10005] listeners on {type : AckRemoteApplicationEvent , timestamp :1554124670402, originService : rocketmq-bus-node2:10002 , destinationService : ** , id : 7df3963c-7c3e-4549-9a22-a23fa90a6b85 , ackId : 750d033f-356a-4aad-8cf0-3481ace8698c , ackDestinationService : ** , event : org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent}
ServiceId [rocketmq-bus-node5:10005] listeners on {type : AckRemoteApplicationEvent , timestamp :1554124670406, originService : rocketmq-bus-node3:10003 , destinationService : ** , id : 728b45ee-5e26-46c2-af1a-e8d1571e5d3a , ackId : 750d033f-356a-4aad-8cf0-3481ace8698c , ackDestinationService : ** , event : org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent}
ServiceId [rocketmq-bus-node5:10005] listeners on {type : AckRemoteApplicationEvent , timestamp :1554124670427, originService : rocketmq-bus-node4:10004 , destinationService : ** , id : 1812fd6d-6f98-4e5b-a38a-4b11aee08aeb , ackId : 750d033f-356a-4aad-8cf0-3481ace8698c , ackDestinationService : ** , event : org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent}

那么回到本章節開頭提到的 4 個問題,我們分別做一下解答:

消息是如何發送的: 在 BusAutoConfiguration#acceptLocal 方法中通過 Spring Cloud Stream 發送事件到 springCloudBustopic 中。

消息是如何接收的: 在 BusAutoConfiguration#acceptRemote 方法中通過 Spring Cloud Stream 接收 springCloudBustopic 的消息。

destination 是如何匹配的:  在 BusAutoConfiguration#acceptRemote 方法中接收遠程事件方法里對 destination 進行匹配。

遠程事件收到后如何觸發下一個 action: Bus 內部通過 Spring 的事件機制接收本地的 RemoteApplicationEvent 具體的實現事件再做下一步的動作(比如 EnvironmentChangeListener 接收了 EnvironmentChangeRemoteApplicationEvent 事件,RefreshListener 接收了 RefreshRemoteApplicationEvent 事件)。

到此,關于“Spring Cloud Bus 消息總線舉例分析”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-25發表,共計11131字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 宁波市| 仁寿县| 衡山县| 江口县| 阿合奇县| 逊克县| 社旗县| 安乡县| 绩溪县| 吉林省| 靖远县| 杨浦区| 睢宁县| 彝良县| 旌德县| 城口县| 康保县| 萨嘎县| 汝阳县| 蒙城县| 贵定县| 安溪县| 永兴县| 碌曲县| 安新县| 五峰| 高台县| 白沙| 纳雍县| 禄丰县| 定南县| 广丰县| 赤城县| 咸丰县| 黎川县| 沾益县| 巫溪县| 中宁县| 吉安市| 弥渡县| 新巴尔虎右旗|