相关文章推荐

rxjava适用于大量的任务之间没有依赖关系,可以并发执行的情况。并且可以方便的执行哪些操作在哪些线程池中执行。提供了很多变换函数,方便对数据进行操作。
ReactiveX Observable模型允许您使用与数组(如数组)集合相同的简单,可组合操作来处理异步事件流。

很难使用Futures来优化组合条件异步执行流程(或者不可能,因为每个请求的延迟在运行时会有所不同)
另一方面,ReactiveX Observable用于组合异步数据的流和序列。

reactivex 编程思想可以跟 Iterable类比,Iterable是 pull 模式,消费者从Iterable pull 数据。reactive 是 push 模式,当有数据时,主动 push 给消费者。

最核心的两个东西

  1. Observables(被观察者,事件源)
  2. Subscribers(观察者)

回调方法 (onNext, onCompleted, onError)
Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法的一个子集:

onNext(T item)

Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。

onError(Exception ex)

当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。

onComplete

正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。

根据Observable协议的定义,onNext可能会被调用零次或者很多次,最后会有一次onCompleted或onError调用(不会同时),传递数据给onNext通常被称作发射,onCompleted和onError被称作通知。

Window

把Observable的数据分批发送出来。observer接收到的是Observable数据。

Observable.publish( )

控制Observable什么时候开始发射数据

BlockingObservable

获取数据时,如果没有可用数据,会阻塞当前线程。增加了一些可用于阻塞Observable发射的数据的操作符。

blockingforeach

翻译自 doc:会阻塞。不会像 subscribe 一样,异步调用 onerror 和 oncomplete。如果有错误直接抛异常。这里面的代码会在发射线程执行,而不是当前线程。

blockingSubscribe

不能指定执行线程,Observable 负责调用当前线程进行回掉。
Note that calling this method will block the caller thread until the upstream terminates normally or with an error. Therefore, calling this method from special threads such as the Android Main Thread or the Swing Event Dispatch Thread is not recommended.
调用线程会阻塞,直到处理结束或者有错误产生。也就是 Observable 的流程走完后,才会执行调用线程之后的代码。

SubscribeOn

SubscribeOn这个操作符指定的是Observable自身在哪个调度器上执行, 而且跟调用的位置没有关系 。只有第一次调用SubscribeOn指定的线程池才有作用。
如果 Observable 是使用现成的数据来发送(fromIterable,just 等),用户是不能显示的使用 subscribeOn 指定的线程。

observeOn

ObserveOn则是指定Observable自身在哪个调度器上执行,当每次调用了 ObserveOn这个操作符时,之后都会在选择的调度器上进行观察,直到再次调用ObserveOn切换了调度器
调用了ObserveOn之后,Observable执行的发射和通知(包括 map 等)都会在这个线程上。影响范围是直到下次调用了 ObserveOn。

注意:当遇到一个异常时ObserveOn会立即向前传递这个onError终止通知,它不会等待慢速消费的Observable接受任何之前它已经收到但还没有发射的数据项。这可能意味着onError通知会跳到(并吞掉)原始Observable发射的数据项前面.

subscribeOn指定了原始的发送线程,发送数据之后.第一个map是在第一个observeOn指定的调度器SCHEDULER里执行,最后的subscribe里的代码在最后一个observeOn(Schedulers.newThread())指定的线程里执行。

mObservable.subscribeOn(Schedulers.newThread())
				.observeOn(SCHEDULER)
                .map(t -> t + " first map: " + Thread.currentThread().getName()) // 这个在SCHEDULER上执行
                .observeOn(Schedulers.newThread())
                .map(t -> t + " second map: " + Thread.currentThread().getName()) // 这个在Schedulers.newThread的线程上执行
                .subscribe(mObserver);

注册一个动作作为原始Observable生命周期事件的一种占位符。
doOnEach
doOnNext
doFinally:Observable 结束时触发
doOnSubscribe :当有观察者订阅的时候,触发
doOnCompleted:当它产生的Observable正常终止调用onCompleted时会被调用。
doOnError:

订阅者onError

将异常处理交给订阅者来做,Observerable的操作符调用链中一旦有一个抛出了异常,就会直接执行onError()方法。
使用RxJava,Observable对象根本不需要知道如何处理错误!操作符也不需要处理错误状态-一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做。

结合多个 Observable 的发射项,取每个 Observable 的发射项,然后组合后发出一个。
在这里插入图片描述

rxjava 2

RxJava 2.x 最大的改动就是对于 backpressure 的处理 。为此将原来的 Observable 拆分成了新的 Observable 和 Flowable,。

不过此次更新中,出现了两种观察者模式:

  • Observable ( 被观察者 ) / Observer ( 观察者 )
  • Flowable (被观察者)/ Subscriber (观察者)
Schedulers.io()

对于I / O绑定工作,例如阻塞I / O的异步性能,此调度程序由一个将根据需要增长的线程池支持;默认情况下,Schedulers.io()是一个CachedThreadScheduler,它类似于带有线程缓存的新线程调度程序。

meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( ); Schedulers.io( ) by default is a CachedThreadScheduler, which is something like a new thread scheduler with thread caching

Schedulers.computation()

用于计算工作,如事件循环和回调处理;不要将此调度程序用于I / O(改为使用Schedulers.io());默认情况下,线程数等于处理器数。
meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors

  • Emitter:onNext,onError,onComplete方法
  • ObservableEmitter: 实现了Emitter,并绑定Disposable对象,实现订阅和取消订阅功能
  • CreateEmitter:实现ObservableEmitter,包装了Observer。由这个类调用Observer的onNext,onError等方法。
  • ObservableOnSubscribe:实现了ObservableEmitter到Observable的绑定
  • ObservableSource:Observable抽象类的接口
  • Observable:抽象类
  • ObservableCreate:实现类
  • Observer:观察者接口

RxJava 中上游是怎么发送事件的,下游又是怎样接收到的?

Observable持有ObservableOnSubscribe对象,ObservableOnSubscribe持有ObservableEmitter对象,ObservableEmitter持有Observer对象。Observable调用ObservableEmitter的onNext方法时,会由ObservableEmitter通知到Observer。
ObservableEmitter通知给Observer之前会检查null值、订阅关系是否存在等

几篇不错的博客

基本工作原理: https://juejin.im/post/5a521d68f265da3e4e25750e
线程切换实现:https://juejin.im/post/5a6751af6fb9a01cb2571794
https://www.jianshu.com/p/88aa273d37be

我写的rxjava的几个例子:https://github.com/jliang981/commonJava/tree/master/jliang-midware/src/main/java/com/step/jliang/rxjava

rxjava适用于大量的任务之间没有依赖关系,可以并发执行的情况。回调方法 (onNext, onCompleted, onError)Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法的一个子集:onNext(T item)Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现... zip函数允许你传入多个请求,然后合并出另外的结果传出来,普通的用法就不多说了,网上一堆介绍的 然后做项目时有个疑问点,Observable.zip如果传入一个列表,合并列表里的所有请求的时候,请求回来的顺序是未知的,返回回来的数组是否会按传入时的顺序返回回来呢。于是做了以下实验: Integer[] skuSerials={1,2,3,4,5}; ArrayList<Observable<Integer>> requestList=new Array RxJava 在 GitHub 的介绍RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM // 翻译:RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
使用zip方法可以把多个类型不同的数据源Observable合并为一个类型的数据源Observable。 public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) 举个例子: * 把多个Observable合并后,并且把这些Observable的数据进行转换再发射出去。转换之后的数据数目由最短数据长度的那个Observable决定。发射完最终会自动调用观察者的onComplete方法() * 如以下代码: 数据长度为6的observable1和数据长度为4的observable2进行合并转换后,观察者只接收到4个数据 Observable observable1 = Observabl...
1、zip Observable ob1 = Observable.just(1,2,3); Observable ob2 = Observable.just(10,20,30,40); Observable.zip(ob1,ob2, (int1, int2) -> int1+"-"+int2).subscribe(o -> Log.v("TAG", "zip:"+o
zip操作符允许接受多个ObservableSource发射的数据,并能够将他们重新组合并重新发射的一种操作符。常见的使用方式: 首先传入若干个ObservableSource,然后每个ObservableSource都能通过observer.onNext来发射数据,最终在Function接口实现方法中可以获取到这些数据,并做一个重新整合或其他操作,然后返回。比如这里我将获取的值相加然后返回,最终在Observer.onNext中能获取到值123: 看看zip的实现: 将传入的Functi
 
推荐文章