开源库地址:
https://github.com/ReactiveX/RxJava
解读版本:1.1.8
怎么初始化一个观察者?(以下例子以订阅String类型为例子)
我们可以直接使用
Observer
来初始化,
Observer
是一个接口,里面有
onNext
,
onCompleted
,
onError
三个抽象方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
Observer<String> observer=new Observer<String>() { @Override public void onCompleted() { }
@Override public void onError(Throwable e) { }
@Override public void onNext(String s) { } };
|
此外我们可以使用
Observer
的子类
Subscriber
来初始化。
Subscriber
相对于
Observer
增加了
onStart
和
unsubscribe
,事实上,即使你使用的是
Observer
,在内部仍然会被包装为
Subscriber
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
Subscriber<String> subscriber=new Subscriber<String>() { @Override public void onCompleted() { }
@Override public void onError(Throwable e) {
}
@Override public void onNext(String s) {
} };
|
怎么初始化一个被观察者?
初始化被观察者使用
Observable
,在
call
内进行处理事件。可以看出call的参数为
Subscriber
,这也进一步证实了
Observer
会被包装为
Subscriber
。只要被观察者调用call方法,订阅者就可以接受到事件/数据了。
1 2 3 4 5 6 7 8
|
Observable<String> observable=Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); subscriber.onNext("welcome to china"); subscriber.onCompleted(); } });
|
那么问题来了,怎么建立一个订阅关系?
只需被观察者调用
Observable.subscribe(Subscriber)
即可。
于是整个流程是这样的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
Observable<String> observable=Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); subscriber.onNext("welcome to china"); subscriber.onCompleted(); } });
Observer<String> observer=new Observer<String>() { @Override public void onCompleted() { Log.d("JG","onCompleted"); }
@Override public void onError(Throwable e) { Log.d("JG","onError"); }
@Override public void onNext(String s) { Log.d("JG",s); } }; observable.subscribe(observer);
|
运行结果如下。
此外,该库还有非常完善的异常捕获机制,当在处理数据时发生异常,可以自动捕获并回调到
onError
中。将
observable
修改成如下后,进行测试:
1 2 3 4 5 6 7
|
Observable<String> observable=Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { throw new RuntimeException(); } });
|
测试结果如下:
OK,基本用法已经介绍完了,接下来,本篇将深入源码内部一探究竟(除了操作符,下篇会对操作符进行完全解析,如果只对操作符感兴趣敬请期待下篇)。如果你只是刚接触RxJava,请看
给 Android 开发者的 RxJava 详解
这篇文章。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
public abstract class Subscriber<T> implements Observer<T>, Subscription { private static final long NOT_SET = Long.MIN_VALUE; private final SubscriptionList subscriptions; private final Subscriber<?> subscriber; private Producer producer;
private long requested = NOT_SET;
protected Subscriber() { this(null, false); } protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) { this.subscriber = subscriber; this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
public interface Func0<R> extends Function, Callable<R> { @Override R call(); }
public interface Func1<T, R> extends Function { R call(T t); }
public interface Func2<T1, T2, R> extends Function { R call(T1 t1, T2 t2); }
public interface FuncN<R> extends Function { R call(Object... args); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
public interface Action0 extends Action { void call(); }
public interface Action1<T> extends Action { void call(T t); }
public interface Action2<T1, T2> extends Action { void call(T1 t1, T2 t2); }
public interface ActionN extends Action { void call(Object... args); }
|
1 2 3
|
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { }
|
1 2 3
|
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> { }
|
1 2 3
|
public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> { }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); }
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); } public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return create(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
|
1 2 3 4 5 6
|
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler)); }
|
1 2 3 4
|
public abstract void onSuccess(T value);
public abstract void onError(Throwable error);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
Action1<SubjectObserver<T>> onStart = Actions.empty(); Action1<SubjectObserver<T>> onAdded = Actions.empty(); Action1<SubjectObserver<T>> onTerminated = Actions.empty(); @Override public void call(final Subscriber<? super T> child) { SubjectObserver<T> bo = new SubjectObserver<T>(child); addUnsubscriber(child, bo); onStart.call(bo); if (!child.isUnsubscribed()) { if (add(bo) && child.isUnsubscribed()) { remove(bo); } } }
boolean add(SubjectObserver<T> o) { do { State oldState = get(); if (oldState.terminated) { onTerminated.call(o); return false; } State newState = oldState.add(o); if (compareAndSet(oldState, newState)) { onAdded.call(o); return true; } } while (true); }
|
1 2 3 4 5 6 7 8 9
|
AsyncSubject<Object> subject = AsyncSubject.create(); subject.subscribe(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onCompleted();
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) { final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>(); if (hasDefault) { state.setLatest(NotificationLite.instance().next(defaultValue)); } state.onAdded = new Action1<SubjectObserver<T>>() {
@Override public void call(SubjectObserver<T> o) { o.emitFirst(state.getLatest(), state.nl); } }; state.onTerminated = state.onAdded; return new BehaviorSubject<T>(state, state); } @Override public void onNext(T v) { Object last = state.getLatest(); if (last == null || state.active) { Object n = nl.next(v); for (SubjectObserver<T> bo : state.next(n)) { bo.emitNext(n, state.nl); } } } @Override public void onCompleted() { Object last = state.getLatest(); if (last == null || state.active) { Object n = nl.completed(); for (SubjectObserver<T> bo : state.terminate(n)) { bo.emitNext(n, state.nl); } } }
|
1 2 3 4 5 6 7 8 9
|
PublishSubject<Object> subject = PublishSubject.create();
subject.subscribe(observer1); subject.onNext("one"); subject.onNext("two");
subject.subscribe(observer2); subject.onNext("three"); subject.onCompleted();
|
1 2 3 4 5 6 7 8
|
ReplaySubject<Object> subject = ReplaySubject.create(); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onCompleted(); subject.subscribe(observer1); subject.subscribe(observer2);
|