本文用于记录 Go 语言运行时及调度器方面源码的学习笔记。
启动过程
使用 gdb 调试程序,在 macOS 下注意 build 时使用增加 -ldflags=-compressdwarf=false
参数,并且自建证书给 gdb。
寻找入口
使用 info files
查看执行文件,使用 breakpoint
定位 entry point 所在的文件位置,确定入口文件。
(gdb) info files
Symbols from "/Users/bailian/GoProject/go-build/go-build".
Local exec file:
`/Users/bailian/GoProject/go-build/go-build', file type mach-o-x86-64.
Entry point: 0x1063f40
0x0000000001001000 - 0x00000000010a6f0a is .text
......
(gdb) b *0x1063f40
Breakpoint 2 at 0x1063f40: file /Users/bailian/GoProject/go/src/runtime/rt0_darwin_amd64.s, line 8.
Go 使用的 plan9 汇编语言......
可以在汇编文件中看到执行程序的初始化流程:
// rt0 其实是 runtime0 的缩写,意为运行时的创生,随后所有创建的都是 1 为后缀。
// 操作系统通过入口参数的约定与应用程序进行沟通,为了支持从系统给运行时传递参数,Go 程序 在进行引导时将对这部分参数进行处理。
// 程序刚刚启动时,栈指针 SP 的前两个值分别对应 argc 和 argv,分别存储参数的数量和具体的参数的值
TEXT _rt0_amd64(SB),NOSPLIT,$-8
MOVQ 0(SP), DI // argc
LEAQ 8(SP), SI // argv
JMP runtime·rt0_go(SB)
TEXT runtime·rt0_go(SB),NOSPLIT,$0
// 在偶数堆栈上向前复制参数
MOVQ DI, AX // argc
MOVQ SI, BX // argv
SUBQ $(4*8+7), SP // 2args 2auto
ANDQ $~15, SP
MOVQ AX, 16(SP)
MOVQ BX, 24(SP)
// 初始化 g0 执行栈
MOVQ $runtime·g0(SB), DI
LEAQ (-64*1024+104)(SP), BX
MOVQ BX, g_stackguard0(DI)
MOVQ BX, g_stackguard1(DI)
MOVQ BX, (g_stack+stack_lo)(DI)
MOVQ SP, (g_stack+stack_hi)(DI)
// 确定 CPU 处理器的信息
MOVL $0, AX
CPUID
MOVL AX, SI
CMPL AX, $0
JE nocpuinfo
......
needtls:
#ifdef GOOS_darwin
// Darwin 系统跳过 TLS 设置
JMP ok
#endif
// 设置 TLS 伪寄存器
LEAQ runtime·m0+m_tls(SB), DI // DI = m0.tls
CALL runtime·settls(SB) // 将 TLS 地址设置到 DI
// 使用它进行存储,确保能正常运行
get_tls(BX)
MOVQ $0x123, g(BX)
MOVQ runtime·m0+m_tls(SB), AX
CMPQ AX, $0x123
JEQ 2(PC) // 跳转到下面的 get_tls 指令
CALL runtime·abort(SB)
ok:
// 程序刚刚启动,此时位于主线程
// 当前栈与资源保存在 g0
// 该线程保存在 m0
get_tls(BX)
LEAQ runtime·g0(SB), CX
MOVQ CX, g(BX)
LEAQ runtime·m0(SB), AX
// g0 和 m0 是一组全局变量,在程序运行之初就已经存在。 除了程序参数外,会首先将 m0 与 g0 通过指针互相关联。
// save m->g0 = g0
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
// 在正式初始化运行时组件之前,还需要做一些校验和系统级的初始化工作,这包括:运行时类型检查, 系统参数的获取以及影响内存管理和程序调度的相关常量的初始化。
CLD // convention is D is always left cleared
CALL runtime·check(SB) // 运行时类型检查。 其本质上基本上属于对编译器翻译工作的一个校验,显然如果编译器的编译工作 不正确,运行时的运行过程便不是一个有效的过程。
MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
// argc, argv 作为来自操作系统的参数传递给 args 处理程序参数的相关事宜。
CALL runtime·args(SB)
// 系统初始化
CALL runtime·osinit(SB)
// 进行各种运行时组件初始化工作,这包括我们的调度器与内存分配器、回收器的初始化
CALL runtime·schedinit(SB)
// create a new goroutine to start program
// 将入口函数作为参数,准备传递给第一个 G
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
PUSHQ $0 // 参数大小
// 新建 goroutine,将参数传入
CALL runtime·newproc(SB)
POPQ AX
POPQ AX
// 启动 M
CALL runtime·mstart(SB)
CALL runtime·abort(SB) // mstart should never return
......
// 全局变量 声明 runtime.mainPC 地址为 runtime.main 函数地址,RODATA read only data
DATA runtime·mainPC+0(SB)/8,$runtime·main(SB)
GLOBL runtime·mainPC(SB),RODATA,$8
初始化
args
args
函数将参数指针保存到了 argc
和 argv
这两个全局变量中, 供其他初始化函数使用,而后调用了平台特定的 sysargs
。 对于 Darwin 系统而言,只负责获取程序的 executable_path
。这个参数用于设置 os
包中的 executablePath
变量。
func sysargs(argc int32, argv **byte) {
// skip over argv, envv and the first string will be the path
n := argc + 1
for argv_index(argv, n) != nil {
n++
}
executablePath = gostringnocopy(argv_index(argv, n+1))
// strip "executable_path=" prefix if available, it's added after OS X 10.11.
const prefix = "executable_path="
if len(executablePath) > len(prefix) && executablePath[:len(prefix)] == prefix {
executablePath = executablePath[len(prefix):]
}
}
而在 Linux 平台中,这个过程就变得复杂起来了。 与 Darwin 使用 mach-o
不同,Linux 使用 ELF 格式 [Matz et al. 2014]。 ELF 除了 argc, argv, envp 之外,会携带辅助向量(auxiliary vector) 将某些内核级的信息传递给用户进程,例如内存物理页大小。因此对于 Linux 而言,物理页大小在 sysargs
中便能直接完成初始化。
osinit
osinit
完成对 CPU 核心数的获取,因为这与调度器有关。 而 Darwin 上由于使用的是 mach-o
格式,在此前的 sysargs
上 还没有确定内存页的大小,因而在这个函数中,还会额外使用 sysctl
完成物理页大小的查询。
var ncpu int32
// Linux
func osinit() {
ncpu = getproccount()
}
// Darwin
func osinit() {
ncpu = getncpu()
physPageSize = getPageSize() // 内部使用 sysctl 来获取物理页大小.
}
Darwin
从操作系统发展来看,是从 NeXTSTEP 和 FreeBSD 2.x 发展而来的后代, macOS 系统调用的特殊之处在于它提供了两套调用接口,一个是 Mach 调用,另一个则是 POSIX 调用。 Mach 是 NeXTSTEP 遗留下来的产物,其 BSD 层本质上是对 Mach 内核的一层封装。 尽管用户态进程可以直接访问 Mach 调用,但出于通用性的考虑, 物理页大小获取的方式是通过 POSIXsysctl
这个系统调用进行获取 [Bacon, 2007]。事实上
Linux
与Darwin
下的系统调用如何参与到 Go 程序中去稍有不同,我们暂时不做深入讨论,留到以后再统一分析。
可以看出,对运行时最为重要的两个系统级参数:CPU 核心数与内存物理页大小。
schedinit
schedinit
函数名表面上是调度器的初始化,但实际上它包含了所有核心组件的初始化工作。
关于执行栈:[[Go 栈笔记]]
关于内存分配器组件:[[Go 内存分配器]]
func schedinit() {
_g_ := getg()
......
// 设置最大系统线程数量(M)
sched.maxmcount = 10000
// 初始化 skipPC,用于 traceback。
tracebackinit()
// 验证链接器(linker)的模块数据正确性
moduledataverify()
// 执行栈的初始化,将 stackpool 与 stackLarge 的双向链表置为 nil
stackinit()
// 内存分配器的初始化:初始化堆、分配 mcache
mallocinit()
// 初始化当前系统线程 M:通过 schedt.mnext 获得 id 及 m.gsignal 的初始化(一个栈大小为 32KB 的 G)
mcommoninit(_g_.m)
// cpu 相关初始化
cpuinit() // must run before alginit
alginit() // maps must not be used before this call
// 模块加载相关初始化
modulesinit() // provides activeModules
typelinksinit() // uses maps, activeModules
itabsinit() // uses activeModules
msigsave(_g_.m)
initSigmask = _g_.m.sigmask
// 处理用户参数及环境变量
goargs()
goenvs()
// 处理调试相关环境变量
parsedebugvars()
// 垃圾回收器初始化
gcinit()
// 初始化网络轮询时间
sched.lastpoll = uint64(nanotime())
// 设置 processor 数量,处理用户 GOMAXPROCS 环境变量
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
// 调整 P 的数量,初始化 P,会导致 STW,在运行时调用 runtime.GOMAXPROCS() 也是最终执行这个方法
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
......
}
我们最感兴趣的三大运行时组件在如下函数签名中进行大量初始化工作:
stackinit()
goroutine 执行栈初始化mallocinit()
内存分配器初始化mcommoninit()
系统线程的部分初始化工作gcinit()
垃圾回收器初始化procresize()
根据 CPU 核心数,初始化系统线程的本地缓存
main goroutine
runtime.main
已经在 newproc
时作为一个 G 被放入 P 中,会在 mstart
启动 schedule
后被调度执行
// 关于 go linkname
//go:linkname localname [importpath.name] 简单来说通过这种机制,可以实现调用其他包不能导出的内容。
//go:linkname runtime_inittask runtime..inittask
var runtime_inittask initTask
// 可以看到这里链接的是 main..inittask 变量,但我们自己写的 main 包中并没有这个变量,它是编译器生成的。
// cmd/compile/internal/gc.fninit 函数中有实现过程
//go:linkname main_inittask main..inittask
var main_inittask initTask
//go:linkname main_main main.main
func main_main()
func main() {
......
// 规定栈最大限制,64 位系统最大 1GB,32 位系统最大 250 MB
if sys.PtrSize == 8 {
maxstacksize = 1000000000
} else {
maxstacksize = 250000000
}
// 允许新建 G 时可以启动新 M
mainStarted = true
// 非 wasm 程序启动系统监控(定期垃圾回收、并发任务调度)
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {
newm(sysmon, nil)
})
}
// 将 main goroutine 锁在主 OS 线程下运行,有些程序需要
lockOSThread()
......
// 执行 runtime init
doInit(&runtime_inittask) // must be before defer
......
// 启动 GC
gcenable()
......
// 执行 main 包和 import 包的 init 函数
doInit(&main_inittask)
......
// 执行 main.main
fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
fn()
......
// main 执行结束后直接退出
exit(0)
for {
var x *int32
*x = 0
}
}
调度器
基本结构
M:Machine,是对于系统线程的抽象。
P:Processor 的抽象,它主要是提供了 G 的本地队列,用于减少全局锁,提高性能。
G:Goroutine,使用 go
关键字创建的执行体。本质上是需要执行的函数体的抽象,将需要执行的函数参数进行拷贝,保存了函数体的入口地址,用于执行。
调度器 sched
- 管理了能够将 G 和 M 绑定的 M 队列
- 管理了空闲的 P 队列(链表)
- 管理了 runnable G 全局队列
- 管理了即将进入 runnable 状态的(dead 状态)G 的队列
- 管理了发生阻塞的 G 的队列
- 管理了 defer 调用池
- 管理了 GC 和系统监控的信号
- 管理了需要在 safe point 时执行的函数
- 统计了(极少发生的)动态调整 P 所花的时间
初始化 schedinit
调度器的初始化过程:M(mcommoninit)-->P(procresize)-->G(newproc),它们分别负责初始化 M 资源池(allm)、P 资源池(allp)、G 的运行现场(g.sched)以及调度队列(p.runq)。
M 的初始化
M 只有两个状态:自旋、非自旋。在调度器初始化阶段,只有一个 M,就是主 OS 线程,因此不涉及状态部分,只有对 M 的初步初始化及信号部分处理。
P初始化
通常情况下(在程序运行时不调整 P 的个数),P 只会在四种状态下进行切换。当程序刚开始初始化时,所有的 P 都处于 _Pgcstop
状态,随着 P 的初始化 runtime.procresize
,会被置为 _Pidle
。如果是非初始化阶段调用 runtime.procresize
,当前 P 状态会被置为 _Prunning
。
当 M 需要运行时,会 runtime.acquirep
绑定 P,状态变为 _Prunning
。通过 runtime.releasep
来释放,状态变为 _Pidle
。
runtime.entersyscall
时,P 的状态变为 _Psyscall
,runtime.exitsyscall
后,状态变为 _Pidel
。
如果发生 GC,会在 stopTheWorld
时,状态变为 _Pgcstop
,startTheWorld
后通过 procresize
状态会变为 _Prunning
或 _Pidel
(其他 P)。
在运行中调用 runtime.GOMAXPROCS()
后,会调整 gomaxprocs
的值,procresize
中,如果 nprocs
大于 old
则新创建 P,状态为 _Pidel
。如果是收缩(小于原有 P 数量),则会将多出的 P 状态改为 _Pdead
,这是中间态,它会在下一次 gomaxprocs
增加时继续复用。
P 初始化的主要流程都在 procresize
中:
// 调用之前需要先 STW,并且 sched locked。
func procresize(nprocs int32) *p {
// 获取当前 P 数量
old := gomaxprocs
......
// 更新统计信息,记录此次修改时间
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
// 这里只有在用户调用了 runtime.GOMAXPROCS 并且参数大于原有 P 数量才会进入
if nprocs > int32(len(allp)) {
// Synchronize with retake, which could be running
// concurrently since it doesn't run on a P.
lock(&allpLock)
// P 不会被释放,始终存在 allp 的底层数组中,cap 代表 P 的最大值
if nprocs <= int32(cap(allp)) {
// 如果 nprocs 仍然小于最大的 P 值,就复用一定数量的 P
allp = allp[:nprocs]
} else {
// 如果超过了最大值,就创建更多的 P,定义 cap 的值,为最大 P 数量
nallp := make([]*p, nprocs)
// 将原有的 P copy 复用
copy(nallp, allp[:cap(allp)])
allp = nallp
}
unlock(&allpLock)
}
// 初始化新的 P,扩容和程序初运行时都会进入
for i := old; i < nprocs; i++ {
pp := allp[i]
// pp 在复用 _Pdead P 时不等于 nil,所以不用新创建
if pp == nil {
pp = new(p)
}
// 初始化 pp,将 P.id 与 allp 的索引绑定,当前状态为 _Pgcstop
pp.init(i)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
_g_ := getg()
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
// 如果当前 P 不在收缩范围内,则将当前 P 状态置为 _Prunning
_g_.m.p.ptr().status = _Prunning
_g_.m.p.ptr().mcache.prepareForSweep()
} else {
// 如果当前 P 在收缩范围中,则解除与当前 M 的绑定,换为与 allp[0] 绑定
if _g_.m.p != 0 {
if trace.enabled {
traceGoSched()
traceProcStop(_g_.m.p.ptr())
}
_g_.m.p.ptr().m = 0
}
_g_.m.p = 0
_g_.m.mcache = nil
p := allp[0]
p.m = 0
p.status = _Pidle
acquirep(p)
if trace.enabled {
traceGoStart()
}
}
// 释放掉多余 P 的相关资源,但保留 P 本身,将状态置为 _Pdead 等待复用
for i := nprocs; i < old; i++ {
p := allp[i]
p.destroy()
// can't free P itself because it can be referenced by an M in syscall
}
// 修剪 allp,保留 cap 与底层数组
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
unlock(&allpLock)
}
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
// 当前 P 已经处理
if _g_.m.p.ptr() == p {
continue
}
p.status = _Pidle
if runqempty(p) {
// 将没有本地任务的 P 放入 idel 链表
pidleput(p)
} else {
// 有本地任务的 P,为其绑定一个 M
p.m.set(mget())
p.link.set(runnablePs)
// 放入当前链表
runnablePs = p
}
}
stealOrder.reset(uint32(nprocs))
// 将 gomaxprocs 值设置为 nprocs
var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
// 返回由本地任务的 P 链表
return runnablePs
}
G 初始化
运行完 runtime.procresize
之后,就是使用 runtime.newproc
来完成 main goroutine
的初始化,并且放入调度器中运行。
// CALL runtime·newproc(SB)
// 上面汇编代码中将 main goroutine 作为 fn 传入了 newproc
func newproc(siz int32, fn *funcval) {
// 得到参数的内存地址
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg()
pc := getcallerpc()
systemstack(func() {
newproc1(fn, (*uint8)(argp), siz, gp, pc)
})
}
// 创建一个运行 fn 的 G,具有 narg 字节大小的参数,从 argp 开始。
// callerps 是 go 语句的起始地址,也就是 G 的调用地址,新创建的 G 会被放入 G 的队列红等待运行。
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
// 获得当前 G,初始化时是 g0
_g_ := getg()
......
// 禁止当前 m 被抢占
acquirem() // disable preemption because it can be holding p in a local var
siz := narg
siz = (siz + 7) &^ 7
// 参数不应该超过 G 的初始栈大小:2KB
if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
throw("newproc: function arguments too large for new goroutine")
}
......
// 得到当前 P
_p_ := _g_.m.p.ptr()
// 尝试得到一个可用的 G(G 状态为 _Gdead 时可复用),会先寻找当前 P 的 gFree 链表,如果没有去全局的 gFree 链表获取。
newg := gfget(_p_)
// 初始化时找不到,运行时可能已被耗尽
if newg == nil {
// 创建一个最小栈的 G,当前版本:_StackMin = 2048 2KB
newg = malg(_StackMin)
// 将新 G 状态由 _Gidle 置为 _Gdead
casgstatus(newg, _Gidle, _Gdead)
// allg 是存放运行时所有的 G 的列表,此时将 _Gdead 状态的 G 添加至 allg ,是防止 GC 扫描打扫未初始化的栈
allgadd(newg)
}
......
// 计算运行空间大小,对齐
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize
totalSize += -totalSize & (sys.SpAlign - 1)
// 确定 sp 和参数入栈位置
sp := newg.stack.hi - totalSize
spArg := sp
......
// 处理 G 的参数,当有参数时,需要将参数拷贝到 G 的执行栈中
if narg > 0 {
// 从 argp 参数开始的位置,复制 narg 个字节到 spArg
memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
// 栈到栈的拷贝,涉及到写屏障,学完 GC 回来再看
if writeBarrier.needed && !_g_.m.curg.gcscandone {
f := findfunc(fn.fn)
stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
if stkmap.nbit > 0 {
// We're in the prologue, so it's always stack map index 0.
bv := stackmapdata(stkmap, 0)
bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
}
}
}
// 清理并初始化 G 的运行现场,因为有可能得到复用的 G
// g.sched 是 gobuf 结构,用于保存上下文
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
// 看起来像是在这里运行了 fn,其实没有,需要等到调度器执行,后面有详细理解
gostartcallfn(&newg.sched, fn)
// 初始化 G 的基本状态
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
.....
// GC 运行周期,初始化时为 false,不可以被扫。如果 G 自上次扫描后未运行,则为 true,也就是标记可以被 GC 扫描
newg.gcscanvalid = false
// 将 G 的状态从 _Gdead 置为 _Grunable
casgstatus(newg, _Gdead, _Grunnable)
// P 维护了一个 G id 缓存列表,每次都会获取 _GoidCacheBatch(当前版本16) 个 id,放入自身的列表,性能优化吧。这里是判断是否用完了,用完了就再取一批。
if _p_.goidcache == _p_.goidcacheend {
// Sched.goidgen is the last allocated id,
// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
// At startup sched.goidgen=0, so main goroutine receives goid=1.
_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
_p_.goidcache -= _GoidCacheBatch - 1
_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
}
// 设置 id,增加 id 缓存信息
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
......
// 将创建好的 G 放入 P 中,先放本地,满了进全局。
// true 表示放入执行队列的下一个,false 表示放入队尾
runqput(_p_, newg, true)
// 如果有空闲的 P,并且没有自旋中的 M,则直接唤醒 P
// 初始化时 mainStarted 为 fasle,所以不可以
// 什么情况会有空闲 P,但没有自旋中的 M?
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
releasem(_g_.m)
}
关于 gostartcallfn
:
// 获取了传入 fv 的入口地址
func gostartcallfn(gobuf *gobuf, fv *funcval) {
var fn unsafe.Pointer
if fv != nil {
fn = unsafe.Pointer(fv.fn)
} else {
fn = unsafe.Pointer(funcPC(nilfunc))
}
gostartcall(gobuf, fn, unsafe.Pointer(fv))
}
// 将 fn 与 fv 保存至 g.sched buf 中
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
sp := buf.sp
if sys.RegSize > sys.PtrSize {
sp -= sys.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = 0
}
sp -= sys.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = buf.pc
buf.sp = sp
buf.pc = uintptr(fn)
buf.ctxt = ctxt
}
关于 runqput
:
func runqput(_p_ *p, gp *g, next bool) {
......
// 插入下一个
if next {
retryNext:
oldnext := _p_.runnext
// 通过原子操作将 _p_.runnext 的值替换为 gp
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
// 如果原本就没有 oldnext,直接就返回了
if oldnext == 0 {
return
}
// 将原有的 next G 作为新的 G,继续添加
gp = oldnext.ptr()
}
retry:
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
// 本地队列未满则入队
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
// 满了则放进全局队列,还会带走一半的本地队列,性能优化吧
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
}
调度循环
启动前
在启动调度器以前,需要确定 G 的栈边界,也就是栈高位指针和低位指针。
func mstart() {
// 在初始化时获取到的是 g0,也就是系统栈,每个 M 都有一个系统栈。系统栈主要用于 runtime 的程序逻辑。系统栈大小固定,是程序设计时算好的。
_g_ := getg()
// 验证当前 g0 栈是否已初始化,不同系统的处理方式不一致。
// m0 的 g0 已经在汇编中初始化,所以不用进入。而后创建的 M,如果属于操作系统分配的栈,则需要在这里确定栈边界
osStack := _g_.stack.lo == 0
if osStack {
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
// 为什么要扣除 1KB 的空间?
_g_.stack.lo = _g_.stack.hi - size + 1024
}
// 初始化栈 guard,用于栈溢出检测
// 进而可以同时调用 Go 或 C 函数
_g_.stackguard0 = _g_.stack.lo + _StackGuard
_g_.stackguard1 = _g_.stackguard0
// 启动 M
mstart1()
// 这里应该就是处理 m0.g0 属于操作系统分配栈的逻辑
if GOOS == "windows" || GOOS == "solaris" || GOOS == "illumos" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "aix" {
// 由于 windows, solaris, darwin, aix 和 plan9 总是系统分配的栈,在 mstart 之前放进 _g_.stack 的
// 因此上面的逻辑还没有设置 osStack。
osStack = true
}
mexit(osStack)
}
启动调度器
func mstart1() {
_g_ := getg()
......
// 为了在 mcall 的栈顶使用调用方来结束当前线程,做记录
// 当进入 schedule 之后,我们再也不会回到 mstart1,所以其他调用可以复用当前帧。
save(getcallerpc(), getcallersp())
asminit()
minit()
// 设置信号 handler;在 minit 之后,因为 minit 可以准备处理信号的的线程
if _g_.m == &m0 {
mstartm0()
}
// M 的启动函数,m0 没有 fn
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
// 如果当前 M 不是 m0,需要绑定 P
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
// m 开始进入调度,永不返回
schedule()
}
M 与 P 的绑定
很简单,就是将 m.p 绑定 P 的指针,p.m 绑定 M 的指针,绑定前 P 的状态要求是 _Pidel
,绑定后变为 _Prunning
。
schedule 永不返回
调度循环 schedule
无法返回,因此最后一个 mexit
目前还不会被执行,因此当下所有的 Go 程序创建的线程都无法被释放 (只有一个特例,当使用 runtime.LockOSThread
锁住的 G 退出时会使用 gogo
退出 M)。
调度逻辑
schedule
开始就正式进行调度,下面是核心调度逻辑:
func schedule() {
// g0
_g_ := getg()
......
// m.lockedg 会在 runtime.LockOSThread 下变为非零
if _g_.m.lockedg != 0 {
stoplockedm()
execute(_g_.m.lockedg.ptr(), false) // Never returns.
}
......
top:
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _g_.m.p.ptr().runSafePointFn != 0 {
runSafePointFn()
}
var gp *g
var inheritTime bool
......
// 如果正在 GC,去找 GC 中的 G
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
tryWakeP = tryWakeP || gp != nil
}
// 每隔 61 次优先取全局队列的 G,防止饿死
if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 尝试获取 G,用于验证 M 是否处于自旋状态但取不到 G
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
// 在此 M 进入自旋,持续寻找可用 G,并阻塞
if gp == nil {
gp, inheritTime = findrunnable()
}
// 这时一定取到 G 了
if _g_.m.spinning {
// 将会把 M 标记为非自旋状态,如果标记后,没有自旋状态中的 M,并且还有 Pidel 链表中还有空闲的 P,需要新启动一个 M。M 有可能死亡
resetspinning()
}
......
execute(gp, inheritTime)
}
G 的运行
func execute(gp *g, inheritTime bool) {
_g_ := getg()
// 将 G 切换为 _Grunning 状态
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
// 抢占信号 信号在后面了解
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
// 绑定至当前 M
_g_.m.curg = gp
gp.m = _g_.m
......
// 开始执行 G 中的函数
gogo(&gp.sched)
}
gogo
的实现
TEXT runtime·gogo(SB), NOSPLIT, $16-8
MOVQ buf+0(FP), BX // 运行现场
MOVQ gobuf_g(BX), DX
MOVQ 0(DX), CX // 确认 g != nil
get_tls(CX)
MOVQ DX, g(CX)
MOVQ gobuf_sp(BX), SP // 恢复 SP
MOVQ gobuf_ret(BX), AX
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_bp(BX), BP
MOVQ $0, gobuf_sp(BX) // 清理,辅助 GC
MOVQ $0, gobuf_ret(BX)
MOVQ $0, gobuf_ctxt(BX)
MOVQ $0, gobuf_bp(BX)
MOVQ gobuf_pc(BX), BX // 获取 G 要执行的函数的入口地址
JMP BX // 开始执行
在这里看似是 JMP BX
后就结束执行了,没有后续操作。但其实在前面有对其 PC
进行巧妙的处理。
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
......
siz := narg
siz = (siz + 7) &^ 7
......
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
sp := newg.stack.hi - totalSize
spArg := sp
......
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
// 将 goexit 作为 PC 存入 gobuf
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
// 在这里对 gobuf 进行处理
gostartcallfn(&newg.sched, fn)
......
}
看下 gostartcallfn
的处理:
func gostartcallfn(gobuf *gobuf, fv *funcval) {
var fn unsafe.Pointer
if fv != nil {
fn = unsafe.Pointer(fv.fn)
} else {
fn = unsafe.Pointer(funcPC(nilfunc))
}
gostartcall(gobuf, fn, unsafe.Pointer(fv))
}
// x86
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
// 原 sp
sp := buf.sp
if sys.RegSize > sys.PtrSize {
sp -= sys.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = 0
}
// sp 地址下移以适应新的布局
sp -= sys.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = buf.pc
buf.sp = sp
// 还原 pc 为 fn,也就是原函数地址
buf.pc = uintptr(fn)
buf.ctxt = ctxt
}
在不同架构下的 gostartcall
的处理也不一样,这里是 x86 架构下的处理。但是想要的效果都是一样的,也就是拆解 CALL
指令,先手动将 goexit
压入栈,然后 JMP
至 fn,等待 fn 运行完成,执行 RET
指令时,自然会将 goexit
出栈,放入 PC
寄存器。
这也是上面的 gogo
为什么没有使用 CALL
而是使用了 JMP
,使用 CALL
命令 cpu 会将 PC(下一条指令)
压入栈中,并 JMP
。而直接 JMP
,等待 ret
时就会将 goexit
恢复到 PC
,从而达到执行 goexit
的目的。
接下来就是去执行 goexit
了:
// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT,$0-0
BYTE $0x90 // NOP
CALL runtime·goexit1(SB) // does not return
// traceback from goexit1 must hit code range of goexit
BYTE $0x90 // NOP
接下来是 goexit1
:
func goexit1() {
......
// 通过 mcall 调用 goexit0
mcall(goexit0)
}
mcall
主要是更改执行栈为 m.g0
在系统栈中执行调用,接下来看 goexit0
:
func goexit0(gp *g) {
// 此时已经是 g0
_g_ := getg()
// 将 G 状态变为 _Gdead
casgstatus(gp, _Grunning, _Gdead)
if isSystemGoroutine(gp, false) {
atomic.Xadd(&sched.ngsys, -1)
}
// 清理
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
_g_.m.lockedg = 0
gp.paniconfault = false
gp._defer = nil // 应该已经为 true,但是以防万一
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil
if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
// Flush assist credit to the global pool. This gives
// better information to pacing if the application is
// rapidly creating an exiting goroutines.
scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
gp.gcAssistBytes = 0
}
// 现在可以对 G 进行栈扫描,因为它已经没有栈了
gp.gcscanvalid = true
dropg()
if GOARCH == "wasm" { // wasm 目前还没有线程
// 将 G 放入 gfree 链表中等待复用
gfput(_g_.m.p.ptr(), gp)
schedule() // 再次进行调度
}
......
// 将 G 放入 gfree 链表中等待复用
gfput(_g_.m.p.ptr(), gp)
if locked {
// 这个 G 有可能在当前线程上锁住,这个时候需要 kill 线程,而不是将 M 放回线程池
// 这个操作会返回 mstart,从而释放当前 P 并退出该线程
if GOOS != "plan9" { // See golang.org/issue/22227.
// 回到 M 的运行现场,在 mstart1 中有保存 M 的运行现场(g0.sched),这里将会回到 mstart 中继续执行 mexit
gogo(&_g_.m.g0.sched)
} else {
// Clear lockedExt on plan9 since we may end up re-using
// this thread.
_g_.m.lockedExt = 0
}
}
// 再次调度
schedule()
}
如何寻找 G
回头看看调度逻辑中如何找到可运行的 G:
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
// 如果在 GC,则暂停,直到复始后重新开始
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
......
// 首先从 P 本地队列中寻找
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 找不到则去全局队列中寻找
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// Poll 网络,优先级比从其他 P 中偷取高
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(false); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// 准备从其他 P 中偷取
procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 {
// 如果没有可偷取的就不偷了
goto stop
}
// 如果自旋中的 M 数量大于正在运行中 P 的数量,则直接阻塞
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
// M 进入自旋状态
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
for i := 0; i < 4; i++ {
// 随机偷取
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
// 再次检查 GC,如果进入 GC,回到顶部,暂停 M
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // 如果偷了两次都偷不到,则优先查找 ready 队列
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
stop:
......
// 放弃当前 P 之前,对 allp 做一个快照
// 一旦我们不再阻塞在 safe-point 时候,可以立刻在下面进行修改
allpSnapshot := allp
// 准备归还 P,调度器加锁
lock(&sched.lock)
// 再次检查 GC......
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
// 再次检查全局队列
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
// 归还 P
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
// 将 P 放入 Pidel 链表
pidleput(_p_)
// 解锁调度器
unlock(&sched.lock)
// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// 再次检查所有 P 的本地队列
for _, _p_ := range allpSnapshot {
if !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
// 再次检查 idel GC work
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
lock(&sched.lock)
_p_ = pidleget()
if _p_ != nil && _p_.gcBgMarkWorker == 0 {
pidleput(_p_)
_p_ = nil
}
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
// Go back to idle GC check.
goto stop
}
}
// 再次检查 poll 网络
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
list := netpoll(true) // block until new work is available
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if !list.empty() {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
injectglist(&list)
}
}
// 真的找不到了,暂止当前 M
stopm()
goto top
}
总结查找 G 顺序:本地 > 全局 > poll 网络 > 偷。
如何偷取 G:
// 从 p2 的本地队列中窃取一半的元素,并放入 p 的本地队列中
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
t := _p_.runqtail
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
return gp
}
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
if t-h+n >= uint32(len(_p_.runq)) {
throw("runqsteal: runq overflow")
}
atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
return gp
}
M 的自旋
M 的自旋状态就是不断执行 schedule 的过程。
M 会在有 G 可用时,尽量保证有正在运行中 P 数量的自旋 M,而当没有 G 可用时,M 会陷入阻塞,等待唤醒。这样尽量保证在有 G 可用时不需要多次重复唤醒 M,也避免了无 G 可用时的查找 G 的 cpu 浪费。