ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

前言

在现代数据驱动的世界中,对高效且可扩展的数据存储方案的需求前所未有地强烈。键值数据库因其简洁性和卓越性能而备受推崇。对于那些热爱Rust编程并希望构建自己的键值数据库的开发者来说,这本书将是您的理想起点。笔者将引导您一步一步,从零开始,使用Rust设计和实现一个键值数据库。

数据库领域既神秘又充满魅力。正如俗话所说,不亲手实践,就无法深刻理解。通过构建一个键值数据库,我们不仅能深入掌握这类数据库的设计哲学和实现细节,还能借此机会深化对Rust语言的理解和应用。

本书内容围绕一个基于LSM(Log-Structured Merge-tree)的键值数据库设计和实现展开,参考了LevelDB、RocksDB、PebbleDB、AgateDB和BadgerDB等多个成熟数据库的实现。全书示例代码均使用Rust编写,旨在通过实战演练加深读者对Rust特性的理解,同时探索数据库技术的精髓。

LevelDB相较于其他衍生品,虽然没有一些新的论文和工程实践带来的优化,但保留了最初的设计,其相对简单和完整。其他数据库或多或少都能找到LevelDB的影子。

与《Rust编程语言》一书不同,本书不旨在覆盖Rust语言的所有知识点。我们的目标是实现一个键值数据库,因此读者需要具备一定的系统编程基础,例如文件系统相关的读写调用和指针的使用(不仅在unsafe的情况下,有指针的基础也方便理解引用等概念)。这些知识点如果完全讲解清楚会占据很大的篇幅。本书将解释使用到的Rust语法和特性。即使您没有阅读过《Rust编程语言》,也能跟随本书学习Rust的语法。如果您在阅读本书的Rust语法部分遇到困难,可以参考《Rust编程语言》中的相关章节。笔者也是将《Rust编程语言》作为一本参考书反复阅读,并不需要一次性完全读完,书都是常看常新的。

Rust的设计初衷是确保内存安全,避免常见的程序错误,如空指针解引用。其显著特点包括独特的所有权系统、零成本抽象、可靠的错误处理机制以及完善的工具链,使其在系统编程领域尤为突出。

尽管LevelDB是用C++编写的,Rust在某些方面被视为C++的现代替代品。对于那些对C++有深入了解的开发者而言,转向学习Rust应该会相对轻松。然而,Rust提供了与C++不同的编程范式。例如,迭代器和闭包是Rust标准库和语法的一部分,作为语言的核心部分,引入了多种语法糖来支持这些功能,这可能会让熟悉Python的开发者感到亲切。

经典的内存错误包括使用已释放内存的指针、向量长度被修改但另一个引用仍保留原长度信息导致访问不确定内存地址等。Rust的所有权系统在编译阶段就能避免这些问题。

Rust借鉴了函数式编程的多种技巧,为开发者提供了一种既熟悉又新颖的编程体验,例如模式匹配、迭代器、闭包、泛型等。这些特性使得Rust在编写高效、安全和易维护的代码方面具有独特优势。

与有GC的语言相比,Rust可能让使用者感到不适应,许多在其他语言中理所当然的写法在Rust中行不通。Rust对指针的可变性有明确限制,并且只允许存在一个可变引用。这种所有权的检查使编译器变得非常严格,在一定程度上增加了编程的复杂性。然而,这也是Rust的优势之一,它能够在编译阶段发现许多潜在的错误。

相比暴露指针的语言,Rust对内存的解引用有严格的检查。尽管所有权系统有时会让代码显得冗长,但Rust提供了许多有趣的语法和特性,如模式匹配、错误处理宏、默认返回末端表达式等,使得编写Rust代码既轻便又高效。

Rust的性能非常出色,部分Linux内核驱动和Windows安全模块已经采用Rust实现。AWS也在许多地方使用Rust,飞书客户端的一部分代码也使用了Rust,这在一定程度上证明了Rust在系统编程领域的优异表现。如果需要选择一种新语言开发消息队列、数据库、文件系统等软件,Rust是一个非常不错的选择。这也是本书使用Rust实现的原因之一,以展示Rust在这些方面的优势。

在错误处理方面,Rust采用?问号宏简化了传统的错误处理流程,相比Go语言中显式处理错误的if err != nil {}模式是一种进步。这种简洁的错误处理方式不仅提高了代码的可读性,也加速了开发过程。

这些特色贯穿本书始终,在随后的章节中,您将探索到Rust的更多有趣特性。

笔者是一名Rust初学者,里面的很多实现可能存在不正确的写法,欢迎指正。

本书面向的读者

  • 希望通过具体项目深入学习Rust,特别是在键值数据库方面的开发者。
  • 对键值数据库的设计和实现感兴趣的初学者,希望通过实践学习相关内容。
  • 想了解LevelDB架构和实现细节的读者,可以选择性地跳过实现部分进行阅读。

如何阅读和使用代码

本书的第一章主要是讲基础概念例如一些基础的数据结构。第二章开始分部分讲解实现的细节。对于
对Rust比较熟悉的读者可以跳过第一章。

第一章Rust

完整讲解Rust的内容将要消耗大量篇幅,也不是本书的目的。本章节主要介绍在实现过程中会会涉及的一些的语法和特性,让读者在阅读代码的时候没有过多障碍。

数据类型

Rust 是一种静态类型的编程语言,其数据类型可以分为两大类:原始类型(Primitive Types)和复合类型(Compound Types)。以下是 Rust 中常见的数据类型:

原始类型(Primitive Types)

  • 整数类型(Integer Types):表示整数。有符号整数包括 i8i16i32i64i128,无符号整数包括 u8u16u32u64u128

    1
    2
    let signed_integer: i32 = -42;
    let unsigned_integer: u64 = 42;
  • 浮点数类型(Floating-Point Types):表示小数。Rust 有两个浮点数类型:f32f64

    1
    2
    let float32: f32 = 3.14;
    let float64: f64 = 3.14;
  • 布尔类型(Boolean Type):表示逻辑值,只有两个可能的值:truefalse

    1
    2
    let is_true: bool = true;
    let is_false: bool = false;
  • 字符类型(Character Type):表示单个字符。字符类型使用单引号 '

    1
    2
    let char_a: char = 'a';
    let char_heart: char = '❤';

复合类型(Compound Types)

  • 数组类型(Array Type):表示固定大小的数组。数组中的所有元素必须拥有相同的数据类型。

    1
    let array: [i32; 5] = [1, 2, 3, 4, 5];
  • 元组类型(Tuple Type):表示具有不同数据类型的有序集合。元组的长度是固定的。

    1
    let tuple: (i32, f64, char) = (42, 3.14, 'a');
  • 切片类型(Slice Type):表示对数组或其他集合的引用,但没有固定大小。切片是一种动态大小的视图。

    1
    2
    let array: [i32; 5] = [1, 2, 3, 4, 5];
    let slice: &[i32] = &array[1..4];
  • 字符串类型(String Type):表示动态可变的文本字符串。它由 String 类型表示。

    1
    let my_string: String = String::from("Hello, Rust!");
  • 引用类型(Reference Type):表示对值的引用。引用在 Rust 中被广泛用于实现借用和所有权系统。

    1
    2
    let original_value: i32 = 42;
    let reference: &i32 = &original_value;

这些数据类型提供了灵活性和安全性,通过所有权、借用和生命周期等概念,Rust 的类型系统确保了内存安全和线程安全。在编写 Rust 代码时,正确使用这些数据类型有助于减少运行时错误并提高代码的可维护性。

基本语法

let用于声明变量。

1
let x = 1; // 声明x并赋值为1。

可以使用:显式指定变量类型:

1
let x: i32 = 1;

_表示“存在但不关心”的变量,用于有意忽略某些处理:

1
2
3
4
// 赋值给一个不需要使用的变量
let _ = 1;
// 忽略函数的返回值
let _ = get_thing();

_开头的变量表示暂时忽略以避免编译检查,适合在开发过程中使用:

1
let _x = 1;

let可以“覆盖”变量,使之前相同名称的变量失效,且变量类型可以不同:

1
2
3
let x = 1;
let x = 1 + 2;
let x = "str";

Rust也有元组,相当于固定长度的“容器”可以容纳不同的类型,元组可以指定类型。

1
2
3
let pair : (char, i32) = ('a', 17);
pair.0;
pair.1;

元组适用于解构,下面的代码中some_char'a'some_int是17。结构也可以使用_忽略全部或者其中一部分。解构也适用于函数的返回值。

1
2
3
4
let (some_char, some_int) = ('a', 17);
let (_, some_int) = ('a', 17);
let (_, _) = ('a', 17);
let (left, right) = slice.split_at(middle);

{}可以划分作用域,如果使用之前的覆盖规则,可以在内部作用域覆盖外部作用域的变量。

1
2
3
4
5
6
7
8
9
10
11
fn main() {
let x = "out";
{
// x = "in" 覆盖了外面的"out"
let x = "in";
// 这里会打印"in"
println!("{}", x);
}
// 这里会打印"out"
println!("{}", x);
}

在Rust中,语块也是表达式。

1
2
3
let x = 42;

let x = { 42 };

语块可以包含多个语句,最后一个不以分号结尾的语句是这个语块的值,否则默认等于()

1
2
3
4
5
let x = {
let y = 1;
let z = 2;
y + z
};

函数中也有类似的写法。

1
2
3
4
5
6
7
fn fair_dice_roll() -> i32 {
return 4;
}

fn fair_dice_roll() -> i32 {
4
}

if语句也是表达式。

1
2
3
4
5
6
7
fn fair_dice_roll() -> i32 {
if feeling_lucky {
6
} else {
4
}
}

match语句也是表达式。

1
2
3
4
5
6
fn fair_dice_roll() -> i32 {
match feeling_lucky {
true => 6,
false => 4,
}
}

结构体是用 struct 关键字声明的:

1
2
3
4
struct Vec2 {
x: f64, // 64位浮点数,即 "double precision"
y: f64,
}

它们可以使用结构体字面量初始化,顺序不重要,只有名称重要。:

1
2
let v1 = Vec2 { x: 1.0, y: 3.0 };
let v2 = Vec2 { y: 2.0, x: 4.0 };

还有一种用于从另一个结构体初始化剩余字段的快捷方式,这称为“结构体更新语法”,只能出现在最后位置,并且不能以逗号结束:

1
2
3
4
let v3 = Vec2 {
x: 14.0,
..v2
};

剩余字段也可以是所有字段,这样就可以复制整个结构体,而不是改变所有权:

1
let v4 = Vec2 { ..v3 };

结构体,像元组一样,可以被解构:

1
2
3
let v = Vec2 { x: 3.0, y: 6.0 };
let Vec2 { x, y } = v;
// `x` 现在是 3.0,`y` 现在是 6.0

下面这种形式可以通过..v.y会被忽略掉:

1
let Vec2 { x, .. } = v;

let模式可以用作if中的条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct Number {
odd: bool,
value: i32,
}

fn main() {
let one = Number { odd: true, value: 1 };
let two = Number { odd: false, value: 2 };
print_number(one);
print_number(two);
}

fn print_number(n: Number) {
if let Number { odd: true, value } = n {
println!("Odd number: {}", value);
} else if let Number { odd: false, value } = n {
println!("Even number: {}", value);
}
}

match 也是一种模式匹配,就像 if let 一样:

1
2
3
4
5
6
fn print_number(n: Number) {
match n {
Number { odd: true, value } => println!("Odd number: {}", value),
Number { odd: false, value } => println!("Even number: {}", value),
}
}

match 必须是穷尽的:至少有一个分支需要匹配。

1
2
3
4
5
6
7
8
fn print_number(n: Number) {
match n {
Number { value: 1, .. } => println!("One"),
Number { value: 2, .. } => println!("Two"),
Number { value, .. } => println!("{}", value),
// 如果最后一个分支不存在,我们会得到一个编译时错误
}
}

如果穷尽匹配很难满足,可以使用 _ 作为 “通配符” 模式:

1
2
3
4
5
6
7
fn print_number(n: Number) {
match n.value {
1 => println!("One"),
2 => println!("Two"),
_ => println!("{}", n.value),
}
}

可以给类型声明方法。

1
2
3
4
5
6
7
8
9
10
struct Number {
odd: bool,
value: i32,
}

impl Number {
fn is_strictly_positive(self) -> bool {
self.value > 0
}
}

变量默认不可以改变。

1
2
3
4
5
6
7
8
fn main() {
let n = Number {
odd: true,
value: 17,
};
n.odd = false; // 错误:不能对 `n.odd` 赋值,
// 因为 `n` 没有被声明为可变的
}

不能被重新赋值

1
2
3
4
5
6
7
8
9
10
fn main() {
let n = Number {
odd: true,
value: 17,
};
n = Number {
odd: false,
value: 22,
}; // 错误:不能对不可变变量 `n` 重新赋值
}

可以使用mut关键字来声明可变变量。

1
2
3
4
5
6
7
fn main() {
let mut n = Number {
odd: true,
value: 17,
};
n.odd = false; // 没问题:`n` 是可变的
}

特征(Traits)在Rust中实现了类似其他语言中的多态功能:

特征定义了一组可以由多种类型共享的行为契约。其定义如下:

1
2
3
trait Signed {
fn is_strictly_negative(self) -> bool;
}

任何满足这些条件的类型都可以实现这个特征:

1
2
3
4
5
impl Signed for Number {
fn is_strictly_negative(self) -> bool {
self.value < 0
}
}

这样,Number 类型就拥有了 is_strictly_negative 方法。特征也可以包含默认实现:

1
2
3
4
5
6
7
trait Signed {
fn is_strictly_negative(self) -> bool {
self.value() < 0
}

fn value(&self) -> i32;
}

然后,类型只需要提供那些没有默认实现的方法:

1
2
3
4
5
impl Signed for Number {
fn value(&self) -> i32 {
self.value
}
}

Rust 的一个核心特征是 Drop,它允许你定义当值离开作用域时应该发生的事情:

1
2
3
4
5
impl Drop for Number {
fn drop(&mut self) {
println!("Dropping {}", self.value);
}
}

Number 实例离开作用域时,Rust 会自动调用 drop 方法。

枚举(Enums)允许你定义一个类型,该类型可以是多个不同变体中的一个。这对于值可以有多种但数量有限的类型特别有用:

1
2
3
4
5
6
7
enum WebEvent {
PageLoad,
PageUnload,
KeyPress(char),
Paste(String),
Click { x: i64, y: i64 },
}

与结构体一样,枚举的每个变体可以包含不同类型和数量的数据。你可以使用 match 表达式来操作枚举值:

1
2
3
4
5
6
7
8
9
fn inspect(event: WebEvent) {
match event {
WebEvent::PageLoad => println!("page loaded"),
WebEvent::PageUnload => println!("page unloaded"),
WebEvent::KeyPress(c) => println!("pressed '{}'", c),
WebEvent::Paste(s) => println!("pasted \"{}\"", s),
WebEvent::Click { x, y } => println!("clicked at x={}, y={}", x, y),
}
}

枚举也可以有方法:

1
2
3
4
5
6
7
8
9
10
11
impl WebEvent {
fn describe(&self) -> String {
match self {
WebEvent::PageLoad => String::from("page loaded"),
WebEvent::PageUnload => String::from("page unloaded"),
WebEvent::KeyPress(c) => format!("pressed '{}'", c),
WebEvent::Paste(s) => format!("pasted \"{}\"", s),
WebEvent::Click { x, y } => format!("clicked at x={}, y={}", x, y),
}
}
}

没有返回值的空函数:

1
2
3
fn greet() {
println!("Hi there!");
}

右箭头表示返回值类型:

1
2
3
fn foo() -> i32 {
1
}

模块管理

在Rust中,模块是用于组织代码、控制可见性以及支持代码重用的重要概念。Rust的模块系统是基于文件和目录组织的,这使得代码的组织变得清晰而灵活。下面是Rust模块管理的一些关键概念:

模块定义

模块通过mod关键字进行定义,可以在一个Rust文件中定义一个模块。例如:

1
2
3
4
// 在文件 mod_example.rs 中定义了一个模块
mod example {
// 模块的内容
}

模块路径

模块路径用于指定模块的位置。Rust使用::来表示模块路径。例如:mod_example::example

Rust的模块系统与文件系统有很强的映射关系。一个模块可以对应于一个文件,也可以对应于一个目录,包含多个文件。这使得项目的文件和目录结构能够与代码组织一致。

pub关键字

在Rust中,使用pub关键字来标识模块、结构体、枚举、函数等的可见性。只有被标记为pub的项才可以在其他模块中被访问。

1
2
3
4
5
6
7
8
// 在 example 模块中声明了一个公共的结构体
mod example {
pub struct MyStruct {
// 结构体的字段
}
}
// 在其他模块中使用 example 模块中的 MyStruct
use example::MyStruct;

mod.rs文件

文件本身是可以被作为模块引用的,这样可以更好地组织代码。
如果一个模块的内容比较复杂,可以在模块所在的目录中创建一个mod.rs文件,作为模块的“命名空间”,用于存放模块的具体实现。例如:

1
2
3
// 在 example 目录中创建 mod.rs 文件
// example/mod.rs
pub mod sub_module;

使用方式:

1
2
// 在其他模块中引用 example 模块
use example::sub_module;

这个目录用于存放模块的具体实现。这有助于清晰地分离模块的定义和实现。

cratesuper

crate关键字用于表示当前crate的根模块,而super关键字用于表示当前模块的父模块。

1
2
3
4
5
6
7
8
9
10
11
12
// 在 crate 根模块中
mod my_module {
// 在 my_module 中
mod sub_module {
// 在 sub_module 中,使用 super 表示 my_module
use super::super::my_function; // 调用父模块的函数
}
}

fn my_function() {
// 函数实现
}

模块的可见性规则

默认情况下,模块和其中的项对外部是不可见的。可以通过pub关键字调整可见性。Rust的模块系统强调了显式性,即除非明确指定为pub,否则默认情况下所有项都是私有的。

pub 有多种用法,包括:

  • pub:在默认情况下,Rust 中的项是私有的,只能在定义它们的模块中访问。使用 pub 关键字可以将项声明为公共的,使其在整个 crate 中都可见。
1
2
3
4
5
6
7
pub struct MyStruct {
pub field: i32,
}

pub fn my_function() {
// 函数实现
}

在这个例子中,MyStructmy_function 都被声明为公共的,可以在 crate 的任何地方访问。

  • pub(crate):限制了项的可见性仅在当前 crate 中。这使得项在 crate 外部是不可见的,但在同一个 crate 内的所有模块都可以访问。
1
2
3
4
5
6
7
pub(crate) struct InternalStruct {
// ...
}

pub(crate) fn internal_function() {
// 函数实现
}

在这个例子中,InternalStructinternal_function 只能在定义它们的 crate 中的任何模块中访问。

  • pub(super):限制了项的可见性仅在其父模块(即包含该项的模块)和其父模块的子模块中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
mod parent_module {
pub(super) struct SuperStruct {
// ...
}

pub fn super_function() {
// 函数实现
}

mod child_module {
fn inner_function() {
// 在子模块中可以访问 SuperStruct 和 super_function
let my_struct = SuperStruct { /* ... */ };
super_function();
}
}
}

在这个例子中,SuperStructsuper_functionparent_module 中可见,但在 crate 中的其他模块不可见。

  • pub(self):限制了项的可见性仅在当前模块中。这使得项对于同一模块中的其他模块是不可见的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
mod my_module {
pub(self) struct ModuleStruct {
// ...
}

pub(self) fn module_function() {
// 函数实现
}

mod submodule {
fn inner_function() {
// 在子模块中不能访问 ModuleStruct 和 module_function
// 这两个项对于同一模块中的其他模块是不可见的
}
}
}

在这个例子中,ModuleStructmodule_function 只能在 my_module 中的任何模块中访问。

这些可见性修饰符允许 Rust 程序员精确地控制项的可见性,从而确保代码结构的封装和安全性。

use指令

use 指令可用于将其他命名空间的名称 “引入作用域”:

1
2
3
use std::cmp::min;

let least = min(7, 1); // 1

也可以用紧凑的写法:

1
2
3
4
5
6
7
// 格子单独引入
use std::cmp::min;
use std::cmp::max;
// 从cmp分开
use std::cmp::{min, max};
// 从std分开也可以
use std::{cmp::min, cmp::max};

*可以通配引入:

1
use std::cmp::*;

Rust的模块系统是一个强大的组织和抽象工具,支持创建清晰、可维护、可重用的代码结构。了解和熟练使用模块系统有助于提高代码的可读性和可维护性。

错误类型和可选项

ResultOption 是 Rust 中用于错误处理和可选值的两个重要枚举类型。它们在处理不同类型的情况时非常有用。

Option 枚举类型

Option 类型用于表示一个值可能存在(Some)或不存在(None)。它通常用于返回一个可能为空的值。

1
2
3
4
enum Option<T> {
Some(T),
None,
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
fn find_element(vec: &Vec<i32>, index: usize) -> Option<i32> {
if index < vec.len() {
Some(vec[index])
} else {
None
}
}

let numbers = vec![1, 2, 3];
match find_element(&numbers, 1) {
Some(value) => println!("Found: {}", value),
None => println!("Not found"),
}

Result 枚举类型

Result 类型用于表示一个操作可能成功(Ok)或失败(Err)。它通常用于返回一个可能会出错的操作结果。

1
2
3
4
enum Result<T, E> {
Ok(T),
Err(E),
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
fn divide(a: i32, b: i32) -> Result<i32, String> {
if b == 0 {
Err(String::from("Division by zero"))
} else {
Ok(a / b)
}
}

match divide(4, 2) {
Ok(result) => println!("Result: {}", result),
Err(e) => println!("Error: {}", e),
}

ResultOption 的关系

  • Option 用于表示一个值可能存在或不存在,而不涉及错误信息。
  • Result 用于表示一个操作可能成功或失败,并且可以携带错误信息。

在某些情况下,可以将 Option 转换为 Result,例如在需要提供错误信息时:

1
2
3
4
5
6
fn find_element(vec: &Vec<i32>, index: usize) -> Result<i32, String> {
match vec.get(index) {
Some(&value) => Ok(value),
None => Err(String::from("Index out of bounds")),
}
}

通过这种方式,可以更灵活地处理错误和可选值。

自定义错误类型

在 Rust 中,通常建议使用自定义的错误类型来更好地表达错误信息。可以通过枚举或结构体来定义自己的错误类型,并实现 std::fmt::Debugstd::fmt::Display trait 来提供可读的错误信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#[derive(Debug)]
enum MyError {
DivisionByZero,
CustomError(String),
}

impl std::fmt::Display for MyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MyError::DivisionByZero => write!(f, "Cannot divide by zero"),
MyError::CustomError(msg) => write!(f, "Custom error: {}", msg),
}
}
}

fn divide(a: f64, b: f64) -> Result<f64, MyError> {
if b == 0.0 {
Err(MyError::DivisionByZero)
} else {
Ok(a / b)
}
}

fn main() {
match divide(10.0, 0.0) {
Ok(result) => println!("Result: {}", result),
Err(error) => println!("Error: {}", error),
}
}

使用 ? 操作符

Rust 中的 ? 操作符可以用于快速地将 ResultOption 的值传递给包含错误处理的函数。它简化了错误传播的代码。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fn operation1() -> Result<i32, &'static str> {
// ...
Ok(42)
}

fn operation2() -> Result<i32, &'static str> {
// ...
Ok(10)
}

fn main() -> Result<(), &'static str> {
operation1()?;
operation2()?;
Ok(())
}

错误处理 thiserror 和 anyhow

错误处理
anyhow提供了统一管理error的方式,任何error都可以存储在anyhow中。
thiserror提供了方便我们定义error的宏。

我们的Result类型都是anyhow::Result并且通过thiserror的宏来自定义错误。

单元测试

通过配置宏 #[cfg(test)],我们可以指定某个模块为测试模块,并且可以为模块内的函数配置 #[test] 以指定某个函数为测试实例。在 Rust 中,习惯性地会创建一个与模块同级的名为 tests 的模块,然后在该模块中编写测试函数。这些测试代码一般位于与源代码相同的文件中(也可以分成独立的文件编写)。下面的示例代码演示了如何简单验证两个数的相加。在本书中,测试代码按照类似的格式提供,旨在验证实现的正确性,并一定程度上提供对应函数或方法的使用示例。assert_eq! 是一个宏函数,用来断言相等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 源代码模块
mod my_module {
pub fn add(a: i32, b: i32) -> i32 {
a + b
}
}

// 测试模块
#[cfg(test)]
mod tests {
use super::my_module;

// 测试函数
#[test]
fn test_add() {
assert_eq!(my_module::add(2, 3), 5);
}
}

本书的代码按章节模块化组织,每个章节都可以独立运行。使用 cargo test 命令,可以按章节前缀运行对应的单元测试(例如 cargo test ch1),也可以执行具体的测试函数(例如 cargo test ch1::skiplist::tests::it_works)。章节内容循序渐进,每章结束时会引入前面章节的模块。这种组织方式使读者能够逐步学习和理解每个模块的工作原理。每个章节相对独立,读者也可以跳跃式阅读。

读者可以根据需要修改每个独立模块,尝试理解其工作原理,或在自己实现过程中参考这些模块。这种结构旨在提供灵活性,使读者能够自由地使用和探索本书的代码。

所有权和引用

所有权是由编译器检查的,因此检查会非常严格。在Rust中,浅拷贝会移交所有权(move),而深拷贝(Copy)则会复制对象,从而避免所有权冲突。如果一个对象实现了Copy trait,也可以进行复制,不会与所有权产生冲突。在Rust中,只有copy和move两种操作。

引用不会获得所有权,因此也没有权利调用Drop。创建引用在Rust中被称为借用,因此借用和引用有时会混用。借用被视为一个动词,而引用被视为一个名词。如果希望在不产生Copy的情况下修改一个对象,可以使用可变引用。

Rust规定一个对象只能有一个可变引用,且不能同时存在其他的可变或不可变引用。

笔者推荐更详细的内容可以阅Rust Book
Rust nomicon,这两本书都是比较全面且标准的Rust教程。

在本书的数据库实现中,我们主要使用字节向量和字节切片,分别用 Vec<u8>&[u8] 表示,代表具有所有权的字节块和对字节块的借用。由于所有权的关系,如果我们只需要读取数据,会使用借用;如果需要保存写入的数据,则会使用具有所有权的对象 Vec<u8>&mut [u8] 是可修改的借用,实际上也具有所有权,当我们需要修改连续内存的一部分时,可以使用这种类型。

如果我们不需要所有权,as_refas_mut 可以为我们提供相应的引用。

Rust约定的迭代器类型如下,注意 IntoIter 有些不同,如果是一个切片的 IntoIter,返回的仍然是引用。

1
2
3
IntoIter - T
IterMut - &mut T
Iter - &T

as_derefas_deref_mut 可以帮助我们自动解多层引用。

避免盲目使用 .clone() 满足借用规则

借用检查器确保 Rust 用户在开发中不会产生不安全的代码。具体而言,它防止了两种情况:首先,只允许存在一个可变引用;其次,允许存在多个引用,但全部都是不可变引用。如果编写的代码不符合这些条件,当开发人员通过克隆变量来解决编译器错误时,就可能陷入这种反模式。

对于初学者而言,使用 .clone() 来解决借用检查器引起的混乱问题是很诱人的。然而,这样做会带来严重的后果。使用 .clone() 会导致数据的复制,两者之间的任何更改都不会同步,就像存在两个完全独立的变量一样。

有一些特殊情况,例如 Rc<T> 被设计成可以智能处理克隆。它在内部管理数据的精确一份拷贝,克隆它将只克隆引用。

还有 Arc<T>,它提供对在堆上分配的类型为 T 的值的共享所有权。在 Arc 上调用 .clone() 会产生一个新的 Arc 实例,它指向堆上与源 Arc 相同的分配,同时增加引用计数。

总的来说,克隆应该是经过深思熟虑的,要充分了解后果。如果使用克隆来消除借用检查器错误,这是可能正在使用这种反模式的一个很好的指示。

即使 .clone() 是一个糟糕模式的指示,有时写效率低下的代码也是可以接受的,比如:

  • 开发者仍然是个新手
  • 代码没有很大的速度或内存约束(比如黑客马拉松项目或原型)
  • 满足借用检查器真的很复杂,而你更愿意优化可读性而不是性能

如果怀疑存在不必要的克隆,应该充分了解《Rust Book》关于所有权的章节,然后评估是否需要这个克隆。

同时,务必在项目中始终运行 cargo clippy,这个 lint 工具会帮你检查一些不必要的 clone

函数借用参数的选择

函数参数会用到大量的借用,因为借用不会产生拷贝,但在使用借用的时候尽量使用直接的借用类型 &str&[T]&T 而不是 &String&Vec<T>&Box<T>。在作为参数的时候,后面的拥有所有权的类型(智能指针)可以自动转换成前面的类型,而反过来则不可以。例如,下面这个函数如果把参数改成 &String 是无法编译的。原因是直接的引用类型需要再分配一个对应的所有权类型才能和所有权类型的引用对齐,但是反过来进行一次解引用就可以获得直接引用。比如 a = Box<T>,其实相当于 &(*a),编译器自动进行了转化先解引用对应的直接类型然后直接引用。反过来的话 a = &T,那就得 &Box::new(*a),需要多创建这个 Box 对象,编译器就直接拒绝了。

1
2
3
4
5
6
7
8
9
fn demo(word: &str) {
}

fn main() {
let ferris = "Ferris";
let curious = "Curious".to_string();
demo(ferris);
demo(&curious);
}

不可变引用和 Rc

不可变引用的生命周期必须小于所有权的生命周期,但Rc不要求,只要最后一个引用离开生命周期则回收。

临时的可变性

可以将可修改对象重新赋值让可变性消失。这样做有一个好处就是如果你明确在之后不想修改该对象,而又人为(可能被合作者,或者两个月后的自己)错误地修改了,编译器就会帮你检查出来。我觉得更多的是在人的“阅读期”直观地明确代码的可变性。

1
2
3
4
5
6
let data = {
let mut data = get_vec();
data.sort();
data
};
// data 是可变的。
1
2
3
4
5
let mut data = get_vec();
data.sort();
let data = data;

// data 是不可变的。

协同性

协同性是Rust里面最难的部分了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//fn two_cell_refs<'big: 'small, 'small>(
// // NOTE: these two lines changed
// big: Cell<&'big u32>,
// small: Cell<&'small u32>,
//) {
// assign(big, small);
//}

// 如果让mut reference扩大生命周期就会导致垂悬指针。
// Vec可以是因为Vec是有所有权的,所以不会出现垂悬指针。
fn two_refs<'big: 'small, 'small>(big: Vec<&'big u32>, small: Vec<&'small u32>) {
take_two(big, small);
}
fn take_two<T>(_val1: T, _val2: T) {}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let skl = SkipList { head: None };
}
}

NonNull本质是一个*const T,从而使得NonNull可以与T协变,通过强制转换的方式让这个指针是可修改的。这是标准库中常用的一个对象,目的是让Vec这样的类型使用起来与T具有协变性。

1
2
3
pub struct NonNull<T> {
pointer: *const T,
}

具体的解释可以参考这里,目前笔者也没有完全理解这个概念。

Subtyping is the idea that one type can be used in place of another.

范型

Rust 中的泛型是一种强大的特性,它允许你编写适用于多种数据类型的代码,同时保持类型安全。通过泛型,可以编写更加灵活、抽象和可重用的代码,同时保持 Rust 的内存安全和零成本抽象。

以下是 Rust 中泛型的一些关键概念和用法:

泛型函数

在 Rust 中,你可以编写泛型函数,使其适用于多种类型。示例:

1
2
3
4
5
6
7
8
9
fn print_twice<T>(value: T) {
println!("{:?}", value);
println!("{:?}", value);
}

fn main() {
print_twice("Hello, Rust!");
print_twice(42);
}

在这个例子中,print_twice 是一个泛型函数,可以接受任意类型的参数,并执行相同的打印操作。

泛型结构体

可以为结构体定义泛型类型参数,以实现对不同类型的结构体的抽象。示例:

1
2
3
4
5
6
7
8
9
struct Point<T> {
x: T,
y: T,
}

fn main() {
let int_point = Point { x: 1, y: 2 };
let float_point = Point { x: 1.5, y: 2.5 };
}

在这个例子中,Point 结构体可以用于包含任何相同类型的坐标点。

泛型枚举

枚举也可以包含泛型类型参数,以增加其灵活性。示例:

1
2
3
4
5
6
7
8
9
enum Result<T, E> {
Ok(T),
Err(E),
}

fn main() {
let success: Result<i32, &str> = Result::Ok(42);
let failure: Result<i32, &str> = Result::Err("Error message");
}

在这个例子中,Result 枚举表示可能包含成功结果(Ok)或错误信息(Err),并分别包含了两个泛型类型参数。

泛型实现

可以对泛型类型实现 trait,以为多种类型提供相同的行为。示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
trait Printable {
fn print(&self);
}

impl<T: std::fmt::Debug> Printable for T {
fn print(&self) {
println!("{:?}", self);
}
}

fn main() {
"Hello, Rust!".print();
42.print();
}

在这个例子中,Printable trait 定义了一个 print 方法,然后对所有实现了 Debug trait 的类型实现了这个 trait。

Rust 的泛型提供了强大的抽象能力,帮助编写更加灵活和通用的代码。通过泛型,你能够在不失去类型安全的前提下,减少代码的冗余并提高代码的可维护性。

Read 和 Write

在本书中用的 Trait 比较多的是 Read、Write 和 Seek。

在 Rust 中,Read、Write 和 Seek 是三个与 I/O 操作密切相关的 trait,它们为实现输入和输出操作提供了通用的接口。这三个 trait 可以用于文件、网络套接字等不同的数据源和目标。

Read Trait
Read trait 定义了用于从数据源读取字节的方法。它主要包含一个方法:

1
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error>;

这个方法从实现 Read trait 的类型中读取字节,并将它们存储到提供的缓冲区 buf 中。方法返回一个 Result,其中 Ok(n) 表示成功读取了 n 个字节,Err 表示发生了错误。

Write Trait
Write trait 定义了用于将字节写入数据目标的方法。它主要包含一个方法:

1
fn write(&mut self, buf: &[u8]) -> Result<usize, Error>;

这个方法将提供的缓冲区 buf 中的字节写入到实现 Write trait 的类型中。方法同样返回一个 Result,其中 Ok(n) 表示成功写入了 n 个字节,Err 表示发生了错误。

Seek Trait
Seek trait 定义了用于在数据源中定位和移动读写指针的方法。它包含三个方法:

1
2
3
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error>;
fn stream_len(&mut self) -> Result<u64, Error>;
fn stream_position(&mut self) -> Result<u64, Error>;
  • seek 方法通过给定的 SeekFrom 枚举类型,将读写指针移动到指定位置。
  • stream_len 方法返回数据源的总长度。
  • stream_position 方法返回当前读写指针的位置。

SeekFrom 枚举有以下几种可能的值:

  • SeekFrom::Start(n):将指针设置到数据源的起始位置加上 n
  • SeekFrom::End(n):将指针设置到数据源的末尾位置加上 n
  • SeekFrom::Current(n):将指针从当前位置移动 n 个字节。

这些 trait 为实现了文件、内存缓冲区等不同类型的数据源和目标提供了通用的接口,使得可以方便地使用相同的 I/O 操作代码处理各种类型的输入输出。在标准库中,例如 FileBufReader 都实现了这些 trait,使得对文件和缓冲区的读写变得简单和灵活。

本书中会大量用到这些 Trait,因为 Vec<u8>fs::File 都实现了这个接口。

Iterator

Iterator用于不可变迭代,IntoIterator用于获取所有权并进行迭代,MutIterator用于可变迭代。
在每个示例中,我们都使用了不同的方法进行迭代,并根据需要进行所有权的转移或可变引用的修改。
Iterator的实现,多种iterator的惯例,
Rust要求所有的集合数据类型都要有如下的迭代器,会返回上述方法。

1
2
3
fn iter(&'a self) -> Items<'a>;
fn into_iter(self) -> ItemsMove;
fn iter_mut(&'a mut self) -> ItemsMut<'a>;

Rust 的标准库为集合类型提供了一组通用的迭代方法,这些方法通常以 Iterator trait 的形式提供。这些方法通常分为三类,即标准的迭代方法 trio:

fn iter(&'a self) -> Items<'a>; 用来遍历&T。

这个方法返回一个不可变的迭代器,允许对集合中的元素进行只读的迭代。返回的 Items 类型是一个迭代器对象,其生命周期与集合本身相同,保证迭代器不会在集合被销毁前失效。

fn iter_mut(&'a mut self) -> ItemsMut<'a>; 用来遍历&mut T

这个方法返回一个可变的迭代器,允许对集合中的元素进行修改。返回的 ItemsMut 类型是一个可变迭代器对象,其生命周期与可变引用的生命周期相同,确保迭代器不会在可变引用结束后继续使用。

fn into_iter(self) -> ItemsMove; 用来遍历T,但如果T是一个引用类型其实和iter是类似的。

这个方法获取集合的所有权并返回一个拥有所有权的迭代器。这表示集合本身将不再可用,因为它的所有权已经转移到迭代器上。返回的 ItemsMove 类型是一个拥有所有权的迭代器对象。

为了提供更大的灵活性和符合 Rust 的所有权模型,这些方法还要求集合类型和对集合的(可变)引用都实现了 IntoIterator trait。这个 trait 提供了一个统一的方式,使得集合类型和引用都能够被用于 for 循环等需要迭代的上下文中。

这种设计使得 Rust 中的迭代更为一致和灵活,同时确保了在迭代过程中对所有权和可变性的严格控制。

两级迭代器(TwoLevelIterator)可以使用标准库的flat_map可以把两级迭代器展开成一个大的迭代器,适用于一些多层次迭代器的场景。

1
2
3
4
5
6
7
8
9
fn main() {
let data = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]];

let flat_iter = data.iter().flat_map(|inner| inner.iter());

for &num in flat_iter {
println!("{}", num);
}
}

归并迭代器可以实现两个有序迭代器的归并排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
struct MergeSortIterator<L, R>
where
L: Iterator,
R: Iterator<Item = L::Item>,
{
left: L,
right: R,
left_value: Option<L::Item>,
right_value: Option<R::Item>,
}

impl<L, R> MergeSortIterator<L, R>
where
L: Iterator,
R: Iterator<Item = L::Item>,
{
fn new(left: L, right: R) -> Self {
let mut sorter = MergeSortIterator {
left,
right,
left_value: None,
right_value: None,
};

sorter.fetch_values();
sorter
}

fn fetch_values(&mut self) {
self.left_value = self.left.next();
self.right_value = self.right.next();
}
}

impl<L, R> Iterator for MergeSortIterator<L, R>
where
L: Iterator,
R: Iterator<Item = L::Item>,
L::Item: Ord,
{
type Item = L::Item;

fn next(&mut self) -> Option<Self::Item> {
match (self.left_value.take(), self.right_value.take()) {
(Some(left), Some(right)) => {
if left <= right {
self.left_value = self.left.next();
self.right_value = Some(right);
Some(left)
} else {
self.left_value = Some(left);
self.right_value = self.right.next();
Some(right)
}
}
(Some(left), None) => {
self.left_value = self.left.next();
Some(left)
}
(None, Some(right)) => {
self.right_value = self.right.next();
Some(right)
}
(None, None) => None,
}
}
}



fn main() {
let iter1 = vec![1, 3, 5, 7, 9].into_iter();
let iter2 = vec![10,11, 12, 13, 14].into_iter();

let merge_iter = MergeSortIterator::new(iter1, iter2);

for item in merge_iter {
println!("{}", item);
}
}

幽灵数据

如果SkipIterator使用&'a Link很奇怪,因为Link本身就是个Rc
在引用为0的时候回收,不需要生命周期的标记,但如果不用的话编译器会报错。
我们希望持有SkipNode的引用,生命周期应该和SkipNode一致,所以引入
幽灵数据,标记我们逻辑上关联的对象,因为结构体中没法引用这个类型。

1
2
3
4
5
6
7
8
9
10
11
12
struct SkipIterator<'a> {  
head: Link,
marker: PhantomData<&'a SkipNode>,
}

impl<'a> Iterator for SkipIterator<'a> {
type Item = &'a SkipNode;
fn next(&mut self) -> Option<Self::Item> {
None
}

}

性能调优

TODO

perf profile

tracing

第二章理解键值数据库

键值数据库的介绍

键值数据库(Key-Value Database)在NoSQL(非关系型数据库)范畴中占据重要地位,其采用简洁的键值的结构对数据对象进行存储和检索。
每个数据项由键(key)和关联的值(value)组成,类似于字典或哈希表的数据模型,其中键是唯一标识符,而值则是与之关联的数据。这种简单的键值对模型使得键值数据库在存储和检索简单数据时表现出色。

许多键值数据库支持分布式架构,能够在多个节点上存储数据,以提高性能和可靠性。亚马逊的DynamoDB是一个典型的例子,于1999年提出并成为高度可扩展的键值数据库系统,满足了亚马逊的分布式存储需求,其思想和设计对后来的键值数据库系统产生了深远的影响。

Redis是另一备受欢迎的键值数据库,由Salvatore Sanfilippo于2009年创建。它是一种开源的内存中数据结构存储系统,支持多种数据结构,包括字符串、哈希表和列表,因而成为广泛使用的键值数据库。

LevelDB是由Google开发的高性能键值数据库,采用LSM树(Log-Structured Merge Tree)的结构。在2012年,RocksDB发布,进一步优化了性能和存储效率,受到了广泛的应用。

键值数据库以其快速的读写性能而著称,尤其在需要快速检索特定键的情境下表现出众。它们通常具备横向可扩展性,能够通过添加更多节点来处理更大的负载。其对值的数据结构没有强制规定,因此可以灵活存储各种类型的数据。本书聚焦于单机键值数据库,不会涉及一些集群数据库相关的分布式能力和扩展能力。

LSM

本书将会实现一个类似LevelDB的键值数据库,其核心数据结构是LSM(Log-Structured Merge Tree)。

LSM最早来源于1996年的一篇论文[^1],而被广为人知的契机则是Google的BigTable[^2]论文,其中的文件格式基于LSM。Google开源了类似键值数据库的单机版本,即LevelDB[^3]。传统数据库通常使用B-Tree类的数据结构,它具有许多优点。随着LevelDB的诞生,基于LSM-Tree的数据结构也逐渐进入人们的视野。

LSM是一种用于存储和管理大规模键值对数据的数据结构,它在特定应用场景中非常有效,具有以下优势:

  • 高写入吞吐量:LSM树通过将写入操作追加到顺序写的文件中,并使用内存和磁盘两级存储结构,实现了高效的写入吞吐量。写入操作可以在内存中迅速完成,然后异步地合并到磁盘上的存储文件中。

  • 压缩和合并:LSM树通过定期合并和压缩磁盘上的存储文件,提高了读取性能。这些合并操作使得数据在磁盘上以更为紧凑的形式存储,减少了读取时需要扫描的数据量。

  • 高吞吐读取:LSM树的结构使得范围查询更加高效,因为数据在磁盘上以顺序方式存储。这对于分析型工作负载非常有利。

  • 容错性:由于LSM树的写入操作是追加到预写日志文件中的,这提供了一种容错机制。即使在写入过程中出现故障,系统也可以通过重新应用日志来恢复。

  • 可扩展性:LSM树适用于大规模的分布式存储系统,支持数据的水平扩展。各个节点可以独立地执行写入和合并操作,从而提高了系统的可扩展性。

  • 减少随机I/O:LSM树的追加写入方式减少了磁盘上的随机I/O,有助于提高写入性能。这对于使用磁盘作为主要存储介质的系统尤为重要。

[^1]: 《The Log-Structured Merge-Tree (LSM-Tree)》:这是LSM的原始论文。
[^2]: 《Bigtable: A Distributed Storage System for Structured Data》(作者:Fay Chang等):这是Google的Bigtable论文,该文档介绍了Bigtable如何使用LSM树来管理大规模分布式数据存储。
[^3]: 《LevelDB: A Fast Persistent Key-Value Store》(作者:Jeff Dean, Sanjay Ghemawat):这是Google开发的LevelDB的论文,该数据库使用了LSM树结构。论文提供了对LSM树及其在LevelDB中的应用的深入了解。

HDD和SSD

LevelDB是为了针对HDD的追加写的特性而设计的,有很多优化是基于SSD的。
有一本书关于SSD的《深入浅出SSD》详细阐述了SSD的特性。

第三章构建数据库引擎

基本架构

整个数据库的基本架构下图,一般来说,一个实现LSM键值存储接口所使用的对象是任意字节流。
作为搜索结构,所有数据会有序排列在存储中,常用的操作有插入、更新、获取、遍历和删除等。
为了利用追加写的特性,其中的删除一般是通过插入“墓碑”来代替而不会真正的删除,
而更新则是追加一个键的新版本,所以整个数据库只用到了追加写不使用随机写,充分利用
机械硬盘的追加写性能远远高于随机写的特性。机械硬盘在写之前需要进行磁片上的寻址操作,
这导致随机写相较于顺序写多了很多寻址操作,其之间的性能大致差了100倍。

用到这追加写的文件就是预写日志。可靠的单机数据库需要确保用户调用写入接口返回成功后,即使进程重启(甚至因机器宕机而中断)也不会导致数据丢失。
采用预写式日志是数据库中常见的一种手段,数据会按照先写内存再到日志的顺序进行更新。
由于数据已经持久保存在磁盘上,即使发生异常,内存数据丢失,也能够通过重放预写日志确保数据的完整性。
当前的预写式日志文件会在内存的形式一般叫MemTable。
很多数据库中writer_buffer相关的配置指的就是这个MemTable的大小,因为某种程度上它就是预写日志的内存缓存。

当MemTable达到容量上限(大多数数据库的默认设置为4KB),内存表的内容会被保存在持久化的文件存储中,接着日志文件可以安全删除。
这个表在文件系统上的形式是一个不可更改的搜索结构,一般会用SST(Static Sorted Table),顾名思义就是不可更改的、有序的文件结构。
该文件的不可更改的特性很像Rust变量默认的immutable。内存数据结构需要满足高效的查找和插入,其底层的数据结构一般用SkipList来实现。

数据库会对SSTable进行分层合并,由上层(或上上层)的SSTable合并成新的SSTable,并写入到下一层。
这个过程被称为major compact。因此,层数越小,数据越新,层数越大,数据越久远。

为了限制内存大小,当MemTable达到一定大小后,会转换为不可变内存表。
会作为整个数据库的第0层的SStable,是比较特殊的一层,这个合并过程称为minor compact。LevelDB的由来正是这种分层合并的结构。

当有新的文件产生时需要一个清单文件对这些文件进行记录,一般会使用一个叫MANIFEST的文件保存,用于记录各阶段文件集合信息。
为了更快速的查找,可能还会记录一些附加信息,例如文件大小、最大最小key等。这个文件相当于保存了所有在持久化存储上的SStable的元信息。

对于读操作,需要从内存表、不可变内存表、level-0 SSTable里查找,然后再从level 1中的文件开始查找。

MemTable

MemTable是一个内存中的数据结构,在数据被刷新到SST文件之前保存它们。它相当于一个读写的缓存——新的写总是将数据插入到MemTable中,而读必须在从SST文件读取之前查询MemTable,因为MemTable中的数据是较新的。一旦内存表被填满,它就变成不可变的,并被一个新的内存表所取代。一个后台线程将MemTable的内容刷新到一个SST文件中,之后MemTable就可以被销毁了。MemTable的大小一般是64MB。

SkipList

SkipList(跳表)是一种常用于实现有序存储的数据结构,通常用于构建内存表(MemTable)等应用场景。相比于平衡树等结构,SkipList 不需要复杂的旋转调整来保持平衡,其实现较为简单且易于理解。

SkipList 由一系列节点组成,每个节点包含键值对以及多个层级的指针。节点的高度由一个概率随机决定,这使得 SkipList 在期望上具有 O(log n) 的搜索复杂度。节点结构如下:

1
2
3
4
5
6
struct SkipNode {
key: Vec<u8>,
value: Vec<u8>,
h: usize,
next: Vec<Link>,
}

SkipList的搜索操作是SkipList中的基本操作之一。从头节点开始,逐层向下搜索,如果找到等于目标键的节点,则返回对应值;如果找到大于目标键的节点,就下移一层继续搜索;如果找到小于目标键的节点,就向右移动。这样,通过多层级的指针,可以有效地减少搜索路径。
但是skip list的常数项相对小很多。skip list在空间上也比较节省。一个节点平均只需要1.333个指针(甚至更少),并且不需要存储保持平衡的变量。

图示
对于层级链表,每增加一层链表,节点的搜索路径就会减半,提高搜索效率。以下是一个示例,展示了如何通过层级链表来降低搜索时间复杂度。
对于一个链表来说,搜索的时间复杂度是O(n)的,搜索5需要5次(1,2,3,4,5)。

1
1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8

这个搜索是线性的,但是如果再加一个链表,每隔一个结点取一次,可以节省一半的时间,从最高level的链表开始到小于后置节点时向下一个level搜索,这时5要搜索4次(1,3,5),找到4需要3次(1,3,4)。

1
2
1 - - - - 3 - - - - 5 - - - - 7
1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8

以此类推,再增加一个链表的话,此时5只要搜索2次(1,5),4则没变还是3次(1,3,4)。

1
2
3
1 - - - - - - - - - 5 - - - - - 
1 - - - - 3 - - - - 5 - - - - 7
1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8

这里每个节点的元素只需要存一次,但是每个节点的高度之内都要保存对应指针。

SkipList相较于一些有序数据结构比如平衡树来说不需要做一些旋转的调整来保持树的平衡,节点的高度是基于概率的,如果设置概率为1/2的话可以通俗的理解为“丢硬币,每次正面则这个节点的高度提高一层”,从期望上来说是可以被证明为log(n)的。因为SkipList和链表很接近,相较于平衡树来说手写更容易实现。但不巧的是链表在Rust里面是地狱难度的实现,本章的篇幅因此会比较长。

首先我们通过rand::Rng生成一个u32的随机数,用最低位的连续1的数量作为高度,每次最低位为1时,高度加1,并且随机数右移,模仿丢硬币的过程。

1
2
3
4
5
6
7
8
9
10
11
12
use rand::Rng;
// 1/2的概率生成节点的高度
fn random_height(total: usize) -> usize {
let mut h = 0;
// 生成u32随机数
let mut r = rand::random::<u32>();
while r & 1 == 1 && h < total {
h += 1;
r >>= 1;
}
h
}

首先我们定几个数据结构,Rc是引用计数,节点会被多个前置节点指向,所以我们需要使用Rc包裹我们的节点。
RefCell是一个智能指针,可以将借用的规则推迟到运行时检查,通过borrow_mut方式及时对象没有使用mut声明
也可以获得可修改引用,RefCell会在内部记录可修改引用的唯一。Option用来表示空指针。所以组合起来我们构造了一个Link类型。

1
2
3
use std::cell::RefCell;
use std::rc::Rc;
type Link = Option<Rc<RefCell<SkipNode>>>;

我们围绕的对象都是字节串,所以键值都用Vec来存储,为了突出代码的说明性质不引入过多的复杂性,这里没有使用范型。

1
2
3
4
5
6
const MAX_HEIGHT: usize = 32;

struct SkipList {
head: Link, // head 是一个辅助节点,不存储数据
}

下面两个函数都被包裹在impl SkipList {}中,为了阅读方便进行了省略,创建SkipList的时候会创建一个非None的key和value都为vec![]的占位节点。
从上向下寻找,遇到比自己大的值就下移一层继续寻找,如果遇到比自己小的值就向后移动。
实现链表有两个比较值得一看的文章:

搜索操作是 SkipList 中的基本操作之一。从头节点开始,逐层向下搜索,如果找到等于目标键的节点,则返回对应值;如果找到大于目标键的节点,就下移一层继续搜索;如果找到小于目标键的节点,就向右移动。这样,通过多层级的指针,可以有效地减少搜索路径。
我们的接口参照标准库中collections的形式,insert需要获取key value的所有权,get则返回引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> {
let mut cur = self.head.clone();
for l in (0..MAX_HEIGHT).rev() {
while let Some(next) = cur.clone().unwrap().borrow().next[l].clone() {
if &next.borrow().key[..] > key {
break;
} else if &next.borrow().key[..] < key {
cur = Some(next);
} else {
return Some(next.borrow().value.clone());
}
}
}
None
}

如果是ge的实现,则在level为0时选择大于的值返回。

1
2
3
4
5
6
7
std::cmp::Ordering::Greater => {
// no equal key, just return the first greater key.
if l == 0 {
return Some((next.borrow().key.clone(), next.borrow().value.clone()));
}
break;
}

插入和搜索类似,当找到下一项大于自己或者为None时将自己链接到当前节点之后,如果下一项小于自己则向后移动。插入操作也是 SkipList 中的关键操作。从头节点开始,逐层向下搜索,找到合适的位置插入新节点。为了保持 SkipList 的有序性,需要在每一层级中正确地插入节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
fn insert(&mut self, key: &[u8], value: &[u8]) {
let h = random_height(MAX_HEIGHT);
let new_node = Rc::new(RefCell::new(SkipNode {
key: key.to_vec(),
value: value.to_vec(),
h,
next: vec![None; MAX_HEIGHT],
}));
let mut cur: Link = self.head.clone();
for l in (0..h + 1).rev() {
while let Some(cur_node) = cur.clone() {
let mut cur_node = cur_node.borrow_mut();
match cur_node.next[l].clone() {
Some(next) => {
if &next.borrow().key[..] < key {
// move
cur = Some(next);
} else if &next.borrow().key[..] > key {
cur_node.next[l] = Some(new_node.clone());
new_node.borrow_mut().next[l] = Some(next.clone());
break;
} else {
next.borrow_mut().value = value.to_vec();
break;
}
}
None => {
cur_node.next[l] = Some(new_node.clone());
}
}
}
}
}

链表这类数据结构是Rust中地狱级难度,如果要实现一个通用的链表类的数据结构涉及到协同性(Covariance),标准库中用到了很多unsafe的实现,通过指针的方式简化了代码,例如我们要实现双向链表的话Rc就不适用了,因为指向时成环的。

我们在实现链表的时候是无法依赖编译器帮我们自动释放空间的,因为编译器默认的行为会是一个递归调用Drop。
Box调用Drop以后Node占用的内存被释放,Node中的next的Box就无法被调用Drop了,这在别的语言里面很好实现,只要暂存next就好了。
但是,Box是一个有所有权的指针,显然Rust的所有权系统不允许让自己在“死亡”以后还被别人获取了所有权。

1
2
3
4
5
6
impl Drop for Box<Node> {
fn drop(&mut self) {
self.ptr.drop(); // 先释放指向的对象再释放自己
deallocate(self.ptr);
}
}

所以链表我们要手动实现Drop函数,这里面把box替换出来变成Empty,这样在“死亡”时就可以避免调用next了(毕竟next已经为空了不会有递归调用)。

我们不可以move一个可变引用即使move给了自己,理论上这不算错误,所以需要mem::replace,如果想要实现move就得通过replace的方法。

1
2
3
4
5
6
7
8
9
10
struct Buffer<T> { buf: Vec<T> }

impl<T> Buffer<T> {
fn replace_index(&mut self, i: usize, v: T) -> T {
// error: cannot move out of dereference of `&mut`-pointer
let t = self.buf[i];
self.buf[i] = v;
t
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
impl Drop for List {
fn drop(&mut self) {
let mut cur_link = mem::replace(&mut self.head, Link::Empty);
// `while let` == "do this thing until this pattern doesn't match"
while let Link::More(mut boxed_node) = cur_link {
cur_link = mem::replace(&mut boxed_node.next, Link::Empty);
// boxed_node goes out of scope and gets dropped here;
// but its Node's `next` field has been set to Link::Empty
// so no unbounded recursion occurs.
}
}
}

[并发写入] todo!()

[插入提示] todo!()

预写日志

下面我们展开讲解预习日志的相关知识和实现方式,日志文件切分成了大小为32KB的连续block块,block由连续的log record组成。
预写日志每次追加写都以一个record为单位,record的负载不作具体的规定可以是任意内容。

Record

Record的格式如下,首先是一个4个字节的CRC校验值紧跟着一个两个字节的u16表示长度,然后是一个字节表示类型,最后是实际的负载。

1
2
3
+---------+-----------+-----------+--- ... ---+
|CRC (4B) | Size (2B) | Type (1B) | Payload |
+---------+-----------+-----------+--- ... ---+

如果我们使用u64保存长度是可以存储很大的数据的,RocksDB的一篇数据库键值规模的分析[^4]中提到一般的键是几十个字节,
值是十几KB的量级,u64显得太长了,当大量小对象存在的时候会比较多余,所以u16是比较合适的,但我们希望这个record又是可以扩展保存更长的
数据的,所以如果负载超过限制的32KB的话就会分成多个record存储。
后面会提到批量原子写的相关内容,如果多个key需要同时写入的话这个payload就会比较大了。

[^4]: 《Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook》

负载长度的扩展

我们按照LevelDB的格式使用Type表示记录的连续性,Log Type有4种:FULL = 1、FIRST = 2、MIDDLE = 3、LAST = 4。FULL类型表明该log record包含了完整的用户数据,用户数据可能比较大,超过了当前block的剩余大小,就需要分成几条log record,第一条类型为FIRST,中间的为MIDDLE,最后一条为LAST。也就是:

  • FULL,说明该log record包含一个完整的用户数据;
  • FIRST,说明是log record的第一条用户数据;
  • MIDDLE,说明是log record中间的用户数据;
  • LAST,说明是log record最后的一条用户数据。
    参考LevelDB文档中的例子,考虑到如下序列的用户数据:
  1. A: length 1000
  2. B: length 97270
  3. C: length 8000

A作为FULL类型的record存储在第一个block中; B将被拆分成3条log record,分别存储在第1、2、3个block中,这时block3还剩6byte,将被填充为0; C将作为FULL类型的record存储在block 4中。

由于一条log record长度最短为7(4个字节的CRC加2个字节的Size加1一个字节的Type),如果一个block的剩余空间小于等于6个字节,那么将被填充为空字符串,长度为7的log record是不包括任何用户数据的空记录。

LevelDB将WAL文件按块划分还有一个好处是能够按块进行切分。对于一些类似MapReduce的处理程序来说比较友好,
按照block读取record直到碰到FULL或者FRIST就可以作为一个分片的边界了。

大小端

大小端(Endian)是计算机体系结构中的一个重要概念,用于描述多字节数据在内存中的存储方式。
它分为两种类型:大端序(Big-Endian)和小端序(Little-Endian),它们的区别在于多字节数据的字节排列顺序。

大端序(Big-Endian):在大端序中,最高有效字节(Most Significant Byte,MSB)位于最低内存地址,而最低有效字节(Least Significant Byte,LSB)位于最高内存地址。这意味着数据的各个部分从高位到低位依次存储。举例来说,十进制数值513在大端序下会以两个字节0x02(512)和0x01(1)的顺序存储。

小端序(Little-Endian):相反,小端序中,最低有效字节(LSB)位于最低内存地址,而最高有效字节(MSB)位于最高内存地址。这意味着数据的各个部分从低位到高位依次存储。以相同的例子,十进制数值513在小端序下会以两个字节0x01和0x02的顺序存储。

1
2
3
4
5
6
7
8
// Convert to little-endian bytes
let le_bytes = 513u16.to_le_bytes();
println!("Little-endian bytes: {:?}", le_bytes);
// Output: Little-endian bytes: [1, 2]
// Convert to big-endian bytes
let be_bytes = 513u16.to_be_bytes();
println!("Big-endian bytes: {:?}", be_bytes);
// Output: Big-endian bytes: [2, 1]

这两种字节序的差异可能在跨平台数据交换或者数据解释方面引发问题。例如,当你从一个大端序计算机向一个小端序计算机传输数据时,需要进行字节序转换,以确保数据正确解释。这样的差异在网络通信、文件存储、数据传输等领域都有广泛的应用。

计算机体系结构、操作系统和编程语言通常会规定默认的字节序,但程序员和开发人员需要了解大小端的概念,以确保数据在不同系统之间正确传递和解释。在考虑兼容性的情况下,我们可以选择固定一种大小端的方式。在本书中,我们选择小端编码,这也是X86的CPU的模式。

CRC

CRC(Cyclic Redundancy Check,循环冗余校验)是一种高效的检错码,用于数据的校验。它基于模二除法进行计算,通过计算结果的余数来生成校验值。模二除法相当于是一种不借位的减法,因为0减1在模二运算后仍为1。在二进制领域,模二运算等效于异或操作,因此CRC可以通过位移和异或的方式进行快速计算。

CRC32是一种广泛使用的CRC类型,其余数为32位,对应的除数是33位的多项式。该多项式表示为:X^{32}+X^{26}+X^{23}+X^{22}+X^{16}+X^{12}+X^{11}+X^{10}+X^8+X^7+X^5+X^4+X^2+X^1+X^0 ,用二进制表示为0x4c11db7。在具体实现中,这个多项式的概念并不直接涉及,而是专注于基于二进制的计算过程。CRC32常用于键值数据库,以4字节大小保存校验和。

CRC的数据开头添加0不影响结果,所以会预设置一个余数初始值0XFFFFFFFF,和结果异或值0XFFFFFFFF,这个值在其他CRC类型中可能是别的值。
余数初始值就是余数的一个初始值,结果异或值就是在计算完余数之后要再用结果异或值进行一次异或运算。

CRC一般的实现有一个反向bit顺序,这个和一些硬件的传输顺序有关系,有些硬件设备是把最低位的bit先进行传输,同时在软件层面代码的实现上会更简单一点,相当于我们把最低位当成了最高位,在一些教科书中可能不会明确这个具体实现上的差异。逆向计算的时候,0x4c11db7的逆向表示是0xedb88320。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
pub fn crc32(data: &[u8]) -> u32 {
// 余数预设值
let mut crc: u32 = 0xffffffff;
for byte in data {
crc ^= u32::from(*byte);
for _ in 0..8 {
// 如果高位是1则进行模二除法(异或)
if crc & 1 == 1 {
crc = (crc >> 1) ^ 0xedb88320;
} else {
//如果高位是0则进行
crc >>= 1;
}
}
}
// 余数异或值
crc ^ 0xffffffff
}

当然这个实现是最简单的版本,每次只进行1bit的位移。

CRC32有一个查表法的快速计算方法。CRC32的查表法是一种用于加速计算的优化方式,通过预计算并存储查表,可以显著提高CRC的计算速度。这种优化的核心思想是将CRC的计算过程中的每个字节或更小颗粒度的数据提前计算并存储在一个查表中,以便在实际计算中直接查表获取结果,而不需要逐位进行运算。
如果是一个字节则要保存一张256大小的除法表。

在实际应用中,可以按照字节、4位或16位的颗粒度进行预计算并存成表。这样,每次计算CRC时,只需按颗粒度查表,从而大幅减少了计算的复杂度,提高了效率。

除了查表法,CRC32的另一种优化方式是通过指令级优化,主要依赖于SSE(Streaming SIMD Extensions)和PCLMULQDQ指令集。

SSE指令集:SSE是Intel引入的一组指令,用于执行单指令多数据(SIMD)操作。通过使用SSE指令,可以同时对多个数据进行相同的操作,从而提高并行计算能力,加速CRC32的计算过程。

PCLMULQDQ指令集:PCLMULQDQ指令集是Intel和AMD的x86-64架构中的一组指令,用于执行多项式乘法。CRC32的计算可以看作是多项式乘法,因此使用PCLMULQDQ指令集可以更高效地执行这一计算过程。

通过结合SSE和PCLMULQDQ指令集,可以在硬件层面上进一步提升CRC32的计算性能,使其更适用于高性能计算和大规模数据处理。一些快速计算crc的库基本是运用了

VARINT

Varint使用7个比特的组合来表示整数的值,其中每个字节的最高位用于指示是否有更多的字节。如果最高位为1,表示后面还有一个字节;如果最高位为0,表示这是最后一个字节。这种设计使得解码过程相对简单,而且可以高效地处理不同大小的整数。
下面是一个转化varint为u64的例子,这里定义了一个Varint的trait,使用这个trait是为了展示Rust的Trait的一个特点:可以
给外部对象定义方法。这样通过use Varint,u64就会拥有to_varint_bytes的方法。
补充说明:Rust不允许给外部对象定义外部的Trait的实现,如果可以的话相当于可以篡改外部模块的实现了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
pub trait Varint {
// Define your trait methods here
fn to_varint_bytes(self) -> Vec<u8>;
fn from_varint_bytes(data: &[u8]) -> (Self, usize)
where
Self: Sized;
}

impl Varint for u64 {
fn to_varint_bytes(self) -> Vec<u8> {
let mut value = self;
let mut bytes = Vec::new();

loop {
// 0b是表示binary的表示方法
let mut byte = (value & 0b0111_1111) as u8;
value >>= 7;

if value != 0 {
byte |= 0b1000_0000;
}

bytes.push(byte);

if value == 0 {
break;
}
}

bytes
}

fn from_varint_bytes(data: &[u8]) -> (Self, usize) {
let mut value = 0;
let mut shift = 0;
let mut bytes_read = 0;

for &byte in data {
value |= ((byte & 0b0111_1111) as u64) << shift;
shift += 7;
bytes_read += 1;

if byte & 0b1000_0000 == 0 {
break;
}
}

(value, bytes_read)
}
}

内部Key

存储于数据库中的Key携带了额外信息:Sequence和Meta。
Sequence是一个单调递增的序列值,meta用于保存key的类型比如是一个用于标记删除的key。
其结构如下:

| User key (string) | sequence number (7 bytes) | meta (1 byte) |

Rust定义如下:

1
2
3
4
5
6
pub struct InternalKey {
pub user_key: Vec<u8>,
trailer: u64,
}
const INTERNAL_KEY_SEQ_NUM_MAX: u64 = (1 << 56) - 1;

编解码的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

fn make_trailer(seq: u64, kind: InternalKeyKind) -> u64 {
(seq << 8) | (kind as u64)
}

pub fn decode(mut encoded_key: Vec<u8>) -> Option<InternalKey> {
if encoded_key.len() >= 8 {
let trailer =
u64::from_le_bytes(encoded_key[encoded_key.len() - 8..].try_into().unwrap());
encoded_key.resize(encoded_key.len() - 8, 0);
return Some(InternalKey {
user_key: encoded_key,
trailer,
});
}
None
}
pub fn encode(self) -> Vec<u8> {
let mut encoded_key = self.user_key;
encoded_key.extend_from_slice(&self.trailer.to_le_bytes());
encoded_key
}

trailer是一个u64,高位56bit保存sequence number,最后一个字节保存meta,目前只保存了key的类型标志。
Key的类型一般有两种一种是插入一种是删除,对于一个key的删除就是插入一个带类型为deletion的key的internal key。
其他的类型也可以扩展这最后的字节里面。

在Rust中如果一个对象实现了Deref,编译器可以自动帮助该对象进行引用的类型转换,这样就是为什么函数参数一般用
&str而不是用&String,因为&String可以被编译器自动转换成&str。为了让编译器让&InternalKey可以自动转换成
&[u8],我们实现如下的DerefTrait。标准库中的PathPathBufStringstr[u8]的引用都是
实现了彼此的Deref所以看到这些类型和函数的入参不一致的时候可以看看是不是有这个规则被编译器自动解引用了。

1
2
3
4
5
6
7
8
// Implementing Deref for InternalKey
impl Deref for InternalKey {
type Target = [u8];

fn deref(&self) -> &Self::Target {
&self.content
}
}

在如下的代码中会方便很多,尽管参数是&[u8]但还是依旧可以用&InternalKey作为参数。

1
2
3
4
5
6
7
8
fn takes_slice(slice: &[u8]) {
println!("{:?}", slice);
}

fn main() {
let key = InternalKey::new(b"123");
takes_slice(&key); // Now this will implicitly call deref()
}

但有些看起来可以自动deref的其实不行例如:&u64&usize,因为usize不一定是64位的所以标准库没有实现AsDeref。

Sequence Number

我们的数据库目前没有完整的事务,但是提供了一定的读的一致性视图和批量写入的原子性,
Sequence Number作为Key的一部分保存了对应的版本信息。
Sequence Number是为了实现Snapshot和原子的批量而依赖的,类似多版本控制的版本,是一个单增的序号。
对于一个读,特别是遍历的时候,会不读取比当前snapshot的sequence大的key,从而保证读的试图的一致性。
在批量写的情况中也是类似的。例如下面的例子来自于LevelDB的文档:将key1的值移动到key2,如果put key2成功之后在del key1之前失败了,
那么就会存在两个key存储了同一个值。

1
2
3
value = get key1
put key2 = value
del key1

通过原子的批量写可以避免这个问题

1
2
3
4
5
value = get key1
wb = new write batch
wb.del key1
wb.put key2 = value
wb.write

只有全部写入以后seq才能增加,不然在数据库重新启动以后发现了大于commited的seq就会放弃重放这些数据。

原子批量写也可以得益于批量写的能力增加写入的带宽。

WriteBatch

WriteBatch将多个写合并成一个写来实现原子性,格式如下。

开头是一个被所有entry共享的sequence,之后跟着一个u32的计数器。

1
2
3
+-------------+------------+--- ... ---+
| SeqNum (8B) | Count (4B) | Entries |
+-------------+------------+--- ... ---+

每个entry的开头是一个byte的Kind(上文提到的Put,Del等类型标记),然后是一个或者两个varbytes,就是常见的varuint32的长度和对应N个字节的数据,取决于类型,例如删除就只有一个。

1
2
3
+-----------+-----------------+-------------------+
| Kind (1B) | Key (varstring) | Value (varstring) |
+-----------+-----------------+-------------------+

Rust的实现如下:

1
2
3
pub struct Batch {
entries: Vec<u8>,
}

设置count和sequence号,

1
2
3
4
5
6
7
fn set_count(&mut self, count: u32) {
self.entries[8..12].copy_from_slice(&count.to_le_bytes());
}

pub fn set_seqnum(&mut self, seqnum: u64) {
self.entries[..8].copy_from_slice(&seqnum.to_le_bytes());
}

put操作,del操作也是类似的只是没有value,并且meta是类型不是Value而是Deletion。

1
2
3
4
5
6
7
8
9
10
pub fn put(&mut self, key: &[u8], value: &[u8]) {
self.entries.push(InternalKeyKind::Value as u8);
self.entries
.extend_from_slice(&(key.len() as u32).to_varint_bytes());
self.entries.extend_from_slice(key);
self.entries
.extend_from_slice(&(value.len() as u32).to_varint_bytes());
self.entries.extend_from_slice(value);
self.set_count(self.count() + 1);
}

在WAL中保存WriteBatch

Snapshot

Snapshot提供一致性的读视图,本质是一个sequence生成器和管理器,任何大于snapshot的key都不会被读取,从而保证读的一致性(特别是在遍历的时候)。在获取的时候会使用sequence来搜索,小于该sequence的key不会被搜索到。
如果是墓碑值也会过滤掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
fn get_internal(&self, key: &[u8], seq: u64) -> Option<Vec<u8>> {
let lookup = InternalKey::new(key, seq, InternalKeyKind::Value);
if let Some((internal_key, value)) = self.mt.get_ge(&lookup.encode()) {
let internal_key = InternalKey::decode(internal_key).unwrap();
if internal_key.user_key == key
// not deleted
&& internal_key.trailer & 0xff != InternalKeyKind::Deletion as u64
{
return Some(value);
}
}
None
}

内部比较器

InternalKey可以重载比较符,在一些场景下比较方便。
但是一些场景我们需要对&[u8]进行比较,这个时候重载比较符是不被允许的。
这样我们定义一个额外的用于比较相关的数据结构。

可以看到上面我们使用了get_ge代表搜索大于等于这个internal key的key。
为了实现大的sequence不被搜索到我们希望seq是降序排序的。
由于user key按升序排列,但希望seq能够按“降序”排列,我们需要定义一个内部比较器,
user key 升序但是当user key 相同时seq大的更“小”。
这样我们的搜索结构就能保持不变,只要寻找的时候找到第一个“大等于” user key,seq 的internal key就可以。
在MemTable中的SkipList将Greater和Equal合并。
我们要给memtable加上get_ge的方法,寻找第一个大于等于key的函数,SkipList的搜索过程中
要多加一个条件,当level为0时,还没有找到相等的值,就返回当前的较大值。

1
2
3
4
5
6
7
std::cmp::Ordering::Greater => {
// no equal key, just return the first greater key.
if l == 0 {
return Some((next.borrow().key.clone(), next.borrow().value.clone()));
}
break;
}

自定义比较函数,使用静态结构体保存函数集合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
use std::{
cmp::{self, min, Ordering},
mem,
};

#[derive(Clone, Copy)]
pub struct Comparator {
pub cmp: fn(&[u8], &[u8]) -> Ordering,
}

pub const INTERNAL_COMPARATOR: Comparator = Comparator { cmp: internal_cmp };

// user key + seno + meta
pub(super) fn internal_cmp(a: &[u8], b: &[u8]) -> Ordering {
assert!(a.len() >= 8);
assert!(b.len() >= 8);
match &a[0..a.len() - 8].cmp(&b[0..b.len() - 8]) {
Ordering::Less => Ordering::Less,
Ordering::Greater => Ordering::Greater,
Ordering::Equal => {
let a_trailer = u64::from_le_bytes(a[a.len() - 8..].try_into().unwrap());
let b_trailer = u64::from_le_bytes(b[b.len() - 8..].try_into().unwrap());
// revsersed order
b_trailer.cmp(&a_trailer)
}
}
}

pub const BYTEWISE_COMPARATOR: Comparator = Comparator { cmp: cmp::Ord::cmp };

一个是默认的字节序比较器,一个是根据seq做降序比较的比较器,最后的一个字节是type,但是
seq递增的,所以一起比较也不会影响大小,写成静态的结构体来作为相关函数的静态工具集合,
后面还会扩展对应的方法,所以定义了两个Comparator

至于说为什么不定义成InternalKey的方法是因为,我们的比较
大部分情况下是对其他字节序列做比较,因为InternalKey有所有权,
转换成InternalKey要进行一次数据拷贝。

我们可以类似这样定义一个持有引用的对象然后给这种对象增加方法,
因为区别不是很大就还是按照上面的方式实现,没有那么面向对象。

InternalComparator是包含UserComparator的这样就可以支持用户自定义的排序排序方式。

1
2
3
4
5
type InternalKeyRef = &[u8];

impl InternalKeyRef {
cmp(self, other: Other)...
}

SSTable

SSTable(Sorted String Table)是用于各种键值数据库(如LevelDB、RocksDB和Cassandra)的基本数据结构。它旨在高效地存储和检索键值对,同时保持不变性、键有序和基于磁盘存储的原则。
SSTable是一个不可变对象,这和Rust的默认不可变属性很类似。

SSTable的内容通常由以下部分组成:

  • 排序键值数据: SSTable主要包含按特定顺序排序的键值对,以实现键的范围查询和键的单一检索。其中,值和键通常为字符串或字节序列。
  • 数据块: 数据块是SSTable中存储一系列键值对的部分,专门设计用于高效读取操作。每个块可以包含固定数量的键值对,并且通常会进行压缩以节省磁盘空间。
  • 块缓存:块缓存是一个组件,旨在通过在内存中存储频繁访问的SSTable块来提高读取性能。
  • 元数据块: SSTable通常还包括有关其自身基本信息的元数据,在文件版本、结构等方面提供帮助以正确管理和读取SSTable属性等详细信息。
  • 索引块: 索引模块在快速数据检索方面起着关键作用,其中包含元数据和与之相关联的键引用信息,能够快速获取从特定文件偏移量到相应数据块之间映射关系,并支持高效随机访问特定键值对,索引块是一种元数据块。
  • 布隆过滤器: 某些情况下,SSTable可能还会附带Bloom过滤器作为概率型数据结构来判断是否存在特定键,在查找操作期间允许系统跳过不必要I/O请求并减少磁盘读取次数。
  • 压缩:SSTable中的压缩涉及到应用算法来减少数据块的大小。常用的压缩算法有Snappy、LZ4和zlib。
  • 墓碑: 在支持删除功能的数据库中,SSTable可能会包括墓碑来标记已删除的密钥,并确保这些被删除密钥仍然被考虑进去以保持一致性。
  • 文件格式: SSTable内容采用特定文件格式进行组织和编码,在不同数据库系统之间可能存在差异;该设计旨在实现高效读写操作及合并重叠SSTable、有效管理磁盘空间等压缩过程。
  • 校验和: 一些SSTable可能包含校验和或哈希值,在读取操作期间验证数据完整性;校验和有助于确保存储或传输过程中没有损坏。

RocksDB中的SSTable的格式如下:

LevelDB是每个block有一个专有的filter,但考虑到我们的sst文件的filter并不大,所以参考RocksDB的默认实现只用一个full filter。

每个SSTable的block的最大的大小是典型的和page大小一致的4K大小,在一般的数据库或者存储系统中都会使用这个大小,因为
这个大小是page cache的大小,和page cache大小对齐(当然也可以倍数)能够充分利用操作系统的 page cache。

目前我们先实现一个只有一个block的SSTable,规定SSTable的第一层只能是4K,先实现block内部的数据结构。后面我们可以扩展
多个block的SSTable。

我们希望Table有搜索的能力(使用于查找某个键的时候),也有遍历的能力(使用于归并的时候)。

搜索基于二分查找,读取每个restart的开头key作为比较的key,二分查找对应的restart(该restart是key所在的区间)然后遍历restart查找该key。

Data Block构建器

BlockBuilder对key的存储是前缀压缩的,对于有序的字符串来讲,这能极大的减少存储空间。但是却增加了查找的时间复杂度,为了兼顾查找效率,每隔K个key,leveldb就不使用前缀压缩,而是存储整个key,这就是重启点(restartpoint)。重启点不依赖之前的前缀读取完整的key,可以作为二分查找的分界点,合适的间隔是对压缩效率和查询效率的平衡。

1
2
3
4
5
6
<beginning_of_file>
[data block 1]
[meta block 1: filter block]
[metaindex block]
[Footer]
<end_of_file>

其中一个block由多个restart构成。

1
2
3
restart 1
restart 2
restarts_length

每个restart由一个header和一对kv构成,plen表示和之前的key的共享长度的部分可以省略存储的空间,klen表示不共享的长度部分,
value表示值的长度。

1
2
3
4
5
plen
klen
vlen
key
value

作为可选项,可以对block进行压缩,所以block后会追加一个字节表示压缩算法再追加一个crc32的校验值,总共要多5个字节。
如果设置了这些可选项,那block的最终格式是:block data | type(1B) | crc32(4B)

压缩

Snappy

Snappy 是由 Google 开发的一种快速数据压缩和解压缩算法。
它专注于提供较高的压缩速度和相对较快的解压速度,适用于需要在低延迟环境中传输大量数据的应用场景。
Snappy 不是通用的无损压缩算法,因此它可能不适用于所有类型的数据。

Snappy 的压缩算法基于一系列的变长编码和字典压缩。
它使用两种主要类型的标记:字面值标记和复制标记。
字面值标记用于表示原始数据的一部分,而复制标记用于表示先前出现过的数据块的重复。
这使得 Snappy 在处理一些特定模式的数据时能够取得很好的压缩效果。

Data Block读取

  1. Block 结构

    • Block 结构包含两个字段,content 是块的二进制内容,restarts 是存储重启点偏移的数组。
  2. Block::new 方法

    • 通过 Read 实例读取整个块的内容。
    • 解析末尾的重启点数量,获取重启点偏移数组。
    • 截断块的内容,去除重启点信息,返回 Block 实例。
  3. Block::restart_iter 方法

    • 根据给定的重启点索引,初始化 RestartIterator 实例,用于迭代重启点范围内的条目。
    • 计算重启点的起始和结束位置。
  4. Block::iter 方法

    • 返回 BlockIterator 实例,用于迭代整个块的内容。
    • 将块的内容、重启点数组、以及迭代状态传递给 BlockIterator
  5. RestartIterator 迭代过程

    • 迭代器按顺序遍历重启点范围内的条目。
    • 每个条目由键和值组成,通过解析块内容的头部信息获取长度信息,依次读取键和值。
  6. BlockIterator 迭代过程

    • 迭代器按顺序遍历整个块的内容。
    • 对于每个条目,解析块内容的头部信息,获取键和值的长度,依次读取键和值。
  7. Block::get_ge 方法

    • 使用二分查找在块的重启点中找到第一个大于等于给定键的位置。
    • 通过迭代器查找该位置对应的键值对,返回找到的值。
  8. Block::get 方法

    • 使用二分查找在块的重启点中找到等于给定键的位置。
    • 通过迭代器查找该位置对应的键值对,返回找到的值。

总体来说,块的读取过程通过迭代器实现对块内容的顺序访问,而二分查找用于在重启点中快速定位目标位置,提高查找效率。

统一Filter Block构建

分块的Filter Block构建

Full Filter适合SSTable的key比较少的情况,每个SSTable用一个full filter即可,如果SSTable的key较多的情况下,full filter一次性要load太多
不太合适,所以可以按照每个block的规模去创建block级别的fitler。
LevelDB的设计是用base划分filter block,只要data block落在一个base的区间内那么data block的filter负责人就是对应base的filter。
base一般是11也就是2048,举例来说,如果两个block的开始offset都小于2048那么都由第0个filter来管理。
block_offset / base = filter index. filter的结构如下。一个filter可以管理多个block,一个block很大的话会有空的filter,只要这个filter在block范围内。
空的filter的offsets是和之前的offset一样。这里的空的filter和EmptyFilterPolicy又是两回事了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[filter 0]
[filter 1]
[filter 2]
...
[filter N-1]

[offset of filter 0] : 4 bytes
[offset of filter 1] : 4 bytes
[offset of filter 2] : 4 bytes
...
[offset of filter N-1] : 4 bytes

[offset of beginning of offset array] : 4 bytes
lg(base) : 1 byte

这个设计在RocksDB里面被废弃了而采用了完整Filter Block。

布隆过滤器

LevelDB BloomFilter
Double Hash
Bloom Filter是一种空间效率极高的概率型数据结构,用于判断一个元素是否在一个集合中。它可能会产生误判,即判断一个不存在的元素存在,但不会误判存在的元素。

Bloom Filter的基本原理是,当一个元素被插入集合时,通过K个哈希函数将这个元素哈希成K个位置,然后将这些位置的位都设为1。检查一个元素是否在集合中时,通过同样的哈希函数找到K个位置,如果任何一个位置的位为0,则元素一定不在集合中;如果所有位置的位都为1,则元素可能在集合中。

在LevelDB中,Bloom Filter被用作SSTable的一部分,用于减少不必要的磁盘读取。当查找一个键时,首先使用Bloom Filter判断这个键是否可能在SSTable中,如果可能在,则进行磁盘读取;否则,直接跳过这个SSTable,从而减少了不必要的磁盘读取。

Bloom Filter的优点是空间效率和查询时间都极高,特别适合于元素数量巨大,但内存空间有限的场景。缺点是存在一定的误判率。

Bloom Filter本身是个bit vector,最后一个字节保存K的值。

在LevelDB的Bloom Filter中,使用了一种称为双重哈希(Double Hashing)的技术。这种技术的主要目的是为了解决哈希冲突,即当两个不同的输入产生相同的哈希值时,如何处理。

双重哈希的基本思想是,当哈希冲突发生时,不是简单地在哈希表中寻找下一个空闲位置,而是使用第二个哈希函数来确定探测序列。这个第二个哈希函数会根据输入的键值生成一个新的哈希值,这个新的哈希值会与原始的哈希值结合在一起,用于确定在哈希表中的位置。

在LevelDB的Bloom Filter中,双重哈希的实现方式是,首先使用一个哈希函数将键值哈希到Bloom Filter的某个位置,然后使用第二个哈希函数生成一个新的哈希值,这个新的哈希值用于确定在Bloom Filter中的第二个位置。这样,每个键值在Bloom Filter中都会对应两个位置,大大降低了哈希冲突的可能性,从而提高了Bloom Filter的效率和准确性。

在Bloom Filter中,K值表示我们使用的哈希函数的数量。如果K值大于2,我们将对每个插入的元素应用K个哈希函数,然后在Bloom Filter中设置对应的K个位置。

以下是一个简单的例子,假设我们有一个空的Bloom Filter,长度为m,和3个哈希函数(即K=3)。当我们插入一个元素时,我们将对这个元素应用这3个哈希函数,得到3个哈希值。然后,我们将这3个哈希值对m取模,得到在Bloom Filter中的3个位置,然后将这3个位置的位都设为1。

当我们要检查一个元素是否在集合中时,我们也会对这个元素应用这3个哈希函数,得到3个哈希值,然后查看Bloom Filter中对应的3个位置。如果这3个位置的位都为1,那么我们认为这个元素可能在集合中;如果这3个位置中有任何一个位为0,那么这个元素肯定不在集合中。

需要注意的是,随着K值的增大,误判率会降低,但是插入和查询的时间复杂度也会增大。因此,选择合适的K值是一个需要权衡的问题。

如果你有很多点查找操作(例如Get()),那么Bloom Filter可以帮助加速这些操作,相反,如果你的大部分操作是范围扫描(例如Scan()),那么Bloom Filter就没有帮助了。因为Range两端的两个key可能不存在,这样过滤器就不会生效了,大部分情况下都退化成了一次搜索大于左边界的键和从当前位置遍历直到超过右边界。

murmurhash有比较好的平衡分布的特性,计算速度快也比较简单,LevelDB使用的是最简单的murmur1。他之所以叫murmur的原因是因为他的算法就是两次乘法(multiply)和右移操作(Right shift),在版本三种这个R其实是(Rotate Left)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
fn murmur1(bytes: &[u8], seed: u32) -> u32 {
let m: u32 = 0xc6a4a793;
let r = 24;
let mut h: u32 = 0;
// seed ^ ( n * m )
// may overflow
let mut h: u32 = seed ^ (bytes.len() as u32).wrapping_mul(m);
for chunk in bytes.chunks(4) {
if chunk.len() == 4 {
let w = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
// may overflow
h = h.wrapping_add(w);
h = h.wrapping_mul(m);
h ^= h >> 16;
} else {
for (i, &b) in chunk.iter().enumerate() {
h += u32::from(b) << (i * 8);
}
// may overflow
h = h.wrapping_mul(m);
h ^= h >> r;
}
}
h
}

LevelDB使用Double Hashing模拟多个哈希函数。第一个函数是一个类似murmur的hash函数,而第二个函数则是一个将后15bit和前17bit兑换的简单函数。数据结构包含一个k用来标记hash函数的个数,bits用来保存bit数组。

1
2
3
4
pub struct BloomFilter {
bits: Vec<u8>,
k: usize,
}

Index Block

Index Block 的 key,按照RocksDB的Wiki,
Index Block 的 key >= 当前block,小于下一个block。

经历了几次优化比如把 first encode 到value中,这次就不做这个实现。

理论上用 last key 就可以,但是为了减小大小可以优化一下选择一个较小的key。

比如 [0,1,2] 和 [0,2,3] 当中 [0,1,2]和[0,2]都满足要求。

shortestseperator

shortestseperator是比较器的一部分,在这里作为index block的切分作用,所以在这里做介绍。
shortest_separator的功能:找到一个最字节串介于两个key或者边界之间。
首先找到公共前缀,如果前缀相等那么start就是分割者,如果不想等就顺序找到第一个可以+1的字节(小于255),然后
从那个字节截断就是最小的分割者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fn shortest_separator(start: &mut Vec<u8>, limit: &[u8]) {
// Iterate over common prefix of start and limit
let min_length = min(start.len(), limit.len());
let mut diff_index = 0;

while diff_index < min_length && start[diff_index] == limit[diff_index] {
diff_index += 1;
}

// Find the first differing byte
if diff_index < min_length {
let diff_byte = start[diff_index];
if diff_byte < 255 && diff_byte + 1 < limit[diff_index] {
// Increment the differing byte
start[diff_index] += 1;
// Remove the rest of the vector to make it shorter
start.resize(diff_index + 1, 0);
}
} // Do not shorten if one string is a prefix of the other
}

如果是内部key的话,如果分割者比sart短的话,就要完善这个内部ke的构成,我们需要补上一个
MAX_SEQUENCE_NUMBER << 8 | ValueType::TypeForSeek as u64,这样就可以保证分割者比start大。
这里的TypeForSeek表示这个key并没有代表使用的key,而是在index block中起索引作用的key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fn internal_shortest_separator(start: &mut Vec<u8>, limit: &[u8]) {
assert!(start.len() >= 8);
assert!(limit.len() >= 8);
let l = &start[0..start.len() - 8];
let j = &limit[0..limit.len() - 8];
let mut tmp = l.to_vec();
(UserKeyComparator.separator)(&mut tmp, j);
if tmp.len() < l.len() {
let pack = MAX_SEQUENCE_NUMBER << 8 | ValueType::TypeForSeek as u64;
tmp.extend_from_slice(pack.to_le_bytes().as_ref());
assert!(internal_cmp(start, &tmp) == Ordering::Less);
assert!(internal_cmp(&tmp, limit) == Ordering::Less);
mem::swap(start, &mut tmp);
}
}

TableBuilder

SSTable是不可修改的,只存在创建和删除。

Option

用于管理选项,例如restart_block_interval,block_size等。

TableReader

block的设计有一个好处可以根据block index去读取block。

Block Cache

LRU Cache 需要一个队列和HashMap并且,为了缓解缩的压力也可以对key分片做分段锁。
当需要从BlockHandle获取Block时会先从Block Cache当中获取。

Index 和 Filter Cache

会把level0的index block 和 filer block 缓存起来。
pin_l0_filter_and_index_blocks_in_cache

块缓存用来来缓存未压缩的数据块,它的大小一般来说可以是总内存预算的1/3左右。

LRU Cache

为了加速读取可以将最近使用的一些Table缓存到内存中加速Table的加载。
LRU Cache是个比较经典的数据结构了,但是标准库的双向链表是O(n)的删除。
如果要实现O(1)的链表可以还得自己定一个链表,实现起来比较复杂,我们先暂时容忍一下这个O(n)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
pub struct LRUCache<K, V> {
map: HashMap<K, V>,
order: VecDeque<K>,
capacity: usize,
}

impl<K: Clone + Eq + Hash, V> LRUCache<K, V> {
pub fn new(capacity: usize) -> Self {
LRUCache {
map: HashMap::with_capacity(capacity),
order: VecDeque::with_capacity(capacity),
capacity,
}
}

pub fn get_or_insert_with<F: FnOnce() -> V>(&mut self, key: &K, f: F) -> &V {
if self.map.contains_key(key) {
self.refresh(key);
self.map.get(key).unwrap()
} else {
self.put(key.clone(), f());
self.map.get(key).unwrap()
}
}

pub fn put(&mut self, key: K, value: V) {
if self.map.contains_key(&key) {
self.refresh(&key);
} else {
if self.map.len() == self.capacity {
if let Some(oldest) = self.order.pop_back() {
self.map.remove(&oldest);
}
}
self.order.push_front(key.clone());
}
self.map.insert(key, value);
}

fn refresh(&mut self, key: &K) {
if let Some(position) = self.order.iter().position(|k| k == key) {
let key = self.order.remove(position).unwrap();
self.order.push_front(key);
}
}
}

Block Cache

TODO

Table Cache

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
pub struct TableCache {
cache: LRUCache<u64, Table<File>>,
opt: Options,
}

impl TableCache {
fn new(opt: Options) -> Self {
Self {
cache: LRUCache::new(100),
opt,
}
}
fn get_table(&mut self, file_num: u64) -> &Table<File> {
self.cache.get_or_insert_with(&file_num, ||{
let file = std::fs::OpenOptions::new()
.read(true)
.open(format!("{}.sst", file_num))
.unwrap();
Table::from_reader(file, self.opt).unwrap()
})
}
}

MANIFEST

MANIFEST也是一个基于record的日志。
版本跟着 sst 文件走,每次插入都会有新版本。
Leveldb每次新生成sstable文件,或者删除sstable文件,都会从一个版本升级成另外一个版本。

相关文件的管理

lognum 用来创建 log 文件。
filenum 用来创建 sst 文件(加载文件时获取)。
level 和文件的映射关系。

当 manifest 文件超过一定大小后会进行压缩,按照全部新建的 edit 进行追加写。
manifest edit 时如果超过大小就会进行压缩。

不需要 version,维护一个 table 的引用计数(rc),当 rc 为 0 时就删除。

需要将 manifest 转换成符合重放的追加写的 edit,这样在重放时可以恢复成原本的 manifest。

合并

合并的基本思想是:数据按多个level组织,目标大小呈指数级增长,例如level0的SSTable的大小是4MB的话,
level1就是8MB,level2就是16MB,以此类推,这里的目标大小相当于level的一个总大小限制。
当一个level的大小超过它的目标大小时,我们选择它的一个或多个文件,并将该文件合并到下一个级别。
合并算法保证了了LSM结构的平衡条件被满足并且可以缩小磁盘占用。合并过程包含的三个主要过程:

minor compaction 和 major compaction

  • 寻找level,选择自身大小除以目标大小的比率最大的level。
  • 在合适的level中挑选合适的文件,比如和下级文件重叠最多的文件,节省空间。
  • 将文件与下一个level的文件(可能多个)进行归并排序组合成新的文件(可能多个),同时之前的墓碑值也可以在合并的时候顺带删除掉。

Major Compaction主要有三种分别为:

  1. Manual Compaction,是人工触发的Compaction,由外部接口调用产生。
  2. Size Compaction,是根据每个level的总文件大小来触发,注意Size Compation的优先级高于Seek Compaction。
  3. Seek Compaction,每个文件的 seek miss 次数都有一个阈值,如果超过了这个阈值,那么认为这个文件需要Compact。

compact_pointer 就是 round robin的标记,如果没找到就从第一个文件开始。LevelDB挑选文件的方式很简单。

读写存储放大

读写存储放大(Read/Write/Space Amplification)是数据库系统中一个重要的性能问题,特别是对于基于磁盘的存储引擎。
读写存储放大是我们在讨论合并策略的时候需要参考的重要指标。
例如:当我们为了减少存储放大把冗余的文件合并的时候就会因为重写多个冗余文件而引入写放大。当不重写文件来减少写放大的时候,又会引入
存储放大和读放大(对应的key可能需要检查多个冗余文件才能找到)。

我们衡量读写存放大的指标是这么计算的:

  • 读放大:读取过程中读取的全部数据/实际读取本身所需要的数据。
  • 写放大:写入过程中写入的全部数据/实际写入本身所需要的数据。
  • 存放大:存储数据占用的磁盘空间/实际存储的数据本身占用的磁盘空间。

数据库的大量优化除了操作执行的时间之外,很重要的性能指标就是这些“放大”,合并当中的很多优化都是围绕它们展开的。

Version

Version中的level的映射可以使用Vec,files是有序的也只需要一个Vec,然后根据file的Smallest和Largest的Key做二分搜索既可以。

Version一个引用计数器,读的时候Ref,读结束的时候UnRef,当没有引用的时候从VersionSet当中删除。
写入的时候是先写到MemTable再写到WAL,和Version没关系,但是Compact的时候有关系。

Verion和Compaction的关系是一对一的。
VersionSet包含多个Version,每个Version是串联的关系,一个Compaction会导致一个新的Version的追加。

LevelDB在合并和打开数据库的时候会删除不需要的文件,这个不需要是通过live_files获取的。
live_files来自所有的live的version,不live的Version会从version_set内删除。

合并涉及到删除操作,但是可能当前有相关文件的读操作还没有结束,所以我们希望在没有引用的情况下再删除改文件。

Version相较于MANIFEST是内存中的对象,每次重启以后只会初始化一个只有一个Version的VersionSet。
每次元数据的改动(SSTable的增加删除和移动等)更新,会导致一次MANIFEST文件的追加写,和内存中的一个新的Version的产生。

1
2
/// VersionSet managed the various versions that are live within a database. A single version
/// contains references to the files on disk as they were at a certain point.

版本保存了当前LSM结构的元信息,版本信息对应的是每个level有哪些文件存在,删除的文件不会立即删除
有可能有访问者正在访问。Version对应的版本是manifest的版本,每个版本对应了不同时刻的levels和sst的变化。

如果有读事务位于旧的文件,那么暂时就不能删除。因此利用引用计数,只要一个Verison还活着,就不允许删除该Verison管理的所有文件。当一个Version生命周期结束,它管理的所有文件的引用计数减1。

当sst文件不再被“活着”的版本引用的时候就可以删除对应的文件了。

有几种情况

  • 重启的时候对log的重放
  • minor compaction
  • major compaction 没有文件合并
  • major compaction 有文件合并

VersionEdit会以追加写的Record形式追加到manifest当中。

这样重放的时候不需要全部堆到level0。
Riak 1.3 版本做了优化,改变了目录结构,对于google 最初版本的LevelDB,所有的文件都在一个目录下,但是Riak 1.3版本引入了子目录, 将不同level的sst 文件放入不同的子目录:

sst_0
sst_1

sst_6

MANIFEST文件和LOG文件一样,只要DB不关闭,这个文件一直在增长。我查看了我一个线上环境,MANIFEST文件已经膨胀到了205MB。

试试上,随着时间的流逝,早期的版本是没有意义的,我们没必要还原所有的版本的情况,我们只需要还原还活着的版本的信息。MANIFEST只有一个机会变小,抛弃早期过时的VersionEdit,给当前的VersionSet来个快照,然后从新的起点开始累加VerisonEdit。这个机会就是重新开启DB。

LevelDB的早期,只要Open DB必然会重新生成MANIFEST,哪怕MANIFEST文件大小比较小,这会给打开DB带来较大的延迟。
MANIFEST 文件列出了构成每个级别的排序表集、相应的键范围以及其他重要元数据。每当数据库重新打开时,都会创建一个新的 MANIFEST 文件(文件名中嵌入一个新编号)。MANIFEST 文件的格式为日志,对服务状态所做的更改(如文件的添加或删除)都会附加到该日志中。

在系统(重新)启动时,最新的清单日志(manifest log)包含了数据库的一致状态。任何后续变化都会被记录到清单日志文件中。
当清单日志文件超过一定大小时,就会创建一个包含状态快照的新清单日志文件。最新的清单文件指针会被更新,文件系统也会同步。一旦成功更新到 CURRENT 文件,多余的清单日志就会被清除。
Badger

Version和FileMetadata是多对1的,当没有Version引用这个FileMetadata的时候就可以删除对应的文件。

也可以通过比较当前Version和数据库目录下的文件删除不需要的,这个在数据库重启的时候比较有用。

VersionEdit

VersionEdit是一次修改的记录,apply一次VersionEdit以后会生成一个新的Version。
VersionEdit使用的是WAL一样的Record。

1
2
3
4
+-------------+------ ......... ----------+
| Record ID | Variable size record data |
+-------------+------ .......... ---------+
<-- byte --->|<-- varies by type -->

例如删除文件 RecordID 为 DeleteFile,保存了删除了哪个level的文件号。

1
2
3
4
5
6
Mark a file as deleted from database.

+-----------------+-------------+--------------+
| kDeletedFile | level | file number |
+-----------------+-------------+--------------+
<-- byte --->|<-- Var32 -->|<-- Var64 -->|
1
2
// The MANIFEST file describes the startup state of the db -- all LSM files and what level they're
// at.

manifest记录了两个主要对象 levels 和 table。
他是一条一条的changeSet,需要replay一次才能重新构建出来。
有哪些level,这些level都有哪些SSTable。

changeSet 在rocksdb里面叫 versionedit

Manifest -> version
Version set
VersionSet
VersionSet 是一个 Version 的集合。
随着数据库状态的变化,LevelDB 内部会不停地生成 VersionEdit——进而产生新的 Version。此时,旧的 Version 可能还在被正在执行的请求使用。所以,同一时刻可能存在多个 Version。
VersionSet 用一个链表将这些 Version 维护起来,每生成一个 Version 就往这个链表尾部插入一个节点(AppendVersion)。

Levels 表示每个level包含的table信息,是一个数组,每个数组的元素是一个 table id的set。
Tables 是一个 map key是id,value包含level 和 checksum的信息。

这样根据 level 可以查找所包含的table,根据table id 可以查找所包含的 level 和 checksum。

level是从零开始连续的所以是个数组。

VersionEdit还有两个指标allowed_seeks和compaction_ptrs,一个代表的是seek操作的次数如果过多某种程度上代表了
文件被搜索的次数过多需要进行合并(TODO:补充其中的逻辑),还有一个代表的是轮转合并的文件指针,只要是为了重启的时候能从
上次的文件之后选择合并的文件,都是和合并相关的指标,在后面和合并相关的章节会展开说明。

VersionBuilder

Version是一个静态只读的数据结构,当我们需要从一个Version转换成另外一个Version的时候,
需要复制当前的Version然后删除和增加对应的文件,我们的files是用Vec保存的,没办法根据file num来做增删。
files的需要保存file num到file metadata的映射需要map,
并且这个file的顺序是有序的要按照边界的key来排序,所以要有一个复合的数据结构。
我们引入一个新的数据结构Version Builder这个Builder会将Version的file拷贝成map然后
应用VersionEdit之后转换再根据file的SmallestKey排序组装回新的Version。

除此之外这个

deleted_file上是一个file num的Set
added_file是一个file metadta的Set

VersionSet

VersionSet是一个双向循环链表,但是在Rust中没有比较好的办法不用unsafe实现
如果你看过 Rust实现链表或者too many list可能会觉得使用Rc加Weak可以,但是双向循环不行,
这个数据结构存在自己指向自己的情况 head.next=head,所以不得不用指针了。

其实不需要这个循环链表,交给Rust的ref去管理就行了。

FileMetadata不是和file一一映射的不能靠file metadata来回收。

链表还是有用的,得查找所有的存活的version。

合并时选择Level

选择文件是一个可以调优的选择,LevelDB用一个单线程的后台进程进行非常简单的RoundRobin轮询挑选。 Compaction操作在key空间中循环执行,详细讲一点就是,对于每个level,我们记录上次compaction的ending key。Level的下一次compaction将选择ending key之后的第一个文件(如果这样的文件不存在,将会跳到key空间的开始)。
也可以使用多线程的进行多文件的合并,每个文件会在一个线程中进行合并,如果有合并参与者则选择下一个文件,
这样就没有读写的冲突,选择的每个并行合并涉及的文件和目标文件互相不干扰,并行合并比较好实现。

这里我们考虑挑选文件的情况:

如果一个文件和下一级的N个文件有交集那么写放到就是近似于N倍,所有的N个文件都会被重写。
如果这个文件的key的分布很松散导致N的数量很大就会导致写放大的问题。
如果key是分布比较均匀这个写放大的效果不会太明显,但实际情况不会如此。
所以RocksDB的一种选项是挑选包含最老更新的文件,这类文件会有最密集的键的分布。或者从另一个角度理解是这个文件在这个level存在的时间够长,相同大小的情况下会保存更多的键,密度也会更大了。
kOldestSmallestSeqFirst 这里的Oldest代表最老,Smallest代表的是时间戳表示最老的更新。

另一种情况是热点键,选择最新更新是最老的文件,则代表的是最“冷”的文件,这样可以减少热点键向下一级移动。
如果一些场景更新比插入的操作更多,具有热点键范围的话可以使用这个选项。
kOldestLargestSeqFirst 这里的Oldest代表最老,Largest代表的是时间戳表示最新的更新。

如果一个文件包含很多墓碑值,它可能会减慢迭代该区域的速度,因为我们仍然需要在合并的时候迭代那些墓碑键。此外,我们越早将墓碑键压缩到最后一层,就能越早回收磁盘空间,因此这有利于提高空间效率。

我们的默认压缩优先级kByCompensatedSize考虑了这种情况。如果文件中的删除(插入墓碑)次数超过插入次数,则更有可能选择该文件进行压缩。删除次数超过插入次数越多,它就越有可能被压缩。这个选项一般是为了解决数据库中有大量的键被删除导致的空间浪费和读放大。

BadgerDB选择的不太一样,是选择最少的overlap,然后这样重写的量最少写放大就会比较少。

我们的实现使用kOldestSmallestSeqFirst这个参数,按照RocksDB的设计。

RocksDB中关于合并的配置

1
2
3
4
cf_options.level_compaction_dynamic_level_bytes = true;
opts.max_background_jobs = 6;
options.bytes_per_sync = 1048576;
options.compaction_pri = kMinOverlappingRatio;

合并是在后台进行的,level0一般都比较小,允许键有重叠,保存在MemTable中。
当MemTable到达一定大小之后,会被转化成SSTable格式刷入磁盘持久化存储。

合并的参与者是两个level,挑选两个level的算法是一个可选项。
选择需要合并的两个level在LevelDB中是level相对越“满”越应该被选为要合并的对象,
也就是如果一个level的大小相对于目标大小的比例total_size / target_size 最大那么就应该被选为合并对象。我们的理想情况应该是两个level合并之后文件的总大小减少最多,或者是让读放大减少。

选定上下两层level以后寻找overlap的部分进行归并排序。
badger 是用 top[0] 和 bottom[:]进行比较
如果是level0的话就是所有table,不是level0的话top就一个文件。

合并完成之后会删除不需要的文件。

动态的Level尺寸

dynamic level size
如果用户数据20GB,那么L6的100GB可能使用不到那么大,这样就会导致写放大。
如果能动态调整level的大小,在初期把l0的直接写入L6。

例如,假设max_bytes_for_level_multiplier=10num_levels=6,以及max_bytes_for_level_base=10MB
level 1到5的目标尺寸开始为:

1
[- - - - 10MB]

因为level 1到4的目标尺寸不适用,所以它们将不会被使用。
直到Level 5的大小增长到超过10MB,例如11MB,我们将基础目标设置为level 4,现在目标看起来是这样的:

1
[- - - 1.1MB 11MB]

随着数据的累积,尺寸目标根据level 5的实际数据进行调整。当level 5的数据达到50MB时,目标是这样的:

1
[- - - 5MB 50MB]

直到level 5的实际大小超过100MB,例如101MB。如果我们继续保持level 4作为基础级别,
它的目标尺寸需要是10.1MB,这不符合目标尺寸范围。
所以现在我们将level 3作为目标尺寸,各级别的目标尺寸看起来是这样的:

1
[- - 1.01MB 10.1MB 101MB]

同样,当level 5进一步增长时,所有级别的目标也增长,如下所示:

1
[- - 5MB 50MB 500MB]

直到level 5超过1000MB并变为1001MB,我们将level 2作为基础级别,
并将级别的目标尺寸设置为:

1
[- 1.001MB 10.01MB 100.1MB 1001MB]

依此类推…

具体可以参考leveled compaction

选择合并文件的边界

当选择文件以后我们需要划分出一个“干净的”边界(亦称clean cutatomic compaction unit)。
我们始终要保持一个版本的恒定性质:level i 的同一个key的版本要高于level i+1 的版本。
下面的描述文件用b来代表,下标作为文件的标号,其中u记作上限,l记作下限,例如,u1代表b1的上限,l2代表b2的下限。

提取来自 |compaction_files| 的最大文件 b1,然后在 |level_files| 中搜索一个文件 b2, user_key(u1) = user_key(l2)。如果找到这样的文件 b2(称为边界文件),则将其添加到 |compaction_files| 中,然后使用这个新的上限再次进行搜索。

之所以这样做的原因是因为:如果存在两个块,b1=(l1, u1) 和 b2=(l2, u2),并且 user_key(u1) = user_key(l2),如果我们压缩了 b1 但没有压缩 b2,那么后续的获取操作将产生不正确的结果(我们保持的恒定的性质:同一个user key,i层的seqnum要高于i+1层的seqnum,并且seqnum降序排序的),因为它将在第 i 层返回来自 b2 的记录而不是来自 b1,也就是b2的seqnum的key把b1中seqnum更大的key给“盖”住了。

在LevelDB当中,文件会”向右“扩展,视图去包含边界上的user key的更低的seqnum(内部key的seqnum是降序排序的)。在其他基于LevelDB的数据库中加入了一些优化。

重叠的分布

但我们讨论写放大的时候就需要考虑重叠的分布,如果一个重叠的分布比较均匀,那么范围越大重叠的文件就会越多。
理想情况下我们会用重叠的大小来代表写放大的程度(这是LevelDB隐含的一个假设),比如一个a-j的范围,分布很均匀,重叠的文件依次是:[a#10,a#1],[b#10,b#1]。
但如果分布不是很均匀 a-j 对应的是 [a#10,e#1],[f#10,f#1],这样写放大的问题就会比较小,而减少了存储放大。

RocksDB针对key的分布不均匀有一些优化,这里我们还是记住一个LevelDB的隐含考量:key的分布是均匀的,重叠越多~文件的写放大越大。

Trivial move

“Trivial move” 是 LevelDB 在执行合并操作时的一个优化策略。如果一个文件在转移到下一层时,并不与下一层的任何文件有重叠,那么这个文件的移动就被认为是 “trivial” 的,即不需要进行复杂的合并操作,可以直接移动文件。这样做的好处是减少了不必要的数据复制,从而提高了整体的性能。另外补充一点:BadgerDB
并没有使用这个策略,因为BadgerDB希望即使是不存在key的合并,但是如果文件里面有删除的key的话,一次复制而不是移动可以减少文件的体积因为合并的过程会把删除的无效key直接删除。

为了利用这个”trivial”移动的优化还需要记录”grandparents”(也就是level i+2)层的重叠文件,LevelDB 通过判断待移动的文件是否与其 “grandparents level” 中的文件有大量重叠来决定是否进行 “trivial move”。如果没有大量重叠,就可以直接移动文件,而不需要合并。这种策略有效地减少了I/O操作和提高了性能,特别是在处理大量数据时。这个大量在LevelDB里面是10*max_file_size(这里的10是每个层级的倍数),默认是25MB。

多路合并

从level到level+1需要对多个文件进行归并排序输出一个新的level+1的文件。我们要定义一个多路的归并器来处理这个问题。

创建一个CompactionState保存TableBuilder和outputfile
保存最小的snapshot,如果没有snapshots就使用最新的sequence。

VersionSet MakeInputIterator

需要的Iterator的数量:

  • level 0 合并成一个 +1 = 2
  • leve > 0 的 size 1= size +1

文件的删除

到目前为止我们的数据库都没有涉及任何的文件删除的能力,所有的老文件都还存在着,当前使用的文件被记录在log当中。
当我们重启数据库的时候我们可以把这些多余的文件删除。

当文件不再被引用的时候既可以删除,或者当Version被释放的时候比较current和目录中的文件集中清楚。
后者在数据库重启的时候比较有用。

FileMetadata

通过重载Drop,可以实现在FileMetadata没有被引用的时候删除这个文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
use std::fs;
use std::path::PathBuf;
use std::rc::Rc;
use std::io;

struct TempFile {
path: PathBuf,
}

impl TempFile {
// Creates a new TempFile instance for a given path.
pub fn new(path: PathBuf) -> Rc<Self> {
Rc::new(TempFile { path })
}
}

impl Drop for TempFile {
fn drop(&mut self) {
match fs::remove_file(&self.path) {
Ok(()) => println!("File {:?} was deleted successfully.", self.path),
Err(e) => eprintln!("Error deleting file {:?}: {}", self.path, e),
}
}
}

fn main() -> io::Result<()> {

// Example: Creating a temporary file.
let temp_path = PathBuf::from("my_temp_file.txt");
fs::write(&temp_path, "Temporary file contents")?;

let temp_file = TempFile::new(temp_path.clone());

// Now `temp_file` is an Rc<TempFile>. You can clone `temp_file` to create multiple references.
let temp_file_clone = Rc::clone(&temp_file);

// Both `temp_file` and `temp_file_clone` point to the same `TempFile` instance.
// The TempFile instance will not be dropped (and the file will not be deleted)
// until all Rc references are out of scope.
drop(temp_file);
assert_eq!(fs::metadata("my_temp_file.txt").is_ok(), true);
// Here you could check if the file still exists to verify it was deleted,
// but normally you wouldn't need to do this in your actual application.
drop(temp_file_clone);
assert_eq!(fs::metadata("my_temp_file.txt").is_ok(), false);

Ok(())
}

扩展

Range Delete

在一些将键值数据库作为基础数据库的一些分布式数据库中涉及到删表操作。
Range Delete 会在 sstable 的 meta block 标记。

干净的边界

TODO

事务

TODO

总结

实现LevelDB的过程中,体会到了Rus的很多语法糖和特性,和Go还有Python之间都有相互借鉴,是一个不吝啬引入语言语法复杂性的语言。
在于内存安全方面确实也带了一种新的思考。内容不算完全写完,有的甚至没有实现,有时间在继续补充下去吧。

Prompt缓存可以从两个角度来处理:

基于相似度的外部缓存

一种是对提示词和结果做相似性对比,对结果缓存,这一部分可以在外部来做,例如 langchain的 llm caching。具体实现方法包括:

  1. 向量化存储

    • 将prompt转换为向量表示
    • 使用向量数据库(如FAISS、Milvus等)存储
    • 通过向量相似度检索相近的历史prompt
  2. 模糊匹配

    • 使用编辑距离等算法计算文本相似度
    • 设置相似度阈值进行匹配
    • 返回最相似的历史响应
  3. 缓存策略

    • LRU(最近最少使用)淘汰
    • 时间过期机制
    • 容量限制管理

基于KV Cache的内部优化

另一种是利用 KV Cache 中的交叉注意力机制,复用相同的提示词前缀,这是ChatGPT使用的方法。其工作原理是:

  1. KV Cache机制

    • 存储每个token的Key和Value计算结果
    • 避免重复计算相同前缀
    • 提高推理性能
  2. 增量计算

    • 只对新增的token进行注意力计算
    • 复用已缓存的中间状态
    • 显著减少计算量
  3. 内存管理

    • 自动清理过期缓存
    • 动态调整缓存大小
    • 优化内存使用

在实际应用中,我们可以综合运用这两种缓存方法来优化性能:

  • 对于完全相同或高度相似的prompt,优先使用外部缓存机制
  • 对于部分重叠的prompt,则可以利用KV Cache机制
  • 具体使用哪种策略,需要根据实际场景和资源限制来权衡选择

值得注意的是,KV Cache中的Key和Value都包含了位置编码信息。这意味着要充分发挥prompt缓存的作用,需要确保提示词保持相同的前缀结构。如果提示词的位置发生变化,即使内容相同,对应的KV值也会不同。

具体来说,当两次不同的推理过程中,如果prompt具有相同的提示词前缀,那么这部分的KV计算结果是完全一致的,因此可以直接复用之前推理过程中的KV cache,从而提高推理效率。

最近有一个有趣的论文:通过使用DSL(领域特定语言)来描述prompt结构,可以更精确地控制位置编码。这种方法不仅能够缓存相同的前缀,还支持缓存相同的后缀,同时允许中间部分灵活变动,进一步提升了缓存的效率。但我个人感觉比较难用,等于给本来很灵活的prompt套上了一层结构化的描述语言,这种结构化的语言如果是一些GPT应用的开发有固定模式可能还好,但是通用场景下很难让用户能够用得起来这么专业的描述语言。

像RAG系统中的文档检索结果和固定的记忆上下文,都非常适合作为”提示词前缀”,这样可以更好地利用KV Cache机制。

在设计prompt结构时,我们应该按照内容的变化频率来排序,将越”稳定”的部分放在越前面的位置:

  1. 系统提示词(System Prompt):基本保持不变
  2. 个人记忆(Memory):对特定用户来说相对稳定
  3. RAG检索内容(Context):根据查询动态变化
  4. 对话历史(History):随交互持续更新
  5. 用户输入(User Input):每次都不同

这种由稳定到动态的排序结构可以最大化KV Cache的复用效果:

1
2
3
4
5
System: 你是一个专业的助手。请基于以下上下文回答问题。
Memory: {personal_memory_context}
Context: {retrieved_documents}
History: {chat_history}
User: {user_question}

从一些大模型的训练技术报告来看有一些比较有代表性的挑战,比如 Meta 的 Research Super Compute (RSC) 和 X 的 Grok Infra。这些技术报告中提到了一些关键的技术挑战和解决方案,包括 GPU 架构与互联、存储系统、训练的稳定性等。

X Grok Infra

Grok-1.5 Infra 的技术报告中可以窥见,Grok-1.5 在基础设施方面具有以下核心优势:

  1. 先进的分布式训练框架:基于 JAX、Rust 和 Kubernetes 的技术栈,不仅确保了高性能,还能快速适配和训练新的模型架构。
  2. 卓越的可靠性和可用性:通过自研的训练协调器,系统能够智能地检测并隔离故障节点,大幅降低训练任务中断的风险。
  3. 高效的存储与数据处理:在检查点存储、数据加载和训练作业重启等环节都进行了深度优化,将训练过程中的停机时间降至最低。

Meta Reasearch Super Compute

另一个典型案例是 Meta 的 Research Super Compute (RSC) 超算集群,在这上面训练了Llama3.2,有一份92页的技术报告,RSC的相关Talk,以及里面用到的MAST论文调度器:

算力规模

已升级至 16,000 张 H100 GPU,算力获得质的飞跃。每个服务器配备了 8 块 GPU 和 2 块 CPU。在服务器内部,八块 GPU 通过 NVLink 连接。

网络互联

采用双网络方案:

  • NVIDIA Quantum InfiniBand,带宽高达 1600 Gb/s,RoCE(RDMA over Converged Ethernet)作为补充互联方案。

网络拓扑

  • 底层网络(第一个层):每个机架(rack)包含 16 块 GPU,分散在两个服务器上,并通过一个 Minipack2 顶层网络(ToR)交换机连接。
  • 中间网络(第二层):192 个这样的机架通过 Cluster Switches 连接,形成一个包含 3,072 块 GPU 的 Pod。这种设计确保了从任何两个 GPU 之间的通信都有满速带宽,没有过度订阅。
  • 顶层网络(第三层):八个这样的 Pod 通过 Aggregation Switches 连接,形成一个包含 24,000 块 GPU 的集群。然而,顶层网络的连接没有保持满速带宽,而是存在过度订阅比例为 1:7。

负载均衡

  • Collective library 将 16 个网络流中的两个 GPU 之间的数据传输从一个流变为 16 个流。
  • Enhanced-ECMP(E-ECMP)协议 通过在 RoCE(Rdma over Converged Ethernet)报头中添加额外的字段,进行 hash 计算,从而有效地在不同网络路径上平衡 16 个流。

拥塞控制

使用深度缓冲区(deep-buffer switches)来解决在 Spine(Gangidi et al., 2024)中由于集体通信模式引起的暂时拥堵和缓冲问题。

存储系统

采用自研的 Tectonic 文件系统,通过 FUSE 提供标准的 Linux 文件系统接口,确保高效的数据访问。

  • 存储容量:240 PB,基于 7,500 台 SSD servers
  • 支持的最大吞吐量:7 TB/s
  • 支持的可持续吞吐量:2 TB/s
  • 检查点写入:非常时断时续,导致存储网络饱和
  • 检查点的目标:因为 checkpoint 非常大,最小化 GPU 停顿时间,加快检查点频率也变得非常重要

总结

从这些实践可以看出,现代 AI 基础设施主要围绕三大核心要素展开:

  • 计算能力(以 GPU 为核心)
  • 网络互联(RoCE 或 InfiniBand)
  • 存储系统

而在上层的编排调度领域,系统的容错能力和可靠性则成为关键考量因素。

GPU 架构与互联

在当前AI训练领域,主流的GPU型号主要是NVIDIA的A100、H100和H200系列,它们按照发布时间依次提供了更强大的算力和更优化的架构设计。关于GPU的详细架构,特别是其拓扑结构,可以参考这篇深度解析文章

GPU互联拓扑

GPU之间的互联拓扑结构主要取决于不同总线间的传输特性,GPU之间可以通过NVIDIA专有的NVLink高速互联技术直接通信。在现代GPU集群中,主要有以下几种互联方式:

  1. NVSwitch架构:通过NVIDIA的交换架构实现所有GPU之间的全互联
  2. 走网卡,如果卡之间没有NVSwitch的话,可以绕过CPU走网卡:
    1
    GPU0 -> PCIe -> IB(InfiniBand) -> PCIe -> GPU1
    这种通信模式由NCCL(NVIDIA Collective Communications Library)负责协调和优化。

GPU分配策略

NVIDIA开源的go-gpuallocator库提供了一系列基于拓扑关系的GPU分配策略。例如,其中的NewStaticDGX1Policy专门针对DGX-1标准配置优化。考虑到单机环境下GPU组合的可能性有限,这种基于静态规则的分配策略已经能够很好地满足需求。

这些分配策略的核心目标是最小化跨总线和跨NUMA节点的通信开销,确保GPU间通信尽可能利用最高带宽的数据通路,从而提供最优的训练性能。

跨节点的通信

在分布式训练场景下,跨节点通信需要经过更长的数据传输路径:

1
GPU -> NIC -> 叶层交换机 -> 核心交换机 -> NIC -> GPU

这种通信模式面临两个主要的优化方向:

  1. 本地化优化:尽可能将相关联的GPU任务分配在物理位置相近的节点上,以减少网络延迟。

  2. 负载均衡:避免将所有任务集中在同一交换机下,防止出现网络拥塞。过度集中可能导致局部带宽饱和,反而降低整体训练效率。

这种权衡本质上是一个网络流优化问题。通过图论中的网络流算法,可以在通信延迟和带宽利用率之间找到最优平衡点,从而实现更高效的跨节点通信。

一个分布式训练的带宽瓶颈来源于带宽最低的那条路径。

利用 Kubernetes Pod 亲和性优化网络拓扑

在 Kubernetes 环境下,我们可以通过 Pod 亲和性(Affinity)和规则来优化 GPU 任务的分配。主要可以从以下几个方面入手:

拓扑感知调度:使用 topologyKey 确保相关联的 Pod 被调度到网络拓扑上接近的节点:
例如同一个分布式训练任务(training-group = group1)尽让分配在一个机架上,同交换机,同核心交换机也是类似的。

1
2
3
4
5
6
7
8
9
10
11
12
affinity:
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 50
podAffinityTerm:
labelSelector:
matchExpressions:
- key: training-group
operator: In
values:
- group1
topologyKey: topology.kubernetes.io/rack # 同机架优先

这种方案的优势在于:

  • 配置简单,易于理解和维护
  • 充分利用 Kubernetes 原生能力,无需额外组件
  • 可以根据实际需求灵活调整权重和策略

存储系统

AI训练中的存储系统面临着两个主要挑战:

1. 海量小文件问题

AI训练数据集通常包含大量的小文件,这对传统文件系统的性能和管理造成了巨大压力。一些现代分布式文件系统提供了很好的解决方案,例如 Meta 的 Tectonic 和与其架构类似的 JuiceFS,它们采用了以下优化方案:

元数据管理优化

  • 使用元数据库管理文件结构,将 ls 命令转化为简单的字符串前缀匹配操作
  • 避免了传统 Linux 文件系统依赖 inode 管理的方式
  • 解决了 inode 臃肿问题(在传统系统中,一个 inode 的大小可能与文件本身相当)

2. Checkpoint 存储挑战

分布式训练中的 checkpoint 文件体积巨大,这在大语言模型训练中尤为明显:

  • 以 LLaMA-2-70B 为例,单个完整的 checkpoint 就需要 140GB 存储空间(FP16格式)
  • 训练过程中需要定期保存 checkpoint,累积存储需求可能达到 TB 甚至 PB 级别
  • 需要存储系统能够提供高带宽和低延迟的读写性能,同时保证数据的可靠性

这些挑战要求存储系统具备:

  • 强大的扩展性
  • 高效的数据压缩能力
  • 智能的数据分层存储机制
  • 可靠的数据备份和恢复能力

训练的稳定性

在大规模 AI 训练中,硬件故障是一个常见问题。特别是新型号显卡往往会有较高的故障率,再加上传统的硬件错误,这些都可能导致训练中断。因此,快速识别错误并恢复训练成为了一个关键挑战。目前主流的解决方案主要有以下两种:

基于 torchrun 的弹性训练

torchrun 提供了两种容错机制:简单重试和弹性训练。

  1. 简单重试模式
    通过 --max-restarts 参数配置重试次数:

    1
    2
    3
    4
    5
    6
    7
    8
    torchrun \
    --nnodes=$NUM_NODES \
    --nproc-per-node=$NUM_TRAINERS \
    --max-restarts=3 \
    --rdzv-id=$JOB_ID \
    --rdzv-backend=c10d \
    --rdzv-endpoint=$HOST_NODE_ADDR \
    YOUR_TRAINING_SCRIPT.py [script args...]
  2. 弹性训练模式
    通过设置 nnodes 的范围来支持动态节点数:

    1
    2
    3
    4
    5
    6
    7
    8
    torchrun \
    --nnodes=1:4 \ # 支持1-4个节点的动态伸缩
    --nproc-per-node=$NUM_TRAINERS \
    --max-restarts=3 \
    --rdzv-id=$JOB_ID \
    --rdzv-backend=c10d \
    --rdzv-endpoint=$HOST_NODE_ADDR \
    YOUR_TRAINING_SCRIPT.py [script args...]

弹性训练模式需要配置服务发现机制,默认使用 c10d 作为内置的节点发现服务,也支持使用 etcd 等外部服务。

当节点发生变化时,系统会自动处理以下场景:

  • 节点离开(缩容):系统通知 agent,停止现有 workers,重新组建 WorkerGroup,使用新的 RANK 和 WORLD_SIZE 启动所有 workers
  • 节点加入(扩容):接纳新节点,按照相同流程重组 WorkerGroup

基于 DeepSpeed 的弹性训练

DeepSpeed 提供了更细粒度的弹性训练配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"elasticity": {
"enabled": true,
"max_train_batch_size": "seqlen",
"micro_batch_sizes": 8,
"min_gpus": 1024,
"max_gpus": "fixed_linear",
"min_time": "seqlen",
"version": 8,
"ignore_non_elastic_batch_info": 1024,
"num_gpus_per_node": "fixed_linear",
"model_parallel_size": MODEL_PARALLEL_SIZE
}
}

DeepSpeed 的特点是:

  • 支持动态调整 batch size
  • 以 GPU 为粒度进行弹性伸缩(而不是节点级别)
  • 提供更丰富的训练参数配置

弹性训练控制器

要实现完整的弹性训练支持,控制器需要:

  1. 依赖服务发现机制进行节点注册和健康检查
  2. 动态调整弹性策略(如 min_nodes、max_nodes 等参数)

对于简单的降级场景,通过静态配置即可实现:

  • 将 max_nodes 设置为总资源规格
  • 将 min_nodes 设置为最小运行要求(如设置为 1:4 表示支持 1-4 张显卡的动态伸缩)

节点的问题发现

在大规模语言模型(LLM)预训练过程中,常见的硬件异常包括:

  1. GPU ECC 错误:当 GPU 发生不可纠正的显存 ECC(Error Correcting Code)错误时,通常需要重置 GPU 或重启节点来清除这个错误。

  2. Infiniband(IB)/NCCL 问题:这类问题通常源于硬件故障,如网卡损坏或网络抖动,可能导致训练速度下降或任务异常中断。

  3. 任务挂起(Hang):通常与 IB/NCCL 问题相关,需要人工检测和处理。

  4. GPU 掉卡:此时一般会触发 CUDA 错误或程序异常退出,可能需要重置 GPU 或重启节点来解决。

  5. 机器异常:包括 GPU 之外的硬件异常,如硬盘、CPU 等,甚至整机故障,可能需要更换硬件或进行系统维护。

  6. 机器配置异常:例如,某台机器意外启用了 MIG(多实例 GPU),可能影响训练任务的正常运行。

  7. 集群维护:集群中的其他任务或系统维护、升级,可能需要暂停当前训练任务。

可以使用node-promblem-detector
node-problem-detector 是一个用于在集群管理栈的上游层次中使各个节点问题可见的守护进程。它在每个节点上运行,检测节点问题并将其报告给 apiserver。

监控和容错是一个比较难的问题,需要结合硬件和软件的特性,以及业务需求,进行综合考量。
特别是万卡集群,MFU 只有 50%左右。

在训练 OPT-175B 模型的过程中,Meta团队使用了 992 个 80GB 的 A100 GPU,每个 GPU 实现了约 147 TFLOP/s 的性能,对应的机器浮点利用率(MFU)约为 47%(147/312)。

为了应对可能的硬件故障,团队额外准备了 12 台备用机器,以便在出现问题时进行替换。在训练期间,平均每天约有 2 台机器发生故障,即每台机器每天发生故障的概率约为 1.61%。

整个训练过程持续了约 2 个多月,包括从 2021 年 10 月 20 日到 2021 年 11 月 11 日的测试阶段,以及从 2021 年 11 月 11 日到 2022 年 1 月 6 日的正式训练阶段,正式训练约 57 天。

根据预估,实际训练时间应为约 25 天,但由于各种问题,实际有效训练时间仅占总时间的约 44%。在前期,由于各种问题,团队至少手动重启了 35 次任务。为减少人工干预,后续引入了自动重启机制,但由于硬件故障,仍触发了超过 70 次重启,平均每天需要重启一次任务。

这些经验表明,在大规模模型训练中,硬件故障和其他问题会显著影响训练效率。为此,团队采取了多种措施来应对这些挑战,包括准备备用硬件、引入自动重启机制等,以确保训练过程的顺利进行。

这个问题在用新的卡的时候会有更多问题。

总结

To train our largest Llama 3 models, we combined three types of parallelization: data parallelization, model parallelization, and pipeline parallelization. Our most efficient implementation achieves a compute utilization of over 400 TFLOPS per GPU when trained on 16K GPUs simultaneously. We performed training runs on two custom-built 24K GPU clusters. To maximize GPU uptime, we developed an advanced new training stack that automates error detection, handling, and maintenance. We also greatly improved our hardware reliability and detection mechanisms for silent data corruption, and we developed new scalable storage systems that reduce overheads of checkpointing and rollback. Those improvements resulted in an overall effective training time of more than 95%. Combined, these improvements increased the efficiency of Llama 3 training by ~three times compared to Llama 2.

LLAMA3 的技术博客揭示了许多令人振奋的优化成果,这些优化背后蕴含着大量值得深入研究的技术细节。虽然我们可能难以直接接触如此大规模的训练集群及其面临的挑战,但这些技术进展仍然为整个 AI 基础设施领域提供了宝贵的参考和启发。

ebpf是内核当中一个非常重要的功能,简单来说就是以虚拟机的形式提供一种在内核中执行嵌入字节码的能力。
ebpf定义的指令集也非常简答,但是写起来比较麻烦所以作为工具提供了libbpf和bpf-tool。
用户可以通过写C的形式写ebpf程序嵌入到内核中,同时在用户态进行交互,用户态的程序没有语言限制。
ebpf特性的对应内核版本列表在,后面例子中的ringbuffer map是要内核版本在5.8以上。
ebpf的架构和工具链的一个比较完整的文档是cilium的一个文档

vmlinux.h是通过bpftool生成的一个虚拟头文件,用于访问内核数据结构。

libbpf 和 bpftool 都是在内核的代码仓库里面开发的。对应的目录分别是tools/lib/bpftools/bpf,帮助用户用c开发和调试ebpf程序。
相当于说bpftool是ebpf的调试和观测工具,libbpf是一提供给开发者的ebpf库。

bpf_herplers.h有一个section的定义SEC用来决定在ebpf的.o对象文件中的位置,以及一些功能。
比如SEC(kprobe/xxx)就代表修饰的程序会嵌入到内核函数的调用中。

1
2
3
4
5
6
7
8
9
10
11
12
13
/*
* Helper macro to place programs, maps, license in
* different sections in elf_bpf file. Section names
* are interpreted by libbpf depending on the context (BPF programs, BPF maps,
* extern variables, etc).
* To allow use of SEC() with externs (e.g., for extern .maps declarations),
* make sure __attribute__((unused)) doesn't trigger compilation warning.
*/
#define SEC(name) \
_Pragma("GCC diagnostic push") \
_Pragma("GCC diagnostic ignored \"-Wignored-attributes\"") \
__attribute__((section(name), used)) \
_Pragma("GCC diagnostic pop") \

libbpf.c 有对应的SEC的定义。

1
2
3
4
5
6
#define SEC_DEF(sec_pfx, ptype, ...) {                                      \
.sec = sec_pfx, \
.len = sizeof(sec_pfx) - 1, \
.prog_type = BPF_PROG_TYPE_##ptype, \
__VA_ARGS__ \
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static const struct bpf_sec_def section_defs[] = {
BPF_PROG_SEC("socket", BPF_PROG_TYPE_SOCKET_FILTER),
BPF_EAPROG_SEC("sk_reuseport/migrate", BPF_PROG_TYPE_SK_REUSEPORT,
BPF_SK_REUSEPORT_SELECT_OR_MIGRATE),
BPF_EAPROG_SEC("sk_reuseport", BPF_PROG_TYPE_SK_REUSEPORT,
BPF_SK_REUSEPORT_SELECT),
SEC_DEF("kprobe/", KPROBE,
.attach_fn = attach_kprobe),
BPF_PROG_SEC("uprobe/", BPF_PROG_TYPE_KPROBE),
SEC_DEF("kretprobe/", KPROBE,
.attach_fn = attach_kprobe),
BPF_PROG_SEC("uretprobe/", BPF_PROG_TYPE_KPROBE),
BPF_PROG_SEC("classifier", BPF_PROG_TYPE_SCHED_CLS),
BPF_PROG_SEC("action", BPF_PROG_TYPE_SCHED_ACT),
SEC_DEF("tracepoint/", TRACEPOINT,
.attach_fn = attach_tp),
SEC_DEF("tp/", TRACEPOINT,
.attach_fn = attach_tp),
SEC_DEF("raw_tracepoint/", RAW_TRACEPOINT,
.attach_fn = attach_raw_tp),

kprobe的attach方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct bpf_link *bpf_program__attach_kprobe(struct bpf_program *prog,
bool retprobe,
const char *func_name)
{
char errmsg[STRERR_BUFSIZE];
struct bpf_link *link;
int pfd, err;

pfd = perf_event_open_probe(false /* uprobe */, retprobe, func_name,
0 /* offset */, -1 /* pid */);
if (pfd < 0) {
pr_warn("prog '%s': failed to create %s '%s' perf event: %s\n",
prog->name, retprobe ? "kretprobe" : "kprobe", func_name,
libbpf_strerror_r(pfd, errmsg, sizeof(errmsg)));
return libbpf_err_ptr(pfd);
}
link = bpf_program__attach_perf_event(prog, pfd);
err = libbpf_get_error(link);
if (err) {
close(pfd);
pr_warn("prog '%s': failed to attach to %s '%s': %s\n",
prog->name, retprobe ? "kretprobe" : "kprobe", func_name,
libbpf_strerror_r(err, errmsg, sizeof(errmsg)));
return libbpf_err_ptr(err);
}
return link;
}

根据函数名用perf_event_open_probe注入program

内核对于oom的处理主要在mm/oom_kill.c中。
oom的触发就是在内存无法分配的时候,选择一个最“差”的进程发送kill信号。
oom_kill_process这个函数是主要入口,定义是static void oom_kill_process(struct oom_control *oc, const char *message)
其中oc->victim是要被杀掉的进程,如果有cgroup会把相同内存cgroup的进程都杀掉,所以如果要检测一个进程的oom可以通过检测这个函数的参数做到。

下面这段就是OOM的时候dmesg看到的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

/* Get a reference to safely compare mm after task_unlock(victim) */
mm = victim->mm;
mmgrab(mm);

/* Raise event before sending signal: task reaper must see this */
count_vm_event(OOM_KILL);
memcg_memory_event_mm(mm, MEMCG_OOM_KILL);

/*
* We should send SIGKILL before granting access to memory reserves
* in order to prevent the OOM victim from depleting the memory
* reserves from the user space under its control.
*/
do_send_sig_info(SIGKILL, SEND_SIG_PRIV, victim, PIDTYPE_TGID);
mark_oom_victim(victim);
pr_err("%s: Killed process %d (%s) total-vm:%lukB, anon-rss:%lukB, file-rss:%lukB, shmem-rss:%lukB, UID:%u pgtables:%lukB oom_score_adj:%hd\n",
message, task_pid_nr(victim), victim->comm, K(mm->total_vm),
K(get_mm_counter(mm, MM_ANONPAGES)),
K(get_mm_counter(mm, MM_FILEPAGES)),
K(get_mm_counter(mm, MM_SHMEMPAGES)),
from_kuid(&init_user_ns, task_uid(victim)),
mm_pgtables_bytes(mm) >> 10, victim->signal->oom_score_adj);
task_unlock(victim);

对于内核函数的检测需要ebpf当中的kprobe的能力。
kprobe类似单步调试的能力,在函数入口插入一个breakpoint,然后通过trap让执行流转到注册的ebpf的程序。

oomkill的内核中的ebpf程序主要参考datadog的agent的实现。
框架程序主要参考cilium/ebpf当中的例子

PT_REGS_PARM1是一个宏可以帮助读取内核函数的参数,因为按照约定ebpf都是通过寄存器传参的,所以其实返回的是第1个参数对应的寄存器。bpf_probe_read是一个用于读取内存到ebpf程序中的辅助函数。
所以oomkill的ebpf实现的内核中的代码就比较简单,嵌入oom_kill_process的函数调用,或者要被结束的进程复制出进程的pid和command,然后通过类型为BPF_MAP_TYPE_RINGBUF的map发送event。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// +build ignore

#include "vmlinux.h"
#include "bpf_helpers.h"
#include "bpf_tracing.h"

char __license[] SEC("license") = "Dual MIT/GPL";

struct event {
u32 pid;
u8 comm[80];
};

struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24);
} events SEC(".maps");

// Force emitting struct event into the ELF.
const struct event *unused __attribute__((unused));

SEC("kprobe/oom_kill_process")
int kprobe_oom_kill_process(struct pt_regs *ctx) {
struct event *task_info;
struct oom_control *oc = (struct oom_control *)PT_REGS_PARM1(ctx);

task_info = bpf_ringbuf_reserve(&events, sizeof(struct event), 0);

if (!task_info) {
return 0;
}

struct task_struct *p;
bpf_probe_read(&p, sizeof(p), &oc->chosen);
bpf_probe_read(&task_info->pid, sizeof(task_info->pid), &p->pid);
bpf_probe_read(&task_info->comm, sizeof(task_info->comm), (void *)&p->comm);

bpf_ringbuf_submit(task_info, 0);

return 0;
}

用户态的程序也比较简单,把ebpf的对象文件attach以后读取ringbuffer别解析event。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"golang.org/x/sys/unix"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -type event -target amd64 bpf oom_kill_kernel.c

func main() {
// Name of the kernel function to trace.
fn := "oom_kill_process"

// Subscribe to signals for terminating the program.
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)

// Allow the current process to lock memory for eBPF resources.
if err := rlimit.RemoveMemlock(); err != nil {
log.Fatal(err)
}

// Load pre-compiled programs and maps into the kernel.
objs := bpfObjects{}
if err := loadBpfObjects(&objs, nil); err != nil {
log.Fatalf("loading objects: %v", err)
}
defer objs.Close()

fmt.Println("probe")
// Open a Kprobe at the entry point of the kernel function and attach the
// pre-compiled program. Each time the kernel function enters, the program
// will emit an event containing pid and command of the execved task.
kp, err := link.Kprobe(fn, objs.KprobeOomKillProcess, nil)
if err != nil {
log.Fatalf("opening kprobe: %s", err)
}
defer kp.Close()
fmt.Println("attach done")

// Open a ringbuf reader from userspace RINGBUF map described in the
// eBPF C program.
rd, err := ringbuf.NewReader(objs.Events)
if err != nil {
log.Fatalf("opening ringbuf reader: %s", err)
}
defer rd.Close()

// Close the reader when the process receives a signal, which will exit
// the read loop.
go func() {
<-stopper

if err := rd.Close(); err != nil {
log.Fatalf("closing ringbuf reader: %s", err)
}
}()

log.Println("Waiting for events..")

// bpfEvent is generated by bpf2go.
var event bpfEvent
for {
record, err := rd.Read()
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
log.Println("Received signal, exiting..")
return
}
log.Printf("reading from reader: %s", err)
continue
}

// Parse the ringbuf event entry into a bpfEvent structure.
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
log.Printf("parsing ringbuf event: %s", err)
continue
}

log.Printf("pid: %d\tcomm: %s\n", event.Pid, unix.ByteSliceToString(event.Comm[:]))
}
}

使用一个python脚本进行测试OOM。

1
2
3
a="111111111111111"
while True:
a+=a

最后运行结果

1
2
3
4
probe
attach done
Waiting for events..
pid: 16541 comm: python

ebpf的能力还不止于此,业界做了很多流量控制,性能监控的实践,包括calico用ebpf的实现替代kube-proxy,waeve-scope 用ebpf记录tcp连接,cilium 也用eBPF,结合了tc和XDP。datadog-agent 也是使用了ebpf做一些监控,上面的oomkill的例子也是参考datadog的。ebpf能够让linux内核变得更动态更灵活。甚至有一种说法是eBPF让Linux内核正在变成微内核。

Knative主要有两个重要的部分,一个是自动扩展一个是流量切换,并且目前的设计已经和istio是相对独立的了。

Knative的service和k8s的service容易混淆,所以用sks指代knative的service,然后service本身是指k8s本身的service。

sks的revision创建的时候会有两个service,一个是 public service,一个是 private service,如果用istio的话会看到一个和ingressgateway关联的有externalname的service,这个service是和ingress实现相关的。主要的实现还是public和private两个service,实现和具体的ingress的实现是独立的。

Private service 对应的是真实的pod的endpoints,public service 有两种模式一个serve模式,一个是proxy模式。当public service出于proxy模式时,其endpoints指向的是activator,当处于serve模式时endpoints指向的是private service所指向的后端endpoints。

在scale从0到1的过程中,会先阻塞在activator上,当有pod启动以后,还是会保持proxy模式,直到超过burst才会切换到private service的endpoints上。在从1到0的过程中会再切换回activator,直到有新的请求到来再触发pod的启动。

activator

Throttler

Throttler is the interface that Handler calls to Try to proxy the user request

Health Handler 注册用于 kubelet 做 readiness 和 health probe 的接口,返回statSink的状态,收到 signal term 的时候就开始返回500。

Network probe handler
knative组件用来 probe 的接口,在header里面会有区分。

Context handler
把header中的revision的name和namespace注入的context当中

Metric Handler
收集request的qps等metrics的信息。

Log Handler
请求日志

Tracing Handler
Trace spans

Cocurrency report Handler

记录请求信息,这些信息会被reporter上报

Activator handler

过一层 throttler 进行proxy,如果没受限制就会proxy request。

Throttler 会根据revID创建一个throttler,revision如果存在的话就会创建throttler。(revision肯定是一直在的哪怕没有起pod)如果超过revision的并发数就会退出。

Throttler 会try对应的revisionThrottler的pods然后转发过去。

controller

Controller 主要是对用户使用的几个CRD的同步:Service、Route、Revision、Configuration。

net-istio

Ingress 的一种实现

internal 的 ingress创建一个 ingress virtualservice 并且将gate指定为isito-gateway,其他的ingress实现其实类似,只是目前没有traefik的支持。knative有没有istio现在是没啥区别了。

domain mappings

一个用于扩展域名的CRD DomainClaim,会根据domainclaim创建一个ingress。

queue-proxy

tracing metrics breaker 都是正常操作。本质是个sidecar层面的反向代理。

Metrcis

Admin

有一个给cocurrency endpoint发 paused 和 resumed 的回调。

Main

Proxy handler 收集 request in 和 request out。

自定义默认域名的问题

kubectl get cm config-network -n knative-serving -o yaml 可以看到默认的模板去修改他

1
domain-template: "{{.Name}}.{{.Namespace}}.{{.Domain}}"

WASM 作为一种通用字节码可以让很多语言编译成 WASM 跑在沙箱VM环境当中:跑在浏览器的 V8 引擎上,让别的语言可以写前端、
构建成 Istio 的代理 Envoy 的 WASM 插件形成一些过滤器或者日志插件(有点像内核的 eBPF)、作为各种语言的浇水层(让Rust调用Go编译出的WASM)。

The major problem is that, whilst the Go compiler supports WebAssembly, it does not support WASI (WebAssembly System Interface). It generates an ABI that is deeply tied to JavaScript, and one needs to use the wasm_exec.js file provided by the Go toolchain, which doesn’t work outside a JavaScript host.

Go 的 WASM 目前没有支持 WASI,类似于没有 native 的系统调用,所以很多文件、网络的请求不能通过系统执行,目前只能跑在浏览器通过浏览器的接口执行。
TinyGo 是一个支持比较好的 WASM 的 Go 的运行时,有些需要WASI的项目就需要TinyGo来构建。

调度

WASM 是一个单线程的环境,js 是一个基于事件的模型,对于 goroutine 的调度是如何进行的,本着这个好奇研究了一下 Go 本身的 WASM 运行时。比如下面这段代码就明确了 Go 的 WASM 没有多线程也就没有跑 sysmon。
没有sysmon就相当于sysmon的forcegc触发gc的部分也没有,只能靠主动GC和内存分配的时候触发超过阈值开始gc。

1
2
3
4
5
6
7
8
9
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
// For runtime_syscall_doAllThreadsSyscall, we
// register sysmon is not ready for the world to be
// stopped.
atomic.Store(&sched.sysmonStarting, 1)
systemstack(func() {
newm(sysmon, nil, -1)
})
}

在这里回忆一下 GMP 的关系。P的数量由启动时环境变量 $GOMAXPROCS 或者是由 runtime 的方法 GOMAXPROCS() 决定。这意味着在程序执行的任意时刻都只有 $GOMAXPROCS 个 goroutine 在同时运行。
M的数量可以通过 runtime/debug 中的 SetMaxThreads 函数,设置 M 的最大数量一个 M 阻塞了,会创建新的 M。
M 与 P 的数量没有绝对关系,一个 M 阻塞,P 就会去创建或者切换另一个 M,所以,即使 P 的默认数量是 1,也有可能会创建很多个 M 出来。
在确定了 P 的最大数量 n 后,运行时系统会根据这个数量创建 n 个 P。
没有足够的 M 来关联 P 并运行其中的可运行的 G。比如所有的 M 此时都阻塞住了,而 P 中还有很多就绪任务,就会去寻找空闲的 M,而没有空闲的,就会去创建新的 M。

这个关系在对应的代码里面也有体现,在 osinit 的时候会强制将 P 设置为1,newosproc 也是空的,无法启动新进程。

1
2
3
4
5
func osinit() {
ncpu = 1
getg().m.procid = 2
physPageSize = 64 * 1024
}

没有 sysmon 也就没有异步抢占,只能靠 goroutine 之间的协作式抢占来切换 goroutine。

在 schedule 里面也有一段专用的调度逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// wasm only:
// If a callback returned and no other goroutine is awake,
// then wake event handler goroutine which pauses execution
// until a callback was triggered.
gp, otherReady := beforeIdle(now, pollUntil)
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
if otherReady {
goto top
}

等于是在 wasm 中在找不到 G 的时候会启动一个 pause goroutine 占住当前的 P。

Lock_js 有 handle event 的逻辑。

内存分配

通过 WASM 的内存分配接口代替 linux 里面的 mmap 接管过来做内存分配。

系统调用

系统调用是使用 js 实现的,而且也不是像 linux 那种完整一套系统调用这个可能需要 WASI 的支持。
没有系统调用 M 也不会阻塞,等于是一个完全的单 P 单 M 多 G 的模型。

1
2
3
var jsProcess = js.Global().Get("process")
var jsFS = js.Global().Get("fs")
var constants = jsFS.Get("constants")

目前实现的版本是参照go0.x的一个实现也就是只有一个m,在m上调度g,对应的是我们只用一个goroutine然后在goroutine上调度多个gogoroutine。在实现gogoroutine之前需要介绍一下go的调用惯例,这个文章讲的比较清楚。目前Golang在传递参数和返回值的时候是通过栈传递的。gcc会使用寄存器传递参数,golang有一个使用寄存器传递参数的提案。如果要实现一个goroutine上的协程可以利用这么一个调用惯例。

上下文切换

Go的上下文切换其实和操作系统的上下文切换比较类似,go的实现也是类似的,但相对来说比较简单,因为Go的调用惯例其他的寄存器在函数调用的时候是没有被使用的,主要是保存栈指针和一个指令寄存器PC就可以。Golang的抢占一开始是协作式的,入口是在函数调用的时候。在引入了异步抢占也就是让信号处理函数去切换到调度逻辑(这个切换的过程也类似后面讲到的gogo和gosave,但是他的入口是任何一个指令的地方都会发送所以要保存所有的寄存器)实现抢占以后就可以实现非协作的抢占了。

现在实现的是相对简单的协作式抢占。

0.x的代码是C写的,其中两个汇编函数实现的上下文的保存和切换。这里简单先补充一下Go中汇编的一些知识点。

函数定义:

1
 TEXT 包名·函数名(SB),一些标签,$栈帧大小-参数大小(包括返回值)

SP表示栈指针,AX是对应的ax系列的寄存器。
保存上下文的汇编函数如下,gobuf是一个结构体有两个指针成员分别是sp和pc。

1
2
3
4
5
6
7
TEXT gosave(SB), NOSPLIT, $0
MOVQ 8(SP), AX // 8(SP)是函数的第一个参数:gobuf的地址
MOVQ SP, 0(AX) // 保存SP也就是栈指针到 gobuf.sp 中。
MOVQ 0(SP), BX // 0(SP)是函数的返回地址
MOVQ BX, 8(AX) // 将函数的返回地址保存到 gobuf.pc 中。
MOVL $0, AX // return 0
RET

这段函数其实主要是保存了gosave调用时的栈指针,而返回地址就是gosave返回后的下一条指令的地址,返回值是0,这个0可以标记到这个函数是从gosave返回的,这个要结合后面的gogo来理解。
现在来看gogo

1
2
3
4
5
6
7
TEXT gogo(SB), 7, $0
MOVQ 8(SP), AX // 8(SP)是gobuf这个参数的地址
MOVQ 0(AX), SP // 将栈针修改为之前保存的SP
MOVQ 8(AX), AX // 获取PC
MOVQ AX, 0(SP) // 把 PC 放到0(SP)上
MOVL $1, AX // return 1
RET

gogo返回以后其实是返回到了之前gosave需要返回的地方,并且返回值是1。这里用寄存器做返回值是c里面的一种方式。所以如果一个gosave是返回0那么它是从真正的调用者那里返回的,如果返回的是1就是从gogo返回的,如果这里点理解了以后就可以实现一个上下文切换了。
对应Go的函数调用的版本就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
TEXT ·gogogo(SB), NOSPLIT, $0-16
MOVQ 8(SP), AX // gogobuf
MOVQ 0(AX), SP // restore SP
MOVQ 8(AX), AX
MOVQ AX, 0(SP) // put PC on the stack
MOVL $1, 16(SP) // return true
RET


TEXT ·gosave(SB), NOSPLIT, $0-16
MOVQ 8(SP), AX // gogobuf
MOVQ SP, 0(AX) // save SP
MOVQ 0(SP), BX
MOVQ BX, 8(AX) // save PC
MOVB $0, 16(SP) // return false
RET

这面的区别是参数gobuf和返回值用了16个字节,因为Go的调用是用栈的,之前说到过。
0(SP)就是返回地址,8(SP)是gobuf,16(SP)是true或者false的返回地址。

gogoroutine的创建

原本函数做的两件事情就是分配栈和指定PC,pc只要对于一个新的gogoroutine指向函数指针就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void
sys·newproc(int32 siz, byte* fn, byte* arg0)
{
byte *stk, *sp;
G *newg;

//prints("newproc siz=");
//sys·printint(siz);
//prints(" fn=");
//sys·printpointer(fn);

siz = (siz+7) & ~7;
if(siz > 1024)
throw("sys·newproc: too many args");

lock(&sched);

if((newg = gfget()) != nil){
newg->status = Gwaiting;
stk = newg->stack0;
}else{
newg = mal(sizeof(G));
stk = mal(4096);
newg->stack0 = stk;
newg->status = Gwaiting;
newg->alllink = allg;
allg = newg;
}

到这就是分配了一段给栈用的内存,内存的具体分配后面会有一个文章讲解。

1
2
3
		// 160 这个地方是一个约定
// 一些小函数会利用栈之外的160个字节进行优化
newg->stackguard = stk+160;

上面多留出来的地方是一个x86的约定需要给出一个redzone给一些小函数优化用的,不用分配栈直接去使用栈外的这个redzone来增加效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   // 栈是从高地址向低地址增长的,至于这个 4*8是留给什么的没搞清
sp = stk + 4096 - 4*8;
newg->stackbase = sp;
// 拷贝参数
sp -= siz;
mcpy(sp, (byte*)&arg0, siz);
// 函数结束时返回到 goexit 进行收尾工作
sp -= 8;
*(byte**)sp = (byte*)sys·goexit;
// 留给 gogo 的修改返回地址用的地方
// 相当于假装在gogo的地方返回到了fn的函数指针
sp -= 8; // retpc used by gogo
newg->sched.SP = sp;
newg->sched.PC = fn;

sched.gcount++;
goidgen++;
newg->goid = goidgen;

readylocked(newg);
unlock(&sched);

//prints(" goid=");
//sys·printint(newg->goid);
//prints("\n");
}

对应的go代码,Go当中获取函数地址的方式比较tricky,需要了解interface的layout,他是一个interface的头和一个interface包含的对象的地址。FuncPC就是通过转换获取这个函数地址,对应的实现可以在Go的源码找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func NewProc(f interface{}, args ...interface{}) {
pc := FuncPC(f)
stack := Malloc(1024)
sp := stack + 1024 - 4*8
*(*uintptr)(unsafe.Pointer(sp - 8)) = FuncPC(goexit) + 1
gogoRoutine := GoGoRoutine{}
gogoRoutine.Sched.PC = pc
gogoRoutine.Sched.SP = sp - 8 - 8
gogoRoutine.Stack = stack
globalgoid++
gogoRoutine.goid = globalgoid
gogoRoutine.status = _Grunnable
ggput(&gogoRoutine)
}

调度主体

0.x版本的逻辑其实很简单,通过if(gosave)的方式可以判断是从哪里跳过来的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Scheduler loop: find g to run, run it, repeat.
static void
scheduler(void)
{
G* gp;

// Initialization.
m->procid = getprocid();
lock(&sched);

if(gosave(&m->sched)){
// 这里的 gosave 返回的是 true
// 说明是通过 gogo 过来的
// 如果当前的 g 是 running 的话就保存上下文
// 切换成 runnable 放入到 queue 中
// 走出 if 去到调度逻辑。
// Jumped here via gosave/gogo, so didn'
// execute lock(&sched) above.
lock(&sched);

// Just finished running m->curg.
gp = m->curg;
gp->m = nil; // for debugger
switch(gp->status){
case Grunnable:
case Gdead:
// Shouldn't have been running!
throw("bad gp->status in sched");
case Grunning:
gp->status = Grunnable;
gput(gp);
break;
case Gmoribund:
gp->status = Gdead;
if(--sched.gcount == 0)
sys·exit(0);
break;
}
notewakeup(&gp->stopped);
}
// 真正的 gosave 是返回 false的。
// 这个地方是 gosave 的返回地址
// 也是 gogo 后 if 处理完的地方
// 在这里寻找合适的g然后运行。
// Find (or wait for) g to run. Unlocks sched.
gp = nextgandunlock();

noteclear(&gp->stopped);
gp->status = Grunning;
m->curg = gp;
gp->m = m; // for debugger
g = gp;
gogo(&gp->sched);
}

对应的go代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
   //go:noline
func schedule() {
if gosave(&sched.gg0.Sched) {
curgg := sched.curgg
switch curgg.status {
case _Grunnable:
panic("invalid status")
case _Grunning:
curgg.status = _Grunnable
ggput(curgg)
break
case _Gdead:
break
}
}
// 调度循环
for {
// println("find g")
gg := ggget()
if gg == nil {
time.Sleep(time.Second)
continue
}
gg.status = _Grunning
sched.curgg = gg
gogogo(&gg.Sched)
}

}

这样就能实现一个简单的单goroutine上跑多个gogoroutine的上下文切换了。

大规模的并发并行计算考虑的问题主要关于数据、计算的吞吐量还有容错。不管是设计者还是使用者,关心的是在期望上(因为可能会失败)运行任务的效率,从设计者的角度来讲要提供一个可靠高效的环境,从使用者的角度来讲这个东西需要足够的简单,简单意味着编程效率也保证了可靠性。大数据的整套技术栈在解决一些吞吐量大的批处理问题时得心应手,但是到了深度学习的场景,计算吞吐的要求反而提高了,比较现实点的情况是很多时候我们考虑的是能处理多大的数据量的日志,和多快能把一个ImageNet ResNet50训练完。这个区别在于 Spark 或者 Hadoop 的 MapReduce 的设计和 Tensorflow 以及 PyTorch 的设计理念不同以及对应的计算场景的不同。到了强化学习的场景又是一个大融合,强化学习的场景不光有迭代式的反向误差传播的计算,同时也包含了大规模的仿真环境的计算,Ray主要是在这方面提供了一个一站式的解决框架。分布式系统的问题某种程度上和操作系统的问题其实很类似,都是要考量如何从整个系统的角度充分利用物理资源,从错误中恢复,满足效率。这篇文章主要是比较一下几种主流的计算引擎或者框架或者说是算法的异同。

Spark(MapReduce)

我把Spark和MapReduce都归于大数据栈这一类,RDD(Resillient Distributed Dataset) 和 MapReduce
这两者的区别没有和MPI/PS的区别大那么多。

MapReduce 有 JobTracker,Spark 有 Driver

MapReduce 有 TaskTracker,Spark 有 Executor

MapReduce 中间结果是基于 HDFS,会落盘,Spark 中间结果是基于内存的,也可以落盘,主要是利用内存做缓存

MapReduce 计算抽象由Map和Reduce构成,Spark 的 RDD 有一系列的Transform和Action,封装程度更高

MapReduce 的错误处理比较简单,把失败的Map重试就好了,重试是一种非常好理解的错误处理。
Spark 的重试是根据 RDD 的有向无环图中的血缘关系计算的,可以理解为从失败的拓扑序上重新计算,也可以有中间的checkpoint。

RDD 的特性是只读的,在机器学习场景下参数不大的时候 MLLib 通过把参数存到 Driver 上来计算,当参数比较大特别是深度网络的参数大得吓人 Driver 存不下的时候,
只能通过新增 RDD,对于要频繁更新的模型参数要生成非常多的 RDD,这是 Spark 在深度学习上设计的缺陷。
一般来说一些简单的机器学习任务通过 sklearn 就能完成,当数据量比较大的时候就需要通过 Spark 的MLLib来处理。
当然 MLLib 现在也在开始从RDD-based转向SparkSQL用的Dataframe-based,从大的角度上讲两者互相融合是可行的,可能需要一些时间。
上 Spark 用 Yarn 调度 Tensorflow,还是用 Kuberenetes 调度 Spark 和 Tensorflow,我个人支持后者,而且这种分层是我比较喜欢的一种分层。

Ray

Ray 的基本抽象就是 Remote 或者 Actor,一个是分布式调用函数,一个式分布式调用类。Ray 和 Ray 的 RLlib 主要面对的问题是强化学习有大量的 simulating 的环境,比如仿真一局Dota,涉及到模拟一局Dota,反馈Agent的神经网络,是并行计算和神经网络训练的结合。当然 Ray 本身的抽象就是个分布式 goroutine,所以某种程度上可以完成的事情不光是强化学习一种任务,比如HypterTunning等一些并行计算的模型也是试用的。

反过来想,如果没有 Ray 的话,如何做这个系统呢,要构建大批量的仿真计算环境,然后能根据仿真的反馈训练神经网络。
这两个任务的调度控制就是一个问题,当然放到 k8s 的调度器里做似乎也可以,然后涉及这些分布式任务的同步问题,
需要构建这些任务的关系和信息传输,似乎用一些 DAG (比如 argo)的 workflow 也能解决,但他们之间通信的高效性似乎会是一个问题,需要
选择一种高效的远程调用传输方式,肯能gRPC也可以,还有他们的元数据管理用什么呢,自己搞个Redis似乎也行。
Ray 从这些方面综合考虑了这些问题提供了一个一站式的RL训练平台。

PS和MPI

MPI和PS的介绍有很多,我也不需要费篇幅唠叨。

PS和MPI是比较常用的分布式深度学习的训练方式,两者的主要区别在于Paramater Server,在PS的场景下参数统一走一个或者shard多个PS更新。

在MPI的场景下每个Worker是对等的(或者分层级对等,比如主机上的四张卡走NVLink,主机之间走万兆网卡)工作节点,使用AllReduce参数的同步在Worker之间进行。

总结

MPI在我出生前应该就有了,但是MapReduce之所以能火起来主要还是在 fault-tolerance 上,MPI的抽象比较基础,但是MapReduce和Spark在廉价大集群上的表现非常亮眼,对于没那么并行化理想的场景能够tolerate,时间到了深度学习的场景这玩意儿又冒出来了,一方面是深度学习的计算资源好得不得了,因为配备GPU的机器和传统的机器比起来好很多,对于这种纯粹的并行计算框架来说非常友好,错误处理的问题就没那么严重,即使是这样也慢慢开始有人着手优化MPI/PS的fault tolerance,在一些并行化退化的场景下能够把训练并行度降级不至于完全失败的工作。

从长远来看 Ray 还会有很多进化的空间,Spark 也会更好地适配深度学习场景,深度学习本身在System上的优化也层出不穷,大家对于大规模的并发并行计算系统的方方面面的要求都会越来越高。

贝叶斯优化(BO)

贝叶斯优化是一种黑盒优化方法,一般有几个特征:

输入纬度不大,一般小于 20 个,时间复杂度是$O(n^3)$
有设定的取值空间
目标函数需要是连续的(我觉得这个好像不是必须的,离散的也可以)
目标函数的计算非常消耗成本(时间等等)
目标函数是黑盒的,没有明确的结构
目标函数(derivative-free)没有一阶二阶导数(不然就可以用梯度下降去算了)

因为这些特性,在做机器学习的超参数调优的时候特别合适。

BayesOpt consists of two main components: a Bayesian statistical model for modeling the objective function, and an acquisition function for deciding where to sample next.

贝叶斯优化主要是要两个部分,一个是统计模型比如高斯过程(GP),一个是采样函数(AC)决定下一个样本从拿里获取。

高斯过程(GP)

高斯过程是贝叶斯优化中的一种统计模型,先抛开具体的数学问题,简单讲一下就是,假设我在一个变量空间(比如多个超参数)采样了一个目标函数(我们训练结果的 evaluation),然后我们会得到一个后验分布,这就类似条件概率里面,我们知道了某个事情发生以后,如果随机变量是相关的,我们有更大的概率确定其他变量的分布。例如下图,绿色是我们的分布空间,每次采样以后,分布的空间就会缩小,进而接近我们的曲线。

从数学上描述这件事情就需要高斯分布和高斯过程了。高斯分布我相信大家都耳熟能详,他由平均值 $\mu$ 还有标准差 $\sigma$ 决定分布。高斯过程的本质是一个多元高斯分布,只不过他是无限空间上的,定义高斯过程需要一个核函数定义他的协方差矩阵,也就是一个矩阵定义多个随机变量的相关性。当然了,有限的离散点用协方差矩阵可以,如果取值是连续的就需要核函数描述这个“无限”的协方差矩阵了。

协方差的核函数就很多选项,A Visual Exploration of Gaussian Processes 提供了一个非常 intuitive 的可视化来解释 GP 和各种描述协方差矩阵的核函数。

高斯过程被定义为一个随机过程,相当于每个样本点自己也是个随机函数,对于高斯分布来说,每个样本点也是一个高斯分布函数就是高斯过程,cs4780 的 Lecture 15有非常详细的解释。

Definition: A GP is a (potentially infinte) collection of random variables (RV) such that the joint distribution of every finite subset of RVs is multivariate Gaussian:
$ f \sim GP(\mu, k), $
where $\mu(\mathbf{x})$ and $k(\mathbf{x}, \mathbf{x}’)$ are the mean resp. covariance function! Now, in order to model the predictive distribution $P(f_* \mid \mathbf{x}_*, D)$ we can use a Bayesian approach by using a GP prior: $P(f\mid \mathbf{x}) \sim \mathcal{N}(\mu, \Sigma)$ and condition it on the training data $D$ to model the joint distribution of $f=f(X)$ (vector of training observations) and $f_* = f(\mathbf{x}_*)$ (prediction at test input).

采样函数

采样函数一般有 expected improvement(EI),当然还有 probability improvement(PI), upper confidence bound(UCB), knowledge gradient(KG),entropy search and predictive entropy search 等等。
采样的策略有两种:
Explore:探索新的点,这种采样有助于估计更准确的;
Exploit:利用已有结果附近的点进行采样,从而希望找到更大的;

这两个标准是互相矛盾的,如何在这两者之间寻找一个平衡点可以说是采样函数面对的主要挑战。

Expected improvement is a popular acquisition function owing to its good practical performance and an analytic form that is easy to compute. As the name suggests it rewards evaluation of the objective $f$ based on the expected improvement relative to the current best. If $f^* = \max_i y_i$ is the current best observed outcome and our goal is to maximize $f$, then EI is defined as

$\text{EI}(x) = \mathbb{E}\bigl[\max((f(x) - f^*), 0)\bigr]$

在 Facebook 的 Ax 在这里提到使用了 PyTorch 的一个 BoTorch 用的就是 EI,就是期望的增加度。

高斯分布式的期望就是 $\mu$ 所以是很好算的,$f^*$ 是已知的定值,这里有一个计算的推导。

BoTorch 提供了一个优化的 EI 叫 Noisy EI,主要功能是抗噪。

The above definition of the EI function assumes that the objective function is observed free of noise. In many types of experiments, such as those found in A/B testing and reinforcement learning, the observations are typically noisy. For these cases, BoTorch implements an efficient variant of EI, called Noisy EI, which allow for optimization of highly noisy outcomes, along with any number of constraints (i.e., ensuring that auxiliary outcomes do not increase or decrease too much). For more on Noisy EI, see our blog post.

这是采样的过程的一个示例:

应用

高斯过程本身可以用来回归和分类,使用高斯过程的贝叶斯优化有很多具体的应用场景,除了超参数优化之外,对于网络结果(层数,数据并行度)等等也是可以使用的。
除此之外像 horovod 也使用了贝叶斯优化,在这个目录下面

Horovod comes with several adjustable “knobs” that can affect runtime performance, including –fusion-threshold-mb and –cycle-time-ms (tensor fusion), –cache-capacity (response cache), and hierarchical collective algorithms –hierarchical-allreduce and –hierarchical-allgather.

他主要是服务于一些 Tensor Fusion 和 response cache 等参数以及层级 collective 通信选择,文档提到了通过设置一些参数细致控制调优的过程。

参考

  1. 一个例子搞清楚(先验分布/后验分布/似然估计)
  2. A Tutorial on Bayesian Optimization
  3. Practical Bayesian Optimization of Machine Learning Algorithms
  4. Awesome-AutoML-Papers
  5. Gaussian Processes for Machine Learning

背景

Horovod 是一个兼容主流计算框架的分布式机器学习训练框架,主要基于的算法是 AllReduce,这个是 baidu-research 在17年做的一个实现,这个东西原来是高性能计算范畴里的东西应用了 MPI 并行计算接口来实现,这是并行计算里的一个框架,已经很老了,这里有一个介绍 MPI 的 tutorial 写的比较好。

在介绍 horovod 的之前需要解释一下 AllReduce。在 MapReduce 里面 reduce 被翻译成了规约,在上面提到的 MPI tutorial 里面的解释是

Reduce is a classic concept from functional programming. Data reduction involves reducing a set of numbers into a smaller set of numbers via a function. For example, let’s say we have a list of numbers [1, 2, 3, 4, 5]. Reducing this list of numbers with the sum function would produce sum([1, 2, 3, 4, 5]) = 15. Similarly, the multiplication reduction would yield multiply([1, 2, 3, 4, 5]) = 120.

就是说把一个大的集合“缩减”成了小的集合,这里要注意的是这种缩减的计算是要满足交换律的,也就是减法或者除法是不行的,因为在并行计算当中不太好去控制计算的顺序。Reduce 就是这个意思,具体到 MPI_Reduce 就是把不同节点的数字“缩减”到一个节点上,支持的计算方式有加法乘法和取大小值等。

教程中给出的 Reduce 是求和。

AllReduce 就是在每个节点都获得 Reduce 的结果

基于这个标准就有很多的 All-Reduce 的实现,比如 Ring-Reduce,这个实现分两部分,一部分是 Scatter-Reduce 另一部分是 All-Gather。最早是在这篇 post里提到的。这个算法的好处是可以摆脱之前 PS 非常依赖 Parameter-Server 的带宽,Parameter-Server 的带宽会成为计算瓶颈的问题,而 AllReduce 可以让每个节点在带宽传输中的位置是对等的,并且减少传输次数。具体的算法可以看文章的解释,scatter-reduce 就是让每个节点有 K/N 的一个 reduce(也就是 sum),然后把自己的一个 K/N 的 reduce 再传递给其他节点,每个节点只和自己相邻的节点通信。

In the system we described, each of the N GPUs will send and receive values N-1 times for the scatter-reduce, and N-1 times for the allgather. Each time, the GPUs will send K / N values, where K is the total number of values in array being summed across the different GPUs. Therefore, the total amount of data transferred to and from every GPU is

Data Transferred=2(N−1)KN

数据传输量在 N 比较大的时候越没有影响,这就消弭了多节点给 Parameter-Server 造成的瓶颈。

还有一些其他术语,假设有 4 台 4 卡的 GPU 服务器。size 是工作进程(GPU)的数量(6),rank 是所有工作进程的 id(0-15),local rank 是当前服务器上的 id(0-3)。

Horovod 的介绍

使用 horovod 有一定的侵入性,代码需要一定的修改才能变成适配分布式训练,但是有一个好处就是适配的成本不高,并且 horovod 提供的各种框架的支持可以让 horovod 比较好的在各个框架的基础上使用,他支持 tensorflow/keras/mxnet/pytorch,MPI 的实现也有很多,比如 OpenMPI 还有 Nvidia 的 NCCL,还有 facebook 的 gloo,他们都实现了一种并行计算的通信和计算方式。而且 horovod 的本身的实现也很简单。

使用

Keras 用 ResNet50 训练 ImageNet 为例,主要侵入了几部分 hvd.init() 这个是 MPI 的初始化,让并行进程能够知道自己的 rank/local_rank 等信息。

第二部根据 local_rank(相当于单节点上的第n张卡),并且设置不占用全部显存,按需分配(可能因内没有统一管理导致显存碎片),然后传递给 keras 设置 session。

1
2
3
4
5
# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))

然后在 rank 0 上恢复一个 checkpoint 并且广播给其他节点,这里的 broadcast 后面会介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
resume_from_epoch = try_epoch
break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(resume_from_epoch, 0, name='resume_from_epoch')

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

设定传输的压缩函数,具体的压缩后面会提到,然后要么从之前的模型恢复要么重新训练。关键的 wrapper 在 opt 上,会给本地的 opt 包装一个 DistributedOptimizer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast both model and optimizer weights
# to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
model = hvd.load_model(args.checkpoint_format.format(epoch=resume_from_epoch),
compression=compression)
else:
# ResNet-50 model that is included with Keras is optimized for inference.
# Add L2 weight decay & adjust BN settings.
model_config = model.get_config()
for layer, layer_config in zip(model.layers, model_config['layers']):
if hasattr(layer, 'kernel_regularizer'):
regularizer = keras.regularizers.l2(args.wd)
layer_config['config']['kernel_regularizer'] = \
{'class_name': regularizer.__class__.__name__,
'config': regularizer.get_config()}
if type(layer) == keras.layers.BatchNormalization:
layer_config['config']['momentum'] = 0.9
layer_config['config']['epsilon'] = 1e-5

model = keras.models.Model.from_config(model_config)

# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.SGD(lr=args.base_lr * hvd.size(),
momentum=args.momentum)

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt, compression=compression)

model.compile(loss=keras.losses.categorical_crossentropy,
optimizer=opt,
metrics=['accuracy', 'top_k_categorical_accuracy'])

然后设置一些回调函数,hvd.callbacks.BroadcastGlobalVariablesCallback(0) 保证的是 rank 0 上的所有参数只在 rank 0 初始化,然后广播给其他节点,后面是学习率 decay 的设置和一些统计信息的回调打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),

# Horovod: average metrics among workers at the end of every epoch.
#
# Note: This callback must be in the list before the ReduceLROnPlateau,
# TensorBoard, or other metrics-based callbacks.
hvd.callbacks.MetricAverageCallback(),

# Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=args.warmup_epochs, verbose=verbose),

# Horovod: after the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
hvd.callbacks.LearningRateScheduleCallback(start_epoch=args.warmup_epochs, end_epoch=30, multiplier=1.),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=30, end_epoch=60, multiplier=1e-1),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=60, end_epoch=80, multiplier=1e-2),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=80, multiplier=1e-3),
]

最后直接用 allreduce 计算一个 evaluation score。

1
2
# Evaluate the model on the full data set.
score = hvd.allreduce(model.evaluate_generator(input_fn(False, args.train_dir, args.val_batch_size),NUM_IMAGES['validation']))

实现

适配层和压缩算法

horovod 的实现主要分几部分,第一部分是一个适配层,用于兼容各种框架,比如 tensorflow 的适配就是实现一个新的 Op,这个可以参考 add new op,里面规范了 Tensorflow 自定义算子的实现。

请注意,生成的函数将获得一个蛇形名称(以符合 PEP8)。因此,如果您的操作在 C++ 文件中命名为 ZeroOut,则 Python 函数将称为 zero_out。

C++ 的定义是驼峰的,生成出来的 python 函数是下划线小写的,所以最后对应的是,适配Op的代码在 horovod/tensorflow 目录下面

C++ Python
HorovodAllgather horovod_allgather
HorovodAllreduce horovod_allreduce
HorovodBroadcast horovod_broadcast

另外在适配层可以加入一些压缩算法(在 horovod/[framework]/compression.py),我觉得压缩算法和框架无关的,放到适配层下面可能有别的原因,比如 tensorflow 默认带了一个 float16 压缩,具体的其他压缩算法比如3LC,可以通过有损压缩或者无损压缩提高带宽利用率。

统一层

这一层的实现是统一的,所有的适配层最后都是发出一些 Op+Tensor 的 Message 到队列中,后台初始化的时候会有一个专门的线程专门消费这个队列。他有一个同步消息的过程,相当于这个 tensor 在所有节点上都就绪以后就可以开始计算了,主体的流程是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// The coordinator currently follows a master-worker paradigm. Rank zero acts
// as the master (the "coordinator"), whereas all other ranks are simply
// workers. Each rank runs its own background thread which progresses in ticks.
// In each tick, the following actions happen:
//
// a) The workers send a Request to the coordinator, indicating what
// they would like to do (which tensor they would like to gather and
// reduce, as well as their shape and type). They repeat this for every
// tensor that they would like to operate on.
//
// b) The workers send an empty "DONE" message to the coordinator to
// indicate that there are no more tensors they wish to operate on.
//
// c) The coordinator receives the Requests from the workers, as well
// as from its own TensorFlow ops, and stores them in a [request table]. The
// coordinator continues to receive Request messages until it has
// received MPI_SIZE number of empty "DONE" messages.
//
// d) The coordinator finds all tensors that are ready to be reduced,
// gathered, or all operations that result in an error. For each of those,
// it sends a Response to all the workers. When no more Responses
// are available, it sends a "DONE" response to the workers. If the process
// is being shutdown, it instead sends a "SHUTDOWN" response.
//
// e) The workers listen for Response messages, processing each one by
// doing the required reduce or gather, until they receive a "DONE"
// response from the coordinator. At that point, the tick ends.
// If instead of "DONE" they receive "SHUTDOWN", they exit their background
// loop.

简单来讲就是说 coordinator 集 size 个 request DONE,然后找出就绪的 tensor (在 message_table 里面查找)构造出一个 read_to_reduce 的列表,然后发出 size 个 request 告知进程进行计算,然后 worker 接受到 response 开始真正的计算过程(通过 op_manager 具体执行)。

这是整体同步的过程,如果打开 horovod 的 trace log(HOROVOD_LOG_LEVEL=trace) 就能看到同步的过程。horovod 的主要 Op 除了 AllReduce 之外还有 allgather 和 broadcast。

算子实现层

具体的 op 在 common/op 可以看到有 NCCL/Gloo/MPI 等等的,这些由 op_manager 管理,他会根据优先级找到可以用来计算的 op 进行计算,比如 MPI 用的就是 MPI_Allreduce,具体 scatter-gather 和 all-gather openMPI 有现成的实现,NCCL 就直接调用 ncclAllReduce,比较新的 nccl 也支持跨节点的 allreduce 了,不用自己再套一层。

除了 allreduce 之外,还有两个比较重要的算子。

allgather 主要是比 allreduce 少一层 reduce,所有数据被发送到所有进程就可以。allreduce 的第二步就是把每个进程的 scatter-reduce 的 reduce 结果发送到所有进程。

broadcast 的作用是一对多的广播,主要是把初始化的参数同步给其他进程的时候使用。