ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

cgroup 的整体结构

cgroup 是容器当中对资源进行限制的机制,完整的名称是叫 control group。经常提到的 hierarchy 对应的是一个层级,而subsystem 对应的是一个子系统,都是可以望文生意的。创建一个层级是通过挂载完成的,也就是说层级对应的是文件系统 root 目录的结构。

子系统目前有下列几种

  1. devices 设备权限
  2. cpuset 分配指定的 CPU 和内存节点
  3. cpu 控制 CPU 占用率
  4. cpuacct 统计 CPU 使用情况
  5. memory 限制内存的使用上限
  6. freezer 暂停 Cgroup 中的进程
  7. net_cls 配合 tc(traffic controller)限制网络带宽
  8. net_prio 设置进程的网络流量优先级
  9. huge_tlb 限制 HugeTLB 的使用
  10. perf_event 允许 Perf 工具基于 Cgroup 分组做性能检测

创建层级通过 mount -t cgroup -o subsystems name /cgroup/name,/cgroup/name 是用来挂载层级的目录(层级结构是通过挂载添加的),-o 是子系统列表,比如 -o cpu,cpuset,memory,name 是层级的名称,一个层级可以包含多个子系统,如果要修改层级里的子系统重新 mount 即可。子系统和层级之间满足几个关系。

  1. 同一个 hierarchy 可以附加一个或多个 subsystem
  2. 一个 subsystem 可以附加到多个 hierarchy,当且仅当这些 hierarchy 只有这唯一一个 subsystem
  3. 系统每次新建一个 hierarchy 时,该系统上的所有 task 默认构成了这个新建的 hierarchy 的初始化 cgroup,这个 cgroup 也称为 root cgroup。对于你创建的每个 hierarchy,task 只能存在于其中一个 cgroup 中,即一个 task 不能存在于同一个 hierarchy 的不同 cgroup 中,但是一个 task 可以存在在不同 hierarchy 中的多个 cgroup 中。如果操作时把一个 task 添加到同一个 hierarchy 中的另一个 cgroup 中,则会从第一个 cgroup 中移除

/proc/self 对应的是当前进程的 proc 目录,比如当前进程 pid 是1,那么/proc/1/proc/self是等价的。运行man proc可以看到/proc/self/cgroup的解释。

/proc/[pid]/cgroup (since Linux 2.6.24)
This file describes control groups to which the process/task belongs. For each cgroup hierarchy there is one entry
containing colon-separated fields of the form:

5:cpuacct,cpu,cpuset:/daemons

The colon-separated fields are, from left to right:

1. hierarchy ID number

2. set of subsystems bound to the hierarchy

3. control group in the hierarchy to which the process belongs

This file is present only if the CONFIG_CGROUPS kernel configuration option is enabled.

这个展示的是当前进程属于的 control groups, 每一行是一排 hierarchy,中间是子系统,最后是受控制的 cgroup,可以通过这个文件知道自己所属于的cgroup。

cgroup 的创建

创建一个独立的 cgroup 则是在层级结构下面创建一个目录。

先看一下创建目录做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static int cgroup_mkdir(struct inode *dir, struct dentry *dentry, int mode)
{
struct cgroup *c_parent = dentry->d_parent->d_fsdata;

/* the vfs holds inode->i_mutex already */
return cgroup_create(c_parent, dentry, mode | S_IFDIR);
}

static long cgroup_create(struct cgroup *parent, struct dentry *dentry,
mode_t mode)
{
/* 获取父cgroup对应的层级hierarchy */
/* 这里叫做 cgroupfs 其实是匹配的,因为创建 hierachy 就是 mount 了一个文件系统的动作 */
struct cgroupfs_root *root = parent->root;
/* 初始化一个cgroup结构 cgrp */
init_cgroup_housekeeping(cgrp);
cgrp->parent = parent; /* 设置父 cgroup */
cgrp->root = parent->root; /* 继承 parent 的 hierachy */
cgrp->top_cgroup = parent->top_cgroup; /* 继承父 parent 对应 hierachy 的root cgroup */
/* 继承父 parent 的 notify_on_release 设置 */
if (notify_on_release(parent))
set_bit(CGRP_NOTIFY_ON_RELEASE, &cgrp->flags);
/* 对所属的hierachy的子系统进行初始化 */
for_each_subsys(root, ss) {
struct cgroup_subsys_state *css = ss->create(ss, cgrp);
if (IS_ERR(css)) {
err = PTR_ERR(css);
goto err_destroy;
}
init_cgroup_css(css, ss, cgrp);
if (ss->use_id)
if (alloc_css_id(ss, parent, cgrp))
goto err_destroy;
/* At error, ->destroy() callback has to free assigned ID. */
}
/* 加入到父 cgroup 的子列表里 */
cgroup_lock_hierarchy(root);
list_add(&cgrp->sibling, &cgrp->parent->children);
cgroup_unlock_hierarchy(root);
/* 创建 cgroup 目录 */
cgroup_create_dir(cgrp, dentry, mode);
/* 创建目录下对应的文件,比如common的部分(tasks),或者子系统的部分(cpu.shares,freezer.state)*/
cgroup_populate_dir(cgrp);

看一下 cgroup_subsys_state->create 的实现,举个例子比如kernel/cpuset.ccpuset子系统的创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static struct cgroup_subsys_state *cpuset_create(
struct cgroup_subsys *ss,
struct cgroup *cont)
{
struct cpuset *cs;
struct cpuset *parent;

if (!cont->parent) {
return &top_cpuset.css;
}
parent = cgroup_cs(cont->parent);
cs = kmalloc(sizeof(*cs), GFP_KERNEL);
if (!cs)
return ERR_PTR(-ENOMEM);
if (!alloc_cpumask_var(&cs->cpus_allowed, GFP_KERNEL)) {
kfree(cs);
return ERR_PTR(-ENOMEM);
}

cs->flags = 0;
if (is_spread_page(parent))
set_bit(CS_SPREAD_PAGE, &cs->flags);
if (is_spread_slab(parent))
set_bit(CS_SPREAD_SLAB, &cs->flags);
set_bit(CS_SCHED_LOAD_BALANCE, &cs->flags);
cpumask_clear(cs->cpus_allowed);
nodes_clear(cs->mems_allowed);
fmeter_init(&cs->fmeter);
cs->relax_domain_level = -1;

cs->parent = parent;
number_of_cpusets++;
return &cs->css ;
}

其实就是一系列 cgroup 初始化动作,填充目录的部分作为接口留给子系统实现。

总结一下:

hierarchy 对应的是 cgroup 的一个根,拥有一个top_cgroup,之后 hierarchy 下面的目录(cgroup)都是继承这些内容。

真正起作用的入口其实是对文件的读写操作,关于这一块VFS的内容可以看一下我之前的博客,这也是由子系统实现的,接下来看看子系统的实现。

freezer 子系统

freeze tasks 的相关内容可以在内核文档当中找到,简单来说是为了提供一种机制能够让进程挂起。这些函数在电源控制里面有很多用到的地方,比如我们常说的挂起,就是让所有进程进入冬眠状态。

首先看这个子系统是因为它比较简单,属性比较少,实现的代码也比较少。

这里铺垫一些知识,说明内核是如何睡眠和唤醒进程的。
一般内核进程进入睡眠需要进入wait_queue,然后调用 schedule。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* wait 是我们想要让 task 睡眠的 queue entry, q 是等待队列 */
DEFINE_WAIT(wait);
/* 添加到等待队列中 */
add_wait_queue(q, &wait);

/*
* 这里要检查condition是因为可能在唤醒之后这个condition的条件又不成立了,
* 这个和条件变量一样,即使条件满足被wake up了,也可能被其他进程修改了该条件.
*/
while (!condition) {
prepare_to_wait(&q, &wait, TASK_INTERRUPTIBLE);
if (signal_pending(current))
/* 处理信号 */
/* 进行调度 */
schedule();
}

finish_wait(&g, &wait);

内核当中进入睡眠都是这个模板,这里的 schedule,会触发调度器遍历 scheduler class,选择优先级最高的调度类。
一般就是CFS(Complete Fair Scheduler),接下来会进行 context 切换,注意不是像一般理解的那样,函数调用增长栈空间,而是把栈和寄存器都换掉,刷掉缓存等等,由此进入另外一个进程的上下文。直到被唤醒从恢复保存的 IP 重新开始执行。

唤醒的过程则是,调用wake_up()函数把 task 的状态重新设置为 TASK_RUNNING,并且把task从等待队列移除。
它会使用 enqueue_task() 把任务从新加入到调度器中。如果是 CFS 调度器的话就是加入到红黑树中,当need_resched设置了的话会显式调用调用schedule()调度,不然还会继续执行唤醒者的上下文。

然后说一下Freezing of tasks,就是通过发送信号唤醒用户态的进程和内核进程,所有这些进程需要响应这个信号并且最后调用 refrigerator() 进入睡眠,也就上面的那个循环。
下图是进入冬眠进程的过程.

“冰箱”这个函数名称很形象,就是把当前 task 丢入睡眠状态直到解封。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* Refrigerator is place where frozen processes are stored :-). */
void refrigerator(void)
{
/* Hmm, should we be allowed to suspend when there are realtime
processes around? */
long save;

task_lock(current);
if (freezing(current)) {
frozen_process();
task_unlock(current);
} else {
task_unlock(current);
return;
}
save = current->state;
pr_debug("%s entered refrigerator\n", current->comm);

spin_lock_irq(&current->sighand->siglock);
recalc_sigpending(); /* We sent fake signal, clean it up */
spin_unlock_irq(&current->sighand->siglock);

/* prevent accounting of that task to load */
current->flags |= PF_FREEZING;

for (;;) {
set_current_state(TASK_UNINTERRUPTIBLE);
if (!frozen(current))
break;
schedule();
}

/* Remove the accounting blocker */
current->flags &= ~PF_FREEZING;

pr_debug("%s left refrigerator\n", current->comm);
__set_current_state(save);
}

所以 freezer subsystem 干的事情就是这样一件事情,把 cgroup 中的进程进行挂起和恢复。现在具体看一下实现。

1
2
3
4
5
6
7
8
9
10
11
enum freezer_state {           
CGROUP_THAWED = 0,
CGROUP_FREEZING,
CGROUP_FROZEN,
};

struct freezer {
struct cgroup_subsys_state css;
enum freezer_state state;
spinlock_t lock; /* protects _writes_ to state */
};

freezer 有三种状态,THAWED,FREEZING,FROZEN,分别代表正常状态,停止中和停止。

freezer 对应的文件有state,cftype是对vfs的file结构的一个封装,最后加上子系统的name,文件名对应的就是”freezer.state”。
freezer.state更改文件内容的操作就可以更改cgroup当中task的挂起和恢复.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* 文件的读写函数 */
static struct cftype files[] = {
{
.name = "state",
.read_seq_string = freezer_read,
.write_string = freezer_write,
},
};

/* 添加子系统文件到cgroup目录中 */
static int freezer_populate(struct cgroup_subsys *ss, struct cgroup *cgroup)
{
if (!cgroup->parent)
return 0;
return cgroup_add_files(cgroup, ss, files, ARRAY_SIZE(files));
}

首先看一下freezer_read.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int freezer_read(struct cgroup *cgroup, struct cftype *cft,
struct seq_file *m)
{
struct freezer *freezer;
enum freezer_state state;

if (!cgroup_lock_live_group(cgroup))
return -ENODEV;

freezer = cgroup_freezer(cgroup);
spin_lock_irq(&freezer->lock);
state = freezer->state;
if (state == CGROUP_FREEZING) {
/* We change from FREEZING to FROZEN lazily if the cgroup was
* only partially frozen when we exitted write. */
update_freezer_state(cgroup, freezer);
state = freezer->state;
}
spin_unlock_irq(&freezer->lock);
cgroup_unlock();

seq_puts(m, freezer_state_strs[state]);
seq_putc(m, '\n');
return 0;
}

整个函数就是把 freezer 的 state 转换成字符换然后读取出来。

再看下 freezer_write 是如何改变进程状态的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int freezer_write(struct cgroup *cgroup,
struct cftype *cft,
const char *buffer)
{
int retval;
enum freezer_state goal_state;

if (strcmp(buffer, freezer_state_strs[CGROUP_THAWED]) == 0)
goal_state = CGROUP_THAWED;
else if (strcmp(buffer, freezer_state_strs[CGROUP_FROZEN]) == 0)
goal_state = CGROUP_FROZEN;
else
return -EINVAL;

if (!cgroup_lock_live_group(cgroup))
return -ENODEV;
retval = freezer_change_state(cgroup, goal_state);
cgroup_unlock();
return retval;
}

其实只是把写入的字符串转换成对应的枚举类型,然后调用freezer_change_state(cgroup, goal_state);

为了不贴过多的代码,这里略写,其实是根据类型不同进行调用了unfreeze_cgrouptry_to_freeze_cgroup

try_to_freeze_cgroup 遍历每个task执行freeze操作,而unfreeze也是类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int try_to_freeze_cgroup(struct cgroup *cgroup, struct freezer *freezer)
{
struct cgroup_iter it;
struct task_struct *task;
unsigned int num_cant_freeze_now = 0;

freezer->state = CGROUP_FREEZING;
cgroup_iter_start(cgroup, &it);
while ((task = cgroup_iter_next(cgroup, &it))) {
/* 尝试freeze task */
if (!freeze_task(task, true))
continue;
if (is_task_frozen_enough(task))
continue;
if (!freezing(task) && !freezer_should_skip(task))
num_cant_freeze_now++;
}
cgroup_iter_end(cgroup, &it);

return num_cant_freeze_now ? -EBUSY : 0;
}

所以,这里我们看最后的freeze和unfreeze某个task的动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
bool freeze_task(struct task_struct *p, bool sig_only)
{
/*
* We first check if the task is freezing and next if it has already
* been frozen to avoid the race with frozen_process() which first marks
* the task as frozen and next clears its TIF_FREEZE.
*/
if (!freezing(p)) {
rmb();
/* 如果frozen标记了
* 说明已经冻结,就返回失败
*/
if (frozen(p))
return false;

if (!sig_only || should_send_signal(p))
set_freeze_flag(p);
else
return false;
}

if (should_send_signal(p)) {
if (!signal_pending(p))
fake_signal_wake_up(p);
} else if (sig_only) {
return false;
} else {
wake_up_state(p, TASK_INTERRUPTIBLE);
}

return true;
}

停止的方式就是通过标记 freeze_flag,然后通过发送信号或者唤醒 task 来处理TIF_FREEZE标记(取决于是否设置了PF_FREEZER_NOSIG)。
最后又回到了之前给的那张流程图,等处理函数运行又会try_to_freeze()检查信号或者标志位,然后进入冰箱,而唤醒的方式则是反过来,把标记清除并且wake_up进程即可。

cpu 子系统

cpu子系统是对CPU时间配额进行限制的子系统,属性在这里列举一下

  • cpu.cfs_period_us 完全公平调度器的调整时间配额的周期
  • cpu.cfs_quota_us 完全公平调度器的周期当中可以占用的时间
  • cpu.stat 统计值
    • nr_periods 进入周期的次数
    • nr_throttled 运行时间被调整的次数
    • throttled_time 用于调整的时间
  • cpu.share cgroup中cpu的分配,如果a group是100,b group是300,那么a就会获得\(\frac{1}{4}\),b就会获得\(\frac{3}{4}\)的CPU。

CFS 调度器

接下来看一下对于 CPU 的限制是如何做到的,这要补充一下 CFS(完全公平调度器) 的相关的内容

CFS 保证进程之间完全公平获得 CPU 的份额,和我们传统的操作系统的时间片的理念不同,CFS 计算进程的 vruntime (其实就是总时间中的比例,并且带上进程优先级作为权重),来选择需要调度的下一个进程。用户态暴露的权重就是nice值,这个值越高权重就会低,反之亦然。(坊间的解释是 nice 的意思就是我对别的进程很 nice ,所以让别的进程多运行一会儿,自己少运行一会儿)。

CFS主要有几点,时间计算,进程选择,调度入口。

时间计算

先看下面这句话

Linux is a multi-user operating system. Consider a scenario where user A spawns ten tasks and user B spawns five. Using the above approach, every task would get ~7% of the available CPU time within a scheduling period. So user A gets 67% and user B gets 33% of the CPU time during their runs. Clearly, if user A continues to spawn more tasks, he can starve user B of even more CPU time. To address this problem, the concept of “group scheduling” was introduced in the scheduler, where, instead of dividing the CPU time among tasks, it is divided among groups of tasks.

总结来说 CPU 的时间并不是分给独立的 task 的,而是分给 task_group 的,这样防止用户 A 的进程数远远大于 B 而导致 B 饥饿的情况。这一组task通过 sched_entity 来表示。能够导致进程分组的方式一种是把进程划入一个cgroup,一种是通过set_sid()系统调用的新session中创建的进程会自动分组,这需要CONFIG_SCHED_AUTOGROUP编译选项开启。

调度的粒度是以sched_entity为粒度的,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
struct sched_entity {
struct load_weight load; /* for load-balancing */
struct rb_node run_node;
struct list_head group_node;
unsigned int on_rq;

u64 exec_start;
u64 sum_exec_runtime;
u64 vruntime;
u64 prev_sum_exec_runtime;

u64 nr_migrations;

#if defined(CONFIG_SMP) && defined(CONFIG_FAIR_GROUP_SCHED)
/* Per-entity load-tracking */
struct sched_avg avg;
#endif
#ifdef CONFIG_SCHEDSTATS
struct sched_statistics statistics;
#endif

#ifdef CONFIG_FAIR_GROUP_SCHED
struct sched_entity *parent;
/* rq on which this entity is (to be) queued: */
struct cfs_rq *cfs_rq;
/* rq "owned" by this entity/group: */
struct cfs_rq *my_q;
#endif
};

每个调度实体都有两个cfs_rq结构

1
2
3
4
5
6
7
   struct cfs_rq {
struct load_weight load;
unsigned long runnable_load_avg;
unsigned long blocked_load_avg;
unsigned long tg_load_contrib;
/* ... */
};

Each scheduling entity may, in turn, be queued on a parent scheduling entity’s run queue. At the lowest level of this hierarchy, the scheduling entity is a task; the scheduler traverses this hierarchy until the end when it has to pick a task to run on the CPU.

最底层的调度实体就是进程,而每个调度实体还会有两个cfs_rq,一个是cfs_rq另一个是my_q,前者是当前调度实体从属的rq,后者他自己的rq,所有的子调度实体都在这个rq上,从而构成了树形结构。可以通过cfs_rq遍历调度实体,而把自己的时间平分给my_q的调度实体.

1
2
3
4
5
6
7
   struct task_group {
struct sched_entity **se;
struct cfs_rq **cfs_rq;
unsigned long shares;
atomic_long_t load_avg;
/* ... */
};

Tasks belonging to a group can be scheduled on any CPU. Therefore it is not sufficient for a group to have a single scheduling entity; instead, every group must have one scheduling entity for each CPU. Tasks belonging to a group must move between the run queues in these per-CPU scheduling entities only, so that the footprint of the task is associated with the group even during task migrations.

单独的sched_entity为了适应SMP结构,又引入了task_group结构,包含了数组,分别属于某个CPU,对于一个进程想要从CPU1迁移到CPU2,只要把进程从tg->cfs_rq[0]转移到tg->cfs_rq[1]即可,一种 percpu 的结构。

优先级有一个映射表,表示调度占的权重,一般nice值为0的时候,大家都是1024,但是nice值为1的时候,权重就会降低到820,对于所有1024权重的进程,就会享有更少的时间,这个映射体现的是每提升一个等级,相差值大约为10%。

下面是 nice 值到权重的映射,这是内核普通进程的优先级范围(100-139),内核拥有140个优先级。

1
2
3
4
5
6
7
8
9
10
static const int prio_to_weight[40] = {
/* -20 */ 88761, 71755, 56483, 46273, 36291,
/* -15 */ 29154, 23254, 18705, 14949, 11916,
/* -10 */ 9548, 7620, 6100, 4904, 3906,
/* -5 */ 3121, 2501, 1991, 1586, 1277,
/* 0 */ 1024, 820, 655, 526, 423,
/* 5 */ 335, 272, 215, 172, 137,
/* 10 */ 110, 87, 70, 56, 45,
/* 15 */ 36, 29, 23, 18, 15,
};

调度实体中包含了一个结构就是表示这个权重的值,表示进程占的权重。

1
2
3
   struct load_weight {
unsigned long weight;
};

最后 time_slice = (sched_period() * se.load.weight) / cfs_rq.load.weight; 就是 se 运行时应该分配到的 CPU 时间的份额(sched_periodcfs.nr_running调度最小粒度时间,理想要每个进程都能运行一次)。

另外,用于衡量CPU负载的方式是通过sched_entity中的sched_avg结构,这个结构用于记录负载情况:

1
2
3
4
struct sched_avg {
u32 runnable_sum, runnable_avg_period;
unsigned long load_avg_contrib;
};

sched_entity是一个进程的时候,计算公式是sa.load_avg_contrib = (sa.runnable_sum * se.load.weight) / sa.runnable_period;(sesched_entitysasched_avg),runnbale_sum 是处于 RUNNING 状态的时间,runnable_period 表示可以变成运行状态的时间段。runnable_load_avgcfs_rq中用于统计所有seload的合,以此来表示CPU负载。blocked_load_avg 是相应的进程处于阻塞状态的负载。

sched_entity是一组进程的时候,计算方式是,
首先提取task group

1
tg = cfs_rq->tg;

之前已经统计了队列中的所有se的总和runnable_load_avg,然后全部累加到tg中。

1
2
cfs_rq->tg_load_contrib = cfs_rq->runnable_load_avg + cfs_rq->blocked_load_avg;
tg->load_avg += cfs_rq->tg_load_contrib;

最后se的值是通过在tg中的比重得到的,这里的tg->shares是最大允许的负载。

1
2
se->avg.load_avg_contrib =
(cfs_rq->tg_load_contrib * tg->shares / tg->load_avg);

进程选择

完全公平调度器的进程选择其实很简单,通过调用pick_next_entity()每次选择 vruntime 最小的进程进行运行。装载进程的结构选择的是红黑树,并且最左下角的结点是个特殊的节点,被保存起来,这样防止每次都从 root 一直搜索到最左下角,每次选择进程的时候直接选择该节点即可。

每次 vruntime 会在时钟中断和任何进程运行状态发生改变的时候进行计算,方式是通过权重调整得到一个 delta 值然后加到 vruntime 上面。
公式是vruntime += delta_exec * (NICE_0_LOAD/curr->load.weight);。这里的 weight 取决于 shares 值和负载等等因素的综合结果。

通过enqueue_entity()可以把进程加入到红黑树当中,当进程被唤醒的时候,或者第一次调用fork的时候就会被调用这个函数。具体就是更新了统计数据,并且把调度节点插入到红黑树当中。如果正好插入到了最右下角,那么就能马上被运行了。

通过dequeue_entity()可以把调度结点从红黑树中删除,这是进程在阻塞或者终止的时候会被调用的函数,具体就是把调度节点移除红黑树并且调整红黑树。

调度入口

内核的调度入口就是schedule(),遍历所有的调度类(因为内核中调度器的实现不只一种),选择权重最高的调度类并且进行进程选择,然后执行该进程。

这里列举一下所有抢占可能发生的时机

  1. 用户态进程:
    • 从系统调用返回
    • 从中断返回
  2. 内核态进程:
    • 从中断返回内核态
    • 进程主动调用schedule()
    • 进程变为可抢占状态(没有持有锁,其实还是中断驱动的)
    • 进程阻塞(最后还是调用schedule)

具体看 cgroup 的 cpu 子系统

补充完调度器的知识,再回来看 cgroup 是如何对进程做运行时间限制的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static struct cftype cpu_files[] = {
#ifdef CONFIG_FAIR_GROUP_SCHED
{
.name = "shares",
.read_u64 = cpu_shares_read_u64,
.write_u64 = cpu_shares_write_u64,
},
#endif
#ifdef CONFIG_CFS_BANDWIDTH
{
.name = "cfs_quota_us",
.read_s64 = cpu_cfs_quota_read_s64,
.write_s64 = cpu_cfs_quota_write_s64,
},
{
.name = "cfs_period_us",
.read_u64 = cpu_cfs_period_read_u64,
.write_u64 = cpu_cfs_period_write_u64,
},
{
.name = "stat",
.read_map = cpu_stats_show,
},
#endif

cpu 子系统也是用对文件进行读写的接口,其实就是获取了 cgroup 的 subsystem 的从属的 task_group,并且读取或者设置了 quota_usperiod_write 以及 shares 属性。具体这些属性应用的地方在调度器内部。task_group是一个管理组调度的结构。

因为内嵌了一个cgroup_subsys_state,这样cgroup就能通过自己的css成员反找到这个task_group

看一下cpu_shares_write_u64的实现

1
2
3
cpu_shares_write_u64 实际调用的是
-> sched_group_set_shares(cgroup_tg(cgrp), scale_load(shareval))
-> update_cfs_shares 获取 cgroup 的task group结构,调整权重
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void update_cfs_shares(struct cfs_rq *cfs_rq)
{
struct task_group *tg;
struct sched_entity *se;
long shares;

tg = cfs_rq->tg;
se = tg->se[cpu_of(rq_of(cfs_rq))];
if (!se || throttled_hierarchy(cfs_rq))
return;
#ifndef CONFIG_SMP
if (likely(se->load.weight == tg->shares))
return;
#endif
/* 根据tg->shares 和 rq 的负载计算出新的权重 */
shares = calc_cfs_shares(cfs_rq, tg);

reweight_entity(cfs_rq_of(se), se, shares);
}

最后一步 reweight_entity 就是调整se->load.weight的权重,从这里来保证shares能够调整进程可以获得的运行时间。当然除了这里,任何发生调度的地方都会有这样的行为,只不过我们主动修改了shares的值。

下图表示了展示了sharestask group中的作用。

对于cpu.cfs_period_uscpu.cfs_quota_us,是关于CPU bandwith的内容,论文CPU bandwidth control for CFS详细描述了其中的设计。论文中举例提到,shares 值只是使得CPU 的时间能够平均分配,但是实际运行时间可能会有变化,不能限制一个进程运行的上限。

在调度实体sched_entity中内嵌了一个结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct cfs_bandwidth {
#ifdef CONFIG_CFS_BANDWIDTH
raw_spinlock_t lock;
ktime_t period;
/* quota 是被赋予的时间,runtime 是实际运行的时间 */
u64 quota, runtime;
s64 hierarchal_quota;
/* 到期时间 */
u64 runtime_expires;

int idle, timer_active;
struct hrtimer period_timer, slack_timer;
struct list_head throttled_cfs_rq;

/* statistics */
int nr_periods, nr_throttled;
u64 throttled_time;
#endif
};

在每次调度的时候(无论是时间中断还是其他调度时间导致 enqueue 或者 dequeue),都会调用account_cfs_rq_runtime(),runtime 相当于实际使用的 quota , 在论文里说的是account_cfs_rq_quota(),对占用时间更新,计算剩余可以运行的时间,如果不够,则进行限制,标记为不可调度。其中内含一个高精度定时器period_timer定时扫描进程,把限制的进程解除,并给予更多的bandwidth以继续运行,period就是计时器的周期,每次都会更新可运行的时间。注意这里用的时间是真实时间.

另外,cpu.stat 主要是控制过程中的统计信息,是只读属性,比如被限制了多少次等等,具体的代码分析就直接略过。

cpuacct 子系统

cpuacct 比较简单,因为主要是一些统计信息

  • cpuacct.stat cgroup 及子消耗在用户态和内核态的CPU循环次数
  • cpuacct.usage cgroup 消耗的CPU总时间
  • cpuacct.usage_percpu cgroup在每个CPU上消耗的总时间

kernel/sched/cpuacct.c下有具体实现。

1
2
3
4
5
6
7
/* track cpu usage of a group of tasks and its child groups */
struct cpuacct {
struct cgroup_subsys_state css;
/* cpuusage holds pointer to a u64-type object on every cpu */
u64 __percpu *cpuusage;
struct kernel_cpustat __percpu *cpustat;
};

其中定义了per-cpu结构,让每个CPU都独占了一个用于统计的值,算是CPU的私有变量。

接口如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static struct cftype files[] = {
{
.name = "usage",
.read_u64 = cpuusage_read,
.write_u64 = cpuusage_write,
},
{
.name = "usage_percpu",
.read_seq_string = cpuacct_percpu_seq_read,
},
{
.name = "stat",
.read_map = cpuacct_stats_show,
},
{ } /* terminate */
};

每次调度update_curr,都会调用cpuacct_charge更新cpuacct中的值,作为统计数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* charge this task's execution time to its accounting group.
*
* called with rq->lock held.
*/
void cpuacct_charge(struct task_struct *tsk, u64 cputime)
{
struct cpuacct *ca;
int cpu;
/* 获取当前task属于的cpu */
cpu = task_cpu(tsk);

rcu_read_lock();
/* task的cpuacct结构 */
ca = task_ca(tsk);
/* 所有父节点的值都应该相应变化
* 上溯父节点更新统计值
*/
while (true) {
u64 *cpuusage = per_cpu_ptr(ca->cpuusage, cpu);
*cpuusage += cputime;

ca = parent_ca(ca);
if (!ca)
break;
}

rcu_read_unlock();
}

在时钟中断的时候会最终调用cpuacct_account_field()来更新kcpustat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
* Add user/system time to cpuacct.
*
* Note: it's the caller that updates the account of the root cgroup.
*/
void cpuacct_account_field(struct task_struct *p, int index, u64 val)
{
struct kernel_cpustat *kcpustat;
struct cpuacct *ca;

rcu_read_lock();
ca = task_ca(p);
while (ca != &root_cpuacct) {
kcpustat = this_cpu_ptr(ca->cpustat);
kcpustat->cpustat[index] += val;
ca = __parent_ca(ca);
}
rcu_read_unlock();
}

cpuacct 算是一个简单的子系统,多是统计信息。

cpuset 子系统

cpuset 子系统用于分配独立的内存节点和CPU节点,这个主要应用与NUMA结构里面,多内存节点属于结构,先看一下cpuset的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct cpuset {
struct cgroup_subsys_state css;

unsigned long flags; /* "unsigned long" so bitops work */
cpumask_var_t cpus_allowed; /* CPUs allowed to tasks in cpuset */
nodemask_t mems_allowed; /* Memory Nodes allowed to tasks */

struct fmeter fmeter; /* memory_pressure filter */

/*
* Tasks are being attached to this cpuset. Used to prevent
* zeroing cpus/mems_allowed between ->can_attach() and ->attach().
*/
int attach_in_progress;

/* partition number for rebuild_sched_domains() */
int pn;

/* for custom sched domain */
int relax_domain_level;

struct work_struct hotplug_work;
};
  • cpuset.cpus cpu结点限制
  • cpuset.mems 内存结点限制
  • cpuset.memory_migrate 内存结点改变是否迁移
  • cpuset.cpu_exclusive 指定的限制是否是独享的,除了父节点或者子节点,不会和其他cpuset有交集
  • cpuset.mem_exclusive 指定的限制是否是独享的,除了父节点或者子节点,不会和其他cpuset有交集
  • cpuset.memory_pressure 换页压力的比率统计
  • cpuset.mem_hardwall 限制内核内存分配的结点,mems是限制用户态的分配
  • cpuset.memory_spread_page 把page cache分散到分配的各个结点中,而不是当前运行的结点.
  • cpuset.memory_spread_slab 把fs相关的slab的对象(inode和dentry)分散到结点中.
  • cpuset.sched_load_balance 打开调度CPU的负载均衡,这里指的是cpuset拥有的sched_domain,默认全局的CPU调度是本来就有负载均衡的。
  • cpuset.sched_relax_domain_level
  • cpuset.memory_pressure_enabled 计算换页压力的开关,注意,这个属性在top_group里面才有

cpus_allowedmems_allowed就是允许分配的内存节点和CPU节点的掩码

分配内存的时候调用栈是alloc_pages()->alloc_pages_current()->__alloc_pages_nodemask(),直到寻找可分配结点的时候会调用 zref_in_nodemask 来判断是否可以分配在该结点。

1
2
3
4
5
6
7
8
9
static inline int zref_in_nodemask(struct zoneref *zref, nodemask_t *nodes)
{
#ifdef CONFIG_NUMA
return node_isset(zonelist_node_idx(zref), *nodes);
#else
return 1;
#endif /* CONFIG_NUMA */
}

从这个函数也可以看到如果编译选项带了CONFIG_NUMA才会起作用,不然返回的永远都是真。

分散file cacheslab cache的方式是通过设置标志位来实现的。

Setting the flag ‘cpuset.memory_spread_page’ turns on a per-process flag
PFA_SPREAD_PAGE for each task that is in that cpuset or subsequently
joins that cpuset. The page allocation calls for the page cache
is modified to perform an inline check for this PFA_SPREAD_PAGE task
flag, and if set, a call to a new routine cpuset_mem_spread_node()
returns the node to prefer for the allocation.

Similarly, setting ‘cpuset.memory_spread_slab’ turns on the flag
PFA_SPREAD_SLAB, and appropriately marked slab caches will allocate
pages from the node returned by cpuset_mem_spread_node().

内存分配向结点的传播,都是通过设置标志PFA_SPREAD_PAGE或者PFA_SPREAD_SLAB来标记的,这个时候对应的函数cpuset_mem_spread_nodecpuset_mem_spread_node会返回希望分配的结点,举个例子,cpuset_mem_spread_node会从允许的节点中随机返回一个值,以达到分配对象分散在结点当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int cpuset_spread_node(int *rotor)
{
int node;

node = next_node(*rotor, current->mems_allowed);
if (node == MAX_NUMNODES)
node = first_node(current->mems_allowed);
*rotor = node;
return node;
}

int cpuset_mem_spread_node(void)
{
if (current->cpuset_mem_spread_rotor == NUMA_NO_NODE)
current->cpuset_mem_spread_rotor =
node_random(&current->mems_allowed);

return cpuset_spread_node(&current->cpuset_mem_spread_rotor);
}

对于CPU结点的控制是通过修改cpus_allowed来控制的,在task被唤醒的时候选择运行的rq时就会对掩码做判断,这是调度类需要实现的接口select_task_rq,比如CFS的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
* The caller (fork, wakeup) owns p->pi_lock, ->cpus_allowed is stable.
*/
static inline
int select_task_rq(struct task_struct *p, int sd_flags, int wake_flags)
{
int cpu = p->sched_class->select_task_rq(p, sd_flags, wake_flags);

/*
* In order not to call set_task_cpu() on a blocking task we need
* to rely on ttwu() to place the task on a valid ->cpus_allowed
* cpu.
*
* Since this is common to all placement strategies, this lives here.
*
* [ this allows ->select_task() to simply return task_cpu(p) and
* not worry about this generic constraint ]
*/
if (unlikely(!cpumask_test_cpu(cpu, tsk_cpus_allowed(p)) ||
!cpu_online(cpu)))
cpu = select_fallback_rq(task_cpu(p), p);

return cpu;
}

load_balance 设置的是cpusetCS_SCHED_LOAD_BALANCE标志,之后会调用update_cpumask,这个标志的更新会调用rebuild_sched_domains_locked(),会引起sched_domain的分配。当然这不是唯一的sched_domain重新划分的触发点,触发点有一下几点。

  1. 绑定了CPU并且该标记改变
  2. 这个标记为enable,绑定CPU发生改变
  3. 绑定了CPU,这个标记为enable,标记cpuset.sched_relax_domain_level发生改变
  4. 绑定了CPU,并且该标记设置了,但是cpuset被删除了
  5. CPU 转变 offline/online 状态

简单说一下[sched_domain](https://www.ibm.com/developerworks/cn/linux/l-cn-schldom/ sched domain 的详细内容)的作用,其实就是划定了负载均衡的 CPU 范围,默认是有一个全局的sched_domain,对所有 CPU 做负载均衡的,现在再划分出一个sched_domain把 CPU 的某个子集作为负载均衡的单元。
每个 Scheduling Domain 其实就是具有相同属性的一组 CPU 的集合. 并且跟据 Hyper-threading, Multi-core, SMP, NUMA architectures 这样的系统结构划分成不同的级别,不同级之间通过指针链接在一起, 从而形成一种的树状的关系, 如下图所示。

调度器会调用partition_sched_domains()来更新自己的scehd_domains,调度域发生作用的地方是在时钟中断的时候会触发SCHED_SOFTIRQ对任务做迁移,或者p->sched_class->select_task_rq,会在选择运行 CPU 时进行抉择,看一下 CFS 的实现的select_task_rq的简化流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 向上遍历更高层次的domain,如果发现同属一个domain
// 就是affine目标
for_each_domain(cpu, tmp) {
/*
* If both cpu and prev_cpu are part of this domain,
* cpu is a valid SD_WAKE_AFFINE target.
*/
if (want_affine && (tmp->flags & SD_WAKE_AFFINE) &&
cpumask_test_cpu(prev_cpu, sched_domain_span(tmp))) {
affine_sd = tmp;
break;
}

if (tmp->flags & sd_flag)
sd = tmp;
}
// 如果上面的条件满足,从prev_cpu中选出一个idle的new_cpu来运行.
if (affine_sd) {
if (cpu != prev_cpu && wake_affine(affine_sd, p, sync))
prev_cpu = cpu;
// 在同一个级别的sched domain向下找到一个idle的CPU.
new_cpu = select_idle_sibling(p, prev_cpu);
// 快速路径,有idle的CPU就不用负载均衡了.
goto unlock;
}
// 遍历层级
while (sd) {
// 找到负载最小的CPU
group = find_idlest_group(sd, p, cpu, load_idx);
if (!group) {
sd = sd->child;
continue;
}

new_cpu = find_idlest_cpu(group, p, cpu);
/* 如果最闲置的CPU没有变的话,或者没有找到的话,就向下遍历.
if (new_cpu == -1 || new_cpu == cpu) {
/* Now try balancing at a lower domain level of cpu */
sd = sd->child;
continue;
}

/* Now try balancing at a lower domain level of new_cpu */
cpu = new_cpu;
weight = sd->span_weight;
sd = NULL;
// 如果选出的节点weight比其他节点都大的话.
// 再向下一个层级遍历.
for_each_domain(cpu, tmp) {
if (weight <= tmp->span_weight)
break;
if (tmp->flags & sd_flag)
sd = tmp;
}
/* while loop will break here if sd == NULL */
}


负载均衡的对象有个例外。

CPUs in “cpuset.isolcpus” were excluded from load balancing by the
isolcpus= kernel boot option, and will never be load balanced regardless
of the value of “cpuset.sched_load_balance” in any cpuset.

如果boot选项标记了该CPU,会无视sched_load_balance的设置。

cpuset.sched_relax_domain_level有几个等级,越大越优先,表示迁移时搜索CPU的范围,这个主要开启了负载均衡选项的时候才有用。

  • -1 : no request. use system default or follow request of others. 用默认的或者按照其他组的优先级来.
  • 0 : no search,不搜索.
  • 1 : search siblings (hyperthreads in a core,搜索CPU当中的超线程).
  • 2 : search cores in a package.(搜索CPU当中的核).
  • 3 : search cpus in a node [= system wide on non-NUMA system]
  • 4 : search nodes in a chunk of node [on NUMA system]
  • 5 : search system wide [on NUMA system]

memory 子系统

看完 cpu 部分再开看一下内存子系统是如何做限制的

memory 子系统的参数比较多

  • memory.usage_in_bytes # 当前内存中的 res_counter 使用量
  • memory.memsw.usage_in_bytes # 当前内存和交换空间中的 res_counter 使用量
  • memory.limit_in_bytes # 设置/读取 内存使用量
  • memory.memsw.limit_in_bytes # 设置/读取 内存加交换空间使用量
  • memory.failcnt # 读取内存使用量被限制的次数
  • memory.memsw.failcnt # 读取内存和交换空间使用量被限制的次数
  • memory.max_usage_in_bytes # 最大内存使用量
  • memory.memsw.max_usage_in_bytes # 最大内存和交换空间使用量
  • memory.soft_limit_in_bytes # 设置/读取内存的soft limit
  • memory.stat # 统计信息
  • memory.use_hierarchy # 设置/读取 层级统计的使能
  • memory.force_empty # trigger forced move charge to parent?
  • memory.pressure_level # 设置内存压力通知
  • memory.swappiness # 设置/读取 vmscan swappiness 参数?
  • memory.move_charge_at_immigrate # 设置/读取 controls of moving charges?
  • memory.oom_control # 设置/读取 内存超限控制信息
  • memory.numa_stat # 每个numa节点的内存使用数量
  • memory.kmem.limit_in_bytes # 设置/读取 内核内存限制的hard limit
  • memory.kmem.usage_in_bytes # 读取当前内核内存的分配
  • memory.kmem.failcnt # 读取当前内核内存分配受限的次数
  • memory.kmem.max_usage_in_bytes # 读取最大内核内存使用量
  • memory.kmem.tcp.limit_in_bytes # 设置tcp 缓存内存的hard limit
  • memory.kmem.tcp.usage_in_bytes # 读取tcp 缓存内存的使用量
  • memory.kmem.tcp.failcnt # tcp 缓存内存分配的受限次数
  • memory.kmem.tcp.max_usage_in_bytes # tcp 缓存内存的最大使用量

对于大部分的数据是通过res_counter来保存的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
* The core object. the cgroup that wishes to account for some
* resource may include this counter into its structures and use
* the helpers described beyond
*/

struct res_counter {
/*
* the current resource consumption level
*/
unsigned long long usage;
/*
* the maximal value of the usage from the counter creation
*/
unsigned long long max_usage;
/*
* the limit that usage cannot exceed
*/
unsigned long long limit;
/*
* the limit that usage can be exceed
*/
unsigned long long soft_limit;
/*
* the number of unsuccessful attempts to consume the resource
*/
unsigned long long failcnt;
/*
* the lock to protect all of the above.
* the routines below consider this to be IRQ-safe
*/
spinlock_t lock;
/*
* Parent counter, used for hierarchial resource accounting
*/
struct res_counter *parent;
};

获取方式是通过该结构相关的封装接口提供的,比如mem_cgroup_usage就是通过res_counter_red_u64来获取对应的res_counterRES_USAGE对应的值的,也就是unsigned long long usage这个成员。(如果不是root,还会递归获取rss和page cache的合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static inline u64 mem_cgroup_usage(struct mem_cgroup *memcg, bool swap)
{
u64 val;

if (!mem_cgroup_is_root(memcg)) {
if (!swap)
return res_counter_read_u64(&memcg->res, RES_USAGE);
else
return res_counter_read_u64(&memcg->memsw, RES_USAGE);
}

/*
* Transparent hugepages are still accounted for in MEM_CGROUP_STAT_RSS
* as well as in MEM_CGROUP_STAT_RSS_HUGE.
*/
// 如果是root就把所有的内存使用量都算进来.
val = mem_cgroup_recursive_stat(memcg, MEM_CGROUP_STAT_CACHE);
val += mem_cgroup_recursive_stat(memcg, MEM_CGROUP_STAT_RSS);

if (swap)
val += mem_cgroup_recursive_stat(memcg, MEM_CGROUP_STAT_SWAP);

return val << PAGE_SHIFT;
}

struct mem_cgroup 是负责内存 cgroup 的结构,简化的表示是

1
2
3
4
5
6
7
8
struct mem_cgroup {
struct cgroup_subsys_state css; // 通过css关联cgroup.
struct res_counter res; // mem统计变量
res_counter memsw; // mem+sw的和
struct res_counter kmem; // 内核内存统计量
...
}

这些参数的入口都在mm/memcontrol.c下,比如说memory.usage_in_bytes的读取调用的是mem_cgroup_read函数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static ssize_t mem_cgroup_read(struct cgroup *cont, struct cftype *cft,
struct file *file, char __user *buf,
size_t nbytes, loff_t *ppos)
{
// 获取cgroup对应的mem_cgroup.
struct mem_cgroup *memcg = mem_cgroup_from_cont(cont);
char str[64];
u64 val;
int name, len;
enum res_type type;

// 获取读取的类型,memory.usage_in_bytes就是_MEM
type = MEMFILE_TYPE(cft->private);
// 名称
name = MEMFILE_ATTR(cft->private);

switch (type) {
case _MEM:
if (name == RES_USAGE)
val = mem_cgroup_usage(memcg, false);
else
val = res_counter_read_u64(&memcg->res, name);
break;
case _MEMSWAP:
if (name == RES_USAGE)
val = mem_cgroup_usage(memcg, true);
else
val = res_counter_read_u64(&memcg->memsw, name);
break;
case _KMEM:
val = res_counter_read_u64(&memcg->kmem, name);
break;
default:
BUG();
}

len = scnprintf(str, sizeof(str), "%llu\n", (unsigned long long)val);
return simple_read_from_buffer(buf, nbytes, ppos, str, len);
}

接下来再看一下这些值是在什么时候统计的,统计的入口是mem_cgroup_charge_common(),如果统计值超过限制就会在cgroup内进行回收。调用者分别是缺页时调用的mem_cgroup_newpage_charge 和 page cache 相关的mem_cgroup_cache_charge

简单复习一下内存分配的过程,来自维基百科

创建进程fork(), 程序载入execve(), 映射文件mmap(), 动态内存分配 malloc()/brk() 等进程相关操作都需要分配内存给进程。不过这时进程申请和获得的还不是实际内存,而是虚拟内存,准确的说是“内存区域”。进程对内存区域的分配最终都会归结到 do_mmap() 函数上来(brk调用被单独以系统调用实现,不用do_mmap()),内核使用 do_mmap() 函数创建一个新的线性地址区间。但是说该函数创建了一个新 VMA 并不非常准确,因为如果创建的地址区间和一个已经存在的地址区间相邻,并且它们具有相同的访问权限的话,那么两个区间将合并为一个。如果不能合并,那么就确实需要创建一个新的 VMA 了。但无论哪种情况,do_mmap() 函数都会将一个地址区间加入到进程的地址空间中–无论是扩展已存在的内存区域还是创建一个新的区域。同样,释放一个内存区域应使用函数 do_ummap(),它会销毁对应的内存区域。当进程需要内存时,从内核获得的仅仅是虚拟的内存区域,而不是实际的物理地址,进程并没有获得物理内存,获得的仅仅是对一个新的线性地址区间的使用权。实际的物理内存只有当进程真的去访问新获取的虚拟地址时,才会由”请求页机制”产生”缺页”异常,从而进入分配实际页面的例程

和下面来自内核文档

与用户进程相似,内核也有一个名为 init_mm 的 mm_strcut 结构来描述内核地址空间,其中页表项 pdg=swapper_pg_dir包含了系统内核空间(3G-4G)的映射关系。因此 vmalloc 分配内核虚拟地址必须更新内核页表,而kmalloc或get_free_page由于分配的连续内存,所以不需要更新内核页表。[13]

内存页的分配是基于伙伴分配系统,也就是基于2的阶乘通过拆分大阶乘的连续页和合并小阶乘的连续页来管理物理内存的方式,这在任何一本操作系统的书里都会讲到,我之前的博客也详细分析了。

当进程进入缺页异常的时候就会分配具体的物理内存,当物理内存使用超过高水平线以后,换页daemon(kswapd)就会被唤醒用于把内存交换到交换空间以腾出内存,当内存恢复至高水平线以后换页daemon进入睡眠。

缺页异常的入口是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static int __do_fault(struct mm_struct *mm, struct vm_area_struct *vma,
unsigned long address, pmd_t *pmd,
pgoff_t pgoff, unsigned int flags, pte_t orig_pte)
{
pte_t *page_table;
spinlock_t *ptl;
struct page *page;
struct page *cow_page;
pte_t entry;
int anon = 0;
struct page *dirty_page = NULL;
struct vm_fault vmf;
int ret;
int page_mkwrite = 0;

/*
* If we do COW later, allocate page befor taking lock_page()
* on the file cache page. This will reduce lock holding time.
*/
if ((flags & FAULT_FLAG_WRITE) && !(vma->vm_flags & VM_SHARED)) {

if (unlikely(anon_vma_prepare(vma)))
return VM_FAULT_OOM;

/* 分配内存并且映射到内存区间 */
cow_page = alloc_page_vma(GFP_HIGHUSER_MOVABLE, vma, address);
if (!cow_page)
return VM_FAULT_OOM;
/* 进行统计 */
if (mem_cgroup_newpage_charge(cow_page, mm, GFP_KERNEL)) {
page_cache_release(cow_page);
return VM_FAULT_OOM;
}
} else
cow_page = NULL;

mem_cgroup_newpage_charge则调用mem_cgroup_charge_common

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int mem_cgroup_newpage_charge(struct page *page,
struct mm_struct *mm, gfp_t gfp_mask)
{
if (mem_cgroup_disabled())
return 0;
// 不应该关联到页表
VM_BUG_ON(page_mapped(page));
// 对应用户态地址,但是不是匿名页
VM_BUG_ON(page->mapping && !PageAnon(page));
// mm 为空
VM_BUG_ON(!mm);
return mem_cgroup_charge_common(page, mm, gfp_mask,
MEM_CGROUP_CHARGE_TYPE_ANON);
}

mem_cgroup_charge_common内容是,返回ret < 0则是OOM。第一步是调用__mem_cgroup_try_charge查看当前使用量是否超过内存限制,如果超过就进行内存回收。第二步如果成功就调用__mem_cgroup_commit_charge添加统计值 ,不然就返回无法分配内存的错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int mem_cgroup_charge_common(struct page *page, struct mm_struct *mm,
gfp_t gfp_mask, enum charge_type ctype)
{
struct mem_cgroup *memcg = NULL;
unsigned int nr_pages = 1;
bool oom = true;
int ret;
if (PageTransHuge(page)) {
nr_pages <<= compound_order(page);
VM_BUG_ON(!PageTransHuge(page));
/*
* Never OOM-kill a process for a huge page. The
* fault handler will fall back to regular pages.
*/
oom = false;
}

ret = __mem_cgroup_try_charge(mm, gfp_mask, nr_pages, &memcg, oom);
if (ret == -ENOMEM)
return ret;
__mem_cgroup_commit_charge(memcg, page, nr_pages, ctype, false);
return 0;
}

__mem_cgroup_try_charge最终会调用mem_cgroup_do_charge,省略代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static int mem_cgroup_do_charge(struct mem_cgroup *memcg, gfp_t gfp_mask,
unsigned int nr_pages, unsigned int min_pages,
bool invoke_oom)
{
unsigned long csize = nr_pages * PAGE_SIZE;
struct mem_cgroup *mem_over_limit;
struct res_counter *fail_res;
unsigned long flags = 0;
int ret;
// 更新res计数器
ret = res_counter_charge(&memcg->res, csize, &fail_res);

if (likely(!ret)) {
if (!do_swap_account)
return CHARGE_OK;
// 计数成功,如果开启swap计数,记录memsw.
ret = res_counter_charge(&memcg->memsw, csize, &fail_res);
if (likely(!ret))
return CHARGE_OK;
// swap计数失败,退回res的计数
res_counter_uncharge(&memcg->res, csize);
// 获取fail_res对应的memcg,也就是计数失败的memcg.
mem_over_limit = mem_cgroup_from_res_counter(fail_res, memsw);
flags |= MEM_CGROUP_RECLAIM_NOSWAP;
} else
mem_over_limit = mem_cgroup_from_res_counter(fail_res, res);
// 回收内存
ret = mem_cgroup_reclaim(mem_over_limit, gfp_mask, flags);
// 告诉上层重试一次,可能回收了一些内存
if (nr_pages <= (1 << PAGE_ALLOC_COSTLY_ORDER) && ret)
return CHARGE_RETRY;
if (invoke_oom)
// 进入oom的处理
mem_cgroup_oom(mem_over_limit, gfp_mask, get_order(csize));

mem_cgroup_reclaim的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static unsigned long mem_cgroup_reclaim(struct mem_cgroup *memcg,
gfp_t gfp_mask,
unsigned long flags)
{
unsigned long total = 0;
bool noswap = false;
int loop;

if (flags & MEM_CGROUP_RECLAIM_NOSWAP)
noswap = true;
if (!(flags & MEM_CGROUP_RECLAIM_SHRINK) && memcg->memsw_is_minimum)
noswap = true;

for (loop = 0; loop < MEM_CGROUP_MAX_RECLAIM_LOOPS; loop++) {
if (loop)
drain_all_stock_async(memcg);
total += try_to_free_mem_cgroup_pages(memcg, gfp_mask, noswap);
/*
* Allow limit shrinkers, which are triggered directly
* by userspace, to catch signals and stop reclaim
* after minimal progress, regardless of the margin.
*/
if (total && (flags & MEM_CGROUP_RECLAIM_SHRINK))
break;
if (mem_cgroup_margin(memcg))
break;
/*
* If nothing was reclaimed after two attempts, there
* may be no reclaimable pages in this hierarchy.
*/
if (loop && !total)
break;
}
return total;
}

RSSpage_fault的时候记录,page cache是插入到inoderadix-tree中才记录的。
RSS在完全unmap的时候减少计数,page cachepage在离开inoderadix-tree才减少计数。
即使RSS完全unmap,也就是被kswapd给换出,可能作为SwapCache存留在系统中,除非不作为SwapCache,不然还是会被计数。
一个换入的page不会马上计数,只有被map的时候才会,当进行换页的时候,会预读一些不属于当前进程的page,而不是通过page fault,所以不在换入的时候计数。

补充:

  • why ‘memory+swap’ rather than swap.
    The global LRU(kswapd) can swap out arbitrary pages. Swap-out means
    to move account from memory to swap…there is no change in usage of
    memory+swap. In other words, when we want to limit the usage of swap without
    affecting global LRU, memory+swap limit is better than just limiting swap from
    an OS point of view.[12]

使用memoery+swap来统计而不是光统计swap,是因为kswapd换出的page只是从内存到了交换空间而已 ,在不影响kswpad的单页内存池LRU的情况下,这样的统计更有意义。

内核内存是不会被换出的。只有在被限制的时候才会开始计数。并且限制不能在已经有进程或者有子cgroup的情况下设置。
计数部分

When use_hierarchy == 1 and a group is accounted, its children will
automatically be accounted regardless of their limit value.

总结

目前总结了调度相关和内存相关的 cgroup 代码,可以看出 cgroup 本身其实主要是在一些 hook 的地方做检查,真正的控制的执行者还是调度器和内存分配器本身,cgroup 只是统计数据并且在必要的时候触发调度和内存回收等等。接下来我会就网络的部分进行一些分析,希望能够把完整的各个 cgroup 的子系统都能够解析一下。

参考:

  1. 《Docker 进阶与实战》
  2. http://abcdxyzk.github.io/download/kernel/Linux_Physical_Memory_Page_Allocation.pdf

首先介绍一下 hugepage 的背景。

一般来说操作系统分配内存的最小单元是页,一般是 4KB 大小,但是这个页放到现在来说可能有点“不够用”,因为很多程序内存消耗很大,分配内存很频繁,所以选择更大的页可以提升性能,大页带来的好处很多,首先是页表的层次可以减少,增加访存的速度,其次是减少 TLB miss 的概率,同时 page fault 也会减少,减少到 hugepage size / 4KB (x86_64 一般有 2MB 的 hugepage 和 1GB 的 hugepage)。例如下面这张图就说明了 hugepage 带来的改变。文章中的代码使用的是 4.x 的内核版本。

hugepage 有两种类型,一种是 THP(Transparent Huge Page) ,顾名思义,就是对用户来说对这种大页是无感知的,它本身可以被分成 4KB 的小页,并且可以被 swap out,有一个 Khugepaged 周期性扫描 4KB 的页合并成大页。

另一种大页是 persistent hugepage,这种 page 是预先分配的并且不能拆分成 4KB 小页,而且不能 swap out。这种的隐患是可能在内存 fragmentation 太多分不出大页的时候压缩小页,这在内存分配有压力的时候会造成很大的性能影响。

使用 persistent hugepage 可以通过 echo 512 | sudo tee /sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages,这会预先分配 512 个 2MB 的大页。具体使用是通过 fs/hugetlbfs 下实现的hugetlbfs来实现的。应用程序需要通过mmap进行文件映射来使用这些大页。具体的使用方式内核附带的一个测试可以作为参考。

这个 echo 触发的是 sysfsnr_hugepages_store_common, 它会设置最大的 hugepage 个数(存在 hstate 中,一个保存 hugepage 状态的结构体),h->max_huge_pages = set_max_huge_pages(h, count, nodes_allowed);set_max_huge_pages 有一个副作用就是调用 alloc_fresh_huge_page 来(分配或者减少)大页以达到 count 个。比如说分配会调用 ret = alloc_fresh_huge_page(h, nodes_allowed);,然后加入到 hstate 的 freelist 当中, 减少则是相反的,如果 freelist 上没有就会触发 buddysystem 的 __alloc_buddy_huge_page_no_mpol

我们来看一下具体的实现,首先关键的结构体是 hugetlbfs_file_operations , 其中规定了 mmap 函数,也就是当我打开 hugetlbfs 文件系统下的文件对对应的 fd 调用 mmap 的时候触发的对应的函数。

1
2
3
4
5
6
7
8
9
const struct file_operations hugetlbfs_file_operations = {
.read_iter = hugetlbfs_read_iter,
.mmap = hugetlbfs_file_mmap,
.fsync = noop_fsync,
.get_unmapped_area = hugetlb_get_unmapped_area,
.llseek = default_llseek,
.fallocate = hugetlbfs_fallocate,
};

在解释 mmap 之前,先复习一下进程空间内存的相关的内容。进程的内存空间是通过mm_struct这个结构体管理的。进程的地址空间,基本上是一些松散的区间,每个区间有相同的功能和保护属性(只读等属性),这个区间用 vm_area_struct 表示。再来看 hugetlbfs_file_mmap 中的一段代码。把申请的虚拟空间的地址长度按照大页对齐以后,保留对应个数的大页。

1
2
3
4
5
6
7
8
if (hugetlb_reserve_pages(inode,
vma->vm_pgoff >> huge_page_order(h),
len >> huge_page_shift(h), vma,
vma->vm_flags))
goto out;

ret = 0;

hugetlb_reserve_pages 要处理两种逻辑,如果是 VM_MAYSHARE 就从 inode 中取出 resv_map 并且获取分配长度,不然就使用vma的长度,然后用 hugepage_subpool_get_pageshugepage_subpool 中减掉对应的spool->rsv_hpages, 这个个人感觉也不是池子,只是一个统计数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct hstate *h = hstate_inode(inode); // 获取 hugepage state,里面保存了 hugepage 的相关信息
struct hugepage_subpool *spool = subpool_inode(inode); // 写获取 hugepage 的 pool, pool 其实只是个数字 rsv_pages。
...
// 然后根据 flag 算出要使用的长度 chg,我猜应该是 charge 的缩写。
if (!vma || vma->vm_flags & VM_MAYSHARE) {
resv_map = inode_resv_map(inode);

chg = region_chg(resv_map, from, to);

} else {
resv_map = resv_map_alloc();
if (!resv_map)
return -ENOMEM;

chg = to - from;

set_vma_resv_map(vma, resv_map);
set_vma_resv_flags(vma, HPAGE_RESV_OWNER);
}
...
// 最后从 hugepage 的 pool 减掉对应的个数。
gbl_reserve = hugepage_subpool_get_pages(spool, chg);
// 确定有足够的 hugepage 如果没有就从 buddysystem 里面取出来。
ret = hugetlb_acct_memory(h, gbl_reserve);
// 把 page 和 vma 做映射
region_add(resv_map, from, to);

do_page_fault 一直调用到 __handle_mm_fault 的时候,如果对应的 vma 是大页分配的,会直接进到hugetlb_fault,最后在缺页的时候会调用hugetlb_no_page,然后调用 alloc_huge_page_node,就会看到 __alloc_buddy_huge_page,开始走 buddy system, buddy system ,这里的 buddy system 没有细说,可以参考我之前的一篇文章

补充:

你可以理解mm_struct管理着一个vm_area_struct的链表,而 vm_area_struct 对应的主要操作函数如下(还有很多,这里只列了和本文相关的函数指针)。

1
2
3
4
5
6
struct vm_operations_struct {
void (*open)(struct vm_area_struct * area);
void (*close)(struct vm_area_struct * area);
int (*fault)(struct vm_area_struct *vma, struct vm_fault *vmf);
};

对应到 hugetlbfs 就是如下的 hugetlb_vm_ops 它会在对文件 mmap 的时候,进行初始化vma->vm_ops = &hugetlb_vm_ops;

1
2
3
4
5
6
const struct vm_operations_struct hugetlb_vm_ops = {
.fault = hugetlb_vm_op_fault,
.open = hugetlb_vm_op_open,
.close = hugetlb_vm_op_close,
};

但是注意,hugetlb_vm_op_fault 如果被触发说明有 BUG,因为 hugepage 的 page fault 是在do_page_fault里面独立处理的,不会调用到vm_operations_structfault接口。mmap 走的流程是 mmap -> mmap_region -> make_pages_presetn -> get_user_pages -> handle_page_fault -> handle_mm_fault -> hugetlb_fault

以上是大页的分配和使用的流程,希望对大家有帮助。

参考:

  1. Linux KVM concept - Memory
  2. Linux 内核中大页的实现与分析,第 1 部分

不同的隔离级别有不同的约束,而且不一定是子集和超集的关系,约束可能是交集的。这里尝试循序渐进的加强锁的级别和实现方式来阐述各个隔离级别的区别,由此可以选择在实际开发中适合业务的隔离策略。我自己最近也在调研 cockroachdb 比较关心事务这方面的性能,cockroachdb 的事务主要是两个级别,SI 和 SSI,会在下面提到。

总视图

这是各个隔离级别的一个关系,可以看出不一定是包含与被包含的关系(因为完整的隔离级别不只 4 种)。

定义

长期锁:到事务结束就释放的锁
短期锁:对相关数据操作完成就释放的锁

这里提到的写锁和排他锁可以互换,读锁和共享锁可以互换,长期锁也被称为二阶段锁,就是事务某个时候锁上了算一个阶段,最后一起释放算一个阶段。

断言型:基于先判的锁的修饰词,比如 WHERE 语句指定的范围就是预测型,如果没有 WHERE 可能就是整张表。

如果没有断言修饰的话,锁就是有指定数据的锁,也就是有明确索引的锁。

缩写 P(Phenomena) A(Anomalies)

P0 dirty write (脏写)

现象:

我们最开始的阶段是一切皆有可能发生,没有任何锁,所以碰到的第一个问题是脏写。

脏写导致的问题是,破坏数据的一致性,一个事务 (T1) 如果覆盖了另一个正在执行事务 (T2) 之前写入的值的时候就会导致脏写。比如 T1 写入 x=y=1 并且 T2 写入 x=y=2,但是整个数据的一致性就被破坏了。

解决:

对 x 和 y 持有长期写锁。基本上没有事务不拿长期写锁的,不然数据库连回退的可能都没有。防止脏写以后会出现新的现象,脏读。

P1 dirty read (脏读 read uncommited)

现象

事务 (T1) 写入的值被正在执行的事务 (T2) 读取。比如 x 向 y 转 40,在 x 写入以后,T2 看到的总和只有 60。

网上很多人把这个级别算作了脏写其实不是很严格,最开始那个 P0 级别才算是脏写。

解决

使用短期读锁和长期写锁,长期写锁可以防止 T2 的 X 读数据,短期读锁可以防止 T1 的 y 写不了(如果使用长期读锁就被阻塞到 T1 结束了)。解决脏读问题,我们又面临的问题是不可重复读

P2 non-repeatable read (不可重复读)

现象

当正在执行的事务 (T1) 读取的值被事务 (T2) 写入的时候,对 T1 来说就出现了不一致。例如下图,X 在被读取之后又被 T2 写入,这个时候总的钱数出现了不一致。

解决

使用长期读锁和长期写锁,也就是 T2 的 X 要等 T1 提交之后才能写入。对于不可重复读的问题。不过短期的断言型读锁也是足够的,因为 X 和 Y 如果都提前读取出来还是能保持一致的。我们解决了不可重复读以后,还会碰到幻读的情况。

P3 phantom (幻读)

现象

幻读发生在正在执行的事务 T1 有断言的读 (select where) 时,另外一个事务 T2 执行了和断言集合有交集的插入操作。
比如 T1 在 T2 之前读到了员工总数是 3,但是 T2 执行的时候有交集,插入了新的数据,这个时候员工总数是 4,但是 T1 如果再读取的话,就会发现员工总数变成了 4,而不是最初的 3,这就是幻读。

解决

解决幻读的方式是使用长期(断言型)读锁和写锁。也就是不允许在这个范围内进行插入操作。解决了幻读以后我们的事务就完全串行化了,这样的事务并发度是最弱的。

P4 update lost (更新丢失)

常见的 4 个隔离级别说完了以后我们看一下剩下的部分,注意更新丢失这个现象不是比幻读更约束的现象,这个是在基于防止脏读以后可能会出现的现象,会被可重复读防止。

现象

事务 T2 提交的写被其他事务覆盖,首先,没有脏写,因为 T2 已经提交,其次没有脏读,因为在写之后没有读操作,这样的现象称为更新丢失。

解决

升级到可重复读就可以了。

P4C cursor update lost (游标更新丢失)

现象

和更新丢失是一样的,这个只是约束在了游标操作的时候,事务 (T1) 对游标下的数据进行读之后被另一个事务 (T2) 提交了,在游标下的数据写之后让,T1 的写导致 T2 的更新丢失。

解决

在游标移动或者释放之前,都不释放锁,这个是到达可重复读之前的一个插曲。这个也是在读提交的阶段会发生的事情。

A5A read skew (读偏)

偏可以理解为不一致,这个是发生在多个数据之间有一个总的约束的时候。

现象

总的钱是 100,但是从 T1 的角度,总的钱数是 (50+75),因为只有短期读锁。

解决

使用快照隔离 (Snapshot Isolation),快照隔离是基于 MVCC 的。当一个 T 事务开始的时候,T 会获得一个抽象的时间戳(版本),当对数据 X 进行读取的时候,并不是直接看到最新写入的数据而是在 T 开始前的所有执行中的事务中最后一个对 X 标记的版本(如果 T 修改过 X,那么看到的是自己的版本)。也就是说 T 是基于当前的数据库的一个镜像进行操作的,有点类似于 Copy And Swap,而 T 开始执行是获得的版本就是这个快照的凭证。这样能保证所有的读都是基于一个一致的状态获取的。

SI 解决冲突的方法一般是 “First-Commiter-Wins” 也就是说,如果两个并发的事务修改了同一个数据,先写的事务会成功,而后写的事务会发现版本和原本的不一致而退出事务。

以我们的例子来说的话,T1 的 y 只会读到自己开始时候的版本,也就是 50,而不是 75,这样读偏就解决了。但是快照隔离还是不能解决另一个问题,就是写偏。这是我们要面临的新问题。

A5B write skew (写偏)

现象

这个和读偏类似,只不过,不一致在了整个系统上。T1 写锁有 y 的新版本,T2 写锁有 x 的新版本,他们没有写冲突,导致最后系统不一致,x+y 的钱变多了。

解决

目前快照隔离的算法有很多,参考 cockroachDB 使用的论文的话,可以说,通过对版本依赖构成有向图,解决成环问题,以此达到串行快照隔离的级别。

比如上面的例子,如果 T1 在 y 读了之后写了一个版本的 y 就构成一个先读后写的 rw(y) 依赖,类似的 T2 对 T1 构成了一个先读后写的 rw(x) 依赖。还有两种无害的依赖是先写后读 (wr) 和先写后写 (ww)。论文中阐述了,造成写偏的条件是成环,并且环中有两个连续的 rw 依赖。也就是下面这种形式。

这个问题的关键是,检查成环这件事情,就跟操作系统检查死锁一样,消耗太大了,性能上不能接受,所以这个实现的妥协是,把检查放宽,让一些无害的条件也被认定为有害,通过重试来恢复执行,至少防止错误的发生。
这个条件是只要有两个连续的 rw 依赖就会放弃提交,即使没有成环。这个检查发生在读的时候如果发现读的版本和自己开始之前的版本不一致就会找到依赖的事务,构建一条入边,另一个事务构建一条出边,如果某个事务入边出边都有 rw 边,这个节点就会被作为嫌疑人。当然还有其他关于串行 snapshot 隔离的论文可以参考。

其他

Oracle Consistent Read 也算是另一种快照级别的隔离。

参考文献

  1. A Critique of ANSI SQL Isolation Levels
  2. ASCI-SQL
  3. cockroachdb 使用的算法

援引自 cockroach 博客 的解释

cockroach 的底层 scaling 存储本身可以认为是一张无限大的有序 KV map,上层封装了一层 SQL。他 sharding 的方式是通过 range 实现的。对一组 64M 大小数据的范围内的 key 做冗余,用二级 metadata range 可以理论上存储 4EB 的数据。

举个例子,五节点打散三副本的一个 range 做冗余,是成本性能和效率上比较典型的一个折衷。像你说的 leader 集中的情况是比较少的,因为三副本是打散分布到不同节点上的,所以三副本的交集不会太集中,单点的压力还是很少的。

另外租约的存在(不知道算不算在 multi raft 算法内),也可以缓解集中化的压力,通过获得授权的租约旁路一些“可以本地化”的请求(比如只对一个 replica 的操作以及一些能保证数据不变的一段时间的读请求),来缓解主的压力,这个最早我记得在 GFS 里面看到过是这么设计的。

另外 gossip 的协议有点像交换机的生成树协议,是通过互相传播自己的负载和位置信心来传播信息的,没有中心化的决策节点,这样的好处就是无中心化,不会依靠一个中心的节点来做负载均衡,而且整个系统就需要一个二进制文件不依赖任何的其他服务。

如图是一个 range 在三节点上的三副本的一个 raft group 之间有网路通信。

但是如果 range 变多的话,就会很复杂,如下图,是四个节点之间大概 8 个 (如果我没看错颜色的话) range 的 raft group 之间通信。

很明显网络 RoundTrip 太多了。针对这个的优化就是 MultiRaft 了,说白了就是把网络请求 batch up,变成下面的形式。

这样网络压力就会小很多。针对这个的优化和应用本身的实现是耦合的,作者也尝试把这个优化加到 etcd/raft 当中,但是最后的决定是增加 RawNode 这样的无线程安全的接口给应用层实用,方便对心跳等请求的合并,所以你会看到有个非线程安全的 RawNode 的实现。

具体到 cockroach 的实现是可以找到相应的代码的,省略无关代码就如下。

在发送消息的时候检查是否有可以整合的请求。

1
2
3
4
5
6
7
8
9
func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
for range {
// 迭代每个 follower 发送消息
// 对消息进行缓存
if r.maybeCoalesceHeartbeat(ctx, msg, toReplica, fromReplica, true) {
continue
}
}
}

循环取出缓存的消息,注意因为是整合了消息,所以对消息来说有延迟,这里稍微加快了频率,弥补这个延迟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Since coalesced heartbeats adds latency to heartbeat messages, it is
// beneficial to have it run on a faster cycle than once per tick, so that
// the delay does not impact latency-sensitive features such as quiescence.
func (s *Store) startCoalescedHeartbeatsLoop() {
s.stopper.RunWorker(func() {
ticker := time.NewTicker(s.cfg.CoalescedHeartbeatsInterval)
defer func() {
ticker.Stop()
}()

for {
select {
case <-ticker.C:
s.sendQueuedHeartbeats()
case <-s.stopper.ShouldStop():
return
}
}
})
}

在 Store 这个层面是对每个 Replica (一个 raft group 的成员) 是有感知的,但是每个 raft group 是独立的,只是在发送是会检查对方节点是否和现在的 raft group 有交集,
从而尝试合并消息的发送,你可以理解为在节点之间的消息通道上对不同 raft group 之间多路复用。
这就是 MultiRaft 对多个 raft group 的网络吞吐做的一个优化,代价是可能造成消息的延迟,因为毕竟是被缓存到队列里了。

其实大家提的 LSM 最开始论文里面都使用树做搜索结构的, 现在在用的都不是严格的树结构了。

这篇文章解释的一样,从最朴素的角度上来讲可以把SSTable(sorted string table)作为一个连续的kv构成的块。

1
2
3
4
SSTable
+-+---+----+---+
|k| v | k | v | ...
+-+---+----+---+

对于一个大文件来说,读取整个文件以后就能构成一个各个键值的索引,当然可以在文件追加一块索引,和文件一起保存。

1
2
3
4
5
6
7
8
9
Index
+-+-------+
|k|offset |
+-+-------+
|k|offset |
+-+-------+
.
.
.

有了索引以后用 seek 操作或者直接把文件 mmap 到内存中都可以有很好的随机读性能。

但是对于随机写来说, 会造成大量的I/O,如果我们能够保证我们的SSTable是不可修改(immutable)的,只有SSTable在内存当中的时候(也就是MemTable)才可以修改,就能避免随机写的大负载问题。

通过下面几条约束就能完成我们的要求:

  1. 首先SSTable索引要放在内存中,这样读索引更快
  2. 所有写只能写到MemTable当中, 因为SSTable不可修改
  3. 所有读要先查看MemTable如果没有再查看内存中的索引(最后找到磁盘上的kv)
  4. 定期把MemTable刷成SSTable,这段时间MemTable也变成了不可修改的,新的MemTable会顶替
  5. 定期对SSTable进行合并

最终我们保证了随机写很快(因为只在MemTable中),随机读也很快(因为要么在MemTable中要么通过索引可以很快找到)。

还有一个问题是对于已有数据的删除和修改怎么办?

因为SSTable不可修改所以只能追加写一个新的数据覆盖老的数据,对于删除则是追加一个”墓碑”值覆盖掉存在的值。把索引指向新值,这样老值就不会被访问了。最后在SSTable合并的时候这些老值会完全消失。所以还要定期合并SSTable

以上是对leveldb的LSM结构的朴素解释。实际上MemTableSSTable都没有采用纯粹的树形结构,MemTable使用的是跳表而SSTable使用的是层次的结构。(这也是为什么 leveldb 叫 level db 的原因)

从这里开始完善朴素解释

首先对于MemTable来说不是持久化的如果重启导致内存中的数据丢失怎么办?WAL 表示的是预写日志,这个日志和MemTable是同步的,这个日志把每次的命令追加到日志中再更改MemTable,这样如果重启的话能够进行”重放”把从已经持久化的状态开始把数据填回到MemTable当中。

其次是对SSTable的合并,SSTable是分层存储的,第一层也就是Level0(被称作 young level),是MemTable刷入的一层,允许这一层的SSTable的key有交集。对于每一层都有一个阈值(young level 是 4,其他层是按大小算的,10^L MB),如果超过阈值自动向下一层合并,从level1开始的每一次key不允许有交集。具体的做法是从 young level 中把有交集的SSTable一起和下一层key有交集的SSTable合并成一个新的SSTable,然后其他层则是从自身层取出一个和下一层有交集的SSTable合并即可。这个属性可以用归纳法证明,从0层向1层合并的时候,1层只有一个的情况下肯定不会相交,然后假设n个的时候也不相交,在n+1的时候有交集,那么n+1合并时有0层的 key 和 n 当中的有交集,但是有交集的部分会被归并掉所以矛盾,所以n+1个的时候也是没有交集的。那1层能保证没有交集的话取出一个向下合并也是类似的不会有交集。所以再重复一遍分层存储的两个属性。

对于朴素解释的两个扩展使得我们对leveldb的设计更接近了。

  1. young层SSTable之间可能存在交集
  2. Li(i>0)层SSTable之间不存在交集

在这个基础上再增加几个约束条件,一个是,对于合并过程每超过2MB就会产生一个新文件,如果文件和下一层的文件有交集的个数有10个以上的话也会产生一个新文件,这样的目的是保证Ln和Ln+1之间不会重复太多。个人理解是覆盖太多,会成了倒三角的”树”情况,上一层搜索性能不好。

当然大量的随机读落在磁盘上还是会有性能问题,因为 seek 也可能是不连续的,这个可以想办法优化, 比如leveldb 里面使用了一种LRU缓存优化读性能。

参考

  1. 官方实现文档
  2. LSM-tree论文

SSA概述

SSA在Go1.7中被引入,这个特性对编译器的性能有很大的提高,但是也导致编译过程有些减速。下面来结合网上的资粮和书籍,简单说明一下SSA以及SSA的应用。

SSA 代表 static single-assignment,是一种IR(中间表示代码),要保证每个变量只被赋值一次。这个能帮助简化编译器的优化算法。

1
2
3
y := 1
y := 2
x := y

比如上面这段代码,y = 1其实是不可用的,这个要通过定义的可达分析来确定y是要用1还是2,而SSA有一个标识符可以称之为版本或者“代“。

1
2
3
y1 := 1
y2 := 2
x1 := y2

这样就没有任何间接值了。用SSA表示的好处是对于同一个变量的无关使用表示成不同“代”,可以方便很多编译器的优化算法的实现。

一个概念:

Φ(读作fai) 函数,表示要根据控制流赋值的“代”。

例子可以参考维基里的这一段

三个定义:

A dominate B,如果从起点开始必须通过A到达B。也就是说A是到B的必经之路。

A strictly dominate B,如果 A dominate B,并且A和B不相等。

A 的 dominance frontier 含有B,如果A没有strictly dominate B,但是 dominate 了B的一个前驱节点。

用遍历的方式确定 dominance frontier 的伪代码。

1
2
3
4
5
6
7
for each node b
if the number of immediate predecessors of b ≥ 2
for each p in immediate predecessors of b
runner := p
while runner ≠ idom(b)
add b to runner’s dominance frontier set
runner := idom(runner)

idom(b) 代表相邻的strictly dominate b的结点。这样的点只有一个,因为相邻的点有两个的话就不会是必经之路了。

如图是一个例子,2的前驱是1和7,7没有sd(strictly dominate) 2,所以把2加入到7的DF(dominate frontiers)当中。3是7的相邻sd,然后3不是2的sd所以2加入到3的DF当中,接着2是3相邻的sd,然后2不是2的sd所以把2加入到2的DF中,最后遍历到7,5和6不是7的sd所以把7加入到5和6的df当中。

df(A)可以认为是可以通过A到达的,但不是必经之路的点的集合。

有了这个定义以后就可以插入Φ函数了和重命名了。如果X中有定义a那么所有df(X)都需要a的Φ函数。并且Φ函数本身也是一个定义。

比如还是同一个例子。

1当中有j的定义,但是df(1)是空的,5当中有j的定义,并且5的df有7所以7当中要插入一个φ(j, j)。j现在在7中定义了(通过φ函数)所以df(7)中的2也要有φ(j, j),6也有j的定义但是7已经有了φ函数了,2的df有2,但是2已经有φ函数了。类似的方式可以应用到i和k。接着对定义重命名就可以完成SSA的转换了。

SSA的应用

上面只是通俗的解释了一下SSA,没有给出更多详细的理论和算法以及证明,因为证明实在难看懂,下面说一下SSA的应用。

DEAD CODE 消除

因为每个变量都有“代”(因为大家都只被赋值一次),所以很容易检查出没有被使用的变量并且删除对应的定义。另外如果删除v=x这样的定义还要在x的use表中把这条语句删除。

简单的常量扩展

比如v = φ(c1,c2,...,cn)这种格式,如果c都是相等的可以直接替换成c,或者v=c,如果c是常量的话也可以直接替换掉。在做这些的同时也可以做其他的优化,能够在一趟遍历中都完成,比如copy propagationx=y或者x=φ(y)都可以直接用y替换掉x。比如constant foldingx=a+b如果a+b是常量的话,可以直接用常量赋值。

当然还有其他的优化算法,出发点都是基于SSA的简单性。

从SSA转换回原始代码

y = φ(x1, x2, x3)这样的形式要根据条件分支再拆分回原来的形态,比如满足1条件,使用y=x1这样的形式。并且可能会很自然的想把x1和x2变回使用同一个寄存器,但是通过优化过程中的一些手段(copy propagation)已经把大部分move指令给优化掉了,并且重新推回x可能会有生命周期的影响,所以还是保留了“代”。

总结

其实这个就是把def-use换成了use-def方便做代码优化呀:)害我看那么久

参考文献

  1. 虎书第十九章
  2. 维基百科

本篇文章主要分析一下经过纯物理层之后从设备读取的frame是如何进入协议栈的和对应封装好的frame是如何离开协议栈的,多亏协议栈的分层思路,每一层之间都可以独立分析和阅读,对于驱动来说,首先要面对的就是中断了, 本文基于4.7.2的内核简单的过了一遍.

中断处理

中断处理是分析链路层数据传输的入口。
例如在drivers/net/ethernet/3com/3c59x.c这个是一个华三的设备驱动, 观察这个驱动的中断处理函数可以知道,frame的接收的入口是vortex_rx,而当设备有足够的空间发送frame的时候就会调用netif_wake_queue来触发发送frame的例程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
* This is the ISR for the vortex series chips.
* full_bus_master_tx == 0 && full_bus_master_rx == 0
*/

static irqreturn_t
vortex_interrupt(int irq, void *dev_id)
{
struct net_device *dev = dev_id;
struct vortex_private *vp = netdev_priv(dev);
void __iomem *ioaddr;
int status;
int work_done = max_interrupt_work;
int handled = 0;
unsigned int bytes_compl = 0, pkts_compl = 0;

ioaddr = vp->ioaddr;
spin_lock(&vp->lock);

status = ioread16(ioaddr + EL3_STATUS); // 从ioremap的地址当中读取网卡的状态,这个是设备规定的地址

if (vortex_debug > 6)
pr_debug("vortex_interrupt(). status=0x%4x\n", status);

// 在中断处理的时候,设备是关中断的,这个时候可以通过观察这个status来检查设备是否有中断到来.
if ((status & IntLatch) == 0) // 有中断需要处理,但是可能已经被其他中断处理函数处理了
goto handler_exit; /* No interrupt: shared IRQs cause this */
handled = 1;

if (status & IntReq) { // 中断请求
status |= vp->deferred;
vp->deferred = 0;
}

if (status == 0xffff) /* h/w no longer present (hotplug)? */
goto handler_exit;

if (vortex_debug > 4)
pr_debug("%s: interrupt, status %4.4x, latency %d ticks.\n",
dev->name, status, ioread8(ioaddr + Timer));

spin_lock(&vp->window_lock);
window_set(vp, 7);

do {
if (vortex_debug > 5)
pr_debug("%s: In interrupt loop, status %4.4x.\n",
dev->name, status);
if (status & RxComplete) // 中断表示接收完成的时候调用 vortex_rx
vortex_rx(dev);

if (status & TxAvailable) { // 中断表示可以传输的时候
if (vortex_debug > 5)
pr_debug(" TX room bit was handled.\n");
/* There's room in the FIFO for a full-sized packet. */
iowrite16(AckIntr | TxAvailable, ioaddr + EL3_CMD);
netif_wake_queue (dev);
}

if (status & DMADone) {// 表示发送完成可以清除sk_buff了
if (ioread16(ioaddr + Wn7_MasterStatus) & 0x1000) {
iowrite16(0x1000, ioaddr + Wn7_MasterStatus); /* Ack the event. */
pci_unmap_single(VORTEX_PCI(vp), vp->tx_skb_dma, (vp->tx_skb->len + 3) & ~3, PCI_DMA_TODEVICE);
pkts_compl++;
bytes_compl += vp->tx_skb->len;
dev_kfree_skb_irq(vp->tx_skb); /* Release the transferred buffer */
if (ioread16(ioaddr + TxFree) > 1536) {
/*
* AKPM: FIXME: I don't think we need this. If the queue was stopped due to
* insufficient FIFO room, the TxAvailable test will succeed and call
* netif_wake_queue()
*/
netif_wake_queue(dev);
} else { /* Interrupt when FIFO has room for max-sized packet. */
iowrite16(SetTxThreshold + (1536>>2), ioaddr + EL3_CMD);
netif_stop_queue(dev);
}
}
}
/* Check for all uncommon interrupts at once. */
if (status & (HostError | RxEarly | StatsFull | TxComplete | IntReq)) {
if (status == 0xffff)
break;
if (status & RxEarly)
vortex_rx(dev);
spin_unlock(&vp->window_lock);
vortex_error(dev, status);
spin_lock(&vp->window_lock);
window_set(vp, 7);
}

if (--work_done < 0) { // 最多处理work_done个frame
pr_warn("%s: Too much work in interrupt, status %4.4x\n",
dev->name, status);
/* Disable all pending interrupts. */
do {
vp->deferred |= status; // 把当前状态保存起来等下次中断的时候处理
iowrite16(SetStatusEnb | (~vp->deferred & vp->status_enable),
ioaddr + EL3_CMD);
iowrite16(AckIntr | (vp->deferred & 0x7ff), ioaddr + EL3_CMD);
} while ((status = ioread16(ioaddr + EL3_CMD)) & IntLatch); // 把中断清掉?
/* The timer will reenable interrupts. */
mod_timer(&vp->timer, jiffies + 1*HZ);
break;
}
/* Acknowledge the IRQ. */
iowrite16(AckIntr | IntReq | IntLatch, ioaddr + EL3_CMD);
} while ((status = ioread16(ioaddr + EL3_STATUS)) & (IntLatch | RxComplete));// 当有pending的中断并且是接收完成的状态

netdev_completed_queue(dev, pkts_compl, bytes_compl);
spin_unlock(&vp->window_lock);

if (vortex_debug > 4)
pr_debug("%s: exiting interrupt, status %4.4x.\n",
dev->name, status);
handler_exit:
spin_unlock(&vp->lock);
return IRQ_RETVAL(handled);
}

当中断到来的时候检查RxComplete,如果这个状态置位了说明有frame可以处理,当状态有TxAvailable的时候表示网卡的缓冲有空可以尝试发送frame,这样的检查会循环很多次,直到出现错误或者超过规定的循环次数max_interrupt_work,这个值对于这个设备来说是32,也就是说最多接受32帧就会退出中断处理。

frame的接收

vortex_rx最终会调用netif_rx这个函数很关键,vortex_rx会通过netdev_alloc_skb分配一个sk_buff,这个是一个会贯穿整个协议栈的一个buff,然后从设备中拷贝frame,拷贝的方式也有很多比如直接读取或者利用DMA。
从DMA读取到内存当中比如截取的这段代码

1
2
3
4
5
6
7
8
9
10
// 转换成总线地址给DMA设备,启动DMA传输,然后循环检查传输状态
// 最后取消总线地址的映射.
dma_addr_t dma = pci_map_single(VORTEX_PCI(vp), skb_put(skb, pkt_len),
pkt_len, PCI_DMA_FROMDEVICE);
iowrite32(dma, ioaddr + Wn7_MasterAddr);
iowrite16((skb->len + 3) & ~3, ioaddr + Wn7_MasterLen);
iowrite16(StartDMAUp, ioaddr + EL3_CMD);
while (ioread16(ioaddr + Wn7_MasterStatus) & 0x8000)
;
pci_unmap_single(VORTEX_PCI(vp), dma, pkt_len, PCI_DMA_FROMDEVICE);

也有不饶过CPU直接读取的

1
2
3
ioread32_rep(ioaddr + RX_FIFO,
skb_put(skb, pkt_len),
(pkt_len + 3) >> 2);

接着调用eth_type_trans确定对应的protocol,并且最终调用netif_rx来继续处理接收任务。

到了netif_rx就是一个通用流程了,netif_rx调用了netif_rx_internal,这个函数会先通过net_timestamp_check检查sk_buff的时间戳,并且设置,然后调用enqueue_to_backlog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*
* enqueue_to_backlog is called to queue an skb to a per CPU backlog
* queue (may be a remote CPU queue).
*/
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
unsigned int qlen;

sd = &per_cpu(softnet_data, cpu); // 获取per-cpu softnet_data

local_irq_save(flags);

rps_lock(sd);
if (!netif_running(skb->dev)) // 如果设备已经没有运行的直接丢frame
goto drop;
qlen = skb_queue_len(&sd->input_pkt_queue); // 获取softnet_data的&sk_buff的队列长度
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) { // 如果没有超过最大长度并且没有被限制
if (qlen) {
enqueue:
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags); // 加入队列并且开中断
return NET_RX_SUCCESS;
}
// 如果队列为空可以尝试调度 backlog device,
// 再把frame加入到队列当中。
/* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))
____napi_schedule(sd, &sd->backlog);
}
goto enqueue;
}

drop:
sd->dropped++;
rps_unlock(sd);

local_irq_restore(flags);

atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
return NET_RX_DROP;
}

进入到____napi_schedule则很简单了,就是唤起把backlog这个softnet_data上的device加入到poll_list当中然后唤起softirq来处理。

1
2
3
4
5
6
7
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
list_add_tail(&napi->poll_list, &sd->poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

这里要提的是,kernel对于frame的处理有一个比较新的API,称之为NAPI。

对于NAPI来说, net_rx_action结合了轮询和中断, 当设备有中断表示收到frame的时候会把它加入softnet_datapoll_list, 这个时候设备不再发出中断,然后通过轮询这些设备是否有剩余frame处理,并且限制最大处理数量,这样能保证cpu不会承受过大的中断load的压力,并且可以使得设备之间能够相对公平获得处理的机会,而不是中断更频繁的设备获得的机会更多。

对于使用NAPI的设备来说显然需要提供poll接口来供查询之用,同时用NET_RX_SOFTIRQ触发软中断执行net_rx_action这个软中断处理函数,而不需要netif_rx这个接口。

NET_RX_SOFTIRQ做的事情就是把设备加入poll列表并且触发softirq. 对于没有使用NAPI的设备来说,会使用per-cpu 结构softnet_data中的backlog_dev(是一个假的胶水层的封装)来替代放入对应的poll列表中,然后再进入netif_rx_schedule的例程来处理。也就是说NAPI-unaware的设备用的是backlog_dev而NAPI-aware的设备用的是自己的device结构体。

相反的把设备从轮询列表中移除依靠的是netif_rx_complete. 这样轮询检查的时候就不会处理对应的设备了.

这里说了这么久的轮询不要误会frame的主力是轮询,显然这是不行的,因为网络数据要求接受很快,还是中断驱动的,只不过为了物尽其用,既然你有数据帧还在就不要中断告诉我,我继续处理就可以了。

现在看一下加入到poll_list之后,NET_RX_SOFTIRQ软中断是如何工作的。关于软中断的实现可以参考参考列表中的第二个链接,我本来想自己总结一下,但是发现这篇文章确实总结的很好,所以保存一下就好了。

补充一点,softnet_data->input_pkt_queue是frame的缓存队列,这个队列有一个最大值netdev_max_backlog,目前这个值是1000,也就是每个CPU最多有1000个frame没处理。对于有自己device的设备,frame需要通过设备的poll方法来获取。

net_rx_action对应的是软中断NET_RX_SOFTIRQ的处理函数。之前说过net_rx_action的工作方式,这里看一下具体的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static void net_rx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);
unsigned long time_limit = jiffies + 2;
int budget = netdev_budget; // 一个budget用于限制运行时间,目前是300
LIST_HEAD(list);
LIST_HEAD(repoll);

local_irq_disable();
list_splice_init(&sd->poll_list, &list); // 把poll_list合并到list上并且把poll_list清空
local_irq_enable();

for (;;) {
struct napi_struct *n;

if (list_empty(&list)) {
if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
return;
break;
}

n = list_first_entry(&list, struct napi_struct, poll_list);
budget -= napi_poll(n, &repoll); // 每调用一次会减掉相应的budget,如果还有work没完成还需要poll就会加入到repoll链表里。

/* If softirq window is exhausted then punt.
* Allow this to run for 2 jiffies since which will allow
* an average latency of 1.5/HZ.
*/
if (unlikely(budget <= 0 || // 如果budge耗尽或者超过了两个jiffies就会停止
time_after_eq(jiffies, time_limit))) {
sd->time_squeeze++;
break;
}
}

__kfree_skb_flush();
local_irq_disable();

list_splice_tail_init(&sd->poll_list, &list); // 再把poll_list合并到list当中,并且清空poll_list。
list_splice_tail(&repoll, &list); // 然后把repoll合并到list当中。
list_splice(&list, &sd->poll_list); // 再把list合并到poll_list当中。
if (!list_empty(&sd->poll_list)) // 这几步的过程就是把需要repoll的设备和当前的poll_list合并
__raise_softirq_irqoff(NET_RX_SOFTIRQ); // 如果还有需要poll的设备就再触发软中断.

net_rps_action_and_irq_enable(sd);
}

整个过程就是检查poll_list并且轮询调用poll方法。接下来具体看一下poll方法的相关内容。以非NAPI设备为例,使用的是backlogpoll方法,对应的方法可以在net/core/dev.c当中找到,在初始化函数net_dev_init当中sd->backlog.poll = process_backlog;给每个per-cpu结构的softnet_data都是指向了这个poll函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static int process_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);

/* Check if we have pending ipi, its better to send them now,
* not waiting net_rx_action() end.
*/
if (sd_has_rps_ipi_waiting(sd)) {
local_irq_disable();
net_rps_action_and_irq_enable(sd);
}

napi->weight = weight_p; // 这个值目前是32
local_irq_disable();
while (1) {
struct sk_buff *skb;

while ((skb = __skb_dequeue(&sd->process_queue))) {
rcu_read_lock();
local_irq_enable();
__netif_receive_skb(skb); // 取出的skb交给__netif_receive_skb处理
rcu_read_unlock();
local_irq_disable();
input_queue_head_incr(sd);
if (++work >= quota) {
local_irq_enable();
return work;
}
}

rps_lock(sd);
if (skb_queue_empty(&sd->input_pkt_queue)) {
/*
* Inline a custom version of __napi_complete().
* only current cpu owns and manipulates this napi,
* and NAPI_STATE_SCHED is the only possible flag set
* on backlog.
* We can use a plain write instead of clear_bit(),
* and we dont need an smp_mb() memory barrier.
*/
napi->state = 0;
rps_unlock(sd);

break;
}

skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
rps_unlock(sd);
}
local_irq_enable();

return work;
}

这个函数的主体就是把&sd->input_pkt_queue交给&sd->process_queue然后从&sd->process_queue当中取出sk_buff,再调用__netif_receive_skb,进一步处理.我想老是把&sd->input_pkt_queue拷贝并且清空应该是因为希望不因为锁独占这个队列太久的原因.

__netif_receive_skb的内容主要是根据skb->protocol,遍历&skb->dev->ptype_all然后将frame交给L3的ptype->func处理,还有一些在这一层需要处理的特性,比如bridging.

目前说描述的东西就是整个传输路径的这部分,完整的图在这里

frame的发送

在frame的发送路径,主要包含几个任务,开关设备的发送功能,调度设备进行发送,选择在设备发送队列的frame进行发送,还要传输过程的本身.而且发送的例程也是类似接收的过程,有对应的softirq(NET_TX_SOFTIRQ)和对应的handler,net_tx_action。和poll_list对应的是output_queue,也是一个等待发送的设备列表。
__LINK_STATE_START__LINK_ STATE_XOFF__LINK_STATE_RX_SCHED__LINK_STATE_SCHED也是对应的.回顾开头中断的代码

1
2
3
4
5
6
7
if (status & TxAvailable) { // 中断表示可以传输的时候
if (vortex_debug > 5)
pr_debug(" TX room bit was handled.\n");
/* There's room in the FIFO for a full-sized packet. */
iowrite16(AckIntr | TxAvailable, ioaddr + EL3_CMD);
netif_wake_queue (dev);
}

就是说当状态可用的时候,尝试调用netif_wake_queue来触发frame的发送。

首先来看一下如何选择发送的设备。

1
2
3
4
5
6
7
8
9
10
11
/**
* netif_wake_queue - restart transmit
* @dev: network device
*
* Allow upper layers to call the device hard_start_xmit routine.
* Used for flow control when transmit resources are available.
*/
static inline void netif_wake_queue(struct net_device *dev)
{
netif_tx_wake_queue(netdev_get_tx_queue(dev, 0));
}

这个函数内部调用了

1
2
3
4
5
6
7
8
9
10
11
12
13
static inline void __netif_reschedule(struct Qdisc *q)
{
struct softnet_data *sd;
unsigned long flags;

local_irq_save(flags);
sd = this_cpu_ptr(&softnet_data);
q->next_sched = NULL;
*sd->output_queue_tailp = q;
sd->output_queue_tailp = &q->next_sched;
raise_softirq_irqoff(NET_TX_SOFTIRQ);
local_irq_restore(flags);
}

主体就是把设备加入output_queue(通过->next_sched连接)然后唤起软中断NET_TX_SOFTIRQ

再来看看软中断的主体。
首先是遍历completion_queue来释放sk_buff这个连接是通过dev_kfree_skb_irq添加的,和正常的dev_kfree_skb不同,它是把sk_buff加入到释放列表中就快速返回了, 然后就会遍历设备列表寻找可运行的设备,调用qdisk_run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
 void net_tx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);

if (sd->completion_queue) { // 检查completion_queue逐一释放sk_buff, 他们是通过dev_kfree_skb_irq,因为他不真的释放sk_buff,而是把sk_buff移到completio_queue当中.
struct sk_buff *clist;

local_irq_disable();
clist = sd->completion_queue;
sd->completion_queue = NULL;
local_irq_enable();

while (clist) {
struct sk_buff *skb = clist;
clist = clist->next;

WARN_ON(atomic_read(&skb->users));
if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
trace_consume_skb(skb);
else
trace_kfree_skb(skb, net_tx_action);

if (skb->fclone != SKB_FCLONE_UNAVAILABLE)
__kfree_skb(skb);
else
__kfree_skb_defer(skb);
}

__kfree_skb_flush();
}

if (sd->output_queue) {
struct Qdisc *head;

local_irq_disable();
head = sd->output_queue; // 拷贝并首尾相连.
sd->output_queue = NULL;
sd->output_queue_tailp = &sd->output_queue;
local_irq_enable();

while (head) {
struct Qdisc *q = head;
spinlock_t *root_lock;

head = head->next_sched;

root_lock = qdisc_lock(q);
if (spin_trylock(root_lock)) {
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED,
&q->state);
qdisc_run(q); // qdisk_run
spin_unlock(root_lock);
} else {
if (!test_bit(__QDISC_STATE_DEACTIVATED,
&q->state)) {
__netif_reschedule(q);
} else {
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED,
&q->state);
}
}
}
}
}

qdisc_run首先检查设备的运行状态然后调用__qdisc_run,这个函数的主体就是调用qdisc_restart,直到超过限制或者需要让出时间CPU了,最后清空qdisc的RUNNING状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void __qdisc_run(struct Qdisc *q)
{
int quota = weight_p;
int packets;

while (qdisc_restart(q, &packets)) {
/*
* Ordered by possible occurrence: Postpone processing if
* 1. we've exceeded packet quota
* 2. another process needs the CPU;
*/
quota -= packets;
if (quota <= 0 || need_resched()) {
__netif_schedule(q);
break;
}
}

qdisc_run_end(q);
}

qdisc_restart的主体是从队列中取出sk_buff,然后调用sch_direct_xmit,它的功能是调用dev_hard_start_xmit来运行设备驱动的指定的传输方法hard_start_xmit,如果没有成功则把sk_buff重新加入到出队当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
* Transmit possibly several skbs, and handle the return status as
* required. Holding the __QDISC___STATE_RUNNING bit guarantees that
* only one CPU can execute this function.
*
* Returns to the caller:
* 0 - queue is empty or throttled.
* >0 - queue is not empty.
*/
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock, bool validate)
{
int ret = NETDEV_TX_BUSY;

/* And release qdisc */
spin_unlock(root_lock);

/* Note that we validate skb (GSO, checksum, ...) outside of locks */
if (validate)
skb = validate_xmit_skb_list(skb, dev);

if (likely(skb)) {
HARD_TX_LOCK(dev, txq, smp_processor_id());
if (!netif_xmit_frozen_or_stopped(txq))
skb = dev_hard_start_xmit(skb, dev, txq, &ret);

HARD_TX_UNLOCK(dev, txq);
} else {
spin_lock(root_lock);
return qdisc_qlen(q);
}
spin_lock(root_lock);

if (dev_xmit_complete(ret)) {
/* Driver sent out skb successfully or skb was consumed */
ret = qdisc_qlen(q);
} else {
/* Driver returned NETDEV_TX_BUSY - requeue skb */
if (unlikely(ret != NETDEV_TX_BUSY))
net_warn_ratelimited("BUG %s code %d qlen %d\n",
dev->name, ret, q->q.qlen);

ret = dev_requeue_skb(skb, q);
}

if (ret && netif_xmit_frozen_or_stopped(txq))
ret = 0;

return ret;
}

至此整个L2的发送和接收就大致有了一个了解,并且能够理解整个的工作过程,这里有一点没有讲到的是L2的转发机制,比如STP等协议和实现,可能会在接下来的文章中继续剖析.

参考:

  1. Linux 下DMA浅析
  2. linux kernel的中断子系统之(八):softirq
  3. Linux Network Internals

经常提到的context都是一些在一个http请求的不同中间件之间传递参数和返回值的上下文. 标准库里的上下文稍微丰富一点, 比如流水线模型都会有一个通知退出的channel, 也一并封装进了Context, 所以既有传值功能也有流程控制的功能.

一般会把Context作为函数的第一个参数传递下去, 比如Context可以保存一个Request的id, 在打log的时候带上, 这样就可以区分出输出中同一个request的调用情况. 或者子函数select 传入的Context.Done channel用于在流水线中退出的通知等等.

Context接口有四个方法, 并且线程安全.

Done 是一个channel用于通知退出事件, 当从这个channel里读出的时候说明Context结束.
Err 返回退出的原因.
Deadline 返回一个截止时间(如果设置了的话).
Value 可以取得Context里面存得值.

Context的实现有4种, cancelCtx, emptyCtx, timerCtx, valueCtx.

valueCtx只是最简单的Ctx用来存k-v, 这类似于中间件之间传递各种值的功能, 没有Done, Err, Deadline这些方法, 是组合父Context得到的.

cancelCtx是一个带取消功能的Ctx, 返回一个cancel函数, 可以通过调用它主动取消这个Ctx, 比如错误退出的时候, 可以调用cancel函数, 他会同时调用子Ctx的cancel函数, 以此来让Context的Done起作用, cancelCtx是树形的, 有Done, Err, 但是没有Deadline, 组合了父结构获得的.

timerCtx是一个cancelCtx的封装, 由timer调用cancel函数, 并且不会超过parent的Deadline(不然就退化成cancelCtx由parent触发cancel), 有Deadline, 其余部分是组合了cancelCtx.

Context.WithCancel就是绑定一个parent, 返回一个cancelCtx的Context, 用返回的CancelFunc可以用来主动关闭Context, 比如程序错误的时候可以调用CancelFunc来让select Done的goroutine知道.

Context.WithTimer是对Context.WithDeadline的封装, Context.WithDeadline返回的是timerCtx.

Context.WithValue则仅仅是用来存值的Context.

WithXXX的函数都是要接受一个parent的, 最初的parent就用Context.Background().
它是一种emptyCtx, Err, Done(nil channel永远阻塞), Deadline, Value全都返回空值, 没有实际作用只是作为树形结构的root.

另一种emptyCtx是context.TODO(), 用于没有明确作用的不知道父Context会是什么的时候可以使用.

主要结构

为了解决外部碎片的问题, 需要有一种手段可以分配非连续的页映射到一段连续的虚拟地址上. 这就是vmalloc的目的, 本文基于4.7.2的代码对vmalloc的实现进行分析.

这里要提到一个重要的结构体

1
2
3
4
5
6
7
8
9
10
11
struct vm_struct {
struct vm_struct *next;
void *addr;
unsigned long size;
unsigned long flags;
struct page **pages;
unsigned int nr_pages;
phys_addr_t phys_addr;
const void *caller;
};

这个结构用于描述每一段映射的内存.

比起32位的结构, 64位结构有更大的寻址空间, 可以通过查看Documentation/x86/x86_64/mm.txt 获得64位的内存映射, 比起32位的变化就是跨度变大了很多.

直接映射是64TB, 有一个1TB的空洞, 接着就是32TB的vmalloc空间,以VMALLOC_START(64TB+1TB)开始,VMALLOC_END(64TB+1TB+32TB)结尾.

原来的32位时, 内核的虚拟空间只有1G, 直接映射的区间占896MB, 隔了一个8MB的空洞, 再接着是vmalloc的映射区, 大小一般是128MB.

vmalloc和物理映射的空洞主要是一个安全区, 如果越过了这个hole说明程序有问题存在非法的访问.

vm_struct的组织是通过链表实现的,
其中next指向下一个vm_struct, addr指向的是虚拟地址, size对应分配的内存的大小,在有guard page(也就是flag的VM_NO_GUARD没有设置的话)的时候会多出一个PAGE_SIZE的大小(但没有映射内存), pages是页的地址, nr_pages是页的数量, phy_addr这个字段一般为0,当用于ioremap的时候会用到, caller是调用者的地址.

上面说了, 每个vm_struct代表的区间之间会有一PAGE_SIZE大小的分隔,也是为了能够方便检查非法越界的情况.

这是32位的一张分布图, 整体结构没变, 就是64位的区间变大了.

vmap_area用于代表虚拟地址的区间, 和vm_struct是有对应关系的.
list 是按地址排序存的线性链表节点.
rb_node 是按地址存的红黑树节点.
这两个值都是在插入的时候修改的.
也就是为了方便搜索,它既存在红黑树上也存在链表里面.

1
2
3
4
5
6
7
8
9
10
11
12
struct vmap_area {
unsigned long va_start;
unsigned long va_end;
unsigned long flags;
struct rb_node rb_node; /* address sorted rbtree */
struct list_head list; /* address sorted list */
struct llist_node purge_list; /* "lazy purge" list */
struct vm_struct *vm;
struct rcu_head rcu_head;
};


vmap_area 虚拟地址的管理

接下了说一下调用过程, 首先vmalloc会调用__vmalloc_node_flags,这是一层封装,指定了GFP的flag, 接着调用__vmalloc_node,它通过指定搜寻的区域(VMALLOC_START,VMALLOC_END)调用了__vmalloc_node_range, 然后通过__get_vm_area_node寻找合适的虚拟内存空间,并且分配vm_struct,最后调用__vmalloc_area_node根据vm_struct分配具体的物理页并在页表中建立虚拟地址的映射.

__get_vm_area_nodesize进行对齐, 分配vmap_area的内存, 然后增加一个PAGE_SIZE的guard page, 接着就是寻找一段合适的虚拟地址的区间来分配给vmap_area并且用它来初始化vm_struct.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static struct vm_struct *__get_vm_area_node(unsigned long size,
unsigned long align, unsigned long flags, unsigned long start,
unsigned long end, int node, gfp_t gfp_mask, const void *caller)
{
struct vmap_area *va;
struct vm_struct *area;

BUG_ON(in_interrupt());
if (flags & VM_IOREMAP)
align = 1ul << clamp_t(int, fls_long(size),
PAGE_SHIFT, IOREMAP_MAX_ORDER);

size = PAGE_ALIGN(size);
if (unlikely(!size))
return NULL;

/* 分配vm_struct
* 并且初始化全0
*/
area = kzalloc_node(sizeof(*area), gfp_mask & GFP_RECLAIM_MASK, node);
if (unlikely(!area))
return NULL;
/* 如果没有标记,size要多算一个guard page用于分隔 */
if (!(flags & VM_NO_GUARD))
size += PAGE_SIZE;
/* 分配vmap_area */
va = alloc_vmap_area(size, align, start, end, node, gfp_mask);
if (IS_ERR(va)) {
kfree(area);
return NULL;
}

/* 初始化 */
setup_vmalloc_vm(area, va, flags, caller);

return area;
}

alloc_vmap_area会缓存上次一次搜索的节点到free_vmap_cache, 这样是认为一般虚拟内存是按顺序连续分配的, 如果通过缓存没有搜索到就要从全局的vmap_area_root开始搜索.

这里说明一下vmap_area的分配过程, 主要是在红黑树中根据所指定的区间寻找合适的点,如果找到一个区间正好包含了vstart,那么在这个区间链表后面寻找合适的空洞进行分配就可以, 也有可能在红黑书上找到的就是空洞就不用接着遍历了, 总之搜索的过程是先红黑书然后再链表.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
* Allocate a region of KVA of the specified size and alignment, within the
* vstart and vend.
*/
static struct vmap_area *alloc_vmap_area(unsigned long size,
unsigned long align,
unsigned long vstart, unsigned long vend,
int node, gfp_t gfp_mask)
{
struct vmap_area *va;
struct rb_node *n;
unsigned long addr;
int purged = 0;
struct vmap_area *first;

BUG_ON(!size);
BUG_ON(offset_in_page(size));
BUG_ON(!is_power_of_2(align));

might_sleep_if(gfpflags_allow_blocking(gfp_mask));

va = kmalloc_node(sizeof(struct vmap_area),
gfp_mask & GFP_RECLAIM_MASK, node);
if (unlikely(!va))
return ERR_PTR(-ENOMEM);

/*
* Only scan the relevant parts containing pointers to other objects
* to avoid false negatives.
*/
kmemleak_scan_area(&va->rb_node, SIZE_MAX, gfp_mask & GFP_RECLAIM_MASK);

retry:
spin_lock(&vmap_area_lock);
/*
* Invalidate cache if we have more permissive parameters.
* cached_hole_size notes the largest hole noticed _below_
* the vmap_area cached in free_vmap_cache: if size fits
* into that hole, we want to scan from vstart to reuse
* the hole instead of allocating above free_vmap_cache.
* Note that __free_vmap_area may update free_vmap_cache
* without updating cached_hole_size or cached_align.
*/
if (!free_vmap_cache || /* 缓存为空, 并且在下面条件满足的时候不使用缓存 */
size < cached_hole_size || /* 至少有一个空洞可以复用, 直接从头开始搜索 */
vstart < cached_vstart || /* 别缓存的起点小 */
align < cached_align) { /* 对齐大小比缓存的小 */
nocache:
cached_hole_size = 0;
free_vmap_cache = NULL;
}
/* record if we encounter less permissive parameters */
cached_vstart = vstart;
cached_align = align;

/* find starting point for our search */
if (free_vmap_cache) { /* 这个地方是用缓存上次搜索的结果, 每次找到以后会存到这里 */
first = rb_entry(free_vmap_cache, struct vmap_area, rb_node);
addr = ALIGN(first->va_end, align);
if (addr < vstart) /* 缓存的末尾对齐之后超出了范围从头开始搜索 */
goto nocache;
if (addr + size < addr) /* 整数溢出 */
goto overflow;

} else {
addr = ALIGN(vstart, align);
if (addr + size < addr)
goto overflow;

n = vmap_area_root.rb_node;
first = NULL;

while (n) { /* 用对齐之后的vstart正好<=va_ned&&>=va_start的节点 */
struct vmap_area *tmp;
tmp = rb_entry(n, struct vmap_area, rb_node);
if (tmp->va_end >= addr) {
first = tmp;
if (tmp->va_start <= addr)
break;
n = n->rb_left;
} else
n = n->rb_right;
}

if (!first)
goto found;
}
/* from the starting point, walk areas until a suitable hole is found */
while (addr + size > first->va_start && addr + size <= vend) {
if (addr + cached_hole_size < first->va_start) /* cached_hole_size */
cached_hole_size = first->va_start - addr; /* 缓存最大的空洞大小 */
addr = ALIGN(first->va_end, align);
if (addr + size < addr)
goto overflow;

if (list_is_last(&first->list, &vmap_area_list))
goto found;

first = list_next_entry(first, list);
}

found:
if (addr + size > vend)
goto overflow;

va->va_start = addr;
va->va_end = addr + size;
va->flags = 0;
__insert_vmap_area(va); /* 插入到红黑书和链表当中 */
free_vmap_cache = &va->rb_node;
spin_unlock(&vmap_area_lock);

BUG_ON(!IS_ALIGNED(va->va_start, align));
BUG_ON(va->va_start < vstart);
BUG_ON(va->va_end > vend);

return va;

overflow:
spin_unlock(&vmap_area_lock);
if (!purged) {
purge_vmap_area_lazy();
purged = 1;
goto retry;
}

if (gfpflags_allow_blocking(gfp_mask)) {
unsigned long freed = 0;
blocking_notifier_call_chain(&vmap_notify_list, 0, &freed);
if (freed > 0) {
purged = 0;
goto retry;
}
}

if (printk_ratelimit())
pr_warn("vmap allocation for size %lu failed: use vmalloc=<size> to increase size\n",
size);
kfree(va);
return ERR_PTR(-EBUSY);
}


建立映射

成功分配好vm_struct之后就可以建立映射了.
__vmalloc_area_node的过程首先是分配物理页,调用alloc_kmem_pages,这个函数和alloc_pages不同的是套了一层cgroup的kmem计数器,来限制内存分配, 然后再调用map_vm_area在页表上建立映射,内部调用的是vmap_page_range,指定了vm_struct的start和end, 而vmap_page_range则调用了vmap_page_range_noflush迭代去初始化各级页表. 内核的虚拟地址的映射就全部完成了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
*
* Set up page tables in kva (addr, end). The ptes shall have prot "prot", and
* will have pfns corresponding to the "pages" array.
* 这里的kva指的是kernel virtual address.
* Ie. pte at addr+N*PAGE_SIZE shall point to pfn corresponding to pages[N]
*/
static int vmap_page_range_noflush(unsigned long start, unsigned long end,
pgprot_t prot, struct page **pages)
{
pgd_t *pgd;
unsigned long next;
unsigned long addr = start;
int err = 0;
int nr = 0;

BUG_ON(addr >= end);
pgd = pgd_offset_k(addr); /* 把虚拟地址hash到pgd */
do {
next = pgd_addr_end(addr, end); /* 获取下一个pgd */
err = vmap_pud_range(pgd, addr, next, prot, pages, &nr); /* 再去初始化 pud pmd pte */
if (err)
return err;
} while (pgd++, addr = next, addr != end);

return nr;
}


对应的调用关系如图.

释放

释放的入口是vfree, 这个函数首先通过in_interrupt检查是否在中断上下文, 如果是正处于中断中则使用workqueue传入vfree_deferred用workqueue进行释放, 不然就直接调用__vunmap来释放, 它会调用remove_vm_area, 最终调用用unmap_vmap_area重置页表, 然后用free_vmap_area_noflushvmp_area加入到vmap_purge_list这个链表当中, 放入这个链表之后只有链表长度超过阈值才会尝试调用try_purge_vmap_area_lazy真正释放掉vmap_area,释放过程主要是从红黑树中移除, 然后把自己也给free掉, 这是一个延迟的释放过程, 最后把把vm_struct自己和它管理的物理页全部释放. 调用图如图所示.

总结

vmalloc主要是针对连续地址分配的需求, 涉及两个部分, 一个是虚拟地址的管理以及到物理地址的映射. 其中虚拟地址的管理比较麻烦一点, 用链表红黑树保存了每个vmap_area并且寻找合适的空洞用于分配虚拟地址. 而物理地址的分配之前我的博客已经讲过了, 建立映射只要设置对应的页表项就OK了.

参考

  1. 《LINUX3.0内核源代码分析》第四章: 内存管理

  2. linux内存管理之vmalloc

  3. lkvm

很多人看内核物理页的分配都喜欢从__alloc_pages开始看,喜欢从程序的执行流程去看问题,其实可换个思路,从开发或者说实现的角度去理解物理页的分配效果可能更好,所以这篇博客,反其道而行之,基于4.7.2的内核代码, 由内向外解释物理页的分配过程的细节.

binary buddy system allocation 算法

伙伴分配器算法的流程如下:

  1. 如果内存被分配
    1. 寻找一个合适大小的内存(要求是: 大于 requested memory, 同时以\(2^k\)为量纲最小的块, 也就是分配一个满足要求的最小块)
      1. 如果找到了直接分配
      2. 如果没有找到, 需要进行如下尝试
        1. 拆分一个比 requested memory 更大的内存块(比如比之前没找到的\(2^x\)大的\(2^{x+1}\), 分成两半.
        2. 如果拆分出来的一半满足requested memory, 并且不能再分了, 已经是最小的了, 就分配该块.
        3. 重复1, 寻找合适大小的内存块.
        4. 重复这个流程直到找到合适的内存块.
  2. 如果内存被释放
    1. 释放\(2^k\)内存块
    2. 查看内存块的伙伴也就是分配之后的另一半\(2^k\)块是否也free了
    3. 如果是,则会回到2并且重复执行直到所有内存被释放或者有一个伙伴没有被free掉, 无法合并.

内存碎片

内存碎片有两种,一种是External fragmentation和Internal Fragmentation, 前者是因为内存块划分的太小而无法满足大的内存分配的需要, 在这点上内核遇到的大内存块分配其实并不多, 而且通过vmalloc把不连续的物理地址映射成连续的虚拟地址很好的解决了这个问题. 后者的解决方法是引入了slab allocator, 对小内存进行了有效的缓存, 避免了Internal fragmentation的情况.

具体实现

每个struct zone结构有一个struct free_area free_area[MAX_ORDER];的定义, 对应的结构如下.

1
2
3
4
struct free_area {
struct list_head free_list[MIGRATE_TYPES];
unsigned long nr_free;
};

所以整个的空闲块的管理结构如图.

每一个free_area用来存储一种\(2^k\)块,用链表链接,MAX_ORDER表示的允许最大的块就是\(2^{MAX_ORDER}\)大小.

migratetype

图中省略了MIGRATE_TYPES的表示, 仔细观察free_list是一个数组每个代表了一种migratetype, 用于page分配从node之间迁移的, 让内存尽量分配在离CPU近的node上, 这个特性是在2.6.32.25引入的, 同时对外部碎片的现象进行了优化, 这一点是这么理解的, 外部碎片是因为小内存块的分配位置可能占据了某个地方, 导致大的内存块无法分配出来, 比如说如果这个页是一些内核的永久数据可能这个碎片就会一直存在, 但是我们提前从可移动页分配一大块内存来给不可移动页, 那么内存碎片就不会影响到可移动页的分配, 这样就不会让内核初始化的时候分配的一些永久性的内存导致碎片影响了其他内存的分配,下面我们看看迁移类型的具体定义.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
* PAGE_ALLOC_COSTLY_ORDER is the order at which allocations are deemed
* costly to service. That is between allocation orders which should
* coalesce naturally under reasonable reclaim pressure and those which
* will not.
*/
/* 内核认为超过8个页算是大的内存分配 */
#define PAGE_ALLOC_COSTLY_ORDER 3

enum {
MIGRATE_UNMOVABLE,
MIGRATE_MOVABLE,
MIGRATE_RECLAIMABLE,
MIGRATE_PCPTYPES, /* the number of types on the pcp lists */
MIGRATE_HIGHATOMIC = MIGRATE_PCPTYPES,
#ifdef CONFIG_CMA
/*
* MIGRATE_CMA migration type is designed to mimic the way
* ZONE_MOVABLE works. Only movable pages can be allocated
* from MIGRATE_CMA pageblocks and page allocator never
* implicitly change migration type of MIGRATE_CMA pageblock.
*
* The way to use it is to change migratetype of a range of
* pageblocks to MIGRATE_CMA which can be done by
* __free_pageblock_cma() function. What is important though
* is that a range of pageblocks must be aligned to
* MAX_ORDER_NR_PAGES should biggest page be bigger then
* a single pageblock.
*/
MIGRATE_CMA,
#endif
#ifdef CONFIG_MEMORY_ISOLATION
MIGRATE_ISOLATE, /* can't allocate from here */
#endif
MIGRATE_TYPES
};


这里简单解释一下:

  1. MIGRATE_UNMOVABLE
     这种类型的页的位置是固定不变的, 不可以移动, 内核中大部分数据是这样的.
    
  2. MIGRATE_MOVABLE
     不能够直接移动,但是可以删除,而内容则可以从某些源重新生成.如文件数据映射的页面则归属此类.
    
  3. MIGRATE_RECLAIMABLE
     可以移动。分配给用户态程序运行的用户空间页面则为该类.由于是通过页面映射而得,将其复制到新位置后,更新映射表项,重新映射,应用程序是不感知的.
    
  4. MIGRATE_PCPTYPES
     是一个分界数,小于这个数的都是在pcplist上的.
    
  5. MIGRATE_CMA
     连续内存分配,用于避免预留大块内存导致系统可用内存减少而实现的,即当驱动不使用内存时,将其分配给用户使用,而需要时则通过回收或者迁移的方式将内存腾出来。
    
  6. MIGRATE_ISOLATE
     表示NUMA不能夸node迁移page, 让cpu和节点之间保持亲和性.
    
  7. MIGRATE_TYPES
     分界数, 低于这个才是迁移类型.
    

实现概要

实际上的合并对应的是通过小块合并成大块从下级的free_area的链表中移除, 然后添加到上一层free_area当中. 拆分则相反, 从上级free_area当中删除大块, 并且拆分成两个小块, 加入到下层free_area的list当中, 当然, 如果有一块符合需求就直接分配了. 这就是整个物理页分配的核心内容.

分配

整个算法的入口是__rmqueue, 从伙伴分配器中分配对应\( 2^{order} \) 的物理块.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
* Do the hard work of removing an element from the buddy allocator.
* Call me with the zone->lock already held.
*/
static struct page *__rmqueue(struct zone *zone, unsigned int order,
int migratetype)
{
struct page *page;

page = __rmqueue_smallest(zone, order, migratetype);
if (unlikely(!page)) {
if (migratetype == MIGRATE_MOVABLE)
page = __rmqueue_cma_fallback(zone, order); /* MOVABLE的失败优先从cma迁移 */

if (!page)
page = __rmqueue_fallback(zone, order, migratetype); /* 失败的话会从其他备选迁移类型当中迁移page */
}

trace_mm_page_alloc_zone_locked(page, order, migratetype);
return page;
}

分配失败退化的行为在后面讨论,先看主要路径, 算法主要体现在__rmqueue_smallest这个函数上, 依次从order开始向上寻找free_area并从list当中取出page块.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
* Go through the free lists for the given migratetype and remove
* the smallest available page from the freelists
*/
static inline
struct page *__rmqueue_smallest(struct zone *zone, unsigned int order,
int migratetype)
{
unsigned int current_order;
struct free_area *area;
struct page *page;

/* Find a page of the appropriate size in the preferred list */
for (current_order = order; current_order < MAX_ORDER; ++current_order) {
area = &(zone->free_area[current_order]);
page = list_first_entry_or_null(&area->free_list[migratetype],
struct page, lru);
if (!page)
continue; /* 如果失败就尝试更大的块 */
list_del(&page->lru); /* 这个lru取决他的上下文,比如在这里就是free_list */
rmv_page_order(page); /* 设置flags */
area->nr_free--; /* free计数器-1 */
expand(zone, page, order, current_order, area, migratetype);
set_pcppage_migratetype(page, migratetype);/* 设置page的migratetype */
return page;
}
return NULL;
}

从下往上找到合适的块以后再从上往下进行拆分, 这依赖于expand函数完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/*
* The order of subdivision here is critical for the IO subsystem.
* Please do not alter this order without good reasons and regression
* testing. Specifically, as large blocks of memory are subdivided,
* the order in which smaller blocks are delivered depends on the order
* they're subdivided in this function. This is the primary factor
* influencing the order in which pages are delivered to the IO
* subsystem according to empirical testing, and this is also justified
* by considering the behavior of a buddy system containing a single
* large block of memory acted on by a series of small allocations.
* This behavior is a critical factor in sglist merging's success.
*
* -- nyc
*/
static inline void expand(struct zone *zone, struct page *page,
int low, int high, struct free_area *area,
int migratetype)
{
unsigned long size = 1 << high;

while (high > low) { /* 从高阶向低阶迭代 */
area--;
high--;
size >>= 1;
VM_BUG_ON_PAGE(bad_range(zone, &page[size]), &page[size]);
/* guardpage TODO */
if (IS_ENABLED(CONFIG_DEBUG_PAGEALLOC) &&
debug_guardpage_enabled() &&
high < debug_guardpage_minorder()) {
/*
* Mark as guard pages (or page), that will allow to
* merge back to allocator when buddy will be freed.
* Corresponding page table entries will not be touched,
* pages will stay not present in virtual address space
*/
set_page_guard(zone, &page[size], high, migratetype);
continue;
}
list_add(&page[size].lru, &area->free_list[migratetype]); /* 把page放到area的free_list里面, 这个page是一个数组形式的 比如之前是8 这里变成4,那么后半部分会被留下来,前半部分会继续用于迭代 */
area->nr_free++; /* 空闲块计数器+1 */
set_page_order(&page[size], high); /* 相当于page->private = high 表示自己属于order为high的阶的block中 */
}
}

expand 就是向下一层的free_area留一半, 然后接着用另一半进行迭代.

举个例子说明分配的过程:

假设当前需要从zone当中分配一个order为1的page, 就会从下往上找到一个可以分配的块.

从1开始迭代到2, 发现2有可以分配的\(2^3\)page, 然后进入expand, 进行拆分.

拆分以后一半用于分配一半进入了同一层的free_list当中,如图所示, 核心流程非常简洁明了.

现在再重新回到__rmqueue,分析这个函数后半部分的fallback行为是如何迁移的.
首先如果编译选项带了CMA就会有具体内容, 其实就是再尝试从MIGRATE_CMA当中再分配一次page.

1
2
3
4
5
6
7
8
9
10
11
#ifdef CONFIG_CMA
static struct page *__rmqueue_cma_fallback(struct zone *zone,
unsigned int order)
{
return __rmqueue_smallest(zone, order, MIGRATE_CMA);
}
#else
static inline struct page *__rmqueue_cma_fallback(struct zone *zone,
unsigned int order) { return NULL; }
#endif

之后是一般性的退化流程是__rmqueue_fallback这个函数.
说这个函数之前需要看一张表.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

/*
* This array describes the order lists are fallen back to when
* the free lists for the desirable migrate type are depleted
*/
static int fallbacks[MIGRATE_TYPES][4] = {
[MIGRATE_UNMOVABLE] = { MIGRATE_RECLAIMABLE, MIGRATE_MOVABLE, MIGRATE_TYPES },
[MIGRATE_RECLAIMABLE] = { MIGRATE_UNMOVABLE, MIGRATE_MOVABLE, MIGRATE_TYPES },
[MIGRATE_MOVABLE] = { MIGRATE_RECLAIMABLE, MIGRATE_UNMOVABLE, MIGRATE_TYPES },
#ifdef CONFIG_CMA
[MIGRATE_CMA] = { MIGRATE_TYPES }, /* Never used */
#endif
#ifdef CONFIG_MEMORY_ISOLATION
[MIGRATE_ISOLATE] = { MIGRATE_TYPES }, /* Never used */
#endif
};

这张表描述的是当一种类型无法满足时退化选择的迁移类型的一个顺序.接下来看函数的实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

/* Remove an element from the buddy allocator from the fallback list */
static inline struct page *
__rmqueue_fallback(struct zone *zone, unsigned int order, int start_migratetype)
{
struct free_area *area;
unsigned int current_order;
struct page *page;
int fallback_mt;
bool can_steal;

/* Find the largest possible block of pages in the other list */
for (current_order = MAX_ORDER-1;
current_order >= order && current_order <= MAX_ORDER-1;
--current_order) {
area = &(zone->free_area[current_order]);
/* 遍历fallback table 找到合适的fallback migratetype 要综合考虑: 是否有超过一定阈值的空闲页,并且是可以退化的页类型 */
fallback_mt = find_suitable_fallback(area, current_order,
start_migratetype, false, &can_steal);
if (fallback_mt == -1)
continue;

page = list_first_entry(&area->free_list[fallback_mt],
struct page, lru);
/* 从对应迁类型中"偷取"page TODO:具体如何迁移的 我感觉应该是很抽象的迁移就可以了 */
if (can_steal)
steal_suitable_fallback(zone, page, start_migratetype);

/* Remove the page from the freelists */
area->nr_free--;
list_del(&page->lru);
rmv_page_order(page);

expand(zone, page, order, current_order, area,
start_migratetype);
/*
* The pcppage_migratetype may differ from pageblock's
* migratetype depending on the decisions in
* find_suitable_fallback(). This is OK as long as it does not
* differ for MIGRATE_CMA pageblocks. Those can be used as
* fallback only via special __rmqueue_cma_fallback() function
*/
set_pcppage_migratetype(page, start_migratetype);

trace_mm_page_alloc_extfrag(page, order, current_order,
start_migratetype, fallback_mt);

return page;
}

return NULL;
}

和主要分配路径不同的事, 迭代器是从最大的阶数开始迭代的,这个代表的意思是说我尽量多迁移一点空间给你, 不然你还要迁移的时候我又分配一小块空间给你, 你的碎片间接导致了我的碎片产生, 所以从大到小开始迭代.

整个核心部分分析完以后, 按照从里向外分析的思路,这是基于__rmqueu的一套更高层次的函数, 再倒回来看alloc_pages的实现会发现非常容易理解.首先__alloc_pages调用__alloc_pages_nodemask. 这里补充一个结构体.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
* Structure for holding the mostly immutable allocation parameters passed
* between functions involved in allocations, including the alloc_pages*
* family of functions.
*
* nodemask, migratetype and high_zoneidx are initialized only once in
* __alloc_pages_nodemask() and then never change.
*
* zonelist, preferred_zone and classzone_idx are set first in
* __alloc_pages_nodemask() for the fast path, and might be later changed
* in __alloc_pages_slowpath(). All other functions pass the whole strucure
* by a const pointer.
*/
struct alloc_context {
struct zonelist *zonelist;
nodemask_t *nodemask;
struct zoneref *preferred_zoneref;
int migratetype;
enum zone_type high_zoneidx;
bool spread_dirty_pages;
};

这个结构体主要是一些内存分配的参数, 因为从上到下的调用都一直有用到这几个参数所以合成一个context向下传递.

__alloc_pages_nodemask第一步会检查cpusets的功能是否打开, 这个是一个cgroup的子模块, 如果没有设置nodemask就会用cpusets配置的cpuset_current_mems_allowed来限制在哪个node上分配, 这个也是在NUMA结构当中才会有用的. 之后会用到might_sleep_if,判断gfp_mask & __GFP_DIRECT_RECLAIM), 表示当前内存压力比较大需要直接回收内存, 会循环睡眠同步等待页可用, 而might_sleep_if是一个debug函数,标记当前函数在iftrue的时候表示可能会进入睡眠, 如果当前调用进入了一个不可睡眠的上下文就会报错. should_fail_alloc_page会做一些预检查, 一些无法分配的条件会直接报错.
接着获取read_mems_allowed_begin,这个会拿到current当前进程的cpusets允许的节点分配掩码, 这个结果的读取通过顺序锁(seqcount_t)进行, 如果在分配page期间这个值发生了改变(通过read_mems_allowed_retry判断), 那么读操作需要重新进行尝试.
调用first_zones_zonelist会从zonelist这个表示分配page的zone的优先顺序链表里获取第一个zoneref, zoneref是一个链表节点用于迭代之后的zone选项.
接着就会调用关键函数get_page_from_freelist从而分配到页.
分配完页如果失败会调用__alloc_pages_slowpath唤起swapd进程进行页回收从而获取可用的页, 再尝试调用get_page_from_free_list.
函数的调用图如下.

这是最外层的一个过程, 但是内核往往经过了很多实践才会有这么一套机制, 内核在这一层之前加了一套缓存, 来加速物理页的分配, get_page_from_free_list到伙伴分配器之间还有一层高速缓存. 这个结构是一个percpu结构, 每个cpu独有一份链表缓存分配的页,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct per_cpu_pages {
int count; /* number of pages in the list */
int high; /* high watermark, emptying needed */
int batch; /* chunk size for buddy add/remove */

/* Lists of pages, one per migrate type stored on the pcp-lists */
struct list_head lists[MIGRATE_PCPTYPES];
};

struct per_cpu_pageset {
struct per_cpu_pages pcp;
#ifdef CONFIG_NUMA
s8 expire;
#endif
#ifdef CONFIG_SMP
s8 stat_threshold;
s8 vm_stat_diff[NR_VM_ZONE_STAT_ITEMS];
#endif
};

每个zone都会有一个这个结构用于缓存页的分配, 中文一般就叫高速缓存.

缓存的过程是这样的, 首先通过for_next_zone_zonelist_nodemask遍历每个zone 会选择合适的zone, 选择合适的zone一笔带过但其实考虑了很多因素比如是否满足分配的wartermark, 是否需要均衡到别的zone当中, 是否被cpusets禁止了分配等等, 然后调用buffered_rmqueue这个函数, 这个函数就是构建对buddy分配器裹了一层高速缓存. 这个函数在只分配1页(order=0)的情况下,会判断是不是”冷”页, bool cold = ((gfp_flags & __GFP_COLD) != 0), 冷页的区别是会放到高速缓存的末尾, 相反热页会放到高速缓存的头部. 然后尝试获取高速缓存的page, 如果高速缓存为空就调用rmqueue_bulk分配一段batch大小的页块到高速缓存中继续尝试分配, 而rmqueue_bulk就是循环了batch次分配了order=0的page逐个添加到高速缓存当中. 如果需要分配的order不为0的话还是会直接从buddy-system当中分配不依靠高速缓存. 高速缓存主要是缓存单页的分配.

所以调整后的走高速缓存的主流程就如下图所示.

释放

还是从伙伴系统的最内部实现向外说, 释放对应的是__free_one_page, 指定释放\(2^{order}\)的物理页块, 地址为page, 可以说释放过程是拆分过程的一个逆过程.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
* Freeing function for a buddy system allocator.
*
* The concept of a buddy system is to maintain direct-mapped table
* (containing bit values) for memory blocks of various "orders".
* The bottom level table contains the map for the smallest allocatable
* units of memory (here, pages), and each level above it describes
* pairs of units from the levels below, hence, "buddies".
* At a high level, all that happens here is marking the table entry
* at the bottom level available, and propagating the changes upward
* as necessary, plus some accounting needed to play nicely with other
* parts of the VM system.
* At each level, we keep a list of pages, which are heads of continuous
* free pages of length of (1 << order) and marked with _mapcount
* PAGE_BUDDY_MAPCOUNT_VALUE. Page's order is recorded in page_private(page)
* field.
* So when we are allocating or freeing one, we can derive the state of the
* other. That is, if we allocate a small block, and both were
* free, the remainder of the region must be split into blocks.
* If a block is freed, and its buddy is also free, then this
* triggers coalescing into a block of larger size.
*
* -- nyc
*/

static inline void __free_one_page(struct page *page, /* 把page加回到free_list里面 */
unsigned long pfn,
struct zone *zone, unsigned int order,
int migratetype)
{
unsigned long page_idx;
unsigned long combined_idx;
unsigned long uninitialized_var(buddy_idx);
struct page *buddy;
unsigned int max_order;

max_order = min_t(unsigned int, MAX_ORDER, pageblock_order + 1); /* 获取最大的阶数 */

VM_BUG_ON(!zone_is_initialized(zone)); /* 检查wait_table是否存在来表示zone是否初始化过 里面用到了双感叹号表示强制1或者0 */
VM_BUG_ON_PAGE(page->flags & PAGE_FLAGS_CHECK_AT_PREP, page);

VM_BUG_ON(migratetype == -1);
if (likely(!is_migrate_isolate(migratetype))) /* 是isolate类型 */
__mod_zone_freepage_state(zone, 1 << order, migratetype);

page_idx = pfn & ((1 << MAX_ORDER) - 1); /* 获取page的下标 */

VM_BUG_ON_PAGE(page_idx & ((1 << order) - 1), page);
VM_BUG_ON_PAGE(bad_range(zone, page), page);

continue_merging:
while (order < max_order - 1) {
buddy_idx = __find_buddy_index(page_idx, order);/* 获取buddy的下标 */
buddy = page + (buddy_idx - page_idx); /* 根据相对距离得到buddy page */
if (!page_is_buddy(page, buddy, order)) /* 如果不是buddy就结束合并 */
goto done_merging;
/*
* Our buddy is free or it is CONFIG_DEBUG_PAGEALLOC guard page,
* merge with it and move up one order.
*/
if (page_is_guard(buddy)) {
clear_page_guard(zone, buddy, order, migratetype);
} else {
/* 把buddy从free_area当中移除 */
list_del(&buddy->lru);
zone->free_area[order].nr_free--;
rmv_page_order(buddy);
}
combined_idx = buddy_idx & page_idx;
page = page + (combined_idx - page_idx);
page_idx = combined_idx;
order++; /* 向上合并 */
}
if (max_order < MAX_ORDER) {
/* If we are here, it means order is >= pageblock_order.
* We want to prevent merge between freepages on isolate
* pageblock and normal pageblock. Without this, pageblock
* isolation could cause incorrect freepage or CMA accounting.
*
* We don't want to hit this code for the more frequent
* low-order merging.
*/
/* 在这里说明已经超出了pageblock的order, 可能在不同的migratetype的block边界了,这里再检查一次是不是isolate, 不允许节点间迁移的page, 如果是的话就要结束合并的迭代,不然继续向上合并 */
if (unlikely(has_isolate_pageblock(zone))) {
int buddy_mt;

buddy_idx = __find_buddy_index(page_idx, order);
buddy = page + (buddy_idx - page_idx);
buddy_mt = get_pageblock_migratetype(buddy);

if (migratetype != buddy_mt
&& (is_migrate_isolate(migratetype) ||
is_migrate_isolate(buddy_mt)))
goto done_merging;
}
max_order++;
goto continue_merging;
}

done_merging:
set_page_order(page, order);/* 结束合并自后设置合并之后的阶数 */

/*
* If this is not the largest possible page, check if the buddy
* of the next-highest order is free. If it is, it's possible
* that pages are being freed that will coalesce soon. In case,
* that is happening, add the free page to the tail of the list
* so it's less likely to be used soon and more likely to be merged
* as a higher order page
*/
if ((order < MAX_ORDER-2) && pfn_valid_within(page_to_pfn(buddy))) { /* 这个条件判断没有合并到最大块, 内核认为很有可能接下来这个块和其他free的块合并, 从减少碎片的角度来说会更倾向于放到链表的末尾, 让这个块的free状态保持久一点, 更有机会被合并成更大的块 */
struct page *higher_page, *higher_buddy;
combined_idx = buddy_idx & page_idx; /* 合并后的idx */
higher_page = page + (combined_idx - page_idx); /* 合并后的page */
buddy_idx = __find_buddy_index(combined_idx, order + 1); /* 向上寻找buddy */
higher_buddy = higher_page + (buddy_idx - combined_idx);
if (page_is_buddy(higher_page, higher_buddy, order + 1)) {
list_add_tail(&page->lru,
&zone->free_area[order].free_list[migratetype]);
goto out;
}
}

list_add(&page->lru, &zone->free_area[order].free_list[migratetype]);
/* 把page加入到对应的order当中 */
out:
zone->free_area[order].nr_free++; /* free_area nr_free 空闲块加1 */
}

这个函数就是从order开始迭代不断寻找自己的buddy, 寻找buddy的函数是__find_buddy_index, 首先要说的是page是以数组的形式存放在mem_map当中, 整个page是可以通过index线性索引的.要找到自己的buddy只要在对应的order位进行抑或即可.

1
2
3
4
5
6
7
static inline unsigned long
__find_buddy_index(unsigned long page_idx, unsigned int order)
{
/* idx 抑或 2^order order那位是0, 那么buddy应该是1
* order那位是1, 那么buddy应该是0 */
return page_idx ^ (1 << order);
}

然后层层递归把低层的free_area中的page块向高层移动,最后有一个优化, 内核认为刚释放的page很可能相关的buddy页也会释放, 所以把刚释放的页放到末尾让他更晚被分配出去从而得到更多的机会被合并成大块.

现在再从外层看物理页的分配.

入口函数是__free_pages, 其实知道了分配的过程对于释放的过程可以说是轻而易举就能理解的.首先这个函数put_page_testzero会减少引用计数, 如果到达0才真正执行释放操作.释放路径有两个一个是order为0和不为0的情况, 为0很好理解, 需要交回到高速缓存即可, 如果高速缓存满了再交回给buddy system, 不为0说明也没有缓存过,就直接调用free_one_page,这个函数会调用__free_one_page,最终还给buddy system. 所以这里主要看一下代缓存的路径, 先是调用free_hot_cold_page, 指定是热页也就是添加到高速缓存的头,如果计数器超过阈值pcp->count >= pcp->high调用free_pcppages_bulk交还给buddy system, 这个函数可以遍历高速缓存中的migratetype,然后根据指定的count从高速缓存中交还给buddy system的页, 当前路径是整个batch个的页都会还回去.

带高速缓存的释放的调用图如图所示.

总结

伙伴分配器的确是一个非常简洁的算法(我认为可以递归表示和迭代表示算法都美妙无比), 但是内核对于这样一个简单的算法也结合了很多实际工程的内容还有一些优化,比如迁移类型还有冷热页的标记和高速缓存等等. 充分理解内核的物理页分配对于接下来理解slab分配起和内核虚拟内存的相关内容有很好的帮助作用. 文末给出了许多参考链接, 我结合了前人的知识进行总结, 并且基于最新的4.7.2的内核重新走了一遍分析之路, 做了解释和补充, 对自己的学习是一个积累, 同时希望对大家有帮助.

参考

  1. 迁移类型 实现反碎片
  2. 页面迁移
  3. LVMM 的Physical Page Allocation 章节
  4. Linux 物理页面分配
  5. 顺序锁