ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

neighbor 在协议栈里指的是同一个 LAN 下面的邻居,也就是他们在 L3 上通过媒介或者点对点连接在了一起。邻居子系统同时也可以理解为一个 L2 和 L3 地址的转换器。邻居子系统的目的就在于上层协议不应该关心下层的地址信息,我发送过来的 IP 地址应该让下层来决定发送到哪个 MAC 地址。neighbor solicitation and neighbor advertisement,可以分别对应 ARP 的 request 请求和 ARP 的 reply 请求。

邻居子系统主要缓存两大块的内容,一个是 L3 到 L2 的映射解析的缓存,一个是 L2 头的缓存,缓存 L2 头的原因是大部分 L2 的头基本上是重复的,所以通过缓存头部可以加快协议的封装。

以下有几个相关结构体需要介绍一下。

struct neighbour 代表的是一个邻居的信息,比如 L2 和 L3 地址等。

struct neigh_table 代表的是一种邻居协议的接口(比如 ARP)。

struct neigh_params 代表的是邻居协议在每个设备上的不同参数。

struct neigh_ops 邻居对应的一些操作函数。

struct hh_cache 缓存 L2 的头部,不是所有的设备都支持头部缓存。

struct rttablestruct dst_enry, IPv4 的路由缓存信息是通过 struct rttable 缓存的。

这是 dst_entry, hh_cacheneighbour 之间的关系。

neighbor 结构是用 hash 存储的,key 是 L3 地址加设备(对应的 device 结构体)加一个随机值。hash 表通过 neigh_hash_allocneigh_hash_free 来分配和释放。neigh_lookup 就是通过 key 从 hash table 中找到对应的 neighbour 结构体,实现都不复杂,这里理解就可以了。

一般邻居子系统的缓存流程是这样的,如果 L3 的请求到达,地址解析缓存没有命中,把 L3 packet 入队,开始neighbor solicitation 等收到 neighbor advertisement 之后再出队并且发送。

邻居状态信息 NUD(Netowork Unreachablility) 状态

NUD state 本来是 IPv6 协议里的邻居关系的定义,但是在内核中沿用到了 IPv4 里面。

  • NUD_NONE 这个 neighbour 刚开始建立,没有相关状态
  • NUD_INCOMPLETE 发送了获取 L2 地址的请求(通过 ARP 或者其他协议),但是没收到回复,并且之前不存在老缓存
  • NUD_REACHABLE neighbour 地址被缓存了,并且是可达的
  • NUD_FAILED 因为发送地址解析请求失败了,标志邻居不可达
  • NUD_STALE NUD_DELAY NUD_PROBE 是解析请求确认邻居可达的过程中的中间状态
  • NUD_NOARP 标志不需要用解析协议(虽然是 NOARP,但是别的协议也用这个标记),这是一个特殊情况。
  • NUD_PERMANET 标志邻居的地址解析是静态配置的

这些是基本状态,根据这些基本状态组合了几个相对有语义的状态。

  • NUD_VALID 表示该地址会是一个有效的地址

    NUD_PERMANENT NUD_NOARP NUD_REACHABLE NUD_PROBE NUD_STALE NUD_DELAY

  • NUD_CONNECTED 是 NUD_VALID 的子集,去除了待决的中间状态

    NUD_PERMANENT NUD_NOARP NUD_REACHABLE

  • NUD_IN_TIMER 表示在这个状态下正在执行一个定时任务,一般是状态不明了的时候

    NUD_INCOMPLETE NUD_DELAY NUD_PROBE

邻居的可达性可以通过两点来确认,收到了一个地址解析协议的单播回复,或者通过外部信息确认(比如收到了这个邻居的 TCP 报文,当然这个 IP 可能不是自己的邻居,但是可以可以确定对应网关『作为邻居』的可达性)。

确认邻居信息

IP 层会调用ipv4_confirm_neigh 来确认映射地址,如果有 gateway 用 gateway,没有 gateway 开始查缓存。在 net/ipv4/ip_output.c 中的对应代码,就是查缓存的 neighbour 结构体,如果这个结构体不存在的话就要开始confirm 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
neigh = __ipv4_neigh_lookup_noref(dev, nexthop);

if (unlikely(!neigh))
neigh = __neigh_create(&arp_tbl, &nexthop, dev, false);
if (!IS_ERR(neigh)) {
int res;

sock_confirm_neigh(skb, neigh);
res = neigh_output(neigh, skb);

rcu_read_unlock_bh();
return res;
}

当 TCP 收到报文(比如对端的 SYN/ACK 时),这种外部信息说明其实这个节点是可达的(不是来自 gateway),也可以更新缓存。

另外,neigh_connectneigh_suspect 是两个状态转换时会调用的函数。

当 neigh 进入 NUD_REACHABLEneigh_connectneigh->output 的函数指向 connected_output 这个函数,它会在调用 dev_queue_xmit 之前填充 L2 头部,把包直接发出去。

当从 NUD_REACHBLE 转换成 NUD_STALE 或者 NUD_DELAYneigh_suspect 会强制进行可达性的确认,通过把 neighbor->output 指向 neigh_ops->output, 也就是 neigh_resolve_output,它会在调用 dev_queue_xmit 之前先把地址解析出来,等把地址解析完成以后再把缓存的包发送出去。

更新邻居信息

邻居信息更新的入口函数就是 neigh_update,这个函数定义如下。

1
2
int neigh_update(struct neighbour *neigh, const u8 *lladdr, u8 new,
u32 flags, u32 nlmsg_pid)

首先做一些预先检查,如果是管理员配置,原来的配置就不能是 PERMANENT 或者 NOARP 的。

1
2
3
if (!(flags & NEIGH_UPDATE_F_ADMIN) &&
(old & (NUD_NOARP | NUD_PERMANENT)))
goto out;

标记为 dead,然后 goto out

1
2
if (neigh->dead)
goto out;

如果更新为无效的标记的话,删除 timer,并且 supect 一下,如果是 NUD_CONNECTED 状态。
如果是需要标记为失败的 neigh(之前是 INCOMPLETE|NUD_PROBE),则调用 neigh_invalidate,让这个 neigh 无效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (!(new & NUD_VALID)) {
neigh_del_timer(neigh);
if (old & NUD_CONNECTED)
neigh_suspect(neigh);
neigh->nud_state = new;
err = 0;
notify = old & NUD_VALID;
if ((old & (NUD_INCOMPLETE | NUD_PROBE)) &&
(new & NUD_FAILED)) {
neigh_invalidate(neigh);
notify = 1;
}
goto out;
}

接下来是三个条件,一个是 device 没有硬件地址,用 neigh->ha,如果 lladdr 提供了并且老的是有效的,使用老的地址,如果 lladdr 没有提供,直接使用老的地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Compare new lladdr with cached one */
if (!dev->addr_len) {
/* First case: device needs no address. */
lladdr = neigh->ha;
} else if (lladdr) {
/* The second case: if something is already cached
and a new address is proposed:
- compare new & old
- if they are different, check override flag
*/
if ((old & NUD_VALID) &&
!memcmp(lladdr, neigh->ha, dev->addr_len))
lladdr = neigh->ha;
} else {
/* No address is supplied; if we know something,
use it, otherwise discard the request.
*/
err = -EINVAL;
if (!(old & NUD_VALID))
goto out;
lladdr = neigh->ha;
}

如果NUD_CONNECTED 更新 confirm 的时间,更新『更新』的时间。

1
2
3
if (new & NUD_CONNECTED)
neigh->confirmed = jiffies;
neigh->updated = jiffies;

NEIGH_UPDATE_F_OVERRIDE_ISROUTER 标记的是当前 neigh 是一个 router。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* If entry was valid and address is not changed,
do not change entry state, if new one is STALE.
*/
err = 0;
update_isrouter = flags & NEIGH_UPDATE_F_OVERRIDE_ISROUTER;
if (old & NUD_VALID) {
if (lladdr != neigh->ha && !(flags & NEIGH_UPDATE_F_OVERRIDE)) {
update_isrouter = 0;
if ((flags & NEIGH_UPDATE_F_WEAK_OVERRIDE) &&
(old & NUD_CONNECTED)) {
lladdr = neigh->ha;
new = NUD_STALE;
} else
goto out;
} else {
if (lladdr == neigh->ha && new == NUD_STALE &&
!(flags & NEIGH_UPDATE_F_ADMIN))
new = old;
}
}

如果是更新操作,删除老的 timer,如果需要 timer,更新新的 timer,并且设置新状态。

1
2
3
4
5
6
7
8
9
10
11
12
if (new != old) {
neigh_del_timer(neigh);
if (new & NUD_PROBE)
atomic_set(&neigh->probes, 0);
if (new & NUD_IN_TIMER)
neigh_add_timer(neigh, (jiffies +
((new & NUD_REACHABLE) ?
neigh->parms->reachable_time :
0)));
neigh->nud_state = new;
notify = 1;
}

更新 neigh->ha,如果 lladdrneigh->ha 不同的话。

1
2
3
4
5
6
7
8
9
10
11
12
if (lladdr != neigh->ha) {
write_seqlock(&neigh->ha_lock);
memcpy(&neigh->ha, lladdr, dev->addr_len);
write_sequnlock(&neigh->ha_lock);
neigh_update_hhs(neigh);
if (!(new & NUD_CONNECTED))
neigh->confirmed = jiffies -
(NEIGH_VAR(neigh->parms, BASE_REACHABLE_TIME) << 1);
notify = 1;
}
if (new == old)
goto out;

根据状态调用 connect 和 suspect

1
2
3
4
if (new & NUD_CONNECTED)
neigh_connect(neigh);
else
neigh_suspect(neigh);

如果之前老的不是 NUD_VALID,就会把 skb 从 arp_queue,并且释放 arp_queue。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
if (!(old & NUD_VALID)) {
struct sk_buff *skb;

/* Again: avoid dead loop if something went wrong */

while (neigh->nud_state & NUD_VALID &&
(skb = __skb_dequeue(&neigh->arp_queue)) != NULL) {
struct dst_entry *dst = skb_dst(skb);
struct neighbour *n2, *n1 = neigh;
write_unlock_bh(&neigh->lock);

rcu_read_lock();

/* Why not just use 'neigh' as-is? The problem is that
* things such as shaper, eql, and sch_teql can end up
* using alternative, different, neigh objects to output
* the packet in the output path. So what we need to do
* here is re-lookup the top-level neigh in the path so
* we can reinject the packet there.
*/
n2 = NULL;
if (dst) {
n2 = dst_neigh_lookup_skb(dst, skb);
if (n2)
n1 = n2;
}
n1->output(n1, skb);
if (n2)
neigh_release(n2);
rcu_read_unlock();

write_lock_bh(&neigh->lock);
}
__skb_queue_purge(&neigh->arp_queue);
neigh->arp_queue_len_bytes = 0;
}

更新 flag,发送 notify 通过 rtnetlink 通知 nlmsg_pid 对应的进程,

1
2
3
4
5
6
7
8
9
10
out:
if (update_isrouter) {
neigh->flags = (flags & NEIGH_UPDATE_F_ISROUTER) ?
(neigh->flags | NTF_ROUTER) :
(neigh->flags & ~NTF_ROUTER);
}
write_unlock_bh(&neigh->lock);

if (notify)
neigh_update_notify(neigh, nlmsg_pid);

这就是邻居信息的更新流程。

当然还有一个缓存项是 L2 header 的缓存这里就简单略过了。

ARP

接下来说一下 ARP 相关的内容,ARP 本身的格式其实很简单。

邻居子系统有一个比较关键的协议就是 ARP,这里简单介绍一下 ARP 协议,我们来看一下 ARP 的格式。

先是两个字节的 L2 类型,然后是两个字节的 L3 类型。一般 L2 是 1 (以太网),L3 是 0x0800 (IPV4),接着跟着的是 L2 类型的长度和 L3 类型的长度,这里 Ethernet 的 MAC 地址是 6 个字节,IPV4 的 IP 地址是 4 个字节,接着是两个字节的操作,1 表示请求,2 表示回复,然后是发送端 L2 的地址,L3 的地址,接着是接收端 L2 和 L3 的地址。整个翻译过来就是『我(MAC 是 0a:00:27:00:00:00 IP 是 192.168.56.1)广而告之(MAC 广播地址),问一下谁知道 192.168.56.102 的地址』。

gARP (gratuitous ARP)

听起来像是一个无理由的 ARP 请求,实际上是一种主动通知的 ARP 请求。gARP 本身不是一个查询请求,而是一个通知请求,主要运用于主动通知 L2 地址改变,重复地址发现(请求解析自己的地址,如果收到回复说明有地址重复)。

还有就是 VIP,一般的作用是在本地网路中有两台机器,一台作为备机,一台作为主机,当主机 failover 的时候,备机可以继续『冒充』主机的 IP 地址,具体的做法就是主动发送请求,解析的 MAC 和 IP 都和 source 一样,老的 server 肯定不会回答这个 ARP,交换机上已经没有这个端口的缓存,会进行广播,让所有的接收者都会更新自己的缓存。也就是发送了一个一去不复返的请求,让所有的邻居更新了自己的 ARP 缓存,从而替代了老 server 的 IP,这就是 VIP 通过 ARP 实现的 failover。

总结

邻居子系统很大一部分的作用就是解析和缓存地址映射,主要是通过 ARP 来完成,而且 ARP 本身也有很多使用的姿势,也就上面说到的 gARP,邻居子系统是沟通路由子系统,以及 L2 和 L3 的桥梁。

参考:

  1. Understanding Linux Network Internals

ip 分片的主体函数在 ip_fragment 当中,重组则在 ip_defrag 当中。第一个分片的标志 Offset 为 0,MF 为 1,之后的分片则是 Offset 非 0,MF 为 1,最后一个分片则是 Offset 非 0,但是 MF 为 0。以此来分别当前的 IP packet 是否是一个分片。从 IP 层向上层协议发送数据包的时候就会进行重组,比如在 ip_local_deliver 当中,调用了。说一句题外话, TCP 有 MSS ,保障 TCP message 不超过分片大小,这样是一种对底层协议有感知的行为。

1
2
3
4
if (ip_is_fragment(ip_hdr(skb))) {
if (ip_defrag(net, skb, IP_DEFRAG_LOCAL_DELIVER))
return 0;
}

IP 分片

ip_is_fragment 对应的条件就是 (iph->frag_off & htons(IP_MF | IP_OFFSET)) != 0;

ip_fragment 当中会碰到几种情况,一种是不需要分片的 IP packet,这种很好,省心,一种是需要分片的 IP packet,这种最操心,还有一种是已经按分片负载的长度分配好了 buffer 只要加个头就相当于分片完成了就也非常棒。要从头开始进行分配的情况属于慢速路径,而已经有 buffer 准好的,直接加个头就完事的属于快速路径,快速路径的内存拷贝代价更低。

ip_fragment 主要检查 IP 是否允许进行分片,不然的话就返回一个 ICMP 错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct iphdr *iph = ip_hdr(skb);

if ((iph->frag_off & htons(IP_DF)) == 0)
return ip_do_fragment(net, sk, skb, output);

if (unlikely(!skb->ignore_df ||
(IPCB(skb)->frag_max_size &&
IPCB(skb)->frag_max_size > mtu))) {
IP_INC_STATS(net, IPSTATS_MIB_FRAGFAILS);
icmp_send(skb, ICMP_DEST_UNREACH, ICMP_FRAG_NEEDED,
htonl(mtu));
kfree_skb(skb);
return -EMSGSIZE;
}

return ip_do_fragment(net, sk, skb, output);

然后进入到 ip_do_fragment 当中。我们先看一下慢速路径是如何处理。

首先知道 IP 头部的长度,已经负载 (left),然后当前的指针,已经链路层需要预留的长度。

1
2
3
4
5
6
7
8
slow_path:
iph = ip_hdr(skb);

left = skb->len - hlen; /* Space per frame */
ptr = hlen; /* Where to start from */

ll_rs = LL_RESERVED_SPACE(rt->dst.dev);

IP 的 offset,以及不是最后一个分片的标志位,这里是进行分片的,不知道为什么要获取一些重组时候需要的数据,TODO。

/*
 *    Fragment the datagram.
 */

offset = (ntohs(iph->frag_off) & IP_OFFSET) << 3;
not_last_frag = iph->frag_off & htons(IP_MF);

调整要分配的 skb_buff 的长度,首先不能超过 mtu,然后最后一段要按 8 对齐。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* Keep copying data until we run out.
*/

while (left > 0) {
len = left;
/* IF: it doesn't fit, use 'mtu' - the data space left */
if (len > mtu)
len = mtu;
/* IF: we are not sending up to and including the packet end
then align the next start on an eight byte boundary */
if (len < left) {
len &= ~7;
}

/* Allocate buffer */
skb2 = alloc_skb(len + hlen + ll_rs, GFP_ATOMIC);
if (!skb2) {
err = -ENOMEM;
goto fail;
}

设置分片的元数据,ip_copy_metadata 会拷贝优先级,协议类型,等辅助信息。然后保留 L2 的头部空间,接着在保留 IP 层的长度,然后设置网络头部,接着设置传输层头部的位置,就是一些初始化的动作。

1
2
3
4
5
6
7
8
9
/*
* Set up data on packet
*/

ip_copy_metadata(skb2, skb);
skb_reserve(skb2, ll_rs);
skb_put(skb2, len + hlen);
skb_reset_network_header(skb2);
skb2->transport_header = skb2->network_header + hlen;

设置对应 sk 为 owner

1
2
3
4
5
6
7
/*
* Charge the memory for the fragment to any owner
* it might possess
*/

if (skb->sk)
skb_set_owner_w(skb2, skb->sk);

拷贝网络层的头部

1
2
3
4
5
/*
* Copy the packet header into the new buffer.
*/

skb_copy_from_linear_data(skb, skb_network_header(skb2), hlen);

然后拷贝真正的负载,这里没有直接用 memcpy 的原因是,对应的空间不一定是连续的,它可能含有 frag_list,甚至是之前检查没有通过的快速路径到达了这里。

1
2
3
4
5
6
/*
* Copy a block of the IP datagram.
*/
if (skb_copy_bits(skb, ptr, skb_transport_header(skb2), len))
BUG();
left -= len;

设置 IP 头的偏移和分片标志。

1
2
3
4
5
6
7
8
/*
* Fill in the new header fields.
*/
iph = ip_hdr(skb2);
iph->frag_off = htons((offset >> 3));

if (IPCB(skb)->flags & IPSKB_FRAG_PMTU)
iph->frag_off |= htons(IP_DF);

如果是第一个分片就尝试更新 IP options。

1
2
3
4
5
6
7
8
/* ANK: dirty, but effective trick. Upgrade options only if
* the segment to be fragmented was THE FIRST (otherwise,
* options are already fixed) and make it ONCE
* on the initial skb, so that all the following fragments
* will inherit fixed options.
*/
if (offset == 0)
ip_options_fragment(skb);

最后修改位移,更新标记位,计算 checksum,然后送到 output。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* Added AC : If we are fragmenting a fragment that's not the
* last fragment then keep MF on each bit
*/
if (left > 0 || not_last_frag)
iph->frag_off |= htons(IP_MF);
ptr += len;
offset += len;

/*
* Put this fragment into the sending queue.
*/
iph->tot_len = htons(len + hlen);

ip_send_check(iph);

err = output(net, sk, skb2);
if (err)
goto fail;

IP_INC_STATS(net, IPSTATS_MIB_FRAGCREATES);

这个就是慢速路径的分片过程,快速路径的分片过程其实更简单,因为比较麻烦的事情已经在 ip_append_data 里面处理过了,在我上一篇文章里面有介绍这个过程,就是在上层调用 ip_append_data 的时候,会在主动的进行分段式的缓存,而不使用连续空间,每个分段式的换粗也会不超过分片的大小,这样每个缓存就可以直接用来做分片了。

现在再回头看快速路径,快速路径主要检查有没有 frag_list 也就是之前分配好的 buffer 列表。获取第一个 buffer (存在 frags 里面,不是 frag_list)的长度,如果比 mtu 大,或者不是 8 的倍数,或者已经是分段了,或者是一段 shared skb_buff (因为快速路径不会拷贝内存,慢速路径会会分配新的内存,不影响之前有人引用)都不行,要进入慢速路径。

1
2
3
4
5
6
7
8
9
10
if (skb_has_frag_list(skb)) {
struct sk_buff *frag, *frag2;
unsigned int first_len = skb_pagelen(skb);

if (first_len - hlen > mtu ||
((first_len - hlen) & 7) ||
ip_is_fragment(iph) ||
skb_cloned(skb))
goto slow_path;

首先保证每个 frag_list 里面 frag 不超过 mtu,然后不是最后一段需要是 8 的倍数,有足够的头部空间用来给新的 IP 分片用,然后 frag 的 buffer 也不能是 shared,最后绑定 sk 关系,减掉 skb 的 truesize。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
skb_walk_frags(skb, frag) {
/* Correct geometry. */
if (frag->len > mtu ||
((frag->len & 7) && frag->next) ||
skb_headroom(frag) < hlen)
goto slow_path_clean;

/* Partially cloned skb? */
if (skb_shared(frag))
goto slow_path_clean;

BUG_ON(frag->sk);
if (skb->sk) {
frag->sk = skb->sk;
frag->destructor = sock_wfree;
}
skb->truesize -= frag->truesize;
}

到这里就可以真的开始分片了,初始化头部信息,以及要用来分片的 frag。

1
2
3
4
5
6
7
8
9
err = 0;
offset = 0;
frag = skb_shinfo(skb)->frag_list;
skb_frag_list_init(skb);
skb->data_len = first_len - skb_headlen(skb);
skb->len = first_len;
iph->tot_len = htons(first_len);
iph->frag_off = htons(IP_MF);
ip_send_check(iph);

这个循环里面做的事情就更简单了,比起慢速路径来说,就是给每个原本没有头部的 buffer,加上头部变成真正的 fragment。保留空间,设置网络层头部,拷贝头部memcpy(skb_network_header(frag), iph, hlen);,拷贝原信息,如果是第一个分片更新 options,然后更新标记位,然后送到 output。直到 frag_list 被循环完,这就大功告成了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
for (;;) {
/* Prepare header of the next frame,
* before previous one went down. */
if (frag) {
frag->ip_summed = CHECKSUM_NONE;
skb_reset_transport_header(frag);
__skb_push(frag, hlen);
skb_reset_network_header(frag);
memcpy(skb_network_header(frag), iph, hlen);
iph = ip_hdr(frag);
iph->tot_len = htons(frag->len);
ip_copy_metadata(frag, skb);
if (offset == 0)
ip_options_fragment(frag);
offset += skb->len - hlen;
iph->frag_off = htons(offset>>3);
if (frag->next)
iph->frag_off |= htons(IP_MF);
/* Ready, complete checksum */
ip_send_check(iph);
}

err = output(net, sk, skb);

if (!err)
IP_INC_STATS(net, IPSTATS_MIB_FRAGCREATES);
if (err || !frag)
break;

skb = frag;
frag = skb->next;
skb->next = NULL;
}

IP 重组

重组一般发生在向上层协议栈传输的时候,不过有的路由器也有可能进行重组,可能要对整个 IP packet 进行校验等,一般情况下,转发不太会对 IP 进行重组。IP 重组讲起来也有些麻烦。

每个正在被重组的 IP packet 都会用一个 ipq 表示,这个 ipq 使用的是 hash table (inet_frags->hash) 的搜索结构,没有 ipq 由 源地址,目的地址,协议和 ID 确定,所以存在重复的可能。ip_defrag依赖两个函数一个是ip_find用于寻找 ipq 如果没有找到的话会自动创建一个,其次是用于入队的 ip_frag_queue ,进行重组的工作。sk_buff->cb 用于保存当前的 offset。对于分片的重组也会有超时机制,防止一个 ipq 停留太长的时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* Process an incoming IP datagram fragment. */
int ip_defrag(struct net *net, struct sk_buff *skb, u32 user)
{
struct net_device *dev = skb->dev ? : skb_dst(skb)->dev;
int vif = l3mdev_master_ifindex_rcu(dev);
struct ipq *qp;

__IP_INC_STATS(net, IPSTATS_MIB_REASMREQDS);
skb_orphan(skb);

/* Lookup (or create) queue header */
qp = ip_find(net, ip_hdr(skb), user, vif);
if (qp) {
int ret;

spin_lock(&qp->q.lock);

ret = ip_frag_queue(qp, skb);

spin_unlock(&qp->q.lock);
ipq_put(qp);
return ret;
}

__IP_INC_STATS(net, IPSTATS_MIB_REASMFAILS);
kfree_skb(skb);
return -ENOMEM;
}

ip_find 主要两个功能,根据原信息计算 hash 值,从net->ipv4.frags 的 hash 表当中寻找到对应的 ipq

1
2
3
4
hash = ipqhashfn(iph->id, iph->saddr, iph->daddr, iph->protocol);

q = inet_frag_find(&net->ipv4.frags, &ip4_frags, &arg, hash);

然后进入到 ip_frag_queue 当中首先检查,如果出现错误,就把 ipq 标记为可以被之后的垃圾回收清扫。

1
2
3
4
5
6
7
8
9
10
if (qp->q.flags & INET_FRAG_COMPLETE)
goto err;

if (!(IPCB(skb)->flags & IPSKB_FRAG_COMPLETE) &&
unlikely(ip_frag_too_far(qp)) &&
unlikely(err = ip_frag_reinit(qp))) {
ipq_kill(qp);
goto err;
}

获取 offset,flags 和头部。

1
2
3
4
5
6
7
ecn = ip4_frag_ecn(ip_hdr(skb)->tos);
offset = ntohs(ip_hdr(skb)->frag_off);
flags = offset & ~IP_OFFSET;
offset &= IP_OFFSET;
offset <<= 3; /* offset is in 8-byte chunks */
ihl = ip_hdrlen(skb);

计算这个追加的 fragment 会拷贝的位置的末尾在哪。

1
2
3
4
/* Determine the position of this fragment. */
end = offset + skb->len - skb_network_offset(skb) - ihl;
err = -EINVAL;

如果是最后一个 fragment,那么不应该超过 q.len,或者已经有了最后一个了,但是 endq.len 不一致,所以有一些 corruption。如果检查没问题,就更新q.flasg 标记为最后一个和把 end 赋值给q.len

1
2
3
4
5
6
7
8
9
10
11
/* Is this the final fragment? */
if ((flags & IP_MF) == 0) {
/* If we already have some bits beyond end
* or have different end, the segment is corrupted.
*/
if (end < qp->q.len ||
((qp->q.flags & INET_FRAG_LAST_IN) && end != qp->q.len))
goto err;
qp->q.flags |= INET_FRAG_LAST_IN;
qp->q.len = end;

如果不是最后一个,长度要与 8 对齐,然后更新 q.len

1
2
3
4
5
6
7
8
9
10
11
12
13
} else {
if (end&7) {
end &= ~7;
if (skb->ip_summed != CHECKSUM_UNNECESSARY)
skb->ip_summed = CHECKSUM_NONE;
}
if (end > qp->q.len) {
/* Some bits beyond end -> corruption. */
if (qp->q.flags & INET_FRAG_LAST_IN)
goto err;
qp->q.len = end;
}

剩下的就是从链表 q.fragments 当中中根据offset 寻找到要插入的位置,会先看一下表尾,再进行遍历。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
if (end == offset)
goto err;

err = -ENOMEM;
if (!pskb_pull(skb, skb_network_offset(skb) + ihl))
goto err;

err = pskb_trim_rcsum(skb, end - offset);
if (err)
goto err;

/* Find out which fragments are in front and at the back of us
* in the chain of fragments so far. We must know where to put
* this fragment, right?
*/
prev = qp->q.fragments_tail;
if (!prev || FRAG_CB(prev)->offset < offset) {
next = NULL;
goto found;
}
prev = NULL;
for (next = qp->q.fragments; next != NULL; next = next->next) {
if (FRAG_CB(next)->offset >= offset)
break; /* bingo! */
prev = next;
}

如果和前面的分组有重叠,就把重叠的部分去掉,CHECKSUM_NONE 可以使当前的校验和失效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (prev) {
int i = (FRAG_CB(prev)->offset + prev->len) - offset;

if (i > 0) {
offset += i;
err = -EINVAL;
if (end <= offset)
goto err;
err = -ENOMEM;
if (!pskb_pull(skb, i))
goto err;
if (skb->ip_summed != CHECKSUM_UNNECESSARY)
skb->ip_summed = CHECKSUM_NONE;
}
}

然后向后检查有没有重叠,并且把重叠的部分去掉,如果重叠的部分比 next 本身还要大,直接把 next 删掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
while (next && FRAG_CB(next)->offset < end) {
int i = end - FRAG_CB(next)->offset; /* overlap is 'i' bytes */

if (i < next->len) {
/* Eat head of the next overlapped fragment
* and leave the loop. The next ones cannot overlap.
*/
if (!pskb_pull(next, i))
goto err;
FRAG_CB(next)->offset += i;
qp->q.meat -= i;
if (next->ip_summed != CHECKSUM_UNNECESSARY)
next->ip_summed = CHECKSUM_NONE;
break;
} else {
struct sk_buff *free_it = next;

/* Old fragment is completely overridden with
* new one drop it.
*/
next = next->next;

if (prev)
prev->next = next;
else
qp->q.fragments = next;

qp->q.meat -= free_it->len;
sub_frag_mem_limit(qp->q.net, free_it->truesize);
kfree_skb(free_it);
}
}

剩下的就是插入链表,并且更新 ipq 的信息了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
skb->next = next;
if (!next)
qp->q.fragments_tail = skb;
if (prev)
prev->next = skb;
else
qp->q.fragments = skb;

dev = skb->dev;
if (dev) {
qp->iif = dev->ifindex;
skb->dev = NULL;
}
qp->q.stamp = skb->tstamp;
qp->q.meat += skb->len;
qp->ecn |= ecn;
add_frag_mem_limit(qp->q.net, skb->truesize);
if (offset == 0)
qp->q.flags |= INET_FRAG_FIRST_IN;

fragsize = skb->len + ihl;

if (fragsize > qp->q.max_size)
qp->q.max_size = fragsize;

if (ip_hdr(skb)->frag_off & htons(IP_DF) &&
fragsize > qp->max_df_size)
qp->max_df_size = fragsize;

然后如果,第一个包和最后一个包都收齐了的话,就尝试进行重组。

1
2
3
4
5
6
7
8
9
if (qp->q.flags == (INET_FRAG_FIRST_IN | INET_FRAG_LAST_IN) &&
qp->q.meat == qp->q.len) {
unsigned long orefdst = skb->_skb_refdst;

skb->_skb_refdst = 0UL;
err = ip_frag_reasm(qp, prev, dev);
skb->_skb_refdst = orefdst;
return err;
}

另外垃圾回收的过程,就是在内存超过阈值的时候,把超时的 ipq 从 hash 表当中剔除。内存阈值通过 ip_frag_mem获取。

1
2
3
4
int ip_frag_mem(struct net *net)
{
return sum_frag_mem_limit(&net->ipv4.frags);
}

总结

IP 分片与重组的整体流程大致如此,IP 面临的覆盖的现象,是由于不同的 packet 但是 hash 元素一样导致的。另一方面重叠处理一个是防止出现重叠包攻击导致内存溢出。还有就是具体的校验过程会丢给上层的协议来控制。

IP 层主要的工作是头部验证,头部选项的处理,分片和重组,以及路由,本篇文章主要分析 IP 层的主体流程,路由和分片的具体细节暂时略解。

ip_init 注册 ip_rcv 处理函数,然后初始化路由子系统,和对端管理器。两个结构 ip_tstampsip_identsip_rcv 是 IP 的入口,主要是一些参数检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/*
* Main IP Receive routine.
*/
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
const struct iphdr *iph;
struct net *net;
u32 len;

/* When the interface is in promisc. mode, drop all the crap
* that it receives, do not try to analyse it.
*/
// 如果 L2 地址不是本机地址 pkt_type 就会被设置成 PACKET_OHTERHOST
// 然后进行丢包
if (skb->pkt_type == PACKET_OTHERHOST)
goto drop;


net = dev_net(dev);
__IP_UPD_PO_STATS(net, IPSTATS_MIB_IN, skb->len);
// 检查引用计数,如果有人引用就复制一份自己的 skb。
skb = skb_share_check(skb, GFP_ATOMIC);
if (!skb) {
__IP_INC_STATS(net, IPSTATS_MIB_INDISCARDS);
goto out;
}
// 保证有 iphdr 大小,如果没有,则可能尝试从 skb_shinfo(skb)->frags[] 中获取
if (!pskb_may_pull(skb, sizeof(struct iphdr)))
goto inhdr_error;

iph = ip_hdr(skb);

/*
* RFC1122: 3.2.1.2 MUST silently discard any IP frame that fails the checksum.
*
* Is the datagram acceptable?
*
* 1. Length at least the size of an ip header
* 2. Version of 4
* 3. Checksums correctly. [Speed optimisation for later, skip loopback checksums]
* 4. Doesn't have a bogus length
*/
// ip 头部长度至少 20 个字节
if (iph->ihl < 5 || iph->version != 4)
goto inhdr_error;

BUILD_BUG_ON(IPSTATS_MIB_ECT1PKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_ECT_1);
BUILD_BUG_ON(IPSTATS_MIB_ECT0PKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_ECT_0);
BUILD_BUG_ON(IPSTATS_MIB_CEPKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_CE);
__IP_ADD_STATS(net,
IPSTATS_MIB_NOECTPKTS + (iph->tos & INET_ECN_MASK),
max_t(unsigned short, 1, skb_shinfo(skb)->gso_segs));
// 保证完整的头部大小
if (!pskb_may_pull(skb, iph->ihl*4))
goto inhdr_error;

iph = ip_hdr(skb);
// 做校验和
if (unlikely(ip_fast_csum((u8 *)iph, iph->ihl)))
goto csum_error;
// 保证 skb buffer 的大小比 packet 长度大,不然就丢包
// 这个原因是 L2 有 padding? (TODO)
// 并且 packet 长度至少有头部那么大
len = ntohs(iph->tot_len);
if (skb->len < len) {
__IP_INC_STATS(net, IPSTATS_MIB_INTRUNCATEDPKTS);
goto drop;
} else if (len < (iph->ihl*4))
goto inhdr_error;

/* Our transport medium may have padded the buffer out. Now we know it
* is IP we can trim to the true length of the frame.
* Note this now means skb->len holds ntohs(iph->tot_len).
*/
if (pskb_trim_rcsum(skb, len)) {
__IP_INC_STATS(net, IPSTATS_MIB_INDISCARDS);
goto drop;
}

skb->transport_header = skb->network_header + iph->ihl*4;

/* Remove any debris in the socket control block */
memset(IPCB(skb), 0, sizeof(struct inet_skb_parm));
IPCB(skb)->iif = skb->skb_iif;

/* Must drop socket now because of tproxy. */
skb_orphan(skb);

return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING,
net, NULL, skb, dev, NULL,
ip_rcv_finish);

csum_error:
__IP_INC_STATS(net, IPSTATS_MIB_CSUMERRORS);
inhdr_error:
__IP_INC_STATS(net, IPSTATS_MIB_INHDRERRORS);
drop:
kfree_skb(skb);
out:
return NET_RX_DROP;
}

但实际上,大部分的函数是分成两部分的,真正的行为都在 _finish 后缀当中,前期都是检查。在 netfilter 里面 NF_HOOK 这个宏是有个 okfn 如果通过了 netfilter 的检查就会调用这个函数。对应的就是 ip_rcv_finish 当中,要决定是否继续向上层传递还是要进行找到出口设备确定下一跳,进行转发。如果是从设备就交给主设备的 handler 处理(这个和 VRF 有关,主设备代表这些从设备表示的一个域,用于分配一个专有的 FIB 表等等,类似某种程度的隔离)。

1
2
3
4
5
6
/* if ingress device is enslaved to an L3 master device pass the
* skb to its handler for processing
*/
skb = l3mdev_ip_rcv(skb);
if (!skb)
return NET_RX_SUCCESS;

如果设置了 ip_early_demux 并且不是分片的 IP 包,就会提前调用 TCP 的 early_demux 提前解复用这个包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (net->ipv4.sysctl_ip_early_demux &&
!skb_dst(skb) &&
!skb->sk &&
!ip_is_fragment(iph)) {
const struct net_protocol *ipprot;
int protocol = iph->protocol;

ipprot = rcu_dereference(inet_protos[protocol]);
if (ipprot && (edemux = READ_ONCE(ipprot->early_demux))) {
edemux(skb);
/* must reload iph, skb->head might have changed */
iph = ip_hdr(skb);
}
}

在路由系统中找到 dst 指向的 dst_entry,接下来的处理函数也会存在 dst 当中,为一下三种。

  • ip_forward() 转发到其他主机
  • ip_local_deliver() 传入传输层
  • ip_error() 出现了错误,可能会发送一个 ICMP
1
2
3
4
5
6
7
8
9
10
11
12
13
14

/*
* Initialise the virtual path cache for the packet. It describes
* how the packet travels inside Linux networking.
*/
if (!skb_valid_dst(skb)) {
int err = ip_route_input_noref(skb, iph->daddr, iph->saddr,
iph->tos, dev);
if (unlikely(err)) {
if (err == -EXDEV)
__NET_INC_STATS(net, LINUX_MIB_IPRPFILTER);
goto drop;
}
}

如果编译选项带了 CONFIG_IP_ROUTE_CLASSID 那么有流量控制的 classid 的就会进行一些统计。

1
2
3
4
5
6
7
8
9
10
11
#ifdef CONFIG_IP_ROUTE_CLASSID
if (unlikely(skb_dst(skb)->tclassid)) {
struct ip_rt_acct *st = this_cpu_ptr(ip_rt_acct);
u32 idx = skb_dst(skb)->tclassid;
st[idx&0xFF].o_packets++;
st[idx&0xFF].o_bytes += skb->len;
st[(idx>>16)&0xFF].i_packets++;
st[(idx>>16)&0xFF].i_bytes += skb->len;
}
#endif

首先如果 IP 头部长度大于 5 说明有 options,调用 ip_rcv_options 进行处理,如果失败了就进行丢包。

1
2
if (iph->ihl > 5 && ip_rcv_options(skb))
goto drop;

具体过程在 ip_rcv_options 当中。

1
2
3
4
5
if (skb_cow(skb, skb_headroom(skb))) {
__IP_INC_STATS(dev_net(dev), IPSTATS_MIB_INDISCARDS);
goto drop;
}

首先把 skb_headroom 等于 skb->data - skb->head,计算了头部的长度,如果这个头部有 clone 就会被复制一份来 declone

然后根据把 IP option 存进结构化的 inet_skb_parm 当中,其中有个成员是 struct ip_optionsIPCB

代表的是 #define IPCB(skb) ((struct inet_skb_parm*)((skb)->cb))skb->cb 是一个缓冲区用于协议栈每层的处理函数存放一些临时的私有变量。

1
2
3
4
5
6
7
8
9
iph = ip_hdr(skb);
opt = &(IPCB(skb)->opt);
opt->optlen = iph->ihl*4 - sizeof(struct iphdr);

if (ip_options_compile(dev_net(dev), opt, skb)) {
__IP_INC_STATS(dev_net(dev), IPSTATS_MIB_INHDRERRORS);
goto drop;
}

in_device 是和 IP 有关的设备信息,如果没有 source route 选项就直接跳过,不然处理 source route 选项。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (unlikely(opt->srr)) {
struct in_device *in_dev = __in_dev_get_rcu(dev);

if (in_dev) {
if (!IN_DEV_SOURCE_ROUTE(in_dev)) {
if (IN_DEV_LOG_MARTIANS(in_dev))
net_info_ratelimited("source route option %pI4 -> %pI4\n",
&iph->saddr,
&iph->daddr);
goto drop;
}
}

if (ip_options_rcv_srr(skb))
goto drop;
}

source route 是一个多字节选项。此选项中,发送节点会列出后续几跳的 IP 地址(不能超过 IP 报头的最大长度)。如果列表中有某台主机宕机了,则必须重新计算来源地路由,重新发送,而不是使用动态路由。ip_options_rcv_srr 的具体工作就是根据提取出的目的地在本地计算目的地是否可达,如果成功就反回 0, 不然就丢弃。

ip_rcv_options 出来以后根据组播广播进行数据统计。下面的 IN_DEV_ORCONF 不太确定是啥 (TODO)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
rt = skb_rtable(skb);
if (rt->rt_type == RTN_MULTICAST) {
__IP_UPD_PO_STATS(net, IPSTATS_MIB_INMCAST, skb->len);
} else if (rt->rt_type == RTN_BROADCAST) {
__IP_UPD_PO_STATS(net, IPSTATS_MIB_INBCAST, skb->len);
} else if (skb->pkt_type == PACKET_BROADCAST ||
skb->pkt_type == PACKET_MULTICAST) {
struct in_device *in_dev = __in_dev_get_rcu(dev);

/* RFC 1122 3.3.6:
*
* When a host sends a datagram to a link-layer broadcast
* address, the IP destination address MUST be a legal IP
* broadcast or IP multicast address.
*
* A host SHOULD silently discard a datagram that is received
* via a link-layer broadcast (see Section 2.4) but does not
* specify an IP multicast or broadcast destination address.
*
* This doesn't explicitly say L2 *broadcast*, but broadcast is
* in a way a form of multicast and the most common use case for
* this is 802.11 protecting against cross-station spoofing (the
* so-called "hole-196" attack) so do it for both.
*/
if (in_dev &&
IN_DEV_ORCONF(in_dev, DROP_UNICAST_IN_L2_MULTICAST))
goto drop;
}

return dst_input(skb);

进入 dst_input 就会交给 skb­>dst ­>input 来处理。

转发会分成两部分处理 ip_forwardip_forward_finish IP 转发分成几个步骤

  1. 处理 IP options,可能会要求记录本地 IP 地址和时间戳
  2. 基于 IP 头,确保这个 pakcet 可以发出去
  3. 减 1 TTL,到达 0 就丢弃
  4. 根据 MTU 进行分组
  5. 发送至出口设备

期间如果出错了,会通过 ICMP 告知。xfrm4_xxx 是 IPsec 相关的函数。

1
2
if (IPCB(skb)->opt.router_alert && ip_call_ra_chain(skb))
return NET_RX_SUCCESS;

首先是检查 IP 选项当中有没有 Router Alert,如果有的话就交给 ip_ra_chain 中对此感兴趣的 raw socket 来处理,并且就次结束。

在这里检查 TTL 是否耗尽

1
2
3
if (ip_hdr(skb)->ttl <= 1)
goto too_many_hops;

如果是严格的源路由,下一条是网关而不是直接连接的路由就丢包。

rt_uses_gateway 代表两种意思

  • 1 的时候表示网关
  • 0 的时候表示直接路由
1
2
3
if (opt->is_strictroute && rt->rt_uses_gateway)
goto sr_failed;

如果超出 MTU 进行丢包

1
2
3
4
5
6
7
8
9
IPCB(skb)->flags |= IPSKB_FORWARDED;
mtu = ip_dst_mtu_maybe_forward(&rt->dst, true);
if (ip_exceeds_mtu(skb, mtu)) {
IP_INC_STATS(net, IPSTATS_MIB_FRAGFAILS);
icmp_send(skb, ICMP_DEST_UNREACH, ICMP_FRAG_NEEDED,
htonl(mtu));
goto drop;
}

declone 这个 skb 并且确保预留 L2 的空间,然后减少 TTL。

1
2
3
4
5
6
7
8
/* We are about to mangle packet. Copy it! */
if (skb_cow(skb, LL_RESERVED_SPACE(rt->dst.dev)+rt->dst.header_len))
goto drop;
iph = ip_hdr(skb);

/* Decrease ttl after skb cow done */
ip_decrease_ttl(iph);

如果被标记为 IPSKB_DOREDIRECT 发送 redirect ICMP,接着设置优先级。

1
2
3
4
5
6
7
8
9
10
/*
* We now generate an ICMP HOST REDIRECT giving the route
* we calculated.
*/
if (IPCB(skb)->flags & IPSKB_DOREDIRECT && !opt->srr &&
!skb_sec_path(skb))
ip_rt_send_redirect(skb);

skb->priority = rt_tos2priority(iph->tos);

然后进入 ip_forward_finish

1
2
3
return NF_HOOK(NFPROTO_IPV4, NF_INET_FORWARD,
net, NULL, skb, skb->dev, rt->dst.dev,
ip_forward_finish);

ip_forward_finish 当中会进入 dst_outputip_forward_options 会处理一些 IP 选项并且重新计算 IP 头的校验和。

1
2
3
4
5
6

if (unlikely(opt->optlen))
ip_forward_options(skb);

return dst_output(net, sk, skb);

在内部有两种情况,一种是单播,一种是广播,对应的处理函数分别是 ip_outputip_mc_output 两种处理函数,会进行分组操作,然后在 ip_finish_output 当中进入邻居系统。

1
skb_dst(skb)->output(net, sk, skb);

ip_local_deliver 主要的工作是对分片进行重组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int ip_local_deliver(struct sk_buff *skb)
{
/*
* Reassemble IP fragments.
*/
struct net *net = dev_net(skb->dev);

if (ip_is_fragment(ip_hdr(skb))) {
if (ip_defrag(net, skb, IP_DEFRAG_LOCAL_DELIVER))
return 0;
}

return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN,
net, NULL, skb, skb->dev, NULL,
ip_local_deliver_finish);
}

这是 3 层接收端的一个大体结构,下面看一下 3 层发送端的一些内容。发送的入口

1
2
int ip_queue_xmit(struct sk_buff *skb, int ipfragok)

首先会检查是否已经有了路由信息,这个在 SCTP 的情况下会发生。

1
2
3
4
rt = skb_rtable(skb);
if (rt)
goto packet_routed;

检查是否缓存了 route 如果有的话却是路由信息的有效性

1
2
/* Make sure we can route this packet. */
rt = (struct rtable *)__sk_dst_check(sk, 0);

ip_route_output_ports 确保 source route list 的 下一跳和 daddr 一致,并且将路由设置在 skb 里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (!rt) {
__be32 daddr;

/* Use correct destination address if we have options. */
daddr = inet->inet_daddr;
if (inet_opt && inet_opt->opt.srr)
daddr = inet_opt->opt.faddr;

/* If this fails, retransmit mechanism of transport layer will
* keep trying until route appears or the connection times
* itself out.
*/
rt = ip_route_output_ports(net, fl4, sk,
daddr, inet->inet_saddr,
inet->inet_dport,
inet->inet_sport,
sk->sk_protocol,
RT_CONN_FLAGS(sk),
sk->sk_bound_dev_if);
if (IS_ERR(rt))
goto no_route;
sk_setup_caps(sk, &rt->dst);
}
skb_dst_set_noref(skb, &rt->dst);

接下来就需要构建 IP 头了,先调用空出需要的 IP 头部空间,并且重置我的大脑。

1
2
/* OK, we know where to send it, allocate and build IP header. */
skb_push(skb, sizeof(struct iphdr) + (inet_opt ? inet_opt->opt.optlen : 0));

重置 IP 头长度其实就是设置了 network_header 等于 data - head

1
skb_reset_network_header(skb);

转换成 16 位的指针并且把 IP 协议号,IP 头长度,inet->tos TOS 写入。

1
2
iph = ip_hdr(skb);
*((__be16 *)iph) = htons((4 << 12) | (5 << 8) | (inet->tos & 0xff));

初始化 iph

1
2
3
4
5
6
7
if (ip_dont_fragment(sk, &rt->dst) && !skb->ignore_df)
iph->frag_off = htons(IP_DF);
else
iph->frag_off = 0;
iph->ttl = ip_select_ttl(inet, &rt->dst);
iph->protocol = sk->sk_protocol;
ip_copy_addrs(iph, fl4);

如果有选项长度更新选项长度

1
2
3
4
5
6
/* Transport layer set skb->h.foo itself. */

if (inet_opt && inet_opt->opt.optlen) {
iph->ihl += inet_opt->opt.optlen >> 2;
ip_options_build(skb, &inet_opt->opt, inet->inet_daddr, rt, 0);
}

基于是否分片给 packet 分配 ID,然后进入 ip_local_out

1
2
3
4
5
6
7
8
ip_select_ident_segs(net, skb, sk,
skb_shinfo(skb)->gso_segs ?: 1);

/* TODO : should we use skb->sk here instead of sk ? */
skb->priority = sk->sk_priority;
skb->mark = sk->sk_mark;

res = ip_local_out(net, sk, skb);

当这里先告一段落,再来看一下 ip_append_data 这个函数的作用是缓存进合理的结构以便之后进行分片然后发送。ip_push_pending_frames 就可以触发这个动作。

1
2
3
4
5
6
7
8
int ip_append_data(struct sock *sk, struct flowi4 *fl4,
int getfrag(void *from, char *to, int offset, int len,
int odd, struct sk_buff *skb),
void *from, int len, int protolen,
struct ipcm_cookie *ipc,
struct rtable **rt,
unsigned int flags);

第一件事情,检查是否有 MSG_PROBE 的标志,有了这个标志的话,表示请求并不真的是需要向下调用。这个在测试对应 IP 地址的 PMTU 的时候会用到。

1
2
3
if (flags&MSG_PROBE)
return 0;

如果 sock 相关联的 sk_write_queue 队列为空,说明这个是第一个 IP fragment ,如果不是第一个那么就把 transhdrlen 设置成 0,因为只有第一个 IP fragment 才有头部长度的信息。

1
2
3
4
5
6
7
8
if (skb_queue_empty(&sk->sk_write_queue)) {
err = ip_setup_cork(sk, &inet->cork.base, ipc, rtp);
if (err)
return err;
} else {
transhdrlen = 0;
}

ip_setup_cork 主要是初始化了 net->corkcork 保存了 IP options 和 路由信息。

进入 __append_ip_data 之后可以看到

1
2
3
4
5
6
7
8
   skb = skb_peek_tail(queue);

exthdrlen = !skb ? rt->dst.header_len : 0;
mtu = cork->fragsize;
if (cork->tx_flags & SKBTX_ANY_SW_TSTAMP &&
sk->sk_tsflags & SOF_TIMESTAMPING_OPT_ID)
tskey = sk->sk_tskey++;

计算 L2 头部长度,fragment 头部长度,最大可以容纳的 fragment 长度,还有不分片的情况下的最大长度。(如果忽略分片就直接默认最大值,64KB 就是 0xFFFF,否则使用 mtu)

1
2
3
4
5
6
hh_len = LL_RESERVED_SPACE(rt->dst.dev);

fragheaderlen = sizeof(struct iphdr) + (opt ? opt->optlen : 0);
maxfraglen = ((mtu - fragheaderlen) & ~7) + fragheaderlen;
maxnonfragsize = ip_sk_ignore_df(sk) ? 0xFFFF : mtu;

首先累加的 buffer 长度不能超过最大的 IP 包长度。

1
2
3
4
5
6
if (cork->length + length > maxnonfragsize - fragheaderlen) {
ip_local_error(sk, EMSGSIZE, fl4->daddr, inet->inet_dport,
mtu - (opt ? opt->optlen : 0));
return -EMSGSIZE;
}

如果是第一个包,并且不会有新的分片,并且硬件支持 checksum 就可以标志为 CHECKSUM_PARTIAL

1
2
3
4
5
6
7
8
9
10
/*
* transhdrlen > 0 means that this is the first fragment and we wish
* it won't be fragmented in the future.
*/
if (transhdrlen &&
length + fragheaderlen <= mtu &&
rt->dst.dev->features & (NETIF_F_HW_CSUM | NETIF_F_IP_CSUM) &&
!(flags & MSG_MORE) &&
!exthdrlen)
csummode = CHECKSUM_PARTIAL;

下面如果满足下面条件就进入 UDP Fragment Offload 例程。是硬件网卡提供的一种特性,由内核和驱动配合完成相关功能。其目的是由网卡硬件来完成本来需要软件进行的分段(分片)操作用于提升效率和性能。如大家所知,在网络上传输的数据包不能大于 mtu,当用户发送大于 mtu 的数据报文时,通常会在传输层(或者在特殊情况下在 IP 层分片,比如 ip 转发或 ipsec 时)就会按 mtu 大小进行分段,防止发送出去的报文大于 mtu,为提升该操作的性能,新的网卡硬件基本都实现了 UFO 功能,可以使分段(或分片)操作在网卡硬件完成,此时用户态就可以发送长度大于 mtu 的包,而且不必在协议栈中进行分段(或分片)。如果硬件支持,是 UDP 协议,并且是大于 mtu 的可以直接用这个函数。

1
2
3
4
5
6
7
8
9
10
11
12
if ((((length + fragheaderlen) > mtu) || (skb && skb_is_gso(skb))) &&
(sk->sk_protocol == IPPROTO_UDP) &&
(rt->dst.dev->features & NETIF_F_UFO) && !dst_xfrm(&rt->dst) &&
(sk->sk_type == SOCK_DGRAM) && !sk->sk_no_check_tx) {
err = ip_ufo_append_data(sk, queue, getfrag, from, length,
hh_len, fragheaderlen, transhdrlen,
maxfraglen, flags);
if (err)
goto error;
return 0;
}

剩下的代码有点啰嗦,总体来说就是把 buff 拆分成可以直接发送的 IP fragment,但是需要先把道理讲清楚,不然看代码有点复杂。

确定剩余可以用来拷贝的空间,不能超过 mtu 和 maxfraglen

1
2
3
4
5
/* Check if the remaining data fits into current packet. */
copy = mtu - skb->len;
if (copy < length)
copy = maxfraglen - skb->len;

如果不够,就需要分配一个新的 skb,这里面的几个变量具体解释一下,首先是

fraggap 表示的是 mtu 不是 8 的倍数,在最后那个比 8 的倍数多,又小于 mtu 的部分就是 fraggap 了,所以 datalenlength + fraggapfraggap 这部分会从 prev_skb 尾部移动到新 skb 的头部。fraglen 是带上 frag 头部的长度 fraglen = datalen + fragheaderlen。如果 flag 包含了 MSG_MORE 那么会尽量分配一个 mtu,当然这是在不支持 SG(Scatter/Gather I/O) 的情况下。因为支持 SG 的话,就可以直接分散的分配这些内存,最后进行 skb 的分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
		if (copy <= 0) {
char *data;
unsigned int datalen;
unsigned int fraglen;
unsigned int fraggap;
unsigned int alloclen;
struct sk_buff *skb_prev;
alloc_new_skb:
skb_prev = skb;
if (skb_prev)
fraggap = skb_prev->len - maxfraglen;
else
fraggap = 0;

/*
* If remaining data exceeds the mtu,
* we know we need more fragment(s).
*/
datalen = length + fraggap;
if (datalen > mtu - fragheaderlen)
datalen = maxfraglen - fragheaderlen;
fraglen = datalen + fragheaderlen;

if ((flags & MSG_MORE) &&
!(rt->dst.dev->features&NETIF_F_SG))
alloclen = mtu;
else
alloclen = fraglen;

alloclen += exthdrlen;

/* The last fragment gets additional space at tail.
* Note, with MSG_MORE we overallocate on fragments,
* because we have no idea what fragment will be
* the last.
*/
if (datalen == length + fraggap)
alloclen += rt->dst.trailer_len;

if (transhdrlen) {
skb = sock_alloc_send_skb(sk,
alloclen + hh_len + 15,
(flags & MSG_DONTWAIT), &err);
} else {
skb = NULL;
if (atomic_read(&sk->sk_wmem_alloc) <=
2 * sk->sk_sndbuf)
skb = sock_wmalloc(sk,
alloclen + hh_len + 15, 1,
sk->sk_allocation);
if (unlikely(!skb))
err = -ENOBUFS;
}
if (!skb)
goto error;


length 表示需要传输的长度,在循环不断进行中这个 length 就会变成 0。

接下来的部分是初始化 csumip_summed 保留硬件头部长度。

1
2
3
4
5
6
7
         /*
* Fill in the control structures
*/
skb->ip_summed = csummode;
skb->csum = 0;
skb_reserve(skb, hh_len);

设置 tx_flagstskey

1
2
3
4
5
6
/* only the initial fragment is time stamped */
skb_shinfo(skb)->tx_flags = cork->tx_flags;
cork->tx_flags = 0;
skb_shinfo(skb)->tskey = tskey;
tskey = 0;

保留数据空间,并且把指针移动到头部后面,指向负载的部分。

1
2
3
4
5
6
7
 *	Find where to start putting bytes.
*/
data = skb_put(skb, fraglen + exthdrlen);
skb_set_network_header(skb, exthdrlen);
skb->transport_header = (skb->network_header +
fragheaderlen);
data += fragheaderlen + exthdrlen;

这部分就是把上个 skb 的 fraggap 移到当前这个,并且重新获取 checksum。

1
2
3
4
5
6
7
8
9
10
if (fraggap) {
skb->csum = skb_copy_and_csum_bits(
skb_prev, maxfraglen,
data + transhdrlen, fraggap, 0);
skb_prev->csum = csum_sub(skb_prev->csum,
skb->csum);
data += fraggap;
pskb_trim_unique(skb_prev, maxfraglen);
}

接下来调用 getfrag 拷贝数据,加入到队尾。

getfrag 对应的 4 个 routine 分别是

  • ICMP icmp_glue_bits
  • UDP. ip_generic_getfrag
  • RAW iP ip_generic_getfrag
  • TCP. Ip_reply_glue_bits

getfrag 的功能就是从 from 拷贝到 to ,因为可能是用户态的数据,所以包含了地址转换的功能并且在比好的时候重新计算校验和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
	copy = datalen - transhdrlen - fraggap;
if (copy > 0 && getfrag(from, data + transhdrlen, offset, copy, fraggap, skb) < 0) {
err = -EFAULT;
kfree_skb(skb);
goto error;
}

offset += copy;
length -= datalen - fraggap;
transhdrlen = 0;
exthdrlen = 0;
csummode = CHECKSUM_NONE;

if ((flags & MSG_CONFIRM) && !skb_prev)
skb_set_dst_pending_confirm(skb, 1);

/*
* Put the packet on the pending queue.
*/
__skb_queue_tail(queue, skb);
continue;
}

剩下的就是 copy 足够,不需要分配新的 skb 的条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
if (copy > length)
copy = length;

if (!(rt->dst.dev->features&NETIF_F_SG)) {
unsigned int off;

off = skb->len;
if (getfrag(from, skb_put(skb, copy),
offset, copy, off, skb) < 0) {
__skb_trim(skb, off);
err = -EFAULT;
goto error;
}
} else {
int i = skb_shinfo(skb)->nr_frags;

err = -ENOMEM;
if (!sk_page_frag_refill(sk, pfrag))
goto error;

if (!skb_can_coalesce(skb, i, pfrag->page,
pfrag->offset)) {
err = -EMSGSIZE;
if (i == MAX_SKB_FRAGS)
goto error;

__skb_fill_page_desc(skb, i, pfrag->page,
pfrag->offset, 0);
skb_shinfo(skb)->nr_frags = ++i;
get_page(pfrag->page);
}
copy = min_t(int, copy, pfrag->size - pfrag->offset);
if (getfrag(from,
page_address(pfrag->page) + pfrag->offset,
offset, copy, skb->len, skb) < 0)
goto error_efault;

pfrag->offset += copy;
skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
skb->len += copy;
skb->data_len += copy;
skb->truesize += copy;
atomic_add(copy, &sk->sk_wmem_alloc);
}
offset += copy;
length -= copy;

VFIO——将设备暴露到用户态

在开始之前我们先要说一个东西就是 DMA,直接让设备访问内存,可以不通过 CPU 搬运数据。

这是一个比较简单的体系结构图,设备 和 CPU 通过存储控制器访问存储器。一个简单的 case 是 CPU 向存储器写数据,然后设备从存储器读数据。这么快来一切都很正常。但是实际上 CPU 是有一层缓存的,例如下面这样的。

CPU 想内存写数据,但是先要清空到不一致的缓存,然后设备再去读数据,不然设备读到的数据和 CPU 实际的数据会不一致(因为缓存里的数据可能和存储器的不一致),而且实际上缓存也不只是一层,所以需要一个中间层来保证 从 CPU 的角度和从设备的角度内存都是一致的,所以就有了下面这个结构。

CPU 和 设备都会走缓存验证一遍以后,再落到存储器上,这样带上缓存以后大家的一致性都是一样的了。所以从设备的角度,设备也拥有了缓存,实际上这个和 IOMMU 关系不是很大,接下来设备其实也可以和 CPU 一样有一层 MMU,也就是地址到存储器物理地址的转换。注意,这里我用了地址,因为对 CPU 来说是虚拟地址,但是对设备来说是一个总线域的地址。这里要明确区分一下,一个是总线地址,是从设备的角度来看的,一个是 CPU 的虚拟地址,这是从 CPU 角度来看的,两个是不同的东西。将总线域地址转换成存储器物理地址的设备就叫 IOMMU。

如果没有 IOMMU,DMA 也能照常工作,IOMMU 的主要作用就是保护功能,防止使用 DMA 的设备访问任意存储器的物理地址。

IOMMU 在不同架构上名字不太一样,AMD 叫 AMD-Vi,最开始针对的设备只是显卡,Intel 叫 VT-d,arm 叫 SMMU,具体对应的手册也不太一样,但是主要解决的问题是一致的。在 VTd 中,dmar (DMA remapping) 就是那个 IOMMU 设备,通过中断的方式实现类似 page fault 一样的内存分配行为。DMA 传输是由 CPU 发起的:CPU 会告诉 DMA 控制器,帮忙将 xxx 地方的数据搬到 xxx 地方。CPU 发完指令之后,就当甩手掌柜了。IOMMU 有点像 MMU 是一个将设备地址翻译到内存地址的页表体系,也会有对应的页表,这个东西在虚拟化中也非常有用,可以将原本有软件模拟的设备,用直接的硬件替代,而原本的隔离通过 IOMMU 来完成。如下图所示,原本需要通过软件模拟的驱动设备可以通过 IOMMU 以__安全__的方式来直接把硬件设备分配个用户态的 Guest OS。

理论上讲没有 IOMMU 实际上是可以工作的,但是硬件的角度,设备就拥有了整个存储器的全局视图,这是无论如何都非常不合理的事情,不应该让设备拥有访问任意物理内存的能力。

这里要提的另外一个功能就是对中断的隔离,类似于下面的通过在中断请求中添加标识来重定向中断到对应的中断回调上。

VFIO 的作用就是通过 IOMMU 以安全的方式来将设备的访问直接暴露到用户空间,而不用专门完成某个驱动等待合并到上游或者使用之前的对 IOMMU 没有感知的 UIO 的框架。通过 VFIO 向用户态开放 IOMMU 的功能,编写用户态的驱动。

对于 IOMMU 来说,隔离的级别不一定是单个设备,比如一个后面有几个设备的 PCI 桥,从 PCI 桥角度来说,都是来自 PCI 桥的总线事务。所以 IOMMU 有一个 iommu_group的概念,代表一组与其他设备隔离的设备的集合。

IOMMU 根据手册上讲还有一个域的概念,可以简单理解为一段物理地址的抽象。

iommu_group 的层级上,VFIO 封装了一层 container class,这个的作用对应于希望能够在不同的iommu_group 之间共享 TLBpage tables,这个就是一个集合的概念,跟容器的那个概念没啥关系,一个集合总归要有个名字。通过把 host 的 device 和 driver 解绑,然后绑定到 VFIO 的 driver 上,就会有个/dev/vfio/$GROUP/ 出现,然后这个 $GROUP代表的就是这个 device 的 iommu_group号,如果要使用 VFIO 就要把这个 group 下的所有 device 都解绑才可以。

通过打开/dev/vfio/vfio就能创建一个 VFIO 的 container,然后再打开/dev/vfio/$GROUPVFIO_GROUP_SET_CONTAINER ioctl 把文件描述传进去,就把 group 加进去了,如果支持多个 group 共享页表等结构,还可以把相应的 group 也加进去。(再强调一遍这个页表是总线地址到存储器物理地址,IOMMU 管理的那个页表)。

下面举个官方的栗子,获取 PCI 设备 0000:06:0d.0 的 group_id (PCI 命名的规则是 domain:bus:slot.func

1
2
$ readlink /sys/bus/pci/devices/0000:06:0d.0/iommu_group
../../../../kernel/iommu_groups/26

使用之前需要你已经加载了 VFIO 模块

1
modprobe vfio-pci

解绑 PCI 设备,然后创建一个 container id

1
2
3
4
$ lspci -n -s 0000:06:0d.0
06:0d.0 0401: 1102:0002 (rev 08)
# echo 0000:06:0d.0 > /sys/bus/pci/devices/0000:06:0d.0/driver/unbind
# echo 1102 0002 > /sys/bus/pci/drivers/vfio-pci/new_id

然后寻找其他同属于一个 group 的设备

1
2
3
4
5
6
7
8
$ ls -l /sys/bus/pci/devices/0000:06:0d.0/iommu_group/devices
total 0
lrwxrwxrwx. 1 root root 0 Apr 23 16:13 0000:00:1e.0 ->
../../../../devices/pci0000:00/0000:00:1e.0
lrwxrwxrwx. 1 root root 0 Apr 23 16:13 0000:06:0d.0 ->
../../../../devices/pci0000:00/0000:00:1e.0/0000:06:0d.0
lrwxrwxrwx. 1 root root 0 Apr 23 16:13 0000:06:0d.1 ->
../../../../devices/pci0000:00/0000:00:1e.0/0000:06:0d.1

PCI 桥 0000:00:1e.0 后面挂了两个设备,一个是刚才加进去的 0000:06:0d.0,还有一个是 0000:06:0d.1,通过上面的步奏加进去就可以。

最后一步是让用户有权限使用这个 group。

1
# chown user:user /dev/vfio/26

下面就是一个样例,从用户态使用 VFIO,整个的使用方式是通过 ioctl来获取中断相关信息,以及注册中断处理函数,然后也是通过 ioctl来获取region信息,然后调用相应的mmap函数,让 CPU 可以访问内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
int container, group, device, i;
struct vfio_group_status group_status =
{ .argsz = sizeof(group_status) };
struct vfio_iommu_type1_info iommu_info = { .argsz = sizeof(iommu_info) };
struct vfio_iommu_type1_dma_map dma_map = { .argsz = sizeof(dma_map) };
struct vfio_device_info device_info = { .argsz = sizeof(device_info) };

/* Create a new container */
container = open("/dev/vfio/vfio", O_RDWR);

if (ioctl(container, VFIO_GET_API_VERSION) != VFIO_API_VERSION)
/* Unknown API version */

if (!ioctl(container, VFIO_CHECK_EXTENSION, VFIO_TYPE1_IOMMU))
/* Doesn't support the IOMMU driver we want. */

/* Open the group */
group = open("/dev/vfio/26", O_RDWR);

/* Test the group is viable and available */
ioctl(group, VFIO_GROUP_GET_STATUS, &group_status);

if (!(group_status.flags & VFIO_GROUP_FLAGS_VIABLE))
/* Group is not viable (ie, not all devices bound for vfio) */

/* Add the group to the container */
ioctl(group, VFIO_GROUP_SET_CONTAINER, &container);

/* Enable the IOMMU model we want */
ioctl(container, VFIO_SET_IOMMU, VFIO_TYPE1_IOMMU);

/* Get addition IOMMU info */
ioctl(container, VFIO_IOMMU_GET_INFO, &iommu_info);

/* Allocate some space and setup a DMA mapping */
dma_map.vaddr = mmap(0, 1024 * 1024, PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS, 0, 0);
dma_map.size = 1024 * 1024;
dma_map.iova = 0; /* 1MB starting at 0x0 from device view */
dma_map.flags = VFIO_DMA_MAP_FLAG_READ | VFIO_DMA_MAP_FLAG_WRITE;

ioctl(container, VFIO_IOMMU_MAP_DMA, &dma_map);

/* Get a file descriptor for the device */
device = ioctl(group, VFIO_GROUP_GET_DEVICE_FD, "0000:06:0d.0");

/* Test and setup the device */
ioctl(device, VFIO_DEVICE_GET_INFO, &device_info);

for (i = 0; i < device_info.num_regions; i++) {
struct vfio_region_info reg = { .argsz = sizeof(reg) };

reg.index = i;

ioctl(device, VFIO_DEVICE_GET_REGION_INFO, &reg);

/* Setup mappings... read/write offsets, mmaps
* For PCI devices, config space is a region */
}

for (i = 0; i < device_info.num_irqs; i++) {
struct vfio_irq_info irq = { .argsz = sizeof(irq) };

irq.index = i;

ioctl(device, VFIO_DEVICE_GET_IRQ_INFO, &irq);

/* Setup IRQs... eventfds, VFIO_DEVICE_SET_IRQS */
}

/* Gratuitous device reset and go... */
ioctl(device, VFIO_DEVICE_RESET);

include/linux/vfio.h里面有完整的 API,这里就简单略过。

在理解了一些基本原理和使用方式之后再来看 VFIO 的代码应该叫就比较容易理解了。

首先是作为 PCI 设备的 probe。主要是通过 vfio_iommu_group_get 分配 iommu_group,然后调用vfio_add_group_dev初始化设备回调接口vfio_pci_ops,而remove就是反过来把对应的结构释放掉就可以。然后再看注册的回调函数结构体。

1
2
3
4
5
6
7
8
9
10
static const struct vfio_device_ops vfio_pci_ops = {
.name = "vfio-pci",
.open = vfio_pci_open,
.release = vfio_pci_release,
.ioctl = vfio_pci_ioctl,
.read = vfio_pci_read,
.write = vfio_pci_write,
.mmap = vfio_pci_mmap,
.request = vfio_pci_request,
};

这里分析几个关键的函数,他们会通过file_operations vfio_fops被间接的调用。

首先是 mmap,就是在调用vfio_pci_mmap的时候最终调用remap_pfn_range(vma, vma->vm_start, vma->vm_pgoff, req_len, vma->vm_page_prot); 来将物理内存映射到用户态空间,这就是上面的栗子中 mmap 系统调用的入口,而具体要映射的物理内存是通过一系列pci_resource_xxx宏从 PCI bar 空间读出来的配置。

然后是 ioctl 接口,这个接口比较丰富,也简单的看一下。比如 VFIO_DEVICE_SET_IRQS会通过使用用户态传进来的结构体,调用vfio_pci_set_irqs_ioctl注册中断处理函数。而通过vfio_ioctl_set_iommu会设置 container 的 iommu_group 以及对应的 driver。read/write接口都是用于修改 PCI 配置信息的。

简单的来说,VFIO 的主要工作是把设备通过 IOMMU 映射的 DMA 物理内存地址映射到用户态中,让用户态程序可以自行操纵设备的传输,并且可以保证一定程度的安全,另外可以自行注册中断处理函数,从而在用户态实现设备的驱动程序,通过这样的框架,可以在 DPDK 中充分发挥用户态协议栈的威力。

参考文献

  1. PCI 基本概念
  2. dmar 和 iommu
  3. 总线基本概念
  4. 《PCI Express 体系结构导读》王齐著
  5. mastering the dma and iommu apis
  6. VFIO - “Virtual Function I/O”
  7. Intel® Virtualization Technology for Directed I/O: Spec
  8. Linux 下 PCI 设备驱动程序开发

在分析net_clsnet_prio之前先要解释几个东西,一个是网络的 QoS 以及 netfilter。

网络 QoS

IP 服务模型是尽力而为的,这样的模型不能体现某些流量的重要性,所以诞生了QOS技术,Linux 很早就提供了流量控制接口,命令行工具是tc

协议栈的 QoS 主要由三部分组成。

qdisc 队列规则(queueing discipline),class 控制策略,filter 根据 filter 划入具体的控制策略(class)。

一般由流程是这样的,当一个 qdisc 被入队的时候,会循环匹配其中的 filter 如果匹配到了的话,就会将 packet 入队到对应的 class 当中,大部分情况下被 class 的“所有者”代表的 qdisc 入队。一些没有匹配的 packet 会进入默认的 class。下面将会介绍几种常用的 queue discpline。

pfifo

默认 qdisc 就是 pfifo,实现在 net/sched/sch_prio.c

设有三个优先级的队列,优先级由高到低分别是

  1. “interactive”
  2. “best effort”
  3. “bulk”

先消费 0 队列再消费 1 队列,依次类推,一般的packet都是属于 1。

pfifo

用 IP 的 ToS 可以映射到这些队列,对应的关系如图。

tos

他的实现依赖下面这个结构,bands 一般是 3 个,代表三个不同优先级的队列,然后filter_list是他的过滤器列表,最后prio2band就是上图的ToS To Queue 的映射中的Linux priority 到 Band的那部分,queues保存的是三个fifoqdisc,也就是三个最简单的队列。

1
2
3
4
5
6
7
struct prio_sched_data {
int bands;
struct tcf_proto __rcu *filter_list;
u8 prio2band[TC_PRIO_MAX+1];
struct Qdisc *queues[TCQ_PRIO_BANDS];
};

这里虽然和 IP 的 ToS 暂时没有关系,但是最后cgroup的部分会提到怎么联系起来的。

pfifo enqueue 的时候会调用prio_classify根据skb->priority来选择队列进行入队,dequeue 的时候则 round robin 每次依次从优先级高到低取出一个 packet。

HTB (Hierarchical Token Bucket)

HTB 是一个层级令牌桶的 qdisc,而且可以加入 class,HTB 的整体结构如下,HTB 的作者在这里 解释了他的设计。

这里简单解释一下,每个 class 有一个 AR(保障速率)CR(最大速率),P(优先级),level(在树中的层次),quantum(量子,一个动态参数),和实际的速率 R,中间层的 class 通过计算子层的速率获得。

Leaf 没有子节点,并且只有 Leaf 有传输功能的 queue,其他只是帮助构造层级关系。

Mode class 的状态

  1. Red ( R > CR ) 也就是超速

  2. Yellow (R <= CR && R > AR) 也就是合理超速

  3. Green 就是没有超过保障速率

下面我们有几个等式

1
Rc = min(CRc, ARc + Bc)        [eq1]

Bc 表示从祖先那里借到的速率,下面这段公式表示的意思是,如果我是当前 prio 最小的,从优先级的兄弟节点中加权平均借到父节点的速率,其他的 prio 更大的节点都不能借到速率。

1
2
3
4
5
       Qc Rp
Bc = ----------------------------- iff min[Pi over D(p)]>=Pc [eq2]
sum[Qi over D(p) where Pi=Pc]

Bc = 0 otherwise [eq3]

如果没有父节点的话,Bc 就是 0。这样算出来代表的意义是什么呢,就是说先服务优先级更高的节点,并且按照量子的大小在同优先级的节点中分配速率。

然后我们具体看一下 HTB 调度器是如何工作的。

htb_sch_1

htb_sch_2

每个 class 有个 slot 包含不同的 prio 级别,指向 yellow 的子节点,然后每个层级包含一个 slot 也有不同的 prio 级别指向这一层中 green 的节点,上图展示了的是有两个 prio 的 slot,右边的 self slot 有个白色的是用于 yellow red 的 wait list。

假设当前如图 1 的状态,所有节点都是绿的没有 packet 到来,现在有有两个包到达 C 和 D,然后激活它们,并且它们现在都是绿的,所以 self slot 指向他们,然后因为 D 的优先级更高,所以先把 D 出队。所以你可以发现,出队的顺序很简答,就是按照优先级从 self slot 里面把绿色的包按顺序取出来就可以。

feed3

feed4

然后我们看一下更复杂的情况,在图 3 中,我们从 D 中出队一个 packet(htb_dequeue),然后htb_charge_class会增加 D 的速率,导致 D 变成 yellow,离开 self slot (通过htb_deactivate_prioshtb_remove_class_from_row),然后添加到 B 的 slot 里面 (htb_activate_prios),并且递归向上添加htb_add_class_to_row,D 会在一段时间后进入 self slot 的白色等待区,然后 D 又会变回绿色。现在如果选择的话,就会从 C 出队,因为虽然 C 的优先级低但是 C 不需要借别人的速率。

在图 4,假设 C 已经完全消耗了速率达到了最大限速,这个时候 D 就会开始工作然后把 B 消耗完,B 被消耗了以后就会去消耗 A,从这里就可以看到一个借取的过程。

htb_sch_5

htb_sch_6

现在说一个更复杂一点的例子,在图 5 中,A 已经被消耗光了,E 开始工作,然后 C 也能开始工作,变成图 6 的样子。注意即使 D 没有被使用但是他的优先级还会被 class slot 维持,也就是红线。但是 C 和 E 都是同一个优先级,这样的话,就要使用 DRR 算法(也就是在 RR 算法上给每个变量加一个权重,也就是之前的那个量子 quantom)。然后也可以发现一个 class 可以对不同的优先级(红色和蓝色)保持 active。

下面是一个 HTB 的全貌图,抽象的可以理解成一个从父 class 中根据优先级带权重的分享令牌的一个算法。

htb arch

下面三个值对应的就是三种颜色。

1
2
3
4
5
6
/* used internaly to keep status of single class */
enum htb_cmode {
HTB_CANT_SEND, /* class can't send and can't borrow */
HTB_MAY_BORROW, /* class can't send but may borrow */
HTB_CAN_SEND /* class can send */
};

HTB 的实现依赖于

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
struct htb_sched {
struct Qdisc_class_hash clhash;
int defcls; /* class where unclassified flows go to */
int rate2quantum; /* quant = rate / rate2quantum */

/* filters for qdisc itself */
struct tcf_proto __rcu *filter_list;

#define HTB_WARN_TOOMANYEVENTS 0x1
unsigned int warned; /* only one warning */
int direct_qlen;
struct work_struct work;

/* non shaped skbs; let them go directly thru */
struct qdisc_skb_head direct_queue;
long direct_pkts;

struct qdisc_watchdog watchdog;

s64 now; /* cached dequeue time */
struct list_head drops[TC_HTB_NUMPRIO];/* active leaves (for drops) */

/* time of nearest event per level (row) */
s64 near_ev_cache[TC_HTB_MAXDEPTH];

int row_mask[TC_HTB_MAXDEPTH];

struct htb_level hlevel[TC_HTB_MAXDEPTH];
};


其中hlevel对应的就是self slot,而clhash 可以通过 classid 找到 hub_class,其中的->un.inner就是对应的class slot->un.leaf对应的就是叶子结点。

使用

qdisc 参数

parent major:minor 或者 root

一个 qdisc 是根节点就是 root,否则其他的情况指定 parent。其中 major:minor 是 class 的 handle id,每个 class 都要指定一个 id 用于标识。

handle major: ,这个语法有点奇怪,是可选的,如果 qdisc 下面还要分类(多个 class),则需要指定这个 hanlde。对于 root,通常是”1:”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// handle 是一组用户指定的标示符,格式为 major:minor。
// 如果是一条 queueing discipline,minor 需要一直为 0。
# tc qdisc add dev eth0 root handle 1: htb

// parent 指明该新增的 class 添加到那一个父 handle 上去
// classid 指明该 class handle 的唯一 ID,minor 需要是非零值
// ceil 定义 rate 的上界
# tc class add dev eth0 parent 1:1 classid 1:6 htb rate 256kbit ceil 512kbit

// 新建一个带宽为 100kbps 的 root class, 其 classid 为 1:1
# tc class add dev eth0 parent 1: classid 1:1 htb rate 100kbps ceil 100kbps
// 接着建立两个子 class,指定其 parent 为 1:1,ceil 用来限制子类最大的带宽
# tc class add dev eth0 parent 1:1 classid 1:10 htb rate 40kbps ceil 100kbps
# tc class add dev eth0 parent 1:1 classid 1:11 htb rate 60kbps ceil 100kbps
// 随后建立 filter 指定哪些类型的 packet 进入那个 class
# tc filter add dev eth0 protocol ip parent 1:0 prio 1 u32 match ip src 1.2.3.4 match ip dport 80 0xffff flowid 1:10
# tc filter add dev eth0 protocol ip parent 1:0 prio 1 u32 match ip src 1.2.3.4 flow 1:11
// 最后为这些 class 添加 queuing disciplines
# tc qdisc add dev eth0 parent 1:10 handle 20: pfifo limit 5
# tc qdisc add dev eth0 parent 1:11 handle 30: sfq perturb 10

实现

用 rtnetlink 接口实现 API。

include/net/sch_generic.h下有Qdisc_ops,Qdisc_class_ops,tcf_proto_ops的定义。

同一个文件下还有Qdisc_class_ops的定义。

qdisc_run首先检查设备的运行状态然后调用__qdisc_run,这个函数的主体就是调用 qdisc_restart, 直到超过限制或者需要让出时间 CPU 了,最后清空 qdiscRUNNING状态。

在此结构中,enqueue 和 dequeue 两个函数是整个 QoS 调度的入口函数。其中的Qdisc_class_ops用于对此 Qdisc的 filter list 进行操作,添加删除等,通过对 Qdisc 添加 fliter,filter 对 enqueue 到此 Qdisc 的 pkt 进行分类,从而归类到此 Qdisc 的子 class 中,而每个子 class 都有自己的 Qdisc 进行 pkt enqueue 的管理,因此实现一个树形的 filter 结构。

callgraph Qdisc 在从来自上层的dev_queue_xmit主动发送开始起作用,对出口数据包作限制。

1
2
3
4
5
6
7
8
dev_queue_xmit ->
__dev_queue_xmit ->
__dev_xmit_skb ->
qdisc->enqueue
__qdisc_run ->
qdisc_restart ->
qdisc->dequeue ->
sch_direct_xmit

enqueue的时候会主动唤起dequeue也可能是硬件发送就绪会唤醒发送软中断来dequeue,描述的整个过程大概是图中的这部分。

补充

netfilter

netfilter 在数据包传输中有一些 hook 可以在其中注册回调函数。

netfilter

iptables

主要是四表五链,是 netfilter 的用户态工具。

deep dive

cgroup 子系统 net_cls (Network classifier cgroup)

net_cls 可以给 packet 打上 classid 的标签,用于过滤分类,有了上面的详细解释,这个 classid 的作用也非常明显了,就是用于标记skb所属的 qdisc class 的。

有了这个标签,流量控制器(tc)可以对不同的 cgroup 的 packet 起作用,Netfilter(iptables)也可以基于这个标签有对应的动作。创建一个 net_cls cgroup 对应的是创建一个 net_cls.classid 文件,这个文件初始化为 0。可以写 16 进制的 0xAAAABBBB 到这个文件里面,AAAA 是 major 号,BBBB 是 minor 号。读这个文件返回的是十进制的数字。

例子

1
2
3
4
mkdir /sys/fs/cgroup/net_cls
mount -t cgroup -onet_cls net_cls /sys/fs/cgroup/net_cls
mkdir /sys/fs/cgroup/net_cls/0
echo 0x100001 > /sys/fs/cgroup/net_cls/0/net_cls.classid

设置一个 10:1 handle.

1
2
cat /sys/fs/cgroup/net_cls/0/net_cls.classid
1048577

配置 tc:

1
2
tc qdisc add dev eth0 root handle 10: htb
tc class add dev eth0 parent 10: classid 10:1 htb rate 40mbit

创建 traffic class 10:1

1
tc filter add dev eth0 parent 10: protocol ip prio 10 handle 1: cgroup

配置 iptables,也可以用于这个 classid。

1
iptables -A OUTPUT -m cgroup ! --cgroup 0x100001 -j DROP

对应的实现在net/core/netclassid_cgroup.c下面。起作用的方式是css_cls_stateclassid并且sock_cgroup_set_classid(&sock->sk->sk_cgrp_data,(unsigned long)v)来设置sockclassid

cgroup net_prio 子系统

网络优先权(net_prio)子系统可以为各个 cgroup 中的应用程序动态配置每个网络接口的流量优先级。

net_prio.prioidx

只读文件。它包含一个特有整数值,kernel 使用该整数值作为这个 cgroup 的内部代表。

net_prio.ifpriomap

包含优先级图谱,这些优先级被分配给源于此群组进程的流量以及通过不同接口离开系统的流量。回顾pfifo里优先级映射,对应的就是这个值。该图用 的形式以成对列表表示:

1
2
3
4
~]# cat /cgroup/net_prio/iscsi/net_prio.ifpriomap
eth0 5
eth1 4
eth2 6

net_prio.ifpriomap 文件的目录可以使用上述格式,通过将字符串回显至文件的方式来修改。例如:

1
~]# echo "eth0 5" > /cgroup/net_prio/iscsi/net_prio.ifpriomap

上述指令将强制设定任何源于 iscsi net_prio cgroup 进程的流量和 eth0 网络接口传出的流量的优先级为 5。父 cgroup 也有可写入的 net_prio.ifpriomap 文件,可以设定系统默认优先级。

对应的实现在net/core/netprio_cgroup.c下面。实现方式是通过扩展dev->priomapprioid->prio的映射记录这个优先级和 cgroup 的关系。

net_prio 使用每个 cgroup 的 id(cgroupo->id)作为 sequence number,并将这个存储在 sk_cgrp_prioidx 中。sk_cgrp_prioidx 这个是单纯的用于设置网络包的优先级,使用这个之后将会覆盖之前通过 SO_PRIORITY socket 选项或者其他方式设置的值。

cgroup 的整体结构

cgroup 是容器当中对资源进行限制的机制,完整的名称是叫 control group。经常提到的 hierarchy 对应的是一个层级,而subsystem 对应的是一个子系统,都是可以望文生意的。创建一个层级是通过挂载完成的,也就是说层级对应的是文件系统 root 目录的结构。

子系统目前有下列几种

  1. devices 设备权限
  2. cpuset 分配指定的 CPU 和内存节点
  3. cpu 控制 CPU 占用率
  4. cpuacct 统计 CPU 使用情况
  5. memory 限制内存的使用上限
  6. freezer 暂停 Cgroup 中的进程
  7. net_cls 配合 tc(traffic controller)限制网络带宽
  8. net_prio 设置进程的网络流量优先级
  9. huge_tlb 限制 HugeTLB 的使用
  10. perf_event 允许 Perf 工具基于 Cgroup 分组做性能检测

创建层级通过 mount -t cgroup -o subsystems name /cgroup/name,/cgroup/name 是用来挂载层级的目录(层级结构是通过挂载添加的),-o 是子系统列表,比如 -o cpu,cpuset,memory,name 是层级的名称,一个层级可以包含多个子系统,如果要修改层级里的子系统重新 mount 即可。子系统和层级之间满足几个关系。

  1. 同一个 hierarchy 可以附加一个或多个 subsystem
  2. 一个 subsystem 可以附加到多个 hierarchy,当且仅当这些 hierarchy 只有这唯一一个 subsystem
  3. 系统每次新建一个 hierarchy 时,该系统上的所有 task 默认构成了这个新建的 hierarchy 的初始化 cgroup,这个 cgroup 也称为 root cgroup。对于你创建的每个 hierarchy,task 只能存在于其中一个 cgroup 中,即一个 task 不能存在于同一个 hierarchy 的不同 cgroup 中,但是一个 task 可以存在在不同 hierarchy 中的多个 cgroup 中。如果操作时把一个 task 添加到同一个 hierarchy 中的另一个 cgroup 中,则会从第一个 cgroup 中移除

/proc/self 对应的是当前进程的 proc 目录,比如当前进程 pid 是1,那么/proc/1/proc/self是等价的。运行man proc可以看到/proc/self/cgroup的解释。

/proc/[pid]/cgroup (since Linux 2.6.24)
This file describes control groups to which the process/task belongs. For each cgroup hierarchy there is one entry
containing colon-separated fields of the form:

5:cpuacct,cpu,cpuset:/daemons

The colon-separated fields are, from left to right:

1. hierarchy ID number

2. set of subsystems bound to the hierarchy

3. control group in the hierarchy to which the process belongs

This file is present only if the CONFIG_CGROUPS kernel configuration option is enabled.

这个展示的是当前进程属于的 control groups, 每一行是一排 hierarchy,中间是子系统,最后是受控制的 cgroup,可以通过这个文件知道自己所属于的cgroup。

cgroup 的创建

创建一个独立的 cgroup 则是在层级结构下面创建一个目录。

先看一下创建目录做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static int cgroup_mkdir(struct inode *dir, struct dentry *dentry, int mode)
{
struct cgroup *c_parent = dentry->d_parent->d_fsdata;

/* the vfs holds inode->i_mutex already */
return cgroup_create(c_parent, dentry, mode | S_IFDIR);
}

static long cgroup_create(struct cgroup *parent, struct dentry *dentry,
mode_t mode)
{
/* 获取父cgroup对应的层级hierarchy */
/* 这里叫做 cgroupfs 其实是匹配的,因为创建 hierachy 就是 mount 了一个文件系统的动作 */
struct cgroupfs_root *root = parent->root;
/* 初始化一个cgroup结构 cgrp */
init_cgroup_housekeeping(cgrp);
cgrp->parent = parent; /* 设置父 cgroup */
cgrp->root = parent->root; /* 继承 parent 的 hierachy */
cgrp->top_cgroup = parent->top_cgroup; /* 继承父 parent 对应 hierachy 的root cgroup */
/* 继承父 parent 的 notify_on_release 设置 */
if (notify_on_release(parent))
set_bit(CGRP_NOTIFY_ON_RELEASE, &cgrp->flags);
/* 对所属的hierachy的子系统进行初始化 */
for_each_subsys(root, ss) {
struct cgroup_subsys_state *css = ss->create(ss, cgrp);
if (IS_ERR(css)) {
err = PTR_ERR(css);
goto err_destroy;
}
init_cgroup_css(css, ss, cgrp);
if (ss->use_id)
if (alloc_css_id(ss, parent, cgrp))
goto err_destroy;
/* At error, ->destroy() callback has to free assigned ID. */
}
/* 加入到父 cgroup 的子列表里 */
cgroup_lock_hierarchy(root);
list_add(&cgrp->sibling, &cgrp->parent->children);
cgroup_unlock_hierarchy(root);
/* 创建 cgroup 目录 */
cgroup_create_dir(cgrp, dentry, mode);
/* 创建目录下对应的文件,比如common的部分(tasks),或者子系统的部分(cpu.shares,freezer.state)*/
cgroup_populate_dir(cgrp);

看一下 cgroup_subsys_state->create 的实现,举个例子比如kernel/cpuset.ccpuset子系统的创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static struct cgroup_subsys_state *cpuset_create(
struct cgroup_subsys *ss,
struct cgroup *cont)
{
struct cpuset *cs;
struct cpuset *parent;

if (!cont->parent) {
return &top_cpuset.css;
}
parent = cgroup_cs(cont->parent);
cs = kmalloc(sizeof(*cs), GFP_KERNEL);
if (!cs)
return ERR_PTR(-ENOMEM);
if (!alloc_cpumask_var(&cs->cpus_allowed, GFP_KERNEL)) {
kfree(cs);
return ERR_PTR(-ENOMEM);
}

cs->flags = 0;
if (is_spread_page(parent))
set_bit(CS_SPREAD_PAGE, &cs->flags);
if (is_spread_slab(parent))
set_bit(CS_SPREAD_SLAB, &cs->flags);
set_bit(CS_SCHED_LOAD_BALANCE, &cs->flags);
cpumask_clear(cs->cpus_allowed);
nodes_clear(cs->mems_allowed);
fmeter_init(&cs->fmeter);
cs->relax_domain_level = -1;

cs->parent = parent;
number_of_cpusets++;
return &cs->css ;
}

其实就是一系列 cgroup 初始化动作,填充目录的部分作为接口留给子系统实现。

总结一下:

hierarchy 对应的是 cgroup 的一个根,拥有一个top_cgroup,之后 hierarchy 下面的目录(cgroup)都是继承这些内容。

真正起作用的入口其实是对文件的读写操作,关于这一块VFS的内容可以看一下我之前的博客,这也是由子系统实现的,接下来看看子系统的实现。

freezer 子系统

freeze tasks 的相关内容可以在内核文档当中找到,简单来说是为了提供一种机制能够让进程挂起。这些函数在电源控制里面有很多用到的地方,比如我们常说的挂起,就是让所有进程进入冬眠状态。

首先看这个子系统是因为它比较简单,属性比较少,实现的代码也比较少。

这里铺垫一些知识,说明内核是如何睡眠和唤醒进程的。
一般内核进程进入睡眠需要进入wait_queue,然后调用 schedule。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* wait 是我们想要让 task 睡眠的 queue entry, q 是等待队列 */
DEFINE_WAIT(wait);
/* 添加到等待队列中 */
add_wait_queue(q, &wait);

/*
* 这里要检查condition是因为可能在唤醒之后这个condition的条件又不成立了,
* 这个和条件变量一样,即使条件满足被wake up了,也可能被其他进程修改了该条件.
*/
while (!condition) {
prepare_to_wait(&q, &wait, TASK_INTERRUPTIBLE);
if (signal_pending(current))
/* 处理信号 */
/* 进行调度 */
schedule();
}

finish_wait(&g, &wait);

内核当中进入睡眠都是这个模板,这里的 schedule,会触发调度器遍历 scheduler class,选择优先级最高的调度类。
一般就是CFS(Complete Fair Scheduler),接下来会进行 context 切换,注意不是像一般理解的那样,函数调用增长栈空间,而是把栈和寄存器都换掉,刷掉缓存等等,由此进入另外一个进程的上下文。直到被唤醒从恢复保存的 IP 重新开始执行。

唤醒的过程则是,调用wake_up()函数把 task 的状态重新设置为 TASK_RUNNING,并且把task从等待队列移除。
它会使用 enqueue_task() 把任务从新加入到调度器中。如果是 CFS 调度器的话就是加入到红黑树中,当need_resched设置了的话会显式调用调用schedule()调度,不然还会继续执行唤醒者的上下文。

然后说一下Freezing of tasks,就是通过发送信号唤醒用户态的进程和内核进程,所有这些进程需要响应这个信号并且最后调用 refrigerator() 进入睡眠,也就上面的那个循环。
下图是进入冬眠进程的过程.

“冰箱”这个函数名称很形象,就是把当前 task 丢入睡眠状态直到解封。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* Refrigerator is place where frozen processes are stored :-). */
void refrigerator(void)
{
/* Hmm, should we be allowed to suspend when there are realtime
processes around? */
long save;

task_lock(current);
if (freezing(current)) {
frozen_process();
task_unlock(current);
} else {
task_unlock(current);
return;
}
save = current->state;
pr_debug("%s entered refrigerator\n", current->comm);

spin_lock_irq(&current->sighand->siglock);
recalc_sigpending(); /* We sent fake signal, clean it up */
spin_unlock_irq(&current->sighand->siglock);

/* prevent accounting of that task to load */
current->flags |= PF_FREEZING;

for (;;) {
set_current_state(TASK_UNINTERRUPTIBLE);
if (!frozen(current))
break;
schedule();
}

/* Remove the accounting blocker */
current->flags &= ~PF_FREEZING;

pr_debug("%s left refrigerator\n", current->comm);
__set_current_state(save);
}

所以 freezer subsystem 干的事情就是这样一件事情,把 cgroup 中的进程进行挂起和恢复。现在具体看一下实现。

1
2
3
4
5
6
7
8
9
10
11
enum freezer_state {           
CGROUP_THAWED = 0,
CGROUP_FREEZING,
CGROUP_FROZEN,
};

struct freezer {
struct cgroup_subsys_state css;
enum freezer_state state;
spinlock_t lock; /* protects _writes_ to state */
};

freezer 有三种状态,THAWED,FREEZING,FROZEN,分别代表正常状态,停止中和停止。

freezer 对应的文件有state,cftype是对vfs的file结构的一个封装,最后加上子系统的name,文件名对应的就是”freezer.state”。
freezer.state更改文件内容的操作就可以更改cgroup当中task的挂起和恢复.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* 文件的读写函数 */
static struct cftype files[] = {
{
.name = "state",
.read_seq_string = freezer_read,
.write_string = freezer_write,
},
};

/* 添加子系统文件到cgroup目录中 */
static int freezer_populate(struct cgroup_subsys *ss, struct cgroup *cgroup)
{
if (!cgroup->parent)
return 0;
return cgroup_add_files(cgroup, ss, files, ARRAY_SIZE(files));
}

首先看一下freezer_read.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int freezer_read(struct cgroup *cgroup, struct cftype *cft,
struct seq_file *m)
{
struct freezer *freezer;
enum freezer_state state;

if (!cgroup_lock_live_group(cgroup))
return -ENODEV;

freezer = cgroup_freezer(cgroup);
spin_lock_irq(&freezer->lock);
state = freezer->state;
if (state == CGROUP_FREEZING) {
/* We change from FREEZING to FROZEN lazily if the cgroup was
* only partially frozen when we exitted write. */
update_freezer_state(cgroup, freezer);
state = freezer->state;
}
spin_unlock_irq(&freezer->lock);
cgroup_unlock();

seq_puts(m, freezer_state_strs[state]);
seq_putc(m, '\n');
return 0;
}

整个函数就是把 freezer 的 state 转换成字符换然后读取出来。

再看下 freezer_write 是如何改变进程状态的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int freezer_write(struct cgroup *cgroup,
struct cftype *cft,
const char *buffer)
{
int retval;
enum freezer_state goal_state;

if (strcmp(buffer, freezer_state_strs[CGROUP_THAWED]) == 0)
goal_state = CGROUP_THAWED;
else if (strcmp(buffer, freezer_state_strs[CGROUP_FROZEN]) == 0)
goal_state = CGROUP_FROZEN;
else
return -EINVAL;

if (!cgroup_lock_live_group(cgroup))
return -ENODEV;
retval = freezer_change_state(cgroup, goal_state);
cgroup_unlock();
return retval;
}

其实只是把写入的字符串转换成对应的枚举类型,然后调用freezer_change_state(cgroup, goal_state);

为了不贴过多的代码,这里略写,其实是根据类型不同进行调用了unfreeze_cgrouptry_to_freeze_cgroup

try_to_freeze_cgroup 遍历每个task执行freeze操作,而unfreeze也是类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int try_to_freeze_cgroup(struct cgroup *cgroup, struct freezer *freezer)
{
struct cgroup_iter it;
struct task_struct *task;
unsigned int num_cant_freeze_now = 0;

freezer->state = CGROUP_FREEZING;
cgroup_iter_start(cgroup, &it);
while ((task = cgroup_iter_next(cgroup, &it))) {
/* 尝试freeze task */
if (!freeze_task(task, true))
continue;
if (is_task_frozen_enough(task))
continue;
if (!freezing(task) && !freezer_should_skip(task))
num_cant_freeze_now++;
}
cgroup_iter_end(cgroup, &it);

return num_cant_freeze_now ? -EBUSY : 0;
}

所以,这里我们看最后的freeze和unfreeze某个task的动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
bool freeze_task(struct task_struct *p, bool sig_only)
{
/*
* We first check if the task is freezing and next if it has already
* been frozen to avoid the race with frozen_process() which first marks
* the task as frozen and next clears its TIF_FREEZE.
*/
if (!freezing(p)) {
rmb();
/* 如果frozen标记了
* 说明已经冻结,就返回失败
*/
if (frozen(p))
return false;

if (!sig_only || should_send_signal(p))
set_freeze_flag(p);
else
return false;
}

if (should_send_signal(p)) {
if (!signal_pending(p))
fake_signal_wake_up(p);
} else if (sig_only) {
return false;
} else {
wake_up_state(p, TASK_INTERRUPTIBLE);
}

return true;
}

停止的方式就是通过标记 freeze_flag,然后通过发送信号或者唤醒 task 来处理TIF_FREEZE标记(取决于是否设置了PF_FREEZER_NOSIG)。
最后又回到了之前给的那张流程图,等处理函数运行又会try_to_freeze()检查信号或者标志位,然后进入冰箱,而唤醒的方式则是反过来,把标记清除并且wake_up进程即可。

cpu 子系统

cpu子系统是对CPU时间配额进行限制的子系统,属性在这里列举一下

  • cpu.cfs_period_us 完全公平调度器的调整时间配额的周期
  • cpu.cfs_quota_us 完全公平调度器的周期当中可以占用的时间
  • cpu.stat 统计值
    • nr_periods 进入周期的次数
    • nr_throttled 运行时间被调整的次数
    • throttled_time 用于调整的时间
  • cpu.share cgroup中cpu的分配,如果a group是100,b group是300,那么a就会获得\(\frac{1}{4}\),b就会获得\(\frac{3}{4}\)的CPU。

CFS 调度器

接下来看一下对于 CPU 的限制是如何做到的,这要补充一下 CFS(完全公平调度器) 的相关的内容

CFS 保证进程之间完全公平获得 CPU 的份额,和我们传统的操作系统的时间片的理念不同,CFS 计算进程的 vruntime (其实就是总时间中的比例,并且带上进程优先级作为权重),来选择需要调度的下一个进程。用户态暴露的权重就是nice值,这个值越高权重就会低,反之亦然。(坊间的解释是 nice 的意思就是我对别的进程很 nice ,所以让别的进程多运行一会儿,自己少运行一会儿)。

CFS主要有几点,时间计算,进程选择,调度入口。

时间计算

先看下面这句话

Linux is a multi-user operating system. Consider a scenario where user A spawns ten tasks and user B spawns five. Using the above approach, every task would get ~7% of the available CPU time within a scheduling period. So user A gets 67% and user B gets 33% of the CPU time during their runs. Clearly, if user A continues to spawn more tasks, he can starve user B of even more CPU time. To address this problem, the concept of “group scheduling” was introduced in the scheduler, where, instead of dividing the CPU time among tasks, it is divided among groups of tasks.

总结来说 CPU 的时间并不是分给独立的 task 的,而是分给 task_group 的,这样防止用户 A 的进程数远远大于 B 而导致 B 饥饿的情况。这一组task通过 sched_entity 来表示。能够导致进程分组的方式一种是把进程划入一个cgroup,一种是通过set_sid()系统调用的新session中创建的进程会自动分组,这需要CONFIG_SCHED_AUTOGROUP编译选项开启。

调度的粒度是以sched_entity为粒度的,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
struct sched_entity {
struct load_weight load; /* for load-balancing */
struct rb_node run_node;
struct list_head group_node;
unsigned int on_rq;

u64 exec_start;
u64 sum_exec_runtime;
u64 vruntime;
u64 prev_sum_exec_runtime;

u64 nr_migrations;

#if defined(CONFIG_SMP) && defined(CONFIG_FAIR_GROUP_SCHED)
/* Per-entity load-tracking */
struct sched_avg avg;
#endif
#ifdef CONFIG_SCHEDSTATS
struct sched_statistics statistics;
#endif

#ifdef CONFIG_FAIR_GROUP_SCHED
struct sched_entity *parent;
/* rq on which this entity is (to be) queued: */
struct cfs_rq *cfs_rq;
/* rq "owned" by this entity/group: */
struct cfs_rq *my_q;
#endif
};

每个调度实体都有两个cfs_rq结构

1
2
3
4
5
6
7
   struct cfs_rq {
struct load_weight load;
unsigned long runnable_load_avg;
unsigned long blocked_load_avg;
unsigned long tg_load_contrib;
/* ... */
};

Each scheduling entity may, in turn, be queued on a parent scheduling entity’s run queue. At the lowest level of this hierarchy, the scheduling entity is a task; the scheduler traverses this hierarchy until the end when it has to pick a task to run on the CPU.

最底层的调度实体就是进程,而每个调度实体还会有两个cfs_rq,一个是cfs_rq另一个是my_q,前者是当前调度实体从属的rq,后者他自己的rq,所有的子调度实体都在这个rq上,从而构成了树形结构。可以通过cfs_rq遍历调度实体,而把自己的时间平分给my_q的调度实体.

1
2
3
4
5
6
7
   struct task_group {
struct sched_entity **se;
struct cfs_rq **cfs_rq;
unsigned long shares;
atomic_long_t load_avg;
/* ... */
};

Tasks belonging to a group can be scheduled on any CPU. Therefore it is not sufficient for a group to have a single scheduling entity; instead, every group must have one scheduling entity for each CPU. Tasks belonging to a group must move between the run queues in these per-CPU scheduling entities only, so that the footprint of the task is associated with the group even during task migrations.

单独的sched_entity为了适应SMP结构,又引入了task_group结构,包含了数组,分别属于某个CPU,对于一个进程想要从CPU1迁移到CPU2,只要把进程从tg->cfs_rq[0]转移到tg->cfs_rq[1]即可,一种 percpu 的结构。

优先级有一个映射表,表示调度占的权重,一般nice值为0的时候,大家都是1024,但是nice值为1的时候,权重就会降低到820,对于所有1024权重的进程,就会享有更少的时间,这个映射体现的是每提升一个等级,相差值大约为10%。

下面是 nice 值到权重的映射,这是内核普通进程的优先级范围(100-139),内核拥有140个优先级。

1
2
3
4
5
6
7
8
9
10
static const int prio_to_weight[40] = {
/* -20 */ 88761, 71755, 56483, 46273, 36291,
/* -15 */ 29154, 23254, 18705, 14949, 11916,
/* -10 */ 9548, 7620, 6100, 4904, 3906,
/* -5 */ 3121, 2501, 1991, 1586, 1277,
/* 0 */ 1024, 820, 655, 526, 423,
/* 5 */ 335, 272, 215, 172, 137,
/* 10 */ 110, 87, 70, 56, 45,
/* 15 */ 36, 29, 23, 18, 15,
};

调度实体中包含了一个结构就是表示这个权重的值,表示进程占的权重。

1
2
3
   struct load_weight {
unsigned long weight;
};

最后 time_slice = (sched_period() * se.load.weight) / cfs_rq.load.weight; 就是 se 运行时应该分配到的 CPU 时间的份额(sched_periodcfs.nr_running调度最小粒度时间,理想要每个进程都能运行一次)。

另外,用于衡量CPU负载的方式是通过sched_entity中的sched_avg结构,这个结构用于记录负载情况:

1
2
3
4
struct sched_avg {
u32 runnable_sum, runnable_avg_period;
unsigned long load_avg_contrib;
};

sched_entity是一个进程的时候,计算公式是sa.load_avg_contrib = (sa.runnable_sum * se.load.weight) / sa.runnable_period;(sesched_entitysasched_avg),runnbale_sum 是处于 RUNNING 状态的时间,runnable_period 表示可以变成运行状态的时间段。runnable_load_avgcfs_rq中用于统计所有seload的合,以此来表示CPU负载。blocked_load_avg 是相应的进程处于阻塞状态的负载。

sched_entity是一组进程的时候,计算方式是,
首先提取task group

1
tg = cfs_rq->tg;

之前已经统计了队列中的所有se的总和runnable_load_avg,然后全部累加到tg中。

1
2
cfs_rq->tg_load_contrib = cfs_rq->runnable_load_avg + cfs_rq->blocked_load_avg;
tg->load_avg += cfs_rq->tg_load_contrib;

最后se的值是通过在tg中的比重得到的,这里的tg->shares是最大允许的负载。

1
2
se->avg.load_avg_contrib =
(cfs_rq->tg_load_contrib * tg->shares / tg->load_avg);

进程选择

完全公平调度器的进程选择其实很简单,通过调用pick_next_entity()每次选择 vruntime 最小的进程进行运行。装载进程的结构选择的是红黑树,并且最左下角的结点是个特殊的节点,被保存起来,这样防止每次都从 root 一直搜索到最左下角,每次选择进程的时候直接选择该节点即可。

每次 vruntime 会在时钟中断和任何进程运行状态发生改变的时候进行计算,方式是通过权重调整得到一个 delta 值然后加到 vruntime 上面。
公式是vruntime += delta_exec * (NICE_0_LOAD/curr->load.weight);。这里的 weight 取决于 shares 值和负载等等因素的综合结果。

通过enqueue_entity()可以把进程加入到红黑树当中,当进程被唤醒的时候,或者第一次调用fork的时候就会被调用这个函数。具体就是更新了统计数据,并且把调度节点插入到红黑树当中。如果正好插入到了最右下角,那么就能马上被运行了。

通过dequeue_entity()可以把调度结点从红黑树中删除,这是进程在阻塞或者终止的时候会被调用的函数,具体就是把调度节点移除红黑树并且调整红黑树。

调度入口

内核的调度入口就是schedule(),遍历所有的调度类(因为内核中调度器的实现不只一种),选择权重最高的调度类并且进行进程选择,然后执行该进程。

这里列举一下所有抢占可能发生的时机

  1. 用户态进程:
    • 从系统调用返回
    • 从中断返回
  2. 内核态进程:
    • 从中断返回内核态
    • 进程主动调用schedule()
    • 进程变为可抢占状态(没有持有锁,其实还是中断驱动的)
    • 进程阻塞(最后还是调用schedule)

具体看 cgroup 的 cpu 子系统

补充完调度器的知识,再回来看 cgroup 是如何对进程做运行时间限制的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static struct cftype cpu_files[] = {
#ifdef CONFIG_FAIR_GROUP_SCHED
{
.name = "shares",
.read_u64 = cpu_shares_read_u64,
.write_u64 = cpu_shares_write_u64,
},
#endif
#ifdef CONFIG_CFS_BANDWIDTH
{
.name = "cfs_quota_us",
.read_s64 = cpu_cfs_quota_read_s64,
.write_s64 = cpu_cfs_quota_write_s64,
},
{
.name = "cfs_period_us",
.read_u64 = cpu_cfs_period_read_u64,
.write_u64 = cpu_cfs_period_write_u64,
},
{
.name = "stat",
.read_map = cpu_stats_show,
},
#endif

cpu 子系统也是用对文件进行读写的接口,其实就是获取了 cgroup 的 subsystem 的从属的 task_group,并且读取或者设置了 quota_usperiod_write 以及 shares 属性。具体这些属性应用的地方在调度器内部。task_group是一个管理组调度的结构。

因为内嵌了一个cgroup_subsys_state,这样cgroup就能通过自己的css成员反找到这个task_group

看一下cpu_shares_write_u64的实现

1
2
3
cpu_shares_write_u64 实际调用的是
-> sched_group_set_shares(cgroup_tg(cgrp), scale_load(shareval))
-> update_cfs_shares 获取 cgroup 的task group结构,调整权重
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void update_cfs_shares(struct cfs_rq *cfs_rq)
{
struct task_group *tg;
struct sched_entity *se;
long shares;

tg = cfs_rq->tg;
se = tg->se[cpu_of(rq_of(cfs_rq))];
if (!se || throttled_hierarchy(cfs_rq))
return;
#ifndef CONFIG_SMP
if (likely(se->load.weight == tg->shares))
return;
#endif
/* 根据tg->shares 和 rq 的负载计算出新的权重 */
shares = calc_cfs_shares(cfs_rq, tg);

reweight_entity(cfs_rq_of(se), se, shares);
}

最后一步 reweight_entity 就是调整se->load.weight的权重,从这里来保证shares能够调整进程可以获得的运行时间。当然除了这里,任何发生调度的地方都会有这样的行为,只不过我们主动修改了shares的值。

下图表示了展示了sharestask group中的作用。

对于cpu.cfs_period_uscpu.cfs_quota_us,是关于CPU bandwith的内容,论文CPU bandwidth control for CFS详细描述了其中的设计。论文中举例提到,shares 值只是使得CPU 的时间能够平均分配,但是实际运行时间可能会有变化,不能限制一个进程运行的上限。

在调度实体sched_entity中内嵌了一个结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct cfs_bandwidth {
#ifdef CONFIG_CFS_BANDWIDTH
raw_spinlock_t lock;
ktime_t period;
/* quota 是被赋予的时间,runtime 是实际运行的时间 */
u64 quota, runtime;
s64 hierarchal_quota;
/* 到期时间 */
u64 runtime_expires;

int idle, timer_active;
struct hrtimer period_timer, slack_timer;
struct list_head throttled_cfs_rq;

/* statistics */
int nr_periods, nr_throttled;
u64 throttled_time;
#endif
};

在每次调度的时候(无论是时间中断还是其他调度时间导致 enqueue 或者 dequeue),都会调用account_cfs_rq_runtime(),runtime 相当于实际使用的 quota , 在论文里说的是account_cfs_rq_quota(),对占用时间更新,计算剩余可以运行的时间,如果不够,则进行限制,标记为不可调度。其中内含一个高精度定时器period_timer定时扫描进程,把限制的进程解除,并给予更多的bandwidth以继续运行,period就是计时器的周期,每次都会更新可运行的时间。注意这里用的时间是真实时间.

另外,cpu.stat 主要是控制过程中的统计信息,是只读属性,比如被限制了多少次等等,具体的代码分析就直接略过。

cpuacct 子系统

cpuacct 比较简单,因为主要是一些统计信息

  • cpuacct.stat cgroup 及子消耗在用户态和内核态的CPU循环次数
  • cpuacct.usage cgroup 消耗的CPU总时间
  • cpuacct.usage_percpu cgroup在每个CPU上消耗的总时间

kernel/sched/cpuacct.c下有具体实现。

1
2
3
4
5
6
7
/* track cpu usage of a group of tasks and its child groups */
struct cpuacct {
struct cgroup_subsys_state css;
/* cpuusage holds pointer to a u64-type object on every cpu */
u64 __percpu *cpuusage;
struct kernel_cpustat __percpu *cpustat;
};

其中定义了per-cpu结构,让每个CPU都独占了一个用于统计的值,算是CPU的私有变量。

接口如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static struct cftype files[] = {
{
.name = "usage",
.read_u64 = cpuusage_read,
.write_u64 = cpuusage_write,
},
{
.name = "usage_percpu",
.read_seq_string = cpuacct_percpu_seq_read,
},
{
.name = "stat",
.read_map = cpuacct_stats_show,
},
{ } /* terminate */
};

每次调度update_curr,都会调用cpuacct_charge更新cpuacct中的值,作为统计数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* charge this task's execution time to its accounting group.
*
* called with rq->lock held.
*/
void cpuacct_charge(struct task_struct *tsk, u64 cputime)
{
struct cpuacct *ca;
int cpu;
/* 获取当前task属于的cpu */
cpu = task_cpu(tsk);

rcu_read_lock();
/* task的cpuacct结构 */
ca = task_ca(tsk);
/* 所有父节点的值都应该相应变化
* 上溯父节点更新统计值
*/
while (true) {
u64 *cpuusage = per_cpu_ptr(ca->cpuusage, cpu);
*cpuusage += cputime;

ca = parent_ca(ca);
if (!ca)
break;
}

rcu_read_unlock();
}

在时钟中断的时候会最终调用cpuacct_account_field()来更新kcpustat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
* Add user/system time to cpuacct.
*
* Note: it's the caller that updates the account of the root cgroup.
*/
void cpuacct_account_field(struct task_struct *p, int index, u64 val)
{
struct kernel_cpustat *kcpustat;
struct cpuacct *ca;

rcu_read_lock();
ca = task_ca(p);
while (ca != &root_cpuacct) {
kcpustat = this_cpu_ptr(ca->cpustat);
kcpustat->cpustat[index] += val;
ca = __parent_ca(ca);
}
rcu_read_unlock();
}

cpuacct 算是一个简单的子系统,多是统计信息。

cpuset 子系统

cpuset 子系统用于分配独立的内存节点和CPU节点,这个主要应用与NUMA结构里面,多内存节点属于结构,先看一下cpuset的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct cpuset {
struct cgroup_subsys_state css;

unsigned long flags; /* "unsigned long" so bitops work */
cpumask_var_t cpus_allowed; /* CPUs allowed to tasks in cpuset */
nodemask_t mems_allowed; /* Memory Nodes allowed to tasks */

struct fmeter fmeter; /* memory_pressure filter */

/*
* Tasks are being attached to this cpuset. Used to prevent
* zeroing cpus/mems_allowed between ->can_attach() and ->attach().
*/
int attach_in_progress;

/* partition number for rebuild_sched_domains() */
int pn;

/* for custom sched domain */
int relax_domain_level;

struct work_struct hotplug_work;
};
  • cpuset.cpus cpu结点限制
  • cpuset.mems 内存结点限制
  • cpuset.memory_migrate 内存结点改变是否迁移
  • cpuset.cpu_exclusive 指定的限制是否是独享的,除了父节点或者子节点,不会和其他cpuset有交集
  • cpuset.mem_exclusive 指定的限制是否是独享的,除了父节点或者子节点,不会和其他cpuset有交集
  • cpuset.memory_pressure 换页压力的比率统计
  • cpuset.mem_hardwall 限制内核内存分配的结点,mems是限制用户态的分配
  • cpuset.memory_spread_page 把page cache分散到分配的各个结点中,而不是当前运行的结点.
  • cpuset.memory_spread_slab 把fs相关的slab的对象(inode和dentry)分散到结点中.
  • cpuset.sched_load_balance 打开调度CPU的负载均衡,这里指的是cpuset拥有的sched_domain,默认全局的CPU调度是本来就有负载均衡的。
  • cpuset.sched_relax_domain_level
  • cpuset.memory_pressure_enabled 计算换页压力的开关,注意,这个属性在top_group里面才有

cpus_allowedmems_allowed就是允许分配的内存节点和CPU节点的掩码

分配内存的时候调用栈是alloc_pages()->alloc_pages_current()->__alloc_pages_nodemask(),直到寻找可分配结点的时候会调用 zref_in_nodemask 来判断是否可以分配在该结点。

1
2
3
4
5
6
7
8
9
static inline int zref_in_nodemask(struct zoneref *zref, nodemask_t *nodes)
{
#ifdef CONFIG_NUMA
return node_isset(zonelist_node_idx(zref), *nodes);
#else
return 1;
#endif /* CONFIG_NUMA */
}

从这个函数也可以看到如果编译选项带了CONFIG_NUMA才会起作用,不然返回的永远都是真。

分散file cacheslab cache的方式是通过设置标志位来实现的。

Setting the flag ‘cpuset.memory_spread_page’ turns on a per-process flag
PFA_SPREAD_PAGE for each task that is in that cpuset or subsequently
joins that cpuset. The page allocation calls for the page cache
is modified to perform an inline check for this PFA_SPREAD_PAGE task
flag, and if set, a call to a new routine cpuset_mem_spread_node()
returns the node to prefer for the allocation.

Similarly, setting ‘cpuset.memory_spread_slab’ turns on the flag
PFA_SPREAD_SLAB, and appropriately marked slab caches will allocate
pages from the node returned by cpuset_mem_spread_node().

内存分配向结点的传播,都是通过设置标志PFA_SPREAD_PAGE或者PFA_SPREAD_SLAB来标记的,这个时候对应的函数cpuset_mem_spread_nodecpuset_mem_spread_node会返回希望分配的结点,举个例子,cpuset_mem_spread_node会从允许的节点中随机返回一个值,以达到分配对象分散在结点当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int cpuset_spread_node(int *rotor)
{
int node;

node = next_node(*rotor, current->mems_allowed);
if (node == MAX_NUMNODES)
node = first_node(current->mems_allowed);
*rotor = node;
return node;
}

int cpuset_mem_spread_node(void)
{
if (current->cpuset_mem_spread_rotor == NUMA_NO_NODE)
current->cpuset_mem_spread_rotor =
node_random(&current->mems_allowed);

return cpuset_spread_node(&current->cpuset_mem_spread_rotor);
}

对于CPU结点的控制是通过修改cpus_allowed来控制的,在task被唤醒的时候选择运行的rq时就会对掩码做判断,这是调度类需要实现的接口select_task_rq,比如CFS的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
* The caller (fork, wakeup) owns p->pi_lock, ->cpus_allowed is stable.
*/
static inline
int select_task_rq(struct task_struct *p, int sd_flags, int wake_flags)
{
int cpu = p->sched_class->select_task_rq(p, sd_flags, wake_flags);

/*
* In order not to call set_task_cpu() on a blocking task we need
* to rely on ttwu() to place the task on a valid ->cpus_allowed
* cpu.
*
* Since this is common to all placement strategies, this lives here.
*
* [ this allows ->select_task() to simply return task_cpu(p) and
* not worry about this generic constraint ]
*/
if (unlikely(!cpumask_test_cpu(cpu, tsk_cpus_allowed(p)) ||
!cpu_online(cpu)))
cpu = select_fallback_rq(task_cpu(p), p);

return cpu;
}

load_balance 设置的是cpusetCS_SCHED_LOAD_BALANCE标志,之后会调用update_cpumask,这个标志的更新会调用rebuild_sched_domains_locked(),会引起sched_domain的分配。当然这不是唯一的sched_domain重新划分的触发点,触发点有一下几点。

  1. 绑定了CPU并且该标记改变
  2. 这个标记为enable,绑定CPU发生改变
  3. 绑定了CPU,这个标记为enable,标记cpuset.sched_relax_domain_level发生改变
  4. 绑定了CPU,并且该标记设置了,但是cpuset被删除了
  5. CPU 转变 offline/online 状态

简单说一下[sched_domain](https://www.ibm.com/developerworks/cn/linux/l-cn-schldom/ sched domain 的详细内容)的作用,其实就是划定了负载均衡的 CPU 范围,默认是有一个全局的sched_domain,对所有 CPU 做负载均衡的,现在再划分出一个sched_domain把 CPU 的某个子集作为负载均衡的单元。
每个 Scheduling Domain 其实就是具有相同属性的一组 CPU 的集合. 并且跟据 Hyper-threading, Multi-core, SMP, NUMA architectures 这样的系统结构划分成不同的级别,不同级之间通过指针链接在一起, 从而形成一种的树状的关系, 如下图所示。

调度器会调用partition_sched_domains()来更新自己的scehd_domains,调度域发生作用的地方是在时钟中断的时候会触发SCHED_SOFTIRQ对任务做迁移,或者p->sched_class->select_task_rq,会在选择运行 CPU 时进行抉择,看一下 CFS 的实现的select_task_rq的简化流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 向上遍历更高层次的domain,如果发现同属一个domain
// 就是affine目标
for_each_domain(cpu, tmp) {
/*
* If both cpu and prev_cpu are part of this domain,
* cpu is a valid SD_WAKE_AFFINE target.
*/
if (want_affine && (tmp->flags & SD_WAKE_AFFINE) &&
cpumask_test_cpu(prev_cpu, sched_domain_span(tmp))) {
affine_sd = tmp;
break;
}

if (tmp->flags & sd_flag)
sd = tmp;
}
// 如果上面的条件满足,从prev_cpu中选出一个idle的new_cpu来运行.
if (affine_sd) {
if (cpu != prev_cpu && wake_affine(affine_sd, p, sync))
prev_cpu = cpu;
// 在同一个级别的sched domain向下找到一个idle的CPU.
new_cpu = select_idle_sibling(p, prev_cpu);
// 快速路径,有idle的CPU就不用负载均衡了.
goto unlock;
}
// 遍历层级
while (sd) {
// 找到负载最小的CPU
group = find_idlest_group(sd, p, cpu, load_idx);
if (!group) {
sd = sd->child;
continue;
}

new_cpu = find_idlest_cpu(group, p, cpu);
/* 如果最闲置的CPU没有变的话,或者没有找到的话,就向下遍历.
if (new_cpu == -1 || new_cpu == cpu) {
/* Now try balancing at a lower domain level of cpu */
sd = sd->child;
continue;
}

/* Now try balancing at a lower domain level of new_cpu */
cpu = new_cpu;
weight = sd->span_weight;
sd = NULL;
// 如果选出的节点weight比其他节点都大的话.
// 再向下一个层级遍历.
for_each_domain(cpu, tmp) {
if (weight <= tmp->span_weight)
break;
if (tmp->flags & sd_flag)
sd = tmp;
}
/* while loop will break here if sd == NULL */
}


负载均衡的对象有个例外。

CPUs in “cpuset.isolcpus” were excluded from load balancing by the
isolcpus= kernel boot option, and will never be load balanced regardless
of the value of “cpuset.sched_load_balance” in any cpuset.

如果boot选项标记了该CPU,会无视sched_load_balance的设置。

cpuset.sched_relax_domain_level有几个等级,越大越优先,表示迁移时搜索CPU的范围,这个主要开启了负载均衡选项的时候才有用。

  • -1 : no request. use system default or follow request of others. 用默认的或者按照其他组的优先级来.
  • 0 : no search,不搜索.
  • 1 : search siblings (hyperthreads in a core,搜索CPU当中的超线程).
  • 2 : search cores in a package.(搜索CPU当中的核).
  • 3 : search cpus in a node [= system wide on non-NUMA system]
  • 4 : search nodes in a chunk of node [on NUMA system]
  • 5 : search system wide [on NUMA system]

memory 子系统

看完 cpu 部分再开看一下内存子系统是如何做限制的

memory 子系统的参数比较多

  • memory.usage_in_bytes # 当前内存中的 res_counter 使用量
  • memory.memsw.usage_in_bytes # 当前内存和交换空间中的 res_counter 使用量
  • memory.limit_in_bytes # 设置/读取 内存使用量
  • memory.memsw.limit_in_bytes # 设置/读取 内存加交换空间使用量
  • memory.failcnt # 读取内存使用量被限制的次数
  • memory.memsw.failcnt # 读取内存和交换空间使用量被限制的次数
  • memory.max_usage_in_bytes # 最大内存使用量
  • memory.memsw.max_usage_in_bytes # 最大内存和交换空间使用量
  • memory.soft_limit_in_bytes # 设置/读取内存的soft limit
  • memory.stat # 统计信息
  • memory.use_hierarchy # 设置/读取 层级统计的使能
  • memory.force_empty # trigger forced move charge to parent?
  • memory.pressure_level # 设置内存压力通知
  • memory.swappiness # 设置/读取 vmscan swappiness 参数?
  • memory.move_charge_at_immigrate # 设置/读取 controls of moving charges?
  • memory.oom_control # 设置/读取 内存超限控制信息
  • memory.numa_stat # 每个numa节点的内存使用数量
  • memory.kmem.limit_in_bytes # 设置/读取 内核内存限制的hard limit
  • memory.kmem.usage_in_bytes # 读取当前内核内存的分配
  • memory.kmem.failcnt # 读取当前内核内存分配受限的次数
  • memory.kmem.max_usage_in_bytes # 读取最大内核内存使用量
  • memory.kmem.tcp.limit_in_bytes # 设置tcp 缓存内存的hard limit
  • memory.kmem.tcp.usage_in_bytes # 读取tcp 缓存内存的使用量
  • memory.kmem.tcp.failcnt # tcp 缓存内存分配的受限次数
  • memory.kmem.tcp.max_usage_in_bytes # tcp 缓存内存的最大使用量

对于大部分的数据是通过res_counter来保存的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
* The core object. the cgroup that wishes to account for some
* resource may include this counter into its structures and use
* the helpers described beyond
*/

struct res_counter {
/*
* the current resource consumption level
*/
unsigned long long usage;
/*
* the maximal value of the usage from the counter creation
*/
unsigned long long max_usage;
/*
* the limit that usage cannot exceed
*/
unsigned long long limit;
/*
* the limit that usage can be exceed
*/
unsigned long long soft_limit;
/*
* the number of unsuccessful attempts to consume the resource
*/
unsigned long long failcnt;
/*
* the lock to protect all of the above.
* the routines below consider this to be IRQ-safe
*/
spinlock_t lock;
/*
* Parent counter, used for hierarchial resource accounting
*/
struct res_counter *parent;
};

获取方式是通过该结构相关的封装接口提供的,比如mem_cgroup_usage就是通过res_counter_red_u64来获取对应的res_counterRES_USAGE对应的值的,也就是unsigned long long usage这个成员。(如果不是root,还会递归获取rss和page cache的合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static inline u64 mem_cgroup_usage(struct mem_cgroup *memcg, bool swap)
{
u64 val;

if (!mem_cgroup_is_root(memcg)) {
if (!swap)
return res_counter_read_u64(&memcg->res, RES_USAGE);
else
return res_counter_read_u64(&memcg->memsw, RES_USAGE);
}

/*
* Transparent hugepages are still accounted for in MEM_CGROUP_STAT_RSS
* as well as in MEM_CGROUP_STAT_RSS_HUGE.
*/
// 如果是root就把所有的内存使用量都算进来.
val = mem_cgroup_recursive_stat(memcg, MEM_CGROUP_STAT_CACHE);
val += mem_cgroup_recursive_stat(memcg, MEM_CGROUP_STAT_RSS);

if (swap)
val += mem_cgroup_recursive_stat(memcg, MEM_CGROUP_STAT_SWAP);

return val << PAGE_SHIFT;
}

struct mem_cgroup 是负责内存 cgroup 的结构,简化的表示是

1
2
3
4
5
6
7
8
struct mem_cgroup {
struct cgroup_subsys_state css; // 通过css关联cgroup.
struct res_counter res; // mem统计变量
res_counter memsw; // mem+sw的和
struct res_counter kmem; // 内核内存统计量
...
}

这些参数的入口都在mm/memcontrol.c下,比如说memory.usage_in_bytes的读取调用的是mem_cgroup_read函数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static ssize_t mem_cgroup_read(struct cgroup *cont, struct cftype *cft,
struct file *file, char __user *buf,
size_t nbytes, loff_t *ppos)
{
// 获取cgroup对应的mem_cgroup.
struct mem_cgroup *memcg = mem_cgroup_from_cont(cont);
char str[64];
u64 val;
int name, len;
enum res_type type;

// 获取读取的类型,memory.usage_in_bytes就是_MEM
type = MEMFILE_TYPE(cft->private);
// 名称
name = MEMFILE_ATTR(cft->private);

switch (type) {
case _MEM:
if (name == RES_USAGE)
val = mem_cgroup_usage(memcg, false);
else
val = res_counter_read_u64(&memcg->res, name);
break;
case _MEMSWAP:
if (name == RES_USAGE)
val = mem_cgroup_usage(memcg, true);
else
val = res_counter_read_u64(&memcg->memsw, name);
break;
case _KMEM:
val = res_counter_read_u64(&memcg->kmem, name);
break;
default:
BUG();
}

len = scnprintf(str, sizeof(str), "%llu\n", (unsigned long long)val);
return simple_read_from_buffer(buf, nbytes, ppos, str, len);
}

接下来再看一下这些值是在什么时候统计的,统计的入口是mem_cgroup_charge_common(),如果统计值超过限制就会在cgroup内进行回收。调用者分别是缺页时调用的mem_cgroup_newpage_charge 和 page cache 相关的mem_cgroup_cache_charge

简单复习一下内存分配的过程,来自维基百科

创建进程fork(), 程序载入execve(), 映射文件mmap(), 动态内存分配 malloc()/brk() 等进程相关操作都需要分配内存给进程。不过这时进程申请和获得的还不是实际内存,而是虚拟内存,准确的说是“内存区域”。进程对内存区域的分配最终都会归结到 do_mmap() 函数上来(brk调用被单独以系统调用实现,不用do_mmap()),内核使用 do_mmap() 函数创建一个新的线性地址区间。但是说该函数创建了一个新 VMA 并不非常准确,因为如果创建的地址区间和一个已经存在的地址区间相邻,并且它们具有相同的访问权限的话,那么两个区间将合并为一个。如果不能合并,那么就确实需要创建一个新的 VMA 了。但无论哪种情况,do_mmap() 函数都会将一个地址区间加入到进程的地址空间中–无论是扩展已存在的内存区域还是创建一个新的区域。同样,释放一个内存区域应使用函数 do_ummap(),它会销毁对应的内存区域。当进程需要内存时,从内核获得的仅仅是虚拟的内存区域,而不是实际的物理地址,进程并没有获得物理内存,获得的仅仅是对一个新的线性地址区间的使用权。实际的物理内存只有当进程真的去访问新获取的虚拟地址时,才会由”请求页机制”产生”缺页”异常,从而进入分配实际页面的例程

和下面来自内核文档

与用户进程相似,内核也有一个名为 init_mm 的 mm_strcut 结构来描述内核地址空间,其中页表项 pdg=swapper_pg_dir包含了系统内核空间(3G-4G)的映射关系。因此 vmalloc 分配内核虚拟地址必须更新内核页表,而kmalloc或get_free_page由于分配的连续内存,所以不需要更新内核页表。[13]

内存页的分配是基于伙伴分配系统,也就是基于2的阶乘通过拆分大阶乘的连续页和合并小阶乘的连续页来管理物理内存的方式,这在任何一本操作系统的书里都会讲到,我之前的博客也详细分析了。

当进程进入缺页异常的时候就会分配具体的物理内存,当物理内存使用超过高水平线以后,换页daemon(kswapd)就会被唤醒用于把内存交换到交换空间以腾出内存,当内存恢复至高水平线以后换页daemon进入睡眠。

缺页异常的入口是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static int __do_fault(struct mm_struct *mm, struct vm_area_struct *vma,
unsigned long address, pmd_t *pmd,
pgoff_t pgoff, unsigned int flags, pte_t orig_pte)
{
pte_t *page_table;
spinlock_t *ptl;
struct page *page;
struct page *cow_page;
pte_t entry;
int anon = 0;
struct page *dirty_page = NULL;
struct vm_fault vmf;
int ret;
int page_mkwrite = 0;

/*
* If we do COW later, allocate page befor taking lock_page()
* on the file cache page. This will reduce lock holding time.
*/
if ((flags & FAULT_FLAG_WRITE) && !(vma->vm_flags & VM_SHARED)) {

if (unlikely(anon_vma_prepare(vma)))
return VM_FAULT_OOM;

/* 分配内存并且映射到内存区间 */
cow_page = alloc_page_vma(GFP_HIGHUSER_MOVABLE, vma, address);
if (!cow_page)
return VM_FAULT_OOM;
/* 进行统计 */
if (mem_cgroup_newpage_charge(cow_page, mm, GFP_KERNEL)) {
page_cache_release(cow_page);
return VM_FAULT_OOM;
}
} else
cow_page = NULL;

mem_cgroup_newpage_charge则调用mem_cgroup_charge_common

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int mem_cgroup_newpage_charge(struct page *page,
struct mm_struct *mm, gfp_t gfp_mask)
{
if (mem_cgroup_disabled())
return 0;
// 不应该关联到页表
VM_BUG_ON(page_mapped(page));
// 对应用户态地址,但是不是匿名页
VM_BUG_ON(page->mapping && !PageAnon(page));
// mm 为空
VM_BUG_ON(!mm);
return mem_cgroup_charge_common(page, mm, gfp_mask,
MEM_CGROUP_CHARGE_TYPE_ANON);
}

mem_cgroup_charge_common内容是,返回ret < 0则是OOM。第一步是调用__mem_cgroup_try_charge查看当前使用量是否超过内存限制,如果超过就进行内存回收。第二步如果成功就调用__mem_cgroup_commit_charge添加统计值 ,不然就返回无法分配内存的错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int mem_cgroup_charge_common(struct page *page, struct mm_struct *mm,
gfp_t gfp_mask, enum charge_type ctype)
{
struct mem_cgroup *memcg = NULL;
unsigned int nr_pages = 1;
bool oom = true;
int ret;
if (PageTransHuge(page)) {
nr_pages <<= compound_order(page);
VM_BUG_ON(!PageTransHuge(page));
/*
* Never OOM-kill a process for a huge page. The
* fault handler will fall back to regular pages.
*/
oom = false;
}

ret = __mem_cgroup_try_charge(mm, gfp_mask, nr_pages, &memcg, oom);
if (ret == -ENOMEM)
return ret;
__mem_cgroup_commit_charge(memcg, page, nr_pages, ctype, false);
return 0;
}

__mem_cgroup_try_charge最终会调用mem_cgroup_do_charge,省略代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static int mem_cgroup_do_charge(struct mem_cgroup *memcg, gfp_t gfp_mask,
unsigned int nr_pages, unsigned int min_pages,
bool invoke_oom)
{
unsigned long csize = nr_pages * PAGE_SIZE;
struct mem_cgroup *mem_over_limit;
struct res_counter *fail_res;
unsigned long flags = 0;
int ret;
// 更新res计数器
ret = res_counter_charge(&memcg->res, csize, &fail_res);

if (likely(!ret)) {
if (!do_swap_account)
return CHARGE_OK;
// 计数成功,如果开启swap计数,记录memsw.
ret = res_counter_charge(&memcg->memsw, csize, &fail_res);
if (likely(!ret))
return CHARGE_OK;
// swap计数失败,退回res的计数
res_counter_uncharge(&memcg->res, csize);
// 获取fail_res对应的memcg,也就是计数失败的memcg.
mem_over_limit = mem_cgroup_from_res_counter(fail_res, memsw);
flags |= MEM_CGROUP_RECLAIM_NOSWAP;
} else
mem_over_limit = mem_cgroup_from_res_counter(fail_res, res);
// 回收内存
ret = mem_cgroup_reclaim(mem_over_limit, gfp_mask, flags);
// 告诉上层重试一次,可能回收了一些内存
if (nr_pages <= (1 << PAGE_ALLOC_COSTLY_ORDER) && ret)
return CHARGE_RETRY;
if (invoke_oom)
// 进入oom的处理
mem_cgroup_oom(mem_over_limit, gfp_mask, get_order(csize));

mem_cgroup_reclaim的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static unsigned long mem_cgroup_reclaim(struct mem_cgroup *memcg,
gfp_t gfp_mask,
unsigned long flags)
{
unsigned long total = 0;
bool noswap = false;
int loop;

if (flags & MEM_CGROUP_RECLAIM_NOSWAP)
noswap = true;
if (!(flags & MEM_CGROUP_RECLAIM_SHRINK) && memcg->memsw_is_minimum)
noswap = true;

for (loop = 0; loop < MEM_CGROUP_MAX_RECLAIM_LOOPS; loop++) {
if (loop)
drain_all_stock_async(memcg);
total += try_to_free_mem_cgroup_pages(memcg, gfp_mask, noswap);
/*
* Allow limit shrinkers, which are triggered directly
* by userspace, to catch signals and stop reclaim
* after minimal progress, regardless of the margin.
*/
if (total && (flags & MEM_CGROUP_RECLAIM_SHRINK))
break;
if (mem_cgroup_margin(memcg))
break;
/*
* If nothing was reclaimed after two attempts, there
* may be no reclaimable pages in this hierarchy.
*/
if (loop && !total)
break;
}
return total;
}

RSSpage_fault的时候记录,page cache是插入到inoderadix-tree中才记录的。
RSS在完全unmap的时候减少计数,page cachepage在离开inoderadix-tree才减少计数。
即使RSS完全unmap,也就是被kswapd给换出,可能作为SwapCache存留在系统中,除非不作为SwapCache,不然还是会被计数。
一个换入的page不会马上计数,只有被map的时候才会,当进行换页的时候,会预读一些不属于当前进程的page,而不是通过page fault,所以不在换入的时候计数。

补充:

  • why ‘memory+swap’ rather than swap.
    The global LRU(kswapd) can swap out arbitrary pages. Swap-out means
    to move account from memory to swap…there is no change in usage of
    memory+swap. In other words, when we want to limit the usage of swap without
    affecting global LRU, memory+swap limit is better than just limiting swap from
    an OS point of view.[12]

使用memoery+swap来统计而不是光统计swap,是因为kswapd换出的page只是从内存到了交换空间而已 ,在不影响kswpad的单页内存池LRU的情况下,这样的统计更有意义。

内核内存是不会被换出的。只有在被限制的时候才会开始计数。并且限制不能在已经有进程或者有子cgroup的情况下设置。
计数部分

When use_hierarchy == 1 and a group is accounted, its children will
automatically be accounted regardless of their limit value.

总结

目前总结了调度相关和内存相关的 cgroup 代码,可以看出 cgroup 本身其实主要是在一些 hook 的地方做检查,真正的控制的执行者还是调度器和内存分配器本身,cgroup 只是统计数据并且在必要的时候触发调度和内存回收等等。接下来我会就网络的部分进行一些分析,希望能够把完整的各个 cgroup 的子系统都能够解析一下。

参考:

  1. 《Docker 进阶与实战》
  2. http://abcdxyzk.github.io/download/kernel/Linux_Physical_Memory_Page_Allocation.pdf

首先介绍一下 hugepage 的背景。

一般来说操作系统分配内存的最小单元是页,一般是 4KB 大小,但是这个页放到现在来说可能有点“不够用”,因为很多程序内存消耗很大,分配内存很频繁,所以选择更大的页可以提升性能,大页带来的好处很多,首先是页表的层次可以减少,增加访存的速度,其次是减少 TLB miss 的概率,同时 page fault 也会减少,减少到 hugepage size / 4KB (x86_64 一般有 2MB 的 hugepage 和 1GB 的 hugepage)。例如下面这张图就说明了 hugepage 带来的改变。文章中的代码使用的是 4.x 的内核版本。

hugepage 有两种类型,一种是 THP(Transparent Huge Page) ,顾名思义,就是对用户来说对这种大页是无感知的,它本身可以被分成 4KB 的小页,并且可以被 swap out,有一个 Khugepaged 周期性扫描 4KB 的页合并成大页。

另一种大页是 persistent hugepage,这种 page 是预先分配的并且不能拆分成 4KB 小页,而且不能 swap out。这种的隐患是可能在内存 fragmentation 太多分不出大页的时候压缩小页,这在内存分配有压力的时候会造成很大的性能影响。

使用 persistent hugepage 可以通过 echo 512 | sudo tee /sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages,这会预先分配 512 个 2MB 的大页。具体使用是通过 fs/hugetlbfs 下实现的hugetlbfs来实现的。应用程序需要通过mmap进行文件映射来使用这些大页。具体的使用方式内核附带的一个测试可以作为参考。

这个 echo 触发的是 sysfsnr_hugepages_store_common, 它会设置最大的 hugepage 个数(存在 hstate 中,一个保存 hugepage 状态的结构体),h->max_huge_pages = set_max_huge_pages(h, count, nodes_allowed);set_max_huge_pages 有一个副作用就是调用 alloc_fresh_huge_page 来(分配或者减少)大页以达到 count 个。比如说分配会调用 ret = alloc_fresh_huge_page(h, nodes_allowed);,然后加入到 hstate 的 freelist 当中, 减少则是相反的,如果 freelist 上没有就会触发 buddysystem 的 __alloc_buddy_huge_page_no_mpol

我们来看一下具体的实现,首先关键的结构体是 hugetlbfs_file_operations , 其中规定了 mmap 函数,也就是当我打开 hugetlbfs 文件系统下的文件对对应的 fd 调用 mmap 的时候触发的对应的函数。

1
2
3
4
5
6
7
8
9
const struct file_operations hugetlbfs_file_operations = {
.read_iter = hugetlbfs_read_iter,
.mmap = hugetlbfs_file_mmap,
.fsync = noop_fsync,
.get_unmapped_area = hugetlb_get_unmapped_area,
.llseek = default_llseek,
.fallocate = hugetlbfs_fallocate,
};

在解释 mmap 之前,先复习一下进程空间内存的相关的内容。进程的内存空间是通过mm_struct这个结构体管理的。进程的地址空间,基本上是一些松散的区间,每个区间有相同的功能和保护属性(只读等属性),这个区间用 vm_area_struct 表示。再来看 hugetlbfs_file_mmap 中的一段代码。把申请的虚拟空间的地址长度按照大页对齐以后,保留对应个数的大页。

1
2
3
4
5
6
7
8
if (hugetlb_reserve_pages(inode,
vma->vm_pgoff >> huge_page_order(h),
len >> huge_page_shift(h), vma,
vma->vm_flags))
goto out;

ret = 0;

hugetlb_reserve_pages 要处理两种逻辑,如果是 VM_MAYSHARE 就从 inode 中取出 resv_map 并且获取分配长度,不然就使用vma的长度,然后用 hugepage_subpool_get_pageshugepage_subpool 中减掉对应的spool->rsv_hpages, 这个个人感觉也不是池子,只是一个统计数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct hstate *h = hstate_inode(inode); // 获取 hugepage state,里面保存了 hugepage 的相关信息
struct hugepage_subpool *spool = subpool_inode(inode); // 写获取 hugepage 的 pool, pool 其实只是个数字 rsv_pages。
...
// 然后根据 flag 算出要使用的长度 chg,我猜应该是 charge 的缩写。
if (!vma || vma->vm_flags & VM_MAYSHARE) {
resv_map = inode_resv_map(inode);

chg = region_chg(resv_map, from, to);

} else {
resv_map = resv_map_alloc();
if (!resv_map)
return -ENOMEM;

chg = to - from;

set_vma_resv_map(vma, resv_map);
set_vma_resv_flags(vma, HPAGE_RESV_OWNER);
}
...
// 最后从 hugepage 的 pool 减掉对应的个数。
gbl_reserve = hugepage_subpool_get_pages(spool, chg);
// 确定有足够的 hugepage 如果没有就从 buddysystem 里面取出来。
ret = hugetlb_acct_memory(h, gbl_reserve);
// 把 page 和 vma 做映射
region_add(resv_map, from, to);

do_page_fault 一直调用到 __handle_mm_fault 的时候,如果对应的 vma 是大页分配的,会直接进到hugetlb_fault,最后在缺页的时候会调用hugetlb_no_page,然后调用 alloc_huge_page_node,就会看到 __alloc_buddy_huge_page,开始走 buddy system, buddy system ,这里的 buddy system 没有细说,可以参考我之前的一篇文章

补充:

你可以理解mm_struct管理着一个vm_area_struct的链表,而 vm_area_struct 对应的主要操作函数如下(还有很多,这里只列了和本文相关的函数指针)。

1
2
3
4
5
6
struct vm_operations_struct {
void (*open)(struct vm_area_struct * area);
void (*close)(struct vm_area_struct * area);
int (*fault)(struct vm_area_struct *vma, struct vm_fault *vmf);
};

对应到 hugetlbfs 就是如下的 hugetlb_vm_ops 它会在对文件 mmap 的时候,进行初始化vma->vm_ops = &hugetlb_vm_ops;

1
2
3
4
5
6
const struct vm_operations_struct hugetlb_vm_ops = {
.fault = hugetlb_vm_op_fault,
.open = hugetlb_vm_op_open,
.close = hugetlb_vm_op_close,
};

但是注意,hugetlb_vm_op_fault 如果被触发说明有 BUG,因为 hugepage 的 page fault 是在do_page_fault里面独立处理的,不会调用到vm_operations_structfault接口。mmap 走的流程是 mmap -> mmap_region -> make_pages_presetn -> get_user_pages -> handle_page_fault -> handle_mm_fault -> hugetlb_fault

以上是大页的分配和使用的流程,希望对大家有帮助。

参考:

  1. Linux KVM concept - Memory
  2. Linux 内核中大页的实现与分析,第 1 部分

不同的隔离级别有不同的约束,而且不一定是子集和超集的关系,约束可能是交集的。这里尝试循序渐进的加强锁的级别和实现方式来阐述各个隔离级别的区别,由此可以选择在实际开发中适合业务的隔离策略。我自己最近也在调研 cockroachdb 比较关心事务这方面的性能,cockroachdb 的事务主要是两个级别,SI 和 SSI,会在下面提到。

总视图

这是各个隔离级别的一个关系,可以看出不一定是包含与被包含的关系(因为完整的隔离级别不只 4 种)。

定义

长期锁:到事务结束就释放的锁
短期锁:对相关数据操作完成就释放的锁

这里提到的写锁和排他锁可以互换,读锁和共享锁可以互换,长期锁也被称为二阶段锁,就是事务某个时候锁上了算一个阶段,最后一起释放算一个阶段。

断言型:基于先判的锁的修饰词,比如 WHERE 语句指定的范围就是预测型,如果没有 WHERE 可能就是整张表。

如果没有断言修饰的话,锁就是有指定数据的锁,也就是有明确索引的锁。

缩写 P(Phenomena) A(Anomalies)

P0 dirty write (脏写)

现象:

我们最开始的阶段是一切皆有可能发生,没有任何锁,所以碰到的第一个问题是脏写。

脏写导致的问题是,破坏数据的一致性,一个事务 (T1) 如果覆盖了另一个正在执行事务 (T2) 之前写入的值的时候就会导致脏写。比如 T1 写入 x=y=1 并且 T2 写入 x=y=2,但是整个数据的一致性就被破坏了。

解决:

对 x 和 y 持有长期写锁。基本上没有事务不拿长期写锁的,不然数据库连回退的可能都没有。防止脏写以后会出现新的现象,脏读。

P1 dirty read (脏读 read uncommited)

现象

事务 (T1) 写入的值被正在执行的事务 (T2) 读取。比如 x 向 y 转 40,在 x 写入以后,T2 看到的总和只有 60。

网上很多人把这个级别算作了脏写其实不是很严格,最开始那个 P0 级别才算是脏写。

解决

使用短期读锁和长期写锁,长期写锁可以防止 T2 的 X 读数据,短期读锁可以防止 T1 的 y 写不了(如果使用长期读锁就被阻塞到 T1 结束了)。解决脏读问题,我们又面临的问题是不可重复读

P2 non-repeatable read (不可重复读)

现象

当正在执行的事务 (T1) 读取的值被事务 (T2) 写入的时候,对 T1 来说就出现了不一致。例如下图,X 在被读取之后又被 T2 写入,这个时候总的钱数出现了不一致。

解决

使用长期读锁和长期写锁,也就是 T2 的 X 要等 T1 提交之后才能写入。对于不可重复读的问题。不过短期的断言型读锁也是足够的,因为 X 和 Y 如果都提前读取出来还是能保持一致的。我们解决了不可重复读以后,还会碰到幻读的情况。

P3 phantom (幻读)

现象

幻读发生在正在执行的事务 T1 有断言的读 (select where) 时,另外一个事务 T2 执行了和断言集合有交集的插入操作。
比如 T1 在 T2 之前读到了员工总数是 3,但是 T2 执行的时候有交集,插入了新的数据,这个时候员工总数是 4,但是 T1 如果再读取的话,就会发现员工总数变成了 4,而不是最初的 3,这就是幻读。

解决

解决幻读的方式是使用长期(断言型)读锁和写锁。也就是不允许在这个范围内进行插入操作。解决了幻读以后我们的事务就完全串行化了,这样的事务并发度是最弱的。

P4 update lost (更新丢失)

常见的 4 个隔离级别说完了以后我们看一下剩下的部分,注意更新丢失这个现象不是比幻读更约束的现象,这个是在基于防止脏读以后可能会出现的现象,会被可重复读防止。

现象

事务 T2 提交的写被其他事务覆盖,首先,没有脏写,因为 T2 已经提交,其次没有脏读,因为在写之后没有读操作,这样的现象称为更新丢失。

解决

升级到可重复读就可以了。

P4C cursor update lost (游标更新丢失)

现象

和更新丢失是一样的,这个只是约束在了游标操作的时候,事务 (T1) 对游标下的数据进行读之后被另一个事务 (T2) 提交了,在游标下的数据写之后让,T1 的写导致 T2 的更新丢失。

解决

在游标移动或者释放之前,都不释放锁,这个是到达可重复读之前的一个插曲。这个也是在读提交的阶段会发生的事情。

A5A read skew (读偏)

偏可以理解为不一致,这个是发生在多个数据之间有一个总的约束的时候。

现象

总的钱是 100,但是从 T1 的角度,总的钱数是 (50+75),因为只有短期读锁。

解决

使用快照隔离 (Snapshot Isolation),快照隔离是基于 MVCC 的。当一个 T 事务开始的时候,T 会获得一个抽象的时间戳(版本),当对数据 X 进行读取的时候,并不是直接看到最新写入的数据而是在 T 开始前的所有执行中的事务中最后一个对 X 标记的版本(如果 T 修改过 X,那么看到的是自己的版本)。也就是说 T 是基于当前的数据库的一个镜像进行操作的,有点类似于 Copy And Swap,而 T 开始执行是获得的版本就是这个快照的凭证。这样能保证所有的读都是基于一个一致的状态获取的。

SI 解决冲突的方法一般是 “First-Commiter-Wins” 也就是说,如果两个并发的事务修改了同一个数据,先写的事务会成功,而后写的事务会发现版本和原本的不一致而退出事务。

以我们的例子来说的话,T1 的 y 只会读到自己开始时候的版本,也就是 50,而不是 75,这样读偏就解决了。但是快照隔离还是不能解决另一个问题,就是写偏。这是我们要面临的新问题。

A5B write skew (写偏)

现象

这个和读偏类似,只不过,不一致在了整个系统上。T1 写锁有 y 的新版本,T2 写锁有 x 的新版本,他们没有写冲突,导致最后系统不一致,x+y 的钱变多了。

解决

目前快照隔离的算法有很多,参考 cockroachDB 使用的论文的话,可以说,通过对版本依赖构成有向图,解决成环问题,以此达到串行快照隔离的级别。

比如上面的例子,如果 T1 在 y 读了之后写了一个版本的 y 就构成一个先读后写的 rw(y) 依赖,类似的 T2 对 T1 构成了一个先读后写的 rw(x) 依赖。还有两种无害的依赖是先写后读 (wr) 和先写后写 (ww)。论文中阐述了,造成写偏的条件是成环,并且环中有两个连续的 rw 依赖。也就是下面这种形式。

这个问题的关键是,检查成环这件事情,就跟操作系统检查死锁一样,消耗太大了,性能上不能接受,所以这个实现的妥协是,把检查放宽,让一些无害的条件也被认定为有害,通过重试来恢复执行,至少防止错误的发生。
这个条件是只要有两个连续的 rw 依赖就会放弃提交,即使没有成环。这个检查发生在读的时候如果发现读的版本和自己开始之前的版本不一致就会找到依赖的事务,构建一条入边,另一个事务构建一条出边,如果某个事务入边出边都有 rw 边,这个节点就会被作为嫌疑人。当然还有其他关于串行 snapshot 隔离的论文可以参考。

其他

Oracle Consistent Read 也算是另一种快照级别的隔离。

参考文献

  1. A Critique of ANSI SQL Isolation Levels
  2. ASCI-SQL
  3. cockroachdb 使用的算法

援引自 cockroach 博客 的解释

cockroach 的底层 scaling 存储本身可以认为是一张无限大的有序 KV map,上层封装了一层 SQL。他 sharding 的方式是通过 range 实现的。对一组 64M 大小数据的范围内的 key 做冗余,用二级 metadata range 可以理论上存储 4EB 的数据。

举个例子,五节点打散三副本的一个 range 做冗余,是成本性能和效率上比较典型的一个折衷。像你说的 leader 集中的情况是比较少的,因为三副本是打散分布到不同节点上的,所以三副本的交集不会太集中,单点的压力还是很少的。

另外租约的存在(不知道算不算在 multi raft 算法内),也可以缓解集中化的压力,通过获得授权的租约旁路一些“可以本地化”的请求(比如只对一个 replica 的操作以及一些能保证数据不变的一段时间的读请求),来缓解主的压力,这个最早我记得在 GFS 里面看到过是这么设计的。

另外 gossip 的协议有点像交换机的生成树协议,是通过互相传播自己的负载和位置信心来传播信息的,没有中心化的决策节点,这样的好处就是无中心化,不会依靠一个中心的节点来做负载均衡,而且整个系统就需要一个二进制文件不依赖任何的其他服务。

如图是一个 range 在三节点上的三副本的一个 raft group 之间有网路通信。

但是如果 range 变多的话,就会很复杂,如下图,是四个节点之间大概 8 个 (如果我没看错颜色的话) range 的 raft group 之间通信。

很明显网络 RoundTrip 太多了。针对这个的优化就是 MultiRaft 了,说白了就是把网络请求 batch up,变成下面的形式。

这样网络压力就会小很多。针对这个的优化和应用本身的实现是耦合的,作者也尝试把这个优化加到 etcd/raft 当中,但是最后的决定是增加 RawNode 这样的无线程安全的接口给应用层实用,方便对心跳等请求的合并,所以你会看到有个非线程安全的 RawNode 的实现。

具体到 cockroach 的实现是可以找到相应的代码的,省略无关代码就如下。

在发送消息的时候检查是否有可以整合的请求。

1
2
3
4
5
6
7
8
9
func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
for range {
// 迭代每个 follower 发送消息
// 对消息进行缓存
if r.maybeCoalesceHeartbeat(ctx, msg, toReplica, fromReplica, true) {
continue
}
}
}

循环取出缓存的消息,注意因为是整合了消息,所以对消息来说有延迟,这里稍微加快了频率,弥补这个延迟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Since coalesced heartbeats adds latency to heartbeat messages, it is
// beneficial to have it run on a faster cycle than once per tick, so that
// the delay does not impact latency-sensitive features such as quiescence.
func (s *Store) startCoalescedHeartbeatsLoop() {
s.stopper.RunWorker(func() {
ticker := time.NewTicker(s.cfg.CoalescedHeartbeatsInterval)
defer func() {
ticker.Stop()
}()

for {
select {
case <-ticker.C:
s.sendQueuedHeartbeats()
case <-s.stopper.ShouldStop():
return
}
}
})
}

在 Store 这个层面是对每个 Replica (一个 raft group 的成员) 是有感知的,但是每个 raft group 是独立的,只是在发送是会检查对方节点是否和现在的 raft group 有交集,
从而尝试合并消息的发送,你可以理解为在节点之间的消息通道上对不同 raft group 之间多路复用。
这就是 MultiRaft 对多个 raft group 的网络吞吐做的一个优化,代价是可能造成消息的延迟,因为毕竟是被缓存到队列里了。

其实大家提的 LSM 最开始论文里面都使用树做搜索结构的, 现在在用的都不是严格的树结构了。

这篇文章解释的一样,从最朴素的角度上来讲可以把SSTable(sorted string table)作为一个连续的kv构成的块。

1
2
3
4
SSTable
+-+---+----+---+
|k| v | k | v | ...
+-+---+----+---+

对于一个大文件来说,读取整个文件以后就能构成一个各个键值的索引,当然可以在文件追加一块索引,和文件一起保存。

1
2
3
4
5
6
7
8
9
Index
+-+-------+
|k|offset |
+-+-------+
|k|offset |
+-+-------+
.
.
.

有了索引以后用 seek 操作或者直接把文件 mmap 到内存中都可以有很好的随机读性能。

但是对于随机写来说, 会造成大量的I/O,如果我们能够保证我们的SSTable是不可修改(immutable)的,只有SSTable在内存当中的时候(也就是MemTable)才可以修改,就能避免随机写的大负载问题。

通过下面几条约束就能完成我们的要求:

  1. 首先SSTable索引要放在内存中,这样读索引更快
  2. 所有写只能写到MemTable当中, 因为SSTable不可修改
  3. 所有读要先查看MemTable如果没有再查看内存中的索引(最后找到磁盘上的kv)
  4. 定期把MemTable刷成SSTable,这段时间MemTable也变成了不可修改的,新的MemTable会顶替
  5. 定期对SSTable进行合并

最终我们保证了随机写很快(因为只在MemTable中),随机读也很快(因为要么在MemTable中要么通过索引可以很快找到)。

还有一个问题是对于已有数据的删除和修改怎么办?

因为SSTable不可修改所以只能追加写一个新的数据覆盖老的数据,对于删除则是追加一个”墓碑”值覆盖掉存在的值。把索引指向新值,这样老值就不会被访问了。最后在SSTable合并的时候这些老值会完全消失。所以还要定期合并SSTable

以上是对leveldb的LSM结构的朴素解释。实际上MemTableSSTable都没有采用纯粹的树形结构,MemTable使用的是跳表而SSTable使用的是层次的结构。(这也是为什么 leveldb 叫 level db 的原因)

从这里开始完善朴素解释

首先对于MemTable来说不是持久化的如果重启导致内存中的数据丢失怎么办?WAL 表示的是预写日志,这个日志和MemTable是同步的,这个日志把每次的命令追加到日志中再更改MemTable,这样如果重启的话能够进行”重放”把从已经持久化的状态开始把数据填回到MemTable当中。

其次是对SSTable的合并,SSTable是分层存储的,第一层也就是Level0(被称作 young level),是MemTable刷入的一层,允许这一层的SSTable的key有交集。对于每一层都有一个阈值(young level 是 4,其他层是按大小算的,10^L MB),如果超过阈值自动向下一层合并,从level1开始的每一次key不允许有交集。具体的做法是从 young level 中把有交集的SSTable一起和下一层key有交集的SSTable合并成一个新的SSTable,然后其他层则是从自身层取出一个和下一层有交集的SSTable合并即可。这个属性可以用归纳法证明,从0层向1层合并的时候,1层只有一个的情况下肯定不会相交,然后假设n个的时候也不相交,在n+1的时候有交集,那么n+1合并时有0层的 key 和 n 当中的有交集,但是有交集的部分会被归并掉所以矛盾,所以n+1个的时候也是没有交集的。那1层能保证没有交集的话取出一个向下合并也是类似的不会有交集。所以再重复一遍分层存储的两个属性。

对于朴素解释的两个扩展使得我们对leveldb的设计更接近了。

  1. young层SSTable之间可能存在交集
  2. Li(i>0)层SSTable之间不存在交集

在这个基础上再增加几个约束条件,一个是,对于合并过程每超过2MB就会产生一个新文件,如果文件和下一层的文件有交集的个数有10个以上的话也会产生一个新文件,这样的目的是保证Ln和Ln+1之间不会重复太多。个人理解是覆盖太多,会成了倒三角的”树”情况,上一层搜索性能不好。

当然大量的随机读落在磁盘上还是会有性能问题,因为 seek 也可能是不连续的,这个可以想办法优化, 比如leveldb 里面使用了一种LRU缓存优化读性能。

参考

  1. 官方实现文档
  2. LSM-tree论文