2023年7月24日发(作者:)
Rxjava3⽂档级教程⼀:介绍和基本使⽤商业转载请联系作者获得授权,⾮商业转载请注明出处。⽬录⼀ Rxjava3简介ReactiveX的历史ReactiveX是Reactive Extensions的缩写,⼀般简写为Rx,最初是LINQ的⼀个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11⽉开源,Rx是⼀个编程模型,⽬标是提供⼀致的编程接⼝,帮助开发者更⽅便的处理异步数据流,Rx近⼏年越来越流⾏了,现在已经⽀持⼏乎全部的流⾏编程语⾔了,Rx的⼤部分语⾔库由ReactiveX这个组织负责维护,⽐较流⾏的有RxJava/RxJS/,社区⽹站是 。什么是ReactiveX微软给的定义是,Rx是⼀个函数库,让开发者可以利⽤可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使⽤Rx,开发者可以⽤Observables表⽰异步数据流,⽤LINQ操作符查询异步数据流, ⽤Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。给的定义是,Rx是⼀个使⽤可观察数据流进⾏异步编程的编程接⼝,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。Observable拥有它的近亲Iterable的全部优雅与灵活。任何对Iterable的操作,你都可以对Observable使⽤。RxJavaRxJava是响应式编程(Reactive Extensions)的java实现,它基于观察者模式的实现了异步编程接⼝。Rxjava 3.x 的;RxJava2将被⽀持到2021年2⽉28⽇,错误的会同时在2.x和3.x修复,但新功能只会在3.x上添加;Rxjava 3.0的⼀些改变:;Rxjava 3.x ⽂档可以在中找到;使⽤Rxjava3.x之前的准备⼯作:添加依赖 //RxJava的依赖包 implementation '3:rxandroid:3.0.0' //RxAndroid的依赖包
implementation '3:rxjava:3.0.0'RxJava 在很长⼀段时间⾥以java6 为baseline( Android 运⾏时⽀持的锅),但在即将到来的 Android Studio 4预览中,⼀个叫做desuging 的过程能够将许多 Java 7和8的特性,透明地转换成与 Java 6兼容的特性。因此我们可以将 RxJava 的基准提⾼到 java 8,并为许多 Java 8构造增加官⽅⽀持⽐如:Optional、Stream等,因此必须将项⽬的编译⽬标设置更改为 java8:android { compileOptions { sourceCompatibility N_1_8 targetCompatibility N_1_8 }}⼆ Rx中的⼀些概念2.1 字段含义Reactive 直译为反应性的,有活性的,根据上下⽂⼀般翻译为反应式、响应式Iterable 可迭代对象,⽀持以迭代器的形式遍历,许多语⾔中都存在这个概念Observable 可观察对象,在Rx中定义为更强⼤的Iterable,在观察者模式中是被观察的对象,⼀旦数据产⽣或发⽣变化,会通过某种⽅式通知观察者或订阅者Observer 观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的⼀个特殊实现emit 直译为发射,发布,发出,含义是Observable在数据产⽣或变化时发送通知给Observer,调⽤Observer对应的⽅法,⽂章⾥⼀律译为发射items 直译为项⽬,条⽬,在Rx⾥是指Observable发射的数据项,⽂章⾥⼀律译为数据,数据项。2.2 上/下流在RxJava中,数据以流的⽅式组织:Rxjava包括⼀个源数据流,源数据流后跟着若⼲个⽤于消费数据流的步骤。source .operator1() .operator2() .operator3() .subscribe(consumer)在代码中,对于operator2来说,在它前⾯叫做上流,在它后⾯的叫做下流。2.3 流对象在RxJava的⽂档中,emission, emits, item, event, signal, data and message都被认为在数据流中被传递的数据对象。2.4 背压(Backpressure)当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,⽽是存放在⼀个异步缓存池中,如果缓存池中的数据⼀直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题。为此,RxJava带来了backpressure的概念。背压是⼀种流量的控制步骤,在不知道上流还有多少数据的情形下控制内存的使⽤,表⽰它们还能处理多少数据。背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,⼀种告诉上游的被观察者降低发送速度的策略在Rxjava1.0中,有的Observable⽀持背压,有的不⽀持,为了解决这种问题,2.0把⽀持背压和不⽀持背压的Observable区分开来:⽀持背压的有Flowable类,不⽀持背压的有Observable,Single, Maybe and Completable类。1. 在订阅的时候如果使⽤FlowableSubscriber,那么需要通过t(_VALUE)去主动请求上游的数据项。如果遇到背压报错的时候,FlowableSubscriber默认已经将错误try-catch,并通过onError()进⾏回调,程序并不会崩溃;2. 在订阅的时候如果使⽤Consumer,那么不需要主动去请求上游数据,默认已经调⽤了t(_VALUE)。如果遇到背压报错、且对Throwable的Consumer没有new出来,则程序直接崩溃;3. 背压策略的上游的默认缓存池是128。背压策略:1. error, 缓冲区⼤概在1282. buffer, 缓冲区在1000左右3. drop, 把存不下的事件丢弃4. latest, 只保留最新的5. missing, 缺省设置,不做任何操作public enum BackpressureStrategy { /** * OnNext events are written without any buffering or dropping. * Downstream has to deal with any overflow. *
Useful when one applies one of the custom-parameter onBackpressureXXX operators. */ MISSING, /** * Signals a MissingBackpressureException in case the downstream can't keep up. */ ERROR, /** * Buffers all onNext values until the downstream consumes it. */ BUFFER, /** * Drops the most recent onNext value if the downstream can't keep up. */ DROP, /** * Keeps only the latest onNext value, overwriting any previous value if the * downstream can't keep up. */ LATEST}2.5 线程调度器(Schedulers)对于Android开发者⽽⾔,RxJava最简单的是通过调度器来⽅便地切换线程。在不同平台还有不同的调度器,例如我们Android的主线程:read()。调度器read()ation()()ead()功能需要引⽤rxandroid, 切换到UI线程⽤于计算任务,如事件循环和回调处理,默认线程数等于处理器数量⽤于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需求,它默认是⼀个CacheThreadScheduler为每⼀个任务创建⼀个新线程在当前线程中⽴刻执⾏,如当前线程中有任务在执⾏则将其暂停, 等插⼊进来的任务执⾏完成之后,在将未完成的任务继续完成。指定Executor作为调度器line()(executor)2.6 事件调度器RxJava事件发出去并不是置之不顾,要有合理的管理者来管理它们,在合适的时机要进⾏释放事件,这样才不会导致内存泄漏,这⾥的管理者我们称为事件调度器(或事件管理者)CompositeDisposable。2.7 基类RxJava 3 中的基类相⽐RxJava 2 没啥改变,主要有以下⼏个基类:le:发送0个N个的数据,⽀持Reactive-Streams和背压able:发送0个N个的数据,不⽀持背压,:只能发送单个数据或者⼀个错误table:没有发送任何数据,但只处理 onComplete 和 onError 事件。:能够发射0或者1个数据,要么成功,要么失败。2.8 Observables的"热"和"冷"Observable什么时候开始发射数据序列?这取决于Observable的实现,⼀个"热"的Observable可能⼀创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有⼀些数据错过了)。⼀个"冷"的Observable会⼀直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。在⼀些ReactiveX实现⾥,还存在⼀种被称作Connectable的Observable,不管有没有观察者订阅它,这种Observable都不会开始发射数据,除⾮Connect⽅法被调⽤。三 RxJava的简单使⽤需要知道的是,RxJava以观察者模式为⾻架,有两种常见的观察者模式:Observable(被观察者)/Observer(观察者)Flowable(被观察者)/Subscriber(观察者)RxJava2/3中,Observeable⽤于订阅Observer,是不⽀持背压的,⽽Flowable⽤于订阅Subscriber,是⽀持背压(Backpressure)的。3.1 Observable/ObserverObservable正常⽤法: Observable mObservable=(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { (1); (2); lete(); } }); Observer mObserver=new Observer() { //这是新加⼊的⽅法,在订阅后发送数据之前, //回⾸先调⽤这个⽅法,⽽Disposable可⽤于取消订阅 @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.e("lucas", "onNext: "+value ); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; ibe(mObserver);这种观察者模型不⽀持背压:当被观察者快速发送⼤量数据时,下游不会做其他处理,即使数据⼤量堆积,调⽤链也不会报MissingBackpressureException,消耗内存过⼤只会OOM。所以,当我们使⽤Observable/Observer的时候,我们需要考虑的是,数据量是不是很⼤(官⽅给出以1000个事件为分界线作为参考)。3.2 Flowable/Subscriber (0,10) .subscribe(new Subscriber() { Subscription sub; //当订阅后,会⾸先调⽤这个⽅法,其实就相当于onStart(), //传⼊的Subscription s参数可以⽤于请求数据或者取消订阅 @Override public void onSubscribe(Subscription s) { Log.w("TAG","onsubscribe start"); sub=s; t(1); Log.w("TAG","onsubscribe end"); } @Override public void onNext(Integer o) { Log.w("TAG","onNext--->"+o); t(1); } @Override public void onError(Throwable t) { tackTrace(); } @Override public void onComplete() { Log.w("TAG","onComplete"); } });输出如下:onsubscribe startonNext--->0onNext--->Next--->9onCompleteonsubscribe endFlowable是⽀持背压的,也就是说,⼀般⽽⾔,上游的被观察者会响应下游观察者的数据请求,下游调⽤request(n)来告诉上游发送多少个数据。这样避免了⼤量数据堆积在调⽤链上,使内存⼀直处于较低⽔平。当然,Flowable也可以通过creat()来创建: (new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter e) throws Exception { (1); (2); (3); (4); lete(); } } //需要指定背压策略 , );Flowable虽然可以通过create()来创建,但是你必须指定背压的策略,以保证你创建的Flowable是⽀持背压的。根据上⾯的代码的结果输出中可以看到,当我们调⽤t(n)⽅法的时候,不等onSubscribe()中后⾯的代码执⾏,就会⽴刻执⾏到onNext⽅法,因此,如果你在onNext⽅法中使⽤到需要初始化的类时,应当尽量在t(n)这个⽅法调⽤之前做好初始化的⼯作;当然,这也不是绝对的,我在测试的时候发现,通过create()⾃定义Flowable的时候,即使调⽤了t(n)⽅法,也会等onSubscribe()⽅法中后⾯的代码都执⾏完之后,才开始调⽤onNext。TIPS: 尽可能确保在request()之前已经完成了所有的初始化⼯作,否则就有空指针的风险。3.3 其他的观察者最常⽤的其实就是上⾯说的两种订阅观察者,但是⼀些情况下,我们也会⽤到⼀些其他的⼀类观察者⽐如Single/SingleObserverCompletable/CompletableObserverMaybe/MaybeObserver3.3.1 Single/SingleObserverSingle类似于Observable,不同的是,它总是只发射⼀个值,或者⼀个错误通知,⽽不是发射⼀系列的值(当然就不存在背压问题),所以当你使⽤⼀个单⼀连续事件流,这样你可以使⽤Single。Single观察者只包含两个事件,⼀个是正常处理成功的onSuccess,另⼀个是处理失败的onError。因此,不同于Observable需要三个⽅法onNext, onError, onCompleted,订阅Single只需要两个⽅法:onSuccess - Single发射单个的值到这个⽅法onError - 如果⽆法发射需要的值,Single发射⼀个Throwable对象到这个⽅法Single只会调⽤这两个⽅法中的⼀个,⽽且只会调⽤⼀次,调⽤了任何⼀个⽅法之后,订阅关系终⽌。Single的操作符:Single也可以组合使⽤多种操作,⼀些操作符让你可以混合使⽤Observable和Single:操作符composeconcat andconcatWithcreateerrorflatMapflatMapObservablefromjustmap返回值Single说明创建⼀个⾃定义的操作符Observable连接多个Single和Observable发射的数据SingleSingleSingleObservableSingleSingleSingle调⽤观察者的create⽅法创建⼀个Single返回⼀个⽴即给订阅者发射错误通知的Single返回⼀个Single,它发射对原Single的数据执⾏flatMap操作后的结果返回⼀个Observable,它发射对原Single的数据执⾏flatMap操作后的结果将Future转换成Single返回⼀个发射⼀个指定值的Single返回⼀个Single,它发射对原Single的数据执⾏map操作后的结果将⼀个Single(它发射的数据是另⼀个Single,假设为B)转换成另⼀个Single(它发射来⾃另⼀个Single(B)的数据)合并发射来⾃多个Single的数据指⽰Single在指定的调度程序上调⽤订阅者的⽅法将⼀个发射错误通知的Single转换成⼀个发射指定数据项的SinglemergeSinglemerge and mergeWithobserveOnonErrorReturnObservableSingleSinglesubscribeOn操作符timeouttoSinglezip and zipWithSingle返回值SingleSingleSingle指⽰Single在指定的调度程序上执⾏操作说明它给原有的Single添加超时控制,如果超时了就发射⼀个错误通知将⼀个发射单个值的Observable转换为⼀个Single将多个Single转换为⼀个,后者发射的数据是对前者应⽤⼀个函数后的结果操作符详细的图解可以参考英⽂⽂档: //被观察者 Single single = (new SingleOnSubscribe() { @Override public void subscribe(SingleEmitter e) throws Exception { ess("test"); ess("test2");//错误写法,重复调⽤也不会处理,因为只会调⽤⼀次 } }); //订阅观察者SingleObserver ibe(new SingleObserver() { @Override public void onSubscribe(Disposable d) { } @Override public void onSuccess(String s) { //相当于onNext和onComplete Log.d("lucas", s ); } @Override public void onError(Throwable e) { } });//运⾏结果2020-04-03 23:02:37.337 15462-15462/ication D/lucas: test3.3.2 Completable/CompletableObserver如果你的观察者连onNext事件都不关⼼,可以使⽤Completable,它只有onComplete和onError两个事件: (new CompletableOnSubscribe() {//被观察者 @Override public void subscribe(CompletableEmitter e) throws Exception { lete();//单⼀onComplete或者onError } }).subscribe(new CompletableObserver() {//观察者 @Override public void onSubscribe(Disposable d) { } @Override public void onComplete() { Log.e("lucas", "onComplete: "); } @Override public void onError(Throwable e) { } });//打印结果2020-04-03 23:12:08.099 16264-16264/ication E/lucas: onComplete:
要转换成其他类型的被观察者,也是可以使⽤toFlowable()、toObservable()等⽅法去转换。3.3.3 Maybe/MaybeObserver如果你有⼀个需求是可能发送⼀个数据或者不会发送任何数据,这时候你就需要Maybe,它类似于Single和Completable的混合体。 Maybe可能会调⽤以下其中⼀种情况(也就是所谓的Maybe):onSuccess或者onErroronComplete或者onError可以看到onSuccess和onComplete是互斥的存在,例⼦代码如下: //被观察者 Maybe maybe = (new MaybeOnSubscribe() { @Override public void subscribe(MaybeEmitter e) throws Exception { ess("test");//发送⼀个数据的情况,或者onError,不需要再调⽤onComplete(调⽤了也不会触发onComplete回调⽅法) //lete();//不需要发送数据的情况,或者onError } }); //订阅观察者 ibe(new MaybeObserver() { @Override public void onSubscribe(Disposable d) { } @Override public void onSuccess(String s) { //发送⼀个数据时,相当于onNext和onComplete,但不会触发另⼀个⽅法onComplete Log.i("lucas", s); } @Override public void onComplete() { //⽆数据发送时候的onComplete事件 Log.i("lucas", "onComplete"); } @Override public void onError(Throwable e) { } });//打印结果2020-04-03 23:14:40.266 16558-16558/ication I/lucas: test要转换成其他类型的被观察者,也是可以使⽤toFlowable()、toObservable()等⽅法去转换。//判断是否登陆(isLogin()) //可能涉及到IO操作,放在⼦线程 .subscribeOn(ead()) //取回结果传到主线程 .observeOn(read()) .subscribe(new MaybeObserver() { @Override public void onSubscribe(Disposable d) { } @Override public void onSuccess(Boolean value) { if(value){ ... }else{ ... } } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });复制代码上⾯就是Maybe/MaybeObserver的普通⽤法,你可以看到,实际上,这种观察者模式并不⽤于发送⼤量数据,⽽是发送单个数据,也就是说,当你只想要某个事件的结果(true or false)的时候,你可以⽤这种观察者模式3.4 事件调度器释放事件public class Main { private static CompositeDisposable mRxEvent = new CompositeDisposable(); public static void main(String[] args) { Disposable subscribe = (new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { ("俊俊俊很帅"); ("你值得拥有"); ("取消关注"); ("但还是要保持微笑"); lete(); } }).subscribe( new Consumer() { @Override public void accept(@NonNull String s) throws Exception { //对应onNext() n("accept=" + s); } }, new Consumer() { @Override public void accept(@NonNull Throwable throwable) throws Exception { //对应onError() } }, new Action() { @Override public void run() throws Exception { //对应onComplete() } }, new Consumer() { @Override public void accept(@NonNull Disposable disposable) throws Exception { //对应onSubscribe() } });
(subscribe); (); }}CompositeDisposable提供的⽅法中,都是对事件的管理dispose():释放所有事件clear():释放所有事件,实现同dispose()add():增加某个事件addAll():增加所有事件remove():移除某个事件并释放delete():移除某个事件3.5 ⼩结这是上⾯那些基类被观察者的上层接⼝://Observable接⼝interface ObservableSource { void subscribe(Observer super T> observer);}//Single接⼝interface SingleSource { void subscribe(SingleObserver super T> observer);}//Completable接⼝interface CompletableSource { void subscribe(CompletableObserver observer);}//Maybe接⼝interface MaybeSource { void subscribe(MaybeObserver super T> observer);}//Flowable接⼝public interface Publisher { public void subscribe(Subscriber super T> s);}其实我们可以看到,每⼀种观察者都继承⾃各⾃的接⼝,这也就把他们能完全的区分开,各⾃独⽴(特别是Observable和Flowable),保证了他们各⾃的拓展或者配套的操作符不会相互影响。例如flatMap操作符实现://Flowable中flatMap的定义Flowable flatMap(Function super T, ? extends Publisher extends R>> mapper);//Observable中flatMap的定义Observable flatMap(Function super T, ? extends ObservableSource extends R>> mapper);假如你想为Flowable写⼀个⾃定义的操作符,那么只要保证Function< Publisher >中的类型实现了Publisher接⼝即可。这么说可能很抽象,⼤家不理解其实也没关系,因为并不推荐⼤家⾃定义操作符,RxJava中的操纵符的组合已经可以满⾜⼤家的需求了。当然,你也会注意到上⾯那些接⼝中的subscribe()⽅法的返回类型为void了,在1.X中,这个⽅法⼀般会返回⼀个Subscription对象,⽤于取消订阅。现在,这个功能的对象已经被放到观察者Observer或者subscriber的内部实现⽅法中了,Flowable/Subscriberpublic interface Subscriber {
public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete();}public interface Subscription { public void request(long n); public void cancel();}复制代码上⾯的实例中,onSubscribe(Subscription s)传⼊的参数s就肩负着取消订阅的功能,当然,他也可以⽤于请求上游的数据。在Observable/observer中,传⼊的参数是另⼀个对象Observable/Observerpublic interface Observer { void onSubscribe(Disposable d); void onNext(T value); void onError(Throwable e); void onComplete();}public interface Disposable { /** * Dispose the resource, the operation should be idempotent. */ void dispose(); /** * Returns true if this resource has been disposed. * @return true if this resource has been disposed */ boolean isDisposed();}复制代码在Observer接⼝中,onSubscribe(Disposable d)⽅法传⼊的Disposable也是⽤于取消订阅,基本功能是差不多的,只不过命名不⼀致,⼤家知道就好。其实这种设计可以说还是符合逻辑的,因为取消订阅这个动作就只有观察者(Observer等)才能做的,现在把它并⼊到观察者内部,也算顺理成章吧。Rxjava中,被观察者不能接收null作为数据源。补充 1 Rxjava 3.x 主要更新内容如下API changes:将eagerTruncate添加到replay运算符,以便head节点将在截断时丢失它保留的项引⽤ ()新增 pplier() ()使⽤ Scheduler 添加 concatMap,保证 mapper 函数的运⾏位置 ()新增 startWithItem 和 startWithIterable ()ConnectableFlowable/ConnetableFlowable 重新设计 ()将 as() 并⼊ to() ()更改 tIfEmpty() 以返回 Single ()⽤ Supplier 代替 Callable ()将⼀些实验操作符推⼴到标准 ()从某些主题/处理器中删除 getValues() ()删除 replay(Scheduler) 及其重载 ()删除 dematerialize() ()删除 startWith(T|Iterable) ()删除 as() ()删除 le(T) ()删除 ibe(4 args) ()删除 ibe(4 args) ()删除 letable() ()删除 ngGet() ()参考⽂章:因为写RxJava系列的⽂章时进⾏了很多阅读和参考,因此不分⼀⼆三等,将全系列的参考引⽤统⼀如下:RxJava3官⽅github:ReactiveX⽂档中⽂翻译:single:操作符系列讲的很好的⽂章:基础介绍:
发布者:admin,转转请注明出处:http://www.yc00.com/news/1690213550a315810.html
评论列表(0条)