RxJava简单分析

RxJava2源码分析

RxJava是什么

RxJava是一个非常流行的基于观察者模式的响应式编程框架,在Android开发等领域有很广泛的应用。作为Java开发者,我们有必要了解一下RxJava的实现原理,掌握RxJava的设计思想,这样我们才能更加熟练的使用RxJava来编写优秀的代码。

基本

首先我们建立一个常见的Observable,用lamdba快速创建了一些数据

现在层层剥离,可以看到emitter就是定义的数据操作

ObserveableEmitter继承了Emitter

最后我们的数据又包装成ObservableOnSubscribe

可以看到最终创建的是一个ObserveCreate对象,顺便会判空,判空代码很简单就是判null抛NullPointException

ObservableCreate继承了Observable接口,内部持有onSubscribe对象也就是我们的上游数据source

onAssembly是不是很熟悉,拿出你们的iPhone,后背就写着Assembled in China,这里就是对我们的数据源进行组装了(上面其实也组装了特别多),这里有个全局回调函数,全局搜索apply()还不止一个,暂时放一放

那么到这里数据已经包装好了,下面是下游(观察者)的精彩操作

调用onSubscribe开始订阅上游数据,可以看到同样有全局回调函数,和上面一样默认空

上上双重判空略过,我们只关心整个的逻辑,subscribeActual才是我们需要关注的,observer又被包装成CreateEmitter准备发射

这里的onSubscribe就是我们自己写的回调

进入subscribe,我们又回来啦,这里直接调用之前的emitter发送数据

回调下游的onNext,其中isDisposed判断是否要切断数据流

我们上游是有一个onComplete表示切断数据,现在调用了,可以看到他也会判断是否切断,成功回调后,最终也会到dispose这个方法切断数据流

直接把自己传进去,可以看到这个是由DisposableHelper来真正操作的,前面的判断也是如此

判断是否已经处理完,这个函数决定是否回调observable的onXX,否则当调用onComplete时表示切断水管且不再回调onXX,还可以看到这里是一个原子引用,保证并发的安全(666)

complete后面接着一个next,但是此时我们已经切断了数据,所以虽然接收到但是我们也只得抛弃掉

这就完成了一个发射数据-> 接收数据的流程

线程切换

无独有偶,线程也只是变成scheduler包装起来还是那个Observable只不过变成SubscribeOn了

有了上面的经验我们直接看subcribeActual的里一个重载函数

可以看到scheduler里面真正要跑的是worker

把我们的任务包装成disposetask,这是一个runnable


用了线程池来启动任务


进行线程切换如Schedules.io()会用到concurrent并发包的线程池,这里把线程服务放入线程池

最终来到scheduleActual,这里有一系列的操作线程池运行我们的任务,还会包装成future

可以看到这时的dispose操作伴随着线程池的shutdown

线程经过包装,默认优先级是5,设置成守护进程

worker <- task <- thread 就是大致的一个流程

总结

RxJava的核心思想简单理解就差不多了,剩下的应该就很容易去分析了

分享到: