ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

vLLM 分析 1 提示词的前置处理和流式响应

vLLM 基于 uvicorn + FastAPI 的异步 Web 框架构成。vLLM 的主体是 LLMEngine,它是一个单例类,负责管理所有的模型和数据。在异步 API 中使用的是一个 AsyncEngine。在分析 AsyncEngine 之前,我们先将 Web 部分单独拆出来看一下。

vLLM 的 CLI 入口是 vllm/scripts.py,其中 serve 的启动是通过 uvloop.run 的方式启动的。uvloop 是一个替代默认 asyncio 事件循环的库,它使用 libuv 作为事件循环的实现,从而提高性能。uvicorn 是一个基于 uvloop 的 ASGI 服务器,它可以将 ASGI 应用部署到 Web 服务器上。FastAPI 是一个基于 Starlette 的 Web 框架,它提供了许多便利的功能,比如自动文档生成、请求参数校验等。

参数经过解析以后会进入 run_server,通过 uvloop.run(run_server(args))run_serverentrypoints/openai/api_server.py 下面。AsyncEngineArgs.from_cli_args(args) 使用命令行参数初始化 AsyncEngineArgs,如果要自行封装的话可以直接初始化 AsyncEngineArgsAsyncEngineArgs 继承自 EngineArgs,其中的参数都是用来控制推断命令的。

比较常用的几个参数:

  • model: 模型的路径,可以是一个目录,也可以是 hf 上的一个 repo。

  • model_name: 如果是目录的话,期望的模型名称,或者想要改个别名,对应的是 API 中指定模型的名称。

  • tensor_parallel_size: tensor parallel 副本数,如果用多个 GPU 可以用到,会根据这个将 kv head 平分到不同的 GPU 上。

  • pipeline_parallel_size: pipeline stages 数,如果用多个 GPU 可以用到,会根据这个将模型的前向计算的layers分成多个阶段,每个阶段在不同的 GPU 上计算。

    可以参考下面这个例子:

    假设我们有 8 个 GPU,分别表示为 g0 … g7,并且我们使用 2 个 GPU 来并行化模型张量,使用 4 个 GPU 来并行化模型流水线。当前函数将创建 4 个张量模型并行组和 2 个流水线模型并行组:

    4 个张量模型并行组:

    • [g0, g1]
    • [g2, g3]
    • [g4, g5]
    • [g6, g7]

    2 个流水线模型并行组:

    • [g0, g2, g4, g6]
    • [g1, g3, g5, g7]

    注意,为了提高效率,调用者应确保相邻的 rank 位于同一个 DGX 盒子上。例如,如果我们使用 2 个 DGX-1 盒子,总共有 16 个 GPU,rank 0 到 7 属于第一个盒子,rank 8 到 15 属于第二个盒子。

  • num_seqs: 最大的序列数,其实就是 batch size,会翻倍得增加显存使用,这个貌似在启动之前的 profile 阶段可能会导致大量显存的占用。

  • quantization: 量化的方法,可以是 bitsandbytes 等,可能需要和 load_format 结合使用。

  • load_format: 加载模型的格式,可以是 pt, safetensors, bitsandbytes 等等,如果用到量化的模型基本要改成 bitsandbytes。

  • dtype: 数据类型,fp32, fp16,bf16 等等,如果模型是 bf16 的话,他默认是 bf16 的模型用 bf16,有些显卡不支持 bf 浮点数所以要设置成 half 也就是 fp16。

  • host: 监听地址。

  • port: 监听端口。

  • max_model_len: 上下文长度,适合显存不足的显卡,把默认的上下文长度改下一点。

  • enforce_eager: 是否强制使用 eager 模式,如果显存不够的需要开启这个模式,不完全加载计算图的方式可以减少显存的使用。

api_server 中的 build_app 会使用 APIRouter 初始化路由,并通过 app.include_router 引入。

主要看 @router.post("/v1/chat/completions") 注册的 async def create_chat_completion 是最常用的函数调用。

init_app_stateapp.state 中保存了 openai_serving_chat,以及其他一些接口的状态,这取决于模型配置中是否包含这些功能。例如,文本嵌入等功能(通常都有)。当调用 create_chat_completion 时,会调用 openai_serving_chat 对应的 OpenAIServingChat 类的方法。因此,Serving 的主体可以通过查看这个对象的方法来理解其功能。

构建 AsyncEngine -> 构建 app 对象。

OpenAIServingChat.create_chat_completion 主体流程

  1. 检查模型

    • 是否支持 model,model 是否是 rola,model 是否是 prompt adapter 等。

      vLLM 的 rola 不是和基座合并在一起的,是支持基座模型加多了个 lora 模型的形式。prompt adapter 看起是多模态架构中的 adaptor。

  2. 从 Engine 中获取 Tokenizer

    • 主要是基于 model path 获取对应的 tokenizer 文件,并初始化对应的 tokenizer。
  3. _preprocess_call:对输入进行预处理

    • resolve_chat_template_content_format:检查对话模板格式,因为每种大模型的用于生成文本的训练数据的格式有所不同,要确认对应的格式,LLAMA 有 LLAMA 的格式,可以参考下面的例子。
    • parse_chat_messages_futures:解析输入的聊天消息,生成一个对话消息列表,变成有类型的对话消息。其中 mm_tracker 要处理 image_urlaudio_url 的消息,会根据构造 placeholderplaceholder 是一个特殊的字符串,用来标记这个位置是一个占位符。llama3.2 用的是 <|image|>
    • apply_{hf,mistral}_chat_template:模板会给提示词添加提示词的开头和结束的标志,从而和实际训练的数据标注对齐,比如 llama3<|eot_id|> 标记结束,padding 等。request_promptengine_prompt 包含 token ids 和多模态数据。
      例如:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    chat = [
    {
    "role": "user",
    "content": [
    {"type": "image"},
    {"type": "text", "text": "If I had to write a haiku for this one, it would be: "}
    ]
    }
    ]

    会变成 <|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n<|image|>If I had to write a haiku for this one, it would be: <|eot_id|> 中,<|start_header_id|>user<|end_header_id|> 标识 header(也就是 role),<|begin_of_text|> 标识上下文的开头,<|eot_id|> 标识一个消息的结束。除此之外,对于function call的处理,可以参考 examples/tool_chat_template_llama3.2_json.jinja 的一部分可以看出,会把对应工具的调用和提示词加入到用户对话前面,作为 user 的 text 的前缀中的内容形成提示词的一部分上下文。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    {#- Custom tools are passed in a user message with some extra guidance #}
    {%- if tools_in_user_message and not tools is none %}
    {#- Extract the first user message so we can plug it in here #}
    {%- if messages | length != 0 %}
    {%- if messages[0]['content'] is string %}
    {%- set first_user_message = messages[0]['content']|trim %}
    {%- else %}
    {%- set first_user_message = messages[0]['content'] | selectattr('type', 'equalto', 'text') | map(attribute='text') | map('trim') | join('\n') %}
    {%- endif %}
    {%- set messages = messages[1:] %}
    {%- else %}
    {{- raise_exception("Cannot put tools in the first user message when there's no first user message!") }}
    {%- endif %}
    {{- '<|start_header_id|>user<|end_header_id|>\n\n' -}}
    {{- "Given the following functions, please respond with a JSON for a function call " }}
    {{- "with its proper arguments that best answers the given prompt.\n\n" }}
    {{- 'Respond in the format {"name": function name, "parameters": dictionary of argument name and its value}. ' }}
    {{- "Do not use variables.\n\n" }}
    {%- for t in tools %}
    {{- t | tojson(indent=4) }}
    {{- "\n\n" }}
    {%- endfor %}
    {{- first_user_message + "<|eot_id|>"}}
    {%- endif %}
    • 请求处理:生成请求的 id request_id = f"chatcmpl-{request.request_id}",确定采样方法 beam_search 还是 sampling,调用 AsyncEngine 的 beam_searchgenerate 方法获得一个 generator。

    • chat_completion_stream_generator 是基于 generator 处理响应,这里主要看 streaming 的部分,同步的请求会直接返回结果。流式响应的格式是多个基于 json 格式的 chunk,类型是 chat.completion.chunk

    1
    {"id": "chatcmpl-1eadb733adf64f5b90114307b2d4d718", "choices": [{"delta": {"content": "", "function_call": null, "refusal": null, "role": "assistant", "tool_calls": null}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1732869116, "model": "llama3.2", "object": "chat.completion.chunk", "service_tier": null, "system_fingerprint": null, "usage": null}
    1
    {"id": "chatcmpl-1eadb733adf64f5b90114307b2d4d718", "choices": [{"delta": {"content": "AI", "function_call": null, "refusal": null, "role": null, "tool_calls": null}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1732869116, "model": "llama3.2", "object": "chat.completion.chunk", "service_tier": null, "system_fingerprint": null, "usage": null}
    1
    {"id": "chatcmpl-1eadb733adf64f5b90114307b2d4d718", "choices": [{"delta": {"content": " assistant", "function_call": null, "refusal": null, "role": null, "tool_calls": null}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1732869116, "model": "llama3.2", "object": "chat.completion.chunk", "service_tier": null, "system_fingerprint": null, "usage": null}
    1
    {"id": "chatcmpl-1eadb733adf64f5b90114307b2d4d718", "choices": [{"delta": {"content": "", "function_call": null, "refusal": null, "role": null, "tool_calls": null}, "finish_reason": "stop", "index": 0, "logprobs": null}], "created": 1732869116, "model": "llama3.2", "object": "chat.completion.chunk", "service_tier": null, "system_fingerprint": null, "usage": null}

    AsyncEngine Client 的 generate 会返回一个异步生成器,result_generator,通过 async for 遍历这个生成器 result,而 result 又是一个 output 的生成器。num_cached_tokens 表示前缀匹配的 kv cache 命中的 token 数量。request.n 代表要生成的选择的数量,一般是 1,如果大于 1 就会生成多个选择的分支,而 response 中的 index 就会代表不同的分支的序号。result 生成器对应的就是多个分支的结果,而 result 中的 output 就代表一个分支中的 chunk。处理过程中会把 output 转化成 ChatCompletionStreamResponse,输出成 data: $json_dump 的 SSE chunk 的形式。stream_options.include_usage 如果设置了的话会在 DONE 之前返回一个 usage stats 的 chunk。

    • tool_parser:解析工具描述。方法和对应的类在 openai/tool_parsers 下面,会根据传入的初始化参数决定对应的解析类。如果对应的 request 有 tool_choice 参数,就会使用到 tool_parser,tool_parser 主要用于处理响应中的 tool call 的文本内容。tool_parsertool_choice 为 auto 的时候要调用对应的 extract_tool_calls_streaming 去解析函数调用的文本内容。例如 pythonic_tool_parser 会解释 [func_name1(params_name1=params_value1, params_name2=params_value2...), func_name2(params)] 这种类似 Python 的文本内容并转化为响应中的 ToolCall 对象。如果是 llama3.1 的 template 的话,参考上面的格式,会把输出 {"name": function name, "parameters": dictionary of argument name and its value} 转化为 ToolCall 对象。

总结

vLLM 的主体是 LLMEngine,它是一个单例类,负责管理所有的模型和数据。在基于FastAPI的异步Restful API 中使用的是一个 AsyncEngine。在交给Engine处理之前会对一些请求参数进行预处理,比如对话模板的格式化,对话消息的解析,模板中的函数调用等。