ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

ebpf是内核当中一个非常重要的功能,简单来说就是以虚拟机的形式提供一种在内核中执行嵌入字节码的能力。
ebpf定义的指令集也非常简答,但是写起来比较麻烦所以作为工具提供了libbpf和bpf-tool。
用户可以通过写C的形式写ebpf程序嵌入到内核中,同时在用户态进行交互,用户态的程序没有语言限制。
ebpf特性的对应内核版本列表在,后面例子中的ringbuffer map是要内核版本在5.8以上。
ebpf的架构和工具链的一个比较完整的文档是cilium的一个文档

vmlinux.h是通过bpftool生成的一个虚拟头文件,用于访问内核数据结构。

libbpf 和 bpftool 都是在内核的代码仓库里面开发的。对应的目录分别是tools/lib/bpftools/bpf,帮助用户用c开发和调试ebpf程序。
相当于说bpftool是ebpf的调试和观测工具,libbpf是一提供给开发者的ebpf库。

bpf_herplers.h有一个section的定义SEC用来决定在ebpf的.o对象文件中的位置,以及一些功能。
比如SEC(kprobe/xxx)就代表修饰的程序会嵌入到内核函数的调用中。

1
2
3
4
5
6
7
8
9
10
11
12
13
/*
* Helper macro to place programs, maps, license in
* different sections in elf_bpf file. Section names
* are interpreted by libbpf depending on the context (BPF programs, BPF maps,
* extern variables, etc).
* To allow use of SEC() with externs (e.g., for extern .maps declarations),
* make sure __attribute__((unused)) doesn't trigger compilation warning.
*/
#define SEC(name) \
_Pragma("GCC diagnostic push") \
_Pragma("GCC diagnostic ignored \"-Wignored-attributes\"") \
__attribute__((section(name), used)) \
_Pragma("GCC diagnostic pop") \

libbpf.c 有对应的SEC的定义。

1
2
3
4
5
6
#define SEC_DEF(sec_pfx, ptype, ...) {                                      \
.sec = sec_pfx, \
.len = sizeof(sec_pfx) - 1, \
.prog_type = BPF_PROG_TYPE_##ptype, \
__VA_ARGS__ \
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static const struct bpf_sec_def section_defs[] = {
BPF_PROG_SEC("socket", BPF_PROG_TYPE_SOCKET_FILTER),
BPF_EAPROG_SEC("sk_reuseport/migrate", BPF_PROG_TYPE_SK_REUSEPORT,
BPF_SK_REUSEPORT_SELECT_OR_MIGRATE),
BPF_EAPROG_SEC("sk_reuseport", BPF_PROG_TYPE_SK_REUSEPORT,
BPF_SK_REUSEPORT_SELECT),
SEC_DEF("kprobe/", KPROBE,
.attach_fn = attach_kprobe),
BPF_PROG_SEC("uprobe/", BPF_PROG_TYPE_KPROBE),
SEC_DEF("kretprobe/", KPROBE,
.attach_fn = attach_kprobe),
BPF_PROG_SEC("uretprobe/", BPF_PROG_TYPE_KPROBE),
BPF_PROG_SEC("classifier", BPF_PROG_TYPE_SCHED_CLS),
BPF_PROG_SEC("action", BPF_PROG_TYPE_SCHED_ACT),
SEC_DEF("tracepoint/", TRACEPOINT,
.attach_fn = attach_tp),
SEC_DEF("tp/", TRACEPOINT,
.attach_fn = attach_tp),
SEC_DEF("raw_tracepoint/", RAW_TRACEPOINT,
.attach_fn = attach_raw_tp),

kprobe的attach方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct bpf_link *bpf_program__attach_kprobe(struct bpf_program *prog,
bool retprobe,
const char *func_name)
{
char errmsg[STRERR_BUFSIZE];
struct bpf_link *link;
int pfd, err;

pfd = perf_event_open_probe(false /* uprobe */, retprobe, func_name,
0 /* offset */, -1 /* pid */);
if (pfd < 0) {
pr_warn("prog '%s': failed to create %s '%s' perf event: %s\n",
prog->name, retprobe ? "kretprobe" : "kprobe", func_name,
libbpf_strerror_r(pfd, errmsg, sizeof(errmsg)));
return libbpf_err_ptr(pfd);
}
link = bpf_program__attach_perf_event(prog, pfd);
err = libbpf_get_error(link);
if (err) {
close(pfd);
pr_warn("prog '%s': failed to attach to %s '%s': %s\n",
prog->name, retprobe ? "kretprobe" : "kprobe", func_name,
libbpf_strerror_r(err, errmsg, sizeof(errmsg)));
return libbpf_err_ptr(err);
}
return link;
}

根据函数名用perf_event_open_probe注入program

内核对于oom的处理主要在mm/oom_kill.c中。
oom的触发就是在内存无法分配的时候,选择一个最“差”的进程发送kill信号。
oom_kill_process这个函数是主要入口,定义是static void oom_kill_process(struct oom_control *oc, const char *message)
其中oc->victim是要被杀掉的进程,如果有cgroup会把相同内存cgroup的进程都杀掉,所以如果要检测一个进程的oom可以通过检测这个函数的参数做到。

下面这段就是OOM的时候dmesg看到的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

/* Get a reference to safely compare mm after task_unlock(victim) */
mm = victim->mm;
mmgrab(mm);

/* Raise event before sending signal: task reaper must see this */
count_vm_event(OOM_KILL);
memcg_memory_event_mm(mm, MEMCG_OOM_KILL);

/*
* We should send SIGKILL before granting access to memory reserves
* in order to prevent the OOM victim from depleting the memory
* reserves from the user space under its control.
*/
do_send_sig_info(SIGKILL, SEND_SIG_PRIV, victim, PIDTYPE_TGID);
mark_oom_victim(victim);
pr_err("%s: Killed process %d (%s) total-vm:%lukB, anon-rss:%lukB, file-rss:%lukB, shmem-rss:%lukB, UID:%u pgtables:%lukB oom_score_adj:%hd\n",
message, task_pid_nr(victim), victim->comm, K(mm->total_vm),
K(get_mm_counter(mm, MM_ANONPAGES)),
K(get_mm_counter(mm, MM_FILEPAGES)),
K(get_mm_counter(mm, MM_SHMEMPAGES)),
from_kuid(&init_user_ns, task_uid(victim)),
mm_pgtables_bytes(mm) >> 10, victim->signal->oom_score_adj);
task_unlock(victim);

对于内核函数的检测需要ebpf当中的kprobe的能力。
kprobe类似单步调试的能力,在函数入口插入一个breakpoint,然后通过trap让执行流转到注册的ebpf的程序。

oomkill的内核中的ebpf程序主要参考datadog的agent的实现。
框架程序主要参考cilium/ebpf当中的例子

PT_REGS_PARM1是一个宏可以帮助读取内核函数的参数,因为按照约定ebpf都是通过寄存器传参的,所以其实返回的是第1个参数对应的寄存器。bpf_probe_read是一个用于读取内存到ebpf程序中的辅助函数。
所以oomkill的ebpf实现的内核中的代码就比较简单,嵌入oom_kill_process的函数调用,或者要被结束的进程复制出进程的pid和command,然后通过类型为BPF_MAP_TYPE_RINGBUF的map发送event。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// +build ignore

#include "vmlinux.h"
#include "bpf_helpers.h"
#include "bpf_tracing.h"

char __license[] SEC("license") = "Dual MIT/GPL";

struct event {
u32 pid;
u8 comm[80];
};

struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24);
} events SEC(".maps");

// Force emitting struct event into the ELF.
const struct event *unused __attribute__((unused));

SEC("kprobe/oom_kill_process")
int kprobe_oom_kill_process(struct pt_regs *ctx) {
struct event *task_info;
struct oom_control *oc = (struct oom_control *)PT_REGS_PARM1(ctx);

task_info = bpf_ringbuf_reserve(&events, sizeof(struct event), 0);

if (!task_info) {
return 0;
}

struct task_struct *p;
bpf_probe_read(&p, sizeof(p), &oc->chosen);
bpf_probe_read(&task_info->pid, sizeof(task_info->pid), &p->pid);
bpf_probe_read(&task_info->comm, sizeof(task_info->comm), (void *)&p->comm);

bpf_ringbuf_submit(task_info, 0);

return 0;
}

用户态的程序也比较简单,把ebpf的对象文件attach以后读取ringbuffer别解析event。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"golang.org/x/sys/unix"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -type event -target amd64 bpf oom_kill_kernel.c

func main() {
// Name of the kernel function to trace.
fn := "oom_kill_process"

// Subscribe to signals for terminating the program.
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)

// Allow the current process to lock memory for eBPF resources.
if err := rlimit.RemoveMemlock(); err != nil {
log.Fatal(err)
}

// Load pre-compiled programs and maps into the kernel.
objs := bpfObjects{}
if err := loadBpfObjects(&objs, nil); err != nil {
log.Fatalf("loading objects: %v", err)
}
defer objs.Close()

fmt.Println("probe")
// Open a Kprobe at the entry point of the kernel function and attach the
// pre-compiled program. Each time the kernel function enters, the program
// will emit an event containing pid and command of the execved task.
kp, err := link.Kprobe(fn, objs.KprobeOomKillProcess, nil)
if err != nil {
log.Fatalf("opening kprobe: %s", err)
}
defer kp.Close()
fmt.Println("attach done")

// Open a ringbuf reader from userspace RINGBUF map described in the
// eBPF C program.
rd, err := ringbuf.NewReader(objs.Events)
if err != nil {
log.Fatalf("opening ringbuf reader: %s", err)
}
defer rd.Close()

// Close the reader when the process receives a signal, which will exit
// the read loop.
go func() {
<-stopper

if err := rd.Close(); err != nil {
log.Fatalf("closing ringbuf reader: %s", err)
}
}()

log.Println("Waiting for events..")

// bpfEvent is generated by bpf2go.
var event bpfEvent
for {
record, err := rd.Read()
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
log.Println("Received signal, exiting..")
return
}
log.Printf("reading from reader: %s", err)
continue
}

// Parse the ringbuf event entry into a bpfEvent structure.
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
log.Printf("parsing ringbuf event: %s", err)
continue
}

log.Printf("pid: %d\tcomm: %s\n", event.Pid, unix.ByteSliceToString(event.Comm[:]))
}
}

使用一个python脚本进行测试OOM。

1
2
3
a="111111111111111"
while True:
a+=a

最后运行结果

1
2
3
4
probe
attach done
Waiting for events..
pid: 16541 comm: python

ebpf的能力还不止于此,业界做了很多流量控制,性能监控的实践,包括calico用ebpf的实现替代kube-proxy,waeve-scope 用ebpf记录tcp连接,cilium 也用eBPF,结合了tc和XDP。datadog-agent 也是使用了ebpf做一些监控,上面的oomkill的例子也是参考datadog的。ebpf能够让linux内核变得更动态更灵活。甚至有一种说法是eBPF让Linux内核正在变成微内核。

Knative主要有两个重要的部分,一个是自动扩展一个是流量切换,并且目前的设计已经和istio是相对独立的了。

Knative的service和k8s的service容易混淆,所以用sks指代knative的service,然后service本身是指k8s本身的service。

sks的revision创建的时候会有两个service,一个是 public service,一个是 private service,如果用istio的话会看到一个和ingressgateway关联的有externalname的service,这个service是和ingress实现相关的。主要的实现还是public和private两个service,实现和具体的ingress的实现是独立的。

Private service 对应的是真实的pod的endpoints,public service 有两种模式一个serve模式,一个是proxy模式。当public service出于proxy模式时,其endpoints指向的是activator,当处于serve模式时endpoints指向的是private service所指向的后端endpoints。

在scale从0到1的过程中,会先阻塞在activator上,当有pod启动以后,还是会保持proxy模式,直到超过burst才会切换到private service的endpoints上。在从1到0的过程中会再切换回activator,直到有新的请求到来再触发pod的启动。

activator

Throttler

Throttler is the interface that Handler calls to Try to proxy the user request

Health Handler 注册用于 kubelet 做 readiness 和 health probe 的接口,返回statSink的状态,收到 signal term 的时候就开始返回500。

Network probe handler
knative组件用来 probe 的接口,在header里面会有区分。

Context handler
把header中的revision的name和namespace注入的context当中

Metric Handler
收集request的qps等metrics的信息。

Log Handler
请求日志

Tracing Handler
Trace spans

Cocurrency report Handler

记录请求信息,这些信息会被reporter上报

Activator handler

过一层 throttler 进行proxy,如果没受限制就会proxy request。

Throttler 会根据revID创建一个throttler,revision如果存在的话就会创建throttler。(revision肯定是一直在的哪怕没有起pod)如果超过revision的并发数就会退出。

Throttler 会try对应的revisionThrottler的pods然后转发过去。

controller

Controller 主要是对用户使用的几个CRD的同步:Service、Route、Revision、Configuration。

net-istio

Ingress 的一种实现

internal 的 ingress创建一个 ingress virtualservice 并且将gate指定为isito-gateway,其他的ingress实现其实类似,只是目前没有traefik的支持。knative有没有istio现在是没啥区别了。

domain mappings

一个用于扩展域名的CRD DomainClaim,会根据domainclaim创建一个ingress。

queue-proxy

tracing metrics breaker 都是正常操作。本质是个sidecar层面的反向代理。

Metrcis

Admin

有一个给cocurrency endpoint发 paused 和 resumed 的回调。

Main

Proxy handler 收集 request in 和 request out。

自定义默认域名的问题

kubectl get cm config-network -n knative-serving -o yaml 可以看到默认的模板去修改他

1
domain-template: "{{.Name}}.{{.Namespace}}.{{.Domain}}"

WASM 作为一种通用字节码可以让很多语言编译成 WASM 跑在沙箱VM环境当中:跑在浏览器的 V8 引擎上,让别的语言可以写前端、
构建成 Istio 的代理 Envoy 的 WASM 插件形成一些过滤器或者日志插件(有点像内核的 eBPF)、作为各种语言的浇水层(让Rust调用Go编译出的WASM)。

The major problem is that, whilst the Go compiler supports WebAssembly, it does not support WASI (WebAssembly System Interface). It generates an ABI that is deeply tied to JavaScript, and one needs to use the wasm_exec.js file provided by the Go toolchain, which doesn’t work outside a JavaScript host.

Go 的 WASM 目前没有支持 WASI,类似于没有 native 的系统调用,所以很多文件、网络的请求不能通过系统执行,目前只能跑在浏览器通过浏览器的接口执行。
TinyGo 是一个支持比较好的 WASM 的 Go 的运行时,有些需要WASI的项目就需要TinyGo来构建。

调度

WASM 是一个单线程的环境,js 是一个基于事件的模型,对于 goroutine 的调度是如何进行的,本着这个好奇研究了一下 Go 本身的 WASM 运行时。比如下面这段代码就明确了 Go 的 WASM 没有多线程也就没有跑 sysmon。
没有sysmon就相当于sysmon的forcegc触发gc的部分也没有,只能靠主动GC和内存分配的时候触发超过阈值开始gc。

1
2
3
4
5
6
7
8
9
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
// For runtime_syscall_doAllThreadsSyscall, we
// register sysmon is not ready for the world to be
// stopped.
atomic.Store(&sched.sysmonStarting, 1)
systemstack(func() {
newm(sysmon, nil, -1)
})
}

在这里回忆一下 GMP 的关系。P的数量由启动时环境变量 $GOMAXPROCS 或者是由 runtime 的方法 GOMAXPROCS() 决定。这意味着在程序执行的任意时刻都只有 $GOMAXPROCS 个 goroutine 在同时运行。
M的数量可以通过 runtime/debug 中的 SetMaxThreads 函数,设置 M 的最大数量一个 M 阻塞了,会创建新的 M。
M 与 P 的数量没有绝对关系,一个 M 阻塞,P 就会去创建或者切换另一个 M,所以,即使 P 的默认数量是 1,也有可能会创建很多个 M 出来。
在确定了 P 的最大数量 n 后,运行时系统会根据这个数量创建 n 个 P。
没有足够的 M 来关联 P 并运行其中的可运行的 G。比如所有的 M 此时都阻塞住了,而 P 中还有很多就绪任务,就会去寻找空闲的 M,而没有空闲的,就会去创建新的 M。

这个关系在对应的代码里面也有体现,在 osinit 的时候会强制将 P 设置为1,newosproc 也是空的,无法启动新进程。

1
2
3
4
5
func osinit() {
ncpu = 1
getg().m.procid = 2
physPageSize = 64 * 1024
}

没有 sysmon 也就没有异步抢占,只能靠 goroutine 之间的协作式抢占来切换 goroutine。

在 schedule 里面也有一段专用的调度逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// wasm only:
// If a callback returned and no other goroutine is awake,
// then wake event handler goroutine which pauses execution
// until a callback was triggered.
gp, otherReady := beforeIdle(now, pollUntil)
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
if otherReady {
goto top
}

等于是在 wasm 中在找不到 G 的时候会启动一个 pause goroutine 占住当前的 P。

Lock_js 有 handle event 的逻辑。

内存分配

通过 WASM 的内存分配接口代替 linux 里面的 mmap 接管过来做内存分配。

系统调用

系统调用是使用 js 实现的,而且也不是像 linux 那种完整一套系统调用这个可能需要 WASI 的支持。
没有系统调用 M 也不会阻塞,等于是一个完全的单 P 单 M 多 G 的模型。

1
2
3
var jsProcess = js.Global().Get("process")
var jsFS = js.Global().Get("fs")
var constants = jsFS.Get("constants")

目前实现的版本是参照go0.x的一个实现也就是只有一个m,在m上调度g,对应的是我们只用一个goroutine然后在goroutine上调度多个gogoroutine。在实现gogoroutine之前需要介绍一下go的调用惯例,这个文章讲的比较清楚。目前Golang在传递参数和返回值的时候是通过栈传递的。gcc会使用寄存器传递参数,golang有一个使用寄存器传递参数的提案。如果要实现一个goroutine上的协程可以利用这么一个调用惯例。

上下文切换

Go的上下文切换其实和操作系统的上下文切换比较类似,go的实现也是类似的,但相对来说比较简单,因为Go的调用惯例其他的寄存器在函数调用的时候是没有被使用的,主要是保存栈指针和一个指令寄存器PC就可以。Golang的抢占一开始是协作式的,入口是在函数调用的时候。在引入了异步抢占也就是让信号处理函数去切换到调度逻辑(这个切换的过程也类似后面讲到的gogo和gosave,但是他的入口是任何一个指令的地方都会发送所以要保存所有的寄存器)实现抢占以后就可以实现非协作的抢占了。

现在实现的是相对简单的协作式抢占。

0.x的代码是C写的,其中两个汇编函数实现的上下文的保存和切换。这里简单先补充一下Go中汇编的一些知识点。

函数定义:

1
 TEXT 包名·函数名(SB),一些标签,$栈帧大小-参数大小(包括返回值)

SP表示栈指针,AX是对应的ax系列的寄存器。
保存上下文的汇编函数如下,gobuf是一个结构体有两个指针成员分别是sp和pc。

1
2
3
4
5
6
7
TEXT gosave(SB), NOSPLIT, $0
MOVQ 8(SP), AX // 8(SP)是函数的第一个参数:gobuf的地址
MOVQ SP, 0(AX) // 保存SP也就是栈指针到 gobuf.sp 中。
MOVQ 0(SP), BX // 0(SP)是函数的返回地址
MOVQ BX, 8(AX) // 将函数的返回地址保存到 gobuf.pc 中。
MOVL $0, AX // return 0
RET

这段函数其实主要是保存了gosave调用时的栈指针,而返回地址就是gosave返回后的下一条指令的地址,返回值是0,这个0可以标记到这个函数是从gosave返回的,这个要结合后面的gogo来理解。
现在来看gogo

1
2
3
4
5
6
7
TEXT gogo(SB), 7, $0
MOVQ 8(SP), AX // 8(SP)是gobuf这个参数的地址
MOVQ 0(AX), SP // 将栈针修改为之前保存的SP
MOVQ 8(AX), AX // 获取PC
MOVQ AX, 0(SP) // 把 PC 放到0(SP)上
MOVL $1, AX // return 1
RET

gogo返回以后其实是返回到了之前gosave需要返回的地方,并且返回值是1。这里用寄存器做返回值是c里面的一种方式。所以如果一个gosave是返回0那么它是从真正的调用者那里返回的,如果返回的是1就是从gogo返回的,如果这里点理解了以后就可以实现一个上下文切换了。
对应Go的函数调用的版本就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
TEXT ·gogogo(SB), NOSPLIT, $0-16
MOVQ 8(SP), AX // gogobuf
MOVQ 0(AX), SP // restore SP
MOVQ 8(AX), AX
MOVQ AX, 0(SP) // put PC on the stack
MOVL $1, 16(SP) // return true
RET


TEXT ·gosave(SB), NOSPLIT, $0-16
MOVQ 8(SP), AX // gogobuf
MOVQ SP, 0(AX) // save SP
MOVQ 0(SP), BX
MOVQ BX, 8(AX) // save PC
MOVB $0, 16(SP) // return false
RET

这面的区别是参数gobuf和返回值用了16个字节,因为Go的调用是用栈的,之前说到过。
0(SP)就是返回地址,8(SP)是gobuf,16(SP)是true或者false的返回地址。

gogoroutine的创建

原本函数做的两件事情就是分配栈和指定PC,pc只要对于一个新的gogoroutine指向函数指针就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void
sys·newproc(int32 siz, byte* fn, byte* arg0)
{
byte *stk, *sp;
G *newg;

//prints("newproc siz=");
//sys·printint(siz);
//prints(" fn=");
//sys·printpointer(fn);

siz = (siz+7) & ~7;
if(siz > 1024)
throw("sys·newproc: too many args");

lock(&sched);

if((newg = gfget()) != nil){
newg->status = Gwaiting;
stk = newg->stack0;
}else{
newg = mal(sizeof(G));
stk = mal(4096);
newg->stack0 = stk;
newg->status = Gwaiting;
newg->alllink = allg;
allg = newg;
}

到这就是分配了一段给栈用的内存,内存的具体分配后面会有一个文章讲解。

1
2
3
		// 160 这个地方是一个约定
// 一些小函数会利用栈之外的160个字节进行优化
newg->stackguard = stk+160;

上面多留出来的地方是一个x86的约定需要给出一个redzone给一些小函数优化用的,不用分配栈直接去使用栈外的这个redzone来增加效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   // 栈是从高地址向低地址增长的,至于这个 4*8是留给什么的没搞清
sp = stk + 4096 - 4*8;
newg->stackbase = sp;
// 拷贝参数
sp -= siz;
mcpy(sp, (byte*)&arg0, siz);
// 函数结束时返回到 goexit 进行收尾工作
sp -= 8;
*(byte**)sp = (byte*)sys·goexit;
// 留给 gogo 的修改返回地址用的地方
// 相当于假装在gogo的地方返回到了fn的函数指针
sp -= 8; // retpc used by gogo
newg->sched.SP = sp;
newg->sched.PC = fn;

sched.gcount++;
goidgen++;
newg->goid = goidgen;

readylocked(newg);
unlock(&sched);

//prints(" goid=");
//sys·printint(newg->goid);
//prints("\n");
}

对应的go代码,Go当中获取函数地址的方式比较tricky,需要了解interface的layout,他是一个interface的头和一个interface包含的对象的地址。FuncPC就是通过转换获取这个函数地址,对应的实现可以在Go的源码找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func NewProc(f interface{}, args ...interface{}) {
pc := FuncPC(f)
stack := Malloc(1024)
sp := stack + 1024 - 4*8
*(*uintptr)(unsafe.Pointer(sp - 8)) = FuncPC(goexit) + 1
gogoRoutine := GoGoRoutine{}
gogoRoutine.Sched.PC = pc
gogoRoutine.Sched.SP = sp - 8 - 8
gogoRoutine.Stack = stack
globalgoid++
gogoRoutine.goid = globalgoid
gogoRoutine.status = _Grunnable
ggput(&gogoRoutine)
}

调度主体

0.x版本的逻辑其实很简单,通过if(gosave)的方式可以判断是从哪里跳过来的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Scheduler loop: find g to run, run it, repeat.
static void
scheduler(void)
{
G* gp;

// Initialization.
m->procid = getprocid();
lock(&sched);

if(gosave(&m->sched)){
// 这里的 gosave 返回的是 true
// 说明是通过 gogo 过来的
// 如果当前的 g 是 running 的话就保存上下文
// 切换成 runnable 放入到 queue 中
// 走出 if 去到调度逻辑。
// Jumped here via gosave/gogo, so didn'
// execute lock(&sched) above.
lock(&sched);

// Just finished running m->curg.
gp = m->curg;
gp->m = nil; // for debugger
switch(gp->status){
case Grunnable:
case Gdead:
// Shouldn't have been running!
throw("bad gp->status in sched");
case Grunning:
gp->status = Grunnable;
gput(gp);
break;
case Gmoribund:
gp->status = Gdead;
if(--sched.gcount == 0)
sys·exit(0);
break;
}
notewakeup(&gp->stopped);
}
// 真正的 gosave 是返回 false的。
// 这个地方是 gosave 的返回地址
// 也是 gogo 后 if 处理完的地方
// 在这里寻找合适的g然后运行。
// Find (or wait for) g to run. Unlocks sched.
gp = nextgandunlock();

noteclear(&gp->stopped);
gp->status = Grunning;
m->curg = gp;
gp->m = m; // for debugger
g = gp;
gogo(&gp->sched);
}

对应的go代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
   //go:noline
func schedule() {
if gosave(&sched.gg0.Sched) {
curgg := sched.curgg
switch curgg.status {
case _Grunnable:
panic("invalid status")
case _Grunning:
curgg.status = _Grunnable
ggput(curgg)
break
case _Gdead:
break
}
}
// 调度循环
for {
// println("find g")
gg := ggget()
if gg == nil {
time.Sleep(time.Second)
continue
}
gg.status = _Grunning
sched.curgg = gg
gogogo(&gg.Sched)
}

}

这样就能实现一个简单的单goroutine上跑多个gogoroutine的上下文切换了。

大规模的并发并行计算考虑的问题主要关于数据、计算的吞吐量还有容错。不管是设计者还是使用者,关心的是在期望上(因为可能会失败)运行任务的效率,从设计者的角度来讲要提供一个可靠高效的环境,从使用者的角度来讲这个东西需要足够的简单,简单意味着编程效率也保证了可靠性。大数据的整套技术栈在解决一些吞吐量大的批处理问题时得心应手,但是到了深度学习的场景,计算吞吐的要求反而提高了,比较现实点的情况是很多时候我们考虑的是能处理多大的数据量的日志,和多快能把一个ImageNet ResNet50训练完。这个区别在于 Spark 或者 Hadoop 的 MapReduce 的设计和 Tensorflow 以及 PyTorch 的设计理念不同以及对应的计算场景的不同。到了强化学习的场景又是一个大融合,强化学习的场景不光有迭代式的反向误差传播的计算,同时也包含了大规模的仿真环境的计算,Ray主要是在这方面提供了一个一站式的解决框架。分布式系统的问题某种程度上和操作系统的问题其实很类似,都是要考量如何从整个系统的角度充分利用物理资源,从错误中恢复,满足效率。这篇文章主要是比较一下几种主流的计算引擎或者框架或者说是算法的异同。

Spark(MapReduce)

我把Spark和MapReduce都归于大数据栈这一类,RDD(Resillient Distributed Dataset) 和 MapReduce
这两者的区别没有和MPI/PS的区别大那么多。

MapReduce 有 JobTracker,Spark 有 Driver

MapReduce 有 TaskTracker,Spark 有 Executor

MapReduce 中间结果是基于 HDFS,会落盘,Spark 中间结果是基于内存的,也可以落盘,主要是利用内存做缓存

MapReduce 计算抽象由Map和Reduce构成,Spark 的 RDD 有一系列的Transform和Action,封装程度更高

MapReduce 的错误处理比较简单,把失败的Map重试就好了,重试是一种非常好理解的错误处理。
Spark 的重试是根据 RDD 的有向无环图中的血缘关系计算的,可以理解为从失败的拓扑序上重新计算,也可以有中间的checkpoint。

RDD 的特性是只读的,在机器学习场景下参数不大的时候 MLLib 通过把参数存到 Driver 上来计算,当参数比较大特别是深度网络的参数大得吓人 Driver 存不下的时候,
只能通过新增 RDD,对于要频繁更新的模型参数要生成非常多的 RDD,这是 Spark 在深度学习上设计的缺陷。
一般来说一些简单的机器学习任务通过 sklearn 就能完成,当数据量比较大的时候就需要通过 Spark 的MLLib来处理。
当然 MLLib 现在也在开始从RDD-based转向SparkSQL用的Dataframe-based,从大的角度上讲两者互相融合是可行的,可能需要一些时间。
上 Spark 用 Yarn 调度 Tensorflow,还是用 Kuberenetes 调度 Spark 和 Tensorflow,我个人支持后者,而且这种分层是我比较喜欢的一种分层。

Ray

Ray 的基本抽象就是 Remote 或者 Actor,一个是分布式调用函数,一个式分布式调用类。Ray 和 Ray 的 RLlib 主要面对的问题是强化学习有大量的 simulating 的环境,比如仿真一局Dota,涉及到模拟一局Dota,反馈Agent的神经网络,是并行计算和神经网络训练的结合。当然 Ray 本身的抽象就是个分布式 goroutine,所以某种程度上可以完成的事情不光是强化学习一种任务,比如HypterTunning等一些并行计算的模型也是试用的。

反过来想,如果没有 Ray 的话,如何做这个系统呢,要构建大批量的仿真计算环境,然后能根据仿真的反馈训练神经网络。
这两个任务的调度控制就是一个问题,当然放到 k8s 的调度器里做似乎也可以,然后涉及这些分布式任务的同步问题,
需要构建这些任务的关系和信息传输,似乎用一些 DAG (比如 argo)的 workflow 也能解决,但他们之间通信的高效性似乎会是一个问题,需要
选择一种高效的远程调用传输方式,肯能gRPC也可以,还有他们的元数据管理用什么呢,自己搞个Redis似乎也行。
Ray 从这些方面综合考虑了这些问题提供了一个一站式的RL训练平台。

PS和MPI

MPI和PS的介绍有很多,我也不需要费篇幅唠叨。

PS和MPI是比较常用的分布式深度学习的训练方式,两者的主要区别在于Paramater Server,在PS的场景下参数统一走一个或者shard多个PS更新。

在MPI的场景下每个Worker是对等的(或者分层级对等,比如主机上的四张卡走NVLink,主机之间走万兆网卡)工作节点,使用AllReduce参数的同步在Worker之间进行。

总结

MPI在我出生前应该就有了,但是MapReduce之所以能火起来主要还是在 fault-tolerance 上,MPI的抽象比较基础,但是MapReduce和Spark在廉价大集群上的表现非常亮眼,对于没那么并行化理想的场景能够tolerate,时间到了深度学习的场景这玩意儿又冒出来了,一方面是深度学习的计算资源好得不得了,因为配备GPU的机器和传统的机器比起来好很多,对于这种纯粹的并行计算框架来说非常友好,错误处理的问题就没那么严重,即使是这样也慢慢开始有人着手优化MPI/PS的fault tolerance,在一些并行化退化的场景下能够把训练并行度降级不至于完全失败的工作。

从长远来看 Ray 还会有很多进化的空间,Spark 也会更好地适配深度学习场景,深度学习本身在System上的优化也层出不穷,大家对于大规模的并发并行计算系统的方方面面的要求都会越来越高。

贝叶斯优化(BO)

贝叶斯优化是一种黑盒优化方法,一般有几个特征:

输入纬度不大,一般小于 20 个,时间复杂度是$O(n^3)$
有设定的取值空间
目标函数需要是连续的(我觉得这个好像不是必须的,离散的也可以)
目标函数的计算非常消耗成本(时间等等)
目标函数是黑盒的,没有明确的结构
目标函数(derivative-free)没有一阶二阶导数(不然就可以用梯度下降去算了)

因为这些特性,在做机器学习的超参数调优的时候特别合适。

BayesOpt consists of two main components: a Bayesian statistical model for modeling the objective function, and an acquisition function for deciding where to sample next.

贝叶斯优化主要是要两个部分,一个是统计模型比如高斯过程(GP),一个是采样函数(AC)决定下一个样本从拿里获取。

高斯过程(GP)

高斯过程是贝叶斯优化中的一种统计模型,先抛开具体的数学问题,简单讲一下就是,假设我在一个变量空间(比如多个超参数)采样了一个目标函数(我们训练结果的 evaluation),然后我们会得到一个后验分布,这就类似条件概率里面,我们知道了某个事情发生以后,如果随机变量是相关的,我们有更大的概率确定其他变量的分布。例如下图,绿色是我们的分布空间,每次采样以后,分布的空间就会缩小,进而接近我们的曲线。

从数学上描述这件事情就需要高斯分布和高斯过程了。高斯分布我相信大家都耳熟能详,他由平均值 $\mu$ 还有标准差 $\sigma$ 决定分布。高斯过程的本质是一个多元高斯分布,只不过他是无限空间上的,定义高斯过程需要一个核函数定义他的协方差矩阵,也就是一个矩阵定义多个随机变量的相关性。当然了,有限的离散点用协方差矩阵可以,如果取值是连续的就需要核函数描述这个“无限”的协方差矩阵了。

协方差的核函数就很多选项,A Visual Exploration of Gaussian Processes 提供了一个非常 intuitive 的可视化来解释 GP 和各种描述协方差矩阵的核函数。

高斯过程被定义为一个随机过程,相当于每个样本点自己也是个随机函数,对于高斯分布来说,每个样本点也是一个高斯分布函数就是高斯过程,cs4780 的 Lecture 15有非常详细的解释。

Definition: A GP is a (potentially infinte) collection of random variables (RV) such that the joint distribution of every finite subset of RVs is multivariate Gaussian:
$ f \sim GP(\mu, k), $
where $\mu(\mathbf{x})$ and $k(\mathbf{x}, \mathbf{x}’)$ are the mean resp. covariance function! Now, in order to model the predictive distribution $P(f_* \mid \mathbf{x}_*, D)$ we can use a Bayesian approach by using a GP prior: $P(f\mid \mathbf{x}) \sim \mathcal{N}(\mu, \Sigma)$ and condition it on the training data $D$ to model the joint distribution of $f=f(X)$ (vector of training observations) and $f_* = f(\mathbf{x}_*)$ (prediction at test input).

采样函数

采样函数一般有 expected improvement(EI),当然还有 probability improvement(PI), upper confidence bound(UCB), knowledge gradient(KG),entropy search and predictive entropy search 等等。
采样的策略有两种:
Explore:探索新的点,这种采样有助于估计更准确的;
Exploit:利用已有结果附近的点进行采样,从而希望找到更大的;

这两个标准是互相矛盾的,如何在这两者之间寻找一个平衡点可以说是采样函数面对的主要挑战。

Expected improvement is a popular acquisition function owing to its good practical performance and an analytic form that is easy to compute. As the name suggests it rewards evaluation of the objective $f$ based on the expected improvement relative to the current best. If $f^* = \max_i y_i$ is the current best observed outcome and our goal is to maximize $f$, then EI is defined as

$\text{EI}(x) = \mathbb{E}\bigl[\max((f(x) - f^*), 0)\bigr]$

在 Facebook 的 Ax 在这里提到使用了 PyTorch 的一个 BoTorch 用的就是 EI,就是期望的增加度。

高斯分布式的期望就是 $\mu$ 所以是很好算的,$f^*$ 是已知的定值,这里有一个计算的推导。

BoTorch 提供了一个优化的 EI 叫 Noisy EI,主要功能是抗噪。

The above definition of the EI function assumes that the objective function is observed free of noise. In many types of experiments, such as those found in A/B testing and reinforcement learning, the observations are typically noisy. For these cases, BoTorch implements an efficient variant of EI, called Noisy EI, which allow for optimization of highly noisy outcomes, along with any number of constraints (i.e., ensuring that auxiliary outcomes do not increase or decrease too much). For more on Noisy EI, see our blog post.

这是采样的过程的一个示例:

应用

高斯过程本身可以用来回归和分类,使用高斯过程的贝叶斯优化有很多具体的应用场景,除了超参数优化之外,对于网络结果(层数,数据并行度)等等也是可以使用的。
除此之外像 horovod 也使用了贝叶斯优化,在这个目录下面

Horovod comes with several adjustable “knobs” that can affect runtime performance, including –fusion-threshold-mb and –cycle-time-ms (tensor fusion), –cache-capacity (response cache), and hierarchical collective algorithms –hierarchical-allreduce and –hierarchical-allgather.

他主要是服务于一些 Tensor Fusion 和 response cache 等参数以及层级 collective 通信选择,文档提到了通过设置一些参数细致控制调优的过程。

参考

  1. 一个例子搞清楚(先验分布/后验分布/似然估计)
  2. A Tutorial on Bayesian Optimization
  3. Practical Bayesian Optimization of Machine Learning Algorithms
  4. Awesome-AutoML-Papers
  5. Gaussian Processes for Machine Learning

背景

Horovod 是一个兼容主流计算框架的分布式机器学习训练框架,主要基于的算法是 AllReduce,这个是 baidu-research 在17年做的一个实现,这个东西原来是高性能计算范畴里的东西应用了 MPI 并行计算接口来实现,这是并行计算里的一个框架,已经很老了,这里有一个介绍 MPI 的 tutorial 写的比较好。

在介绍 horovod 的之前需要解释一下 AllReduce。在 MapReduce 里面 reduce 被翻译成了规约,在上面提到的 MPI tutorial 里面的解释是

Reduce is a classic concept from functional programming. Data reduction involves reducing a set of numbers into a smaller set of numbers via a function. For example, let’s say we have a list of numbers [1, 2, 3, 4, 5]. Reducing this list of numbers with the sum function would produce sum([1, 2, 3, 4, 5]) = 15. Similarly, the multiplication reduction would yield multiply([1, 2, 3, 4, 5]) = 120.

就是说把一个大的集合“缩减”成了小的集合,这里要注意的是这种缩减的计算是要满足交换律的,也就是减法或者除法是不行的,因为在并行计算当中不太好去控制计算的顺序。Reduce 就是这个意思,具体到 MPI_Reduce 就是把不同节点的数字“缩减”到一个节点上,支持的计算方式有加法乘法和取大小值等。

教程中给出的 Reduce 是求和。

AllReduce 就是在每个节点都获得 Reduce 的结果

基于这个标准就有很多的 All-Reduce 的实现,比如 Ring-Reduce,这个实现分两部分,一部分是 Scatter-Reduce 另一部分是 All-Gather。最早是在这篇 post里提到的。这个算法的好处是可以摆脱之前 PS 非常依赖 Parameter-Server 的带宽,Parameter-Server 的带宽会成为计算瓶颈的问题,而 AllReduce 可以让每个节点在带宽传输中的位置是对等的,并且减少传输次数。具体的算法可以看文章的解释,scatter-reduce 就是让每个节点有 K/N 的一个 reduce(也就是 sum),然后把自己的一个 K/N 的 reduce 再传递给其他节点,每个节点只和自己相邻的节点通信。

In the system we described, each of the N GPUs will send and receive values N-1 times for the scatter-reduce, and N-1 times for the allgather. Each time, the GPUs will send K / N values, where K is the total number of values in array being summed across the different GPUs. Therefore, the total amount of data transferred to and from every GPU is

Data Transferred=2(N−1)KN

数据传输量在 N 比较大的时候越没有影响,这就消弭了多节点给 Parameter-Server 造成的瓶颈。

还有一些其他术语,假设有 4 台 4 卡的 GPU 服务器。size 是工作进程(GPU)的数量(6),rank 是所有工作进程的 id(0-15),local rank 是当前服务器上的 id(0-3)。

Horovod 的介绍

使用 horovod 有一定的侵入性,代码需要一定的修改才能变成适配分布式训练,但是有一个好处就是适配的成本不高,并且 horovod 提供的各种框架的支持可以让 horovod 比较好的在各个框架的基础上使用,他支持 tensorflow/keras/mxnet/pytorch,MPI 的实现也有很多,比如 OpenMPI 还有 Nvidia 的 NCCL,还有 facebook 的 gloo,他们都实现了一种并行计算的通信和计算方式。而且 horovod 的本身的实现也很简单。

使用

Keras 用 ResNet50 训练 ImageNet 为例,主要侵入了几部分 hvd.init() 这个是 MPI 的初始化,让并行进程能够知道自己的 rank/local_rank 等信息。

第二部根据 local_rank(相当于单节点上的第n张卡),并且设置不占用全部显存,按需分配(可能因内没有统一管理导致显存碎片),然后传递给 keras 设置 session。

1
2
3
4
5
# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))

然后在 rank 0 上恢复一个 checkpoint 并且广播给其他节点,这里的 broadcast 后面会介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
resume_from_epoch = try_epoch
break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(resume_from_epoch, 0, name='resume_from_epoch')

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

设定传输的压缩函数,具体的压缩后面会提到,然后要么从之前的模型恢复要么重新训练。关键的 wrapper 在 opt 上,会给本地的 opt 包装一个 DistributedOptimizer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast both model and optimizer weights
# to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
model = hvd.load_model(args.checkpoint_format.format(epoch=resume_from_epoch),
compression=compression)
else:
# ResNet-50 model that is included with Keras is optimized for inference.
# Add L2 weight decay & adjust BN settings.
model_config = model.get_config()
for layer, layer_config in zip(model.layers, model_config['layers']):
if hasattr(layer, 'kernel_regularizer'):
regularizer = keras.regularizers.l2(args.wd)
layer_config['config']['kernel_regularizer'] = \
{'class_name': regularizer.__class__.__name__,
'config': regularizer.get_config()}
if type(layer) == keras.layers.BatchNormalization:
layer_config['config']['momentum'] = 0.9
layer_config['config']['epsilon'] = 1e-5

model = keras.models.Model.from_config(model_config)

# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.SGD(lr=args.base_lr * hvd.size(),
momentum=args.momentum)

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt, compression=compression)

model.compile(loss=keras.losses.categorical_crossentropy,
optimizer=opt,
metrics=['accuracy', 'top_k_categorical_accuracy'])

然后设置一些回调函数,hvd.callbacks.BroadcastGlobalVariablesCallback(0) 保证的是 rank 0 上的所有参数只在 rank 0 初始化,然后广播给其他节点,后面是学习率 decay 的设置和一些统计信息的回调打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),

# Horovod: average metrics among workers at the end of every epoch.
#
# Note: This callback must be in the list before the ReduceLROnPlateau,
# TensorBoard, or other metrics-based callbacks.
hvd.callbacks.MetricAverageCallback(),

# Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=args.warmup_epochs, verbose=verbose),

# Horovod: after the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
hvd.callbacks.LearningRateScheduleCallback(start_epoch=args.warmup_epochs, end_epoch=30, multiplier=1.),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=30, end_epoch=60, multiplier=1e-1),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=60, end_epoch=80, multiplier=1e-2),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=80, multiplier=1e-3),
]

最后直接用 allreduce 计算一个 evaluation score。

1
2
# Evaluate the model on the full data set.
score = hvd.allreduce(model.evaluate_generator(input_fn(False, args.train_dir, args.val_batch_size),NUM_IMAGES['validation']))

实现

适配层和压缩算法

horovod 的实现主要分几部分,第一部分是一个适配层,用于兼容各种框架,比如 tensorflow 的适配就是实现一个新的 Op,这个可以参考 add new op,里面规范了 Tensorflow 自定义算子的实现。

请注意,生成的函数将获得一个蛇形名称(以符合 PEP8)。因此,如果您的操作在 C++ 文件中命名为 ZeroOut,则 Python 函数将称为 zero_out。

C++ 的定义是驼峰的,生成出来的 python 函数是下划线小写的,所以最后对应的是,适配Op的代码在 horovod/tensorflow 目录下面

C++ Python
HorovodAllgather horovod_allgather
HorovodAllreduce horovod_allreduce
HorovodBroadcast horovod_broadcast

另外在适配层可以加入一些压缩算法(在 horovod/[framework]/compression.py),我觉得压缩算法和框架无关的,放到适配层下面可能有别的原因,比如 tensorflow 默认带了一个 float16 压缩,具体的其他压缩算法比如3LC,可以通过有损压缩或者无损压缩提高带宽利用率。

统一层

这一层的实现是统一的,所有的适配层最后都是发出一些 Op+Tensor 的 Message 到队列中,后台初始化的时候会有一个专门的线程专门消费这个队列。他有一个同步消息的过程,相当于这个 tensor 在所有节点上都就绪以后就可以开始计算了,主体的流程是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// The coordinator currently follows a master-worker paradigm. Rank zero acts
// as the master (the "coordinator"), whereas all other ranks are simply
// workers. Each rank runs its own background thread which progresses in ticks.
// In each tick, the following actions happen:
//
// a) The workers send a Request to the coordinator, indicating what
// they would like to do (which tensor they would like to gather and
// reduce, as well as their shape and type). They repeat this for every
// tensor that they would like to operate on.
//
// b) The workers send an empty "DONE" message to the coordinator to
// indicate that there are no more tensors they wish to operate on.
//
// c) The coordinator receives the Requests from the workers, as well
// as from its own TensorFlow ops, and stores them in a [request table]. The
// coordinator continues to receive Request messages until it has
// received MPI_SIZE number of empty "DONE" messages.
//
// d) The coordinator finds all tensors that are ready to be reduced,
// gathered, or all operations that result in an error. For each of those,
// it sends a Response to all the workers. When no more Responses
// are available, it sends a "DONE" response to the workers. If the process
// is being shutdown, it instead sends a "SHUTDOWN" response.
//
// e) The workers listen for Response messages, processing each one by
// doing the required reduce or gather, until they receive a "DONE"
// response from the coordinator. At that point, the tick ends.
// If instead of "DONE" they receive "SHUTDOWN", they exit their background
// loop.

简单来讲就是说 coordinator 集 size 个 request DONE,然后找出就绪的 tensor (在 message_table 里面查找)构造出一个 read_to_reduce 的列表,然后发出 size 个 request 告知进程进行计算,然后 worker 接受到 response 开始真正的计算过程(通过 op_manager 具体执行)。

这是整体同步的过程,如果打开 horovod 的 trace log(HOROVOD_LOG_LEVEL=trace) 就能看到同步的过程。horovod 的主要 Op 除了 AllReduce 之外还有 allgather 和 broadcast。

算子实现层

具体的 op 在 common/op 可以看到有 NCCL/Gloo/MPI 等等的,这些由 op_manager 管理,他会根据优先级找到可以用来计算的 op 进行计算,比如 MPI 用的就是 MPI_Allreduce,具体 scatter-gather 和 all-gather openMPI 有现成的实现,NCCL 就直接调用 ncclAllReduce,比较新的 nccl 也支持跨节点的 allreduce 了,不用自己再套一层。

除了 allreduce 之外,还有两个比较重要的算子。

allgather 主要是比 allreduce 少一层 reduce,所有数据被发送到所有进程就可以。allreduce 的第二步就是把每个进程的 scatter-reduce 的 reduce 结果发送到所有进程。

broadcast 的作用是一对多的广播,主要是把初始化的参数同步给其他进程的时候使用。

总结一下 SysML 2019 的一些论文。

TICTAC

问题背景是解决分布式训练的 scale 问题。
如图,网络带宽和传输的顺序是关键因素,网络带宽很好理解,如果 Best 要提高只能加带宽,同时传输顺序如图会影响计算时间。

Communication to Computation Ratio 传输计算比,如果比值大说明效率高

  1. 增大 BatchSize,但是会过拟合(贾扬清的 Train ImageNet in 1 Hour),减小参数精度(FP16,腾讯绝艺 1024 张V100)。

  2. Overlay Coeffiecient 提供传输和计算的重合率,是 TicTac 的优化方向。

计算最优传输依赖,这是个 NP 问题,需要找近似解。recv op 是图的 root,可行的拓扑排序有很多种,解决方案就是找到近似优化排序。(原问题是 NP 问题)

几种符号

op.P 直接 Op 执行计算时间
op.M Op传输时间
op.M+ 触发计算 op 的最小传输时间

Tic

设 Communication Time 对于每个 recv op 来说都相等,从直觉上解释就是计算执行 recv op 的优先度。
如图计算对应的值

1
2
3
4
5
6
case 1
A.M = B.M = 1
case 2
A.M = B.M = 1
C.M = 1+1
D.M = 1+1+1

越小优先级越高,这个可以在 DAG 中静态算出

Tac

Communication Time 对于每个 recv op 来说有对应的时间,对应的算法。

系统设计

Time Oracle: Tensorflow metrics 计算 op time

Ordering Wizard: 计算静态依赖优先级

Enforcing: 修改 Tensorflow gRPC 子模块

P3

PRIORITY-BASED PARAMETER PROPAGATION FOR DISTRIBUTED DNN TRAINING

和上一篇论文类似,也是基于传输优化的思路,因为网络带宽基本上就是一个硬件问题。一些优化手段是用了有损压缩或者降低收敛时准确率的方式。还是通过提高 overlap 来实现。
直观的现象是如图 L4 的一次正向传播和反向传播之间的间隔很大。

解决方法:带优先级的参数切分

在 MXNet 里面,worker 算完当层的梯度
就会提一个 pull request 当其他 work 同步了这个梯度。
问题在于发送的顺序和反向传播的顺序一致,并且颗粒度
是以 layer 为单位的。

根据领域知识将大 layer 分片成小 slice。
对每个 slice 进行权重排序,优先传输权重高的 slice。
被称为 Priority-Based Parameter Propagation

如图如果参数传输有优先级能够被中断就能减少 delay

大部分模型每层的参数是不平衡的,特别是全连接的参数非常大。

对每一层切分以后利用 TCP 的全双工可以达到流水线的效果。

P3 的设计

参数切片:根据领域知识和实验选择大小(50000)
参数优先级:下一层先需要的先发送,可以抢占低优先级的

我个人觉得简单,这个比上一个更好理解也简单。

BlueConnect

主要是一个对分层(原本的一层或者两层)的 All-Reduce 的泛化算法。
首先使用递归的方法可以减少 all-gather 的传输次数。

这个只要三次 logp

ring reduce 要7次 (p-1)

问题背景

多卡之间的带宽很高 32GB/s,不同网络拓扑下,最慢的会成为瓶颈。
分两层可以解决网络和总线的带宽差异。

对于二层的一个泛化,

三层的 reduce-scatter 的例子,reduce-gather 是反的。

BEYOND DATA AND MODEL PARALLELISM FOR DEEP NEURAL NETWORKS

这个主要是提到了并行程度除了数据和模型之外可以更细粒度到参数和属性,也就是模型的小划分和样本的小划分。
SOAP= Sample Attribute Operator Parameter

全连接层有大量的参数,造成传输瓶颈。
领域特定的优化:RNN 和 CNN 用 data 并行,最后全连接用 model 并行。

搜索策略

找到最优并行策略是 NP 问题。
Execution Optimizer 通过找到最小 cost 的
并行策略寻找较优解。
Cost = tensor size/ bandwidth

随机从搜索空间选取策略,依据 MCMC
最够贪心,又不会太局部贪心。

总的来有点和 AutoML 类似,在一个搜索空间选择一个策略使得 cost 最低,一个是原本的 cost 函数,这个是
计算本身执行的时间。目前主流框架也不支持 SOAP 级别的并行划分,没手调的好,但很接近。

可以看到搜索出来的结果全连接基本上是单卡算的

行星运动我们都是按万有引力定律算的,实际上按照爱因斯坦的说法,万有引力不是惯性力是时空弯曲的效果,但是按照爱因斯坦的公式算起来特别复杂,所以现在航天轨道的设计还是按万有引力定律在算。

牛顿万有引力定律

牛顿万有引力定律:任意两个质点由通过连心线方向上的力相互吸引。该吸引力的大小与它们的质量乘积成正比,与它们距离的平方成反比,G 是万有引力常数。

$$ F=-\frac{G M \cdot m}{r^2} \cdot \frac{\vec{r}}{r} $$

多体问题

多体问题,是所有吸引力的叠加,这里排除物体相撞的情况,因为宇宙非常大,两个天体之间的距离非常远,暂时不考虑这种情况,通过加入一个小常量可以在计算上忽略这个问题。

$$ F=-\frac{G M m r}{(r^2 + \epsilon^2)^{\frac{3}{2}}} $$

这样累加的时候自己也可以加进去,因为 r 是 0,那一项相当于没算。

$$ F_j= \sum_{i=1}^{n} F_i $$

对于 N body 有很多的稍微近似的算法,多是基于实际的物理场景的,比如大部分天体是属于一个星系的,每个星系之间都非常远,所以可以将一些星系作为整体计算,构建一颗树,树中的某些节点是其所有叶子的质心,这样就可以减少计算量,但是放到 GPU 的场景下一个 O(N^2) 的暴力算法利用 GPU 的并行能力可以非常快的算出来。

Python 暴力解

下面是 N-Body 暴力实现的例子,网上有个各种语言实验的,比较好用来测试准确性,来自这里,就是两层循环。

1
2
3
4
5
6
7
8
9
10
11
12
cat <<EOF >> nbody.txt
0.01 3 20
1
0 0 0
0.01 0 0
0.1
1 1 0
0 0 0.02
0.001
0 1 1
0.01 -0.01 -0.01
EOF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import math

class Vector:
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z

def __add__(self, other):
return Vector(self.x + other.x, self.y + other.y, self.z + other.z)

def __sub__(self, other):
return Vector(self.x - other.x, self.y - other.y, self.z - other.z)

def __mul__(self, other):
return Vector(self.x * other, self.y * other, self.z * other)

def __div__(self, other):
return Vector(self.x / other, self.y / other, self.z / other)

def __eq__(self, other):
if isinstance(other, Vector):
return self.x == other.x and self.y == other.y and self.z == other.z
return False

def __ne__(self, other):
return not self.__eq__(other)

def __str__(self):
return '({x}, {y}, {z})'.format(x=self.x, y=self.y, z=self.z)

def abs(self):
return math.sqrt(self.x*self.x + self.y*self.y + self.z*self.z)

origin = Vector(0, 0, 0)

class NBody:
def __init__(self, fileName):
with open(fileName, "r") as fh:
lines = fh.readlines()
gbt = lines[0].split()
self.gc = float(gbt[0])
self.bodies = int(gbt[1])
self.timeSteps = int(gbt[2])
self.masses = [0.0 for i in range(self.bodies)]
self.positions = [origin for i in range(self.bodies)]
self.velocities = [origin for i in range(self.bodies)]
self.accelerations = [origin for i in range(self.bodies)]
for i in range(self.bodies):
self.masses[i] = float(lines[i*3 + 1])
self.positions[i] = self.__decompose(lines[i*3 + 2])
self.velocities[i] = self.__decompose(lines[i*3 + 3])

print("Contents of", fileName)
for line in lines:
print(line.rstrip())
print
print("Body : x y z |")
print(" vx vy vz")

def __decompose(self, line):
xyz = line.split()
x = float(xyz[0])
y = float(xyz[1])
z = float(xyz[2])
return Vector(x, y, z)

def __computeAccelerations(self):
for i in range(self.bodies):
self.accelerations[i] = origin
for j in range(self.bodies):
if i != j:
temp = self.gc * self.masses[j] / math.pow((self.positions[i] - self.positions[j]).abs(), 3)
self.accelerations[i] += (self.positions[j] - self.positions[i]) * temp
return None

def __computePositions(self):
for i in range(self.bodies):
self.positions[i] += self.velocities[i] + self.accelerations[i] * 0.5
return None

def __computeVelocities(self):
for i in range(self.bodies):
self.velocities[i] += self.accelerations[i]
return None

def __resolveCollisions(self):
for i in range(self.bodies):
for j in range(self.bodies):
if self.positions[i] == self.positions[j]:
(self.velocities[i], self.velocities[j]) = (self.velocities[j], self.velocities[i])
return None

def simulate(self):
self.__computeAccelerations()
self.__computePositions()
self.__computeVelocities()
self.__resolveCollisions()
return None

def printResults(self):
fmt = "Body %d : % 8.6f % 8.6f % 8.6f | % 8.6f % 8.6f % 8.6f"
for i in range(self.bodies):
print(fmt % (i+1, self.positions[i].x, self.positions[i].y, self.positions[i].z, self.velocities[i].x, self.velocities[i].y, self.velocities[i].z))
return None

nb = NBody("nbody.txt")
for i in range(nb.timeSteps):
print("\nCycle %d" % (i + 1))
nb.simulate()
nb.printResults()

CUDA 并行计算

在 CUDA 中并行化首先要理解 CUDA 的层次结构,CUDA 有 grid 和 block 对线程做划分。首先设计一个 computation tile,相当于一个 block 的计算。

左侧表示一个 block 中的 p 个 body 执行 p 次,最后就能更新所有 body 的加速度,这里的内存占用是 O(p) 的不是 O(p^2),浅绿色表示的是串行执行的流,官方的文档没有更新,对应 CUDA 10 的例子里是这样的,vec3 存的是 x/y/z 轴的加速度,vec4 存的是坐标加质量,就是计算 i 受到 j 的加速度,这里没有并行设计是串行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template <typename T>
__device__ typename vec3<T>::Type
bodyBodyInteraction(typename vec3<T>::Type ai,
typename vec4<T>::Type bi,
typename vec4<T>::Type bj)
{
typename vec3<T>::Type r;

// r_ij [3 FLOPS]
r.x = bj.x - bi.x;
r.y = bj.y - bi.y;
r.z = bj.z - bi.z;

// distSqr = dot(r_ij, r_ij) + EPS^2 [6 FLOPS]
T distSqr = r.x * r.x + r.y * r.y + r.z * r.z;
distSqr += getSofteningSquared<T>();

// invDistCube =1/distSqr^(3/2) [4 FLOPS (2 mul, 1 sqrt, 1 inv)]
T invDist = rsqrt_T(distSqr);
T invDistCube = invDist * invDist * invDist;

// s = m_j * invDistCube [1 FLOP]
T s = bj.w * invDistCube;

// a_i = a_i + s * r_ij [6 FLOPS]
ai.x += r.x * s;
ai.y += r.y * s;
ai.z += r.z * s;

return ai;
}

block 的 tile computation 就是串行计算 p 次 p 个 body 的加速度。

1
2
3
4
for (unsigned int counter = 0; counter < blockDim.x; counter++)
{
acc = bodyBodyInteraction<T>(acc, bodyPos, sharedPos[counter]);
}

每次计算完了以后要对线程做共享内存的同步

黑实线就是一次 block 共享内存同步的边界,也就是一次 tile compuation,图中是 p 个 body 按时间串行执行的流程,超出 p 的时间计算的不是 p 中的 body,看下面这张图。

也就是按照时间串行的计算每个 body 对一个 body 的加速度,然后 N/p 个 block 并行,每个 block 有 p 个线程并行,没 p 次计算每个 block 要同步一次,但是多个 block 之间不需要同步,所以每个并行线程的主体是这样的要在 tile computation 之间包一层同步的调用 cg::sync(ca),同步一次 block 中线程的共享内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename T>
__device__ typename vec3<T>::Type
computeBodyAccel(typename vec4<T>::Type bodyPos,
typename vec4<T>::Type *positions,
int numTiles, cg::thread_block cta)
{
typename vec4<T>::Type *sharedPos = SharedMemory<typename vec4<T>::Type>();

typename vec3<T>::Type acc = {0.0f, 0.0f, 0.0f};

for (int tile = 0; tile < numTiles; tile++)
{
sharedPos[threadIdx.x] = positions[tile * blockDim.x + threadIdx.x];

cg::sync(cta);
// This is the "tile_calculation" from the GPUG3 article.
#pragma unroll 128

for (unsigned int counter = 0; counter < blockDim.x; counter++)
{
acc = bodyBodyInteraction<T>(acc, bodyPos, sharedPos[counter]);
}

cg::sync(cta);
}

return acc;
}

从 GPU 的角度来看,时间复杂度是 O(N),空间复杂度也是 O(N),但前提是要开 N 个线程同时计算。

参考资料

n body nvidia

O(N) 算法计算天体运动

n body problem

模型的保存分三种类型

  1. 知道模型结构,单纯保存变量
  2. 不知道模型结构,保存模型和变量
  3. 不需要再改变量,只要常量化的模型(“冻结”)

第一种用于训练的存档,并且临时恢复,这个时候用户是把训练需要的网络结构在代码里面构造好了的,只是在一定的时间下需要暂时保存网络中的变量,为了在崩溃之后继续训练。所以自然而然会有一个问题,如果我用 Python 写的代码,需要在 C++ 当中恢复,我需要知道你的模型结构,才能恢复,这个最蠢的办法是用 C++ 把你的网络结构再构造一遍,但我们按照统一的协议(比如 Protobuf)确定网络结构,就可以直接从标准序列化的数据中解析网络结构,这就是第二种情况,独立于语言,模型和变量一起保存的情况。然后如果碰到我们不需要再训练了,比如只是把这个模型进行部署,不需要改变相关的变量,那么其实只要一个带常量的模型就可以,这就是第三种情况,把变量冻结的正向传播模型。接下来会依次解释这几种情况的工作方式。

除了这些以外,针对用于服务的模型还可以做很多的优化。

存档

存档只是单纯的保存变量,并且能够恢复,可以在一定的迭代次数以后保存变量,并且从任意一个存档开始重新训练。以两个变量加减 1 为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import tensorflow as tf

# Create some variables.
v1 = tf.get_variable("v1", shape=[3], initializer = tf.zeros_initializer)
v2 = tf.get_variable("v2", shape=[5], initializer = tf.zeros_initializer)

inc_v1 = v1.assign(v1+1)
dec_v2 = v2.assign(v2-1)

# Add an op to initialize the variables.
init_op = tf.global_variables_initializer()

# Add ops to save and restore all the variables.
saver = tf.train.Saver()

# Later, launch the model, initialize the variables, do some work, and save the
# variables to disk.
with tf.Session() as sess:
sess.run(init_op)
# Do some work with the model.
inc_v1.op.run()
dec_v2.op.run()
# Save the variables to disk.
save_path = saver.save(sess, "/tmp/tf-test/model.ckpt")
print("Model saved in path: %s" % save_path)

可以在 /tmp/tf-test 下面看到这几个文件 checkpoint model.ckpt.data-00000-of-00001 model.ckpt.index model.ckpt.meta

可以通过脚本观察保存的变量 python $tensorflow-src/tensorflow/python/tools/inspect_checkpoint.py --file_name=/tmp/tf-test/model.ckpt --all_tensors

得到保存的变量的内容,注意 model.ckpt 这个只是文件前缀。

1
2
3
4
tensor_name:  v1
[1. 1. 1.]
tensor_name: v2
[-1. -1. -1. -1. -1.]

如果要恢复的话,可以通过下面的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import tensorflow as tf

# Create some variables.
v1 = tf.get_variable("v1", shape=[3])
v2 = tf.get_variable("v2", shape=[5])

# Add ops to save and restore all the variables.
saver = tf.train.Saver()

# Later, launch the model, use the saver to restore variables from disk, and
# do some work with the model.
with tf.Session() as sess:
# Restore variables from disk.
saver.restore(sess, "/tmp/tf-test/model.ckpt")
print("Model restored.")
# Check the values of the variables
print("v1 : %s" % v1.eval())
print("v2 : %s" % v2.eval())

得到一样的效果

1
2
v1 : [1. 1. 1.]
v2 : [-1. -1. -1. -1. -1.]

具体来说 .meta 对应的是 MetaGraph 和 SaverGraph,.index 对应的是变量值的位置,key 是变量名,value 是变量保存的入口定义,data 变量的值具体保存的文件。这是恢复代码中已经原样构造出了 Graph,如果没有构造的化,需要通过 tf.train.import_meta_graph('/tmp/model.ckpt.meta') 来加载,但是存档保存的信息比较单一,Tensorflow 提供了一个更丰富的 API 来使用。

保存

SavedModelBuilder 保存的 API 比较丰富,能够保存多个 MetaGraph 和 Variables 的组合,除此之外还能附带 assets,并且要指定模型签名,simple_saved 的方法是一个简单版本的调用,适用于 Predict API。这里要展开一下 GraphDef, MetaGraphDef, SignatureDef, tags 这些东西的概念。对于 MetaGraph,这篇文章解释得很清楚。SignatureDef 是对应了一种图的输入和输出,可以依据这个进行 serving API 的调用,类似于函数签名,相对于一个接口的定义。

tensorflow_serving 自己给了个例子,执行 python mnist_saved_model.py /tmp/tf-test-2 以后可以获得一个目录,下面有版本 1 的模型数据,执行 saved_model_cli show --dir /tmp/tf-test-2/1 可以查看对应的签名。可以看到对应的层级关系,默认用于服务的模型会打上 serve 的标签,函数签名有两个,分别对应了 predict 和 classify 的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['predict_images']:
The given SavedModel SignatureDef contains the following input(s):
inputs['images'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 784)
name: x:0
The given SavedModel SignatureDef contains the following output(s):
outputs['scores'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: y:0
Method name is: tensorflow/serving/predict

signature_def['serving_default']:
The given SavedModel SignatureDef contains the following input(s):
inputs['inputs'] tensor_info:
dtype: DT_STRING
shape: unknown_rank
name: tf_example:0
The given SavedModel SignatureDef contains the following output(s):
outputs['classes'] tensor_info:
dtype: DT_STRING
shape: (-1, 10)
name: index_to_string_Lookup:0
outputs['scores'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: TopKV2:0
Method name is: tensorflow/serving/classify

可以参考 tensorflow 的 REST API,比如 GET http://host:port/v1/models/${MODEL_NAME}[/versions/${MODEL_VERSION}] 其实对应这个例子就是 GET http://host:port/v1/models/tf-test-2/versions/1,然后感觉函数签名不同的 method name,可以调用不同的 request,比如 POST http://host:port/v1/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]:predict 这个格式,如果输入和输出对应的是 imagesscores 那么就对应了第一个签名。

冻结

冻结的情况就是变量不再需要修改,直接把变量转化成常量保存成单一的模型,方便在部署的场景下使用。
冻结模型的代码在这里,他的主要流程如下

  1. 清除所有 Op 中的 device,让原来在指定 CPU/GPU/节点 上的 Op 不再绑定。
  2. 通过 graph_util.convert_variables_to_constants 将所有的 Variable eval 一次,把变量的 Op 的结果拿到,替换成 constant

优化

除了冻结模型以外,还可以删减一些多余的节点,比如 Summary 节点或者 Identity 节点,甚者把 16bit 的浮点数权重修改为 8bit 的浮点数权重(这个在 Tensorflow Lite 里很有用)。这篇文章 列出了详细的优化方式,主要是靠 transform_graph 这个工具,地址在,他有很详细的柴剪列表,并且可以自己编写裁剪函数,充分做到模型在部署环节的“纯净化”,调用方式也很简单。

1
2
3
4
5
6
7
8
9
10
transform_graph \
--in_graph=tensorflow_inception_graph.pb \
--out_graph=optimized_inception_graph.pb \
--inputs='Mul:0' \
--outputs='softmax:0' \
--transforms='
strip_unused_nodes(type=float, shape="1,299,299,3")
remove_nodes(op=Identity, op=CheckNumerics)
fold_old_batch_norms
'

transforms里面加入你想进行优化的 transformer 和对应的参数即可,在科赛上也有在线可以跑的notebook

参考

  1. Cloud ML Engine