ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

kernel-协议栈链路层分析

本篇文章主要分析一下经过纯物理层之后从设备读取的frame是如何进入协议栈的和对应封装好的frame是如何离开协议栈的,多亏协议栈的分层思路,每一层之间都可以独立分析和阅读,对于驱动来说,首先要面对的就是中断了, 本文基于4.7.2的内核简单的过了一遍.

中断处理

中断处理是分析链路层数据传输的入口。
例如在drivers/net/ethernet/3com/3c59x.c这个是一个华三的设备驱动, 观察这个驱动的中断处理函数可以知道,frame的接收的入口是vortex_rx,而当设备有足够的空间发送frame的时候就会调用netif_wake_queue来触发发送frame的例程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
* This is the ISR for the vortex series chips.
* full_bus_master_tx == 0 && full_bus_master_rx == 0
*/

static irqreturn_t
vortex_interrupt(int irq, void *dev_id)
{
struct net_device *dev = dev_id;
struct vortex_private *vp = netdev_priv(dev);
void __iomem *ioaddr;
int status;
int work_done = max_interrupt_work;
int handled = 0;
unsigned int bytes_compl = 0, pkts_compl = 0;

ioaddr = vp->ioaddr;
spin_lock(&vp->lock);

status = ioread16(ioaddr + EL3_STATUS); // 从ioremap的地址当中读取网卡的状态,这个是设备规定的地址

if (vortex_debug > 6)
pr_debug("vortex_interrupt(). status=0x%4x\n", status);

// 在中断处理的时候,设备是关中断的,这个时候可以通过观察这个status来检查设备是否有中断到来.
if ((status & IntLatch) == 0) // 有中断需要处理,但是可能已经被其他中断处理函数处理了
goto handler_exit; /* No interrupt: shared IRQs cause this */
handled = 1;

if (status & IntReq) { // 中断请求
status |= vp->deferred;
vp->deferred = 0;
}

if (status == 0xffff) /* h/w no longer present (hotplug)? */
goto handler_exit;

if (vortex_debug > 4)
pr_debug("%s: interrupt, status %4.4x, latency %d ticks.\n",
dev->name, status, ioread8(ioaddr + Timer));

spin_lock(&vp->window_lock);
window_set(vp, 7);

do {
if (vortex_debug > 5)
pr_debug("%s: In interrupt loop, status %4.4x.\n",
dev->name, status);
if (status & RxComplete) // 中断表示接收完成的时候调用 vortex_rx
vortex_rx(dev);

if (status & TxAvailable) { // 中断表示可以传输的时候
if (vortex_debug > 5)
pr_debug(" TX room bit was handled.\n");
/* There's room in the FIFO for a full-sized packet. */
iowrite16(AckIntr | TxAvailable, ioaddr + EL3_CMD);
netif_wake_queue (dev);
}

if (status & DMADone) {// 表示发送完成可以清除sk_buff了
if (ioread16(ioaddr + Wn7_MasterStatus) & 0x1000) {
iowrite16(0x1000, ioaddr + Wn7_MasterStatus); /* Ack the event. */
pci_unmap_single(VORTEX_PCI(vp), vp->tx_skb_dma, (vp->tx_skb->len + 3) & ~3, PCI_DMA_TODEVICE);
pkts_compl++;
bytes_compl += vp->tx_skb->len;
dev_kfree_skb_irq(vp->tx_skb); /* Release the transferred buffer */
if (ioread16(ioaddr + TxFree) > 1536) {
/*
* AKPM: FIXME: I don't think we need this. If the queue was stopped due to
* insufficient FIFO room, the TxAvailable test will succeed and call
* netif_wake_queue()
*/
netif_wake_queue(dev);
} else { /* Interrupt when FIFO has room for max-sized packet. */
iowrite16(SetTxThreshold + (1536>>2), ioaddr + EL3_CMD);
netif_stop_queue(dev);
}
}
}
/* Check for all uncommon interrupts at once. */
if (status & (HostError | RxEarly | StatsFull | TxComplete | IntReq)) {
if (status == 0xffff)
break;
if (status & RxEarly)
vortex_rx(dev);
spin_unlock(&vp->window_lock);
vortex_error(dev, status);
spin_lock(&vp->window_lock);
window_set(vp, 7);
}

if (--work_done < 0) { // 最多处理work_done个frame
pr_warn("%s: Too much work in interrupt, status %4.4x\n",
dev->name, status);
/* Disable all pending interrupts. */
do {
vp->deferred |= status; // 把当前状态保存起来等下次中断的时候处理
iowrite16(SetStatusEnb | (~vp->deferred & vp->status_enable),
ioaddr + EL3_CMD);
iowrite16(AckIntr | (vp->deferred & 0x7ff), ioaddr + EL3_CMD);
} while ((status = ioread16(ioaddr + EL3_CMD)) & IntLatch); // 把中断清掉?
/* The timer will reenable interrupts. */
mod_timer(&vp->timer, jiffies + 1*HZ);
break;
}
/* Acknowledge the IRQ. */
iowrite16(AckIntr | IntReq | IntLatch, ioaddr + EL3_CMD);
} while ((status = ioread16(ioaddr + EL3_STATUS)) & (IntLatch | RxComplete));// 当有pending的中断并且是接收完成的状态

netdev_completed_queue(dev, pkts_compl, bytes_compl);
spin_unlock(&vp->window_lock);

if (vortex_debug > 4)
pr_debug("%s: exiting interrupt, status %4.4x.\n",
dev->name, status);
handler_exit:
spin_unlock(&vp->lock);
return IRQ_RETVAL(handled);
}

当中断到来的时候检查RxComplete,如果这个状态置位了说明有frame可以处理,当状态有TxAvailable的时候表示网卡的缓冲有空可以尝试发送frame,这样的检查会循环很多次,直到出现错误或者超过规定的循环次数max_interrupt_work,这个值对于这个设备来说是32,也就是说最多接受32帧就会退出中断处理。

frame的接收

vortex_rx最终会调用netif_rx这个函数很关键,vortex_rx会通过netdev_alloc_skb分配一个sk_buff,这个是一个会贯穿整个协议栈的一个buff,然后从设备中拷贝frame,拷贝的方式也有很多比如直接读取或者利用DMA。
从DMA读取到内存当中比如截取的这段代码

1
2
3
4
5
6
7
8
9
10
// 转换成总线地址给DMA设备,启动DMA传输,然后循环检查传输状态
// 最后取消总线地址的映射.
dma_addr_t dma = pci_map_single(VORTEX_PCI(vp), skb_put(skb, pkt_len),
pkt_len, PCI_DMA_FROMDEVICE);
iowrite32(dma, ioaddr + Wn7_MasterAddr);
iowrite16((skb->len + 3) & ~3, ioaddr + Wn7_MasterLen);
iowrite16(StartDMAUp, ioaddr + EL3_CMD);
while (ioread16(ioaddr + Wn7_MasterStatus) & 0x8000)
;
pci_unmap_single(VORTEX_PCI(vp), dma, pkt_len, PCI_DMA_FROMDEVICE);

也有不饶过CPU直接读取的

1
2
3
ioread32_rep(ioaddr + RX_FIFO,
skb_put(skb, pkt_len),
(pkt_len + 3) >> 2);

接着调用eth_type_trans确定对应的protocol,并且最终调用netif_rx来继续处理接收任务。

到了netif_rx就是一个通用流程了,netif_rx调用了netif_rx_internal,这个函数会先通过net_timestamp_check检查sk_buff的时间戳,并且设置,然后调用enqueue_to_backlog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*
* enqueue_to_backlog is called to queue an skb to a per CPU backlog
* queue (may be a remote CPU queue).
*/
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
unsigned int qlen;

sd = &per_cpu(softnet_data, cpu); // 获取per-cpu softnet_data

local_irq_save(flags);

rps_lock(sd);
if (!netif_running(skb->dev)) // 如果设备已经没有运行的直接丢frame
goto drop;
qlen = skb_queue_len(&sd->input_pkt_queue); // 获取softnet_data的&sk_buff的队列长度
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) { // 如果没有超过最大长度并且没有被限制
if (qlen) {
enqueue:
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags); // 加入队列并且开中断
return NET_RX_SUCCESS;
}
// 如果队列为空可以尝试调度 backlog device,
// 再把frame加入到队列当中。
/* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))
____napi_schedule(sd, &sd->backlog);
}
goto enqueue;
}

drop:
sd->dropped++;
rps_unlock(sd);

local_irq_restore(flags);

atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
return NET_RX_DROP;
}

进入到____napi_schedule则很简单了,就是唤起把backlog这个softnet_data上的device加入到poll_list当中然后唤起softirq来处理。

1
2
3
4
5
6
7
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
list_add_tail(&napi->poll_list, &sd->poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

这里要提的是,kernel对于frame的处理有一个比较新的API,称之为NAPI。

对于NAPI来说, net_rx_action结合了轮询和中断, 当设备有中断表示收到frame的时候会把它加入softnet_datapoll_list, 这个时候设备不再发出中断,然后通过轮询这些设备是否有剩余frame处理,并且限制最大处理数量,这样能保证cpu不会承受过大的中断load的压力,并且可以使得设备之间能够相对公平获得处理的机会,而不是中断更频繁的设备获得的机会更多。

对于使用NAPI的设备来说显然需要提供poll接口来供查询之用,同时用NET_RX_SOFTIRQ触发软中断执行net_rx_action这个软中断处理函数,而不需要netif_rx这个接口。

NET_RX_SOFTIRQ做的事情就是把设备加入poll列表并且触发softirq. 对于没有使用NAPI的设备来说,会使用per-cpu 结构softnet_data中的backlog_dev(是一个假的胶水层的封装)来替代放入对应的poll列表中,然后再进入netif_rx_schedule的例程来处理。也就是说NAPI-unaware的设备用的是backlog_dev而NAPI-aware的设备用的是自己的device结构体。

相反的把设备从轮询列表中移除依靠的是netif_rx_complete. 这样轮询检查的时候就不会处理对应的设备了.

这里说了这么久的轮询不要误会frame的主力是轮询,显然这是不行的,因为网络数据要求接受很快,还是中断驱动的,只不过为了物尽其用,既然你有数据帧还在就不要中断告诉我,我继续处理就可以了。

现在看一下加入到poll_list之后,NET_RX_SOFTIRQ软中断是如何工作的。关于软中断的实现可以参考参考列表中的第二个链接,我本来想自己总结一下,但是发现这篇文章确实总结的很好,所以保存一下就好了。

补充一点,softnet_data->input_pkt_queue是frame的缓存队列,这个队列有一个最大值netdev_max_backlog,目前这个值是1000,也就是每个CPU最多有1000个frame没处理。对于有自己device的设备,frame需要通过设备的poll方法来获取。

net_rx_action对应的是软中断NET_RX_SOFTIRQ的处理函数。之前说过net_rx_action的工作方式,这里看一下具体的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static void net_rx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);
unsigned long time_limit = jiffies + 2;
int budget = netdev_budget; // 一个budget用于限制运行时间,目前是300
LIST_HEAD(list);
LIST_HEAD(repoll);

local_irq_disable();
list_splice_init(&sd->poll_list, &list); // 把poll_list合并到list上并且把poll_list清空
local_irq_enable();

for (;;) {
struct napi_struct *n;

if (list_empty(&list)) {
if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
return;
break;
}

n = list_first_entry(&list, struct napi_struct, poll_list);
budget -= napi_poll(n, &repoll); // 每调用一次会减掉相应的budget,如果还有work没完成还需要poll就会加入到repoll链表里。

/* If softirq window is exhausted then punt.
* Allow this to run for 2 jiffies since which will allow
* an average latency of 1.5/HZ.
*/
if (unlikely(budget <= 0 || // 如果budge耗尽或者超过了两个jiffies就会停止
time_after_eq(jiffies, time_limit))) {
sd->time_squeeze++;
break;
}
}

__kfree_skb_flush();
local_irq_disable();

list_splice_tail_init(&sd->poll_list, &list); // 再把poll_list合并到list当中,并且清空poll_list。
list_splice_tail(&repoll, &list); // 然后把repoll合并到list当中。
list_splice(&list, &sd->poll_list); // 再把list合并到poll_list当中。
if (!list_empty(&sd->poll_list)) // 这几步的过程就是把需要repoll的设备和当前的poll_list合并
__raise_softirq_irqoff(NET_RX_SOFTIRQ); // 如果还有需要poll的设备就再触发软中断.

net_rps_action_and_irq_enable(sd);
}

整个过程就是检查poll_list并且轮询调用poll方法。接下来具体看一下poll方法的相关内容。以非NAPI设备为例,使用的是backlogpoll方法,对应的方法可以在net/core/dev.c当中找到,在初始化函数net_dev_init当中sd->backlog.poll = process_backlog;给每个per-cpu结构的softnet_data都是指向了这个poll函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static int process_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);

/* Check if we have pending ipi, its better to send them now,
* not waiting net_rx_action() end.
*/
if (sd_has_rps_ipi_waiting(sd)) {
local_irq_disable();
net_rps_action_and_irq_enable(sd);
}

napi->weight = weight_p; // 这个值目前是32
local_irq_disable();
while (1) {
struct sk_buff *skb;

while ((skb = __skb_dequeue(&sd->process_queue))) {
rcu_read_lock();
local_irq_enable();
__netif_receive_skb(skb); // 取出的skb交给__netif_receive_skb处理
rcu_read_unlock();
local_irq_disable();
input_queue_head_incr(sd);
if (++work >= quota) {
local_irq_enable();
return work;
}
}

rps_lock(sd);
if (skb_queue_empty(&sd->input_pkt_queue)) {
/*
* Inline a custom version of __napi_complete().
* only current cpu owns and manipulates this napi,
* and NAPI_STATE_SCHED is the only possible flag set
* on backlog.
* We can use a plain write instead of clear_bit(),
* and we dont need an smp_mb() memory barrier.
*/
napi->state = 0;
rps_unlock(sd);

break;
}

skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
rps_unlock(sd);
}
local_irq_enable();

return work;
}

这个函数的主体就是把&sd->input_pkt_queue交给&sd->process_queue然后从&sd->process_queue当中取出sk_buff,再调用__netif_receive_skb,进一步处理.我想老是把&sd->input_pkt_queue拷贝并且清空应该是因为希望不因为锁独占这个队列太久的原因.

__netif_receive_skb的内容主要是根据skb->protocol,遍历&skb->dev->ptype_all然后将frame交给L3的ptype->func处理,还有一些在这一层需要处理的特性,比如bridging.

目前说描述的东西就是整个传输路径的这部分,完整的图在这里

frame的发送

在frame的发送路径,主要包含几个任务,开关设备的发送功能,调度设备进行发送,选择在设备发送队列的frame进行发送,还要传输过程的本身.而且发送的例程也是类似接收的过程,有对应的softirq(NET_TX_SOFTIRQ)和对应的handler,net_tx_action。和poll_list对应的是output_queue,也是一个等待发送的设备列表。
__LINK_STATE_START__LINK_ STATE_XOFF__LINK_STATE_RX_SCHED__LINK_STATE_SCHED也是对应的.回顾开头中断的代码

1
2
3
4
5
6
7
if (status & TxAvailable) { // 中断表示可以传输的时候
if (vortex_debug > 5)
pr_debug(" TX room bit was handled.\n");
/* There's room in the FIFO for a full-sized packet. */
iowrite16(AckIntr | TxAvailable, ioaddr + EL3_CMD);
netif_wake_queue (dev);
}

就是说当状态可用的时候,尝试调用netif_wake_queue来触发frame的发送。

首先来看一下如何选择发送的设备。

1
2
3
4
5
6
7
8
9
10
11
/**
* netif_wake_queue - restart transmit
* @dev: network device
*
* Allow upper layers to call the device hard_start_xmit routine.
* Used for flow control when transmit resources are available.
*/
static inline void netif_wake_queue(struct net_device *dev)
{
netif_tx_wake_queue(netdev_get_tx_queue(dev, 0));
}

这个函数内部调用了

1
2
3
4
5
6
7
8
9
10
11
12
13
static inline void __netif_reschedule(struct Qdisc *q)
{
struct softnet_data *sd;
unsigned long flags;

local_irq_save(flags);
sd = this_cpu_ptr(&softnet_data);
q->next_sched = NULL;
*sd->output_queue_tailp = q;
sd->output_queue_tailp = &q->next_sched;
raise_softirq_irqoff(NET_TX_SOFTIRQ);
local_irq_restore(flags);
}

主体就是把设备加入output_queue(通过->next_sched连接)然后唤起软中断NET_TX_SOFTIRQ

再来看看软中断的主体。
首先是遍历completion_queue来释放sk_buff这个连接是通过dev_kfree_skb_irq添加的,和正常的dev_kfree_skb不同,它是把sk_buff加入到释放列表中就快速返回了, 然后就会遍历设备列表寻找可运行的设备,调用qdisk_run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
 void net_tx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);

if (sd->completion_queue) { // 检查completion_queue逐一释放sk_buff, 他们是通过dev_kfree_skb_irq,因为他不真的释放sk_buff,而是把sk_buff移到completio_queue当中.
struct sk_buff *clist;

local_irq_disable();
clist = sd->completion_queue;
sd->completion_queue = NULL;
local_irq_enable();

while (clist) {
struct sk_buff *skb = clist;
clist = clist->next;

WARN_ON(atomic_read(&skb->users));
if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
trace_consume_skb(skb);
else
trace_kfree_skb(skb, net_tx_action);

if (skb->fclone != SKB_FCLONE_UNAVAILABLE)
__kfree_skb(skb);
else
__kfree_skb_defer(skb);
}

__kfree_skb_flush();
}

if (sd->output_queue) {
struct Qdisc *head;

local_irq_disable();
head = sd->output_queue; // 拷贝并首尾相连.
sd->output_queue = NULL;
sd->output_queue_tailp = &sd->output_queue;
local_irq_enable();

while (head) {
struct Qdisc *q = head;
spinlock_t *root_lock;

head = head->next_sched;

root_lock = qdisc_lock(q);
if (spin_trylock(root_lock)) {
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED,
&q->state);
qdisc_run(q); // qdisk_run
spin_unlock(root_lock);
} else {
if (!test_bit(__QDISC_STATE_DEACTIVATED,
&q->state)) {
__netif_reschedule(q);
} else {
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED,
&q->state);
}
}
}
}
}

qdisc_run首先检查设备的运行状态然后调用__qdisc_run,这个函数的主体就是调用qdisc_restart,直到超过限制或者需要让出时间CPU了,最后清空qdisc的RUNNING状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void __qdisc_run(struct Qdisc *q)
{
int quota = weight_p;
int packets;

while (qdisc_restart(q, &packets)) {
/*
* Ordered by possible occurrence: Postpone processing if
* 1. we've exceeded packet quota
* 2. another process needs the CPU;
*/
quota -= packets;
if (quota <= 0 || need_resched()) {
__netif_schedule(q);
break;
}
}

qdisc_run_end(q);
}

qdisc_restart的主体是从队列中取出sk_buff,然后调用sch_direct_xmit,它的功能是调用dev_hard_start_xmit来运行设备驱动的指定的传输方法hard_start_xmit,如果没有成功则把sk_buff重新加入到出队当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
* Transmit possibly several skbs, and handle the return status as
* required. Holding the __QDISC___STATE_RUNNING bit guarantees that
* only one CPU can execute this function.
*
* Returns to the caller:
* 0 - queue is empty or throttled.
* >0 - queue is not empty.
*/
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock, bool validate)
{
int ret = NETDEV_TX_BUSY;

/* And release qdisc */
spin_unlock(root_lock);

/* Note that we validate skb (GSO, checksum, ...) outside of locks */
if (validate)
skb = validate_xmit_skb_list(skb, dev);

if (likely(skb)) {
HARD_TX_LOCK(dev, txq, smp_processor_id());
if (!netif_xmit_frozen_or_stopped(txq))
skb = dev_hard_start_xmit(skb, dev, txq, &ret);

HARD_TX_UNLOCK(dev, txq);
} else {
spin_lock(root_lock);
return qdisc_qlen(q);
}
spin_lock(root_lock);

if (dev_xmit_complete(ret)) {
/* Driver sent out skb successfully or skb was consumed */
ret = qdisc_qlen(q);
} else {
/* Driver returned NETDEV_TX_BUSY - requeue skb */
if (unlikely(ret != NETDEV_TX_BUSY))
net_warn_ratelimited("BUG %s code %d qlen %d\n",
dev->name, ret, q->q.qlen);

ret = dev_requeue_skb(skb, q);
}

if (ret && netif_xmit_frozen_or_stopped(txq))
ret = 0;

return ret;
}

至此整个L2的发送和接收就大致有了一个了解,并且能够理解整个的工作过程,这里有一点没有讲到的是L2的转发机制,比如STP等协议和实现,可能会在接下来的文章中继续剖析.

参考:

  1. Linux 下DMA浅析
  2. linux kernel的中断子系统之(八):softirq
  3. Linux Network Internals