ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

DeepSeek V3分析

During the pre-training stage, training DeepSeek-V3 on each trillion tokens requires only 180K
H800 GPU hours, i.e., 3.7 days on our cluster with 2048 H800 GPUs.

DeepSeek实现了非常便宜的训练成本,是一个700B的MoE模型。

基础设施

  • 计算集群:在配备 2048 个 NVIDIA H800 GPU 的集群上训练,节点内通过 NVLink 和 NVSwitch 连接,节点间使用 InfiniBand 互连。
  • 训练框架:基于 HAI - LLM 框架,采用 16 路管道并行(PP)、64 路专家并行(EP)和 ZeRO - 1 数据并行(DP)。设计 DualPipe 算法减少管道气泡并重叠计算与通信,开发高效的跨节点全对全通信内核,优化内存占用,无需使用昂贵的张量并行(TP)。
  • FP8 训练:提出 FP8 混合精度训练框架,对多数计算密集型操作采用 FP8 精度,对部分关键操作保留原始精度,引入细粒度量化策略、提高累积精度、采用 E4M3 格式及在线量化,还降低了内存和通信开销。
  • 推理与部署:部署在 H800 集群上,通过分离预填充和解码阶段确保服务水平目标(SLO)和高吞吐量。预填充阶段最小部署单元为 4 节点 32 个 GPU,采用特定并行策略和冗余专家策略确保负载均衡;解码阶段最小部署单元为 40 节点 320 个 GPU,采用相应并行策略和冗余专家策略,并探索动态冗余策略。
  • 硬件设计建议:针对通信硬件,期望未来硬件能卸载通信任务,统一网络接口;针对计算硬件,建议提高 FP8 GEMM 累积精度、支持细粒度量化、在线量化和转置 GEMM 操作。

并行度配置

prefill阶段,attention模块采用4路张量并行+8路数据并行,moe模块采用32路专家并行。这样并行的目的是在满足首token时延的要求下,最大化系统吞吐(和训练任务类似)。

decode阶段,DeepSeek-V3采取320路专家并行(256个小专家+64个热点专家),有效降低解码时延,并缓解负载不均衡的问题。

DeepSeek-V3 采用了多种并行策略,包括 16 路流水线并行(PP),这一策略有助于提高训练效率,加快模型的处理速度。同时,还应用了 64 路专家并行(EP),且在 8 个节点上进行,能够充分发挥多节点的计算优势。此外,ZeRO-1 数据并行(DP)也被运用到训练中,进一步提升了模型的训练效果。

ZeRO-1 优化器被切分到不同的GPU上。 《大模型动力引擎——PyTorch性能与显存优化手册》有提到这个优化,总结的很好。

假设我们有N=64块GPU进行数据并行训练,在ZeRO-1阶段,优化器的状态量首先被分散存储到所有GPU中,此时单张GPU上的内存使用量骤降到(4+4+8/64)*7.5=60.9GB。ZeRO-2阶段进一步地将模型的梯度也分散存储,此时单张GPU上的内存使用量便是(4+(4+8)/64)7.5=31.4GB。而ZeRO-3阶段将模型的参数也分散存储到N个节点,此时每张GPU的内存消耗只有(4+4+8)/647.5=1.875GB。从单卡需要120GB到仅需不到2GB内存,这个优化效果是不是有点惊艳?不过需要再次强调的是,这样巨大的显存优化是有代价的,显存切分的程度越高,相应的通信开销也会增加。因此,根据实际需求合理地进行显存切分是非常重要的。

MLA

采用类似 LoRA 的架构,借助一个低秩矩阵 “compressed laten vector”,kvcache 仅需对低秩的 key-value 对以及附带旋转位置编码(RoPE)的 key 进行缓存。

MoE

除了针对 Top k、routed experts 运用添加了激活函数的加权求和方式外,还额外引入了 shared experts。在 gate 的激活函数里增添一个 bias,以此来化解 balance 失衡的难题,在训练阶段,通过调节这个 bias 对 balance 状况予以奖惩,这一调节过程被称作 bias update speed。
就一个 batch、一个序列而言,每个 token 倘若倾向于特定的一些 expert,那么未被选中的 expert 实际上仅相当于训练了极小的 batch size,或者极短的序列,正因如此,才有了这样一种策略,用以平衡 expert 的 batch size 以及序列当中的 token 数量,毕竟序列通常都很长。
DeepSeek-V3 着重凭借辅助损失策略达成负载均衡,与此同时,引入互为补充的序列平衡损失,以防单个序列内部出现极度不平衡的现象。

MTP

类似于 speculative decoding,它同样会计算多个 token,不过具体方式存在一定差异。其 embedding 与 output head 是共用的,这一点和 sd 里的 Medusa 有所不同,Medusa 是由多个头来推测不同位置,而 MTP 则是依靠多个相同的头(只是 attention 有别)去推断不同位置。

MTP 的核心目的在于提升主模型的性能表现,在推理阶段能够直接将 MTP 模块舍去,主模型依旧可以独自正常运作。不仅如此,MTP 模块还能够应用于推测解码环节,以此进一步优化生成延迟问题,让整个流程更加高效流畅。

DualPipe

双流水线pipeline的优化。它实现了前向和后向过程中计算与通信阶段的重叠,有效解决了跨节点专家并行带来的通信负载问题。

FP8

能够不依赖硬件能力做FP8精度的训练,这个点是非常厉害的。

首先,为提高模型训练速度,大部分核心计算操作(尤其是 GEMM 运算),均采用 FP8 精度实现。这些 GEMM 运算接收 FP8 格式的张量输入,输出 BF16 或 FP32 格式的结果。如图6所示,线性运算相关的三个 GEMM 操作,包括 Fprop(前向传播)、Dgrad(激活值反向传播)和 Wgrad(权重反向传播),均采用 FP8 执行。这种设计策略理论上将计算速度提升至原有 BF16 方法的两倍。同时,FP8 格式的 Wgrad GEMM 使得激活值能够以 FP8 格式存储用于反向传播,显著降低了内存使用量。

LoRA一般的设定是认为微调任务应该只需要在一个更小的子空间去训练即可不需要复用基座模型的大空间,从而实现低成本的微调。
LoRA的前提是问题是不是在子空间能得到最优解。在线性回归 y=W x 中,如果最优 W * 是高秩的,那么对 W 施加低秩假设永远不会导致最优解,无论使用什么优化器。

Gradient Low-Rank Projection (GaLore) 允许全参数学习,但比 LoRA 等常见的低秩自适应方法更具内存效率。

使用单个批处理大小从头开始预训练 LLaMA7B 模型至少需要 58 GB 内存(14GB 用于可训练参数,42GB 用于 Adam 优化器状态和权重梯度,2GB 用于激活函数)。这使得训练在消费级 GPU 上不可行,例如具有 24GB 内存的 NVIDIA RTX 4090。

他证明梯度可能具有低秩结构,如果我们能够在优化器状态中保留梯度的一个小 “核心” 的梯度统计信息,而不是完整的梯度本身,那么内存消耗就可以大幅降低。这就引出了 GaLore 策略。
他的关键思想是利用权重矩阵 W 的梯度 G 上做LoRA,而不是试图将权重矩阵本身近似为低秩。他的核心逻辑用Torch写出来如下:

1
2
3
4
5
6
7
8
9
for weight in model.parameters():
grad = weight.grad
# original space -> compact space
lor_grad = project(grad)
# update by Adam, Adafactor, etc.
lor_update = update(lor_grad)
# compact space -> original space
update = project_back(lor_update)
weight.data += update

Sequence Parallelism

假设有4个chunk,切四份。

初始化状态,每个GPU都有自己的 Qn Kn,可以计算出对应的注意力矩阵,然后类似AllReduce的方式传递切分的K。

第一步环形传递K,然后再算一次注意力矩阵。

第二步环形传递K,然后再算一次注意力矩阵。

第三步全部传完,得到完整的Sn。

然后 Sn 和 Vn 的计算也是类似的,经过三次环形传递Vn,然后每一份可以单独和小s的那一份做乘法。

所以K和V的传播都要经历 3 次(N-1)的集合通信。

LLM推理的核心在于KVCache的调度。

  1. 尽可能多次重用KV缓存,以减少所需的计算资源;
  2. 每批次最大化token数量,从而改善Model FLOPs Utilization (MFU)。

如果从远程内存获取KVCache,会增加数据传输时间,从而延长TTFT(Time To First Token)。因此,当本地KVCache的增量计算时间少于传输时间时,可以复用本地的KVCache,即使它不是最匹配的。而增大batch意味着系统处理的大批量数据,导致TBT(Token Between Token)延长,可以将负载均衡到低负载的Decode Instance。

架构

Mooncake的架构图主要分为三个部分:Prefill Instance,Decode Instance,Conductor。

  1. Cache-aware Prefill Scheduler:负责调度Request到Prefill Instance,主要考虑load和KVCache的复用率。
  2. KVCache Balance Scheduler:负责从匹配最多前缀的P2P传输KVCache到Instance(Decode和Prefill)。
  3. Load-balance Decoding Scheduler:负责负载均衡调度Request到Decode Instance。

Prefill Instance要满足TTFT SLO,最小化MFU,保证KVCache < DRAM。
Decode Instance要满足TBT SLO,保证KVCache < VRAM。
Inter-Node Transfer基于RDMA的P2P,这也是一个较大的开销。

Mooncake的方法总结如下:

  1. 转移可重用的KVCache,将尽可能多的可重用KVCache转移至Prefill Instance,减少增量计算的时间。
  2. Prefill Instance Pool分层并分块处理,并持续输出给对应的Decode Instance。分层指的是Layer-wise KVCache的异步保存,分块指的是Chunked Pipeline Parallelism。
  3. 独立的Decode Instance Pool加载KVCache,通过连续批处理解码tokens。

Mooncake的主要特点是将prefill和decode拆开,并调度KVCache块。

Reject Policy:如果一个请求不能在服务水平内完成其完整的执行,那么就应该尽早拒绝这个请求,基于这个理念需要设计一些拒绝策略,被称作Overloaded-Scheduling。

KVCache的复制

KVCache的调度主要是利用KV Cache(VRAM,DRAM),利用RDMA带宽。

下图是一个Prefill和Decode分离的计算过程。

如果了解vLLM中的prefill和decode以及管理block的方法,这个图其实很简单。

首先通过Hash判断block是否相同,例如很多系统提示词都是一样的,这部分的复用率很高。

Prefill Instance已经有了ABCDE(这里是一个P2P的过程,但我看开源的版本有个KVCache Store的WIP,不知道后面会不会有一个中心化的KVCache Store的组件)。然后计算了FGHI,存入了KV Cache(在CPU mem上),论文里面提到这个prefill在超过prefill_chunk tokens数量会做chunked prefill。

接着通过Messenger以RDMA的方式发给Decode Instance。Decode Instance基于ABCDEFGHI的prompt对应的KV Cache开始decode的过程。

根据请求模式,它可以使用缓存淘汰算法,如LRU(最近最少使用),LFU(最不常用的),或基于请求特征的算法。这些KVCache块在CPU和GPU之间的传输由一个独立的(GPUDirect)RDMA组件Messenger处理。这种架构还使我们能够为外部用户提供KVCache缓存API,从而实现更高的缓存重用性。

Mooncake已经开源了他的代码,目前只有Transfer Engine。

基于这个架构,Conductor的主要功能是:

  1. 根据当前的KVCache分布和工作负载,分发请求。
  2. 复制或交换某些KVCache块,以便于未来推理。如果某些块的数据在未来被频繁访问,Conductor可能会将其复制到其他节点上,从而提高推理效率。

Mooncake的一个争论点是,是否需要在存在chunked prefill的情况下采用这种分离架构。毕竟,chunked prefill可以填补许多pipeline中的气泡,并且能让prefill和decode节点相对统一,只需要关心一种instance,对于scheduler比较友好。

  1. 不分离的优点:

    • 所有节点被视为平等,使调度更简单;
    • 将chunked prefill内联到解码批处理中可以提高解码批次的计算强度,从而提高MFU。
  2. 分离的优点:

    • 长文本的跨节点并行和VRAM的节省。长文本输入是输出的10倍甚至100倍,对于相同的模型来说,prefill需要多节点配置才能满足显存需求。prefill阶段可以进行layer-wise prefill,每次保存大量KVCache,而decode阶段每次只需保存一个KVCache。因此,prefill阶段可以通过layer-wise prefill来减少VRAM占用。

是这么理解么?异步的Store KVCache可以节省保存的时间,但这是Prefill和Decode分离的理由么?Decode阶段应该是不保存KVCache?

然而,经过仔细考虑,论文决定保持Mooncake的分离架构。只有在请求的prefill可以不进行chunking且不影响TBT SLO的情况下,才会将其内联到解码批次中。我们这样决定的主要原因有两个:

  1. Prefill节点需要不同的跨节点并行设置来处理长上下文 (§5.1)。

  2. 这为节省VRAM提供了独特的机会 (§5.2)。

  3. 大模型需要部署在多机上,进行TP后,每一层都需要进行一次基于RDMA的reduce,这个过程开销巨大。虽然有一些Sequence Parallelism的方法,但效果并不理想,且无法避免跨节点通信。而Mooncake采用的是CPP(Chunked Parallelism Pipeline),将序列按prefill_chunk大小切分,交给prefill pool的不同节点,这些节点被切分成更小的节点池(pipelined prefill node group)。

疑问:他们是pipe的不同部分?还是完全对等的?目前感觉是PP是分layer做Pipe,而CPP是sequence分chunked做pipe。24引用的论文中提到的Sequence Pipeline可以再看一下,应该对理解这个有帮助。

  1. Layer-wise prefill,这有点像airllm项目,在计算过程中动态加载KVCache。在每次注意力计算时,KVCache是异步加载的,计算当前层时可以异步加载下一层,并且当前层结束后可以异步保存当前层。论文中认为KVCache的保存时间可以被完全省略(相较于加载计算保存的线性循环)。这样也可以降低VRAM的占用。

调度算法

  1. 选择Prefill实例

    • 如果Prefill节点上缓存了足够的前缀(由kvcache_balancing_threshold控制),则选择预估TTFT最小的实例:TTFT = min(T_queue + T_prefill)
    • 如果Prefill节点上缓存不足,则选择TTFT = min(T_queue + T_prefill + T_transfer)最小的实例,其中T_transfer指的是有最长匹配的KVCache的实例拷贝到当前实例的预估时间。
  2. 选择Decode实例

    • 通过负载均衡的方式预估TBT。
    • 如果TBT和TTFT不满足SLO,则拒绝请求,并触发KVCache的传输。
  3. 预测模型

    • 预估模型用于预测传输时间和决策传输。
    • 数据传输时间难以预测,因为它不仅取决于数据大小,还依赖于当前网络状态,特别是当发送节点处于拥塞状态时。
  4. KVCache复制

    • 热门的KVCache块需要被复制以确保高可用性。
  5. 调度器目标

    • 保证低Cache负载和高Cache命中率。
  6. 高负载情况下的策略

    • 请求可能不会被直接发送给缓存最长前缀的实例,而是转发给备选实例。备选实例会主动从缓存持有者处检索KV缓存并存储本地。
    • 当最佳的远程前缀匹配长度不超过当前本地可重用前缀的阈值时,系统优先使用本地缓存,而不是从远程实例获取令牌。

这些策略不仅减少了请求的Prefill时间,还自动复制热点缓存,使其在多台机器上更广泛地分布。

拒绝策略

论文提到了一种基于预测的拒绝策略。Prefill和Decode的负载节奏是相反的,可能在Decode负载高时,Prefill负载较低。此时如果拒绝请求,会导致Decode负载下降,而Prefill完成后Decode负载又会升高,进而再次拒绝请求。引入预测拒绝策略后,可以使Prefill过程更加平滑,减少频繁拒绝请求的情况,从而减小负载节奏的波动。

参考Pytorch版本的llama-from-scratch。原文中的RmsNorm的平均值多算了一个维度,这里改成了正确的版本。

首先需要下载TinyShakespeare数据集,这是一个莎士比亚文字数据集。本文档将大致遵循论文的布局,并跳过一些明显的步骤,比如设置虚拟环境和安装依赖项。

我们最终将实现的内容预览:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

println!(generate(llama, MASTER_CONFIG, 500, device)[0])

ZELBETH:
Sey solmenter! 'tis tonguerered if berryishdd, and What his stabe, you, and, but all I pilJefals, mode with,
Vurint as steolated have loven OlD the queen'd refore
Are been, good plmp:

Proforne, wift'es swleen, was no bunderes'd a a quain beath!
Tybell is my gateer stalk smen'd as be matious dazest brink thou
lord
Enves were cIUll, afe and whwas seath This a is, an tale hoice his his onety Meall-tearn not murkawn, fase bettizen'd her,
To belacquesterer? baxewed wupl usweggs yet tall
An

实现过程中可能涉及一些 Rust 的使用方法,与 Python 有所不同,这里不做过多说明,具体的 Rust 语法可以参考其他文档。

迭代工作:从小模块开始,保持确定性,然后逐步构建

  1. 创建所有需要的辅助函数,以便定量测试模型(数据拆分、训练、绘制损失)。
  2. 从论文中挑选出不同的组件,然后逐一实现,边训练边评估。

确保你的层按预期工作

  1. 经常使用 .shape()assert 是你的朋友。
  2. 先在不进行矩阵乘法的情况下计算结果,然后使用 candle 函数使其高效。
  3. 有一个测试来确保你的层是正确的。例如,RoPE 嵌入有一个特定的属性,你可以测试它。对于 Transformer,你可以通过查看注意力矩阵来测试注意力是否正常工作。
  4. 在各种批次、序列和嵌入大小上测试你的层。即使它适用于一种大小,它可能不适用于其他大小,这将在推理时导致问题。

关于 Llama

Llama 是一种基于 Transformer 的语言建模模型。它是一个自回归模型,也称为 CausalModel,模型会将输出中的 token 加入到输入中,不断迭代推理,直到超过上下文长度或遇到停止符。Meta AI 开源了 Llama,并明确表示他们的目标是使模型在推理时更高效,而不是优化训练成本。

接下来,我们将加载库并开始实现。

1
2
3
4
5
6
7
8
9
10
11
12
use candle_core::{DType, Device, IndexOp, Result, Tensor, D};
use candle_nn::ops::softmax;
use candle_nn::{
embedding, linear, loss, AdamW, Embedding, Init, Linear, Module, Optimizer, ParamsAdamW,
VarBuilder,
};

use core::f32;
use rand::Rng;
use std::collections::HashMap;
use std::fs;
use std::time;

设置数据集

虽然 Llama 在 1.4T 个标记上进行训练,但我们的数据集 TinyShakespeare,即莎士比亚所有作品的集合,大约只有 100 万个字符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use std::collections::HashMap;
use std::fs;

fn main() {
// Read the entire content of the file
let lines = fs::read_to_string("./input.txt")
.expect("Failed to read the file");

// Create a sorted set of unique characters
let mut vocab: Vec<char> = lines.chars().collect();
vocab.sort_unstable();
vocab.dedup();

// Create itos and stoi mappings
let itos: HashMap<usize, char> = vocab.iter().enumerate().map(|(i, &ch)| (i, ch)).collect();
let stoi: HashMap<char, usize> = vocab.iter().enumerate().map(|(i, &ch)| (ch, i)).collect();

// Print the first 30 characters of the file
println!("{}", &lines[..30.min(lines.len())]);
}
First Citizen:
Before we proce

他们使用了SentencePiece字节对编码分词器,但我们将只使用一个简单的字符级分词器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
use std::collections::HashMap;
use std::fs;
struct Vocab {
itos: HashMap<u32, char>,
stoi: HashMap<char, u32>,
}

impl Vocab {
fn new(itos: HashMap<u32, char>, stoi: HashMap<char, u32>) -> Vocab {
Vocab { itos, stoi }
}
fn decode(&self, ids: &[u32]) -> String {
ids.iter().map(|&id| self.itos[&id]).collect()
}
fn encode(&self, text: &str) -> Vec<u32> {
text.chars().map(|ch| self.stoi[&ch]).collect()
}
fn len(&self) -> usize {
self.itos.len()
}
fn build(lines: &str) -> Self {
// Create a sorted set of unique characters
let mut vocab: Vec<char> = lines.chars().collect();
vocab.sort();
vocab.dedup();

// Create itos and stoi mappings
let itos: HashMap<u32, char> = vocab
.iter()
.enumerate()
.map(|(i, &ch)| (i as u32, ch))
.collect();
let stoi: HashMap<char, u32> = vocab
.iter()
.enumerate()
.map(|(i, &ch)| (ch, i as u32))
.collect();
Self { itos, stoi }
}
}

fn main() {
// Read the entire content of the file
let lines = fs::read_to_string("./input.txt").expect("Failed to read the file");

let vocab = Vocab::build(&lines);
println!("vocab size = {}", vocab.len());
println!("{}", vocab.decode(&vocab.encode("hello")));
}
vocab size = 65
hello

由于数据集较小,我们无需担心内存存储问题。

我们创建了一个 config 对象来存储基本的模型参数。这样可以提高代码的可读性,并且便于修改配置。Rust 是强类型语言,因此所有变量都有明确的类型。

1
2
3
let mut modeConfig = ModelConfig {
vocab_size: vocab.len(),
}
1
2
let dataset = Tensor::from_slice(&vocab.encode(&lines), (lines.len(),), &Device::Cpu).unwrap();
println!("{:?}", dataset.shape());
[1115394]

让我们创建一个方法 get_batches 来生成训练数据和目标的批次。我们将使用相同的方法来生成验证和测试数据,通过 split 参数来控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
fn get_batches(
dataset: &Tensor,
split: &str,
batch_size: usize,
context_length: usize,
) -> Result<(Tensor, Tensor)> {
let len_of_dataset = dataset.shape().dim(0).unwrap() as f32;
// 按照 0.8 0.1 0.1 的比例切分训练集, 验证集和测试集
let batch_data = match split {
"val" => &dataset.i((0.8 * len_of_dataset) as usize..(0.9 * len_of_dataset) as usize)?,
"test" => &dataset.i((0.9 * len_of_dataset) as usize..)?,
_ => &dataset.i(..(0.8 * len_of_dataset) as usize)?,
};
// 生成随机index
let mut rng = rand::thread_rng();
let data_len = batch_data.shape().dim(0)?;
let indices: Vec<usize> = (0..batch_size)
.map(|_| rng.gen_range(0..data_len - context_length - 1))
.collect();
let mut x_batches = Vec::with_capacity(batch_size);
let mut y_batches = Vec::with_capacity(batch_size);

for idx in indices {
let x = batch_data.i(idx..(idx + context_length))?;
// y 是 x 后面的一个字符
let y = batch_data.i((idx + 1)..(idx + context_length + 1))?;
x_batches.push(x);
y_batches.push(y);
}
// stack 和 cat 的区别是, stack 是在新的维度上堆叠, cat 是在已有的维度上堆叠
let x_tensor = Tensor::stack(&x_batches, 0)?;
let y_tensor = Tensor::stack(&y_batches, 0)?;
Ok((x_tensor, y_tensor))
}
// in fn main
modeConfig.context_length = 16;
modeConfig.batch_size = 8;
let batch = get_batches(
&dataset,
"train",
modeConfig.batch_size,
modeConfig.context_length,
)?;
println!(
"batch size {}, context_length {}",
batch.0.shape().dim(0)?,
batch.0.shape().dim(1)?
);
for i in 0..modeConfig.batch_size {
println!(
"{:?}, {:?}",
vocab.decode(&batch.0.i(i)?.to_vec1()?),
vocab.decode(&batch.1.i(i)?.to_vec1()?),
);
}

":\nBut, that I'll", "\nBut, that I'll "
"ng?\nWhy, then th", "g?\nWhy, then the"
"s so blind, but ", " so blind, but s"
"thy offices,\nSo ", "hy offices,\nSo r"
"ords, how plainl", "rds, how plainly"
"IET:\nHere's such", "ET:\nHere's such "
"wer\nTo take off ", "er\nTo take off s"
" hurry from the ", "hurry from the f"

实现论文有趣的一点在于,模型“工作”有两个方面:编译(你的张量是否在各层之间匹配)和训练(损失是否下降)。
我们还要定义评估模型的方法。我们希望在定义模型之前就这样做,因为我们希望在训练模型时使用它来评估模型的性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fn evaluate_loss(
model: &SimpleBrokenModel,
dataset: &Tensor,
vocab: &Vocab,
config: ModelConfig,
) -> Result<HashMap<String, f32>> {
let mut out = HashMap::new();
for split in ["train", "val"] {
let mut losses = Vec::new();
for _ in 0..10 {
let (xs, ys) = get_batches(&dataset, split, config.batch_size, config.context_length)?;
let (_, loss) = model.forward(&xs, Some(&ys))?;
let loss = loss.unwrap();
losses.push(loss.to_scalar::<f32>()?);
}
let avg_loss = losses.iter().sum::<f32>() / losses.len() as f32;
out.insert(split.to_owned(), avg_loss);
}
Ok(out)
}

设置一个可以工作的简单模型

这是一个带有嵌入的基本前馈神经网络。它是我们将要开始的基础模型,然后我们将逐步替换其部分内容,直到最终得到 Llama 论文中描述的模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
struct SimpleBrokenModel {
embedding: Embedding,
mlp: Sequential,
config: ModelConfig,
}

impl SimpleBrokenModel {
fn forward(&self, x: &Tensor, targets: Option<&Tensor>) -> Result<(Tensor, Option<Tensor>)> {
// 潜入层
let embeds = self.embedding.forward(x)?;

// 线性和激活层
let logits = self.mlp.forward(&embeds)?;

// 如果提供了targets就计算loss,不然视为推理,计算logits就可以。
if let Some(targets) = targets {
// 负的似然函数
// -log(x) 越大,loss 越小
// y = [0, 0 , 0, 0, 1, ...,0,0]
// y' = [4, 5, 6, 7, 8, ...,11,12 ]
// 这个 cross_entropy 帮我们做了一个 log softmax
// y' = [0.1, 0.12, 0.13, 0.64, ..., 0,0]
// loss = -log(0.64)
// 当 -log(q) = 4.17 q = 0.015 大概 1/64,vocab_size = 65,所以基本是在瞎猜。
let loss = loss::cross_entropy(
&logits.reshape(((), self.config.vocab_size))?,
&targets.reshape(((),))?,
)?;
Ok((logits, Some(loss)))
} else {
Ok((logits, None))
}
}
// VarBuilder是用来构建参数的,我们目前不加载和保存模型参数,但是candle的用法必须基于这个。
// vb.pp 会在参数树中加入参数的前缀,这样可以方便的查看参数的结构。
fn load(vb: VarBuilder, config: ModelConfig) -> Result<(Self)> {
let embedding = embedding(
config.vocab_size,
config.d_model,
vb.pp("model.embed_tokens"),
)?;
let mlp = sequential::seq()
.add(linear(
config.d_model,
config.d_model,
vb.push_prefix("model.fc1"),
)?)
.add(Activation::Relu)
.add(linear(
config.d_model,
config.vocab_size,
vb.push_prefix("model.fc2"),
)?);
Ok(Self {
embedding,
mlp,
config,
})
}
}
// in fn main
modeConfig.d_model = 128;
modeConfig.batch_size = 32;
let (xs, ys) = get_batches(
&dataset,
"train",
modeConfig.batch_size,
modeConfig.context_length,
)?;
let varmap = candle_nn::VarMap::new();
let vb = candle_nn::VarBuilder::from_varmap(&varmap, DType::F32, &Device::Cpu);
let model = SimpleBrokenModel::load(vb, modeConfig)?;
let (logits, loss) = model.forward(&xs, Some(&ys))?;
println!("{:?} {:?}", logits, loss);
let mut params_count: usize = 0;
for (name, var) in varmap.data().lock().unwrap().iter() {
println!("{}: {:?}", name, var.elem_count());
params_count += var.elem_count();
}
println!("params count: {}", params_count);
Tensor[dims 32, 16, 65; f32] Some(Tensor[5.266067; f32])
model.fc2.weight: 8320
model.embed_tokens.weight: 8320
model.fc1.bias: 128
model.fc2.bias: 65
model.fc1.weight: 16384
params count: 33217

在这一点上,我们必须开始关注张量的形状,并让矩阵的维度匹配。查看我们模型定义中的这一行:

1
2
3
4
let loss = loss::cross_entropy(
&logits.reshape(((), self.config.vocab_size))?,
&targets.reshape(((),))?,
)?;

我们必须调整 logitstargets 张量的形状,以便在比较时它们的维度匹配。我们使用 reshape 方法来实现这一点。
() 参数的意思是“从其他维度推断这个维度”。所以,在这种情况下,我们是在说“将 logitstargets 重新调整为具有相同行数的形状,并使用所需的列数来实现这一点”。这是处理批量数据时的常见模式。

让我们训练我们的 SimpleBrokenModel 以确保梯度流动。在确认这一点之后,我们可以替换它的部分内容以匹配 Llama,再次训练并跟踪我们的进展。在这一点上,我开始记录我的训练运行日志,这样如果我搞砸了,我可以轻松地回到之前的运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
fn train(
config: ModelConfig,
model: &SimpleBrokenModel,
opt: &mut AdamW,
dataset: &Tensor,
vocab: &Vocab,
) -> Result<()> {
let mut start_time = std::time::Instant::now();
for epoch in 0..config.epochs {
let (xs, ys) = get_batches(&dataset, "train", config.batch_size, config.context_length)?;
let (_, loss) = model.forward(&xs, Some(&ys))?;
opt.backward_step(&loss.unwrap())?;
if epoch % config.log_interval == 0 {
let batch_duration = start_time.elapsed().as_secs_f32();
let loss = evaluate_loss(&model, dataset, vocab, config)?;
let val_loss = loss.get("val").unwrap();
let eta = batch_duration * (config.epochs - epoch) as f32;
let eta = eta.round();
println!(
"Epoch: {epoch} | Loss: {val_loss} | Time: {batch_duration} | ETA in seconds {eta}"
);
start_time = time::Instant::now();
}
}
Ok(())
}
// in fn main
modeConfig.log_interval = 10;
modeConfig.epochs = 100;
let mut opt = candle_nn::AdamW::new(varmap.all_vars(), ParamsAdamW::default())?;
train(modeConfig, &model, &mut opt, &dataset, &vocab)?;
let out = evaluate_loss(&model, &dataset, &vocab, modeConfig);
println!("{:?}", out);
Epoch: 10 | Loss: 3.9159875 | Time: 6.5813394 | ETA in seconds 599
Epoch: 20 | Loss: 3.26492 | Time: 6.3639965 | ETA in seconds 515
Epoch: 30 | Loss: 2.9944448 | Time: 6.3596206 | ETA in seconds 452
Epoch: 40 | Loss: 2.8793342 | Time: 6.357106 | ETA in seconds 388
Epoch: 50 | Loss: 2.7827232 | Time: 6.3562865 | ETA in seconds 324
Epoch: 60 | Loss: 2.764416 | Time: 6.352279 | ETA in seconds 260
Epoch: 70 | Loss: 2.7196321 | Time: 6.356127 | ETA in seconds 197
Epoch: 80 | Loss: 2.7631993 | Time: 6.357493 | ETA in seconds 134
Epoch: 90 | Loss: 2.696882 | Time: 6.358631 | ETA in seconds 70
Epoch: 100 | Loss: 2.670012 | Time: 6.3603354 | ETA in seconds 6
Ok({"train": 2.591057, "val": 2.6625311})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fn generate(model: &SimpleBrokenModel, vocab: &Vocab, max_tokens: usize) -> Result<()> {
// batch size 5, initial token = 0
let mut token_ids = Tensor::zeros((5, 1), DType::U32, &Device::Cpu).unwrap();
for _ in 0..max_tokens {
let (logits, _) = model.forward(&token_ids, None)?;
assert!(logits.shape().dims() == [token_ids.dim(0)?, token_ids.dim(1)?, 65]);
let last_step_logits = logits.i((.., logits.dim(1)? - 1))?;
assert!(last_step_logits.shape().dims() == [token_ids.dim(0)?, 65]);
let probs = softmax(&last_step_logits, last_step_logits.dims().len() - 1)?;
assert!(probs.shape().dims() == [token_ids.dim(0)?, 65]);
let next_token = probs.argmax(probs.dims().len() - 1)?;
assert!(next_token.shape().dims() == [token_ids.dim(0)?]);
token_ids = Tensor::cat(&[token_ids, next_token.reshape(((), 1))?], 1)?;
}
let lines = fs::read_to_string("./input.txt").expect("Failed to read the file");
for v in &token_ids.to_vec2()? {
let text = vocab.decode(v);
println!("{}", text);
}
Ok(())
}
// fn in main
generate(&model, &vocab, 10, device)?;
['\nFind!\nD:\nAr t,\nLis sthte o t l',
 '\nAnd ronnot ar\nBE:\nKINRDYOrspr;',
 '\nI t athe momyengthend thanswal',
 '\nFis t bp he\nLacarn.\nA:\nYOMI wi',
 '\nWh ly sck\nB-de pll t\nHERIns ou']

这还算不错,但也不算太好。不过现在我们有了一个可以训练到验证损失的工作模型。因此,我们将在此基础上迭代我们的模型,使其更接近 Llama。

Llama 具体细节

Llama 对原始 Transformer 进行了三项架构修改:

  1. 用于预归一化的 RMSNorm
  2. 旋转嵌入 RoPE
  3. SwiGLU 激活函数

我们将逐一添加每个修改到我们的基础模型,并进行迭代。

RMSNorm

在 Vaswani 2017 中,原始的 Transformer 使用了 BatchNormalization。在 Llama 中,作者使用了 RMSNorm,这是一种在不进行中心化的情况下通过方差来缩放向量的方法。此外,虽然 Vaswani 将归一化应用于注意力层的输出(后归一化),但 Llama 将其应用于输入之前(前归一化)。

这篇文章对于RMSNorm有一个很好的解释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
pub struct RmsNorm {
scale: Tensor,
eps: f64,
}
impl RmsNorm {
fn new(size: usize, vb: VarBuilder) -> Result<Self> {
Ok(RmsNorm {
scale: vb.get_with_hints(size, "weight", Init::Const(1.))?,
eps: 1e-6,
})
}
pub fn forward(&self, x: &Tensor) -> Result<Tensor> {
let x_sqr = x.sqr()?;
assert!(x_sqr.shape().dims() == x.shape().dims());
let norm_x = (x.mean(D::Minus1)? + self.eps)?.sqrt()?;
assert!(norm_x.shape().dims() == [x.shape().dim(0)?, x.shape().dim(1)?]);
let x_normed = x.broadcast_div(&norm_x.reshape((
norm_x.shape().dim(0)?,
norm_x.shape().dim(1)?,
(),
))?)?;
assert!(x_normed.shape().dims() == x.shape().dims());
let x = (x_normed.broadcast_mul(&self.scale))?;
Ok(x)
}
}

let varmap = candle_nn::VarMap::new();
let vb = candle_nn::VarBuilder::from_varmap(&varmap, DType::F32, &Device::Cpu);

let rms_norms = RmsNorm::new(2, vb)?;
// (2,3,2)
let batch = Tensor::new(
vec![
vec![vec![1f32, 1f32], vec![1.2f32, 2f32], vec![3f32, 3f32]],
vec![vec![4f32, 43f32], vec![5f32, 5f32], vec![61f32, 6f32]],
],
&Device::Cpu,
)?;
let vb = candle_nn::VarBuilder::from_varmap(&varmap, DType::F32, &Device::Cpu);
let out = rms_norms.forward(&batch)?;
Tensor[dims 2, 3, 2; f32]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
struct SimpleBrokenModel {
embedding: Embedding,
mlp: Sequential,
rms_norm: RmsNorm,
config: ModelConfig,
}

impl SimpleBrokenModel {
fn forward(&self, x: &Tensor, targets: Option<&Tensor>) -> Result<(Tensor, Option<Tensor>)> {
// Embedding
let embeds = self.embedding.forward(x)?;
// RMSNorm
let normed_embeds = self.rms_norm.forward(&embeds)?;
// Linear layers and activation
let logits = self.mlp.forward(&normed_embeds)?;

// Calculate loss if targets are provided
if let Some(targets) = targets {
// 负的似然函数
// log(x) 越大,loss 越小
// y = [0, 0 , 0, 0, 1, ...,0,0]
// y' = [4, 5, 6, 7, 8, ...,11,12 ]
// 这个 cross_entropy 帮我们做了一个 log softmax
// y' = [0.1, 0.12, 0.13, 0.64, ..., 0,0]
// loss = -log(0.64)
// -log(q) = 4.17 q = 0.015 大概 1/64,vocab_size = 65,所以基本是在瞎猜。
// println!("{:?}", targets.shape());
// println!("{:?}", logits.shape());
let loss = loss::cross_entropy(
&logits.reshape(((), self.config.vocab_size))?,
&targets.reshape(((),))?,
)?;
Ok((logits, Some(loss)))
} else {
Ok((logits, None))
}
}
// VarBuilder是用来构建参数的。
fn load(vb: VarBuilder, config: ModelConfig) -> Result<(Self)> {
let embedding = embedding(
config.vocab_size,
config.d_model,
vb.pp("model.embed_tokens"),
)?;
let rms_norm = RmsNorm::new(config.d_model, vb.pp("model.rms_norm"))?;
let mlp = sequential::seq()
.add(linear(
config.d_model,
config.d_model,
vb.push_prefix("model.fc1"),
)?)
.add(Activation::Relu)
.add(linear(
config.d_model,
config.vocab_size,
vb.push_prefix("model.fc2"),
)?);
Ok(Self {
embedding,
mlp,
config,
rms_norm,
})
}
}

Epoch: 10 | Loss: 4.1559505 | Time: 6.779387 | ETA in seconds 617
Epoch: 20 | Loss: 4.14648 | Time: 6.7727704 | ETA in seconds 549
Epoch: 30 | Loss: 4.1364665 | Time: 6.776428 | ETA in seconds 481
Epoch: 40 | Loss: 4.125594 | Time: 6.772582 | ETA in seconds 413
Epoch: 50 | Loss: 4.120083 | Time: 6.7661977 | ETA in seconds 345
Epoch: 60 | Loss: 4.1099877 | Time: 6.760399 | ETA in seconds 277
Epoch: 70 | Loss: 4.0996284 | Time: 6.7623253 | ETA in seconds 210
Epoch: 80 | Loss: 4.0902996 | Time: 6.761824 | ETA in seconds 142
Epoch: 90 | Loss: 4.0833025 | Time: 6.76845 | ETA in seconds 74
Epoch: 100 | Loss: 4.070025 | Time: 6.7624454 | ETA in seconds 7
Ok({"train": 4.072861, "val": 4.0711236})

从这里得到的结果来看,范化以后,模型的表现并没有提升,所以我们需要继续迭代,只是梯度的下降变得比较平滑了。

Rotary Embeddings

RoPE 是一种用于 Transformer 的位置编码方法。在《Attention is All You Need》中,作者提出了两种位置编码方法:学习的和固定的。在 RoPE 中,作者通过旋转嵌入来表示序列中标记的位置,并在每个位置使用不同的旋转角度。

其中的 cos 和 sin 值可以预先计算并缓存,避免重复计算,后续会统一存放在一个缓存结构中。

RoPE 将 hidden_state 中每两个 x 组成的向量与旋转矩阵相乘来实现位置编码。

1
2
3
4
[x0, x1, .... ,xn]
y0 = x0 * cos(theta) - x1 * sin(theta)
y1 = x0 * sin(theta) + x1 * cos(theta)
[y0, y1, ...., yn]

nd_model 的一半。
theta 是一个根据位置得到的固定值,计算公式为:
theta = m / 10000^(2i/n)
其中,m 是在序列中的位置,i 是在 d_model 中的位置。

这个公式的含义是将特征向量中的 x0x1 进行一个固定的旋转,这个旋转不是通过学习得到的,而是预先计算的。它可以用于表示相对位置信息。

1
2
3
```Rust
pos_index = 0, 中的 x0,x1
pos_index = 1, 中的 x0,x1

隔了一个恒定的调度旋转。

freq_cis缓存住提前算好的cos和sin的值,这部分不用重复计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct Cache {
cos: Tensor,
sin: Tensor,
}

impl Cache {
fn new(context_length: usize, n_elem: usize, vb: VarBuilder) -> Result<Cache> {
let theta: Vec<_> = (0..n_elem)
.step_by(2)
.map(|i| 1f32 / 10000f32.powf(i as f32 / n_elem as f32))
.collect();
let theta = Tensor::new(theta.as_slice(), vb.device())?;
let idx_theta = Tensor::arange(0, context_length as u32, vb.device())?
.to_dtype(DType::F32)?
.reshape((context_length, 1))?
.matmul(&theta.reshape((1, theta.elem_count()))?)?;
let freq_cis_real = idx_theta.cos()?;
let freq_cis_imag = idx_theta.sin()?;

let cos = freq_cis_real.reshape((context_length, n_elem / 2, 1))?;
let sin = freq_cis_imag.reshape((context_length, n_elem / 2, 1))?;
Ok(Cache { cos, sin})
}
}

rope计算的时候

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fn apply_rotary_emb(&self, x: &Tensor, cache: &Cache) -> Result<Tensor> {
let (b_sz, seq_len, n_embd) = x.dims3()?;
// println!("shape of cache.cos {:?}", cache.cos.shape());
let cos = cache.cos.i(..seq_len)?;
let sin = cache.sin.i(..seq_len)?;
// println!("cos shape {:?}", cos.shape());
let cos = cos.broadcast_as((b_sz, seq_len, n_embd / 2, 1))?;
let sin = sin.broadcast_as((b_sz, seq_len, n_embd / 2, 1))?;
// println!("broadcast cos shape {:?}", cos.shape());
let x = x.reshape((b_sz, seq_len, n_embd / 2, 2))?;
let x0 = x.narrow(D::Minus1, 0, 1)?;
let x1 = x.narrow(D::Minus1, 1, 1)?;
let dst0 = (x0.broadcast_mul(&cos)? - x1.broadcast_mul(&sin)?)?;
let dst1 = (x0.broadcast_mul(&sin)? + x1.broadcast_mul(&cos)?)?;
let rope = Tensor::cat(&[&dst0, &dst1], D::Minus1)?.reshape((b_sz, seq_len, n_embd))?;
Ok(rope)
}

Self Attention

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
struct AttentionModel {
embedding: Embedding,
mlp: Sequential,
rms_norm: RmsNorm,
config: ModelConfig,
self_attention: SelfAttention,
cache: Cache,
}

impl AttentionModel {
fn forward(&self, x: &Tensor, targets: Option<&Tensor>) -> Result<(Tensor, Option<Tensor>)> {
// 嵌入层
let embeds = self.embedding.forward(x)?;
// 范化层
let normed_embeds = self.rms_norm.forward(&embeds)?;
// 自注意力层
let y = self.self_attention.forward(&normed_embeds, &self.cache)?;
// 线性和激活层
let logits = self.mlp.forward(&normed_embeds)?;

if let Some(targets) = targets {
// 负的似然函数
// -log(x) 越大,loss 越小
// y = [0, 0 , 0, 0, 1, ...,0,0]
// y' = [4, 5, 6, 7, 8, ...,11,12 ]
// 这个 cross_entropy 帮我们做了一个 log softmax
// y' = [0.1, 0.12, 0.13, 0.64, ..., 0,0]
// loss = -log(0.64)
// 例如 -log(q) = 4.17 q = 0.015 大概 1/64,vocab_size = 65,所以基本是在瞎猜。
let loss = loss::cross_entropy(
&logits.reshape(((), self.config.vocab_size))?,
&targets.reshape(((),))?,
)?;
Ok((logits, Some(loss)))
} else {
Ok((logits, None))
}
}

fn load(vb: VarBuilder, config: ModelConfig) -> Result<(Self)> {
let embedding = embedding(
config.vocab_size,
config.d_model,
vb.pp("model.embed_tokens"),
)?;
let rms_norm = RmsNorm::new(config.d_model, vb.pp("model.rms_norm"))?;
let self_attention = SelfAttention::load(vb.pp("model.self_attention"), config)?;
let mlp = sequential::seq()
.add(linear(
config.d_model,
config.d_model,
vb.push_prefix("model.fc1"),
)?)
.add(Activation::Relu)
.add(linear(
config.d_model,
config.vocab_size,
vb.push_prefix("model.fc2"),
)?);
Ok(Self {
embedding,
mlp,
config,
rms_norm,
self_attention,
cache: Cache::new(config.context_length, config.d_model, vb)?,
})
}
}

提示:了解训练时张量维度与推理时张量维度的区别。

虽然在训练时,你可以期望张量维度与模型参数紧密匹配,例如 batch.shape = (config['batch_size'], config['context_window'], config['d_model']),但在推理时,你可能需要处理单个示例,例如 batch.shape = (1, 1, config['d_model'])。因此,你需要确保在 forward 传递中进行索引时,使用从输入派生的形状,而不一定是模型参数。

MultiHeadRopeAttention

让我们为这个单一的注意力头设置一个多头注意力层,看看训练时会发生什么。

这里实现的是GQA的注意力头。n_kv_head=1时就是MQA,n_kv_head>1n_kv_head<n_head时就是GQA,n_kv_head=n_head时就是原本的MHA。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
struct MultiHeadAttention {
q_proj: Linear,
k_proj: Linear,
v_proj: Linear,
o_proj: Linear,
n_head: usize,
n_kv_head: usize,
head_dim: usize,
}

impl MultiHeadAttention {
fn load(vb: VarBuilder, config: ModelConfig) -> Result<Self> {
let q_proj = linear(config.d_model, config.d_model, vb.pp("model.q_proj"))?;
let k_proj = linear(
config.d_model,
(config.d_model / config.n_head) * config.n_kv_head,
vb.pp("model.k_proj"),
)?;
let v_proj = linear(
config.d_model,
(config.d_model / config.n_head) * config.n_kv_head,
vb.pp("model.v_proj"),
)?;
let o_proj = linear(config.d_model, config.d_model, vb.pp("model.o_proj"))?;
println!(
"MHA config n_head {} n_kv_head {}",
config.n_head, config.n_kv_head
);
Ok(Self {
q_proj,
k_proj,
v_proj,
o_proj,
n_head: config.n_head,
n_kv_head: config.n_kv_head,
head_dim: config.d_model / config.n_head,
})
}
fn apply_rotary_emb(&self, x: &Tensor, cache: &Cache) -> Result<Tensor> {
let (b_sz, seq_len, h, n_embd) = x.dims4()?;
let cos = cache.cos.i(..seq_len)?;
let sin = cache.sin.i(..seq_len)?;
let cos = cos.unsqueeze(1)?;
let sin = sin.unsqueeze(1)?;
let cos = cos.broadcast_as((b_sz, seq_len, 1, n_embd / 2, 1))?;
let sin = sin.broadcast_as((b_sz, seq_len, 1, n_embd / 2, 1))?;
let x = x.reshape((b_sz, seq_len, h, n_embd / 2, 2))?;
let x0 = x.narrow(D::Minus1, 0, 1)?;
let x1 = x.narrow(D::Minus1, 1, 1)?;
let dst0 = (x0.broadcast_mul(&cos)? - x1.broadcast_mul(&sin)?)?;
let dst1 = (x0.broadcast_mul(&sin)? + x1.broadcast_mul(&cos)?)?;
let rope = Tensor::cat(&[&dst0, &dst1], D::Minus1)?.reshape((b_sz, seq_len, h, n_embd))?;
Ok(rope)
}
fn forward(&self, x: &Tensor, cache: &Cache) -> Result<Tensor> {
let (b_sz, seq_len, n_embd) = x.dims3()?;

// 计算 q k v
let q = self.q_proj.forward(x)?;
let k = self.k_proj.forward(x)?;
let v = self.v_proj.forward(x)?;

assert!(n_embd == self.n_head * self.head_dim);

let q = q.reshape((b_sz, seq_len, self.n_head, self.head_dim))?;
let k = k.reshape((b_sz, seq_len, self.n_kv_head, self.head_dim))?;
let v = v.reshape((b_sz, seq_len, self.n_kv_head, self.head_dim))?;

// 对 q 和 k 做位置编码
let q = self.apply_rotary_emb(&q, cache)?;
let k = self.apply_rotary_emb(&k, cache)?;
// 复制成 n_head / n_kv_head 份
let k = self.repeat_kv(k)?;
let v = self.repeat_kv(v)?;

// 把 seq_len 和 n_head 交换
// 这转换一下是为了做一个 cat single head 的简单操作
// 相当于 n_head 个的seq_len*seq_len的注意力。
let q = q.transpose(1, 2)?.contiguous()?;
let k = k.transpose(1, 2)?.contiguous()?;
let v = v.transpose(1, 2)?.contiguous()?;

// q*k^T / sqrt(d_k) d_k = d_model
// 这里是 (bs,n_head) 个 (seq_len, seq_len) 的注意力
let attn = (q.matmul(&k.t()?)? / (self.head_dim as f64).sqrt())?;

// 这里是头内的softmax (seq_len,seq_len)的行总和为1
let attn = softmax(&attn, D::Minus1)?;

// 再乘 * (bs, n_head, seq_len, head_dim) 得到 (bs, n_head)个注意力头对应的加权的v
let y = attn.matmul(&v)?;
// 把 n_head 和 seq_len 交换回来,得到 (bs, seq_len, n_head, head_dim) 然后reshape以后
// 得到 (bs, seq_len, n_head * head_dim) 把头给cat到一起。
let y = y.transpose(1, 2)?.reshape(&[b_sz, seq_len, n_embd])?;
let y = self.o_proj.forward(&y)?;
Ok(y)
}

fn repeat_kv(&self, x: Tensor) -> Result<Tensor> {
let n_rep = self.n_head / self.n_kv_head;
if n_rep == 1 {
Ok(x)
} else {
let (b_sz, seq_len, n_kv_head, head_dim) = x.dims4()?;
let x = x
.unsqueeze(3)?
.expand((b_sz, seq_len, n_kv_head, n_rep, head_dim))?
.reshape((b_sz, seq_len, n_kv_head * n_rep, head_dim))?;
Ok(x)
}
}
}

完整模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
struct AttentionModel {
embedding: Embedding,
mlp: Sequential,
rms_norm: RmsNorm,
config: ModelConfig,
self_attention: MultiHeadAttention,
cache: Cache,
}

impl AttentionModel {
fn forward(&self, x: &Tensor, targets: Option<&Tensor>) -> Result<(Tensor, Option<Tensor>)> {

let embeds = self.embedding.forward(x)?;

let normed_embeds = self.rms_norm.forward(&embeds)?;
let y = self.self_attention.forward(&normed_embeds, &self.cache)?;

let logits = self.mlp.forward(&y)?;

if let Some(targets) = targets {
let loss = loss::cross_entropy(
&logits.reshape(((), self.config.vocab_size))?,
&targets.reshape(((),))?,
)?;
Ok((logits, Some(loss)))
} else {
Ok((logits, None))
}
}

fn load(vb: VarBuilder, config: ModelConfig) -> Result<(Self)> {
let embedding = embedding(
config.vocab_size,
config.d_model,
vb.pp("model.embed_tokens"),
)?;
let rms_norm = RmsNorm::new(config.d_model, vb.pp("model.rms_norm"))?;
let self_attention = MultiHeadAttention::load(vb.pp("model.multi_head_attention"), config)?;
let mlp = sequential::seq()
.add(linear(
config.d_model,
config.d_model,
vb.push_prefix("model.fc1"),
)?)
.add(Activation::Relu)
.add(linear(
config.d_model,
config.vocab_size,
vb.push_prefix("model.fc2"),
)?);
Ok(Self {
embedding,
mlp,
config,
rms_norm,
self_attention,
cache: Cache::new(config.context_length, config.d_model/config.n_head, vb)?,
})
}
}

1
generate(&model, &vocab, 10, device)?;
['\n\n\n\n\n\n\n\nI\n\nOOOOOOOOOFOOtOOOOOOO',
 '\nIIIIII IIIIIIIIIIIIIIIIIIIIIII',
 '\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n',
 '\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\naaame',
 '\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n']

所以看起来很糟糕。这里发生了什么?让我们通过查看注意力来开始调试。

目前的注意力是没有masked的,任何位置的字符都在关注任何其他位置的字符。
这有什么不好呢?我们试图仅基于之前的标记来预测下一个标记,但这里我们看到模型正在关注之后的标记。
换句话说,模型在作弊,或者从未来泄露信息。这是一个问题,这就是为什么我们需要使用因果掩码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132

pub struct Cache {
// ...
// mask 也可以 cached
mask: Tensor,
}
impl Cache {
fn new(context_length: usize, n_elem: usize, vb: VarBuilder) -> Result<Cache> {
// ...
// _ 表示类型由编译器推断,
// 默认的collect 是 [_],但是这个大小是不可变的要编译期间决定
// 所以这里还是要提示编译器要 collect 成 vec.
let mask: Vec<_> = (0..context_length)
.flat_map(|i| (0..context_length).map(move |j| u8::from(j > i)))
.collect();
let mask = Tensor::from_slice(&mask, (context_length, context_length), vb.device())?;
Ok(Cache { cos, sin, mask })
}
}

struct MultiHeadAttention {
q_proj: Linear,
k_proj: Linear,
v_proj: Linear,
o_proj: Linear,
n_head: usize,
n_kv_head: usize,
head_dim: usize,
}

impl MultiHeadAttention {
fn load(vb: VarBuilder, config: ModelConfig) -> Result<Self> {
let q_proj = linear(config.d_model, config.d_model, vb.pp("model.q_proj"))?;
let k_proj = linear(
config.d_model,
(config.d_model / config.n_head) * config.n_kv_head,
vb.pp("model.k_proj"),
)?;
let v_proj = linear(
config.d_model,
(config.d_model / config.n_head) * config.n_kv_head,
vb.pp("model.v_proj"),
)?;
let o_proj = linear(config.d_model, config.d_model, vb.pp("model.o_proj"))?;
println!(
"MHA config n_head {} n_kv_head {}",
config.n_head, config.n_kv_head
);
Ok(Self {
q_proj,
k_proj,
v_proj,
o_proj,
n_head: config.n_head,
n_kv_head: config.n_kv_head,
head_dim: config.d_model / config.n_head,
})
}
// ...
fn forward(&self, x: &Tensor, cache: &Cache) -> Result<Tensor> {
let (b_sz, seq_len, n_embd) = x.dims3()?;

// 计算 q k v
let q = self.q_proj.forward(x)?;
let k = self.k_proj.forward(x)?;
let v = self.v_proj.forward(x)?;

assert!(n_embd == self.n_head * self.head_dim);

let q = q.reshape((b_sz, seq_len, self.n_head, self.head_dim))?;
let k = k.reshape((b_sz, seq_len, self.n_kv_head, self.head_dim))?;
let v = v.reshape((b_sz, seq_len, self.n_kv_head, self.head_dim))?;

// 对 q 和 k 做位置编码
let q = self.apply_rotary_emb(&q, cache)?;
let k = self.apply_rotary_emb(&k, cache)?;
// 复制成 n_head / n_kv_head 份
let k = self.repeat_kv(k)?;
let v = self.repeat_kv(v)?;

// println!("q.shape {:?}", q.shape());
// 把 seq_len 和 n_head 交换
// 这转换一下是为了做一个 cat single head 的简单操作
// 相当于 n_head 个的seq_len*seq_len的注意力。
let q = q.transpose(1, 2)?.contiguous()?;
let k = k.transpose(1, 2)?.contiguous()?;
let v = v.transpose(1, 2)?.contiguous()?;

// let tmp = q.matmul(&k.t()?)?;
// 这个结果是有负数的,但是注意力层不会有负数。
// 计算结果出了很多 NaN,感觉应该是范化没做好。
// 后面发现是没有用 sqrt 用了开方,导致负数变成了NaN。
// q*k^T / sqrt(d_k) d_k = d_model
// 这里是 (bs,n_head) 个 (seq_len, seq_len) 的注意力
let attn = (q.matmul(&k.t()?)? / (self.head_dim as f64).sqrt())?;
// 在 softmax 之前,把未来的token位置变为负无穷,这样在softmax之后,这些位置的概率就会变为0
let mask = cache
.mask
.i((..seq_len, ..seq_len))?
.unsqueeze(0)?
.unsqueeze(0)?
.broadcast_as(attn.shape())?;
let on_true =
Tensor::new(f32::NEG_INFINITY, attn.device())?.broadcast_as(mask.shape().dims())?;
let attn = mask.where_cond(&on_true, &attn)?;
// 取一个例子
// 这里是头内的softmax (seq_len,seq_len)的每行 (seq_len) 总和为1
let attn = softmax(&attn, D::Minus1)?;

// 再乘 * (bs, n_head, seq_len, head_dim) 得到 (bs, n_head)个注意力头对应的加权的v
let y = attn.matmul(&v)?;
// 把 n_head 和 seq_len 交换回来,得到 (bs, seq_len, n_head, head_dim) 然后reshape以后
// 得到 (bs, seq_len, n_head * head_dim) 把头给cat到一起。
let y = y.transpose(1, 2)?.reshape(&[b_sz, seq_len, n_embd])?;
let y = self.o_proj.forward(&y)?;
Ok(y)
}

fn repeat_kv(&self, x: Tensor) -> Result<Tensor> {
let n_rep = self.n_head / self.n_kv_head;
if n_rep == 1 {
Ok(x)
} else {
let (b_sz, seq_len, n_kv_head, head_dim) = x.dims4()?;
let x = x
.unsqueeze(3)?
.expand((b_sz, seq_len, n_kv_head, n_rep, head_dim))?
.reshape((b_sz, seq_len, n_kv_head * n_rep, head_dim))?;
Ok(x)
}
}
}

现在,我们可以让注意力激活的上三角部分(对应未来的部分)几乎被归零了。让我们看看训练时会发生什么。

SwiGLU

正如论文中所述,“我们用SwiGLU激活函数替换了ReLU非线性函数……我们使用$\frac{2}{3} 4d$的维度,而不是PaLM中的$4d$。” SwiGLU定义为:
$$
\text{SwiGLU}(x) = \text{Swish}_\beta (xW + b) \otimes (xV + c)
$$
其中$\otimes$是逐元素乘积。Swish函数定义为:
$$
\text{Swish}_\beta(x) = x \sigma(\beta x)
$$
其中$\beta$是一个可学习的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fn silu(xs: &Tensor) -> Result<Tensor> {
xs / (xs.neg()?.exp()? + 1.0)?
}

struct SwiGLU {
c_fc1: Linear,
c_fc2: Linear,
c_proj: Linear,
}
// 新的 mlp 是三层的带gate
impl SwiGLU {
// silu 的特征是允许有一点点的负数
fn forward(&self, x: &Tensor) -> Result<Tensor> {
// 这里就是 SwiGLU SiLU(W_1 * x) * (W_2 * x) 是 element wise 的,这个可以作为gate门信号
let x = (silu(&self.c_fc1.forward(x)?)? * self.c_fc2.forward(x)?)?;
self.c_proj.forward(&x)
}

fn load(vb: VarBuilder, cfg: ModelConfig) -> Result<Self> {
let h_size = cfg.d_model;
let i_size = cfg.hidden_dim;
let c_fc1 = linear(h_size, i_size, vb.pp("gate_proj"))?;
let c_fc2 = linear(h_size, i_size, vb.pp("up_proj"))?;
let c_proj = linear(i_size, h_size, vb.pp("down_proj"))?;
Ok(Self {
c_fc1,
c_fc2,
c_proj,
})
}
}

一个llama block res 两次,一次在attention之前,一次在swiglu之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
struct Block {
rms_1: RmsNorm,
attn: MultiHeadAttention,
rms_2: RmsNorm,
swiglu: SwiGLU,
}

impl Block {
fn forward(&self, x: &Tensor, cache: &Cache) -> Result<Tensor> {
let residual = x;
let x = self.rms_1.forward(x)?;
let x = (self.attn.forward(&x, cache)? + residual)?;
let residual = &x;
let x = (self.swiglu.forward(&self.rms_2.forward(&x)?)? + residual)?;
Ok(x)
}

fn load(vb: VarBuilder, cfg: ModelConfig) -> Result<Self> {
let attn = MultiHeadAttention::load(vb.pp("self_attn"), cfg)?;
let swiglu = SwiGLU::load(vb.pp("mlp"), cfg)?;
let rms_1 = RmsNorm::new(cfg.d_model, vb.pp("input_layernorm"))?;
let rms_2 = RmsNorm::new(cfg.d_model, vb.pp("post_attention_layernorm"))?;
Ok(Self {
rms_1,
attn,
rms_2,
swiglu,
})
}
}

现在,让我们通过创建块来添加多个层的最后完整的模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
struct Llama {
embedding: Embedding,
blocks: Vec<Block>,
ln_f: RmsNorm,
lm_head: Linear,
cache: Cache,
config: ModelConfig,
}

impl Llama {
pub fn forward(
&self,
x: &Tensor,
targets: Option<&Tensor>,
) -> Result<(Tensor, Option<Tensor>)> {
let (_b_sz, _seq_len) = x.dims2()?;
let mut x = self.embedding.forward(x)?;
for block in &self.blocks {
x = block.forward(&x, &self.cache)?;
}
let x = self.ln_f.forward(&x)?;
let logits = self.lm_head.forward(&x)?;

if let Some(targets) = targets {
let loss = loss::cross_entropy(
&logits.reshape(((), self.config.vocab_size))?,
&targets.reshape(((),))?,
)?;
Ok((logits, Some(loss)))
} else {
Ok((logits, None))
}
}

pub fn load(vb: VarBuilder, config: ModelConfig) -> Result<Self> {
let embed_layer = embedding(
config.vocab_size,
config.d_model,
vb.pp("model.embed_tokens"),
)?;
let lm_head = linear(config.d_model, config.vocab_size, vb.pp("lm_head"))?;
let ln_f = RmsNorm::new(config.d_model, vb.pp("model.norm"))?;
let blocks: Vec<_> = (0..config.n_layers)
.map(|i| Block::load(vb.pp(format!("model.layers.{i}")), config).unwrap())
.collect();
Ok(Self {
embedding: embed_layer,
blocks,
ln_f,
lm_head,
config,
cache: Cache::new(config.context_length, config.d_model / config.n_head, vb)?,
})
}
}

扩展

这里主要是训练的部分,在推理的过程中还涉及到一个比较重要的kv cache。
kv cache主要是缓存自回归过程中的kv,这个kv是不变的,因为这个k 和 v 只和之前的token有关系,所以可以缓存下来,
这样在推理的时候就不用重复计算了。
围绕着prompt产生第一个Token的prefill的阶段和计算完prompt之后的decode阶段也是目前业界比较关注的推理优化的方向。
源代码在这里

这篇文章主要基于vLLM中做推理的优化做一个总结。

PagedAttention

原始论文来看,显存的浪费主要有几种。

这张图里面表示的是每个token对应的kvcache的slot。对于一个context长度中用不到的slots部分,有预留的slots和不同空隙之间的slots空隙。

vLLM参考了虚拟内存和内存页分配的逻辑构造了一个block table用于 kv cache block slots 和 token之间的关系。通过block表的管理
可以像操作系统一下减少内存碎片。
因为不同的sequence中token是有位置信息的,所有他们对应的kv slot也不一定一样。下图展示了他们的关系。

Continuous Batching

“连续批量处理”(Continuous Batching),也称为”动态批量处理”(Dynamic Batching)或”迭代级调度批量处理”(Batching with Iteration-Level Scheduling),是一种选择Batch行的技术,用于优化计算和资源利用率。
它的主要区别在于与静态批处理相比,它会在每次推理迭代过程中动态调整Batch中的序列,例如vLLM会抢占部分序列,将序列的prefill阶段和decode阶段分开,不在同一个batch中处理。

而HuggingFace的text-generation-inference的router文档中提到的:为了提高效率,特别是在文本生成和内存受限的LLM上,可以让客户端发送单个查询,然后路由器将这些查询合并或分离成批次,以最大限度地利用计算资源。这是因为在LLM中,运行模型的计算成本远远高于批处理的成本。当新请求到达时,当前正在forward的前向传播不中断,而是继续等待执行完毕。然后将当前正在处理的请求与新到的请求合并成一个批处理请求,再进行forward前向传播。在批处理请求中,任何一个请求完成(即模型产生了终止符或达到允许的最大长度),则从批处理请求中移除该请求,并释放相关资源。这种方法可以应用于多个请求,并且支持在不同参数下的处理(例如采样、不采样、温度控制等),因为每个请求在批处理中都可以独立进行处理。Anyscale 对这个过程有很好的解释

Prefill 和 Decode

在LLM推理过程中,一个Prompt的第一次执行(称为prefill)和后续的前向传播(称为decode)是不同的。Prefill阶段需要计算整个注意力矩阵并将其缓存到KV缓存中,计算规模较大,尤其是对于长度为10K或100K的提示词。而在decode阶段,只需计算新生成的token的注意力矩阵,计算规模较小。

从Kimi的Mooncake论文中的图片来看:

左图显示,当prompt长度增加时,计算时间呈平方级别增加。右图显示,decode阶段只生成一个token,计算规模比线性增长还要慢一点,但由于需要复用之前的KV缓存,因此显存开销较大。Mooncake的解决方案是将prefill和decode分开处理,prefill计算规模大,decode计算规模小,通过KV缓存共享机制传递KV缓存。论文还提到了一些关于缓存调度的细节,这里不再展开。

在Prefill阶段,所有的提示词(prompt)都是已知的,因此可以并行计算多个token,计算并行度较高。而在Decode阶段,只需根据新生成的token计算下一个token(之前的KV缓存已经保存了中间结果),因此计算规模较小。如果将Prefill和Decode放在同一个batch中计算,由于计算规模不对等,容易产生计算空隙(bubble)。

Prefill像是一个矩阵和矩阵的乘法,而Decode则是一个向量和矩阵的乘法。

Chunked Prefill

vLLM的文档很好的解释了prefill阶段和decode阶段的区别。

考虑到vLLM进行生成的序列“ABC”。当它到达时,KV缓存基于block size的预设值(这里是2)在内存中分配对应的block(B1,B2,B3,B4),但它是空的。
我们知道序列的内容(A,B,C),但我们没有token id到块索引的映射。

考虑到这种情况,下一步是为序列ABC进行prefill。在调度过程中,我们为序列中每个token块分配块索引(B3,B4),即([A, B], [C, _])。

一旦确定了块映射,我们就可以通过运行模型的前向传播。这会将ABC token的KV激活值写入KV缓存中的相应位置。此外,前向传播将导致新token “D” 被采样。D的KV值尚未知晓。

现在,序列已经完成了预填充。我们可以安排一个解码步骤。这涉及为新token “D” 分配块映射。然而,由于它适合现有的块映射,调度器不需要分配新的映射。

然后我们再次运行模型,计算并将“D”的KV写入KV缓存。这会生成一个新的token “E”。

这个过程会重复进行直到解码完成。请注意,后续的分配可能是不连续的。

Speculative Decoding

Speculative Decoding利用小型、快速的草案(draft)模型生成初始token(token是输入信息的基本单元)时的高效性,而在验证阶段则依赖更大的、更准确的大型语言模型(LLM)进行验证。

这个过程可以分为两步:

  1. 初始token生成:使用小型、快速的草案模型生成初始token。这一步骤快速生成token序列,使得下一步骤可以快速开始。
  2. 验证:使用更大的、更准确的LLM对初始token进行验证。这一步骤确保生成的token序列是准确的和有效的。

这种技术通过将任务分成两个阶段来实现高效性和准确性:快速生成初始token,然后验证这些token以确保准确性。

其实现方式如下:

  • 使用小模型进行多次decode,生成多个token序列。
  • 将这些token序列传递给大模型进行验证。大模型会生成对应的logits,形状为(batch, sequence, vocab_size),并进行softmax处理。
  • 对于单步decode,logits的形状为(batch, 1, vocab_size),其中的1表示最后一个token,在词汇表(vocab_size)上的概率分布。

例如,将小模型生成的”abcd” token序列传递给大模型,得到”ABCD”的logits。A对应的是用于预测B的概率分布,在这个序列中就是预测第二个token的概率分布。将b对应的token id在A的vocab_size长度的log prob中的值取出,对应的可能是B token id的概率,也可能不是。

1
2
3
a | b | c | d
^ ^ ^
| A | B | C | D

最后,将b、c、d在大模型中对应的logits prob求和并取平均值。如果这个值大于一个阈值,就认为这个token是合理的,否则就拒绝。

这种方法通过结合小模型的快速生成能力和大模型的高准确性,实现了高效且准确的token生成过程。

根据论文[https://arxiv.org/pdf/2406.14066]来说,在continus batching的情况下,Speculative Decoding可以提高推理速度,减少计算资源的浪费。

在Target模型验证完以后,还会多生成一个bonus token,也就是上面的那个D。

其中Decoding也产生了很多的方法,有基于模型的,也有model free的,才用大模型的一部分,或者直接从外部数据库来获取。

结果表明,在低请求率下(具体来说,请求率为 4),提出 3 个或 5 个令牌会带来最显著的加速。然而,随着请求率的增加,提出更多令牌的优势迅速减弱:当请求率超过 12 时,提出 5 个令牌不再带来性能提升。同样,在请求率大于 16 时,提出 3 个令牌会导致性能下降。

其中不同的颗粒度的猜测长度也会对性能有影响。

全局统一的长度;每个step所有request用一个长度;每个step每个request用不同的长度。

相较于吞吐,Goodput规定只有没被拒绝的token才计算,用来衡量最总的性能。

这张图展示了猜测长度和batch size对于Goodput的影响。

对于小批次,要多猜测(propose),小批次尺寸下每次请求需提议超 4 个 token 以实现最大有效吞吐量(goodput),且随着批次尺寸增大,最优猜测长度会降低;
对于大批次,则要少猜测,甚至不进行推测反而能获得更高有效吞吐量,因为大批次下推测失败成本显著增加,超过潜在收益。

除了朴素的猜测模型,里面也提到了Medusa风格的猜测模型。预测3个token就有三个head分别预测每个位置。
里面的例子head 1猜了三个,对了1个,head 2猜了2个对了一个,head3猜了3个全错了,然后加上LLM的bonus token。

SmartSpec 估算Goodput的方法就是根据成功率计算期望长度再乘上Request对应的时间。
然后根据不同的batch size计算Goodput,让Goodput最大化,得到最佳Goodput从而让吞吐最大化。

Automatic Prefix Caching

一般的LLM请求的提示词会非常长,据说OpenAI的系统提示词已经有几K了,这个规模很适合做前缀缓存。
前缀缓存是指将提示词分成多个前缀,然后将这些前缀缓存到KV缓存中,这样在生成token的时候就可以直接使用KV缓存中的值,
而不需要重新计算。这样可以减少计算量,提高推理速度。
当然在vLLM中kv cache是分块的,所以prefix 也是分块的。

1
2
3
4
5
                    Block 1                  Block 2                  Block 3
[A gentle breeze stirred] [the leaves as children] [laughed in the distance]
Block 1: |<--- block tokens ---->|
Block 2: |<------- prefix ------>| |<--- block tokens --->|
Block 3: |<------------------ prefix -------------------->| |<--- block tokens ---->|

Multi-LoRA Serving

VLLM当中的LoRA Adaptor是可以动态加载的,因为他本身和基座模型保持独立。
这里要注意的一个点是,如果词汇表修改了,会影响最后的llm head,比如英文基座模型用中文词汇表,那么 vocab_size 就不一样了。
会导致llm head的输出维度不一样,这个时候就需要重新训练llm head。
所以要注意最后的llm head的输出维度。

Tensor Parallelism

张量并行(Tensor Parallelism)是一种将大型模型的计算任务分解到多个GPU上并行执行的技术。它通过将模型的权重矩阵切分成多个子矩阵,并将这些子矩阵分配到不同的GPU上进行计算,从而实现并行计算。

在Transformer模型中,QKV(Query, Key, Value)矩阵乘法是计算量最大的部分之一。通过将QKV矩阵切分成更小的子矩阵,并将这些子矩阵分配到不同的GPU上,可以显著提高计算效率。

例如,对于一个具有d_model维度的QKV矩阵,可以将其切分成n个子矩阵,每个子矩阵的维度为d_model / n。然后,将这些子矩阵分配到n个GPU上进行并行计算。这样,每个GPU只需要计算一个较小的子矩阵,从而减少了计算时间。

张量并行的实现方式如下:

  1. 切分权重矩阵:将模型的权重矩阵切分成多个子矩阵。例如,对于一个d_model x d_model的权重矩阵,可以将其切分成nd_model x (d_model / n)的子矩阵。
  2. 分配子矩阵:将切分后的子矩阵分配到不同的GPU上。例如,将第一个子矩阵分配到GPU 0,第二个子矩阵分配到GPU 1,以此类推。
  3. 并行计算:在每个GPU上并行执行矩阵乘法计算。例如,在GPU 0上计算输入矩阵与第一个子矩阵的乘积,在GPU 1上计算输入矩阵与第二个子矩阵的乘积,以此类推。
  4. 聚合结果:将所有GPU上的计算结果聚合起来,得到最终的输出。例如,将所有子矩阵的乘积结果相加,得到最终的QKV矩阵乘积结果。

通过张量并行,可以显著提高大型模型的计算效率,减少推理时间,从而提高模型的推理速度和性能。

Pipeline Parallelism

Pipeline Parallelism通过将模型的不同层分配到不同的GPU上来实现并行计算,特别是一些大型模型。这种方法可以提高计算效率,但也会引入一些挑战,例如在层之间传递数据时可能会产生延迟。此外,由于不同层的计算时间可能不一致,可能会导致某些GPU在等待其他GPU完成计算时处于空闲状态,从而产生计算空隙(bubble)。

为了减少这些计算空隙,可以使用Chunked Prefill技术。Chunked Prefill通过将计算任务分成更小的块,从而缩短每个计算任务的时间。这使得在流水线并行中可以更灵活地安排计算任务,从而尽可能地填满计算空隙,提高整体计算效率。

通过结合Pipeline Parallelism和Chunked Prefill,可以在保持高效计算的同时,最大限度地利用计算资源,减少计算空隙,提高模型推理的速度和效率。

在vLLM当中 TP=n 且 PP=m 时,vLLM 引擎总共会有 n*m + 1 个进程。即使使用单个 GPU,我们也会有 2 个进程。

衡量LLM的服务指标

在衡量LLM服务性能时,Token相关的数据是一个重要的方面。以下是一些常见的Token相关指标:

Token Throughput

Token Throughput表示每秒生成的token数量。它是衡量模型生成速度的一个重要指标,通常以tokens per minute (TPM)为单位。

Token Latency

Token Latency表示生成一个token所需的时间。它是衡量模型响应速度的一个重要指标,通常以毫秒(ms)为单位。Token Latency包括以下几个子指标:

  • **TTFT (Time To First Token)**:从请求到第一个token生成的时间。例如,当prompt变长时,TTFT会变长;或者当kv cache不足时被抢占,TTFT也会变长。
  • **TBT (Time Between Tokens)**:生成两个token之间的时间。例如,当batch size变大时,TBT会变大。

vLLM的一个主要贡献就是PagedAttention,可以实现更高效的推理。

高效的语言模型服务系统(LLM)需要批量处理多个请求。然而,现有系统存在以下问题:

  • 每个请求的key-value缓存(KV缓存)内存巨大,动态增长和减少。
  • 容易因为碎片化和冗余复制导致内存浪费,限制了批量大小。

为了解决这些问题,提出了PagedAttention,一个基于虚拟内存和分页技术的注意力算法。基于此,开发了vLLM,一个LLM服务系统,实现了以下两个目标:

  1. KV缓存显存的几乎零浪费,减少了显存碎片。
  2. KV缓存在请求之间和请求内共享,进一步减少显存使用。

论文包含了他早期设计。

一次调用的示例如博客中展示的。

AsyncLLM

generate细节:

  • 如果引擎没有运行,启动后台循环,循环调用 _run_output_handler 方法来处理等待的请求。
  • AsyncStream 中等待请求输出并生成它们。

engine会在启动之前profile一下,把剩余的显存分配给kv cache用。

AsyncStream 对 asyncio.Queue的封装,支持了终止的能力,当finish的时候会丢入一个STOP_ITERATION的exception,这样可以让调用者知道这个stream已经结束了。

每当有一个对话请求的时候调用add_request就会生成一个这样的AsycStream用于处理对话的输出,其中副作用就是判断backgroud loop没有启动的时候,启动backgroundloop。

AsyncEngine本身有一个_new_request的Queue用户保存request的AsyncStream。

generate方法会不断从AsyncStream中yield出结果,直到遇到STOP_ITERATION。

loop的主体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 1) Pull EngineCoreOutput from the EngineCore.
outputs = await self.engine_core.get_output_async()

# 2) Detokenize based on the output.
request_outputs, reqs_to_abort = self.detokenizer.step(outputs)

# 3) Put the RequestOutputs into the per-request AsyncStreams.
self._process_request_outputs(request_outputs)

# 4) Abort any requests that finished due to stop strings.
await self.engine_core.abort_requests_async(reqs_to_abort)

# 5) Abort any requests due to client cancellations.
await self._process_cancellations()

When TP=n & PP=m, vLLM engine will have n*m + 1 processes in total.
Corollary: even when using a single GPU, we will have 2 processes.

EngineCore

EngineCore主要是完成 schedule、execute 和 output 的循环。

1
2
3
4
5
6
7
8
9
10
11
def step(self) -> List[EngineCoreOutput]:
"""Schedule, execute, and make output."""

if not self.scheduler.has_unfinished_requests():
return []

scheduler_output = self.scheduler.schedule()
output = self.model_executor.execute_model(scheduler_output)
engine_core_outputs = self.scheduler.update_from_output(
scheduler_output, output)
return engine_core_outputs

Request

在具体分析之前,先看看 Request 的定义,这个数据结构串联了很多东西。

属性 num_tokens 代表的是 prompt_tokensoutput_tokens 的总数。

1
2
3
@property
def num_tokens(self) -> int:
return len(self._all_token_ids)

num_output_tokens 代表 output tokens 的数量。

1
2
3
@property
def num_output_tokens(self) -> int:
return len(self._output_token_ids)

append_output_token_ids 会改变上述的两个属性。

1
2
3
4
5
6
7
8
def append_output_token_ids(
self,
token_ids: Union[int, List[int]],
) -> None:
if isinstance(token_ids, int):
token_ids = [token_ids]
self._output_token_ids.extend(token_ids)
self._all_token_ids.extend(token_ids)

__init__ 方法中,会设置 num_prompt_tokens,这个是不变的,num_computed_tokens 会初始化为 0。

1
2
3
4
self.prompt = self.inputs.prompt
self.prompt_token_ids = self.inputs.prompt_token_ids
self.num_prompt_tokens = len(self.prompt_token_ids)
self.num_computed_tokens = 0

所以在最开始时,num_tokensnum_prompt_tokens 是相等的。当 prefill 以后,num_computed_tokens 会逐渐(逐渐的原因是 prefill 可能会被 chunked 掉)等于 num_prompt_tokensdecode 以后,num_tokens 会等于 num_prompt_tokens 加上 num_output_tokens。如果 computed_tokens 等于 num_tokens,说明已经开始 decode 了,要开始一个 token 一个 token 计算了。

在调度过程中没有直接用 computed_tokens 等于 num_prompt_tokens 的原因是:如果一个 request 被抢占掉,那么 num_tokens 在 request 恢复的时候其实应该是 num_prompt_tokens 加上 num_output_tokens,这里做了一个统一的判断。如果把preempted的request重新处理的话其实相当于多了一些output tokens的prompt的新request。

Scheduler

从 EngineCore 的 step 方法来看,目前的调度是同步的 schedule | execute model | update_from_output | schedule | execute model | update_from_output,这样会导致计算和调度之间的时间差,这个时间差会导致计算的时间没有充分利用,从而导致资源的浪费。后面的版本应该会有优化。

Scheduler 的 V1 版本把一些 chunked prefill 还有 prefix caching 的内容拆离出去,做得比较通用。

vLLM 实现了一种所有或无(all-or-nothing)驱逐策略,即要么驱逐一个序列中的所有块,要么不驱逐任何块。

接下来看看来自 v1/core/scheduler.py 的 V1 版本的 schedule 实现。

Scheduler 有个 waiting list 和 running list(位置代表权重,是 FIFO 的)。

从 running list 中获取 request 然后通过 kv_cache_manager 执行 append_slots 把新的 block 追加到 request 的 block chain 当中。如果最后一个 block 的 slot 还够的话,就不会追加新的 block。

1
2
new_blocks = self.kv_cache_manager.append_slots(
request, num_new_tokens)

如果当前的 kv_cache 的 block table 满了,则会抢占一个 running list 中的 request(放入 waiting list 中)并且把他的 cache block 都 free 掉,这里的 free 是引用计数的形式,如果引用计数为 0 就会被释放,但如果多个 request 共享了一个 block 就还不会被真正释放。

1
2
3
4
5
6
7
8
if new_blocks is None:
# The request cannot be scheduled.
# Preempt the lowest-priority request.
preempted_req = self.running.pop()
self.kv_cache_manager.free(preempted_req)
preempted_req.status = RequestStatus.PREEMPTED
self.waiting.appendleft(preempted_req)
preempted_reqs.append(preempted_req)

加入到 scheduled_running_reqs 中,消耗这次调度的 token budget,这个 budget 用完以后就会停止调度了。

1
2
3
4
5
6
7
scheduled_running_reqs.append(request)
req_to_new_block_ids[request.request_id] = [
b.block_id for b in new_blocks
]
num_scheduled_tokens[request.request_id] = num_new_tokens
token_budget -= num_new_tokens
req_index += 1

如果没有抢占请求则说明还是比较富裕的,尝试从 waiting list 中获取 request,waiting list 可能有新请求也可能有之前被抢占的请求,然后执行一遍上面的代码,不同的是需要从 kv_cache_manager 计算 computed_tokens,因为被之前被抢占的或者一些有共同前缀的 kv cache block 是已经缓存过的。

1
2
3
4
5
6
7
8
request = self.waiting[0]
# Get already-cached tokens.
computed_blocks = self.kv_cache_manager.get_computed_blocks(
request)
# NOTE(woosuk): Since incomplete blocks are not eligible for
# sharing, `num_computed_tokens` is always a multiple of
# `block_size`.
num_computed_tokens = len(computed_blocks) * self.block_size

最后把每个 request 分配到的 tokens 数量记录到 SchedulerOutput 当中。

update_from_output 接受 SchedulerOutputModelExecutorOutput,更新 request 的状态,例如更新已经计算的 token 数量,更新 kv cache 的 block 等。对于每个请求都会检查 request.num_computed_tokens == request.num_tokens 从而判断是否已经开始 decode 的部分了。然后构造 EngineCoreOutput,并且检查是否需要停止这个 request。_check_stop 方法会检查是否已经生成了 eos token 或者已经达到了最大长度,并且 free 掉对应的 request。所有没有 stop 的 request 会重新加入到 running 队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
if request.num_computed_tokens == request.num_tokens:
req_index = model_runner_output.req_id_to_index[req_id]
# NOTE(woosuk): Currently, we assume that each request
# generates at most one token at each step.
token_id = sampled_token_ids[req_index]
request.append_output_token_ids(token_id)
num_new_tokens = 1
# TODO: Update the KV cache manager for prefix caching.

# Check for stop and update request state.
# This must be called before me make the EngineCoreOutput.
stopped = self._check_stop(request)

# Add EngineCoreOutput for this Request.
output = EngineCoreOutput(
request_id=req_id,
new_token_ids=request.output_token_ids[-num_new_tokens:],
finished=request.is_finished(),
finish_reason=request.get_finished_reason(),
stop_reason=request.stop_reason)
engine_core_outputs.append(output)

# Breakout of the loop.
if stopped:
continue

SchedulerOutput

该类包含了调度器的输出信息。以下是各个字段的作用:

  • scheduled_new_reqs: List[NewRequestData]

    • 作用:存储新请求的数据列表,这些请求是刚刚被调度的。
  • scheduled_resumed_reqs: List[ResumedRequestData]

    • 作用:存储恢复请求的数据列表,这些请求是之前被暂停,现在重新被调度的。
  • scheduled_running_reqs: List[RunningRequestData]

    • 作用:存储正在运行请求的数据列表,这些请求在当前调度周期内继续运行。
  • num_scheduled_tokens: Dict[str, int]

    • 作用:存储每个请求调度的token数量,键是请求的ID,值是对应的token数量。
  • total_num_scheduled_tokens: int

    • 作用:存储所有请求调度的token总数。
  • scheduled_encoder_inputs: Dict[str, List[int]]

    • 作用:存储每个请求的编码器输入,键是请求的ID,值是对应的编码器输入列表。
  • preempted_req_ids: Set[str]

    • 作用:存储被抢占的请求ID集合,这些请求在当前调度周期内被暂停。
  • finished_req_ids: Set[str]

    • 作用:存储已完成的请求ID集合,这些请求在当前调度周期内完成。
  • free_encoder_input_ids: List[Tuple[str, int]]

    • 作用:存储空闲的编码器输入ID列表,每个元素是一个元组,包含请求ID和对应的编码器输入ID。

这些字段共同描述了调度器在一个调度周期内的所有操作和状态变化。

KVCacheManager

来自v1/core/kv_cache_manager.py,这是v1版本的实现。

kv cache比较简单,
这个博客中的图片很好地阐述了kvcache的作用。

但涉及PagedAttention的实现,就需要管理block。这类似于操作系统中的虚拟地址、页表和物理页的关系。

PagedAttention的主要思想是基于操作系统中分页(paging)的经典概念。传统的注意力算法通常要求keys和values在内存空间中连续存储,
而PagedAttention则允许在非连续的内存空间中存储keys和values。

PagedAttention将每个序列(sequence)的KV缓存(KV cache)分成固定大小的块(block)。
每个块包含一个固定数量的token的key和value向量。这意味着,即使keys和values不连续存储,也可以有效地访问和操作它们。

block的管理会有一个类似于页表的结构,用于映射block的逻辑地址到物理地址。

论文中的这个图很好的表示了他们的关系,如果新生成的token填满了当前block就会分配一个新的block用于新token的生成。

共享的prefix cache指的是提示词的前缀一样的情况,他们的位置编码也不变的情况下可以在不同的sequence之间共享。
例如对于一个英语到法语翻译的提示词,前面有很多事可以共享的,对于跨请求的kv cache来说可以基于这个前缀来共享kv cache的block。

序列 前缀 (Prefix) 输入任务 (Task Input) 完整提示 (Complete Prompt) LLM 输出 (LLM Output) 输出任务 (Task Output)
Sequence A Translate English to French:
“sea otter” => “loutre de mer”
“peppermint” => “menthe poivrée”
“plush giraffe” => “girafe en peluche”
“cheese” => Translate English to French:
“sea otter” => “loutre de mer”
“peppermint” => “menthe poivrée”
“plush giraffe” => “girafe en peluche”
“cheese” =>
fromage fromage
Sequence B Translate English to French:
“sea otter” => “loutre de mer”
“peppermint” => “menthe poivrée”
“plush giraffe” => “girafe en peluche”
“I love you” => Translate English to French:
“sea otter” => “loutre de mer”
“peppermint” => “menthe poivrée”
“plush giraffe” => “girafe en peluche”
“I love you” =>
Je t’aime Je t’aime

free_block_queue是一个链表,用于分配block,初始化时将block_pool中的所有blocks串起来。它通过链表实现对KVCacheBlock的管理,删除操作是O(1)的,没有使用标准库中的dequeue。

KVCacheBlock除了prev和next指针,还有ref_count_block_hash,用于prefix caching的计算。其key是父block的hash和当前block的tokens ids的hash。

block_pool代表物理block的映射关系,例如0 -> 第一块block

1
2
3
4
# A Block pool of all kv-cache blocks.
self.block_pool: List[KVCacheBlock] = [
KVCacheBlock(idx) for idx in range(num_gpu_blocks)
]

cached_block_hash_to_block 保存的数据结构是 {block_hash: {block ID: block}}

req_to_blocks 保存了 request到 block列表的映射关系,{request ID: [block ID]}

block的eviction的定义,eviction candidate == in free queue and ref_cnt == 0

get_computed_blocks方法

根据request获取已经计算过(缓存过)的block,获取kv cache blocks的方式是通过block hash 从cached_block_hash_to_block寻找的。
hash的计算是之前的block hash加上当前token ids做一次hash,第一个block则没有父block只用当前自己的token ids做hash。

append_slots方法

会为需要新计算的token ids分配block(如果现有的block不够的话)。

Worker

GPUModelRunner

v1/worker中的gpu_runner.pyv1版本的实现。

首先依赖一个大的config参数vllm_config,包含了model_configcache_configscheduler_configdevice_config等。

初始化kv cache的dtype,对照表如下,half就是fp16,float就是fp32,默认是和模型的dtype一样。

1
2
3
4
5
6
7
8
STR_DTYPE_TO_TORCH_DTYPE = {
"half": torch.half,
"bfloat16": torch.bfloat16,
"float": torch.float,
"fp8": torch.uint8,
"fp8_e4m3": torch.uint8,
"fp8_e5m2": torch.uint8,
}

初始化sliding_window的配置,这个东西在Qwen里面才用到。

初始化block_size,决定了kv cache中连续保存的token的数量,也就是PagedAttention中的那个block的大小,Prefix cache也是以block为维度的。

初始化kv_heads,这个决定了kv head的数量,如果指定了 tensor_parallel_size,会根据这个参数平均分给每个GPU。

初始化head_size,基于model config,是model config里面的head_dim

初始化hidden_size,就是model config里面的hidden_size,就是d_model或者embed_dim,代表同一个长度。

初始化kv_cache

初始化encoder_cache encoder结果的缓存。

初始化input_registry 和多模态的支持有关系。

初始化requests dict用于request的状态保存,这里的request就是一个文本的sequence。

初始化InputBatchmax_num_seq决定了batch的宽度,max_model_len决定了batch的长度。这个Batch对象负责管理在用于前向传播的batch当中的request的插入和删除。

初始化use_cuda_graph 这个由 enforce_eager 决定,默认是会加载整个计算图。

初始化positions: torch.zeros(self.max_num_tokens, dtype=torch.int64, device=self.device)

初始化input_embeds,可以看到,宽度是max_num_tokens,长度是hidden_size,这个是用来存储输入的embedding的。

1
2
3
4
self.inputs_embeds = torch.zeros(
(self.max_num_tokens, self.hidden_size),
dtype=self.dtype,
device=self.device)

InputBatch

InputBatch在整个工程中负责管理和处理批量输入请求,确保请求的高效处理和管理。

execute_model 方法

execute_model是整个schedule | compute | update循环中的核心部分,负责执行模型的前向传播。

_update_states方法

在每次运行一个 batch 时,会根据调度器(scheduler)的要求调整每个 batch 中请求的优先级。调度器会更新请求的状态缓存 id -> CachedRequestStateinput_batch 的缓存,移除被抢占和停止的请求,并将新加入的请求放入 batch 中。因此,runner 只负责执行,具体的策略由调度器决定。

CachedRequestState 记录了请求 ID、使用的缓存块 ID 以及已计算的 token 数量。

_excute_encoder方法

执行多模态中的encoder,对于新的多模态的encode,调用model.process_mm_inputs存入到encoder_cache当中。

self.model.compute_logits使用
vllm/model_executor/layers/logits_processor.py中的LogitsProcessor,从hidden_states计算logits

self.model.sample使用vllm/model_executor/layers/sampler.py中的sampler进行sample。

最终得到sampled_token_ids = sampler_output.sampled_token_ids

_gather_encoder_outputs 方法

encoder_cache中获取当前batch需要用到的encoder的输出。

_prepare_inputs方法

input_batch.block_table 在 GPU 上,而 input_batch.block_table_cpu_tensor 在 CPU 上。
前面提到 batch 的整理是在 CPU 上进行的,这里是将要推理的部分拷贝到 GPU 上的 block_table 中。由于使用了 PagedAttention,因此所有的序列都是按 block 为粒度进行切分的。

获取input_ids,构造出传给FlashAttention的数据,例如block_table,和query_start_locseq_start_loc用于定位query和seq的位置。

input_ids, attn_metadata, logits_indices

_prepare_sampling方法

构造出sampling的参数,获取每个request的temperaturetop_ktop_p等参数。

GPUWorker

v1/worker中的gpu_worker.pyv1版本的实现。
初始化GPUModelRunner,如果开始了VLLM_TORCH_PROFILER_DIR就会调用torch.profiler.profile

determine_num_available_blocks会通过profile的方式决定可以使用的block数量。
然后根据block数量调用Runner的initialize_kv_cache

做一些GPU的dtype支持检查,比如一些老的GPU是不支持bf16的。

FlashAttentionMetadata 包含了input的结构和对应的block table的映射。

FlashAttention是一种新型的注意力算法,它能够准确计算注意力,且只需进行远远少于传统方法的内存访问。这个算法的主要目标是尽可能避免内存的读取和写入,这是注意力机制性能瓶颈的一个关键因素。该论文提出了一个IO-aware的精确注意力算法,它使用tiling(贴瓷砖,代表数据分片)来减少GPU高带宽内存与低带宽内存之间的内存读取/写入次数。

该算法基于注意力矩阵通常稀疏这一观察结果:注意力矩阵只有少数元素非零。它通过将输入矩阵Q、K、V分成更小的块来实现,从而避免了计算全矩阵乘积Q*K^T的内存占用问题。通过块级别的处理,FlashAttention使得矩阵操作可以在现代GPU的内存限制下进行,并仅读取/写入每个切片的非零元素。这降低了需要的内存访问次数,使整个过程更快和更高效。

FlashAttention通过分“瓦片化”的方式计算能够更快的一个原因是将矩阵放入更高速的缓存当中,高速的叫SRAM,低速的叫HBM。

第一代 FlashAttention 只是把QK切片,这个只要把矩阵切分在SRAM,然后计算出结果再存回HBM,这个比较简单。

第二代 FlashAttention 把 softmax 的计算也放在了SRAM上。

源自博客描述的结构中可以看出。

他这里面标得感觉不是很清楚,其中的O_2应该是最终的结果O,里面的l_1/l_2 * A^1 / l_1 就还原出了最终结果的分母,也就是scale法则。

第三代 FlashAttention 减少了上面提到的scale,不再每一步做除法,而是放到最后再除。还有就是针对交叉注意力中的mask的优化,跳过了被mask的部分。还有就是CUDA Thread warps的优化提高了并行度。

总结

FlashAttention通过利用高速缓存和分块技术,显著减少了内存访问次数,提高了注意力计算的效率。第一代主要通过切分QK矩阵并利用SRAM缓存,第二代将softmax计算也放入SRAM,第三代则进一步优化了scale计算和mask处理,并提升了并行度。

vLLM 基于 uvicorn + FastAPI 的异步 Web 框架构成。vLLM 的主体是 LLMEngine,它是一个单例类,负责管理所有的模型和数据。在异步 API 中使用的是一个 AsyncEngine。在分析 AsyncEngine 之前,我们先将 Web 部分单独拆出来看一下。

vLLM 的 CLI 入口是 vllm/scripts.py,其中 serve 的启动是通过 uvloop.run 的方式启动的。uvloop 是一个替代默认 asyncio 事件循环的库,它使用 libuv 作为事件循环的实现,从而提高性能。uvicorn 是一个基于 uvloop 的 ASGI 服务器,它可以将 ASGI 应用部署到 Web 服务器上。FastAPI 是一个基于 Starlette 的 Web 框架,它提供了许多便利的功能,比如自动文档生成、请求参数校验等。

参数经过解析以后会进入 run_server,通过 uvloop.run(run_server(args))run_serverentrypoints/openai/api_server.py 下面。AsyncEngineArgs.from_cli_args(args) 使用命令行参数初始化 AsyncEngineArgs,如果要自行封装的话可以直接初始化 AsyncEngineArgsAsyncEngineArgs 继承自 EngineArgs,其中的参数都是用来控制推断命令的。

比较常用的几个参数:

  • model: 模型的路径,可以是一个目录,也可以是 hf 上的一个 repo。

  • model_name: 如果是目录的话,期望的模型名称,或者想要改个别名,对应的是 API 中指定模型的名称。

  • tensor_parallel_size: tensor parallel 副本数,如果用多个 GPU 可以用到,会根据这个将 kv head 平分到不同的 GPU 上。

  • pipeline_parallel_size: pipeline stages 数,如果用多个 GPU 可以用到,会根据这个将模型的前向计算的layers分成多个阶段,每个阶段在不同的 GPU 上计算。

    可以参考下面这个例子:

    假设我们有 8 个 GPU,分别表示为 g0 … g7,并且我们使用 2 个 GPU 来并行化模型张量,使用 4 个 GPU 来并行化模型流水线。当前函数将创建 4 个张量模型并行组和 2 个流水线模型并行组:

    4 个张量模型并行组:

    • [g0, g1]
    • [g2, g3]
    • [g4, g5]
    • [g6, g7]

    2 个流水线模型并行组:

    • [g0, g2, g4, g6]
    • [g1, g3, g5, g7]

    注意,为了提高效率,调用者应确保相邻的 rank 位于同一个 DGX 盒子上。例如,如果我们使用 2 个 DGX-1 盒子,总共有 16 个 GPU,rank 0 到 7 属于第一个盒子,rank 8 到 15 属于第二个盒子。

  • num_seqs: 最大的序列数,其实就是 batch size,会翻倍得增加显存使用,这个貌似在启动之前的 profile 阶段可能会导致大量显存的占用。

  • quantization: 量化的方法,可以是 bitsandbytes 等,可能需要和 load_format 结合使用。

  • load_format: 加载模型的格式,可以是 pt, safetensors, bitsandbytes 等等,如果用到量化的模型基本要改成 bitsandbytes。

  • dtype: 数据类型,fp32, fp16,bf16 等等,如果模型是 bf16 的话,他默认是 bf16 的模型用 bf16,有些显卡不支持 bf 浮点数所以要设置成 half 也就是 fp16。

  • host: 监听地址。

  • port: 监听端口。

  • max_model_len: 上下文长度,适合显存不足的显卡,把默认的上下文长度改下一点。

  • enforce_eager: 是否强制使用 eager 模式,如果显存不够的需要开启这个模式,不完全加载计算图的方式可以减少显存的使用。

api_server 中的 build_app 会使用 APIRouter 初始化路由,并通过 app.include_router 引入。

主要看 @router.post("/v1/chat/completions") 注册的 async def create_chat_completion 是最常用的函数调用。

init_app_stateapp.state 中保存了 openai_serving_chat,以及其他一些接口的状态,这取决于模型配置中是否包含这些功能。例如,文本嵌入等功能(通常都有)。当调用 create_chat_completion 时,会调用 openai_serving_chat 对应的 OpenAIServingChat 类的方法。因此,Serving 的主体可以通过查看这个对象的方法来理解其功能。

构建 AsyncEngine -> 构建 app 对象。

OpenAIServingChat.create_chat_completion 主体流程

  1. 检查模型

    • 是否支持 model,model 是否是 rola,model 是否是 prompt adapter 等。

      vLLM 的 rola 不是和基座合并在一起的,是支持基座模型加多了个 lora 模型的形式。prompt adapter 看起是多模态架构中的 adaptor。

  2. 从 Engine 中获取 Tokenizer

    • 主要是基于 model path 获取对应的 tokenizer 文件,并初始化对应的 tokenizer。
  3. _preprocess_call:对输入进行预处理

    • resolve_chat_template_content_format:检查对话模板格式,因为每种大模型的用于生成文本的训练数据的格式有所不同,要确认对应的格式,LLAMA 有 LLAMA 的格式,可以参考下面的例子。
    • parse_chat_messages_futures:解析输入的聊天消息,生成一个对话消息列表,变成有类型的对话消息。其中 mm_tracker 要处理 image_urlaudio_url 的消息,会根据构造 placeholderplaceholder 是一个特殊的字符串,用来标记这个位置是一个占位符。llama3.2 用的是 <|image|>
    • apply_{hf,mistral}_chat_template:模板会给提示词添加提示词的开头和结束的标志,从而和实际训练的数据标注对齐,比如 llama3<|eot_id|> 标记结束,padding 等。request_promptengine_prompt 包含 token ids 和多模态数据。
      例如:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    chat = [
    {
    "role": "user",
    "content": [
    {"type": "image"},
    {"type": "text", "text": "If I had to write a haiku for this one, it would be: "}
    ]
    }
    ]

    会变成 <|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n<|image|>If I had to write a haiku for this one, it would be: <|eot_id|> 中,<|start_header_id|>user<|end_header_id|> 标识 header(也就是 role),<|begin_of_text|> 标识上下文的开头,<|eot_id|> 标识一个消息的结束。除此之外,对于function call的处理,可以参考 examples/tool_chat_template_llama3.2_json.jinja 的一部分可以看出,会把对应工具的调用和提示词加入到用户对话前面,作为 user 的 text 的前缀中的内容形成提示词的一部分上下文。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    {#- Custom tools are passed in a user message with some extra guidance #}
    {%- if tools_in_user_message and not tools is none %}
    {#- Extract the first user message so we can plug it in here #}
    {%- if messages | length != 0 %}
    {%- if messages[0]['content'] is string %}
    {%- set first_user_message = messages[0]['content']|trim %}
    {%- else %}
    {%- set first_user_message = messages[0]['content'] | selectattr('type', 'equalto', 'text') | map(attribute='text') | map('trim') | join('\n') %}
    {%- endif %}
    {%- set messages = messages[1:] %}
    {%- else %}
    {{- raise_exception("Cannot put tools in the first user message when there's no first user message!") }}
    {%- endif %}
    {{- '<|start_header_id|>user<|end_header_id|>\n\n' -}}
    {{- "Given the following functions, please respond with a JSON for a function call " }}
    {{- "with its proper arguments that best answers the given prompt.\n\n" }}
    {{- 'Respond in the format {"name": function name, "parameters": dictionary of argument name and its value}. ' }}
    {{- "Do not use variables.\n\n" }}
    {%- for t in tools %}
    {{- t | tojson(indent=4) }}
    {{- "\n\n" }}
    {%- endfor %}
    {{- first_user_message + "<|eot_id|>"}}
    {%- endif %}
    • 请求处理:生成请求的 id request_id = f"chatcmpl-{request.request_id}",确定采样方法 beam_search 还是 sampling,调用 AsyncEngine 的 beam_searchgenerate 方法获得一个 generator。

    • chat_completion_stream_generator 是基于 generator 处理响应,这里主要看 streaming 的部分,同步的请求会直接返回结果。流式响应的格式是多个基于 json 格式的 chunk,类型是 chat.completion.chunk

    1
    {"id": "chatcmpl-1eadb733adf64f5b90114307b2d4d718", "choices": [{"delta": {"content": "", "function_call": null, "refusal": null, "role": "assistant", "tool_calls": null}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1732869116, "model": "llama3.2", "object": "chat.completion.chunk", "service_tier": null, "system_fingerprint": null, "usage": null}
    1
    {"id": "chatcmpl-1eadb733adf64f5b90114307b2d4d718", "choices": [{"delta": {"content": "AI", "function_call": null, "refusal": null, "role": null, "tool_calls": null}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1732869116, "model": "llama3.2", "object": "chat.completion.chunk", "service_tier": null, "system_fingerprint": null, "usage": null}
    1
    {"id": "chatcmpl-1eadb733adf64f5b90114307b2d4d718", "choices": [{"delta": {"content": " assistant", "function_call": null, "refusal": null, "role": null, "tool_calls": null}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1732869116, "model": "llama3.2", "object": "chat.completion.chunk", "service_tier": null, "system_fingerprint": null, "usage": null}
    1
    {"id": "chatcmpl-1eadb733adf64f5b90114307b2d4d718", "choices": [{"delta": {"content": "", "function_call": null, "refusal": null, "role": null, "tool_calls": null}, "finish_reason": "stop", "index": 0, "logprobs": null}], "created": 1732869116, "model": "llama3.2", "object": "chat.completion.chunk", "service_tier": null, "system_fingerprint": null, "usage": null}

    AsyncEngine Client 的 generate 会返回一个异步生成器,result_generator,通过 async for 遍历这个生成器 result,而 result 又是一个 output 的生成器。num_cached_tokens 表示前缀匹配的 kv cache 命中的 token 数量。request.n 代表要生成的选择的数量,一般是 1,如果大于 1 就会生成多个选择的分支,而 response 中的 index 就会代表不同的分支的序号。result 生成器对应的就是多个分支的结果,而 result 中的 output 就代表一个分支中的 chunk。处理过程中会把 output 转化成 ChatCompletionStreamResponse,输出成 data: $json_dump 的 SSE chunk 的形式。stream_options.include_usage 如果设置了的话会在 DONE 之前返回一个 usage stats 的 chunk。

    • tool_parser:解析工具描述。方法和对应的类在 openai/tool_parsers 下面,会根据传入的初始化参数决定对应的解析类。如果对应的 request 有 tool_choice 参数,就会使用到 tool_parser,tool_parser 主要用于处理响应中的 tool call 的文本内容。tool_parsertool_choice 为 auto 的时候要调用对应的 extract_tool_calls_streaming 去解析函数调用的文本内容。例如 pythonic_tool_parser 会解释 [func_name1(params_name1=params_value1, params_name2=params_value2...), func_name2(params)] 这种类似 Python 的文本内容并转化为响应中的 ToolCall 对象。如果是 llama3.1 的 template 的话,参考上面的格式,会把输出 {"name": function name, "parameters": dictionary of argument name and its value} 转化为 ToolCall 对象。

总结

vLLM 的主体是 LLMEngine,它是一个单例类,负责管理所有的模型和数据。在基于FastAPI的异步Restful API 中使用的是一个 AsyncEngine。在交给Engine处理之前会对一些请求参数进行预处理,比如对话模板的格式化,对话消息的解析,模板中的函数调用等。

这篇文章对VLM的架构解释得非常清楚。

一种方法是使用适配器将图片转换为tokens,例如LLaVA使用的prompt based适配器。这种方法类似于RAG的形式,将图片理解的内容补充在对话的上文中。这种适配器会占用LLM的上下文长度,因为图片的tokens会被放入LLM的上文中。目前来说性能会好一些。

另一种方法是基于交叉注意力的适配器,这种方法不会占用LLM的上下文长度,但需要大量参数来达到良好的质量。Llama3.2就是这种结构。

关于Llama3.2本身,它使用了GQA,将kv head分组,多头查询将原本的K和V头分成组并为每个组生成一个共享的Head,这样可以减少kv cache而不太丧失精度(相较于MQA这种只共享一个KV头的方法)。因此,分组多头查询在多头查询注意力和正常多头注意力之间维持了平衡,既考虑了速度,又考虑了输出质量。另一个优化是对一个上下文中的不同文档进行mask处理。由于大模型的上下文现在很长,会将多个文档放入一个上下文中进行训练,但为了避免文档之间的相互影响,需要在文档级别进行mask处理,即当前token不能看到之后的token,也不能看到同一上下文中其他文档的token。其他改动主要是训练规模的调整。

根据Llama3.2的技术报告,里面的image encoder用的是ViT架构。适配器在语言模型和图像编码器之间引入交叉注意力层(cross-attention layers),以提高模型的效率和准确性。交叉注意力层使用通用查询注意力(GQA)并在核心语言模型每四层之后应用。交叉注意力层增加了大量可训练参数,例如Llama 3 405B中约有100B个参数。

本质上,图片编码器的输出通过适配器后作为交叉注意力层的K,文本作为Q,V也来自图片适配器,从而计算文字和图片之间的注意力关系,然后与LLM的输出进行交叉注意力。在训练Llama3.2的适配器时,同时更新了图像编码器的参数,但刻意不更新语言模型的参数。这意味着在适配器训练过程中,Meta只关注图像编码器和适配器的学习,而不影响语言模型的预训练知识。
简而言之,这个适配器在功能上类似于最初的encoder-decoder Transformer中的encoder部分。

在具体的以vLLM推断过程的实现为例,对话的API中会包含{"type":"image","image_url":"uri_of_the_image"},在应用对话模板以后会插入占位符,比如llama3.2用的就是<|image|>,原始的训练中的文本内容会变成类似"<|image|>If I had to write a haiku for this one",以此标记图片的位置信息,实际上需要图片会通过uri_of_the_image被加载到encoder中并携带<|image|>所代表的位置信息编码。

总的来说,VLM的计算过程和推断中的处理方式通过引入适配器和交叉注意力层,实现了图片和文本的高效融合,为多模态任务提供了强大的支持。