久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

RxJava線程切換過程是怎樣的

164次閱讀
沒有評論

共計 7836 個字符,預計需要花費 20 分鐘才能閱讀完成。

今天就跟大家聊聊有關 RxJava 線程切換過程是怎樣的,可能很多人都不太了解,為了讓大家更加了解,丸趣 TV 小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

線程切換過程

下面我們就來看看它的又一利器,調度器 Scheduler:就像我們所知道的,Scheduler 是給 Observable 數據流添加多線程功能所準備的,一般我們會通過使用 subscribeOn()、observeOn() 方法傳入對應的 Scheduler 去指定數據流的每部分操作應該以何種方式運行在何種線程。對于我們而言,最常見的莫過于在非主線程獲取并處理數據之后在主線程更新 UI 這樣的場景了:

這是我們十分常見的調用方法,一氣呵成就把不同線程之間的處理都搞定了,因為是鏈式所以結構也很清晰,我們現在來看看這其中的線程切換流程。

subscribeOn()

當我們調用 subscribeOn() 的時候:

可以看到這里也是調用了 create() 去生成一個 Observable,而 OperatorSubscribeOn 則是實現了 OnSubscribe 接口,同時將原始的 Observable 和我們需要的 scheduler 傳入:

可以看出來,這里對 subscriber 的處理與前文中 OperatorMap 中 call() 對 subscriber 的處理很相似。在這里我們同樣會根據傳入的 subscriber 構造出新的 Subscribers,不過這一系列的過程大部分都是由 worker 通過 schedule() 去執行的,從后面 setProducer() 中對于線程的判斷,再結合 subscribeOn() 方法的目的我們能大概推測出,這個 worker 在一定程度上就相當于一個新線程的代理執行者,schedule() 所實現的與 Thread 類中 run() 應該十分類似。我們現在來看看這個 worker 的執行過程。
首先從 Schedulers.io() 進入:

這個通過 hook 拿到 scheduler 的過程我們先不管,直接進 CachedThreadScheduler,看它的 createWorker() 方法:

這里的 pool 是一個原子變量引用 AtomicReference,所持有的則是 CachedWorkerPool,因而這個 pool 顧名思義就是用來保存 worker 的緩存池啦,我們從緩存池里拿到需要的 worker 并作了一層封裝成為 EventLoopWorker:

在這里我們終于發現目標 ThreadWorker,它繼承自 NewThreadWorker,之前的 schedule() 方法最終都會到這個 scheduleActual() 方法里:

這里我們看到了 executor 線程池,我們用 Schedulers.io() 最終實現的線程切換的本質就在這里了。現在再結合之前的過程我們從頭梳理一下:

在 subscribeOn() 時,我們會新生成一個 Observable,它的成員 onSubscribe 會在目標 Subscriber 訂閱時使用傳入的 Scheduler 的 worker 作為線程調度執行者,在對應的線程中通知原始 Observable 發送消息給這個過程中臨時生成的 Subscriber,這個 Subscriber 又會通知到目標 Subscriber,這樣就完成了 subscribeOn() 的過程。

observeOn()

下面我們接著來看看 observeOn():

我們直接看最終調用的部分,可以看到這里又是一個 lift(),在這里傳入了 OperatorObserveOn,它與 OperatorSubscribeOn 不同,是一個 Operator(Operator 的功能我們上文中已經講過就不贅述了),它構造出了新的觀察者 ObserveOnSubscriber 并實現了 Action0 接口:

可以看出來,這里 ObserveOnSubscriber 所有的發送給目標 Subscriber child 的消息都被切換到了 recursiveScheduler 的線程作處理,也就達到了將線程切回的目的。

總結 observeOn() 整體流程如下:

對比 subscribeOn() 和 observeOn() 這兩個過程,我們不難發現兩者的區別:subscribeOn() 將初始 Observable 的訂閱事件整體都切換到了另一個線程;而 observeOn() 則是將初始 Observable 發送的消息切換到另一個線程通知到目標 Subscriber。前者把“訂閱 + 發送”的切換了一個線程,后者把“發送”切換了一個線程。所以,我們的代碼中所實現的功能其實是:

這樣就能很容易實現耗時任務在子線程操作,在主線程作更新操作等這些常見場景的功能啦。

4. 其他角色

Subject
Subject 在 Rx 系列是一個比較特殊的角色,它繼承了 Observable 的同時也實現了 Observer 接口,也就是說它既可作為觀察者,也可作為被觀察者,他一般被用來作為連接多個不同 Observable、Observer 之間的紐帶。可能你會奇怪,我們不是已經有了像 map()、flatMap() 這類的操作符去變化 Observable 數據流了嗎,為什么還要引入 Subject 這個東西呢?這是因為 Subject 所承擔的工作并非是針對 Observable 數據流內容的轉換連接,而是數據流本身在 Observable、Observer 之間的調度。光這么說可能還是很模糊,我們舉個《RxJava Essentials》中的例子:

我們通過 create() 創建了一個 PublishSubject,觀察者成功訂閱了這個 subject,然而這個 subject 卻沒有任何數據要發送,我們只是知道他未來會發送的會是 String 值而已。之后,當我們調用 subject.onNext() 時,消息才被發送,Observer 的 onNext() 被觸發調用,輸出了 Hello World。

這里我們注意到,當訂閱事件發生時,我們的 subject 是沒有產生數據流的,直到它發射了 Hello World,數據流才開始運轉,試想我們如果將訂閱過程和 subject.onNext() 調換一下位置,那么 Observer 就一定不會接受到 Hello World 了(這不是廢話嗎 - -|||),因而這也在根本上反映了 Observable 的冷熱區別。

一般而言,我們的 Observable 都屬于 Cold Observables,就像看視頻,每次點開新視頻我們都要從頭開始播放;而 Subject 則默認屬于 Hot Observables,就像看直播,視頻數據永遠都是新的。
基于這種屬性,Subject 自然擁有了對接收到的數據流進行選擇調度等的能力了,因此,我們對于 Subject 的使用也就通常基于如下的思路:

在前面的例子里我們用到的是 PublishSubject,它只會把在訂閱發生的時間點之后來自原始 Observable 的數據發射給觀察者。等一下,這功能聽起來是不是有些似曾相識呢?

沒錯,就是 EventBus 和 Otto。(RxJava 的出現慢慢讓 Otto 退出了舞臺,現在 Otto 的 Repo 已經是 Deprecated 狀態了,而 EventBus 依舊堅挺)基于 RxJava 的觀察訂閱取消的能力和 PublishSubject 的功能,我們十分容易就能寫出實現了最基本功能的簡易事件總線框架:

當然 Subject 還有其他如 BehaviorSubject、ReplaySubject、AsyncSubject 等類型,大家可以去看官方文檔,寫得十分詳細,這里就不介紹了。

三. 后記

前面相信最近這段日子里,提到 RxJava,大家就會想到 Google 最近剛剛開源的 Agera。Agera 作為專門為 Android 打造的 Reactive Programming 框架,難免會被拿來與 RxJava 做對比。本文前面 RxJava 的主體流程分析已近尾聲,現在我們再來看看 Agera 這東東又是怎么一回事。

首先先上結論:

Agera 最初是為了 Google Play Movies 而開發的一個內部框架,現在開源出來了,它雖然是在 RxJava 之后才出現,但是完全獨立于 RxJava,與它沒有任何關系(只不過開源的時間十分微妙罷了 233333)。與 RxJava 比起來,Agera 更加專注于 Android 的生命周期,而 RxJava 則更加純粹地面向 Java 平臺而非 Android。

也許你可能會問:“那么 RxAndroid 呢,不是還有它嗎?”事實上,RxAndroid 早在 1.0 版本的時候就進行了很大的重構,很多模塊被拆分到其他的項目中去了,同時也刪除了部分代碼,僅存下來的部分多是和 Android 線程相關的部分,比如 AndroidSchedulers、MainThreadSubscription 等。鑒于這種情況,我們暫且不去關注 RxAndroid,先把目光放在 Agera 上。

同樣也是基于觀察者模式,Agera 和 RxJava 的角色分類大致相似,在 Agera 中,主要角色有兩個:Observable(被觀察者)、Updatable(觀察者)。

是的,相較于 RxJava 中的 Observable,Agera 中的 Observable 只是一個簡單的接口,也沒有范性的存在,Updatable 亦是如此,這樣我們要如何做到消息的傳遞呢?這就需要另外一個接口了:

終于看到了泛型 T,我們的消息的傳遞能力就是依賴于此接口了。所以我們將這個接口和基礎的 Observable 結合一下:

這里的 Repository T 在一定程度上就是我們想要的 RxJava 中的 Observable T 啦。類似地,Repository 也有兩種類型的實現:

Direct – 所包含的數據總是可用的或者是可被同步計算出來的;一個 Direct 的 Repository 總是處于活躍(active)狀態下

Deferred – 所包含的數據是異步計算或拉去所得;一個 Deffered 的 Repository 直到有 Updatable 被添加進來之前都會是非活躍(inactive)狀態下
是不是感到似曾相識呢?沒錯,Repository 也是有冷熱區分的,不過我們現在暫且不去關注這一點。回到上面接著看,既然現在發數據的角色有了,那么我們要如何接收數據呢?答案就是 Receiver:

相信看到這里,大家應該也隱約感覺到了:在 Agera 的世界里,數據的傳輸與事件的傳遞是相互隔離開的,這是目前 Agera 與 Rx 系列的最大本質區別。Agera 所使用的是一種 push event, pull data 的模型,這意味著 event 并不會攜帶任何 data,Updatable 在需要更新時,它自己會承擔起從數據源拉取數據的任務。這樣,提供數據的責任就從 Observable 中拆分了出來交給了 Repository,讓其自身能夠專注于發送一些簡單的事件如按鈕點擊、一次下拉刷新的觸發等等。

那么,這樣的實現有什么好處呢?

當這兩種處理分發邏輯分離開時,Updatable 就不必觀察到來自 Repository 的完整數據變化的歷史,畢竟在大多數場景下,尤其是更新 UI 的場景下,最新的數據往往才是有用的數據。

但是我就是需要看到變化的歷史數據,怎么辦?

不用擔心,這里我們再請出一個角色 Reservoir:

顧名思義,Reservoir 就是我們用來存儲變化中的數據的地方,它繼承了 Receiver、Repository,也就相當于同時具有了接收數據,發送數據的能力。通過查看其具體實現我們可以知道它的本質操作都是使用內部的 Queue 實現的:通過 accept() 接收到數據后入列,通過 get() 拿到數據后出列。若一個 Updatable 觀察了此 Reservoir,其隊列中發生調度變化后即將出列的下一個數據如果是可用的(非空),就會通知該 Updatable,進一步拉取這個數據發送給 Receiver。

現在,我們已經大概了解了這幾個角色的功能屬性了,接下來我們來看一段官方示例代碼:

是不是有些云里霧里的感覺呢?多虧有注釋,我們大概能夠猜出到底上面都做了什么:使用需要的圖片規格作為參數拼接到 url 中,拉取對應的圖片并用 ImageView 顯示出來。我們結合 API 來看看整個過程:

Repositories.repositoryWithInitialValue(Result.absent())
創建一個可運行(抑或說執行)的 repository。
初始化傳入值是 Result,它用來概括一些諸如 apply()、merge() 的操作的結果的不可變對象,并且存在兩種狀態 succeeded()、failed()。
返回 REventSource

observe()
用于添加新的 Observable 作為更新我們的圖片的 Event source,本例中不需要。
返回 RFrequency

onUpdatesPerLoop()
在每一個 Looper Thread loop 中若有來自多個 Event Source 的 update() 處理時,只需開啟一個數據處理流。
返回 RFlow

getFrom(new Supplier(…))
忽略輸入值,使用來自給定 Supplier 的新獲取的數據作為輸出值。
返回 RFlow

goTo(executor)
切換到給定的 executor 繼續數據處理流。

attemptTransform(function())
使用給定的 function() 變換輸入值,若變換失敗,則終止數據流;若成功,則取新的變換后的值作為當前流指令的輸出。
返回 RTermination

orSkip()
若前面的操作檢查為失敗,就跳過剩下的數據處理流,并且不會通知所有已添加的 Updatable。

thenTransform(function())
與 attemptTransform(function()) 相似,區別在于當必要時會發出通知。
返回 RConfig

onDeactivation(SEND_INTERRUPT)
用于明確 repository 不再 active 時的行為。
返回 RConfig

compile()
執行這個 repository。
返回 Repository

整體流程乍看起來并沒有什么特別的地方,但是真正的玄機其實藏在執行每一步的返回值里:
初始的 REventSource T, T 代表著事件源的開端,它從傳入值接收了 T initialValue,這里的中,第一個 T 是當前 repository 的數據的類型,第二個 T 則是數據處理流開端的時候的數據的類型。

之后,當 observe() 調用后,我們傳入事件源給 REventSource,相當于設定好了需要的事件源和對應的開端,這里返回的是 RFrequency T, T,它繼承自 REventSource,為其添加了事件源的發送頻率的屬性。

之后,我們來到了 onUpdatesPerLoop(),這里明確了所開啟的數據流的個數(也就是前面所講的頻率)后,返回了 RFlow,這里也就意味著我們的數據流正式生成了。同時,這里也是流式調用的起點。

拿到我們的 RFlow 之后,我們就可以為其提供數據源了,也就是前面說的 Supplier,于是調用 getFrom(),這樣我們的數據流也就真正意義擁有了數據“干貨”。

有了數據之后我們就可以按具體需要進行數據轉換了,這里我們可以直接使用 transform(),返回 RFlow,以便進一步進行流式調用;也可以調用 attemptTransform() 來對可能出現的異常進行處理,比如 orSkip()、orEnd() 之后繼續進行流式調用。

經過一系列的流式調用之后,我們終于對數據處理完成啦,現在我們可以選擇先對成型的數據在做一次最后的包裝 thenTransform(),或是與另一個 Supplier 合并 thenMergeIn() 等。這些處理之后,我們的返回值也就轉為了 RConfig,進入了最終配置和 repository 聲明結束的狀態。
在最終的這個配置過程中,我們調用了 onDeactivation(),為這個 repository 明確了最終進入非活躍狀態時的行為,如果不需要其他多余的配置的話,我們就可以進入最終的 compile() 方法了。當我們調用 compile() 時,就會按照前面所走過的所有流程與配置去執行并生成這個 repository。到此,我們的 repository 才真正被創建了出來。

以上就是 repository 從無到有的全過程。當 repository 誕生后,我們也就可以傳輸需要的數據啦。再回到上面的示例代碼:

我們在 onResume()、onPause() 這兩個生命周期下分別添加、移除了 Updatable。相較于 RxJava 中通過 Subscription 去取消訂閱的做法,Agera 的這種寫法顯然更為清晰也更為整潔。我們的 Activity 實現了 Updatable 和 Receiver 接口,直接看其實現方法:

可以看到這里 repository 將數據發送給了 receiver,也就是自己,在對應的 accept() 方法中接收到我們想要的 bitmap 后,這張圖片也就顯示出來了,示例代碼中的完整流程也就結束了。

總結一下上述過程:

首先 Repositories.repositoryWithInitialValue() 生成原點 REventSource。

配置完 Observable 之后進入 RFrequency 狀態,接著配置數據流的流數。

前面配置完成后,數據流 RFlow 生成,之后通過 getFrom()、mergeIn()、transform() 等方法可進一步進行流式調用;也可以使用 attemptXXX() 方法代替原方法,后面接著調用 orSkip()、orEnd() 進行 error handling 處理。當使用 attemptXXX() 方法時,數據流狀態會變為 RTermination,它代表此時的狀態已具有終結數據流的能力,是否終結數據流要根據 failed check 觸發,結合后面跟著調用的 orSkip()、orEnd(),我們的數據流會從 RTermination 再次切換為 RFlow,以便進行后面的流式調用。

經過前面一系列的流式處理,我們需要結束數據流時,可以選擇調用 thenXXX() 方法,對數據流進行最終的處理,處理之后,數據流狀態會變為 RConfig;也可以為此行為添加 error handling 處理,選擇 thenAttemptXXX() 方法,后面同樣接上 orSkip()、orEnd() 即可,最終數據流也會轉為 Rconfig 狀態。

此時,我們可以在結束前按需要選擇對數據流進行最后的配置,例如:調用 onDeactivation() 配置從“訂閱”到“取消訂閱”的過程是否需要繼續執行數據流等等。

一切都部署完畢后,我們 compile() 這個 RConfig,得到最終的成型的 Repository,它具有添加 Updatable、發送數據通知 Receiver 的能力。

我們根據需要添加 Updatable,repository 在數據流處理完成后會通過 update() 發送 event 通知 Updatable。

Updatable 收到通知后則會拉取 repository 的成果數據,并將數據通過 accept() 發送給 Receiver。完成 Push event,pull data 的流程。

看完上述內容,你們對 RxJava 線程切換過程是怎樣的有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注丸趣 TV 行業資訊頻道,感謝大家的支持。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計7836字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 白河县| 陆川县| 霍城县| 丁青县| 济南市| 揭东县| 乌拉特中旗| 和平区| 昭通市| 福州市| 甘谷县| 监利县| 林西县| 灵川县| 兰溪市| 岳普湖县| 双流县| 澳门| 廊坊市| 合作市| 成武县| 遵义市| 永清县| 临汾市| 寻乌县| 泌阳县| 台山市| 乐至县| 若羌县| 铜川市| 克拉玛依市| 濮阳市| 桐城市| 霍邱县| 无极县| 荣昌县| 习水县| 卓资县| 宣威市| 中超| 洪雅县|