1、filter 系列操作符
filter 系列操作符用于元素过滤。
filter 操作符会将符合 predicate 谓词条件的数据再次发射出去,过滤掉不符合条件的数据:
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->if (predicate(value)) return@transform emit(value)
}
filterNot 与 filter 刚好相反,过滤掉符合 predicate 条件的,留下不符合条件的:
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->if (!predicate(value)) return@transform emit(value)
}
得到的不是原来的 Flow 对象,而是新创建的。示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.filter { it % 2 == 0 }.collect { println("filter: $it") }flow.filterNot { it % 2 == 0 }.collect { println("filterNot: $it") }}job.join()
}
运行结果:
filter: 2
filter: 4
filterNot: 1
filterNot: 3
filterNot: 5
上述例子中 flow 的类型是 Flow<Int>
,但如果 Flow 中存在空元素,flow 的类型就变为 Flow<Int?>
,此时再使用原来的过滤方式 it % 2
就会出现 NPE。此时有如下几种处理方式:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, null, 3, null, 4, 5)val job = scope.launch {// 1.先手动进行 null 判断,再进行正常的逻辑判断flow.filter { it != null && it % 2 == 0 }.collect { println("filter: $it") }// 2.使用安全调用符 ?,% 实际上就是调用了 rem()flow.filterNot { it?.rem(2) == 0 }.collect { println("filterNot: $it") }// 3.使用 filterNotNull() 先过滤掉 nullflow.filterNotNull().filter { it % 2 == 0 }.collect { println("filterNotNull: $it") }}job.join()
}
运行结果:
filter: 2
filter: 4
filterNot: 1
filterNot: null
filterNot: 3
filterNot: null
filterNot: 5
filterNotNull: 2
filterNotNull: 4
可以看到使用 filterNotNull() 先过滤掉 null 还是比较好用的。
最后还有一个 filterIsInstance(),给出指定的类型,把符合这个类型的元素留下来:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, "Test", 3, "Name")val job = scope.launch {flow.filterIsInstance<String>().collect { println("filterIsInstance1: $it") }flow.filterIsInstance(Int::class).collect { println("filterIsInstance2: $it") }}job.join()
}
运行结果:
filterIsInstance1: Test
filterIsInstance1: Name
filterIsInstance2: 1
filterIsInstance2: 2
filterIsInstance2: 3
filterIsInstance() 有两种形式:
@Suppress("UNCHECKED_CAST")
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>public fun <R : Any> Flow<*>.filterIsInstance(klass: KClass<R>): Flow<R> = filter { klass.isInstance(it) } as Flow<R>
无参使用泛型的版本有一个潜在的坑,就是 <reified R>
的作用范围直到 R 这个类型本身,对于集合类型,如 List<String>
,它只能确定到最外层的 List,内层的类型是无法区分的:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, "Test", 3, "Name", listOf("A", "B"), listOf(1, 2))val job = scope.launch {// 前两种方式都只能区分到外层的 List,无法区分 List<String> 和 List<Int>flow.filterIsInstance<List<String>>().collect { println("filterIsInstance1: $it") }flow.filterIsInstance(List::class).collect { println("filterIsInstance2: $it") }// 先确定外层的类型是 List,然后取出 List 中的第一个元素,判断其类型是 String 才满足过滤条件flow.filter { it is List<*> && it.firstOrNull()?.let { item -> item is String } == true }.collect { println("filterIsInstance3: $it") }}job.join()
}
运行结果:
filterIsInstance1: [A, B]
filterIsInstance1: [1, 2]
filterIsInstance2: [A, B]
filterIsInstance2: [1, 2]
filterIsInstance3: [A, B]
只有像第三种方式那样,把集合内的元素单独拿出来判断其类型才可以。
2、distinctUntilChanged 系列
去重,如果连续发送多条相同的数据,则新的数据不会发送:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 2, 3, 3, 3, 1)val job = scope.launch {// distinctUntilChanged 判断相等用的是 ==,也就是 equalsflow.distinctUntilChanged().collect {println("Collect: $it")}}job.join()
}
运行结果:
Collect: 1
Collect: 2
Collect: 3
Collect: 1
distinctUntilChanged() 默认使用 equals() 作为判断元素是否相等的依据,也可以自己传入判断依据:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf("Test", "test", "TEST")val job = scope.launch {// distinctUntilChanged 判断相等用的是 ==,也就是 equalsflow.distinctUntilChanged().collect {println("Collect1: $it")}flow.distinctUntilChanged { old, new -> old.uppercase() == new.uppercase() }.collect { println("Collect2: $it") }}job.join()
}
运行结果:
Collect1: Test
Collect1: test
Collect1: TEST
Collect2: Test
此外还有一个 distinctUntilChangedBy(),它会根据传入的规则生成 key,然后用 ==,也就是 equals() 对 key 进行判断,如果 key 相等就会被过滤掉:
public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> =distinctUntilChangedBy(keySelector = keySelector, areEquivalent = defaultAreEquivalent)
示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf("Test", "test", "TEST")val job = scope.launch {flow.distinctUntilChangedBy { it.uppercase() }.collect {println("Collect: $it")}}job.join()
}
运行结果:
Collect: Test
3、自定义操作符
Flow 的操作符其实就是用一个现成的 Flow 对象根据规则生成新的 Flow 对象的函数。因此,我们能确定,操作符函数的接收者类型、返回值类型都是 Flow,它可以有如下基本形式:
// 泛型的个数与具体的摆放位置需要根据函数功能做出调整
// flow 创建 Flow 对象也可以根据需要换成 channelFlow
fun <T> Flow<T>.customOperator(): Flow<T> = flow {}
在确定基本形式后,再根据具体需求填充函数体即可。比如说,想要将上游收集到的数据原封不动地发送给下游:
fun <T> Flow<T>.customOperator(): Flow<T> = flow {
// this@customOperator.collect { emit(it) }collect { emit(it) }
}
测试代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3)val job = scope.launch {flow.customOperator().collect {println("Collect: $it")}}job.join()
}
运行结果:
Collect: 1
Collect: 2
Collect: 3
代码解读:
-
customOperator() 的接收者,实际上就是该函数的上游 Flow,也就是测试代码中的 flow 对象
-
customOperator() 需要先从调用上游收集数据的函数 collect() 拿到上游的数据,每拿到一个数据,就通过 emit() 发送给下游
-
customOperator() 的大括号内有双重环境,即两个 this,第一个是接收者 Flow 的 this,第二个是 flow() 的参数类型 FlowCollector 提供的 this:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
而 collect() 的参数正是 FlowCollector,所以 collect() 内的 this 就是 FlowCollector,可以直接调用 FlowCollector 额 emit():
fun <T> Flow<T>.customOperator(): Flow<T> = flow { // this:FlowCollector // this@customOperator.collect { emit(it) }// 上游的 this 调用 collect,下游的 this 调用 emit,这样就连接了上下游collect { emit(it) } }
由于两个 this 只有 Flow 有 collect(),因此可以省去
this@customOperator
,直接简写为collect {}
再举一个例子,自定义 double 操作符将上游数据乘以 2 发给下游:
fun Flow<Int>.double(): Flow<Int> = flow {collect {emit(it * 2)}
}
4、时间相关操作符
4.1 timeout 操作符
timeout() 可以用于控制两条数据之间的间隔时长。如果流中的上一条数据发送完毕后,在 timeout() 指定的时间内没有发送出下一条数据,timeout() 就会抛出 TimeoutCancellationException。
前面我们多次强调过,不是抛出异常就意味着功能不能用了。对于这种可以预见到的异常,通过合适的处理方式,是可以实现正常功能的。只有哪种预料之外的异常,才会导致不可预知的结果,影响应用的表现。
对于 timeout() 在超时后会抛出 TimeoutCancellationException 这一点,可以实现类似微信中提示对方正在打字的功能:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow<Any> { /* 获取对方的输入状态 */ }val job = scope.launch {try {flow.timeout(5.seconds).collect {/* 通知对方正在输入 */ }} catch (e: TimeoutCancellationException) {/* 关闭提示 */}}job.join()
}
TimeoutCancellationException 是 CancellationException 的子类,如果这个异常没有被捕获,或导致 Flow 所在的协程被取消:
@OptIn(FlowPreview::class)
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(1)emit(2)delay(2000)emit(3)}val job = scope.launch {flow.timeout(2.seconds).collect { println("Collect: $it") }}job.join()
}
下游只接收到 1 和 2,然后 runBlocking() 就运行完成:
Collect: 1
Collect: 2Process finished with exit code 0
下游没有收到 3,说明 Flow 所在协程被取消了。
使用 timeout() 时要注意,由于其被标记了 @FlowPreview 注解,表示其是一个 Flow 的预览特性,后续可能会发生不向后兼容的更改,所以使用时可以使用 @OptIn(FlowPreview::class)
来进行标记。
4.2 sample 操作符
sample() 让你设置一个时间窗口,从 collect() 被调用开始,每隔一个窗口时间就进行一次取样,仅保留窗口中最新发送出来的那条数据:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(0)delay(500)emit(1)delay(800)emit(2)delay(900)emit(3)delay(1000)emit(4)}val job = scope.launch {flow.sample(1.seconds).collect { println("sample collect: $it") }}job.join()
}
运行结果:
sample collect: 1
sample collect: 2
sample collect: 3
sample() 设置间隔为 1 秒,那么第一秒内,上游发射了 0 和 1,sample() 就只保留这一秒内最新的 1。第二秒和第三秒内分别只发射了 2 和 3,因此它们直接被保留。最后发射的 4,由于发射后流就结束了,没能等到 sample() 进行第四秒的取样,所以 4 不会被保留。
sample() 适用于定时刷新的场景,只在固定时间点刷新,在刷新点之前发送的所有数据,只取最新的就够了。
4.3 debounce 操作符
防抖,在短时间内连续发生事件时,只保留一个有效事件。
具体到 Flow 的 debounce() 来说,它的效果是对于每条数据,到了 debounce() 这里先压住,不发送,在 debounce() 设置的等待时长内,如果有新的数据到达,那么等待中的数据会被丢弃,转而压住新到来的数据开启新一轮的等待。以此类推,直到某一次等待超时依然没有新数据到来,被压住的数据才会被发送出去。
不仔细看的话,似乎这个操作符可以用在 Android UI 的按钮上进行防抖,但实际上不行。在 UI 上点击按钮需要在首次点击时就触发点击事件,使用 debounce() 会在最后一次点击且等到超时后才将数据放行,这会造成响应延迟,给用户不好的使用体验。
这种 UI 上的防抖,在 RxJava 中可以使用 throttleFirst 操作符。但在 Flow 中没有现成的操作符,需要自己实现:
fun <T> Flow<T>.throttleFirst(timeWindow: Duration): Flow<T> = flow {var lastTime = 0Lcollect {// 大于窗口时间的数据才能发射,在窗口时间内的数据不发射。这样通常是第一个发来的数据会被发送出去if (System.currentTimeMillis() - lastTime > timeWindow.inWholeMilliseconds) {emit(it)lastTime = System.currentTimeMillis()}}
}
debounce() 的典型应用场景是搜索提示。在搜索框中输入文字时,通常会向服务器发送请求给出一些补全的提示项,让用户点一下就能搜索完整内容。一般等个 1 秒中,发现用户不修改了,再去请求服务器数据,而不是用户输入了立即请求,可以节约服务器资源。
5、drop 与 take 操作符
drop 与 take 系列的操作符也属于过滤型的操作符,只不过它们是截断性的过滤:
- drop 会持续丢弃数据,直到遇到指定的数据才向下游转发;dropWhile 也是类似的功能,只不过可以通过 predicate 谓词条件指定丢弃哪些数据
- take 与 drop 相反,它会从开始就持续转发数据,直到遇到指定的数据;takeWhile 也是通过 predicate 条件指定转发哪些数据,直到遇到满足条件的那个数据,结束 Flow
示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {// 从开始就丢数据,一直到把 2 丢完,后续开始转发数据flow.drop(2).collect { println("drop: $it") }// 从开始就丢数据,直到遇到第一个不满足条件的数据开始转发后续所有的flow.dropWhile { it < 3 }.collect { println("dropWhile1: $it") }flow.dropWhile { it != 3 }.collect { println("dropWhile2: $it") }flow.take(2).collect { println("take: $it") }flow.takeWhile { it != 3 }.collect { println("takeWhile: $it") }}job.join()
}
运行结果:
drop: 3
drop: 4
drop: 5
dropWhile1: 3
dropWhile1: 4
dropWhile1: 5
dropWhile2: 3
dropWhile2: 4
dropWhile2: 5
take: 1
take: 2
takeWhile: 1
takeWhile: 2
6、map 系列操作符
map 在数学里是映射的意思,即两个集合里元素一一对应的关系,本质上就是一个函数。作为 Flow 的操作符,map 让我们提供一个算法,让上游过来的数据,转换成另一个数据发送到下游。
示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.map { it + 1 }.collect { println("map: $it") }flow.mapNotNull { if (it == 3) null else it + 1 }.collect { println("mapNotNull: $it") }}job.join()
}
运行结果:
map: 2
map: 3
map: 4
map: 5
map: 6
mapNotNull: 2
mapNotNull: 3
mapNotNull: 5
mapNotNull: 6
mapNotNull() 会将根据其规则计算出来的数据,在转发到下游之前做一次 Null 过滤,因此上面的结果中是没有 mapNotNull: 4
的。mapNotNull() 的效果相当于:
flow.map { if (it == 3) null else it + 1 }.filterNotNull()
flow.filter { it != 3 }.map { it + 1}
此外还有一个异步计算的 map —— mapLatest()。
Flow 的数据都是线性的,或者说是同步的,当 map() 在处理上游过来的一条数据的过程中,它是不会开始生产下一条上游数据的,直到这条处理完,继续往下游发送了,上游的下一条数据才会开始生产。而 mapLatest() 没有这种限制,它是异步的,在下游处理这一条上游数据的过程中,上游依然可以生产下一条数据。并且,如果下一条上游数据来了,它会直接开始处理这条上游的新数据,取消掉正在处理的上一条数据。
示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {delay(100)emit(1)delay(100)emit(2)delay(100)emit(3)}val job = scope.launch {flow.map { it + 1 }.collect { println("map: $it") }flow.mapLatest {delay(120)it + 1}.collect { println("mapLatest: $it") }}job.join()
}
运行结果:
map: 2
map: 3
map: 4
mapLatest: 4
由于上游每隔 100ms 就发送一条数据,而 mapLatest() 内处理数据需要 120ms,导致数据还没处理完,上游就发来新的数据。因此只有最后一条发送的 3 不会被“冲掉”,做完计算后会发送给下游。
本质上,mapLatest() 是一个有了新数据就停止旧数据的转换流程的 map() 变种,应用场景适合数据转换生产过程比较慢,一旦新数据开始生产,旧数据就立即过时、就算生产出来也没用的场景。比如,在搜索框内输入文字,请求服务器给出搜索提示词的过程中,如果在服务器返回结果前,用户输入了新的文字,这时原本的文字已经没用了,需要用新的输入内容请求提示词,用 mapLatest() 就比较合适。
7、transform 系列操作符
transform() 是更加底层的 map(),map() 就使用 transform() 实现的:
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->return@transform emit(transform(value))
}
底层的 API 的易用性通常没有上层 API 好,但是胜在使用起来更加灵活、自由,你可以在 transform() 内根据业务需求随意发挥:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.transform {// transform 内的逻辑可以根据需求任意发挥,但是要注意,由于 this// 是 FlowCollector,所以要使用 emit 发送数据到下游emit(it + 1)}.collect { println("transform: $it") }}job.join()
}
transform() 的变种操作符 transformWhile() 相当于 transform() 与 takeWhile() 的结合,它要求返回一个 Boolean 作为提前结束 Flow 的条件:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.transformWhile {emit(it + 1)it <= 3}.collect { println("transformWhile: $it") }}job.join()
}
运行结果:
transformWhile: 2
transformWhile: 3
transformWhile: 4
transformWhile: 5
注意结果有 4 条数据,这是因为 transformWhile() 内是先发送数据后给出继续 Flow 的判断条件,由于 it = 3 时还满足 it <= 3 这个条件,因此会触发上游发送 4 这条数据,在 transformWhile() 内计算后得到 5,先将其发送,然后才发现不满足继续执行 Flow 的条件。
transform() 还有另一个变种 transformLatest(),它是 mapLatest() 的底层函数,效果与 mapLatest() 类似,不多赘述。
8、其他操作符
8.1 withIndex 操作符
withIndex() 的作用是给 Flow 中每个元素加上序号:
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {var index = 0collect { value ->emit(IndexedValue(checkIndexOverflow(index++), value))}
}
输出的元素类型 IndexedValue 封装了数据的 index 以及数据本身:
public data class IndexedValue<out T>(public val index: Int, public val value: T)
示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.withIndex().collect { println("Collect 1: ${it.index}, ${it.value}") }// 将 data class 的成员直接声明为参数,下面就可以直接用了flow.withIndex().collect { (index, value) -> println("Collect 2: $index, $value") }// collectIndexed 与 withIndex 类似,只不过前者是末端操作符,后者是中间操作符flow.collectIndexed { index, value -> println("Collect 3: $index, $value") }}job.join()
}
8.2 reduce、fold 系列操作符
Flow 中的 reduce() 与 fold() 操作与 Kotlin 标准库中的同名函数作用是类似的,以 reduce() 为例:
public inline fun <S, T : S> Iterable<T>.reduce(operation: (acc: S, T) -> S): S {val iterator = this.iterator()if (!iterator.hasNext()) throw UnsupportedOperationException("Empty collection can't be reduced.")// accumulator 初始值为集合中的第一个元素值var accumulator: S = iterator.next()// 后续,以 accumulator 作为累加初始值,对集合元素做 operation 操作while (iterator.hasNext()) {accumulator = operation(accumulator, iterator.next())}return accumulator
}
比如进行一个累加操作:
val result = listOf(1, 2, 3).reduce { acc, i -> acc + i }
println("result: $result")
运行结果:
result: 6
那么 Flow 版本的 reduce() 也是同样的操作:
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {var accumulator: Any? = NULLcollect { value ->accumulator = if (accumulator !== NULL) {@Suppress("UNCHECKED_CAST")operation(accumulator as S, value)} else {value}}if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")@Suppress("UNCHECKED_CAST")return accumulator as S
}
观察 reduce() 源码可以发现它与此前介绍的其他操作符有一些不同之处:
- reduce() 是挂起函数,此前的操作符都不是(因为 collect() 是一个挂起函数)
- reduce() 是一个末端操作符(terminal operator),因为其内部调用 collect() 启动了收集流程。返回计算结果 S,而不是
Flow<S>
与 reduce() 类似的还有一个 runningReduce(),它是一个中间操作符,将每次进行计算得到的结果发送给下游,而不是像 reduce() 那样只返回最终结果。它不是一个挂起函数:
public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {var accumulator: Any? = NULLcollect { value ->accumulator = if (accumulator === NULL) {value} else {operation(accumulator as T, value)}emit(accumulator as T)}
}
简单使用:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {val result = flow.reduce { accumulator, value -> accumulator + value }println("reduce: $result")flow.runningReduce { accumulator, value -> accumulator + value }.collect { println("runningReduce: $it") }}job.join()
}
运行结果:
reduce: 15
runningReduce: 1
runningReduce: 3
runningReduce: 6
runningReduce: 10
runningReduce: 15
fold() 与 runningFold() 与 reduce() 和 runningReduce() 非常相似,有两个主要区别:
- 在于 fold() 与 runningFold() 可以通过参数提供一个初始值,并且该初始值的数据类型可以与 Flow 的数据类型不同,并且初始值的类型就是返回值的类型
- runningFold() 会先发射一次初始值再进行计算
源码:
public suspend inline fun <T, R> Flow<T>.fold(initial: R,crossinline operation: suspend (acc: R, value: T) -> R
): R {var accumulator = initialcollect { value ->accumulator = operation(accumulator, value)}return accumulator
}public fun <T, R> Flow<T>.runningFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {var accumulator: R = initialemit(accumulator)collect { value ->accumulator = operation(accumulator, value)emit(accumulator)}
}
示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {val result = flow.fold("ha") { accumulator, value -> "$accumulator - $value" }println("fold: $result")flow.runningFold("ha") { accumulator, value -> "$accumulator - $value" }.collect { println("runningFold: $it") }}job.join()
}
运行结果:
fold: ha - 1 - 2 - 3 - 4 - 5
runningFold: ha
runningFold: ha - 1
runningFold: ha - 1 - 2
runningFold: ha - 1 - 2 - 3
runningFold: ha - 1 - 2 - 3 - 4
runningFold: ha - 1 - 2 - 3 - 4 - 5
另外,runningFold() 还有一个方便的别名 scan():
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = runningFold(initial, operation)
reduce 与 fold 系列操作符,在 Iterable 中都有对应的同名函数,功能也都是类似的。
8.3 onEach 操作符
onEach() 允许你提供一个代码块,在每条数据到来的时候,执行该代码块再把新的数据向下发送。它是一个数据监听器,不会修改 Flow,而是生成新的 Flow:
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->action(value)return@transform emit(value)
}
onEach() 可以用作数据监听器,示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.onEach {println("onEach: $it")}.onEach {it * 2}.collect {println("result: $it")}}job.join()
}
运行结果:
onEach: 1
result: 1
onEach: 2
result: 2
onEach: 3
result: 3
onEach: 4
result: 4
onEach: 5
result: 5
8.4 chunked 操作符
chunked 操作符用于将 Flow 中的元素分组成指定大小的块,即将 Flow<T>
类型的流处理成 Flow<List<T>>
的流:
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> {require(size >= 1) { "Expected positive chunk size, but got $size" }return flow {var result: ArrayList<T>? = null // Do not preallocate anythingcollect { value ->// Allocate if neededval acc = result ?: ArrayList<T>(size).also { result = it }acc.add(value)if (acc.size == size) {emit(acc)// Cleanup, but don't allocate -- it might've been the case this is the last elementresult = null}}result?.let { emit(it) }}
}
示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.chunked(2).collect { println("chunked: $it") }}job.join()
}
运行结果:
chunked: [1, 2]
chunked: [3, 4]
chunked: [5]
chunked 操作符是在 Kotlin 2.1.0 版本中新增的操作符,并且是 @ExperimentalCoroutinesApi,添加该操作符的修改可以参考 Kotlin 在 GitHub 上开源的 commit:c9c735a。