RxJava2.0文章六-解决上游、下游发射事件速度不平衡问题

RxJava2.0文章六-解决上游、下游发射事件速度不平衡问题

2023年7月25日发(作者:)

RxJava2.0⽂章六-解决上游、下游发射事件速度不平衡问题前⾔上⼀节中,我们找到了造成上游、下游发射事件速度不平衡问题的原因,就是是否有⽔缸,那么接下来我们就从两个⽅⾯来解决:分别是从数量上和速度上。1. 从数量上解决1. 从数量上解决这⾥需要使⽤sample操作符,sample操作符意思是每隔指定事件就从上游取出事件发射给下游,我们每隔2秒取⼀个事件给下游,⽰例代码如下: /** * 从数量上解决 */ public static void demo1(){ (new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { (i); } } }).subscribeOn(()) // 让上游的 for循环在⼦线程中执⾏ .sample(2 , S) // 每隔2秒从上游取出事件发射给下游 .observeOn(read()) //下游切换到主线程中 .subscribe(new Consumer() { // 建⽴连接后,new Consumer()然后复写 onNext()⽅法 @Override public void accept(Integer integer) throws Exception { Log.e("TAG" , "integer -> " + integer) ; } });

}运⾏效果如下:图⽚.png可以看到,上游在不停的发射数据,但我们只是每隔⼀定时间取⼀个放进⽔缸中,并没有全部放进⽔缸中,这次只是占⽤5M。到这⾥,⼤家以后可以出去吹⽜,我曾经通过技术⼿段去优化⼀个程序,使内存从 300多M降低到5M。2. 从速度上解决2. 从速度上解决解决⽅式就是:可以给上游的 for循环语句之后,让其延迟2秒即可,具体代码如下:/** * 从速度上解决 */ public static void demo2(){ (new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { (i); (2000); // 在上游的for循环执⾏完之后,添加2秒延迟 } } }).subscribeOn(()) // 表⽰上游的for循环操作在⼦线程中执⾏ .observeOn(read()) // 切换到主线程中执⾏下边的操作 .subscribe(new Consumer() { // 建⽴连接之后,new Consumer()之后,只是复写onNext()⽅法,不需要复写其他⽅法 @Override public void accept(Integer integer) throws Exception { Log.e("TAG" , "integer -> " + integer) ; } }); }运⾏效果如下:图⽚.png有效果图可知:内存也变为6M,⽽且内存线很平稳。上游通过适当的延迟,既减缓了事件进⼊⽔缸的速度,也让下游有充⾜的时间从⽔缸中去事件去处理,如此⼀来,就不⾄于⼤量事件⼀下⼦涌进⽔缸,也不会OOM;3. 从 数量、速度 上修改之前的Zip操作符的合并多个上游事件3. 从 数量、速度 上修改之前的Zip操作符的合并多个上游事件我们之前有⼀个zip操作符,把多个上游的事件组合之后,然后把组合事件发送给下游,下边通过⽰例代码也来实现下:从数量上修改,代码如下:/** * 从数量上解决: * 取样:每隔2秒从上游取出事件发射给下游 */ public static void demo3(){ // 创建第⼀个上游:Observable1 Observable observable1 = (new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { (i); } } }).subscribeOn(()).sample(2 , S) ; // 让上游的for循环在⼦线程中执⾏,并且是每隔2秒从上游发射事件给下游 // 创建第⼆个上游:Observable2 Observable observable2 = (new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { ("A"); } }).subscribeOn(()) ; // 通过zip操作符:把上游1、上游2组合,然后把组合后的事件再发射给下游 (observable1, observable2, new BiFunction() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).subscribe(new Consumer() { // 建⽴连接之后,通过new Consumer()复写accept() -> 表⽰的是onNext() @Override // 然后再new Consumer()复写 accept() -> 表⽰的是onError() public void accept(String s) throws Exception { Log.e("TAG" , "s -> " + s) ; } }, new Consumer() { @Override public void accept(Throwable throwable) throws Exception { Log.e("TAG" , "throwable -> " + throwable) ; } }); }从速度上解决,代码如下:/** * 从速度上解决: * 让上游的for循环执⾏完之后,延迟两秒发射数据给下游 */ public static void demo4(){ // 创建第⼀个上游:Observable1 Observable observable1 = (new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { (i); (2000); // 让上游的for循环完之后,然后延迟2秒,让上游减慢发射事件的速度 } } }).subscribeOn(()) ; // 创建第⼆个上游:Observable2 Observable observable2 = (new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { ("A"); } }) ; // 通过zip操作符,把上游1、上游2组合,然后把组合后的事件发射给下游 (observable1, observable2, new BiFunction() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).subscribe(new Consumer() { // 建⽴连接之后,然后new Consumer(),复写accept() -> 相当于 onNext() @Override // 然后再次new Consumer(),复写accept() -> 相当于 onError() public void accept(String s) throws Exception { Log.e("TAG" , "s -> " + s) ; } }, new Consumer() { @Override public void accept(Throwable throwable) throws Exception { Log.e("TAG" , "throwable -> " + throwable) ; } }); }通过本节的学习,⼤家应该对如何处理上下游发射数据速度不平衡应该都知道如何解决了。下⼀节我们就来看下 Flowable,它的⽤法和我们这⼀节讲解的⽤法基本⼀样,只是对它稍微的封装了。

发布者:admin,转转请注明出处:http://www.yc00.com/xiaochengxu/1690216080a316121.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信