ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

Prompt缓存可以从两个角度来处理:

基于相似度的外部缓存

一种是对提示词和结果做相似性对比,对结果缓存,这一部分可以在外部来做,例如 langchain的 llm caching。具体实现方法包括:

  1. 向量化存储

    • 将prompt转换为向量表示
    • 使用向量数据库(如FAISS、Milvus等)存储
    • 通过向量相似度检索相近的历史prompt
  2. 模糊匹配

    • 使用编辑距离等算法计算文本相似度
    • 设置相似度阈值进行匹配
    • 返回最相似的历史响应
  3. 缓存策略

    • LRU(最近最少使用)淘汰
    • 时间过期机制
    • 容量限制管理

基于KV Cache的内部优化

另一种是利用 KV Cache 中的交叉注意力机制,复用相同的提示词前缀,这是ChatGPT使用的方法。其工作原理是:

  1. KV Cache机制

    • 存储每个token的Key和Value计算结果
    • 避免重复计算相同前缀
    • 提高推理性能
  2. 增量计算

    • 只对新增的token进行注意力计算
    • 复用已缓存的中间状态
    • 显著减少计算量
  3. 内存管理

    • 自动清理过期缓存
    • 动态调整缓存大小
    • 优化内存使用

在实际应用中,我们可以综合运用这两种缓存方法来优化性能:

  • 对于完全相同或高度相似的prompt,优先使用外部缓存机制
  • 对于部分重叠的prompt,则可以利用KV Cache机制
  • 具体使用哪种策略,需要根据实际场景和资源限制来权衡选择

值得注意的是,KV Cache中的Key和Value都包含了位置编码信息。这意味着要充分发挥prompt缓存的作用,需要确保提示词保持相同的前缀结构。如果提示词的位置发生变化,即使内容相同,对应的KV值也会不同。

具体来说,当两次不同的推理过程中,如果prompt具有相同的提示词前缀,那么这部分的KV计算结果是完全一致的,因此可以直接复用之前推理过程中的KV cache,从而提高推理效率。

最近有一个有趣的论文:通过使用DSL(领域特定语言)来描述prompt结构,可以更精确地控制位置编码。这种方法不仅能够缓存相同的前缀,还支持缓存相同的后缀,同时允许中间部分灵活变动,进一步提升了缓存的效率。但我个人感觉比较难用,等于给本来很灵活的prompt套上了一层结构化的描述语言,这种结构化的语言如果是一些GPT应用的开发有固定模式可能还好,但是通用场景下很难让用户能够用得起来这么专业的描述语言。

Grok-1.5 Infra 的技术报告中可以窥见,Grok-1.5 在基础设施方面具有以下核心优势:

  1. 先进的分布式训练框架:基于 JAX、Rust 和 Kubernetes 的技术栈,不仅确保了高性能,还能快速适配和训练新的模型架构。
  2. 卓越的可靠性和可用性:通过自研的训练协调器,系统能够智能地检测并隔离故障节点,大幅降低训练任务中断的风险。
  3. 高效的存储与数据处理:在检查点存储、数据加载和训练作业重启等环节都进行了深度优化,将训练过程中的停机时间降至最低。

另一个典型案例是 Meta 的 Research Super Compute (RSC) 超算集群:

  1. 算力规模:
    • 早期配置:760台 NVIDIA DGX A100 系统,总计 6,080 个 GPU
    • 当前规模:已升级至 24,576 张 H100 GPU,算力获得质的飞跃
  2. 网络互联:采用双网络方案
    • NVIDIA Quantum InfiniBand,带宽高达 1600 Gb/s
    • RoCE(RDMA over Converged Ethernet)作为补充互联方案
  3. 存储系统:采用自研的 Tectonic 文件系统,通过 FUSE 提供标准的 Linux 文件系统接口,确保高效的数据访问

从这些实践可以看出,现代 AI 基础设施主要围绕三大核心要素展开:

  • 计算能力(以 GPU 为核心)
  • 网络互联(RoCE 或 InfiniBand)
  • 存储系统

而在上层的编排调度领域,系统的容错能力和可靠性则成为关键考量因素。

GPU 架构与互联

在当前AI训练领域,主流的GPU型号主要是NVIDIA的A100、H100和H200系列,它们按照发布时间依次提供了更强大的算力和更优化的架构设计。关于GPU的详细架构,特别是其拓扑结构,可以参考这篇深度解析文章

GPU互联拓扑

GPU之间的互联拓扑结构主要取决于不同总线间的传输特性,GPU之间可以通过NVIDIA专有的NVLink高速互联技术直接通信。在现代GPU集群中,主要有以下几种互联方式:

  1. NVSwitch架构:通过NVIDIA的交换架构实现所有GPU之间的全互联
  2. 走网卡,如果卡之间没有NVSwitch的话,可以绕过CPU走网卡:
    1
    GPU0 -> PCIe -> IB(InfiniBand) -> PCIe -> GPU1
    这种通信模式由NCCL(NVIDIA Collective Communications Library)负责协调和优化。

GPU分配策略

NVIDIA开源的go-gpuallocator库提供了一系列基于拓扑关系的GPU分配策略。例如,其中的NewStaticDGX1Policy专门针对DGX-1标准配置优化。考虑到单机环境下GPU组合的可能性有限,这种基于静态规则的分配策略已经能够很好地满足需求。

这些分配策略的核心目标是最小化跨总线和跨NUMA节点的通信开销,确保GPU间通信尽可能利用最高带宽的数据通路,从而提供最优的训练性能。

跨节点的通信

在分布式训练场景下,跨节点通信需要经过更长的数据传输路径:

1
GPU -> NIC -> 叶层交换机 -> 核心交换机 -> NIC -> GPU

这种通信模式面临两个主要的优化方向:

  1. 本地化优化:尽可能将相关联的GPU任务分配在物理位置相近的节点上,以减少网络延迟。

  2. 负载均衡:避免将所有任务集中在同一交换机下,防止出现网络拥塞。过度集中可能导致局部带宽饱和,反而降低整体训练效率。

这种权衡本质上是一个网络流优化问题。通过图论中的网络流算法,可以在通信延迟和带宽利用率之间找到最优平衡点,从而实现更高效的跨节点通信。

一个分布式训练的带宽瓶颈来源于带宽最低的那条路径。

利用 Kubernetes Pod 亲和性优化网络拓扑

在 Kubernetes 环境下,我们可以通过 Pod 亲和性(Affinity)和规则来优化 GPU 任务的分配。主要可以从以下几个方面入手:

拓扑感知调度:使用 topologyKey 确保相关联的 Pod 被调度到网络拓扑上接近的节点:
例如同一个分布式训练任务(training-group = group1)尽让分配在一个机架上,同交换机,同核心交换机也是类似的。

1
2
3
4
5
6
7
8
9
10
11
12
affinity:
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 50
podAffinityTerm:
labelSelector:
matchExpressions:
- key: training-group
operator: In
values:
- group1
topologyKey: topology.kubernetes.io/rack # 同机架优先

这种方案的优势在于:

  • 配置简单,易于理解和维护
  • 充分利用 Kubernetes 原生能力,无需额外组件
  • 可以根据实际需求灵活调整权重和策略

存储系统

AI训练中的存储系统面临着两个主要挑战:

1. 海量小文件问题

AI训练数据集通常包含大量的小文件,这对传统文件系统的性能和管理造成了巨大压力。一些现代分布式文件系统提供了很好的解决方案,例如 Meta 的 Tectonic 和与其架构类似的 JuiceFS,它们采用了以下优化方案:

元数据管理优化

  • 使用元数据库管理文件结构,将 ls 命令转化为简单的字符串前缀匹配操作
  • 避免了传统 Linux 文件系统依赖 inode 管理的方式
  • 解决了 inode 臃肿问题(在传统系统中,一个 inode 的大小可能与文件本身相当)

2. Checkpoint 存储挑战

分布式训练中的 checkpoint 文件体积巨大,这在大语言模型训练中尤为明显:

  • 以 LLaMA-2-70B 为例,单个完整的 checkpoint 就需要 140GB 存储空间(FP16格式)
  • 训练过程中需要定期保存 checkpoint,累积存储需求可能达到 TB 甚至 PB 级别
  • 需要存储系统能够提供高带宽和低延迟的读写性能,同时保证数据的可靠性

这些挑战要求存储系统具备:

  • 强大的扩展性
  • 高效的数据压缩能力
  • 智能的数据分层存储机制
  • 可靠的数据备份和恢复能力

训练的稳定性

在大规模 AI 训练中,硬件故障是一个常见问题。特别是新型号显卡往往会有较高的故障率,再加上传统的硬件错误,这些都可能导致训练中断。因此,快速识别错误并恢复训练成为了一个关键挑战。目前主流的解决方案主要有以下两种:

基于 torchrun 的弹性训练

torchrun 提供了两种容错机制:简单重试和弹性训练。

  1. 简单重试模式
    通过 --max-restarts 参数配置重试次数:

    1
    2
    3
    4
    5
    6
    7
    8
    torchrun \
    --nnodes=$NUM_NODES \
    --nproc-per-node=$NUM_TRAINERS \
    --max-restarts=3 \
    --rdzv-id=$JOB_ID \
    --rdzv-backend=c10d \
    --rdzv-endpoint=$HOST_NODE_ADDR \
    YOUR_TRAINING_SCRIPT.py [script args...]
  2. 弹性训练模式
    通过设置 nnodes 的范围来支持动态节点数:

    1
    2
    3
    4
    5
    6
    7
    8
    torchrun \
    --nnodes=1:4 \ # 支持1-4个节点的动态伸缩
    --nproc-per-node=$NUM_TRAINERS \
    --max-restarts=3 \
    --rdzv-id=$JOB_ID \
    --rdzv-backend=c10d \
    --rdzv-endpoint=$HOST_NODE_ADDR \
    YOUR_TRAINING_SCRIPT.py [script args...]

弹性训练模式需要配置服务发现机制,默认使用 c10d 作为内置的节点发现服务,也支持使用 etcd 等外部服务。

当节点发生变化时,系统会自动处理以下场景:

  • 节点离开(缩容):系统通知 agent,停止现有 workers,重新组建 WorkerGroup,使用新的 RANK 和 WORLD_SIZE 启动所有 workers
  • 节点加入(扩容):接纳新节点,按照相同流程重组 WorkerGroup

基于 DeepSpeed 的弹性训练

DeepSpeed 提供了更细粒度的弹性训练配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"elasticity": {
"enabled": true,
"max_train_batch_size": "seqlen",
"micro_batch_sizes": 8,
"min_gpus": 1024,
"max_gpus": "fixed_linear",
"min_time": "seqlen",
"version": 8,
"ignore_non_elastic_batch_info": 1024,
"num_gpus_per_node": "fixed_linear",
"model_parallel_size": MODEL_PARALLEL_SIZE
}
}

DeepSpeed 的特点是:

  • 支持动态调整 batch size
  • 以 GPU 为粒度进行弹性伸缩(而不是节点级别)
  • 提供更丰富的训练参数配置

弹性训练控制器

要实现完整的弹性训练支持,控制器需要:

  1. 依赖服务发现机制进行节点注册和健康检查
  2. 动态调整弹性策略(如 min_nodes、max_nodes 等参数)

对于简单的降级场景,通过静态配置即可实现:

  • 将 max_nodes 设置为总资源规格
  • 将 min_nodes 设置为最小运行要求(如设置为 1:4 表示支持 1-4 张显卡的动态伸缩)

节点的问题发现

可以使用node-promblem-detector
node-problem-detector 是一个用于在集群管理栈的上游层次中使各个节点问题可见的守护进程。它在每个节点上运行,检测节点问题并将其报告给 apiserver。

总结

To train our largest Llama 3 models, we combined three types of parallelization: data parallelization, model parallelization, and pipeline parallelization. Our most efficient implementation achieves a compute utilization of over 400 TFLOPS per GPU when trained on 16K GPUs simultaneously. We performed training runs on two custom-built 24K GPU clusters. To maximize GPU uptime, we developed an advanced new training stack that automates error detection, handling, and maintenance. We also greatly improved our hardware reliability and detection mechanisms for silent data corruption, and we developed new scalable storage systems that reduce overheads of checkpointing and rollback. Those improvements resulted in an overall effective training time of more than 95%. Combined, these improvements increased the efficiency of Llama 3 training by ~three times compared to Llama 2.

LLAMA3 的技术博客揭示了许多令人振奋的优化成果,这些优化背后蕴含着大量值得深入研究的技术细节。虽然我们可能难以直接接触如此大规模的训练集群及其面临的挑战,但这些技术进展仍然为整个 AI 基础设施领域提供了宝贵的参考和启发。

ebpf是内核当中一个非常重要的功能,简单来说就是以虚拟机的形式提供一种在内核中执行嵌入字节码的能力。
ebpf定义的指令集也非常简答,但是写起来比较麻烦所以作为工具提供了libbpf和bpf-tool。
用户可以通过写C的形式写ebpf程序嵌入到内核中,同时在用户态进行交互,用户态的程序没有语言限制。
ebpf特性的对应内核版本列表在,后面例子中的ringbuffer map是要内核版本在5.8以上。
ebpf的架构和工具链的一个比较完整的文档是cilium的一个文档

vmlinux.h是通过bpftool生成的一个虚拟头文件,用于访问内核数据结构。

libbpf 和 bpftool 都是在内核的代码仓库里面开发的。对应的目录分别是tools/lib/bpftools/bpf,帮助用户用c开发和调试ebpf程序。
相当于说bpftool是ebpf的调试和观测工具,libbpf是一提供给开发者的ebpf库。

bpf_herplers.h有一个section的定义SEC用来决定在ebpf的.o对象文件中的位置,以及一些功能。
比如SEC(kprobe/xxx)就代表修饰的程序会嵌入到内核函数的调用中。

1
2
3
4
5
6
7
8
9
10
11
12
13
/*
* Helper macro to place programs, maps, license in
* different sections in elf_bpf file. Section names
* are interpreted by libbpf depending on the context (BPF programs, BPF maps,
* extern variables, etc).
* To allow use of SEC() with externs (e.g., for extern .maps declarations),
* make sure __attribute__((unused)) doesn't trigger compilation warning.
*/
#define SEC(name) \
_Pragma("GCC diagnostic push") \
_Pragma("GCC diagnostic ignored \"-Wignored-attributes\"") \
__attribute__((section(name), used)) \
_Pragma("GCC diagnostic pop") \

libbpf.c 有对应的SEC的定义。

1
2
3
4
5
6
#define SEC_DEF(sec_pfx, ptype, ...) {                                      \
.sec = sec_pfx, \
.len = sizeof(sec_pfx) - 1, \
.prog_type = BPF_PROG_TYPE_##ptype, \
__VA_ARGS__ \
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static const struct bpf_sec_def section_defs[] = {
BPF_PROG_SEC("socket", BPF_PROG_TYPE_SOCKET_FILTER),
BPF_EAPROG_SEC("sk_reuseport/migrate", BPF_PROG_TYPE_SK_REUSEPORT,
BPF_SK_REUSEPORT_SELECT_OR_MIGRATE),
BPF_EAPROG_SEC("sk_reuseport", BPF_PROG_TYPE_SK_REUSEPORT,
BPF_SK_REUSEPORT_SELECT),
SEC_DEF("kprobe/", KPROBE,
.attach_fn = attach_kprobe),
BPF_PROG_SEC("uprobe/", BPF_PROG_TYPE_KPROBE),
SEC_DEF("kretprobe/", KPROBE,
.attach_fn = attach_kprobe),
BPF_PROG_SEC("uretprobe/", BPF_PROG_TYPE_KPROBE),
BPF_PROG_SEC("classifier", BPF_PROG_TYPE_SCHED_CLS),
BPF_PROG_SEC("action", BPF_PROG_TYPE_SCHED_ACT),
SEC_DEF("tracepoint/", TRACEPOINT,
.attach_fn = attach_tp),
SEC_DEF("tp/", TRACEPOINT,
.attach_fn = attach_tp),
SEC_DEF("raw_tracepoint/", RAW_TRACEPOINT,
.attach_fn = attach_raw_tp),

kprobe的attach方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct bpf_link *bpf_program__attach_kprobe(struct bpf_program *prog,
bool retprobe,
const char *func_name)
{
char errmsg[STRERR_BUFSIZE];
struct bpf_link *link;
int pfd, err;

pfd = perf_event_open_probe(false /* uprobe */, retprobe, func_name,
0 /* offset */, -1 /* pid */);
if (pfd < 0) {
pr_warn("prog '%s': failed to create %s '%s' perf event: %s\n",
prog->name, retprobe ? "kretprobe" : "kprobe", func_name,
libbpf_strerror_r(pfd, errmsg, sizeof(errmsg)));
return libbpf_err_ptr(pfd);
}
link = bpf_program__attach_perf_event(prog, pfd);
err = libbpf_get_error(link);
if (err) {
close(pfd);
pr_warn("prog '%s': failed to attach to %s '%s': %s\n",
prog->name, retprobe ? "kretprobe" : "kprobe", func_name,
libbpf_strerror_r(err, errmsg, sizeof(errmsg)));
return libbpf_err_ptr(err);
}
return link;
}

根据函数名用perf_event_open_probe注入program

内核对于oom的处理主要在mm/oom_kill.c中。
oom的触发就是在内存无法分配的时候,选择一个最“差”的进程发送kill信号。
oom_kill_process这个函数是主要入口,定义是static void oom_kill_process(struct oom_control *oc, const char *message)
其中oc->victim是要被杀掉的进程,如果有cgroup会把相同内存cgroup的进程都杀掉,所以如果要检测一个进程的oom可以通过检测这个函数的参数做到。

下面这段就是OOM的时候dmesg看到的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

/* Get a reference to safely compare mm after task_unlock(victim) */
mm = victim->mm;
mmgrab(mm);

/* Raise event before sending signal: task reaper must see this */
count_vm_event(OOM_KILL);
memcg_memory_event_mm(mm, MEMCG_OOM_KILL);

/*
* We should send SIGKILL before granting access to memory reserves
* in order to prevent the OOM victim from depleting the memory
* reserves from the user space under its control.
*/
do_send_sig_info(SIGKILL, SEND_SIG_PRIV, victim, PIDTYPE_TGID);
mark_oom_victim(victim);
pr_err("%s: Killed process %d (%s) total-vm:%lukB, anon-rss:%lukB, file-rss:%lukB, shmem-rss:%lukB, UID:%u pgtables:%lukB oom_score_adj:%hd\n",
message, task_pid_nr(victim), victim->comm, K(mm->total_vm),
K(get_mm_counter(mm, MM_ANONPAGES)),
K(get_mm_counter(mm, MM_FILEPAGES)),
K(get_mm_counter(mm, MM_SHMEMPAGES)),
from_kuid(&init_user_ns, task_uid(victim)),
mm_pgtables_bytes(mm) >> 10, victim->signal->oom_score_adj);
task_unlock(victim);

对于内核函数的检测需要ebpf当中的kprobe的能力。
kprobe类似单步调试的能力,在函数入口插入一个breakpoint,然后通过trap让执行流转到注册的ebpf的程序。

oomkill的内核中的ebpf程序主要参考datadog的agent的实现。
框架程序主要参考cilium/ebpf当中的例子

PT_REGS_PARM1是一个宏可以帮助读取内核函数的参数,因为按照约定ebpf都是通过寄存器传参的,所以其实返回的是第1个参数对应的寄存器。bpf_probe_read是一个用于读取内存到ebpf程序中的辅助函数。
所以oomkill的ebpf实现的内核中的代码就比较简单,嵌入oom_kill_process的函数调用,或者要被结束的进程复制出进程的pid和command,然后通过类型为BPF_MAP_TYPE_RINGBUF的map发送event。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// +build ignore

#include "vmlinux.h"
#include "bpf_helpers.h"
#include "bpf_tracing.h"

char __license[] SEC("license") = "Dual MIT/GPL";

struct event {
u32 pid;
u8 comm[80];
};

struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24);
} events SEC(".maps");

// Force emitting struct event into the ELF.
const struct event *unused __attribute__((unused));

SEC("kprobe/oom_kill_process")
int kprobe_oom_kill_process(struct pt_regs *ctx) {
struct event *task_info;
struct oom_control *oc = (struct oom_control *)PT_REGS_PARM1(ctx);

task_info = bpf_ringbuf_reserve(&events, sizeof(struct event), 0);

if (!task_info) {
return 0;
}

struct task_struct *p;
bpf_probe_read(&p, sizeof(p), &oc->chosen);
bpf_probe_read(&task_info->pid, sizeof(task_info->pid), &p->pid);
bpf_probe_read(&task_info->comm, sizeof(task_info->comm), (void *)&p->comm);

bpf_ringbuf_submit(task_info, 0);

return 0;
}

用户态的程序也比较简单,把ebpf的对象文件attach以后读取ringbuffer别解析event。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"golang.org/x/sys/unix"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -type event -target amd64 bpf oom_kill_kernel.c

func main() {
// Name of the kernel function to trace.
fn := "oom_kill_process"

// Subscribe to signals for terminating the program.
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)

// Allow the current process to lock memory for eBPF resources.
if err := rlimit.RemoveMemlock(); err != nil {
log.Fatal(err)
}

// Load pre-compiled programs and maps into the kernel.
objs := bpfObjects{}
if err := loadBpfObjects(&objs, nil); err != nil {
log.Fatalf("loading objects: %v", err)
}
defer objs.Close()

fmt.Println("probe")
// Open a Kprobe at the entry point of the kernel function and attach the
// pre-compiled program. Each time the kernel function enters, the program
// will emit an event containing pid and command of the execved task.
kp, err := link.Kprobe(fn, objs.KprobeOomKillProcess, nil)
if err != nil {
log.Fatalf("opening kprobe: %s", err)
}
defer kp.Close()
fmt.Println("attach done")

// Open a ringbuf reader from userspace RINGBUF map described in the
// eBPF C program.
rd, err := ringbuf.NewReader(objs.Events)
if err != nil {
log.Fatalf("opening ringbuf reader: %s", err)
}
defer rd.Close()

// Close the reader when the process receives a signal, which will exit
// the read loop.
go func() {
<-stopper

if err := rd.Close(); err != nil {
log.Fatalf("closing ringbuf reader: %s", err)
}
}()

log.Println("Waiting for events..")

// bpfEvent is generated by bpf2go.
var event bpfEvent
for {
record, err := rd.Read()
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
log.Println("Received signal, exiting..")
return
}
log.Printf("reading from reader: %s", err)
continue
}

// Parse the ringbuf event entry into a bpfEvent structure.
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
log.Printf("parsing ringbuf event: %s", err)
continue
}

log.Printf("pid: %d\tcomm: %s\n", event.Pid, unix.ByteSliceToString(event.Comm[:]))
}
}

使用一个python脚本进行测试OOM。

1
2
3
a="111111111111111"
while True:
a+=a

最后运行结果

1
2
3
4
probe
attach done
Waiting for events..
pid: 16541 comm: python

ebpf的能力还不止于此,业界做了很多流量控制,性能监控的实践,包括calico用ebpf的实现替代kube-proxy,waeve-scope 用ebpf记录tcp连接,cilium 也用eBPF,结合了tc和XDP。datadog-agent 也是使用了ebpf做一些监控,上面的oomkill的例子也是参考datadog的。ebpf能够让linux内核变得更动态更灵活。甚至有一种说法是eBPF让Linux内核正在变成微内核。

Knative主要有两个重要的部分,一个是自动扩展一个是流量切换,并且目前的设计已经和istio是相对独立的了。

Knative的service和k8s的service容易混淆,所以用sks指代knative的service,然后service本身是指k8s本身的service。

sks的revision创建的时候会有两个service,一个是 public service,一个是 private service,如果用istio的话会看到一个和ingressgateway关联的有externalname的service,这个service是和ingress实现相关的。主要的实现还是public和private两个service,实现和具体的ingress的实现是独立的。

Private service 对应的是真实的pod的endpoints,public service 有两种模式一个serve模式,一个是proxy模式。当public service出于proxy模式时,其endpoints指向的是activator,当处于serve模式时endpoints指向的是private service所指向的后端endpoints。

在scale从0到1的过程中,会先阻塞在activator上,当有pod启动以后,还是会保持proxy模式,直到超过burst才会切换到private service的endpoints上。在从1到0的过程中会再切换回activator,直到有新的请求到来再触发pod的启动。

activator

Throttler

Throttler is the interface that Handler calls to Try to proxy the user request

Health Handler 注册用于 kubelet 做 readiness 和 health probe 的接口,返回statSink的状态,收到 signal term 的时候就开始返回500。

Network probe handler
knative组件用来 probe 的接口,在header里面会有区分。

Context handler
把header中的revision的name和namespace注入的context当中

Metric Handler
收集request的qps等metrics的信息。

Log Handler
请求日志

Tracing Handler
Trace spans

Cocurrency report Handler

记录请求信息,这些信息会被reporter上报

Activator handler

过一层 throttler 进行proxy,如果没受限制就会proxy request。

Throttler 会根据revID创建一个throttler,revision如果存在的话就会创建throttler。(revision肯定是一直在的哪怕没有起pod)如果超过revision的并发数就会退出。

Throttler 会try对应的revisionThrottler的pods然后转发过去。

controller

Controller 主要是对用户使用的几个CRD的同步:Service、Route、Revision、Configuration。

net-istio

Ingress 的一种实现

internal 的 ingress创建一个 ingress virtualservice 并且将gate指定为isito-gateway,其他的ingress实现其实类似,只是目前没有traefik的支持。knative有没有istio现在是没啥区别了。

domain mappings

一个用于扩展域名的CRD DomainClaim,会根据domainclaim创建一个ingress。

queue-proxy

tracing metrics breaker 都是正常操作。本质是个sidecar层面的反向代理。

Metrcis

Admin

有一个给cocurrency endpoint发 paused 和 resumed 的回调。

Main

Proxy handler 收集 request in 和 request out。

自定义默认域名的问题

kubectl get cm config-network -n knative-serving -o yaml 可以看到默认的模板去修改他

1
domain-template: "{{.Name}}.{{.Namespace}}.{{.Domain}}"

WASM 作为一种通用字节码可以让很多语言编译成 WASM 跑在沙箱VM环境当中:跑在浏览器的 V8 引擎上,让别的语言可以写前端、
构建成 Istio 的代理 Envoy 的 WASM 插件形成一些过滤器或者日志插件(有点像内核的 eBPF)、作为各种语言的浇水层(让Rust调用Go编译出的WASM)。

The major problem is that, whilst the Go compiler supports WebAssembly, it does not support WASI (WebAssembly System Interface). It generates an ABI that is deeply tied to JavaScript, and one needs to use the wasm_exec.js file provided by the Go toolchain, which doesn’t work outside a JavaScript host.

Go 的 WASM 目前没有支持 WASI,类似于没有 native 的系统调用,所以很多文件、网络的请求不能通过系统执行,目前只能跑在浏览器通过浏览器的接口执行。
TinyGo 是一个支持比较好的 WASM 的 Go 的运行时,有些需要WASI的项目就需要TinyGo来构建。

调度

WASM 是一个单线程的环境,js 是一个基于事件的模型,对于 goroutine 的调度是如何进行的,本着这个好奇研究了一下 Go 本身的 WASM 运行时。比如下面这段代码就明确了 Go 的 WASM 没有多线程也就没有跑 sysmon。
没有sysmon就相当于sysmon的forcegc触发gc的部分也没有,只能靠主动GC和内存分配的时候触发超过阈值开始gc。

1
2
3
4
5
6
7
8
9
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
// For runtime_syscall_doAllThreadsSyscall, we
// register sysmon is not ready for the world to be
// stopped.
atomic.Store(&sched.sysmonStarting, 1)
systemstack(func() {
newm(sysmon, nil, -1)
})
}

在这里回忆一下 GMP 的关系。P的数量由启动时环境变量 $GOMAXPROCS 或者是由 runtime 的方法 GOMAXPROCS() 决定。这意味着在程序执行的任意时刻都只有 $GOMAXPROCS 个 goroutine 在同时运行。
M的数量可以通过 runtime/debug 中的 SetMaxThreads 函数,设置 M 的最大数量一个 M 阻塞了,会创建新的 M。
M 与 P 的数量没有绝对关系,一个 M 阻塞,P 就会去创建或者切换另一个 M,所以,即使 P 的默认数量是 1,也有可能会创建很多个 M 出来。
在确定了 P 的最大数量 n 后,运行时系统会根据这个数量创建 n 个 P。
没有足够的 M 来关联 P 并运行其中的可运行的 G。比如所有的 M 此时都阻塞住了,而 P 中还有很多就绪任务,就会去寻找空闲的 M,而没有空闲的,就会去创建新的 M。

这个关系在对应的代码里面也有体现,在 osinit 的时候会强制将 P 设置为1,newosproc 也是空的,无法启动新进程。

1
2
3
4
5
func osinit() {
ncpu = 1
getg().m.procid = 2
physPageSize = 64 * 1024
}

没有 sysmon 也就没有异步抢占,只能靠 goroutine 之间的协作式抢占来切换 goroutine。

在 schedule 里面也有一段专用的调度逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// wasm only:
// If a callback returned and no other goroutine is awake,
// then wake event handler goroutine which pauses execution
// until a callback was triggered.
gp, otherReady := beforeIdle(now, pollUntil)
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
if otherReady {
goto top
}

等于是在 wasm 中在找不到 G 的时候会启动一个 pause goroutine 占住当前的 P。

Lock_js 有 handle event 的逻辑。

内存分配

通过 WASM 的内存分配接口代替 linux 里面的 mmap 接管过来做内存分配。

系统调用

系统调用是使用 js 实现的,而且也不是像 linux 那种完整一套系统调用这个可能需要 WASI 的支持。
没有系统调用 M 也不会阻塞,等于是一个完全的单 P 单 M 多 G 的模型。

1
2
3
var jsProcess = js.Global().Get("process")
var jsFS = js.Global().Get("fs")
var constants = jsFS.Get("constants")

目前实现的版本是参照go0.x的一个实现也就是只有一个m,在m上调度g,对应的是我们只用一个goroutine然后在goroutine上调度多个gogoroutine。在实现gogoroutine之前需要介绍一下go的调用惯例,这个文章讲的比较清楚。目前Golang在传递参数和返回值的时候是通过栈传递的。gcc会使用寄存器传递参数,golang有一个使用寄存器传递参数的提案。如果要实现一个goroutine上的协程可以利用这么一个调用惯例。

上下文切换

Go的上下文切换其实和操作系统的上下文切换比较类似,go的实现也是类似的,但相对来说比较简单,因为Go的调用惯例其他的寄存器在函数调用的时候是没有被使用的,主要是保存栈指针和一个指令寄存器PC就可以。Golang的抢占一开始是协作式的,入口是在函数调用的时候。在引入了异步抢占也就是让信号处理函数去切换到调度逻辑(这个切换的过程也类似后面讲到的gogo和gosave,但是他的入口是任何一个指令的地方都会发送所以要保存所有的寄存器)实现抢占以后就可以实现非协作的抢占了。

现在实现的是相对简单的协作式抢占。

0.x的代码是C写的,其中两个汇编函数实现的上下文的保存和切换。这里简单先补充一下Go中汇编的一些知识点。

函数定义:

1
 TEXT 包名·函数名(SB),一些标签,$栈帧大小-参数大小(包括返回值)

SP表示栈指针,AX是对应的ax系列的寄存器。
保存上下文的汇编函数如下,gobuf是一个结构体有两个指针成员分别是sp和pc。

1
2
3
4
5
6
7
TEXT gosave(SB), NOSPLIT, $0
MOVQ 8(SP), AX // 8(SP)是函数的第一个参数:gobuf的地址
MOVQ SP, 0(AX) // 保存SP也就是栈指针到 gobuf.sp 中。
MOVQ 0(SP), BX // 0(SP)是函数的返回地址
MOVQ BX, 8(AX) // 将函数的返回地址保存到 gobuf.pc 中。
MOVL $0, AX // return 0
RET

这段函数其实主要是保存了gosave调用时的栈指针,而返回地址就是gosave返回后的下一条指令的地址,返回值是0,这个0可以标记到这个函数是从gosave返回的,这个要结合后面的gogo来理解。
现在来看gogo

1
2
3
4
5
6
7
TEXT gogo(SB), 7, $0
MOVQ 8(SP), AX // 8(SP)是gobuf这个参数的地址
MOVQ 0(AX), SP // 将栈针修改为之前保存的SP
MOVQ 8(AX), AX // 获取PC
MOVQ AX, 0(SP) // 把 PC 放到0(SP)上
MOVL $1, AX // return 1
RET

gogo返回以后其实是返回到了之前gosave需要返回的地方,并且返回值是1。这里用寄存器做返回值是c里面的一种方式。所以如果一个gosave是返回0那么它是从真正的调用者那里返回的,如果返回的是1就是从gogo返回的,如果这里点理解了以后就可以实现一个上下文切换了。
对应Go的函数调用的版本就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
TEXT ·gogogo(SB), NOSPLIT, $0-16
MOVQ 8(SP), AX // gogobuf
MOVQ 0(AX), SP // restore SP
MOVQ 8(AX), AX
MOVQ AX, 0(SP) // put PC on the stack
MOVL $1, 16(SP) // return true
RET


TEXT ·gosave(SB), NOSPLIT, $0-16
MOVQ 8(SP), AX // gogobuf
MOVQ SP, 0(AX) // save SP
MOVQ 0(SP), BX
MOVQ BX, 8(AX) // save PC
MOVB $0, 16(SP) // return false
RET

这面的区别是参数gobuf和返回值用了16个字节,因为Go的调用是用栈的,之前说到过。
0(SP)就是返回地址,8(SP)是gobuf,16(SP)是true或者false的返回地址。

gogoroutine的创建

原本函数做的两件事情就是分配栈和指定PC,pc只要对于一个新的gogoroutine指向函数指针就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void
sys·newproc(int32 siz, byte* fn, byte* arg0)
{
byte *stk, *sp;
G *newg;

//prints("newproc siz=");
//sys·printint(siz);
//prints(" fn=");
//sys·printpointer(fn);

siz = (siz+7) & ~7;
if(siz > 1024)
throw("sys·newproc: too many args");

lock(&sched);

if((newg = gfget()) != nil){
newg->status = Gwaiting;
stk = newg->stack0;
}else{
newg = mal(sizeof(G));
stk = mal(4096);
newg->stack0 = stk;
newg->status = Gwaiting;
newg->alllink = allg;
allg = newg;
}

到这就是分配了一段给栈用的内存,内存的具体分配后面会有一个文章讲解。

1
2
3
		// 160 这个地方是一个约定
// 一些小函数会利用栈之外的160个字节进行优化
newg->stackguard = stk+160;

上面多留出来的地方是一个x86的约定需要给出一个redzone给一些小函数优化用的,不用分配栈直接去使用栈外的这个redzone来增加效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   // 栈是从高地址向低地址增长的,至于这个 4*8是留给什么的没搞清
sp = stk + 4096 - 4*8;
newg->stackbase = sp;
// 拷贝参数
sp -= siz;
mcpy(sp, (byte*)&arg0, siz);
// 函数结束时返回到 goexit 进行收尾工作
sp -= 8;
*(byte**)sp = (byte*)sys·goexit;
// 留给 gogo 的修改返回地址用的地方
// 相当于假装在gogo的地方返回到了fn的函数指针
sp -= 8; // retpc used by gogo
newg->sched.SP = sp;
newg->sched.PC = fn;

sched.gcount++;
goidgen++;
newg->goid = goidgen;

readylocked(newg);
unlock(&sched);

//prints(" goid=");
//sys·printint(newg->goid);
//prints("\n");
}

对应的go代码,Go当中获取函数地址的方式比较tricky,需要了解interface的layout,他是一个interface的头和一个interface包含的对象的地址。FuncPC就是通过转换获取这个函数地址,对应的实现可以在Go的源码找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func NewProc(f interface{}, args ...interface{}) {
pc := FuncPC(f)
stack := Malloc(1024)
sp := stack + 1024 - 4*8
*(*uintptr)(unsafe.Pointer(sp - 8)) = FuncPC(goexit) + 1
gogoRoutine := GoGoRoutine{}
gogoRoutine.Sched.PC = pc
gogoRoutine.Sched.SP = sp - 8 - 8
gogoRoutine.Stack = stack
globalgoid++
gogoRoutine.goid = globalgoid
gogoRoutine.status = _Grunnable
ggput(&gogoRoutine)
}

调度主体

0.x版本的逻辑其实很简单,通过if(gosave)的方式可以判断是从哪里跳过来的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Scheduler loop: find g to run, run it, repeat.
static void
scheduler(void)
{
G* gp;

// Initialization.
m->procid = getprocid();
lock(&sched);

if(gosave(&m->sched)){
// 这里的 gosave 返回的是 true
// 说明是通过 gogo 过来的
// 如果当前的 g 是 running 的话就保存上下文
// 切换成 runnable 放入到 queue 中
// 走出 if 去到调度逻辑。
// Jumped here via gosave/gogo, so didn'
// execute lock(&sched) above.
lock(&sched);

// Just finished running m->curg.
gp = m->curg;
gp->m = nil; // for debugger
switch(gp->status){
case Grunnable:
case Gdead:
// Shouldn't have been running!
throw("bad gp->status in sched");
case Grunning:
gp->status = Grunnable;
gput(gp);
break;
case Gmoribund:
gp->status = Gdead;
if(--sched.gcount == 0)
sys·exit(0);
break;
}
notewakeup(&gp->stopped);
}
// 真正的 gosave 是返回 false的。
// 这个地方是 gosave 的返回地址
// 也是 gogo 后 if 处理完的地方
// 在这里寻找合适的g然后运行。
// Find (or wait for) g to run. Unlocks sched.
gp = nextgandunlock();

noteclear(&gp->stopped);
gp->status = Grunning;
m->curg = gp;
gp->m = m; // for debugger
g = gp;
gogo(&gp->sched);
}

对应的go代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
   //go:noline
func schedule() {
if gosave(&sched.gg0.Sched) {
curgg := sched.curgg
switch curgg.status {
case _Grunnable:
panic("invalid status")
case _Grunning:
curgg.status = _Grunnable
ggput(curgg)
break
case _Gdead:
break
}
}
// 调度循环
for {
// println("find g")
gg := ggget()
if gg == nil {
time.Sleep(time.Second)
continue
}
gg.status = _Grunning
sched.curgg = gg
gogogo(&gg.Sched)
}

}

这样就能实现一个简单的单goroutine上跑多个gogoroutine的上下文切换了。

大规模的并发并行计算考虑的问题主要关于数据、计算的吞吐量还有容错。不管是设计者还是使用者,关心的是在期望上(因为可能会失败)运行任务的效率,从设计者的角度来讲要提供一个可靠高效的环境,从使用者的角度来讲这个东西需要足够的简单,简单意味着编程效率也保证了可靠性。大数据的整套技术栈在解决一些吞吐量大的批处理问题时得心应手,但是到了深度学习的场景,计算吞吐的要求反而提高了,比较现实点的情况是很多时候我们考虑的是能处理多大的数据量的日志,和多快能把一个ImageNet ResNet50训练完。这个区别在于 Spark 或者 Hadoop 的 MapReduce 的设计和 Tensorflow 以及 PyTorch 的设计理念不同以及对应的计算场景的不同。到了强化学习的场景又是一个大融合,强化学习的场景不光有迭代式的反向误差传播的计算,同时也包含了大规模的仿真环境的计算,Ray主要是在这方面提供了一个一站式的解决框架。分布式系统的问题某种程度上和操作系统的问题其实很类似,都是要考量如何从整个系统的角度充分利用物理资源,从错误中恢复,满足效率。这篇文章主要是比较一下几种主流的计算引擎或者框架或者说是算法的异同。

Spark(MapReduce)

我把Spark和MapReduce都归于大数据栈这一类,RDD(Resillient Distributed Dataset) 和 MapReduce
这两者的区别没有和MPI/PS的区别大那么多。

MapReduce 有 JobTracker,Spark 有 Driver

MapReduce 有 TaskTracker,Spark 有 Executor

MapReduce 中间结果是基于 HDFS,会落盘,Spark 中间结果是基于内存的,也可以落盘,主要是利用内存做缓存

MapReduce 计算抽象由Map和Reduce构成,Spark 的 RDD 有一系列的Transform和Action,封装程度更高

MapReduce 的错误处理比较简单,把失败的Map重试就好了,重试是一种非常好理解的错误处理。
Spark 的重试是根据 RDD 的有向无环图中的血缘关系计算的,可以理解为从失败的拓扑序上重新计算,也可以有中间的checkpoint。

RDD 的特性是只读的,在机器学习场景下参数不大的时候 MLLib 通过把参数存到 Driver 上来计算,当参数比较大特别是深度网络的参数大得吓人 Driver 存不下的时候,
只能通过新增 RDD,对于要频繁更新的模型参数要生成非常多的 RDD,这是 Spark 在深度学习上设计的缺陷。
一般来说一些简单的机器学习任务通过 sklearn 就能完成,当数据量比较大的时候就需要通过 Spark 的MLLib来处理。
当然 MLLib 现在也在开始从RDD-based转向SparkSQL用的Dataframe-based,从大的角度上讲两者互相融合是可行的,可能需要一些时间。
上 Spark 用 Yarn 调度 Tensorflow,还是用 Kuberenetes 调度 Spark 和 Tensorflow,我个人支持后者,而且这种分层是我比较喜欢的一种分层。

Ray

Ray 的基本抽象就是 Remote 或者 Actor,一个是分布式调用函数,一个式分布式调用类。Ray 和 Ray 的 RLlib 主要面对的问题是强化学习有大量的 simulating 的环境,比如仿真一局Dota,涉及到模拟一局Dota,反馈Agent的神经网络,是并行计算和神经网络训练的结合。当然 Ray 本身的抽象就是个分布式 goroutine,所以某种程度上可以完成的事情不光是强化学习一种任务,比如HypterTunning等一些并行计算的模型也是试用的。

反过来想,如果没有 Ray 的话,如何做这个系统呢,要构建大批量的仿真计算环境,然后能根据仿真的反馈训练神经网络。
这两个任务的调度控制就是一个问题,当然放到 k8s 的调度器里做似乎也可以,然后涉及这些分布式任务的同步问题,
需要构建这些任务的关系和信息传输,似乎用一些 DAG (比如 argo)的 workflow 也能解决,但他们之间通信的高效性似乎会是一个问题,需要
选择一种高效的远程调用传输方式,肯能gRPC也可以,还有他们的元数据管理用什么呢,自己搞个Redis似乎也行。
Ray 从这些方面综合考虑了这些问题提供了一个一站式的RL训练平台。

PS和MPI

MPI和PS的介绍有很多,我也不需要费篇幅唠叨。

PS和MPI是比较常用的分布式深度学习的训练方式,两者的主要区别在于Paramater Server,在PS的场景下参数统一走一个或者shard多个PS更新。

在MPI的场景下每个Worker是对等的(或者分层级对等,比如主机上的四张卡走NVLink,主机之间走万兆网卡)工作节点,使用AllReduce参数的同步在Worker之间进行。

总结

MPI在我出生前应该就有了,但是MapReduce之所以能火起来主要还是在 fault-tolerance 上,MPI的抽象比较基础,但是MapReduce和Spark在廉价大集群上的表现非常亮眼,对于没那么并行化理想的场景能够tolerate,时间到了深度学习的场景这玩意儿又冒出来了,一方面是深度学习的计算资源好得不得了,因为配备GPU的机器和传统的机器比起来好很多,对于这种纯粹的并行计算框架来说非常友好,错误处理的问题就没那么严重,即使是这样也慢慢开始有人着手优化MPI/PS的fault tolerance,在一些并行化退化的场景下能够把训练并行度降级不至于完全失败的工作。

从长远来看 Ray 还会有很多进化的空间,Spark 也会更好地适配深度学习场景,深度学习本身在System上的优化也层出不穷,大家对于大规模的并发并行计算系统的方方面面的要求都会越来越高。

贝叶斯优化(BO)

贝叶斯优化是一种黑盒优化方法,一般有几个特征:

输入纬度不大,一般小于 20 个,时间复杂度是$O(n^3)$
有设定的取值空间
目标函数需要是连续的(我觉得这个好像不是必须的,离散的也可以)
目标函数的计算非常消耗成本(时间等等)
目标函数是黑盒的,没有明确的结构
目标函数(derivative-free)没有一阶二阶导数(不然就可以用梯度下降去算了)

因为这些特性,在做机器学习的超参数调优的时候特别合适。

BayesOpt consists of two main components: a Bayesian statistical model for modeling the objective function, and an acquisition function for deciding where to sample next.

贝叶斯优化主要是要两个部分,一个是统计模型比如高斯过程(GP),一个是采样函数(AC)决定下一个样本从拿里获取。

高斯过程(GP)

高斯过程是贝叶斯优化中的一种统计模型,先抛开具体的数学问题,简单讲一下就是,假设我在一个变量空间(比如多个超参数)采样了一个目标函数(我们训练结果的 evaluation),然后我们会得到一个后验分布,这就类似条件概率里面,我们知道了某个事情发生以后,如果随机变量是相关的,我们有更大的概率确定其他变量的分布。例如下图,绿色是我们的分布空间,每次采样以后,分布的空间就会缩小,进而接近我们的曲线。

从数学上描述这件事情就需要高斯分布和高斯过程了。高斯分布我相信大家都耳熟能详,他由平均值 $\mu$ 还有标准差 $\sigma$ 决定分布。高斯过程的本质是一个多元高斯分布,只不过他是无限空间上的,定义高斯过程需要一个核函数定义他的协方差矩阵,也就是一个矩阵定义多个随机变量的相关性。当然了,有限的离散点用协方差矩阵可以,如果取值是连续的就需要核函数描述这个“无限”的协方差矩阵了。

协方差的核函数就很多选项,A Visual Exploration of Gaussian Processes 提供了一个非常 intuitive 的可视化来解释 GP 和各种描述协方差矩阵的核函数。

高斯过程被定义为一个随机过程,相当于每个样本点自己也是个随机函数,对于高斯分布来说,每个样本点也是一个高斯分布函数就是高斯过程,cs4780 的 Lecture 15有非常详细的解释。

Definition: A GP is a (potentially infinte) collection of random variables (RV) such that the joint distribution of every finite subset of RVs is multivariate Gaussian:
$ f \sim GP(\mu, k), $
where $\mu(\mathbf{x})$ and $k(\mathbf{x}, \mathbf{x}’)$ are the mean resp. covariance function! Now, in order to model the predictive distribution $P(f_* \mid \mathbf{x}_*, D)$ we can use a Bayesian approach by using a GP prior: $P(f\mid \mathbf{x}) \sim \mathcal{N}(\mu, \Sigma)$ and condition it on the training data $D$ to model the joint distribution of $f=f(X)$ (vector of training observations) and $f_* = f(\mathbf{x}_*)$ (prediction at test input).

采样函数

采样函数一般有 expected improvement(EI),当然还有 probability improvement(PI), upper confidence bound(UCB), knowledge gradient(KG),entropy search and predictive entropy search 等等。
采样的策略有两种:
Explore:探索新的点,这种采样有助于估计更准确的;
Exploit:利用已有结果附近的点进行采样,从而希望找到更大的;

这两个标准是互相矛盾的,如何在这两者之间寻找一个平衡点可以说是采样函数面对的主要挑战。

Expected improvement is a popular acquisition function owing to its good practical performance and an analytic form that is easy to compute. As the name suggests it rewards evaluation of the objective $f$ based on the expected improvement relative to the current best. If $f^* = \max_i y_i$ is the current best observed outcome and our goal is to maximize $f$, then EI is defined as

$\text{EI}(x) = \mathbb{E}\bigl[\max((f(x) - f^*), 0)\bigr]$

在 Facebook 的 Ax 在这里提到使用了 PyTorch 的一个 BoTorch 用的就是 EI,就是期望的增加度。

高斯分布式的期望就是 $\mu$ 所以是很好算的,$f^*$ 是已知的定值,这里有一个计算的推导。

BoTorch 提供了一个优化的 EI 叫 Noisy EI,主要功能是抗噪。

The above definition of the EI function assumes that the objective function is observed free of noise. In many types of experiments, such as those found in A/B testing and reinforcement learning, the observations are typically noisy. For these cases, BoTorch implements an efficient variant of EI, called Noisy EI, which allow for optimization of highly noisy outcomes, along with any number of constraints (i.e., ensuring that auxiliary outcomes do not increase or decrease too much). For more on Noisy EI, see our blog post.

这是采样的过程的一个示例:

应用

高斯过程本身可以用来回归和分类,使用高斯过程的贝叶斯优化有很多具体的应用场景,除了超参数优化之外,对于网络结果(层数,数据并行度)等等也是可以使用的。
除此之外像 horovod 也使用了贝叶斯优化,在这个目录下面

Horovod comes with several adjustable “knobs” that can affect runtime performance, including –fusion-threshold-mb and –cycle-time-ms (tensor fusion), –cache-capacity (response cache), and hierarchical collective algorithms –hierarchical-allreduce and –hierarchical-allgather.

他主要是服务于一些 Tensor Fusion 和 response cache 等参数以及层级 collective 通信选择,文档提到了通过设置一些参数细致控制调优的过程。

参考

  1. 一个例子搞清楚(先验分布/后验分布/似然估计)
  2. A Tutorial on Bayesian Optimization
  3. Practical Bayesian Optimization of Machine Learning Algorithms
  4. Awesome-AutoML-Papers
  5. Gaussian Processes for Machine Learning

背景

Horovod 是一个兼容主流计算框架的分布式机器学习训练框架,主要基于的算法是 AllReduce,这个是 baidu-research 在17年做的一个实现,这个东西原来是高性能计算范畴里的东西应用了 MPI 并行计算接口来实现,这是并行计算里的一个框架,已经很老了,这里有一个介绍 MPI 的 tutorial 写的比较好。

在介绍 horovod 的之前需要解释一下 AllReduce。在 MapReduce 里面 reduce 被翻译成了规约,在上面提到的 MPI tutorial 里面的解释是

Reduce is a classic concept from functional programming. Data reduction involves reducing a set of numbers into a smaller set of numbers via a function. For example, let’s say we have a list of numbers [1, 2, 3, 4, 5]. Reducing this list of numbers with the sum function would produce sum([1, 2, 3, 4, 5]) = 15. Similarly, the multiplication reduction would yield multiply([1, 2, 3, 4, 5]) = 120.

就是说把一个大的集合“缩减”成了小的集合,这里要注意的是这种缩减的计算是要满足交换律的,也就是减法或者除法是不行的,因为在并行计算当中不太好去控制计算的顺序。Reduce 就是这个意思,具体到 MPI_Reduce 就是把不同节点的数字“缩减”到一个节点上,支持的计算方式有加法乘法和取大小值等。

教程中给出的 Reduce 是求和。

AllReduce 就是在每个节点都获得 Reduce 的结果

基于这个标准就有很多的 All-Reduce 的实现,比如 Ring-Reduce,这个实现分两部分,一部分是 Scatter-Reduce 另一部分是 All-Gather。最早是在这篇 post里提到的。这个算法的好处是可以摆脱之前 PS 非常依赖 Parameter-Server 的带宽,Parameter-Server 的带宽会成为计算瓶颈的问题,而 AllReduce 可以让每个节点在带宽传输中的位置是对等的,并且减少传输次数。具体的算法可以看文章的解释,scatter-reduce 就是让每个节点有 K/N 的一个 reduce(也就是 sum),然后把自己的一个 K/N 的 reduce 再传递给其他节点,每个节点只和自己相邻的节点通信。

In the system we described, each of the N GPUs will send and receive values N-1 times for the scatter-reduce, and N-1 times for the allgather. Each time, the GPUs will send K / N values, where K is the total number of values in array being summed across the different GPUs. Therefore, the total amount of data transferred to and from every GPU is

Data Transferred=2(N−1)KN

数据传输量在 N 比较大的时候越没有影响,这就消弭了多节点给 Parameter-Server 造成的瓶颈。

还有一些其他术语,假设有 4 台 4 卡的 GPU 服务器。size 是工作进程(GPU)的数量(6),rank 是所有工作进程的 id(0-15),local rank 是当前服务器上的 id(0-3)。

Horovod 的介绍

使用 horovod 有一定的侵入性,代码需要一定的修改才能变成适配分布式训练,但是有一个好处就是适配的成本不高,并且 horovod 提供的各种框架的支持可以让 horovod 比较好的在各个框架的基础上使用,他支持 tensorflow/keras/mxnet/pytorch,MPI 的实现也有很多,比如 OpenMPI 还有 Nvidia 的 NCCL,还有 facebook 的 gloo,他们都实现了一种并行计算的通信和计算方式。而且 horovod 的本身的实现也很简单。

使用

Keras 用 ResNet50 训练 ImageNet 为例,主要侵入了几部分 hvd.init() 这个是 MPI 的初始化,让并行进程能够知道自己的 rank/local_rank 等信息。

第二部根据 local_rank(相当于单节点上的第n张卡),并且设置不占用全部显存,按需分配(可能因内没有统一管理导致显存碎片),然后传递给 keras 设置 session。

1
2
3
4
5
# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))

然后在 rank 0 上恢复一个 checkpoint 并且广播给其他节点,这里的 broadcast 后面会介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
resume_from_epoch = try_epoch
break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(resume_from_epoch, 0, name='resume_from_epoch')

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

设定传输的压缩函数,具体的压缩后面会提到,然后要么从之前的模型恢复要么重新训练。关键的 wrapper 在 opt 上,会给本地的 opt 包装一个 DistributedOptimizer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast both model and optimizer weights
# to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
model = hvd.load_model(args.checkpoint_format.format(epoch=resume_from_epoch),
compression=compression)
else:
# ResNet-50 model that is included with Keras is optimized for inference.
# Add L2 weight decay & adjust BN settings.
model_config = model.get_config()
for layer, layer_config in zip(model.layers, model_config['layers']):
if hasattr(layer, 'kernel_regularizer'):
regularizer = keras.regularizers.l2(args.wd)
layer_config['config']['kernel_regularizer'] = \
{'class_name': regularizer.__class__.__name__,
'config': regularizer.get_config()}
if type(layer) == keras.layers.BatchNormalization:
layer_config['config']['momentum'] = 0.9
layer_config['config']['epsilon'] = 1e-5

model = keras.models.Model.from_config(model_config)

# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.SGD(lr=args.base_lr * hvd.size(),
momentum=args.momentum)

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt, compression=compression)

model.compile(loss=keras.losses.categorical_crossentropy,
optimizer=opt,
metrics=['accuracy', 'top_k_categorical_accuracy'])

然后设置一些回调函数,hvd.callbacks.BroadcastGlobalVariablesCallback(0) 保证的是 rank 0 上的所有参数只在 rank 0 初始化,然后广播给其他节点,后面是学习率 decay 的设置和一些统计信息的回调打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),

# Horovod: average metrics among workers at the end of every epoch.
#
# Note: This callback must be in the list before the ReduceLROnPlateau,
# TensorBoard, or other metrics-based callbacks.
hvd.callbacks.MetricAverageCallback(),

# Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=args.warmup_epochs, verbose=verbose),

# Horovod: after the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
hvd.callbacks.LearningRateScheduleCallback(start_epoch=args.warmup_epochs, end_epoch=30, multiplier=1.),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=30, end_epoch=60, multiplier=1e-1),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=60, end_epoch=80, multiplier=1e-2),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=80, multiplier=1e-3),
]

最后直接用 allreduce 计算一个 evaluation score。

1
2
# Evaluate the model on the full data set.
score = hvd.allreduce(model.evaluate_generator(input_fn(False, args.train_dir, args.val_batch_size),NUM_IMAGES['validation']))

实现

适配层和压缩算法

horovod 的实现主要分几部分,第一部分是一个适配层,用于兼容各种框架,比如 tensorflow 的适配就是实现一个新的 Op,这个可以参考 add new op,里面规范了 Tensorflow 自定义算子的实现。

请注意,生成的函数将获得一个蛇形名称(以符合 PEP8)。因此,如果您的操作在 C++ 文件中命名为 ZeroOut,则 Python 函数将称为 zero_out。

C++ 的定义是驼峰的,生成出来的 python 函数是下划线小写的,所以最后对应的是,适配Op的代码在 horovod/tensorflow 目录下面

C++ Python
HorovodAllgather horovod_allgather
HorovodAllreduce horovod_allreduce
HorovodBroadcast horovod_broadcast

另外在适配层可以加入一些压缩算法(在 horovod/[framework]/compression.py),我觉得压缩算法和框架无关的,放到适配层下面可能有别的原因,比如 tensorflow 默认带了一个 float16 压缩,具体的其他压缩算法比如3LC,可以通过有损压缩或者无损压缩提高带宽利用率。

统一层

这一层的实现是统一的,所有的适配层最后都是发出一些 Op+Tensor 的 Message 到队列中,后台初始化的时候会有一个专门的线程专门消费这个队列。他有一个同步消息的过程,相当于这个 tensor 在所有节点上都就绪以后就可以开始计算了,主体的流程是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// The coordinator currently follows a master-worker paradigm. Rank zero acts
// as the master (the "coordinator"), whereas all other ranks are simply
// workers. Each rank runs its own background thread which progresses in ticks.
// In each tick, the following actions happen:
//
// a) The workers send a Request to the coordinator, indicating what
// they would like to do (which tensor they would like to gather and
// reduce, as well as their shape and type). They repeat this for every
// tensor that they would like to operate on.
//
// b) The workers send an empty "DONE" message to the coordinator to
// indicate that there are no more tensors they wish to operate on.
//
// c) The coordinator receives the Requests from the workers, as well
// as from its own TensorFlow ops, and stores them in a [request table]. The
// coordinator continues to receive Request messages until it has
// received MPI_SIZE number of empty "DONE" messages.
//
// d) The coordinator finds all tensors that are ready to be reduced,
// gathered, or all operations that result in an error. For each of those,
// it sends a Response to all the workers. When no more Responses
// are available, it sends a "DONE" response to the workers. If the process
// is being shutdown, it instead sends a "SHUTDOWN" response.
//
// e) The workers listen for Response messages, processing each one by
// doing the required reduce or gather, until they receive a "DONE"
// response from the coordinator. At that point, the tick ends.
// If instead of "DONE" they receive "SHUTDOWN", they exit their background
// loop.

简单来讲就是说 coordinator 集 size 个 request DONE,然后找出就绪的 tensor (在 message_table 里面查找)构造出一个 read_to_reduce 的列表,然后发出 size 个 request 告知进程进行计算,然后 worker 接受到 response 开始真正的计算过程(通过 op_manager 具体执行)。

这是整体同步的过程,如果打开 horovod 的 trace log(HOROVOD_LOG_LEVEL=trace) 就能看到同步的过程。horovod 的主要 Op 除了 AllReduce 之外还有 allgather 和 broadcast。

算子实现层

具体的 op 在 common/op 可以看到有 NCCL/Gloo/MPI 等等的,这些由 op_manager 管理,他会根据优先级找到可以用来计算的 op 进行计算,比如 MPI 用的就是 MPI_Allreduce,具体 scatter-gather 和 all-gather openMPI 有现成的实现,NCCL 就直接调用 ncclAllReduce,比较新的 nccl 也支持跨节点的 allreduce 了,不用自己再套一层。

除了 allreduce 之外,还有两个比较重要的算子。

allgather 主要是比 allreduce 少一层 reduce,所有数据被发送到所有进程就可以。allreduce 的第二步就是把每个进程的 scatter-reduce 的 reduce 结果发送到所有进程。

broadcast 的作用是一对多的广播,主要是把初始化的参数同步给其他进程的时候使用。

总结一下 SysML 2019 的一些论文。

TICTAC

问题背景是解决分布式训练的 scale 问题。
如图,网络带宽和传输的顺序是关键因素,网络带宽很好理解,如果 Best 要提高只能加带宽,同时传输顺序如图会影响计算时间。

Communication to Computation Ratio 传输计算比,如果比值大说明效率高

  1. 增大 BatchSize,但是会过拟合(贾扬清的 Train ImageNet in 1 Hour),减小参数精度(FP16,腾讯绝艺 1024 张V100)。

  2. Overlay Coeffiecient 提供传输和计算的重合率,是 TicTac 的优化方向。

计算最优传输依赖,这是个 NP 问题,需要找近似解。recv op 是图的 root,可行的拓扑排序有很多种,解决方案就是找到近似优化排序。(原问题是 NP 问题)

几种符号

op.P 直接 Op 执行计算时间
op.M Op传输时间
op.M+ 触发计算 op 的最小传输时间

Tic

设 Communication Time 对于每个 recv op 来说都相等,从直觉上解释就是计算执行 recv op 的优先度。
如图计算对应的值

1
2
3
4
5
6
case 1
A.M = B.M = 1
case 2
A.M = B.M = 1
C.M = 1+1
D.M = 1+1+1

越小优先级越高,这个可以在 DAG 中静态算出

Tac

Communication Time 对于每个 recv op 来说有对应的时间,对应的算法。

系统设计

Time Oracle: Tensorflow metrics 计算 op time

Ordering Wizard: 计算静态依赖优先级

Enforcing: 修改 Tensorflow gRPC 子模块

P3

PRIORITY-BASED PARAMETER PROPAGATION FOR DISTRIBUTED DNN TRAINING

和上一篇论文类似,也是基于传输优化的思路,因为网络带宽基本上就是一个硬件问题。一些优化手段是用了有损压缩或者降低收敛时准确率的方式。还是通过提高 overlap 来实现。
直观的现象是如图 L4 的一次正向传播和反向传播之间的间隔很大。

解决方法:带优先级的参数切分

在 MXNet 里面,worker 算完当层的梯度
就会提一个 pull request 当其他 work 同步了这个梯度。
问题在于发送的顺序和反向传播的顺序一致,并且颗粒度
是以 layer 为单位的。

根据领域知识将大 layer 分片成小 slice。
对每个 slice 进行权重排序,优先传输权重高的 slice。
被称为 Priority-Based Parameter Propagation

如图如果参数传输有优先级能够被中断就能减少 delay

大部分模型每层的参数是不平衡的,特别是全连接的参数非常大。

对每一层切分以后利用 TCP 的全双工可以达到流水线的效果。

P3 的设计

参数切片:根据领域知识和实验选择大小(50000)
参数优先级:下一层先需要的先发送,可以抢占低优先级的

我个人觉得简单,这个比上一个更好理解也简单。

BlueConnect

主要是一个对分层(原本的一层或者两层)的 All-Reduce 的泛化算法。
首先使用递归的方法可以减少 all-gather 的传输次数。

这个只要三次 logp

ring reduce 要7次 (p-1)

问题背景

多卡之间的带宽很高 32GB/s,不同网络拓扑下,最慢的会成为瓶颈。
分两层可以解决网络和总线的带宽差异。

对于二层的一个泛化,

三层的 reduce-scatter 的例子,reduce-gather 是反的。

BEYOND DATA AND MODEL PARALLELISM FOR DEEP NEURAL NETWORKS

这个主要是提到了并行程度除了数据和模型之外可以更细粒度到参数和属性,也就是模型的小划分和样本的小划分。
SOAP= Sample Attribute Operator Parameter

全连接层有大量的参数,造成传输瓶颈。
领域特定的优化:RNN 和 CNN 用 data 并行,最后全连接用 model 并行。

搜索策略

找到最优并行策略是 NP 问题。
Execution Optimizer 通过找到最小 cost 的
并行策略寻找较优解。
Cost = tensor size/ bandwidth

随机从搜索空间选取策略,依据 MCMC
最够贪心,又不会太局部贪心。

总的来有点和 AutoML 类似,在一个搜索空间选择一个策略使得 cost 最低,一个是原本的 cost 函数,这个是
计算本身执行的时间。目前主流框架也不支持 SOAP 级别的并行划分,没手调的好,但很接近。

可以看到搜索出来的结果全连接基本上是单卡算的