RxJava 觀察綁定和事件發送流程及其中的線程切換分析

本文的所有分析都是基於 RxJava2 進行的。以下的 RxJava 指 RxJava2

閱讀本文你將會知道:

  • RxJava 的觀察綁定和事件發送過程
  • RxJava 觀察綁定和事件發送過程中的線程切換

從 RxJava1.0 到 RxJava2.0,在項目開發中已經使用了很長時間這個庫了。鏈式調用,絲滑的線程切換很香,但是如果沒弄清楚其中的奧妙很容易掉進線程調度的坑裡。這篇文章我們就來對 RxJava 的訂閱過程、時間發送過程、線程調度進行分析

訂閱和事件流

先說結論

<code>subscribe()
/<code>

圖解

為了更便於理解訂閱的流轉方向,我將Observable調用 subscribe() 訂閱描述為了 Observer beSubscribed()

RxJava 觀察綁定和事件發送流程及其中的線程切換分析

訂閱及數據發送

源碼分析

Observabe 創建過程

此過程對應圖中 黑色箭頭 部分,以操作符中的 map() 操作為例:

<code>@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable map(Function super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
}
/<code>

調用 map 操作符時,RxJavaPliguns 會註冊一個新的 ObservableMap 對象,查看其它操作符會發現都有對應的 Observable 對象產生。同時,上游的 Observabe 會作為 source 參數傳入賦值給這個新的 Observable 的 source 屬性。層層向下,可以對這個新生成的 Observable 又可以繼續使用操作符。

訂閱過程:

當調用最後一個 Observable 的 subscribe() 方法時,即開始訂閱過程。此過程對應圖中 紅色箭頭 部分

<code>@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {

observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}/<code>

在調用 subscribe(Observer) 時實際上會去調用各個 Observable 實現子類中的 subscribeActual() 方法:

<code>@Override
public void subscribeActual(Observer super U> t) {
source.subscribe(new MapObserver(t, function));
}
/<code>

而在這個 subscribeActual() 方法也很簡單,調用了 source 去訂閱一個新生成的 Observer 對象,同時這個新的 MapObserver 會將調用 subscribe() 時傳入的 observer ,賦值給 downstream 屬性。這樣每一級訂閱都會將上級的 Observable 、本級生成的 Observer 、訂閱下級傳入的 Observer 聯繫起來,直到達到 Observable 最初創建的地方整個訂閱過程結束。

事件發送過程:

此過程對應圖中 綠色箭頭 部分Observable 事件起點創建有很多中操作符,他們都會創建出最初發送的事件/數據,以 ObservableCreate 為例:

<code>@Override
protected void subscribeActual(Observer super T> observer) {
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
/<code>

訂閱時會調用 source.subscrebe(parent) ,而這個 source 又是從哪兒來的呢?

<code>public ObservableCreate(ObservableOnSubscribe source) {
this.source = source;
}
/<code>
<code>Observable.create(object : ObservableOnSubscribe<string> {
override fun subscribe(emitter: ObservableEmitter<string>) {
emitter.onNext("data")
}

})/<string>/<string>/<code>

從代碼中我們可以看出,這個 source 即為我們創建時傳入的 ObservableOnSubscribe ,因此 emitter.onNext("data") 即是事件發送的起點。我們再繼續看 emitter 的 onNext() 做了什麼:

<code>@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}

}/<code>

源碼中現實調用了 observer.onNext() ,而這個 observer 則是前面訂閱過程中 source.subscribe(new MapObserver(t, function)) 傳入的那個 observer ,從而將事件發送到了下一級,下一級的 Observer 同樣在 onNext() 將事件發送到更下一級,一直到最終我們 subscribe() 時傳入的那個 Observer 實例完畢。

線程調度

事件訂閱發送流程通過上面的文章基本已經能夠摸清了,我們接下來關注另一個重點 線程調度 問題。

調度方式

RxJava 中線程變換通過 subscribeOn() 和 observeOn() 兩個操作來進行。其中 subscribeOn() 改變的是訂閱線程的執行線程,即事件發生的線程。 observeOn() 改變的是事件結果觀察者回調所在線程,即 onNext() 方法所在的線程。

RxJava 觀察綁定和事件發送流程及其中的線程切換分析

舉個栗子

使用 RxJava + Retrofit 進行網絡請求時,用 RxJava 管理網絡請求過程的線程切換。 subscribeOn() 指定的是網絡請求的線程, observeOn()

指定的是網絡請求後事件流的執行線程。

源碼分析

前面說過,每次操作符的使用,RxJava 都會生成一個對應的新的 Observable 對象。 observeOn() 與 subscribeOn() 也不例外。線程調度的核心邏輯都在 ObservableSubscribeOn 與 ObservableObserveOn 兩個類中

subscribeOn()過程

<code>@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}
/<code>

調用 subscribeOn() 時會產生一個新的 ObservableSubscribeOn 並把當前這個 Observable 和傳入的 Scheduler 作為參數傳入。前面分析過當最終調用 subscribe() 時會引起整個觀察鏈的 Observable 自下而上調用 subscribe() ,而這個 subscribe() 方法中實際為調用抽象類 Observable 的各個實現子類的 subscribeActual() 方法 。

<code>@Override
public void subscribeActual(final Observer super T> observer) {
final SubscribeOnObserver parent = new SubscribeOnObserver
(observer);

observer.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
/<code>

主要看這句 scheduler.scheduleDirect(new SubscribeTask(parent)); , SubscribeTask 前面內容已經分析過,就是調用上級 Observable 來訂閱生成的這個 SubscribeOnObserver 。

<code>@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

w.schedule(task, delay, unit);

return task;
}/<code>

scheduleDirect 方法,會使用傳入的 scheduler 在指定的線程創建一個 Worker 對象來執行 SubscribeTask ,從而達到了切換訂閱線程的目的。所以多個 subscribeOn() 疊加時,最終線程還是會回到最後執行的(代碼第一次出現的) subscribeOn() 指定的線程。

observeOn()過程

調用 observeOn(Scheduler) 方法,會調用內部的同名方法生成一個新的 ObservableObserveOn 對象,並把當前這個 Observable 和傳入的 Scheduler 作為參數傳入。訂閱過程與 ObservableSubscribeOn 不一樣,會直接在當前線程調用上級 Observable 訂閱自己,,我們主要看 ObservableObserveOn 的 ObserveOnObserver 是如何調度結果數據發送的線程的。

<code>@Override 

public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}

void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}/<code>

從源碼中可以發現,最終會使用 worker 去向下游發送事件。這個 worker 就是我們 observeOn() 方法中指定的線程創建的 worker。從而達到切換線程的目的,由於事件又是自上而下的,所以每次切換都能在下游事件中感受到線程的變化。

日誌分析

把 subscribeOn() 和 observeOn() 放一起來說不太容易說明白其中的線程變換,我先看看單獨使用其中的一個操作符的時候,導致的線程變化。

僅調用 subscribeOn() 調度線程

<code>Observable.just("Data")
.map {
Log.d("Map 1", Thread.currentThread().name)
return@map it
}
.subscribeOn(Schedulers.io())
.doOnSubscribe {
Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
}
.map {
Log.d("Map 2 ", Thread.currentThread().name)

return@map it
}
.subscribeOn(Schedulers.newThread())
.doOnSubscribe {
Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
}
.map {
Log.d("Map 3 ", Thread.currentThread().name)
return@map it
}
.subscribe(object : Observer<string> {
override fun onComplete() {

}

override fun onSubscribe(d: Disposable) {
Log.d("onSubscribe", Thread.currentThread().name)
}

override fun onNext(t: String) {
Log.d("onNext", Thread.currentThread().name)
}

override fun onError(e: Throwable) {
e.printStackTrace()
}

})/<string>/<code>

執行結果:

RxJava 觀察綁定和事件發送流程及其中的線程切換分析

image.png

從日誌可以看出:

  • 1、訂閱是自下向上的(onSubscribe -->doOnSubscribe 2 -->doOnsubscribe 1)
  • 2、自下向上看,每次調用 subscribeOn 訂閱線程將會發生改變,直到下次調用 subscribeOn
  • 3、事件是自上向下傳遞的(Map 1 --> Map 2 --> Map 3 --> onNext),且所在線程為最後一次線程切換後所在的線程 RxCachedThreadScheduler-1

僅調用 subscribeOn() 調度線程

<code>Observable.just("Data")
.map {
Log.d("Map 1", Thread.currentThread().name)
return@map it
}
// .doOnSubscribe {
// Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
// }
// .subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map {
Log.d("Map 2 ", Thread.currentThread().name)
return@map it
}
// .doOnSubscribe {
// Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
// }
// .subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map {
Log.d("Map 3 ", Thread.currentThread().name)
return@map it
}
.subscribe(object : Observer<string> {
override fun onComplete() {

}

override fun onSubscribe(d: Disposable) {
Log.d("onSubscribe", Thread.currentThread().name)
}

override fun onNext(t: String) {
Log.d("onNext", Thread.currentThread().name)
}

override fun onError(e: Throwable) {
e.printStackTrace()
}

})/<string>/<code>

執行結果:

RxJava 觀察綁定和事件發送流程及其中的線程切換分析

日誌打印

從日誌可以看出:

<code>observeOn
/<code>

混合使用調度線程

我們把上述代碼中註釋部分都打開,得到的日誌如下:

RxJava 觀察綁定和事件發送流程及其中的線程切換分析

日誌打印

通過上面的三次日誌打印我們可以看出:

訂閱鏈的日誌自下而上打印完畢後,再自上而下打印觀察結果。 subscribeOn 會切換線程,並不是像有的文章所說只有第一次指定線程(即自下而上的最後一次)有效。第一次有效只是我們的錯覺,因為訂閱是自下而上的,不管前面的線程怎樣切換追蹤都會切換到 subscribeOn 第一次指定線程(即自下而上的最後一次)。我們在回調結果中未進行線程切換操作時,只能感知到這一次線程切換 (Map1 與 doOnSubscribe 1 所在線程一致)。 observeOn 的每次指定線程都會讓事件流切換到對應的線程中去。完整的事件訂閱和發送流程如下圖所示,從我們調用 subscribe() 將觀察者和觀察對象關聯起來開始, subscribe() 中傳入的 Observer 的 onNext 或 onError 結束,形成了一個逆時針的 n 形的鏈條。右邊部分的觀察鏈中,每次 subscribeOn 都會切換觀察線程。左邊部分的事件發送鏈,會從觀察鏈的最後一次指定的線程開始發送事件,每次調用 observeOn 都會指定新的事件發送線程。

圖解

參照上面的源碼和日誌分析,再結合本圖相信大家會對 RxJava 的現場調度有一個更立體的認識

RxJava 觀察綁定和事件發送流程及其中的線程切換分析

RxJava2 線程切換流程


分享到:


相關文章: