目录
数据结构
G
M
P
调度器启动
创建 Goroutine
初始化结构体
运行队列
调度信息
调度循环
小结
数据结构
Go的运行时调度器的三个重要组成部分 — 线程 M、Goroutine G 和处理器 P:
图 6-29 Go 语言调度器
- G — 表示 Goroutine,它是一个待执行的任务;
- M — 表示操作系统的线程,它由操作系统的调度器调度和管理;
- P — 表示处理器,它可以被看做运行在线程上的本地调度器;
G
Goroutine 是 Go 语言调度器中待执行的任务,它在运行时调度器中的地位与线程在操作系统中差不多,但是它占用了更小的内存空间,也降低了上下文切换的开销。
Goroutine 只存在于 Go 语言的运行时,它是 Go 语言在用户态提供的线程,作为一种粒度更细的资源调度单元,如果使用得当能够在高并发的场景下更高效地利用机器的 CPU。
Goroutine 在 Go 语言运行时使用私有结构体 runtime.g 表示。这个私有结构体非常复杂,这里也不会介绍所有的字段,仅会挑选其中的一部分,首先是与栈相关的两个字段:
type g struct {stack stackstackguard0 uintptr
}
其中 stack
字段描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi),另一个字段 stackguard0
可以用于调度器抢占式调度。除了 stackguard0
之外,Goroutine 中还包含另外三个与抢占密切相关的字段:
type g struct {preempt bool // 抢占信号preemptStop bool // 抢占时将状态修改成 `_Gpreempted`preemptShrink bool // 在同步安全点收缩栈
}
最后,我们再节选一些比较重要的字段:
type g struct {m *msched gobufatomicstatus uint32goid int64
}
m
— 当前 Goroutine 占用的线程,可能为空;atomicstatus
— Goroutine 的状态;sched
— 存储 Goroutine 的调度相关的数据;goid
— Goroutine 的 ID,该字段对开发者不可见,Go 团队认为引入 ID 会让部分 Goroutine 变得更特殊,从而限制语言的并发能力10;
上述四个字段中,我们需要展开介绍 sched
字段的 runtime.gobuf 结构体中包含哪些内容:
type gobuf struct {sp uintptrpc uintptrg guintptrret sys.Uintreg...
}
sp
— 栈指针;pc
— 程序计数器;g
— 持有 runtime.gobuf 的 Goroutine;ret
— 系统调用的返回值;
这些内容会在调度器保存或者恢复上下文的时候用到,其中的栈指针和程序计数器会用来存储或者恢复寄存器中的值,改变程序即将执行的代码。
结构体 runtime.g 的 atomicstatus
字段存储了当前 Goroutine 的状态。除了几个已经不被使用的以及与 GC 相关的状态之外,Goroutine 可能处于以下 9 种状态:
状态 | 描述 |
---|---|
_Gidle | 刚刚被分配并且还没有被初始化 |
_Grunnable | 没有执行代码,没有栈的所有权,存储在运行队列中 |
_Grunning | 可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P |
_Gsyscall | 正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上 |
_Gwaiting | 由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上 |
_Gdead | 没有被使用,没有执行代码,可能有分配的栈 |
_Gcopystack | 栈正在被拷贝,没有执行代码,不在运行队列上 |
_Gpreempted | 由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒 |
_Gscan | GC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在 |
上述状态中比较常见是 _Grunnable
、_Grunning
、_Gsyscall
、_Gwaiting
和 _Gpreempted
五个状态,这里会重点介绍这几个状态。Goroutine 的状态迁移是个复杂的过程,触发 Goroutine 状态迁移的方法也很多,在这里我们也没有办法介绍全部的迁移路线,只会从中选择一些介绍。
虽然 Goroutine 在运行时中定义的状态非常多而且复杂,但是我们可以将这些不同的状态聚合成三种:等待中、可运行、运行中,运行期间会在这三种状态来回切换:
- 等待中:Goroutine 正在等待某些条件满足,例如:系统调用结束等,包括
_Gwaiting
、_Gsyscall
和_Gpreempted
几个状态; - 可运行:Goroutine 已经准备就绪,可以在线程运行,如果当前程序中有非常多的 Goroutine,每个 Goroutine 就可能会等待更多的时间,即
_Grunnable
; - 运行中:Goroutine 正在某个线程上运行,即
_Grunning
;
上图展示了 Goroutine 状态迁移的常见路径,其中包括创建 Goroutine 到 Goroutine 被执行、触发系统调用或者抢占式调度器的状态迁移过程。
M
Go 语言并发模型中的 M 是操作系统线程。调度器最多可以创建 10000 个线程,但是其中大多数的线程都不会执行用户代码(可能陷入系统调用),最多只会有 GOMAXPROCS
个活跃线程能够正常运行。
在默认情况下,运行时会将 GOMAXPROCS
设置成当前机器的核数,我们也可以在程序中使用 runtime.GOMAXPROCS 来改变最大的活跃线程数。
在默认情况下,一个四核机器会创建四个活跃的操作系统线程,每一个线程都对应一个运行时中的 runtime.m 结构体。
在大多数情况下,我们都会使用 Go 的默认设置,也就是线程数等于 CPU 数,默认的设置不会频繁触发操作系统的线程调度和上下文切换,所有的调度都会发生在用户态,由 Go 语言调度器触发,能够减少很多额外开销。
Go 语言会使用私有结构体 runtime.m 表示操作系统线程,这个结构体也包含了几十个字段,这里先来了解几个与 Goroutine 相关的字段:
type m struct {g0 *gcurg *g...
}
其中 g0 是持有调度栈的 Goroutine,curg
是在当前线程上运行的用户 Goroutine,这也是操作系统线程唯一关心的两个 Goroutine。
g0 是一个运行时中比较特殊的 Goroutine,它会深度参与运行时的调度过程,包括 Goroutine 的创建、大内存分配和 CGO 函数的执行。在后面的小节中,我们会经常看到 g0 的身影。
runtime.m 结构体中还存在三个与处理器相关的字段,它们分别表示正在运行代码的处理器 p
、暂存的处理器 nextp
和执行系统调用之前使用线程的处理器 oldp
:
type m struct {p puintptrnextp puintptroldp puintptr
}
除了在上面介绍的字段之外,runtime.m 还包含大量与线程状态、锁、调度、系统调用有关的字段,我们会在分析调度过程时详细介绍它们。
P
调度器中的处理器 P 是线程和 Goroutine 的中间层,它能提供线程需要的上下文环境,也会负责调度线程上的等待队列,通过处理器 P 的调度,每一个内核线程都能够执行多个 Goroutine,它能在 Goroutine 进行一些 I/O 操作时及时让出计算资源,提高线程的利用率。
因为调度器在启动时就会创建 GOMAXPROCS
个处理器,所以 Go 语言程序的处理器数量一定会等于 GOMAXPROCS
,这些处理器会绑定到不同的内核线程上。
runtime.p 是处理器的运行时表示,作为调度器的内部实现,它包含的字段也非常多,其中包括与性能追踪、垃圾回收和计时器相关的字段,这些字段也非常重要,但是在这里就不展示了,我们主要关注处理器中的线程和运行队列:
type p struct {m muintptrrunqhead uint32runqtail uint32runq [256]guintptrrunnext guintptr...
}
反向存储的线程维护着线程与处理器之间的关系,而 runqhead
、runqtail
和 runq
三个字段表示处理器持有的运行队列,其中存储着待执行的 Goroutine 列表,runnext
中是线程下一个需要执行的 Goroutine。
runtime.p 结构体中的状态 status
字段会是以下五种中的一种:
状态 | 描述 |
---|---|
_Pidle | 处理器没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空 |
_Prunning | 被线程 M 持有,并且正在执行用户代码或者调度器 |
_Psyscall | 没有执行用户代码,当前线程陷入系统调用 |
_Pgcstop | 被线程 M 持有,当前处理器由于垃圾回收被停止 |
_Pdead | 当前处理器已经不被使用 |
通过分析处理器 P 的状态,我们能够对处理器的工作过程有一些简单理解,例如处理器在执行用户代码时会处于 _Prunning
状态,在当前线程执行 I/O 操作时会陷入 _Psyscall
状态。
调度器启动
调度器的启动过程是我们平时比较难以接触的过程,不过作为程序启动前的准备工作,理解调度器的启动过程对我们理解调度器的实现原理很有帮助,运行时通过 runtime.schedinit 初始化调度器:
func schedinit() {_g_ := getg()...sched.maxmcount = 10000...sched.lastpoll = uint64(nanotime())procs := ncpuif n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {procs = n}if procresize(procs) != nil {throw("unknown runnable goroutine during bootstrap")}
}
在调度器初始函数执行的过程中会将 maxmcount
设置成 10000,这也就是一个 Go 语言程序能够创建的最大线程数,虽然最多可以创建 10000 个线程,但是可以同时运行的线程还是由 GOMAXPROCS
变量控制。
我们从环境变量 GOMAXPROCS
获取了程序能够同时运行的最大处理器数之后就会调用 runtime.procresize 更新程序中处理器的数量,在这时整个程序不会执行任何用户 Goroutine,调度器也会进入锁定状态,runtime.procresize 的执行过程如下:
- 如果全局变量
allp
切片中的处理器数量少于期望数量,会对切片进行扩容; - 使用
new
创建新的处理器结构体并调用 runtime.p.init 初始化刚刚扩容的处理器; - 通过指针将线程 m0 和处理器
allp[0]
绑定到一起; - 调用 runtime.p.destroy 释放不再使用的处理器结构;
- 通过截断改变全局变量
allp
的长度保证与期望处理器数量相等; - 将除
allp[0]
之外的处理器 P 全部设置成_Pidle
并加入到全局的空闲队列中;
调用 runtime.procresize 是调度器启动的最后一步,在这一步过后调度器会完成相应数量处理器的启动,等待用户创建运行新的 Goroutine 并为 Goroutine 调度处理器资源。
创建 Goroutine
想要启动一个新的 Goroutine 来执行任务时,我们需要使用 Go 语言的 go
关键字,编译器会通过 cmd/compile/internal/gc.state.stmt 和 cmd/compile/internal/gc.state.call 两个方法将该关键字转换成 runtime.newproc 函数调用:
func (s *state) call(n *Node, k callKind) *ssa.Value {if k == callDeferStack {...} else {switch {case k == callGo:call = s.newValue1A(ssa.OpStaticCall, types.TypeMem, newproc, s.mem())default:}}...
}
runtime.newproc 的入参是参数大小和表示函数的指针 funcval
,它会获取 Goroutine 以及调用方的程序计数器,然后调用 runtime.newproc1 函数获取新的 Goroutine 结构体、将其加入处理器的运行队列并在满足条件时调用 runtime.wakep 唤醒新的处理执行 Goroutine:
func newproc(siz int32, fn *funcval) {argp := add(unsafe.Pointer(&fn), sys.PtrSize)gp := getg()pc := getcallerpc()systemstack(func() {newg := newproc1(fn, argp, siz, gp, pc)_p_ := getg().m.p.ptr()runqput(_p_, newg, true)if mainStarted {wakep()}})
}
runtime.newproc1 会根据传入参数初始化一个 g
结构体,我们可以将该函数分成以下几个部分介绍它的实现:
- 获取或者创建新的 Goroutine 结构体;
- 将传入的参数移到 Goroutine 的栈上;
- 更新 Goroutine 调度相关的属性;
首先是 Goroutine 结构体的创建过程:
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {_g_ := getg()siz := nargsiz = (siz + 7) &^ 7_p_ := _g_.m.p.ptr()newg := gfget(_p_)if newg == nil {newg = malg(_StackMin)casgstatus(newg, _Gidle, _Gdead)allgadd(newg)}...
上述代码会先从处理器的 gFree
列表中查找空闲的 Goroutine,如果不存在空闲的 Goroutine,会通过 runtime.malg 创建一个栈大小足够的新结构体。
接下来,我们会调用 runtime.memmove 将 fn
函数的所有参数拷贝到栈上,argp
和 narg
分别是参数的内存空间和大小,我们在该方法中会将参数对应的内存空间整块拷贝到栈上:
...totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSizetotalSize += -totalSize & (sys.SpAlign - 1)sp := newg.stack.hi - totalSizespArg := spif narg > 0 {memmove(unsafe.Pointer(spArg), argp, uintptr(narg))}...
拷贝了栈上的参数之后,runtime.newproc1 会设置新的 Goroutine 结构体的参数,包括栈指针、程序计数器并更新其状态到 _Grunnable
并返回:
...memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))newg.sched.sp = spnewg.stktopsp = spnewg.sched.pc = funcPC(goexit) + sys.PCQuantumnewg.sched.g = guintptr(unsafe.Pointer(newg))gostartcallfn(&newg.sched, fn)newg.gopc = callerpcnewg.startpc = fn.fncasgstatus(newg, _Gdead, _Grunnable)newg.goid = int64(_p_.goidcache)_p_.goidcache++return newg
}
我们在分析 runtime.newproc 的过程中,保留了主干省略了用于获取结构体的 runtime.gfget、runtime.malg、将 Goroutine 加入运行队列的 runtime.runqput 以及设置调度信息的过程,下面会依次分析这些函数。
初始化结构体
runtime.gfget 通过两种不同的方式获取新的 runtime.g:
- 从 Goroutine 所在处理器的
gFree
列表或者调度器的sched.gFree
列表中获取 runtime.g; - 调用 runtime.malg 生成一个新的 runtime.g 并将结构体追加到全局的 Goroutine 列表
allgs
中。
runtime.gfget 中包含两部分逻辑,它会根据处理器中 gFree
列表中 Goroutine 的数量做出不同的决策:
- 当处理器的 Goroutine 列表为空时,会将调度器持有的空闲 Goroutine 转移到当前处理器上,直到
gFree
列表中的 Goroutine 数量达到 32; - 当处理器的 Goroutine 数量充足时,会从列表头部返回一个新的 Goroutine;
func gfget(_p_ *p) *g {
retry:if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {for _p_.gFree.n < 32 {gp := sched.gFree.stack.pop()if gp == nil {gp = sched.gFree.noStack.pop()if gp == nil {break}}_p_.gFree.push(gp)}goto retry}gp := _p_.gFree.pop()if gp == nil {return nil}return gp
}
当调度器的 gFree
和处理器的 gFree
列表都不存在结构体时,运行时会调用 runtime.malg 初始化新的 runtime.g 结构,如果申请的堆栈大小大于 0,这里会通过 runtime.stackalloc 分配 2KB 的栈空间:
func malg(stacksize int32) *g {newg := new(g)if stacksize >= 0 {stacksize = round2(_StackSystem + stacksize)newg.stack = stackalloc(uint32(stacksize))newg.stackguard0 = newg.stack.lo + _StackGuardnewg.stackguard1 = ^uintptr(0)}return newg
}
runtime.malg 返回的 Goroutine 会存储到全局变量 allgs
中。
简单总结一下,runtime.newproc1 会从处理器或者调度器的缓存中获取新的结构体,也可以调用 runtime.malg 函数创建。
运行队列
runtime.runqput 会将 Goroutine 放到运行队列上,这既可能是全局的运行队列,也可能是处理器本地的运行队列:
func runqput(_p_ *p, gp *g, next bool) {if next {retryNext:oldnext := _p_.runnextif !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {goto retryNext}if oldnext == 0 {return}gp = oldnext.ptr()}
retry:h := atomic.LoadAcq(&_p_.runqhead)t := _p_.runqtailif t-h < uint32(len(_p_.runq)) {_p_.runq[t%uint32(len(_p_.runq))].set(gp)atomic.StoreRel(&_p_.runqtail, t+1)return}if runqputslow(_p_, gp, h, t) {return}goto retry
}
- 当
next
为true
时,将 Goroutine 设置到处理器的runnext
作为下一个处理器执行的任务; - 当
next
为false
并且本地运行队列还有剩余空间时,将 Goroutine 加入处理器持有的本地运行队列; - 当处理器的本地运行队列已经没有剩余空间时就会把本地队列中的一部分 Goroutine 和待加入的 Goroutine 通过 runtime.runqputslow 添加到调度器持有的全局运行队列上;
处理器本地的运行队列是一个使用数组构成的环形链表,它最多可以存储 256 个待执行任务。
简单总结一下,Go 语言有两个运行队列,其中一个是处理器本地的运行队列,另一个是调度器持有的全局运行队列,只有在本地运行队列没有剩余空间时才会使用全局队列。
调度信息
运行时创建 Goroutine 时会通过下面的代码设置调度相关的信息,前两行代码会分别将程序计数器和 Goroutine 设置成 runtime.goexit 和新创建 Goroutine 运行的函数:
...newg.sched.pc = funcPC(goexit) + sys.PCQuantumnewg.sched.g = guintptr(unsafe.Pointer(newg))gostartcallfn(&newg.sched, fn)...
上述调度信息 sched
不是初始化后的 Goroutine 的最终结果,它还需要经过 runtime.gostartcallfn 和 runtime.gostartcall 的处理:
func gostartcallfn(gobuf *gobuf, fv *funcval) {gostartcall(gobuf, unsafe.Pointer(fv.fn), unsafe.Pointer(fv))
}func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {sp := buf.spif sys.RegSize > sys.PtrSize {sp -= sys.PtrSize*(*uintptr)(unsafe.Pointer(sp)) = 0}sp -= sys.PtrSize*(*uintptr)(unsafe.Pointer(sp)) = buf.pcbuf.sp = spbuf.pc = uintptr(fn)buf.ctxt = ctxt
}
调度信息的 sp
中存储了 runtime.goexit 函数的程序计数器,而 pc
中存储了传入函数的程序计数器。因为 pc
寄存器的作用就是存储程序接下来运行的位置,所以 pc
的使用比较好理解,但是 sp
中存储的 runtime.goexit 会让人感到困惑,我们需要配合下面的调度循环来理解它的作用。
调度循环
调度器启动之后,Go 语言运行时会调用 runtime.mstart 以及 runtime.mstart1,前者会初始化 g0 的 stackguard0
和 stackguard1
字段,后者会初始化线程并调用 runtime.schedule 进入调度循环:
func schedule() {_g_ := getg()top:var gp *gvar inheritTime boolif gp == nil {if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {lock(&sched.lock)gp = globrunqget(_g_.m.p.ptr(), 1)unlock(&sched.lock)}}if gp == nil {gp, inheritTime = runqget(_g_.m.p.ptr())}if gp == nil {gp, inheritTime = findrunnable()}execute(gp, inheritTime)
}
runtime.schedule 函数会从下面几个地方查找待执行的 Goroutine:
- 为了保证公平,当全局运行队列中有待执行的 Goroutine 时,通过
schedtick
保证有一定几率会从全局的运行队列中查找对应的 Goroutine; - 从处理器本地的运行队列中查找待执行的 Goroutine;
- 如果前两种方法都没有找到 Goroutine,会通过 runtime.findrunnable 进行阻塞地查找 Goroutine;
runtime.findrunnable 的实现非常复杂,这个 300 多行的函数通过以下的过程获取可运行的 Goroutine:
- 从本地运行队列、全局运行队列中查找;
- 从网络轮询器中查找是否有 Goroutine 等待运行;
- 通过 runtime.runqsteal 尝试从其他随机的处理器中窃取待运行的 Goroutine,该函数还可能窃取处理器的计时器;
因为函数的实现过于复杂,上述的执行过程是经过简化的,总而言之,当前函数一定会返回一个可执行的 Goroutine,如果当前不存在就会阻塞等待。
接下来由 runtime.execute 执行获取的 Goroutine,做好准备工作后,它会通过 runtime.gogo 将 Goroutine 调度到当前线程上。
func execute(gp *g, inheritTime bool) {_g_ := getg()_g_.m.curg = gpgp.m = _g_.mcasgstatus(gp, _Grunnable, _Grunning)gp.waitsince = 0gp.preempt = falsegp.stackguard0 = gp.stack.lo + _StackGuardif !inheritTime {_g_.m.p.ptr().schedtick++}gogo(&gp.sched)
}
runtime.gogo 在不同处理器架构上的实现都不同,但是也都大同小异,下面是该函数在 386 架构上的实现:
TEXT runtime·gogo(SB), NOSPLIT, $8-4MOVL buf+0(FP), BX // 获取调度信息MOVL gobuf_g(BX), DXMOVL 0(DX), CX // 保证 Goroutine 不为空get_tls(CX)MOVL DX, g(CX)MOVL gobuf_sp(BX), SP // 将 runtime.goexit 函数的 PC 恢复到 SP 中MOVL gobuf_ret(BX), AXMOVL gobuf_ctxt(BX), DXMOVL $0, gobuf_sp(BX)MOVL $0, gobuf_ret(BX)MOVL $0, gobuf_ctxt(BX)MOVL gobuf_pc(BX), BX // 获取待执行函数的程序计数器JMP BX // 开始执行
它从 runtime.gobuf 中取出了 runtime.goexit 的程序计数器和待执行函数的程序计数器,其中:
- runtime.goexit 的程序计数器被放到了栈 SP 上;
- 待执行函数的程序计数器被放到了寄存器 BX 上;
在函数调用一节中,我们曾经介绍过 Go 语言的调用惯例,正常的函数调用都会使用 CALL
指令,该指令会将调用方的返回地址加入栈寄存器 SP 中,然后跳转到目标函数;当目标函数返回后,会从栈中查找调用的地址并跳转回调用方继续执行剩下的代码。
runtime.gogo 就利用了 Go 语言的调用惯例成功模拟这一调用过程,通过以下几个关键指令模拟 CALL
的过程:
MOVL gobuf_sp(BX), SP // 将 runtime.goexit 函数的 PC 恢复到 SP 中MOVL gobuf_pc(BX), BX // 获取待执行函数的程序计数器JMP BX // 开始执行
上图展示了调用 JMP
指令后的栈中数据,当 Goroutine 中运行的函数返回时,程序会跳转到 runtime.goexit 所在位置执行该函数:
TEXT runtime·goexit(SB),NOSPLIT,$0-0CALL runtime·goexit1(SB)func goexit1() {mcall(goexit0)
}
经过一系列复杂的函数调用,我们最终在当前线程的 g0 的栈上调用 runtime.goexit0 函数,该函数会将 Goroutine 转换会 _Gdead
状态、清理其中的字段、移除 Goroutine 和线程的关联并调用 runtime.gfput 重新加入处理器的 Goroutine 空闲列表 gFree
:
func goexit0(gp *g) {_g_ := getg()casgstatus(gp, _Grunning, _Gdead)gp.m = nil...gp.param = nilgp.labels = nilgp.timer = nildropg()gfput(_g_.m.p.ptr(), gp)schedule()
}
在最后 runtime.goexit0 会重新调用 runtime.schedule 触发新一轮的 Goroutine 调度,Go 语言中的运行时调度循环会从 runtime.schedule 开始,最终又回到 runtime.schedule,我们可以认为调度循环永远都不会返回。
这里介绍的是 Goroutine 正常执行并退出的逻辑,实际情况会复杂得多,多数情况下 Goroutine 在执行的过程中都会经历协作式或者抢占式调度,它会让出线程的使用权等待调度器的唤醒。
小结
Goroutine 和调度器是 Go 语言能够高效地处理任务并且最大化利用资源的基础,本节介绍了 Go 语言用于处理并发任务的 G - M - P 模型,我们不仅介绍了它们各自的数据结构以及常见状态,还通过特定场景介绍调度器的工作原理以及不同数据结构之间的协作关系,相信能够帮助各位读者理解调度器的实现。