您的位置:首页 > 汽车 > 时评 > 聊聊 golang 中 channel

聊聊 golang 中 channel

2025/1/10 21:07:21 来源:https://blog.csdn.net/molaifeng/article/details/139861281  浏览:    关键词:聊聊 golang 中 channel

1、引言

Do not communicate by sharing memory; instead, share memory by communicating

Golang 的并发哲学是“不要通过共享内存进行通信,而要通过通信来共享内存”,提倡通过 channel 进行 goroutine 之间的数据传递和同步,而不是通过共享变量(内存)来实现。

func write(chanInt chan int) {for i := 0; i < 10; i++ {chanInt <- i}close(chanInt)
}func read(chanInt chan int, chanExit chan bool) {for {v, ok := <-chanIntif !ok {break}fmt.Println(v)}chanExit <- trueclose(chanExit)
}func TestCSP(t *testing.T) {chanInt := make(chan int, 10)chanExit := make(chan bool)go write(chanInt)go read(chanInt, chanExit)for {select {case _, ok := <-chanExit:if !ok {fmt.Println("done")return}}}}

如上述示例,write 函数负责写,read 函数负责读,chanInt 负责在两个 goroutine 进行数据同步,chanExit 负责监听数据已处理完成,并最终退出。整个程序没有看到锁,非常的优雅。

接下来,来说说 channel 的特性,最后结合底层源码来加深印象。

2、特性

2.1 基本用法

由于 channel 是引用类型,需要用 make 来初始化

chanBuffer := make(chan int, 10)
chanNoBuffer := make(chan int)

这里创建的是可读写的 channel,区别在于是否有 capacity(容量)

  • 带缓冲区的 channel,可以存储 cap 个数据
  • 不带缓冲区的 channel,一般用于同步
chanWriteOnly := make(chan<- int)
chanReadOnly := make(<-chan int)

这里创建的是只写和只读的 channel,不过这样写意义不大,一般用于传参,接下来用这两个 chan 把引言示示例中关于 writeread 函数给改下

func write(chanInt chan<- int) {for i := 0; i < 10; i++ {chanInt <- i}close(chanInt)
}func read(chanInt <-chan int, chanExit chan bool) {for {v, ok := <-chanIntif !ok {break}fmt.Println(v)}chanExit <- trueclose(chanExit)
}

查看 channel 的长度和容量

func TestChanLenCAP(t *testing.T) {chanInt := make(chan int, 2)chanInt <- 1fmt.Println(len(chanInt)) // 1fmt.Println(cap(chanInt)) // 2
}

关闭 channel

close(ch)

判断 channel 是否已关闭

func TestChanIsClosed(t *testing.T) {chanInt := make(chan int, 10)close(chanInt)if _, ok := <-chanInt; !ok {fmt.Println("closed")}
}

向一个已关闭的 channel 读数据,会读到零值,并且每次读也都是零值,因此可以利用这个特性来判断 channel 是否已关闭。

2.2 异常情况

接下来看看几种需要注意的异常情况

注意: Golang 版本为 1.19.12。不同版本的调度器和运行时的行为可能会有所不同,尤其是与死锁检测相关的机制。这些变化可能导致在某些版本中程序会更快地检测到死锁,而在其他版本中则可能仅仅是阻塞而不报错。

2.2.1 给一个 nil channel发送数据,

func TestWriteNil(t *testing.T) {var chanInt chan intchanInt <- 1
}

由于 chanInt 还没初始化,值为 nil,此时代码会阻塞在 chanInt <- 1 这一行,并最终形成死锁。

fatal error: all goroutines are asleep - deadlock!

解法:channel 使用前需要使用 make 初始化。

2.2.2 从一个 nil channel 读数据

func TestReadNil(t *testing.T) {var chanInt chan int<-chanInt
}

由于 chanInt 还没初始化,值为 nil,此时代码会阻塞在 <-chanInt 这一行,并最终形成死锁。

fatal error: all goroutines are asleep - deadlock!

解法:channel 使用前需要使用 make 初始化。

2.2.3 关闭一个 nil channel

func TestCloseNil(t *testing.T) {var chanInt chan intclose(chanInt)
}

如果尝试关闭一个 nilchannel,会导致运行时错误 panic: close of nil channel

panic: close of nil channel [recovered]panic: close of nil channel

解法:channel 使用前需要使用 make 初始化。

前三个异常说明,channel 使用前一定要使用 make 进行初始化。

2.2.4 向一个已关闭的 channel 发数据

func TestWriteClosed(t *testing.T) {chanNoBuffer := make(chan int)close(chanNoBuffer)chanNoBuffer <- 1
}

向一个已关闭的 channel 发送数据会引起 panic

panic: send on closed channel [recovered]panic: send on closed channel

这是因为一旦 channel 被关闭,就不能再向其发送数据,但可以继续从中接收数据

解法:判断 channel 是否已关闭。

2.2.5 向一个已关闭的 channel 发起重复关闭动作

func TestClosedOnceMore(t *testing.T) {chanNoBuffer := make(chan int)close(chanNoBuffer)close(chanNoBuffer)
}

尝试关闭一个已经关闭的 channel 会导致运行时错误 panic: close of closed channel。这个错误通常出现在多个 goroutine 试图关闭同一个 channel 或者代码逻辑不正确导致同一个 channel 被关闭多次。

panic: close of closed channel [recovered]panic: close of closed channel

解法:判断 channel 是否已关闭。

2.2.6 向没有缓冲区的 channel 写数据,但没有读取方

func TestSendNoBuffer(t *testing.T) {ch := make(chan int)ch <- 4
}

无缓冲的 channel 是一种同步通信机制,当只有发送方,没有接收方,会陷入阻塞而死锁。

fatal error: all goroutines are asleep - deadlock!

解法:无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。

2.2.7 向没有缓冲区的 channel 读取数据,但没有写入方

func TestReadNoBuffer(t *testing.T) {ch := make(chan int)<-ch
}

尝试从一个无缓冲的 channel 读取数据时,如果没有其他 goroutine 向该 channel 发送数据,读取操作将会阻塞。这会导致程序死锁,并最终导致运行时错误。

fatal error: all goroutines are asleep - deadlock!

解法:无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。

2.2.8 无缓冲区 channel 的发送和接收操作没有同时进行

func ReadNoBufferChan(chanBool chan bool) {<-chanBool
}func TestSendNoBufferChan(t *testing.T) {ch := make(chan bool)ch <- truego ReadNoBufferChan(ch)time.Sleep(1 * time.Second)
}

上面两个异常一直强调,由于无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。代码执行到 ch <- chan 时,调度器发现没有任何 goroutine 接收,于是阻塞并死锁。

fatal error: all goroutines are asleep - deadlock!

解法:无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。

func TestSendNoBufferChan(t *testing.T) {ch := make(chan bool)go ReadNoBufferChan(ch)ch <- truetime.Sleep(1 * time.Second)
}

go ReadNoBufferChan(ch) 提前,这样就确保了在发送数据之前,有一个 goroutine 正在等待接收数据。

对于无缓冲的 channel

  • 读取和写入要成对出现,并且不能在同一个 goroutine
  • 使用 for 读取数据时,写入方需要关闭 channel

2.2.9 向有缓存区的 channel 先读数据

func TestWriteBufferChan(t *testing.T) {ch := make(chan int, 1)if _, ok := <-ch; !ok {fmt.Println("closed")}
}

当尝试从一个空的带缓冲的 channel 读取数据时,读取操作会阻塞,直到有数据被写入 channel。这是因为即使是带缓冲的 channel,也需要在读取数据时有数据可读。

带缓冲的 channel 和无缓冲的 channel 的主要区别在于:带缓冲的 channel 可以存储一定数量的数据,而无缓冲的 channel 则需要发送和接收操作同步进行。然而,这并不改变以下事实:当一个 goroutine 试图从空的 channel 读取数据时,它会被阻塞,直到有其他 goroutine 写入数据。

fatal error: all goroutines are asleep - deadlock!

解法:需要在读取数据时有数据可读。

2.2.10 向有缓存区的 channel 写数据,但没有读取数据

func TestReadBufferChan(t *testing.T) {ch := make(chan int, 1)ch <- 1ch <- 2
}

当带缓冲的 channel 在缓冲区满时,写入操作会阻塞,直到有数据被读取以腾出缓冲区空间。如没有读取方,最后就会因阻塞而死锁。

fatal error: all goroutines are asleep - deadlock!

解法:当带缓冲的 channel 在缓冲区满时,需要有读取方,或者增加缓冲区的大小。

注意:对于带缓冲的 channel 在缓冲区没超过容量之前,写入数据,若没有读取,不像不带缓冲区的 channel 那样,不会产生死锁的。

其实,最后这两个带缓冲区 channel 异常情况总结就是

  • 若在同一个 goroutine 里,写数据操作一定在读数据操作前
  • channel 空了,接收者会阻塞
  • channel 满了,发送者会阻塞

3、底层实现

3.1 数据结构

Golangchannel 在运行时使用 runtime.hchan 结构体表示。

// runtime/chan.go
type hchan struct {qcount   uint           // 队列中的数据个数dataqsiz uint           // 环形缓冲区的大小buf      unsafe.Pointer // 环形缓冲区指针elemsize uint16         // 单个元素的大小closed   uint32         // 标志 channel 是否关闭elemtype *_type         // 元素的类型sendx    uint           // 发送操作的索引recvx    uint           // 接收操作的索引recvq    waitq          // 等待接收的 goroutine 队列sendq    waitq          // 等待发送的 goroutine 队列lock     mutex          // 保护 channel 的锁
}

在这里插入图片描述

先看看环形缓冲区相关的字段:

  • qcount: 当前缓冲区中的元素个数。
  • dataqsiz: 环形缓冲区的容量。
  • buf: 实际存储数据的缓冲区,类型为 unsafe.Pointer(类似 C 语言的 void *)。
  • elemsize: 每个元素的大小。
  • sendx: 环形缓冲区中下一个待写入的位置。
  • recvx: 环形缓冲区中下一个待读取的位置。

再来看看发送和接收队列:

  • recvq: 等待接收的 goroutine 队列。
  • sendq: 等待发送的 goroutine 队列。

这两个队列是通过 waitq 结构体来实现的,waitq 本质上是一个双向链表,链表中的每个节点是一个 sudog 结构体,sudog 代表一个等待中的 goroutine

type waitq struct {first *sudoglast  *sudog
}

最后看看 lock 字段

  • lock 锁用于保护 channel 数据结构的互斥锁。Golang 使用自旋锁和互斥锁的结合来保证 channel 操作的线程安全。

3.2 初始化

func makechan(t *chantype, size int) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0:// Queue or element size is zero.c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.ptrdata == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c
}

这里主要说下 switch 相关的分支代码

  • 第一个分支:如果 channel 的缓冲区大小是 0(也就是创建无缓冲 channel),或 channel 中的元素大小是 0(如 struct{}{}Golang 中“空结构体”是不占内存的,size0)时,调用 mallocgc() 在堆上为 channel 开辟一段大小为 hchanSize 的内存空间。
    • 这里说下 c.buf = c.raceaddr()c.raceaddr() 会返回一个地址,这个地址在内存中不会被实际用于存储数据,但会被数据竞争检测工具(如 Golangrace detector)用于同步,这也是无缓冲区的 channel 用来做数据同步场景的由来。
  • 第二个分支:如果元素不包含指针时。调用 mallocgc 一次性分配 hchanbuf 的内存。
  • 第三个分支:默认情况元素类型中有指针类型,调用了两次分配空间的函数 new/mallocgc

仔细看,三个分支都调用了 mallocgc 在堆上分配内存,也就说 channel 本身会被 GC 自动回收。

在函数的最后会初始化通道结构的字段,包括元素大小、元素类型、缓冲区大小和锁。

3.2 发送数据

// entry point for c <- x from compiled code
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}/** generic single channel send/recv* If block is not nil,* then the protocol will not* sleep but return if it could* not complete.** sleep can wake up with g.param == nil* when a channel involved in the sleep has* been closed.  it is easiest to loop and re-run* the operation; we'll see that it's now closed.*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 当 channel 为 nil 时处理if c == nil {if !block {return false}gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}// 竞态检测,是用来分析是否存在数据竞争。go test -race ./...if raceenabled {racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second full()).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation. However, nothing here// guarantees forward progress. We rely on the side effects of lock release in// chanrecv() and closechan() to update this thread's view of c.closed and full().if !block && c.closed == 0 && full(c) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 加锁lock(&c.lock)// 检查 channel 是否关闭if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 检查是否有等待接收的 goroutineif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 检查 channel 缓冲区是否有空位if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}// 非阻塞模式下if !block {unlock(&c.lock)return false}// 阻塞模式下,将当前 goroutine 加入发送队列并挂起,receiver 会帮我们完成后续的工作// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 打包 sudogmysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中c.sendq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.// 将这个发送 g 从 Grunning -> Gwaiting// 进入休眠atomic.Store8(&gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.KeepAlive(ep)// 以下唤醒后需要执行的代码// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)if closed {// 唤醒后,发现 channel 被关闭了if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true
}

代码比较长,可以分为两大部分:异常检测和发送数据

3.2.1 异常检测

代码一开始就排除了在异常章节中 nil channel 的情形,比如未初始化,或是被 GC 回收了。

接着会检测非阻塞模式下,也就是有缓冲区的 channel,如果还未 close 并且缓冲区已经满了,则直接返回 false


func TestASyncSendFull(t *testing.T) {ch := make(chan int, 1) // 创建一个缓冲区大小为 1 的 channelch <- 1                 // 向 channel 发送一个元素,此时缓冲区已满select {case ch <- 2: // 尝试发送第二个元素fmt.Println("Successfully sent 2")default: // 缓冲区已满,进入 default 分支fmt.Println("channel is full, unable to send 2")}
}

3.2.2 发送数据

发送数据可以归纳为以下三点

  • 直接发送:当 recvq 存在等待的接收者时,那么通过 runtime.send 直接将数据发送给阻塞的接收者
    • 注意:这里不会立马唤醒阻塞的接收者,而是将等待接收数据的 goroutine 标记成可运行状态 grunnable 并把该 goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;
  • 异步发送:当 buf 缓冲区存在空余空间时,将发送的数据写入 channel 的缓冲区;
  • 阻塞发送:当不存在缓冲区或者缓冲区已满时,等待其他 goroutinechannel 接收数据;
    • 将当前 goroutine 加入 sendq 发送队列并挂起,阻塞等待其他的协程从 channel 接收数据;
    • 当唤醒后,检查是否因为 channel 关闭而唤醒,如果是则触发 panic

发送数据的过程中包含几个会触发 goroutine 调度的时机:

  • 发送数据时发现 channel 上存在等待接收数据的 goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度
  • 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 channelsendq 发送队列并调用 runtime.goparkunlock 触发 goroutine 的调度让出处理器的使用权;

3.3 接收数据

// entry points for <- c from compiled code
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)return
}// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}// 如果在 nil channel 上进行 recv 操作,那么会永远阻塞if c == nil {// 非阻塞的情况下,要直接返回,非阻塞出现在一些 select 的场景中if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.if !block && empty(c) {// After observing that the channel is not ready for receiving, we observe whether the// channel is closed.//// Reordering of these checks could lead to incorrect behavior when racing with a close.// For example, if the channel was open and not empty, was closed, and then drained,// reordered reads could incorrectly indicate "open and empty". To prevent reordering,// we use atomic loads for both checks, and rely on emptying and closing to happen in// separate critical sections under the same lock.  This assumption fails when closing// an unbuffered channel with a blocked send, but that is an error condition anyway.if atomic.Load(&c.closed) == 0 {// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.return}// The channel is irreversibly closed. Re-check whether the channel has any pending data// to receive, which could have arrived between the empty and closed checks above.// Sequential consistency is also required here, when racing with such a send.if empty(c) {// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)// 当前 channel 中没有数据可读if c.closed != 0 {if c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// The channel has been closed, but the channel's buffer have data.} else {// sender 队列中有 sudog 在等待// 直接从该 sudog 中获取数据拷贝到当前 g 即可// Just found waiting sender with not closed.if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}}if c.qcount > 0 {// 直接从 buffer 里拷贝数据// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)// 接收索引 +1c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}// buffer 元素计数 -1c.qcount--unlock(&c.lock)return true, true}// 非阻塞时,且无数据可收if !block {unlock(&c.lock)return false, false}// no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 打包成 sudogmysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nil// 进入 recvq 队列c.recvq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.atomic.Store8(&gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// someone woke us up// 被唤醒if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}success := mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)// 如果 channel 未被关闭,那就是真的 recv 到数据了return true, success
}func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if c.dataqsiz == 0 {if raceenabled {racesync(c, sg)}if ep != nil {// copy data from sender// 直接从发送者复制数据recvDirect(c.elemtype, sg, ep)}} else {// 缓冲区已满,从队列头部取出数据// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.qp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)racenotify(c, c.recvx, sg)}// 将数据从队列复制到接收者// copy data from queue to receiverif ep != nil {typedmemmove(c.elemtype, ep, qp)}// 将数据从发送者复制到队列// copy data from sender to queuetypedmemmove(c.elemtype, qp, sg.elem)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)sg.success = trueif sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)
}

Golangchannel 中,有两种接收方式

num <- ch
num, ok <- ch

这两种分别对应上述源码中的 chanrecv1chanrecv2,不过最终都会走到 chanrecv 函数。

3.3.1 异常检测

当我们从一个 nil channel 接收数据时(这里 nil 有可能是被 GC 回收导致的),若是非阻塞的 channel 会直接返回,否则会直接调用 runtime.gopark 让出处理器的使用权。

如果当前 channel 已经被 close 并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回。这里也就说明了为什么可以多次从已关闭的 channel 读取数据而不会报错。

3.3.2 接收数据

channel 接收数据可以归纳为以下三种情况:

3.3.2.1 直接接收

sendq 发送队列存在等待的发送者时,通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据。具体分为以下两种场景,可以仔细看 recv 函数

  • 场景一

buf 缓冲区的容量 dataqsiz0,也就是同步的 channel,调用 recvDirectsendq 发送队列中 sudog 存储的 ep 数据直接拷贝到接收者的内存地址中。

  • 场景二

当缓冲区已满时(会有两次内存的拷贝)

  • 先取出 buf 缓冲区头部的数据发给接收者(第一次拷贝)
  • 接着取出 sendq 发送队列头的数据拷贝到 buf 缓冲区中,并释放一个 sudog 阻塞的 goroutine(第二次拷贝)

到这里获取有人会问,为什么不直接从 sendq 取出数据发给接收方,而是要从 buf 里取出发给接收方?

原因在于 Golang 在缓冲模式下,channel 的数据在缓冲区中按照 FIFO(先入先出)顺序存储。缓冲区头部的数据肯定是最先存入的,那么也就需要最先取出。

这里再说下场景二下关于 recvxsendx 的更新机制。

  • 缓冲区已满时的处理逻辑

buf 缓冲区满时,recvx 指向的是 buf 的头部位置,这也是下一个将要被接收的数据。注意此时 sendx 也是指向缓冲区的头部位置。因为缓冲区已满,下一次发送会覆盖最旧的数据。

  • 从缓冲区读取数据

此时从已满的 buf 缓冲区读取数据,接收者从缓冲区的头部位置 recvx 获取数据,并将数据传递给接收方。并更新 recvx,使其指向下一个将要被接收的数据位置。

  • sendq 拷贝到缓冲区

由于此时 buf 头部的数据已经发送,那么则取出 sendq 头部的数据覆盖刚刚头部的位置所在的数据,并更新 sendx,使其和 recvx 保持一致,指向下一个要发送的位置。

这两个场景,无论发生哪种情况,运行时都会调用 runtime.goready 将当前处理器的 runnext 设置成发送数据的 goroutine,在调度器下一次调度时将阻塞的发送方唤醒。

3.3.2.2 异步接收

buf 缓冲区的 qcount 大于 0 时,也就是带缓冲的 channel 有数据时,那么会从 buf 缓冲区中 recvx 的索引位置取出数据进行处理:

  • 如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中,并通过 runtime.typedmemclr 清除队列中的数据
  • 最后更新 channel 上相关数据:recvx 指向下一个位置(如果移动到了环形队列的队尾,下标需要回到队头),channelqcount 长度减一,并释放持有 channel 的锁
3.3.2.3 阻塞接收

当不属于上述两种情况,即当 channelsendq 发送队列中不存在等待的 goroutine 并且 buf 缓冲区中也不存在任何数据时,从 channel 中接收数据的操作会变成阻塞的。此时会将当前的goroutine 挂起并加入 channel 的接收队列 recvq,以便在有数据可用时能够被唤醒。

当然了,若是 goroutine 被唤醒后会完成 channel 的阻塞数据接收。接收完最后进行基本的参数检查,解除 channel 的绑定并释放 sudog

结合异常检测那一节,发现从 channel 接收数据时,会触发 goroutine 调度的两个时机:

  • channelnil
  • buf 缓冲区中不存在数据并且也不存在数据的发送者时

3.4 关闭管道

最后来看看关闭通道实现

func closechan(c *hchan) {// 关闭一个 nil channel 会直接 panicif c == nil {panic(plainError("close of nil channel"))}// 上锁,这个锁的粒度比较大,一直到释放完所有的 sudog 才解锁lock(&c.lock)// 在 close channel 时,如果 channel 已经关闭过了,直接触发 panicif c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}c.closed = 1var glist gList// release all readersfor {sg := c.recvq.dequeue()// 弹出的 sudog 是 nil,说明读队列已经空了if sg == nil {break}// sg.elem unsafe.Pointer,指向 sudog 的数据元素// 该元素可能在堆上分配,也可能在栈上if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}// 将 goroutine 入 glist// 为最后将全部 goroutine 都 ready 做准备gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// release all writers (they will panic)// 将所有挂在 channel 上的 writer 从 sendq 中弹出// 该操作会使所有 writer panic(向一个关闭的 channel 发数据会引起 panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}// 将所有挂在 channel 上的 writer 从 sendq 中弹出// 该操作会使所有 writer panicgp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// 在释放所有挂在 channel 上的读或写 sudog 时,是一直在临界区的unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0// 使 g 的状态切换到 Grunnablegoready(gp, 3)}
}

3.4.1 异常检测

  • 关闭一个 nil channel 会直接 panic
  • close channel 时,如果 channel 已经关闭过了,直接触发 panic

3.4.2 释放所有接收方和发送方

关闭 channel 的主要工作是释放所有的 readerswriters

主要就是取出 recvqsendqsudog 加入到 goroutine 待清除 glist 队列中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素。同时需要注意的是:在处理 sendq 时有可能会 panic,在之前的异常情况中列举往一个 closechannel 发送数据会引起 panic

最后会为所有被阻塞的 goroutine 调用 runtime.goready 触发调度。将所有 glist 队列中的 goroutine 状态从 _Gwaiting 设置为 _Grunnable 状态,等待调度器的调度。

3.4.3 优雅关闭通道

最后说说如何优雅关闭 channel

通过之前的异常小节介绍,发现:

  • 向已关闭的 channel 发送数据,会导致 panic
  • 重复关闭 channel,也会导致 panic

同时,还了解了:

  • 从一个已关闭的 channel 中接收数据,会得到零值,且不会导致程序异常
  • 关闭一个 channel,那么所有接收这个 channelselect case 都会收到信号

那么这里就引用 How to Gracefully Close Channels 介绍的优雅关闭 channel 方法来收尾。

package _0240623import ("log""math/rand""strconv""sync""testing""time"
)func TesGracefullyCloseChannel(t *testing.T) {rand.Seed(time.Now().UnixNano()) // needed before Go 1.20log.SetFlags(0)// ...const Max = 100000const NumReceivers = 10const NumSenders = 1000wgReceivers := sync.WaitGroup{}wgReceivers.Add(NumReceivers)// ...dataCh := make(chan int)stopCh := make(chan struct{})// stopCh is an additional signal channel.// Its sender is the moderator goroutine shown// below, and its receivers are all senders// and receivers of dataCh.toStop := make(chan string, 1)// The channel toStop is used to notify the// moderator to close the additional signal// channel (stopCh). Its senders are any senders// and receivers of dataCh, and its receiver is// the moderator goroutine shown below.// It must be a buffered channel.var stoppedBy string// moderatorgo func() {stoppedBy = <-toStopclose(stopCh)}()// sendersfor i := 0; i < NumSenders; i++ {go func(id string) {for {value := rand.Intn(Max)if value == 0 {// Here, the try-send operation is// to notify the moderator to close// the additional signal channel.select {case toStop <- "sender#" + id:default:}return}// The try-receive operation here is to// try to exit the sender goroutine as// early as possible. Try-receive and// try-send select blocks are specially// optimized by the standard Go// compiler, so they are very efficient.select {case <-stopCh:returndefault:}// Even if stopCh is closed, the first// branch in this select block might be// still not selected for some loops// (and for ever in theory) if the send// to dataCh is also non-blocking. If// this is unacceptable, then the above// try-receive operation is essential.select {case <-stopCh:returncase dataCh <- value:}}}(strconv.Itoa(i))}// receiversfor i := 0; i < NumReceivers; i++ {go func(id string) {defer wgReceivers.Done()for {// Same as the sender goroutine, the// try-receive operation here is to// try to exit the receiver goroutine// as early as possible.select {case <-stopCh:returndefault:}// Even if stopCh is closed, the first// branch in this select block might be// still not selected for some loops// (and forever in theory) if the receive// from dataCh is also non-blocking. If// this is not acceptable, then the above// try-receive operation is essential.select {case <-stopCh:returncase value := <-dataCh:if value == Max-1 {// Here, the same trick is// used to notify the moderator// to close the additional// signal channel.select {case toStop <- "receiver#" + id:default:}return}log.Println(value)}}}(strconv.Itoa(i))}// ...wgReceivers.Wait()log.Println("stopped by", stoppedBy)
}

这段代码的核心是这里

// moderator
go func() {stoppedBy = <-toStopclose(stopCh)
}()

对于生产者和消费者是 M*N 的情况,显然既不能在生产方关闭通道,也不适合在消费方关闭通道。那么就引入中间方,那就是 toStop,起个 goroutine 然后 stoppedBy = <-toStop 阻塞在这里,只要生产者和消费者一方满足条件,向 toStop 写入数据了,那么就可以关闭 stopCh。这也正好契合上面的 moderator 注释,一个 协调者,用来协调生产者和消费者在 M*N 情况下如何优雅关闭 channel

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com