誤入Android開發-Day8:RxJava的事件流(一)

影山小麥機
19 min readJan 28, 2025

今天來聊聊事件流們

前言

這幾篇想把RxJava稍微做比較好的整理,大概可能會分成三篇吧。

基本上,會延續前幾天的RxJava文章去做更細節實作性的延伸,從中嘗試回答幾個問題:

  1. 為什麼要有RxJava?
  2. RxJava解決了什麼問題?
  3. 為什麼這麼設計?

這一陣子在實作應用層的程式設計的時候,經常會碰到RxJava,然後有時候會把它跟協程寫在一起,但這樣做會進到一個誤區是這樣可能會有執行時序混雜的問題,所以我在實作上的結論是,要用Rx,就要實踐到底,用同一種框架管理好事情執行的時間序。

而上述的這幾個問題其實是我在接觸Rx的時候的幾個疑問,為什麼正常的OOP框架沒有辦法處理好Android這個Domain的運作,而需要一個學習曲線更高的框架來程式碼中實作呢?

接下來,我想從這幾個角度切入RxJava技術的實踐。

正文

為什麼要有RxJava?

說真的,我作為一個程式懶人,我平常只是覺得OOP其實他媽就蠻好用的了,多了一個Rx的FRP框架,對我來說有點搞剛,但細查之後,是發現有點意思,

你上網一查或丟給AI回答,你大概會得到這幾個答案:

  1. 優雅的異步操作
  2. 強大的組合操作
  3. 線程切換的便捷
  4. 響應式編程的理念
  5. 方便擴展與維護

這大概會是教科書式上的回答,那實際在開發上確實也是遇到很多這類的問題,比如第一點的優雅的異步操作,傳統的處理方式可能是下面這個樣子:

fetchData(new Callback() {
@Override
public void onSuccess(Data data) {
process(data, new Callback() {
@Override
public void onSuccess(Result result) {
display(result);
}
});
}
});

RxJava的處理方式會是下面這個樣子:

fetchDataObservable()
.flatMap(this::processDataObservable)
.subscribe(this::display);

OK,我相信Rx有點看不懂了,什麼是flatMap,什麼是subscribe?

最簡單的說法就是:

flatMap(this::processDataObservable): 將獲取的每條數據進行處理(例如,對數據進一步計算或查詢相關信息),每次處理返回一個新的「數據流」。
或者說個比較可以意會的概念:forEach

那subscribe呢?就如字面上的意思,「訂閱」。

但訂閱跟程式碼有什麼關係?這就涉及到函數式編程(Functional Programming)的基本概念了:「觀察者模式」

寫個簡單的概念:

Observable<String> observable = Observable.just("Hello", "RxJava");

observable
.subscribe(System.out::println); // Observer 訂閱

上述的Observable就是所謂的「被觀察者」,它會發射「數據流」出來,可以想像一下,原本在OOP的世界裡,都是一個值、一個值的傳輸,Rx的世界裡,是一連串的值噴出來傳出。所以一旦訂閱了這個被觀察者,就會獲取到這個「被觀察者」一連串的提供。

舉個簡單的例子:

Observable.just(1, 2, 3)
.flatMap(num -> Observable.just(num * 2))
.subscribe(System.out::println);

上面的observable放入了1、2、3

這樣print出來會是下面這樣:

2
4
6

這個時候,就可以扯到另外一個模式叫做「迭代器模式」(Iterator Pattern),Observable 發射的數據流本質上是一個迭代器,Observer 或 Subscriber 會按順序接收每一個數據。不過,跟傳統迭代器的概念不一樣的是:你必須訂閱,才會迭代每一個數值給你。

嘛,其實是還有幾個模式啦,比如裝飾器模式(Decorator Pattern),但這個就是動態添加職責(或功能),而不用大幅修改結構:

比如:

Observable.just(1, 2, 3)
.map(num -> num * 2) // 使用裝飾器擴展功能
.filter(num -> num > 3)
.subscribe(System.out::println);

大概講到這裡,應該就會對RxJava,有一定的「形狀」上的認識。再想想其前面那個Callback地獄,你會發現這個設計模式真有它存在的理由。但Rx說真的有他的實踐的陡峭度,接下來我們可以稍微再研究一下它的「流」的種類

我們來聊聊一些Rx的「為什麼這樣設計」。

流的種類:大概可以分成七種,不過我這邊想要講的大概就是基礎款,比較簡單的邏輯:所以大概會是五種:「Observable、Flowable、Single、Maybe、Completable」。

所以在下一個篇章,會是進階版的「流」

想想看喔,把OOP世界裡面的「值類別」全部串起來,想像成他們是一連串的流動,是什麼樣的感覺。

如同佔了篇幅將近一半的Observable,就是最常見的「被觀察者」,也就是我接下來要講的第一種:

1. Observable

典型的被觀察者,它可以做的事情如下:

發射事件
onNext()

發射錯誤
onError()

完成通知
onCompletable()

如何創建一個被觀察者?

你可以用just

Observable<String> observable = Observable.just("Hello", "RxJava!");

observable.subscribe(
item -> System.out.println("Received: " + item), // onNext
throwable -> System.out.println("Error: " + throwable), // onError
() -> System.out.println("Completed!") // onComplete
);

這樣他會依序發出:

Received: Hello
Received: RxJava!
Completed!

然後如果你想要一筆一筆的發射,可以用create()

Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("Event 1");
emitter.onNext("Event 2");
emitter.onComplete(); // 事件流完成
});

observable.subscribe(System.out::println);

Output會是下面這個樣子:

Event 1
Event 2

講講使用Observable使用上的優缺點:

  1. 靈活
  2. 操作符豐富
  3. 線程控制

至於缺點,大概就是兩個:1. 不支持背壓 2.複雜度較高

嘛,如果數據量大或速度過快的時候,使用另一種流Flowable,會是更好的選擇,也就是支持背壓,這樣也就不會有內存的問題

如果只是面對簡單場景,Observable也相較之下使用複雜。

Observable還有很多的操作符可以用,這邊就先做簡單的流的使用介紹,往後,篇章會再詳述。

2. Flowable

上面既然有些許提到Flowable,尤其是提到背壓,那就不得不說說這個事件流的特性:

它的使用情境大部分如下面兩種:

  1. 數據量大且產生速度快
  2. 需要控制數據流數

我可以想像到的可能是像是一連串的血糖值這種連續生理數據之類的值。也因為這個流的特殊屬性是處理量大的值流動,所以產生了背壓(Backpressure)的特性。

這在Flowable中,是一種處理數據的策略,尤其是當數據可能存在內存中導致溢出,這時候就需要背壓策略,這個我們等等會來聊。

首先,我們還是看一下Flowable寫起來是怎麼樣:

import io.reactivex.Flowable;
import io.reactivex.BackpressureStrategy;

Flowable<Integer> flowable = Flowable.create(emitter -> {
for (int i = 0; i < 1000; i++) {
if (emitter.isCancelled()) {
return; // 检查订阅是否被取消
}
emitter.onNext(i); // 发射数据
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER); // 指定背压策略

其實型態差不多,也是用Create進行值的發射,但因為數量多,所以選用Flowable,但差別是背壓策略:

幾種策略可以稍微寫一下:

Buffer


// BUFFER
// 行為:緩存所有未處理的數據,直到下游能夠處理為止。
// 優點:不會丟失數據。
// 缺點:如果數據發射速度遠遠超過處理速度,可能導致內存耗盡。
// 適用場景:下游需要完整處理所有數據,且數據量可控或內存足夠大。

Flowable.create(emitter -> {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i); // 發射數據
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER)
.subscribe(System.out::println);

Drop

// DROP
// 行為:丟棄下游無法處理的數據,只保留下游能及時處理的部分。
// 優點:防止內存占用過多,適合不要求數據完整性的場景。
// 缺點:數據會丟失。
// 適用場景:允許丟失數據的情況,例如即時監控或日誌流。

Flowable.create(emitter -> {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i); // 發射數據
}
emitter.onComplete();
}, BackpressureStrategy.DROP)
.subscribe(System.out::println);

Latest

// LATEST
// 行為:丟棄積壓的舊數據,只保留最新的數據,確保下游能消費最新的狀態。
// 優點:只需保留最新狀態,適合對過期數據不敏感的場景。
// 缺點:丟失舊數據,可能導致完整性問題。
// 適用場景:只關注最新狀態的場景,例如實時數據更新、傳感器數據流。

Flowable.create(emitter -> {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i); // 發射數據
}
emitter.onComplete();
}, BackpressureStrategy.LATEST)
.subscribe(System.out::println);

Missing

// Missing
// 行為:不提供內建的背壓機制,當下游無法處理數據時會直接拋出 MissingBackpressureException。
// 優點:讓開發者明確知道數據流中的背壓問題,並提供自定義的處理方式。
// 缺點:需要手動處理數據流量,可能增加代碼複雜性。
// 適用場景:需要開發者完全控制數據流的場景。

Flowable.create(emitter -> {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i); // 發射數據
}
emitter.onComplete();
}, BackpressureStrategy.MISSING)
.subscribe(System.out::println,
throwable -> System.out.println("Error: " + throwable));

ERROR

行為:當下游處理不及時時,直接拋出 MissingBackpressureException。

優點:便於發現和調試背壓問題。

缺點:一旦出現背壓,數據流會中斷。

適用場景:希望明確處理背壓問題,快速發現系統瓶頸。

3. Single

接著談談Single,如其名,就是一次吐一個東西給你:

它只會發射一個事件或一個錯誤,也不會發射完成事件,因為就一個值,發射完就完成了。

成功,就是onSuccess()

失敗,就是onError()

那應該怎麼用呢?

Single<String> single = Single.create(emitter -> {
try {
// 模擬計算
String result = "Result from computation";
emitter.onSuccess(result); // 發射成功事件
} catch (Exception e) {
emitter.onError(e); // 發射錯誤事件
}
});

single.subscribe(
value -> System.out.println("Success: " + value), // onSuccess
error -> System.out.println("Error: " + error) // onError
);

加上幾個操作符來用用:

// map()
Single<Integer> single = Single.just(10)
.map(value -> value * 2); // 將數據乘以 2

single.subscribe(System.out::println);


// flap()
Single<String> single = Single.just("Hello")
.flatMap(value -> Single.just(value + " RxJava"));

single.subscribe(System.out::println);

// zip()
Single<Integer> single1 = Single.just(5);
Single<Integer> single2 = Single.just(10);

Single.zip(single1, single2, (a, b) -> a + b)
.subscribe(System.out::println);

那Single用在什麼情境適合呢?可以說就是API的響應了,RestfulAPI的串接,或者像是從本地數據庫拿資料也是個好的方式。

4. Maybe

使用Maybe的時候,可以想著:「可能成功也可能完成,也可能失敗」,比起Single不是成功就是失敗,Maybe曖昧一點。

至於可能發射的事件也就是三種:

onSuccess()

onError()

onComplete()

使用場景上,可能可以如下面三者:

  1. 數據庫查詢:查詢某條數據是否存在,返回數據或表示不存在。

2. 用戶操作:檢查某條用戶輸入是否有效,返回結果或表示無輸入。

3. 條件判斷:根據條件返回結果或直接完成。

至於實際使用呢,下面給個範例:

Maybe<String> maybe = Maybe.just("Hello, Maybe!");

maybe.subscribe(
value -> System.out.println("Success: " + value), // onSuccess
throwable -> System.out.println("Error: " + throwable), // onError
() -> System.out.println("Completed!") // onComplete
);

// output
Success: Hello, Maybe!

如果要創建一個不發射數據,那就直接用empty:

Maybe<String> maybe = Maybe.empty();

maybe.subscribe(
value -> System.out.println("Success: " + value), // onSuccess
throwable -> System.out.println("Error: " + throwable), // onError
() -> System.out.println("Completed!") // onComplete
);

其餘創建數據發射,跟上面幾種數據流都差不多,用create:

Maybe<Integer> maybe = Maybe.create(emitter -> {
int value = 5;
if (value > 0) {
emitter.onSuccess(value); // 發射數據
} else {
emitter.onComplete(); // 發射完成事件
}
});

maybe.subscribe(
value -> System.out.println("Success: " + value),
throwable -> System.out.println("Error: " + throwable),
() -> System.out.println("Completed!")
);

如果想直接發射錯誤事件,那就用error():

Maybe<String> maybe = Maybe.error(new RuntimeException("Something went wrong"));

maybe.subscribe(
value -> System.out.println("Success: " + value),
throwable -> System.out.println("Error: " + throwable),
() -> System.out.println("Completed!")
);

Maybe也可以用doOnSuccess或doOnComplete:

Maybe<String> maybe = Maybe.just("Hello")
.doOnSuccess(value -> System.out.println("Logging success: " + value))
.doOnComplete(() -> System.out.println("Logging completed"));

maybe.subscribe(System.out::println);

接下來套個使用場景來實作:

用於查詢數據庫中的單條數據,可能返回數據,也可能查無數據。

Maybe<String> queryResult = Maybe.create(emitter -> {
// 模擬數據庫查詢
String result = null; // 模擬查不到數據
if (result != null) {
emitter.onSuccess(result);
} else {
emitter.onComplete();
}
});

queryResult.subscribe(
value -> System.out.println("Result: " + value),
throwable -> System.out.println("Error: " + throwable),
() -> System.out.println("No data found!")
);

5. Completable

看字根大概就可以知道跟「完成」有關,所以事件的完成與否就是關鍵的邏輯,重點也大概如下:

1. 只關注完成與失敗

onComplete():表示任務成功完成。

onError(Throwable):表示任務失敗並發射錯誤。

2. 不發射數據

與 Observable、Single 不同,Completable 不會發射任何數據。

3. 適用場景

當你只需要執行一個任務,而不需要關心數據結果時,Completable 是最合適的選擇,例如 文件寫入、數據庫插入、網絡請求的執行狀態 等。

實際使用也頗簡單:

Completable completable = Completable.complete();

completable.subscribe(
() -> System.out.println("Task completed!"), // onComplete
throwable -> System.out.println("Error: " + throwable) // onError
);

不過確實是有一些騷操作,比如說常用操作符可以這樣做:

用fromAction

Completable completable = Completable.fromAction(() -> {
// 模擬任務執行
System.out.println("Executing an action...");
});

completable.subscribe(
() -> System.out.println("Task completed!"),
throwable -> System.out.println("Error: " + throwable)
);

或用andThen來連接另一個流,讓一個Completable完成後再執行另一個Completable:

Completable completable = Completable.fromAction(() -> {
System.out.println("Performing initial task...");
});

completable
.andThen(Single.just("Next step"))
.subscribe(
result -> System.out.println("Received: " + result),
throwable -> System.out.println("Error: " + throwable)
);

所以大概看了這些舉例,大概心裡有個底知道Completable會用在那些情景了,比如文件寫入、數據庫插入、執行任務…etc

完成後就從onComplete吐完成的訊息給你。

尾聲

以上是Rx比較簡單易懂的流的使用方式,下一期還是會講流,但是會稍微複雜些。

RxJava熟悉使用會有非常好的效益,尤其是把Rx坐在SDK層,徹底分離應用層與SDK的產品時,是一件非常威力強大的做法,你只需要調用被觀察者的接口,在SDK有明確的input與output,就可以免得數值的管理亂套。

但也有人批評:同樣的事情我用Coroutine就可以做了,為什麼一定要Rx?
對於這個問題,我的想法是:如果單純是接案的案子,不是一個大系統的產品,那麼用Coroution就行了,但如果是個大產品,會涉及到很多CallBack,那還是建議Rx吧。

各位新年快樂,咱們下集見!

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

影山小麥機
影山小麥機

Written by 影山小麥機

本職為Mobile工程師,熱愛分享視野,也樂意站在ChatGPT的肩膀上。訂閱小麥機,收割技術、職涯、人生的難題。

No responses yet

Write a response