我的编程空间,编程开发者的网络收藏夹
学习永远不晚

KotlinSelect协程多路复用的实现详解

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

KotlinSelect协程多路复用的实现详解

前言

协程通信三剑客:Channel、Select、Flow,本篇将会重点分析Select的使用及原理。

通过本篇文章,你将了解到:

  • Select 的引入
  • Select 的使用
  • Invoke函数 的妙用
  • Select 的原理
  • Select 注意事项

1. Select 的引入

多路数据的选择

串行执行

如今的二维码识别应用场景越来越广了,早期应用比较广泛的识别SDK如zxing、zbar,它们各有各的特点,也存在识别不出来的情况,为了将两者优势结合起来,我们想到的方法是同一份二维码图片分别给两者进行识别。

如下:

    //从zxing 获取二维码信息
    suspend fun getQrcodeInfoFromZxing(bitmap: Bitmap?): String {
        //模拟耗时
        delay(2000)
        return "I'm fish"
    }
    //从zbar 获取二维码信息
    suspend fun getQrcodeInfoFromZbar(bitmap: Bitmap?): String {
        delay(1000)
        return "I'm fish"
    }
    fun testSelect() {
        runBlocking {
            var bitmap = null
            var starTime = System.currentTimeMillis()
            var qrcoe1 = getQrcodeInfoFromZxing(bitmap)
            var qrcode2 = getQrcodeInfoFromZbar(bitmap)
            println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
        }
    }

查看打印,最后花费的时间:

qrcode1=I’m fish qrcode2=I’m fish useTime:3013 ms

当然这是串行的方式效率比较低,我们想到了用协程来优化它。

协程并行执行

如下:

    fun testSelect1() {
        var bitmap = null;
        var starTime = System.currentTimeMillis()
        var deferredZxing = GlobalScope.async {
            getQrcodeInfoFromZxing(bitmap)
        }
        var deferredZbar = GlobalScope.async {
            getQrcodeInfoFromZbar(bitmap)
        }
        runBlocking {
            //挂起等待识别结果
            var qrcoe1 = deferredZxing.await()
            //挂起等待识别结果
            var qrcode2 = deferredZbar.await()
            println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
        }
    }

查看打印,最后花费的时间:

qrcode1=I’m fish qrcode2=I’m fish useTime:2084 ms

可以看出,花费时间明显变少了。

与上个Demo 相比,虽然识别过程是放在协程里并行执行的,但是在等待识别结果却是串行的。我们引入两个识别库的初衷是哪个识别快就用哪个的结果,为了达成这个目的,传统的方式是:

同时监听并记录识别结果的返回。

同时监听多路结果

如下:

    fun testSelect2() {
        var bitmap = null;
        var starTime = System.currentTimeMillis()
        var deferredZxing = GlobalScope.async {
            getQrcodeInfoFromZxing(bitmap)
        }
        var deferredZbar = GlobalScope.async {
            getQrcodeInfoFromZbar(bitmap)
        }
        var isEnd = false
        var result: String? = null
        GlobalScope.launch {
            if (!isEnd) {
                //没有结束,则继续识别
                var resultTmp = deferredZxing.await()
                if (!isEnd) {
                    //识别没有结束,说明自己是第一个返回结果的
                    result = resultTmp
                    println("zxing recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
                    //标记识别结束
                    isEnd = true
                }
            }
        }
        GlobalScope.launch {
            if (!isEnd) {
                var resultTmp = deferredZbar.await()
                if (!isEnd) {
                    //识别没有结束,说明自己是第一个返回结果的
                    result = resultTmp
                    println("zbar recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
                    isEnd = true
                }
            }
        }
        //检测是否有结果返回
        runBlocking {
            while (!isEnd) {
                delay(1)
            }
            println("recognize result:$result")
        }
    }

通过检测isEnd 标记来判断是否有某个模块返回结果。

结果如下:

zbar recognize ok useTime:1070 ms

recognize result:I’m fish

由于模拟设定的zbar 解析速度快,因此每次都是采纳的是zbar的结果,所花费的时间大幅减少了,该结果符合预期。

Select 闪亮登场

虽说上个Demo结果符合预期,但是多了很多额外的代码、多引入了其它协程,并且需要子模块对标记进行赋值(对"isEnd"进行赋值),没有达到解耦的目的。我们希望子模块的任务是单一且闭环的,如果能在一个函数里统一检测结果的返回就好了。

Select 就是为了解决多路数据的选择而生的。

来看看它是怎么解决该问题的:

    fun testSelect3() {
        var bitmap = null;
        var starTime = System.currentTimeMillis()
        var deferredZxing = GlobalScope.async {
            getQrcodeInfoFromZxing(bitmap)
        }
        var deferredZbar = GlobalScope.async {
            getQrcodeInfoFromZbar(bitmap)
        }
        runBlocking {
            //通过select 监听zxing、zbar 结果返回
            var result = select<String> {
                //监听zxing
                deferredZxing.onAwait {value->
                    //value 为deferredZxing 识别的结果
                    "zxing result $value"
                }
                //监听zbar
                deferredZbar.onAwait { value->
                    "zbar result $value"
                }
            }
            //运行到此,说明已经有结果返回
            println("result from $result useTime:${System.currentTimeMillis() - starTime}")
        }
    }

结果如下:

result from zbar result I’m fish useTime:1079

符合预期,同时可以看出:相比上个Demo,这样写简洁了许多。

2. Select 的使用

除了可以监听async的结果,Select 还可以监听Channel的发送方/接收方 数据,我们以监听接收方数据为例:

    fun testSelect4() {
        runBlocking {
            var bitmap = null;
            var starTime = System.currentTimeMillis()
            var receiveChannelZxing = produce {
                //生产数据
                var result = getQrcodeInfoFromZxing(bitmap)
                //发送数据
                send(result)
            }
            var receiveChannelZbar = produce {
                var result = getQrcodeInfoFromZbar(bitmap)
                send(result)
            }
            var result = select<String> {
                //监听是否有数据发送过来
                receiveChannelZxing.onReceive {
                    value->"zxing result $value"
                }
                receiveChannelZbar.onReceive {
                        value->"zbar result $value"
                }
            }
            println("result from $result useTime:${System.currentTimeMillis() - starTime}")
        }
    }

结果如下:

result from zbar result I’m fish useTime:1028

不论是async还是Channel,Select 都可以监听它们的数据,从而形成多路复用的效果。

在监听协程里调用select 表达式,表达式{}内声明需要监听的协程的数据,对于select 来说有两种场景:

  • 没有数据,则select 挂起协程并等待直到其它协程数据准备完成后再次恢复select 所在的协程。
  • 有数据,则select 正常执行并返回获取的数据。

3. Invoke函数的妙用

在分析Select 原理之前,需要弄明白invoke函数的原理。

对于Kotlin 类来说,都可以重写其invoke函数。

    operator fun invoke():String {
        return "I'm fish"
    }

如上,重写了SelectDemo里的invoke函数,和普通成员函数一样,我们可以通过对象调用它。

fun main(args: Array<String>) {
    var selectDemo = SelectDemo()
    var result = selectDemo.invoke()
    println("result:$result")
}

当然,可以进一步简化:

fun main(args: Array<String>) {
    var selectDemo = SelectDemo()
    var result = selectDemo()
    println("result:$result")
}

这里涉及到了kotlin的语法糖:对象居然可以像函数一样调用。

作为函数,invoke 当然也可以接收高阶函数作为参数:

    operator fun invoke(block: (Int) -> String): String {
        return block(3)
    }
fun main(args: Array<String>) {
    var selectDemo = SelectDemo()
    var result = selectDemo { age ->
        when (age) {
            3 -> "I'm fish3"
            4 -> "I'm fish4"
            else -> "error"
        }
    }
    println("result:$result")
}

因此,当看到对象作为函数调用时,实际上调用的是invoke函数,具体的逻辑需要查看其invoke函数的实现。

4. Select 的原理

上篇分析过Channel,因此本篇趁热打铁,通过Select 监听Channel数据的变化来分析其原理,为方便讲解,我们先以监听一个Channel的为例。

先从select 表达式本身入手。

    fun testSelect5() {
        runBlocking {
            var starTime = System.currentTimeMillis()
            var receiveChannelZxing = produce {
                //发送数据
                send("I'm fish")
            }
            //确保channel 数据已经send
            delay(1000)
            var result = select<String> {
                //监听是否有数据发送过来
                receiveChannelZxing.onReceive { value ->
                    "zxing result $value"
                }
            }
            println("result from $result useTime:${System.currentTimeMillis() - starTime}")
        }
    }

select 是挂起函数,因此协程运行到此有可能被挂起。

#Select.kt
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
    //...
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        //传入父协程体
        val scope = SelectBuilderImpl(uCont)
        try {
            //执行builder
            builder(scope)
        } catch (e: Throwable) {
            scope.handleBuilderException(e)
        }
        //通过返回值判断是否需要挂起协程
        scope.getResult()
    }
}

重点看builder(scope),builder 是高阶函数,实际上就是执行了select花括号里的内容,而它里面就是监听数据是否返回。

receiveChannelZxing.onReceive

刚开始看的时候势必以为onReceive是个函数,然而它是ReceiveChannel 里的成员变量:

#Channel.kt
    public val onReceive: SelectClause1<E>

通过上一节的分析可知,关键是要找到SelectClause1 的invoke的实现。

#Select.kt
public interface SelectBuilder<in R> {
    //block 有个入参
    //声明了SelectClause1的扩展函数invoke
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
}
override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
    //SelectBuilderImpl 实现了 SelectClause1 的invoke函数
    registerSelectClause1(this@SelectBuilderImpl, block)
}

再看onReceive 的赋值:

#AbstractChannel.kt
final override val onReceive: SelectClause1<E>
    get() = object : SelectClause1<E> {
        @Suppress("UNCHECKED_CAST")
        override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
            registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R)
        }
    }

因此,简单总结调用栈如下:

当调用receiveChannelZxing.onReceive{},实际上调用了SelectClause1.invoke(),而它里面又调用了SelectClause1.registerSelectClause1(),最终调用了AbstractChannel.registerSelectReceiveMode。

AbstractChannel. registerSelectReceiveMode

#AbstractChannel.kt
private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
    while (true) {
        //如果已经有结果了,则直接返回------->①
        if (select.isSelected) return
        if (isEmptyImpl) {
            //没有发送者在等待,则入队等待,并返回 ------->②
            if (enqueueReceiveSelect(select, block, receiveMode)) return
        } else {
            //直接取出值------->③
            val pollResult = pollSelectInternal(select)
            when {
                pollResult === ALREADY_SELECTED -> return
                pollResult === POLL_FAILED -> {} // retry
                pollResult === RETRY_ATOMIC -> {} // retry
                //调用block------->④
                else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)
            }
        }
    }
}

分为4个点,接着来一一分析。

①select 同时监听多个值,若是有1个符合要求的数据返回了,那么该isSelected 标记为true,当检测到该标记为true时直接退出。

结合之前的Demo,zbar 已经识别出结果了,当select 检测zxing的结果时直接返回。

②:

#AbstractChannel.kt
private fun <R> enqueueReceiveSelect(
    select: SelectInstance<R>,
    block: suspend (Any?) -> R,
    receiveMode: Int
): Boolean {
    //构造为Node元素
    val node = AbstractChannel.ReceiveSelect(this, select, block, receiveMode)
    //添加到Channel队列里
    val result = enqueueReceive(node)
    if (result) select.disposeOnSelect(node)
    return result
}

当select 时,发现Channel里没有数据,说明Channel还没有开始send,因此构造了Node(ReceiveSelect)加入到Channel queue里。当send数据时,会查找queue里是否有接收者等待,若有则调用Node(ReceiveSelect.completeResumeReceive):

#AbstractChannel.kt
        override fun completeResumeReceive(value: E) {
            block.startCoroutineCancellable(
                if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
                select.completion,
                resumeOnCancellationFun(value)
            )
        }

block 被调度执行,最后会恢复select 协程的执行。

③取出数据,并尝试恢复send协程。

④在③的基础上,拿到数据后,直接执行block(此时并没有切换线程进行调度)。

小结一下select 原理:

可以看出:

select 本身执行并不耗时,若最终没有数据返回则挂起等待,若是有数据返回则不会挂起协程。

我们从头再捋一下select 配合Channel 的原理:

虽然以Channel为例讲解了select 原理,实际上async等结合select 原理大致差不多,重点都是利用了协程的挂起/恢复做文章。

5. Select注意事项

如果select有多个数据同时到达,select 默认会选择第一个数据,若想要随机选择数据,可做如下处理:

            var result = selectUnbiased<String> {
                //监听是否有数据发送过来
                receiveChannelZxing.onReceive { value ->
                    "zxing result $value"
                }
            }

想要知道select 还可以监听哪些数据,可查看该数据是否实现了SelectClauseX(X 表示0、1、2)。

以上即为Select 的原理及其使用,下篇将会进入协程的精华部分:Flow的运用,该部分内容较多,可能会分几篇分析,敬请期待。

本文基于Kotlin 1.5.3,文中完整Demo传送门

到此这篇关于Kotlin Select多路复用的实现详解的文章就介绍到这了,更多相关Kotlin Select 内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

KotlinSelect协程多路复用的实现详解

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

多路复用技术的详细讲解

     “复用”是一种将若干个彼此独立的信号,合并为一个可在同一信道上同时传输的复合信号的方法。比如,传输的语音信号的频谱一般在300~3400Hz内,为了使若干个这种信号能在同一信道上传输,可以把它们的频谱调制到不同的频段,合并在一起而不致相互影响,并能在接收端
多路复用技术的详细讲解
2024-04-17

Java NIO多路复用的方法以及Linux epoll实现原理详解

这篇文章主要介绍“Java NIO多路复用的方法以及Linux epoll实现原理详解”,在日常操作中,相信很多人在Java NIO多路复用的方法以及Linux epoll实现原理详解问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作
2023-06-02

Python用threading实现多线程详解

多线程 多线程是个提高程序运行效率的好办法,本来要顺序执行的程序现在可以并行执行,可想而知效率要提高很多。但是多线程也不是能提高所有程序的效率。程序的两个极端是‘CPU 密集型'和‘I/O 密集型'两种,多线程技术比较适用于后者,因为在串行
2022-06-04

Kotlin使用协程实现高效并发程序流程详解

这篇文章主要介绍了Kotlin使用协程实现高效并发程序流程,协程属于Kotlin中非常有特色的一项技术,因为大部分编程语言中是没有协程这个概念的。那么什么是协程呢?它其实和线程有点相似,可以简单地将它理解成一种轻量级的线程
2023-01-18

Golang使用http协议实现心跳检测程序过程详解

这篇文章主要介绍了Golang使用http协议实现心跳检测程序过程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
2023-03-15

Android隐私协议提示弹窗的实现流程详解

这篇文章主要介绍了Android隐私协议提示弹窗的实现流程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
2023-01-30

C语言三子棋的实现思路到过程详解

所谓三子棋,就是三行三列的棋盘,玩家可以和电脑下棋,率先连成三个的获胜。这篇文章主要为大家详细介绍了如何通过C语言实现三子棋小游戏,感兴趣的小伙伴可以尝试一下
2023-02-13

使用Node.js实现ORM的一种思路详解(图文)

ORM是O和R的映射。O代表面向对象,R代表关系型数据库。二者有相似之处同时也各有特色。就是因为这种即是又非的情况,才需要做映射的。理想情况是,根据关系型数据库(含业务需求)的特点来设计数据库。同时根据面向对象(含业务需求)的特点来设计模型
2022-06-04

支付宝小程序实现类似微信多行输入功能(思路详解)

支付宝小程序实现多行输入功能类似微信,主要通过技术思路和实现步骤实现。首先,使用TextInput组件,通过height和lines属性设定文本框高度和最大行数。其次,监听输入事件,当文本超出单行限制时,自动换行并调整文本框高度。最后,注意height属性只能在组件创建后修改,监听输入事件会影响性能,对于过长文本建议使用TextArea组件。
支付宝小程序实现类似微信多行输入功能(思路详解)
2024-04-02

java编程实现并查集的路径压缩代码详解

首先看两张路径压缩的图片:并查集(Union-find Sets)是一种非常精巧而实用的数据结构,它主要用于处理一些不相交集合的合并问题。一些常见的用途有求连通子图、求最小生成树的 Kruskal 算法和求最近公共祖先(Least Comm
2023-05-30

详解Java中多线程异常捕获Runnable的实现

详解Java中多线程异常捕获Runnable的实现1、背景: Java 多线程异常不向主线程抛,自己处理,外部捕获不了异常。所以要实现主线程对子线程异常的捕获。2、工具: 实现Runnable接口的LayerIn
2023-05-31

编程热搜

  • Android:VolumeShaper
    VolumeShaper(支持版本改一下,minsdkversion:26,android8.0(api26)进一步学习对声音的编辑,可以让音频的声音有变化的播放 VolumeShaper.Configuration的三个参数 durati
    Android:VolumeShaper
  • Android崩溃异常捕获方法
    开发中最让人头疼的是应用突然爆炸,然后跳回到桌面。而且我们常常不知道这种状况会何时出现,在应用调试阶段还好,还可以通过调试工具的日志查看错误出现在哪里。但平时使用的时候给你闹崩溃,那你就欲哭无泪了。 那么今天主要讲一下如何去捕捉系统出现的U
    Android崩溃异常捕获方法
  • android开发教程之获取power_profile.xml文件的方法(android运行时能耗值)
    系统的设置–>电池–>使用情况中,统计的能耗的使用情况也是以power_profile.xml的value作为基础参数的1、我的手机中power_profile.xml的内容: HTC t328w代码如下:
    android开发教程之获取power_profile.xml文件的方法(android运行时能耗值)
  • Android SQLite数据库基本操作方法
    程序的最主要的功能在于对数据进行操作,通过对数据进行操作来实现某个功能。而数据库就是很重要的一个方面的,Android中内置了小巧轻便,功能却很强的一个数据库–SQLite数据库。那么就来看一下在Android程序中怎么去操作SQLite数
    Android SQLite数据库基本操作方法
  • ubuntu21.04怎么创建桌面快捷图标?ubuntu软件放到桌面的技巧
    工作的时候为了方便直接打开编辑文件,一些常用的软件或者文件我们会放在桌面,但是在ubuntu20.04下直接直接拖拽文件到桌面根本没有效果,在进入桌面后发现软件列表中的软件只能收藏到面板,无法复制到桌面使用,不知道为什么会这样,似乎并不是很
    ubuntu21.04怎么创建桌面快捷图标?ubuntu软件放到桌面的技巧
  • android获取当前手机号示例程序
    代码如下: public String getLocalNumber() { TelephonyManager tManager =
    android获取当前手机号示例程序
  • Android音视频开发(三)TextureView
    简介 TextureView与SurfaceView类似,可用于显示视频或OpenGL场景。 与SurfaceView的区别 SurfaceView不能使用变换和缩放等操作,不能叠加(Overlay)两个SurfaceView。 Textu
    Android音视频开发(三)TextureView
  • android获取屏幕高度和宽度的实现方法
    本文实例讲述了android获取屏幕高度和宽度的实现方法。分享给大家供大家参考。具体分析如下: 我们需要获取Android手机或Pad的屏幕的物理尺寸,以便于界面的设计或是其他功能的实现。下面就介绍讲一讲如何获取屏幕的物理尺寸 下面的代码即
    android获取屏幕高度和宽度的实现方法
  • Android自定义popupwindow实例代码
    先来看看效果图:一、布局
  • Android第一次实验
    一、实验原理 1.1实验目标 编程实现用户名与密码的存储与调用。 1.2实验要求 设计用户登录界面、登录成功界面、用户注册界面,用户注册时,将其用户名、密码保存到SharedPreference中,登录时输入用户名、密码,读取SharedP
    Android第一次实验

目录