來聊上次沒聊完的事件流:Subject、ConnectableObservable、Processor

前言
趁著年假的這段時光,繼續聊事件流,上一集提到的事件流主要是比較常見的主要事件流類型:
- Observable
- Flowable
- Single
- Maybe
- Completable
接下來要討論的是Subject,至於做些什麼用,我覺得可以這樣介紹Subject:「可以是觀察者、又可以是被觀察者的兩用工具」。
也就是說:它可以發射數據,也可以同時訂閱其他的數據流。
正文
Subject
說到Subject,它最特別的地方是,它既是Obaservable又是Observer,可以是數據的發射者也可以是訂閱者。
論類型的話,可以分成常見的以下四種類型:

不過總論來說,都是可以接收數據(作為Observer),也同時可以向下游發射數據(Observable)
比較有意思的場景應用:大概就是手動控制數據流的場景了
那稍微實作一下上面四種Subject,比較一下四者的差別:
PublishSubject
import io.reactivex.subjects.PublishSubject;
public class PublishSubjectExample {
public static void main(String[] args) {
PublishSubject<String> subject = PublishSubject.create();
subject.onNext("Event 1"); // 沒有訂閱者,這個數據不會被接收
subject.subscribe(data -> System.out.println("Observer 1: " + data));
subject.onNext("Event 2"); // Observer 1: Event 2
subject.onNext("Event 3"); // Observer 1: Event 3
subject.subscribe(data -> System.out.println("Observer 2: " + data));
subject.onNext("Event 4");
// Observer 1: Event 4
// Observer 2: Event 4
}
}
output:
Observer 1: Event 2
Observer 1: Event 3
Observer 1: Event 4
Observer 2: Event 4
沒有訂閱,就不緩存前面的事件,所以射出的事件不含第一個事件。
BehaviorSubject
import io.reactivex.subjects.BehaviorSubject;
public class BehaviorSubjectExample {
public static void main(String[] args) {
BehaviorSubject<String> subject = BehaviorSubject.create();
subject.onNext("Event 1"); // 被緩存起來,但沒有訂閱者
subject.subscribe(data -> System.out.println("Observer 1: " + data));
subject.onNext("Event 2"); // Observer 1: Event 2
subject.subscribe(data -> System.out.println("Observer 2: " + data));
subject.onNext("Event 3");
// Observer 1: Event 3
// Observer 2: Event 3
}
}
output:
Observer 1: Event 1
Observer 1: Event 2
Observer 2: Event 2
Observer 1: Event 3
Observer 2: Event 3
前面的第一個事件有被存起來,後面訂閱後一併射出。
ReplaySubject
import io.reactivex.subjects.ReplaySubject;
public class ReplaySubjectExample {
public static void main(String[] args) {
ReplaySubject<String> subject = ReplaySubject.create();
subject.onNext("Event 1");
subject.onNext("Event 2");
subject.subscribe(data -> System.out.println("Observer 1: " + data));
subject.onNext("Event 3"); // Observer 1: Event 3
subject.subscribe(data -> System.out.println("Observer 2: " + data));
subject.onNext("Event 4");
}
}
output:
Observer 1: Event 1
Observer 1: Event 2
Observer 1: Event 3
Observer 2: Event 1
Observer 2: Event 2
Observer 2: Event 3
Observer 1: Event 4
Observer 2: Event 4
會回放所有歷史數據的,只要新增一個訂閱者,就會回放所有的歷史。
AsyncSubject
import io.reactivex.subjects.AsyncSubject;
public class AsyncSubjectExample {
public static void main(String[] args) {
AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("Event 1");
subject.onNext("Event 2");
subject.onNext("Event 3");
subject.subscribe(data -> System.out.println("Observer 1: " + data));
subject.onNext("Event 4");
subject.subscribe(data -> System.out.println("Observer 2: " + data));
subject.onComplete(); // 這時候才會發射最後一個數據 "Event 4"
}
}
output:
Observer 1: Event 4
Observer 2: Event 4
只會在onComplete()執行後,釋出最後一個數據。

再次總結一下四種類型:
ConnectableObservable
在這個被觀察者,比較特殊的地方是:「冷被觀察者」與「熱被觀察者」
冷
- 每個訂閱者都會從頭開始接收數據流(獨立事件流)
- 數據的生成合發射都與訂閱者有關
熱
- 所有訂閱者共享一個數據流
- 數據的生成與發射與訂閱者無關,可以在沒有訂閱者時就開始發射數據
而ConnectableObservable允許將冷的轉為熱的,那在特性上:
- ConnactableObservable是熱的Observable,允許多個訂閱者共享一個數據流
- 手動啟動數據流:它的數據流需要調用connect()才可以開始發射。
怎麼創建一個ConnectableObservable?
如果要將冷的被觀察者轉成熱的,使用Publish()
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
public class ConnectableObservableExample {
public static void main(String[] args) {
// 創建一個普通的冷 Observable
Observable<Integer> coldObservable = Observable.range(1, 5);
// 將其轉換為 ConnectableObservable
ConnectableObservable<Integer> connectable = coldObservable.publish();
// 訂閱者 1
connectable.subscribe(data -> System.out.println("Observer 1: " + data));
// 訂閱者 2
connectable.subscribe(data -> System.out.println("Observer 2: " + data));
// 開始發射數據
connectable.connect();
}
}
output:
Observer 1: 1
Observer 2: 1
Observer 1: 2
Observer 2: 2
Observer 1: 3
Observer 2: 3
Observer 1: 4
Observer 2: 4
Observer 1: 5
Observer 2: 5
多訂閱者情境:
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
public class ConnectableObservableExample {
public static void main(String[] args) {
ConnectableObservable<Integer> connectable = Observable.range(1, 5).publish();
// 訂閱者 1
connectable.subscribe(data -> System.out.println("Observer 1: " + data));
// 訂閱者 2
connectable.subscribe(data -> System.out.println("Observer 2: " + data));
// 開始發射數據
connectable.connect();
// 訂閱者 3,遲到訂閱
connectable.subscribe(data -> System.out.println("Observer 3: " + data));
}
}
output:
Observer 1: 1
Observer 2: 1
Observer 1: 2
Observer 2: 2
Observer 1: 3
Observer 2: 3
Observer 1: 4
Observer 2: 4
Observer 1: 5
Observer 2: 5
Processor
這個東西厲害了,它主要的特性是處理Flowable這種擁有高量數據的事件流,複合性的擁有Subject的動態發送能力,又有Flowable處理背壓的高頻數據流。
這逼可以稍微表列一下Processor的核心特性:

那他跟Flowable相比,又有哪些更勝一籌的特質呢?

簡單比一比, 就可以知道Processor擁有Subject+Flowable的綜合特性,不過Processor也只是一個種類的稱呼,實際上它還分成四小類型:

以下會簡單的帶過這四種類型的實作:
PublishProcessor
屬性是即時數據流:
import io.reactivex.processors.PublishProcessor;
public class PublishProcessorExample {
public static void main(String[] args) {
PublishProcessor<String> processor = PublishProcessor.create();
processor.subscribe(data -> System.out.println("Subscriber 1: " + data));
processor.onNext("Event 1");
processor.onNext("Event 2");
processor.subscribe(data -> System.out.println("Subscriber 2: " + data));
processor.onNext("Event 3");
}
}
output:
Subscriber 1: Event 1
Subscriber 1: Event 2
Subscriber 1: Event 3
Subscriber 2: Event 3
- 不會緩存舊數據
- 訂閱的時間點開始接收訂閱之後的數據
BehaviorProcessor
狀態管理,如當前狀態、數據更新
import io.reactivex.processors.BehaviorProcessor;
public class BehaviorProcessorExample {
public static void main(String[] args) {
BehaviorProcessor<String> processor = BehaviorProcessor.create();
processor.onNext("Event 1");
processor.onNext("Event 2");
processor.subscribe(data -> System.out.println("Subscriber 1: " + data));
processor.onNext("Event 3");
}
}
output:
Subscriber 1: Event 2
Subscriber 1: Event 3
會傳出前一個狀態,到下一個狀態。
ReplayProcessor
跟前面的名字一樣,Replay代表他會回放所有的數據,也會緩存所有的數據:
import io.reactivex.processors.ReplayProcessor;
public class ReplayProcessorExample {
public static void main(String[] args) {
ReplayProcessor<String> processor = ReplayProcessor.create();
processor.onNext("Event 1");
processor.onNext("Event 2");
processor.subscribe(data -> System.out.println("Subscriber 1: " + data));
processor.onNext("Event 3");
}
}
output:
Subscriber 1: Event 1
Subscriber 1: Event 2
Subscriber 1: Event 3
嘛,所以要查詢什麼紀錄或聊天訊息的歷史訊息,用這種應該會不錯。
AsyncProcessor
只發射最後一個數據
import io.reactivex.processors.AsyncProcessor;
public class AsyncProcessorExample {
public static void main(String[] args) {
AsyncProcessor<String> processor = AsyncProcessor.create();
processor.onNext("Event 1");
processor.onNext("Event 2");
processor.onNext("Event 3");
processor.subscribe(data -> System.out.println("Subscriber 1: " + data));
processor.onComplete(); // 只有 onComplete() 之後,才會發送最後一個數據
}
}
output:
Subscriber 1: Event 3
只發送最後一個數據 Event 3,並且只有在 onComplete() 之後才會發送。
最後再比一下Processor的各種類型:

尾聲
目前應該介紹了大部分類型的數據流了,接下來還是會多少提Rx的使用,但可能會聚焦在一些操作符的熟悉,坦白說,Rx的語法真的不容易,也有一些事件流需要統整使用的方式,不過單從取名字來說,就可以略窺一二使用的方式。
實際使用後應該就會明白Rx的威力了。