Kotlin 协程之线程池探索之旅(与Java线程池PK) 您所在的位置:网站首页 windows查看线程占用cpu Kotlin 协程之线程池探索之旅(与Java线程池PK)

Kotlin 协程之线程池探索之旅(与Java线程池PK)

2023-05-06 09:27| 来源: 网络整理| 查看: 265

前言

协程系列文章:

一个小故事讲明白进程、线程、Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotlin 协程的挂起/恢复没那么神秘(原理篇) Kotlin 协程调度切换线程是时候解开真相了 Kotlin 协程之线程池探索之旅(与Java线程池PK) Kotlin 协程之取消与异常处理探索之旅(上) Kotlin 协程之取消与异常处理探索之旅(下) 来,跟我一起撸Kotlin runBlocking/launch/join/async/delay 原理&使用 继续来,同我一起撸Kotlin Channel 深水区 Kotlin 协程 Select:看我如何多路复用 Kotlin Sequence 是时候派上用场了 Kotlin Flow啊,你将流向何方? Kotlin Flow 背压和线程切换竟然如此相似 Kotlin SharedFlow&StateFlow 热流到底有多热? 狂飙吧,Lifecycle与协程、Flow的化学反应 来吧!接受Kotlin 协程--线程池的7个灵魂拷问 当,Kotlin Flow与Channel相逢 这一次,让Kotlin Flow 操作符真正好用起来

上篇文章分析了协程切换到主线程执行的详细流程,本篇将分析如何切换到子线程执行。 通过本篇文章,你将了解到:

切换到子线程场景 Dispatchers.Default 分发流程详解 Dispatchers.IO 分发流程详解 与Java 线程池比对 协程到底在哪个线程执行? 1. 切换到子线程场景 Demo 展示

先看一个最常见的网络请求Demo:

fun showStuName() { GlobalScope.launch(Dispatchers.Main) { var stuInfo = withContext(Dispatchers.IO) { //模拟网络请求 Thread.sleep(3000) "我是小鱼人" } //展示 Toast.makeText(context, stuInfo, Toast.LENGTH_SHORT).show() } } 复制代码

因为是耗时操作,因此需要切换到子线程处理,又因为是网络请求,属于I/O操作,因此使用Dispatchers.IO 分发器。

若任务偏向于计算型,比较耗费CPU,可以改写如下:

fun dealCpuTask() { GlobalScope.launch(Dispatchers.Main) { //切换到子线程 withContext(Dispatchers.Default) { var i = 0 val count = 100000 while(i < count) { Thread.sleep(1) } } } } 复制代码 Dispatchers.IO/Dispatchers.Default 异同

两者都是协程分发器,Dispatchers.IO 侧重于任务本身是阻塞型的,比如文件、数据库、网络等操作,此时是不怎么占用CPU的。而Dispatchers.Default 侧重于计算型的任务,可能会长时间占用CPU。 协程线程池在设计的时候,针对两者在线程的调度策略上有所不同。

image.png

2. Dispatchers.Default 分发流程详解 任务分发

以上面的Demo 为例,从源码角度分析分发流程。 从前面的文章很容易了解到:withContext()函数里构造了DispatchedContinuation,它本身也是个Runnable,通过:

//this 指DispatchedContinuation 本身 dispatcher.dispatch(context, this) 复制代码

进行分发。 而dispatcher 就是分发器,我们这里用的是Dispatchers.Default,因此先来看看它的实现。

#Dispatchers.kt actual object Dispatchers { @JvmStatic actual val Default: CoroutineDispatcher = createDefaultDispatcher() @JvmStatic public val IO: CoroutineDispatcher = DefaultScheduler.IO @JvmStatic public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher } 复制代码

可以看出Dispatchers 是个单例。

#CoroutineContext.kt //useCoroutinesScheduler 默认为true //使用DefaultScheduler internal actual fun createDefaultDispatcher(): CoroutineDispatcher = if (useCoroutinesScheduler) DefaultScheduler else CommonPool #Dispatcher.kt internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { //定义IO 分发器 //... } 复制代码

DefaultScheduler 也是个单例,内容不多,其功能实现还得继续往上看。 ExperimentalCoroutineDispatcher 定义如下:

#Dispatcher.kt open class ExperimentalCoroutineDispatcher( //核心线程数 private val corePoolSize: Int, //最大线程个数 private val maxPoolSize: Int, //空闲线程的存活时间 private val idleWorkerKeepAliveNs: Long, //线程名前缀 private val schedulerName: String = "CoroutineScheduler" ) : ExecutorCoroutineDispatcher() { constructor( //初始化参数的值 corePoolSize: Int = CORE_POOL_SIZE, maxPoolSize: Int = MAX_POOL_SIZE, schedulerName: String = DEFAULT_SCHEDULER_NAME ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName) //真正的线程池实现 private var coroutineScheduler = createScheduler() //分发 override fun dispatch(context: CoroutineContext, block: Runnable): Unit = try { //分发实现 coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { //... } } //真正的线程池实现为:CoroutineScheduler private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) 复制代码

查看CoroutineScheduler.dispatch()函数:

//block 为DispatchedContinuation fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { //构建Task对象,block 本身就是Task类型 val task = createTask(block, taskContext) //当前线程是否是Worker类型,也就是说当前线程是否是线程池内的线程 val currentWorker = currentWorker()//① //如果是,则尝试提交任务到本地队列,否则返回任务本身 val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)//② if (notAdded != null) { //如果没有提交到本地队列,则提交到全局队列 if (!addToGlobalQueue(notAdded)) {//③ //添加队列失败则抛出异常 throw RejectedExecutionException("$schedulerName was terminated") } } //是否需要跳过唤醒线程,主要用在IO分发器 val skipUnpark = tailDispatch && currentWorker != null if (task.mode == TASK_NON_BLOCKING) {//④ if (skipUnpark) return //非阻塞任务,唤醒cpu 线程 signalCpuWork()//⑤ } else { //阻塞任务,唤醒blocking 线程 signalBlockingWork(skipUnpark = skipUnpark)//⑥ } } 复制代码

这函数是分发核心,注释里标明了6个点,现在一一阐述: ①

private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this } 复制代码

Worker 本身是继承自Thread 的,因此Worker 是线程类,代表线程池里的线程。通过判断是否是Worker类型来确认当前线程是否属于线程池内的线程。

private fun CoroutineScheduler.Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? { //Worker 为空,直接返回任务本身 if (this == null) return task //非阻塞的任务,则直接返回 if (task.mode == TASK_NON_BLOCKING && state === CoroutineScheduler.WorkerState.BLOCKING) { return task } //表示本地队列里存有任务了 mayHaveLocalTasks = true //加入到本地队列里 //localQueue 为Worker的成员变量 return localQueue.add(task, fair = tailDispatch) } 复制代码

③ 若是②没有成功加入到本地队列里,则尝试加入到全局队列里:

private fun addToGlobalQueue(task: Task): Boolean { return if (task.isBlocking) { //加入到阻塞队列 globalBlockingQueue.addLast(task) } else { //加入到cpu队列 globalCpuQueue.addLast(task) } } 复制代码

结合②③分析,目前为止,出现了三个队列:

image.png

④ 主要用于判断任务是阻塞还是非阻塞的,这在任务构造的时候就已经指定,若是使用Dispatchers.Default 分发器,那么构造的任务是非阻塞的,而使用Dispatchers.IO,则构造的任务是阻塞的。

⑤ ⑤⑥ 是针对阻塞与否进行不同的处理。

fun signalCpuWork() { //尝试去唤醒正在挂起的线程,若是有线程可以被唤醒,则无需创建新线程 if (tryUnpark()) return //若唤醒不成功,则需要尝试创建线程 if (tryCreateWorker()) return //再试一次,边界条件 tryUnpark() } 复制代码

tryUnpark()函数主要作用是从栈里取出挂起的线程(Worker),入栈的的时机是当线程没有任务可以处理时进行挂起,此时会记录在栈里。 重点是tryCreateWorker()函数:

private fun tryCreateWorker(state: Long = controlState.value): Boolean { //获取当前已经创建的线程数 val created = createdWorkers(state) //获取当前阻塞的任务数 val blocking = blockingTasks(state) //已创建的线程数-阻塞的任务数=非阻塞的线程数 //coerceAtLeast(0) 表示结果至少是0 val cpuWorkers = (created - blocking).coerceAtLeast(0) //如果非阻塞数小于核心线程数 if (cpuWorkers < corePoolSize) { //创建线程 val newCpuWorkers = createNewWorker() //如果当前只有一个非阻塞线程并且核心线程数>1,那么再创建一个线程 //目的是为了方便"偷"任务... if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker() //创建成功 if (newCpuWorkers > 0) return true } return false } 复制代码

创建新线程为什么与阻塞任务的多少关联呢? 简单举个例子:

现在若是已经创建了5个线程,而这几个线程都在执行IO任务,此时就需要再创建新的线程来执行任务,因为此时CPU是空闲的。 只要非阻塞任务的个数小于核心线程数,那么就需要创建新的线程,目的是为了充分利用CPU。

再看createNewWorker() 是如何创建新的线程(Worker)的。

private fun createNewWorker(): Int { //workers 为Worker 数组,因为需要对数组进行add 操作,因此需要同步访问 synchronized(workers) { if (isTerminated) return -1 val state = controlState.value //获取已创建的线程数 val created = createdWorkers(state) //阻塞的任务数 val blocking = blockingTasks(state) //非阻塞的线程数 val cpuWorkers = (created - blocking).coerceAtLeast(0) //非阻塞的线程数不能超过核心线程数 if (cpuWorkers >= corePoolSize) return 0 //已创建的线程数不能大于最大线程数 if (created >= maxPoolSize) return 0 val newIndex = createdWorkers + 1 require(newIndex > 0 && workers[newIndex] == null) //构造线程 val worker = Worker(newIndex) //记录到数组里 workers[newIndex] = worker //记录创建的线程数 require(newIndex == incrementCreatedWorkers()) //开启线程 worker.start() //当前非阻塞线程数 return cpuWorkers + 1 } } 复制代码

⑥ signalBlockingWork()函数调用时会记录阻塞的任务数,其它与signalCpuWork 一致。

至此,Dispatchers.Default 任务分发流程已经结束,其重点:

构造任务,添加到队列里(三个队列中选一个)。 唤醒挂起的线程或是创建新的线程。 任务执行

既然任务都提交到队列了,该线程出场执行任务了。

internal inner class Worker private constructor() : Thread() {} 复制代码

Worker 创建并启动后,将会执行run()函数:

override fun run() = runWorker() private fun runWorker() { var rescanned = false //一直查找,除非worker终止了 while (!isTerminated && state != CoroutineScheduler.WorkerState.TERMINATED) { //从队列里寻找任务 //mayHaveLocalTasks:本地队列里是否有任务 val task = findTask(mayHaveLocalTasks) //① if (task != null) { rescanned = false minDelayUntilStealableTaskNs = 0L //任务获取到后,执行任务 executeTask(task)//② //任务执行完毕,继续循环查找任务 continue } else { mayHaveLocalTasks = false } if (minDelayUntilStealableTaskNs != 0L) { //延迟一会儿,再去偷 if (!rescanned) { rescanned = true } else { //挂起一段时间 } continue } //尝试挂起 tryPark()//③ } //释放token tryReleaseCpu(CoroutineScheduler.WorkerState.TERMINATED) } 复制代码

同样的,标注了3个重点,一一分析之。

① findTask()顾名思义:找任务。 传入的参数表示是否扫描本地队列,若是之前有提交任务到本地队列,则此处mayHaveLocalTasks = true。

fun findTask(scanLocalQueue: Boolean): Task? { //尝试获取cpu 许可 //若是拿到cpu 许可,则可以执行任何任务 if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue) //拿不到,若是本地队列有任务,则从本地取,否则从全局阻塞队列取 val task = if (scanLocalQueue) { localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull() } else { globalBlockingQueue.removeFirstOrNull() } //都拿不到,则偷别人的 return task ?: trySteal(blockingOnly = true) } private fun findAnyTask(scanLocalQueue: Boolean): Task? { if (scanLocalQueue) { //可以从本地队列找 val globalFirst = nextInt(2 * corePoolSize) == 0 if (globalFirst) pollGlobalQueues()?.let { return it } localQueue.poll()?.let { return it } if (!globalFirst) pollGlobalQueues()?.let { return it } } else { //从全局队列找 pollGlobalQueues()?.let { return it } } //偷别人的 return trySteal(blockingOnly = false) } 复制代码

此处解释一下获取cpu 许可的含义:

它和核心线程数相关,假设我们是8核CPU,那么同一时间最多只能有8个线程在CPU上执行。因此,若是其它线程想要执行非阻塞任务(占用CPU),需要申请许可(token),申请成功说明有CPU空闲,此时该线程可以执行非阻塞任务。否则,只能执行阻塞任务。

当从本地队列、全局队列里都没找出任务时,当前的Worker打起了别个Woker的主意。我们知道全局队列是所有Worker共享,而本地队列是每个Worker私有的。因此,当前Worker发现自己没任务可以执行的时候会去看看其它Worker的本地队列里是否有可以执行的任务,若是有就可以偷过来用。 看看如何偷的:

private fun trySteal(blockingOnly: Boolean): Task? { //自己本地没有才能偷 kotlinx.coroutines.assert { localQueue.size == 0 } //所有的已创建的workers个数 val created = createdWorkers //遍历workers数组 repeat(created) { ++currentIndex if (currentIndex > created) currentIndex = 1 val worker = workers[currentIndex] if (worker !== null && worker !== this) { //从别的worker里的本地队列偷到自己的本地队列 val stealResult = if (blockingOnly) { localQueue.tryStealBlockingFrom(victim = worker.localQueue) } else { localQueue.tryStealFrom(victim = worker.localQueue) } //偷成功,则取出任务 if (stealResult == TASK_STOLEN) { return localQueue.poll() } else if (stealResult > 0) { minDelay = min(minDelay, stealResult) } } } //...没偷到 return null } 复制代码

实际上偷的本质是:

从别人的本队队列里取出任务放到自己的本地队列,最后取出任务返回。

② 拿到任务后,就开始执行任务。

private fun executeTask(task: Task) { //模式:阻塞/非阻塞 val taskMode = task.mode idleReset(taskMode) //当前任务是非阻塞任务,则尝试释放cpu token,并执行signalCpuWork beforeTask(taskMode) //真正执行任务 runSafely(task) //修改状态 afterTask(taskMode) } fun runSafely(task: Task) { try { //task 其实就是DispatchedContinuation task.run() } catch (e: Throwable) { //.. } finally { unTrackTask() } } 复制代码

此时线程正式执行任务了。

③ 若是线程没有找到任何任务执行,则尝试挂起。

private fun tryPark() { //没有在挂起栈里 if (!inStack()) { //将worker放入挂起栈里 parkedWorkersStackPush(this) return } //... while (inStack() && workerCtl.value == CoroutineScheduler.PARKED) { // Prevent spurious wakeups if (isTerminated || state == CoroutineScheduler.WorkerState.TERMINATED) break //... //真正挂起,并标记worker state 状态 park() } } 复制代码

最后一步的park()里会修改state = WorkerState.TERMINATED,在最外层的循环里会判断该标记,若是终止了,则循环停止,整个线程执行结束。

至此,任务执行流程结束,其重点:

从全局队列、本地队列里查找任务。 若是没找到,则尝试从别的Worker 本地队列里偷取任务。 1、2 能够找到任务则执行Runnable.run()函数,该函数里最终会执行协程体里的代码。 若是没有任务,则根据策略挂起一段时间或是最终退出线程的执行。

结合任务分发与任务执行流程,有如下流程图:

image.png

3. Dispatchers.IO 分发流程详解 Dispatchers.IO 定义 internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { //创建LimitingDispatcher val IO: CoroutineDispatcher = LimitingDispatcher( this, systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), "Dispatchers.IO", TASK_PROBABLY_BLOCKING ) //... } 复制代码

Dispatchers.IO 作为DefaultScheduler 里的成员变量,并且它的分发器使用的是DefaultScheduler 本身。 构造函数里指明了并行的数量限制,以及它属于TASK_PROBABLY_BLOCKING(阻塞任务)。

任务分发 private fun dispatch(block: Runnable, tailDispatch: Boolean) { var taskToSchedule = block while (true) { //记录在等待执行的任务 val inFlight = inFlightTasks.incrementAndGet() //如果小于并行数 if (inFlight = parallelism) { return } //若释放了,则取出队列里的任务执行 taskToSchedule = queue.poll() ?: return } } 复制代码

可以看出Dispatchers.IO 任务分发是借助于DefaultScheduler,也就是Dispatchers.Default的能力,因此两者是共用一个线程池。 只是Dispatchers.IO 比较特殊,它有个队列,该队列作用:

当IO 任务分派个数超过设定的并行数时,不会直接进行分发,而是先存放在队列里。

那它什么时候取出来呢? 当任务执行完毕,也就是DispatchedTask.run()函数执行完毕后会调用: taskContext.afterTask(),来看它的实现:

override fun afterTask() { //从队列里取出 var next = queue.poll() if (next != null) { //继续分发 dispatcher.dispatchWithContext(next, this, true) return } inFlightTasks.decrementAndGet() //... } 复制代码

举个简单例子:

假设现在最大的并行数是64,线程池分配了64个线程执行IO任务,当第65个任务到来之时,因为超出了64,因此会放入队列里。当64个任务有某个任务执行完毕后,会从队列里取出第65个任务进行分发。

这样做的目的是什么呢?

为了限制突然间创建了许多IO线程,浪费资源,因此在线程池之外再加了一层防护,多出的任务先进入缓冲队列。

4. 与Java 线程池比对

使用过Java 线程池的小伙伴可能会知道,Java 线程池与Kotlin协程池 本质上都是:"池化技术的体现”。 它们的优势:

减少线程频繁开启/关闭的资源消耗。 及时响应并执行任务。 较好地管控/监控 应用内的线程使用。

Java 线程池原理:

核心线程+队列+非核心线程。 首先使用核心线程执行任务,若是核心线程个数已满,则将任务加入到队列里,核心线程从队列里取出任务执行,若是队列已满,则再开启非核心线程执行任务。

更详细的Java 线程池原理与使用请移步:Java 线程池之必懂应用-原理篇(上)

协程线程池原理:

全局队列(阻塞+非阻塞)+ 本地队列。 IO 任务分发还有个缓存队列。 线程从队列里寻找任务(包括偷)并执行,若是使用IO 分发器,则超出限制的任务将会放到缓存队列里。

两者区别:

Java 线程池开放API,比较灵活,调用者可以根据不同的需求组合不同形式的线程池,没有区分任务的特点(阻塞/非阻塞)。 协程线程池专供协程使用,区分任务特点,进而进行更加合理的调度。 5. 协程到底在哪个线程执行?

回到我们上篇末尾的问题:

fun launch3() { GlobalScope.launch(Dispatchers.IO) { println("1>>>${Thread.currentThread()}") withContext(Dispatchers.Default) { println("2>>>${Thread.currentThread()}") delay(2000) println("3>>>${Thread.currentThread()}") } println("4>>>${Thread.currentThread()}") } } 复制代码

理解了线程池原理,答案就呼之欲出了。 1、4 可能不在同一线程。 2、3 可能不在同一线程。 1、2 可能在同一线程。

看到这结果,你可能会觉得:废话! 容我解释:因为线程池本身的调度侧重于执行任务,而非使用哪个特定的线程执行,因此具体分派到哪个线程执行需要看哪个线程刚好拿到了任务。

下篇将分析协程的取消与异常处理,敬请关注。

本文基于Kotlin 1.5.3,文中完整Demo请点击

您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力 持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

1、Android各种Context的前世今生 2、Android DecorView 必知必会 3、Window/WindowManager 不可不知之事 4、View Measure/Layout/Draw 真明白了 5、Android事件分发全套服务 6、Android invalidate/postInvalidate/requestLayout 彻底厘清 7、Android Window 如何确定大小/onMeasure()多次执行原因 8、Android事件驱动Handler-Message-Looper解析 9、Android 键盘一招搞定 10、Android 各种坐标彻底明了 11、Android Activity/Window/View 的background 12、Android Activity创建到View的显示过 13、Android IPC 系列 14、Android 存储系列 15、Java 并发系列不再疑惑 16、Java 线程池系列 17、Android Jetpack 前置基础系列 18、Android Jetpack 易学易懂系列 19、Kotlin 轻松入门系列 20、Kotlin 协程系列全面解读



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有