SGLang Pipeline Parallelism 深度分析
Pipeline Parallelism(PP)是多卡推理中的核心技术之一。SGLang 的 PP 实现有一套独立的事件循环和调度机制,和普通的 single-batch 路径完全不同。本文将从代码出发,深入分析 SGLang 的 PP 实现。
为什么需要 Pipeline Parallelism
单机推理的瓶颈
单卡推理时,GPU 利用率受限于模型大小和显存。一个 70B 模型单卡放不下,必须拆分到多卡。
模型并行方案对比
| 方案 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| TP (Tensor Parallelism) | 按层切分权重 | 通信少,延迟低 | 受限于单节点卡数 |
| PP (Pipeline Parallelism) | 按层切分模型 | 支持任意层数 | 存在 pipeline bubble |
| DP (Data Parallelism) | 每卡完整模型 | 简单,吞吐高 | 显存需求不变 |
SGLang 通常 TP + PP 组合使用:节点内 TP,跨节点 PP。
SGLang PP 架构概览
PP 路径的独立事件循环
SGLang 的调度器有三条事件循环路径:
event_loop_normal- 普通单 batch 调度event_loop_overlap- 计算和通信 overlap 的调度event_loop_pp- PP 路径的独立事件循环
PP 路径有自己独立的事件循环,定义在 scheduler_pp_mixin.py 的 SchedulerPPMixin 类中。它和 normal/overlap 路径完全隔离,因为 PP 的通信模式(send/recv proxy tensors 跨 stage)和单机调度逻辑差异太大,硬塞进同一个 event loop 会很脏。
event_loop_pp 核心流程
1 | # scheduler_pp_mixin.py 第 72-145 行 |
**⚠️ step 5 和 step 6 的顺序取决于
pp_async_batch_depth**:
pp_async_batch_depth > 0:step 6(output 收发+预处理)在 step 5(forward)之前执行,与 forward 重叠,隐藏延迟pp_async_batch_depth == 0:step 6(output 收发+预处理)在 step 5(forward)之后执行,串行不重叠上图展示的是
pp_async_batch_depth == 0的顺序(step 6 在 step 5 之后)。
Microbatch 索引计算:
核心概念:commit = 等待上一轮的异步操作完成
所有 _pp_commit_comm_work(work) 做的事情就是:
1 | def _pp_commit_comm_work(self: Scheduler, work: List[P2PWork]) -> None: |
即:commit 是一个同步屏障,确保上一轮发起的异步 send 已经完成,发送缓冲区可以安全复用。这是一种延迟等待模式–发送时不阻塞,到下一轮再等待完成,从而让发送和 GPU 计算重叠。
Microbatch 索引计算:
1 | next_mb_id = (mb_id + 1) % self.pp_loop_size # 当前 rank 下一个要处理的 microbatch 槽位 |
next_mb_id:当前 rank 下一个要处理的 microbatch 槽位(偏移 1,因为每次迭代处理一个 mb)next_first_rank_mb_id:first rank 对应的”下一轮”microbatch 槽位(偏移pp_size,因为流水线深度–first rank 产出的 batch 要经过pp_size个迭代才到 last rank 完成)
步骤与函数的对应关系:
| 步骤 | 函数 | 作用 |
|---|---|---|
| recv ith req | recv_requests() |
接收请求(PP0 从 zmq,PP>0 从前一个 rank P2P) |
| send ith req to next | _pp_send_pyobj_to_next_stage() |
把请求异步转发给下一个 PP rank(提前发送,隐藏通信延迟) |
| get_next_batch_to_run | get_next_batch_to_run() |
从 waiting_queue 调度请求组成 batch |
| recv ith proxy | _pp_recv_proxy_tensors() |
接收上游 stage 的 hidden_states + residual(forward 输入) |
| send+recv output + preprocess | _pp_commit_send_output_work_and_preprocess_output_tensors() |
发送 output 给下一个 stage + 接收 next_mb_id 槽位的 output + GPU→CPU 拷贝 + 解包 |
| run ith batch | _pp_launch_batch() |
执行当前 microbatch 的 forward 计算 |
| process next_mb_id batch result | _pp_process_batch_result() |
更新请求状态、判断是否结束、流式输出给 tokenizer |
| send ith proxy to next | _pp_send_dict_to_next_stage(msg_type="proxy") |
发送 forward 输出的 hidden_states 给下一个 stage |
关键设计:
- send req 提前:收到请求后立即转发给下一个 rank(step 2),不等 forward 完成,隐藏 P2P 通信延迟
- output 发送在
_pp_commit_send_output_work_and_preprocess_output_tensors中处理:output 的发送和接收都在 step 6 中完成,last rank 把 output 发给 rank 0,中间 rank 转发 - **output 处理时机取决于
pp_async_batch_depth**:> 0:output 收发+预处理在 forward 之前执行,与当前 forward 重叠,隐藏延迟== 0:output 收发+预处理在 forward 之后执行,串行不重叠
- 延迟等待异步通信:
_pp_commit_comm_work在下一轮迭代中调用.wait()确保上一轮的 async send 已完成,让发送和 GPU 计算重叠 - 每个 microbatch 槽位独立管理状态
- async send + recv 实现计算和通信的 overlap
- 主循环轮转处理每个 microbatch,填充 pipeline bubble
完整流程示例(pp_size=2, pp_async_batch_depth=0)
以 pp_size=2, pp_async_batch_depth=0 为例,pp_loop_size=2,只有槽位 0 和 1:
1 | 参数: |
单次迭代(mb_id = N)的完整步骤:
步骤 1:恢复上下文
1 | running_batch = running_mbs[N] |
步骤 2:接收并处理请求
1 | recv_reqs = recv_requests() |
- PP0:从 tokenizer_manager / detokenizer 收请求(zmq 通道)
- PP1:从 PP0 收转发过来的请求(P2P,含 PPHiCacheEventsReq 控制消息)
步骤 3:HiCache 事件同步 + 转发请求
1 | if enable_hierarchical_cache: |
commit 含义:上一轮(槽位 (N-1)%2)发起的 send_req_work 可能还没完成,这里阻塞等它完成后,才能安全地复用发送缓冲区发送本轮的请求。
步骤 4:调度决策
1 | mbs[N] = get_next_batch_to_run() |
步骤 5:接收 Proxy 张量
1 | if cur_batch is not None: |
步骤 6:等待上一轮 Proxy 发送完成
1 | _pp_commit_comm_work(send_proxy_work) # ← 等待上一轮的 proxy 异步发送完成 |
commit 含义:上一轮(槽位 (N-1)%2)PP0 发起的 proxy 异步发送可能还没完成,这里确保它完成,释放发送缓冲区。
步骤 7:GPU Forward 计算
1 | if cur_batch: |
步骤 8:发送 Output + 接收 Output + 预处理
因为 pp_async_batch_depth == 0,这一步在 forward 之后执行(无法与 GPU 计算 overlap)。
1 | _pp_commit_send_output_work_and_preprocess_output_tensors( |
内部逻辑(_pp_send_recv_and_preprocess_output_tensors):
1 | # 1. 等待上一轮的 output 异步发送完成 |
commit 含义:上一轮(槽位 (N-1)%2)发起的 output 异步发送可能还没完成,这里确保完成后才能发起新的 output 发送。
步骤 9:后处理 next_mb_id 槽位的 Batch 结果
1 | if mbs[next_mb_id] is not None: |
步骤 10:发送 Proxy 张量(仅 PP0)
1 | if not is_last_rank and cur_batch: |
步骤 11:保存状态
1 | pp_outputs = next_pp_outputs # 暂存本轮接收到的 output,供下一轮步骤 8 转发 |
完整时序(稳态,两个槽位交替)
1 | ┌─────────────────────────────────────────────────────────────────┐ |
PP0 与 PP1 的交互时序(稳态)
1 | 时间轴 → |
关键依赖链:
- PP1 的
recv_proxy必须等 PP0 的send_proxy(上一轮的)完成 - PP0 的
recv_output必须等 PP1 的send_output完成 - 因为
pp_async_batch_depth=0,output 的发送/接收在 forward 之后,无法与 GPU 计算重叠
所有 commit 汇总
| commit 调用 | 等待的是什么 | 目的 |
|---|---|---|
commit(send_req_work) |
上一轮槽位发起的 reqs 异步发送 | 确保发送完成,可以安全复用缓冲区发送本轮 reqs |
commit(send_proxy_work) |
上一轮槽位发起的 proxy 异步发送 | 确保 hidden_states 张量已发送完毕,可以释放/复用 |
commit(send_output_work) |
上一轮槽位发起的 output 异步发送 | 确保 output 张量已发送完毕,可以发起新的 output 发送 |
模式统一:每个 async_send 在下一轮同一类型操作之前被 commit,形成”发起→(做其他事)→等待完成→再发起”的流水线模式。即使 pp_async_batch_depth=0 没有计算-通信 overlap,这种 commit 模式仍然保证了同类型通信操作之间不会冲突。
Microbatch:填充 Pipeline Bubble
问题:Pipeline Bubble
PP2(2 个 stage)如果只用一个 batch,会出现大量空闲:
1 | 时间 → |
Stage 0 做完 batch 0 后必须等 Stage 1 处理完才能做 batch 1(因为只有一个 batch 在 pipeline 里流转),一半的时间都在空闲。这就是 pipeline bubble。
解法:Microbatch
把多个 batch 同时塞进 pipeline,这些 batch 就叫 microbatch:
1 | 时间 → |
Stage 0 做完 mb0 不用等,立刻做 mb1;Stage 1 也紧接着处理,bubble 大幅减少。
Microbatch 数量的确定
1 | # scheduler_pp_mixin.py 第 514 行 |
pp_size= 2(PP2)→ 至少 2 个 microbatch 槽位pp_async_batch_depth→ 额外的 buffer 深度,进一步隐藏延迟
每个 microbatch 都有独立的状态:
1 | self.mbs = [None] * self.pp_loop_size # 当前 batch |
Microbatch vs 正常 Batch
| 维度 | 正常 batch | Microbatch(PP) |
|---|---|---|
| 数量 | 同时 1 个 | 同时 pp_size + async_depth 个 |
| 目的 | 普通调度 | 填充 pipeline bubble,提升利用率 |
| 状态管理 | 单个 running_batch | 数组 running_mbs[mb_id] 各自独立 |
| 本质 | 就是一个 batch | 也是普通 batch,只是多个在 pipeline 中交替执行 |
Microbatch 和 batch 在数据结构上没有区别(都是 ScheduleBatch),区别只在于 PP 需要多个 batch 同时在不同 stage 中流转,所以给它们编号叫 microbatch。
Microbatch 数量不是越多越好
1 | pp_loop_size = pp_size + pp_async_batch_depth |
- 太少(= pp_size):pipeline 容易断流,一个 stage 稍慢就导致下游空闲
- 太多:显存占用线性增长(每个 microbatch 都要有自己的 KV cache + activation),而且可能因为显存压力导致 batch size 被迫缩小,反而吞吐下降
调优经验:
pp_async_batch_depth = 1~2通常足够,再深收益递减- 显存充裕时可以试更大的 depth,但要注意 activation memory 的开销
- 如果 single batch 的 token 数很大(长上下文),microbatch 多了容易 OOM
PP Rank 的请求接收路径
recv_requests() 根据 PP rank 的不同,有两条完全不同的接收路径。接收的请求包括两类:
- 外部 API 请求:tokenize 后的推理请求(
TokenizedGenerateReqInput、TokenizedEmbeddingReqInput等) - 内部控制请求:PP 同步用的控制消息(如
PPHiCacheEventsReq等)
PP Rank 0(第一个 stage)
从上游组件直接接收用户请求,走两条 zmq 通道:
1 | # 1. 从 tokenizer 接收(zmq 通道) |
接收的是 tokenize 后的请求对象,类型为:
TokenizedGenerateReqInput- 生成请求TokenizedEmbeddingReqInput- embedding 请求- 其他控制类消息(flush、abort 等)
PP Rank > 0(后续 stage)
不直接从 tokenizer 接收,而是从前一个 PP rank P2P 转发过来:
1 | recv_reqs = point_to_point_pyobj( |
对应 event_loop_pp 中的流转
1 | # event_loop_pp 第 80-88 行 |
流转全图
1 | Tokenizer / RPC |
关键设计:所有 PP rank 收到的内容是一样的(同样的请求列表),这样每个 stage 都能为同一批请求做 get_next_batch_to_run() 调度,保证各 stage 的 batch 一致。
process_input_requests:请求路由器
recv_requests() 收到请求后,process_input_requests() 负责按类型分发到对应的 handler:
1 | # scheduler_pp_mixin.py 第 1545-1566 行 |
_request_dispatcher 是一个 TypeBasedDispatcher,根据请求类型路由到不同 handler:
核心推理请求:
| 请求类型 | Handler | 说明 |
|---|---|---|
TokenizedGenerateReqInput |
handle_generate_request |
生成请求 → 加入 waiting_queue |
TokenizedEmbeddingReqInput |
handle_embedding_request |
Embedding 请求 |
BatchTokenized* |
handle_batch_* |
批量请求 |
控制类请求:
| 请求类型 | Handler | 说明 |
|---|---|---|
AbortReq |
abort_request |
中止请求 |
FlushCacheReqInput |
flush_cache_wrapped |
清空 KV cache |
OpenSessionReqInput |
open_session |
开启会话 |
CloseSessionReqInput |
close_session |
关闭会话 |
ProfileReq |
profile |
性能分析 |
SlowDownReqInput |
slow_down |
降速 |
PauseGenerationReqInput |
pause_generation |
暂停生成 |
权重管理请求:
| 请求类型 | Handler | 说明 |
|---|---|---|
UpdateWeightsFrom* |
update_weights_from_* |
更新模型权重(disk/distributed/tensor/IPC) |
LoadLoRAAdapterReqInput |
load_lora_adapter |
加载 LoRA |
UnloadLoRAAdapterReqInput |
unload_lora_adapter |
卸载 LoRA |
一句话总结:process_input_requests 就是一个请求路由器–生成请求进入 waiting_queue 等待后续调度,控制请求立即处理并回送结果,所有 PP rank 都执行同样的分发逻辑保证状态一致。
_pp_recv_proxy_tensors:接收上游 Stage 的中间结果
_pp_recv_proxy_tensors(scheduler_pp_mixin.py 第 993-1004 行)负责从上一个 PP stage 接收 forward 的中间激活值:
1 | def _pp_recv_proxy_tensors(self: Scheduler) -> Optional[PPProxyTensors]: |
行为:
- PP Rank 0(first rank):返回
None,因为它是第一个 stage,没有上一级给它传 hidden states,直接用 embedding 层的输出开始 forward。代码里有is_first_rank判断,PP0 不会做任何接收操作。 - PP Rank > 0:从前一个 PP rank 接收 tensor dict(
msg_type="proxy"),包装成PPProxyTensors
Proxy tensors 是什么:
看 profile_and_init_predictor 里的构造(第 609-622 行)就清楚了:
1 | proxy_tensors = { |
就是上一个 PP stage forward 输出的中间激活值:
hidden_states- 当前层的隐藏状态residual- 残差连接
在 event_loop_pp 中的位置:
1 | if self.cur_batch: |
对应的发送端:在 event_loop_pp 末尾(第 129-139 行),非 last rank 发送 proxy:
1 | if not self.pp_group.is_last_rank: |
一句话总结:_pp_recv_proxy_tensors 就是非首 rank 从前一个 PP stage 接收 forward 中间结果(hidden_states + residual),这样本 stage 的模型层才能接着算。这是 PP 的核心数据流–每个 stage 只有部分层,需要前一个 stage 的输出作为输入。
_pp_recv_dict_from_prev_stage(接收 output)与 _pp_recv_proxy_tensors(接收 proxy)的区别
这两个接收操作虽然底层都走 _pp_recv_typed_dict,但它们在 语义、时机、数据内容、通信方向 上完全不同。
一、核心区别对比
| 维度 | _pp_recv_proxy_tensors(proxy) |
_pp_recv_dict_from_prev_stage(output) |
|---|---|---|
msg_type |
"proxy" |
"output" |
| 语义 | 中间隐藏状态(模型前向传播的中间产物) | 最终输出(next_token_ids + logprob) |
| 数据内容 | hidden_states + residual(形状 [num_tokens, hidden_size]) |
next_token_ids(形状 [batch_size])+ 可选的 logprob 张量 |
| 通信方向 | PP(k-1) → PP(k),前向传播方向 | PP(last) → PP(0),反向回传方向(或非 last rank 的转发) |
| 对应的发送端 | 上一个 PP stage 的 _pp_send_dict_to_next_stage(..., msg_type="proxy") |
last rank 的 _pp_send_dict_to_next_stage(..., msg_type="output") |
| 用途 | 作为当前 stage 模型 forward 的输入 | 作为 batch 后处理(process_batch_result)的输入 |
| 时机 | 在 _pp_launch_batch(GPU forward)之前接收 |
在 GPU forward 之后/并行接收(用于处理上一轮的结果) |
| 接收者 | 非 first rank(PP1, PP2, …) | 所有 rank(PP0 从 last rank 收,中间 rank 从上一个 rank 转发) |
二、在流水线中的位置
1 | PP Rank 0 PP Rank 1 PP Rank 2 (Last) |
三、数据内容详解
Proxy 张量(中间隐藏状态):
1 | # 发送端(_pp_launch_batch 之后) |
这是模型被纵向切分后,前半部分层的输出。下一个 PP stage 需要它作为输入继续跑后半部分层。没有它,下一个 stage 无法执行 forward。
Output 张量(最终输出):
1 | # 发送端(_pp_prepare_tensor_dict) |
这是 last rank 完成整个模型 forward + 采样后的最终结果。PP0 需要它来执行后处理(更新请求状态、判断是否结束、发送给 tokenizer 等)。
四、为什么需要分开?
时序不同:proxy 必须在 forward 前到位(是 forward 的输入);output 可以在 forward 后异步接收(是上一轮的结果,用于 CPU 后处理)。
方向不同:proxy 沿流水线正向流动(rank 0→1→2→…→last);output 从 last rank 反向回到 rank 0(形成环路)。
同一条 P2P 链路上交错到达:由于 PP 使用环形拓扑(last rank 的 next 就是 rank 0),同一对 rank 之间可能同时有 proxy 和 output 在传输。
_pp_recv_typed_dict的 demux 机制(按msg_type分拣)就是为了解决这个问题–收到不匹配类型的消息时暂存到 inbox,等需要时再取出。大小差异巨大:proxy 张量很大(
num_tokens × hidden_size,可能几十 MB);output 张量很小(batch_size个 int,几 KB)。这影响通信调度策略。
五、一句话总结
_pp_recv_proxy_tensors:接收的是 “半成品” –上一个 stage 跑完前半部分模型层后的隐藏状态,当前 stage 要拿它继续跑后半部分。_pp_recv_dict_from_prev_stage(output):接收的是 “成品” –last rank 跑完整个模型后采样出的 token,PP0 拿它做后处理(更新状态、返回用户)。
_pp_launch_batch:执行当前 Microbatch 的 Forward
_pp_launch_batch(scheduler_pp_mixin.py)对应 event_loop_pp 流程中的 step 5: run_batch,负责启动当前 microbatch 的 forward 计算:
1 | # 伪代码示意 |
关键行为:
- PP Rank 0(first rank):不需要 proxy tensors,直接用 token embeddings 作为 forward 输入
- PP Rank > 0:用
_pp_recv_proxy_tensors()收到的 hidden_states + residual 作为 forward 输入,接着上一个 stage 继续算 - 返回
result(包含pp_hidden_states_proxy_tensors,供后续发送给下一个 stage)和launch_event(用于同步)
在 event_loop_pp 中的调用位置:
1 | for mb_id in range(self.pp_loop_size): |
一句话总结:_pp_launch_batch 就是 PP 路径下的 forward 执行器–首 rank 用 embedding 输入,非首 rank 用上游传来的 hidden_states 输入,执行当前 stage 的模型层计算后输出给下一个 stage。
_pp_commit_send_output_work_and_preprocess_output_tensors:Output 的收发与预处理(对应 step 5: recv next_mb_id outputs + preprocess)
这个函数(scheduler_pp_mixin.py 第 854-873 行)对应 event_loop_pp 流程中的 step 5: recv next_mb_id outputs + preprocess,负责处理 next_mb_id 槽位对应的 output tensors 的发送、接收和预处理:
1 | def _pp_commit_send_output_work_and_preprocess_output_tensors( |
核心逻辑在 _pp_send_recv_and_preprocess_output_tensors(第 1081-1116 行),做了三件事:
1. 发送 output(last rank → rank 0)
1 | # Last rank: 从 last_rank_comm_queue 取出 forward 结果,发给 rank 0 |
2. 接收 output(rank 0 从 last rank 收)
1 | if mbs[next_mb_id] is not None: |
3. 预处理:GPU→CPU 拷贝(copy stream 上)
1 | with self.copy_stream_ctx: |
Output tensors 里有什么:
看 _pp_prepare_tensor_dict(第 920-933 行):
1 | tensor_dict = { |
这是 last rank 采样后的最终结果,不是中间的 hidden states。
在 event_loop_pp 中的调用位置:
1 | for mb_id in range(self.pp_loop_size): |
pp_async_batch_depth 的影响:
pp_async_batch_depth |
output 处理时机 | 效果 |
|---|---|---|
> 0 |
forward 之前处理 next_mb_id 槽位的 output | output 处理和当前 forward 重叠,隐藏延迟 |
== 0 |
forward 之后再处理 | 串行执行,不重叠 |
一句话总结:_pp_commit_send_output_work_and_preprocess_output_tensors 负责把 last rank 的采样结果(next_token_ids、logprobs)从 GPU 传回 rank 0,并在 rank 0 上做 GPU→CPU 拷贝和结果解包。这是 pipeline 的”最后一公里”–把最终输出送回给 tokenizer/rpc 层。
next_batch_result 是什么:
next_batch_result 是 next_mb_id 槽位对应的 microbatch 的最终输出结果(GenerationBatchResult),包含 next_token_ids 等。这里的逻辑是:当前迭代处理的是 mb_id,但同时要处理 next_mb_id 槽位的结果(因为流水线的延迟,next_mb_id 的 batch 在之前的迭代中已经完成了 GPU forward,output 在这一轮才到达)。
⚠️ 注意:
next_mb_id不是”上一个 microbatch”
next_mb_id = (mb_id + 1) % pp_loop_size,从当前 stage 的视角看,它是下一个要处理结果的槽位。但从前一个 PP stage 的视角看,这个槽位对应的是它们刚刚处理完、准备发给我们的下一个 microbatch。之所以叫
next_mb_id,是因为从前一个 stage 的角度,它是”下一个要发给我们的 microbatch”。不是”上一个”。简单记:
next_mb_id= 从前一个 PP stage 传过来的下一个 microbatch 槽位。
Output 与 Proxy 的发送完全分开:
_pp_commit_send_output_work_and_preprocess_output_tensors 只发 output,不发 proxy。
- output(
next_token_ids):通过_pp_send_output_to_next_stage发送(msg_type="output"),方向是 last rank → rank 0(环形) - proxy(
hidden_states + residual):在 event_loop_pp 循环末尾单独发送(msg_type="proxy"),方向是 rank k → rank k+1(正向)
两者完全独立,不会混在一起。
_pp_commit_comm_work:延迟等待异步通信完成
1 | def _pp_commit_comm_work(self: Scheduler, work: List[P2PWork]) -> None: |
作用:等待上一轮的异步通信完成。
因为 _pp_send_pyobj_to_next_stage、_pp_send_dict_to_next_stage 都用了 async_send=True,返回的是一个 P2PWork 列表(底层是 dist.isend 的 future)。_pp_commit_comm_work 就是在下一轮迭代中调用 .wait() 来确保上一轮的发送已经完成,然后清空 work 列表。
这是一种延迟等待的模式:发送时不阻塞,到下一轮再等待完成,从而让发送和 GPU 计算重叠。
_pp_commit_comm_work 的调用位置:
在 event_loop_pp 中,_pp_commit_comm_work 被调用于两个地方:
1 | for mb_id in range(self.pp_loop_size): |
self._pp_commit_comm_work(work=self.send_output_work)- 等待上一轮 output tensor 的 async send 完成self._pp_commit_comm_work(work=self.send_proxy_work)- 等待上一轮 proxy tensor 的 async send 完成
两者都是延迟等待,让上一轮的通信和当前轮的 GPU 计算重叠。
数据流全图:
1 | Last Rank 中间 Rank Rank 0 |
PP 路径中的 HiCache(分层 KV Cache)
HiCache(Hierarchical Cache)是 SGLang 的多级 KV Cache 管理机制,把 KV cache 存在 L3(CPU 内存或磁盘),需要时再加载到 GPU。在 PP scheduler 中,HiCache 不在 event_loop_pp 本身,而是在 event loop 调用的子流程中发挥作用。
HiCache 在 PP 调度中的四个阶段
1. 请求入队时 - 预取(Prefetch)
1 | # process_input_requests → handle_generate_request → _add_request_to_queue → _prefetch_kvcache |
时机:请求刚进入 waiting_queue 时,立即发起异步预取,利用等待调度的时间把 KV cache 从外部存储搬到 GPU。
2. 调度组 batch 时 - 检查预取进度 + 加载
get_next_batch_to_run → _get_new_batch_prefill_raw 中有三处:
1 | # 2a. 检查异步事件完成(第 2282-2283 行) |
3. 请求被 abort 时 - 释放预取资源
1 | # 在多个 abort 路径中(第 1965、1997、3146 行) |
4. flush cache 时 - 等待异步操作完成
1 | # 第 2877 行 |
check_hicache_events():异步事件推进器
check_hicache_events() 是 HiCache 的核心–每轮调度前轮询一次,把已完成的 GPU↔️Host↔️L3 异步拷贝操作确认掉(解锁节点、释放资源、推进状态)。
1 | # tree_cache.py 第 1136-1144 行 |
① writing_check()(第 676-717 行)- GPU→Host 写入
KV cache 从 GPU 写到 Host 内存(备份),是异步的。这个函数检查哪些写入完成了:
1 | # 遍历 ack_write_queue 里的 CUDA event |
作用:GPU 上的 KV cache 备份到 Host 完成后,解锁节点(可以被驱逐释放 GPU 显存),如果开了 L3 存储还会继续往 L3 写。
② loading_check()(第 719-732 行)- Host→GPU 加载
KV cache 从 Host 内存加载回 GPU,也是异步的:
1 | for _, finish_event, ack_list in self.cache_controller.ack_load_queue: |
作用:之前从 Host 加载回 GPU 的 KV cache 完成后,解锁节点,这些 KV cache 就可以被 forward 使用了。
③ drain_storage_control_queues()(第 1146-1172 行)- L3 存储控制
处理三个队列:
1 | cc = self.cache_controller |
| 队列 | 作用 |
|---|---|
prefetch_revoke_queue |
取消不再需要的 L3→Host 预取 |
ack_backup_queue |
确认 Host→L3 备份写入完成 |
host_mem_release_queue |
释放不再需要的 Host 内存 |
HiCache 三级缓存的数据流
1 | check_hicache_events() 推进的异步操作 |
HiCache 数据流
1 | 请求进入 waiting_queue |
一句话总结:HiCache 在 PP 调度中贯穿请求的完整生命周期–入队时异步预取、调度时检查进度、abort 时释放资源、flush 时等待完成。核心思想是利用请求在 waiting_queue 中的等待时间,提前把 KV cache 从 L3 搬到 GPU,隐藏加载延迟。
Host 内存驱逐:LRU 最小堆
当 host 内存不足时,evict_host() 使用 LRU 最小堆来决定驱逐哪些节点。
1 | # 1. 收集候选节点 |
三元组比较:
| 位置 | 内容 | 含义 |
|---|---|---|
| 第 1 个 | get_priority(node) |
LRU 策略下就是 last_access_time,值越小 = 越久没访问 = 越优先驱逐 |
| 第 2 个 | _evict_tie_breaker(node) |
当 priority 相同时的打破平局:PP>1 用内容 hash(跨 rank 一致),PP=1 用 node.id |
| 第 3 个 | node |
实际要驱逐的节点对象 |
PP 场景下的 tie-breaker 设计:
- PP=1:用
node.id作为 tie-breaker,简单高效 - PP>1:用内容 hash(
node.hash_value)作为 tie-breaker,保证不同 rank 对相同节点计算出相同的优先级顺序,确保驱逐一致性
一句话总结:把所有可驱逐的 host 叶子节点放进 LRU 最小堆,循环弹出堆顶(最久没访问的)逐个驱逐,直到释放够 num_tokens 个 token 的 host 内存。tie-breaker 保证 PP 多 rank 场景下驱逐顺序一致。
PP + HiCache 的 Tree Cache 同步问题(PR #22878)
PP + HiCache 组合使用时存在一个经典问题:各 PP rank 的 writing_check() 独立消费 GPU→Host 的 write-ack,由于各 rank 的 batch 内容和异步进度不同,消费数量可能不一致,导致各 rank 的 radix tree 状态出现分歧(哪些节点已备份、哪些可以驱逐不一致)。
解决方案:上游 rank 的消费数作为下游的预算上限,通过 piggyback 在已有的 PP 请求转发中传递。
1. writing_check() 末尾,PP rank 0 记录本轮消费数
1 | # hiradix_cache.py |
2. 通过 recv_reqs 捎带传给下游(scheduler_pp_mixin.py)
1 | # 非 last rank 发送请求时,把 ack_count 捎带上 |
3. 下游 rank 接收并设置预算(scheduler.py)
1 | # recv_requests() 中,PP rank > 0 解包捎带的 ack_count |
4. 下游 rank 的 writing_check 受限于上游预算
1 | # writing_check() 中,下游 rank 不能消费超过上游的数量 |
数据流:
1 | PP Rank 0 PP Rank 1 |
设计亮点:不新增通信通道,而是 piggyback 在已有的 _pp_send_pyobj_to_next_stage 上,仅 +61 行代码解决问题。
一句话总结:把上游 PP rank 的 write-ack 消费数量捎带在 recv_reqs 的 pyobj 通信上传给下游,下游以此为上限限制自己的消费,确保所有 PP rank 的 radix tree 状态变更保持一致,防止缓存驱逐决策不同步。
PP + HiCache 一致性修复进度(Issue #22607):
| 任务 | PR 状态 |
|---|---|
| CP 同步 | PR #20460 |
| 逻辑时钟替代 time.monotonic() | PR #22759(解决出错 3) |
| PD 模块 | PR #22771 |
| Mooncake CP 写控制 + PP storage key 隔离 | 待做 |
| Channel A: Host tree event replay | 待做 |
| Channel B: Write-back count sync | PR #22878 ✅ |
| Channel C: L3 hit delegation | 待做 |
| Zero-hit deferred revoke | 待做 |
| PP prefill 诊断工具 | 待做 |
Channel A:Host Tree Event Replay
Channel B(write-back count sync)只限制 PP1 不超前于 PP0,但不能保证两个 rank 在每一轮都消费相同数量。如果 PP1 的 CUDA event 本地没完成,min(local_ready=0, budget=3) = 0,PP1 这轮仍然消费 0。这意味着 host tree 的状态变更时机在两个 rank 之间可以有偏差,一旦偏差积累,就会导致 host tree 结构不同。
Channel A 的思路:不指望两个 rank 独立做出相同决策,而是让 PP0 做决策,PP1 回放 PP0 的结果。
具体场景:PP2 下的 host tree 分歧
假设 PP2 配置:PP0 管 layer 0-29,PP1 管 layer 30-59,两个 rank 各自独立维护一棵 radix tree。
第 1 步:正常运行,两个 rank 一致
请求 A、B、C 进来,两个 rank 收到相同请求、做相同调度。节点 X(系统提示词的 KV cache)在两个 rank 的 GPU 上都存在:
1 | PP0 radix tree: root → X(value=GPU, host_value=None, hit_count=5) |
第 2 步:hit_count 达到阈值,触发 write_backup
1 | PP0: write_backup(X) → CUDA 异步拷贝启动 → X 加入 ongoing_write_through |
第 3 步:writing_check 完成时间分歧(关键!)
1 | PP0: CUDA event 已完成 → finish_count=1 → X.lock_ref=0 |
Channel B 此时 PP0 把 consumed_count=1 传给 PP1,PP1 的 budget 变成 1。但 PP1 本地 finish_count=0,min(0, 1)=0,这轮 PP1 仍然消费 0。Channel B 只保证 PP1 不会超过 PP0,不能强迫 PP1 追上 PP0。
第 4 步:GPU 内存压力 → PP0 驱逐 X
1 | PP0: X 在 evictable_leaves 中(lock_ref=0, backuped=True) |
第 5 步:PP0 的 Host 内存也满了 → evict_host 删除 X
1 | PP0: write_backup(Z) → host 内存不足 → evict_host() |
第 6 步:新请求匹配前缀 → crash
1 | PP0: _match_prefix_helper → 找不到 X → device_indices=[], host_hit_length=0 |
Channel A 如何解决
1 | PP0 (权威源) PP1 (回放) |
三个 Channel 的分工
1 | 问题链条: |
一句话总结:Channel B + 逻辑时钟是”努力让两个 rank 独立做出相同决策”,但由于异步操作的固有非确定性,无法 100% 保证。Channel A 是”放弃独立决策,直接同步结果”,从根本上消除分歧。
PP 路径的调度细节
get_next_batch_to_run
这个调度器在 PP 路径下要考虑的东西比 normal 多得多:
- 当前 stage 的显存 - 不能 OOM
- Microbatch 的依赖关系 - 上下游 stage 的进度
- Pipeline 的吞吐 - 尽量让每个 stage 都有活干
Async Send/Recv 的 Overlap
第 2 步 send reqs 和 recv proxy tensors 是 async 的,理想情况下计算和通信能 overlap。但如果 recv 阻塞了,整个 loop 就卡住了。
1 | Stage 0: [compute mb0] → [send hidden to S1] → [recv hidden from S1] → [compute mb1] |
通信和计算的重叠程度决定了 pipeline 的效率。
PP 路径 vs 非 PP 路径对比
非 PP 路径(event_loop_normal / event_loop_overlap)
1 | 时间 → |
调度一个 batch,forward 一个 batch,处理结果,再调度下一个。同一时刻只有一个 batch。
PP 路径(event_loop_pp)
1 | 时间 → |
多个 microbatch 同时在不同 stage 中流转,主循环轮转处理每个 microbatch。
总结
SGLang 的 PP 实现有几个关键设计:
- 独立事件循环 - PP 路径和 normal/overlap 路径完全隔离,避免复杂度交叉
- Microbatch 填充 bubble - 通过
pp_loop_size = pp_size + async_depth控制 pipeline 中的并发度 - Async 通信 - send/recv 异步化,尽量 overlap 计算和通信
- 独立状态管理 - 每个 microbatch 槽位有独立的 running_batch、last_batch 等状态
PP 的核心挑战始终是 如何减少 bubble、提升 GPU 利用率,同时控制显存开销。Microbatch 数量的调优是 PP 性能调优的关键。
参考
- SGLang 源码:
scheduler_pp_mixin.py - Pipeline Parallelism 论文:GPipe: Efficient Training of Giant Neural Networks using Pipeline Parallelism