共計 7836 個字符,預計需要花費 20 分鐘才能閱讀完成。
今天就跟大家聊聊有關 RxJava 線程切換過程是怎樣的,可能很多人都不太了解,為了讓大家更加了解,丸趣 TV 小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
線程切換過程
下面我們就來看看它的又一利器,調度器 Scheduler:就像我們所知道的,Scheduler 是給 Observable 數據流添加多線程功能所準備的,一般我們會通過使用 subscribeOn()、observeOn() 方法傳入對應的 Scheduler 去指定數據流的每部分操作應該以何種方式運行在何種線程。對于我們而言,最常見的莫過于在非主線程獲取并處理數據之后在主線程更新 UI 這樣的場景了:
這是我們十分常見的調用方法,一氣呵成就把不同線程之間的處理都搞定了,因為是鏈式所以結構也很清晰,我們現在來看看這其中的線程切換流程。
subscribeOn()
當我們調用 subscribeOn() 的時候:
可以看到這里也是調用了 create() 去生成一個 Observable,而 OperatorSubscribeOn 則是實現了 OnSubscribe 接口,同時將原始的 Observable 和我們需要的 scheduler 傳入:
可以看出來,這里對 subscriber 的處理與前文中 OperatorMap 中 call() 對 subscriber 的處理很相似。在這里我們同樣會根據傳入的 subscriber 構造出新的 Subscribers,不過這一系列的過程大部分都是由 worker 通過 schedule() 去執行的,從后面 setProducer() 中對于線程的判斷,再結合 subscribeOn() 方法的目的我們能大概推測出,這個 worker 在一定程度上就相當于一個新線程的代理執行者,schedule() 所實現的與 Thread 類中 run() 應該十分類似。我們現在來看看這個 worker 的執行過程。
首先從 Schedulers.io() 進入:
這個通過 hook 拿到 scheduler 的過程我們先不管,直接進 CachedThreadScheduler,看它的 createWorker() 方法:
這里的 pool 是一個原子變量引用 AtomicReference,所持有的則是 CachedWorkerPool,因而這個 pool 顧名思義就是用來保存 worker 的緩存池啦,我們從緩存池里拿到需要的 worker 并作了一層封裝成為 EventLoopWorker:
在這里我們終于發現目標 ThreadWorker,它繼承自 NewThreadWorker,之前的 schedule() 方法最終都會到這個 scheduleActual() 方法里:
這里我們看到了 executor 線程池,我們用 Schedulers.io() 最終實現的線程切換的本質就在這里了。現在再結合之前的過程我們從頭梳理一下:
在 subscribeOn() 時,我們會新生成一個 Observable,它的成員 onSubscribe 會在目標 Subscriber 訂閱時使用傳入的 Scheduler 的 worker 作為線程調度執行者,在對應的線程中通知原始 Observable 發送消息給這個過程中臨時生成的 Subscriber,這個 Subscriber 又會通知到目標 Subscriber,這樣就完成了 subscribeOn() 的過程。
observeOn()
下面我們接著來看看 observeOn():
我們直接看最終調用的部分,可以看到這里又是一個 lift(),在這里傳入了 OperatorObserveOn,它與 OperatorSubscribeOn 不同,是一個 Operator(Operator 的功能我們上文中已經講過就不贅述了),它構造出了新的觀察者 ObserveOnSubscriber 并實現了 Action0 接口:
可以看出來,這里 ObserveOnSubscriber 所有的發送給目標 Subscriber child 的消息都被切換到了 recursiveScheduler 的線程作處理,也就達到了將線程切回的目的。
總結 observeOn() 整體流程如下:
對比 subscribeOn() 和 observeOn() 這兩個過程,我們不難發現兩者的區別:subscribeOn() 將初始 Observable 的訂閱事件整體都切換到了另一個線程;而 observeOn() 則是將初始 Observable 發送的消息切換到另一個線程通知到目標 Subscriber。前者把“訂閱 + 發送”的切換了一個線程,后者把“發送”切換了一個線程。所以,我們的代碼中所實現的功能其實是:
這樣就能很容易實現耗時任務在子線程操作,在主線程作更新操作等這些常見場景的功能啦。
4. 其他角色
Subject
Subject 在 Rx 系列是一個比較特殊的角色,它繼承了 Observable 的同時也實現了 Observer 接口,也就是說它既可作為觀察者,也可作為被觀察者,他一般被用來作為連接多個不同 Observable、Observer 之間的紐帶。可能你會奇怪,我們不是已經有了像 map()、flatMap() 這類的操作符去變化 Observable 數據流了嗎,為什么還要引入 Subject 這個東西呢?這是因為 Subject 所承擔的工作并非是針對 Observable 數據流內容的轉換連接,而是數據流本身在 Observable、Observer 之間的調度。光這么說可能還是很模糊,我們舉個《RxJava Essentials》中的例子:
我們通過 create() 創建了一個 PublishSubject,觀察者成功訂閱了這個 subject,然而這個 subject 卻沒有任何數據要發送,我們只是知道他未來會發送的會是 String 值而已。之后,當我們調用 subject.onNext() 時,消息才被發送,Observer 的 onNext() 被觸發調用,輸出了 Hello World。
這里我們注意到,當訂閱事件發生時,我們的 subject 是沒有產生數據流的,直到它發射了 Hello World,數據流才開始運轉,試想我們如果將訂閱過程和 subject.onNext() 調換一下位置,那么 Observer 就一定不會接受到 Hello World 了(這不是廢話嗎 - -|||),因而這也在根本上反映了 Observable 的冷熱區別。
一般而言,我們的 Observable 都屬于 Cold Observables,就像看視頻,每次點開新視頻我們都要從頭開始播放;而 Subject 則默認屬于 Hot Observables,就像看直播,視頻數據永遠都是新的。
基于這種屬性,Subject 自然擁有了對接收到的數據流進行選擇調度等的能力了,因此,我們對于 Subject 的使用也就通常基于如下的思路:
在前面的例子里我們用到的是 PublishSubject,它只會把在訂閱發生的時間點之后來自原始 Observable 的數據發射給觀察者。等一下,這功能聽起來是不是有些似曾相識呢?
沒錯,就是 EventBus 和 Otto。(RxJava 的出現慢慢讓 Otto 退出了舞臺,現在 Otto 的 Repo 已經是 Deprecated 狀態了,而 EventBus 依舊堅挺)基于 RxJava 的觀察訂閱取消的能力和 PublishSubject 的功能,我們十分容易就能寫出實現了最基本功能的簡易事件總線框架:
當然 Subject 還有其他如 BehaviorSubject、ReplaySubject、AsyncSubject 等類型,大家可以去看官方文檔,寫得十分詳細,這里就不介紹了。
三. 后記
前面相信最近這段日子里,提到 RxJava,大家就會想到 Google 最近剛剛開源的 Agera。Agera 作為專門為 Android 打造的 Reactive Programming 框架,難免會被拿來與 RxJava 做對比。本文前面 RxJava 的主體流程分析已近尾聲,現在我們再來看看 Agera 這東東又是怎么一回事。
首先先上結論:
Agera 最初是為了 Google Play Movies 而開發的一個內部框架,現在開源出來了,它雖然是在 RxJava 之后才出現,但是完全獨立于 RxJava,與它沒有任何關系(只不過開源的時間十分微妙罷了 233333)。與 RxJava 比起來,Agera 更加專注于 Android 的生命周期,而 RxJava 則更加純粹地面向 Java 平臺而非 Android。
也許你可能會問:“那么 RxAndroid 呢,不是還有它嗎?”事實上,RxAndroid 早在 1.0 版本的時候就進行了很大的重構,很多模塊被拆分到其他的項目中去了,同時也刪除了部分代碼,僅存下來的部分多是和 Android 線程相關的部分,比如 AndroidSchedulers、MainThreadSubscription 等。鑒于這種情況,我們暫且不去關注 RxAndroid,先把目光放在 Agera 上。
同樣也是基于觀察者模式,Agera 和 RxJava 的角色分類大致相似,在 Agera 中,主要角色有兩個:Observable(被觀察者)、Updatable(觀察者)。
是的,相較于 RxJava 中的 Observable,Agera 中的 Observable 只是一個簡單的接口,也沒有范性的存在,Updatable 亦是如此,這樣我們要如何做到消息的傳遞呢?這就需要另外一個接口了:
終于看到了泛型 T,我們的消息的傳遞能力就是依賴于此接口了。所以我們將這個接口和基礎的 Observable 結合一下:
這里的 Repository T 在一定程度上就是我們想要的 RxJava 中的 Observable T 啦。類似地,Repository 也有兩種類型的實現:
Direct – 所包含的數據總是可用的或者是可被同步計算出來的;一個 Direct 的 Repository 總是處于活躍(active)狀態下
Deferred – 所包含的數據是異步計算或拉去所得;一個 Deffered 的 Repository 直到有 Updatable 被添加進來之前都會是非活躍(inactive)狀態下
是不是感到似曾相識呢?沒錯,Repository 也是有冷熱區分的,不過我們現在暫且不去關注這一點。回到上面接著看,既然現在發數據的角色有了,那么我們要如何接收數據呢?答案就是 Receiver:
相信看到這里,大家應該也隱約感覺到了:在 Agera 的世界里,數據的傳輸與事件的傳遞是相互隔離開的,這是目前 Agera 與 Rx 系列的最大本質區別。Agera 所使用的是一種 push event, pull data 的模型,這意味著 event 并不會攜帶任何 data,Updatable 在需要更新時,它自己會承擔起從數據源拉取數據的任務。這樣,提供數據的責任就從 Observable 中拆分了出來交給了 Repository,讓其自身能夠專注于發送一些簡單的事件如按鈕點擊、一次下拉刷新的觸發等等。
那么,這樣的實現有什么好處呢?
當這兩種處理分發邏輯分離開時,Updatable 就不必觀察到來自 Repository 的完整數據變化的歷史,畢竟在大多數場景下,尤其是更新 UI 的場景下,最新的數據往往才是有用的數據。
但是我就是需要看到變化的歷史數據,怎么辦?
不用擔心,這里我們再請出一個角色 Reservoir:
顧名思義,Reservoir 就是我們用來存儲變化中的數據的地方,它繼承了 Receiver、Repository,也就相當于同時具有了接收數據,發送數據的能力。通過查看其具體實現我們可以知道它的本質操作都是使用內部的 Queue 實現的:通過 accept() 接收到數據后入列,通過 get() 拿到數據后出列。若一個 Updatable 觀察了此 Reservoir,其隊列中發生調度變化后即將出列的下一個數據如果是可用的(非空),就會通知該 Updatable,進一步拉取這個數據發送給 Receiver。
現在,我們已經大概了解了這幾個角色的功能屬性了,接下來我們來看一段官方示例代碼:
是不是有些云里霧里的感覺呢?多虧有注釋,我們大概能夠猜出到底上面都做了什么:使用需要的圖片規格作為參數拼接到 url 中,拉取對應的圖片并用 ImageView 顯示出來。我們結合 API 來看看整個過程:
Repositories.repositoryWithInitialValue(Result.absent())
創建一個可運行(抑或說執行)的 repository。
初始化傳入值是 Result,它用來概括一些諸如 apply()、merge() 的操作的結果的不可變對象,并且存在兩種狀態 succeeded()、failed()。
返回 REventSource
observe()
用于添加新的 Observable 作為更新我們的圖片的 Event source,本例中不需要。
返回 RFrequency
onUpdatesPerLoop()
在每一個 Looper Thread loop 中若有來自多個 Event Source 的 update() 處理時,只需開啟一個數據處理流。
返回 RFlow
getFrom(new Supplier(…))
忽略輸入值,使用來自給定 Supplier 的新獲取的數據作為輸出值。
返回 RFlow
goTo(executor)
切換到給定的 executor 繼續數據處理流。
attemptTransform(function())
使用給定的 function() 變換輸入值,若變換失敗,則終止數據流;若成功,則取新的變換后的值作為當前流指令的輸出。
返回 RTermination
orSkip()
若前面的操作檢查為失敗,就跳過剩下的數據處理流,并且不會通知所有已添加的 Updatable。
thenTransform(function())
與 attemptTransform(function()) 相似,區別在于當必要時會發出通知。
返回 RConfig
onDeactivation(SEND_INTERRUPT)
用于明確 repository 不再 active 時的行為。
返回 RConfig
compile()
執行這個 repository。
返回 Repository
整體流程乍看起來并沒有什么特別的地方,但是真正的玄機其實藏在執行每一步的返回值里:
初始的 REventSource T, T 代表著事件源的開端,它從傳入值接收了 T initialValue,這里的中,第一個 T 是當前 repository 的數據的類型,第二個 T 則是數據處理流開端的時候的數據的類型。
之后,當 observe() 調用后,我們傳入事件源給 REventSource,相當于設定好了需要的事件源和對應的開端,這里返回的是 RFrequency T, T,它繼承自 REventSource,為其添加了事件源的發送頻率的屬性。
之后,我們來到了 onUpdatesPerLoop(),這里明確了所開啟的數據流的個數(也就是前面所講的頻率)后,返回了 RFlow,這里也就意味著我們的數據流正式生成了。同時,這里也是流式調用的起點。
拿到我們的 RFlow 之后,我們就可以為其提供數據源了,也就是前面說的 Supplier,于是調用 getFrom(),這樣我們的數據流也就真正意義擁有了數據“干貨”。
有了數據之后我們就可以按具體需要進行數據轉換了,這里我們可以直接使用 transform(),返回 RFlow,以便進一步進行流式調用;也可以調用 attemptTransform() 來對可能出現的異常進行處理,比如 orSkip()、orEnd() 之后繼續進行流式調用。
經過一系列的流式調用之后,我們終于對數據處理完成啦,現在我們可以選擇先對成型的數據在做一次最后的包裝 thenTransform(),或是與另一個 Supplier 合并 thenMergeIn() 等。這些處理之后,我們的返回值也就轉為了 RConfig,進入了最終配置和 repository 聲明結束的狀態。
在最終的這個配置過程中,我們調用了 onDeactivation(),為這個 repository 明確了最終進入非活躍狀態時的行為,如果不需要其他多余的配置的話,我們就可以進入最終的 compile() 方法了。當我們調用 compile() 時,就會按照前面所走過的所有流程與配置去執行并生成這個 repository。到此,我們的 repository 才真正被創建了出來。
以上就是 repository 從無到有的全過程。當 repository 誕生后,我們也就可以傳輸需要的數據啦。再回到上面的示例代碼:
我們在 onResume()、onPause() 這兩個生命周期下分別添加、移除了 Updatable。相較于 RxJava 中通過 Subscription 去取消訂閱的做法,Agera 的這種寫法顯然更為清晰也更為整潔。我們的 Activity 實現了 Updatable 和 Receiver 接口,直接看其實現方法:
可以看到這里 repository 將數據發送給了 receiver,也就是自己,在對應的 accept() 方法中接收到我們想要的 bitmap 后,這張圖片也就顯示出來了,示例代碼中的完整流程也就結束了。
總結一下上述過程:
首先 Repositories.repositoryWithInitialValue() 生成原點 REventSource。
配置完 Observable 之后進入 RFrequency 狀態,接著配置數據流的流數。
前面配置完成后,數據流 RFlow 生成,之后通過 getFrom()、mergeIn()、transform() 等方法可進一步進行流式調用;也可以使用 attemptXXX() 方法代替原方法,后面接著調用 orSkip()、orEnd() 進行 error handling 處理。當使用 attemptXXX() 方法時,數據流狀態會變為 RTermination,它代表此時的狀態已具有終結數據流的能力,是否終結數據流要根據 failed check 觸發,結合后面跟著調用的 orSkip()、orEnd(),我們的數據流會從 RTermination 再次切換為 RFlow,以便進行后面的流式調用。
經過前面一系列的流式處理,我們需要結束數據流時,可以選擇調用 thenXXX() 方法,對數據流進行最終的處理,處理之后,數據流狀態會變為 RConfig;也可以為此行為添加 error handling 處理,選擇 thenAttemptXXX() 方法,后面同樣接上 orSkip()、orEnd() 即可,最終數據流也會轉為 Rconfig 狀態。
此時,我們可以在結束前按需要選擇對數據流進行最后的配置,例如:調用 onDeactivation() 配置從“訂閱”到“取消訂閱”的過程是否需要繼續執行數據流等等。
一切都部署完畢后,我們 compile() 這個 RConfig,得到最終的成型的 Repository,它具有添加 Updatable、發送數據通知 Receiver 的能力。
我們根據需要添加 Updatable,repository 在數據流處理完成后會通過 update() 發送 event 通知 Updatable。
Updatable 收到通知后則會拉取 repository 的成果數據,并將數據通過 accept() 發送給 Receiver。完成 Push event,pull data 的流程。
看完上述內容,你們對 RxJava 線程切換過程是怎樣的有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注丸趣 TV 行業資訊頻道,感謝大家的支持。