您的位置:首页 > 财经 > 产业 > 网络营销策划方案内容_龙岩做网站改版找哪家公司_百度搜索推广官网_智慧软文网站

网络营销策划方案内容_龙岩做网站改版找哪家公司_百度搜索推广官网_智慧软文网站

2025/1/7 22:29:26 来源:https://blog.csdn.net/tmacfrank/article/details/144952448  浏览:    关键词:网络营销策划方案内容_龙岩做网站改版找哪家公司_百度搜索推广官网_智慧软文网站
网络营销策划方案内容_龙岩做网站改版找哪家公司_百度搜索推广官网_智慧软文网站

1、filter 系列操作符

filter 系列操作符用于元素过滤。

filter 操作符会将符合 predicate 谓词条件的数据再次发射出去,过滤掉不符合条件的数据:

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->if (predicate(value)) return@transform emit(value)
}

filterNot 与 filter 刚好相反,过滤掉符合 predicate 条件的,留下不符合条件的:

public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->if (!predicate(value)) return@transform emit(value)
}

得到的不是原来的 Flow 对象,而是新创建的。示例代码:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.filter { it % 2 == 0 }.collect { println("filter: $it") }flow.filterNot { it % 2 == 0 }.collect { println("filterNot: $it") }}job.join()
}

运行结果:

filter: 2
filter: 4
filterNot: 1
filterNot: 3
filterNot: 5

上述例子中 flow 的类型是 Flow<Int>,但如果 Flow 中存在空元素,flow 的类型就变为 Flow<Int?>,此时再使用原来的过滤方式 it % 2就会出现 NPE。此时有如下几种处理方式:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, null, 3, null, 4, 5)val job = scope.launch {// 1.先手动进行 null 判断,再进行正常的逻辑判断flow.filter { it != null && it % 2 == 0 }.collect { println("filter: $it") }// 2.使用安全调用符 ?,% 实际上就是调用了 rem()flow.filterNot { it?.rem(2) == 0 }.collect { println("filterNot: $it") }// 3.使用 filterNotNull() 先过滤掉 nullflow.filterNotNull().filter { it % 2 == 0 }.collect { println("filterNotNull: $it") }}job.join()
}

运行结果:

filter: 2
filter: 4
filterNot: 1
filterNot: null
filterNot: 3
filterNot: null
filterNot: 5
filterNotNull: 2
filterNotNull: 4

可以看到使用 filterNotNull() 先过滤掉 null 还是比较好用的。

最后还有一个 filterIsInstance(),给出指定的类型,把符合这个类型的元素留下来:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, "Test", 3, "Name")val job = scope.launch {flow.filterIsInstance<String>().collect { println("filterIsInstance1: $it") }flow.filterIsInstance(Int::class).collect { println("filterIsInstance2: $it") }}job.join()
}

运行结果:

filterIsInstance1: Test
filterIsInstance1: Name
filterIsInstance2: 1
filterIsInstance2: 2
filterIsInstance2: 3

filterIsInstance() 有两种形式:

@Suppress("UNCHECKED_CAST")
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>public fun <R : Any> Flow<*>.filterIsInstance(klass: KClass<R>): Flow<R> = filter { klass.isInstance(it) } as Flow<R>

无参使用泛型的版本有一个潜在的坑,就是 <reified R> 的作用范围直到 R 这个类型本身,对于集合类型,如 List<String>,它只能确定到最外层的 List,内层的类型是无法区分的:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, "Test", 3, "Name", listOf("A", "B"), listOf(1, 2))val job = scope.launch {// 前两种方式都只能区分到外层的 List,无法区分 List<String> 和 List<Int>flow.filterIsInstance<List<String>>().collect { println("filterIsInstance1: $it") }flow.filterIsInstance(List::class).collect { println("filterIsInstance2: $it") }// 先确定外层的类型是 List,然后取出 List 中的第一个元素,判断其类型是 String 才满足过滤条件flow.filter { it is List<*> && it.firstOrNull()?.let { item -> item is String } == true }.collect { println("filterIsInstance3: $it") }}job.join()
}

运行结果:

filterIsInstance1: [A, B]
filterIsInstance1: [1, 2]
filterIsInstance2: [A, B]
filterIsInstance2: [1, 2]
filterIsInstance3: [A, B]

只有像第三种方式那样,把集合内的元素单独拿出来判断其类型才可以。

2、distinctUntilChanged 系列

去重,如果连续发送多条相同的数据,则新的数据不会发送:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 2, 3, 3, 3, 1)val job = scope.launch {// distinctUntilChanged 判断相等用的是 ==,也就是 equalsflow.distinctUntilChanged().collect {println("Collect: $it")}}job.join()
}

运行结果:

Collect: 1
Collect: 2
Collect: 3
Collect: 1

distinctUntilChanged() 默认使用 equals() 作为判断元素是否相等的依据,也可以自己传入判断依据:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf("Test", "test", "TEST")val job = scope.launch {// distinctUntilChanged 判断相等用的是 ==,也就是 equalsflow.distinctUntilChanged().collect {println("Collect1: $it")}flow.distinctUntilChanged { old, new -> old.uppercase() == new.uppercase() }.collect { println("Collect2: $it") }}job.join()
}

运行结果:

Collect1: Test
Collect1: test
Collect1: TEST
Collect2: Test

此外还有一个 distinctUntilChangedBy(),它会根据传入的规则生成 key,然后用 ==,也就是 equals() 对 key 进行判断,如果 key 相等就会被过滤掉:

public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> =distinctUntilChangedBy(keySelector = keySelector, areEquivalent = defaultAreEquivalent)

示例代码:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf("Test", "test", "TEST")val job = scope.launch {flow.distinctUntilChangedBy { it.uppercase() }.collect {println("Collect: $it")}}job.join()
}

运行结果:

Collect: Test

3、自定义操作符

Flow 的操作符其实就是用一个现成的 Flow 对象根据规则生成新的 Flow 对象的函数。因此,我们能确定,操作符函数的接收者类型、返回值类型都是 Flow,它可以有如下基本形式:

// 泛型的个数与具体的摆放位置需要根据函数功能做出调整
// flow 创建 Flow 对象也可以根据需要换成 channelFlow
fun <T> Flow<T>.customOperator(): Flow<T> = flow {}

在确定基本形式后,再根据具体需求填充函数体即可。比如说,想要将上游收集到的数据原封不动地发送给下游:

fun <T> Flow<T>.customOperator(): Flow<T> = flow {
//    this@customOperator.collect { emit(it) }collect { emit(it) }
}

测试代码:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3)val job = scope.launch {flow.customOperator().collect {println("Collect: $it")}}job.join()
}

运行结果:

Collect: 1
Collect: 2
Collect: 3

代码解读:

  • customOperator() 的接收者,实际上就是该函数的上游 Flow,也就是测试代码中的 flow 对象

  • customOperator() 需要先从调用上游收集数据的函数 collect() 拿到上游的数据,每拿到一个数据,就通过 emit() 发送给下游

  • customOperator() 的大括号内有双重环境,即两个 this,第一个是接收者 Flow 的 this,第二个是 flow() 的参数类型 FlowCollector 提供的 this:

    public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
    

    而 collect() 的参数正是 FlowCollector,所以 collect() 内的 this 就是 FlowCollector,可以直接调用 FlowCollector 额 emit():

    fun <T> Flow<T>.customOperator(): Flow<T> = flow { // this:FlowCollector
    //    this@customOperator.collect { emit(it) }// 上游的 this 调用 collect,下游的 this 调用 emit,这样就连接了上下游collect { emit(it) }
    }
    

    由于两个 this 只有 Flow 有 collect(),因此可以省去 this@customOperator,直接简写为 collect {}

再举一个例子,自定义 double 操作符将上游数据乘以 2 发给下游:

fun Flow<Int>.double(): Flow<Int> = flow {collect {emit(it * 2)}
}

4、时间相关操作符

4.1 timeout 操作符

timeout() 可以用于控制两条数据之间的间隔时长。如果流中的上一条数据发送完毕后,在 timeout() 指定的时间内没有发送出下一条数据,timeout() 就会抛出 TimeoutCancellationException。

前面我们多次强调过,不是抛出异常就意味着功能不能用了。对于这种可以预见到的异常,通过合适的处理方式,是可以实现正常功能的。只有哪种预料之外的异常,才会导致不可预知的结果,影响应用的表现。

对于 timeout() 在超时后会抛出 TimeoutCancellationException 这一点,可以实现类似微信中提示对方正在打字的功能:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow<Any> { /* 获取对方的输入状态 */ }val job = scope.launch {try {flow.timeout(5.seconds).collect {/* 通知对方正在输入 */ }} catch (e: TimeoutCancellationException) {/* 关闭提示 */}}job.join()
}

TimeoutCancellationException 是 CancellationException 的子类,如果这个异常没有被捕获,或导致 Flow 所在的协程被取消:

@OptIn(FlowPreview::class)
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(1)emit(2)delay(2000)emit(3)}val job = scope.launch {flow.timeout(2.seconds).collect { println("Collect: $it") }}job.join()
}

下游只接收到 1 和 2,然后 runBlocking() 就运行完成:

Collect: 1
Collect: 2Process finished with exit code 0

下游没有收到 3,说明 Flow 所在协程被取消了。

使用 timeout() 时要注意,由于其被标记了 @FlowPreview 注解,表示其是一个 Flow 的预览特性,后续可能会发生不向后兼容的更改,所以使用时可以使用 @OptIn(FlowPreview::class) 来进行标记。

4.2 sample 操作符

sample() 让你设置一个时间窗口,从 collect() 被调用开始,每隔一个窗口时间就进行一次取样,仅保留窗口中最新发送出来的那条数据:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(0)delay(500)emit(1)delay(800)emit(2)delay(900)emit(3)delay(1000)emit(4)}val job = scope.launch {flow.sample(1.seconds).collect { println("sample collect: $it") }}job.join()
}

运行结果:

sample collect: 1
sample collect: 2
sample collect: 3

sample() 设置间隔为 1 秒,那么第一秒内,上游发射了 0 和 1,sample() 就只保留这一秒内最新的 1。第二秒和第三秒内分别只发射了 2 和 3,因此它们直接被保留。最后发射的 4,由于发射后流就结束了,没能等到 sample() 进行第四秒的取样,所以 4 不会被保留。

sample() 适用于定时刷新的场景,只在固定时间点刷新,在刷新点之前发送的所有数据,只取最新的就够了。

4.3 debounce 操作符

防抖,在短时间内连续发生事件时,只保留一个有效事件。

具体到 Flow 的 debounce() 来说,它的效果是对于每条数据,到了 debounce() 这里先压住,不发送,在 debounce() 设置的等待时长内,如果有新的数据到达,那么等待中的数据会被丢弃,转而压住新到来的数据开启新一轮的等待。以此类推,直到某一次等待超时依然没有新数据到来,被压住的数据才会被发送出去。

不仔细看的话,似乎这个操作符可以用在 Android UI 的按钮上进行防抖,但实际上不行。在 UI 上点击按钮需要在首次点击时就触发点击事件,使用 debounce() 会在最后一次点击且等到超时后才将数据放行,这会造成响应延迟,给用户不好的使用体验。

这种 UI 上的防抖,在 RxJava 中可以使用 throttleFirst 操作符。但在 Flow 中没有现成的操作符,需要自己实现:

fun <T> Flow<T>.throttleFirst(timeWindow: Duration): Flow<T> = flow {var lastTime = 0Lcollect {// 大于窗口时间的数据才能发射,在窗口时间内的数据不发射。这样通常是第一个发来的数据会被发送出去if (System.currentTimeMillis() - lastTime > timeWindow.inWholeMilliseconds) {emit(it)lastTime = System.currentTimeMillis()}}
}

debounce() 的典型应用场景是搜索提示。在搜索框中输入文字时,通常会向服务器发送请求给出一些补全的提示项,让用户点一下就能搜索完整内容。一般等个 1 秒中,发现用户不修改了,再去请求服务器数据,而不是用户输入了立即请求,可以节约服务器资源。

5、drop 与 take 操作符

drop 与 take 系列的操作符也属于过滤型的操作符,只不过它们是截断性的过滤:

  • drop 会持续丢弃数据,直到遇到指定的数据才向下游转发;dropWhile 也是类似的功能,只不过可以通过 predicate 谓词条件指定丢弃哪些数据
  • take 与 drop 相反,它会从开始就持续转发数据,直到遇到指定的数据;takeWhile 也是通过 predicate 条件指定转发哪些数据,直到遇到满足条件的那个数据,结束 Flow

示例代码:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {// 从开始就丢数据,一直到把 2 丢完,后续开始转发数据flow.drop(2).collect { println("drop: $it") }// 从开始就丢数据,直到遇到第一个不满足条件的数据开始转发后续所有的flow.dropWhile { it < 3 }.collect { println("dropWhile1: $it") }flow.dropWhile { it != 3 }.collect { println("dropWhile2: $it") }flow.take(2).collect { println("take: $it") }flow.takeWhile { it != 3 }.collect { println("takeWhile: $it") }}job.join()
}

运行结果:

drop: 3
drop: 4
drop: 5
dropWhile1: 3
dropWhile1: 4
dropWhile1: 5
dropWhile2: 3
dropWhile2: 4
dropWhile2: 5
take: 1
take: 2
takeWhile: 1
takeWhile: 2

6、map 系列操作符

map 在数学里是映射的意思,即两个集合里元素一一对应的关系,本质上就是一个函数。作为 Flow 的操作符,map 让我们提供一个算法,让上游过来的数据,转换成另一个数据发送到下游。

示例代码:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.map { it + 1 }.collect { println("map: $it") }flow.mapNotNull { if (it == 3) null else it + 1 }.collect { println("mapNotNull: $it") }}job.join()
}

运行结果:

map: 2
map: 3
map: 4
map: 5
map: 6
mapNotNull: 2
mapNotNull: 3
mapNotNull: 5
mapNotNull: 6

mapNotNull() 会将根据其规则计算出来的数据,在转发到下游之前做一次 Null 过滤,因此上面的结果中是没有 mapNotNull: 4 的。mapNotNull() 的效果相当于:

flow.map { if (it == 3) null else it + 1 }.filterNotNull()
flow.filter { it != 3 }.map { it + 1}

此外还有一个异步计算的 map —— mapLatest()。

Flow 的数据都是线性的,或者说是同步的,当 map() 在处理上游过来的一条数据的过程中,它是不会开始生产下一条上游数据的,直到这条处理完,继续往下游发送了,上游的下一条数据才会开始生产。而 mapLatest() 没有这种限制,它是异步的,在下游处理这一条上游数据的过程中,上游依然可以生产下一条数据。并且,如果下一条上游数据来了,它会直接开始处理这条上游的新数据,取消掉正在处理的上一条数据。

示例代码:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {delay(100)emit(1)delay(100)emit(2)delay(100)emit(3)}val job = scope.launch {flow.map { it + 1 }.collect { println("map: $it") }flow.mapLatest {delay(120)it + 1}.collect { println("mapLatest: $it") }}job.join()
}

运行结果:

map: 2
map: 3
map: 4
mapLatest: 4

由于上游每隔 100ms 就发送一条数据,而 mapLatest() 内处理数据需要 120ms,导致数据还没处理完,上游就发来新的数据。因此只有最后一条发送的 3 不会被“冲掉”,做完计算后会发送给下游。

本质上,mapLatest() 是一个有了新数据就停止旧数据的转换流程的 map() 变种,应用场景适合数据转换生产过程比较慢,一旦新数据开始生产,旧数据就立即过时、就算生产出来也没用的场景。比如,在搜索框内输入文字,请求服务器给出搜索提示词的过程中,如果在服务器返回结果前,用户输入了新的文字,这时原本的文字已经没用了,需要用新的输入内容请求提示词,用 mapLatest() 就比较合适。

7、transform 系列操作符

transform() 是更加底层的 map(),map() 就使用 transform() 实现的:

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->return@transform emit(transform(value))
}

底层的 API 的易用性通常没有上层 API 好,但是胜在使用起来更加灵活、自由,你可以在 transform() 内根据业务需求随意发挥:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.transform {// transform 内的逻辑可以根据需求任意发挥,但是要注意,由于 this// 是 FlowCollector,所以要使用 emit 发送数据到下游emit(it + 1)}.collect { println("transform: $it") }}job.join()
}

transform() 的变种操作符 transformWhile() 相当于 transform() 与 takeWhile() 的结合,它要求返回一个 Boolean 作为提前结束 Flow 的条件:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.transformWhile {emit(it + 1)it <= 3}.collect { println("transformWhile: $it") }}job.join()
}

运行结果:

transformWhile: 2
transformWhile: 3
transformWhile: 4
transformWhile: 5

注意结果有 4 条数据,这是因为 transformWhile() 内是先发送数据后给出继续 Flow 的判断条件,由于 it = 3 时还满足 it <= 3 这个条件,因此会触发上游发送 4 这条数据,在 transformWhile() 内计算后得到 5,先将其发送,然后才发现不满足继续执行 Flow 的条件。

transform() 还有另一个变种 transformLatest(),它是 mapLatest() 的底层函数,效果与 mapLatest() 类似,不多赘述。

8、其他操作符

8.1 withIndex 操作符

withIndex() 的作用是给 Flow 中每个元素加上序号:

public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {var index = 0collect { value ->emit(IndexedValue(checkIndexOverflow(index++), value))}
}

输出的元素类型 IndexedValue 封装了数据的 index 以及数据本身:

public data class IndexedValue<out T>(public val index: Int, public val value: T)

示例代码:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.withIndex().collect { println("Collect 1: ${it.index}, ${it.value}") }// 将 data class 的成员直接声明为参数,下面就可以直接用了flow.withIndex().collect { (index, value) -> println("Collect 2: $index, $value") }// collectIndexed 与 withIndex 类似,只不过前者是末端操作符,后者是中间操作符flow.collectIndexed { index, value -> println("Collect 3: $index, $value") }}job.join()
}

8.2 reduce、fold 系列操作符

Flow 中的 reduce() 与 fold() 操作与 Kotlin 标准库中的同名函数作用是类似的,以 reduce() 为例:

public inline fun <S, T : S> Iterable<T>.reduce(operation: (acc: S, T) -> S): S {val iterator = this.iterator()if (!iterator.hasNext()) throw UnsupportedOperationException("Empty collection can't be reduced.")// accumulator 初始值为集合中的第一个元素值var accumulator: S = iterator.next()// 后续,以 accumulator 作为累加初始值,对集合元素做 operation 操作while (iterator.hasNext()) {accumulator = operation(accumulator, iterator.next())}return accumulator
}

比如进行一个累加操作:

val result = listOf(1, 2, 3).reduce { acc, i -> acc + i }
println("result: $result")

运行结果:

result: 6

那么 Flow 版本的 reduce() 也是同样的操作:

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {var accumulator: Any? = NULLcollect { value ->accumulator = if (accumulator !== NULL) {@Suppress("UNCHECKED_CAST")operation(accumulator as S, value)} else {value}}if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")@Suppress("UNCHECKED_CAST")return accumulator as S
}

观察 reduce() 源码可以发现它与此前介绍的其他操作符有一些不同之处:

  • reduce() 是挂起函数,此前的操作符都不是(因为 collect() 是一个挂起函数)
  • reduce() 是一个末端操作符(terminal operator),因为其内部调用 collect() 启动了收集流程。返回计算结果 S,而不是 Flow<S>

与 reduce() 类似的还有一个 runningReduce(),它是一个中间操作符,将每次进行计算得到的结果发送给下游,而不是像 reduce() 那样只返回最终结果。它不是一个挂起函数:

public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {var accumulator: Any? = NULLcollect { value ->accumulator = if (accumulator === NULL) {value} else {operation(accumulator as T, value)}emit(accumulator as T)}
}

简单使用:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {val result = flow.reduce { accumulator, value -> accumulator + value }println("reduce: $result")flow.runningReduce { accumulator, value -> accumulator + value }.collect { println("runningReduce: $it") }}job.join()
}

运行结果:

reduce: 15
runningReduce: 1
runningReduce: 3
runningReduce: 6
runningReduce: 10
runningReduce: 15

fold() 与 runningFold() 与 reduce() 和 runningReduce() 非常相似,有两个主要区别:

  1. 在于 fold() 与 runningFold() 可以通过参数提供一个初始值,并且该初始值的数据类型可以与 Flow 的数据类型不同,并且初始值的类型就是返回值的类型
  2. runningFold() 会先发射一次初始值再进行计算

源码:

public suspend inline fun <T, R> Flow<T>.fold(initial: R,crossinline operation: suspend (acc: R, value: T) -> R
): R {var accumulator = initialcollect { value ->accumulator = operation(accumulator, value)}return accumulator
}public fun <T, R> Flow<T>.runningFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {var accumulator: R = initialemit(accumulator)collect { value ->accumulator = operation(accumulator, value)emit(accumulator)}
}

示例代码:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {val result = flow.fold("ha") { accumulator, value -> "$accumulator - $value" }println("fold: $result")flow.runningFold("ha") { accumulator, value -> "$accumulator - $value" }.collect { println("runningFold: $it") }}job.join()
}

运行结果:

fold: ha - 1 - 2 - 3 - 4 - 5
runningFold: ha
runningFold: ha - 1
runningFold: ha - 1 - 2
runningFold: ha - 1 - 2 - 3
runningFold: ha - 1 - 2 - 3 - 4
runningFold: ha - 1 - 2 - 3 - 4 - 5

另外,runningFold() 还有一个方便的别名 scan():

public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = runningFold(initial, operation)

reduce 与 fold 系列操作符,在 Iterable 中都有对应的同名函数,功能也都是类似的。

8.3 onEach 操作符

onEach() 允许你提供一个代码块,在每条数据到来的时候,执行该代码块再把新的数据向下发送。它是一个数据监听器,不会修改 Flow,而是生成新的 Flow:

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->action(value)return@transform emit(value)
}

onEach() 可以用作数据监听器,示例代码:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.onEach {println("onEach: $it")}.onEach {it * 2}.collect {println("result: $it")}}job.join()
}

运行结果:

onEach: 1
result: 1
onEach: 2
result: 2
onEach: 3
result: 3
onEach: 4
result: 4
onEach: 5
result: 5

8.4 chunked 操作符

chunked 操作符用于将 Flow 中的元素分组成指定大小的块,即将 Flow<T> 类型的流处理成 Flow<List<T>> 的流:

@ExperimentalCoroutinesApi
public fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> {require(size >= 1) { "Expected positive chunk size, but got $size" }return flow {var result: ArrayList<T>? = null // Do not preallocate anythingcollect { value ->// Allocate if neededval acc = result ?: ArrayList<T>(size).also { result = it }acc.add(value)if (acc.size == size) {emit(acc)// Cleanup, but don't allocate -- it might've been the case this is the last elementresult = null}}result?.let { emit(it) }}
}

示例代码:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flowOf(1, 2, 3, 4, 5)val job = scope.launch {flow.chunked(2).collect { println("chunked: $it") }}job.join()
}

运行结果:

chunked: [1, 2]
chunked: [3, 4]
chunked: [5]

chunked 操作符是在 Kotlin 2.1.0 版本中新增的操作符,并且是 @ExperimentalCoroutinesApi,添加该操作符的修改可以参考 Kotlin 在 GitHub 上开源的 commit:c9c735a。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com