【Rxjava详解】(四)线程切换

lift()变换原理

这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在RxJava的内部,它们是基于同一个基础的变换方法:lift()

首先看一下lift() 的内部实现(仅显示了部分主要逻辑代码):

public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {return Observable.create(new OnSubscribe<R>() {@Overridepublic void call(Subscriber subscriber) {Subscriber newSubscriber = operator.call(subscriber);newSubscriber.onStart();onSubscribe.call(newSubscriber);}});
}

方法用于将当前的 Observable 对象转换成另一种类型的 Observable 对象。它接受一个 Operator 参数,用于定义转换的规则。返回的是一个新的 Observable 对象。

它创建了一个新的 Observable 对象,并且将 operator 对象作用于当前 Observable 对象的订阅过程中。

Observable.create 方法中,通过创建一个匿名内部类实现了 OnSubscribe 接口的 call 方法。在 call 方法中,首先通过调用 operator.call(subscriber),将原始的 Subscriber 对象转换成一个新的 Subscriber 对象 newSubscriber。然后调用 newSubscriber.onStart() 方法进行一些初始化操作。最后调用 onSubscribe.call(newSubscriber),将转换后的 newSubscriber 对象传递给原始的 onSubscribe 对象进行订阅操作。

类似于这个图(别的地方扒下来的)

https://s2.loli.net/2023/11/23/iTWHCOKE791j8mA.gif

RxJava不建议开发者自定义Operator来直接使用lift(),而是建议尽量使用已有的lift()包装方法(如map()、flatMap()等)进行组合来实现需求,因为直接使用lift()非常容易发生一些难以发现的错误。

线程控制Scheduler

在不指定线程的情况下,RxJava遵循的是线程不变的原则,即在哪个线程调用subscribe()方法就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。也就是说事件的发出和消费都是在同一个线程的。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于RxJava是至关重要的。而要实现异步,则需要用到RxJava的另一个概念:Scheduler

Scheduler简介

RxJava中,Scheduler相当于线程控制器,通过使用 Scheduler 可以实现事件的异步处理和线程切换。Scheduler 可以指定事件发送和处理所在的线程,从而实现异步的操作,RxJava 提供了多种类型的 Scheduler

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的Scheduler
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算。这个Scheduler 使用的固定的线程池,大小为CPU核数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费CPU
  • 另外,Android还有一个专用的AndroidSchedulers.mainThread(),它指定的操作将在Android主线程运行。

有了这几个Scheduler,就可以使用subscribeOn()observeOn()两个方法来对线程进行控制了。subscribeOn()指定subscribe()所发生的线程,即Observable.OnSubscribe()被激活时所处的线程或者叫做事件产生的线程。observeOn()指定Subscriber所运行在的线程或者叫做事件消费的线程。

Observable.just("Hello").subscribeOn(Schedulers.io()) // 在 IO 线程发送事件.map(str -> str + " World").observeOn(AndroidSchedulers.mainThread()) // 在主线程中处理事件.subscribe(str -> {// 更新 UItextView.setText(str);}, throwable -> {// 处理错误Log.e(TAG, "Error: " + throwable.getMessage());});

上面这段代码中,subscribeOn(Schedulers.io())的指定会让创建的事件的内容HelloWorld !将会在IO线程发出;而由于observeOn(AndroidScheculers.mainThread()) 的指定,因此subscriber()方法设置后的回调中内容的打印将发生在主线程中。事实上,这种在subscribe()之前写上两句subscribeOn(Scheduler.io())observeOn(AndroidSchedulers.mainThread())的使用方式非常常见,它适用于多数的***后台线程取数据,主线程显示***的程序策略。

Scheduler的原理

我们可以多切换几次线程,因为observeOn()指定的是Subscriber的线程,而这个Subscriber并不是subscribe() 参数中的Subscriber,而是observeOn()执行时的当前Observable所对应的Subscriber,即它直接对应的Subscriber。换句话说observeOn() 指定的是它之后的操作所在的线程。所以想要多次切换线程,只要在每个想要切换线程的位置调用一次observeOn()即可。

Observable.just("Hello").subscribeOn(Schedulers.io()) // 在 IO 线程执行.observeOn(Schedulers.computation()) // 切换到计算线程执行.map(s -> s + " World").observeOn(AndroidSchedulers.mainThread()) // 切换到主线程执行.subscribe(s -> {// 更新 UItextView.setText(s);});

如上,通过observeOn()的多次调用,程序实现了线程的多次切换。 不过,不同于observeOn(),subscribeOn()的位置放在哪里都可以,但它是只能调用一次的。

subscribeOn()observeOn()的内部实现,也是用的lift()

具体看图(不同颜色的箭头表示不同的线程,subscribeOn()原理图:

https://s2.loli.net/2023/11/23/BW1uFAZn4rGHOe6.jpg

observeOn()原理图:

https://s2.loli.net/2023/11/23/3rfzvsEFhUDTn1Y.jpg

从图中可以看出,subscribeOn()observeOn()都做了线程切换的工作(图中的schedule...部位)。不同的是,subscribeOn()的线程切换发生在OnSubscribe中,即在它通知上一级 OnSubscribe时,这时事件还没有开始发送,因此subscribeOn()的线程控制可以从事件发出的开端就造成影响;而observeOn()的线程切换则发生在它内建的Subscriber中,即发生在它即将给下一级Subscriber发送事件时,因此observeOn()控制的是它后面的线程。

用一张图来(扒的)解释当多个subscribeOn()observeOn()混合使用时,线程调度是怎么发生的

https://s2.loli.net/2023/11/23/Q8JYfSKCa3hjgcr.jpg

图中共有5处含有对事件的操作。由图中可以看出,①和②两处受第一个subscribeOn()影响,运行在红色线程;③和④处受第一个observeOn()的影响,运行在绿色线程;⑤处受第二个 onserveOn()影响,运行在紫色线程;而第二个subscribeOn(),由于在通知过程中线程就被第一个subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个subscribeOn()的时候,只有第一个subscribeOn()起作用。

在前面讲Subscriber的时候,提到过SubscriberonStart()可以用作流程开始前的初始化。然而onStart()由于在subscribe()发生时就被调用了,因此不能指定线程,而是只能执行在subscribe()被调用时的线程。这就导致如果onStart()中含有对线程有要求的代码(例如在界面上显示一个ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测subscribe()将会在什么线程执行。

而与Subscriber.onStart()相对应的,有一个方法Observable.doOnSubscribe()。它和Subscriber.onStart()同样是在subscribe()调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下,doOnSubscribe()执行在subscribe()发生的线程;而如果在doOnSubscribe()之后有subscribeOn()的话,它将执行在离它最近的subscribeOn()所指定的线程。

示例代码:

Observable.create(onSubscribe).subscribeOn(Schedulers.io()).doOnSubscribe(new Action0() {@Overridepublic void call() {progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行}}).subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);

Agera

之前Google发布agera,它在Github上的介绍是:Reactive Programming for Android,可以进行了解。它为 Android 应用程序提供了一种简单且灵活的方式来处理数据流和事件驱动的编程模型。很轻量化,很适合安卓。

但是缺点也很明显:与 RxJava 相比,Agera 的功能相对较为有限,操作符和功能较少。对于一些复杂的数据流操作和并发处理,可能需要额外的工作量来实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/214381.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

please upgrade numpy version to >=1.20

升级 upgrade numpy_升级numpy-CSDN博客 pip install numpy --upgrade 没有pip conda install numpy --upgrade 会报错 conda list numpy来查看numpy版本 似乎这个numpy要看numpy-base这个 似乎没有pip

记录ruoyi-plus-vue部署的问题

ruoyi-vue-plus5.x 后端 ruoyi-vue-plus5.x 前端 前端本地启动命令 # 克隆项目 git clone https://gitee.com/JavaLionLi/plus-ui.git# 安装依赖 npm install --registryhttps://registry.npmmirror.com# 启动服务 npm run dev# 构建生产环境 yarn build:prod # 前端访问地址…

百度文心一言(千帆大模型)聊天API使用指导

开篇不得不吐槽下百度&#xff0c;百度智能云平台首页跳转千帆大模型平台的按钮太多了&#xff0c;不同按钮跳转不同的子页面&#xff0c;不熟悉的&#xff0c;能把人找懵。入口太多&#xff0c;就导致用户不知道从何开始。本文就从一个前端开发人员的角度&#xff0c;教大家快…

AlphaPose-RKNN-rk3588

1. AlphaPose背景介绍 AlphaPose是一个用于人体姿态估计的开源工具。人体姿态估计在计算机视觉中是一个核心问题&#xff0c;它旨在定位并识别图像或视频中的人体关键点和骨骼结构。在许多应用中&#xff0c;如动作识别、行为分析、虚拟现实和增强现实&#xff0c;人体姿态估计…

视频服务网关的三大部署(二)

视频网关是软硬一体的一款产品&#xff0c;可提供多协议&#xff08;RTSP/ONVIF/GB28181/海康ISUP/EHOME/大华、海康SDK等&#xff09;的设备视频接入、采集、处理、存储和分发等服务&#xff0c; 配合视频网关云管理平台&#xff0c;可广泛应用于安防监控、智能检测、智慧园区…

飞利浦、书客、雷士的护眼台灯到底怎么选?三款台灯测评对比

随着生活水平的提高&#xff0c;相信越来越多的家庭会比较在意生活质量的提高&#xff0c;会越来越重视健康问题&#xff0c;特别是有关孩子学习方面的。面对如今青少年儿童如此高的近视率的情况下&#xff0c;很多家长会选择选购一台专业护眼台灯为孩子的视力保驾护航。 不过想…

2021年03月 Scratch(三级)真题解析#中国电子学会#全国青少年软件编程等级考试

Scratch等级考试(1~4级)全部真题・点这里 一、单选题(共25题,每题2分,共50分) 第1题 在《采矿》游戏中,当角色捡到黄金时财富值加1分,捡到钻石时财富值加2分,下面哪个程序实现这个功能? A: B: C: D: 答案:D A将变量值固定,BC为双重判断

【拿完年终奖后】想要转行网络安全,一定不要错过这个时间段。

网络安全&#xff0c;作为当下互联网行业中较为热门的岗位&#xff0c;薪资可观、人才需求量大&#xff0c;作为转行必考虑。 在这里奉劝所有零基础想转行&#xff08;入门&#xff09; 网络安全的朋友们 在转行之前&#xff0c;一定要对网络安全行业做一个大概了解&#xf…

2016年10月3日 Go生态洞察:Go 1.7中的子测试和子基准测试

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

企业微信web登录实现

企业微信登录流程 实现方式 使用js-sdk 使用 wecom/jssdk 初始化企业微信登录组件。 为了满足网站定制化的需求&#xff0c;我们支持将企业微信登录组件内嵌到开发者的网站中。用户使用企业微信登录授权后&#xff0c;登录组件将 auth code 返回给网站。 企业微信登录组件主…

Python-函数传参与数据类型

Python中&#xff0c;函数参数传递是通过对象的引用进行的&#xff0c;我们可以进行下面的验证。 def use_name(val):print("name id :%s" % (id(val)))val "hanshu1"print("name id modified :%s" % (id(val)))def test_ref():name "ha…

MyBatisPlus总结

MyBatis-Plus时Mybatis的Best Partner MyBatis-Plus (opens new window)&#xff08;简称 MP&#xff09;是一个 MyBatis (opens new window)的增强工具&#xff0c;在 MyBatis 的基础上只做增强不做改变&#xff0c;为简化开发、提高效率而生。 特性 无侵入损耗小强大的 CR…