RxJava 2 版本的 Rxbus

基于 RxJava 的 RxBus 作为一种事件总线,相信许多人都了解一些,Square 的 Otto 也因此弃用,因为现在 RxJava 太火了,用它几行代码就可以写出事件总线。不过大家所熟悉的是基于 RxJava 1.x 版本的,2016 年十月底 RxJava 更新到 2.x 版本了,具体变化请看 What’s different in 2.0,下面总结下适合不同场景的 RxJava 2 版本的 RxBus 写法。

  • 没有背压处理(Backpressure)的 Rxbus

  • 有背压处理的 RxBus

  • 有异常处理的 Rxbus (订阅者处理事件出现异常也能继续收到事件)

没有背压处理(Backpressure)的 Rxbus

在 RxJava 2.0 之后,io.reactivex.Observable中没有进行背压处理了,如果有大量消息堆积在总线中来不及处理会产生MissingBackpressureException或者OutOfMemoryError,有新的类io.reactivex.Flowable 专门针对背压问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
public class RxBus {
private final Subject<Object> mBus;
private RxBus() {
// toSerialized method made bus thread safe
mBus = PublishSubject.create().toSerialized();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object obj) {
mBus.onNext(obj);
}
public <T> Observable<T> toObservable(Class<T> tClass) {
return mBus.ofType(tClass);
}
public Observable<Object> toObservable() {
return mBus;
}
public boolean hasObservers() {
return mBus.hasObservers();
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}

有背压处理的 RxBus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import io.reactivex.Flowable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;
public class RxBus {
private final FlowableProcessor<Object> mBus;
private RxBus() {
// toSerialized method made bus thread safe
mBus = PublishProcessor.create().toSerialized();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object obj) {
mBus.onNext(obj);
}
public <T> Flowable<T> toFlowable(Class<T> tClass) {
return mBus.ofType(tClass);
}
public Flowable<Object> toFlowable() {
return mBus;
}
public boolean hasSubscribers() {
return mBus.hasSubscribers();
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}

有异常处理的 Rxbus

上面的两种 RxBus 在订阅者处理事件出现异常后,订阅者无法再收到事件,这是 RxJava 当初本身的设计原则,但是在事件总线中这反而是个问题,不过 JakeWharton 大神写了即使出现异常也不会终止订阅关系的 RxRelay,所以基于 RxRelay 就能写出有异常处理能力的 Rxbus。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;
import io.reactivex.Observable;
public class RxBus {
private final Relay<Object> mBus;
private RxBus() {
// toSerialized method made bus thread safe
mBus = PublishRelay.create().toSerialized();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object obj) {
mBus.accept(obj);
}
public <T> Observable<T> toObservable(Class<T> tClass) {
return mBus.ofType(tClass);
}
public Observable<Object> toObservable() {
return mBus;
}
public boolean hasObservers() {
return mBus.hasObservers();
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}