ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

SSA概述

SSA在Go1.7中被引入,这个特性对编译器的性能有很大的提高,但是也导致编译过程有些减速。下面来结合网上的资粮和书籍,简单说明一下SSA以及SSA的应用。

SSA 代表 static single-assignment,是一种IR(中间表示代码),要保证每个变量只被赋值一次。这个能帮助简化编译器的优化算法。

1
2
3
y := 1
y := 2
x := y

比如上面这段代码,y = 1其实是不可用的,这个要通过定义的可达分析来确定y是要用1还是2,而SSA有一个标识符可以称之为版本或者“代“。

1
2
3
y1 := 1
y2 := 2
x1 := y2

这样就没有任何间接值了。用SSA表示的好处是对于同一个变量的无关使用表示成不同“代”,可以方便很多编译器的优化算法的实现。

一个概念:

Φ(读作fai) 函数,表示要根据控制流赋值的“代”。

例子可以参考维基里的这一段

三个定义:

A dominate B,如果从起点开始必须通过A到达B。也就是说A是到B的必经之路。

A strictly dominate B,如果 A dominate B,并且A和B不相等。

A 的 dominance frontier 含有B,如果A没有strictly dominate B,但是 dominate 了B的一个前驱节点。

用遍历的方式确定 dominance frontier 的伪代码。

1
2
3
4
5
6
7
for each node b
if the number of immediate predecessors of b ≥ 2
for each p in immediate predecessors of b
runner := p
while runner ≠ idom(b)
add b to runner’s dominance frontier set
runner := idom(runner)

idom(b) 代表相邻的strictly dominate b的结点。这样的点只有一个,因为相邻的点有两个的话就不会是必经之路了。

如图是一个例子,2的前驱是1和7,7没有sd(strictly dominate) 2,所以把2加入到7的DF(dominate frontiers)当中。3是7的相邻sd,然后3不是2的sd所以2加入到3的DF当中,接着2是3相邻的sd,然后2不是2的sd所以把2加入到2的DF中,最后遍历到7,5和6不是7的sd所以把7加入到5和6的df当中。

df(A)可以认为是可以通过A到达的,但不是必经之路的点的集合。

有了这个定义以后就可以插入Φ函数了和重命名了。如果X中有定义a那么所有df(X)都需要a的Φ函数。并且Φ函数本身也是一个定义。

比如还是同一个例子。

1当中有j的定义,但是df(1)是空的,5当中有j的定义,并且5的df有7所以7当中要插入一个φ(j, j)。j现在在7中定义了(通过φ函数)所以df(7)中的2也要有φ(j, j),6也有j的定义但是7已经有了φ函数了,2的df有2,但是2已经有φ函数了。类似的方式可以应用到i和k。接着对定义重命名就可以完成SSA的转换了。

SSA的应用

上面只是通俗的解释了一下SSA,没有给出更多详细的理论和算法以及证明,因为证明实在难看懂,下面说一下SSA的应用。

DEAD CODE 消除

因为每个变量都有“代”(因为大家都只被赋值一次),所以很容易检查出没有被使用的变量并且删除对应的定义。另外如果删除v=x这样的定义还要在x的use表中把这条语句删除。

简单的常量扩展

比如v = φ(c1,c2,...,cn)这种格式,如果c都是相等的可以直接替换成c,或者v=c,如果c是常量的话也可以直接替换掉。在做这些的同时也可以做其他的优化,能够在一趟遍历中都完成,比如copy propagationx=y或者x=φ(y)都可以直接用y替换掉x。比如constant foldingx=a+b如果a+b是常量的话,可以直接用常量赋值。

当然还有其他的优化算法,出发点都是基于SSA的简单性。

从SSA转换回原始代码

y = φ(x1, x2, x3)这样的形式要根据条件分支再拆分回原来的形态,比如满足1条件,使用y=x1这样的形式。并且可能会很自然的想把x1和x2变回使用同一个寄存器,但是通过优化过程中的一些手段(copy propagation)已经把大部分move指令给优化掉了,并且重新推回x可能会有生命周期的影响,所以还是保留了“代”。

总结

其实这个就是把def-use换成了use-def方便做代码优化呀:)害我看那么久

参考文献

  1. 虎书第十九章
  2. 维基百科

本篇文章主要分析一下经过纯物理层之后从设备读取的frame是如何进入协议栈的和对应封装好的frame是如何离开协议栈的,多亏协议栈的分层思路,每一层之间都可以独立分析和阅读,对于驱动来说,首先要面对的就是中断了, 本文基于4.7.2的内核简单的过了一遍.

中断处理

中断处理是分析链路层数据传输的入口。
例如在drivers/net/ethernet/3com/3c59x.c这个是一个华三的设备驱动, 观察这个驱动的中断处理函数可以知道,frame的接收的入口是vortex_rx,而当设备有足够的空间发送frame的时候就会调用netif_wake_queue来触发发送frame的例程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
* This is the ISR for the vortex series chips.
* full_bus_master_tx == 0 && full_bus_master_rx == 0
*/

static irqreturn_t
vortex_interrupt(int irq, void *dev_id)
{
struct net_device *dev = dev_id;
struct vortex_private *vp = netdev_priv(dev);
void __iomem *ioaddr;
int status;
int work_done = max_interrupt_work;
int handled = 0;
unsigned int bytes_compl = 0, pkts_compl = 0;

ioaddr = vp->ioaddr;
spin_lock(&vp->lock);

status = ioread16(ioaddr + EL3_STATUS); // 从ioremap的地址当中读取网卡的状态,这个是设备规定的地址

if (vortex_debug > 6)
pr_debug("vortex_interrupt(). status=0x%4x\n", status);

// 在中断处理的时候,设备是关中断的,这个时候可以通过观察这个status来检查设备是否有中断到来.
if ((status & IntLatch) == 0) // 有中断需要处理,但是可能已经被其他中断处理函数处理了
goto handler_exit; /* No interrupt: shared IRQs cause this */
handled = 1;

if (status & IntReq) { // 中断请求
status |= vp->deferred;
vp->deferred = 0;
}

if (status == 0xffff) /* h/w no longer present (hotplug)? */
goto handler_exit;

if (vortex_debug > 4)
pr_debug("%s: interrupt, status %4.4x, latency %d ticks.\n",
dev->name, status, ioread8(ioaddr + Timer));

spin_lock(&vp->window_lock);
window_set(vp, 7);

do {
if (vortex_debug > 5)
pr_debug("%s: In interrupt loop, status %4.4x.\n",
dev->name, status);
if (status & RxComplete) // 中断表示接收完成的时候调用 vortex_rx
vortex_rx(dev);

if (status & TxAvailable) { // 中断表示可以传输的时候
if (vortex_debug > 5)
pr_debug(" TX room bit was handled.\n");
/* There's room in the FIFO for a full-sized packet. */
iowrite16(AckIntr | TxAvailable, ioaddr + EL3_CMD);
netif_wake_queue (dev);
}

if (status & DMADone) {// 表示发送完成可以清除sk_buff了
if (ioread16(ioaddr + Wn7_MasterStatus) & 0x1000) {
iowrite16(0x1000, ioaddr + Wn7_MasterStatus); /* Ack the event. */
pci_unmap_single(VORTEX_PCI(vp), vp->tx_skb_dma, (vp->tx_skb->len + 3) & ~3, PCI_DMA_TODEVICE);
pkts_compl++;
bytes_compl += vp->tx_skb->len;
dev_kfree_skb_irq(vp->tx_skb); /* Release the transferred buffer */
if (ioread16(ioaddr + TxFree) > 1536) {
/*
* AKPM: FIXME: I don't think we need this. If the queue was stopped due to
* insufficient FIFO room, the TxAvailable test will succeed and call
* netif_wake_queue()
*/
netif_wake_queue(dev);
} else { /* Interrupt when FIFO has room for max-sized packet. */
iowrite16(SetTxThreshold + (1536>>2), ioaddr + EL3_CMD);
netif_stop_queue(dev);
}
}
}
/* Check for all uncommon interrupts at once. */
if (status & (HostError | RxEarly | StatsFull | TxComplete | IntReq)) {
if (status == 0xffff)
break;
if (status & RxEarly)
vortex_rx(dev);
spin_unlock(&vp->window_lock);
vortex_error(dev, status);
spin_lock(&vp->window_lock);
window_set(vp, 7);
}

if (--work_done < 0) { // 最多处理work_done个frame
pr_warn("%s: Too much work in interrupt, status %4.4x\n",
dev->name, status);
/* Disable all pending interrupts. */
do {
vp->deferred |= status; // 把当前状态保存起来等下次中断的时候处理
iowrite16(SetStatusEnb | (~vp->deferred & vp->status_enable),
ioaddr + EL3_CMD);
iowrite16(AckIntr | (vp->deferred & 0x7ff), ioaddr + EL3_CMD);
} while ((status = ioread16(ioaddr + EL3_CMD)) & IntLatch); // 把中断清掉?
/* The timer will reenable interrupts. */
mod_timer(&vp->timer, jiffies + 1*HZ);
break;
}
/* Acknowledge the IRQ. */
iowrite16(AckIntr | IntReq | IntLatch, ioaddr + EL3_CMD);
} while ((status = ioread16(ioaddr + EL3_STATUS)) & (IntLatch | RxComplete));// 当有pending的中断并且是接收完成的状态

netdev_completed_queue(dev, pkts_compl, bytes_compl);
spin_unlock(&vp->window_lock);

if (vortex_debug > 4)
pr_debug("%s: exiting interrupt, status %4.4x.\n",
dev->name, status);
handler_exit:
spin_unlock(&vp->lock);
return IRQ_RETVAL(handled);
}

当中断到来的时候检查RxComplete,如果这个状态置位了说明有frame可以处理,当状态有TxAvailable的时候表示网卡的缓冲有空可以尝试发送frame,这样的检查会循环很多次,直到出现错误或者超过规定的循环次数max_interrupt_work,这个值对于这个设备来说是32,也就是说最多接受32帧就会退出中断处理。

frame的接收

vortex_rx最终会调用netif_rx这个函数很关键,vortex_rx会通过netdev_alloc_skb分配一个sk_buff,这个是一个会贯穿整个协议栈的一个buff,然后从设备中拷贝frame,拷贝的方式也有很多比如直接读取或者利用DMA。
从DMA读取到内存当中比如截取的这段代码

1
2
3
4
5
6
7
8
9
10
// 转换成总线地址给DMA设备,启动DMA传输,然后循环检查传输状态
// 最后取消总线地址的映射.
dma_addr_t dma = pci_map_single(VORTEX_PCI(vp), skb_put(skb, pkt_len),
pkt_len, PCI_DMA_FROMDEVICE);
iowrite32(dma, ioaddr + Wn7_MasterAddr);
iowrite16((skb->len + 3) & ~3, ioaddr + Wn7_MasterLen);
iowrite16(StartDMAUp, ioaddr + EL3_CMD);
while (ioread16(ioaddr + Wn7_MasterStatus) & 0x8000)
;
pci_unmap_single(VORTEX_PCI(vp), dma, pkt_len, PCI_DMA_FROMDEVICE);

也有不饶过CPU直接读取的

1
2
3
ioread32_rep(ioaddr + RX_FIFO,
skb_put(skb, pkt_len),
(pkt_len + 3) >> 2);

接着调用eth_type_trans确定对应的protocol,并且最终调用netif_rx来继续处理接收任务。

到了netif_rx就是一个通用流程了,netif_rx调用了netif_rx_internal,这个函数会先通过net_timestamp_check检查sk_buff的时间戳,并且设置,然后调用enqueue_to_backlog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*
* enqueue_to_backlog is called to queue an skb to a per CPU backlog
* queue (may be a remote CPU queue).
*/
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
unsigned int qlen;

sd = &per_cpu(softnet_data, cpu); // 获取per-cpu softnet_data

local_irq_save(flags);

rps_lock(sd);
if (!netif_running(skb->dev)) // 如果设备已经没有运行的直接丢frame
goto drop;
qlen = skb_queue_len(&sd->input_pkt_queue); // 获取softnet_data的&sk_buff的队列长度
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) { // 如果没有超过最大长度并且没有被限制
if (qlen) {
enqueue:
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags); // 加入队列并且开中断
return NET_RX_SUCCESS;
}
// 如果队列为空可以尝试调度 backlog device,
// 再把frame加入到队列当中。
/* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))
____napi_schedule(sd, &sd->backlog);
}
goto enqueue;
}

drop:
sd->dropped++;
rps_unlock(sd);

local_irq_restore(flags);

atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
return NET_RX_DROP;
}

进入到____napi_schedule则很简单了,就是唤起把backlog这个softnet_data上的device加入到poll_list当中然后唤起softirq来处理。

1
2
3
4
5
6
7
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
list_add_tail(&napi->poll_list, &sd->poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

这里要提的是,kernel对于frame的处理有一个比较新的API,称之为NAPI。

对于NAPI来说, net_rx_action结合了轮询和中断, 当设备有中断表示收到frame的时候会把它加入softnet_datapoll_list, 这个时候设备不再发出中断,然后通过轮询这些设备是否有剩余frame处理,并且限制最大处理数量,这样能保证cpu不会承受过大的中断load的压力,并且可以使得设备之间能够相对公平获得处理的机会,而不是中断更频繁的设备获得的机会更多。

对于使用NAPI的设备来说显然需要提供poll接口来供查询之用,同时用NET_RX_SOFTIRQ触发软中断执行net_rx_action这个软中断处理函数,而不需要netif_rx这个接口。

NET_RX_SOFTIRQ做的事情就是把设备加入poll列表并且触发softirq. 对于没有使用NAPI的设备来说,会使用per-cpu 结构softnet_data中的backlog_dev(是一个假的胶水层的封装)来替代放入对应的poll列表中,然后再进入netif_rx_schedule的例程来处理。也就是说NAPI-unaware的设备用的是backlog_dev而NAPI-aware的设备用的是自己的device结构体。

相反的把设备从轮询列表中移除依靠的是netif_rx_complete. 这样轮询检查的时候就不会处理对应的设备了.

这里说了这么久的轮询不要误会frame的主力是轮询,显然这是不行的,因为网络数据要求接受很快,还是中断驱动的,只不过为了物尽其用,既然你有数据帧还在就不要中断告诉我,我继续处理就可以了。

现在看一下加入到poll_list之后,NET_RX_SOFTIRQ软中断是如何工作的。关于软中断的实现可以参考参考列表中的第二个链接,我本来想自己总结一下,但是发现这篇文章确实总结的很好,所以保存一下就好了。

补充一点,softnet_data->input_pkt_queue是frame的缓存队列,这个队列有一个最大值netdev_max_backlog,目前这个值是1000,也就是每个CPU最多有1000个frame没处理。对于有自己device的设备,frame需要通过设备的poll方法来获取。

net_rx_action对应的是软中断NET_RX_SOFTIRQ的处理函数。之前说过net_rx_action的工作方式,这里看一下具体的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static void net_rx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);
unsigned long time_limit = jiffies + 2;
int budget = netdev_budget; // 一个budget用于限制运行时间,目前是300
LIST_HEAD(list);
LIST_HEAD(repoll);

local_irq_disable();
list_splice_init(&sd->poll_list, &list); // 把poll_list合并到list上并且把poll_list清空
local_irq_enable();

for (;;) {
struct napi_struct *n;

if (list_empty(&list)) {
if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
return;
break;
}

n = list_first_entry(&list, struct napi_struct, poll_list);
budget -= napi_poll(n, &repoll); // 每调用一次会减掉相应的budget,如果还有work没完成还需要poll就会加入到repoll链表里。

/* If softirq window is exhausted then punt.
* Allow this to run for 2 jiffies since which will allow
* an average latency of 1.5/HZ.
*/
if (unlikely(budget <= 0 || // 如果budge耗尽或者超过了两个jiffies就会停止
time_after_eq(jiffies, time_limit))) {
sd->time_squeeze++;
break;
}
}

__kfree_skb_flush();
local_irq_disable();

list_splice_tail_init(&sd->poll_list, &list); // 再把poll_list合并到list当中,并且清空poll_list。
list_splice_tail(&repoll, &list); // 然后把repoll合并到list当中。
list_splice(&list, &sd->poll_list); // 再把list合并到poll_list当中。
if (!list_empty(&sd->poll_list)) // 这几步的过程就是把需要repoll的设备和当前的poll_list合并
__raise_softirq_irqoff(NET_RX_SOFTIRQ); // 如果还有需要poll的设备就再触发软中断.

net_rps_action_and_irq_enable(sd);
}

整个过程就是检查poll_list并且轮询调用poll方法。接下来具体看一下poll方法的相关内容。以非NAPI设备为例,使用的是backlogpoll方法,对应的方法可以在net/core/dev.c当中找到,在初始化函数net_dev_init当中sd->backlog.poll = process_backlog;给每个per-cpu结构的softnet_data都是指向了这个poll函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static int process_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);

/* Check if we have pending ipi, its better to send them now,
* not waiting net_rx_action() end.
*/
if (sd_has_rps_ipi_waiting(sd)) {
local_irq_disable();
net_rps_action_and_irq_enable(sd);
}

napi->weight = weight_p; // 这个值目前是32
local_irq_disable();
while (1) {
struct sk_buff *skb;

while ((skb = __skb_dequeue(&sd->process_queue))) {
rcu_read_lock();
local_irq_enable();
__netif_receive_skb(skb); // 取出的skb交给__netif_receive_skb处理
rcu_read_unlock();
local_irq_disable();
input_queue_head_incr(sd);
if (++work >= quota) {
local_irq_enable();
return work;
}
}

rps_lock(sd);
if (skb_queue_empty(&sd->input_pkt_queue)) {
/*
* Inline a custom version of __napi_complete().
* only current cpu owns and manipulates this napi,
* and NAPI_STATE_SCHED is the only possible flag set
* on backlog.
* We can use a plain write instead of clear_bit(),
* and we dont need an smp_mb() memory barrier.
*/
napi->state = 0;
rps_unlock(sd);

break;
}

skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
rps_unlock(sd);
}
local_irq_enable();

return work;
}

这个函数的主体就是把&sd->input_pkt_queue交给&sd->process_queue然后从&sd->process_queue当中取出sk_buff,再调用__netif_receive_skb,进一步处理.我想老是把&sd->input_pkt_queue拷贝并且清空应该是因为希望不因为锁独占这个队列太久的原因.

__netif_receive_skb的内容主要是根据skb->protocol,遍历&skb->dev->ptype_all然后将frame交给L3的ptype->func处理,还有一些在这一层需要处理的特性,比如bridging.

目前说描述的东西就是整个传输路径的这部分,完整的图在这里

frame的发送

在frame的发送路径,主要包含几个任务,开关设备的发送功能,调度设备进行发送,选择在设备发送队列的frame进行发送,还要传输过程的本身.而且发送的例程也是类似接收的过程,有对应的softirq(NET_TX_SOFTIRQ)和对应的handler,net_tx_action。和poll_list对应的是output_queue,也是一个等待发送的设备列表。
__LINK_STATE_START__LINK_ STATE_XOFF__LINK_STATE_RX_SCHED__LINK_STATE_SCHED也是对应的.回顾开头中断的代码

1
2
3
4
5
6
7
if (status & TxAvailable) { // 中断表示可以传输的时候
if (vortex_debug > 5)
pr_debug(" TX room bit was handled.\n");
/* There's room in the FIFO for a full-sized packet. */
iowrite16(AckIntr | TxAvailable, ioaddr + EL3_CMD);
netif_wake_queue (dev);
}

就是说当状态可用的时候,尝试调用netif_wake_queue来触发frame的发送。

首先来看一下如何选择发送的设备。

1
2
3
4
5
6
7
8
9
10
11
/**
* netif_wake_queue - restart transmit
* @dev: network device
*
* Allow upper layers to call the device hard_start_xmit routine.
* Used for flow control when transmit resources are available.
*/
static inline void netif_wake_queue(struct net_device *dev)
{
netif_tx_wake_queue(netdev_get_tx_queue(dev, 0));
}

这个函数内部调用了

1
2
3
4
5
6
7
8
9
10
11
12
13
static inline void __netif_reschedule(struct Qdisc *q)
{
struct softnet_data *sd;
unsigned long flags;

local_irq_save(flags);
sd = this_cpu_ptr(&softnet_data);
q->next_sched = NULL;
*sd->output_queue_tailp = q;
sd->output_queue_tailp = &q->next_sched;
raise_softirq_irqoff(NET_TX_SOFTIRQ);
local_irq_restore(flags);
}

主体就是把设备加入output_queue(通过->next_sched连接)然后唤起软中断NET_TX_SOFTIRQ

再来看看软中断的主体。
首先是遍历completion_queue来释放sk_buff这个连接是通过dev_kfree_skb_irq添加的,和正常的dev_kfree_skb不同,它是把sk_buff加入到释放列表中就快速返回了, 然后就会遍历设备列表寻找可运行的设备,调用qdisk_run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
 void net_tx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);

if (sd->completion_queue) { // 检查completion_queue逐一释放sk_buff, 他们是通过dev_kfree_skb_irq,因为他不真的释放sk_buff,而是把sk_buff移到completio_queue当中.
struct sk_buff *clist;

local_irq_disable();
clist = sd->completion_queue;
sd->completion_queue = NULL;
local_irq_enable();

while (clist) {
struct sk_buff *skb = clist;
clist = clist->next;

WARN_ON(atomic_read(&skb->users));
if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
trace_consume_skb(skb);
else
trace_kfree_skb(skb, net_tx_action);

if (skb->fclone != SKB_FCLONE_UNAVAILABLE)
__kfree_skb(skb);
else
__kfree_skb_defer(skb);
}

__kfree_skb_flush();
}

if (sd->output_queue) {
struct Qdisc *head;

local_irq_disable();
head = sd->output_queue; // 拷贝并首尾相连.
sd->output_queue = NULL;
sd->output_queue_tailp = &sd->output_queue;
local_irq_enable();

while (head) {
struct Qdisc *q = head;
spinlock_t *root_lock;

head = head->next_sched;

root_lock = qdisc_lock(q);
if (spin_trylock(root_lock)) {
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED,
&q->state);
qdisc_run(q); // qdisk_run
spin_unlock(root_lock);
} else {
if (!test_bit(__QDISC_STATE_DEACTIVATED,
&q->state)) {
__netif_reschedule(q);
} else {
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED,
&q->state);
}
}
}
}
}

qdisc_run首先检查设备的运行状态然后调用__qdisc_run,这个函数的主体就是调用qdisc_restart,直到超过限制或者需要让出时间CPU了,最后清空qdisc的RUNNING状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void __qdisc_run(struct Qdisc *q)
{
int quota = weight_p;
int packets;

while (qdisc_restart(q, &packets)) {
/*
* Ordered by possible occurrence: Postpone processing if
* 1. we've exceeded packet quota
* 2. another process needs the CPU;
*/
quota -= packets;
if (quota <= 0 || need_resched()) {
__netif_schedule(q);
break;
}
}

qdisc_run_end(q);
}

qdisc_restart的主体是从队列中取出sk_buff,然后调用sch_direct_xmit,它的功能是调用dev_hard_start_xmit来运行设备驱动的指定的传输方法hard_start_xmit,如果没有成功则把sk_buff重新加入到出队当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
* Transmit possibly several skbs, and handle the return status as
* required. Holding the __QDISC___STATE_RUNNING bit guarantees that
* only one CPU can execute this function.
*
* Returns to the caller:
* 0 - queue is empty or throttled.
* >0 - queue is not empty.
*/
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock, bool validate)
{
int ret = NETDEV_TX_BUSY;

/* And release qdisc */
spin_unlock(root_lock);

/* Note that we validate skb (GSO, checksum, ...) outside of locks */
if (validate)
skb = validate_xmit_skb_list(skb, dev);

if (likely(skb)) {
HARD_TX_LOCK(dev, txq, smp_processor_id());
if (!netif_xmit_frozen_or_stopped(txq))
skb = dev_hard_start_xmit(skb, dev, txq, &ret);

HARD_TX_UNLOCK(dev, txq);
} else {
spin_lock(root_lock);
return qdisc_qlen(q);
}
spin_lock(root_lock);

if (dev_xmit_complete(ret)) {
/* Driver sent out skb successfully or skb was consumed */
ret = qdisc_qlen(q);
} else {
/* Driver returned NETDEV_TX_BUSY - requeue skb */
if (unlikely(ret != NETDEV_TX_BUSY))
net_warn_ratelimited("BUG %s code %d qlen %d\n",
dev->name, ret, q->q.qlen);

ret = dev_requeue_skb(skb, q);
}

if (ret && netif_xmit_frozen_or_stopped(txq))
ret = 0;

return ret;
}

至此整个L2的发送和接收就大致有了一个了解,并且能够理解整个的工作过程,这里有一点没有讲到的是L2的转发机制,比如STP等协议和实现,可能会在接下来的文章中继续剖析.

参考:

  1. Linux 下DMA浅析
  2. linux kernel的中断子系统之(八):softirq
  3. Linux Network Internals

经常提到的context都是一些在一个http请求的不同中间件之间传递参数和返回值的上下文. 标准库里的上下文稍微丰富一点, 比如流水线模型都会有一个通知退出的channel, 也一并封装进了Context, 所以既有传值功能也有流程控制的功能.

一般会把Context作为函数的第一个参数传递下去, 比如Context可以保存一个Request的id, 在打log的时候带上, 这样就可以区分出输出中同一个request的调用情况. 或者子函数select 传入的Context.Done channel用于在流水线中退出的通知等等.

Context接口有四个方法, 并且线程安全.

Done 是一个channel用于通知退出事件, 当从这个channel里读出的时候说明Context结束.
Err 返回退出的原因.
Deadline 返回一个截止时间(如果设置了的话).
Value 可以取得Context里面存得值.

Context的实现有4种, cancelCtx, emptyCtx, timerCtx, valueCtx.

valueCtx只是最简单的Ctx用来存k-v, 这类似于中间件之间传递各种值的功能, 没有Done, Err, Deadline这些方法, 是组合父Context得到的.

cancelCtx是一个带取消功能的Ctx, 返回一个cancel函数, 可以通过调用它主动取消这个Ctx, 比如错误退出的时候, 可以调用cancel函数, 他会同时调用子Ctx的cancel函数, 以此来让Context的Done起作用, cancelCtx是树形的, 有Done, Err, 但是没有Deadline, 组合了父结构获得的.

timerCtx是一个cancelCtx的封装, 由timer调用cancel函数, 并且不会超过parent的Deadline(不然就退化成cancelCtx由parent触发cancel), 有Deadline, 其余部分是组合了cancelCtx.

Context.WithCancel就是绑定一个parent, 返回一个cancelCtx的Context, 用返回的CancelFunc可以用来主动关闭Context, 比如程序错误的时候可以调用CancelFunc来让select Done的goroutine知道.

Context.WithTimer是对Context.WithDeadline的封装, Context.WithDeadline返回的是timerCtx.

Context.WithValue则仅仅是用来存值的Context.

WithXXX的函数都是要接受一个parent的, 最初的parent就用Context.Background().
它是一种emptyCtx, Err, Done(nil channel永远阻塞), Deadline, Value全都返回空值, 没有实际作用只是作为树形结构的root.

另一种emptyCtx是context.TODO(), 用于没有明确作用的不知道父Context会是什么的时候可以使用.

主要结构

为了解决外部碎片的问题, 需要有一种手段可以分配非连续的页映射到一段连续的虚拟地址上. 这就是vmalloc的目的, 本文基于4.7.2的代码对vmalloc的实现进行分析.

这里要提到一个重要的结构体

1
2
3
4
5
6
7
8
9
10
11
struct vm_struct {
struct vm_struct *next;
void *addr;
unsigned long size;
unsigned long flags;
struct page **pages;
unsigned int nr_pages;
phys_addr_t phys_addr;
const void *caller;
};

这个结构用于描述每一段映射的内存.

比起32位的结构, 64位结构有更大的寻址空间, 可以通过查看Documentation/x86/x86_64/mm.txt 获得64位的内存映射, 比起32位的变化就是跨度变大了很多.

直接映射是64TB, 有一个1TB的空洞, 接着就是32TB的vmalloc空间,以VMALLOC_START(64TB+1TB)开始,VMALLOC_END(64TB+1TB+32TB)结尾.

原来的32位时, 内核的虚拟空间只有1G, 直接映射的区间占896MB, 隔了一个8MB的空洞, 再接着是vmalloc的映射区, 大小一般是128MB.

vmalloc和物理映射的空洞主要是一个安全区, 如果越过了这个hole说明程序有问题存在非法的访问.

vm_struct的组织是通过链表实现的,
其中next指向下一个vm_struct, addr指向的是虚拟地址, size对应分配的内存的大小,在有guard page(也就是flag的VM_NO_GUARD没有设置的话)的时候会多出一个PAGE_SIZE的大小(但没有映射内存), pages是页的地址, nr_pages是页的数量, phy_addr这个字段一般为0,当用于ioremap的时候会用到, caller是调用者的地址.

上面说了, 每个vm_struct代表的区间之间会有一PAGE_SIZE大小的分隔,也是为了能够方便检查非法越界的情况.

这是32位的一张分布图, 整体结构没变, 就是64位的区间变大了.

vmap_area用于代表虚拟地址的区间, 和vm_struct是有对应关系的.
list 是按地址排序存的线性链表节点.
rb_node 是按地址存的红黑树节点.
这两个值都是在插入的时候修改的.
也就是为了方便搜索,它既存在红黑树上也存在链表里面.

1
2
3
4
5
6
7
8
9
10
11
12
struct vmap_area {
unsigned long va_start;
unsigned long va_end;
unsigned long flags;
struct rb_node rb_node; /* address sorted rbtree */
struct list_head list; /* address sorted list */
struct llist_node purge_list; /* "lazy purge" list */
struct vm_struct *vm;
struct rcu_head rcu_head;
};


vmap_area 虚拟地址的管理

接下了说一下调用过程, 首先vmalloc会调用__vmalloc_node_flags,这是一层封装,指定了GFP的flag, 接着调用__vmalloc_node,它通过指定搜寻的区域(VMALLOC_START,VMALLOC_END)调用了__vmalloc_node_range, 然后通过__get_vm_area_node寻找合适的虚拟内存空间,并且分配vm_struct,最后调用__vmalloc_area_node根据vm_struct分配具体的物理页并在页表中建立虚拟地址的映射.

__get_vm_area_nodesize进行对齐, 分配vmap_area的内存, 然后增加一个PAGE_SIZE的guard page, 接着就是寻找一段合适的虚拟地址的区间来分配给vmap_area并且用它来初始化vm_struct.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static struct vm_struct *__get_vm_area_node(unsigned long size,
unsigned long align, unsigned long flags, unsigned long start,
unsigned long end, int node, gfp_t gfp_mask, const void *caller)
{
struct vmap_area *va;
struct vm_struct *area;

BUG_ON(in_interrupt());
if (flags & VM_IOREMAP)
align = 1ul << clamp_t(int, fls_long(size),
PAGE_SHIFT, IOREMAP_MAX_ORDER);

size = PAGE_ALIGN(size);
if (unlikely(!size))
return NULL;

/* 分配vm_struct
* 并且初始化全0
*/
area = kzalloc_node(sizeof(*area), gfp_mask & GFP_RECLAIM_MASK, node);
if (unlikely(!area))
return NULL;
/* 如果没有标记,size要多算一个guard page用于分隔 */
if (!(flags & VM_NO_GUARD))
size += PAGE_SIZE;
/* 分配vmap_area */
va = alloc_vmap_area(size, align, start, end, node, gfp_mask);
if (IS_ERR(va)) {
kfree(area);
return NULL;
}

/* 初始化 */
setup_vmalloc_vm(area, va, flags, caller);

return area;
}

alloc_vmap_area会缓存上次一次搜索的节点到free_vmap_cache, 这样是认为一般虚拟内存是按顺序连续分配的, 如果通过缓存没有搜索到就要从全局的vmap_area_root开始搜索.

这里说明一下vmap_area的分配过程, 主要是在红黑树中根据所指定的区间寻找合适的点,如果找到一个区间正好包含了vstart,那么在这个区间链表后面寻找合适的空洞进行分配就可以, 也有可能在红黑书上找到的就是空洞就不用接着遍历了, 总之搜索的过程是先红黑书然后再链表.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
* Allocate a region of KVA of the specified size and alignment, within the
* vstart and vend.
*/
static struct vmap_area *alloc_vmap_area(unsigned long size,
unsigned long align,
unsigned long vstart, unsigned long vend,
int node, gfp_t gfp_mask)
{
struct vmap_area *va;
struct rb_node *n;
unsigned long addr;
int purged = 0;
struct vmap_area *first;

BUG_ON(!size);
BUG_ON(offset_in_page(size));
BUG_ON(!is_power_of_2(align));

might_sleep_if(gfpflags_allow_blocking(gfp_mask));

va = kmalloc_node(sizeof(struct vmap_area),
gfp_mask & GFP_RECLAIM_MASK, node);
if (unlikely(!va))
return ERR_PTR(-ENOMEM);

/*
* Only scan the relevant parts containing pointers to other objects
* to avoid false negatives.
*/
kmemleak_scan_area(&va->rb_node, SIZE_MAX, gfp_mask & GFP_RECLAIM_MASK);

retry:
spin_lock(&vmap_area_lock);
/*
* Invalidate cache if we have more permissive parameters.
* cached_hole_size notes the largest hole noticed _below_
* the vmap_area cached in free_vmap_cache: if size fits
* into that hole, we want to scan from vstart to reuse
* the hole instead of allocating above free_vmap_cache.
* Note that __free_vmap_area may update free_vmap_cache
* without updating cached_hole_size or cached_align.
*/
if (!free_vmap_cache || /* 缓存为空, 并且在下面条件满足的时候不使用缓存 */
size < cached_hole_size || /* 至少有一个空洞可以复用, 直接从头开始搜索 */
vstart < cached_vstart || /* 别缓存的起点小 */
align < cached_align) { /* 对齐大小比缓存的小 */
nocache:
cached_hole_size = 0;
free_vmap_cache = NULL;
}
/* record if we encounter less permissive parameters */
cached_vstart = vstart;
cached_align = align;

/* find starting point for our search */
if (free_vmap_cache) { /* 这个地方是用缓存上次搜索的结果, 每次找到以后会存到这里 */
first = rb_entry(free_vmap_cache, struct vmap_area, rb_node);
addr = ALIGN(first->va_end, align);
if (addr < vstart) /* 缓存的末尾对齐之后超出了范围从头开始搜索 */
goto nocache;
if (addr + size < addr) /* 整数溢出 */
goto overflow;

} else {
addr = ALIGN(vstart, align);
if (addr + size < addr)
goto overflow;

n = vmap_area_root.rb_node;
first = NULL;

while (n) { /* 用对齐之后的vstart正好<=va_ned&&>=va_start的节点 */
struct vmap_area *tmp;
tmp = rb_entry(n, struct vmap_area, rb_node);
if (tmp->va_end >= addr) {
first = tmp;
if (tmp->va_start <= addr)
break;
n = n->rb_left;
} else
n = n->rb_right;
}

if (!first)
goto found;
}
/* from the starting point, walk areas until a suitable hole is found */
while (addr + size > first->va_start && addr + size <= vend) {
if (addr + cached_hole_size < first->va_start) /* cached_hole_size */
cached_hole_size = first->va_start - addr; /* 缓存最大的空洞大小 */
addr = ALIGN(first->va_end, align);
if (addr + size < addr)
goto overflow;

if (list_is_last(&first->list, &vmap_area_list))
goto found;

first = list_next_entry(first, list);
}

found:
if (addr + size > vend)
goto overflow;

va->va_start = addr;
va->va_end = addr + size;
va->flags = 0;
__insert_vmap_area(va); /* 插入到红黑书和链表当中 */
free_vmap_cache = &va->rb_node;
spin_unlock(&vmap_area_lock);

BUG_ON(!IS_ALIGNED(va->va_start, align));
BUG_ON(va->va_start < vstart);
BUG_ON(va->va_end > vend);

return va;

overflow:
spin_unlock(&vmap_area_lock);
if (!purged) {
purge_vmap_area_lazy();
purged = 1;
goto retry;
}

if (gfpflags_allow_blocking(gfp_mask)) {
unsigned long freed = 0;
blocking_notifier_call_chain(&vmap_notify_list, 0, &freed);
if (freed > 0) {
purged = 0;
goto retry;
}
}

if (printk_ratelimit())
pr_warn("vmap allocation for size %lu failed: use vmalloc=<size> to increase size\n",
size);
kfree(va);
return ERR_PTR(-EBUSY);
}


建立映射

成功分配好vm_struct之后就可以建立映射了.
__vmalloc_area_node的过程首先是分配物理页,调用alloc_kmem_pages,这个函数和alloc_pages不同的是套了一层cgroup的kmem计数器,来限制内存分配, 然后再调用map_vm_area在页表上建立映射,内部调用的是vmap_page_range,指定了vm_struct的start和end, 而vmap_page_range则调用了vmap_page_range_noflush迭代去初始化各级页表. 内核的虚拟地址的映射就全部完成了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
*
* Set up page tables in kva (addr, end). The ptes shall have prot "prot", and
* will have pfns corresponding to the "pages" array.
* 这里的kva指的是kernel virtual address.
* Ie. pte at addr+N*PAGE_SIZE shall point to pfn corresponding to pages[N]
*/
static int vmap_page_range_noflush(unsigned long start, unsigned long end,
pgprot_t prot, struct page **pages)
{
pgd_t *pgd;
unsigned long next;
unsigned long addr = start;
int err = 0;
int nr = 0;

BUG_ON(addr >= end);
pgd = pgd_offset_k(addr); /* 把虚拟地址hash到pgd */
do {
next = pgd_addr_end(addr, end); /* 获取下一个pgd */
err = vmap_pud_range(pgd, addr, next, prot, pages, &nr); /* 再去初始化 pud pmd pte */
if (err)
return err;
} while (pgd++, addr = next, addr != end);

return nr;
}


对应的调用关系如图.

释放

释放的入口是vfree, 这个函数首先通过in_interrupt检查是否在中断上下文, 如果是正处于中断中则使用workqueue传入vfree_deferred用workqueue进行释放, 不然就直接调用__vunmap来释放, 它会调用remove_vm_area, 最终调用用unmap_vmap_area重置页表, 然后用free_vmap_area_noflushvmp_area加入到vmap_purge_list这个链表当中, 放入这个链表之后只有链表长度超过阈值才会尝试调用try_purge_vmap_area_lazy真正释放掉vmap_area,释放过程主要是从红黑树中移除, 然后把自己也给free掉, 这是一个延迟的释放过程, 最后把把vm_struct自己和它管理的物理页全部释放. 调用图如图所示.

总结

vmalloc主要是针对连续地址分配的需求, 涉及两个部分, 一个是虚拟地址的管理以及到物理地址的映射. 其中虚拟地址的管理比较麻烦一点, 用链表红黑树保存了每个vmap_area并且寻找合适的空洞用于分配虚拟地址. 而物理地址的分配之前我的博客已经讲过了, 建立映射只要设置对应的页表项就OK了.

参考

  1. 《LINUX3.0内核源代码分析》第四章: 内存管理

  2. linux内存管理之vmalloc

  3. lkvm

很多人看内核物理页的分配都喜欢从__alloc_pages开始看,喜欢从程序的执行流程去看问题,其实可换个思路,从开发或者说实现的角度去理解物理页的分配效果可能更好,所以这篇博客,反其道而行之,基于4.7.2的内核代码, 由内向外解释物理页的分配过程的细节.

binary buddy system allocation 算法

伙伴分配器算法的流程如下:

  1. 如果内存被分配
    1. 寻找一个合适大小的内存(要求是: 大于 requested memory, 同时以\(2^k\)为量纲最小的块, 也就是分配一个满足要求的最小块)
      1. 如果找到了直接分配
      2. 如果没有找到, 需要进行如下尝试
        1. 拆分一个比 requested memory 更大的内存块(比如比之前没找到的\(2^x\)大的\(2^{x+1}\), 分成两半.
        2. 如果拆分出来的一半满足requested memory, 并且不能再分了, 已经是最小的了, 就分配该块.
        3. 重复1, 寻找合适大小的内存块.
        4. 重复这个流程直到找到合适的内存块.
  2. 如果内存被释放
    1. 释放\(2^k\)内存块
    2. 查看内存块的伙伴也就是分配之后的另一半\(2^k\)块是否也free了
    3. 如果是,则会回到2并且重复执行直到所有内存被释放或者有一个伙伴没有被free掉, 无法合并.

内存碎片

内存碎片有两种,一种是External fragmentation和Internal Fragmentation, 前者是因为内存块划分的太小而无法满足大的内存分配的需要, 在这点上内核遇到的大内存块分配其实并不多, 而且通过vmalloc把不连续的物理地址映射成连续的虚拟地址很好的解决了这个问题. 后者的解决方法是引入了slab allocator, 对小内存进行了有效的缓存, 避免了Internal fragmentation的情况.

具体实现

每个struct zone结构有一个struct free_area free_area[MAX_ORDER];的定义, 对应的结构如下.

1
2
3
4
struct free_area {
struct list_head free_list[MIGRATE_TYPES];
unsigned long nr_free;
};

所以整个的空闲块的管理结构如图.

每一个free_area用来存储一种\(2^k\)块,用链表链接,MAX_ORDER表示的允许最大的块就是\(2^{MAX_ORDER}\)大小.

migratetype

图中省略了MIGRATE_TYPES的表示, 仔细观察free_list是一个数组每个代表了一种migratetype, 用于page分配从node之间迁移的, 让内存尽量分配在离CPU近的node上, 这个特性是在2.6.32.25引入的, 同时对外部碎片的现象进行了优化, 这一点是这么理解的, 外部碎片是因为小内存块的分配位置可能占据了某个地方, 导致大的内存块无法分配出来, 比如说如果这个页是一些内核的永久数据可能这个碎片就会一直存在, 但是我们提前从可移动页分配一大块内存来给不可移动页, 那么内存碎片就不会影响到可移动页的分配, 这样就不会让内核初始化的时候分配的一些永久性的内存导致碎片影响了其他内存的分配,下面我们看看迁移类型的具体定义.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
* PAGE_ALLOC_COSTLY_ORDER is the order at which allocations are deemed
* costly to service. That is between allocation orders which should
* coalesce naturally under reasonable reclaim pressure and those which
* will not.
*/
/* 内核认为超过8个页算是大的内存分配 */
#define PAGE_ALLOC_COSTLY_ORDER 3

enum {
MIGRATE_UNMOVABLE,
MIGRATE_MOVABLE,
MIGRATE_RECLAIMABLE,
MIGRATE_PCPTYPES, /* the number of types on the pcp lists */
MIGRATE_HIGHATOMIC = MIGRATE_PCPTYPES,
#ifdef CONFIG_CMA
/*
* MIGRATE_CMA migration type is designed to mimic the way
* ZONE_MOVABLE works. Only movable pages can be allocated
* from MIGRATE_CMA pageblocks and page allocator never
* implicitly change migration type of MIGRATE_CMA pageblock.
*
* The way to use it is to change migratetype of a range of
* pageblocks to MIGRATE_CMA which can be done by
* __free_pageblock_cma() function. What is important though
* is that a range of pageblocks must be aligned to
* MAX_ORDER_NR_PAGES should biggest page be bigger then
* a single pageblock.
*/
MIGRATE_CMA,
#endif
#ifdef CONFIG_MEMORY_ISOLATION
MIGRATE_ISOLATE, /* can't allocate from here */
#endif
MIGRATE_TYPES
};


这里简单解释一下:

  1. MIGRATE_UNMOVABLE
     这种类型的页的位置是固定不变的, 不可以移动, 内核中大部分数据是这样的.
    
  2. MIGRATE_MOVABLE
     不能够直接移动,但是可以删除,而内容则可以从某些源重新生成.如文件数据映射的页面则归属此类.
    
  3. MIGRATE_RECLAIMABLE
     可以移动。分配给用户态程序运行的用户空间页面则为该类.由于是通过页面映射而得,将其复制到新位置后,更新映射表项,重新映射,应用程序是不感知的.
    
  4. MIGRATE_PCPTYPES
     是一个分界数,小于这个数的都是在pcplist上的.
    
  5. MIGRATE_CMA
     连续内存分配,用于避免预留大块内存导致系统可用内存减少而实现的,即当驱动不使用内存时,将其分配给用户使用,而需要时则通过回收或者迁移的方式将内存腾出来。
    
  6. MIGRATE_ISOLATE
     表示NUMA不能夸node迁移page, 让cpu和节点之间保持亲和性.
    
  7. MIGRATE_TYPES
     分界数, 低于这个才是迁移类型.
    

实现概要

实际上的合并对应的是通过小块合并成大块从下级的free_area的链表中移除, 然后添加到上一层free_area当中. 拆分则相反, 从上级free_area当中删除大块, 并且拆分成两个小块, 加入到下层free_area的list当中, 当然, 如果有一块符合需求就直接分配了. 这就是整个物理页分配的核心内容.

分配

整个算法的入口是__rmqueue, 从伙伴分配器中分配对应\( 2^{order} \) 的物理块.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
* Do the hard work of removing an element from the buddy allocator.
* Call me with the zone->lock already held.
*/
static struct page *__rmqueue(struct zone *zone, unsigned int order,
int migratetype)
{
struct page *page;

page = __rmqueue_smallest(zone, order, migratetype);
if (unlikely(!page)) {
if (migratetype == MIGRATE_MOVABLE)
page = __rmqueue_cma_fallback(zone, order); /* MOVABLE的失败优先从cma迁移 */

if (!page)
page = __rmqueue_fallback(zone, order, migratetype); /* 失败的话会从其他备选迁移类型当中迁移page */
}

trace_mm_page_alloc_zone_locked(page, order, migratetype);
return page;
}

分配失败退化的行为在后面讨论,先看主要路径, 算法主要体现在__rmqueue_smallest这个函数上, 依次从order开始向上寻找free_area并从list当中取出page块.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
* Go through the free lists for the given migratetype and remove
* the smallest available page from the freelists
*/
static inline
struct page *__rmqueue_smallest(struct zone *zone, unsigned int order,
int migratetype)
{
unsigned int current_order;
struct free_area *area;
struct page *page;

/* Find a page of the appropriate size in the preferred list */
for (current_order = order; current_order < MAX_ORDER; ++current_order) {
area = &(zone->free_area[current_order]);
page = list_first_entry_or_null(&area->free_list[migratetype],
struct page, lru);
if (!page)
continue; /* 如果失败就尝试更大的块 */
list_del(&page->lru); /* 这个lru取决他的上下文,比如在这里就是free_list */
rmv_page_order(page); /* 设置flags */
area->nr_free--; /* free计数器-1 */
expand(zone, page, order, current_order, area, migratetype);
set_pcppage_migratetype(page, migratetype);/* 设置page的migratetype */
return page;
}
return NULL;
}

从下往上找到合适的块以后再从上往下进行拆分, 这依赖于expand函数完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/*
* The order of subdivision here is critical for the IO subsystem.
* Please do not alter this order without good reasons and regression
* testing. Specifically, as large blocks of memory are subdivided,
* the order in which smaller blocks are delivered depends on the order
* they're subdivided in this function. This is the primary factor
* influencing the order in which pages are delivered to the IO
* subsystem according to empirical testing, and this is also justified
* by considering the behavior of a buddy system containing a single
* large block of memory acted on by a series of small allocations.
* This behavior is a critical factor in sglist merging's success.
*
* -- nyc
*/
static inline void expand(struct zone *zone, struct page *page,
int low, int high, struct free_area *area,
int migratetype)
{
unsigned long size = 1 << high;

while (high > low) { /* 从高阶向低阶迭代 */
area--;
high--;
size >>= 1;
VM_BUG_ON_PAGE(bad_range(zone, &page[size]), &page[size]);
/* guardpage TODO */
if (IS_ENABLED(CONFIG_DEBUG_PAGEALLOC) &&
debug_guardpage_enabled() &&
high < debug_guardpage_minorder()) {
/*
* Mark as guard pages (or page), that will allow to
* merge back to allocator when buddy will be freed.
* Corresponding page table entries will not be touched,
* pages will stay not present in virtual address space
*/
set_page_guard(zone, &page[size], high, migratetype);
continue;
}
list_add(&page[size].lru, &area->free_list[migratetype]); /* 把page放到area的free_list里面, 这个page是一个数组形式的 比如之前是8 这里变成4,那么后半部分会被留下来,前半部分会继续用于迭代 */
area->nr_free++; /* 空闲块计数器+1 */
set_page_order(&page[size], high); /* 相当于page->private = high 表示自己属于order为high的阶的block中 */
}
}

expand 就是向下一层的free_area留一半, 然后接着用另一半进行迭代.

举个例子说明分配的过程:

假设当前需要从zone当中分配一个order为1的page, 就会从下往上找到一个可以分配的块.

从1开始迭代到2, 发现2有可以分配的\(2^3\)page, 然后进入expand, 进行拆分.

拆分以后一半用于分配一半进入了同一层的free_list当中,如图所示, 核心流程非常简洁明了.

现在再重新回到__rmqueue,分析这个函数后半部分的fallback行为是如何迁移的.
首先如果编译选项带了CMA就会有具体内容, 其实就是再尝试从MIGRATE_CMA当中再分配一次page.

1
2
3
4
5
6
7
8
9
10
11
#ifdef CONFIG_CMA
static struct page *__rmqueue_cma_fallback(struct zone *zone,
unsigned int order)
{
return __rmqueue_smallest(zone, order, MIGRATE_CMA);
}
#else
static inline struct page *__rmqueue_cma_fallback(struct zone *zone,
unsigned int order) { return NULL; }
#endif

之后是一般性的退化流程是__rmqueue_fallback这个函数.
说这个函数之前需要看一张表.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

/*
* This array describes the order lists are fallen back to when
* the free lists for the desirable migrate type are depleted
*/
static int fallbacks[MIGRATE_TYPES][4] = {
[MIGRATE_UNMOVABLE] = { MIGRATE_RECLAIMABLE, MIGRATE_MOVABLE, MIGRATE_TYPES },
[MIGRATE_RECLAIMABLE] = { MIGRATE_UNMOVABLE, MIGRATE_MOVABLE, MIGRATE_TYPES },
[MIGRATE_MOVABLE] = { MIGRATE_RECLAIMABLE, MIGRATE_UNMOVABLE, MIGRATE_TYPES },
#ifdef CONFIG_CMA
[MIGRATE_CMA] = { MIGRATE_TYPES }, /* Never used */
#endif
#ifdef CONFIG_MEMORY_ISOLATION
[MIGRATE_ISOLATE] = { MIGRATE_TYPES }, /* Never used */
#endif
};

这张表描述的是当一种类型无法满足时退化选择的迁移类型的一个顺序.接下来看函数的实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

/* Remove an element from the buddy allocator from the fallback list */
static inline struct page *
__rmqueue_fallback(struct zone *zone, unsigned int order, int start_migratetype)
{
struct free_area *area;
unsigned int current_order;
struct page *page;
int fallback_mt;
bool can_steal;

/* Find the largest possible block of pages in the other list */
for (current_order = MAX_ORDER-1;
current_order >= order && current_order <= MAX_ORDER-1;
--current_order) {
area = &(zone->free_area[current_order]);
/* 遍历fallback table 找到合适的fallback migratetype 要综合考虑: 是否有超过一定阈值的空闲页,并且是可以退化的页类型 */
fallback_mt = find_suitable_fallback(area, current_order,
start_migratetype, false, &can_steal);
if (fallback_mt == -1)
continue;

page = list_first_entry(&area->free_list[fallback_mt],
struct page, lru);
/* 从对应迁类型中"偷取"page TODO:具体如何迁移的 我感觉应该是很抽象的迁移就可以了 */
if (can_steal)
steal_suitable_fallback(zone, page, start_migratetype);

/* Remove the page from the freelists */
area->nr_free--;
list_del(&page->lru);
rmv_page_order(page);

expand(zone, page, order, current_order, area,
start_migratetype);
/*
* The pcppage_migratetype may differ from pageblock's
* migratetype depending on the decisions in
* find_suitable_fallback(). This is OK as long as it does not
* differ for MIGRATE_CMA pageblocks. Those can be used as
* fallback only via special __rmqueue_cma_fallback() function
*/
set_pcppage_migratetype(page, start_migratetype);

trace_mm_page_alloc_extfrag(page, order, current_order,
start_migratetype, fallback_mt);

return page;
}

return NULL;
}

和主要分配路径不同的事, 迭代器是从最大的阶数开始迭代的,这个代表的意思是说我尽量多迁移一点空间给你, 不然你还要迁移的时候我又分配一小块空间给你, 你的碎片间接导致了我的碎片产生, 所以从大到小开始迭代.

整个核心部分分析完以后, 按照从里向外分析的思路,这是基于__rmqueu的一套更高层次的函数, 再倒回来看alloc_pages的实现会发现非常容易理解.首先__alloc_pages调用__alloc_pages_nodemask. 这里补充一个结构体.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
* Structure for holding the mostly immutable allocation parameters passed
* between functions involved in allocations, including the alloc_pages*
* family of functions.
*
* nodemask, migratetype and high_zoneidx are initialized only once in
* __alloc_pages_nodemask() and then never change.
*
* zonelist, preferred_zone and classzone_idx are set first in
* __alloc_pages_nodemask() for the fast path, and might be later changed
* in __alloc_pages_slowpath(). All other functions pass the whole strucure
* by a const pointer.
*/
struct alloc_context {
struct zonelist *zonelist;
nodemask_t *nodemask;
struct zoneref *preferred_zoneref;
int migratetype;
enum zone_type high_zoneidx;
bool spread_dirty_pages;
};

这个结构体主要是一些内存分配的参数, 因为从上到下的调用都一直有用到这几个参数所以合成一个context向下传递.

__alloc_pages_nodemask第一步会检查cpusets的功能是否打开, 这个是一个cgroup的子模块, 如果没有设置nodemask就会用cpusets配置的cpuset_current_mems_allowed来限制在哪个node上分配, 这个也是在NUMA结构当中才会有用的. 之后会用到might_sleep_if,判断gfp_mask & __GFP_DIRECT_RECLAIM), 表示当前内存压力比较大需要直接回收内存, 会循环睡眠同步等待页可用, 而might_sleep_if是一个debug函数,标记当前函数在iftrue的时候表示可能会进入睡眠, 如果当前调用进入了一个不可睡眠的上下文就会报错. should_fail_alloc_page会做一些预检查, 一些无法分配的条件会直接报错.
接着获取read_mems_allowed_begin,这个会拿到current当前进程的cpusets允许的节点分配掩码, 这个结果的读取通过顺序锁(seqcount_t)进行, 如果在分配page期间这个值发生了改变(通过read_mems_allowed_retry判断), 那么读操作需要重新进行尝试.
调用first_zones_zonelist会从zonelist这个表示分配page的zone的优先顺序链表里获取第一个zoneref, zoneref是一个链表节点用于迭代之后的zone选项.
接着就会调用关键函数get_page_from_freelist从而分配到页.
分配完页如果失败会调用__alloc_pages_slowpath唤起swapd进程进行页回收从而获取可用的页, 再尝试调用get_page_from_free_list.
函数的调用图如下.

这是最外层的一个过程, 但是内核往往经过了很多实践才会有这么一套机制, 内核在这一层之前加了一套缓存, 来加速物理页的分配, get_page_from_free_list到伙伴分配器之间还有一层高速缓存. 这个结构是一个percpu结构, 每个cpu独有一份链表缓存分配的页,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct per_cpu_pages {
int count; /* number of pages in the list */
int high; /* high watermark, emptying needed */
int batch; /* chunk size for buddy add/remove */

/* Lists of pages, one per migrate type stored on the pcp-lists */
struct list_head lists[MIGRATE_PCPTYPES];
};

struct per_cpu_pageset {
struct per_cpu_pages pcp;
#ifdef CONFIG_NUMA
s8 expire;
#endif
#ifdef CONFIG_SMP
s8 stat_threshold;
s8 vm_stat_diff[NR_VM_ZONE_STAT_ITEMS];
#endif
};

每个zone都会有一个这个结构用于缓存页的分配, 中文一般就叫高速缓存.

缓存的过程是这样的, 首先通过for_next_zone_zonelist_nodemask遍历每个zone 会选择合适的zone, 选择合适的zone一笔带过但其实考虑了很多因素比如是否满足分配的wartermark, 是否需要均衡到别的zone当中, 是否被cpusets禁止了分配等等, 然后调用buffered_rmqueue这个函数, 这个函数就是构建对buddy分配器裹了一层高速缓存. 这个函数在只分配1页(order=0)的情况下,会判断是不是”冷”页, bool cold = ((gfp_flags & __GFP_COLD) != 0), 冷页的区别是会放到高速缓存的末尾, 相反热页会放到高速缓存的头部. 然后尝试获取高速缓存的page, 如果高速缓存为空就调用rmqueue_bulk分配一段batch大小的页块到高速缓存中继续尝试分配, 而rmqueue_bulk就是循环了batch次分配了order=0的page逐个添加到高速缓存当中. 如果需要分配的order不为0的话还是会直接从buddy-system当中分配不依靠高速缓存. 高速缓存主要是缓存单页的分配.

所以调整后的走高速缓存的主流程就如下图所示.

释放

还是从伙伴系统的最内部实现向外说, 释放对应的是__free_one_page, 指定释放\(2^{order}\)的物理页块, 地址为page, 可以说释放过程是拆分过程的一个逆过程.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
* Freeing function for a buddy system allocator.
*
* The concept of a buddy system is to maintain direct-mapped table
* (containing bit values) for memory blocks of various "orders".
* The bottom level table contains the map for the smallest allocatable
* units of memory (here, pages), and each level above it describes
* pairs of units from the levels below, hence, "buddies".
* At a high level, all that happens here is marking the table entry
* at the bottom level available, and propagating the changes upward
* as necessary, plus some accounting needed to play nicely with other
* parts of the VM system.
* At each level, we keep a list of pages, which are heads of continuous
* free pages of length of (1 << order) and marked with _mapcount
* PAGE_BUDDY_MAPCOUNT_VALUE. Page's order is recorded in page_private(page)
* field.
* So when we are allocating or freeing one, we can derive the state of the
* other. That is, if we allocate a small block, and both were
* free, the remainder of the region must be split into blocks.
* If a block is freed, and its buddy is also free, then this
* triggers coalescing into a block of larger size.
*
* -- nyc
*/

static inline void __free_one_page(struct page *page, /* 把page加回到free_list里面 */
unsigned long pfn,
struct zone *zone, unsigned int order,
int migratetype)
{
unsigned long page_idx;
unsigned long combined_idx;
unsigned long uninitialized_var(buddy_idx);
struct page *buddy;
unsigned int max_order;

max_order = min_t(unsigned int, MAX_ORDER, pageblock_order + 1); /* 获取最大的阶数 */

VM_BUG_ON(!zone_is_initialized(zone)); /* 检查wait_table是否存在来表示zone是否初始化过 里面用到了双感叹号表示强制1或者0 */
VM_BUG_ON_PAGE(page->flags & PAGE_FLAGS_CHECK_AT_PREP, page);

VM_BUG_ON(migratetype == -1);
if (likely(!is_migrate_isolate(migratetype))) /* 是isolate类型 */
__mod_zone_freepage_state(zone, 1 << order, migratetype);

page_idx = pfn & ((1 << MAX_ORDER) - 1); /* 获取page的下标 */

VM_BUG_ON_PAGE(page_idx & ((1 << order) - 1), page);
VM_BUG_ON_PAGE(bad_range(zone, page), page);

continue_merging:
while (order < max_order - 1) {
buddy_idx = __find_buddy_index(page_idx, order);/* 获取buddy的下标 */
buddy = page + (buddy_idx - page_idx); /* 根据相对距离得到buddy page */
if (!page_is_buddy(page, buddy, order)) /* 如果不是buddy就结束合并 */
goto done_merging;
/*
* Our buddy is free or it is CONFIG_DEBUG_PAGEALLOC guard page,
* merge with it and move up one order.
*/
if (page_is_guard(buddy)) {
clear_page_guard(zone, buddy, order, migratetype);
} else {
/* 把buddy从free_area当中移除 */
list_del(&buddy->lru);
zone->free_area[order].nr_free--;
rmv_page_order(buddy);
}
combined_idx = buddy_idx & page_idx;
page = page + (combined_idx - page_idx);
page_idx = combined_idx;
order++; /* 向上合并 */
}
if (max_order < MAX_ORDER) {
/* If we are here, it means order is >= pageblock_order.
* We want to prevent merge between freepages on isolate
* pageblock and normal pageblock. Without this, pageblock
* isolation could cause incorrect freepage or CMA accounting.
*
* We don't want to hit this code for the more frequent
* low-order merging.
*/
/* 在这里说明已经超出了pageblock的order, 可能在不同的migratetype的block边界了,这里再检查一次是不是isolate, 不允许节点间迁移的page, 如果是的话就要结束合并的迭代,不然继续向上合并 */
if (unlikely(has_isolate_pageblock(zone))) {
int buddy_mt;

buddy_idx = __find_buddy_index(page_idx, order);
buddy = page + (buddy_idx - page_idx);
buddy_mt = get_pageblock_migratetype(buddy);

if (migratetype != buddy_mt
&& (is_migrate_isolate(migratetype) ||
is_migrate_isolate(buddy_mt)))
goto done_merging;
}
max_order++;
goto continue_merging;
}

done_merging:
set_page_order(page, order);/* 结束合并自后设置合并之后的阶数 */

/*
* If this is not the largest possible page, check if the buddy
* of the next-highest order is free. If it is, it's possible
* that pages are being freed that will coalesce soon. In case,
* that is happening, add the free page to the tail of the list
* so it's less likely to be used soon and more likely to be merged
* as a higher order page
*/
if ((order < MAX_ORDER-2) && pfn_valid_within(page_to_pfn(buddy))) { /* 这个条件判断没有合并到最大块, 内核认为很有可能接下来这个块和其他free的块合并, 从减少碎片的角度来说会更倾向于放到链表的末尾, 让这个块的free状态保持久一点, 更有机会被合并成更大的块 */
struct page *higher_page, *higher_buddy;
combined_idx = buddy_idx & page_idx; /* 合并后的idx */
higher_page = page + (combined_idx - page_idx); /* 合并后的page */
buddy_idx = __find_buddy_index(combined_idx, order + 1); /* 向上寻找buddy */
higher_buddy = higher_page + (buddy_idx - combined_idx);
if (page_is_buddy(higher_page, higher_buddy, order + 1)) {
list_add_tail(&page->lru,
&zone->free_area[order].free_list[migratetype]);
goto out;
}
}

list_add(&page->lru, &zone->free_area[order].free_list[migratetype]);
/* 把page加入到对应的order当中 */
out:
zone->free_area[order].nr_free++; /* free_area nr_free 空闲块加1 */
}

这个函数就是从order开始迭代不断寻找自己的buddy, 寻找buddy的函数是__find_buddy_index, 首先要说的是page是以数组的形式存放在mem_map当中, 整个page是可以通过index线性索引的.要找到自己的buddy只要在对应的order位进行抑或即可.

1
2
3
4
5
6
7
static inline unsigned long
__find_buddy_index(unsigned long page_idx, unsigned int order)
{
/* idx 抑或 2^order order那位是0, 那么buddy应该是1
* order那位是1, 那么buddy应该是0 */
return page_idx ^ (1 << order);
}

然后层层递归把低层的free_area中的page块向高层移动,最后有一个优化, 内核认为刚释放的page很可能相关的buddy页也会释放, 所以把刚释放的页放到末尾让他更晚被分配出去从而得到更多的机会被合并成大块.

现在再从外层看物理页的分配.

入口函数是__free_pages, 其实知道了分配的过程对于释放的过程可以说是轻而易举就能理解的.首先这个函数put_page_testzero会减少引用计数, 如果到达0才真正执行释放操作.释放路径有两个一个是order为0和不为0的情况, 为0很好理解, 需要交回到高速缓存即可, 如果高速缓存满了再交回给buddy system, 不为0说明也没有缓存过,就直接调用free_one_page,这个函数会调用__free_one_page,最终还给buddy system. 所以这里主要看一下代缓存的路径, 先是调用free_hot_cold_page, 指定是热页也就是添加到高速缓存的头,如果计数器超过阈值pcp->count >= pcp->high调用free_pcppages_bulk交还给buddy system, 这个函数可以遍历高速缓存中的migratetype,然后根据指定的count从高速缓存中交还给buddy system的页, 当前路径是整个batch个的页都会还回去.

带高速缓存的释放的调用图如图所示.

总结

伙伴分配器的确是一个非常简洁的算法(我认为可以递归表示和迭代表示算法都美妙无比), 但是内核对于这样一个简单的算法也结合了很多实际工程的内容还有一些优化,比如迁移类型还有冷热页的标记和高速缓存等等. 充分理解内核的物理页分配对于接下来理解slab分配起和内核虚拟内存的相关内容有很好的帮助作用. 文末给出了许多参考链接, 我结合了前人的知识进行总结, 并且基于最新的4.7.2的内核重新走了一遍分析之路, 做了解释和补充, 对自己的学习是一个积累, 同时希望对大家有帮助.

参考

  1. 迁移类型 实现反碎片
  2. 页面迁移
  3. LVMM 的Physical Page Allocation 章节
  4. Linux 物理页面分配
  5. 顺序锁

qlang[1]是用Go实现的一套基于虚拟机实现的动态语言,这里主要分解一下虚拟机的实现如何嵌入到解释器当中的.

我们用tpl定义了一套语法,类似于yacc的语法,首先是运算优先级.

*代表匹配至少0个,/表示回调动作,|表示或,这是语法的第一部分,属于基本表达式:

1
2
3
4
5
6
7
8
9
10
11
12
term1 = factor *('*' factor/mul | '/' factor/quo | '%' factor/mod)

term2 = term1 *('+' term1/add | '-' term1/sub)

term31 = term2 *('<' term2/lt | '>' term2/gt | "==" term2/eq | "<=" term2/le | ">=" term2/ge | "!=" term2/ne)

term3 = term31 *("<-" term31/chin)

term4 = term3 *("&&"/_mute term3/_code/_unmute/and)

expr = term4 *("||"/_mute term4/_code/_unmute/or)

所以term1表示乘性运算,相似的term2表示加性运算,term3表示逻辑运算,term4表示与,expr表示表达式.
这里有一个特点就是优先级越高的运算在语法上写在了越前面,这是有原因的.

由上向下的递归推导可以看做是把输入的记号流转化成一颗语法树,从根节点按先序遍历进行.所以对应的也是寻找一个最左推导的过程.
递归方式的遍历也是一种实现,比如说如果我要匹配1+2那么就会构建一颗树形结构如下图的第一个.但是如果要匹配1+2*3这样的表达式就有可能出现问题,既可以是图中的第二种情况也可以是第三种情况.这里就关系到优先级的问题.因为递归向下的解析过程是深度优先的也就是意味着如果最先选择匹配,就能先形成语法结构,比如说,当输入是1+2*3时,如果先匹配加法,那么前三个token就是1+2就会构成一个结点,之后3就只能作为*的右边的因数加入到语法树中,但是如果乘法在语法顺序中靠前,那么匹配到1+的时候,就会去匹配term1这个语法,然后term1就会把乘法匹配完,这样就能构成下图中第三个语法树.这种优先级可以推而广之,同级优先的运算作为一个语法,高优先级的语法排在较前即可.

语法树

下面语句的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
sexpr = expr (
'='/tovar! expr/assign |
','/tovar! expr/tovar % ','/ARITY '=' expr % ','/ARITY /massign |
"++"/tovar/inc | "--"/tovar/dec |
"+="/tovar! expr/adda | "-="/tovar! expr/suba |
"*="/tovar! expr/mula | "/="/tovar! expr/quoa | "%="/tovar! expr/moda | 1/pop)

s = "if"/_mute! expr/_code body *("elif" expr/_code body)/_ARITY ?("else" body)/_ARITY/_unmute/if |
"switch"/_mute! ?(~'{' expr)/_code '{' swbody '}'/_unmute/switch |
"for"/_mute/_urange! fhead body/_unmute/for |
"return"! expr %= ','/ARITY /return |
"break" /brk |
"continue" /cont |
"include"! STRING/include |
"import"! (STRING ?("as" IDENT/name)/ARITY)/import |
"export"! IDENT/name % ','/ARITY /export |
"defer"/_mute! expr/_code/_unmute/defer |
"go"/_mute! expr/_code/_unmute/go |
sexpr

doc = ?s *(';' ?s)

body = '{' doc/_code '}'

fhead = (~'{' s)/_code %= ';'/_ARITY

frange = ?(IDENT/name % ','/ARITY '=')/ARITY "range" expr

swbody = *("case"! expr/_code ':' doc/_code)/_ARITY ?("default"! ':' doc/_code)/_ARITY

fnbody = '(' IDENT/name %= ','/ARITY ?"..."/ARITY ')' '{'/_mute doc/_code '}'/_unmute

afn = '{'/_mute doc/_code '}'/_unmute/afn

clsname = '(' IDENT/ref ')' | IDENT/ref

newargs = ?('(' expr %= ','/ARITY ')')/ARITY


语句主要支持的是赋值表达式,if语句,switch语句,for语句,return语句,break语句,continue语句,include语句,import语句,export语句,defer语句,go语句和单表达式构成的语句.

if语句类似于if expr body elif expr else body,
switch语句类似于switch expr { swbody },swbody的定义又是类似于case expr: body default: body的形式.
for语句类似于for stmt;stmt;stmt body的形式.
retur语句比较简答,对应的是return expr.
break,continue语句只有continuebreak. include,import语句则是include STRINGimport STRING的形式. defer语句是defer expr的形式. go语句是go expr`的形式,这里的go语句就是起一个routine进行执行.

从这里可以说一下这门语言的设计,类似于lua从简实现.因为继承了Go通道的特性所以语法中出现了类似<-的语法,并且支持类和成员函数.支持复合类型比如说slice,map和chan.

接着是单一元素的定义.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
classb = "fn"! IDENT/name fnbody ?';'/mfn

atom =
'(' expr %= ','/ARITY ?"..."/ARITY ?',' ')'/call |
'.' (IDENT|"class"|"new"|"recover"|"main")/mref |
'[' ?expr/ARITY ?':'/ARITY ?expr/ARITY ']'/index

factor =
INT/pushi |
FLOAT/pushf |
STRING/pushs |
CHAR/pushc |
(IDENT/ref | '('! expr ')' |
"fn"! (~'{' fnbody/fn | afn) | '[' expr %= ','/ARITY ?',' ']'/slice) *atom |
"new"! clsname newargs /new |
"range"! expr/_range |
"class"! '{' *classb/ARITY '}'/class |
"recover"! '(' ')'/recover |
"main"! afn |
'{'! (expr ':' expr) %= ','/ARITY ?',' '}'/map |
'!' factor/not |
'-' factor/neg |
"<-" factor/chout |
'+' factor

atom是个比较常见的定义,作为一个变量的atom,就是类似于fn(expr),var.selector或者indent[index]构成atom定义,作为元素的一部分.
整数(INT),浮点数(FLOAT),字符串(STRING),字符(CHAR),组合atom的元素(INDENT atom,fn{}atom,[expr,expr…]atom)等都可能是一个元素.
支持语法级别的new返回的对象,支持range,从panic中recover也可能是一个返回值,以及一些单操作符可以表达的字符串.

以上是整个语法的定义.qlang运行的环境是自行编写的虚拟机环境,实现一个虚拟机主要是要实现各种字节码的实现,还有对应的系统调用.
但是目前的实现没有定义中间状态的字节码,而是以一个库的形式,用接口的形式作为字节码执行的接口.

任何字节码的定义都要满足这样的接口,第一个参数是当前的栈,第二个参数是执行的上下文.

1
2
3
type Instr interface {
Exec(stk *Stack, ctx *Context)
}

对应的实现者为Class,iAnonymFn(匿名函数),iAs,

iAssign(赋值),iCall(调用),iDefer,isExport`,

iForRange,iFunc,iGo,iMacro,iMemberRef,

iModule,iMultiAssign,iMultiAssingFromSlice,

iOp1Assign,iOp3,iOpAssign,iPush,iRef,

iRem,iUnSet,iAnd,iCallFn,iCallFnv,

iCase,iGo,iChanIn,iClear,iJmp,

iJmpIfFalse,iNew,iOr,iPop,iPopEx,

iRecover,iReturn,等等这些字节码都是通过构造函数变作为Insrt接口返回的.

其实编程语言的虚拟机说白了就是一种提供字节码的运行环境,比如我定义add x1,x2sub x1,x2两条指令作为我的虚拟机的集合,那么我的虚拟寄字节码就支持加法和乘法,也就可以完成一个简易计算器的算式到字节码的转化,再把得到的字节码按照我们的需要进行转换.

比如LLVM[2]就是把自己定义的一套字节码(也就是类似于平台无关的虚拟汇编,其实这种汇编要稍微好用一点点,因为里面的寄存器是无限多个的,不需要自己考虑寄存器不够的情况),然后把这些字节码转化成平台相关的汇编,最后变成二进制文件.所以实现一套编程语言虚拟机就是要定义一套字节码的集合和对应的实现方式.

这里我们用Go实现虚拟机是通过Go来运行的,所以没有静态语言的性质,只是跑在Go的运行时上的虚拟机.这里我们举个例子看一下具体的实现.

现在以switch语法为例说一下运行流程
在C转汇编的过程中,switch case是通过跳转表来实现的,比如下面的C代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <stdio.h>

int f(int x)
{
switch(x)
{
case 1:
printf("1");
break;
case 2:
printf("2");
break;
default:
printf("default");
}
}
int main()
{
f(3);
}

转成汇编的话就是如下代码,f为switch case的部分的实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
	.file	"main.c"
.section .rodata
.LC0:
.string "default"
.text
.globl f
.type f, @function
f:
pushq %rbp // 保存栈base指针
movq %rsp, %rbp // 移动栈指针到rbp
subq $16, %rsp // 因为leaf function,可以开辟red zone[3] 128个字节
movl %edi, -4(%rbp) // 栈指针开始第4个字节,也就是第一个参数,0(%rbp)是callee保留的rbp.
movl -4(%rbp), %eax // 移动到eax中
cmpl $1, %eax //和1比较跳到L3
je .L3
cmpl $2, %eax //和2比较跳到L4
je .L4
jmp .L7 // default跳到L7
.L3:
movl $49, %edi // 放入参数'1'调用putchar,这里只打印一个字符,被优化成了putchar.
call putchar
jmp .L6
.L4:
movl $50, %edi // 放入参数'2'
call putchar
jmp .L6
.L7:
movl $.LC0, %edi // 让如.LC0,也就是字符串"default"的地址放入edi作为printf的参数
movl $0, %eax
call printf
.L6:
leave
ret
.size f, .-f
.globl main
.type main, @function
main:
pushq %rbp
movq %rsp, %rbp
movl $3, %edi
call f
popq %rbp
ret
.size main, .-main
.ident "GCC: (Ubuntu 4.8.4-2ubuntu1~14.04.1) 4.8.4"
.section .note.GNU-stack,"",@progbits

从汇编可以看出switch case其实本身其实是可以通过goto实现的,switch case只是goto的一个高级封装的实现技巧而已.如何放到虚拟机中
其实就是提供类似的goto机制来满足跳转的需求.

switch解析的时候首先注册了$switch的回调函数,如果匹配了"switch"/_mute! ?(~'{' expr)/_code '{' swbody '}'/_unmute/switch就会调用Compiler.Switch函数进行处理.

_mute会禁止回调函数的运行,除了_开头的回调函数,_unmute则是解开禁止.
_code的作用是把匹配到的语法记号流(tokens []tpl.Token)入栈.

swith boyd 是按照如下定义的:

swbody = *("case"! expr/_code ':' doc/_code)/_ARITY ?("default"! ':' doc/_code)/_ARITY

_ARITY获取的是语法匹配的次数,分别记录了casedefault匹配的次数,case可以匹配*次,default可以匹配?次.

然后可以看下Switch如何处理的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (p *Compiler) Switch(e interpreter.Engine) {

var defaultCode interface{}
// 之前push了一个 ? 的匹配次数, 如果是1那么就有default的代码, 所以把defaultCode pop出来.
defaultArity := p.popArity()
if defaultArity == 1 {
defaultCode, _ = p.gstk.Pop()
}
// 获取case的匹配次数
caseArity := p.popArity()
// case 中有个expression,case:后面有一个statment,所以乘2
casebr := p.gstk.PopNArgs(caseArity << 1) // 2 * caseArity
// 这switch:后面跟着的expression的代码取出
switchCode, _ := p.gstk.Pop()
// 保存老的块上下文
old := p.bctx
p.bctx = blockCtx{}
// switchCode有两种 , 一种是 switch , 一种是 switch expr.
// 这里处理的是switch {}的形式,每个case中都是条件表达式,就变成了if语句.
if switchCode == nil {
// 执行case branch,和default branch
p.doIf(e, casebr, defaultCode, caseArity)
p.bctx.MergeSw(&old, p.code.Len())
return
}
// 转换switchCode
// reserved2 是一组空的指令,用于最后填充跳转指令跳到switch body的末尾.
reserved2 := make([]exec.ReservedInstr, caseArity)
if err := e.EvalCode(p, "expr", switchCode); err != nil {
panic(err)
}
// 解析switchCode完毕,添加一行代码
p.CodeLine(switchCode)
for i := 0; i < caseArity; i++ {
caseCode := casebr[i<<1]
// 解析表达式
if err := e.EvalCode(p, "expr", caseCode); err != nil {
panic(err)
}
// 记录解析过的一行代码
p.CodeLine(caseCode)

// 保留指令一行空指令留待插入 case的跳转指令
reserved1 := p.code.Reserve()
bodyCode := casebr[(i<<1)+1]
// 解析块代码
bctx := evalDocCode(e, p, bodyCode)
// 把当前作用域中break,continue指令加入到p.bctx中
// 等最后到解析末尾再把跳转距离计算出来
bctx.MergeTo(&p.bctx)
// 把当前位置留空.
// 解析到了case :{}结尾作为跳转到结尾的指令的插入位置.
reserved2[i] = p.code.Reserve()
// 把reserved1保留的位置插入跳转到reserved2的保留的地址的地方.
// 相当于 Case delta,如果case 成功那么就跳到body的末尾,reserved2[i]
reserved1.Set(exec.Case(reserved2[i].Delta(reserved1)))
}
// 类似的解析default的case
p.code.Block(exec.Default)
bctx := evalDocCode(e, p, defaultCode)
bctx.MergeTo(&p.bctx)

end := p.code.Len()
for i := 0; i < caseArity; i++ {
// 设置跳转到末尾的指令
reserved2[i].Set(exec.Jmp(end - reserved2[i].Next()))
}
// 设置break指令的跳转地址.
// 并把旧bctx换回来,也就是说break,continue
// 跳转范围就终止在 switch 的作用域内.
p.bctx.MergeSw(&old, end)
}

比如下面的代码进行转化的话

1
2
3
4
5
6
7
8
9
10
11
12
13
x=1
switch(x){
case 1:
x=4
break
case 2:
x=2
break
case 3:
x=3
break
}

就跳到执行块的末尾,最后的翻译结果就是下面这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
==> 0000: Var &{x} // 变量x
==> 0001: Push &{1} // 压入1
==> 0002: AssignEx 0 // x=1
==> 0003: Ref &{x} // 引用x
==> 0004: Push &{1} // 压入1
==> 0005: Case 5 // case是自己定义的字节码,等于pop 1 再和当前栈顶的x比较,如果成功向下跳转5
==> 0006: Var &{x} // 引用x
==> 0007: Push &{4} // 压入4
==> 0008: AssignEx 0 // x=4
==> 0009: Jmp 16 // break 跳到结尾
==> 0010: Jmp 15 // case 不会继续执行,也是跳到结尾
==> 0011: Push &{2} // 后面是类似的
==> 0012: Case 5
==> 0013: Var &{x}
==> 0014: Push &{2}
==> 0015: AssignEx 0
==> 0016: Jmp 9
==> 0017: Jmp 8
==> 0018: Push &{3}
==> 0019: Case 5
==> 0020: Var &{x}
==> 0021: Push &{3}
==> 0022: AssignEx 0
==> 0023: Jmp 2
==> 0024: Jmp 1
==> 0025: Pop 0

比起汇编,我们定义的字节码稍微高级一点不需要构造跳转表,而是用Case指令替代,和栈顶的值比较,如果为true就顺序执行,不然就会跳转
相对距离的位置,到这里为止,我们的转换就结束了.

最后总结一下,现代编译器的实现已经非常方便了,前端有lex,yacc,后端有llvm,比如object-c和clang,以及rust的后端都是llvm,所以llvm还是很靠谱的,而前端手写或者用工具生成各有利弊,但是并不是什么决定因素.

而这里实现的qlang主要是优势是短小精悍无依赖,全部前后端自行实现,可以作为Golang的项目的嵌入脚本语言,类似于lua.

[1] https://github.com/qiniu/qlang
[2] http://llvm.org/
[3] http://eli.thegreenplace.net/2011/09/06/stack-frame-layout-on-x86-64/

每一种语言都会有一个定义良好的语法结构.函数是由申明和语句构成的,而语句又是由表达式构成的.
经常用来描述语法的是BNF[1].Go使用的是相应的变种,在Go的官方文档中有很详细的spec描述[2].一门语言的设计其实就在这份描述当中,这是一门语言的语法和规则的定义,是表面程序员可以接触到的部分,而运行时却可以改变,这相当于和程序员约定的接口,只要按照这个接口编写源代码,就能产生正常可以编译的二进制文件,但是最后的二进制文件如何运行,对于每条语法转换成了什么,有什么优化都是编译器优化和运行时的工作.所以一门语言必须有一个详尽的描述,这和一个网络协议一样,是非常重要的部分.

语法分析器也是有工具可以自动生成的,比如yacc[3].我在之前提到过使用工具的利弊,就不赘述了.

本文主要看一下Go的语法分析是如何进行.Go的parser接受的输入是源文件,内嵌了一个scanner,最后把scanner生成的token变成一颗抽象语法树(AST).
编译时的错误也是在这个时候报告的,但是大部分编译器编译时的错误系统并不是很完美,有时候报的错误文不对题,这主要是因为写对的方式有几种
但是写错的方式有很多种,编译器只能把一些错误进行归类,并且指出当前认为可疑的地方,并不能完完全全的知道到底是什么语法错误.这个需要结合给出的错误进行判断,clang作为一个C编译器做得好很多,这都是开发者不断地添加错误处理的结果,比gcc的报错完善很多.然而Go的编译时的错误处理也是秉承了gcc的风格,并不明确,但是会指出可疑的地方,在大多数场景下或者对语言标准熟悉的情况下也不是很麻烦.
下面看一下Go是怎么定义这些语法结构.这些结构都在go/ast当中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// All node types implement the Node interface.
type Node interface {
Pos() token.Pos // position of first character belonging to the node
End() token.Pos // position of first character immediately after the node
}

// All expression nodes implement the Expr interface.
type Expr interface {
Node
exprNode()
}

// All statement nodes implement the Stmt interface.
type Stmt interface {
Node
stmtNode()
}

// All declaration nodes implement the Decl interface.
type Decl interface {
Node
declNode()
}

语法有三个主体,表达式(expression),语句(statement),声明(declaration),Node是基类,用于标记该节点的位置的开始和结束.
而三个主体的函数没有实际意义,只是用三个interface来划分不同的语法单位,如果某个语法是Stmt的话,就实现一个空的stmtNode函数即可.
这样的好处是可以对语法单元进行comma,ok来判断类型,并且保证只有这些变量可以赋值给对应的interface.但是实际上这个划分不是很严格,比如

1
2
3
4
func (*ArrayType) exprNode()     {}
func (*StructType) exprNode() {}
func (*FuncType) exprNode() {}
func (*InterfaceType) exprNode() {}

就是类型,但是属于Expr,而真正的表达式比如

1
2
func (*BasicLit) exprNode()       {}
func (*FuncLit) exprNode() {}

是可以赋值给Exprt的.

了解了这个设计,再来看整个内容其实就是定义了源文件中可能出现的语法结构.列表如下,这个列表很长,扫一眼就可以,具体可以再回来看.

  1. 普通Node,不是特定语法结构,属于某个语法结构的一部分.
    • Comment 表示一行注释 // 或者 /* */
    • CommentGroup 表示多行注释
    • Field 表示结构体中的一个定义或者变量,或者函数签名当中的参数或者返回值
    • FieldList 表示以”{}”或者”()”包围的Filed列表
  2. Expression & Types (都划分成Expr接口)
    • BadExpr 用来表示错误表达式的占位符
    • Ident 比如报名,函数名,变量名
    • Ellipsis 省略号表达式,比如参数列表的最后一个可以写成arg...
    • BasicLit 基本字面值,数字或者字符串
    • FuncLit 函数定义
    • CompositeLit 构造类型,比如{1,2,3,4}
    • ParenExpr 括号表达式,被括号包裹的表达式
    • SelectorExpr 选择结构,类似于a.b的结构
    • IndexExpr 下标结构,类似这样的结构 expr[expr]
    • SliceExpr 切片表达式,类似这样 expr[low:mid:high]
    • TypeAssertExpr 类型断言类似于 X.(type)
    • CallExpr 调用类型,类似于 expr()
    • StarExpr *表达式,类似于 *X
    • UnaryExpr 一元表达式
    • BinaryExpr 二元表达式
    • KeyValueExp 键值表达式 key:value
    • ArrayType 数组类型
    • StructType 结构体类型
    • FuncType 函数类型
    • InterfaceType 接口类型
    • MapType map类型
    • ChanType 管道类型
  3. Statements
    • BadStmt 错误的语句
    • DeclStmt 在语句列表里的申明
    • EmptyStmt 空语句
    • LabeledStmt 标签语句类似于 indent:stmt
    • ExprStmt 包含单独的表达式语句
    • SendStmt chan发送语句
    • IncDecStmt 自增或者自减语句
    • AssignStmt 赋值语句
    • GoStmt Go语句
    • DeferStmt 延迟语句
    • ReturnStmt return 语句
    • BranchStmt 分支语句 例如break continue
    • BlockStmt 块语句 {} 包裹
    • IfStmt If 语句
    • CaseClause case 语句
    • SwitchStmt switch 语句
    • TypeSwitchStmt 类型switch 语句 switch x:=y.(type)
    • CommClause 发送或者接受的case语句,类似于 case x <-:
    • SelectStmt select 语句
    • ForStmt for 语句
    • RangeStmt range 语句
  4. Declarations
    • Spec type
      • Import Spec
      • Value Spec
      • Type Spec
    • BadDecl 错误申明
    • GenDecl 一般申明(和Spec相关,比如 import “a”,var a,type a)
    • FuncDecl 函数申明
  5. Files and Packages
    • File 代表一个源文件节点,包含了顶级元素.
    • Package 代表一个包,包含了很多文件.

上面就是整个源代码的所有组成元素,接下来就来看一下语法分析是如何进行的,也就是最后的AST是如何构建出来的.

先看一下parser结构体的定义,parser是以file为单位的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// The parser structure holds the parser's internal state.
type parser struct {
file *token.File
errors scanner.ErrorList // 解析过程中遇到的错误列表
scanner scanner.Scanner // 词法分析器.

// Tracing/debugging
mode Mode // parsing mode // 解析模式
trace bool // == (mode & Trace != 0)
indent int // indentation used for tracing output

// Comments 列表
comments []*ast.CommentGroup
leadComment *ast.CommentGroup // last lead comment
lineComment *ast.CommentGroup // last line comment

// Next token
pos token.Pos // token position
tok token.Token // one token look-ahead
lit string // token literal

// Error recovery
// (used to limit the number of calls to syncXXX functions
// w/o making scanning progress - avoids potential endless
// loops across multiple parser functions during error recovery)
syncPos token.Pos // last synchronization position 解析错误的同步点.
syncCnt int // number of calls to syncXXX without progress

// Non-syntactic parser control
// 非语法性的控制
// <0 在控制语句中, >= 在表达式中.
exprLev int // < 0: in control clause, >= 0: in expression
// 正在解析右值表达式
inRhs bool // if set, the parser is parsing a rhs expression

// Ordinary identifier scopes
pkgScope *ast.Scope // pkgScope.Outer == nil
topScope *ast.Scope // top-most scope; may be pkgScope
unresolved []*ast.Ident // unresolved identifiers
imports []*ast.ImportSpec // list of imports

// Label scopes
// (maintained by open/close LabelScope)
labelScope *ast.Scope // label scope for current function
targetStack [][]*ast.Ident // stack of unresolved labels
}

解析的入口是ParseFile,首先调用init,再调用parseFile进行解析.
整个解析是一个递归向下的过程也就是最low但是最实用的手写实现的方式.像yacc[4]生成的是我们编译里学的LALR[5]文法,牛逼的一逼,但是
gcc和Go都没用自动生成的解析器,也就是手写个几千行代码的事,所以为了更好的掌握编译器的细节,都选择了手写最简单的递归向下的方式.

通过init初始化scanner等.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (p *parser) init(fset *token.FileSet, filename string, src []byte, mode Mode) {
p.file = fset.AddFile(filename, -1, len(src))
var m scanner.Mode
if mode&ParseComments != 0 {
m = scanner.ScanComments
}
// 错误处理函数是在错误列表中添加错误.
eh := func(pos token.Position, msg string) { p.errors.Add(pos, msg) }
p.scanner.Init(p.file, src, eh, m)

p.mode = mode
p.trace = mode&Trace != 0 // for convenience (p.trace is used frequently)

p.next()
}

parseFile的简化流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
       // package clause
// 获取源文件开头的doc注释,从这里递归向下的解析开始了
doc := p.leadComment
// expect 从scanner获取一个token,并且返回位置pos.
pos := p.expect(token.PACKAGE)
// parseIdent 获取一个token并且转化为indent,如果不是报错.
ident := p.parseIdent()
if ident.Name == "_" && p.mode&DeclarationErrors != 0 {
p.error(p.pos, "invalid package name _")
}
// 作用域开始,标记解释器当前开始一个新的作用域
p.openScope()
// pkgScope 就是现在进入的作用域
p.pkgScope = p.topScope
// 解析 import 申明
for p.tok == token.IMPORT {
// parseGenDecl解析的是
// import (
// )
// 这样的结构,如果有括号就用parseImportSpec解析列表
// 没有就单独解析.
// 而parseImportSpec解析的是 一个可选的indent token和一个字符串token.
// 并且加入到imports列表中.
decls = append(decls, p.parseGenDecl(token.IMPORT, p.parseImportSpec))
}
// 解析全局的申明,包括函数申明
if p.mode&ImportsOnly == 0 {
// rest of package body
for p.tok != token.EOF {
decls = append(decls, p.parseDecl(syncDecl))
}
}
// 标记从当前作用域离开.
p.closeScope()
// 最后返回ast.File文件对象.
return &ast.File{
Doc: doc,
Package: pos,
Name: ident,
Decls: decls,
Scope: p.pkgScope,
Imports: p.imports,
Unresolved: p.unresolved[0:i],
Comments: p.comments,
}

看一下parseDecl主要是根据类型的不同调用不同的解析函数,parseValueSpec解析Value类型,parseTypeSpec解析Type类型,parseFuncDecl解析函数.
解析定义和解析类型的都是解析了,类似于var|type ( ident valueSpec|typeSpec)的token结构.因为parseFuncDecl里面也会解析这些内容,所以直接从函数解析来看也可以.
因为外一层的top scope其实就是相当于一个抽象的函数作用域而已,这样是为什么lennew这样的内嵌函数在函数内是可以做变量名的原因,因为可以在子作用域覆盖top作用域.整个解析过程简化过程如下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 解析一个func.
pos := p.expect(token.FUNC)
// 开一个新的作用域,topScope作为父Scope.
scope := ast.NewScope(p.topScope) // function scope
// 解析一个ident作为函数名
ident := p.parseIdent()
// 解析函数签名,也就是参数和返回值
params, results := p.parseSignature(scope)
// 再解析body
body = p.parseBody(scope)
// 最后返回函数申明.
decl := &ast.FuncDecl{
Doc: doc,
Recv: recv,
Name: ident,
Type: &ast.FuncType{
Func: pos,
Params: params,
Results: results,
},
Body: body,
}

解析参数和返回值就是解析(filed,filed)这样的格式,每个filed是indent type的token,最后构造成函数签名.然后来到parseBody,这个函数其实就是解析了左右花括号,然后向下开始解析Statement列表,类似于body -> { stmt_list },然后进入stmt_list的解析,不断地解析statement.

1
2
3
for p.tok != token.CASE && p.tok != token.DEFAULT && p.tok != token.RBRACE && p.tok != token.EOF {
list = append(list, p.parseStmt())
}

parseStmt最后会进入到语句的解析,然后根据不同的token选择进入不同的解析流程,比如看到var,type,const就是申明,碰到标识符和数字等等可能就是单独的表达式,
如果碰到go,就知道是一个go语句,如果看到defer和return都能判断出相应的语句并按规则解析,看到break等条件关键字就解析条件语句,看到{就解析块语句.都是可以递归去解析的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (p *parser) parseStmt() (s ast.Stmt) {
if p.trace {
defer un(trace(p, "Statement"))
}

switch p.tok {
case token.CONST, token.TYPE, token.VAR:
s = &ast.DeclStmt{Decl: p.parseDecl(syncStmt)}
case
// tokens that may start an expression
token.IDENT, token.INT, token.FLOAT, token.IMAG, token.CHAR, token.STRING, token.FUNC, token.LPAREN, // operands
token.LBRACK, token.STRUCT, token.MAP, token.CHAN, token.INTERFACE, // composite types
token.ADD, token.SUB, token.MUL, token.AND, token.XOR, token.ARROW, token.NOT: // unary operators
s, _ = p.parseSimpleStmt(labelOk)
// because of the required look-ahead, labeled statements are
// parsed by parseSimpleStmt - don't expect a semicolon after
// them
if _, isLabeledStmt := s.(*ast.LabeledStmt); !isLabeledStmt {
p.expectSemi()
}
case token.GO:
s = p.parseGoStmt()
case token.DEFER:
s = p.parseDeferStmt()
case token.RETURN:
s = p.parseReturnStmt()
case token.BREAK, token.CONTINUE, token.GOTO, token.FALLTHROUGH:
s = p.parseBranchStmt(p.tok)
case token.LBRACE:
s = p.parseBlockStmt()
...省略

举个例子看一下parseSimpleStmt()的简化流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
       // 解析左列表 一般是 l := r 或者 l1,l2 = r1,r2 或者 l <- r 或者 l++
x := p.parseLhsList()
switch p.tok {
case
token.DEFINE, token.ASSIGN, token.ADD_ASSIGN,
token.SUB_ASSIGN, token.MUL_ASSIGN, token.QUO_ASSIGN,
token.REM_ASSIGN, token.AND_ASSIGN, token.OR_ASSIGN,
token.XOR_ASSIGN, token.SHL_ASSIGN, token.SHR_ASSIGN, token.AND_NOT_ASSIGN:
// 如果看到range,range作为一种运算符按照range rhs来解析
// 如果没看到就按正常赋值语句解析 lhs op rhs 来解析op可以是上面那些token中的一种.
pos, tok := p.pos, p.tok
p.next()
var y []ast.Expr
isRange := false
if mode == rangeOk && p.tok == token.RANGE && (tok == token.DEFINE || tok == token.ASSIGN) {
pos := p.pos
p.next()
y = []ast.Expr{&ast.UnaryExpr{OpPos: pos, Op: token.RANGE, X: p.parseRhs()}}
isRange = true
} else {
y = p.parseRhsList()
}
as := &ast.AssignStmt{Lhs: x, TokPos: pos, Tok: tok, Rhs: y}

// 碰到":"找一个ident, 构成 goto: indent 之类的语句.
case token.COLON:
colon := p.pos
p.next()
if label, isIdent := x[0].(*ast.Ident); mode == labelOk && isIdent {
// Go spec: The scope of a label is the body of the function
// in which it is declared and excludes the body of any nested
// function.
stmt := &ast.LabeledStmt{Label: label, Colon: colon, Stmt: p.parseStmt()}
p.declare(stmt, nil, p.labelScope, ast.Lbl, label)
return stmt, false
}
// 碰到"<-",就构成 <- rhs 这样的语句.
case token.ARROW:
// send statement
arrow := p.pos
p.next()
y := p.parseRhs()
return &ast.SendStmt{Chan: x[0], Arrow: arrow, Value: y}, false

// 碰到"++"或者"--"就构成一个单独的自增语句.
case token.INC, token.DEC:
// increment or decrement
s := &ast.IncDecStmt{X: x[0], TokPos: p.pos, Tok: p.tok}
p.next()
return s, false
}

接下来就不一一解释每段代码了,具体情况具体看就可以.这里举个例子.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"go/ast"
"go/parser"
"go/token"
)

func main() {
fset := token.NewFileSet()
f, err := parser.ParseFile(fset, "", `
package main
func main(){
// comments
x:=1
go println(x)

}
`, parser.ParseComments)
if err != nil {
panic(err)
}
ast.Print(fset, f)
}

产生的结果是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
 0  *ast.File {
1 . Package: 2:1 |PACKAGE token
2 . Name: *ast.Ident { |IDENT token
3 . . NamePos: 2:9 |
4 . . Name: "main" |
5 . } |整个构成了顶部的 package main
6 . Decls: []ast.Decl (len = 1) { |最上层的申明列表
7 . . 0: *ast.FuncDecl { |func main的函数申明
8 . . . Name: *ast.Ident { |IDENT token
9 . . . . NamePos: 3:6 |
10 . . . . Name: "main" |
11 . . . . Obj: *ast.Object { |Objec是一个用于表达语法对象的结构
12 . . . . . Kind: func |表示之前存在过,Decl指向了7,也就是第7行的FuncDecl.
13 . . . . . Name: "main" |
14 . . . . . Decl: *(obj @ 7) |
15 . . . . } |
16 . . . } |
17 . . . Type: *ast.FuncType { |函数类型,也就是函数签名
18 . . . . Func: 3:1 |参数和返回值都是空的
19 . . . . Params: *ast.FieldList { |
20 . . . . . Opening: 3:10
21 . . . . . Closing: 3:11
22 . . . . }
23 . . . }
24 . . . Body: *ast.BlockStmt { |块语句,也就是main的body
25 . . . . Lbrace: 3:12
26 . . . . List: []ast.Stmt (len = 2) { |语句列表
27 . . . . . 0: *ast.AssignStmt { |赋值语句
28 . . . . . . Lhs: []ast.Expr (len = 1) { |左值是x
29 . . . . . . . 0: *ast.Ident {
30 . . . . . . . . NamePos: 5:2 |
31 . . . . . . . . Name: "x"
32 . . . . . . . . Obj: *ast.Object { |
33 . . . . . . . . . Kind: var
34 . . . . . . . . . Name: "x" |
35 . . . . . . . . . Decl: *(obj @ 27)
36 . . . . . . . . }
37 . . . . . . . } |
38 . . . . . . }
39 . . . . . . TokPos: 5:3 |:=和它的位置
40 . . . . . . Tok: :=
41 . . . . . . Rhs: []ast.Expr (len = 1) { |右边是一个数字类型的token
42 . . . . . . . 0: *ast.BasicLit {
43 . . . . . . . . ValuePos: 5:5
44 . . . . . . . . Kind: INT
45 . . . . . . . . Value: "1"
46 . . . . . . . }
47 . . . . . . }
48 . . . . . }
49 . . . . . 1: *ast.GoStmt { |接下来是go语句
50 . . . . . . Go: 6:2
51 . . . . . . Call: *ast.CallExpr { |一个调用表达式
52 . . . . . . . Fun: *ast.Ident { |IDENT token是println
53 . . . . . . . . NamePos: 6:5
54 . . . . . . . . Name: "println"
55 . . . . . . . }
56 . . . . . . . Lparen: 6:12 |左括号的位置
57 . . . . . . . Args: []ast.Expr (len = 1) { |参数列表
58 . . . . . . . . 0: *ast.Ident { |是一个符号INDENT,并且指向的是32行的x
59 . . . . . . . . . NamePos: 6:13
60 . . . . . . . . . Name: "x"
61 . . . . . . . . . Obj: *(obj @ 32)
62 . . . . . . . . }
63 . . . . . . . }
64 . . . . . . . Ellipsis: -
65 . . . . . . . Rparen: 6:14 |右括号的位置
66 . . . . . . }
67 . . . . . }
68 . . . . }
69 . . . . Rbrace: 8:1
70 . . . }
71 . . }
72 . }
73 . Scope: *ast.Scope { |最顶级的作用域
74 . . Objects: map[string]*ast.Object (len = 1) {
75 . . . "main": *(obj @ 11)
76 . . }
77 . }
78 . Unresolved: []*ast.Ident (len = 1) { |这里有个没有定义的符号println,是因为是内置符号,会另外处理
79 . . 0: *(obj @ 52) |从源文件上是表现不出来的.
80 . }
81 . Comments: []*ast.CommentGroup (len = 1) { |评论列表,以及位置和内容.
82 . . 0: *ast.CommentGroup {
83 . . . List: []*ast.Comment (len = 1) {
84 . . . . 0: *ast.Comment {
85 . . . . . Slash: 4:2
86 . . . . . Text: "// comments"
87 . . . . }
88 . . . }
89 . . }
90 . }
91 }

这就是Go的整个语法分析和最后产生的语法树的结构.

废话说了这么多其实实现很简单,问题是如何把一个语言的spec定义好,很重要,早期语言设计不是很固定的.都是慢慢尝试不断改进的过程.最早的一次spec文档[6]其实和现在差了很多很多.就是把TOKEN记号流从左至右匹配规则(可能会向前看几个token),然后递归解析语法树,最后得到AST.
我在我的字符画转换器里用的也是类似的方式[7],做了自顶向下递归解析语法的方式,但是错误处理都是速错,不会做错误恢复找到一个可以同步的节点继续分析.
所以这里补充一点,Go是如何进行错误处理的同步问题,寄希望于能够向使用者提供更多的错误.主要是parser当中的两个结构

1
2
syncPos token.Pos // last synchronization position
syncCnt int // number of calls to syncXXX without progress

syncPos错误的同步位置,也就类似于游戏的存档点,如果发生错误那就从这个地方开始跳过(BadStmt|BadExpr)继续解析,在每次完成语句,申明或者表达式的解析之后就会保存一个同步点.虽然这种继续解析的行为不一定能够给出很精确的错误提示,但的确够用了.当然如果错误实在太多了,从同步点恢复也没有太大意义,就会主动放弃,所以记录了没有成功解析而同步的次数.

因为之前造过轮子了,所以我发现其实编译器的前端用手写是一个很繁琐并且需要花很多时间去做的一件事情,如果语言有设计良好,那么也至少需要花实现的时间,如果设计不好,实现也要跟着修修补补,那就更麻烦,虽然整个编译器的前端也就不到万行代码,但是的确是很考验耐心的一件事情,而且用递归向下的方式解析也没什么效率问题,编译器编译慢一点也不是很要紧,所以有轮子还是用轮子吧,这只是一件苦力活,的确没什么高科技.

最后附带一个用Go实现的Go语法的子集的动态语言版本,只有几十行.

https://gist.github.com/ggaaooppeenngg/dff0fff8f0c9194d93c70550d50edbfa

参考:

  1. http://en.wikipedia.org/wiki/Backus%E2%80%93Naur_Form
  2. https://golang.org/ref/spec
  3. https://zh.wikipedia.org/wiki/Yacc
  4. https://zh.wikipedia.org/zh-cn/Yacc
  5. https://zh.wikipedia.org/wiki/LALR语法分析器
  6. https://github.com/golang/go/commit/18c5b488a3b2e218c0e0cf2a7d4820d9da93a554

词法分析一般是编译器的第一部分,而且词法分析很简单,就是一个有限状态机.
开始词法分析的过程就是把源文件转换成一组预先定义好的token的过程.
这一组被统一表现的token之后会被送入语法分析器进行语法解析,这里我们主要关注如何进行词法分析.

做词法分析就几种方法:

  1. 直接使用工具比如lex.
  2. 使用更低一层的正则表达式.
  3. 使用状态动作,构造一个状态机.

而真正实现一个语言的话,使用工具没有什么错,但是问题是,很难获得正确的错误提示.
工具生成的错误处理很弱.而且需要学习另一门规则或特定的语法.生成的代码可能性能不好,难以优化,但是用工具可以非常简单实现词法分析.
早期编译器的设计阶段都会使用lex来做词法分析器,比如gcc和Go都是这么做的,但是到了后期一个真正生产化的语言可能不能依赖生成的代码,而需要做自己特定的修改和优化,所以自己实现一个词法分析器就显得比较重要了.

正则表达被人诟病的一个话题就是效率问题,比如perl拥有功能最强大的正则表达式,但是整个正则表达式引擎的效率却很低,Go在这方面牺牲了一些正则表达式的特性来保证正则表达式的效率不至于过低,但是正则表达式对于大量文本处理体现的弱势却是很明显的.因为可能我们要处理的状态其实不需要一个繁重的正则表达来解决.

其实实现一个词法分析器很简单,而且这种技能是基本不会变的,如果写过一次,以后都是同样的实现方式.

先看一下Go的实现,在Go的源码下面go/token/token.go目录里面是这么定义token的.

1
2
// Token is the set of lexical tokens of the Go programming language.
type Token int

其实就是个枚举类型,对于每种类型的字面值都有对应的token.
实际上这个只能算是一个token的类型.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

// The list of tokens.
const (
// Special tokens
ILLEGAL Token = iota
EOF
COMMENT

literal_beg
// Identifiers and basic type literals
// (these tokens stand for classes of literals)
IDENT // main
INT // 12345
FLOAT // 123.45
IMAG // 123.45i
CHAR // 'a'
STRING // "abc"
// 省略
)

枚举所有可以碰到的token类型.

go/token/position.go 当中是关于token位置相关的定义.

1
2
3
4
5
6
7
8
9
10
11
12
13
// -----------------------------------------------------------------------------
// Positions

// Position describes an arbitrary source position
// including the file, line, and column location.
// A Position is valid if the line number is > 0.
//
type Position struct {
Filename string // filename, if any
Offset int // offset, starting at 0
Line int // line number, starting at 1
Column int // column number, starting at 1 (byte count)
}

这个很简单就是标示在文件中的位置,比较有意思的是Pos的定义type Pos int,这是位置标示的紧凑表示.接下来看看Pos和Position之间是如何转换的.

首先定义了一个FileSet,可以理解为把File的内容字节按顺序存放的一个大数组,而某个文件则属于数组的一个区间[base,base+size]中,base是文件的第一个字节在大数组中的位置,size是这个文件的大小,某个文件中的Pos是在[base,base+size]这个区间里的一个小标.

所以最后Pos能够压缩成一个整数来表示一个文件当中的位置,当需要使用的使用再从FileSet中转换出完整的Position对象.

go/token/serialize.go 是对FileSet序列化,这里就略过了.

所以整个go/token包只是对token的一些定义和转化,词法分析的部分在go/scanner当中.

scan的主流程如下,主体是一个switch case表示的状态机,
比如碰到字符那么扫描到不为字符为止就作为一个标识符,比如碰到数字那么可能按照扫描数字,然后向后看一次小数字再扫描数字,直到没有数字为止.
scan每次会返回一个被扫描的token,压缩表示的位置,和字面值的字符串,这样就能够把一个源文件转化成一个token的记号流,也就是tokenize或者lexical analysis的过程.

func (s *Scanner) Scan() (pos token.Pos, tok token.Token, lit string) {
scanAgain:
        s.skipWhitespace() 
        // current token start
        pos = s.file.Pos(s.offset)
        // determine token value
        insertSemi := false
        switch ch := s.ch; {
    /* 字符开头,开始扫描标识符 */
        case isLetter(ch):
                lit = s.scanIdentifier()
                if len(lit) > 1 {
                        // keywords are longer than one letter - avoid lookup otherwise
                        tok = token.Lookup(lit)
                        switch tok {
                        case token.IDENT, token.BREAK, token.CONTINUE, token.FALLTHROUGH, token.RETURN:
                                insertSemi = true
                        }
                } else {
                        insertSemi = true
                        tok = token.IDENT
                } 
    /* 数字开头,扫描数字 */
        case '0' <= ch && ch <= '9':
                insertSemi = true
                tok, lit = s.scanNumber(false)
        default:

1
2
3

看一下例子的结果.

func ExampleScanner_Scan() { // src is the input that we want to tokenize. // 需要记号化的源文件 src := []byte("cos(x) + 1i*sin(x) // Euler") // Initialize the scanner. var s scanner.Scanner fset := token.NewFileSet() // positions are relative to fset // 添加到文件集合中 file := fset.AddFile("", fset.Base(), len(src)) // register input "file" // 初始化scanner s.Init(file, src, nil /* no error handler */, scanner.ScanComments) // Repeated calls to Scan yield the token sequence found in the input. for { pos, tok, lit := s.Scan() if tok == token.EOF { break } fmt.Printf("%s\t%s\t%q\n", fset.Position(pos), tok, lit) } // 不断扫描就能得到如下结果 // 词法分析就是做这样一件事情. // output: // 1:1 IDENT "cos" // 1:4 ( "" // 1:5 IDENT "x" // 1:6 ) "" // 1:8 + "" // 1:10 IMAG "1i" // 1:12 * "" // 1:13 IDENT "sin" // 1:16 ( "" // 1:17 IDENT "x" // 1:18 ) "" // 1:20 ; "\n" // 1:20 COMMENT "// Euler" } ``` 我在我的数据结构字符画生成工具[1]里面就实现了一个词法分析器,方便我用简单的语法构造一个字符画,然后插入到注释中辅助解释. 唯一的不同的是,我使用了channel读取token记号,来增加并发,而go本身的记号化是串行的,当然,这点区别其实没有多大,而且这个设计 在Go的模板包里面使用了,Rob Pike也有过相关的演讲[2]. 1. https://github.com/ggaaooppeenngg/cpic/blob/master/lex.go 2. http://cuddle.googlecode.com/hg/talk/lex.html#landing-slide

commit message

git commit message 是我们经常要用到的东西,但是大部分人都写得很随便,翻 git log 看到的信息也是不明觉厉,这里结合其他资料阐述一下如何写好一个 commit message。

其实 git commit message 对于不同的项目有不同的要求。 比如 Angular 的 commit message
标题要加分类,比如是属于 refactor 还是 docs , 还有影响范围(例如 属于全部、*, 还是编译器 $compiler),但是在 Linux Kernel 和类似的 GNU style 的项目里面是类似这样的 commit header。

“mm, tracing: unify mm flags handling in tracepoints and printk”

如果没有子系统要求开头大写,如果有涉及的子系统就说明涉及的子系统并且跟上冒号做解释,冒号后的小写。这个根据具体的场景有不同的要求,但是核心就是简洁明了,另外要使用祈使句。

这样的好处,就能通过git log --pretty=format:%s来遍历所有的 log message,对于修改一目了然。

commit header

第一条规则因项目而异。
比如 Angular 的规则是:<type>(<scope>): <subject>

type(必需),scope(可选)和 subject(必需)

angular 要求的 type 有几种

  • feat: 新功能 (feature)
  • fix: 修补 bug
  • docs: 文档 (documentation)
  • style: 格式(不影响代码运行的变动)
  • refactor: 重构(即不是新增功能,也不是修改 bug 的代码变动)
  • test: 增加测试
  • chore: 构建过程或辅助工具的变动

scope 表示影响的范围,subject 开头小写,使用祈使句,结尾不用。号。

但是在 runc 里面要求比较简单,使用祈使句,并且开头大写就可以,结尾不带。号。

kernel 的 subject 如果带上子系统比如 (mm),就要 mm: 开头,然后不用大写。
不然也要开头大写。总之 subject 的部分要求根据项目不同有一些不同,但是 body 部分要求是类似的。

commit body

body 要和 header 空格一行,每行不要超过 72 个字符,并且对改动进行解释。

例如:

1
2
3
4
5
6
7
8
9
10
在 50 个字符内概括你的改动标题

更详细的解释,限制到 72 个字符,和标题隔开一行。
有什么副作用或者后遗症也要指出。

如果有任务管理器,可以在结尾指定。

Resolves: #123
See also: #456, #789

例子

Angular

1
2
3
4
5
6
7
8
fix($compile): couple of unit tests for IE9

Older IEs serialize html uppercased, but IE9 does not...
Would be better to expect case insensitive, unfortunately jasmine does
not allow to user regexps for throw expectations.

Closes #392
Breaks foo.bar api, foo.baz should be used instead

GNU style

1
2
3
4
5
6
7
8
9
10
11
Author: Aleksa Sarai <asarai@suse.de>
Date: Sun Mar 13 04:53:20 2016 +1100

libcontainer: add pids.max to PidsStats

In order to allow nice usage statistics (in terms of percentages and
other such data), add the value of pids.max to the PidsStats struct
returned from the pids cgroup controller.

Signed-off-by: Aleksa Sarai <asarai@suse.de>

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Author: Alexander Morozov <lk4d4@docker.com>
Date: Fri Mar 27 10:50:32 2015 -0700

Use syscall.Kill instead of p.cmd.Process.Kill

We need this to unmask syscall.ESRCH error, which handled in docker and
can be handled by other clients.

Closes #457

Signed-off-by: Alexander Morozov <lk4d4@docker.com>
Acked-by: Hugh Dickins <hughd@google.com>
Reviewed-by: Michal Hocko <mhocko@suse.com>

在进行 code review 的时候,一般会带上签名 Signed-off-by 表示自己对这段改动负责,matainer 也会查看这段代码如果理解这段代码并且进行了测试同样对这段代码负责也要签名,如果只是保证自己觉得代码无误要使用 Reviewed-by,如果表示只是知道这个带动就带上Acked-bygit commit -s 可以自动带上 Signed-off-by

总之,一个好的 commit message 是一个好习惯,如果项目从一开始打 commit message 就很随便,那 commit log 的意义就没有了,导致根本没人会看。如果有意义整个代码就有据可查了,看起来也很漂亮。

参考:

  1. Commit message 和 Change log 编写指南
  2. Submitting patches: the essential guide to getting your code into the kernel

本文主要讨论 timer heap 在 Go 中的管理,以及运行时对于时间是如何获取的问题,从而引出一个结论,我们对于 timer 的准确度可以有多大的依赖。

首先我们看一下 Go 是如何获取时间的,找到time.Now, 发现最终调用的是下面这个汇编函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

// func now() (sec int64, nsec int32)
TEXT time·now(SB),NOSPLIT,$16
// Be careful. We're calling a function with gcc calling convention here.
// We're guaranteed 128 bytes on entry, and we've taken 16, and the
// call uses another 8.
// That leaves 104 for the gettime code to use. Hope that's enough!
// 这里只能保证调用 gettime 函数的时候有 104 个字节给这个函数作为栈
// 因为返回值用了 16 个字节,这个函数本身会用 8 个字节,到调用 gettime 的时候只剩 104 个字节可以用。
MOVQ runtime·__vdso_clock_gettime_sym(SB), AX
CMPQ AX, $0
JEQ fallback
MOVL $0, DI // CLOCK_REALTIME
LEAQ 0(SP), SI
CALL AX
MOVQ 0(SP), AX // sec
MOVQ 8(SP), DX // nsec
MOVQ AX, sec+0(FP)
MOVL DX, nsec+8(FP)
RET
fallback:
LEAQ 0(SP), DI
MOVQ $0, SI
MOVQ runtime·__vdso_gettimeofday_sym(SB), AX
CALL AX
MOVQ 0(SP), AX // sec
MOVL 8(SP), DX // usec
IMULQ $1000, DX
MOVQ AX, sec+0(FP)
MOVL DX, nsec+8(FP)
RET

第一行TEXT time·now(SB),NOSPLIT,$16, 其中time·now(SB)表示函数now的地址,NOSPLIT标志不依赖参数,$16表示返回的内容是 16 个字节,Go 的汇编语法具体可以参考 Go asm 文档 [1].

首先是取出__vdso_clock_gettime_sym(SB)(这是一个符号,指向的其实是 clock_gettime 函数,man clock_gettime 可以看到 glibc 的定义)的地址,如果符号不为空的话就把栈顶的地址计算出来传入 SI(LEA 指令). DI 和 SI 分别是system call的前两个参数的寄存器,这个相当于调用clock_gettime(0,&ret). fallback 分支是当对应的符号没有初始化就退而求其次调用gettimeofday(man gettimeofday 可以看到 libc 的定义)这个函数。

Go 的函数调用确保会有至少 128 个字节的栈(注意不是 goroutine 的栈), 具体可以参考runtime/stack.go里的_StackSmall, 但是进入对应的 c 函数以后,栈的增长就不能够由 Go 控制了,所以剩下的 104 个字节要确保这个调用不会栈溢出.(不过一般不会,因为这两个获取时间的函数不复杂).

先说函数符号的事,查阅相关资料可以看出来,VDSO 是 Virtual Dynamic Shared Object, 就是内核提供的虚拟的.so, 这个.so 文件不在磁盘上,而是在内核里头,映射到了用户空间。具体细节可以看 so 上的一个回答 [2]. 其中一个链接中有一些描述 [3].

One way of obtaining a fast gettimeofday() is by writing the current time in a fixed place, on a page mapped into the memory of all applications, and updating this location on each clock interrupt.
These applications could then read this fixed location with a single instruction - no system call required.
Concerning gettimeofday(), a vsyscall version for the x86-64 is already part of the vanilla kernel.
Patches for i386 exist. (An example of the kind of timing differences: John Stultz reports on an experiment where he measures gettimeofday() and finds 1.67 us for the int 0x80 way, 1.24 us for the sysenter way, and 0.88 us for the vsyscall.)

简单来说就是一种加速系统调用的机制加兼容模式。像gettimeofday这样的函数如果像普通系统调用一样的话,有太多的上下文切换,特别是一些频繁获取时间的程序,所以一般gettimeofday是通过这种机制调用的. 单独在用户空间映射了一段地址,在里面的是内核暴露的一些系统调用,具体可能是 syscall 或者 int 80 或者 systenter 的方式,由内核决定更快的调用方式,防止出现 glibc 版本和 kernel 版本不兼容的问题.(vDSO 是 vsyscall 的一个升级版本,避免了一些安全问题,映射不再静态固定).

从内核中可以看出获取系统调用是由时间中断更新的,其调用栈如下 [5].

Hardware timer interrupt (generated by the Programmable Interrupt Timer - PIT)
-> tick_periodic();
-> do_timer(1);
-> update_wall_time();
-> timekeeping_update(tk, false);
-> update_vsyscall(tk);

update_wall_time使用的是时钟源 (clock source) 的时间,精度能达到 ns 级别。
但是一般 Linux kernel 的时间中断是 100HZ, 高的也有 1000HZ 的,也就是说这个时间一般是在 10ms 或者 1ms 中断处理时更新一次。
从操作系统的角度看时间粒度大概是 ms 级别. 但是,这个只是一个基准值. 每次获取时间的时候还是会取得时钟源
的时间(时钟源有很多种 [9],可能是硬件的计数器,也可能就是中断的 jiffy,一般是可以达到 ns). 获取时间的精度还是能到 us 和几百 ns 之间。
因为系统调用本身还要花时间,理论上更精确的时间就不是通过这个系统调用获得了,而是需要直接用汇编指令”rdtsc”读 CPU 的 cycle 来得到精确时间。

接下来说,获取时间的函数符号的寻找过程又涉及了 ELF 的一些内容,其实就是动态链接的过程,把.so 中的函数符号的地址解析到并且存入函数指针中,比如__vdso_clock_gettime_sym, 关于链接的内容可以阅读”程序员的自我修养”这本书 [4].

其他函数例如TEXT runtime·nanotime(SB),NOSPLIT,$16也是类似的过程. 这个函数就能获取时间。

看完时间获取的过程,再来看看 Go 运行时的部分,瞧瞧timer heap是如何管理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Package time knows the layout of this structure.
// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
// For GOOS=nacl, package syscall knows the layout of this structure.
// If this struct changes, adjust ../syscall/net_nacl.go:/runtimeTimer.
type timer struct {
i int // heap index

// Timer wakes up at when, and then at when+period, ... (period > 0 only)
// each time calling f(now, arg) in the timer goroutine, so f must be
// a well-behaved function and not block.
when int64
period int64
f func(interface{}, uintptr)
arg interface{}
seq uintptr
}

timer 是以 heap 的形式管理的,heap 是个完全二叉树,用数组就可以存下,i 是 heap 的 index.
when 是 goroutine 被唤醒的时间,period 是唤醒的间隙,下次唤醒的时间是 when+period, 依次类推。
调用函数f(now, arg), now 是时间戳。

1
2
3
4
5
6
7
8
9
var timers struct {
lock mutex
gp *g
created bool
sleeping bool
rescheduling bool
waitnote note
t []*timer
}

整个的 timer heap 由timers管理.gp 指向的是一个g的指针,也就是调度器当中的 G 结构,一个 goroutine 的状态维护的结构,指向的是时间管理器的单独的一个 goroutine, 这个不属于用户而是运行时启动的 goroutine.(当然只有使用了 timer 才会启动,不然不会有这个 goroutine).lock保证 timers 的线程安全.waitnote 是一个条件变量。

看一下addtimer函数,他是整个计时器开始的入口,当然本身只是加了锁。

1
2
3
4
5
6
func addtimer(t *timer) {
lock(&timers.lock)
addtimerLocked(t)
unlock(&timers.lock)
}

再看addtimerLocked可以看到就是入 heap 的过程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Add a timer to the heap and start or kick the timer proc.
// If the new timer is earlier than any of the others.
// Timers are locked.
func addtimerLocked(t *timer) {
// when must never be negative; otherwise timerproc will overflow
// during its delta calculation and never expire other runtime·timers.
if t.when < 0 {
t.when = 1<<63 - 1
}
t.i = len(timers.t)
timers.t = append(timers.t, t)
siftupTimer(t.i)
if t.i == 0 {
// siftup moved to top: new earliest deadline.
if timers.sleeping {
timers.sleeping = false
notewakeup(&timers.waitnote)
}
if timers.rescheduling {
timers.rescheduling = false
goready(timers.gp, 0)
}
}
if !timers.created {
timers.created = true
go timerproc()
}
}

先从下面的if !timers.created分支看起,如果 timers 对应没有创建就 go 一个 timerproc, timeproc 的定义如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// Timerproc runs the time-driven events.
// It sleeps until the next event in the timers heap.
// If addtimer inserts a new earlier event, addtimer1 wakes timerproc early.
func timerproc() {
timers.gp = getg()
for {
lock(&timers.lock)
timers.sleeping = false
now := nanotime()
delta := int64(-1)
for {
if len(timers.t) == 0 {
delta = -1
break
}
t := timers.t[0]
delta = t.when - now
if delta > 0 {
break
}
if t.period > 0 {
// leave in heap but adjust next time to fire
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(0)
} else {
// remove from heap
last := len(timers.t) - 1
if last > 0 {
timers.t[0] = timers.t[last]
timers.t[0].i = 0
}
timers.t[last] = nil
timers.t = timers.t[:last]
if last > 0 {
siftdownTimer(0)
}
t.i = -1 // mark as removed
}
f := t.f
arg := t.arg
seq := t.seq
unlock(&timers.lock)
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
f(arg, seq)
lock(&timers.lock)
}
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
timers.rescheduling = true
// 如果没有 timers 剩余,就让 G 进入 sleep 状态。
// goparkunlock 的作用是解开 G 和 M 的联系,让 goroutine sleep 而 M
// 寻找下一个 G 来执行。
goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
continue
}
// At least one timer pending. Sleep until then.
timers.sleeping = true
// 清零 waitnote.
noteclear(&timers.waitnote)
unlock(&timers.lock)
// 调用 M 结构对应的 OS-dependent, OS-thread 的信号量让 M 进入 sleep 状态。
// 只会在超时的时候或者条件变量 waitnote 触发才会被唤醒。
notetsleepg(&timers.waitnote, delta)
}
}

timeproc 的主体就是从最小堆中取出 timer 然后回调函数,如果 period 大于 0 就把 timer 的 when 修改并且存回 heap 并调整,如果小于 0 就直接删除(对应的分别是标准库的 ticker 和 timer), 进入 OS 信号量中睡眠等待下次处理,(当然可以被 waitnote 变量唤醒), 这样一直循环,说白了就是靠信号量的超时机制来实现了运行时的 sleep[8]. 然后当完全没有 timer 剩余的时候,G 结构表示的 goroutine 进入睡眠状态,承载 goroutine 的 M 结构所代表的 OS-thread 会寻找其它可运行的 goroutine 执行. 具体关于运行时调度的细节可以参考雨痕大叔的笔记 [7].

看完这一路 case, 回到addtimerLocked, 当加入一个新的timer时,会作依次检查,也就是说最新插入的timer是在堆顶的话,会唤醒睡眠的 timergorountine 开始从 heap 上检查过时的timer并执行. 这里的唤醒和之前的睡眠是两种对应的状态timers.sleeping是进入了 M 的 os 信号量睡眠。
timers.rescheduling是进入了 G 的调度睡眠,而 M 并没有睡眠,让 G 重新进入可运行状态. 时间超时和新 timer 的加入,两者结合成为了timer运行时的驱动力。

看完了 time 的实现,再回过头来回答文章最初的问题”timer 可以有多精确”, 其实和两个原因有关,一个是操作系统本身的时间粒度,一般是 us 级别的,时间基准更新是 ms 级别的,时间精度能到 us 级别,另外一个就是timer本身的 goroutine 的调度问题,如果运行时的负载过大或者操作系统本身的负载过大,会导致timer本身的 goroutine 响应不及时,导致 timer 触发并不及时,可能导致一个 20ms 的 timer 和一个 30ms 的 timer 之间像是并发的一样(这也是笔者碰到的问题,特别是一些被 cgroup 限制了的容器环境,cpu 时间分配很少的条件下), 所以有时候我们不能过分相信 timer 的时序来保证程序的正常运行.NewTimer的注释也强调了”NewTimer creates a new Timer that will send the current time on its channel after at least duration d.”, 没有人能保证 timer 按点执行,当然如果你的间隔离谱的大的话其实就可以忽略这方面的影响了:)


参考链接:

1.  Go Assembly https://golang.org/doc/asm
2.  http://stackoverflow.com/questions/19938324/what-are-vdso-and-vsyscall
3.  http://www.win.tue.nl/~aeb/linux/lk/lk-4.html
4.  http://book.douban.com/subject/3652388/
5.  http://linuxmogeb.blogspot.com/2013/10/how-does-clockgettime-work.html
6.  时钟源 http://blog.csdn.net/droidphone/article/details/7975694
7.  Go 源码剖析 https://github.com/qyuhen/book/blob/master
    /Go%201.5%20%E6%BA%90%E7%A0%81%E5%89%96%E6%9E%90.pdf
8.  带超时的信号量的实现 http://lxr.free-electrons.com/source/kernel/locking/semaphore.c#L204
9.  内核的时钟源 http://blog.csdn.net/droidphone/article/details/7975694