誤入Android開發-Day9:RxJava的事件流(二)

影山小麥機
15 min readJan 30, 2025

來聊上次沒聊完的事件流:Subject、ConnectableObservable、Processor

前言

趁著年假的這段時光,繼續聊事件流,上一集提到的事件流主要是比較常見的主要事件流類型:

  1. Observable
  2. Flowable
  3. Single
  4. Maybe
  5. Completable

接下來要討論的是Subject,至於做些什麼用,我覺得可以這樣介紹Subject:「可以是觀察者、又可以是被觀察者的兩用工具」。

也就是說:它可以發射數據,也可以同時訂閱其他的數據流。

正文

Subject

說到Subject,它最特別的地方是,它既是Obaservable又是Observer,可以是數據的發射者也可以是訂閱者。

論類型的話,可以分成常見的以下四種類型:

不過總論來說,都是可以接收數據(作為Observer),也同時可以向下游發射數據(Observable)

比較有意思的場景應用:大概就是手動控制數據流的場景了

那稍微實作一下上面四種Subject,比較一下四者的差別:

PublishSubject

import io.reactivex.subjects.PublishSubject;

public class PublishSubjectExample {
public static void main(String[] args) {
PublishSubject<String> subject = PublishSubject.create();

subject.onNext("Event 1"); // 沒有訂閱者,這個數據不會被接收

subject.subscribe(data -> System.out.println("Observer 1: " + data));

subject.onNext("Event 2"); // Observer 1: Event 2
subject.onNext("Event 3"); // Observer 1: Event 3

subject.subscribe(data -> System.out.println("Observer 2: " + data));

subject.onNext("Event 4");
// Observer 1: Event 4
// Observer 2: Event 4
}
}

output:

Observer 1: Event 2
Observer 1: Event 3
Observer 1: Event 4
Observer 2: Event 4

沒有訂閱,就不緩存前面的事件,所以射出的事件不含第一個事件。

BehaviorSubject

import io.reactivex.subjects.BehaviorSubject;

public class BehaviorSubjectExample {
public static void main(String[] args) {
BehaviorSubject<String> subject = BehaviorSubject.create();

subject.onNext("Event 1"); // 被緩存起來,但沒有訂閱者

subject.subscribe(data -> System.out.println("Observer 1: " + data));

subject.onNext("Event 2"); // Observer 1: Event 2

subject.subscribe(data -> System.out.println("Observer 2: " + data));

subject.onNext("Event 3");
// Observer 1: Event 3
// Observer 2: Event 3
}
}

output:

Observer 1: Event 1
Observer 1: Event 2
Observer 2: Event 2
Observer 1: Event 3
Observer 2: Event 3

前面的第一個事件有被存起來,後面訂閱後一併射出。

ReplaySubject

import io.reactivex.subjects.ReplaySubject;

public class ReplaySubjectExample {
public static void main(String[] args) {
ReplaySubject<String> subject = ReplaySubject.create();

subject.onNext("Event 1");
subject.onNext("Event 2");

subject.subscribe(data -> System.out.println("Observer 1: " + data));

subject.onNext("Event 3"); // Observer 1: Event 3

subject.subscribe(data -> System.out.println("Observer 2: " + data));

subject.onNext("Event 4");
}
}

output:

Observer 1: Event 1
Observer 1: Event 2
Observer 1: Event 3
Observer 2: Event 1
Observer 2: Event 2
Observer 2: Event 3
Observer 1: Event 4
Observer 2: Event 4

會回放所有歷史數據的,只要新增一個訂閱者,就會回放所有的歷史。

AsyncSubject

import io.reactivex.subjects.AsyncSubject;

public class AsyncSubjectExample {
public static void main(String[] args) {
AsyncSubject<String> subject = AsyncSubject.create();

subject.onNext("Event 1");
subject.onNext("Event 2");
subject.onNext("Event 3");

subject.subscribe(data -> System.out.println("Observer 1: " + data));

subject.onNext("Event 4");

subject.subscribe(data -> System.out.println("Observer 2: " + data));

subject.onComplete(); // 這時候才會發射最後一個數據 "Event 4"
}
}

output:

Observer 1: Event 4
Observer 2: Event 4

只會在onComplete()執行後,釋出最後一個數據。

再次總結一下四種類型:

ConnectableObservable

在這個被觀察者,比較特殊的地方是:「冷被觀察者」與「熱被觀察者」

  1. 每個訂閱者都會從頭開始接收數據流(獨立事件流)
  2. 數據的生成合發射都與訂閱者有關

  1. 所有訂閱者共享一個數據流
  2. 數據的生成與發射與訂閱者無關,可以在沒有訂閱者時就開始發射數據

而ConnectableObservable允許將冷的轉為熱的,那在特性上:

  1. ConnactableObservable是熱的Observable,允許多個訂閱者共享一個數據流
  2. 手動啟動數據流:它的數據流需要調用connect()才可以開始發射。

怎麼創建一個ConnectableObservable?

如果要將冷的被觀察者轉成熱的,使用Publish()

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

public class ConnectableObservableExample {
public static void main(String[] args) {
// 創建一個普通的冷 Observable
Observable<Integer> coldObservable = Observable.range(1, 5);

// 將其轉換為 ConnectableObservable
ConnectableObservable<Integer> connectable = coldObservable.publish();

// 訂閱者 1
connectable.subscribe(data -> System.out.println("Observer 1: " + data));

// 訂閱者 2
connectable.subscribe(data -> System.out.println("Observer 2: " + data));

// 開始發射數據
connectable.connect();
}
}

output:

Observer 1: 1
Observer 2: 1
Observer 1: 2
Observer 2: 2
Observer 1: 3
Observer 2: 3
Observer 1: 4
Observer 2: 4
Observer 1: 5
Observer 2: 5

多訂閱者情境:

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

public class ConnectableObservableExample {
public static void main(String[] args) {
ConnectableObservable<Integer> connectable = Observable.range(1, 5).publish();

// 訂閱者 1
connectable.subscribe(data -> System.out.println("Observer 1: " + data));

// 訂閱者 2
connectable.subscribe(data -> System.out.println("Observer 2: " + data));

// 開始發射數據
connectable.connect();

// 訂閱者 3,遲到訂閱
connectable.subscribe(data -> System.out.println("Observer 3: " + data));
}
}

output:

Observer 1: 1
Observer 2: 1
Observer 1: 2
Observer 2: 2
Observer 1: 3
Observer 2: 3
Observer 1: 4
Observer 2: 4
Observer 1: 5
Observer 2: 5

Processor

這個東西厲害了,它主要的特性是處理Flowable這種擁有高量數據的事件流,複合性的擁有Subject的動態發送能力,又有Flowable處理背壓的高頻數據流。

這逼可以稍微表列一下Processor的核心特性:

那他跟Flowable相比,又有哪些更勝一籌的特質呢?

簡單比一比, 就可以知道Processor擁有Subject+Flowable的綜合特性,不過Processor也只是一個種類的稱呼,實際上它還分成四小類型:

以下會簡單的帶過這四種類型的實作:

PublishProcessor

屬性是即時數據流:

import io.reactivex.processors.PublishProcessor;

public class PublishProcessorExample {
public static void main(String[] args) {
PublishProcessor<String> processor = PublishProcessor.create();

processor.subscribe(data -> System.out.println("Subscriber 1: " + data));

processor.onNext("Event 1");
processor.onNext("Event 2");

processor.subscribe(data -> System.out.println("Subscriber 2: " + data));

processor.onNext("Event 3");
}
}

output:

Subscriber 1: Event 1
Subscriber 1: Event 2
Subscriber 1: Event 3
Subscriber 2: Event 3
  1. 不會緩存舊數據
  2. 訂閱的時間點開始接收訂閱之後的數據

BehaviorProcessor

狀態管理,如當前狀態、數據更新

import io.reactivex.processors.BehaviorProcessor;

public class BehaviorProcessorExample {
public static void main(String[] args) {
BehaviorProcessor<String> processor = BehaviorProcessor.create();

processor.onNext("Event 1");
processor.onNext("Event 2");

processor.subscribe(data -> System.out.println("Subscriber 1: " + data));

processor.onNext("Event 3");
}
}

output:

Subscriber 1: Event 2
Subscriber 1: Event 3

會傳出前一個狀態,到下一個狀態。

ReplayProcessor

跟前面的名字一樣,Replay代表他會回放所有的數據,也會緩存所有的數據:

import io.reactivex.processors.ReplayProcessor;

public class ReplayProcessorExample {
public static void main(String[] args) {
ReplayProcessor<String> processor = ReplayProcessor.create();

processor.onNext("Event 1");
processor.onNext("Event 2");

processor.subscribe(data -> System.out.println("Subscriber 1: " + data));

processor.onNext("Event 3");
}
}

output:

Subscriber 1: Event 1
Subscriber 1: Event 2
Subscriber 1: Event 3

嘛,所以要查詢什麼紀錄或聊天訊息的歷史訊息,用這種應該會不錯。

AsyncProcessor

只發射最後一個數據

import io.reactivex.processors.AsyncProcessor;

public class AsyncProcessorExample {
public static void main(String[] args) {
AsyncProcessor<String> processor = AsyncProcessor.create();

processor.onNext("Event 1");
processor.onNext("Event 2");
processor.onNext("Event 3");

processor.subscribe(data -> System.out.println("Subscriber 1: " + data));

processor.onComplete(); // 只有 onComplete() 之後,才會發送最後一個數據
}
}

output:

Subscriber 1: Event 3

只發送最後一個數據 Event 3,並且只有在 onComplete() 之後才會發送

最後再比一下Processor的各種類型:

尾聲

目前應該介紹了大部分類型的數據流了,接下來還是會多少提Rx的使用,但可能會聚焦在一些操作符的熟悉,坦白說,Rx的語法真的不容易,也有一些事件流需要統整使用的方式,不過單從取名字來說,就可以略窺一二使用的方式。

實際使用後應該就會明白Rx的威力了。

Sign up to discover human stories that deepen your understanding of the world.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

影山小麥機
影山小麥機

Written by 影山小麥機

本職為Mobile工程師,熱愛分享視野,也樂意站在ChatGPT的肩膀上。訂閱小麥機,收割技術、職涯、人生的難題。

No responses yet

Write a response