ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

IP 层主要的工作是头部验证,头部选项的处理,分片和重组,以及路由,本篇文章主要分析 IP 层的主体流程,路由和分片的具体细节暂时略解。

ip_init 注册 ip_rcv 处理函数,然后初始化路由子系统,和对端管理器。两个结构 ip_tstampsip_identsip_rcv 是 IP 的入口,主要是一些参数检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/*
* Main IP Receive routine.
*/
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
const struct iphdr *iph;
struct net *net;
u32 len;

/* When the interface is in promisc. mode, drop all the crap
* that it receives, do not try to analyse it.
*/
// 如果 L2 地址不是本机地址 pkt_type 就会被设置成 PACKET_OHTERHOST
// 然后进行丢包
if (skb->pkt_type == PACKET_OTHERHOST)
goto drop;


net = dev_net(dev);
__IP_UPD_PO_STATS(net, IPSTATS_MIB_IN, skb->len);
// 检查引用计数,如果有人引用就复制一份自己的 skb。
skb = skb_share_check(skb, GFP_ATOMIC);
if (!skb) {
__IP_INC_STATS(net, IPSTATS_MIB_INDISCARDS);
goto out;
}
// 保证有 iphdr 大小,如果没有,则可能尝试从 skb_shinfo(skb)->frags[] 中获取
if (!pskb_may_pull(skb, sizeof(struct iphdr)))
goto inhdr_error;

iph = ip_hdr(skb);

/*
* RFC1122: 3.2.1.2 MUST silently discard any IP frame that fails the checksum.
*
* Is the datagram acceptable?
*
* 1. Length at least the size of an ip header
* 2. Version of 4
* 3. Checksums correctly. [Speed optimisation for later, skip loopback checksums]
* 4. Doesn't have a bogus length
*/
// ip 头部长度至少 20 个字节
if (iph->ihl < 5 || iph->version != 4)
goto inhdr_error;

BUILD_BUG_ON(IPSTATS_MIB_ECT1PKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_ECT_1);
BUILD_BUG_ON(IPSTATS_MIB_ECT0PKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_ECT_0);
BUILD_BUG_ON(IPSTATS_MIB_CEPKTS != IPSTATS_MIB_NOECTPKTS + INET_ECN_CE);
__IP_ADD_STATS(net,
IPSTATS_MIB_NOECTPKTS + (iph->tos & INET_ECN_MASK),
max_t(unsigned short, 1, skb_shinfo(skb)->gso_segs));
// 保证完整的头部大小
if (!pskb_may_pull(skb, iph->ihl*4))
goto inhdr_error;

iph = ip_hdr(skb);
// 做校验和
if (unlikely(ip_fast_csum((u8 *)iph, iph->ihl)))
goto csum_error;
// 保证 skb buffer 的大小比 packet 长度大,不然就丢包
// 这个原因是 L2 有 padding? (TODO)
// 并且 packet 长度至少有头部那么大
len = ntohs(iph->tot_len);
if (skb->len < len) {
__IP_INC_STATS(net, IPSTATS_MIB_INTRUNCATEDPKTS);
goto drop;
} else if (len < (iph->ihl*4))
goto inhdr_error;

/* Our transport medium may have padded the buffer out. Now we know it
* is IP we can trim to the true length of the frame.
* Note this now means skb->len holds ntohs(iph->tot_len).
*/
if (pskb_trim_rcsum(skb, len)) {
__IP_INC_STATS(net, IPSTATS_MIB_INDISCARDS);
goto drop;
}

skb->transport_header = skb->network_header + iph->ihl*4;

/* Remove any debris in the socket control block */
memset(IPCB(skb), 0, sizeof(struct inet_skb_parm));
IPCB(skb)->iif = skb->skb_iif;

/* Must drop socket now because of tproxy. */
skb_orphan(skb);

return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING,
net, NULL, skb, dev, NULL,
ip_rcv_finish);

csum_error:
__IP_INC_STATS(net, IPSTATS_MIB_CSUMERRORS);
inhdr_error:
__IP_INC_STATS(net, IPSTATS_MIB_INHDRERRORS);
drop:
kfree_skb(skb);
out:
return NET_RX_DROP;
}

但实际上,大部分的函数是分成两部分的,真正的行为都在 _finish 后缀当中,前期都是检查。在 netfilter 里面 NF_HOOK 这个宏是有个 okfn 如果通过了 netfilter 的检查就会调用这个函数。对应的就是 ip_rcv_finish 当中,要决定是否继续向上层传递还是要进行找到出口设备确定下一跳,进行转发。如果是从设备就交给主设备的 handler 处理(这个和 VRF 有关,主设备代表这些从设备表示的一个域,用于分配一个专有的 FIB 表等等,类似某种程度的隔离)。

1
2
3
4
5
6
/* if ingress device is enslaved to an L3 master device pass the
* skb to its handler for processing
*/
skb = l3mdev_ip_rcv(skb);
if (!skb)
return NET_RX_SUCCESS;

如果设置了 ip_early_demux 并且不是分片的 IP 包,就会提前调用 TCP 的 early_demux 提前解复用这个包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (net->ipv4.sysctl_ip_early_demux &&
!skb_dst(skb) &&
!skb->sk &&
!ip_is_fragment(iph)) {
const struct net_protocol *ipprot;
int protocol = iph->protocol;

ipprot = rcu_dereference(inet_protos[protocol]);
if (ipprot && (edemux = READ_ONCE(ipprot->early_demux))) {
edemux(skb);
/* must reload iph, skb->head might have changed */
iph = ip_hdr(skb);
}
}

在路由系统中找到 dst 指向的 dst_entry,接下来的处理函数也会存在 dst 当中,为一下三种。

  • ip_forward() 转发到其他主机
  • ip_local_deliver() 传入传输层
  • ip_error() 出现了错误,可能会发送一个 ICMP
1
2
3
4
5
6
7
8
9
10
11
12
13
14

/*
* Initialise the virtual path cache for the packet. It describes
* how the packet travels inside Linux networking.
*/
if (!skb_valid_dst(skb)) {
int err = ip_route_input_noref(skb, iph->daddr, iph->saddr,
iph->tos, dev);
if (unlikely(err)) {
if (err == -EXDEV)
__NET_INC_STATS(net, LINUX_MIB_IPRPFILTER);
goto drop;
}
}

如果编译选项带了 CONFIG_IP_ROUTE_CLASSID 那么有流量控制的 classid 的就会进行一些统计。

1
2
3
4
5
6
7
8
9
10
11
#ifdef CONFIG_IP_ROUTE_CLASSID
if (unlikely(skb_dst(skb)->tclassid)) {
struct ip_rt_acct *st = this_cpu_ptr(ip_rt_acct);
u32 idx = skb_dst(skb)->tclassid;
st[idx&0xFF].o_packets++;
st[idx&0xFF].o_bytes += skb->len;
st[(idx>>16)&0xFF].i_packets++;
st[(idx>>16)&0xFF].i_bytes += skb->len;
}
#endif

首先如果 IP 头部长度大于 5 说明有 options,调用 ip_rcv_options 进行处理,如果失败了就进行丢包。

1
2
if (iph->ihl > 5 && ip_rcv_options(skb))
goto drop;

具体过程在 ip_rcv_options 当中。

1
2
3
4
5
if (skb_cow(skb, skb_headroom(skb))) {
__IP_INC_STATS(dev_net(dev), IPSTATS_MIB_INDISCARDS);
goto drop;
}

首先把 skb_headroom 等于 skb->data - skb->head,计算了头部的长度,如果这个头部有 clone 就会被复制一份来 declone

然后根据把 IP option 存进结构化的 inet_skb_parm 当中,其中有个成员是 struct ip_optionsIPCB

代表的是 #define IPCB(skb) ((struct inet_skb_parm*)((skb)->cb))skb->cb 是一个缓冲区用于协议栈每层的处理函数存放一些临时的私有变量。

1
2
3
4
5
6
7
8
9
iph = ip_hdr(skb);
opt = &(IPCB(skb)->opt);
opt->optlen = iph->ihl*4 - sizeof(struct iphdr);

if (ip_options_compile(dev_net(dev), opt, skb)) {
__IP_INC_STATS(dev_net(dev), IPSTATS_MIB_INHDRERRORS);
goto drop;
}

in_device 是和 IP 有关的设备信息,如果没有 source route 选项就直接跳过,不然处理 source route 选项。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (unlikely(opt->srr)) {
struct in_device *in_dev = __in_dev_get_rcu(dev);

if (in_dev) {
if (!IN_DEV_SOURCE_ROUTE(in_dev)) {
if (IN_DEV_LOG_MARTIANS(in_dev))
net_info_ratelimited("source route option %pI4 -> %pI4\n",
&iph->saddr,
&iph->daddr);
goto drop;
}
}

if (ip_options_rcv_srr(skb))
goto drop;
}

source route 是一个多字节选项。此选项中,发送节点会列出后续几跳的 IP 地址(不能超过 IP 报头的最大长度)。如果列表中有某台主机宕机了,则必须重新计算来源地路由,重新发送,而不是使用动态路由。ip_options_rcv_srr 的具体工作就是根据提取出的目的地在本地计算目的地是否可达,如果成功就反回 0, 不然就丢弃。

ip_rcv_options 出来以后根据组播广播进行数据统计。下面的 IN_DEV_ORCONF 不太确定是啥 (TODO)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
rt = skb_rtable(skb);
if (rt->rt_type == RTN_MULTICAST) {
__IP_UPD_PO_STATS(net, IPSTATS_MIB_INMCAST, skb->len);
} else if (rt->rt_type == RTN_BROADCAST) {
__IP_UPD_PO_STATS(net, IPSTATS_MIB_INBCAST, skb->len);
} else if (skb->pkt_type == PACKET_BROADCAST ||
skb->pkt_type == PACKET_MULTICAST) {
struct in_device *in_dev = __in_dev_get_rcu(dev);

/* RFC 1122 3.3.6:
*
* When a host sends a datagram to a link-layer broadcast
* address, the IP destination address MUST be a legal IP
* broadcast or IP multicast address.
*
* A host SHOULD silently discard a datagram that is received
* via a link-layer broadcast (see Section 2.4) but does not
* specify an IP multicast or broadcast destination address.
*
* This doesn't explicitly say L2 *broadcast*, but broadcast is
* in a way a form of multicast and the most common use case for
* this is 802.11 protecting against cross-station spoofing (the
* so-called "hole-196" attack) so do it for both.
*/
if (in_dev &&
IN_DEV_ORCONF(in_dev, DROP_UNICAST_IN_L2_MULTICAST))
goto drop;
}

return dst_input(skb);

进入 dst_input 就会交给 skb­>dst ­>input 来处理。

转发会分成两部分处理 ip_forwardip_forward_finish IP 转发分成几个步骤

  1. 处理 IP options,可能会要求记录本地 IP 地址和时间戳
  2. 基于 IP 头,确保这个 pakcet 可以发出去
  3. 减 1 TTL,到达 0 就丢弃
  4. 根据 MTU 进行分组
  5. 发送至出口设备

期间如果出错了,会通过 ICMP 告知。xfrm4_xxx 是 IPsec 相关的函数。

1
2
if (IPCB(skb)->opt.router_alert && ip_call_ra_chain(skb))
return NET_RX_SUCCESS;

首先是检查 IP 选项当中有没有 Router Alert,如果有的话就交给 ip_ra_chain 中对此感兴趣的 raw socket 来处理,并且就次结束。

在这里检查 TTL 是否耗尽

1
2
3
if (ip_hdr(skb)->ttl <= 1)
goto too_many_hops;

如果是严格的源路由,下一条是网关而不是直接连接的路由就丢包。

rt_uses_gateway 代表两种意思

  • 1 的时候表示网关
  • 0 的时候表示直接路由
1
2
3
if (opt->is_strictroute && rt->rt_uses_gateway)
goto sr_failed;

如果超出 MTU 进行丢包

1
2
3
4
5
6
7
8
9
IPCB(skb)->flags |= IPSKB_FORWARDED;
mtu = ip_dst_mtu_maybe_forward(&rt->dst, true);
if (ip_exceeds_mtu(skb, mtu)) {
IP_INC_STATS(net, IPSTATS_MIB_FRAGFAILS);
icmp_send(skb, ICMP_DEST_UNREACH, ICMP_FRAG_NEEDED,
htonl(mtu));
goto drop;
}

declone 这个 skb 并且确保预留 L2 的空间,然后减少 TTL。

1
2
3
4
5
6
7
8
/* We are about to mangle packet. Copy it! */
if (skb_cow(skb, LL_RESERVED_SPACE(rt->dst.dev)+rt->dst.header_len))
goto drop;
iph = ip_hdr(skb);

/* Decrease ttl after skb cow done */
ip_decrease_ttl(iph);

如果被标记为 IPSKB_DOREDIRECT 发送 redirect ICMP,接着设置优先级。

1
2
3
4
5
6
7
8
9
10
/*
* We now generate an ICMP HOST REDIRECT giving the route
* we calculated.
*/
if (IPCB(skb)->flags & IPSKB_DOREDIRECT && !opt->srr &&
!skb_sec_path(skb))
ip_rt_send_redirect(skb);

skb->priority = rt_tos2priority(iph->tos);

然后进入 ip_forward_finish

1
2
3
return NF_HOOK(NFPROTO_IPV4, NF_INET_FORWARD,
net, NULL, skb, skb->dev, rt->dst.dev,
ip_forward_finish);

ip_forward_finish 当中会进入 dst_outputip_forward_options 会处理一些 IP 选项并且重新计算 IP 头的校验和。

1
2
3
4
5
6

if (unlikely(opt->optlen))
ip_forward_options(skb);

return dst_output(net, sk, skb);

在内部有两种情况,一种是单播,一种是广播,对应的处理函数分别是 ip_outputip_mc_output 两种处理函数,会进行分组操作,然后在 ip_finish_output 当中进入邻居系统。

1
skb_dst(skb)->output(net, sk, skb);

ip_local_deliver 主要的工作是对分片进行重组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int ip_local_deliver(struct sk_buff *skb)
{
/*
* Reassemble IP fragments.
*/
struct net *net = dev_net(skb->dev);

if (ip_is_fragment(ip_hdr(skb))) {
if (ip_defrag(net, skb, IP_DEFRAG_LOCAL_DELIVER))
return 0;
}

return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN,
net, NULL, skb, skb->dev, NULL,
ip_local_deliver_finish);
}

这是 3 层接收端的一个大体结构,下面看一下 3 层发送端的一些内容。发送的入口

1
2
int ip_queue_xmit(struct sk_buff *skb, int ipfragok)

首先会检查是否已经有了路由信息,这个在 SCTP 的情况下会发生。

1
2
3
4
rt = skb_rtable(skb);
if (rt)
goto packet_routed;

检查是否缓存了 route 如果有的话却是路由信息的有效性

1
2
/* Make sure we can route this packet. */
rt = (struct rtable *)__sk_dst_check(sk, 0);

ip_route_output_ports 确保 source route list 的 下一跳和 daddr 一致,并且将路由设置在 skb 里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (!rt) {
__be32 daddr;

/* Use correct destination address if we have options. */
daddr = inet->inet_daddr;
if (inet_opt && inet_opt->opt.srr)
daddr = inet_opt->opt.faddr;

/* If this fails, retransmit mechanism of transport layer will
* keep trying until route appears or the connection times
* itself out.
*/
rt = ip_route_output_ports(net, fl4, sk,
daddr, inet->inet_saddr,
inet->inet_dport,
inet->inet_sport,
sk->sk_protocol,
RT_CONN_FLAGS(sk),
sk->sk_bound_dev_if);
if (IS_ERR(rt))
goto no_route;
sk_setup_caps(sk, &rt->dst);
}
skb_dst_set_noref(skb, &rt->dst);

接下来就需要构建 IP 头了,先调用空出需要的 IP 头部空间,并且重置我的大脑。

1
2
/* OK, we know where to send it, allocate and build IP header. */
skb_push(skb, sizeof(struct iphdr) + (inet_opt ? inet_opt->opt.optlen : 0));

重置 IP 头长度其实就是设置了 network_header 等于 data - head

1
skb_reset_network_header(skb);

转换成 16 位的指针并且把 IP 协议号,IP 头长度,inet->tos TOS 写入。

1
2
iph = ip_hdr(skb);
*((__be16 *)iph) = htons((4 << 12) | (5 << 8) | (inet->tos & 0xff));

初始化 iph

1
2
3
4
5
6
7
if (ip_dont_fragment(sk, &rt->dst) && !skb->ignore_df)
iph->frag_off = htons(IP_DF);
else
iph->frag_off = 0;
iph->ttl = ip_select_ttl(inet, &rt->dst);
iph->protocol = sk->sk_protocol;
ip_copy_addrs(iph, fl4);

如果有选项长度更新选项长度

1
2
3
4
5
6
/* Transport layer set skb->h.foo itself. */

if (inet_opt && inet_opt->opt.optlen) {
iph->ihl += inet_opt->opt.optlen >> 2;
ip_options_build(skb, &inet_opt->opt, inet->inet_daddr, rt, 0);
}

基于是否分片给 packet 分配 ID,然后进入 ip_local_out

1
2
3
4
5
6
7
8
ip_select_ident_segs(net, skb, sk,
skb_shinfo(skb)->gso_segs ?: 1);

/* TODO : should we use skb->sk here instead of sk ? */
skb->priority = sk->sk_priority;
skb->mark = sk->sk_mark;

res = ip_local_out(net, sk, skb);

当这里先告一段落,再来看一下 ip_append_data 这个函数的作用是缓存进合理的结构以便之后进行分片然后发送。ip_push_pending_frames 就可以触发这个动作。

1
2
3
4
5
6
7
8
int ip_append_data(struct sock *sk, struct flowi4 *fl4,
int getfrag(void *from, char *to, int offset, int len,
int odd, struct sk_buff *skb),
void *from, int len, int protolen,
struct ipcm_cookie *ipc,
struct rtable **rt,
unsigned int flags);

第一件事情,检查是否有 MSG_PROBE 的标志,有了这个标志的话,表示请求并不真的是需要向下调用。这个在测试对应 IP 地址的 PMTU 的时候会用到。

1
2
3
if (flags&MSG_PROBE)
return 0;

如果 sock 相关联的 sk_write_queue 队列为空,说明这个是第一个 IP fragment ,如果不是第一个那么就把 transhdrlen 设置成 0,因为只有第一个 IP fragment 才有头部长度的信息。

1
2
3
4
5
6
7
8
if (skb_queue_empty(&sk->sk_write_queue)) {
err = ip_setup_cork(sk, &inet->cork.base, ipc, rtp);
if (err)
return err;
} else {
transhdrlen = 0;
}

ip_setup_cork 主要是初始化了 net->corkcork 保存了 IP options 和 路由信息。

进入 __append_ip_data 之后可以看到

1
2
3
4
5
6
7
8
   skb = skb_peek_tail(queue);

exthdrlen = !skb ? rt->dst.header_len : 0;
mtu = cork->fragsize;
if (cork->tx_flags & SKBTX_ANY_SW_TSTAMP &&
sk->sk_tsflags & SOF_TIMESTAMPING_OPT_ID)
tskey = sk->sk_tskey++;

计算 L2 头部长度,fragment 头部长度,最大可以容纳的 fragment 长度,还有不分片的情况下的最大长度。(如果忽略分片就直接默认最大值,64KB 就是 0xFFFF,否则使用 mtu)

1
2
3
4
5
6
hh_len = LL_RESERVED_SPACE(rt->dst.dev);

fragheaderlen = sizeof(struct iphdr) + (opt ? opt->optlen : 0);
maxfraglen = ((mtu - fragheaderlen) & ~7) + fragheaderlen;
maxnonfragsize = ip_sk_ignore_df(sk) ? 0xFFFF : mtu;

首先累加的 buffer 长度不能超过最大的 IP 包长度。

1
2
3
4
5
6
if (cork->length + length > maxnonfragsize - fragheaderlen) {
ip_local_error(sk, EMSGSIZE, fl4->daddr, inet->inet_dport,
mtu - (opt ? opt->optlen : 0));
return -EMSGSIZE;
}

如果是第一个包,并且不会有新的分片,并且硬件支持 checksum 就可以标志为 CHECKSUM_PARTIAL

1
2
3
4
5
6
7
8
9
10
/*
* transhdrlen > 0 means that this is the first fragment and we wish
* it won't be fragmented in the future.
*/
if (transhdrlen &&
length + fragheaderlen <= mtu &&
rt->dst.dev->features & (NETIF_F_HW_CSUM | NETIF_F_IP_CSUM) &&
!(flags & MSG_MORE) &&
!exthdrlen)
csummode = CHECKSUM_PARTIAL;

下面如果满足下面条件就进入 UDP Fragment Offload 例程。是硬件网卡提供的一种特性,由内核和驱动配合完成相关功能。其目的是由网卡硬件来完成本来需要软件进行的分段(分片)操作用于提升效率和性能。如大家所知,在网络上传输的数据包不能大于 mtu,当用户发送大于 mtu 的数据报文时,通常会在传输层(或者在特殊情况下在 IP 层分片,比如 ip 转发或 ipsec 时)就会按 mtu 大小进行分段,防止发送出去的报文大于 mtu,为提升该操作的性能,新的网卡硬件基本都实现了 UFO 功能,可以使分段(或分片)操作在网卡硬件完成,此时用户态就可以发送长度大于 mtu 的包,而且不必在协议栈中进行分段(或分片)。如果硬件支持,是 UDP 协议,并且是大于 mtu 的可以直接用这个函数。

1
2
3
4
5
6
7
8
9
10
11
12
if ((((length + fragheaderlen) > mtu) || (skb && skb_is_gso(skb))) &&
(sk->sk_protocol == IPPROTO_UDP) &&
(rt->dst.dev->features & NETIF_F_UFO) && !dst_xfrm(&rt->dst) &&
(sk->sk_type == SOCK_DGRAM) && !sk->sk_no_check_tx) {
err = ip_ufo_append_data(sk, queue, getfrag, from, length,
hh_len, fragheaderlen, transhdrlen,
maxfraglen, flags);
if (err)
goto error;
return 0;
}

剩下的代码有点啰嗦,总体来说就是把 buff 拆分成可以直接发送的 IP fragment,但是需要先把道理讲清楚,不然看代码有点复杂。

确定剩余可以用来拷贝的空间,不能超过 mtu 和 maxfraglen

1
2
3
4
5
/* Check if the remaining data fits into current packet. */
copy = mtu - skb->len;
if (copy < length)
copy = maxfraglen - skb->len;

如果不够,就需要分配一个新的 skb,这里面的几个变量具体解释一下,首先是

fraggap 表示的是 mtu 不是 8 的倍数,在最后那个比 8 的倍数多,又小于 mtu 的部分就是 fraggap 了,所以 datalenlength + fraggapfraggap 这部分会从 prev_skb 尾部移动到新 skb 的头部。fraglen 是带上 frag 头部的长度 fraglen = datalen + fragheaderlen。如果 flag 包含了 MSG_MORE 那么会尽量分配一个 mtu,当然这是在不支持 SG(Scatter/Gather I/O) 的情况下。因为支持 SG 的话,就可以直接分散的分配这些内存,最后进行 skb 的分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
		if (copy <= 0) {
char *data;
unsigned int datalen;
unsigned int fraglen;
unsigned int fraggap;
unsigned int alloclen;
struct sk_buff *skb_prev;
alloc_new_skb:
skb_prev = skb;
if (skb_prev)
fraggap = skb_prev->len - maxfraglen;
else
fraggap = 0;

/*
* If remaining data exceeds the mtu,
* we know we need more fragment(s).
*/
datalen = length + fraggap;
if (datalen > mtu - fragheaderlen)
datalen = maxfraglen - fragheaderlen;
fraglen = datalen + fragheaderlen;

if ((flags & MSG_MORE) &&
!(rt->dst.dev->features&NETIF_F_SG))
alloclen = mtu;
else
alloclen = fraglen;

alloclen += exthdrlen;

/* The last fragment gets additional space at tail.
* Note, with MSG_MORE we overallocate on fragments,
* because we have no idea what fragment will be
* the last.
*/
if (datalen == length + fraggap)
alloclen += rt->dst.trailer_len;

if (transhdrlen) {
skb = sock_alloc_send_skb(sk,
alloclen + hh_len + 15,
(flags & MSG_DONTWAIT), &err);
} else {
skb = NULL;
if (atomic_read(&sk->sk_wmem_alloc) <=
2 * sk->sk_sndbuf)
skb = sock_wmalloc(sk,
alloclen + hh_len + 15, 1,
sk->sk_allocation);
if (unlikely(!skb))
err = -ENOBUFS;
}
if (!skb)
goto error;


length 表示需要传输的长度,在循环不断进行中这个 length 就会变成 0。

接下来的部分是初始化 csumip_summed 保留硬件头部长度。

1
2
3
4
5
6
7
         /*
* Fill in the control structures
*/
skb->ip_summed = csummode;
skb->csum = 0;
skb_reserve(skb, hh_len);

设置 tx_flagstskey

1
2
3
4
5
6
/* only the initial fragment is time stamped */
skb_shinfo(skb)->tx_flags = cork->tx_flags;
cork->tx_flags = 0;
skb_shinfo(skb)->tskey = tskey;
tskey = 0;

保留数据空间,并且把指针移动到头部后面,指向负载的部分。

1
2
3
4
5
6
7
 *	Find where to start putting bytes.
*/
data = skb_put(skb, fraglen + exthdrlen);
skb_set_network_header(skb, exthdrlen);
skb->transport_header = (skb->network_header +
fragheaderlen);
data += fragheaderlen + exthdrlen;

这部分就是把上个 skb 的 fraggap 移到当前这个,并且重新获取 checksum。

1
2
3
4
5
6
7
8
9
10
if (fraggap) {
skb->csum = skb_copy_and_csum_bits(
skb_prev, maxfraglen,
data + transhdrlen, fraggap, 0);
skb_prev->csum = csum_sub(skb_prev->csum,
skb->csum);
data += fraggap;
pskb_trim_unique(skb_prev, maxfraglen);
}

接下来调用 getfrag 拷贝数据,加入到队尾。

getfrag 对应的 4 个 routine 分别是

  • ICMP icmp_glue_bits
  • UDP. ip_generic_getfrag
  • RAW iP ip_generic_getfrag
  • TCP. Ip_reply_glue_bits

getfrag 的功能就是从 from 拷贝到 to ,因为可能是用户态的数据,所以包含了地址转换的功能并且在比好的时候重新计算校验和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
	copy = datalen - transhdrlen - fraggap;
if (copy > 0 && getfrag(from, data + transhdrlen, offset, copy, fraggap, skb) < 0) {
err = -EFAULT;
kfree_skb(skb);
goto error;
}

offset += copy;
length -= datalen - fraggap;
transhdrlen = 0;
exthdrlen = 0;
csummode = CHECKSUM_NONE;

if ((flags & MSG_CONFIRM) && !skb_prev)
skb_set_dst_pending_confirm(skb, 1);

/*
* Put the packet on the pending queue.
*/
__skb_queue_tail(queue, skb);
continue;
}

剩下的就是 copy 足够,不需要分配新的 skb 的条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
if (copy > length)
copy = length;

if (!(rt->dst.dev->features&NETIF_F_SG)) {
unsigned int off;

off = skb->len;
if (getfrag(from, skb_put(skb, copy),
offset, copy, off, skb) < 0) {
__skb_trim(skb, off);
err = -EFAULT;
goto error;
}
} else {
int i = skb_shinfo(skb)->nr_frags;

err = -ENOMEM;
if (!sk_page_frag_refill(sk, pfrag))
goto error;

if (!skb_can_coalesce(skb, i, pfrag->page,
pfrag->offset)) {
err = -EMSGSIZE;
if (i == MAX_SKB_FRAGS)
goto error;

__skb_fill_page_desc(skb, i, pfrag->page,
pfrag->offset, 0);
skb_shinfo(skb)->nr_frags = ++i;
get_page(pfrag->page);
}
copy = min_t(int, copy, pfrag->size - pfrag->offset);
if (getfrag(from,
page_address(pfrag->page) + pfrag->offset,
offset, copy, skb->len, skb) < 0)
goto error_efault;

pfrag->offset += copy;
skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
skb->len += copy;
skb->data_len += copy;
skb->truesize += copy;
atomic_add(copy, &sk->sk_wmem_alloc);
}
offset += copy;
length -= copy;

VFIO——将设备暴露到用户态

在开始之前我们先要说一个东西就是 DMA,直接让设备访问内存,可以不通过 CPU 搬运数据。

这是一个比较简单的体系结构图,设备 和 CPU 通过存储控制器访问存储器。一个简单的 case 是 CPU 向存储器写数据,然后设备从存储器读数据。这么快来一切都很正常。但是实际上 CPU 是有一层缓存的,例如下面这样的。

CPU 想内存写数据,但是先要清空到不一致的缓存,然后设备再去读数据,不然设备读到的数据和 CPU 实际的数据会不一致(因为缓存里的数据可能和存储器的不一致),而且实际上缓存也不只是一层,所以需要一个中间层来保证 从 CPU 的角度和从设备的角度内存都是一致的,所以就有了下面这个结构。

CPU 和 设备都会走缓存验证一遍以后,再落到存储器上,这样带上缓存以后大家的一致性都是一样的了。所以从设备的角度,设备也拥有了缓存,实际上这个和 IOMMU 关系不是很大,接下来设备其实也可以和 CPU 一样有一层 MMU,也就是地址到存储器物理地址的转换。注意,这里我用了地址,因为对 CPU 来说是虚拟地址,但是对设备来说是一个总线域的地址。这里要明确区分一下,一个是总线地址,是从设备的角度来看的,一个是 CPU 的虚拟地址,这是从 CPU 角度来看的,两个是不同的东西。将总线域地址转换成存储器物理地址的设备就叫 IOMMU。

如果没有 IOMMU,DMA 也能照常工作,IOMMU 的主要作用就是保护功能,防止使用 DMA 的设备访问任意存储器的物理地址。

IOMMU 在不同架构上名字不太一样,AMD 叫 AMD-Vi,最开始针对的设备只是显卡,Intel 叫 VT-d,arm 叫 SMMU,具体对应的手册也不太一样,但是主要解决的问题是一致的。在 VTd 中,dmar (DMA remapping) 就是那个 IOMMU 设备,通过中断的方式实现类似 page fault 一样的内存分配行为。DMA 传输是由 CPU 发起的:CPU 会告诉 DMA 控制器,帮忙将 xxx 地方的数据搬到 xxx 地方。CPU 发完指令之后,就当甩手掌柜了。IOMMU 有点像 MMU 是一个将设备地址翻译到内存地址的页表体系,也会有对应的页表,这个东西在虚拟化中也非常有用,可以将原本有软件模拟的设备,用直接的硬件替代,而原本的隔离通过 IOMMU 来完成。如下图所示,原本需要通过软件模拟的驱动设备可以通过 IOMMU 以__安全__的方式来直接把硬件设备分配个用户态的 Guest OS。

理论上讲没有 IOMMU 实际上是可以工作的,但是硬件的角度,设备就拥有了整个存储器的全局视图,这是无论如何都非常不合理的事情,不应该让设备拥有访问任意物理内存的能力。

这里要提的另外一个功能就是对中断的隔离,类似于下面的通过在中断请求中添加标识来重定向中断到对应的中断回调上。

VFIO 的作用就是通过 IOMMU 以安全的方式来将设备的访问直接暴露到用户空间,而不用专门完成某个驱动等待合并到上游或者使用之前的对 IOMMU 没有感知的 UIO 的框架。通过 VFIO 向用户态开放 IOMMU 的功能,编写用户态的驱动。

对于 IOMMU 来说,隔离的级别不一定是单个设备,比如一个后面有几个设备的 PCI 桥,从 PCI 桥角度来说,都是来自 PCI 桥的总线事务。所以 IOMMU 有一个 iommu_group的概念,代表一组与其他设备隔离的设备的集合。

IOMMU 根据手册上讲还有一个域的概念,可以简单理解为一段物理地址的抽象。

iommu_group 的层级上,VFIO 封装了一层 container class,这个的作用对应于希望能够在不同的iommu_group 之间共享 TLBpage tables,这个就是一个集合的概念,跟容器的那个概念没啥关系,一个集合总归要有个名字。通过把 host 的 device 和 driver 解绑,然后绑定到 VFIO 的 driver 上,就会有个/dev/vfio/$GROUP/ 出现,然后这个 $GROUP代表的就是这个 device 的 iommu_group号,如果要使用 VFIO 就要把这个 group 下的所有 device 都解绑才可以。

通过打开/dev/vfio/vfio就能创建一个 VFIO 的 container,然后再打开/dev/vfio/$GROUPVFIO_GROUP_SET_CONTAINER ioctl 把文件描述传进去,就把 group 加进去了,如果支持多个 group 共享页表等结构,还可以把相应的 group 也加进去。(再强调一遍这个页表是总线地址到存储器物理地址,IOMMU 管理的那个页表)。

下面举个官方的栗子,获取 PCI 设备 0000:06:0d.0 的 group_id (PCI 命名的规则是 domain:bus:slot.func

1
2
$ readlink /sys/bus/pci/devices/0000:06:0d.0/iommu_group
../../../../kernel/iommu_groups/26

使用之前需要你已经加载了 VFIO 模块

1
modprobe vfio-pci

解绑 PCI 设备,然后创建一个 container id

1
2
3
4
$ lspci -n -s 0000:06:0d.0
06:0d.0 0401: 1102:0002 (rev 08)
# echo 0000:06:0d.0 > /sys/bus/pci/devices/0000:06:0d.0/driver/unbind
# echo 1102 0002 > /sys/bus/pci/drivers/vfio-pci/new_id

然后寻找其他同属于一个 group 的设备

1
2
3
4
5
6
7
8
$ ls -l /sys/bus/pci/devices/0000:06:0d.0/iommu_group/devices
total 0
lrwxrwxrwx. 1 root root 0 Apr 23 16:13 0000:00:1e.0 ->
../../../../devices/pci0000:00/0000:00:1e.0
lrwxrwxrwx. 1 root root 0 Apr 23 16:13 0000:06:0d.0 ->
../../../../devices/pci0000:00/0000:00:1e.0/0000:06:0d.0
lrwxrwxrwx. 1 root root 0 Apr 23 16:13 0000:06:0d.1 ->
../../../../devices/pci0000:00/0000:00:1e.0/0000:06:0d.1

PCI 桥 0000:00:1e.0 后面挂了两个设备,一个是刚才加进去的 0000:06:0d.0,还有一个是 0000:06:0d.1,通过上面的步奏加进去就可以。

最后一步是让用户有权限使用这个 group。

1
# chown user:user /dev/vfio/26

下面就是一个样例,从用户态使用 VFIO,整个的使用方式是通过 ioctl来获取中断相关信息,以及注册中断处理函数,然后也是通过 ioctl来获取region信息,然后调用相应的mmap函数,让 CPU 可以访问内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
int container, group, device, i;
struct vfio_group_status group_status =
{ .argsz = sizeof(group_status) };
struct vfio_iommu_type1_info iommu_info = { .argsz = sizeof(iommu_info) };
struct vfio_iommu_type1_dma_map dma_map = { .argsz = sizeof(dma_map) };
struct vfio_device_info device_info = { .argsz = sizeof(device_info) };

/* Create a new container */
container = open("/dev/vfio/vfio", O_RDWR);

if (ioctl(container, VFIO_GET_API_VERSION) != VFIO_API_VERSION)
/* Unknown API version */

if (!ioctl(container, VFIO_CHECK_EXTENSION, VFIO_TYPE1_IOMMU))
/* Doesn't support the IOMMU driver we want. */

/* Open the group */
group = open("/dev/vfio/26", O_RDWR);

/* Test the group is viable and available */
ioctl(group, VFIO_GROUP_GET_STATUS, &group_status);

if (!(group_status.flags & VFIO_GROUP_FLAGS_VIABLE))
/* Group is not viable (ie, not all devices bound for vfio) */

/* Add the group to the container */
ioctl(group, VFIO_GROUP_SET_CONTAINER, &container);

/* Enable the IOMMU model we want */
ioctl(container, VFIO_SET_IOMMU, VFIO_TYPE1_IOMMU);

/* Get addition IOMMU info */
ioctl(container, VFIO_IOMMU_GET_INFO, &iommu_info);

/* Allocate some space and setup a DMA mapping */
dma_map.vaddr = mmap(0, 1024 * 1024, PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS, 0, 0);
dma_map.size = 1024 * 1024;
dma_map.iova = 0; /* 1MB starting at 0x0 from device view */
dma_map.flags = VFIO_DMA_MAP_FLAG_READ | VFIO_DMA_MAP_FLAG_WRITE;

ioctl(container, VFIO_IOMMU_MAP_DMA, &dma_map);

/* Get a file descriptor for the device */
device = ioctl(group, VFIO_GROUP_GET_DEVICE_FD, "0000:06:0d.0");

/* Test and setup the device */
ioctl(device, VFIO_DEVICE_GET_INFO, &device_info);

for (i = 0; i < device_info.num_regions; i++) {
struct vfio_region_info reg = { .argsz = sizeof(reg) };

reg.index = i;

ioctl(device, VFIO_DEVICE_GET_REGION_INFO, &reg);

/* Setup mappings... read/write offsets, mmaps
* For PCI devices, config space is a region */
}

for (i = 0; i < device_info.num_irqs; i++) {
struct vfio_irq_info irq = { .argsz = sizeof(irq) };

irq.index = i;

ioctl(device, VFIO_DEVICE_GET_IRQ_INFO, &irq);

/* Setup IRQs... eventfds, VFIO_DEVICE_SET_IRQS */
}

/* Gratuitous device reset and go... */
ioctl(device, VFIO_DEVICE_RESET);

include/linux/vfio.h里面有完整的 API,这里就简单略过。

在理解了一些基本原理和使用方式之后再来看 VFIO 的代码应该叫就比较容易理解了。

首先是作为 PCI 设备的 probe。主要是通过 vfio_iommu_group_get 分配 iommu_group,然后调用vfio_add_group_dev初始化设备回调接口vfio_pci_ops,而remove就是反过来把对应的结构释放掉就可以。然后再看注册的回调函数结构体。

1
2
3
4
5
6
7
8
9
10
static const struct vfio_device_ops vfio_pci_ops = {
.name = "vfio-pci",
.open = vfio_pci_open,
.release = vfio_pci_release,
.ioctl = vfio_pci_ioctl,
.read = vfio_pci_read,
.write = vfio_pci_write,
.mmap = vfio_pci_mmap,
.request = vfio_pci_request,
};

这里分析几个关键的函数,他们会通过file_operations vfio_fops被间接的调用。

首先是 mmap,就是在调用vfio_pci_mmap的时候最终调用remap_pfn_range(vma, vma->vm_start, vma->vm_pgoff, req_len, vma->vm_page_prot); 来将物理内存映射到用户态空间,这就是上面的栗子中 mmap 系统调用的入口,而具体要映射的物理内存是通过一系列pci_resource_xxx宏从 PCI bar 空间读出来的配置。

然后是 ioctl 接口,这个接口比较丰富,也简单的看一下。比如 VFIO_DEVICE_SET_IRQS会通过使用用户态传进来的结构体,调用vfio_pci_set_irqs_ioctl注册中断处理函数。而通过vfio_ioctl_set_iommu会设置 container 的 iommu_group 以及对应的 driver。read/write接口都是用于修改 PCI 配置信息的。

简单的来说,VFIO 的主要工作是把设备通过 IOMMU 映射的 DMA 物理内存地址映射到用户态中,让用户态程序可以自行操纵设备的传输,并且可以保证一定程度的安全,另外可以自行注册中断处理函数,从而在用户态实现设备的驱动程序,通过这样的框架,可以在 DPDK 中充分发挥用户态协议栈的威力。

参考文献

  1. PCI 基本概念
  2. dmar 和 iommu
  3. 总线基本概念
  4. 《PCI Express 体系结构导读》王齐著
  5. mastering the dma and iommu apis
  6. VFIO - “Virtual Function I/O”
  7. Intel® Virtualization Technology for Directed I/O: Spec
  8. Linux 下 PCI 设备驱动程序开发

在分析net_clsnet_prio之前先要解释几个东西,一个是网络的 QoS 以及 netfilter。

网络 QoS

IP 服务模型是尽力而为的,这样的模型不能体现某些流量的重要性,所以诞生了QOS技术,Linux 很早就提供了流量控制接口,命令行工具是tc

协议栈的 QoS 主要由三部分组成。

qdisc 队列规则(queueing discipline),class 控制策略,filter 根据 filter 划入具体的控制策略(class)。

一般由流程是这样的,当一个 qdisc 被入队的时候,会循环匹配其中的 filter 如果匹配到了的话,就会将 packet 入队到对应的 class 当中,大部分情况下被 class 的“所有者”代表的 qdisc 入队。一些没有匹配的 packet 会进入默认的 class。下面将会介绍几种常用的 queue discpline。

pfifo

默认 qdisc 就是 pfifo,实现在 net/sched/sch_prio.c

设有三个优先级的队列,优先级由高到低分别是

  1. “interactive”
  2. “best effort”
  3. “bulk”

先消费 0 队列再消费 1 队列,依次类推,一般的packet都是属于 1。

pfifo

用 IP 的 ToS 可以映射到这些队列,对应的关系如图。

tos

他的实现依赖下面这个结构,bands 一般是 3 个,代表三个不同优先级的队列,然后filter_list是他的过滤器列表,最后prio2band就是上图的ToS To Queue 的映射中的Linux priority 到 Band的那部分,queues保存的是三个fifoqdisc,也就是三个最简单的队列。

1
2
3
4
5
6
7
struct prio_sched_data {
int bands;
struct tcf_proto __rcu *filter_list;
u8 prio2band[TC_PRIO_MAX+1];
struct Qdisc *queues[TCQ_PRIO_BANDS];
};

这里虽然和 IP 的 ToS 暂时没有关系,但是最后cgroup的部分会提到怎么联系起来的。

pfifo enqueue 的时候会调用prio_classify根据skb->priority来选择队列进行入队,dequeue 的时候则 round robin 每次依次从优先级高到低取出一个 packet。

HTB (Hierarchical Token Bucket)

HTB 是一个层级令牌桶的 qdisc,而且可以加入 class,HTB 的整体结构如下,HTB 的作者在这里 解释了他的设计。

这里简单解释一下,每个 class 有一个 AR(保障速率)CR(最大速率),P(优先级),level(在树中的层次),quantum(量子,一个动态参数),和实际的速率 R,中间层的 class 通过计算子层的速率获得。

Leaf 没有子节点,并且只有 Leaf 有传输功能的 queue,其他只是帮助构造层级关系。

Mode class 的状态

  1. Red ( R > CR ) 也就是超速

  2. Yellow (R <= CR && R > AR) 也就是合理超速

  3. Green 就是没有超过保障速率

下面我们有几个等式

1
Rc = min(CRc, ARc + Bc)        [eq1]

Bc 表示从祖先那里借到的速率,下面这段公式表示的意思是,如果我是当前 prio 最小的,从优先级的兄弟节点中加权平均借到父节点的速率,其他的 prio 更大的节点都不能借到速率。

1
2
3
4
5
       Qc Rp
Bc = ----------------------------- iff min[Pi over D(p)]>=Pc [eq2]
sum[Qi over D(p) where Pi=Pc]

Bc = 0 otherwise [eq3]

如果没有父节点的话,Bc 就是 0。这样算出来代表的意义是什么呢,就是说先服务优先级更高的节点,并且按照量子的大小在同优先级的节点中分配速率。

然后我们具体看一下 HTB 调度器是如何工作的。

htb_sch_1

htb_sch_2

每个 class 有个 slot 包含不同的 prio 级别,指向 yellow 的子节点,然后每个层级包含一个 slot 也有不同的 prio 级别指向这一层中 green 的节点,上图展示了的是有两个 prio 的 slot,右边的 self slot 有个白色的是用于 yellow red 的 wait list。

假设当前如图 1 的状态,所有节点都是绿的没有 packet 到来,现在有有两个包到达 C 和 D,然后激活它们,并且它们现在都是绿的,所以 self slot 指向他们,然后因为 D 的优先级更高,所以先把 D 出队。所以你可以发现,出队的顺序很简答,就是按照优先级从 self slot 里面把绿色的包按顺序取出来就可以。

feed3

feed4

然后我们看一下更复杂的情况,在图 3 中,我们从 D 中出队一个 packet(htb_dequeue),然后htb_charge_class会增加 D 的速率,导致 D 变成 yellow,离开 self slot (通过htb_deactivate_prioshtb_remove_class_from_row),然后添加到 B 的 slot 里面 (htb_activate_prios),并且递归向上添加htb_add_class_to_row,D 会在一段时间后进入 self slot 的白色等待区,然后 D 又会变回绿色。现在如果选择的话,就会从 C 出队,因为虽然 C 的优先级低但是 C 不需要借别人的速率。

在图 4,假设 C 已经完全消耗了速率达到了最大限速,这个时候 D 就会开始工作然后把 B 消耗完,B 被消耗了以后就会去消耗 A,从这里就可以看到一个借取的过程。

htb_sch_5

htb_sch_6

现在说一个更复杂一点的例子,在图 5 中,A 已经被消耗光了,E 开始工作,然后 C 也能开始工作,变成图 6 的样子。注意即使 D 没有被使用但是他的优先级还会被 class slot 维持,也就是红线。但是 C 和 E 都是同一个优先级,这样的话,就要使用 DRR 算法(也就是在 RR 算法上给每个变量加一个权重,也就是之前的那个量子 quantom)。然后也可以发现一个 class 可以对不同的优先级(红色和蓝色)保持 active。

下面是一个 HTB 的全貌图,抽象的可以理解成一个从父 class 中根据优先级带权重的分享令牌的一个算法。

htb arch

下面三个值对应的就是三种颜色。

1
2
3
4
5
6
/* used internaly to keep status of single class */
enum htb_cmode {
HTB_CANT_SEND, /* class can't send and can't borrow */
HTB_MAY_BORROW, /* class can't send but may borrow */
HTB_CAN_SEND /* class can send */
};

HTB 的实现依赖于

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
struct htb_sched {
struct Qdisc_class_hash clhash;
int defcls; /* class where unclassified flows go to */
int rate2quantum; /* quant = rate / rate2quantum */

/* filters for qdisc itself */
struct tcf_proto __rcu *filter_list;

#define HTB_WARN_TOOMANYEVENTS 0x1
unsigned int warned; /* only one warning */
int direct_qlen;
struct work_struct work;

/* non shaped skbs; let them go directly thru */
struct qdisc_skb_head direct_queue;
long direct_pkts;

struct qdisc_watchdog watchdog;

s64 now; /* cached dequeue time */
struct list_head drops[TC_HTB_NUMPRIO];/* active leaves (for drops) */

/* time of nearest event per level (row) */
s64 near_ev_cache[TC_HTB_MAXDEPTH];

int row_mask[TC_HTB_MAXDEPTH];

struct htb_level hlevel[TC_HTB_MAXDEPTH];
};


其中hlevel对应的就是self slot,而clhash 可以通过 classid 找到 hub_class,其中的->un.inner就是对应的class slot->un.leaf对应的就是叶子结点。

使用

qdisc 参数

parent major:minor 或者 root

一个 qdisc 是根节点就是 root,否则其他的情况指定 parent。其中 major:minor 是 class 的 handle id,每个 class 都要指定一个 id 用于标识。

handle major: ,这个语法有点奇怪,是可选的,如果 qdisc 下面还要分类(多个 class),则需要指定这个 hanlde。对于 root,通常是”1:”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// handle 是一组用户指定的标示符,格式为 major:minor。
// 如果是一条 queueing discipline,minor 需要一直为 0。
# tc qdisc add dev eth0 root handle 1: htb

// parent 指明该新增的 class 添加到那一个父 handle 上去
// classid 指明该 class handle 的唯一 ID,minor 需要是非零值
// ceil 定义 rate 的上界
# tc class add dev eth0 parent 1:1 classid 1:6 htb rate 256kbit ceil 512kbit

// 新建一个带宽为 100kbps 的 root class, 其 classid 为 1:1
# tc class add dev eth0 parent 1: classid 1:1 htb rate 100kbps ceil 100kbps
// 接着建立两个子 class,指定其 parent 为 1:1,ceil 用来限制子类最大的带宽
# tc class add dev eth0 parent 1:1 classid 1:10 htb rate 40kbps ceil 100kbps
# tc class add dev eth0 parent 1:1 classid 1:11 htb rate 60kbps ceil 100kbps
// 随后建立 filter 指定哪些类型的 packet 进入那个 class
# tc filter add dev eth0 protocol ip parent 1:0 prio 1 u32 match ip src 1.2.3.4 match ip dport 80 0xffff flowid 1:10
# tc filter add dev eth0 protocol ip parent 1:0 prio 1 u32 match ip src 1.2.3.4 flow 1:11
// 最后为这些 class 添加 queuing disciplines
# tc qdisc add dev eth0 parent 1:10 handle 20: pfifo limit 5
# tc qdisc add dev eth0 parent 1:11 handle 30: sfq perturb 10

实现

用 rtnetlink 接口实现 API。

include/net/sch_generic.h下有Qdisc_ops,Qdisc_class_ops,tcf_proto_ops的定义。

同一个文件下还有Qdisc_class_ops的定义。

qdisc_run首先检查设备的运行状态然后调用__qdisc_run,这个函数的主体就是调用 qdisc_restart, 直到超过限制或者需要让出时间 CPU 了,最后清空 qdiscRUNNING状态。

在此结构中,enqueue 和 dequeue 两个函数是整个 QoS 调度的入口函数。其中的Qdisc_class_ops用于对此 Qdisc的 filter list 进行操作,添加删除等,通过对 Qdisc 添加 fliter,filter 对 enqueue 到此 Qdisc 的 pkt 进行分类,从而归类到此 Qdisc 的子 class 中,而每个子 class 都有自己的 Qdisc 进行 pkt enqueue 的管理,因此实现一个树形的 filter 结构。

callgraph Qdisc 在从来自上层的dev_queue_xmit主动发送开始起作用,对出口数据包作限制。

1
2
3
4
5
6
7
8
dev_queue_xmit ->
__dev_queue_xmit ->
__dev_xmit_skb ->
qdisc->enqueue
__qdisc_run ->
qdisc_restart ->
qdisc->dequeue ->
sch_direct_xmit

enqueue的时候会主动唤起dequeue也可能是硬件发送就绪会唤醒发送软中断来dequeue,描述的整个过程大概是图中的这部分。

补充

netfilter

netfilter 在数据包传输中有一些 hook 可以在其中注册回调函数。

netfilter

iptables

主要是四表五链,是 netfilter 的用户态工具。

deep dive

cgroup 子系统 net_cls (Network classifier cgroup)

net_cls 可以给 packet 打上 classid 的标签,用于过滤分类,有了上面的详细解释,这个 classid 的作用也非常明显了,就是用于标记skb所属的 qdisc class 的。

有了这个标签,流量控制器(tc)可以对不同的 cgroup 的 packet 起作用,Netfilter(iptables)也可以基于这个标签有对应的动作。创建一个 net_cls cgroup 对应的是创建一个 net_cls.classid 文件,这个文件初始化为 0。可以写 16 进制的 0xAAAABBBB 到这个文件里面,AAAA 是 major 号,BBBB 是 minor 号。读这个文件返回的是十进制的数字。

例子

1
2
3
4
mkdir /sys/fs/cgroup/net_cls
mount -t cgroup -onet_cls net_cls /sys/fs/cgroup/net_cls
mkdir /sys/fs/cgroup/net_cls/0
echo 0x100001 > /sys/fs/cgroup/net_cls/0/net_cls.classid

设置一个 10:1 handle.

1
2
cat /sys/fs/cgroup/net_cls/0/net_cls.classid
1048577

配置 tc:

1
2
tc qdisc add dev eth0 root handle 10: htb
tc class add dev eth0 parent 10: classid 10:1 htb rate 40mbit

创建 traffic class 10:1

1
tc filter add dev eth0 parent 10: protocol ip prio 10 handle 1: cgroup

配置 iptables,也可以用于这个 classid。

1
iptables -A OUTPUT -m cgroup ! --cgroup 0x100001 -j DROP

对应的实现在net/core/netclassid_cgroup.c下面。起作用的方式是css_cls_stateclassid并且sock_cgroup_set_classid(&sock->sk->sk_cgrp_data,(unsigned long)v)来设置sockclassid

cgroup net_prio 子系统

网络优先权(net_prio)子系统可以为各个 cgroup 中的应用程序动态配置每个网络接口的流量优先级。

net_prio.prioidx

只读文件。它包含一个特有整数值,kernel 使用该整数值作为这个 cgroup 的内部代表。

net_prio.ifpriomap

包含优先级图谱,这些优先级被分配给源于此群组进程的流量以及通过不同接口离开系统的流量。回顾pfifo里优先级映射,对应的就是这个值。该图用 的形式以成对列表表示:

1
2
3
4
~]# cat /cgroup/net_prio/iscsi/net_prio.ifpriomap
eth0 5
eth1 4
eth2 6

net_prio.ifpriomap 文件的目录可以使用上述格式,通过将字符串回显至文件的方式来修改。例如:

1
~]# echo "eth0 5" > /cgroup/net_prio/iscsi/net_prio.ifpriomap

上述指令将强制设定任何源于 iscsi net_prio cgroup 进程的流量和 eth0 网络接口传出的流量的优先级为 5。父 cgroup 也有可写入的 net_prio.ifpriomap 文件,可以设定系统默认优先级。

对应的实现在net/core/netprio_cgroup.c下面。实现方式是通过扩展dev->priomapprioid->prio的映射记录这个优先级和 cgroup 的关系。

net_prio 使用每个 cgroup 的 id(cgroupo->id)作为 sequence number,并将这个存储在 sk_cgrp_prioidx 中。sk_cgrp_prioidx 这个是单纯的用于设置网络包的优先级,使用这个之后将会覆盖之前通过 SO_PRIORITY socket 选项或者其他方式设置的值。

cgroup 的整体结构

cgroup 是容器当中对资源进行限制的机制,完整的名称是叫 control group。经常提到的 hierarchy 对应的是一个层级,而subsystem 对应的是一个子系统,都是可以望文生意的。创建一个层级是通过挂载完成的,也就是说层级对应的是文件系统 root 目录的结构。

子系统目前有下列几种

  1. devices 设备权限
  2. cpuset 分配指定的 CPU 和内存节点
  3. cpu 控制 CPU 占用率
  4. cpuacct 统计 CPU 使用情况
  5. memory 限制内存的使用上限
  6. freezer 暂停 Cgroup 中的进程
  7. net_cls 配合 tc(traffic controller)限制网络带宽
  8. net_prio 设置进程的网络流量优先级
  9. huge_tlb 限制 HugeTLB 的使用
  10. perf_event 允许 Perf 工具基于 Cgroup 分组做性能检测

创建层级通过 mount -t cgroup -o subsystems name /cgroup/name,/cgroup/name 是用来挂载层级的目录(层级结构是通过挂载添加的),-o 是子系统列表,比如 -o cpu,cpuset,memory,name 是层级的名称,一个层级可以包含多个子系统,如果要修改层级里的子系统重新 mount 即可。子系统和层级之间满足几个关系。

  1. 同一个 hierarchy 可以附加一个或多个 subsystem
  2. 一个 subsystem 可以附加到多个 hierarchy,当且仅当这些 hierarchy 只有这唯一一个 subsystem
  3. 系统每次新建一个 hierarchy 时,该系统上的所有 task 默认构成了这个新建的 hierarchy 的初始化 cgroup,这个 cgroup 也称为 root cgroup。对于你创建的每个 hierarchy,task 只能存在于其中一个 cgroup 中,即一个 task 不能存在于同一个 hierarchy 的不同 cgroup 中,但是一个 task 可以存在在不同 hierarchy 中的多个 cgroup 中。如果操作时把一个 task 添加到同一个 hierarchy 中的另一个 cgroup 中,则会从第一个 cgroup 中移除

/proc/self 对应的是当前进程的 proc 目录,比如当前进程 pid 是1,那么/proc/1/proc/self是等价的。运行man proc可以看到/proc/self/cgroup的解释。

/proc/[pid]/cgroup (since Linux 2.6.24)
This file describes control groups to which the process/task belongs. For each cgroup hierarchy there is one entry
containing colon-separated fields of the form:

5:cpuacct,cpu,cpuset:/daemons

The colon-separated fields are, from left to right:

1. hierarchy ID number

2. set of subsystems bound to the hierarchy

3. control group in the hierarchy to which the process belongs

This file is present only if the CONFIG_CGROUPS kernel configuration option is enabled.

这个展示的是当前进程属于的 control groups, 每一行是一排 hierarchy,中间是子系统,最后是受控制的 cgroup,可以通过这个文件知道自己所属于的cgroup。

cgroup 的创建

创建一个独立的 cgroup 则是在层级结构下面创建一个目录。

先看一下创建目录做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static int cgroup_mkdir(struct inode *dir, struct dentry *dentry, int mode)
{
struct cgroup *c_parent = dentry->d_parent->d_fsdata;

/* the vfs holds inode->i_mutex already */
return cgroup_create(c_parent, dentry, mode | S_IFDIR);
}

static long cgroup_create(struct cgroup *parent, struct dentry *dentry,
mode_t mode)
{
/* 获取父cgroup对应的层级hierarchy */
/* 这里叫做 cgroupfs 其实是匹配的,因为创建 hierachy 就是 mount 了一个文件系统的动作 */
struct cgroupfs_root *root = parent->root;
/* 初始化一个cgroup结构 cgrp */
init_cgroup_housekeeping(cgrp);
cgrp->parent = parent; /* 设置父 cgroup */
cgrp->root = parent->root; /* 继承 parent 的 hierachy */
cgrp->top_cgroup = parent->top_cgroup; /* 继承父 parent 对应 hierachy 的root cgroup */
/* 继承父 parent 的 notify_on_release 设置 */
if (notify_on_release(parent))
set_bit(CGRP_NOTIFY_ON_RELEASE, &cgrp->flags);
/* 对所属的hierachy的子系统进行初始化 */
for_each_subsys(root, ss) {
struct cgroup_subsys_state *css = ss->create(ss, cgrp);
if (IS_ERR(css)) {
err = PTR_ERR(css);
goto err_destroy;
}
init_cgroup_css(css, ss, cgrp);
if (ss->use_id)
if (alloc_css_id(ss, parent, cgrp))
goto err_destroy;
/* At error, ->destroy() callback has to free assigned ID. */
}
/* 加入到父 cgroup 的子列表里 */
cgroup_lock_hierarchy(root);
list_add(&cgrp->sibling, &cgrp->parent->children);
cgroup_unlock_hierarchy(root);
/* 创建 cgroup 目录 */
cgroup_create_dir(cgrp, dentry, mode);
/* 创建目录下对应的文件,比如common的部分(tasks),或者子系统的部分(cpu.shares,freezer.state)*/
cgroup_populate_dir(cgrp);

看一下 cgroup_subsys_state->create 的实现,举个例子比如kernel/cpuset.ccpuset子系统的创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static struct cgroup_subsys_state *cpuset_create(
struct cgroup_subsys *ss,
struct cgroup *cont)
{
struct cpuset *cs;
struct cpuset *parent;

if (!cont->parent) {
return &top_cpuset.css;
}
parent = cgroup_cs(cont->parent);
cs = kmalloc(sizeof(*cs), GFP_KERNEL);
if (!cs)
return ERR_PTR(-ENOMEM);
if (!alloc_cpumask_var(&cs->cpus_allowed, GFP_KERNEL)) {
kfree(cs);
return ERR_PTR(-ENOMEM);
}

cs->flags = 0;
if (is_spread_page(parent))
set_bit(CS_SPREAD_PAGE, &cs->flags);
if (is_spread_slab(parent))
set_bit(CS_SPREAD_SLAB, &cs->flags);
set_bit(CS_SCHED_LOAD_BALANCE, &cs->flags);
cpumask_clear(cs->cpus_allowed);
nodes_clear(cs->mems_allowed);
fmeter_init(&cs->fmeter);
cs->relax_domain_level = -1;

cs->parent = parent;
number_of_cpusets++;
return &cs->css ;
}

其实就是一系列 cgroup 初始化动作,填充目录的部分作为接口留给子系统实现。

总结一下:

hierarchy 对应的是 cgroup 的一个根,拥有一个top_cgroup,之后 hierarchy 下面的目录(cgroup)都是继承这些内容。

真正起作用的入口其实是对文件的读写操作,关于这一块VFS的内容可以看一下我之前的博客,这也是由子系统实现的,接下来看看子系统的实现。

freezer 子系统

freeze tasks 的相关内容可以在内核文档当中找到,简单来说是为了提供一种机制能够让进程挂起。这些函数在电源控制里面有很多用到的地方,比如我们常说的挂起,就是让所有进程进入冬眠状态。

首先看这个子系统是因为它比较简单,属性比较少,实现的代码也比较少。

这里铺垫一些知识,说明内核是如何睡眠和唤醒进程的。
一般内核进程进入睡眠需要进入wait_queue,然后调用 schedule。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* wait 是我们想要让 task 睡眠的 queue entry, q 是等待队列 */
DEFINE_WAIT(wait);
/* 添加到等待队列中 */
add_wait_queue(q, &wait);

/*
* 这里要检查condition是因为可能在唤醒之后这个condition的条件又不成立了,
* 这个和条件变量一样,即使条件满足被wake up了,也可能被其他进程修改了该条件.
*/
while (!condition) {
prepare_to_wait(&q, &wait, TASK_INTERRUPTIBLE);
if (signal_pending(current))
/* 处理信号 */
/* 进行调度 */
schedule();
}

finish_wait(&g, &wait);

内核当中进入睡眠都是这个模板,这里的 schedule,会触发调度器遍历 scheduler class,选择优先级最高的调度类。
一般就是CFS(Complete Fair Scheduler),接下来会进行 context 切换,注意不是像一般理解的那样,函数调用增长栈空间,而是把栈和寄存器都换掉,刷掉缓存等等,由此进入另外一个进程的上下文。直到被唤醒从恢复保存的 IP 重新开始执行。

唤醒的过程则是,调用wake_up()函数把 task 的状态重新设置为 TASK_RUNNING,并且把task从等待队列移除。
它会使用 enqueue_task() 把任务从新加入到调度器中。如果是 CFS 调度器的话就是加入到红黑树中,当need_resched设置了的话会显式调用调用schedule()调度,不然还会继续执行唤醒者的上下文。

然后说一下Freezing of tasks,就是通过发送信号唤醒用户态的进程和内核进程,所有这些进程需要响应这个信号并且最后调用 refrigerator() 进入睡眠,也就上面的那个循环。
下图是进入冬眠进程的过程.

“冰箱”这个函数名称很形象,就是把当前 task 丢入睡眠状态直到解封。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* Refrigerator is place where frozen processes are stored :-). */
void refrigerator(void)
{
/* Hmm, should we be allowed to suspend when there are realtime
processes around? */
long save;

task_lock(current);
if (freezing(current)) {
frozen_process();
task_unlock(current);
} else {
task_unlock(current);
return;
}
save = current->state;
pr_debug("%s entered refrigerator\n", current->comm);

spin_lock_irq(&current->sighand->siglock);
recalc_sigpending(); /* We sent fake signal, clean it up */
spin_unlock_irq(&current->sighand->siglock);

/* prevent accounting of that task to load */
current->flags |= PF_FREEZING;

for (;;) {
set_current_state(TASK_UNINTERRUPTIBLE);
if (!frozen(current))
break;
schedule();
}

/* Remove the accounting blocker */
current->flags &= ~PF_FREEZING;

pr_debug("%s left refrigerator\n", current->comm);
__set_current_state(save);
}

所以 freezer subsystem 干的事情就是这样一件事情,把 cgroup 中的进程进行挂起和恢复。现在具体看一下实现。

1
2
3
4
5
6
7
8
9
10
11
enum freezer_state {           
CGROUP_THAWED = 0,
CGROUP_FREEZING,
CGROUP_FROZEN,
};

struct freezer {
struct cgroup_subsys_state css;
enum freezer_state state;
spinlock_t lock; /* protects _writes_ to state */
};

freezer 有三种状态,THAWED,FREEZING,FROZEN,分别代表正常状态,停止中和停止。

freezer 对应的文件有state,cftype是对vfs的file结构的一个封装,最后加上子系统的name,文件名对应的就是”freezer.state”。
freezer.state更改文件内容的操作就可以更改cgroup当中task的挂起和恢复.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* 文件的读写函数 */
static struct cftype files[] = {
{
.name = "state",
.read_seq_string = freezer_read,
.write_string = freezer_write,
},
};

/* 添加子系统文件到cgroup目录中 */
static int freezer_populate(struct cgroup_subsys *ss, struct cgroup *cgroup)
{
if (!cgroup->parent)
return 0;
return cgroup_add_files(cgroup, ss, files, ARRAY_SIZE(files));
}

首先看一下freezer_read.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int freezer_read(struct cgroup *cgroup, struct cftype *cft,
struct seq_file *m)
{
struct freezer *freezer;
enum freezer_state state;

if (!cgroup_lock_live_group(cgroup))
return -ENODEV;

freezer = cgroup_freezer(cgroup);
spin_lock_irq(&freezer->lock);
state = freezer->state;
if (state == CGROUP_FREEZING) {
/* We change from FREEZING to FROZEN lazily if the cgroup was
* only partially frozen when we exitted write. */
update_freezer_state(cgroup, freezer);
state = freezer->state;
}
spin_unlock_irq(&freezer->lock);
cgroup_unlock();

seq_puts(m, freezer_state_strs[state]);
seq_putc(m, '\n');
return 0;
}

整个函数就是把 freezer 的 state 转换成字符换然后读取出来。

再看下 freezer_write 是如何改变进程状态的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int freezer_write(struct cgroup *cgroup,
struct cftype *cft,
const char *buffer)
{
int retval;
enum freezer_state goal_state;

if (strcmp(buffer, freezer_state_strs[CGROUP_THAWED]) == 0)
goal_state = CGROUP_THAWED;
else if (strcmp(buffer, freezer_state_strs[CGROUP_FROZEN]) == 0)
goal_state = CGROUP_FROZEN;
else
return -EINVAL;

if (!cgroup_lock_live_group(cgroup))
return -ENODEV;
retval = freezer_change_state(cgroup, goal_state);
cgroup_unlock();
return retval;
}

其实只是把写入的字符串转换成对应的枚举类型,然后调用freezer_change_state(cgroup, goal_state);

为了不贴过多的代码,这里略写,其实是根据类型不同进行调用了unfreeze_cgrouptry_to_freeze_cgroup

try_to_freeze_cgroup 遍历每个task执行freeze操作,而unfreeze也是类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int try_to_freeze_cgroup(struct cgroup *cgroup, struct freezer *freezer)
{
struct cgroup_iter it;
struct task_struct *task;
unsigned int num_cant_freeze_now = 0;

freezer->state = CGROUP_FREEZING;
cgroup_iter_start(cgroup, &it);
while ((task = cgroup_iter_next(cgroup, &it))) {
/* 尝试freeze task */
if (!freeze_task(task, true))
continue;
if (is_task_frozen_enough(task))
continue;
if (!freezing(task) && !freezer_should_skip(task))
num_cant_freeze_now++;
}
cgroup_iter_end(cgroup, &it);

return num_cant_freeze_now ? -EBUSY : 0;
}

所以,这里我们看最后的freeze和unfreeze某个task的动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
bool freeze_task(struct task_struct *p, bool sig_only)
{
/*
* We first check if the task is freezing and next if it has already
* been frozen to avoid the race with frozen_process() which first marks
* the task as frozen and next clears its TIF_FREEZE.
*/
if (!freezing(p)) {
rmb();
/* 如果frozen标记了
* 说明已经冻结,就返回失败
*/
if (frozen(p))
return false;

if (!sig_only || should_send_signal(p))
set_freeze_flag(p);
else
return false;
}

if (should_send_signal(p)) {
if (!signal_pending(p))
fake_signal_wake_up(p);
} else if (sig_only) {
return false;
} else {
wake_up_state(p, TASK_INTERRUPTIBLE);
}

return true;
}

停止的方式就是通过标记 freeze_flag,然后通过发送信号或者唤醒 task 来处理TIF_FREEZE标记(取决于是否设置了PF_FREEZER_NOSIG)。
最后又回到了之前给的那张流程图,等处理函数运行又会try_to_freeze()检查信号或者标志位,然后进入冰箱,而唤醒的方式则是反过来,把标记清除并且wake_up进程即可。

cpu 子系统

cpu子系统是对CPU时间配额进行限制的子系统,属性在这里列举一下

  • cpu.cfs_period_us 完全公平调度器的调整时间配额的周期
  • cpu.cfs_quota_us 完全公平调度器的周期当中可以占用的时间
  • cpu.stat 统计值
    • nr_periods 进入周期的次数
    • nr_throttled 运行时间被调整的次数
    • throttled_time 用于调整的时间
  • cpu.share cgroup中cpu的分配,如果a group是100,b group是300,那么a就会获得\(\frac{1}{4}\),b就会获得\(\frac{3}{4}\)的CPU。

CFS 调度器

接下来看一下对于 CPU 的限制是如何做到的,这要补充一下 CFS(完全公平调度器) 的相关的内容

CFS 保证进程之间完全公平获得 CPU 的份额,和我们传统的操作系统的时间片的理念不同,CFS 计算进程的 vruntime (其实就是总时间中的比例,并且带上进程优先级作为权重),来选择需要调度的下一个进程。用户态暴露的权重就是nice值,这个值越高权重就会低,反之亦然。(坊间的解释是 nice 的意思就是我对别的进程很 nice ,所以让别的进程多运行一会儿,自己少运行一会儿)。

CFS主要有几点,时间计算,进程选择,调度入口。

时间计算

先看下面这句话

Linux is a multi-user operating system. Consider a scenario where user A spawns ten tasks and user B spawns five. Using the above approach, every task would get ~7% of the available CPU time within a scheduling period. So user A gets 67% and user B gets 33% of the CPU time during their runs. Clearly, if user A continues to spawn more tasks, he can starve user B of even more CPU time. To address this problem, the concept of “group scheduling” was introduced in the scheduler, where, instead of dividing the CPU time among tasks, it is divided among groups of tasks.

总结来说 CPU 的时间并不是分给独立的 task 的,而是分给 task_group 的,这样防止用户 A 的进程数远远大于 B 而导致 B 饥饿的情况。这一组task通过 sched_entity 来表示。能够导致进程分组的方式一种是把进程划入一个cgroup,一种是通过set_sid()系统调用的新session中创建的进程会自动分组,这需要CONFIG_SCHED_AUTOGROUP编译选项开启。

调度的粒度是以sched_entity为粒度的,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
struct sched_entity {
struct load_weight load; /* for load-balancing */
struct rb_node run_node;
struct list_head group_node;
unsigned int on_rq;

u64 exec_start;
u64 sum_exec_runtime;
u64 vruntime;
u64 prev_sum_exec_runtime;

u64 nr_migrations;

#if defined(CONFIG_SMP) && defined(CONFIG_FAIR_GROUP_SCHED)
/* Per-entity load-tracking */
struct sched_avg avg;
#endif
#ifdef CONFIG_SCHEDSTATS
struct sched_statistics statistics;
#endif

#ifdef CONFIG_FAIR_GROUP_SCHED
struct sched_entity *parent;
/* rq on which this entity is (to be) queued: */
struct cfs_rq *cfs_rq;
/* rq "owned" by this entity/group: */
struct cfs_rq *my_q;
#endif
};

每个调度实体都有两个cfs_rq结构

1
2
3
4
5
6
7
   struct cfs_rq {
struct load_weight load;
unsigned long runnable_load_avg;
unsigned long blocked_load_avg;
unsigned long tg_load_contrib;
/* ... */
};

Each scheduling entity may, in turn, be queued on a parent scheduling entity’s run queue. At the lowest level of this hierarchy, the scheduling entity is a task; the scheduler traverses this hierarchy until the end when it has to pick a task to run on the CPU.

最底层的调度实体就是进程,而每个调度实体还会有两个cfs_rq,一个是cfs_rq另一个是my_q,前者是当前调度实体从属的rq,后者他自己的rq,所有的子调度实体都在这个rq上,从而构成了树形结构。可以通过cfs_rq遍历调度实体,而把自己的时间平分给my_q的调度实体.

1
2
3
4
5
6
7
   struct task_group {
struct sched_entity **se;
struct cfs_rq **cfs_rq;
unsigned long shares;
atomic_long_t load_avg;
/* ... */
};

Tasks belonging to a group can be scheduled on any CPU. Therefore it is not sufficient for a group to have a single scheduling entity; instead, every group must have one scheduling entity for each CPU. Tasks belonging to a group must move between the run queues in these per-CPU scheduling entities only, so that the footprint of the task is associated with the group even during task migrations.

单独的sched_entity为了适应SMP结构,又引入了task_group结构,包含了数组,分别属于某个CPU,对于一个进程想要从CPU1迁移到CPU2,只要把进程从tg->cfs_rq[0]转移到tg->cfs_rq[1]即可,一种 percpu 的结构。

优先级有一个映射表,表示调度占的权重,一般nice值为0的时候,大家都是1024,但是nice值为1的时候,权重就会降低到820,对于所有1024权重的进程,就会享有更少的时间,这个映射体现的是每提升一个等级,相差值大约为10%。

下面是 nice 值到权重的映射,这是内核普通进程的优先级范围(100-139),内核拥有140个优先级。

1
2
3
4
5
6
7
8
9
10
static const int prio_to_weight[40] = {
/* -20 */ 88761, 71755, 56483, 46273, 36291,
/* -15 */ 29154, 23254, 18705, 14949, 11916,
/* -10 */ 9548, 7620, 6100, 4904, 3906,
/* -5 */ 3121, 2501, 1991, 1586, 1277,
/* 0 */ 1024, 820, 655, 526, 423,
/* 5 */ 335, 272, 215, 172, 137,
/* 10 */ 110, 87, 70, 56, 45,
/* 15 */ 36, 29, 23, 18, 15,
};

调度实体中包含了一个结构就是表示这个权重的值,表示进程占的权重。

1
2
3
   struct load_weight {
unsigned long weight;
};

最后 time_slice = (sched_period() * se.load.weight) / cfs_rq.load.weight; 就是 se 运行时应该分配到的 CPU 时间的份额(sched_periodcfs.nr_running调度最小粒度时间,理想要每个进程都能运行一次)。

另外,用于衡量CPU负载的方式是通过sched_entity中的sched_avg结构,这个结构用于记录负载情况:

1
2
3
4
struct sched_avg {
u32 runnable_sum, runnable_avg_period;
unsigned long load_avg_contrib;
};

sched_entity是一个进程的时候,计算公式是sa.load_avg_contrib = (sa.runnable_sum * se.load.weight) / sa.runnable_period;(sesched_entitysasched_avg),runnbale_sum 是处于 RUNNING 状态的时间,runnable_period 表示可以变成运行状态的时间段。runnable_load_avgcfs_rq中用于统计所有seload的合,以此来表示CPU负载。blocked_load_avg 是相应的进程处于阻塞状态的负载。

sched_entity是一组进程的时候,计算方式是,
首先提取task group

1
tg = cfs_rq->tg;

之前已经统计了队列中的所有se的总和runnable_load_avg,然后全部累加到tg中。

1
2
cfs_rq->tg_load_contrib = cfs_rq->runnable_load_avg + cfs_rq->blocked_load_avg;
tg->load_avg += cfs_rq->tg_load_contrib;

最后se的值是通过在tg中的比重得到的,这里的tg->shares是最大允许的负载。

1
2
se->avg.load_avg_contrib =
(cfs_rq->tg_load_contrib * tg->shares / tg->load_avg);

进程选择

完全公平调度器的进程选择其实很简单,通过调用pick_next_entity()每次选择 vruntime 最小的进程进行运行。装载进程的结构选择的是红黑树,并且最左下角的结点是个特殊的节点,被保存起来,这样防止每次都从 root 一直搜索到最左下角,每次选择进程的时候直接选择该节点即可。

每次 vruntime 会在时钟中断和任何进程运行状态发生改变的时候进行计算,方式是通过权重调整得到一个 delta 值然后加到 vruntime 上面。
公式是vruntime += delta_exec * (NICE_0_LOAD/curr->load.weight);。这里的 weight 取决于 shares 值和负载等等因素的综合结果。

通过enqueue_entity()可以把进程加入到红黑树当中,当进程被唤醒的时候,或者第一次调用fork的时候就会被调用这个函数。具体就是更新了统计数据,并且把调度节点插入到红黑树当中。如果正好插入到了最右下角,那么就能马上被运行了。

通过dequeue_entity()可以把调度结点从红黑树中删除,这是进程在阻塞或者终止的时候会被调用的函数,具体就是把调度节点移除红黑树并且调整红黑树。

调度入口

内核的调度入口就是schedule(),遍历所有的调度类(因为内核中调度器的实现不只一种),选择权重最高的调度类并且进行进程选择,然后执行该进程。

这里列举一下所有抢占可能发生的时机

  1. 用户态进程:
    • 从系统调用返回
    • 从中断返回
  2. 内核态进程:
    • 从中断返回内核态
    • 进程主动调用schedule()
    • 进程变为可抢占状态(没有持有锁,其实还是中断驱动的)
    • 进程阻塞(最后还是调用schedule)

具体看 cgroup 的 cpu 子系统

补充完调度器的知识,再回来看 cgroup 是如何对进程做运行时间限制的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static struct cftype cpu_files[] = {
#ifdef CONFIG_FAIR_GROUP_SCHED
{
.name = "shares",
.read_u64 = cpu_shares_read_u64,
.write_u64 = cpu_shares_write_u64,
},
#endif
#ifdef CONFIG_CFS_BANDWIDTH
{
.name = "cfs_quota_us",
.read_s64 = cpu_cfs_quota_read_s64,
.write_s64 = cpu_cfs_quota_write_s64,
},
{
.name = "cfs_period_us",
.read_u64 = cpu_cfs_period_read_u64,
.write_u64 = cpu_cfs_period_write_u64,
},
{
.name = "stat",
.read_map = cpu_stats_show,
},
#endif

cpu 子系统也是用对文件进行读写的接口,其实就是获取了 cgroup 的 subsystem 的从属的 task_group,并且读取或者设置了 quota_usperiod_write 以及 shares 属性。具体这些属性应用的地方在调度器内部。task_group是一个管理组调度的结构。

因为内嵌了一个cgroup_subsys_state,这样cgroup就能通过自己的css成员反找到这个task_group

看一下cpu_shares_write_u64的实现

1
2
3
cpu_shares_write_u64 实际调用的是
-> sched_group_set_shares(cgroup_tg(cgrp), scale_load(shareval))
-> update_cfs_shares 获取 cgroup 的task group结构,调整权重
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void update_cfs_shares(struct cfs_rq *cfs_rq)
{
struct task_group *tg;
struct sched_entity *se;
long shares;

tg = cfs_rq->tg;
se = tg->se[cpu_of(rq_of(cfs_rq))];
if (!se || throttled_hierarchy(cfs_rq))
return;
#ifndef CONFIG_SMP
if (likely(se->load.weight == tg->shares))
return;
#endif
/* 根据tg->shares 和 rq 的负载计算出新的权重 */
shares = calc_cfs_shares(cfs_rq, tg);

reweight_entity(cfs_rq_of(se), se, shares);
}

最后一步 reweight_entity 就是调整se->load.weight的权重,从这里来保证shares能够调整进程可以获得的运行时间。当然除了这里,任何发生调度的地方都会有这样的行为,只不过我们主动修改了shares的值。

下图表示了展示了sharestask group中的作用。

对于cpu.cfs_period_uscpu.cfs_quota_us,是关于CPU bandwith的内容,论文CPU bandwidth control for CFS详细描述了其中的设计。论文中举例提到,shares 值只是使得CPU 的时间能够平均分配,但是实际运行时间可能会有变化,不能限制一个进程运行的上限。

在调度实体sched_entity中内嵌了一个结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct cfs_bandwidth {
#ifdef CONFIG_CFS_BANDWIDTH
raw_spinlock_t lock;
ktime_t period;
/* quota 是被赋予的时间,runtime 是实际运行的时间 */
u64 quota, runtime;
s64 hierarchal_quota;
/* 到期时间 */
u64 runtime_expires;

int idle, timer_active;
struct hrtimer period_timer, slack_timer;
struct list_head throttled_cfs_rq;

/* statistics */
int nr_periods, nr_throttled;
u64 throttled_time;
#endif
};

在每次调度的时候(无论是时间中断还是其他调度时间导致 enqueue 或者 dequeue),都会调用account_cfs_rq_runtime(),runtime 相当于实际使用的 quota , 在论文里说的是account_cfs_rq_quota(),对占用时间更新,计算剩余可以运行的时间,如果不够,则进行限制,标记为不可调度。其中内含一个高精度定时器period_timer定时扫描进程,把限制的进程解除,并给予更多的bandwidth以继续运行,period就是计时器的周期,每次都会更新可运行的时间。注意这里用的时间是真实时间.

另外,cpu.stat 主要是控制过程中的统计信息,是只读属性,比如被限制了多少次等等,具体的代码分析就直接略过。

cpuacct 子系统

cpuacct 比较简单,因为主要是一些统计信息

  • cpuacct.stat cgroup 及子消耗在用户态和内核态的CPU循环次数
  • cpuacct.usage cgroup 消耗的CPU总时间
  • cpuacct.usage_percpu cgroup在每个CPU上消耗的总时间

kernel/sched/cpuacct.c下有具体实现。

1
2
3
4
5
6
7
/* track cpu usage of a group of tasks and its child groups */
struct cpuacct {
struct cgroup_subsys_state css;
/* cpuusage holds pointer to a u64-type object on every cpu */
u64 __percpu *cpuusage;
struct kernel_cpustat __percpu *cpustat;
};

其中定义了per-cpu结构,让每个CPU都独占了一个用于统计的值,算是CPU的私有变量。

接口如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static struct cftype files[] = {
{
.name = "usage",
.read_u64 = cpuusage_read,
.write_u64 = cpuusage_write,
},
{
.name = "usage_percpu",
.read_seq_string = cpuacct_percpu_seq_read,
},
{
.name = "stat",
.read_map = cpuacct_stats_show,
},
{ } /* terminate */
};

每次调度update_curr,都会调用cpuacct_charge更新cpuacct中的值,作为统计数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* charge this task's execution time to its accounting group.
*
* called with rq->lock held.
*/
void cpuacct_charge(struct task_struct *tsk, u64 cputime)
{
struct cpuacct *ca;
int cpu;
/* 获取当前task属于的cpu */
cpu = task_cpu(tsk);

rcu_read_lock();
/* task的cpuacct结构 */
ca = task_ca(tsk);
/* 所有父节点的值都应该相应变化
* 上溯父节点更新统计值
*/
while (true) {
u64 *cpuusage = per_cpu_ptr(ca->cpuusage, cpu);
*cpuusage += cputime;

ca = parent_ca(ca);
if (!ca)
break;
}

rcu_read_unlock();
}

在时钟中断的时候会最终调用cpuacct_account_field()来更新kcpustat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
* Add user/system time to cpuacct.
*
* Note: it's the caller that updates the account of the root cgroup.
*/
void cpuacct_account_field(struct task_struct *p, int index, u64 val)
{
struct kernel_cpustat *kcpustat;
struct cpuacct *ca;

rcu_read_lock();
ca = task_ca(p);
while (ca != &root_cpuacct) {
kcpustat = this_cpu_ptr(ca->cpustat);
kcpustat->cpustat[index] += val;
ca = __parent_ca(ca);
}
rcu_read_unlock();
}

cpuacct 算是一个简单的子系统,多是统计信息。

cpuset 子系统

cpuset 子系统用于分配独立的内存节点和CPU节点,这个主要应用与NUMA结构里面,多内存节点属于结构,先看一下cpuset的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct cpuset {
struct cgroup_subsys_state css;

unsigned long flags; /* "unsigned long" so bitops work */
cpumask_var_t cpus_allowed; /* CPUs allowed to tasks in cpuset */
nodemask_t mems_allowed; /* Memory Nodes allowed to tasks */

struct fmeter fmeter; /* memory_pressure filter */

/*
* Tasks are being attached to this cpuset. Used to prevent
* zeroing cpus/mems_allowed between ->can_attach() and ->attach().
*/
int attach_in_progress;

/* partition number for rebuild_sched_domains() */
int pn;

/* for custom sched domain */
int relax_domain_level;

struct work_struct hotplug_work;
};
  • cpuset.cpus cpu结点限制
  • cpuset.mems 内存结点限制
  • cpuset.memory_migrate 内存结点改变是否迁移
  • cpuset.cpu_exclusive 指定的限制是否是独享的,除了父节点或者子节点,不会和其他cpuset有交集
  • cpuset.mem_exclusive 指定的限制是否是独享的,除了父节点或者子节点,不会和其他cpuset有交集
  • cpuset.memory_pressure 换页压力的比率统计
  • cpuset.mem_hardwall 限制内核内存分配的结点,mems是限制用户态的分配
  • cpuset.memory_spread_page 把page cache分散到分配的各个结点中,而不是当前运行的结点.
  • cpuset.memory_spread_slab 把fs相关的slab的对象(inode和dentry)分散到结点中.
  • cpuset.sched_load_balance 打开调度CPU的负载均衡,这里指的是cpuset拥有的sched_domain,默认全局的CPU调度是本来就有负载均衡的。
  • cpuset.sched_relax_domain_level
  • cpuset.memory_pressure_enabled 计算换页压力的开关,注意,这个属性在top_group里面才有

cpus_allowedmems_allowed就是允许分配的内存节点和CPU节点的掩码

分配内存的时候调用栈是alloc_pages()->alloc_pages_current()->__alloc_pages_nodemask(),直到寻找可分配结点的时候会调用 zref_in_nodemask 来判断是否可以分配在该结点。

1
2
3
4
5
6
7
8
9
static inline int zref_in_nodemask(struct zoneref *zref, nodemask_t *nodes)
{
#ifdef CONFIG_NUMA
return node_isset(zonelist_node_idx(zref), *nodes);
#else
return 1;
#endif /* CONFIG_NUMA */
}

从这个函数也可以看到如果编译选项带了CONFIG_NUMA才会起作用,不然返回的永远都是真。

分散file cacheslab cache的方式是通过设置标志位来实现的。

Setting the flag ‘cpuset.memory_spread_page’ turns on a per-process flag
PFA_SPREAD_PAGE for each task that is in that cpuset or subsequently
joins that cpuset. The page allocation calls for the page cache
is modified to perform an inline check for this PFA_SPREAD_PAGE task
flag, and if set, a call to a new routine cpuset_mem_spread_node()
returns the node to prefer for the allocation.

Similarly, setting ‘cpuset.memory_spread_slab’ turns on the flag
PFA_SPREAD_SLAB, and appropriately marked slab caches will allocate
pages from the node returned by cpuset_mem_spread_node().

内存分配向结点的传播,都是通过设置标志PFA_SPREAD_PAGE或者PFA_SPREAD_SLAB来标记的,这个时候对应的函数cpuset_mem_spread_nodecpuset_mem_spread_node会返回希望分配的结点,举个例子,cpuset_mem_spread_node会从允许的节点中随机返回一个值,以达到分配对象分散在结点当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int cpuset_spread_node(int *rotor)
{
int node;

node = next_node(*rotor, current->mems_allowed);
if (node == MAX_NUMNODES)
node = first_node(current->mems_allowed);
*rotor = node;
return node;
}

int cpuset_mem_spread_node(void)
{
if (current->cpuset_mem_spread_rotor == NUMA_NO_NODE)
current->cpuset_mem_spread_rotor =
node_random(&current->mems_allowed);

return cpuset_spread_node(&current->cpuset_mem_spread_rotor);
}

对于CPU结点的控制是通过修改cpus_allowed来控制的,在task被唤醒的时候选择运行的rq时就会对掩码做判断,这是调度类需要实现的接口select_task_rq,比如CFS的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
* The caller (fork, wakeup) owns p->pi_lock, ->cpus_allowed is stable.
*/
static inline
int select_task_rq(struct task_struct *p, int sd_flags, int wake_flags)
{
int cpu = p->sched_class->select_task_rq(p, sd_flags, wake_flags);

/*
* In order not to call set_task_cpu() on a blocking task we need
* to rely on ttwu() to place the task on a valid ->cpus_allowed
* cpu.
*
* Since this is common to all placement strategies, this lives here.
*
* [ this allows ->select_task() to simply return task_cpu(p) and
* not worry about this generic constraint ]
*/
if (unlikely(!cpumask_test_cpu(cpu, tsk_cpus_allowed(p)) ||
!cpu_online(cpu)))
cpu = select_fallback_rq(task_cpu(p), p);

return cpu;
}

load_balance 设置的是cpusetCS_SCHED_LOAD_BALANCE标志,之后会调用update_cpumask,这个标志的更新会调用rebuild_sched_domains_locked(),会引起sched_domain的分配。当然这不是唯一的sched_domain重新划分的触发点,触发点有一下几点。

  1. 绑定了CPU并且该标记改变
  2. 这个标记为enable,绑定CPU发生改变
  3. 绑定了CPU,这个标记为enable,标记cpuset.sched_relax_domain_level发生改变
  4. 绑定了CPU,并且该标记设置了,但是cpuset被删除了
  5. CPU 转变 offline/online 状态

简单说一下[sched_domain](https://www.ibm.com/developerworks/cn/linux/l-cn-schldom/ sched domain 的详细内容)的作用,其实就是划定了负载均衡的 CPU 范围,默认是有一个全局的sched_domain,对所有 CPU 做负载均衡的,现在再划分出一个sched_domain把 CPU 的某个子集作为负载均衡的单元。
每个 Scheduling Domain 其实就是具有相同属性的一组 CPU 的集合. 并且跟据 Hyper-threading, Multi-core, SMP, NUMA architectures 这样的系统结构划分成不同的级别,不同级之间通过指针链接在一起, 从而形成一种的树状的关系, 如下图所示。

调度器会调用partition_sched_domains()来更新自己的scehd_domains,调度域发生作用的地方是在时钟中断的时候会触发SCHED_SOFTIRQ对任务做迁移,或者p->sched_class->select_task_rq,会在选择运行 CPU 时进行抉择,看一下 CFS 的实现的select_task_rq的简化流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 向上遍历更高层次的domain,如果发现同属一个domain
// 就是affine目标
for_each_domain(cpu, tmp) {
/*
* If both cpu and prev_cpu are part of this domain,
* cpu is a valid SD_WAKE_AFFINE target.
*/
if (want_affine && (tmp->flags & SD_WAKE_AFFINE) &&
cpumask_test_cpu(prev_cpu, sched_domain_span(tmp))) {
affine_sd = tmp;
break;
}

if (tmp->flags & sd_flag)
sd = tmp;
}
// 如果上面的条件满足,从prev_cpu中选出一个idle的new_cpu来运行.
if (affine_sd) {
if (cpu != prev_cpu && wake_affine(affine_sd, p, sync))
prev_cpu = cpu;
// 在同一个级别的sched domain向下找到一个idle的CPU.
new_cpu = select_idle_sibling(p, prev_cpu);
// 快速路径,有idle的CPU就不用负载均衡了.
goto unlock;
}
// 遍历层级
while (sd) {
// 找到负载最小的CPU
group = find_idlest_group(sd, p, cpu, load_idx);
if (!group) {
sd = sd->child;
continue;
}

new_cpu = find_idlest_cpu(group, p, cpu);
/* 如果最闲置的CPU没有变的话,或者没有找到的话,就向下遍历.
if (new_cpu == -1 || new_cpu == cpu) {
/* Now try balancing at a lower domain level of cpu */
sd = sd->child;
continue;
}

/* Now try balancing at a lower domain level of new_cpu */
cpu = new_cpu;
weight = sd->span_weight;
sd = NULL;
// 如果选出的节点weight比其他节点都大的话.
// 再向下一个层级遍历.
for_each_domain(cpu, tmp) {
if (weight <= tmp->span_weight)
break;
if (tmp->flags & sd_flag)
sd = tmp;
}
/* while loop will break here if sd == NULL */
}


负载均衡的对象有个例外。

CPUs in “cpuset.isolcpus” were excluded from load balancing by the
isolcpus= kernel boot option, and will never be load balanced regardless
of the value of “cpuset.sched_load_balance” in any cpuset.

如果boot选项标记了该CPU,会无视sched_load_balance的设置。

cpuset.sched_relax_domain_level有几个等级,越大越优先,表示迁移时搜索CPU的范围,这个主要开启了负载均衡选项的时候才有用。

  • -1 : no request. use system default or follow request of others. 用默认的或者按照其他组的优先级来.
  • 0 : no search,不搜索.
  • 1 : search siblings (hyperthreads in a core,搜索CPU当中的超线程).
  • 2 : search cores in a package.(搜索CPU当中的核).
  • 3 : search cpus in a node [= system wide on non-NUMA system]
  • 4 : search nodes in a chunk of node [on NUMA system]
  • 5 : search system wide [on NUMA system]

memory 子系统

看完 cpu 部分再开看一下内存子系统是如何做限制的

memory 子系统的参数比较多

  • memory.usage_in_bytes # 当前内存中的 res_counter 使用量
  • memory.memsw.usage_in_bytes # 当前内存和交换空间中的 res_counter 使用量
  • memory.limit_in_bytes # 设置/读取 内存使用量
  • memory.memsw.limit_in_bytes # 设置/读取 内存加交换空间使用量
  • memory.failcnt # 读取内存使用量被限制的次数
  • memory.memsw.failcnt # 读取内存和交换空间使用量被限制的次数
  • memory.max_usage_in_bytes # 最大内存使用量
  • memory.memsw.max_usage_in_bytes # 最大内存和交换空间使用量
  • memory.soft_limit_in_bytes # 设置/读取内存的soft limit
  • memory.stat # 统计信息
  • memory.use_hierarchy # 设置/读取 层级统计的使能
  • memory.force_empty # trigger forced move charge to parent?
  • memory.pressure_level # 设置内存压力通知
  • memory.swappiness # 设置/读取 vmscan swappiness 参数?
  • memory.move_charge_at_immigrate # 设置/读取 controls of moving charges?
  • memory.oom_control # 设置/读取 内存超限控制信息
  • memory.numa_stat # 每个numa节点的内存使用数量
  • memory.kmem.limit_in_bytes # 设置/读取 内核内存限制的hard limit
  • memory.kmem.usage_in_bytes # 读取当前内核内存的分配
  • memory.kmem.failcnt # 读取当前内核内存分配受限的次数
  • memory.kmem.max_usage_in_bytes # 读取最大内核内存使用量
  • memory.kmem.tcp.limit_in_bytes # 设置tcp 缓存内存的hard limit
  • memory.kmem.tcp.usage_in_bytes # 读取tcp 缓存内存的使用量
  • memory.kmem.tcp.failcnt # tcp 缓存内存分配的受限次数
  • memory.kmem.tcp.max_usage_in_bytes # tcp 缓存内存的最大使用量

对于大部分的数据是通过res_counter来保存的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
* The core object. the cgroup that wishes to account for some
* resource may include this counter into its structures and use
* the helpers described beyond
*/

struct res_counter {
/*
* the current resource consumption level
*/
unsigned long long usage;
/*
* the maximal value of the usage from the counter creation
*/
unsigned long long max_usage;
/*
* the limit that usage cannot exceed
*/
unsigned long long limit;
/*
* the limit that usage can be exceed
*/
unsigned long long soft_limit;
/*
* the number of unsuccessful attempts to consume the resource
*/
unsigned long long failcnt;
/*
* the lock to protect all of the above.
* the routines below consider this to be IRQ-safe
*/
spinlock_t lock;
/*
* Parent counter, used for hierarchial resource accounting
*/
struct res_counter *parent;
};

获取方式是通过该结构相关的封装接口提供的,比如mem_cgroup_usage就是通过res_counter_red_u64来获取对应的res_counterRES_USAGE对应的值的,也就是unsigned long long usage这个成员。(如果不是root,还会递归获取rss和page cache的合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static inline u64 mem_cgroup_usage(struct mem_cgroup *memcg, bool swap)
{
u64 val;

if (!mem_cgroup_is_root(memcg)) {
if (!swap)
return res_counter_read_u64(&memcg->res, RES_USAGE);
else
return res_counter_read_u64(&memcg->memsw, RES_USAGE);
}

/*
* Transparent hugepages are still accounted for in MEM_CGROUP_STAT_RSS
* as well as in MEM_CGROUP_STAT_RSS_HUGE.
*/
// 如果是root就把所有的内存使用量都算进来.
val = mem_cgroup_recursive_stat(memcg, MEM_CGROUP_STAT_CACHE);
val += mem_cgroup_recursive_stat(memcg, MEM_CGROUP_STAT_RSS);

if (swap)
val += mem_cgroup_recursive_stat(memcg, MEM_CGROUP_STAT_SWAP);

return val << PAGE_SHIFT;
}

struct mem_cgroup 是负责内存 cgroup 的结构,简化的表示是

1
2
3
4
5
6
7
8
struct mem_cgroup {
struct cgroup_subsys_state css; // 通过css关联cgroup.
struct res_counter res; // mem统计变量
res_counter memsw; // mem+sw的和
struct res_counter kmem; // 内核内存统计量
...
}

这些参数的入口都在mm/memcontrol.c下,比如说memory.usage_in_bytes的读取调用的是mem_cgroup_read函数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static ssize_t mem_cgroup_read(struct cgroup *cont, struct cftype *cft,
struct file *file, char __user *buf,
size_t nbytes, loff_t *ppos)
{
// 获取cgroup对应的mem_cgroup.
struct mem_cgroup *memcg = mem_cgroup_from_cont(cont);
char str[64];
u64 val;
int name, len;
enum res_type type;

// 获取读取的类型,memory.usage_in_bytes就是_MEM
type = MEMFILE_TYPE(cft->private);
// 名称
name = MEMFILE_ATTR(cft->private);

switch (type) {
case _MEM:
if (name == RES_USAGE)
val = mem_cgroup_usage(memcg, false);
else
val = res_counter_read_u64(&memcg->res, name);
break;
case _MEMSWAP:
if (name == RES_USAGE)
val = mem_cgroup_usage(memcg, true);
else
val = res_counter_read_u64(&memcg->memsw, name);
break;
case _KMEM:
val = res_counter_read_u64(&memcg->kmem, name);
break;
default:
BUG();
}

len = scnprintf(str, sizeof(str), "%llu\n", (unsigned long long)val);
return simple_read_from_buffer(buf, nbytes, ppos, str, len);
}

接下来再看一下这些值是在什么时候统计的,统计的入口是mem_cgroup_charge_common(),如果统计值超过限制就会在cgroup内进行回收。调用者分别是缺页时调用的mem_cgroup_newpage_charge 和 page cache 相关的mem_cgroup_cache_charge

简单复习一下内存分配的过程,来自维基百科

创建进程fork(), 程序载入execve(), 映射文件mmap(), 动态内存分配 malloc()/brk() 等进程相关操作都需要分配内存给进程。不过这时进程申请和获得的还不是实际内存,而是虚拟内存,准确的说是“内存区域”。进程对内存区域的分配最终都会归结到 do_mmap() 函数上来(brk调用被单独以系统调用实现,不用do_mmap()),内核使用 do_mmap() 函数创建一个新的线性地址区间。但是说该函数创建了一个新 VMA 并不非常准确,因为如果创建的地址区间和一个已经存在的地址区间相邻,并且它们具有相同的访问权限的话,那么两个区间将合并为一个。如果不能合并,那么就确实需要创建一个新的 VMA 了。但无论哪种情况,do_mmap() 函数都会将一个地址区间加入到进程的地址空间中–无论是扩展已存在的内存区域还是创建一个新的区域。同样,释放一个内存区域应使用函数 do_ummap(),它会销毁对应的内存区域。当进程需要内存时,从内核获得的仅仅是虚拟的内存区域,而不是实际的物理地址,进程并没有获得物理内存,获得的仅仅是对一个新的线性地址区间的使用权。实际的物理内存只有当进程真的去访问新获取的虚拟地址时,才会由”请求页机制”产生”缺页”异常,从而进入分配实际页面的例程

和下面来自内核文档

与用户进程相似,内核也有一个名为 init_mm 的 mm_strcut 结构来描述内核地址空间,其中页表项 pdg=swapper_pg_dir包含了系统内核空间(3G-4G)的映射关系。因此 vmalloc 分配内核虚拟地址必须更新内核页表,而kmalloc或get_free_page由于分配的连续内存,所以不需要更新内核页表。[13]

内存页的分配是基于伙伴分配系统,也就是基于2的阶乘通过拆分大阶乘的连续页和合并小阶乘的连续页来管理物理内存的方式,这在任何一本操作系统的书里都会讲到,我之前的博客也详细分析了。

当进程进入缺页异常的时候就会分配具体的物理内存,当物理内存使用超过高水平线以后,换页daemon(kswapd)就会被唤醒用于把内存交换到交换空间以腾出内存,当内存恢复至高水平线以后换页daemon进入睡眠。

缺页异常的入口是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static int __do_fault(struct mm_struct *mm, struct vm_area_struct *vma,
unsigned long address, pmd_t *pmd,
pgoff_t pgoff, unsigned int flags, pte_t orig_pte)
{
pte_t *page_table;
spinlock_t *ptl;
struct page *page;
struct page *cow_page;
pte_t entry;
int anon = 0;
struct page *dirty_page = NULL;
struct vm_fault vmf;
int ret;
int page_mkwrite = 0;

/*
* If we do COW later, allocate page befor taking lock_page()
* on the file cache page. This will reduce lock holding time.
*/
if ((flags & FAULT_FLAG_WRITE) && !(vma->vm_flags & VM_SHARED)) {

if (unlikely(anon_vma_prepare(vma)))
return VM_FAULT_OOM;

/* 分配内存并且映射到内存区间 */
cow_page = alloc_page_vma(GFP_HIGHUSER_MOVABLE, vma, address);
if (!cow_page)
return VM_FAULT_OOM;
/* 进行统计 */
if (mem_cgroup_newpage_charge(cow_page, mm, GFP_KERNEL)) {
page_cache_release(cow_page);
return VM_FAULT_OOM;
}
} else
cow_page = NULL;

mem_cgroup_newpage_charge则调用mem_cgroup_charge_common

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int mem_cgroup_newpage_charge(struct page *page,
struct mm_struct *mm, gfp_t gfp_mask)
{
if (mem_cgroup_disabled())
return 0;
// 不应该关联到页表
VM_BUG_ON(page_mapped(page));
// 对应用户态地址,但是不是匿名页
VM_BUG_ON(page->mapping && !PageAnon(page));
// mm 为空
VM_BUG_ON(!mm);
return mem_cgroup_charge_common(page, mm, gfp_mask,
MEM_CGROUP_CHARGE_TYPE_ANON);
}

mem_cgroup_charge_common内容是,返回ret < 0则是OOM。第一步是调用__mem_cgroup_try_charge查看当前使用量是否超过内存限制,如果超过就进行内存回收。第二步如果成功就调用__mem_cgroup_commit_charge添加统计值 ,不然就返回无法分配内存的错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int mem_cgroup_charge_common(struct page *page, struct mm_struct *mm,
gfp_t gfp_mask, enum charge_type ctype)
{
struct mem_cgroup *memcg = NULL;
unsigned int nr_pages = 1;
bool oom = true;
int ret;
if (PageTransHuge(page)) {
nr_pages <<= compound_order(page);
VM_BUG_ON(!PageTransHuge(page));
/*
* Never OOM-kill a process for a huge page. The
* fault handler will fall back to regular pages.
*/
oom = false;
}

ret = __mem_cgroup_try_charge(mm, gfp_mask, nr_pages, &memcg, oom);
if (ret == -ENOMEM)
return ret;
__mem_cgroup_commit_charge(memcg, page, nr_pages, ctype, false);
return 0;
}

__mem_cgroup_try_charge最终会调用mem_cgroup_do_charge,省略代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static int mem_cgroup_do_charge(struct mem_cgroup *memcg, gfp_t gfp_mask,
unsigned int nr_pages, unsigned int min_pages,
bool invoke_oom)
{
unsigned long csize = nr_pages * PAGE_SIZE;
struct mem_cgroup *mem_over_limit;
struct res_counter *fail_res;
unsigned long flags = 0;
int ret;
// 更新res计数器
ret = res_counter_charge(&memcg->res, csize, &fail_res);

if (likely(!ret)) {
if (!do_swap_account)
return CHARGE_OK;
// 计数成功,如果开启swap计数,记录memsw.
ret = res_counter_charge(&memcg->memsw, csize, &fail_res);
if (likely(!ret))
return CHARGE_OK;
// swap计数失败,退回res的计数
res_counter_uncharge(&memcg->res, csize);
// 获取fail_res对应的memcg,也就是计数失败的memcg.
mem_over_limit = mem_cgroup_from_res_counter(fail_res, memsw);
flags |= MEM_CGROUP_RECLAIM_NOSWAP;
} else
mem_over_limit = mem_cgroup_from_res_counter(fail_res, res);
// 回收内存
ret = mem_cgroup_reclaim(mem_over_limit, gfp_mask, flags);
// 告诉上层重试一次,可能回收了一些内存
if (nr_pages <= (1 << PAGE_ALLOC_COSTLY_ORDER) && ret)
return CHARGE_RETRY;
if (invoke_oom)
// 进入oom的处理
mem_cgroup_oom(mem_over_limit, gfp_mask, get_order(csize));

mem_cgroup_reclaim的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static unsigned long mem_cgroup_reclaim(struct mem_cgroup *memcg,
gfp_t gfp_mask,
unsigned long flags)
{
unsigned long total = 0;
bool noswap = false;
int loop;

if (flags & MEM_CGROUP_RECLAIM_NOSWAP)
noswap = true;
if (!(flags & MEM_CGROUP_RECLAIM_SHRINK) && memcg->memsw_is_minimum)
noswap = true;

for (loop = 0; loop < MEM_CGROUP_MAX_RECLAIM_LOOPS; loop++) {
if (loop)
drain_all_stock_async(memcg);
total += try_to_free_mem_cgroup_pages(memcg, gfp_mask, noswap);
/*
* Allow limit shrinkers, which are triggered directly
* by userspace, to catch signals and stop reclaim
* after minimal progress, regardless of the margin.
*/
if (total && (flags & MEM_CGROUP_RECLAIM_SHRINK))
break;
if (mem_cgroup_margin(memcg))
break;
/*
* If nothing was reclaimed after two attempts, there
* may be no reclaimable pages in this hierarchy.
*/
if (loop && !total)
break;
}
return total;
}

RSSpage_fault的时候记录,page cache是插入到inoderadix-tree中才记录的。
RSS在完全unmap的时候减少计数,page cachepage在离开inoderadix-tree才减少计数。
即使RSS完全unmap,也就是被kswapd给换出,可能作为SwapCache存留在系统中,除非不作为SwapCache,不然还是会被计数。
一个换入的page不会马上计数,只有被map的时候才会,当进行换页的时候,会预读一些不属于当前进程的page,而不是通过page fault,所以不在换入的时候计数。

补充:

  • why ‘memory+swap’ rather than swap.
    The global LRU(kswapd) can swap out arbitrary pages. Swap-out means
    to move account from memory to swap…there is no change in usage of
    memory+swap. In other words, when we want to limit the usage of swap without
    affecting global LRU, memory+swap limit is better than just limiting swap from
    an OS point of view.[12]

使用memoery+swap来统计而不是光统计swap,是因为kswapd换出的page只是从内存到了交换空间而已 ,在不影响kswpad的单页内存池LRU的情况下,这样的统计更有意义。

内核内存是不会被换出的。只有在被限制的时候才会开始计数。并且限制不能在已经有进程或者有子cgroup的情况下设置。
计数部分

When use_hierarchy == 1 and a group is accounted, its children will
automatically be accounted regardless of their limit value.

总结

目前总结了调度相关和内存相关的 cgroup 代码,可以看出 cgroup 本身其实主要是在一些 hook 的地方做检查,真正的控制的执行者还是调度器和内存分配器本身,cgroup 只是统计数据并且在必要的时候触发调度和内存回收等等。接下来我会就网络的部分进行一些分析,希望能够把完整的各个 cgroup 的子系统都能够解析一下。

参考:

  1. 《Docker 进阶与实战》
  2. http://abcdxyzk.github.io/download/kernel/Linux_Physical_Memory_Page_Allocation.pdf

首先介绍一下 hugepage 的背景。

一般来说操作系统分配内存的最小单元是页,一般是 4KB 大小,但是这个页放到现在来说可能有点“不够用”,因为很多程序内存消耗很大,分配内存很频繁,所以选择更大的页可以提升性能,大页带来的好处很多,首先是页表的层次可以减少,增加访存的速度,其次是减少 TLB miss 的概率,同时 page fault 也会减少,减少到 hugepage size / 4KB (x86_64 一般有 2MB 的 hugepage 和 1GB 的 hugepage)。例如下面这张图就说明了 hugepage 带来的改变。文章中的代码使用的是 4.x 的内核版本。

hugepage 有两种类型,一种是 THP(Transparent Huge Page) ,顾名思义,就是对用户来说对这种大页是无感知的,它本身可以被分成 4KB 的小页,并且可以被 swap out,有一个 Khugepaged 周期性扫描 4KB 的页合并成大页。

另一种大页是 persistent hugepage,这种 page 是预先分配的并且不能拆分成 4KB 小页,而且不能 swap out。这种的隐患是可能在内存 fragmentation 太多分不出大页的时候压缩小页,这在内存分配有压力的时候会造成很大的性能影响。

使用 persistent hugepage 可以通过 echo 512 | sudo tee /sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages,这会预先分配 512 个 2MB 的大页。具体使用是通过 fs/hugetlbfs 下实现的hugetlbfs来实现的。应用程序需要通过mmap进行文件映射来使用这些大页。具体的使用方式内核附带的一个测试可以作为参考。

这个 echo 触发的是 sysfsnr_hugepages_store_common, 它会设置最大的 hugepage 个数(存在 hstate 中,一个保存 hugepage 状态的结构体),h->max_huge_pages = set_max_huge_pages(h, count, nodes_allowed);set_max_huge_pages 有一个副作用就是调用 alloc_fresh_huge_page 来(分配或者减少)大页以达到 count 个。比如说分配会调用 ret = alloc_fresh_huge_page(h, nodes_allowed);,然后加入到 hstate 的 freelist 当中, 减少则是相反的,如果 freelist 上没有就会触发 buddysystem 的 __alloc_buddy_huge_page_no_mpol

我们来看一下具体的实现,首先关键的结构体是 hugetlbfs_file_operations , 其中规定了 mmap 函数,也就是当我打开 hugetlbfs 文件系统下的文件对对应的 fd 调用 mmap 的时候触发的对应的函数。

1
2
3
4
5
6
7
8
9
const struct file_operations hugetlbfs_file_operations = {
.read_iter = hugetlbfs_read_iter,
.mmap = hugetlbfs_file_mmap,
.fsync = noop_fsync,
.get_unmapped_area = hugetlb_get_unmapped_area,
.llseek = default_llseek,
.fallocate = hugetlbfs_fallocate,
};

在解释 mmap 之前,先复习一下进程空间内存的相关的内容。进程的内存空间是通过mm_struct这个结构体管理的。进程的地址空间,基本上是一些松散的区间,每个区间有相同的功能和保护属性(只读等属性),这个区间用 vm_area_struct 表示。再来看 hugetlbfs_file_mmap 中的一段代码。把申请的虚拟空间的地址长度按照大页对齐以后,保留对应个数的大页。

1
2
3
4
5
6
7
8
if (hugetlb_reserve_pages(inode,
vma->vm_pgoff >> huge_page_order(h),
len >> huge_page_shift(h), vma,
vma->vm_flags))
goto out;

ret = 0;

hugetlb_reserve_pages 要处理两种逻辑,如果是 VM_MAYSHARE 就从 inode 中取出 resv_map 并且获取分配长度,不然就使用vma的长度,然后用 hugepage_subpool_get_pageshugepage_subpool 中减掉对应的spool->rsv_hpages, 这个个人感觉也不是池子,只是一个统计数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct hstate *h = hstate_inode(inode); // 获取 hugepage state,里面保存了 hugepage 的相关信息
struct hugepage_subpool *spool = subpool_inode(inode); // 写获取 hugepage 的 pool, pool 其实只是个数字 rsv_pages。
...
// 然后根据 flag 算出要使用的长度 chg,我猜应该是 charge 的缩写。
if (!vma || vma->vm_flags & VM_MAYSHARE) {
resv_map = inode_resv_map(inode);

chg = region_chg(resv_map, from, to);

} else {
resv_map = resv_map_alloc();
if (!resv_map)
return -ENOMEM;

chg = to - from;

set_vma_resv_map(vma, resv_map);
set_vma_resv_flags(vma, HPAGE_RESV_OWNER);
}
...
// 最后从 hugepage 的 pool 减掉对应的个数。
gbl_reserve = hugepage_subpool_get_pages(spool, chg);
// 确定有足够的 hugepage 如果没有就从 buddysystem 里面取出来。
ret = hugetlb_acct_memory(h, gbl_reserve);
// 把 page 和 vma 做映射
region_add(resv_map, from, to);

do_page_fault 一直调用到 __handle_mm_fault 的时候,如果对应的 vma 是大页分配的,会直接进到hugetlb_fault,最后在缺页的时候会调用hugetlb_no_page,然后调用 alloc_huge_page_node,就会看到 __alloc_buddy_huge_page,开始走 buddy system, buddy system ,这里的 buddy system 没有细说,可以参考我之前的一篇文章

补充:

你可以理解mm_struct管理着一个vm_area_struct的链表,而 vm_area_struct 对应的主要操作函数如下(还有很多,这里只列了和本文相关的函数指针)。

1
2
3
4
5
6
struct vm_operations_struct {
void (*open)(struct vm_area_struct * area);
void (*close)(struct vm_area_struct * area);
int (*fault)(struct vm_area_struct *vma, struct vm_fault *vmf);
};

对应到 hugetlbfs 就是如下的 hugetlb_vm_ops 它会在对文件 mmap 的时候,进行初始化vma->vm_ops = &hugetlb_vm_ops;

1
2
3
4
5
6
const struct vm_operations_struct hugetlb_vm_ops = {
.fault = hugetlb_vm_op_fault,
.open = hugetlb_vm_op_open,
.close = hugetlb_vm_op_close,
};

但是注意,hugetlb_vm_op_fault 如果被触发说明有 BUG,因为 hugepage 的 page fault 是在do_page_fault里面独立处理的,不会调用到vm_operations_structfault接口。mmap 走的流程是 mmap -> mmap_region -> make_pages_presetn -> get_user_pages -> handle_page_fault -> handle_mm_fault -> hugetlb_fault

以上是大页的分配和使用的流程,希望对大家有帮助。

参考:

  1. Linux KVM concept - Memory
  2. Linux 内核中大页的实现与分析,第 1 部分

不同的隔离级别有不同的约束,而且不一定是子集和超集的关系,约束可能是交集的。这里尝试循序渐进的加强锁的级别和实现方式来阐述各个隔离级别的区别,由此可以选择在实际开发中适合业务的隔离策略。我自己最近也在调研 cockroachdb 比较关心事务这方面的性能,cockroachdb 的事务主要是两个级别,SI 和 SSI,会在下面提到。

总视图

这是各个隔离级别的一个关系,可以看出不一定是包含与被包含的关系(因为完整的隔离级别不只 4 种)。

定义

长期锁:到事务结束就释放的锁
短期锁:对相关数据操作完成就释放的锁

这里提到的写锁和排他锁可以互换,读锁和共享锁可以互换,长期锁也被称为二阶段锁,就是事务某个时候锁上了算一个阶段,最后一起释放算一个阶段。

断言型:基于先判的锁的修饰词,比如 WHERE 语句指定的范围就是预测型,如果没有 WHERE 可能就是整张表。

如果没有断言修饰的话,锁就是有指定数据的锁,也就是有明确索引的锁。

缩写 P(Phenomena) A(Anomalies)

P0 dirty write (脏写)

现象:

我们最开始的阶段是一切皆有可能发生,没有任何锁,所以碰到的第一个问题是脏写。

脏写导致的问题是,破坏数据的一致性,一个事务 (T1) 如果覆盖了另一个正在执行事务 (T2) 之前写入的值的时候就会导致脏写。比如 T1 写入 x=y=1 并且 T2 写入 x=y=2,但是整个数据的一致性就被破坏了。

解决:

对 x 和 y 持有长期写锁。基本上没有事务不拿长期写锁的,不然数据库连回退的可能都没有。防止脏写以后会出现新的现象,脏读。

P1 dirty read (脏读 read uncommited)

现象

事务 (T1) 写入的值被正在执行的事务 (T2) 读取。比如 x 向 y 转 40,在 x 写入以后,T2 看到的总和只有 60。

网上很多人把这个级别算作了脏写其实不是很严格,最开始那个 P0 级别才算是脏写。

解决

使用短期读锁和长期写锁,长期写锁可以防止 T2 的 X 读数据,短期读锁可以防止 T1 的 y 写不了(如果使用长期读锁就被阻塞到 T1 结束了)。解决脏读问题,我们又面临的问题是不可重复读

P2 non-repeatable read (不可重复读)

现象

当正在执行的事务 (T1) 读取的值被事务 (T2) 写入的时候,对 T1 来说就出现了不一致。例如下图,X 在被读取之后又被 T2 写入,这个时候总的钱数出现了不一致。

解决

使用长期读锁和长期写锁,也就是 T2 的 X 要等 T1 提交之后才能写入。对于不可重复读的问题。不过短期的断言型读锁也是足够的,因为 X 和 Y 如果都提前读取出来还是能保持一致的。我们解决了不可重复读以后,还会碰到幻读的情况。

P3 phantom (幻读)

现象

幻读发生在正在执行的事务 T1 有断言的读 (select where) 时,另外一个事务 T2 执行了和断言集合有交集的插入操作。
比如 T1 在 T2 之前读到了员工总数是 3,但是 T2 执行的时候有交集,插入了新的数据,这个时候员工总数是 4,但是 T1 如果再读取的话,就会发现员工总数变成了 4,而不是最初的 3,这就是幻读。

解决

解决幻读的方式是使用长期(断言型)读锁和写锁。也就是不允许在这个范围内进行插入操作。解决了幻读以后我们的事务就完全串行化了,这样的事务并发度是最弱的。

P4 update lost (更新丢失)

常见的 4 个隔离级别说完了以后我们看一下剩下的部分,注意更新丢失这个现象不是比幻读更约束的现象,这个是在基于防止脏读以后可能会出现的现象,会被可重复读防止。

现象

事务 T2 提交的写被其他事务覆盖,首先,没有脏写,因为 T2 已经提交,其次没有脏读,因为在写之后没有读操作,这样的现象称为更新丢失。

解决

升级到可重复读就可以了。

P4C cursor update lost (游标更新丢失)

现象

和更新丢失是一样的,这个只是约束在了游标操作的时候,事务 (T1) 对游标下的数据进行读之后被另一个事务 (T2) 提交了,在游标下的数据写之后让,T1 的写导致 T2 的更新丢失。

解决

在游标移动或者释放之前,都不释放锁,这个是到达可重复读之前的一个插曲。这个也是在读提交的阶段会发生的事情。

A5A read skew (读偏)

偏可以理解为不一致,这个是发生在多个数据之间有一个总的约束的时候。

现象

总的钱是 100,但是从 T1 的角度,总的钱数是 (50+75),因为只有短期读锁。

解决

使用快照隔离 (Snapshot Isolation),快照隔离是基于 MVCC 的。当一个 T 事务开始的时候,T 会获得一个抽象的时间戳(版本),当对数据 X 进行读取的时候,并不是直接看到最新写入的数据而是在 T 开始前的所有执行中的事务中最后一个对 X 标记的版本(如果 T 修改过 X,那么看到的是自己的版本)。也就是说 T 是基于当前的数据库的一个镜像进行操作的,有点类似于 Copy And Swap,而 T 开始执行是获得的版本就是这个快照的凭证。这样能保证所有的读都是基于一个一致的状态获取的。

SI 解决冲突的方法一般是 “First-Commiter-Wins” 也就是说,如果两个并发的事务修改了同一个数据,先写的事务会成功,而后写的事务会发现版本和原本的不一致而退出事务。

以我们的例子来说的话,T1 的 y 只会读到自己开始时候的版本,也就是 50,而不是 75,这样读偏就解决了。但是快照隔离还是不能解决另一个问题,就是写偏。这是我们要面临的新问题。

A5B write skew (写偏)

现象

这个和读偏类似,只不过,不一致在了整个系统上。T1 写锁有 y 的新版本,T2 写锁有 x 的新版本,他们没有写冲突,导致最后系统不一致,x+y 的钱变多了。

解决

目前快照隔离的算法有很多,参考 cockroachDB 使用的论文的话,可以说,通过对版本依赖构成有向图,解决成环问题,以此达到串行快照隔离的级别。

比如上面的例子,如果 T1 在 y 读了之后写了一个版本的 y 就构成一个先读后写的 rw(y) 依赖,类似的 T2 对 T1 构成了一个先读后写的 rw(x) 依赖。还有两种无害的依赖是先写后读 (wr) 和先写后写 (ww)。论文中阐述了,造成写偏的条件是成环,并且环中有两个连续的 rw 依赖。也就是下面这种形式。

这个问题的关键是,检查成环这件事情,就跟操作系统检查死锁一样,消耗太大了,性能上不能接受,所以这个实现的妥协是,把检查放宽,让一些无害的条件也被认定为有害,通过重试来恢复执行,至少防止错误的发生。
这个条件是只要有两个连续的 rw 依赖就会放弃提交,即使没有成环。这个检查发生在读的时候如果发现读的版本和自己开始之前的版本不一致就会找到依赖的事务,构建一条入边,另一个事务构建一条出边,如果某个事务入边出边都有 rw 边,这个节点就会被作为嫌疑人。当然还有其他关于串行 snapshot 隔离的论文可以参考。

其他

Oracle Consistent Read 也算是另一种快照级别的隔离。

参考文献

  1. A Critique of ANSI SQL Isolation Levels
  2. ASCI-SQL
  3. cockroachdb 使用的算法

援引自 cockroach 博客 的解释

cockroach 的底层 scaling 存储本身可以认为是一张无限大的有序 KV map,上层封装了一层 SQL。他 sharding 的方式是通过 range 实现的。对一组 64M 大小数据的范围内的 key 做冗余,用二级 metadata range 可以理论上存储 4EB 的数据。

举个例子,五节点打散三副本的一个 range 做冗余,是成本性能和效率上比较典型的一个折衷。像你说的 leader 集中的情况是比较少的,因为三副本是打散分布到不同节点上的,所以三副本的交集不会太集中,单点的压力还是很少的。

另外租约的存在(不知道算不算在 multi raft 算法内),也可以缓解集中化的压力,通过获得授权的租约旁路一些“可以本地化”的请求(比如只对一个 replica 的操作以及一些能保证数据不变的一段时间的读请求),来缓解主的压力,这个最早我记得在 GFS 里面看到过是这么设计的。

另外 gossip 的协议有点像交换机的生成树协议,是通过互相传播自己的负载和位置信心来传播信息的,没有中心化的决策节点,这样的好处就是无中心化,不会依靠一个中心的节点来做负载均衡,而且整个系统就需要一个二进制文件不依赖任何的其他服务。

如图是一个 range 在三节点上的三副本的一个 raft group 之间有网路通信。

但是如果 range 变多的话,就会很复杂,如下图,是四个节点之间大概 8 个 (如果我没看错颜色的话) range 的 raft group 之间通信。

很明显网络 RoundTrip 太多了。针对这个的优化就是 MultiRaft 了,说白了就是把网络请求 batch up,变成下面的形式。

这样网络压力就会小很多。针对这个的优化和应用本身的实现是耦合的,作者也尝试把这个优化加到 etcd/raft 当中,但是最后的决定是增加 RawNode 这样的无线程安全的接口给应用层实用,方便对心跳等请求的合并,所以你会看到有个非线程安全的 RawNode 的实现。

具体到 cockroach 的实现是可以找到相应的代码的,省略无关代码就如下。

在发送消息的时候检查是否有可以整合的请求。

1
2
3
4
5
6
7
8
9
func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
for range {
// 迭代每个 follower 发送消息
// 对消息进行缓存
if r.maybeCoalesceHeartbeat(ctx, msg, toReplica, fromReplica, true) {
continue
}
}
}

循环取出缓存的消息,注意因为是整合了消息,所以对消息来说有延迟,这里稍微加快了频率,弥补这个延迟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Since coalesced heartbeats adds latency to heartbeat messages, it is
// beneficial to have it run on a faster cycle than once per tick, so that
// the delay does not impact latency-sensitive features such as quiescence.
func (s *Store) startCoalescedHeartbeatsLoop() {
s.stopper.RunWorker(func() {
ticker := time.NewTicker(s.cfg.CoalescedHeartbeatsInterval)
defer func() {
ticker.Stop()
}()

for {
select {
case <-ticker.C:
s.sendQueuedHeartbeats()
case <-s.stopper.ShouldStop():
return
}
}
})
}

在 Store 这个层面是对每个 Replica (一个 raft group 的成员) 是有感知的,但是每个 raft group 是独立的,只是在发送是会检查对方节点是否和现在的 raft group 有交集,
从而尝试合并消息的发送,你可以理解为在节点之间的消息通道上对不同 raft group 之间多路复用。
这就是 MultiRaft 对多个 raft group 的网络吞吐做的一个优化,代价是可能造成消息的延迟,因为毕竟是被缓存到队列里了。

其实大家提的 LSM 最开始论文里面都使用树做搜索结构的, 现在在用的都不是严格的树结构了。

这篇文章解释的一样,从最朴素的角度上来讲可以把SSTable(sorted string table)作为一个连续的kv构成的块。

1
2
3
4
SSTable
+-+---+----+---+
|k| v | k | v | ...
+-+---+----+---+

对于一个大文件来说,读取整个文件以后就能构成一个各个键值的索引,当然可以在文件追加一块索引,和文件一起保存。

1
2
3
4
5
6
7
8
9
Index
+-+-------+
|k|offset |
+-+-------+
|k|offset |
+-+-------+
.
.
.

有了索引以后用 seek 操作或者直接把文件 mmap 到内存中都可以有很好的随机读性能。

但是对于随机写来说, 会造成大量的I/O,如果我们能够保证我们的SSTable是不可修改(immutable)的,只有SSTable在内存当中的时候(也就是MemTable)才可以修改,就能避免随机写的大负载问题。

通过下面几条约束就能完成我们的要求:

  1. 首先SSTable索引要放在内存中,这样读索引更快
  2. 所有写只能写到MemTable当中, 因为SSTable不可修改
  3. 所有读要先查看MemTable如果没有再查看内存中的索引(最后找到磁盘上的kv)
  4. 定期把MemTable刷成SSTable,这段时间MemTable也变成了不可修改的,新的MemTable会顶替
  5. 定期对SSTable进行合并

最终我们保证了随机写很快(因为只在MemTable中),随机读也很快(因为要么在MemTable中要么通过索引可以很快找到)。

还有一个问题是对于已有数据的删除和修改怎么办?

因为SSTable不可修改所以只能追加写一个新的数据覆盖老的数据,对于删除则是追加一个”墓碑”值覆盖掉存在的值。把索引指向新值,这样老值就不会被访问了。最后在SSTable合并的时候这些老值会完全消失。所以还要定期合并SSTable

以上是对leveldb的LSM结构的朴素解释。实际上MemTableSSTable都没有采用纯粹的树形结构,MemTable使用的是跳表而SSTable使用的是层次的结构。(这也是为什么 leveldb 叫 level db 的原因)

从这里开始完善朴素解释

首先对于MemTable来说不是持久化的如果重启导致内存中的数据丢失怎么办?WAL 表示的是预写日志,这个日志和MemTable是同步的,这个日志把每次的命令追加到日志中再更改MemTable,这样如果重启的话能够进行”重放”把从已经持久化的状态开始把数据填回到MemTable当中。

其次是对SSTable的合并,SSTable是分层存储的,第一层也就是Level0(被称作 young level),是MemTable刷入的一层,允许这一层的SSTable的key有交集。对于每一层都有一个阈值(young level 是 4,其他层是按大小算的,10^L MB),如果超过阈值自动向下一层合并,从level1开始的每一次key不允许有交集。具体的做法是从 young level 中把有交集的SSTable一起和下一层key有交集的SSTable合并成一个新的SSTable,然后其他层则是从自身层取出一个和下一层有交集的SSTable合并即可。这个属性可以用归纳法证明,从0层向1层合并的时候,1层只有一个的情况下肯定不会相交,然后假设n个的时候也不相交,在n+1的时候有交集,那么n+1合并时有0层的 key 和 n 当中的有交集,但是有交集的部分会被归并掉所以矛盾,所以n+1个的时候也是没有交集的。那1层能保证没有交集的话取出一个向下合并也是类似的不会有交集。所以再重复一遍分层存储的两个属性。

对于朴素解释的两个扩展使得我们对leveldb的设计更接近了。

  1. young层SSTable之间可能存在交集
  2. Li(i>0)层SSTable之间不存在交集

在这个基础上再增加几个约束条件,一个是,对于合并过程每超过2MB就会产生一个新文件,如果文件和下一层的文件有交集的个数有10个以上的话也会产生一个新文件,这样的目的是保证Ln和Ln+1之间不会重复太多。个人理解是覆盖太多,会成了倒三角的”树”情况,上一层搜索性能不好。

当然大量的随机读落在磁盘上还是会有性能问题,因为 seek 也可能是不连续的,这个可以想办法优化, 比如leveldb 里面使用了一种LRU缓存优化读性能。

参考

  1. 官方实现文档
  2. LSM-tree论文

SSA概述

SSA在Go1.7中被引入,这个特性对编译器的性能有很大的提高,但是也导致编译过程有些减速。下面来结合网上的资粮和书籍,简单说明一下SSA以及SSA的应用。

SSA 代表 static single-assignment,是一种IR(中间表示代码),要保证每个变量只被赋值一次。这个能帮助简化编译器的优化算法。

1
2
3
y := 1
y := 2
x := y

比如上面这段代码,y = 1其实是不可用的,这个要通过定义的可达分析来确定y是要用1还是2,而SSA有一个标识符可以称之为版本或者“代“。

1
2
3
y1 := 1
y2 := 2
x1 := y2

这样就没有任何间接值了。用SSA表示的好处是对于同一个变量的无关使用表示成不同“代”,可以方便很多编译器的优化算法的实现。

一个概念:

Φ(读作fai) 函数,表示要根据控制流赋值的“代”。

例子可以参考维基里的这一段

三个定义:

A dominate B,如果从起点开始必须通过A到达B。也就是说A是到B的必经之路。

A strictly dominate B,如果 A dominate B,并且A和B不相等。

A 的 dominance frontier 含有B,如果A没有strictly dominate B,但是 dominate 了B的一个前驱节点。

用遍历的方式确定 dominance frontier 的伪代码。

1
2
3
4
5
6
7
for each node b
if the number of immediate predecessors of b ≥ 2
for each p in immediate predecessors of b
runner := p
while runner ≠ idom(b)
add b to runner’s dominance frontier set
runner := idom(runner)

idom(b) 代表相邻的strictly dominate b的结点。这样的点只有一个,因为相邻的点有两个的话就不会是必经之路了。

如图是一个例子,2的前驱是1和7,7没有sd(strictly dominate) 2,所以把2加入到7的DF(dominate frontiers)当中。3是7的相邻sd,然后3不是2的sd所以2加入到3的DF当中,接着2是3相邻的sd,然后2不是2的sd所以把2加入到2的DF中,最后遍历到7,5和6不是7的sd所以把7加入到5和6的df当中。

df(A)可以认为是可以通过A到达的,但不是必经之路的点的集合。

有了这个定义以后就可以插入Φ函数了和重命名了。如果X中有定义a那么所有df(X)都需要a的Φ函数。并且Φ函数本身也是一个定义。

比如还是同一个例子。

1当中有j的定义,但是df(1)是空的,5当中有j的定义,并且5的df有7所以7当中要插入一个φ(j, j)。j现在在7中定义了(通过φ函数)所以df(7)中的2也要有φ(j, j),6也有j的定义但是7已经有了φ函数了,2的df有2,但是2已经有φ函数了。类似的方式可以应用到i和k。接着对定义重命名就可以完成SSA的转换了。

SSA的应用

上面只是通俗的解释了一下SSA,没有给出更多详细的理论和算法以及证明,因为证明实在难看懂,下面说一下SSA的应用。

DEAD CODE 消除

因为每个变量都有“代”(因为大家都只被赋值一次),所以很容易检查出没有被使用的变量并且删除对应的定义。另外如果删除v=x这样的定义还要在x的use表中把这条语句删除。

简单的常量扩展

比如v = φ(c1,c2,...,cn)这种格式,如果c都是相等的可以直接替换成c,或者v=c,如果c是常量的话也可以直接替换掉。在做这些的同时也可以做其他的优化,能够在一趟遍历中都完成,比如copy propagationx=y或者x=φ(y)都可以直接用y替换掉x。比如constant foldingx=a+b如果a+b是常量的话,可以直接用常量赋值。

当然还有其他的优化算法,出发点都是基于SSA的简单性。

从SSA转换回原始代码

y = φ(x1, x2, x3)这样的形式要根据条件分支再拆分回原来的形态,比如满足1条件,使用y=x1这样的形式。并且可能会很自然的想把x1和x2变回使用同一个寄存器,但是通过优化过程中的一些手段(copy propagation)已经把大部分move指令给优化掉了,并且重新推回x可能会有生命周期的影响,所以还是保留了“代”。

总结

其实这个就是把def-use换成了use-def方便做代码优化呀:)害我看那么久

参考文献

  1. 虎书第十九章
  2. 维基百科

本篇文章主要分析一下经过纯物理层之后从设备读取的frame是如何进入协议栈的和对应封装好的frame是如何离开协议栈的,多亏协议栈的分层思路,每一层之间都可以独立分析和阅读,对于驱动来说,首先要面对的就是中断了, 本文基于4.7.2的内核简单的过了一遍.

中断处理

中断处理是分析链路层数据传输的入口。
例如在drivers/net/ethernet/3com/3c59x.c这个是一个华三的设备驱动, 观察这个驱动的中断处理函数可以知道,frame的接收的入口是vortex_rx,而当设备有足够的空间发送frame的时候就会调用netif_wake_queue来触发发送frame的例程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
* This is the ISR for the vortex series chips.
* full_bus_master_tx == 0 && full_bus_master_rx == 0
*/

static irqreturn_t
vortex_interrupt(int irq, void *dev_id)
{
struct net_device *dev = dev_id;
struct vortex_private *vp = netdev_priv(dev);
void __iomem *ioaddr;
int status;
int work_done = max_interrupt_work;
int handled = 0;
unsigned int bytes_compl = 0, pkts_compl = 0;

ioaddr = vp->ioaddr;
spin_lock(&vp->lock);

status = ioread16(ioaddr + EL3_STATUS); // 从ioremap的地址当中读取网卡的状态,这个是设备规定的地址

if (vortex_debug > 6)
pr_debug("vortex_interrupt(). status=0x%4x\n", status);

// 在中断处理的时候,设备是关中断的,这个时候可以通过观察这个status来检查设备是否有中断到来.
if ((status & IntLatch) == 0) // 有中断需要处理,但是可能已经被其他中断处理函数处理了
goto handler_exit; /* No interrupt: shared IRQs cause this */
handled = 1;

if (status & IntReq) { // 中断请求
status |= vp->deferred;
vp->deferred = 0;
}

if (status == 0xffff) /* h/w no longer present (hotplug)? */
goto handler_exit;

if (vortex_debug > 4)
pr_debug("%s: interrupt, status %4.4x, latency %d ticks.\n",
dev->name, status, ioread8(ioaddr + Timer));

spin_lock(&vp->window_lock);
window_set(vp, 7);

do {
if (vortex_debug > 5)
pr_debug("%s: In interrupt loop, status %4.4x.\n",
dev->name, status);
if (status & RxComplete) // 中断表示接收完成的时候调用 vortex_rx
vortex_rx(dev);

if (status & TxAvailable) { // 中断表示可以传输的时候
if (vortex_debug > 5)
pr_debug(" TX room bit was handled.\n");
/* There's room in the FIFO for a full-sized packet. */
iowrite16(AckIntr | TxAvailable, ioaddr + EL3_CMD);
netif_wake_queue (dev);
}

if (status & DMADone) {// 表示发送完成可以清除sk_buff了
if (ioread16(ioaddr + Wn7_MasterStatus) & 0x1000) {
iowrite16(0x1000, ioaddr + Wn7_MasterStatus); /* Ack the event. */
pci_unmap_single(VORTEX_PCI(vp), vp->tx_skb_dma, (vp->tx_skb->len + 3) & ~3, PCI_DMA_TODEVICE);
pkts_compl++;
bytes_compl += vp->tx_skb->len;
dev_kfree_skb_irq(vp->tx_skb); /* Release the transferred buffer */
if (ioread16(ioaddr + TxFree) > 1536) {
/*
* AKPM: FIXME: I don't think we need this. If the queue was stopped due to
* insufficient FIFO room, the TxAvailable test will succeed and call
* netif_wake_queue()
*/
netif_wake_queue(dev);
} else { /* Interrupt when FIFO has room for max-sized packet. */
iowrite16(SetTxThreshold + (1536>>2), ioaddr + EL3_CMD);
netif_stop_queue(dev);
}
}
}
/* Check for all uncommon interrupts at once. */
if (status & (HostError | RxEarly | StatsFull | TxComplete | IntReq)) {
if (status == 0xffff)
break;
if (status & RxEarly)
vortex_rx(dev);
spin_unlock(&vp->window_lock);
vortex_error(dev, status);
spin_lock(&vp->window_lock);
window_set(vp, 7);
}

if (--work_done < 0) { // 最多处理work_done个frame
pr_warn("%s: Too much work in interrupt, status %4.4x\n",
dev->name, status);
/* Disable all pending interrupts. */
do {
vp->deferred |= status; // 把当前状态保存起来等下次中断的时候处理
iowrite16(SetStatusEnb | (~vp->deferred & vp->status_enable),
ioaddr + EL3_CMD);
iowrite16(AckIntr | (vp->deferred & 0x7ff), ioaddr + EL3_CMD);
} while ((status = ioread16(ioaddr + EL3_CMD)) & IntLatch); // 把中断清掉?
/* The timer will reenable interrupts. */
mod_timer(&vp->timer, jiffies + 1*HZ);
break;
}
/* Acknowledge the IRQ. */
iowrite16(AckIntr | IntReq | IntLatch, ioaddr + EL3_CMD);
} while ((status = ioread16(ioaddr + EL3_STATUS)) & (IntLatch | RxComplete));// 当有pending的中断并且是接收完成的状态

netdev_completed_queue(dev, pkts_compl, bytes_compl);
spin_unlock(&vp->window_lock);

if (vortex_debug > 4)
pr_debug("%s: exiting interrupt, status %4.4x.\n",
dev->name, status);
handler_exit:
spin_unlock(&vp->lock);
return IRQ_RETVAL(handled);
}

当中断到来的时候检查RxComplete,如果这个状态置位了说明有frame可以处理,当状态有TxAvailable的时候表示网卡的缓冲有空可以尝试发送frame,这样的检查会循环很多次,直到出现错误或者超过规定的循环次数max_interrupt_work,这个值对于这个设备来说是32,也就是说最多接受32帧就会退出中断处理。

frame的接收

vortex_rx最终会调用netif_rx这个函数很关键,vortex_rx会通过netdev_alloc_skb分配一个sk_buff,这个是一个会贯穿整个协议栈的一个buff,然后从设备中拷贝frame,拷贝的方式也有很多比如直接读取或者利用DMA。
从DMA读取到内存当中比如截取的这段代码

1
2
3
4
5
6
7
8
9
10
// 转换成总线地址给DMA设备,启动DMA传输,然后循环检查传输状态
// 最后取消总线地址的映射.
dma_addr_t dma = pci_map_single(VORTEX_PCI(vp), skb_put(skb, pkt_len),
pkt_len, PCI_DMA_FROMDEVICE);
iowrite32(dma, ioaddr + Wn7_MasterAddr);
iowrite16((skb->len + 3) & ~3, ioaddr + Wn7_MasterLen);
iowrite16(StartDMAUp, ioaddr + EL3_CMD);
while (ioread16(ioaddr + Wn7_MasterStatus) & 0x8000)
;
pci_unmap_single(VORTEX_PCI(vp), dma, pkt_len, PCI_DMA_FROMDEVICE);

也有不饶过CPU直接读取的

1
2
3
ioread32_rep(ioaddr + RX_FIFO,
skb_put(skb, pkt_len),
(pkt_len + 3) >> 2);

接着调用eth_type_trans确定对应的protocol,并且最终调用netif_rx来继续处理接收任务。

到了netif_rx就是一个通用流程了,netif_rx调用了netif_rx_internal,这个函数会先通过net_timestamp_check检查sk_buff的时间戳,并且设置,然后调用enqueue_to_backlog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*
* enqueue_to_backlog is called to queue an skb to a per CPU backlog
* queue (may be a remote CPU queue).
*/
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
unsigned int qlen;

sd = &per_cpu(softnet_data, cpu); // 获取per-cpu softnet_data

local_irq_save(flags);

rps_lock(sd);
if (!netif_running(skb->dev)) // 如果设备已经没有运行的直接丢frame
goto drop;
qlen = skb_queue_len(&sd->input_pkt_queue); // 获取softnet_data的&sk_buff的队列长度
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) { // 如果没有超过最大长度并且没有被限制
if (qlen) {
enqueue:
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags); // 加入队列并且开中断
return NET_RX_SUCCESS;
}
// 如果队列为空可以尝试调度 backlog device,
// 再把frame加入到队列当中。
/* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))
____napi_schedule(sd, &sd->backlog);
}
goto enqueue;
}

drop:
sd->dropped++;
rps_unlock(sd);

local_irq_restore(flags);

atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
return NET_RX_DROP;
}

进入到____napi_schedule则很简单了,就是唤起把backlog这个softnet_data上的device加入到poll_list当中然后唤起softirq来处理。

1
2
3
4
5
6
7
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
list_add_tail(&napi->poll_list, &sd->poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

这里要提的是,kernel对于frame的处理有一个比较新的API,称之为NAPI。

对于NAPI来说, net_rx_action结合了轮询和中断, 当设备有中断表示收到frame的时候会把它加入softnet_datapoll_list, 这个时候设备不再发出中断,然后通过轮询这些设备是否有剩余frame处理,并且限制最大处理数量,这样能保证cpu不会承受过大的中断load的压力,并且可以使得设备之间能够相对公平获得处理的机会,而不是中断更频繁的设备获得的机会更多。

对于使用NAPI的设备来说显然需要提供poll接口来供查询之用,同时用NET_RX_SOFTIRQ触发软中断执行net_rx_action这个软中断处理函数,而不需要netif_rx这个接口。

NET_RX_SOFTIRQ做的事情就是把设备加入poll列表并且触发softirq. 对于没有使用NAPI的设备来说,会使用per-cpu 结构softnet_data中的backlog_dev(是一个假的胶水层的封装)来替代放入对应的poll列表中,然后再进入netif_rx_schedule的例程来处理。也就是说NAPI-unaware的设备用的是backlog_dev而NAPI-aware的设备用的是自己的device结构体。

相反的把设备从轮询列表中移除依靠的是netif_rx_complete. 这样轮询检查的时候就不会处理对应的设备了.

这里说了这么久的轮询不要误会frame的主力是轮询,显然这是不行的,因为网络数据要求接受很快,还是中断驱动的,只不过为了物尽其用,既然你有数据帧还在就不要中断告诉我,我继续处理就可以了。

现在看一下加入到poll_list之后,NET_RX_SOFTIRQ软中断是如何工作的。关于软中断的实现可以参考参考列表中的第二个链接,我本来想自己总结一下,但是发现这篇文章确实总结的很好,所以保存一下就好了。

补充一点,softnet_data->input_pkt_queue是frame的缓存队列,这个队列有一个最大值netdev_max_backlog,目前这个值是1000,也就是每个CPU最多有1000个frame没处理。对于有自己device的设备,frame需要通过设备的poll方法来获取。

net_rx_action对应的是软中断NET_RX_SOFTIRQ的处理函数。之前说过net_rx_action的工作方式,这里看一下具体的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static void net_rx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);
unsigned long time_limit = jiffies + 2;
int budget = netdev_budget; // 一个budget用于限制运行时间,目前是300
LIST_HEAD(list);
LIST_HEAD(repoll);

local_irq_disable();
list_splice_init(&sd->poll_list, &list); // 把poll_list合并到list上并且把poll_list清空
local_irq_enable();

for (;;) {
struct napi_struct *n;

if (list_empty(&list)) {
if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
return;
break;
}

n = list_first_entry(&list, struct napi_struct, poll_list);
budget -= napi_poll(n, &repoll); // 每调用一次会减掉相应的budget,如果还有work没完成还需要poll就会加入到repoll链表里。

/* If softirq window is exhausted then punt.
* Allow this to run for 2 jiffies since which will allow
* an average latency of 1.5/HZ.
*/
if (unlikely(budget <= 0 || // 如果budge耗尽或者超过了两个jiffies就会停止
time_after_eq(jiffies, time_limit))) {
sd->time_squeeze++;
break;
}
}

__kfree_skb_flush();
local_irq_disable();

list_splice_tail_init(&sd->poll_list, &list); // 再把poll_list合并到list当中,并且清空poll_list。
list_splice_tail(&repoll, &list); // 然后把repoll合并到list当中。
list_splice(&list, &sd->poll_list); // 再把list合并到poll_list当中。
if (!list_empty(&sd->poll_list)) // 这几步的过程就是把需要repoll的设备和当前的poll_list合并
__raise_softirq_irqoff(NET_RX_SOFTIRQ); // 如果还有需要poll的设备就再触发软中断.

net_rps_action_and_irq_enable(sd);
}

整个过程就是检查poll_list并且轮询调用poll方法。接下来具体看一下poll方法的相关内容。以非NAPI设备为例,使用的是backlogpoll方法,对应的方法可以在net/core/dev.c当中找到,在初始化函数net_dev_init当中sd->backlog.poll = process_backlog;给每个per-cpu结构的softnet_data都是指向了这个poll函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static int process_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);

/* Check if we have pending ipi, its better to send them now,
* not waiting net_rx_action() end.
*/
if (sd_has_rps_ipi_waiting(sd)) {
local_irq_disable();
net_rps_action_and_irq_enable(sd);
}

napi->weight = weight_p; // 这个值目前是32
local_irq_disable();
while (1) {
struct sk_buff *skb;

while ((skb = __skb_dequeue(&sd->process_queue))) {
rcu_read_lock();
local_irq_enable();
__netif_receive_skb(skb); // 取出的skb交给__netif_receive_skb处理
rcu_read_unlock();
local_irq_disable();
input_queue_head_incr(sd);
if (++work >= quota) {
local_irq_enable();
return work;
}
}

rps_lock(sd);
if (skb_queue_empty(&sd->input_pkt_queue)) {
/*
* Inline a custom version of __napi_complete().
* only current cpu owns and manipulates this napi,
* and NAPI_STATE_SCHED is the only possible flag set
* on backlog.
* We can use a plain write instead of clear_bit(),
* and we dont need an smp_mb() memory barrier.
*/
napi->state = 0;
rps_unlock(sd);

break;
}

skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
rps_unlock(sd);
}
local_irq_enable();

return work;
}

这个函数的主体就是把&sd->input_pkt_queue交给&sd->process_queue然后从&sd->process_queue当中取出sk_buff,再调用__netif_receive_skb,进一步处理.我想老是把&sd->input_pkt_queue拷贝并且清空应该是因为希望不因为锁独占这个队列太久的原因.

__netif_receive_skb的内容主要是根据skb->protocol,遍历&skb->dev->ptype_all然后将frame交给L3的ptype->func处理,还有一些在这一层需要处理的特性,比如bridging.

目前说描述的东西就是整个传输路径的这部分,完整的图在这里

frame的发送

在frame的发送路径,主要包含几个任务,开关设备的发送功能,调度设备进行发送,选择在设备发送队列的frame进行发送,还要传输过程的本身.而且发送的例程也是类似接收的过程,有对应的softirq(NET_TX_SOFTIRQ)和对应的handler,net_tx_action。和poll_list对应的是output_queue,也是一个等待发送的设备列表。
__LINK_STATE_START__LINK_ STATE_XOFF__LINK_STATE_RX_SCHED__LINK_STATE_SCHED也是对应的.回顾开头中断的代码

1
2
3
4
5
6
7
if (status & TxAvailable) { // 中断表示可以传输的时候
if (vortex_debug > 5)
pr_debug(" TX room bit was handled.\n");
/* There's room in the FIFO for a full-sized packet. */
iowrite16(AckIntr | TxAvailable, ioaddr + EL3_CMD);
netif_wake_queue (dev);
}

就是说当状态可用的时候,尝试调用netif_wake_queue来触发frame的发送。

首先来看一下如何选择发送的设备。

1
2
3
4
5
6
7
8
9
10
11
/**
* netif_wake_queue - restart transmit
* @dev: network device
*
* Allow upper layers to call the device hard_start_xmit routine.
* Used for flow control when transmit resources are available.
*/
static inline void netif_wake_queue(struct net_device *dev)
{
netif_tx_wake_queue(netdev_get_tx_queue(dev, 0));
}

这个函数内部调用了

1
2
3
4
5
6
7
8
9
10
11
12
13
static inline void __netif_reschedule(struct Qdisc *q)
{
struct softnet_data *sd;
unsigned long flags;

local_irq_save(flags);
sd = this_cpu_ptr(&softnet_data);
q->next_sched = NULL;
*sd->output_queue_tailp = q;
sd->output_queue_tailp = &q->next_sched;
raise_softirq_irqoff(NET_TX_SOFTIRQ);
local_irq_restore(flags);
}

主体就是把设备加入output_queue(通过->next_sched连接)然后唤起软中断NET_TX_SOFTIRQ

再来看看软中断的主体。
首先是遍历completion_queue来释放sk_buff这个连接是通过dev_kfree_skb_irq添加的,和正常的dev_kfree_skb不同,它是把sk_buff加入到释放列表中就快速返回了, 然后就会遍历设备列表寻找可运行的设备,调用qdisk_run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
 void net_tx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);

if (sd->completion_queue) { // 检查completion_queue逐一释放sk_buff, 他们是通过dev_kfree_skb_irq,因为他不真的释放sk_buff,而是把sk_buff移到completio_queue当中.
struct sk_buff *clist;

local_irq_disable();
clist = sd->completion_queue;
sd->completion_queue = NULL;
local_irq_enable();

while (clist) {
struct sk_buff *skb = clist;
clist = clist->next;

WARN_ON(atomic_read(&skb->users));
if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
trace_consume_skb(skb);
else
trace_kfree_skb(skb, net_tx_action);

if (skb->fclone != SKB_FCLONE_UNAVAILABLE)
__kfree_skb(skb);
else
__kfree_skb_defer(skb);
}

__kfree_skb_flush();
}

if (sd->output_queue) {
struct Qdisc *head;

local_irq_disable();
head = sd->output_queue; // 拷贝并首尾相连.
sd->output_queue = NULL;
sd->output_queue_tailp = &sd->output_queue;
local_irq_enable();

while (head) {
struct Qdisc *q = head;
spinlock_t *root_lock;

head = head->next_sched;

root_lock = qdisc_lock(q);
if (spin_trylock(root_lock)) {
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED,
&q->state);
qdisc_run(q); // qdisk_run
spin_unlock(root_lock);
} else {
if (!test_bit(__QDISC_STATE_DEACTIVATED,
&q->state)) {
__netif_reschedule(q);
} else {
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED,
&q->state);
}
}
}
}
}

qdisc_run首先检查设备的运行状态然后调用__qdisc_run,这个函数的主体就是调用qdisc_restart,直到超过限制或者需要让出时间CPU了,最后清空qdisc的RUNNING状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void __qdisc_run(struct Qdisc *q)
{
int quota = weight_p;
int packets;

while (qdisc_restart(q, &packets)) {
/*
* Ordered by possible occurrence: Postpone processing if
* 1. we've exceeded packet quota
* 2. another process needs the CPU;
*/
quota -= packets;
if (quota <= 0 || need_resched()) {
__netif_schedule(q);
break;
}
}

qdisc_run_end(q);
}

qdisc_restart的主体是从队列中取出sk_buff,然后调用sch_direct_xmit,它的功能是调用dev_hard_start_xmit来运行设备驱动的指定的传输方法hard_start_xmit,如果没有成功则把sk_buff重新加入到出队当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
* Transmit possibly several skbs, and handle the return status as
* required. Holding the __QDISC___STATE_RUNNING bit guarantees that
* only one CPU can execute this function.
*
* Returns to the caller:
* 0 - queue is empty or throttled.
* >0 - queue is not empty.
*/
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock, bool validate)
{
int ret = NETDEV_TX_BUSY;

/* And release qdisc */
spin_unlock(root_lock);

/* Note that we validate skb (GSO, checksum, ...) outside of locks */
if (validate)
skb = validate_xmit_skb_list(skb, dev);

if (likely(skb)) {
HARD_TX_LOCK(dev, txq, smp_processor_id());
if (!netif_xmit_frozen_or_stopped(txq))
skb = dev_hard_start_xmit(skb, dev, txq, &ret);

HARD_TX_UNLOCK(dev, txq);
} else {
spin_lock(root_lock);
return qdisc_qlen(q);
}
spin_lock(root_lock);

if (dev_xmit_complete(ret)) {
/* Driver sent out skb successfully or skb was consumed */
ret = qdisc_qlen(q);
} else {
/* Driver returned NETDEV_TX_BUSY - requeue skb */
if (unlikely(ret != NETDEV_TX_BUSY))
net_warn_ratelimited("BUG %s code %d qlen %d\n",
dev->name, ret, q->q.qlen);

ret = dev_requeue_skb(skb, q);
}

if (ret && netif_xmit_frozen_or_stopped(txq))
ret = 0;

return ret;
}

至此整个L2的发送和接收就大致有了一个了解,并且能够理解整个的工作过程,这里有一点没有讲到的是L2的转发机制,比如STP等协议和实现,可能会在接下来的文章中继续剖析.

参考:

  1. Linux 下DMA浅析
  2. linux kernel的中断子系统之(八):softirq
  3. Linux Network Internals