1 前言
上一章节我们学习了Kotlin的协程【Kotlin精简】第8章 协程,我们知道 协程实质是对线程切换的封装,能更加安全实现异步代码同步化
,本质上协程、线程都是服务于并发场景下,其中协程是协作式任务,线程是抢占式任务。默认协程用来处理实时性不高的数据,请求到结果后整个协程就结束了,即它是一锤子买卖。
本章节我们来学习一下依赖Kotlin协程
实现的Flow数据流
。
2 Flow简介
2.1 Flow是什么
Flow
是google
官方提供的一套基于Kotlin协程
的响应式编程模型
,它与RxJava
的使用类似,但相比之下Flow
使用起来更简单,另外Flow
作用在协程
内,可以与协程的生命周期绑定,当协程取消时,Flow
也会被取消,避免了内存泄漏风险。
Flow
是Kotlin
提供的一个工具,使用协程封装成生产者-消费者模式
。
上游来负责生产、中介进行数据加工(可选)、下游来接收消耗。
官方对数据流三个成员的定义:
- 上游 - 提供方(生产者):会生成添加到数据流中的数据。通过协程,数据流还可以异步生成数据。
- 中介 - 数据加工 (可选):修改发送到数据流的值,或修正数据流本身。
- 下游 - 使用方(消费者):使用或接收数据流中的值。
在Flow
数据流中,api使用emit()生产
,collect()消费
2.2 Flow 特性
flow{}
构建块中的代码可以使用挂起函数
Flow
构建器函数可以不用supend
修饰符
流的每次单独收集都是按顺序执行的,除非使用特殊操作符
Flow
是一种类似序列的冷流,flow
构建器中代码直到流被收集的时候才运行
2.3 冷流热流
flow{}
会创建一个数据流,并且这个数据流默认是冷流。下面是冷流和热流的区别:
冷流
:当执行订阅的时候,上游发布者才开始发射数据流。订阅者与发布者是一一对应的关系,即当存在多个订阅者时,每个新的订阅者都会重新收到完整的数据。主动需要即是主动收集才会提供发射数据,即有消费collect
才会触发热流
:不管是否被订阅,上游发布者都会发送数据流到内存中。订阅者与发布者是一对多的关系,当上游发送数据时,多个订阅者都会收到消息。不管你需不需要一上来数据全都发射给你,不管是否消费collect
都会触发
3 Flow使用
Flow官方文档可以参考一下,我们这里简单介绍一些Flow常用的流创建方式、操作符等。
Flow流使用步骤:
- 创建流:
flow { ... }
、flowOf{ ... }
- 使用操作符修改、加工流数据
- 发射流:
collect
3.1 创建流
flow
用于创建从顺序调用到发出函数的任意流。flowOf()
函数根据一组固定的值创建流。asFlow()
扩展函数可以将各种类型的函数转换为流。channelFlow
创建从潜在并发调用到send函数的任意流。MutableStateFlow
和MutableSharedFlow
创建可直接更新的热流。
suspend fun main() {//1.不需要用挂起函数修饰符flow {for (i in 1..3) {delay(1000)//可以使用挂起函数emit(i)//发射元素}}.collect {println("yvan flow:${it}")}
// yvan flow:1
// yvan flow:2
// yvan flow:3// 2.flowOf 不需要挂起函数修饰符 flowOf自动实现发射元素flowOf(1, 2, 3, 4, 5).collect {println("yvan flowOf:${it}")}
// yvan flowOf:1
// yvan flowOf:2
// yvan flowOf:3
// yvan flowOf:4
// yvan flowOf:5// 3.asFlow 不需要挂起函数修饰符 flowOf自动实现发射元素(5..10).asFlow().collect {println("yvan asFlow:${it}")}
// yvan asFlow:5
// yvan asFlow:6
// yvan asFlow:7
// yvan asFlow:8
// yvan asFlow:9
// yvan asFlow:10// 4.从潜在并发调用到send函数的任意流channelFlow {for (i in 1..3) {delay(1000)//可以使用挂起函数send(i)//发射元素}}.collect{println("yvan channelFlow:${it}")}
// yvan channelFlow:1
// yvan channelFlow:2
// yvan channelFlow:3// 5.热流SharedFlow的创建runBlocking {val sharedFlow = MutableSharedFlow<Int>(// 相当于粘性次数replay = 2,// 接受得慢时候,发送入栈extraBufferCapacity = 1,onBufferOverflow = BufferOverflow.SUSPEND)launch {sharedFlow.collect {println("yvan MutableSharedFlow:${it}")}}sharedFlow.emit(1)sharedFlow.emit(2)}
// yvan MutableSharedFlow:1
// yvan MutableSharedFlow:2// 6.热流StateFlow的创建runBlocking {val stateFlow = MutableStateFlow(0)launch {stateFlow.collect {println("yvan MutableStateFlow:${it}")}}stateFlow.emit(1)stateFlow.emit(2)}
// yvan MutableStateFlow:2
}
3.2 常用操作符
除上面3.1创建流前面三种创建操作符外,还有回调操作符、变换操作符、过滤操作符、组合操作符、功能性操作符、末端操作符等
3.2.1 创建操作符
上面3.1有介绍
flow
:创建Flow
的操作符。flowOf
:构造一组数据的Flow
进行发送。asFlow
:将其他数据转换成Flow
,一般是集合向Flow
的转换,如listOf(1,2,3).asFlow()
。callbackFlow
:将基于回调的 API 转换为Flow
数据流
3.2.2 回调操作符
我们先来看一个简单的例子
flow {println("yvan start emit hello")emit("hello") //发送数据println("yvan start emit world")emit("world") //发送数据println("yvan end emit")}.flowOn(Dispatchers.IO).onEmpty { println("yvan onEmpty") }.onStart { println("yvan onStart") }.onEach { println("yvan onEach: $it") }.onCompletion { println("yvan onCompletion") }.catch { exception -> exception.message?.let { println("yvan catch exception:$it") } }.collect {//接收数据流println("yvan collect: $it")}
// yvan onStart
// yvan start emit hello
// yvan onEach: hello
// yvan collect: hello
// yvan start emit world
// yvan onEach: world
// yvan collect: world
// yvan end emit
// yvan onCompletion
onStart
:上游flow{}
开始发送数据之前执行onEach
:上游向下游发送数据之前调用,每一个上游数据发送后都会经过onEach()
onEmpty
:当流完成却没有发出任何元素时执行。如emptyFlow().onEmpty {}
onCompletion
:flow
数据流取消或者结束时执行onSubscription
:SharedFlow
专用操作符,建立订阅之后回调。和onStart
的区别:因为SharedFlow
是热流,因此如果在onStart
发送数据,下游可能接收不到,因为提前执行了。
3.2.3 变换操作符
map
:对上游发送的数据进行变换,collect最后接收的是变换之后的值
flow {//发送数据emit("hello")}.map {"$it world"}.collect {//接收数据流println("yvan collect: $it")}// 输出:yvan collect: hello world
mapLatest
:类似于collectLatest,当emit发送新值,会取消掉map上一次转换还未完成的值。
flow {//发送数据repeat(10){delay(10)emit(it)}}.mapLatest {delay(15)it}.collect {//接收数据流println("yvan collect: $it")}//输出:yvan collect: 9
mapNotNull
:仅发送map之后不为空的值。
flow {//发送数据repeat(10){delay(10)emit(it)}}.mapNotNull {delay(15)if (it > 5){it}else{null}}.collect {//接收数据流println("yvan collect: $it")}//输出://yvan collect: 6//yvan collect: 7//yvan collect: 8//yvan collect: 9
transform
:对发出的值进行变换 。不同于map的是,经过transform之后可以重新发送数据,甚至发送多个数据,因为transform内部又重新构建了flow。
flow {//发送数据repeat(10){delay(10)emit(it)}}.transform {delay(100)emit(it*10)}.collect {//接收数据流println("yvan collect: $it")}//输出://yvan collect: 0//yvan collect: 10//yvan collect: 20//yvan collect: 30//yvan collect: 40//yvan collect: 50//yvan collect: 60//yvan collect: 70//yvan collect: 80//yvan collect: 90
transformLatest
:类似于mapLatest,当有新值发送时,会取消掉之前还未转换完成的值。
flow {//发送数据repeat(10){delay(10)emit(it)}}.transformLatest {delay(100)emit(it*10)}.collect {//接收数据流println("yvan collect: $it")}//输出://yvan collect: 90
transformWhile
:返回值是一个Boolean,当为true时会继续往下执行;反之为false,本次发送的流程会中断。
flow {//发送数据repeat(10){delay(10)emit(it)}}.transformWhile {emit(it)it != 2}.collect {//接收数据流println("yvan collect: $it")}//输出://yvan collect: 0//yvan collect: 1//yvan collect: 2
asSharedFlow
:MutableStateFlow 转换为 StateFlow ,即从可变状态变成不可变状态。asStateFlow
:MutableSharedFlow 转换为 SharedFlow ,即从可变状态变成不可变状态。receiveAsFlow
:Channel 转换为Flow ,上游与下游是一对一的关系。如果有多个下游观察者,可能会轮流收到值。consumeAsFlow
:Channel 转换为Flow ,有多个下游观察者时会crash。withIndex
:将数据包装成IndexedValue类型,内部包含了当前数据的Index。scan(initial: R, operation: suspend (accumulator: R, value: T) -> R)
:把initial初始值和每一步的操作结果发送出去。produceIn
:转换为Channel的 ReceiveChannelrunningFold(initial, operation: (accumulator: R, value: T) -> R)
:initial值与前面的流共同计算后返回一个新流,将每步的结果发送出去。runningReduce
:返回一个新流,将每步的结果发送出去,默认没有initial值。shareIn
:flow 转化为热流SharedFlow,后面会详细介绍。stateIn
:flow转化为热流StateFlow,后面会详细介绍。
3.2.4 过滤操作符
filter
:筛选符合条件的值,返回true继续往下执行。
flow {//发送数据repeat(10){delay(10)emit(it)}}.filter {it < 2}.collect {//接收数据流println("yvan collect: $it")}//输出://yvan collect: 0//yvan collect: 1
filterNot
:与filter相反,筛选不符合条件的值,返回false继续往下执行。filterNotNull
:筛选不为空的值。
flow {//发送数据emit(1)emit(null)emit(2)}.filterNotNull().collect {//接收数据流println("yvan collect: $it")}// 输出://yvan collect: 1//yvan collect: 2
filterInstance
:筛选对应类型的值,如.filterIsInstance()用来过滤String类型的值
flow {//发送数据emit("1")emit(null)emit(2)}.filterIsInstance<String>().collect {//接收数据流println("yvan collect: $it")}// 输出:// yvan collect: 1
drop
:drop(count: Int)参数为Int类型,意为丢弃掉前count个值。
flow {//发送数据emit("1")emit(null)emit(2)}.drop(1).collect {//接收数据流println("yvan collect: $it")}// 输出:// yvan collect: null// yvan collect: 2
dropWhile
:找到第一个不满足条件的值,返回其和其后所有的值。
flow {//发送数据emit("1")emit(null)emit(2)}.dropWhile { it != null }.collect {//接收数据流println("yvan collect: $it")}// 输出:// yvan collect: null// yvan collect: 2
take
:与drop()相反,意为取前n个值。takeWhile
:与dropWhile()相反,找到第一个不满足条件的值,返回其前面所有的值。debounce
:debounce(timeoutMillis: Long)指定时间内只接收最新的值,其他的过滤掉。防抖动,指定数据接收的间隔时间。
flow {//发送数据emit(1)delay(900)emit(2)delay(800)emit(3)delay(1000)emit(4)delay(700)emit(5)}.debounce(1000).collect {//接收数据流println("yvan collect: $it")}// 由于1、2、4的时间间隔小于1000所以被过滤,3时间1000,5是最后一个,所以输出是3和5//输出:// yvan collect: 3// yvan collect: 5
sample
:sample(periodMillis: Long)在指定周期内,获取最新发出的值,定时周期接收。如:
flow {//发送数据emit(1)delay(50)emit(2)delay(100)emit(3)delay(200)emit(4)delay(50)emit(5)}.sample(200).collect {//接收数据流println("yvan collect: $it")}// 输出:// yvan collect: 3// yvan collect: 4
distinctUntilChangedBy
:判断两个连续值是否重复,可以设置是否丢弃重复值。
flowOf(1, 2, 1, 2).distinctUntilChanged().collect {println("yvan distinctUntilChangedBy = $it")}//输出://yvan distinctUntilChangedBy = 1//yvan distinctUntilChangedBy = 2
distinctUntilChanged
:若连续两个值相同,则跳过后面的值。
flowOf(1, 1, 2, 2).distinctUntilChanged().collect {println("yvan distinctUntilChanged = $it")}//输出://yvan distinctUntilChanged = 1//yvan distinctUntilChanged = 2//yvan distinctUntilChanged = 1//yvan distinctUntilChanged = 2
3.2.5 组合操作符
combine
:组合两个Flow流最新发出的数据,直到两个流都结束为止。扩展:在kotlinx-coroutines-core-jvm中的FlowKt中,可以将更多的flow结合起来返回一个Flow,典型应用场景:多个筛选条件选中后,展示符合条件的数据。如果后续某个筛选条件发生了改变,只需要通过发生改变的Flow的flow.value = newValue重新发送,combine就会自动构建出新的Flow,这样UI层会接收到新的变化条件进行刷新即可。
val flow1 = flow {emit("A")delay(100)emit("B")}val flow2 = flow {emit(1)delay(50)emit(2)}flow1.combine(flow2){ it1,it2->return@combine it1 to it2}.collect{println("yvan $it")}//输出://yvan (A, 1)//yvan (A, 2)//yvan (B, 2)
combineTransform
: combine + transform操作merge
:listOf(flow1, flow2).merge(),多个流合并为一个流。zip是提供高级函数组合流的数据,而merage则是之间将两个流在一个collect里输出
val flow1 = flowOf("A", "B", "C")val flow2 = flowOf(1, 2, 3, 4)merge(flow1, flow2).collect {println("yvan $it")}//输出://yvan A//yvan B//yvan C//yvan 1//yvan 2//yvan 3//yvan 4
flattenConcat
:以顺序方式将给定的流展开为单个流 。示例如下:
flow {emit(flowOf(1,2))emit(flowOf(3,4))} .flattenConcat().collect {println("yvan $it")}//输出://yvan 1//yvan 2//yvan 3//yvan 4
flattenMerge
:作用和 flattenConcat 一样,将多个flow流按并发后输出,但是可以设置并发收集流的数量。
flow{emit(flowOf(1,2,3))emit(flowOf("A","B","C"))}.flattenMerge().collect{println("yvan $it")}//输出://yvan 1//yvan A//yvan B//yvan C//yvan 2//yvan 3
flatMapContact
:相当于 map + flattenConcat , 通过 map 转成一个流,在通过 flattenConcat发送。flatMapLatest
:当有新值发送时,会取消掉之前还未转换完成的值。flatMapMerge
:相当于map + flattenMerge ,参数concurrency: Int 来限制并发数。zip
:组合两个Flow流最新发出的数据,上游流在同一协程中顺序收集,没有任何缓冲。不同于combine的是,当其中一个流结束时,另外的Flow也会调用cancel,生成的流完成。
val flow = flowOf(1, 2, 3).onEach { delay(50) }val flow2 = flowOf("a", "b", "c", "d").onEach { delay(150) }val startTime = System.currentTimeMillis() // 记录开始的时间flow.zip(flow2) { i, s -> i.toString() + s }.collect {// Will print "1a 2b 3c"println("yvan $it 耗时 ${System.currentTimeMillis() - startTime} ms")}//输出(flow已经执行完,所以flow2中的d被cancel了)://yvan 1a 耗时 156 ms//yvan 2b 耗时 307 ms//yvan 3c 耗时 459 ms//如果换做combine,执行结果如下(组合的是最新发出的数据)://yvan 2a 耗时 156 ms//yvan 3a 耗时 159 ms//yvan 3b 耗时 311 ms//yvan 3c 耗时 466 ms//yvan 3d 耗时 620 ms//注:上面combine多次执行的结果可能不一致,但每次组合的是最新发出的数据
3.2.6 功能性操作符
cancellable
:判断当前协程是否被取消 ,如果已取消,则抛出异常catch
:对此操作符之前的流发生的异常进行捕获,对此操作符之后的流无影响。当发生异常时,默认collect{}中lambda将不会再执行。当然,可以自行通过emit()继续发送。retry
:流发生异常时的重试机制。如果是无限重试,直接调用retry()默认方法即可,retry()最终调用的也是retryWhen()方法。
public fun <T> Flow<T>.retry(retries: Int = Int.MAX_VALUE, //指定重试次数predicate: (Throwable) -> Boolean = { true } //返回true且满足retries次数要求,继续重试;false停止重试
): Flow<T> {require(retries > 0) { "Expected positive amount of retries, but had $retries" }return retryWhen { cause, attempt -> predicate(cause) && attempt < retries }
}
retryWhen
:流发生异常时的重试机制。
public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> = { ...... }
有条件的进行重试 ,lambda 中有两个参数: cause是 异常原因,attempt是当前重试的位置,lambda返回true时继续重试; 反之停止重试。
buffer
:流执行总时间就是所有运算符执行时间之和。如果上下游运算符都比较耗时,可以考虑使用buffer()优化,该运算符会在执行期间为流创建一个单独的协程。
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {......}
flowOf("A", "B", "C").onEach { println("yvan 1$it") }.buffer() // <--------------- buffer between onEach and collect.collect { println("yvan 2$it") }// 输出:// yvan 1A// yvan 1B// yvan 1C// yvan 2A// yvan 2B// yvan 2C
上述代码将在两个协程中执行,其中buffer()以上还是在协程P中执行,而buffer()下面的collect()会在协程Q中执行,数据通过Channel进行传递,从而减少了执行的总时间。
conflate
:仅保留最新值, 内部实现是 buffer(CONFLATED)
flowOf("A", "B", "C").onEach { println("yvan 1$it") }.conflate() // <--------------- buffer between onEach and collect.collect { println("yvan 2$it") }// 输出:// yvan 1A// yvan 1B// yvan 1C// yvan 2C
flowOn
:flowOn 会更改上游数据流的 CoroutineContext,且只会影响flowOn之前(或之上)的任何中间运算符。下游数据流(晚于 flowOn 的中间运算符和使用方)不会受到影响。如果有多个 flowOn 运算符,每个运算符都会更改当前位置的上游数据流。
flow {//这里的线程应该是跟随创建线程println("yvan 1 thread:${Thread.currentThread().name}")emit("flowOnTest")}.flowOn(Dispatchers.IO).map {println("yvan 2 thread:${Thread.currentThread().name}")return@map it}.flowOn(Dispatchers.Default).collect {println("yvan 3 thread:${Thread.currentThread().name} 结果:$it ")}//输出://yvan 1 thread:DefaultDispatcher-worker-2//yvan 2 thread:DefaultDispatcher-worker-1//yvan 3 thread:DefaultDispatcher-worker-1 结果:flowOnTest
3.2.7 末端操作符
collect
:数据收集操作符,默认的flow是冷流
,即当执行collect时,上游才会被触发执行。collectIndexed
:带下标的收集操作,如collectIndexed{ index, value -> }。collectLatest
:与collect的区别:当新值从上游发出时,如果上个收集还未完成,会取消上个值得收集操作。toCollection、toList、toSet
:将flow{}结果转化为集合。
注:还有很多操作符没有列出来~
3.3 发射流
发射流即上面3.2.7的末端操作符collect
4 热流
4.1 SharedFlow
我们知道flow{}
构建的是冷流
,而SharedFlow(共享Flow)
默认是热流
,发送器与收集器是一对多的关系。
public fun <T> MutableSharedFlow(// replay:重播给新订阅者时缓存数据的个数,默认是0。// 当新订阅者collect时,会先尝试获取上游replay个数据,为0时则不会获取之前的数据。// replay缓存是针对后续所有的订阅者准备的。replay: Int = 0,// extraBufferCapacity:除了replay外,缓冲值的数量。// 当有剩余的缓冲区空间时,Emit不挂起(可选,不能为负,默认为零) 。// extraBufferCapacity是为上游快速发射器及下游慢速收集器这种场景提供缓冲的,// 有点类似于线程池中的存储队列。// 要注意,replay保存的是最新值,而extraBufferCapacity保存的是最先发送的一个或多个值。extraBufferCapacity: Int = 0,// 配置缓冲区溢出的操作(可选,默认为SUSPEND,暂停尝试发出值),// 可选值有:SUSPEND-暂停发送、DROP_OLDEST-丢弃队列中最老的、DROP_LATEST-丢弃队列中最新的。onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
上面操作符中提到shareIn,其作用是将普通flow
转化为SharedFlow
public fun <T> Flow<T>.shareIn(// 协程作用域范围scope: CoroutineScope,// 控制共享的开始、结束策略。一共有三种,// Eagerly:马上开始,在scope作用域结束时终止// Lazily:当订阅者出现时开始,在scope作用域结束时终止// WhileSubscribed(stopTimeoutMillis: Long = 0,replayExpirationMillis: Long = Long.MAX_VALUE):// 其中stopTimeoutMillis:表示最后一个订阅者结束订阅与停止上游流的时间差,默认值为0(立即停止上游流),replayExpirationMillis:数据重播的超时时间。started: SharingStarted,// 重播给新订阅者的数量replay: Int = 0
): SharedFlow<T>
举例:
//ViewModel中 普通flow通过shareIn转化为SharedFlowval flowConvertSharedFlow by lazy {flow {emit("1、flow")emit("2、convert")emit("3、SharedFlow")}.shareIn(//协程作用域范围viewModelScope, //立即开始SharingStarted.Eagerly, //重播给新订阅者的数量replay = 3 ).onStart { println("yvan onStart") }}//Activity中mBtnConvertF.setOnClickListener {val builder: StringBuilder = StringBuilder()lifecycleScope.launch {mFlowModel.flowConvertSharedFlow.collect {println(it)builder.append(it).append("\n")mTvConvertF.text = builder.toString()}}}//输出://yvan onStart//1、flow//2、convert//3、SharedFlow
4.2 StateFlow
StateFlow
可以认为是一个replay为1
,且没有缓冲区
的SharedFlow
,所以新订阅者collect
时会先获取一个默认值
,构造函数如下:
//MutableStateFlow构造函数
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)//MutableStateFlow接口继承了MutableSharedFlow接口
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {public override var value: Tpublic fun compareAndSet(expect: T, update: T): Boolean
}
StateFlow
有自动去重的功能,即如果上游连续发送的value
重复时,下游的接收方只会接收第一次的值,后续的重复值不会再接收,可以通过StateFlow.value
获取发送的值。
上面操作符中提到的stateIn
作用是将普通flow
转化为StateFlow
public fun <T> Flow<T>.stateIn(// 协程作用域范围scope: CoroutineScope,// 控制共享的开始、结束策略。一共有三种,// Eagerly:马上开始,在scope作用域结束时终止// Lazily:当订阅者出现时开始,在scope作用域结束时终止// WhileSubscribed(stopTimeoutMillis: Long = 0,replayExpirationMillis: Long = Long.MAX_VALUE):// 其中stopTimeoutMillis:表示最后一个订阅者结束订阅与停止上游流的时间差,默认值为0(立即停止上游流),replayExpirationMillis:数据重播的超时时间。started: SharingStarted,// 默认StateFlow的初始值,会发送到下游initialValue: T
): StateFlow<T> {//这里设置的replay是1 及重播给新订阅者的缓存为1val config = configureSharing(1)......}
使用举例:
//ViewModel中val flowConvertStateFlow by lazy {flow {//转化为StateFlow是 emit()可以是0个或1个 或多个,// 当是多个时,新订阅者collect只会收到最后一个值(replay为1)emit("1、flow convert StateFlow")}.stateIn(//协程作用域范围viewModelScope, //立即开始SharingStarted.Eagerly, // 默认StateFlow的初始值,会发送到下游"0、initialValue" ).onStart { println("yvan onStart") }}//Activity中mBtnConvertSF.setOnClickListener {lifecycleScope.launch {val builder = StringBuilder()mFlowModel.flowConvertStateFlow.collect {println(it)builder.append(it).append("\n")mTvConvertSF.text = builder.toString()}}}//输出://yvan onStart//0、initialValue//1、flow convert StateFlow
注:在UI层使用Lifecycle.repeatOnLifecycle
配合上游的SharingStarted.WhileSubscribed
一块使用是一种更安全、性能更好的流收集方式。
4.3 StateFlow和LiveData的异同点
我们知道通过LiveData
可以让数据被观察,且具备生命周期感知能力,但LiveData
的缺点也很明显:
LiveData
的接收只能在主线程;LiveData
发送数据是一次性买卖,不能多次发送;LiveData
发送数据的线程是固定的,不能切换线程,setValue/postValue
本质上都是在主线程上发送的。当需要来回切换线程时,LiveData
就显得无能为力了。
StateFlow
和 LiveData
具有相似之处。两者都是可观察的数据容器类,并且在应用架构中使用时,两者都遵循相似模式。但两者还是有不同之处的:
StateFlow
需要将初始状态
传递给构造函数,而LiveData
不需要。- 当
View
进入STOPPED
状态时,LiveData.observe()
会自动取消注册使用方,而从StateFlow
或任何其他数据流收集数据的操作并不会自动停止。如需实现相同的行为,需要从Lifecycle.repeatOnLifecycle
块收集数据流。
4.4 StateFlow、SharedFlow 和 Channel对比
Flow
底层使用的Channel机制
实现,StateFlow
、SharedFlow
都是一对多
的关系,如果上游发送者与下游UI层的订阅者是一对一的关系,可以使用Channel
来实现,Channel
默认是粘性的
。
Channel使用场景:一次性消费场景,比如弹窗,需求是在UI层只弹一次,即使App切到后台再切回来,也不会重复订阅(不会多次弹窗);
如果使用SharedFlow/StateFlow
,UI层使用的lifecycle.repeatOnLifecycle
、Flow.flowWithLifecycle
,则在App切换前后台时,UI层会重复订阅,弹窗事件可能会多次执行,不符合要求。
Channel使用特点:
- 每个消息只有一个订阅者可以收到,用于一对一的通信
- 第一个订阅者可以收到
collect
之前的事件,即粘性事件
Channel使用举例:
//viewModel中
private val _loadingChannel = Channel<Boolean>()
val loadingFlow = _loadingChannel.receiveAsFlow()private suspend fun loadStart() {_loadingChannel.send(true)
}private suspend fun loadFinish() {_loadingChannel.send(false)
}//UI层接收Loading信息mViewModel.loadingFlow.flowWithLifecycle2(this, Lifecycle.State.STARTED) { isShow ->mStatusViewUtil.showLoadingView(isShow)}
5 扩展
在新项目或者新需求中,我们可以直接使用协程来替代之前的多线程场景的使用方式,如可以通过withContext(Dispatchers.IO)
在协程中来回切换线程且能在线程执行完毕后自动切回当前线程,避免使用接口回调的方式导致逻辑可读性变差。然而,如果我们是在现有项目中开发或者网络框架就是回调方式使用时,没有办法直接使用协程,但是可以通过suspendCancellableCoroutine
或callbackFlow
将接口回调转化成协程:
suspendCancellableCoroutine
等待单次回调API的结果时挂起协程,并将结果返回给调用者;如果需要返回Flow数据流,可以使用callbackFlow
。
5.1 suspendCancellableCoroutine
使用举例:
//ViewModel中/*** suspendCancellableCoroutine将回调转化为协程使用*/suspend fun suspendCancelableData(): String {return try {getSccInfo()} catch (e: Exception) {"error: ${e.message}"}}/*** suspendCancellableCoroutine将回调转化为协程使用*/private suspend fun getSccInfo(): String = suspendCancellableCoroutine { continuation ->val callback = object : ICallBack {override fun onSuccess(sucStr: String?) {//1、返回结果 将结果赋值给getSccInfo()挂起函数的返回值//2、如果调用了continuation.cancel(),resume()的结果将不会返回了,因为协程取消了continuation.resume(sucStr ?: "empty")}override fun onError(error: Exception) {//这里会将异常抛给上层 需要上层进行处理continuation.resumeWithException(error)}}continuation.invokeOnCancellation {//协程取消时调用,可以在这里进行解注册println("invokeOnCancellation")}//模拟网络请求 此时协程被suspendCancellableCoroutine挂起,直到触发回调Thread {Thread.sleep(500)//模拟Server返回数据callback.onSuccess("getServerInfo")//模拟抛异常//callback.onError(IllegalArgumentException("server error"))}.start()//模拟取消协程//continuation.cancel()}//Activity中mBtnScc.setOnClickListener {lifecycleScope.launch {val result = mFlowModel.suspendCancelableData()println(result)}}//输出:// getServerInfo
suspendCancellableCoroutine
声明了作用域,并且传入一个CancellableContinuation
参数,它可以调用resume
、resumeWithException
来处理对应的成功、失败回调,还可以调用cancel()
方法取消协程的执行(抛出CancellationException
异常,但程序不会崩溃,当然也可以通过catch
抓住该异常进行处理)。
上面例子中,当开始执行时会将suspendCancellableCoroutine
作用域内协程挂起,如果成功返回数据,会回调continuation.resume()
方法将结果返回;如果出现异常,会回调continuation.resumeWithException()
将异常抛到上层。这样整个函数处理完后,上层会从挂起点恢复并继续往下执行。
5.2 callbackFlow
callbackFlow
相对于suspendCancellableCoroutine
,对接口回调封装以后返回的是Flow数据流,后续就可以对数据流进行一系列操作。
callbackFlow
中的几个重要方法:
trySend/offer
:在接口回调中使用,用于上游发射数据,类似于flow{}中的emit(),kotlin 1.5.0以下使用offer,1.5.0以上推荐使用trySend()awaitClose
:写在最后,这是一个挂起函数, 当 flow 被关闭的时候 block 中的代码会被执行 可以在这里取消接口的注册等。
使用举例,比如当前有个场景:去某个地方,需要先对目的地进行搜索,再出发到达目的地,假设搜索、到达目的地两个行为都是使用回调来执行的,我们现在使用callbackFlow
对他们进行修改:
ViewModel中,搜索目的地:
fun getSearchCallbackFlow(): Flow<Boolean> = callbackFlow {val callback = object : ICallBack {override fun onSuccess(sucStr: String?) {//搜索目的地成功trySend(true)}override fun onError(error: Exception) {//搜索目的地失败trySend(false)}}//模拟网络请求Thread {Thread.sleep(500)//模拟Server返回数据callback.onSuccess("getServerInfo")}.start()//这是一个挂起函数, 当 flow 被关闭的时候 block 中的代码会被执行 可以在这里取消接口的注册等awaitClose { println("awaitClose") }
}
ViewModel中,前往目的地:
fun goDesCallbackFlow(isSuc: Boolean): Flow<String?> = callbackFlow {val callback = object : ICallBack {override fun onSuccess(sucStr: String?) {trySend(sucStr)}override fun onError(error: Exception) {trySend(error.message)}}//模拟网络请求Thread {Thread.sleep(500)if (isSuc) {//到达目的地callback.onSuccess("arrive at the destination")} else {//发生了错误callback.onError(IllegalArgumentException("Not at destination"))}}.start()awaitClose { println("awaitClose") }}
Activity中,使用Flow.flatMapConcat对两者进行整合:mBtnCallbackFlow.setOnClickListener {lifecycleScope.launch {//将两个callbackFlow串联起来 先搜索目的地,然后到达目的地mFlowModel.getSearchCallbackFlow().flatMapConcat {mFlowModel.goDesCallbackFlow(it)}.collect {mTvCallbackFlow.text = it ?: "error"}}}//输出:// arrive at the destination
以下结论摘自官网:
与 flow
构建器不同,callbackFlow
允许通过 send
函数从不同 CoroutineContext
发出值,或者通过 offer/trySend
函数在协程外发出值。
在协程内部,callbackFlow
会使用通道,它在概念上与阻塞队列非常相似。通道都有容量配置,限定了可缓冲元素数的上限。在 callbackFlow
中所创建通道的默认容量为 64 个元素。当您尝试向完整通道添加新元素时,send
会将数据提供方挂起,直到新元素有空间为止,而 offer
不会将相关元素添加到通道中,并会立即返回 false
。
4 总结
Flow
是 Kotlin
提供的解决复杂异步场景的方案,Flow
本质是挂起函数
,主要用于构建类似生产者-中介-消费者
模型,可对流进行处理
、过滤
、变换
、组合
、重试
、异常捕获
、线程切换
等,便于开发者进行流的创建
、处理
与消费
。