RxJava2.0---Observable(被观察者)Observer(观察者)

RxJava2.0---Observable(被观察者)Observer(观察者)

2023年7月25日发(作者:)

RxJava2.0----Observable(被观察者)Observer(观察者)⼀ 引⾔RxJava2.0中出现了两种观察者模式:● Observable(被观察者)/Observer(观察者)● Flowable(被观察者)/Subscriber(观察者)其中Observable和Flowable可以相互切换,但是观察者却不能共⽤。除了这两种,还有4种:● Single/SingleObserver● Completable/CompletableObserver● Maybe/MaybeObserve● Subject/able的创建Observable的创建是通过操作函数,例如:create(),defer(),just(),from(),rang(),timer(),interval()等⽅法来创建。()创建Observable最基本的创建⽅式。 传⼊了⼀个 ObservableOnSubscribe对象作为参数,它的作⽤相当于⼀个计划表,当 Observable被订阅的时候,ObservableOnSubscribe的subscribe()⽅法会⾃动被调⽤,数据发射源ObservableEmitter对事件依照设定依次发射create使⽤实例://数据发射源对象ObservableEmitter observableEmitter ;private void sendData(){if(observableEmitter != null){ for(int i = 0;i < 10; i++){ //数据发射源对象发射数据 observableEmitter .onNext("Observable 数据data " +i); } observableEmitter .onComplete();}} private ObservableOnSubscribe observableOnSubscribe = new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { printThread("subscribe"); observableEmitter = e;

observableEmitter .onNext("Observable 数据data"); sendData(); } };Observable observable = (observableOnSubscribe )()● Just将单个数据转换为发射那个数据的Observable,创建⼀个发射指定值的Observable● Just类似于From,但是From会将数组或Iterable的素具取出然后逐个发射,⽽Just只是简单的原样发射,将数组或Iterable当做单个数据● 注意:如果你传递null给Just,它会返回⼀个发射null值的Observable。 如果需要空Observable你应该使⽤Empty操作符● RxJava将这个操作符实现为just函数,它接受1-10个参数,返回⼀个按参数列表顺序发射这些数据的Observable。JustObservable observable = (1);//单个

Observable observable = (1,2,3,4,5,6,7,8,9,10);//最多10个参数()它接受两个参数,⼀个是范围的起始值,⼀个是范围的数据的数⽬。如果你将第⼆个参数设为0,将导致Observable不发射任何数据(如果设置为负数,会抛异常)。rangeObservable observable = (0,10);//发射0,1,2,3,4,5,6,7,8,()直到有观察者订阅时才创建Observable,并且为每个观察者创建⼀个新的Observable,该操作符能保证订阅执⾏时数据源是最新的数据deferString test="old data"; final Observable observable=(test); Observable observable_defer = (new Callable>() { @Override public ObservableSource call() throws Exception { return observable; } }); test="new data"; ibe(consumer2);//打印 old data observable_ibe(consumer2);//打印 new ray()/fromIterable/fromPublisher/fromFuturefrom操作符可以转换Future、Iterable和数组。对于Iterable和数组,产⽣的Observable会发射Iterable或数组的每⼀项数据。对于Future,它会发射()⽅法返回的单个数据,并且还可以增加通过: from(Future,timeout, timeUnit)指定超时时间,如果执⾏的时候Future超时会回调onError()⽅法。al()/intervalRange()Interval操作符返回⼀个Observable,它按固定的时间间隔发射⼀个⽆限递增的整数序列Interval //1秒后,每隔1秒发射从0发射,直到取消订阅 al(500,1000,ECONDS).subscribe(consumer1); //2秒后,每隔1秒发射从0发射,直到发射10个数据后停⽌ alRange(10,10,2,1,S).subscribe(consumer1);()Timer操作符创建⼀个在给定的时间段之后返回⼀个特殊值的Observable。它在延迟⼀段给定的时间后发射⼀个简单的数字0.![enter description here][13]//3 秒后发射数字(3,S).subscribe(consumer1);//3 秒后发射数字0 数据在主线程发射(3,S,read()).subscribe(consumer1)()/never()/error()● Empty:创建⼀个不发射任何数据但是正常终⽌的Observable● Never:创建⼀个不发射数据也不终⽌的Observable● Error:创建⼀个不发射数据以⼀个错误终⽌的rvable()通过Flowable切换er观察者的创建创建⼀个Observer,需要实现四个⽅法onSubscribe(),onNext(),onError(),onComplete()回调onSubscribe(Disposable d)返回Disposable 对象,让观察者管理订阅状态。回调onNext(Object o)数据源,onError(),onComplete()标志事件流终⽌。两者只会回调执⾏⼀个。Observer observer= new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { // 回调返回Disposable 对象,让观察者管理订阅状态, 例如取消订阅 printThread(tag+" onSubscribe "); } @Override public void onNext(@NonNull Object o) { //数据接收处理 printThread(tag+" onNext "ng()); } @Override public void onError(@NonNull Throwable e) { //发⽣异常,终⽌事件流 printThread(tag+" onError "+sage()); } @Override public void onComplete() { //事件流结束 printThread(tag+" onComplete "); } };ibe() 订阅subscribe(Observer o)subscribe()//参数为空,即使观察者为null,Observeable也会在subscribe()的时候创建,处理,发射数据。subscribe( Consumer onNext)subscribe(Consumer onNext, Consumer onError)subscribe( Consumer onNext) subscribe(ConsumeronNext, Consumer onError)subscribe( Consumer onNext) subscribe(ConsumeronNext, Consumer onError) subscribe(Consumer onNext, Consumer onError,ActiononComplete)后⾯ 四个subscribe( )最终都是调⽤了subscribe(Observer o)来实现订阅的,如下⾯截取的源码:public final Disposable subscribe(Consumer onNext, Consumer onError, Action onComplete, Consumer onSubscribe)

public final Disposable subscribe(Consumer onNext, Consumer onError, Action onComplete, Consumer onSubscribe) { ....... LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls;}我们看到subscribe()返回值是Disposable 对象,RxJava给我们⼀个类CompositeDisposable ,可以让我们管理所有Disposable。使⽤实例:CompositeDisposable disposables = new CompositeDisposable();Disposable d1 = (3,S).subscribe(consumer1);Disposable d2 = al(1,1 ,S).subscribe(consumer1);(d1);(d2);........();/d1,d2的事件流停⽌发射数据,//结束d1,d2任务,

发布者:admin,转转请注明出处:http://www.yc00.com/xiaochengxu/1690217475a316495.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信