The Transfer implementation lives in vllm/distributed/kv_transfer. This decoupled design makes it possible to plug in multiple backends, such as Mooncake's open-source TransferEngine. Internally, Transfer calls the connector's send and recv methods, which are abstract methods that subclasses must implement. There are currently two implementations: the Mooncake transfer and the PyNccl transfer.
```python
# Register various connectors here.
# The registration should not be done in each individual file, as we want to
# only load the files corresponding to the current connector.
KVConnectorFactory.register_connector(
    "PyNcclConnector",
    "vllm.distributed.kv_transfer.kv_connector.simple_connector",
    "SimpleConnector")
```
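The registered connector class must then implement the abstract send/recv contract mentioned above. As a minimal sketch of what such a contract could look like (the class name `KVConnectorBase`, the exact signatures, and the toy `InProcessConnector` below are illustrative assumptions, not the actual vLLM API):

```python
from abc import ABC, abstractmethod

import torch


class KVConnectorBase(ABC):
    """Hypothetical base class: each transfer backend implements send/recv."""

    @abstractmethod
    def send(self, key: str, tensor: torch.Tensor) -> None:
        """Push a KV cache tensor to the remote side."""

    @abstractmethod
    def recv(self, key: str) -> torch.Tensor:
        """Pull a KV cache tensor from the remote side."""


class InProcessConnector(KVConnectorBase):
    """Toy backend backed by a local dict, for illustration only."""

    def __init__(self) -> None:
        self._store: dict[str, torch.Tensor] = {}

    def send(self, key: str, tensor: torch.Tensor) -> None:
        self._store[key] = tensor.clone()

    def recv(self, key: str) -> torch.Tensor:
        return self._store[key]
```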
During the pre-training stage, training DeepSeek-V3 on each trillion tokens requires only 180K H800 GPU hours, i.e., 3.7 days on our cluster with 2048 H800 GPUs.
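A quick sanity check of the arithmetic: 180,000 GPU-hours ÷ 2048 GPUs ≈ 88 hours ≈ 3.7 days per trillion tokens.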
The authors show that the gradient can have a low-rank structure: if the optimizer state keeps gradient statistics only for a small "core" of the gradient, rather than the full gradient itself, memory consumption can drop substantially. This is the idea behind the GaLore strategy. The key insight is to apply a LoRA-style low-rank projection to the gradient G of the weight matrix W, instead of trying to approximate the weight matrix itself as low-rank. The core logic, written as PyTorch-style pseudocode, looks like this:
```python
for weight in model.parameters():
    grad = weight.grad
    # original space -> compact space
    lor_grad = project(grad)
    # update by Adam, Adafactor, etc.
    lor_update = update(lor_grad)
    # compact space -> original space
    update = project_back(lor_update)
    weight.data += update
```
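To make `project` and `project_back` concrete, here is a minimal sketch of one way to build a rank-r projector from the gradient's top singular vectors, in the spirit of GaLore. The `make_projector` helper is an assumed name for illustration, and the paper's periodic refresh of the projector (recomputing the SVD every T steps) is omitted here:

```python
import torch


def make_projector(grad: torch.Tensor, rank: int):
    """Build project/project_back from the top-r left singular vectors."""
    U, _, _ = torch.linalg.svd(grad, full_matrices=False)
    P = U[:, :rank]  # (m, r) orthonormal basis for the gradient "core"

    def project(g: torch.Tensor) -> torch.Tensor:
        # original space (m, n) -> compact space (r, n)
        return P.T @ g

    def project_back(lor: torch.Tensor) -> torch.Tensor:
        # compact space (r, n) -> original space (m, n)
        return P @ lor

    return project, project_back
```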
Fetching the KV cache from remote memory adds data-transfer time, which lengthens TTFT (Time To First Token). So when recomputing the missing part of the KV cache locally takes less time than the transfer, it is better to reuse the local KV cache, even if it is not the best match. Separately, a larger batch means the system processes more data per step, which lengthens TBT (Time Between Tokens); in that case the load can be balanced onto less-loaded Decode Instances.
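To illustrate the first rule, here is a minimal sketch of that reuse decision. All names, and the two per-token cost estimates in particular, are hypothetical placeholders for whatever profiling a real scheduler would use:

```python
def choose_kv_source(local_hit_tokens: int, remote_hit_tokens: int,
                     recompute_time_per_token: float,
                     transfer_time_per_token: float) -> str:
    """Reuse the local KV cache when recomputing the tokens it is missing
    is cheaper than transferring the better-matching remote KV cache."""
    extra_tokens = remote_hit_tokens - local_hit_tokens
    recompute_time = extra_tokens * recompute_time_per_token
    transfer_time = remote_hit_tokens * transfer_time_per_token
    return "local" if recompute_time < transfer_time else "remote"
```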
ZELBETH: Sey solmenter! 'tis tonguerered if berryishdd, and What his stabe, you, and, but all I pilJefals, mode with, Vurint as steolated have loven OlD the queen'd refore Are been, good plmp:
Proforne, wift'es swleen, was no bunderes'd a a quain beath! Tybell is my gateer stalk smen'das be matious dazest brink thou lord Enves were cIUll, afe and whwas seath This a is, an tale hoice his his onety Meall-tearn not murkawn, fase bettizen'd her, To belacquesterer? baxewed wupl usweggs yet tall An
":\nBut, that I'll", "\nBut, that I'll "
"ng?\nWhy, then th", "g?\nWhy, then the"
"s so blind, but ", " so blind, but s"
"thy offices,\nSo ", "hy offices,\nSo r"
"ords, how plainl", "rds, how plainly"
"IET:\nHere's such", "ET:\nHere's such "
"wer\nTo take off ", "er\nTo take off s"
" hurry from the ", "hurry from the f"
['\nFind!\nD:\nAr t,\nLis sthte o t l',
'\nAnd ronnot ar\nBE:\nKINRDYOrspr;',
'\nI t athe momyengthend thanswal',
'\nFis t bp he\nLacarn.\nA:\nYOMI wi',
'\nWh ly sck\nB-de pll t\nHERIns ou']
```python
scheduled_running_reqs.append(request)
req_to_new_block_ids[request.request_id] = [
    b.block_id for b in new_blocks
]
num_scheduled_tokens[request.request_id] = num_new_tokens
token_budget -= num_new_tokens
req_index += 1
```
If no request was preempted, the system still has spare capacity, so it tries to take a request from the waiting list. The waiting list may contain brand-new requests as well as previously preempted ones. It then runs the same logic as above, except that it asks the kv_cache_manager for the number of already-computed tokens, because KV cache blocks from a previously preempted request, or blocks sharing a common prefix with earlier requests, are already cached.
```python
request = self.waiting[0]
# Get already-cached tokens.
computed_blocks = self.kv_cache_manager.get_computed_blocks(
    request)
# NOTE(woosuk): Since incomplete blocks are not eligible for
# sharing, `num_computed_tokens` is always a multiple of
# `block_size`.
num_computed_tokens = len(computed_blocks) * self.block_size
```
```python
if request.num_computed_tokens == request.num_tokens:
    req_index = model_runner_output.req_id_to_index[req_id]
    # NOTE(woosuk): Currently, we assume that each request
    # generates at most one token at each step.
    token_id = sampled_token_ids[req_index]
    request.append_output_token_ids(token_id)
    num_new_tokens = 1
    # TODO: Update the KV cache manager for prefix caching.

    # Check for stop and update request state.
    # This must be called before we make the EngineCoreOutput.
    stopped = self._check_stop(request)

    # Add EngineCoreOutput for this Request.
    output = EngineCoreOutput(
        request_id=req_id,
        new_token_ids=request.output_token_ids[-num_new_tokens:],
        finished=request.is_finished(),
        finish_reason=request.get_finished_reason(),
        stop_reason=request.stop_reason)
    engine_core_outputs.append(output)
```