我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RxJava2Scheduler使用实例深入解析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RxJava2Scheduler使用实例深入解析

前言

欢迎来到大家深入理解 RxJava2 系列第二篇,这里先插上一句,本系列文章用的源码都是基于 RxJava 2.2.0 正式版。本篇文章将先与大家一起理解 Scheduler 与 Worker ,顺着 RxJava2 的源码捋一下它们的实现原理。

Scheduler 与 Worker

Scheduler 与 Worker 在 RxJava2 中是一个非常重要的概念,他们是 RxJava 线程调度的核心与基石。用过的人肯定都会了解一些,但是想必了解 Worker 的读者们就不多了。很多人会疑惑,既然有了 Scheduler 可以直接调度 Runnable,为何又强加一个 Worker 的概念,诸位稍安勿躁,跟着笔者的思路一起走下去。

定义

笔者这里展示一下 Scheduler 最核心的定义部分:

public abstract class Scheduler {
    @NonNull
    public abstract Worker createWorker();
    public Disposable scheduleDirect(@NonNull Runnable run) {
        ...
    }
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        ...
    }
    @NonNull
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        ...
    }
    public abstract static class Worker implements Disposable {
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            ...
        }
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
        @NonNull
        public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
            ...
        }
    }
}

从上面的定义可以看出,Scheduler 本质上就是用来调度 Runnable 的,支持立即、延时和周期形式的调用,而 Worker 是任务的最小单元的载体。在 RxJava2 内部的实现中,通常一个或者多个 Worker 对应一个ScheduledThreadPoolExecutor对象,这些暂且不表。

scheduleDirect / schedulePeriodicallyDirect

在 RxJava 1.x 时代, Scheduler 是没有scheduleDirect/schedulePeriodicallyDirect的,只能够先createWorker,再通过 Worker 来调度任务。这些方法是对 Worker 调用的简化,可以认为是创建了一个只能调度一次任务的 Worker 并立马调度了该任务。在Scheduler基类的源码中,也可以看出默认的实现是直接 createWorker 并创建对应的 Task 的(虽然在部分 Scheduler 覆盖的实现上并没有创建 Worker,但是可以认为存在虚拟的 Worker)。

createWorker

一个 Scheduler 可以创建多个 Worker,这两者是一对多的关系,而 Worker 与 Task 也是一对多的关系。

如下图所示:

Worke 的存在为了确保两件事:

  • 同一个 Worker 创建的 Task 都会确保串行,且立即执行的任务符合先进先出原则。
  • Worker 绑定了调用了他的方法的 Runnable,当该 Worker 取消时,基于他的 Task 均被取消

因此当有操作符需要使用 Scheduler 时,可以通过 Worker 来将一系列的 Runnable 统一的调度和取消,最典型的例子就是observeOn,下面会详细分析。

Schedulers

RxJava2 默认内置了几种 Scheduler 的实现,适用于不同的场景,这些 Scheduler 均在 Schedulers 类中可以直接获得

方法说明
Schedulers.computation()适用于计算密集型任务
Schedulers.io()适用于 IO 密集型任务
Schedulers.trampoline()在某个调用 schedule 的线程执行
Schedulers.newThread()每个 Worker 对应一个新线程
Schedulers.single()所有 Worker 使用同一个线程执行任务
Schedulers.from(Executor)使用 Executor 作为任务执行的线程

这里我们挑选两个最常用的 computation / io 源码稍作分析。

NewThreadWorker

NewThreadWorker 在 computation / io / newThread 均有涉及,我们先了解一下这个类。

上面笔者有提到过 Worker 与ScheduledThreadPoolExecutor 的关系,而这里的NewThreadWorkerScheduledThreadPoolExecutor便是一对一的关系。在NewThreadWorker构造函数中会通过工厂方法创建一个corePoolSize 为 1 的ScheduledThreadPoolExecutor对象并持有之。

ScheduledThreadPoolExecutor 从 JDK1.5 开始存在,这个类继承于 ThreadPoolExecutor,可以支持即使、延时和周期的任务。但是注意在ScheduledThreadPoolExecutor中 maximumPoolSize 参数是无效的,corePoolSize 表示其最大线程数,且它的队列是无界的。这里不再细说该类,否则涉及的就太多了。

有了这个类,RxJava2 实现 Worker 时便是站在了巨人的肩膀上,线程调度可以直接使用该类解决,略微麻烦之处就是封一层Disposable的逻辑。

具体细节读者可以从源码一探究竟。

ComputationScheduler

作为计算密集型的 Scheduler,ComputationScheduler的线程数是与 CPU 核心密切相关的,原因是当线程数远远超过 CPU 核心数目时,CPU 的时间更多的损耗在了线程的上下文切换,因此比较通用的方式是保持最大线程数和 CPU 核心数一致。

最大线程数目

MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
static int cap(int cpuCount, int paramThreads) {
    return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}

从上面代码可见MAX_THREADS 大于 0,但是不超过 CPU 核心数,实际数值也受用户设置的 System Properties 的影响。

FixedSchedulerPool

顾名思义,FixedSchedulerPool 可以认为是固定数目的真正的 Worker 的缓存池。

确定了MAX_THREADS后,在ComputationScheduler的构造函数,会创建FixedSchedulerPool对象,FixedSchedulerPool 内部会直接创建一个长度为MAX_THREADSPoolWorker数组。PoolWorker继承自NewThreadWorker,但是没有任何额外的代码。

static final class PoolWorker extends NewThreadWorker {
    PoolWorker(ThreadFactory threadFactory) {
        super(threadFactory);
    }
}

也就是说当FixedSchedulerPool创建时,已经有MAX_THREADS个 corePoolSize 为 1 的 ScheduledThreadPoolExecutor随之创建。

PoolWorker

从使用角度来说,有了FixedSchedulerPool 好像就够了,我们只需要每次createWorker时从池子里取一个PoolWorker并返回即可。

但是这里忽略了一个要点,每个 Worker 是独立的,每个 Worker 内部的任务是绑定在这个 Worker 中的。如果按照上述的做法,暴露出去PoolWorker,会出现 2 个问题:

  • createWorker 会可能会返回相同的 Worker,导致这个 Worker 被 dispose 后,其内部所有的任务会被一并取消,而违背了不同 Worker 之间的任务的独立性
  • PoolWorker也就是NewThreadWorker 被 dispose 后,其关联的ScheduledThreadPoolExecutor被 shutdown,后续再次获取该 Worker 也会导致无法创建任务

EventLoopWorker

为了解决上述的问题,我们需要在PoolWorker外再包一层,createWorker每次都会创建一个EventLoopWorker对象。

EventLoopWorker 其实是个代理对象,他会将 Runnable 代理给FixedSchedulerPool中取到的PoolWorker来调度,并且他会负责管理经由他创建的任务,当自身被取消时,会将创建的任务统统取消。

示意图

IoScheduler

与 ComputationScheduler 恰恰相反,IO 密集型的 Scheduler 线程数是无上限的。这是因为 IO 设备的速度是远远低于 CPU 速度的,在等待 IO 操作时, CPU 往往是闲置的,因此应该创建更多的线程让 CPU 尽可能的利用。当然并不是说线程越多越好,线程数目膨胀到一定程度既会影响 CPU 的效率,也会消耗大量的内存。在IoScheduler中,每个 Worker 在空置一段时间后就会被清除以控制线程的数目。

CachedWorkerPool

CachedWorkerPool是一个变长并定期清理的ThreadWorker的缓存池,内部通过一个ConcurrentLinkedQueue维护。和PoolWorker类似,ThreadWorker也是继承自NewThreadWorker

static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;
    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }
    public long getExpirationTime() {
        return expirationTime;
    }
    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}

仅仅是增加了一个expirationTime字段,用来标识这个ThreadWorker的超时时间。

于此同时,在CachedWorkerPool初始化时会传入 Worker 的超时时间,目前是写死的 60 秒。这个超时时间表示ThreadWorker闲置后最大存活时间(实际中不保证 60 秒时被回收)。

EventLoopWorker

IoScheduler中也存在一个EventLoopWorker类,它和ComputationScheduler中的作用也是类似的:

  • 管理自身调度过的任务
  • 管理ThreadWorker,使其可被回收再次使用

Worker 的管理

  • 创建:在闲置队列中查找ThreadWorker,如果存在则取出,否则new``一个新的ThreadWorker,最后在外面包一层EventLoopWorker```并返回。
  • 回收:当EventLoopWorker dispose 后,会更新内部的ThreadWorker超时时间,并促使CachedWorkerPoolThreadWorker加入闲置队列
  • 清理:CachedWorkerPool在初始化时启动定时任务,每隔 60 秒清理队列中超时的ThreadWorker

这里说个细节,因为CachedWorkerPool是每隔 60 秒清理一次队列的,因此ThreadWorker的存活时间取决于入队的时机,如果一直没有被再次取出,其被实际清理的延迟在 60 - 120 秒之间,有兴趣的读者可以想一想为什么。

示意图

对比

熟悉线程的读者朋友们会发现,ComputationSchedulerIoScheduler很像某些参数下的ThreadPoolExecutor

ThreadPoolExecutor 参数ComputationScheduler(n)IoScheduler
corePoolSizen0
maximumPoolSizenInteger.MAX_VALUE
keepAliveTime060
unit-TimeUnit.SECONDS
workQueueLinkedBlockingQueueSynchronousQueue

他们对线程的控制外在的表现很相似。 但是实际的线程执行对象不一样:

  • ThreadPoolExecutor:Thread
  • Scheduler:支持立即、延迟、定时调度任务的对象,通常为 ScheduledThreadPoolExecutor(coreSize = 1)

这两者的对比有助于我们更加深刻地理解 Scheduler 设计的内在逻辑。

结语

Scheduler 是 RxJava 线程的核心概念,RxJava 基于此屏蔽了 Thread 相关的概念,只与 Scheduler / Worker / Runnable 打交道。

以上就是RxJava2 Scheduler使用实例深入解析的详细内容,更多关于RxJava2 Scheduler使用的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RxJava2Scheduler使用实例深入解析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RxJava2Scheduler使用实例深入解析

这篇文章主要为大家介绍了RxJava2Scheduler使用实例深入解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2022-11-13

深入解析NumPy函数:实际应用与示例

NumPy是Python中一个重要的科学计算库,提供了强大的多维数组对象和广播功能,以及许多用于数组的操作和计算的函数。在数据科学和机器学习领域中,NumPy被广泛应用于数组操作和数值计算。本文将全面解析NumPy的常用函数,并给出应用和实
深入解析NumPy函数:实际应用与示例
2024-01-26

深入解析Python命令行参数并实例应用

Python命令行参数详解及实例应用在Python编程中,我们经常会需要从命令行中获取参数来执行不同的操作。Python内置了一个argparse模块,可以帮助我们解析命令行参数,并根据参数执行不同的逻辑。本文将详细介绍argparse模
深入解析Python命令行参数并实例应用
2024-02-03

Golang Makefile示例深入讲解使用

一次偶然的机会,在 github 上看到有人用 Makefile,就尝试了一下,发现真的非常合适,Makefile 本身就是用来描述依赖的,可读性非常好,而且与强大的 shell 结合在一起,基本可以实现任何想要的功能
2023-01-12

深入解析Go语言输入函数并附实例

Golang小白一枚,正在不断学习积累知识,现将学习到的知识记录一下,也是将我的所得分享给大家!而今天这篇文章《深入解析Go语言输入函数并附实例》带大家来了解一下##content_title##,希望对大家的知识积累有所帮助,从而弥补自己
深入解析Go语言输入函数并附实例
2024-04-04

Native层消息机制深入探究实例解析

这篇文章主要为大家介绍了Native层消息机制的深入探究及实例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-01-17

React Streaming SSR原理示例深入解析

这篇文章主要为大家介绍了React Streaming SSR原理示例深入解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2022-12-20

深入解析C++中的std::thread的使用

这篇文章主要介绍了C++中的std::thread的使用,在C++11新标准中,可以简单通过使用thread库,来管理多线程,本文通过实例代码给大家详细讲解,需要的朋友可以参考下
2023-05-16

通过案例深入解析linux NFS机制

接上篇,创建web02服务器,将web01、web02服务器的/data目录挂载到nfs01服务器的共享目录/data上,并以不同方式实现开机自启动。 web01篇:在/etc/rc.local中添加如下一行:重启并检查:web02篇: 克
2022-06-03

Golang函数式编程深入分析实例

习惯与函数式编程语言的开发者,会认为for循环和if判断语句是冗长的代码,通过使用map和filter处理集合元素让代码更可读。本文介绍Go闭包实现集合转换和过滤功能
2023-01-10

深入了解Oracle数据库实例

Oracle数据库是世界领先的关系型数据库管理系统(RDBMS),广泛应用于企业级系统中。Oracle数据库的实例是数据库系统的一个重要组成部分,它包括内存结构和后台进程,用于管理数据库的操作。深入了解Oracle数据库实例,可以帮助开发人
深入了解Oracle数据库实例
2024-03-08

Android 使用XML做动画UI的深入解析

效果: http://www.56.com/u82/v_OTM4MDk5MTk.html第一步: 创建anim文件夹放置动画xml文件在res文件夹下,创建一个anim的子文件夹。 第二步: 加载动画接着在Activity
2022-06-06

深入解析SQL的定义和使用范围

SQL的定义及应用领域详解摘要:本文旨在介绍 SQL(Structured Query Language)的定义及其在不同应用领域中的具体应用。首先,我们将简要介绍 SQL 的定义和历史背景。接着,我们将深入探讨 SQL 在数据管理、数据分
深入解析SQL的定义和使用范围
2023-12-28

深入解析:使用pip更改源的方法

快速上手:pip换源方法详解,需要具体代码示例引言:在Python开发过程中,使用pip安装第三方库是非常常见的操作。然而,由于网络的原因,有时我们会遇到pip安装速度缓慢的问题。这是因为pip默认使用的是官方源,而官方源有时候会受到网络
深入解析:使用pip更改源的方法
2024-01-16

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录