ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

WASM 作为一种通用字节码可以让很多语言编译成 WASM 跑在沙箱VM环境当中:跑在浏览器的 V8 引擎上,让别的语言可以写前端、
构建成 Istio 的代理 Envoy 的 WASM 插件形成一些过滤器或者日志插件(有点像内核的 eBPF)、作为各种语言的浇水层(让Rust调用Go编译出的WASM)。

The major problem is that, whilst the Go compiler supports WebAssembly, it does not support WASI (WebAssembly System Interface). It generates an ABI that is deeply tied to JavaScript, and one needs to use the wasm_exec.js file provided by the Go toolchain, which doesn’t work outside a JavaScript host.

Go 的 WASM 目前没有支持 WASI,类似于没有 native 的系统调用,所以很多文件、网络的请求不能通过系统执行,目前只能跑在浏览器通过浏览器的接口执行。
TinyGo 是一个支持比较好的 WASM 的 Go 的运行时,有些需要WASI的项目就需要TinyGo来构建。

调度

WASM 是一个单线程的环境,js 是一个基于事件的模型,对于 goroutine 的调度是如何进行的,本着这个好奇研究了一下 Go 本身的 WASM 运行时。比如下面这段代码就明确了 Go 的 WASM 没有多线程也就没有跑 sysmon。
没有sysmon就相当于sysmon的forcegc触发gc的部分也没有,只能靠主动GC和内存分配的时候触发超过阈值开始gc。

1
2
3
4
5
6
7
8
9
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
// For runtime_syscall_doAllThreadsSyscall, we
// register sysmon is not ready for the world to be
// stopped.
atomic.Store(&sched.sysmonStarting, 1)
systemstack(func() {
newm(sysmon, nil, -1)
})
}

在这里回忆一下 GMP 的关系。P的数量由启动时环境变量 $GOMAXPROCS 或者是由 runtime 的方法 GOMAXPROCS() 决定。这意味着在程序执行的任意时刻都只有 $GOMAXPROCS 个 goroutine 在同时运行。
M的数量可以通过 runtime/debug 中的 SetMaxThreads 函数,设置 M 的最大数量一个 M 阻塞了,会创建新的 M。
M 与 P 的数量没有绝对关系,一个 M 阻塞,P 就会去创建或者切换另一个 M,所以,即使 P 的默认数量是 1,也有可能会创建很多个 M 出来。
在确定了 P 的最大数量 n 后,运行时系统会根据这个数量创建 n 个 P。
没有足够的 M 来关联 P 并运行其中的可运行的 G。比如所有的 M 此时都阻塞住了,而 P 中还有很多就绪任务,就会去寻找空闲的 M,而没有空闲的,就会去创建新的 M。

这个关系在对应的代码里面也有体现,在 osinit 的时候会强制将 P 设置为1,newosproc 也是空的,无法启动新进程。

1
2
3
4
5
func osinit() {
ncpu = 1
getg().m.procid = 2
physPageSize = 64 * 1024
}

没有 sysmon 也就没有异步抢占,只能靠 goroutine 之间的协作式抢占来切换 goroutine。

在 schedule 里面也有一段专用的调度逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// wasm only:
// If a callback returned and no other goroutine is awake,
// then wake event handler goroutine which pauses execution
// until a callback was triggered.
gp, otherReady := beforeIdle(now, pollUntil)
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
if otherReady {
goto top
}

等于是在 wasm 中在找不到 G 的时候会启动一个 pause goroutine 占住当前的 P。

Lock_js 有 handle event 的逻辑。

内存分配

通过 WASM 的内存分配接口代替 linux 里面的 mmap 接管过来做内存分配。

系统调用

系统调用是使用 js 实现的,而且也不是像 linux 那种完整一套系统调用这个可能需要 WASI 的支持。
没有系统调用 M 也不会阻塞,等于是一个完全的单 P 单 M 多 G 的模型。

1
2
3
var jsProcess = js.Global().Get("process")
var jsFS = js.Global().Get("fs")
var constants = jsFS.Get("constants")

目前实现的版本是参照go0.x的一个实现也就是只有一个m,在m上调度g,对应的是我们只用一个goroutine然后在goroutine上调度多个gogoroutine。在实现gogoroutine之前需要介绍一下go的调用惯例,这个文章讲的比较清楚。目前Golang在传递参数和返回值的时候是通过栈传递的。gcc会使用寄存器传递参数,golang有一个使用寄存器传递参数的提案。如果要实现一个goroutine上的协程可以利用这么一个调用惯例。

上下文切换

Go的上下文切换其实和操作系统的上下文切换比较类似,go的实现也是类似的,但相对来说比较简单,因为Go的调用惯例其他的寄存器在函数调用的时候是没有被使用的,主要是保存栈指针和一个指令寄存器PC就可以。Golang的抢占一开始是协作式的,入口是在函数调用的时候。在引入了异步抢占也就是让信号处理函数去切换到调度逻辑(这个切换的过程也类似后面讲到的gogo和gosave,但是他的入口是任何一个指令的地方都会发送所以要保存所有的寄存器)实现抢占以后就可以实现非协作的抢占了。

现在实现的是相对简单的协作式抢占。

0.x的代码是C写的,其中两个汇编函数实现的上下文的保存和切换。这里简单先补充一下Go中汇编的一些知识点。

函数定义:

1
 TEXT 包名·函数名(SB),一些标签,$栈帧大小-参数大小(包括返回值)

SP表示栈指针,AX是对应的ax系列的寄存器。
保存上下文的汇编函数如下,gobuf是一个结构体有两个指针成员分别是sp和pc。

1
2
3
4
5
6
7
TEXT gosave(SB), NOSPLIT, $0
MOVQ 8(SP), AX // 8(SP)是函数的第一个参数:gobuf的地址
MOVQ SP, 0(AX) // 保存SP也就是栈指针到 gobuf.sp 中。
MOVQ 0(SP), BX // 0(SP)是函数的返回地址
MOVQ BX, 8(AX) // 将函数的返回地址保存到 gobuf.pc 中。
MOVL $0, AX // return 0
RET

这段函数其实主要是保存了gosave调用时的栈指针,而返回地址就是gosave返回后的下一条指令的地址,返回值是0,这个0可以标记到这个函数是从gosave返回的,这个要结合后面的gogo来理解。
现在来看gogo

1
2
3
4
5
6
7
TEXT gogo(SB), 7, $0
MOVQ 8(SP), AX // 8(SP)是gobuf这个参数的地址
MOVQ 0(AX), SP // 将栈针修改为之前保存的SP
MOVQ 8(AX), AX // 获取PC
MOVQ AX, 0(SP) // 把 PC 放到0(SP)上
MOVL $1, AX // return 1
RET

gogo返回以后其实是返回到了之前gosave需要返回的地方,并且返回值是1。这里用寄存器做返回值是c里面的一种方式。所以如果一个gosave是返回0那么它是从真正的调用者那里返回的,如果返回的是1就是从gogo返回的,如果这里点理解了以后就可以实现一个上下文切换了。
对应Go的函数调用的版本就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
TEXT ·gogogo(SB), NOSPLIT, $0-16
MOVQ 8(SP), AX // gogobuf
MOVQ 0(AX), SP // restore SP
MOVQ 8(AX), AX
MOVQ AX, 0(SP) // put PC on the stack
MOVL $1, 16(SP) // return true
RET


TEXT ·gosave(SB), NOSPLIT, $0-16
MOVQ 8(SP), AX // gogobuf
MOVQ SP, 0(AX) // save SP
MOVQ 0(SP), BX
MOVQ BX, 8(AX) // save PC
MOVB $0, 16(SP) // return false
RET

这面的区别是参数gobuf和返回值用了16个字节,因为Go的调用是用栈的,之前说到过。
0(SP)就是返回地址,8(SP)是gobuf,16(SP)是true或者false的返回地址。

gogoroutine的创建

原本函数做的两件事情就是分配栈和指定PC,pc只要对于一个新的gogoroutine指向函数指针就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void
sys·newproc(int32 siz, byte* fn, byte* arg0)
{
byte *stk, *sp;
G *newg;

//prints("newproc siz=");
//sys·printint(siz);
//prints(" fn=");
//sys·printpointer(fn);

siz = (siz+7) & ~7;
if(siz > 1024)
throw("sys·newproc: too many args");

lock(&sched);

if((newg = gfget()) != nil){
newg->status = Gwaiting;
stk = newg->stack0;
}else{
newg = mal(sizeof(G));
stk = mal(4096);
newg->stack0 = stk;
newg->status = Gwaiting;
newg->alllink = allg;
allg = newg;
}

到这就是分配了一段给栈用的内存,内存的具体分配后面会有一个文章讲解。

1
2
3
		// 160 这个地方是一个约定
// 一些小函数会利用栈之外的160个字节进行优化
newg->stackguard = stk+160;

上面多留出来的地方是一个x86的约定需要给出一个redzone给一些小函数优化用的,不用分配栈直接去使用栈外的这个redzone来增加效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   // 栈是从高地址向低地址增长的,至于这个 4*8是留给什么的没搞清
sp = stk + 4096 - 4*8;
newg->stackbase = sp;
// 拷贝参数
sp -= siz;
mcpy(sp, (byte*)&arg0, siz);
// 函数结束时返回到 goexit 进行收尾工作
sp -= 8;
*(byte**)sp = (byte*)sys·goexit;
// 留给 gogo 的修改返回地址用的地方
// 相当于假装在gogo的地方返回到了fn的函数指针
sp -= 8; // retpc used by gogo
newg->sched.SP = sp;
newg->sched.PC = fn;

sched.gcount++;
goidgen++;
newg->goid = goidgen;

readylocked(newg);
unlock(&sched);

//prints(" goid=");
//sys·printint(newg->goid);
//prints("\n");
}

对应的go代码,Go当中获取函数地址的方式比较tricky,需要了解interface的layout,他是一个interface的头和一个interface包含的对象的地址。FuncPC就是通过转换获取这个函数地址,对应的实现可以在Go的源码找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func NewProc(f interface{}, args ...interface{}) {
pc := FuncPC(f)
stack := Malloc(1024)
sp := stack + 1024 - 4*8
*(*uintptr)(unsafe.Pointer(sp - 8)) = FuncPC(goexit) + 1
gogoRoutine := GoGoRoutine{}
gogoRoutine.Sched.PC = pc
gogoRoutine.Sched.SP = sp - 8 - 8
gogoRoutine.Stack = stack
globalgoid++
gogoRoutine.goid = globalgoid
gogoRoutine.status = _Grunnable
ggput(&gogoRoutine)
}

调度主体

0.x版本的逻辑其实很简单,通过if(gosave)的方式可以判断是从哪里跳过来的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Scheduler loop: find g to run, run it, repeat.
static void
scheduler(void)
{
G* gp;

// Initialization.
m->procid = getprocid();
lock(&sched);

if(gosave(&m->sched)){
// 这里的 gosave 返回的是 true
// 说明是通过 gogo 过来的
// 如果当前的 g 是 running 的话就保存上下文
// 切换成 runnable 放入到 queue 中
// 走出 if 去到调度逻辑。
// Jumped here via gosave/gogo, so didn'
// execute lock(&sched) above.
lock(&sched);

// Just finished running m->curg.
gp = m->curg;
gp->m = nil; // for debugger
switch(gp->status){
case Grunnable:
case Gdead:
// Shouldn't have been running!
throw("bad gp->status in sched");
case Grunning:
gp->status = Grunnable;
gput(gp);
break;
case Gmoribund:
gp->status = Gdead;
if(--sched.gcount == 0)
sys·exit(0);
break;
}
notewakeup(&gp->stopped);
}
// 真正的 gosave 是返回 false的。
// 这个地方是 gosave 的返回地址
// 也是 gogo 后 if 处理完的地方
// 在这里寻找合适的g然后运行。
// Find (or wait for) g to run. Unlocks sched.
gp = nextgandunlock();

noteclear(&gp->stopped);
gp->status = Grunning;
m->curg = gp;
gp->m = m; // for debugger
g = gp;
gogo(&gp->sched);
}

对应的go代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
   //go:noline
func schedule() {
if gosave(&sched.gg0.Sched) {
curgg := sched.curgg
switch curgg.status {
case _Grunnable:
panic("invalid status")
case _Grunning:
curgg.status = _Grunnable
ggput(curgg)
break
case _Gdead:
break
}
}
// 调度循环
for {
// println("find g")
gg := ggget()
if gg == nil {
time.Sleep(time.Second)
continue
}
gg.status = _Grunning
sched.curgg = gg
gogogo(&gg.Sched)
}

}

这样就能实现一个简单的单goroutine上跑多个gogoroutine的上下文切换了。

大规模的并发并行计算考虑的问题主要关于数据、计算的吞吐量还有容错。不管是设计者还是使用者,关心的是在期望上(因为可能会失败)运行任务的效率,从设计者的角度来讲要提供一个可靠高效的环境,从使用者的角度来讲这个东西需要足够的简单,简单意味着编程效率也保证了可靠性。大数据的整套技术栈在解决一些吞吐量大的批处理问题时得心应手,但是到了深度学习的场景,计算吞吐的要求反而提高了,比较现实点的情况是很多时候我们考虑的是能处理多大的数据量的日志,和多快能把一个ImageNet ResNet50训练完。这个区别在于 Spark 或者 Hadoop 的 MapReduce 的设计和 Tensorflow 以及 PyTorch 的设计理念不同以及对应的计算场景的不同。到了强化学习的场景又是一个大融合,强化学习的场景不光有迭代式的反向误差传播的计算,同时也包含了大规模的仿真环境的计算,Ray主要是在这方面提供了一个一站式的解决框架。分布式系统的问题某种程度上和操作系统的问题其实很类似,都是要考量如何从整个系统的角度充分利用物理资源,从错误中恢复,满足效率。这篇文章主要是比较一下几种主流的计算引擎或者框架或者说是算法的异同。

Spark(MapReduce)

我把Spark和MapReduce都归于大数据栈这一类,RDD(Resillient Distributed Dataset) 和 MapReduce
这两者的区别没有和MPI/PS的区别大那么多。

MapReduce 有 JobTracker,Spark 有 Driver

MapReduce 有 TaskTracker,Spark 有 Executor

MapReduce 中间结果是基于 HDFS,会落盘,Spark 中间结果是基于内存的,也可以落盘,主要是利用内存做缓存

MapReduce 计算抽象由Map和Reduce构成,Spark 的 RDD 有一系列的Transform和Action,封装程度更高

MapReduce 的错误处理比较简单,把失败的Map重试就好了,重试是一种非常好理解的错误处理。
Spark 的重试是根据 RDD 的有向无环图中的血缘关系计算的,可以理解为从失败的拓扑序上重新计算,也可以有中间的checkpoint。

RDD 的特性是只读的,在机器学习场景下参数不大的时候 MLLib 通过把参数存到 Driver 上来计算,当参数比较大特别是深度网络的参数大得吓人 Driver 存不下的时候,
只能通过新增 RDD,对于要频繁更新的模型参数要生成非常多的 RDD,这是 Spark 在深度学习上设计的缺陷。
一般来说一些简单的机器学习任务通过 sklearn 就能完成,当数据量比较大的时候就需要通过 Spark 的MLLib来处理。
当然 MLLib 现在也在开始从RDD-based转向SparkSQL用的Dataframe-based,从大的角度上讲两者互相融合是可行的,可能需要一些时间。
上 Spark 用 Yarn 调度 Tensorflow,还是用 Kuberenetes 调度 Spark 和 Tensorflow,我个人支持后者,而且这种分层是我比较喜欢的一种分层。

Ray

Ray 的基本抽象就是 Remote 或者 Actor,一个是分布式调用函数,一个式分布式调用类。Ray 和 Ray 的 RLlib 主要面对的问题是强化学习有大量的 simulating 的环境,比如仿真一局Dota,涉及到模拟一局Dota,反馈Agent的神经网络,是并行计算和神经网络训练的结合。当然 Ray 本身的抽象就是个分布式 goroutine,所以某种程度上可以完成的事情不光是强化学习一种任务,比如HypterTunning等一些并行计算的模型也是试用的。

反过来想,如果没有 Ray 的话,如何做这个系统呢,要构建大批量的仿真计算环境,然后能根据仿真的反馈训练神经网络。
这两个任务的调度控制就是一个问题,当然放到 k8s 的调度器里做似乎也可以,然后涉及这些分布式任务的同步问题,
需要构建这些任务的关系和信息传输,似乎用一些 DAG (比如 argo)的 workflow 也能解决,但他们之间通信的高效性似乎会是一个问题,需要
选择一种高效的远程调用传输方式,肯能gRPC也可以,还有他们的元数据管理用什么呢,自己搞个Redis似乎也行。
Ray 从这些方面综合考虑了这些问题提供了一个一站式的RL训练平台。

PS和MPI

MPI和PS的介绍有很多,我也不需要费篇幅唠叨。

PS和MPI是比较常用的分布式深度学习的训练方式,两者的主要区别在于Paramater Server,在PS的场景下参数统一走一个或者shard多个PS更新。

在MPI的场景下每个Worker是对等的(或者分层级对等,比如主机上的四张卡走NVLink,主机之间走万兆网卡)工作节点,使用AllReduce参数的同步在Worker之间进行。

总结

MPI在我出生前应该就有了,但是MapReduce之所以能火起来主要还是在 fault-tolerance 上,MPI的抽象比较基础,但是MapReduce和Spark在廉价大集群上的表现非常亮眼,对于没那么并行化理想的场景能够tolerate,时间到了深度学习的场景这玩意儿又冒出来了,一方面是深度学习的计算资源好得不得了,因为配备GPU的机器和传统的机器比起来好很多,对于这种纯粹的并行计算框架来说非常友好,错误处理的问题就没那么严重,即使是这样也慢慢开始有人着手优化MPI/PS的fault tolerance,在一些并行化退化的场景下能够把训练并行度降级不至于完全失败的工作。

从长远来看 Ray 还会有很多进化的空间,Spark 也会更好地适配深度学习场景,深度学习本身在System上的优化也层出不穷,大家对于大规模的并发并行计算系统的方方面面的要求都会越来越高。

贝叶斯优化(BO)

贝叶斯优化是一种黑盒优化方法,一般有几个特征:

输入纬度不大,一般小于 20 个,时间复杂度是$O(n^3)$
有设定的取值空间
目标函数需要是连续的(我觉得这个好像不是必须的,离散的也可以)
目标函数的计算非常消耗成本(时间等等)
目标函数是黑盒的,没有明确的结构
目标函数(derivative-free)没有一阶二阶导数(不然就可以用梯度下降去算了)

因为这些特性,在做机器学习的超参数调优的时候特别合适。

BayesOpt consists of two main components: a Bayesian statistical model for modeling the objective function, and an acquisition function for deciding where to sample next.

贝叶斯优化主要是要两个部分,一个是统计模型比如高斯过程(GP),一个是采样函数(AC)决定下一个样本从拿里获取。

高斯过程(GP)

高斯过程是贝叶斯优化中的一种统计模型,先抛开具体的数学问题,简单讲一下就是,假设我在一个变量空间(比如多个超参数)采样了一个目标函数(我们训练结果的 evaluation),然后我们会得到一个后验分布,这就类似条件概率里面,我们知道了某个事情发生以后,如果随机变量是相关的,我们有更大的概率确定其他变量的分布。例如下图,绿色是我们的分布空间,每次采样以后,分布的空间就会缩小,进而接近我们的曲线。

从数学上描述这件事情就需要高斯分布和高斯过程了。高斯分布我相信大家都耳熟能详,他由平均值 $\mu$ 还有标准差 $\sigma$ 决定分布。高斯过程的本质是一个多元高斯分布,只不过他是无限空间上的,定义高斯过程需要一个核函数定义他的协方差矩阵,也就是一个矩阵定义多个随机变量的相关性。当然了,有限的离散点用协方差矩阵可以,如果取值是连续的就需要核函数描述这个“无限”的协方差矩阵了。

协方差的核函数就很多选项,A Visual Exploration of Gaussian Processes 提供了一个非常 intuitive 的可视化来解释 GP 和各种描述协方差矩阵的核函数。

高斯过程被定义为一个随机过程,相当于每个样本点自己也是个随机函数,对于高斯分布来说,每个样本点也是一个高斯分布函数就是高斯过程,cs4780 的 Lecture 15有非常详细的解释。

Definition: A GP is a (potentially infinte) collection of random variables (RV) such that the joint distribution of every finite subset of RVs is multivariate Gaussian:
$ f \sim GP(\mu, k), $
where $\mu(\mathbf{x})$ and $k(\mathbf{x}, \mathbf{x}’)$ are the mean resp. covariance function! Now, in order to model the predictive distribution $P(f_* \mid \mathbf{x}_*, D)$ we can use a Bayesian approach by using a GP prior: $P(f\mid \mathbf{x}) \sim \mathcal{N}(\mu, \Sigma)$ and condition it on the training data $D$ to model the joint distribution of $f=f(X)$ (vector of training observations) and $f_* = f(\mathbf{x}_*)$ (prediction at test input).

采样函数

采样函数一般有 expected improvement(EI),当然还有 probability improvement(PI), upper confidence bound(UCB), knowledge gradient(KG),entropy search and predictive entropy search 等等。
采样的策略有两种:
Explore:探索新的点,这种采样有助于估计更准确的;
Exploit:利用已有结果附近的点进行采样,从而希望找到更大的;

这两个标准是互相矛盾的,如何在这两者之间寻找一个平衡点可以说是采样函数面对的主要挑战。

Expected improvement is a popular acquisition function owing to its good practical performance and an analytic form that is easy to compute. As the name suggests it rewards evaluation of the objective $f$ based on the expected improvement relative to the current best. If $f^* = \max_i y_i$ is the current best observed outcome and our goal is to maximize $f$, then EI is defined as

$\text{EI}(x) = \mathbb{E}\bigl[\max((f(x) - f^*), 0)\bigr]$

在 Facebook 的 Ax 在这里提到使用了 PyTorch 的一个 BoTorch 用的就是 EI,就是期望的增加度。

高斯分布式的期望就是 $\mu$ 所以是很好算的,$f^*$ 是已知的定值,这里有一个计算的推导。

BoTorch 提供了一个优化的 EI 叫 Noisy EI,主要功能是抗噪。

The above definition of the EI function assumes that the objective function is observed free of noise. In many types of experiments, such as those found in A/B testing and reinforcement learning, the observations are typically noisy. For these cases, BoTorch implements an efficient variant of EI, called Noisy EI, which allow for optimization of highly noisy outcomes, along with any number of constraints (i.e., ensuring that auxiliary outcomes do not increase or decrease too much). For more on Noisy EI, see our blog post.

这是采样的过程的一个示例:

应用

高斯过程本身可以用来回归和分类,使用高斯过程的贝叶斯优化有很多具体的应用场景,除了超参数优化之外,对于网络结果(层数,数据并行度)等等也是可以使用的。
除此之外像 horovod 也使用了贝叶斯优化,在这个目录下面

Horovod comes with several adjustable “knobs” that can affect runtime performance, including –fusion-threshold-mb and –cycle-time-ms (tensor fusion), –cache-capacity (response cache), and hierarchical collective algorithms –hierarchical-allreduce and –hierarchical-allgather.

他主要是服务于一些 Tensor Fusion 和 response cache 等参数以及层级 collective 通信选择,文档提到了通过设置一些参数细致控制调优的过程。

参考

  1. 一个例子搞清楚(先验分布/后验分布/似然估计)
  2. A Tutorial on Bayesian Optimization
  3. Practical Bayesian Optimization of Machine Learning Algorithms
  4. Awesome-AutoML-Papers
  5. Gaussian Processes for Machine Learning

背景

Horovod 是一个兼容主流计算框架的分布式机器学习训练框架,主要基于的算法是 AllReduce,这个是 baidu-research 在17年做的一个实现,这个东西原来是高性能计算范畴里的东西应用了 MPI 并行计算接口来实现,这是并行计算里的一个框架,已经很老了,这里有一个介绍 MPI 的 tutorial 写的比较好。

在介绍 horovod 的之前需要解释一下 AllReduce。在 MapReduce 里面 reduce 被翻译成了规约,在上面提到的 MPI tutorial 里面的解释是

Reduce is a classic concept from functional programming. Data reduction involves reducing a set of numbers into a smaller set of numbers via a function. For example, let’s say we have a list of numbers [1, 2, 3, 4, 5]. Reducing this list of numbers with the sum function would produce sum([1, 2, 3, 4, 5]) = 15. Similarly, the multiplication reduction would yield multiply([1, 2, 3, 4, 5]) = 120.

就是说把一个大的集合“缩减”成了小的集合,这里要注意的是这种缩减的计算是要满足交换律的,也就是减法或者除法是不行的,因为在并行计算当中不太好去控制计算的顺序。Reduce 就是这个意思,具体到 MPI_Reduce 就是把不同节点的数字“缩减”到一个节点上,支持的计算方式有加法乘法和取大小值等。

教程中给出的 Reduce 是求和。

AllReduce 就是在每个节点都获得 Reduce 的结果

基于这个标准就有很多的 All-Reduce 的实现,比如 Ring-Reduce,这个实现分两部分,一部分是 Scatter-Reduce 另一部分是 All-Gather。最早是在这篇 post里提到的。这个算法的好处是可以摆脱之前 PS 非常依赖 Parameter-Server 的带宽,Parameter-Server 的带宽会成为计算瓶颈的问题,而 AllReduce 可以让每个节点在带宽传输中的位置是对等的,并且减少传输次数。具体的算法可以看文章的解释,scatter-reduce 就是让每个节点有 K/N 的一个 reduce(也就是 sum),然后把自己的一个 K/N 的 reduce 再传递给其他节点,每个节点只和自己相邻的节点通信。

In the system we described, each of the N GPUs will send and receive values N-1 times for the scatter-reduce, and N-1 times for the allgather. Each time, the GPUs will send K / N values, where K is the total number of values in array being summed across the different GPUs. Therefore, the total amount of data transferred to and from every GPU is

Data Transferred=2(N−1)KN

数据传输量在 N 比较大的时候越没有影响,这就消弭了多节点给 Parameter-Server 造成的瓶颈。

还有一些其他术语,假设有 4 台 4 卡的 GPU 服务器。size 是工作进程(GPU)的数量(6),rank 是所有工作进程的 id(0-15),local rank 是当前服务器上的 id(0-3)。

Horovod 的介绍

使用 horovod 有一定的侵入性,代码需要一定的修改才能变成适配分布式训练,但是有一个好处就是适配的成本不高,并且 horovod 提供的各种框架的支持可以让 horovod 比较好的在各个框架的基础上使用,他支持 tensorflow/keras/mxnet/pytorch,MPI 的实现也有很多,比如 OpenMPI 还有 Nvidia 的 NCCL,还有 facebook 的 gloo,他们都实现了一种并行计算的通信和计算方式。而且 horovod 的本身的实现也很简单。

使用

Keras 用 ResNet50 训练 ImageNet 为例,主要侵入了几部分 hvd.init() 这个是 MPI 的初始化,让并行进程能够知道自己的 rank/local_rank 等信息。

第二部根据 local_rank(相当于单节点上的第n张卡),并且设置不占用全部显存,按需分配(可能因内没有统一管理导致显存碎片),然后传递给 keras 设置 session。

1
2
3
4
5
# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))

然后在 rank 0 上恢复一个 checkpoint 并且广播给其他节点,这里的 broadcast 后面会介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
resume_from_epoch = try_epoch
break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(resume_from_epoch, 0, name='resume_from_epoch')

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

设定传输的压缩函数,具体的压缩后面会提到,然后要么从之前的模型恢复要么重新训练。关键的 wrapper 在 opt 上,会给本地的 opt 包装一个 DistributedOptimizer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast both model and optimizer weights
# to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
model = hvd.load_model(args.checkpoint_format.format(epoch=resume_from_epoch),
compression=compression)
else:
# ResNet-50 model that is included with Keras is optimized for inference.
# Add L2 weight decay & adjust BN settings.
model_config = model.get_config()
for layer, layer_config in zip(model.layers, model_config['layers']):
if hasattr(layer, 'kernel_regularizer'):
regularizer = keras.regularizers.l2(args.wd)
layer_config['config']['kernel_regularizer'] = \
{'class_name': regularizer.__class__.__name__,
'config': regularizer.get_config()}
if type(layer) == keras.layers.BatchNormalization:
layer_config['config']['momentum'] = 0.9
layer_config['config']['epsilon'] = 1e-5

model = keras.models.Model.from_config(model_config)

# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.SGD(lr=args.base_lr * hvd.size(),
momentum=args.momentum)

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt, compression=compression)

model.compile(loss=keras.losses.categorical_crossentropy,
optimizer=opt,
metrics=['accuracy', 'top_k_categorical_accuracy'])

然后设置一些回调函数,hvd.callbacks.BroadcastGlobalVariablesCallback(0) 保证的是 rank 0 上的所有参数只在 rank 0 初始化,然后广播给其他节点,后面是学习率 decay 的设置和一些统计信息的回调打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),

# Horovod: average metrics among workers at the end of every epoch.
#
# Note: This callback must be in the list before the ReduceLROnPlateau,
# TensorBoard, or other metrics-based callbacks.
hvd.callbacks.MetricAverageCallback(),

# Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=args.warmup_epochs, verbose=verbose),

# Horovod: after the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
hvd.callbacks.LearningRateScheduleCallback(start_epoch=args.warmup_epochs, end_epoch=30, multiplier=1.),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=30, end_epoch=60, multiplier=1e-1),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=60, end_epoch=80, multiplier=1e-2),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=80, multiplier=1e-3),
]

最后直接用 allreduce 计算一个 evaluation score。

1
2
# Evaluate the model on the full data set.
score = hvd.allreduce(model.evaluate_generator(input_fn(False, args.train_dir, args.val_batch_size),NUM_IMAGES['validation']))

实现

适配层和压缩算法

horovod 的实现主要分几部分,第一部分是一个适配层,用于兼容各种框架,比如 tensorflow 的适配就是实现一个新的 Op,这个可以参考 add new op,里面规范了 Tensorflow 自定义算子的实现。

请注意,生成的函数将获得一个蛇形名称(以符合 PEP8)。因此,如果您的操作在 C++ 文件中命名为 ZeroOut,则 Python 函数将称为 zero_out。

C++ 的定义是驼峰的,生成出来的 python 函数是下划线小写的,所以最后对应的是,适配Op的代码在 horovod/tensorflow 目录下面

C++ Python
HorovodAllgather horovod_allgather
HorovodAllreduce horovod_allreduce
HorovodBroadcast horovod_broadcast

另外在适配层可以加入一些压缩算法(在 horovod/[framework]/compression.py),我觉得压缩算法和框架无关的,放到适配层下面可能有别的原因,比如 tensorflow 默认带了一个 float16 压缩,具体的其他压缩算法比如3LC,可以通过有损压缩或者无损压缩提高带宽利用率。

统一层

这一层的实现是统一的,所有的适配层最后都是发出一些 Op+Tensor 的 Message 到队列中,后台初始化的时候会有一个专门的线程专门消费这个队列。他有一个同步消息的过程,相当于这个 tensor 在所有节点上都就绪以后就可以开始计算了,主体的流程是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// The coordinator currently follows a master-worker paradigm. Rank zero acts
// as the master (the "coordinator"), whereas all other ranks are simply
// workers. Each rank runs its own background thread which progresses in ticks.
// In each tick, the following actions happen:
//
// a) The workers send a Request to the coordinator, indicating what
// they would like to do (which tensor they would like to gather and
// reduce, as well as their shape and type). They repeat this for every
// tensor that they would like to operate on.
//
// b) The workers send an empty "DONE" message to the coordinator to
// indicate that there are no more tensors they wish to operate on.
//
// c) The coordinator receives the Requests from the workers, as well
// as from its own TensorFlow ops, and stores them in a [request table]. The
// coordinator continues to receive Request messages until it has
// received MPI_SIZE number of empty "DONE" messages.
//
// d) The coordinator finds all tensors that are ready to be reduced,
// gathered, or all operations that result in an error. For each of those,
// it sends a Response to all the workers. When no more Responses
// are available, it sends a "DONE" response to the workers. If the process
// is being shutdown, it instead sends a "SHUTDOWN" response.
//
// e) The workers listen for Response messages, processing each one by
// doing the required reduce or gather, until they receive a "DONE"
// response from the coordinator. At that point, the tick ends.
// If instead of "DONE" they receive "SHUTDOWN", they exit their background
// loop.

简单来讲就是说 coordinator 集 size 个 request DONE,然后找出就绪的 tensor (在 message_table 里面查找)构造出一个 read_to_reduce 的列表,然后发出 size 个 request 告知进程进行计算,然后 worker 接受到 response 开始真正的计算过程(通过 op_manager 具体执行)。

这是整体同步的过程,如果打开 horovod 的 trace log(HOROVOD_LOG_LEVEL=trace) 就能看到同步的过程。horovod 的主要 Op 除了 AllReduce 之外还有 allgather 和 broadcast。

算子实现层

具体的 op 在 common/op 可以看到有 NCCL/Gloo/MPI 等等的,这些由 op_manager 管理,他会根据优先级找到可以用来计算的 op 进行计算,比如 MPI 用的就是 MPI_Allreduce,具体 scatter-gather 和 all-gather openMPI 有现成的实现,NCCL 就直接调用 ncclAllReduce,比较新的 nccl 也支持跨节点的 allreduce 了,不用自己再套一层。

除了 allreduce 之外,还有两个比较重要的算子。

allgather 主要是比 allreduce 少一层 reduce,所有数据被发送到所有进程就可以。allreduce 的第二步就是把每个进程的 scatter-reduce 的 reduce 结果发送到所有进程。

broadcast 的作用是一对多的广播,主要是把初始化的参数同步给其他进程的时候使用。

总结一下 SysML 2019 的一些论文。

TICTAC

问题背景是解决分布式训练的 scale 问题。
如图,网络带宽和传输的顺序是关键因素,网络带宽很好理解,如果 Best 要提高只能加带宽,同时传输顺序如图会影响计算时间。

Communication to Computation Ratio 传输计算比,如果比值大说明效率高

  1. 增大 BatchSize,但是会过拟合(贾扬清的 Train ImageNet in 1 Hour),减小参数精度(FP16,腾讯绝艺 1024 张V100)。

  2. Overlay Coeffiecient 提供传输和计算的重合率,是 TicTac 的优化方向。

计算最优传输依赖,这是个 NP 问题,需要找近似解。recv op 是图的 root,可行的拓扑排序有很多种,解决方案就是找到近似优化排序。(原问题是 NP 问题)

几种符号

op.P 直接 Op 执行计算时间
op.M Op传输时间
op.M+ 触发计算 op 的最小传输时间

Tic

设 Communication Time 对于每个 recv op 来说都相等,从直觉上解释就是计算执行 recv op 的优先度。
如图计算对应的值

1
2
3
4
5
6
case 1
A.M = B.M = 1
case 2
A.M = B.M = 1
C.M = 1+1
D.M = 1+1+1

越小优先级越高,这个可以在 DAG 中静态算出

Tac

Communication Time 对于每个 recv op 来说有对应的时间,对应的算法。

系统设计

Time Oracle: Tensorflow metrics 计算 op time

Ordering Wizard: 计算静态依赖优先级

Enforcing: 修改 Tensorflow gRPC 子模块

P3

PRIORITY-BASED PARAMETER PROPAGATION FOR DISTRIBUTED DNN TRAINING

和上一篇论文类似,也是基于传输优化的思路,因为网络带宽基本上就是一个硬件问题。一些优化手段是用了有损压缩或者降低收敛时准确率的方式。还是通过提高 overlap 来实现。
直观的现象是如图 L4 的一次正向传播和反向传播之间的间隔很大。

解决方法:带优先级的参数切分

在 MXNet 里面,worker 算完当层的梯度
就会提一个 pull request 当其他 work 同步了这个梯度。
问题在于发送的顺序和反向传播的顺序一致,并且颗粒度
是以 layer 为单位的。

根据领域知识将大 layer 分片成小 slice。
对每个 slice 进行权重排序,优先传输权重高的 slice。
被称为 Priority-Based Parameter Propagation

如图如果参数传输有优先级能够被中断就能减少 delay

大部分模型每层的参数是不平衡的,特别是全连接的参数非常大。

对每一层切分以后利用 TCP 的全双工可以达到流水线的效果。

P3 的设计

参数切片:根据领域知识和实验选择大小(50000)
参数优先级:下一层先需要的先发送,可以抢占低优先级的

我个人觉得简单,这个比上一个更好理解也简单。

BlueConnect

主要是一个对分层(原本的一层或者两层)的 All-Reduce 的泛化算法。
首先使用递归的方法可以减少 all-gather 的传输次数。

这个只要三次 logp

ring reduce 要7次 (p-1)

问题背景

多卡之间的带宽很高 32GB/s,不同网络拓扑下,最慢的会成为瓶颈。
分两层可以解决网络和总线的带宽差异。

对于二层的一个泛化,

三层的 reduce-scatter 的例子,reduce-gather 是反的。

BEYOND DATA AND MODEL PARALLELISM FOR DEEP NEURAL NETWORKS

这个主要是提到了并行程度除了数据和模型之外可以更细粒度到参数和属性,也就是模型的小划分和样本的小划分。
SOAP= Sample Attribute Operator Parameter

全连接层有大量的参数,造成传输瓶颈。
领域特定的优化:RNN 和 CNN 用 data 并行,最后全连接用 model 并行。

搜索策略

找到最优并行策略是 NP 问题。
Execution Optimizer 通过找到最小 cost 的
并行策略寻找较优解。
Cost = tensor size/ bandwidth

随机从搜索空间选取策略,依据 MCMC
最够贪心,又不会太局部贪心。

总的来有点和 AutoML 类似,在一个搜索空间选择一个策略使得 cost 最低,一个是原本的 cost 函数,这个是
计算本身执行的时间。目前主流框架也不支持 SOAP 级别的并行划分,没手调的好,但很接近。

可以看到搜索出来的结果全连接基本上是单卡算的

行星运动我们都是按万有引力定律算的,实际上按照爱因斯坦的说法,万有引力不是惯性力是时空弯曲的效果,但是按照爱因斯坦的公式算起来特别复杂,所以现在航天轨道的设计还是按万有引力定律在算。

牛顿万有引力定律

牛顿万有引力定律:任意两个质点由通过连心线方向上的力相互吸引。该吸引力的大小与它们的质量乘积成正比,与它们距离的平方成反比,G 是万有引力常数。

$$ F=-\frac{G M \cdot m}{r^2} \cdot \frac{\vec{r}}{r} $$

多体问题

多体问题,是所有吸引力的叠加,这里排除物体相撞的情况,因为宇宙非常大,两个天体之间的距离非常远,暂时不考虑这种情况,通过加入一个小常量可以在计算上忽略这个问题。

$$ F=-\frac{G M m r}{(r^2 + \epsilon^2)^{\frac{3}{2}}} $$

这样累加的时候自己也可以加进去,因为 r 是 0,那一项相当于没算。

$$ F_j= \sum_{i=1}^{n} F_i $$

对于 N body 有很多的稍微近似的算法,多是基于实际的物理场景的,比如大部分天体是属于一个星系的,每个星系之间都非常远,所以可以将一些星系作为整体计算,构建一颗树,树中的某些节点是其所有叶子的质心,这样就可以减少计算量,但是放到 GPU 的场景下一个 O(N^2) 的暴力算法利用 GPU 的并行能力可以非常快的算出来。

Python 暴力解

下面是 N-Body 暴力实现的例子,网上有个各种语言实验的,比较好用来测试准确性,来自这里,就是两层循环。

1
2
3
4
5
6
7
8
9
10
11
12
cat <<EOF >> nbody.txt
0.01 3 20
1
0 0 0
0.01 0 0
0.1
1 1 0
0 0 0.02
0.001
0 1 1
0.01 -0.01 -0.01
EOF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import math

class Vector:
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z

def __add__(self, other):
return Vector(self.x + other.x, self.y + other.y, self.z + other.z)

def __sub__(self, other):
return Vector(self.x - other.x, self.y - other.y, self.z - other.z)

def __mul__(self, other):
return Vector(self.x * other, self.y * other, self.z * other)

def __div__(self, other):
return Vector(self.x / other, self.y / other, self.z / other)

def __eq__(self, other):
if isinstance(other, Vector):
return self.x == other.x and self.y == other.y and self.z == other.z
return False

def __ne__(self, other):
return not self.__eq__(other)

def __str__(self):
return '({x}, {y}, {z})'.format(x=self.x, y=self.y, z=self.z)

def abs(self):
return math.sqrt(self.x*self.x + self.y*self.y + self.z*self.z)

origin = Vector(0, 0, 0)

class NBody:
def __init__(self, fileName):
with open(fileName, "r") as fh:
lines = fh.readlines()
gbt = lines[0].split()
self.gc = float(gbt[0])
self.bodies = int(gbt[1])
self.timeSteps = int(gbt[2])
self.masses = [0.0 for i in range(self.bodies)]
self.positions = [origin for i in range(self.bodies)]
self.velocities = [origin for i in range(self.bodies)]
self.accelerations = [origin for i in range(self.bodies)]
for i in range(self.bodies):
self.masses[i] = float(lines[i*3 + 1])
self.positions[i] = self.__decompose(lines[i*3 + 2])
self.velocities[i] = self.__decompose(lines[i*3 + 3])

print("Contents of", fileName)
for line in lines:
print(line.rstrip())
print
print("Body : x y z |")
print(" vx vy vz")

def __decompose(self, line):
xyz = line.split()
x = float(xyz[0])
y = float(xyz[1])
z = float(xyz[2])
return Vector(x, y, z)

def __computeAccelerations(self):
for i in range(self.bodies):
self.accelerations[i] = origin
for j in range(self.bodies):
if i != j:
temp = self.gc * self.masses[j] / math.pow((self.positions[i] - self.positions[j]).abs(), 3)
self.accelerations[i] += (self.positions[j] - self.positions[i]) * temp
return None

def __computePositions(self):
for i in range(self.bodies):
self.positions[i] += self.velocities[i] + self.accelerations[i] * 0.5
return None

def __computeVelocities(self):
for i in range(self.bodies):
self.velocities[i] += self.accelerations[i]
return None

def __resolveCollisions(self):
for i in range(self.bodies):
for j in range(self.bodies):
if self.positions[i] == self.positions[j]:
(self.velocities[i], self.velocities[j]) = (self.velocities[j], self.velocities[i])
return None

def simulate(self):
self.__computeAccelerations()
self.__computePositions()
self.__computeVelocities()
self.__resolveCollisions()
return None

def printResults(self):
fmt = "Body %d : % 8.6f % 8.6f % 8.6f | % 8.6f % 8.6f % 8.6f"
for i in range(self.bodies):
print(fmt % (i+1, self.positions[i].x, self.positions[i].y, self.positions[i].z, self.velocities[i].x, self.velocities[i].y, self.velocities[i].z))
return None

nb = NBody("nbody.txt")
for i in range(nb.timeSteps):
print("\nCycle %d" % (i + 1))
nb.simulate()
nb.printResults()

CUDA 并行计算

在 CUDA 中并行化首先要理解 CUDA 的层次结构,CUDA 有 grid 和 block 对线程做划分。首先设计一个 computation tile,相当于一个 block 的计算。

左侧表示一个 block 中的 p 个 body 执行 p 次,最后就能更新所有 body 的加速度,这里的内存占用是 O(p) 的不是 O(p^2),浅绿色表示的是串行执行的流,官方的文档没有更新,对应 CUDA 10 的例子里是这样的,vec3 存的是 x/y/z 轴的加速度,vec4 存的是坐标加质量,就是计算 i 受到 j 的加速度,这里没有并行设计是串行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template <typename T>
__device__ typename vec3<T>::Type
bodyBodyInteraction(typename vec3<T>::Type ai,
typename vec4<T>::Type bi,
typename vec4<T>::Type bj)
{
typename vec3<T>::Type r;

// r_ij [3 FLOPS]
r.x = bj.x - bi.x;
r.y = bj.y - bi.y;
r.z = bj.z - bi.z;

// distSqr = dot(r_ij, r_ij) + EPS^2 [6 FLOPS]
T distSqr = r.x * r.x + r.y * r.y + r.z * r.z;
distSqr += getSofteningSquared<T>();

// invDistCube =1/distSqr^(3/2) [4 FLOPS (2 mul, 1 sqrt, 1 inv)]
T invDist = rsqrt_T(distSqr);
T invDistCube = invDist * invDist * invDist;

// s = m_j * invDistCube [1 FLOP]
T s = bj.w * invDistCube;

// a_i = a_i + s * r_ij [6 FLOPS]
ai.x += r.x * s;
ai.y += r.y * s;
ai.z += r.z * s;

return ai;
}

block 的 tile computation 就是串行计算 p 次 p 个 body 的加速度。

1
2
3
4
for (unsigned int counter = 0; counter < blockDim.x; counter++)
{
acc = bodyBodyInteraction<T>(acc, bodyPos, sharedPos[counter]);
}

每次计算完了以后要对线程做共享内存的同步

黑实线就是一次 block 共享内存同步的边界,也就是一次 tile compuation,图中是 p 个 body 按时间串行执行的流程,超出 p 的时间计算的不是 p 中的 body,看下面这张图。

也就是按照时间串行的计算每个 body 对一个 body 的加速度,然后 N/p 个 block 并行,每个 block 有 p 个线程并行,没 p 次计算每个 block 要同步一次,但是多个 block 之间不需要同步,所以每个并行线程的主体是这样的要在 tile computation 之间包一层同步的调用 cg::sync(ca),同步一次 block 中线程的共享内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename T>
__device__ typename vec3<T>::Type
computeBodyAccel(typename vec4<T>::Type bodyPos,
typename vec4<T>::Type *positions,
int numTiles, cg::thread_block cta)
{
typename vec4<T>::Type *sharedPos = SharedMemory<typename vec4<T>::Type>();

typename vec3<T>::Type acc = {0.0f, 0.0f, 0.0f};

for (int tile = 0; tile < numTiles; tile++)
{
sharedPos[threadIdx.x] = positions[tile * blockDim.x + threadIdx.x];

cg::sync(cta);
// This is the "tile_calculation" from the GPUG3 article.
#pragma unroll 128

for (unsigned int counter = 0; counter < blockDim.x; counter++)
{
acc = bodyBodyInteraction<T>(acc, bodyPos, sharedPos[counter]);
}

cg::sync(cta);
}

return acc;
}

从 GPU 的角度来看,时间复杂度是 O(N),空间复杂度也是 O(N),但前提是要开 N 个线程同时计算。

参考资料

n body nvidia

O(N) 算法计算天体运动

n body problem

模型的保存分三种类型

  1. 知道模型结构,单纯保存变量
  2. 不知道模型结构,保存模型和变量
  3. 不需要再改变量,只要常量化的模型(“冻结”)

第一种用于训练的存档,并且临时恢复,这个时候用户是把训练需要的网络结构在代码里面构造好了的,只是在一定的时间下需要暂时保存网络中的变量,为了在崩溃之后继续训练。所以自然而然会有一个问题,如果我用 Python 写的代码,需要在 C++ 当中恢复,我需要知道你的模型结构,才能恢复,这个最蠢的办法是用 C++ 把你的网络结构再构造一遍,但我们按照统一的协议(比如 Protobuf)确定网络结构,就可以直接从标准序列化的数据中解析网络结构,这就是第二种情况,独立于语言,模型和变量一起保存的情况。然后如果碰到我们不需要再训练了,比如只是把这个模型进行部署,不需要改变相关的变量,那么其实只要一个带常量的模型就可以,这就是第三种情况,把变量冻结的正向传播模型。接下来会依次解释这几种情况的工作方式。

除了这些以外,针对用于服务的模型还可以做很多的优化。

存档

存档只是单纯的保存变量,并且能够恢复,可以在一定的迭代次数以后保存变量,并且从任意一个存档开始重新训练。以两个变量加减 1 为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import tensorflow as tf

# Create some variables.
v1 = tf.get_variable("v1", shape=[3], initializer = tf.zeros_initializer)
v2 = tf.get_variable("v2", shape=[5], initializer = tf.zeros_initializer)

inc_v1 = v1.assign(v1+1)
dec_v2 = v2.assign(v2-1)

# Add an op to initialize the variables.
init_op = tf.global_variables_initializer()

# Add ops to save and restore all the variables.
saver = tf.train.Saver()

# Later, launch the model, initialize the variables, do some work, and save the
# variables to disk.
with tf.Session() as sess:
sess.run(init_op)
# Do some work with the model.
inc_v1.op.run()
dec_v2.op.run()
# Save the variables to disk.
save_path = saver.save(sess, "/tmp/tf-test/model.ckpt")
print("Model saved in path: %s" % save_path)

可以在 /tmp/tf-test 下面看到这几个文件 checkpoint model.ckpt.data-00000-of-00001 model.ckpt.index model.ckpt.meta

可以通过脚本观察保存的变量 python $tensorflow-src/tensorflow/python/tools/inspect_checkpoint.py --file_name=/tmp/tf-test/model.ckpt --all_tensors

得到保存的变量的内容,注意 model.ckpt 这个只是文件前缀。

1
2
3
4
tensor_name:  v1
[1. 1. 1.]
tensor_name: v2
[-1. -1. -1. -1. -1.]

如果要恢复的话,可以通过下面的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import tensorflow as tf

# Create some variables.
v1 = tf.get_variable("v1", shape=[3])
v2 = tf.get_variable("v2", shape=[5])

# Add ops to save and restore all the variables.
saver = tf.train.Saver()

# Later, launch the model, use the saver to restore variables from disk, and
# do some work with the model.
with tf.Session() as sess:
# Restore variables from disk.
saver.restore(sess, "/tmp/tf-test/model.ckpt")
print("Model restored.")
# Check the values of the variables
print("v1 : %s" % v1.eval())
print("v2 : %s" % v2.eval())

得到一样的效果

1
2
v1 : [1. 1. 1.]
v2 : [-1. -1. -1. -1. -1.]

具体来说 .meta 对应的是 MetaGraph 和 SaverGraph,.index 对应的是变量值的位置,key 是变量名,value 是变量保存的入口定义,data 变量的值具体保存的文件。这是恢复代码中已经原样构造出了 Graph,如果没有构造的化,需要通过 tf.train.import_meta_graph('/tmp/model.ckpt.meta') 来加载,但是存档保存的信息比较单一,Tensorflow 提供了一个更丰富的 API 来使用。

保存

SavedModelBuilder 保存的 API 比较丰富,能够保存多个 MetaGraph 和 Variables 的组合,除此之外还能附带 assets,并且要指定模型签名,simple_saved 的方法是一个简单版本的调用,适用于 Predict API。这里要展开一下 GraphDef, MetaGraphDef, SignatureDef, tags 这些东西的概念。对于 MetaGraph,这篇文章解释得很清楚。SignatureDef 是对应了一种图的输入和输出,可以依据这个进行 serving API 的调用,类似于函数签名,相对于一个接口的定义。

tensorflow_serving 自己给了个例子,执行 python mnist_saved_model.py /tmp/tf-test-2 以后可以获得一个目录,下面有版本 1 的模型数据,执行 saved_model_cli show --dir /tmp/tf-test-2/1 可以查看对应的签名。可以看到对应的层级关系,默认用于服务的模型会打上 serve 的标签,函数签名有两个,分别对应了 predict 和 classify 的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['predict_images']:
The given SavedModel SignatureDef contains the following input(s):
inputs['images'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 784)
name: x:0
The given SavedModel SignatureDef contains the following output(s):
outputs['scores'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: y:0
Method name is: tensorflow/serving/predict

signature_def['serving_default']:
The given SavedModel SignatureDef contains the following input(s):
inputs['inputs'] tensor_info:
dtype: DT_STRING
shape: unknown_rank
name: tf_example:0
The given SavedModel SignatureDef contains the following output(s):
outputs['classes'] tensor_info:
dtype: DT_STRING
shape: (-1, 10)
name: index_to_string_Lookup:0
outputs['scores'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: TopKV2:0
Method name is: tensorflow/serving/classify

可以参考 tensorflow 的 REST API,比如 GET http://host:port/v1/models/${MODEL_NAME}[/versions/${MODEL_VERSION}] 其实对应这个例子就是 GET http://host:port/v1/models/tf-test-2/versions/1,然后感觉函数签名不同的 method name,可以调用不同的 request,比如 POST http://host:port/v1/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]:predict 这个格式,如果输入和输出对应的是 imagesscores 那么就对应了第一个签名。

冻结

冻结的情况就是变量不再需要修改,直接把变量转化成常量保存成单一的模型,方便在部署的场景下使用。
冻结模型的代码在这里,他的主要流程如下

  1. 清除所有 Op 中的 device,让原来在指定 CPU/GPU/节点 上的 Op 不再绑定。
  2. 通过 graph_util.convert_variables_to_constants 将所有的 Variable eval 一次,把变量的 Op 的结果拿到,替换成 constant

优化

除了冻结模型以外,还可以删减一些多余的节点,比如 Summary 节点或者 Identity 节点,甚者把 16bit 的浮点数权重修改为 8bit 的浮点数权重(这个在 Tensorflow Lite 里很有用)。这篇文章 列出了详细的优化方式,主要是靠 transform_graph 这个工具,地址在,他有很详细的柴剪列表,并且可以自己编写裁剪函数,充分做到模型在部署环节的“纯净化”,调用方式也很简单。

1
2
3
4
5
6
7
8
9
10
transform_graph \
--in_graph=tensorflow_inception_graph.pb \
--out_graph=optimized_inception_graph.pb \
--inputs='Mul:0' \
--outputs='softmax:0' \
--transforms='
strip_unused_nodes(type=float, shape="1,299,299,3")
remove_nodes(op=Identity, op=CheckNumerics)
fold_old_batch_norms
'

transforms里面加入你想进行优化的 transformer 和对应的参数即可,在科赛上也有在线可以跑的notebook

参考

  1. Cloud ML Engine

我照着 raft 论文重新过了一遍 etcd/raft 的代码,主要的文件是 etcd 下面的 raft.go。对照这个代码重新梳理一遍也算是深入理解一下 raft 算法。接下来会包含两个视频一个是选举相关的内容,一个是日志复制的内容,我 walktrough 的时候默认是大致读过论文的,对一些机制和字段都有了解,没有具体解析每个字段的来历,并且把一些问题按照代码的顺序重新理了一遍。

第一部分是关于选举的,raft 本身是一个 quorum based 的算法,一致性要靠“大多数人”的同意,并且为了简单和不必要的竞争,raft 只有一个主节点,在收到大多数人的投票以后成为主节点。

第二部分是关于日志复制的,raft 需要让主节点把客户端的请求复制到大多数节点上才能算达成一致,并且 commit,下面介绍的是复制的机制,并且 ConfChange 和 Snapshot 也是走这个流程达成一致的。

之后的 walkthrough 应该会慢慢补上,关于一些其他的逻辑和具体的一些函数的细节部分会在后面放出。

mvcc

v2 版本的实现存在 watcher 丢失或者丢失事件的问题,导致客户端要重新获取一遍。新版本是有本地嵌入式的数据库来避免这些问题。
mvcc 是 etcd v3 版本的存储实现主要有两个部分,一个是 backend 的 boltDB 和内存索引 key index。

BoltDB 是一个 B+ 树的嵌入式 KV store,相对于 leveldb 适用于写多读少的情况,我之前的文章简单介绍了一下 leveldb 的设计,BoltDB 比较适合读多写少,或者存在大范围 scan 的情况下比较合适,还有类似 levedb 的产品 badger,是纯 Go 实现的,在多写的情况下 leveldb 的衍生品表现更好。

BatchTx 就是收集很多的 call,然后在一个 Update 里面完成。

backend 里面存的 key 是 revision ,其中 main revision 是事务编号,sub revision 是事务中操作的编号,etcd 在 boltdb 上还做了一层缓存,所以多了一些锁机制。
内存索引中存的是真正的 key 到 revison 的索引。

key_index -> tx_buffer -> boltdb.tx

调用路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
kv:
WriteView 是一些直接操作
ReadView
Read() TxnRead
Write() TxnWrite 是拿到对应的 transaction

revision:
main
sub
kvstore:
readView readView{kv} 用了 Read() Read/Write 是 kvstore 自己实现的用于拿 backend 的和自己的各种锁。(kvstore\_txn.go)
writeView writeView{kv} 用了 Write()
treeIndex:
keyIndex


writeView:
拿到 kv 的 Write(storeTxnWrite): 首先查 tw.s.kvindex.Get 然后再 backend.Tx UnsageSeqPut,然后更新 keyindex,append changes,处理 lease (old detach new attach)
readView
storeTxnRead: keyindex Revisons -> tx.UnsafeRange

lessor lesor(重音在后面)
leaseBucket 在单独的一个 bucket 里面

BPF

BPF (Berkeley Packet Filter) 最早是用在 tcpdump 里面的,比如 tcpdump tcp and dst port 80 这样的过滤规则会单独复制 tcp 协议并且目的端口是 80 的包到用户态。整个实现是基于内核中的一个虚拟机来实现的,通过翻译 BPF 规则到字节码运行到内核中的虚拟机当中。最早的论文是这篇,这篇论文我大概翻了一下,主要讲的是原本的基于栈的过滤太重了,而 BPF 是一套能充分利用 CPU 寄存器,动态注册 filter 的虚拟机实现,相对于基于内存的实现更高效,不过那个时候的内存比较小才几十兆。bpf 会从链路层复制 pakcet 并根据 filter 的规则选择抛弃或者复制,字节码是这样的,具体语法就不介绍了,一般也不会去直接写这些字节码,然后通过内核中实现的一个虚拟机翻译这些字节码,注册过滤规则,这样不修改内核的虚拟机也能实现很多功能。

在 Linux 中对应的 API 是

1
2
3
socket(SOCK_RAW)
bind(iface)
setsockopt(SO_ATTACH_FILTER)

下面是一个低层级的 demo,首先 ethernet header 的十二个字节记录了 ip 的协议,ip 的第9个字节记录 tcp 的协议,如果协议编号不匹配都跳到最后 reject,然后在到 tcp 的第二个字节是 port 看看是不是 80,都满足的话就 accept。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <net/if.h>
#include <net/ethernet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <netpacket/packet.h>
#include <linux/filter.h>

#define OP_LDH (BPF_LD | BPF_H | BPF_ABS)
#define OP_LDB (BPF_LD | BPF_B | BPF_ABS)
#define OP_JEQ (BPF_JMP | BPF_JEQ | BPF_K)
#define OP_RET (BPF_RET | BPF_K)

// Filter TCP segments to port 80
static struct sock_filter bpfcode[8] = {
{ OP_LDH, 0, 0, 12 }, // ldh [12]
{ OP_JEQ, 0, 5, ETH_P_IP }, // jeq #0x800, L2, L8
{ OP_LDB, 0, 0, 23 }, // ldb [23] # 14 bytes of ethernet header + 9 bytes in IP header until the protocol
{ OP_JEQ, 0, 3, IPPROTO_TCP }, // jeq #0x6, L4, L8
{ OP_LDH, 0, 0, 36 }, // ldh [36] # 14 bytes of ethernet header + 20 bytes of IP header (we assume no options) + 2 bytes of offset until the port
{ OP_JEQ, 0, 1, 80 }, // jeq #0x50, L6, L8
{ OP_RET, 0, 0, -1, }, // ret #0xffffffff # (accept)
{ OP_RET, 0, 0, 0 }, // ret #0x0 # (reject)

};

int main(int argc, char **argv)
{
int sock;
int n;
char buf[2000];
struct sockaddr_ll addr;
struct packet_mreq mreq;
struct iphdr *ip;
char saddr_str[INET_ADDRSTRLEN], daddr_str[INET_ADDRSTRLEN];
char *proto_str;
char *name;
struct sock_fprog bpf = { 8, bpfcode };

if (argc != 2) {
printf("Usage: %s ifname\n", argv[0]);
return 1;
}

name = argv[1];

sock = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
if (sock < 0) {
perror("socket");
return 1;
}

memset(&addr, 0, sizeof(addr));
addr.sll_ifindex = if_nametoindex(name);
addr.sll_family = AF_PACKET;
addr.sll_protocol = htons(ETH_P_ALL);

if (bind(sock, (struct sockaddr *) &addr, sizeof(addr))) {
perror("bind");
return 1;
}

if (setsockopt(sock, SOL_SOCKET, SO_ATTACH_FILTER, &bpf, sizeof(bpf))) {
perror("setsockopt ATTACH_FILTER");
return 1;
}

memset(&mreq, 0, sizeof(mreq));
mreq.mr_type = PACKET_MR_PROMISC;
mreq.mr_ifindex = if_nametoindex(name);

if (setsockopt(sock, SOL_PACKET,
PACKET_ADD_MEMBERSHIP, (char *)&mreq, sizeof(mreq))) {
perror("setsockopt MR_PROMISC");
return 1;
}

for (;;) {
n = recv(sock, buf, sizeof(buf), 0);
if (n < 1) {
perror("recv");
return 0;
}

ip = (struct iphdr *)(buf + sizeof(struct ether_header));

inet_ntop(AF_INET, &ip->saddr, saddr_str, sizeof(saddr_str));
inet_ntop(AF_INET, &ip->daddr, daddr_str, sizeof(daddr_str));

switch (ip->protocol) {
#define PTOSTR(_p,_str) \
case _p: proto_str = _str; break

PTOSTR(IPPROTO_ICMP, "icmp");
PTOSTR(IPPROTO_TCP, "tcp");
PTOSTR(IPPROTO_UDP, "udp");
default:
proto_str = "";
break;
}

printf("IPv%d proto=%d(%s) src=%s dst=%s\n",
ip->version, ip->protocol, proto_str, saddr_str, daddr_str);
}

return 0;
}

执行 curl "http://www.baidu.com",结果如下:

1
2
3
4
5
6
7
8
sudo ./filter ens3
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244

这些低级别的操作都封装在了 libpcap 里面,一般不太会自己这么写。

eBPF

eBPF 是 extended BPF 具有更强大的功能。老的 BPF 现在叫 cBPF (classic BPF)。

首先是字节码的指令集更加丰富了,并且现在有了 64 位的寄存器(相较于上古时期的 32 位的CPU),有了 JIT mapping 技术和 LLVM 的后端。JIT 指的的是 Just In Time,实时编译。

一般的 ePBF 的工作流是编写一个 C 的子集(比如没有循环),通过 LLVM 编译到字节码,然成生成 ELF 文件,然后 JIT 编译进内核。

eBPF 一个最重要的功能是可以做到动态跟踪(dynamic tracing),可以不修改程序直接监控一个正在运行的进程。

在 eBPF 之前

在 ebpf 之前,为了实现同样的功能,要在执行的指令中嵌入 hook,并且支持跳到 inspect 函数,然后再恢复执行,这个流程和 debugger 非常相似,这是用 kprobe 来实现,kprobe 是 2007 年引入内核的。比如下面的例子,把 Instruction 3 改成跳转指令,然后再执行 Instruction 3,然后再跳转回去。

使用 kprobe 需要通过编译 kernel module 注册到内核当中,非常麻烦,等于是直接动内核的代码很容易引起内核 panic,而且每个内核版本都不一样,函数符号和位移是有区别的,对每个版本的内核都要编译一个对应的版本的 module。为了解决这个问题引入了一些静态的稳定的 trace point,不会因为版本而改变的地方可以插入 kprobe,但这样就限制了 kprobe 可以探测的范围。

有了 eBPF

有了 eBPF,就可以将用户态的程序插入到内核中,不用编写内核模块了,但是问题并没有改善,内核版本带来的问题还是没有解决。

eBPF 的 kprobe 一种方式时候 mapping,映射 kprobe 的数据到用户态程序,比如发包数,然后用户态程序定期检查这个映射进行统计。

另一种方式是 event (perf_events),如果 kprobe 向用户态程序发送事件来进行统计,这样不同轮询,直接异步计算就可以。

低级别的 API,这个只有 Linux 有

1
2
3
4
5
bpf() 系统调用
BPF_PROG_LOAD 加载 BPF 字节码
BPF_PROG_TYPE_SOCKET_FILTER
BPF_PROG_TYPE_KPROBE
BPF_MAP_* map 映射到 BPF 当中
1
perf_event_open() + ioctl(PERF_EVENT_IOC_SET_BPF)

高级别的接口是 bcc(BPFcompiler collection),转换 c 到 LLVM-epbf 后端,并且前端是 python 的。可以实现动态加载 eBPF 字节码到内核中。

weave scope 就是用 bcc 实现的 HTTP stats 的统计。

这里可以看到程序的主体,这里 hook 了内核函数 skb_copy_datagram_iter,这个函数有一个 tracepoint trace_skb_copy_datagram_iovec。在内核代码里面对应的是下面这段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* skb_copy_datagram_iter - Copy a datagram to an iovec iterator.
* @skb: buffer to copy
* @offset: offset in the buffer to start copying from
* @to: iovec iterator to copy to
* @len: amount of data to copy from buffer to iovec
*/
int skb_copy_datagram_iter(const struct sk_buff *skb, int offset,
struct iov_iter *to, int len)
{
int start = skb_headlen(skb);
int i, copy = start - offset, start_off = offset, n;
struct sk_buff *frag_iter;

trace_skb_copy_datagram_iovec(skb, len);

这里对这个 hook 注册了程序,具体的代码就不展示了,主要是根据协议统计这个 HTTP 的大小,方法等信息。

1
2
3
4
5
6
7
8
9
10

/* skb_copy_datagram_iter() (Kernels >= 3.19) is in charge of copying socket
* buffers from kernel to userspace.
*
* skb_copy_datagram_iter() has an associated tracepoint
* (trace_skb_copy_datagram_iovec), which would be more stable than a kprobe but
* it lacks the offset argument.
*/
int kprobe__skb_copy_datagram_iter(struct pt_regs *ctx, const struct sk_buff *skb, int offset, void *unused_iovec, int len)
{

比如判断 method 是不是 DELETE 的是实现就比较蠢,是因为 eBPF 不支持循环,只能这么实现才能把 c 代码翻译成字节码。

1
2
3
4
5
case 'D':
if ((data[1] != 'E') || (data[2] != 'L') || (data[3] != 'E') || (data[4] != 'T') || (data[5] != 'E') || (data[6] != ' ')) {
return 0;
}
break;

除了 bcc 之外,waeve 使用了 gobpf,一个 bpf 的 go binding,并且通过建立 tcp 连接来猜测内核的数据结构,以达到内核版本无关,这个项目 tcptracer-bpf 还在开发中。

eBPF 的其他应用

还有一个比较大头的基于 eBPF 的是 cilium,一套比较完整的网络解决方案,用 eBPF 实现了 NAT,L3/L4 负载均衡,连接记录等等功能。比如访问控制,一般的 iptables 都是 drop 或者 rst,要过整个协议栈,但是 eBPF 可以在 connect 的时候就拦截然后返回 EACCESS,这样就不用过协议栈了。cilium 一个优化就是通过 XDP ,利用类似 DPDK 的加速方案,hook 到驱动层中,让 eBPF 可以直接使用 DMA 的缓冲,优化负载均衡。

BPF/XDP allows for a 10x improvement in load balancing over IPVS for L3/L4 traffic.

现在 k8s 最新的 lb 方案是基于 ipvs 的,我在 kube-proxy 分析 里面有提到过,已经比原来的 iptables 提高很多了,现在有了 eBPF 加 XDP 的硬件加速可以实现更高的提升,facebook 的 katran L4 负载均衡器的实现也是类似的。

cilium 在我看来基本上是 k8s 网络的一个大方向吧,只不过包括 eBPF 和 XDP 对硬件和内核版本都要比较新,是一个要持续关注的更新。

性能调优

Velocity 2017: Performance Analysis Superpowers with Linux eBPF里,Brendan Gregg (Netflix 的性能调优专家) 提到,性能调优也是 eBPF 的一个大头。用于网络监控其实只是 hook 在了协议栈的函数上,如果 hook 在别的地方可以有更多的统计维度。比如 bcc 官方的例子就是统计 IO Size 的大小的分布,更多关于基于 eBPF 的性能调优可以参考他的 blog,他给出了更详细的关于 eBPF 的解释,里面有一些列 Linux 性能调优的内容。

1
2
3
4
5
6
7
8
9
10
11
12
# ./bitehist.py
Tracing... Hit Ctrl-C to end.
^C
kbytes : count distribution
0 -> 1 : 3 | |
2 -> 3 : 0 | |
4 -> 7 : 211 |********** |
8 -> 15 : 0 | |
16 -> 31 : 0 | |
32 -> 63 : 0 | |
64 -> 127 : 1 | |
128 -> 255 : 800 |**************************************|

安全

在安全方面有 seccomp,可以实现限制 Linux 的系统调用,而 seccompe-bpf 则是通过 bpf 支持更强大的过滤和匹配功能,k8s pod 里面的 SecurityContext 就有 seccomp 实现的部分。

cgroup

在 cgroup 上有一个小原型,cgnet 获取 cgroup 的网络统计信息到 prometheus,也是基于 eBPF 的。

参考:

  1. Infrastructure 2017 - Alfonso Acosta - High-performance Linux monitoring with eBPF

  2. Using bpf in kubernetes

  3. Cilium: Networking and security for containers with BPF and XDP