To train our largest Llama 3 models, we combined three types of parallelization: data parallelization, model parallelization, and pipeline parallelization. Our most efficient implementation achieves a compute utilization of over 400 TFLOPS per GPU when trained on 16K GPUs simultaneously. We performed training runs on two custom-built 24K GPU clusters. To maximize GPU uptime, we developed an advanced new training stack that automates error detection, handling, and maintenance. We also greatly improved our hardware reliability and detection mechanisms for silent data corruption, and we developed new scalable storage systems that reduce overheads of checkpointing and rollback. Those improvements resulted in an overall effective training time of more than 95%. Combined, these improvements increased the efficiency of Llama 3 training by ~three times compared to Llama 2.
LLAMA3 的技术博客揭示了许多令人振奋的优化成果,这些优化背后蕴含着大量值得深入研究的技术细节。虽然我们可能难以直接接触如此大规模的训练集群及其面临的挑战,但这些技术进展仍然为整个 AI 基础设施领域提供了宝贵的参考和启发。
/* Get a reference to safely compare mm after task_unlock(victim) */ mm = victim->mm; mmgrab(mm);
/* Raise event before sending signal: task reaper must see this */ count_vm_event(OOM_KILL); memcg_memory_event_mm(mm, MEMCG_OOM_KILL);
/* * We should send SIGKILL before granting access to memory reserves * in order to prevent the OOM victim from depleting the memory * reserves from the user space under its control. */ do_send_sig_info(SIGKILL, SEND_SIG_PRIV, victim, PIDTYPE_TGID); mark_oom_victim(victim); pr_err("%s: Killed process %d (%s) total-vm:%lukB, anon-rss:%lukB, file-rss:%lukB, shmem-rss:%lukB, UID:%u pgtables:%lukB oom_score_adj:%hd\n", message, task_pid_nr(victim), victim->comm, K(mm->total_vm), K(get_mm_counter(mm, MM_ANONPAGES)), K(get_mm_counter(mm, MM_FILEPAGES)), K(get_mm_counter(mm, MM_SHMEMPAGES)), from_kuid(&init_user_ns, task_uid(victim)), mm_pgtables_bytes(mm) >> 10, victim->signal->oom_score_adj); task_unlock(victim);
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -type event -target amd64 bpf oom_kill_kernel.c
func main() { // Name of the kernel function to trace. fn := "oom_kill_process"
// Subscribe to signals for terminating the program. stopper := make(chan os.Signal, 1) signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
// Allow the current process to lock memory for eBPF resources. if err := rlimit.RemoveMemlock(); err != nil { log.Fatal(err) }
// Load pre-compiled programs and maps into the kernel. objs := bpfObjects{} if err := loadBpfObjects(&objs, nil); err != nil { log.Fatalf("loading objects: %v", err) } defer objs.Close()
fmt.Println("probe") // Open a Kprobe at the entry point of the kernel function and attach the // pre-compiled program. Each time the kernel function enters, the program // will emit an event containing pid and command of the execved task. kp, err := link.Kprobe(fn, objs.KprobeOomKillProcess, nil) if err != nil { log.Fatalf("opening kprobe: %s", err) } defer kp.Close() fmt.Println("attach done")
// Open a ringbuf reader from userspace RINGBUF map described in the // eBPF C program. rd, err := ringbuf.NewReader(objs.Events) if err != nil { log.Fatalf("opening ringbuf reader: %s", err) } defer rd.Close()
// Close the reader when the process receives a signal, which will exit // the read loop. go func() { <-stopper
sks的revision创建的时候会有两个service,一个是 public service,一个是 private service,如果用istio的话会看到一个和ingressgateway关联的有externalname的service,这个service是和ingress实现相关的。主要的实现还是public和private两个service,实现和具体的ingress的实现是独立的。
Private service 对应的是真实的pod的endpoints,public service 有两种模式一个serve模式,一个是proxy模式。当public service出于proxy模式时,其endpoints指向的是activator,当处于serve模式时endpoints指向的是private service所指向的后端endpoints。
The major problem is that, whilst the Go compiler supports WebAssembly, it does not support WASI (WebAssembly System Interface). It generates an ABI that is deeply tied to JavaScript, and one needs to use the wasm_exec.js file provided by the Go toolchain, which doesn’t work outside a JavaScript host.
Go 的 WASM 目前没有支持 WASI,类似于没有 native 的系统调用,所以很多文件、网络的请求不能通过系统执行,目前只能跑在浏览器通过浏览器的接口执行。 TinyGo 是一个支持比较好的 WASM 的 Go 的运行时,有些需要WASI的项目就需要TinyGo来构建。
调度
WASM 是一个单线程的环境,js 是一个基于事件的模型,对于 goroutine 的调度是如何进行的,本着这个好奇研究了一下 Go 本身的 WASM 运行时。比如下面这段代码就明确了 Go 的 WASM 没有多线程也就没有跑 sysmon。 没有sysmon就相当于sysmon的forcegc触发gc的部分也没有,只能靠主动GC和内存分配的时候触发超过阈值开始gc。
1 2 3 4 5 6 7 8 9
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon // For runtime_syscall_doAllThreadsSyscall, we // register sysmon is not ready for the world to be // stopped. atomic.Store(&sched.sysmonStarting, 1) systemstack(func() { newm(sysmon, nil, -1) }) }
在这里回忆一下 GMP 的关系。P的数量由启动时环境变量 $GOMAXPROCS 或者是由 runtime 的方法 GOMAXPROCS() 决定。这意味着在程序执行的任意时刻都只有 $GOMAXPROCS 个 goroutine 在同时运行。 M的数量可以通过 runtime/debug 中的 SetMaxThreads 函数,设置 M 的最大数量一个 M 阻塞了,会创建新的 M。 M 与 P 的数量没有绝对关系,一个 M 阻塞,P 就会去创建或者切换另一个 M,所以,即使 P 的默认数量是 1,也有可能会创建很多个 M 出来。 在确定了 P 的最大数量 n 后,运行时系统会根据这个数量创建 n 个 P。 没有足够的 M 来关联 P 并运行其中的可运行的 G。比如所有的 M 此时都阻塞住了,而 P 中还有很多就绪任务,就会去寻找空闲的 M,而没有空闲的,就会去创建新的 M。
这个关系在对应的代码里面也有体现,在 osinit 的时候会强制将 P 设置为1,newosproc 也是空的,无法启动新进程。
// wasm only: // If a callback returned and no other goroutine is awake, // then wake event handler goroutine which pauses execution // until a callback was triggered. gp, otherReady := beforeIdle(now, pollUntil) if gp != nil { casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } if otherReady { goto top }
等于是在 wasm 中在找不到 G 的时候会启动一个 pause goroutine 占住当前的 P。
Lock_js 有 handle event 的逻辑。
内存分配
通过 WASM 的内存分配接口代替 linux 里面的 mmap 接管过来做内存分配。
系统调用
系统调用是使用 js 实现的,而且也不是像 linux 那种完整一套系统调用这个可能需要 WASI 的支持。 没有系统调用 M 也不会阻塞,等于是一个完全的单 P 单 M 多 G 的模型。
1 2 3
var jsProcess = js.Global().Get("process") var jsFS = js.Global().Get("fs") var constants = jsFS.Get("constants")
Ray 的基本抽象就是 Remote 或者 Actor,一个是分布式调用函数,一个式分布式调用类。Ray 和 Ray 的 RLlib 主要面对的问题是强化学习有大量的 simulating 的环境,比如仿真一局Dota,涉及到模拟一局Dota,反馈Agent的神经网络,是并行计算和神经网络训练的结合。当然 Ray 本身的抽象就是个分布式 goroutine,所以某种程度上可以完成的事情不光是强化学习一种任务,比如HypterTunning等一些并行计算的模型也是试用的。
反过来想,如果没有 Ray 的话,如何做这个系统呢,要构建大批量的仿真计算环境,然后能根据仿真的反馈训练神经网络。 这两个任务的调度控制就是一个问题,当然放到 k8s 的调度器里做似乎也可以,然后涉及这些分布式任务的同步问题, 需要构建这些任务的关系和信息传输,似乎用一些 DAG (比如 argo)的 workflow 也能解决,但他们之间通信的高效性似乎会是一个问题,需要 选择一种高效的远程调用传输方式,肯能gRPC也可以,还有他们的元数据管理用什么呢,自己搞个Redis似乎也行。 Ray 从这些方面综合考虑了这些问题提供了一个一站式的RL训练平台。
BayesOpt consists of two main components: a Bayesian statistical model for modeling the objective function, and an acquisition function for deciding where to sample next.
Definition: A GP is a (potentially infinte) collection of random variables (RV) such that the joint distribution of every finite subset of RVs is multivariate Gaussian: $ f \sim GP(\mu, k), $ where $\mu(\mathbf{x})$ and $k(\mathbf{x}, \mathbf{x}’)$ are the mean resp. covariance function! Now, in order to model the predictive distribution $P(f_* \mid \mathbf{x}_*, D)$ we can use a Bayesian approach by using a GP prior: $P(f\mid \mathbf{x}) \sim \mathcal{N}(\mu, \Sigma)$ and condition it on the training data $D$ to model the joint distribution of $f=f(X)$ (vector of training observations) and $f_* = f(\mathbf{x}_*)$ (prediction at test input).
采样函数
采样函数一般有 expected improvement(EI),当然还有 probability improvement(PI), upper confidence bound(UCB), knowledge gradient(KG),entropy search and predictive entropy search 等等。 采样的策略有两种: Explore:探索新的点,这种采样有助于估计更准确的; Exploit:利用已有结果附近的点进行采样,从而希望找到更大的;
这两个标准是互相矛盾的,如何在这两者之间寻找一个平衡点可以说是采样函数面对的主要挑战。
Expected improvement is a popular acquisition function owing to its good practical performance and an analytic form that is easy to compute. As the name suggests it rewards evaluation of the objective $f$ based on the expected improvement relative to the current best. If $f^* = \max_i y_i$ is the current best observed outcome and our goal is to maximize $f$, then EI is defined as
The above definition of the EI function assumes that the objective function is observed free of noise. In many types of experiments, such as those found in A/B testing and reinforcement learning, the observations are typically noisy. For these cases, BoTorch implements an efficient variant of EI, called Noisy EI, which allow for optimization of highly noisy outcomes, along with any number of constraints (i.e., ensuring that auxiliary outcomes do not increase or decrease too much). For more on Noisy EI, see our blog post.
Horovod comes with several adjustable “knobs” that can affect runtime performance, including –fusion-threshold-mb and –cycle-time-ms (tensor fusion), –cache-capacity (response cache), and hierarchical collective algorithms –hierarchical-allreduce and –hierarchical-allgather.
Reduce is a classic concept from functional programming. Data reduction involves reducing a set of numbers into a smaller set of numbers via a function. For example, let’s say we have a list of numbers [1, 2, 3, 4, 5]. Reducing this list of numbers with the sum function would produce sum([1, 2, 3, 4, 5]) = 15. Similarly, the multiplication reduction would yield multiply([1, 2, 3, 4, 5]) = 120.
In the system we described, each of the N GPUs will send and receive values N-1 times for the scatter-reduce, and N-1 times for the allgather. Each time, the GPUs will send K / N values, where K is the total number of values in array being summed across the different GPUs. Therefore, the total amount of data transferred to and from every GPU is
Data Transferred=2(N−1)KN
数据传输量在 N 比较大的时候越没有影响,这就消弭了多节点给 Parameter-Server 造成的瓶颈。
第二部根据 local_rank(相当于单节点上的第n张卡),并且设置不占用全部显存,按需分配(可能因内没有统一管理导致显存碎片),然后传递给 keras 设置 session。
1 2 3 4 5
# Horovod: pin GPU to be used to process local rank (one GPU per process) config = tf.ConfigProto() config.gpu_options.allow_growth = True config.gpu_options.visible_device_list = str(hvd.local_rank()) K.set_session(tf.Session(config=config))
# If set > 0, will resume training from a given checkpoint. resume_from_epoch = 0 for try_epoch in range(args.epochs, 0, -1): if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)): resume_from_epoch = try_epoch break
# Horovod: broadcast resume_from_epoch from rank 0 (which will have # checkpoints) to other ranks. resume_from_epoch = hvd.broadcast(resume_from_epoch, 0, name='resume_from_epoch')
# Horovod: print logs on the first worker. verbose = 1 if hvd.rank() == 0 else 0
# Restore from a previous checkpoint, if initial_epoch is specified. # Horovod: restore on the first worker which will broadcast both model and optimizer weights # to other workers. if resume_from_epoch > 0 and hvd.rank() == 0: model = hvd.load_model(args.checkpoint_format.format(epoch=resume_from_epoch), compression=compression) else: # ResNet-50 model that is included with Keras is optimized for inference. # Add L2 weight decay & adjust BN settings. model_config = model.get_config() for layer, layer_config in zip(model.layers, model_config['layers']): if hasattr(layer, 'kernel_regularizer'): regularizer = keras.regularizers.l2(args.wd) layer_config['config']['kernel_regularizer'] = \ {'class_name': regularizer.__class__.__name__, 'config': regularizer.get_config()} if type(layer) == keras.layers.BatchNormalization: layer_config['config']['momentum'] = 0.9 layer_config['config']['epsilon'] = 1e-5
model = keras.models.Model.from_config(model_config)
# Horovod: adjust learning rate based on number of GPUs. opt = keras.optimizers.SGD(lr=args.base_lr * hvd.size(), momentum=args.momentum)
callbacks = [ # Horovod: broadcast initial variable states from rank 0 to all other processes. # This is necessary to ensure consistent initialization of all workers when # training is started with random weights or restored from a checkpoint. hvd.callbacks.BroadcastGlobalVariablesCallback(0),
# Horovod: average metrics among workers at the end of every epoch. # # Note: This callback must be in the list before the ReduceLROnPlateau, # TensorBoard, or other metrics-based callbacks. hvd.callbacks.MetricAverageCallback(),
# Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final # accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during # the first five epochs. See https://arxiv.org/abs/1706.02677 for details. hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=args.warmup_epochs, verbose=verbose),
# Horovod: after the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs. hvd.callbacks.LearningRateScheduleCallback(start_epoch=args.warmup_epochs, end_epoch=30, multiplier=1.), hvd.callbacks.LearningRateScheduleCallback(start_epoch=30, end_epoch=60, multiplier=1e-1), hvd.callbacks.LearningRateScheduleCallback(start_epoch=60, end_epoch=80, multiplier=1e-2), hvd.callbacks.LearningRateScheduleCallback(start_epoch=80, multiplier=1e-3), ]
最后直接用 allreduce 计算一个 evaluation score。
1 2
# Evaluate the model on the full data set. score = hvd.allreduce(model.evaluate_generator(input_fn(False, args.train_dir, args.val_batch_size),NUM_IMAGES['validation']))
实现
适配层和压缩算法
horovod 的实现主要分几部分,第一部分是一个适配层,用于兼容各种框架,比如 tensorflow 的适配就是实现一个新的 Op,这个可以参考 add new op,里面规范了 Tensorflow 自定义算子的实现。
请注意,生成的函数将获得一个蛇形名称(以符合 PEP8)。因此,如果您的操作在 C++ 文件中命名为 ZeroOut,则 Python 函数将称为 zero_out。
C++ 的定义是驼峰的,生成出来的 python 函数是下划线小写的,所以最后对应的是,适配Op的代码在 horovod/tensorflow 目录下面
// The coordinator currently follows a master-worker paradigm. Rank zero acts // as the master (the "coordinator"), whereas all other ranks are simply // workers. Each rank runs its own background thread which progresses in ticks. // In each tick, the following actions happen: // // a) The workers send a Request to the coordinator, indicating what // they would like to do (which tensor they would like to gather and // reduce, as well as their shape and type). They repeat this for every // tensor that they would like to operate on. // // b) The workers send an empty "DONE" message to the coordinator to // indicate that there are no more tensors they wish to operate on. // // c) The coordinator receives the Requests from the workers, as well // as from its own TensorFlow ops, and stores them in a [request table]. The // coordinator continues to receive Request messages until it has // received MPI_SIZE number of empty "DONE" messages. // // d) The coordinator finds all tensors that are ready to be reduced, // gathered, or all operations that result in an error. For each of those, // it sends a Response to all the workers. When no more Responses // are available, it sends a "DONE" response to the workers. If the process // is being shutdown, it instead sends a "SHUTDOWN" response. // // e) The workers listen for Response messages, processing each one by // doing the required reduce or gather, until they receive a "DONE" // response from the coordinator. At that point, the tick ends. // If instead of "DONE" they receive "SHUTDOWN", they exit their background // loop.