ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

在goroutine上创建协程gogoroutine

目前实现的版本是参照go0.x的一个实现也就是只有一个m,在m上调度g,对应的是我们只用一个goroutine然后在goroutine上调度多个gogoroutine。在实现gogoroutine之前需要介绍一下go的调用惯例,这个文章讲的比较清楚。目前Golang在传递参数和返回值的时候是通过栈传递的。gcc会使用寄存器传递参数,golang有一个使用寄存器传递参数的提案。如果要实现一个goroutine上的协程可以利用这么一个调用惯例。

上下文切换

Go的上下文切换其实和操作系统的上下文切换比较类似,go的实现也是类似的,但相对来说比较简单,因为Go的调用惯例其他的寄存器在函数调用的时候是没有被使用的,主要是保存栈指针和一个指令寄存器PC就可以。Golang的抢占一开始是协作式的,入口是在函数调用的时候。在引入了异步抢占也就是让信号处理函数去切换到调度逻辑(这个切换的过程也类似后面讲到的gogo和gosave,但是他的入口是任何一个指令的地方都会发送所以要保存所有的寄存器)实现抢占以后就可以实现非协作的抢占了。

现在实现的是相对简单的协作式抢占。

0.x的代码是C写的,其中两个汇编函数实现的上下文的保存和切换。这里简单先补充一下Go中汇编的一些知识点。

函数定义:

1
 TEXT 包名·函数名(SB),一些标签,$栈帧大小-参数大小(包括返回值)

SP表示栈指针,AX是对应的ax系列的寄存器。
保存上下文的汇编函数如下,gobuf是一个结构体有两个指针成员分别是sp和pc。

1
2
3
4
5
6
7
TEXT gosave(SB), NOSPLIT, $0
MOVQ 8(SP), AX // 8(SP)是函数的第一个参数:gobuf的地址
MOVQ SP, 0(AX) // 保存SP也就是栈指针到 gobuf.sp 中。
MOVQ 0(SP), BX // 0(SP)是函数的返回地址
MOVQ BX, 8(AX) // 将函数的返回地址保存到 gobuf.pc 中。
MOVL $0, AX // return 0
RET

这段函数其实主要是保存了gosave调用时的栈指针,而返回地址就是gosave返回后的下一条指令的地址,返回值是0,这个0可以标记到这个函数是从gosave返回的,这个要结合后面的gogo来理解。
现在来看gogo

1
2
3
4
5
6
7
TEXT gogo(SB), 7, $0
MOVQ 8(SP), AX // 8(SP)是gobuf这个参数的地址
MOVQ 0(AX), SP // 将栈针修改为之前保存的SP
MOVQ 8(AX), AX // 获取PC
MOVQ AX, 0(SP) // 把 PC 放到0(SP)上
MOVL $1, AX // return 1
RET

gogo返回以后其实是返回到了之前gosave需要返回的地方,并且返回值是1。这里用寄存器做返回值是c里面的一种方式。所以如果一个gosave是返回0那么它是从真正的调用者那里返回的,如果返回的是1就是从gogo返回的,如果这里点理解了以后就可以实现一个上下文切换了。
对应Go的函数调用的版本就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
TEXT ·gogogo(SB), NOSPLIT, $0-16
MOVQ 8(SP), AX // gogobuf
MOVQ 0(AX), SP // restore SP
MOVQ 8(AX), AX
MOVQ AX, 0(SP) // put PC on the stack
MOVL $1, 16(SP) // return true
RET


TEXT ·gosave(SB), NOSPLIT, $0-16
MOVQ 8(SP), AX // gogobuf
MOVQ SP, 0(AX) // save SP
MOVQ 0(SP), BX
MOVQ BX, 8(AX) // save PC
MOVB $0, 16(SP) // return false
RET

这面的区别是参数gobuf和返回值用了16个字节,因为Go的调用是用栈的,之前说到过。
0(SP)就是返回地址,8(SP)是gobuf,16(SP)是true或者false的返回地址。

gogoroutine的创建

原本函数做的两件事情就是分配栈和指定PC,pc只要对于一个新的gogoroutine指向函数指针就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void
sys·newproc(int32 siz, byte* fn, byte* arg0)
{
byte *stk, *sp;
G *newg;

//prints("newproc siz=");
//sys·printint(siz);
//prints(" fn=");
//sys·printpointer(fn);

siz = (siz+7) & ~7;
if(siz > 1024)
throw("sys·newproc: too many args");

lock(&sched);

if((newg = gfget()) != nil){
newg->status = Gwaiting;
stk = newg->stack0;
}else{
newg = mal(sizeof(G));
stk = mal(4096);
newg->stack0 = stk;
newg->status = Gwaiting;
newg->alllink = allg;
allg = newg;
}

到这就是分配了一段给栈用的内存,内存的具体分配后面会有一个文章讲解。

1
2
3
		// 160 这个地方是一个约定
// 一些小函数会利用栈之外的160个字节进行优化
newg->stackguard = stk+160;

上面多留出来的地方是一个x86的约定需要给出一个redzone给一些小函数优化用的,不用分配栈直接去使用栈外的这个redzone来增加效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   // 栈是从高地址向低地址增长的,至于这个 4*8是留给什么的没搞清
sp = stk + 4096 - 4*8;
newg->stackbase = sp;
// 拷贝参数
sp -= siz;
mcpy(sp, (byte*)&arg0, siz);
// 函数结束时返回到 goexit 进行收尾工作
sp -= 8;
*(byte**)sp = (byte*)sys·goexit;
// 留给 gogo 的修改返回地址用的地方
// 相当于假装在gogo的地方返回到了fn的函数指针
sp -= 8; // retpc used by gogo
newg->sched.SP = sp;
newg->sched.PC = fn;

sched.gcount++;
goidgen++;
newg->goid = goidgen;

readylocked(newg);
unlock(&sched);

//prints(" goid=");
//sys·printint(newg->goid);
//prints("\n");
}

对应的go代码,Go当中获取函数地址的方式比较tricky,需要了解interface的layout,他是一个interface的头和一个interface包含的对象的地址。FuncPC就是通过转换获取这个函数地址,对应的实现可以在Go的源码找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func NewProc(f interface{}, args ...interface{}) {
pc := FuncPC(f)
stack := Malloc(1024)
sp := stack + 1024 - 4*8
*(*uintptr)(unsafe.Pointer(sp - 8)) = FuncPC(goexit) + 1
gogoRoutine := GoGoRoutine{}
gogoRoutine.Sched.PC = pc
gogoRoutine.Sched.SP = sp - 8 - 8
gogoRoutine.Stack = stack
globalgoid++
gogoRoutine.goid = globalgoid
gogoRoutine.status = _Grunnable
ggput(&gogoRoutine)
}

调度主体

0.x版本的逻辑其实很简单,通过if(gosave)的方式可以判断是从哪里跳过来的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Scheduler loop: find g to run, run it, repeat.
static void
scheduler(void)
{
G* gp;

// Initialization.
m->procid = getprocid();
lock(&sched);

if(gosave(&m->sched)){
// 这里的 gosave 返回的是 true
// 说明是通过 gogo 过来的
// 如果当前的 g 是 running 的话就保存上下文
// 切换成 runnable 放入到 queue 中
// 走出 if 去到调度逻辑。
// Jumped here via gosave/gogo, so didn'
// execute lock(&sched) above.
lock(&sched);

// Just finished running m->curg.
gp = m->curg;
gp->m = nil; // for debugger
switch(gp->status){
case Grunnable:
case Gdead:
// Shouldn't have been running!
throw("bad gp->status in sched");
case Grunning:
gp->status = Grunnable;
gput(gp);
break;
case Gmoribund:
gp->status = Gdead;
if(--sched.gcount == 0)
sys·exit(0);
break;
}
notewakeup(&gp->stopped);
}
// 真正的 gosave 是返回 false的。
// 这个地方是 gosave 的返回地址
// 也是 gogo 后 if 处理完的地方
// 在这里寻找合适的g然后运行。
// Find (or wait for) g to run. Unlocks sched.
gp = nextgandunlock();

noteclear(&gp->stopped);
gp->status = Grunning;
m->curg = gp;
gp->m = m; // for debugger
g = gp;
gogo(&gp->sched);
}

对应的go代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
   //go:noline
func schedule() {
if gosave(&sched.gg0.Sched) {
curgg := sched.curgg
switch curgg.status {
case _Grunnable:
panic("invalid status")
case _Grunning:
curgg.status = _Grunnable
ggput(curgg)
break
case _Gdead:
break
}
}
// 调度循环
for {
// println("find g")
gg := ggget()
if gg == nil {
time.Sleep(time.Second)
continue
}
gg.status = _Grunning
sched.curgg = gg
gogogo(&gg.Sched)
}

}

这样就能实现一个简单的单goroutine上跑多个gogoroutine的上下文切换了。