文章同步发布于公众号:移动开发那些事白话kotlin协程
1 什么是协程
Kotlin协程(Coroutine
)是一种轻量级的线程管理框架,允许开发者以更简洁,更高效的方式处理异步操作,避免回调地狱和线程阻塞,它有几个核心特性:
- 挂起与恢复
suspend
:可在耗时操作时挂起,释放线程资源,完成后自动恢复; - 非阻塞式并发:通过协作式调度实现任务的切换;
- 结构化并发:通过作用域自动管理协程生命周期;
2 协程的使用
2.1 协程的启动和调度
一般需要一个协程的作用域来启动和管理协程,然后在作用域里
使用launch
或者async
函数启动协程,使用suspend
来挂起协程:
launch
: 用于非阻塞的异步任务;async
: 用于可能返回结果的异步任务
fun main() = runBlocking {// 在默认的环境里启动协程val scope = CoroutineScope(Dispatchers.Default)scope.launch {// 在这里执行异步任务val result = requestData();}val result = scope.async {// 执行可返回结果的异步任务}// result.await() 拿到具体的结果}
// suspend关键字,表示这个方法在协程域内调用,不会阻塞线程
suspend fun requestData():Map {//实际的执行
}
并且在协程的使用过程中,可通过Dispatchers
来指定协程运行在哪个环境(这里的环境主要是指运行在哪个线程池下,不同的线程池有不同的处理策略):
Main
: 主线程IO
: 网络/文件操作Default
: CPU密集型计算
例:
// 在IO环境下启动协程
val result = async(Dispatchers.IO) {执行异步操作
}
2.2 协程的取消
协程的取消可使用cancle
的关键字来处理。在启动协程时,会返回一个Job
对象,在需要取消时,可调用job.cancel
来达到这个目的;
val job: Job = GlobalScope.launch {// 执行异步操作
}
// 取消
job.cancel()
2.3 协程的异常处理
异常的处理可使用自定义的CoroutineExceptionHandler
或者try-catch
块来实现;
val task1 = launch(CoroutineExceptionHandler { _, e -> logError(e) }) { /* ... */ }try {task1.join()} catch (e: CancellationException) {// 处理取消异常}
3 协程底层实现原理
协程底层是基于协程的生命周期状态来处理的,协程的生命周期包括:
New
: 创建但未启动时的初始态;Active
Running
: 正在执行代码,会占用线程资源Suspended
: 执行suspend
函数时,释放线程,等待恢复;
Completed
:完成状态(正常完成或者异常完成)Cancelling
: 取消状态,进入资源清理阶段,但可执行finally
代码块;
3.1 核心原理
其核心原理是基于状态机。当遇到一个挂起函数时,就会将协程的代码置换为一个状态机,如代码里,有一个挂起函数:
suspend fun doSomething() {delay(1000) // 挂起函数println("Something done")
}fun main() = runBlocking {launch {doSomething()}println("Main function continues")
}
编译器会将上面的代码编译成:
// 简化的状态机代码示意
class DoSomethingCoroutine : Continuation<Unit> {// 当前状态var state = 0override fun resumeWith(result: Result<Unit>) {when (state) {0 -> {state = 1// 调用 delay 函数并传入当前协程作为 continuation// delay函数执行完成后,会调用resumeWith方法delay(1000, this)}1 -> {println("Something done")}}}
}
在挂起的异步操作完成后,会调用协程的resumeWith
方法,将结果传递给协程,协程会从暂停的位置恢复执行,并根据状态机的状态继续执行后续的代码;
3.2 调度器(CoroutineDispatcher
)原理
协程调度器负责决定协程在哪个线程或者线程池上使用;
- Default:用于CPU密集型,默认使用一个线程池
- IO: 专门的线程池
- Main: 用于在主线程上执行协程,通常用于更新UI;
3.2.1 Default
Dispatchers.Default
使用了一个基于ForkJoinPool
的线程池。ForkJoinPool
是 Java 7 引入的一种特殊线程池,它采用工作窃取算法(Work-Stealing Algorithm),可以高效地处理大量的小任务。
当一个协程通过 Dispatchers.Default
调度执行时,dispatch
方法会将协程任务封装成一个 Runnable
对象,并将其提交到 ForkJoinPool
中。ForkJoinPool 会从线程池中选择一个空闲的线程来执行该任务。
3.2.2 IO
Dispatchers.IO
也使用了一个线程池,不过这个线程池的大小可以根据系统资源动态调整。它的目的是为了处理大量的 I/O 阻塞操作,避免阻塞其他协程的执行。
当一个协程通过 Dispatchers.IO
调度执行时,dispatch
方法会将协程任务封装成一个 Runnable
对象,并将其提交到 IO 线程池中。由于 I/O 操作通常会阻塞线程,IO 线程池会有足够的线程来处理这些阻塞操作,从而保证其他协程可以继续执行。
3.2.3 Main
当一个协程通过 Dispatchers.Main
调度执行时,dispatch
方法会将协程任务封装成一个Runnable
对象,并通过Handler
将其发送到主线程的消息队列中。主线程的消息循环会依次取出消息队列中的任务并执行。
4 Flow
Flow
是 kotlin协程中的响应式编程,基于协程构建,主要用于处理异步数据流,并且是冷流,只在被收集时才会开始发送元素(同时,Flow
具有背压机制用于处理生产者和消费者的速度不匹配的问题),其有几个关键的组件:
- Flow 接口: 表示一个冷流,只有在被收集时才会开始发射元素,并且提供了一系列的操作符用于数据流的转换和处理;
- FlowCollector:用于收集
Flow
发射的元素; - FlowBuilder:用于构建Flow对象,常见的构建方式有:
flow
,flowOf
,asFlow
4.1 常见的应用场景
- 异步数据流
- UI数据更新(数据以Flow的形式暴露给UI,数据发生变化时,UI可以自动更新)
- 事件处理(将各种事件转换为Flow进行处理)
val dataFlow:Flow<String> = flow {emit("data")
}
// 默认不使用背压机制,当速度不匹配时,生产者会先暂停等前面的数据处理完再继续生产数据
dataFlow.collect{data ->.....
}
4.2 背压机制
背压(Backpressure)是一种反馈机制,用于处理生产者产生数据的速度快于消费者处理数据的速度的情况。当消费者处理数据的能力有限时,如果生产者持续快速地产生数据,可能会导致消费者内存溢出或者系统资源耗尽。背压机制允许消费者向生产者反馈自身的处理能力,从而使生产者调整数据的产生速度,以达到生产者和消费者之间的平衡。
常见的几个背压操作符:
buffer
: 创建一个缓冲区,来不及消息的内容会放到缓冲区里;conflate
: 会丢弃缓冲区中未处理的数据,只保留最新的数据collectLatest
: 当有新数据时,会取消当前正在处理的数据,只处理最新的数据
val dataFlow:Flow<String> = flow {emit("data")
}// 指定使用buffer的背压策略
dataFlow.buffer().collect{data ->.....
}
5 Channel
Channel
是 Kotlin 协程库中用于在协程之间进行通信的工具,类似于队列,支持一个或多个协程向其发送元素,也支持一个或多个协程从其中接收元素,可用于实现生产者 - 消费者模式,它有不同的创建方式:
- Channel<类型>(10) : 创建固定大小的有缓冲的channel
- Channel<类型>(Channel.RENDEZVOUS) :创建无缓冲的channel,发送和接收要同步
- Channel<类型>(Channel.UNLIMITED) :创建无限缓冲的channel,
一个简单的使用Channel
的示例:
fun main() = runBlocking {// 初始化一个默认大小的channel;val channel = Channel<Int>()// 生产者协程launch {for (i in 1..5) {// 通过send发送元素,如果缓冲区已满,该操作会挂起,直到有空间可用channel.send(i)println("Sent $i")}// 关闭 Channel,关闭后不能再发送数据,但可以继续接收数据channel.close() }// 消费者协程launch {// 当channel关闭,并且没有更多元素时,这个循环会自动结束for (element in channel) {// 会通过receive()方法接收元素println("Received $element")}println("Channel closed")}
}
5.1 Channel的类型
5.1.3 带容量限制的缓冲 Channel
指定一个固定的容量,当缓冲区满时,发送操作会挂起。
// 创建一个容量为10的Channel
val channel = Channel<Int>(10)
5.1.3 无缓冲的 Channel(Channel.RENDEZVOUS)
发送者和接收者必须同时准备好,发送操作会挂起,直到有接收者接收元素;接收操作也会挂起,直到有发送者发送元素。这种类型适用于需要严格同步的场景。
// 创建一个无缓冲的Channel
val channel = Channel<Int>(Channel.RENDEZVOUS)
5.1.3 无限缓冲的 Channel(Channel.UNLIMITED)
(默认创建)缓冲区可以容纳任意数量的元素,发送操作不会挂起。但需要注意,如果生产者速度远大于消费者速度,可能会导致内存占用过高。
// 创建一个不限制容量的Channel ,下面两个方法是等价的
val channel = Channel<Int>()
// val channel = Channel<Int>(Channel.UNLIMITED))
5.2 Channel底层原理
Channel
底层的核心是队列:
- 无缓冲的 Channel: 使用特殊队列,本身不存储元素,而是协调发送者和接收者的同步;
- 有缓冲的 Channel(指定容量):使用普通的
ArrayDeque
- 无限缓冲的 Channel(Channel.UNLIMITED) : 使用无界队列,如
LinkedList
;
6 参考
- kotlin 协程