Kotlin 协程 Select:看我如何多路复用 您所在的位置:网站首页 脑电图结果分析怎么看 Kotlin 协程 Select:看我如何多路复用

Kotlin 协程 Select:看我如何多路复用

2023-05-14 03:24| 来源: 网络整理| 查看: 265

前言

协程系列文章:

一个小故事讲明白进程、线程、Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotlin 协程的挂起/恢复没那么神秘(原理篇) Kotlin 协程调度切换线程是时候解开真相了 Kotlin 协程之线程池探索之旅(与Java线程池PK) Kotlin 协程之取消与异常处理探索之旅(上) Kotlin 协程之取消与异常处理探索之旅(下) 来,跟我一起撸Kotlin runBlocking/launch/join/async/delay 原理&使用 继续来,同我一起撸Kotlin Channel 深水区 Kotlin 协程 Select:看我如何多路复用 Kotlin Sequence 是时候派上用场了 Kotlin Flow啊,你将流向何方? Kotlin Flow 背压和线程切换竟然如此相似 Kotlin SharedFlow&StateFlow 热流到底有多热? 狂飙吧,Lifecycle与协程、Flow的化学反应 来吧!接受Kotlin 协程--线程池的7个灵魂拷问 当,Kotlin Flow与Channel相逢 这一次,让Kotlin Flow 操作符真正好用起来

协程通信三剑客:Channel、Select、Flow,上篇已经分析了Channel的深水区,本篇将会重点分析Select的使用及原理。 通过本篇文章,你将了解到:

Select 的引入 Select 的使用 Invoke函数 的妙用 Select 的原理 Select 注意事项 1. Select 的引入 多路数据的选择 串行执行

如今的二维码识别应用场景越来越广了,早期应用比较广泛的识别SDK如zxing、zbar,它们各有各的特点,也存在识别不出来的情况,为了将两者优势结合起来,我们想到的方法是同一份二维码图片分别给两者进行识别。 如下:

//从zxing 获取二维码信息 suspend fun getQrcodeInfoFromZxing(bitmap: Bitmap?): String { //模拟耗时 delay(2000) return "I'm fish" } //从zbar 获取二维码信息 suspend fun getQrcodeInfoFromZbar(bitmap: Bitmap?): String { delay(1000) return "I'm fish" } fun testSelect() { runBlocking { var bitmap = null var starTime = System.currentTimeMillis() var qrcoe1 = getQrcodeInfoFromZxing(bitmap) var qrcode2 = getQrcodeInfoFromZbar(bitmap) println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms") } } 复制代码

查看打印,最后花费的时间:

qrcode1=I'm fish qrcode2=I'm fish useTime:3013 ms

当然这是串行的方式效率比较低,我们想到了用协程来优化它。

协程并行执行

如下:

fun testSelect1() { var bitmap = null; var starTime = System.currentTimeMillis() var deferredZxing = GlobalScope.async { getQrcodeInfoFromZxing(bitmap) } var deferredZbar = GlobalScope.async { getQrcodeInfoFromZbar(bitmap) } runBlocking { //挂起等待识别结果 var qrcoe1 = deferredZxing.await() //挂起等待识别结果 var qrcode2 = deferredZbar.await() println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms") } } 复制代码

查看打印,最后花费的时间:

qrcode1=I'm fish qrcode2=I'm fish useTime:2084 ms

可以看出,花费时间明显变少了。 与上个Demo 相比,虽然识别过程是放在协程里并行执行的,但是在等待识别结果却是串行的。我们引入两个识别库的初衷是哪个识别快就用哪个的结果,为了达成这个目的,传统的方式是:

同时监听并记录识别结果的返回。

同时监听多路结果

如下:

fun testSelect2() { var bitmap = null; var starTime = System.currentTimeMillis() var deferredZxing = GlobalScope.async { getQrcodeInfoFromZxing(bitmap) } var deferredZbar = GlobalScope.async { getQrcodeInfoFromZbar(bitmap) } var isEnd = false var result: String? = null GlobalScope.launch { if (!isEnd) { //没有结束,则继续识别 var resultTmp = deferredZxing.await() if (!isEnd) { //识别没有结束,说明自己是第一个返回结果的 result = resultTmp println("zxing recognize ok useTime:${System.currentTimeMillis() - starTime} ms") //标记识别结束 isEnd = true } } } GlobalScope.launch { if (!isEnd) { var resultTmp = deferredZbar.await() if (!isEnd) { //识别没有结束,说明自己是第一个返回结果的 result = resultTmp println("zbar recognize ok useTime:${System.currentTimeMillis() - starTime} ms") isEnd = true } } } //检测是否有结果返回 runBlocking { while (!isEnd) { delay(1) } println("recognize result:$result") } } 复制代码

通过检测isEnd 标记来判断是否有某个模块返回结果。 结果如下:

zbar recognize ok useTime:1070 ms recognize result:I'm fish

由于模拟设定的zbar 解析速度快,因此每次都是采纳的是zbar的结果,所花费的时间大幅减少了,该结果符合预期。

Select 闪亮登场

虽说上个Demo结果符合预期,但是多了很多额外的代码、多引入了其它协程,并且需要子模块对标记进行赋值(对"isEnd"进行赋值),没有达到解耦的目的。我们希望子模块的任务是单一且闭环的,如果能在一个函数里统一检测结果的返回就好了。 Select 就是为了解决多路数据的选择而生的。 来看看它是怎么解决该问题的:

fun testSelect3() { var bitmap = null; var starTime = System.currentTimeMillis() var deferredZxing = GlobalScope.async { getQrcodeInfoFromZxing(bitmap) } var deferredZbar = GlobalScope.async { getQrcodeInfoFromZbar(bitmap) } runBlocking { //通过select 监听zxing、zbar 结果返回 var result = select { //监听zxing deferredZxing.onAwait {value-> //value 为deferredZxing 识别的结果 "zxing result $value" } //监听zbar deferredZbar.onAwait { value-> "zbar result $value" } } //运行到此,说明已经有结果返回 println("result from $result useTime:${System.currentTimeMillis() - starTime}") } } 复制代码

结果如下:

result from zbar result I'm fish useTime:1079

符合预期,同时可以看出:相比上个Demo,这样写简洁了许多。

2. Select 的使用

除了可以监听async的结果,Select 还可以监听Channel的发送方/接收方 数据,我们以监听接收方数据为例:

fun testSelect4() { runBlocking { var bitmap = null; var starTime = System.currentTimeMillis() var receiveChannelZxing = produce { //生产数据 var result = getQrcodeInfoFromZxing(bitmap) //发送数据 send(result) } var receiveChannelZbar = produce { var result = getQrcodeInfoFromZbar(bitmap) send(result) } var result = select { //监听是否有数据发送过来 receiveChannelZxing.onReceive { value->"zxing result $value" } receiveChannelZbar.onReceive { value->"zbar result $value" } } println("result from $result useTime:${System.currentTimeMillis() - starTime}") } } 复制代码

结果如下:

result from zbar result I'm fish useTime:1028

不论是async还是Channel,Select 都可以监听它们的数据,从而形成多路复用的效果。

image.png

在监听协程里调用select 表达式,表达式{}内声明需要监听的协程的数据,对于select 来说有两种场景:

没有数据,则select 挂起协程并等待直到其它协程数据准备完成后再次恢复select 所在的协程。 有数据,则select 正常执行并返回获取的数据。 3. Invoke函数 的妙用

在分析Select 原理之前,需要弄明白invoke函数的原理。 对于Kotlin 类来说,都可以重写其invoke函数。

operator fun invoke():String { return "I'm fish" } 复制代码

如上,重写了SelectDemo里的invoke函数,和普通成员函数一样,我们可以通过对象调用它。

fun main(args: Array) { var selectDemo = SelectDemo() var result = selectDemo.invoke() println("result:$result") } 复制代码

当然,可以进一步简化:

fun main(args: Array) { var selectDemo = SelectDemo() var result = selectDemo() println("result:$result") } 复制代码

这里涉及到了kotlin的语法糖:对象居然可以像函数一样调用。 作为函数,invoke 当然也可以接收高阶函数作为参数:

operator fun invoke(block: (Int) -> String): String { return block(3) } fun main(args: Array) { var selectDemo = SelectDemo() var result = selectDemo { age -> when (age) { 3 -> "I'm fish3" 4 -> "I'm fish4" else -> "error" } } println("result:$result") } 复制代码

因此,当看到对象作为函数调用时,实际上调用的是invoke函数,具体的逻辑需要查看其invoke函数的实现。

4. Select 的原理

上篇分析过Channel,因此本篇趁热打铁,通过Select 监听Channel数据的变化来分析其原理,为方便讲解,我们先以监听一个Channel的为例。 先从select 表达式本身入手。

fun testSelect5() { runBlocking { var starTime = System.currentTimeMillis() var receiveChannelZxing = produce { //发送数据 send("I'm fish") } //确保channel 数据已经send delay(1000) var result = select { //监听是否有数据发送过来 receiveChannelZxing.onReceive { value -> "zxing result $value" } } println("result from $result useTime:${System.currentTimeMillis() - starTime}") } } 复制代码

select 是挂起函数,因此协程运行到此有可能被挂起。

#Select.kt public suspend inline fun select(crossinline builder: SelectBuilder.() -> Unit): R { //... return suspendCoroutineUninterceptedOrReturn { uCont -> //传入父协程体 val scope = SelectBuilderImpl(uCont) try { //执行builder builder(scope) } catch (e: Throwable) { scope.handleBuilderException(e) } //通过返回值判断是否需要挂起协程 scope.getResult() } } 复制代码

重点看builder(scope),builder 是高阶函数,实际上就是执行了select花括号里的内容,而它里面就是监听数据是否返回。

receiveChannelZxing.onReceive 刚开始看的时候势必以为onReceive是个函数,然而它是ReceiveChannel 里的成员变量:

#Channel.kt public val onReceive: SelectClause1 复制代码

通过上一节的分析可知,关键是要找到SelectClause1 的invoke的实现。

#Select.kt public interface SelectBuilder { //block 有个入参 //声明了SelectClause1的扩展函数invoke public operator fun SelectClause1.invoke(block: suspend (Q) -> R) } override fun SelectClause1.invoke(block: suspend (Q) -> R) { //SelectBuilderImpl 实现了 SelectClause1 的invoke函数 registerSelectClause1(this@SelectBuilderImpl, block) } 复制代码

再看onReceive 的赋值:

#AbstractChannel.kt final override val onReceive: SelectClause1 get() = object : SelectClause1 { @Suppress("UNCHECKED_CAST") override fun registerSelectClause1(select: SelectInstance, block: suspend (E) -> R) { registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R) } } 复制代码

因此,简单总结调用栈如下:

当调用receiveChannelZxing.onReceive{},实际上调用了SelectClause1.invoke(),而它里面又调用了SelectClause1.registerSelectClause1(),最终调用了AbstractChannel.registerSelectReceiveMode。

AbstractChannel. registerSelectReceiveMode

#AbstractChannel.kt private fun registerSelectReceiveMode(select: SelectInstance, receiveMode: Int, block: suspend (Any?) -> R) { while (true) { //如果已经有结果了,则直接返回------->① if (select.isSelected) return if (isEmptyImpl) { //没有发送者在等待,则入队等待,并返回 ------->② if (enqueueReceiveSelect(select, block, receiveMode)) return } else { //直接取出值------->③ val pollResult = pollSelectInternal(select) when { pollResult === ALREADY_SELECTED -> return pollResult === POLL_FAILED -> {} // retry pollResult === RETRY_ATOMIC -> {} // retry //调用block------->④ else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult) } } } } 复制代码

分为4个点,接着来一一分析。 ① select 同时监听多个值,若是有1个符合要求的数据返回了,那么该isSelected 标记为true,当检测到该标记为true时直接退出。 结合之前的Demo,zbar 已经识别出结果了,当select 检测zxing的结果时直接返回。

#AbstractChannel.kt private fun enqueueReceiveSelect( select: SelectInstance, block: suspend (Any?) -> R, receiveMode: Int ): Boolean { //构造为Node元素 val node = AbstractChannel.ReceiveSelect(this, select, block, receiveMode) //添加到Channel队列里 val result = enqueueReceive(node) if (result) select.disposeOnSelect(node) return result } 复制代码

当select 时,发现Channel里没有数据,说明Channel还没有开始send,因此构造了Node(ReceiveSelect)加入到Channel queue里。当send数据时,会查找queue里是否有接收者等待,若有则调用Node(ReceiveSelect.completeResumeReceive):

#AbstractChannel.kt override fun completeResumeReceive(value: E) { block.startCoroutineCancellable( if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value, select.completion, resumeOnCancellationFun(value) ) } 复制代码

block 被调度执行,最后会恢复select 协程的执行。

③ 取出数据,并尝试恢复send协程。

④ 在③的基础上,拿到数据后,直接执行block(此时并没有切换线程进行调度)。

小结一下select 原理:

image.png

可以看出:

select 本身执行并不耗时,若最终没有数据返回则挂起等待,若是有数据返回则不会挂起协程。

我们从头再捋一下select 配合Channel 的原理:

image.png

虽然以Channel为例讲解了select 原理,实际上async等结合select 原理大致差不多,重点都是利用了协程的挂起/恢复做文章。

5. Select 注意事项

如果select有多个数据同时到达,select 默认会选择第一个数据,若想要随机选择数据,可做如下处理:

var result = selectUnbiased { //监听是否有数据发送过来 receiveChannelZxing.onReceive { value -> "zxing result $value" } } 复制代码

想要知道select 还可以监听哪些数据,可查看该数据是否实现了SelectClauseX(X 表示0、1、2)。

以上即为Select 的原理及其使用,下篇将会进入协程的精华部分:Flow的运用,该部分内容较多,可能会分几篇分析,敬请期待。

本文基于Kotlin 1.5.3,文中完整Demo请点击

您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力 持续更新中,和我一起步步为营系统、深入学习Android/Kotlin


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有