Android主流三方库源码分析(五、深入理解RxJava源码)(1)

注意到这里通过get()方法首先从ObservableEmitter的AtomicReference中拿到了保存的Disposable状态。然后交给了DisposableHelper进行判断处理。接下来看看DisposableHelper的处理。

2.4.5、DisposableHelper#isDisposed() && DisposableHelper#set()

public enum DisposableHelper implements Disposable {

DISPOSED;

public static boolean isDisposed(Disposable d) {
// 1
return d == DISPOSED;
}

public static boolean set(AtomicReference field, Disposable d) {
for (;😉 {
Disposable current = field.get();
if (current == DISPOSED) {
if (d != null) {
d.dispose();
}
return false;
}
// 2
if (field.compareAndSet(current, d)) {
if (current != null) {
current.dispose();
}
return true;
}
}
}

public static boolean dispose(AtomicReference field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
// …
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}


}

DisposableHelper是一个枚举类,内部只有一个值即DISPOSED, 从上面的分析可知它就是用来标记事件流被切断(废弃)状态的。先看到注释2和注释3处的代码field.compareAndSet(current, d)和field.getAndSet(d),这里使用了原子引用AtomicReference内部包装的CAS方法处理了标志Disposable的并发读写问题。最后看到注释3处,将我们传入的CreateEmitter这个原子引用类保存的Dispable状态和DisposableHelper内部的DISPOSED进行比较,如果相等,就证明数据流被切断了。为了更进一步理解Disposed的作用,再来看看CreateEmitter中剩余的关键方法。

2.4.6、CreateEmitter

@Override
public void onNext(T t) {

// 1
if (!isDisposed()) {
observer.onNext(t);
}
}

@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
// 2
RxJavaPlugins.onError(t);
}
}

@Override
public boolean tryOnError(Throwable t) {

// 3
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
// 4
dispose();
}
return true;
}
return false;
}

@Override
public void onComplete() {
// 5
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
// 6
dispose();
}
}
}

在注释1、3、5处,onNext()和onError()、onComplete()方法首先都会判断事件流是否被切断,如果事件流此时被切断了,那么onNext()和onComplete()则会退出方法体,不做处理,onError()则会执行到RxJavaPlugins.onError(t)这句代码,内部会直接抛出异常,导致崩溃。如果事件流没有被切断,那么在onError()和onComplete()内部最终会调用到注释4、6处的这句dispose()代码,将事件流进行切断,由此可知,onError()和onComplete()只能调用一个,如果先执行的是onComplete(),再调用onError()的话就会导致异常崩溃

三、RxJava的线程切换

首先给出RxJava线程切换的例子:

Observable.create(new ObservableOnSubscribe() {
@Override
public voidsubscribe(ObservableEmitteremitter) throws Exception {
emitter.onNext(“1”);
emitter.onNext(“2”);
emitter.onNext(“3”);
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe”);
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " +e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, “onComplete”);
}
});

可以看到,RxJava的线程切换主要分为subscribeOn()和observeOn()方法,首先,来分析下subscribeOn()方法。

1、subscribeOn(Schedulers.io())

在Schedulers.io()方法中,我们需要先传入一个Scheduler调度类,这里是传入了一个调度到io子线程的调度类,我们看看这个Schedulers.io()方法内部是怎么构造这个调度器的。

2、Schedulers#io()

static final Scheduler IO;

public static Scheduler io() {
// 1
return RxJavaPlugins.onIoScheduler(IO);
}

static {

// 2
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}

static final class IOTask implements Callable {
@Override
public Scheduler call() throws Exception {
// 3
return IoHolder.DEFAULT;
}
}

static final class IoHolder {
// 4
static final Scheduler DEFAULT = new IoScheduler();
}

Schedulers这个类的代码很多,这里我只拿出有关Schedulers.io这个方法涉及的逻辑代码进行讲解。首先,在注释1处,同前面分析的订阅流程的处理一样,只是一个处理hook的逻辑,最终返回的还是传入的这个IO对象。再看到注释2处,在Schedulers的静态代码块中将IO对象进行了初始化,其实质就是新建了一个IOTask的静态内部类,在IOTask的call方法中,也就是注释3处,可以了解到使用了静态内部类的方式把创建的IOScheduler对象给返回出去了。绕了这么大圈子,Schedulers.io方法其实质就是返回了一个IOScheduler对象

3、Observable#subscribeOn()

public final Observable subscribeOn(Scheduler scheduler) {

return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}

在subscribeOn()方法里面,又将ObservableCreate包装成了一个ObservableSubscribeOn对象。我们关注到ObservableSubscribeOn类。

4、ObservableSubscribeOn

public final class ObservableSubscribeOn extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
// 1
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> observer) {
// 2
final SubscribeOnObserver parent = new SubscribeOnObserver(observer);

// 3
observer.onSubscribe(parent);

// 4
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}


}

首先,在注释1处,将传进来的source和scheduler保存起来。接着,等到实际订阅的时候,就会执行到这个subscribeActual方法,在注释2处,将我们自定义的Observer包装成了一个SubscribeOnObserver对象。在注释3处,通知观察者订阅了被观察者。在注释4处,内部先创建了一个SubscribeTask对象,来看看它的实现。

5、ObservableSubscribeOn#SubscribeTask

final class SubscribeTask implements Runnable {
private final SubscribeOnObserver parent;

SubscribeTask(SubscribeOnObserver parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}

SubscribeTask是ObservableSubscribeOn的内部类,它实质上就是一个任务类,在它的run方法中会执行到source.subscribe(parent)的订阅方法,这个source其实就是我们在ObservableSubscribeOn构造方法中传进来的ObservableCreate对象。接下来看看scheduler.scheduleDirect()内部的处理。

6、Scheduler#scheduleDirect()

public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

// 1
final Worker w = createWorker();

// 2
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

// 3
DisposeTask task = new DisposeTask(decoratedRun, w);

// 4
w.schedule(task, delay, unit);

return task;
}

这里最后会执行到上面这个scheduleDirect()重载方法。首先,在注释1处,会调用createWorker()方法创建一个工作者对象Worker,它是一个抽象类,这里的实现类就是IoScheduler,下面,我们看看IoScheduler类的createWorker()方法。

6.1、IOScheduler#createWorker()

final AtomicReference pool;

public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference(NONE);
start();
}

@Override
public Worker createWorker() {
// 1
return new EventLoopWorker(pool.get());
}

static final class EventLoopWorker extends Scheduler.Worker {

EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
// 2
this.threadWorker = pool.get();
}

}

首先,在注释1处调用了pool.get()这个方法,pool是一个CachedWorkerPool类型的原子引用对象,它的作用就是用于缓存工作者对象Worker的。然后,将得到的CachedWorkerPool传入新创建的EventLoopWorker对象中。重点关注一下注释2处,这里将CachedWorkerPool缓存的threadWorker对象保存起来了。

下面,我们继续分析3.6处代码段的注释2处的代码,这里又是一个关于hook的封装处理,最终还是返回的当前的Runnable对象。在注释3处新建了一个切断任务DisposeTask将decoratedRun和w对象包装了起来。最后在注释4处调用了工作者的schedule()方法。下面我们来分析下它内部的处理。

6.2、IoScheduler#schedule()

@Override
public Disposable schedule(@NonNull Runnableaction, long delayTime, @NonNull TimeUnit unit){

return threadWorker.scheduleActual(action,delayTime, unit, tasks);
}

内部调用了threadWorker的scheduleActual()方法,实际上是调用到了父类NewThreadWorker的scheduleActual()方法,我们继续看看NewThreadWorker的scheduleActual()方法中做的事情。

6.3、NewThreadWorker#scheduleActual()

public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

// 1
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
// 2
if (delayTime <= 0) {
// 3
f = executor.submit((Callable)sr);
} else {
// 4
f = executor.schedule((Callable)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

在NewThreadWorker的scheduleActual()方法的内部,在注释1处首先会新建一个ScheduledRunnable对象,将Runnable对象和parent包装起来了,这里parent是一个DisposableContainer对象,它实际的实现类是CompositeDisposable类,它是一个保存所有事件流是否被切断状态的容器,其内部的实现是使用了RxJava自己定义的一个简单的OpenHashSet类进行存储。最后注释2处,判断是否设置了延迟时间,如果设置了,则调用线程池的submit()方法立即进行线程切换,否则,调用schedule()方法进行延时执行线程切换。

7、为什么多次执行subscribeOn(),只有第一次有效?

从上面的分析,我们可以很容易了解到被观察者被订阅时是从最外面的一层(ObservableSubscribeOn)通知到里面的一层(ObservableOnSubscribe),当连续执行了到多次subscribeOn()的时候,其实就是先执行倒数第一次的subscribeOn()方法,直到最后一次执行的subscribeOn()方法,这样肯定会覆盖前面的线程切换。

8、observeOn(AndroidSchedulers.mainThread())

public final Observable observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {

return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}

可以看到,observeOn()方法内部最终也是返回了一个ObservableObserveOn对象,我们直接来看看ObservableObserveOn的subscribeActual()方法。

9、ObservableObserveOn#subscribeActual()

@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1
if (scheduler instanceof TrampolineScheduler) {
// 2
source.subscribe(observer);
} else {
// 3
Scheduler.Worker w = scheduler.createWorker();
// 4
source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
}
}

首先,在注释1处,判断指定的调度器是不是TrampolineScheduler,这是一个不进行线程切换,立即执行当前代码的调度器。如果是,则会直接调用ObservableSubscribeOn的subscribe()方法,如果不是,则会在注释3处创建一个工作者对象。然后,在注释4处创建一个新的ObserveOnObserver将SubscribeOnobserver对象包装起来,并传入ObservableSubscribeOn的subscribe()方法进行订阅。接下来看看ObserveOnObserver类的重点方法。

10、ObserveOnObserver

@Override
public void onNext(T t) {

if (sourceMode != QueueDisposable.ASYNC) {
// 1
queue.offer(t);
}
schedule();
}

@Override
public void onError(Throwable t) {

schedule();
}

@Override
public void onComplete() {

schedule();
}

去除非主线逻辑的代码,在ObserveOnObserver的onNext()和onError()、onComplete()方法中最后都会调用到schedule()方法。接着看schedule()方法,其中onNext()还会把消息存放到队列中

11、ObserveOnObserver#schedule()

void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

这里使用了worker进行调度ObserveOnObserver这个实现了Runnable的任务。worker就是在AndroidSchedulers.mainThread()中创建的,内部其实就是使用Handler进行线程切换的,此处不再赘述了。接着看ObserveOnObserver的run()方法。

12、ObserveOnObserver#run()

@Override
public void run() {
// 1

学习分享

在当下这个信息共享的时代,很多资源都可以在网络上找到,只取决于你愿不愿意找或是找的方法对不对了

很多朋友不是没有资料,大多都是有几十上百个G,但是杂乱无章,不知道怎么看从哪看起,甚至是看后就忘

如果大家觉得自己在网上找的资料非常杂乱、不成体系的话,我也分享一套给大家,比较系统,我平常自己也会经常研读。

2021最新上万页的大厂面试真题

七大模块学习资料:如NDK模块开发、Android框架体系架构…

只有系统,有方向的学习,才能在段时间内迅速提高自己的技术。

这份体系学习笔记,适应人群:
**第一,**学习知识比较碎片化,没有合理的学习路线与进阶方向。
**第二,**开发几年,不知道如何进阶更进一步,比较迷茫。
**第三,**到了合适的年纪,后续不知道该如何发展,转型管理,还是加强技术研究。

由于文章内容比较多,篇幅不允许,部分未展示内容以截图方式展示 。

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化学习资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

[外链图片转存中…(img-17eJx6Da-1714198241955)]

只有系统,有方向的学习,才能在段时间内迅速提高自己的技术。

这份体系学习笔记,适应人群:
**第一,**学习知识比较碎片化,没有合理的学习路线与进阶方向。
**第二,**开发几年,不知道如何进阶更进一步,比较迷茫。
**第三,**到了合适的年纪,后续不知道该如何发展,转型管理,还是加强技术研究。

由于文章内容比较多,篇幅不允许,部分未展示内容以截图方式展示 。

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化学习资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/652761.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自制音频格式二维码的方法,适合多种音频格式使用

现在可以通过二维码的方法来传递音频文件是很常用的一种方式&#xff0c;可以将单个或者多个音频放入一个二维码&#xff0c;通过手机扫码来调取云端储存的音频文件来播放内容&#xff0c;这样可以让多人同时扫码获取内容&#xff0c;提升传播速度。 音频二维码制作的方法也比…

pve(Proxmox VE)安装i225v网卡驱动

配置pve源 备份原来的源 mv /etc/apt/sources.list /etc/apt/sources.list.bak打开文件 vi /etc/apt/sources.list将以下内容粘贴进去 deb https://mirrors.tuna.tsinghua.edu.cn/debian/ bookworm main contrib non-free non-free-firmwaredeb https://mirrors.tuna.tsing…

网络安全实训Day24(End)

写在前面 并没有完整上完四个星期&#xff0c;老师已经趁着清明节假期的东风跑掉了。可以很明显地看出这次持续了“四个星期”实训的知识体系并不完整&#xff0c;内容也只能算是一次基础的“复习”。更多的内容还是靠自己继续自学吧。 网络空间安全实训-渗透测试 文件包含攻击…

高级IO|从封装epoll服务器到实现Reactor服务器|Part1

从封装epoll_server到实现reactor服务器(part1) 项目复习&#xff1a;从封装epoll_server到实现reactor服务器(part1)EPOLL模式服务器初步 select, poll, epoll的优缺点epoll的几个细节封装epoll_server基本框架先写好创建监听套接字和创建epoll模型可以Accept了吗&#xff1f…

linglong扫描系统 JWT密钥硬编码 登录绕过漏洞复现

0x01 产品简介 linglong扫描系统是一款甲方资产巡航扫描系统。系统定位是发现资产,进行端口爆破。帮助企业更快发现弱口令问题。主要功能包括: 资产探测、端口爆破、定时任务、管理后台识别、报表展示等功能模块。 0x02 漏洞概述 linglong扫描系统 存在JWT密钥硬编码漏洞,…

上位机图像处理和嵌入式模块部署(树莓派4b用skynet实现进程通信)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面我们说过&#xff0c;在工业系统上面一般都是使用多进程来代替多线程。这后面&#xff0c;主要的原因还是基于安全的考虑。毕竟一个系统里面&a…

Tomcat架构设计精髓分析-Connector高内聚低耦合设计

优秀的模块化设计通常都会采用高内聚、低耦合 高内聚是指相关度比较高的功能要尽可能集中&#xff0c;不要分散。低耦合是指两个相关的模块要尽可能减少依赖的部分和降低依赖的程序&#xff0c;不要让两个模块产中强依赖。 Tomca连接器需要实现的功能: 监听网络端口 接受网络…

LMDeploy量化部署LLMVLM实践-笔记五

本次课程由西北工业大学博士生、书生浦源挑战赛冠军队伍队长、第一期书生浦语大模型实战营优秀学员【安泓郡】讲解【OpenCompass 大模型评测实战】课程 课程视频&#xff1a;https://www.bilibili.com/video/BV1tr421x75B/ 课程文档&#xff1a;https://github.com/InternLM/…

Apache RocketMQ ACL 2.0 全新升级

作者&#xff1a;徒钟 引言 RocketMQ 作为一款流行的分布式消息中间件&#xff0c;被广泛应用于各种大型分布式系统和微服务中&#xff0c;承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大&#xff0c;安全相关的挑战日益突出&am…

PDF高效编辑器,支持修改PDF文档并转换格式从PDF文件转换成图片文件,轻松管理你的文档世界!

PDF文件已成为我们工作、学习和生活中不可或缺的一部分。然而&#xff0c;传统的PDF阅读器往往只能满足简单的查看需求&#xff0c;对于需要频繁编辑、修改或转换格式的用户来说&#xff0c;就显得力不从心。现在&#xff0c;我们为您带来一款全新的PDF高效编辑器&#xff0c;让…

qml和c++结合使用

目录 文章简介1. 创建qml工程2. 创建一个类和qml文件&#xff0c;修改main函数3. 函数说明&#xff1a;4. qml 文件间的调用5. 界面布局6. 代码举例 文章简介 初学qml用来记录qml的学习过程&#xff0c;方便后面归纳总结整理。 1. 创建qml工程 如下图&#xff0c;我使用的是…

node.js egg.js

Egg 是 Node.js 社区广泛使用的框架&#xff0c;简洁且扩展性强&#xff0c;按照固定约定进行开发&#xff0c;低协作成本。 在Egg.js框架中&#xff0c;ctx 是一个非常核心且常用的对象&#xff0c;全称为 Context&#xff0c;它代表了当前 HTTP 请求的上下文。ctx 对象封装了…