文章目录
- async() 和 Channel 的类比
- 提供一次性结果给其他协程:async()
- 提供多次不同结果给其他协程:Channel
- produce() 和手动创建 Channel 的选择
- Channel 的本质和适用场景
- Channel 的 API 详解
- 遍历 Channel
- 设置 Channel 缓冲区大小:capacity
- 设置缓冲溢出策略:onBufferOverflow
- Channel 的关闭:close() 和 cancel()
- Channel 的 close()
- Channel 的 cancel()
- Channel 的关闭及时释放资源:onUndeliveredElement
- Channel 的 trySend() 和 tryReceive()
- receiveCatching()
- 把 SendChannel 暴露出来
- 总结
有了解或深入使用过协程的人会知道,在日常业务开发我们几乎用不到 Channel,它更上层的 Flow 就能满足我们的需求开发;但如果你在公司是架构师或技术研究员这类造轮子的岗位,就很有必要了解 Channel,因为 Channel 是协程间通信的关键技术点。
在 Channel 和 Flow 简介与对比 我们有提到 可以把 Channel 理解为支持多条数据的 async(),那么我们就从 async() 说起,然后再讲解 Channel。
async() 和 Channel 的类比
提供一次性结果给其他协程:async()
如果我们希望协程可以提供一个结果给别的协程用,可以通过 async():
fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val deferred = scope.async { gitHub.contributors("square", "retrofit") }launch {delay(5000)// async 可以将结果在另一个协程获取println("Contributors: ${deferred.await()}") }delay(10000)
}
async() 将异步协程的启动和结果的获取拆分开,启动协程用 async(),获取结果用 await()。多次调用 await() 也是只能拿到同样的返回值,因为 async() 是一次性的。
假设现在我们有新的需求:设计一个股票软件,为了有实时性,需要对每次发起的网络请求结果提供给其他协程使用。
或许我们可以想到的方案是轮询或者 Web-Socket,但是前面我们提到 async() 是一次性的,无法满足这种需求;接下来就是 Channel 登场了。
提供多次不同结果给其他协程:Channel
我们先看下如何使用 Channel 每次获取请求结果在其他协程使用:
fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)// produce() 启动生产数据的协程val receiver = scope.produce {while (isActive) {val data = gitHub.contributors("square", "retrofit")// send() 发送数据send(data)}}launch {delay(5000)// receive() 每次调用都会接收 send() 请求的不同的数据while (isActive) {println("Contributors: ${receiver.receive()}")}}delay(10000)
}
使用 Channel 只需要三个步骤:
-
produce() 启动一个 [生产数据给别的协程来用] 的协程
-
send() 发送数据
-
使用 produce() 返回的 ReceiveChannel 对象,在其他协程调用 receive() 获取 send() 发送的数据
produce() 的 block 代码块提供的是 ProducerScope,实际上它是 CoroutineScope 的子类:
Produce.ktpublic fun <E> CoroutineScope.produce(context: CoroutineContext = EmptyCoroutineContext,capacity: Int = 0,@BuildeInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> = produce(context, capacity, BufferOverflow.SUSPEND, CoroutinStart.DEFAULT, onCompletion = null, block = block)// ProduceScope 继承自 CoroutineScope
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {}
我们可以直接把 ProducerScope 当成正常协程 CoroutineScope 来用就行, 在它里面也可以启动子协程,协程取消流程和异常流程都适用。
了解了 Channel 的使用方式,我们简单类比下 async() 和 Channel 的区别:
-
启动协程的区别:async() 返回的 Deferred 对象,produce() 返回的 ReceiveChannel 对象
-
接收结果的区别:Deferred.await() 返回结果是一次性的,多次调用返回相同的返回值;ReceiveChannel.receive() 每次调用从 send() 获取不同的返回值
produce() 和手动创建 Channel 的选择
根据上面使用 Channel 的方式我们可以知道,Channel 即能通过 send() 发送数据,又能通过 receive() 接收数据,只不过发送和接收使用的对象类型不同而已;可以通过 Channel 的实现一探究竟:
// Channel 是一个接口,既实现了 SendChannel 也实现了 ReceiveChannel
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {...
}
Channel 即实现了 SendChannel 可以调用 send() 发送数据,又实现了 ReceiveChannel 可以调用 receive() 接收数据。
实际上相比上面提到的 Channel 的使用方式,还有更干净的使用 Channel 的方式:
fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)// Channel 是一个接口,Channel() 是一个工厂函数,根据传参不同创建不同的 Channelval channel = Channel<List<Contributor>>()scope.launch {// 发送数据channel.send(gitHub.contributors("square", "retrofit")}scope.launch {// 接收数据channel.receive()}delay(10000)
}
Channel 就是一个单向的管道,有入口也有出口:
-
通过 Channel() 工厂函数创建 Channel
-
在不同的协程调用 channel.send() 发送数据
-
在不同的协程调用 channel.receive() 接收数据
Channel 有两种代码编写方式,一种是直接用 Channel,一种是用 produce(),那实际开发中我们应该使用哪种方式?
实际上 produce() 是把 Channel 应用结构封装起来,将 Channel 的创建和数据的发送包在了协程的内部;简单说就是将手动创建 Channel 和手动启动协程的操作封装起来,方便我们使用:
Produce.ktinternal fun <E> CoroutineScope.produce(...
): ReceiveChannel<E> {// 创建了一个 Channelval channel = Channel<E>(capacity, onBufferOverflow)val newContext = newCoroutineContext(context)// 创建一个 ProducerCoroutine 协程,将跨协程的事件流委托交给 channel,并启动协程val coroutine = ProducerCoroutine(newContext, channel)if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)coroutine.start(start, coroutine, block)return coroutine
}// ProducerCoroutine 继承自 ChannelCoroutine,将 channel 继续传递
private class ProducerCoroutine<E>(parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> {...
}// ChannelCoroutine 实现了 Channel 接口,并把 Channel 的处理委托为 _channel
internal open class ChannelCoroutine<E>(parentContext: CoroutineContext,protected val _channel: Channel<E>,initParentJob: Boolean,active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob, active), Channel<E> by _channel {...
}
手动创建 Channel 和使用 produce() 可以分场景使用:
-
如果你的业务逻辑比较散需要把代码散开来写,可以手动创建 Channel、手动开启协程
-
如果希望把 Channel 的创建及数据的发送和协程放在一起,可以直接用 produce() 简化代码结构
Channel 的本质和适用场景
Channel 的本质就是一个在不同协程之间传递数据的通道,任何协程都能调用它的 send() 发送数据和调用 receive() 读取数据。
之所以拆分成 SendChannel 和 ReceiveChannel 仅是为了暴露尽量少的 API 提供给开发者,实际在使用的对象都是同时实现了这两个接口。
可以把 Channel 理解为是一个队列,承担类似 Java 里的阻塞式队列 BlockingQueue 的角色,只不过 Channel 是协程版的,把阻塞式的实现改成了挂起式;当从队列添加数据但元素满了或者从队列读取数据但队列为空,就会将协程挂起而不是把线程卡住。
因为 Channel 是这种挂起式的队列,所以也导致了它不适合做可订阅的事件流。
比如我们看下面的示例代码:
fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val channel = Channel<List<Contributor>>()// 在一个协程发送数据scope.launch {channel.send(gitHub.contributors("square", "retrofit")}// 有多个接收者接收数据scope.launch {while (isActive) {channel.receive()}}scope.launch {while (isActive) {channel.receive()}}delay(10000)
}
在上面示例代码中,只有一个发送者但有多个订阅者,由于每一个 send() 只能被一个 receive() 收到,如果其中一个订阅者接收了数据,那么另一个就得等待直到 send() 有下一个数据另一个订阅者才能接收;这就导致每个订阅者都没法获取完整的事件序列,即订阅者不能共享每一个事件。
Channel 不适用于超过多个订阅者的场景,因为此时订阅机制不能正常工作。
Channel 适合的场景是,只能用它做一些小范围的内部订阅,整个功能模块独立都是自己做的不对外提供,在这个模块内需要一个单点的订阅。如果有看 Compose 的源码会发现会有挺多地方就是使用的 Channel 做单点订阅。
Channel 目前更多的是作为 Flow 的下层 API 支持,事件订阅应该使用 SharedFlow 而不是 Channel。
Channel 的 API 详解
遍历 Channel
在上面的小节我们使用 Channel 在其他协程通过 while 循环不断从 Channel 获取数据:
fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val channel = Channel<List<Contributor>>()scope.launch {channel.send(gitHub.contributors("square", "retrofit")}launch {// 循环从 Channel 获取数据while (isActive) {val contributors = channel.receive()println("Contributors: $contributors")}}delay(10000)
}
其实还有一种更简便的写法,可以直接对 Channel 对象进行循环遍历:
fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val channel = Channel<List<Contributor>>()scope.launch {channel.send(gitHub.contributors("square", "retrofit")}launch {// 直接循环遍历 channel 对象for (data in channel) {println("Contributors: $data")}}delay(10000)
}
能过通过 for 循环遍历实际上也是使用了 kotlin 重载操作符的能力,Channel 重载了 iterator():
Channel.ktpublic interface ReceiveChannel<out E> {public operator fun iterator(): ChannelIterator<E>
}
一般 for循环我们是遍历一个 List 或者一个 Map,而 Channel 的遍历比较特殊,Channel 的遍历过程是挂起式的,在没有元素的时候也会把协程挂起,等待下一个元素的出现,或者 Channel 被关闭了循环才结束。
设置 Channel 缓冲区大小:capacity
Channel 通过 send() 发送数据,而如果我们不断的调用 send() 发送数据,但没有 receive() 消费数据,Channel 的队列就会被塞满;send() 函数是一个挂起函数,当 Channel 队列满了的时候它就会挂起协程,直到有数据了才把数据写到队列。
Channel 队列的长度默认为 0,即就算是第一次调用 send() 也会挂起协程,直到有别的协程调用 receive() 把 send() 发送的数据取走,send() 才会结束。
如果在调用 send() 之前已经有调用 receive() 挂起在等待接收,那么此时调用 send() 就会直接发送成功然后返回。
队列长度为 0 其实就是没有缓存,所有的数据全靠 send() 和 receive() 对接。
我们也可以给 Channel 配置缓冲区,定制 capacity 参数,也就是设置队列的大小:
fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)// 协程提供的尺寸:// UNLIMITED:取的 Int.MAX_VALUE,不限制// BUFFERED:默认值 64val channel = Channel<List<Contributor>>(8) // 缓存区大小(队列大小)scope.launch {channel.send(gitHub.contributors("square", "retrofit")}launch {for (data in channel) {println("Contributors: $data")}}delay(10000)
}
按上面的例子我们把缓冲区调整为 8,那么第一次 send() 时数据就会直接被放进队列的头部,放完后 send() 也不挂起协程直接就返回了,直到队列长度放不下时才挂起协程。
大多数时候我们根据应用的情况填一个手动的值是最合适的。
设置缓冲溢出策略:onBufferOverflow
在上面的小节提到了设置 Channel 的缓冲区大小,当队列满的时候默认是挂起协程,实际上还可以调整策略,比如丢弃数据。
调整缓冲溢出策略用的 Channel 的第二个参数 onBufferOverflow:
fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)// 协程提供的策略:// SUSPEND:默认值,队列满了就挂起协程// DROP_OLDEST:有新数据塞到队列,就把队列头部的元素丢弃// DROP_LATEST:丢弃新元素val channel = Channel<List<Contributor>>(8, DROP_OLDEST)// 给 capacity 设置 CONFLATED,代表缓冲区长度为1,并且总是丢弃旧元素的 Channel// 等同于 capacity = 1 并且 onBufferOverflow = DROP_OLDEST// 但实际上它要求 onBufferOverflow = SUSPEND,因为默认值就是所以可以不填// val channel = Channel<List<Contributor>>(CONFLATED)scope.launch {channel.send(gitHub.contributors("square", "retrofit")}launch {for (data in channel) {println("Contributors: $data")}}delay(10000)
}
Channel 的关闭:close() 和 cancel()
Channel 的关闭有两个函数:close() 和 cancel()。两个都是关闭 Channel,close() 是从发送端关闭,cancel() 是从接收端关闭。
Channel 的 close() 和 cancel() 分别归属于不同的接口,close() 是 SendChannel 接口,cancel() 是 ReceiveChannel 接口。
Channel 的 close()
Channel 实现了 SendChannel 和 ReceiveChannel,分别会有 isClosedForSend 和 isClosedForReceive 两个标记区分当前状态是否可发送和接收:
Channel.ktpublic interface SendChannel<in E> {@DelicateCoroutinesApipublic val isClosedForSend: Boolean
}public interface ReceiveChannel<out E> {@DelicateCoroutinesApipublic val isClosedForReceive: Boolean
}
Channel 调用了 close() 后,isClosedForSend 会被修改为 true,后续再调用 send() 就会抛出 ClosedSendChannelException,在使用这个 Channel 所在的协程都不能再调用 send()。
如果数据都已经接收完了,isClosedForReceive 会被修改为 true,后续再调用 receive() 就会抛出 ClosedReceiveChannelException。
Channel 调用了 close() 不会发生异常的情况:
-
允许 Channel 调用 receive():Channel 有可能还有数据没接收完缓冲区的数据
-
调用 close() 之前在挂起等待的 send():它们的数据没进缓冲区在队列外排着队,也允许让 receive() 接收
close() 也允许自定义异常,当后续再发送 send() 和 receive() 就不会抛 ClosedSendChannelException 和 ClosedReceiveChannelException,而是会抛自定义的异常:
close(IllegalStateException("Data error!"))
Channel 的 cancel()
当我们的界面被销毁了不再需要新数据了,就可以调用 cancel() 不再接收数据,它会将发送和接收两端都关闭,并且会同时修改 isClosedForSend 和 isClosedForReceive 为 true。
调用了 cancel() 后续再调用 send() 和 receive() 会抛出 CancellationException。可以通过这个异常区分是 close() 的异常还是 cancel() 的异常。
或许会有疑惑为什么要把发送也关闭?我们发送数据就是为了接收,我们都不再接收数据了,发送也就没有意义,自然的就可以将发送端也关闭。
Channel 的关闭及时释放资源:onUndeliveredElement
上面的小节我们提到当调用 Channel 的 cancel() 后会同时关闭发送端和接收端,但这种关闭可能会带来问题:一些已经调用 send() 但没被接收的数据因为被丢弃没释放,可能导致资源泄漏。
比如写文件的操作,希望在接收到使用后就把文件流关闭,但文件没接收就调用了 cancel(),文件流没关闭导致了资源泄漏。
通过设置 Channel 的 onUndeliveredElement,它是针对那些已发送但最终被丢弃没被接收的数据怎么处理的函数。
// 通过 onUndeliveredElement 如果发送了没被接收,就把文件流关闭
val fileChannel = Channel<FileWriter>() { it.close() }
Channel 的 trySend() 和 tryReceive()
trySend() 和 tryReceive() 两个函数和命名的意思一样:能发送就发或者能接收就收,不行就算了。
和 send() 及 receive() 的区别:
-
send() 和 receive() 都是挂起函数,trySend() 和 tryReceive() 是非挂起函数
-
不仅不挂起,也不阻塞线程,调用后瞬时返回:如果因为缓冲满了而发送不了,或者因为缓冲是空的而取不到数据,也并不会等待,而是直接返回,只不过返回的是失败的结果
Channel.ktpublic interface SendChannel<in E> {public fun trySend(element: E): ChannelResult<Unit>
}public interface ReceiveChannel<out E> {public fun tryReceive(): ChannelResult<E>
}
receiveCatching()
receiveCatching() 也是一个挂起函数,相比 receive() 它在遇到异常的时候不会抛异常,而是会把异常包进 ChannelResult 里面,将异常返回到你手里让你自己处理,相当于是 receive() 和 tryReceive() 的结合。
Channel.ktpublic interface ReceiveChannel<out E> {public suspend fun receiveCatching(): ChannelResult<E>
}
把 SendChannel 暴露出来
前面提到把 Channel 的创建及数据发送和协程放在一起可以使用 produce() 简化代码;
实际上还提供了 把 Channel 的创建及数据接收和协程放在一起的函数:actor()。
produce() 是提供一个协程,然后在内部创建一个 Channel 对象出来,并把 ReceiveChannel 用返回值的形式暴露出来,然后在内部提供 SendChannel。
actor() 是提供一个协程,然后在内部创建一个 Channel 对象出来,并把 SendChannel 用返回值的形式暴露出来,然后在内部提供 ReceiveChannel。
使用方式上和 produce() 是一样的,只是处理方式是相反的:
fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)// actor() 返回 SendChannel,在 block 处理数据接收val sender = scope.actor<Int> {for (num in this) {println("Number: $num")}}scope.launch {for (num in 1..100) {// 发送数据sender.send(num)delay(1000)}}delay(10000)
}
总结
一、async() 和 Channel 的类比
1、提供一次性结果给其他协程:async()
async() 能提供一次性结果给其他协程,它将异步协程的启动和结果的获取拆分开,启动协程用 async(),获取结果用 await()。多次调用 await() 也是只能拿到同样的返回值,因为 async() 是一次性的。
2、提供多次不同结果给其他协程:Channel
可以把 Channel 理解为支持多条数据的 async()。
使用 Channel 只需要三个步骤:
-
produce() 启动一个 [生产数据给别的协程来用] 的协程
-
send() 发送数据
-
使用 produce() 返回的 ReceiveChannel 对象,在其他协程调用 receive() 获取 send() 发送的数据
3、async() 和 Channel 的区别
-
启动协程的区别:async() 返回的 Deferred 对象,produce() 返回的 ReceiveChannel 对象
-
接收结果的区别:Deferred.await() 返回结果是一次性的,多次调用返回相同的返回值;ReceiveChannel.receive() 每次调用从 send() 获取不同的返回值
二、produce() 和手动创建 Channel 的选择
Channel 有两种代码编写方式,一种是直接用 Channel,一种是用 produce(), produce() 是把 Channel 应用结构封装起来,将 Channel 的创建和数据的发送包在了协程的内部;简单说就是将手动创建 Channel 和手动启动协程的操作封装起来,方便我们使用。
手动创建 Channel 和使用 produce() 可以分场景使用:
-
如果你的业务逻辑比较散需要把代码散开来写,可以手动创建 Channel、手动开启协程
-
如果希望把 Channel 的创建及数据的发送和协程放在一起,可以直接用 produce() 简化代码结构
三、Channel 的本质和适用场景
Channel 的本质就是一个在不同协程之间传递数据的通道,任何协程都能调用它的 send() 发送数据和调用 receive() 读取数据。
可以把 Channel 理解为是一个队列,承担类似 Java 里的阻塞式队列 BlockingQueue 的角色,只不过 Channel 是协程版的,把阻塞式的实现改成了挂起式;当从队列添加数据但元素满了或者从队列读取数据但队列为空,就会将协程挂起而不是把线程卡住。
因为 Channel 是这种挂起式的队列,所以也导致了它不适合做可订阅的事件流;不适用于超过多个订阅者的场景,因为此时订阅机制不能正常工作。
Channel 适合的场景是:只能用它做一些小范围的内部订阅,整个功能模块独立都是自己做的不对外提供,在这个模块内需要一个单点的订阅。
四、Channel 的 API
1、遍历 Channel
可以通过 for 循环遍历 Channel 对象,Channel 的遍历过程是挂起式的,在没有元素的时候也会把协程挂起,等待下一个元素的出现,或者 Channel 被关闭了循环才结束。
2、设置 Channel 缓冲区大小:capacity
Channel 的 send() 函数是一个挂起函数,当 Channel 队列满了的时候它就会挂起协程,直到有数据了才把数据写到队列。
Channel 队列的长度默认为 0,即就算是第一次调用 send() 也会挂起协程,直到有别的协程调用 receive() 把 send() 发送的数据取走,send() 才会结束。
队列长度为 0 其实就是没有缓存,所有的数据全靠 send() 和 receive() 对接。
已提供的缓冲区大小:
-
UNLIMITED:取的 Int.MAX_VALUE,不限制
-
BUFFERED:默认值 64
大多数时候我们根据应用的情况填一个手动的值是最合适的。
3、设置缓冲溢出策略:onBufferOverflow
提供的缓冲溢出策略有三个:
-
SUSPEND:默认值,队列满了就挂起协程
-
DROP_OLDEST:有新数据塞到队列,就把队列头部的元素丢弃
-
DROP_LATEST:丢弃新元素
4、Channel 的关闭:close() 和 cancel()
(1)close()
Channel 的关闭有两个函数:close() 和 cancel()。两个都是关闭 Channel,close() 是从发送端关闭,cancel() 是从接收端关闭。
Channel 调用了 close() 后如果后续再调用 send() 就会抛出 ClosedSendChannelException;如果数据都已经接收完了,后续再调用 receive() 就会抛出 ClosedReceiveChannelException。
Channel 调用了 close() 不会发生异常的情况:
-
允许 Channel 调用 receive():Channel 有可能还有数据没接收完缓冲区的数据
-
调用 close() 之前在挂起等待的 send():它们的数据没进缓冲区在队列外排着队,也允许让 receive() 接收
(2)cancel()
Channel 的 cancel() 会将发送和接收两端都关闭,调用了 cancel() 后续再调用 send() 和 receive() 会抛出 CancellationException。可以通过这个异常区分是 close() 的异常还是 cancel() 的异常。
调用 cancel() 把发送也关闭的原因是:我们发送数据就是为了接收,我们都不再接收数据了,发送也就没有意义,自然的就可以将发送端也关闭。
(3)Channel 的关闭及时释放资源:onUndeliveredElement
当调用 Channel 的 cancel() 后会同时关闭发送端和接收端,但这种关闭可能会带来问题:一些已经调用 send() 但没被接收的数据因为被丢弃没释放,可能导致资源泄漏。
通过设置 Channel 的 onUndeliveredElement,它是针对那些已发送但最终被丢弃没被接收的数据怎么处理的函数。
(4)Channel 的 trySend() 和 tryReceive()
和 send() 及 receive() 的区别:
-
send() 和 receive() 都是挂起函数,trySend() 和 tryReceive() 是非挂起函数
-
不仅不挂起,也不阻塞线程,调用后瞬时返回:如果因为缓冲满了而发送不了,或者因为缓冲是空的而取不到数据,也并不会等待,而是直接返回,只不过返回的是失败的结果
(5)receiveCatching()
receiveCatching() 也是一个挂起函数,相比 receive() 它在遇到异常的时候不会抛异常,而是会把异常包进 ChannelResult 里面,将异常返回到你手里让你自己处理,相当于是 receive() 和 tryReceive() 的结合。
5、把 SendChannel 暴露出来
Channel 的创建及数据发送和协程放在一起可以使用 produce() 简化代码;实际上还提供了 把 Channel 的创建及数据接收和协程放在一起的函数:actor()。
produce() 是提供一个协程,然后在内部创建一个 Channel 对象出来,并把 ReceiveChannel 用返回值的形式暴露出来,然后在内部提供 SendChannel。
actor() 是提供一个协程,然后在内部创建一个 Channel 对象出来,并把 SendChannel 用返回值的形式暴露出来,然后在内部提供 ReceiveChannel。