Pipeline Parallelism(PP)是多卡推理中的核心技术之一。SGLang 的 PP 实现有一套独立的事件循环和调度机制,和普通的 single-batch 路径完全不同。本文将从代码出发,深入分析 SGLang 的 PP 实现。
为什么需要 Pipeline Parallelism 单机推理的瓶颈 单卡推理时,GPU 利用率受限于模型大小和显存。一个 70B 模型单卡放不下,必须拆分到多卡。
模型并行方案对比
方案
原理
优点
缺点
TP (Tensor Parallelism)
按层切分权重
通信少,延迟低
受限于单节点卡数
PP (Pipeline Parallelism)
按层切分模型
支持任意层数
存在 pipeline bubble
DP (Data Parallelism)
每卡完整模型
简单,吞吐高
显存需求不变
SGLang 通常 TP + PP 组合使用:节点内 TP,跨节点 PP。
SGLang PP 架构概览 PP 路径的独立事件循环 SGLang 的调度器有三条事件循环路径:
event_loop_normal - 普通单 batch 调度
event_loop_overlap - 计算和通信 overlap 的调度
event_loop_pp - PP 路径的独立事件循环
PP 路径有自己独立的事件循环,定义在 scheduler_pp_mixin.py 的 SchedulerPPMixin 类中。它和 normal/overlap 路径完全隔离,因为 PP 的通信模式(send/recv proxy tensors 跨 stage)和单机调度逻辑差异太大,硬塞进同一个 event loop 会很脏。
event_loop_pp 核心流程 1 2 3 4 5 6 7 8 9 10 11 while True : for mb_id in range (pp_loop_size): 1. recv_requests() 2. _pp_send_pyobj_to_next_stage() 3. get_next_batch_to_run() 4. _pp_recv_proxy_tensors() 5. _pp_launch_batch() 6. _pp_commit_send_output_work_and_preprocess...() 7. _pp_process_batch_result() 8. _pp_send_dict_to_next_stage(msg_type="proxy" )
**⚠️ step 5 和 step 6 的顺序取决于 pp_async_batch_depth**:
pp_async_batch_depth > 0:step 6(output 收发+预处理)在 step 5(forward)之前 执行,与 forward 重叠,隐藏延迟
pp_async_batch_depth == 0:step 6(output 收发+预处理)在 step 5(forward)之后 执行,串行不重叠
上图展示的是 pp_async_batch_depth == 0 的顺序(step 6 在 step 5 之后)。
Microbatch 索引计算 :
核心概念:commit = 等待上一轮的异步操作完成
所有 _pp_commit_comm_work(work) 做的事情就是:
1 2 3 4 def _pp_commit_comm_work (self: Scheduler, work: List [P2PWork] ) -> None : for p2p_work in work: p2p_work.work.wait() work.clear()
即:commit 是一个同步屏障 ,确保上一轮发起的异步 send 已经完成,发送缓冲区可以安全复用。这是一种延迟等待模式–发送时不阻塞,到下一轮再等待完成,从而让发送和 GPU 计算重叠。
Microbatch 索引计算 :
1 2 next_mb_id = (mb_id + 1 ) % self .pp_loop_size next_first_rank_mb_id = (mb_id + self .pp_size) % self .pp_loop_size
next_mb_id:当前 rank 下一个要处理的 microbatch 槽位(偏移 1,因为每次迭代处理一个 mb)
next_first_rank_mb_id:first rank 对应的”下一轮”microbatch 槽位(偏移 pp_size,因为流水线深度–first rank 产出的 batch 要经过 pp_size 个迭代才到 last rank 完成)
步骤与函数的对应关系 :
步骤
函数
作用
recv ith req
recv_requests()
接收请求(PP0 从 zmq,PP>0 从前一个 rank P2P)
send ith req to next
_pp_send_pyobj_to_next_stage()
把请求异步转发给下一个 PP rank(提前发送,隐藏通信延迟)
get_next_batch_to_run
get_next_batch_to_run()
从 waiting_queue 调度请求组成 batch
recv ith proxy
_pp_recv_proxy_tensors()
接收上游 stage 的 hidden_states + residual(forward 输入)
send+recv output + preprocess
_pp_commit_send_output_work_and_preprocess_output_tensors()
发送 output 给下一个 stage + 接收 next_mb_id 槽位的 output + GPU→CPU 拷贝 + 解包
run ith batch
_pp_launch_batch()
执行当前 microbatch 的 forward 计算
process next_mb_id batch result
_pp_process_batch_result()
更新请求状态、判断是否结束、流式输出给 tokenizer
send ith proxy to next
_pp_send_dict_to_next_stage(msg_type="proxy")
发送 forward 输出的 hidden_states 给下一个 stage
关键设计 :
send req 提前 :收到请求后立即转发给下一个 rank(step 2),不等 forward 完成,隐藏 P2P 通信延迟
output 发送在 _pp_commit_send_output_work_and_preprocess_output_tensors 中处理 :output 的发送和接收都在 step 6 中完成,last rank 把 output 发给 rank 0,中间 rank 转发
**output 处理时机取决于 pp_async_batch_depth**:
> 0:output 收发+预处理在 forward 之前 执行,与当前 forward 重叠,隐藏延迟
== 0:output 收发+预处理在 forward 之后 执行,串行不重叠
延迟等待异步通信 :_pp_commit_comm_work 在下一轮迭代中调用 .wait() 确保上一轮的 async send 已完成,让发送和 GPU 计算重叠
每个 microbatch 槽位独立管理状态
async send + recv 实现计算和通信的 overlap
主循环轮转处理每个 microbatch,填充 pipeline bubble
完整流程示例(pp_size=2, pp_async_batch_depth=0) 以 pp_size=2, pp_async_batch_depth=0 为例,pp_loop_size=2,只有槽位 0 和 1:
1 2 3 4 5 6 参数: pp_size = 2(PP0 = first rank, PP1 = last rank) pp_async_batch_depth = 0 pp_loop_size = 2(槽位:mb_id ∈ {0, 1}) next_mb_id = (mb_id + 1) % 2 # 另一个槽位 next_first_rank_mb_id = (mb_id + 2) % 2 = mb_id # 注意:next_first_rank_mb_id == mb_id
单次迭代(mb_id = N)的完整步骤 :
步骤 1:恢复上下文
1 2 3 4 running_batch = running_mbs[N] last_batch = last_mbs[N] next_first_rank_mb_id = N next_mb_id = (N + 1 ) % 2
步骤 2:接收并处理请求
1 2 recv_reqs = recv_requests() process_input_requests(recv_reqs)
PP0 :从 tokenizer_manager / detokenizer 收请求(zmq 通道)
PP1 :从 PP0 收转发过来的请求(P2P,含 PPHiCacheEventsReq 控制消息)
步骤 3:HiCache 事件同步 + 转发请求
1 2 3 4 5 6 if enable_hierarchical_cache: tree_cache.check_hicache_events() if not is_last_rank: _pp_commit_comm_work(send_req_work) send_req_work = async_send(pp_send_payload) → PP1
commit 含义 :上一轮(槽位 (N-1)%2)发起的 send_req_work 可能还没完成,这里阻塞等它完成后,才能安全地复用发送缓冲区发送本轮的请求。
步骤 4:调度决策
1 2 3 mbs[N] = get_next_batch_to_run() running_mbs[N] = running_batch cur_batch = mbs[N]
步骤 5:接收 Proxy 张量
1 2 3 4 if cur_batch is not None : pp_proxy_tensors = _pp_recv_proxy_tensors()
步骤 6:等待上一轮 Proxy 发送完成
1 _pp_commit_comm_work(send_proxy_work)
commit 含义 :上一轮(槽位 (N-1)%2)PP0 发起的 proxy 异步发送可能还没完成,这里确保它完成,释放发送缓冲区。
步骤 7:GPU Forward 计算
1 2 3 4 5 if cur_batch: result, launch_event = _pp_launch_batch(N, pp_proxy_tensors, ...)
步骤 8:发送 Output + 接收 Output + 预处理
因为 pp_async_batch_depth == 0,这一步在 forward 之后执行(无法与 GPU 计算 overlap)。
1 2 3 _pp_commit_send_output_work_and_preprocess_output_tensors( next_first_rank_mb_id=N, next_mb_id=(N+1 )%2 , )
内部逻辑(_pp_send_recv_and_preprocess_output_tensors):
1 2 3 4 5 6 7 8 9 10 11 12 13 _pp_commit_comm_work(send_output_work)
commit 含义 :上一轮(槽位 (N-1)%2)发起的 output 异步发送可能还没完成,这里确保完成后才能发起新的 output 发送。
步骤 9:后处理 next_mb_id 槽位的 Batch 结果
1 2 3 4 5 if mbs[next_mb_id] is not None : d2h_event.synchronize() _pp_process_batch_result(mbs[next_mb_id], next_batch_result) last_mbs[next_mb_id] = mbs[next_mb_id]
步骤 10:发送 Proxy 张量(仅 PP0)
1 2 3 if not is_last_rank and cur_batch: torch.cuda.current_stream().wait_event(launch_event) send_proxy_work = async_send(hidden_states + residual) → PP1
步骤 11:保存状态
1 pp_outputs = next_pp_outputs
完整时序(稳态,两个槽位交替) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 ┌─────────────────────────────────────────────────────────────────┐ │ mb_id=0 │ ├─────────────────────────────────────────────────────────────────┤ │ 1. recv_requests + process │ │ 2. commit(send_req_work[上轮mb=1的]) → async_send(reqs) │ │ 3. get_next_batch_to_run → cur_batch │ │ 4. recv_proxy (PP1阻塞等PP0; PP0跳过) │ │ 5. GPU forward (forward_stream) │ │ 6. commit(send_output_work[上轮mb=1的]) │ │ send_output → recv_output → prep_batch_result (copy_stream) │ │ 7. d2h_event.sync → process_batch_result(mbs[next_mb_id]的结果) │ │ 8. commit(send_proxy_work[上轮mb=1的]) │ │ 9. PP0: wait(launch_event) → async_send(proxy) → PP1 │ │10. pp_outputs = next_pp_outputs │ ├─────────────────────────────────────────────────────────────────┤ │ mb_id=1 │ ├─────────────────────────────────────────────────────────────────┤ │ 1. recv_requests + process │ │ 2. commit(send_req_work[上轮mb=0的]) → async_send(reqs) │ │ 3. get_next_batch_to_run → cur_batch │ │ 4. recv_proxy (PP1阻塞等PP0; PP0跳过) │ │ 5. GPU forward (forward_stream) │ │ 6. commit(send_output_work[上轮mb=0的]) │ │ send_output → recv_output → prep_batch_result (copy_stream) │ │ 7. d2h_event.sync → process_batch_result(mbs[next_mb_id]的结果) │ │ 8. commit(send_proxy_work[上轮mb=0的]) │ │ 9. PP0: wait(launch_event) → async_send(proxy) → PP1 │ │10. pp_outputs = next_pp_outputs │ └─────────────────────────────────────────────────────────────────┘
PP0 与 PP1 的交互时序(稳态) 1 2 3 4 5 6 7 8 9 10 11 12 13 时间轴 → PP0 (mb_id=0): recv_req → commit(send_req) → send_req→PP1 → schedule → [skip proxy] → FORWARD → commit(send_output) → send_output(转发) → recv_output(从PP1) → prep → process_result → commit(send_proxy) → send_proxy→PP1 PP1 (mb_id=0): recv_req(从PP0) → process → schedule → recv_proxy(阻塞等PP0) → FORWARD → commit(send_output) → send_output→PP0 → recv_output(从PP0转发) → prep → process_result → commit(send_proxy)
关键依赖链 :
PP1 的 recv_proxy 必须等 PP0 的 send_proxy(上一轮的)完成
PP0 的 recv_output 必须等 PP1 的 send_output 完成
因为 pp_async_batch_depth=0,output 的发送/接收在 forward 之后,无法与 GPU 计算重叠
所有 commit 汇总
commit 调用
等待的是什么
目的
commit(send_req_work)
上一轮槽位发起的 reqs 异步发送
确保发送完成,可以安全复用缓冲区发送本轮 reqs
commit(send_proxy_work)
上一轮槽位发起的 proxy 异步发送
确保 hidden_states 张量已发送完毕,可以释放/复用
commit(send_output_work)
上一轮槽位发起的 output 异步发送
确保 output 张量已发送完毕,可以发起新的 output 发送
模式统一 :每个 async_send 在下一轮同一类型操作之前被 commit,形成”发起→(做其他事)→等待完成→再发起”的流水线模式。即使 pp_async_batch_depth=0 没有计算-通信 overlap,这种 commit 模式仍然保证了同类型通信操作之间不会冲突。
Microbatch:填充 Pipeline Bubble 问题:Pipeline Bubble PP2(2 个 stage)如果只用一个 batch,会出现大量空闲:
1 2 3 4 时间 → Stage 0: [ batch 0 ] [ batch 1 ] Stage 1: [ batch 0 ] [ batch 1 ] ↑ 空闲 ↑ 空闲
Stage 0 做完 batch 0 后必须等 Stage 1 处理完才能做 batch 1(因为只有一个 batch 在 pipeline 里流转),一半的时间都在空闲。这就是 pipeline bubble 。
解法:Microbatch 把多个 batch 同时塞进 pipeline,这些 batch 就叫 microbatch:
1 2 3 时间 → Stage 0: [ mb 0 ][ mb 1 ][ mb 2 ][ mb 3 ] Stage 1: [ mb 0 ][ mb 1 ][ mb 2 ][ mb 3 ]
Stage 0 做完 mb0 不用等,立刻做 mb1;Stage 1 也紧接着处理,bubble 大幅减少。
Microbatch 数量的确定 1 2 self .pp_loop_size = self .pp_size + self .server_args.pp_async_batch_depth
pp_size = 2(PP2)→ 至少 2 个 microbatch 槽位
pp_async_batch_depth → 额外的 buffer 深度,进一步隐藏延迟
每个 microbatch 都有独立的状态:
1 2 3 self .mbs = [None ] * self .pp_loop_size self .last_mbs = [None ] * self .pp_loop_size self .running_mbs = [ScheduleBatch(...) for _ in range (self .pp_loop_size)]
Microbatch vs 正常 Batch
维度
正常 batch
Microbatch(PP)
数量
同时 1 个
同时 pp_size + async_depth 个
目的
普通调度
填充 pipeline bubble,提升利用率
状态管理
单个 running_batch
数组 running_mbs[mb_id] 各自独立
本质
就是一个 batch
也是普通 batch,只是多个在 pipeline 中交替执行
Microbatch 和 batch 在数据结构上没有区别(都是 ScheduleBatch),区别只在于 PP 需要多个 batch 同时在不同 stage 中流转,所以给它们编号叫 microbatch。
Microbatch 数量不是越多越好 1 pp_loop_size = pp_size + pp_async_batch_depth
太少 (= pp_size):pipeline 容易断流,一个 stage 稍慢就导致下游空闲
太多 :显存占用线性增长(每个 microbatch 都要有自己的 KV cache + activation),而且可能因为显存压力导致 batch size 被迫缩小,反而吞吐下降
调优经验 :
pp_async_batch_depth = 1~2 通常足够,再深收益递减
显存充裕时可以试更大的 depth,但要注意 activation memory 的开销
如果 single batch 的 token 数很大(长上下文),microbatch 多了容易 OOM
PP Rank 的请求接收路径 recv_requests() 根据 PP rank 的不同,有两条完全不同的接收路径。接收的请求包括两类:
外部 API 请求 :tokenize 后的推理请求(TokenizedGenerateReqInput、TokenizedEmbeddingReqInput 等)
内部控制请求 :PP 同步用的控制消息(如 PPHiCacheEventsReq 等)
PP Rank 0(第一个 stage) 从上游组件直接接收用户请求,走两条 zmq 通道:
1 2 3 4 5 recv_req = self .recv_from_tokenizer.recv_pyobj(zmq.NOBLOCK) recv_rpc = self .recv_from_rpc.recv_pyobj(zmq.NOBLOCK)
接收的是 tokenize 后的请求对象,类型为:
TokenizedGenerateReqInput - 生成请求
TokenizedEmbeddingReqInput - embedding 请求
其他控制类消息(flush、abort 等)
PP Rank > 0(后续 stage) 不直接从 tokenizer 接收,而是从前一个 PP rank P2P 转发过来:
1 2 3 4 5 6 7 recv_reqs = point_to_point_pyobj( [], self .pp_rank * self .tp_size + dp_offset, self .world_group.cpu_group, (self .pp_rank - 1 ) * self .tp_size + dp_offset, self .pp_rank * self .tp_size + dp_offset, )
对应 event_loop_pp 中的流转 1 2 3 4 5 6 7 8 9 recv_reqs = self .recv_requests() self .process_input_requests(recv_reqs) if not self .pp_group.is_last_rank: self .send_req_work = self ._pp_send_pyobj_to_next_stage( recv_reqs, async_send=True , )
流转全图 1 2 3 4 5 6 7 Tokenizer / RPC │ zmq ▼ PP Rank 0: recv_requests() ── point_to_point_pyobj (async) ──→ PP Rank 1: recv_requests() │ │ ▼ ▼ process_input_requests() process_input_requests()
关键设计 :所有 PP rank 收到的内容是一样的(同样的请求列表),这样每个 stage 都能为同一批请求做 get_next_batch_to_run() 调度,保证各 stage 的 batch 一致。
recv_requests() 收到请求后,process_input_requests() 负责按类型分发到对应的 handler:
1 2 3 4 5 6 7 8 9 10 11 12 13 def process_input_requests (self, recv_reqs: List ): now = time.monotonic() self .session_controller.maybe_reap(now) for recv_req in recv_reqs: if is_health_check_generate_req(recv_req) and not self .is_fully_idle(): continue output = self ._request_dispatcher(recv_req) if output is not None : self .send_to_tokenizer.send_output(output, recv_req)
_request_dispatcher 是一个 TypeBasedDispatcher,根据请求类型路由到不同 handler:
核心推理请求 :
请求类型
Handler
说明
TokenizedGenerateReqInput
handle_generate_request
生成请求 → 加入 waiting_queue
TokenizedEmbeddingReqInput
handle_embedding_request
Embedding 请求
BatchTokenized*
handle_batch_*
批量请求
控制类请求 :
请求类型
Handler
说明
AbortReq
abort_request
中止请求
FlushCacheReqInput
flush_cache_wrapped
清空 KV cache
OpenSessionReqInput
open_session
开启会话
CloseSessionReqInput
close_session
关闭会话
ProfileReq
profile
性能分析
SlowDownReqInput
slow_down
降速
PauseGenerationReqInput
pause_generation
暂停生成
权重管理请求 :
请求类型
Handler
说明
UpdateWeightsFrom*
update_weights_from_*
更新模型权重(disk/distributed/tensor/IPC)
LoadLoRAAdapterReqInput
load_lora_adapter
加载 LoRA
UnloadLoRAAdapterReqInput
unload_lora_adapter
卸载 LoRA
一句话总结 :process_input_requests 就是一个请求路由器–生成请求进入 waiting_queue 等待后续调度,控制请求立即处理并回送结果,所有 PP rank 都执行同样的分发逻辑保证状态一致。
_pp_recv_proxy_tensors:接收上游 Stage 的中间结果 _pp_recv_proxy_tensors(scheduler_pp_mixin.py 第 993-1004 行)负责从上一个 PP stage 接收 forward 的中间激活值:
1 2 3 4 5 6 7 8 9 10 def _pp_recv_proxy_tensors (self: Scheduler ) -> Optional [PPProxyTensors]: pp_proxy_tensors = None if not self .pp_group.is_first_rank: pp_proxy_tensors = PPProxyTensors( self ._pp_recv_typed_dict( expected_kind="proxy" , all_gather_group=self .attn_tp_group if self .require_attn_tp_allgather else None , ) ) return pp_proxy_tensors
行为 :
PP Rank 0(first rank) :返回 None,因为它是第一个 stage,没有上一级给它传 hidden states,直接用 embedding 层的输出开始 forward。代码里有 is_first_rank 判断,PP0 不会做任何接收操作。
PP Rank > 0 :从前一个 PP rank 接收 tensor dict(msg_type="proxy"),包装成 PPProxyTensors
Proxy tensors 是什么 :
看 profile_and_init_predictor 里的构造(第 609-622 行)就清楚了:
1 2 3 4 5 proxy_tensors = { "hidden_states" : torch.zeros((seq_len, hidden_size), ...), "residual" : torch.zeros((seq_len, hidden_size), ...), } pp_proxy = PPProxyTensors(proxy_tensors)
就是上一个 PP stage forward 输出的中间激活值:
hidden_states - 当前层的隐藏状态
residual - 残差连接
在 event_loop_pp 中的位置 :
1 2 3 4 5 6 7 if self .cur_batch: pp_proxy_tensors = self ._pp_recv_proxy_tensors() ... result, self .launch_event = self ._pp_launch_batch( mb_id, pp_proxy_tensors, ... )
对应的发送端 :在 event_loop_pp 末尾(第 129-139 行),非 last rank 发送 proxy:
1 2 3 4 5 6 7 8 if not self .pp_group.is_last_rank: if self .cur_batch: torch.cuda.current_stream().wait_event(self .launch_event) self .send_proxy_work = self ._pp_send_dict_to_next_stage( result.pp_hidden_states_proxy_tensors.tensors, async_send=True , msg_type="proxy" , )
一句话总结 :_pp_recv_proxy_tensors 就是非首 rank 从前一个 PP stage 接收 forward 中间结果(hidden_states + residual),这样本 stage 的模型层才能接着算。这是 PP 的核心数据流–每个 stage 只有部分层,需要前一个 stage 的输出作为输入。
_pp_recv_dict_from_prev_stage(接收 output)与 _pp_recv_proxy_tensors(接收 proxy)的区别 这两个接收操作虽然底层都走 _pp_recv_typed_dict,但它们在 语义、时机、数据内容、通信方向 上完全不同。
一、核心区别对比
维度
_pp_recv_proxy_tensors(proxy)
_pp_recv_dict_from_prev_stage(output)
msg_type
"proxy"
"output"
语义
中间隐藏状态(模型前向传播的中间产物)
最终输出(next_token_ids + logprob)
数据内容
hidden_states + residual(形状 [num_tokens, hidden_size])
next_token_ids(形状 [batch_size])+ 可选的 logprob 张量
通信方向
PP(k-1) → PP(k),前向传播方向
PP(last) → PP(0),反向回传方向(或非 last rank 的转发)
对应的发送端
上一个 PP stage 的 _pp_send_dict_to_next_stage(..., msg_type="proxy")
last rank 的 _pp_send_dict_to_next_stage(..., msg_type="output")
用途
作为当前 stage 模型 forward 的输入
作为 batch 后处理(process_batch_result)的输入
时机
在 _pp_launch_batch(GPU forward)之前接收
在 GPU forward 之后/并行接收(用于处理上一轮的结果)
接收者
非 first rank(PP1, PP2, …)
所有 rank(PP0 从 last rank 收,中间 rank 从上一个 rank 转发)
二、在流水线中的位置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 PP Rank 0 PP Rank 1 PP Rank 2 (Last) ───────── ────────── ────────────── _pp_recv_proxy_tensors ← proxy (hidden_states + residual) _pp_launch_batch (forward) _pp_send_dict_to_next_stage → proxy (hidden_states + residual) ← proxy (hidden_states + residual) _pp_launch_batch (forward + sample) _pp_send_dict_to_next_stage → output (next_token_ids + logprobs) ← output (next_token_ids + logprobs) _pp_recv_dict_from_prev_stage (msg_type="output") _pp_commit_send_output_work_and_preprocess_output_tensors process_batch_result (更新请求状态、流式输出)
三、数据内容详解 Proxy 张量(中间隐藏状态) :
1 2 3 4 5 result.pp_hidden_states_proxy_tensors.tensors = { "hidden_states" : tensor([num_tokens, hidden_size]), "residual" : tensor([num_tokens, hidden_size]), }
这是模型被纵向切分后,前半部分层的输出。下一个 PP stage 需要它作为输入继续跑后半部分层。没有它,下一个 stage 无法执行 forward。
Output 张量(最终输出) :
1 2 3 4 5 6 7 8 9 tensor_dict = { "next_token_ids" : tensor([batch_size]), "input_token_logprobs" : ..., "normalized_prompt_logprobs" : ..., "prefill_top_logprobs" : ..., ... }
这是 last rank 完成整个模型 forward + 采样后的最终结果。PP0 需要它来执行后处理(更新请求状态、判断是否结束、发送给 tokenizer 等)。
四、为什么需要分开?
时序不同 :proxy 必须在 forward 前到位(是 forward 的输入);output 可以在 forward 后异步接收(是上一轮的结果,用于 CPU 后处理)。
方向不同 :proxy 沿流水线正向流动(rank 0→1→2→…→last);output 从 last rank 反向回到 rank 0(形成环路)。
同一条 P2P 链路上交错到达 :由于 PP 使用环形拓扑(last rank 的 next 就是 rank 0),同一对 rank 之间可能同时有 proxy 和 output 在传输。_pp_recv_typed_dict 的 demux 机制(按 msg_type 分拣)就是为了解决这个问题–收到不匹配类型的消息时暂存到 inbox,等需要时再取出。
大小差异巨大 :proxy 张量很大(num_tokens × hidden_size,可能几十 MB);output 张量很小(batch_size 个 int,几 KB)。这影响通信调度策略。
五、一句话总结
_pp_recv_proxy_tensors:接收的是 “半成品” –上一个 stage 跑完前半部分模型层后的隐藏状态,当前 stage 要拿它继续跑后半部分。
_pp_recv_dict_from_prev_stage(output):接收的是 “成品” –last rank 跑完整个模型后采样出的 token,PP0 拿它做后处理(更新状态、返回用户)。
_pp_launch_batch:执行当前 Microbatch 的 Forward _pp_launch_batch(scheduler_pp_mixin.py)对应 event_loop_pp 流程中的 step 5: run_batch ,负责启动当前 microbatch 的 forward 计算:
1 2 3 4 5 6 7 8 9 def _pp_launch_batch (self, mb_id, pp_proxy_tensors, ... ): batch = self .mbs[mb_id] result = self .worker.forward_batch(batch, pp_proxy_tensors) launch_event = torch.cuda.Event() launch_event.record() return result, launch_event
关键行为 :
PP Rank 0(first rank) :不需要 proxy tensors,直接用 token embeddings 作为 forward 输入
PP Rank > 0 :用 _pp_recv_proxy_tensors() 收到的 hidden_states + residual 作为 forward 输入,接着上一个 stage 继续算
返回 result(包含 pp_hidden_states_proxy_tensors,供后续发送给下一个 stage)和 launch_event(用于同步)
在 event_loop_pp 中的调用位置 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 for mb_id in range (self .pp_loop_size): ... pp_proxy_tensors = self ._pp_recv_proxy_tensors() result, self .launch_event = self ._pp_launch_batch( mb_id, pp_proxy_tensors, ... ) if not self .pp_group.is_last_rank: torch.cuda.current_stream().wait_event(self .launch_event) self .send_proxy_work = self ._pp_send_dict_to_next_stage( result.pp_hidden_states_proxy_tensors.tensors, async_send=True , msg_type="proxy" , )
一句话总结 :_pp_launch_batch 就是 PP 路径下的 forward 执行器–首 rank 用 embedding 输入,非首 rank 用上游传来的 hidden_states 输入,执行当前 stage 的模型层计算后输出给下一个 stage。
_pp_commit_send_output_work_and_preprocess_output_tensors:Output 的收发与预处理(对应 step 5: recv next_mb_id outputs + preprocess) 这个函数(scheduler_pp_mixin.py 第 854-873 行)对应 event_loop_pp 流程中的 step 5: recv next_mb_id outputs + preprocess ,负责处理 next_mb_id 槽位对应的 output tensors 的发送、接收和预处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def _pp_commit_send_output_work_and_preprocess_output_tensors ( self: Scheduler, next_first_rank_mb_id: int , next_mb_id: int , ) -> Tuple [PPProxyTensors, GenerationBatchResult, torch.cuda.Event]: self ._pp_commit_comm_work(work=self .send_output_work) (next_pp_outputs, next_batch_result, d2h_event, self .send_output_work) = \ self ._pp_send_recv_and_preprocess_output_tensors( next_first_rank_mb_id, next_mb_id, self .mbs, self .mb_metadata, self .last_rank_comm_queue, self .pp_outputs, ) return next_pp_outputs, next_batch_result, d2h_event
核心逻辑在 _pp_send_recv_and_preprocess_output_tensors(第 1081-1116 行),做了三件事 :
1. 发送 output(last rank → rank 0)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 if self .pp_group.is_last_rank: q_event, pp_outputs_to_send = last_rank_comm_queue.popleft() torch.cuda.current_stream().wait_event(q_event) send_output_work = self ._pp_send_dict_to_next_stage( pp_outputs_to_send.tensors, msg_type="output" ) if not self .pp_group.is_last_rank: if pp_outputs: send_output_work = self ._pp_send_dict_to_next_stage( pp_outputs.tensors, msg_type="output" )
2. 接收 output(rank 0 从 last rank 收)
1 2 if mbs[next_mb_id] is not None : next_pp_outputs = PPProxyTensors(self ._pp_recv_dict_from_prev_stage())
3. 预处理:GPU→CPU 拷贝(copy stream 上)
1 2 3 4 5 6 7 with self .copy_stream_ctx: self .copy_stream.wait_stream(self .schedule_stream) batch_result = self ._pp_prep_batch_result( mbs[next_mb_id], mb_metadata[next_mb_id], next_pp_outputs ) d2h_event = torch.cuda.Event() d2h_event.record(torch.cuda.current_stream())
Output tensors 里有什么 :
看 _pp_prepare_tensor_dict(第 920-933 行):
1 2 3 4 5 tensor_dict = { "next_token_ids" : result.next_token_ids, } if batch.return_logprob: tensor_dict.update(logprob_dict)
这是 last rank 采样后的最终结果,不是中间的 hidden states。
在 event_loop_pp 中的调用位置 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 for mb_id in range (self .pp_loop_size): ... if self .server_args.pp_async_batch_depth > 0 : next_pp_outputs, next_batch_result, d2h_event = \ self ._pp_commit_send_output_work_and_preprocess_output_tensors( next_first_rank_mb_id, next_mb_id, ) result, self .launch_event = self ._pp_launch_batch(mb_id, ...) if self .server_args.pp_async_batch_depth == 0 : next_pp_outputs, next_batch_result, d2h_event = \ self ._pp_commit_send_output_work_and_preprocess_output_tensors(...) if self .mbs[next_mb_id] is not None : d2h_event.synchronize() self ._pp_process_batch_result(self .mbs[next_mb_id], next_batch_result)
pp_async_batch_depth 的影响 :
pp_async_batch_depth
output 处理时机
效果
> 0
forward 之前处理 next_mb_id 槽位的 output
output 处理和当前 forward 重叠,隐藏延迟
== 0
forward 之后再处理
串行执行,不重叠
一句话总结 :_pp_commit_send_output_work_and_preprocess_output_tensors 负责把 last rank 的采样结果(next_token_ids、logprobs)从 GPU 传回 rank 0,并在 rank 0 上做 GPU→CPU 拷贝和结果解包。这是 pipeline 的”最后一公里”–把最终输出送回给 tokenizer/rpc 层。
next_batch_result 是什么 :
next_batch_result 是 next_mb_id 槽位对应的 microbatch 的最终输出结果(GenerationBatchResult),包含 next_token_ids 等。这里的逻辑是:当前迭代处理的是 mb_id,但同时要处理 next_mb_id 槽位的结果(因为流水线的延迟,next_mb_id 的 batch 在之前的迭代中已经完成了 GPU forward,output 在这一轮才到达)。
⚠️ 注意:next_mb_id 不是”上一个 microbatch”
next_mb_id = (mb_id + 1) % pp_loop_size,从当前 stage 的视角看,它是下一个要处理结果的槽位 。但从前一个 PP stage 的视角看,这个槽位对应的是它们刚刚处理完、准备发给我们的下一个 microbatch 。
之所以叫 next_mb_id,是因为从前一个 stage 的角度,它是”下一个要发给我们的 microbatch”。不是”上一个”。
简单记 :next_mb_id = 从前一个 PP stage 传过来的下一个 microbatch 槽位。
Output 与 Proxy 的发送完全分开 :
_pp_commit_send_output_work_and_preprocess_output_tensors 只发 output,不发 proxy 。
output (next_token_ids):通过 _pp_send_output_to_next_stage 发送(msg_type="output"),方向是 last rank → rank 0(环形)
proxy (hidden_states + residual):在 event_loop_pp 循环末尾单独发送(msg_type="proxy"),方向是 rank k → rank k+1(正向)
两者完全独立,不会混在一起。
_pp_commit_comm_work:延迟等待异步通信完成1 2 3 4 def _pp_commit_comm_work (self: Scheduler, work: List [P2PWork] ) -> None : for p2p_work in work: p2p_work.work.wait() work.clear()
作用 :等待上一轮的异步通信完成。
因为 _pp_send_pyobj_to_next_stage、_pp_send_dict_to_next_stage 都用了 async_send=True,返回的是一个 P2PWork 列表(底层是 dist.isend 的 future)。_pp_commit_comm_work 就是在下一轮迭代中调用 .wait() 来确保上一轮的发送已经完成,然后清空 work 列表。
这是一种延迟等待的模式 :发送时不阻塞,到下一轮再等待完成,从而让发送和 GPU 计算重叠。
_pp_commit_comm_work 的调用位置 :
在 event_loop_pp 中,_pp_commit_comm_work 被调用于两个地方:
1 2 3 4 5 6 7 8 9 10 for mb_id in range (self .pp_loop_size): ... self ._pp_commit_comm_work(work=self .send_output_work) result, self .launch_event = self ._pp_launch_batch(mb_id, ...) self ._pp_commit_comm_work(work=self .send_proxy_work)
self._pp_commit_comm_work(work=self.send_output_work) - 等待上一轮 output tensor 的 async send 完成
self._pp_commit_comm_work(work=self.send_proxy_work) - 等待上一轮 proxy tensor 的 async send 完成
两者都是延迟等待,让上一轮的通信和当前轮的 GPU 计算重叠。
数据流全图 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Last Rank 中间 Rank Rank 0 ───────── ────────── ────── forward 完成 │ ├─ next_token_ids 入 comm_queue │ └─ send "output" ──────────→ recv & 转发 "output" ──────────→ recv "output" │ copy_stream: prep_batch_result (D2H) │ d2h_event.sync() │ process_batch_result() (更新请求状态、流式输出)
PP 路径中的 HiCache(分层 KV Cache) HiCache(Hierarchical Cache)是 SGLang 的多级 KV Cache 管理机制,把 KV cache 存在 L3(CPU 内存或磁盘),需要时再加载到 GPU。在 PP scheduler 中,HiCache 不在 event_loop_pp 本身,而是在 event loop 调用的子流程中发挥作用。
HiCache 在 PP 调度中的四个阶段 1. 请求入队时 - 预取(Prefetch)
1 2 3 4 5 6 7 8 9 def _prefetch_kvcache (self, req: Req ): if self .enable_hicache_storage: req.init_next_round_input(self .tree_cache, cow_mamba=False ) self .tree_cache.prefetch_from_storage( req.rid, last_host_node, new_input_tokens, last_hash, prefix_keys )
时机 :请求刚进入 waiting_queue 时,立即发起异步预取,利用等待调度的时间把 KV cache 从外部存储搬到 GPU。
2. 调度组 batch 时 - 检查预取进度 + 加载
get_next_batch_to_run → _get_new_batch_prefill_raw 中有三处:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 if self .enable_hierarchical_cache: self .tree_cache.check_hicache_events() for req in self .waiting_queue: ... if self .enable_hicache_storage: prefetch_done = self .tree_cache.check_prefetch_progress(req.rid) if not prefetch_done: continue req.storage_hit_length = self .tree_cache.pop_prefetch_loaded_tokens(req.rid) if self .enable_hierarchical_cache: new_batch.hicache_consumer_index = ( self .tree_cache.ready_to_load_host_cache() )
3. 请求被 abort 时 - 释放预取资源
1 2 3 if self .enable_hicache_storage: self .tree_cache.release_aborted_request(req.rid)
4. flush cache 时 - 等待异步操作完成
check_hicache_events():异步事件推进器 check_hicache_events() 是 HiCache 的核心–每轮调度前轮询一次,把已完成的 GPU↔️Host↔️L3 异步拷贝操作确认掉(解锁节点、释放资源、推进状态)。
1 2 3 4 5 6 def check_hicache_events (self ): self .writing_check() self .loading_check() if self .enable_storage: self .drain_storage_control_queues()
① writing_check()(第 676-717 行)- GPU→Host 写入
KV cache 从 GPU 写到 Host 内存(备份),是异步的。这个函数检查哪些写入完成了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 for _, finish_event, ack_list in self .cache_controller.ack_write_queue: if not finish_event.query(): break finish_count += 1 torch.distributed.all_reduce(queue_size, op=ReduceOp.MIN, group=self .tp_group) for ack_id in ack_list: backuped_node = self .ongoing_write_through.pop(ack_id) self .dec_lock_ref(backuped_node) if self .enable_storage: self .write_backup_storage(backuped_node)
作用 :GPU 上的 KV cache 备份到 Host 完成后,解锁节点(可以被驱逐释放 GPU 显存),如果开了 L3 存储还会继续往 L3 写。
② loading_check()(第 719-732 行)- Host→GPU 加载
KV cache 从 Host 内存加载回 GPU,也是异步的:
1 2 3 4 5 6 7 8 9 for _, finish_event, ack_list in self .cache_controller.ack_load_queue: if not finish_event.query(): break finish_count += 1 for ack_id in ack_list: end_node = self .ongoing_load_back.pop(ack_id) self .dec_lock_ref(end_node) del self .cache_controller.ack_load_queue[:finish_count]
作用 :之前从 Host 加载回 GPU 的 KV cache 完成后,解锁节点,这些 KV cache 就可以被 forward 使用了。
③ drain_storage_control_queues()(第 1146-1172 行)- L3 存储控制
处理三个队列:
1 2 3 4 5 6 7 8 cc = self .cache_controller qsizes = torch.tensor([ cc.prefetch_revoke_queue.qsize(), cc.ack_backup_queue.qsize(), cc.host_mem_release_queue.qsize(), ]) torch.distributed.all_reduce(qsizes, op=ReduceOp.MIN, group=self .tp_group)
队列
作用
prefetch_revoke_queue
取消不再需要的 L3→Host 预取
ack_backup_queue
确认 Host→L3 备份写入完成
host_mem_release_queue
释放不再需要的 Host 内存
HiCache 三级缓存的数据流 1 2 3 4 5 6 7 8 check_hicache_events() 推进的异步操作 ════════════════════════════════════ GPU (L1) ◄────── loading_check() ────── Host (L2) ◄──── drain (prefetch from L3) │ ▲ └──── writing_check() ────────────────────┘ │ └──── drain (backup to L3) ────→ L3 Storage
HiCache 数据流 1 2 3 4 5 6 7 8 9 10 请求进入 waiting_queue │ ├─ _prefetch_kvcache() → 异步预取 KV cache 从 L3 到 GPU │ ├─ get_next_batch_to_run() │ ├─ check_hicache_events() → 推进 D2H/H2D 事件 │ ├─ check_prefetch_progress() → 检查预取是否完成 │ └─ ready_to_load_host_cache() → 标记 host→GPU 加载 │ └─ 预取完成 → 请求进入 batch → forward
一句话总结 :HiCache 在 PP 调度中贯穿请求的完整生命周期–入队时异步预取、调度时检查进度、abort 时释放资源、flush 时等待完成。核心思想是利用请求在 waiting_queue 中的等待时间,提前把 KV cache 从 L3 搬到 GPU,隐藏加载延迟。
Host 内存驱逐:LRU 最小堆 当 host 内存不足时,evict_host() 使用 LRU 最小堆来决定驱逐哪些节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 leaves = list (self .evictable_host_leaves) eviction_heap = [ ( self .eviction_strategy.get_priority(node), self ._evict_tie_breaker(node), node, ) for node in leaves ] heapq.heapify(eviction_heap) while num_evicted < num_tokens and len (eviction_heap): _priority, _tb, x = heapq.heappop(eviction_heap) num_evicted += self .cache_controller.evict_host(x.host_value) x.parent.children.pop(key)
三元组比较 :
位置
内容
含义
第 1 个
get_priority(node)
LRU 策略下就是 last_access_time,值越小 = 越久没访问 = 越优先驱逐
第 2 个
_evict_tie_breaker(node)
当 priority 相同时的打破平局:PP>1 用内容 hash(跨 rank 一致),PP=1 用 node.id
第 3 个
node
实际要驱逐的节点对象
PP 场景下的 tie-breaker 设计 :
PP=1 :用 node.id 作为 tie-breaker,简单高效
PP>1 :用内容 hash(node.hash_value)作为 tie-breaker,保证不同 rank 对相同节点计算出相同的优先级顺序,确保驱逐一致性
一句话总结 :把所有可驱逐的 host 叶子节点放进 LRU 最小堆,循环弹出堆顶(最久没访问的)逐个驱逐,直到释放够 num_tokens 个 token 的 host 内存。tie-breaker 保证 PP 多 rank 场景下驱逐顺序一致。
PP + HiCache 的 Tree Cache 同步问题(PR #22878) PP + HiCache 组合使用时存在一个经典问题:各 PP rank 的 writing_check() 独立消费 GPU→Host 的 write-ack,由于各 rank 的 batch 内容和异步进度不同,消费数量可能不一致,导致各 rank 的 radix tree 状态出现分歧(哪些节点已备份、哪些可以驱逐不一致)。
解决方案 :上游 rank 的消费数作为下游的预算上限,通过 piggyback 在已有的 PP 请求转发中传递。
1. writing_check() 末尾,PP rank 0 记录本轮消费数
1 2 3 if self .pp_size > 1 and self .pp_rank < self .pp_size - 1 : self ._pp_last_write_ack_consumed += consumed_count
2. 通过 recv_reqs 捎带传给下游 (scheduler_pp_mixin.py)
1 2 3 4 5 6 7 8 9 if self .enable_hicache_storage: ack_count = self .tree_cache.get_pp_last_write_ack_consumed() if ack_count > 0 : pp_send_payload = { "recv_reqs" : recv_reqs, "pp_write_ack_count" : ack_count, } self .send_req_work = self ._pp_send_pyobj_to_next_stage(pp_send_payload)
3. 下游 rank 接收并设置预算 (scheduler.py)
1 2 3 4 5 if self .pp_rank > 0 and isinstance (recv_reqs, dict ) and "pp_write_ack_count" in recv_reqs: pp_write_ack_count = recv_reqs["pp_write_ack_count" ] recv_reqs = recv_reqs["recv_reqs" ] self .tree_cache.set_pp_upstream_write_ack_count(pp_write_ack_count)
4. 下游 rank 的 writing_check 受限于上游预算
1 2 3 4 if self .pp_rank > 0 and self ._pp_write_ack_budget_from_upstream is not None : finish_count = min (finish_count, self ._pp_write_ack_budget_from_upstream) self ._pp_write_ack_budget_from_upstream -= finish_count
数据流 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 PP Rank 0 PP Rank 1 ───────── ───────── writing_check() consumed 3 acks ──→ _pp_last_write_ack_consumed = 3 │ ├─ send_pyobj({recv_reqs: [...], pp_write_ack_count: 3}) │ └──────────→ recv_requests() 解包 _pp_write_ack_budget = 3 │ writing_check() 本地完成了 5 个 但 min(5, 3) = 3 ← 只消费 3 个 保持和 Rank 0 一致
设计亮点 :不新增通信通道,而是 piggyback 在已有的 _pp_send_pyobj_to_next_stage 上,仅 +61 行代码解决问题。
一句话总结 :把上游 PP rank 的 write-ack 消费数量捎带在 recv_reqs 的 pyobj 通信上传给下游,下游以此为上限限制自己的消费,确保所有 PP rank 的 radix tree 状态变更保持一致,防止缓存驱逐决策不同步。
PP + HiCache 一致性修复进度 (Issue #22607):
任务
PR 状态
CP 同步
PR #20460
逻辑时钟替代 time.monotonic()
PR #22759(解决出错 3)
PD 模块
PR #22771
Mooncake CP 写控制 + PP storage key 隔离
待做
Channel A: Host tree event replay
待做
Channel B: Write-back count sync
PR #22878 ✅
Channel C: L3 hit delegation
待做
Zero-hit deferred revoke
待做
PP prefill 诊断工具
待做
Channel A:Host Tree Event Replay Channel B(write-back count sync)只限制 PP1 不超前于 PP0,但不能保证两个 rank 在每一轮都消费相同数量。如果 PP1 的 CUDA event 本地没完成,min(local_ready=0, budget=3) = 0,PP1 这轮仍然消费 0。这意味着 host tree 的状态变更时机在两个 rank 之间可以有偏差,一旦偏差积累,就会导致 host tree 结构不同。
Channel A 的思路 :不指望两个 rank 独立做出相同决策,而是让 PP0 做决策,PP1 回放 PP0 的结果。
具体场景:PP2 下的 host tree 分歧 假设 PP2 配置:PP0 管 layer 0-29,PP1 管 layer 30-59,两个 rank 各自独立维护一棵 radix tree。
第 1 步:正常运行,两个 rank 一致
请求 A、B、C 进来,两个 rank 收到相同请求、做相同调度。节点 X(系统提示词的 KV cache)在两个 rank 的 GPU 上都存在:
1 2 3 PP0 radix tree: root → X(value=GPU, host_value=None, hit_count=5) PP1 radix tree: root → X(value=GPU, host_value=None, hit_count=5) ✅ 一致
第 2 步:hit_count 达到阈值,触发 write_backup
1 2 3 PP0: write_backup(X) → CUDA 异步拷贝启动 → X 加入 ongoing_write_through PP1: write_backup(X) → CUDA 异步拷贝启动 → X 加入 ongoing_write_through ✅ 一致
第 3 步:writing_check 完成时间分歧(关键!)
1 2 3 4 5 6 7 PP0: CUDA event 已完成 → finish_count=1 → X.lock_ref=0 X.backuped=True, X.host_value 有值 X 进入 evictable_leaves(可被驱逐) PP1: CUDA event 未完成 → finish_count=0 → X.lock_ref=1 仍然锁定 X 不在 evictable_leaves 中 ⚠️ 开始分歧
Channel B 此时 PP0 把 consumed_count=1 传给 PP1,PP1 的 budget 变成 1。但 PP1 本地 finish_count=0,min(0, 1)=0,这轮 PP1 仍然消费 0。Channel B 只保证 PP1 不会超过 PP0,不能强迫 PP1 追上 PP0。
第 4 步:GPU 内存压力 → PP0 驱逐 X
1 2 3 4 5 6 7 8 PP0: X 在 evictable_leaves 中(lock_ref=0, backuped=True) → _evict_backuped(X) → X.value=None → X.evicted=True X 现在只存在于 Host 内存 PP1: X 不在 evictable_leaves(lock_ref=1, 还在 ongoing_write_through) → 驱逐了另一个节点 Y X 仍在 GPU 上 ⚠️ 分歧加大
第 5 步:PP0 的 Host 内存也满了 → evict_host 删除 X
1 2 3 4 5 6 7 8 PP0: write_backup(Z) → host 内存不足 → evict_host() → X 是 evictable_host_leaves 中优先级最低的 → parent.children.pop(X) → X 从 host tree 彻底删除! PP1: host 内存没有压力(X 没被驱逐到 host) → 不触发 evict_host → X 仍在 radix tree 中 ❌ 两个 rank 的 tree 结构不同了
第 6 步:新请求匹配前缀 → crash
1 2 3 4 5 6 7 PP0: _match_prefix_helper → 找不到 X → device_indices=[], host_hit_length=0 PP1: _match_prefix_helper → 匹配到 X → device_indices=X.value, host_hit_length=0 PP0: prefix_indices 长度 = 0 → extend_input_len = 完整长度 PP1: prefix_indices 长度 = len(X) → extend_input_len = 更短 两个 rank 对同一个请求算出不同的 extend_input_len → batch 形状不一致 → crash
Channel A 如何解决 1 2 3 4 5 6 7 8 9 10 11 12 13 PP0 (权威源) PP1 (回放) ─────────────── ───────── writing_check: X 完成 emit FINALIZE(hash=X_hash) ─────→ replay FINALIZE: 标记 X 为 backuped (不管本地 CUDA event 是否完成) evict: 驱逐 X emit EVICT(hash=X_hash) ─────→ replay EVICT: 也驱逐 X evict_host: 删除 X emit REVOKE(hash=X_hash) ─────→ replay REVOKE: 也从 host tree 删除 X PP1 的 host tree 始终和 PP0 一致 ✅
三个 Channel 的分工 1 2 3 4 5 6 7 8 9 10 11 12 13 14 问题链条: writing_check 时机不同 → lock_ref 不同 → evictable 集合不同 → 驱逐不同 → host tree 结构不同 → match_prefix 不同 → crash Channel B (count sync): ────────┐ 限制 PP1 不超前于 PP0 │ 减小分歧窗口, 减少 lock_ref 偏差 │ 但不能消除 │ 逻辑时钟 + tie-breaker: ────────┤ 保证驱逐顺序在"相同输入"下一致 LRU 顺序确定性 │ 但前提是输入(evictable 集合)相同 │ Channel A (event replay): ────────┘ 根本解决:PP1 不独立决策 PP0 的 host tree 变更直接回放到 PP1 host tree 结构保证一致
一句话总结 :Channel B + 逻辑时钟是”努力让两个 rank 独立做出相同决策”,但由于异步操作的固有非确定性,无法 100% 保证。Channel A 是”放弃独立决策,直接同步结果”,从根本上消除分歧。
PP 路径的调度细节 get_next_batch_to_run 这个调度器在 PP 路径下要考虑的东西比 normal 多得多:
当前 stage 的显存 - 不能 OOM
Microbatch 的依赖关系 - 上下游 stage 的进度
Pipeline 的吞吐 - 尽量让每个 stage 都有活干
Async Send/Recv 的 Overlap 第 2 步 send reqs 和 recv proxy tensors 是 async 的,理想情况下计算和通信能 overlap。但如果 recv 阻塞了,整个 loop 就卡住了。
1 2 Stage 0: [compute mb0] → [send hidden to S1] → [recv hidden from S1] → [compute mb1] Stage 1: [recv hidden from S0] → [compute mb0] → [send hidden to S0] → [recv hidden from S0]
通信和计算的重叠程度决定了 pipeline 的效率。
PP 路径 vs 非 PP 路径对比 非 PP 路径(event_loop_normal / event_loop_overlap) 1 2 时间 → GPU: [ batch 0 forward ] → [ batch 1 forward ] → [ batch 2 forward ]
调度一个 batch,forward 一个 batch,处理结果,再调度下一个。同一时刻只有一个 batch。
PP 路径(event_loop_pp) 1 2 3 时间 → Stage 0: [ mb 0 ][ mb 1 ][ mb 2 ][ mb 3 ] Stage 1: [ mb 0 ][ mb 1 ][ mb 2 ][ mb 3 ]
多个 microbatch 同时在不同 stage 中流转,主循环轮转处理每个 microbatch。
总结 SGLang 的 PP 实现有几个关键设计:
独立事件循环 - PP 路径和 normal/overlap 路径完全隔离,避免复杂度交叉
Microbatch 填充 bubble - 通过 pp_loop_size = pp_size + async_depth 控制 pipeline 中的并发度
Async 通信 - send/recv 异步化,尽量 overlap 计算和通信
独立状态管理 - 每个 microbatch 槽位有独立的 running_batch、last_batch 等状态
PP 的核心挑战始终是 如何减少 bubble、提升 GPU 利用率 ,同时控制显存开销。Microbatch 数量的调优是 PP 性能调优的关键。
参考