ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

经常提到的context都是一些在一个http请求的不同中间件之间传递参数和返回值的上下文. 标准库里的上下文稍微丰富一点, 比如流水线模型都会有一个通知退出的channel, 也一并封装进了Context, 所以既有传值功能也有流程控制的功能.

一般会把Context作为函数的第一个参数传递下去, 比如Context可以保存一个Request的id, 在打log的时候带上, 这样就可以区分出输出中同一个request的调用情况. 或者子函数select 传入的Context.Done channel用于在流水线中退出的通知等等.

Context接口有四个方法, 并且线程安全.

Done 是一个channel用于通知退出事件, 当从这个channel里读出的时候说明Context结束.
Err 返回退出的原因.
Deadline 返回一个截止时间(如果设置了的话).
Value 可以取得Context里面存得值.

Context的实现有4种, cancelCtx, emptyCtx, timerCtx, valueCtx.

valueCtx只是最简单的Ctx用来存k-v, 这类似于中间件之间传递各种值的功能, 没有Done, Err, Deadline这些方法, 是组合父Context得到的.

cancelCtx是一个带取消功能的Ctx, 返回一个cancel函数, 可以通过调用它主动取消这个Ctx, 比如错误退出的时候, 可以调用cancel函数, 他会同时调用子Ctx的cancel函数, 以此来让Context的Done起作用, cancelCtx是树形的, 有Done, Err, 但是没有Deadline, 组合了父结构获得的.

timerCtx是一个cancelCtx的封装, 由timer调用cancel函数, 并且不会超过parent的Deadline(不然就退化成cancelCtx由parent触发cancel), 有Deadline, 其余部分是组合了cancelCtx.

Context.WithCancel就是绑定一个parent, 返回一个cancelCtx的Context, 用返回的CancelFunc可以用来主动关闭Context, 比如程序错误的时候可以调用CancelFunc来让select Done的goroutine知道.

Context.WithTimer是对Context.WithDeadline的封装, Context.WithDeadline返回的是timerCtx.

Context.WithValue则仅仅是用来存值的Context.

WithXXX的函数都是要接受一个parent的, 最初的parent就用Context.Background().
它是一种emptyCtx, Err, Done(nil channel永远阻塞), Deadline, Value全都返回空值, 没有实际作用只是作为树形结构的root.

另一种emptyCtx是context.TODO(), 用于没有明确作用的不知道父Context会是什么的时候可以使用.

主要结构

为了解决外部碎片的问题, 需要有一种手段可以分配非连续的页映射到一段连续的虚拟地址上. 这就是vmalloc的目的, 本文基于4.7.2的代码对vmalloc的实现进行分析.

这里要提到一个重要的结构体

1
2
3
4
5
6
7
8
9
10
11
struct vm_struct {
struct vm_struct *next;
void *addr;
unsigned long size;
unsigned long flags;
struct page **pages;
unsigned int nr_pages;
phys_addr_t phys_addr;
const void *caller;
};

这个结构用于描述每一段映射的内存.

比起32位的结构, 64位结构有更大的寻址空间, 可以通过查看Documentation/x86/x86_64/mm.txt 获得64位的内存映射, 比起32位的变化就是跨度变大了很多.

直接映射是64TB, 有一个1TB的空洞, 接着就是32TB的vmalloc空间,以VMALLOC_START(64TB+1TB)开始,VMALLOC_END(64TB+1TB+32TB)结尾.

原来的32位时, 内核的虚拟空间只有1G, 直接映射的区间占896MB, 隔了一个8MB的空洞, 再接着是vmalloc的映射区, 大小一般是128MB.

vmalloc和物理映射的空洞主要是一个安全区, 如果越过了这个hole说明程序有问题存在非法的访问.

vm_struct的组织是通过链表实现的,
其中next指向下一个vm_struct, addr指向的是虚拟地址, size对应分配的内存的大小,在有guard page(也就是flag的VM_NO_GUARD没有设置的话)的时候会多出一个PAGE_SIZE的大小(但没有映射内存), pages是页的地址, nr_pages是页的数量, phy_addr这个字段一般为0,当用于ioremap的时候会用到, caller是调用者的地址.

上面说了, 每个vm_struct代表的区间之间会有一PAGE_SIZE大小的分隔,也是为了能够方便检查非法越界的情况.

这是32位的一张分布图, 整体结构没变, 就是64位的区间变大了.

vmap_area用于代表虚拟地址的区间, 和vm_struct是有对应关系的.
list 是按地址排序存的线性链表节点.
rb_node 是按地址存的红黑树节点.
这两个值都是在插入的时候修改的.
也就是为了方便搜索,它既存在红黑树上也存在链表里面.

1
2
3
4
5
6
7
8
9
10
11
12
struct vmap_area {
unsigned long va_start;
unsigned long va_end;
unsigned long flags;
struct rb_node rb_node; /* address sorted rbtree */
struct list_head list; /* address sorted list */
struct llist_node purge_list; /* "lazy purge" list */
struct vm_struct *vm;
struct rcu_head rcu_head;
};


vmap_area 虚拟地址的管理

接下了说一下调用过程, 首先vmalloc会调用__vmalloc_node_flags,这是一层封装,指定了GFP的flag, 接着调用__vmalloc_node,它通过指定搜寻的区域(VMALLOC_START,VMALLOC_END)调用了__vmalloc_node_range, 然后通过__get_vm_area_node寻找合适的虚拟内存空间,并且分配vm_struct,最后调用__vmalloc_area_node根据vm_struct分配具体的物理页并在页表中建立虚拟地址的映射.

__get_vm_area_nodesize进行对齐, 分配vmap_area的内存, 然后增加一个PAGE_SIZE的guard page, 接着就是寻找一段合适的虚拟地址的区间来分配给vmap_area并且用它来初始化vm_struct.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static struct vm_struct *__get_vm_area_node(unsigned long size,
unsigned long align, unsigned long flags, unsigned long start,
unsigned long end, int node, gfp_t gfp_mask, const void *caller)
{
struct vmap_area *va;
struct vm_struct *area;

BUG_ON(in_interrupt());
if (flags & VM_IOREMAP)
align = 1ul << clamp_t(int, fls_long(size),
PAGE_SHIFT, IOREMAP_MAX_ORDER);

size = PAGE_ALIGN(size);
if (unlikely(!size))
return NULL;

/* 分配vm_struct
* 并且初始化全0
*/
area = kzalloc_node(sizeof(*area), gfp_mask & GFP_RECLAIM_MASK, node);
if (unlikely(!area))
return NULL;
/* 如果没有标记,size要多算一个guard page用于分隔 */
if (!(flags & VM_NO_GUARD))
size += PAGE_SIZE;
/* 分配vmap_area */
va = alloc_vmap_area(size, align, start, end, node, gfp_mask);
if (IS_ERR(va)) {
kfree(area);
return NULL;
}

/* 初始化 */
setup_vmalloc_vm(area, va, flags, caller);

return area;
}

alloc_vmap_area会缓存上次一次搜索的节点到free_vmap_cache, 这样是认为一般虚拟内存是按顺序连续分配的, 如果通过缓存没有搜索到就要从全局的vmap_area_root开始搜索.

这里说明一下vmap_area的分配过程, 主要是在红黑树中根据所指定的区间寻找合适的点,如果找到一个区间正好包含了vstart,那么在这个区间链表后面寻找合适的空洞进行分配就可以, 也有可能在红黑书上找到的就是空洞就不用接着遍历了, 总之搜索的过程是先红黑书然后再链表.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
* Allocate a region of KVA of the specified size and alignment, within the
* vstart and vend.
*/
static struct vmap_area *alloc_vmap_area(unsigned long size,
unsigned long align,
unsigned long vstart, unsigned long vend,
int node, gfp_t gfp_mask)
{
struct vmap_area *va;
struct rb_node *n;
unsigned long addr;
int purged = 0;
struct vmap_area *first;

BUG_ON(!size);
BUG_ON(offset_in_page(size));
BUG_ON(!is_power_of_2(align));

might_sleep_if(gfpflags_allow_blocking(gfp_mask));

va = kmalloc_node(sizeof(struct vmap_area),
gfp_mask & GFP_RECLAIM_MASK, node);
if (unlikely(!va))
return ERR_PTR(-ENOMEM);

/*
* Only scan the relevant parts containing pointers to other objects
* to avoid false negatives.
*/
kmemleak_scan_area(&va->rb_node, SIZE_MAX, gfp_mask & GFP_RECLAIM_MASK);

retry:
spin_lock(&vmap_area_lock);
/*
* Invalidate cache if we have more permissive parameters.
* cached_hole_size notes the largest hole noticed _below_
* the vmap_area cached in free_vmap_cache: if size fits
* into that hole, we want to scan from vstart to reuse
* the hole instead of allocating above free_vmap_cache.
* Note that __free_vmap_area may update free_vmap_cache
* without updating cached_hole_size or cached_align.
*/
if (!free_vmap_cache || /* 缓存为空, 并且在下面条件满足的时候不使用缓存 */
size < cached_hole_size || /* 至少有一个空洞可以复用, 直接从头开始搜索 */
vstart < cached_vstart || /* 别缓存的起点小 */
align < cached_align) { /* 对齐大小比缓存的小 */
nocache:
cached_hole_size = 0;
free_vmap_cache = NULL;
}
/* record if we encounter less permissive parameters */
cached_vstart = vstart;
cached_align = align;

/* find starting point for our search */
if (free_vmap_cache) { /* 这个地方是用缓存上次搜索的结果, 每次找到以后会存到这里 */
first = rb_entry(free_vmap_cache, struct vmap_area, rb_node);
addr = ALIGN(first->va_end, align);
if (addr < vstart) /* 缓存的末尾对齐之后超出了范围从头开始搜索 */
goto nocache;
if (addr + size < addr) /* 整数溢出 */
goto overflow;

} else {
addr = ALIGN(vstart, align);
if (addr + size < addr)
goto overflow;

n = vmap_area_root.rb_node;
first = NULL;

while (n) { /* 用对齐之后的vstart正好<=va_ned&&>=va_start的节点 */
struct vmap_area *tmp;
tmp = rb_entry(n, struct vmap_area, rb_node);
if (tmp->va_end >= addr) {
first = tmp;
if (tmp->va_start <= addr)
break;
n = n->rb_left;
} else
n = n->rb_right;
}

if (!first)
goto found;
}
/* from the starting point, walk areas until a suitable hole is found */
while (addr + size > first->va_start && addr + size <= vend) {
if (addr + cached_hole_size < first->va_start) /* cached_hole_size */
cached_hole_size = first->va_start - addr; /* 缓存最大的空洞大小 */
addr = ALIGN(first->va_end, align);
if (addr + size < addr)
goto overflow;

if (list_is_last(&first->list, &vmap_area_list))
goto found;

first = list_next_entry(first, list);
}

found:
if (addr + size > vend)
goto overflow;

va->va_start = addr;
va->va_end = addr + size;
va->flags = 0;
__insert_vmap_area(va); /* 插入到红黑书和链表当中 */
free_vmap_cache = &va->rb_node;
spin_unlock(&vmap_area_lock);

BUG_ON(!IS_ALIGNED(va->va_start, align));
BUG_ON(va->va_start < vstart);
BUG_ON(va->va_end > vend);

return va;

overflow:
spin_unlock(&vmap_area_lock);
if (!purged) {
purge_vmap_area_lazy();
purged = 1;
goto retry;
}

if (gfpflags_allow_blocking(gfp_mask)) {
unsigned long freed = 0;
blocking_notifier_call_chain(&vmap_notify_list, 0, &freed);
if (freed > 0) {
purged = 0;
goto retry;
}
}

if (printk_ratelimit())
pr_warn("vmap allocation for size %lu failed: use vmalloc=<size> to increase size\n",
size);
kfree(va);
return ERR_PTR(-EBUSY);
}


建立映射

成功分配好vm_struct之后就可以建立映射了.
__vmalloc_area_node的过程首先是分配物理页,调用alloc_kmem_pages,这个函数和alloc_pages不同的是套了一层cgroup的kmem计数器,来限制内存分配, 然后再调用map_vm_area在页表上建立映射,内部调用的是vmap_page_range,指定了vm_struct的start和end, 而vmap_page_range则调用了vmap_page_range_noflush迭代去初始化各级页表. 内核的虚拟地址的映射就全部完成了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
*
* Set up page tables in kva (addr, end). The ptes shall have prot "prot", and
* will have pfns corresponding to the "pages" array.
* 这里的kva指的是kernel virtual address.
* Ie. pte at addr+N*PAGE_SIZE shall point to pfn corresponding to pages[N]
*/
static int vmap_page_range_noflush(unsigned long start, unsigned long end,
pgprot_t prot, struct page **pages)
{
pgd_t *pgd;
unsigned long next;
unsigned long addr = start;
int err = 0;
int nr = 0;

BUG_ON(addr >= end);
pgd = pgd_offset_k(addr); /* 把虚拟地址hash到pgd */
do {
next = pgd_addr_end(addr, end); /* 获取下一个pgd */
err = vmap_pud_range(pgd, addr, next, prot, pages, &nr); /* 再去初始化 pud pmd pte */
if (err)
return err;
} while (pgd++, addr = next, addr != end);

return nr;
}


对应的调用关系如图.

释放

释放的入口是vfree, 这个函数首先通过in_interrupt检查是否在中断上下文, 如果是正处于中断中则使用workqueue传入vfree_deferred用workqueue进行释放, 不然就直接调用__vunmap来释放, 它会调用remove_vm_area, 最终调用用unmap_vmap_area重置页表, 然后用free_vmap_area_noflushvmp_area加入到vmap_purge_list这个链表当中, 放入这个链表之后只有链表长度超过阈值才会尝试调用try_purge_vmap_area_lazy真正释放掉vmap_area,释放过程主要是从红黑树中移除, 然后把自己也给free掉, 这是一个延迟的释放过程, 最后把把vm_struct自己和它管理的物理页全部释放. 调用图如图所示.

总结

vmalloc主要是针对连续地址分配的需求, 涉及两个部分, 一个是虚拟地址的管理以及到物理地址的映射. 其中虚拟地址的管理比较麻烦一点, 用链表红黑树保存了每个vmap_area并且寻找合适的空洞用于分配虚拟地址. 而物理地址的分配之前我的博客已经讲过了, 建立映射只要设置对应的页表项就OK了.

参考

  1. 《LINUX3.0内核源代码分析》第四章: 内存管理

  2. linux内存管理之vmalloc

  3. lkvm

很多人看内核物理页的分配都喜欢从__alloc_pages开始看,喜欢从程序的执行流程去看问题,其实可换个思路,从开发或者说实现的角度去理解物理页的分配效果可能更好,所以这篇博客,反其道而行之,基于4.7.2的内核代码, 由内向外解释物理页的分配过程的细节.

binary buddy system allocation 算法

伙伴分配器算法的流程如下:

  1. 如果内存被分配
    1. 寻找一个合适大小的内存(要求是: 大于 requested memory, 同时以\(2^k\)为量纲最小的块, 也就是分配一个满足要求的最小块)
      1. 如果找到了直接分配
      2. 如果没有找到, 需要进行如下尝试
        1. 拆分一个比 requested memory 更大的内存块(比如比之前没找到的\(2^x\)大的\(2^{x+1}\), 分成两半.
        2. 如果拆分出来的一半满足requested memory, 并且不能再分了, 已经是最小的了, 就分配该块.
        3. 重复1, 寻找合适大小的内存块.
        4. 重复这个流程直到找到合适的内存块.
  2. 如果内存被释放
    1. 释放\(2^k\)内存块
    2. 查看内存块的伙伴也就是分配之后的另一半\(2^k\)块是否也free了
    3. 如果是,则会回到2并且重复执行直到所有内存被释放或者有一个伙伴没有被free掉, 无法合并.

内存碎片

内存碎片有两种,一种是External fragmentation和Internal Fragmentation, 前者是因为内存块划分的太小而无法满足大的内存分配的需要, 在这点上内核遇到的大内存块分配其实并不多, 而且通过vmalloc把不连续的物理地址映射成连续的虚拟地址很好的解决了这个问题. 后者的解决方法是引入了slab allocator, 对小内存进行了有效的缓存, 避免了Internal fragmentation的情况.

具体实现

每个struct zone结构有一个struct free_area free_area[MAX_ORDER];的定义, 对应的结构如下.

1
2
3
4
struct free_area {
struct list_head free_list[MIGRATE_TYPES];
unsigned long nr_free;
};

所以整个的空闲块的管理结构如图.

每一个free_area用来存储一种\(2^k\)块,用链表链接,MAX_ORDER表示的允许最大的块就是\(2^{MAX_ORDER}\)大小.

migratetype

图中省略了MIGRATE_TYPES的表示, 仔细观察free_list是一个数组每个代表了一种migratetype, 用于page分配从node之间迁移的, 让内存尽量分配在离CPU近的node上, 这个特性是在2.6.32.25引入的, 同时对外部碎片的现象进行了优化, 这一点是这么理解的, 外部碎片是因为小内存块的分配位置可能占据了某个地方, 导致大的内存块无法分配出来, 比如说如果这个页是一些内核的永久数据可能这个碎片就会一直存在, 但是我们提前从可移动页分配一大块内存来给不可移动页, 那么内存碎片就不会影响到可移动页的分配, 这样就不会让内核初始化的时候分配的一些永久性的内存导致碎片影响了其他内存的分配,下面我们看看迁移类型的具体定义.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
* PAGE_ALLOC_COSTLY_ORDER is the order at which allocations are deemed
* costly to service. That is between allocation orders which should
* coalesce naturally under reasonable reclaim pressure and those which
* will not.
*/
/* 内核认为超过8个页算是大的内存分配 */
#define PAGE_ALLOC_COSTLY_ORDER 3

enum {
MIGRATE_UNMOVABLE,
MIGRATE_MOVABLE,
MIGRATE_RECLAIMABLE,
MIGRATE_PCPTYPES, /* the number of types on the pcp lists */
MIGRATE_HIGHATOMIC = MIGRATE_PCPTYPES,
#ifdef CONFIG_CMA
/*
* MIGRATE_CMA migration type is designed to mimic the way
* ZONE_MOVABLE works. Only movable pages can be allocated
* from MIGRATE_CMA pageblocks and page allocator never
* implicitly change migration type of MIGRATE_CMA pageblock.
*
* The way to use it is to change migratetype of a range of
* pageblocks to MIGRATE_CMA which can be done by
* __free_pageblock_cma() function. What is important though
* is that a range of pageblocks must be aligned to
* MAX_ORDER_NR_PAGES should biggest page be bigger then
* a single pageblock.
*/
MIGRATE_CMA,
#endif
#ifdef CONFIG_MEMORY_ISOLATION
MIGRATE_ISOLATE, /* can't allocate from here */
#endif
MIGRATE_TYPES
};


这里简单解释一下:

  1. MIGRATE_UNMOVABLE
     这种类型的页的位置是固定不变的, 不可以移动, 内核中大部分数据是这样的.
    
  2. MIGRATE_MOVABLE
     不能够直接移动,但是可以删除,而内容则可以从某些源重新生成.如文件数据映射的页面则归属此类.
    
  3. MIGRATE_RECLAIMABLE
     可以移动。分配给用户态程序运行的用户空间页面则为该类.由于是通过页面映射而得,将其复制到新位置后,更新映射表项,重新映射,应用程序是不感知的.
    
  4. MIGRATE_PCPTYPES
     是一个分界数,小于这个数的都是在pcplist上的.
    
  5. MIGRATE_CMA
     连续内存分配,用于避免预留大块内存导致系统可用内存减少而实现的,即当驱动不使用内存时,将其分配给用户使用,而需要时则通过回收或者迁移的方式将内存腾出来。
    
  6. MIGRATE_ISOLATE
     表示NUMA不能夸node迁移page, 让cpu和节点之间保持亲和性.
    
  7. MIGRATE_TYPES
     分界数, 低于这个才是迁移类型.
    

实现概要

实际上的合并对应的是通过小块合并成大块从下级的free_area的链表中移除, 然后添加到上一层free_area当中. 拆分则相反, 从上级free_area当中删除大块, 并且拆分成两个小块, 加入到下层free_area的list当中, 当然, 如果有一块符合需求就直接分配了. 这就是整个物理页分配的核心内容.

分配

整个算法的入口是__rmqueue, 从伙伴分配器中分配对应\( 2^{order} \) 的物理块.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
* Do the hard work of removing an element from the buddy allocator.
* Call me with the zone->lock already held.
*/
static struct page *__rmqueue(struct zone *zone, unsigned int order,
int migratetype)
{
struct page *page;

page = __rmqueue_smallest(zone, order, migratetype);
if (unlikely(!page)) {
if (migratetype == MIGRATE_MOVABLE)
page = __rmqueue_cma_fallback(zone, order); /* MOVABLE的失败优先从cma迁移 */

if (!page)
page = __rmqueue_fallback(zone, order, migratetype); /* 失败的话会从其他备选迁移类型当中迁移page */
}

trace_mm_page_alloc_zone_locked(page, order, migratetype);
return page;
}

分配失败退化的行为在后面讨论,先看主要路径, 算法主要体现在__rmqueue_smallest这个函数上, 依次从order开始向上寻找free_area并从list当中取出page块.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
* Go through the free lists for the given migratetype and remove
* the smallest available page from the freelists
*/
static inline
struct page *__rmqueue_smallest(struct zone *zone, unsigned int order,
int migratetype)
{
unsigned int current_order;
struct free_area *area;
struct page *page;

/* Find a page of the appropriate size in the preferred list */
for (current_order = order; current_order < MAX_ORDER; ++current_order) {
area = &(zone->free_area[current_order]);
page = list_first_entry_or_null(&area->free_list[migratetype],
struct page, lru);
if (!page)
continue; /* 如果失败就尝试更大的块 */
list_del(&page->lru); /* 这个lru取决他的上下文,比如在这里就是free_list */
rmv_page_order(page); /* 设置flags */
area->nr_free--; /* free计数器-1 */
expand(zone, page, order, current_order, area, migratetype);
set_pcppage_migratetype(page, migratetype);/* 设置page的migratetype */
return page;
}
return NULL;
}

从下往上找到合适的块以后再从上往下进行拆分, 这依赖于expand函数完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/*
* The order of subdivision here is critical for the IO subsystem.
* Please do not alter this order without good reasons and regression
* testing. Specifically, as large blocks of memory are subdivided,
* the order in which smaller blocks are delivered depends on the order
* they're subdivided in this function. This is the primary factor
* influencing the order in which pages are delivered to the IO
* subsystem according to empirical testing, and this is also justified
* by considering the behavior of a buddy system containing a single
* large block of memory acted on by a series of small allocations.
* This behavior is a critical factor in sglist merging's success.
*
* -- nyc
*/
static inline void expand(struct zone *zone, struct page *page,
int low, int high, struct free_area *area,
int migratetype)
{
unsigned long size = 1 << high;

while (high > low) { /* 从高阶向低阶迭代 */
area--;
high--;
size >>= 1;
VM_BUG_ON_PAGE(bad_range(zone, &page[size]), &page[size]);
/* guardpage TODO */
if (IS_ENABLED(CONFIG_DEBUG_PAGEALLOC) &&
debug_guardpage_enabled() &&
high < debug_guardpage_minorder()) {
/*
* Mark as guard pages (or page), that will allow to
* merge back to allocator when buddy will be freed.
* Corresponding page table entries will not be touched,
* pages will stay not present in virtual address space
*/
set_page_guard(zone, &page[size], high, migratetype);
continue;
}
list_add(&page[size].lru, &area->free_list[migratetype]); /* 把page放到area的free_list里面, 这个page是一个数组形式的 比如之前是8 这里变成4,那么后半部分会被留下来,前半部分会继续用于迭代 */
area->nr_free++; /* 空闲块计数器+1 */
set_page_order(&page[size], high); /* 相当于page->private = high 表示自己属于order为high的阶的block中 */
}
}

expand 就是向下一层的free_area留一半, 然后接着用另一半进行迭代.

举个例子说明分配的过程:

假设当前需要从zone当中分配一个order为1的page, 就会从下往上找到一个可以分配的块.

从1开始迭代到2, 发现2有可以分配的\(2^3\)page, 然后进入expand, 进行拆分.

拆分以后一半用于分配一半进入了同一层的free_list当中,如图所示, 核心流程非常简洁明了.

现在再重新回到__rmqueue,分析这个函数后半部分的fallback行为是如何迁移的.
首先如果编译选项带了CMA就会有具体内容, 其实就是再尝试从MIGRATE_CMA当中再分配一次page.

1
2
3
4
5
6
7
8
9
10
11
#ifdef CONFIG_CMA
static struct page *__rmqueue_cma_fallback(struct zone *zone,
unsigned int order)
{
return __rmqueue_smallest(zone, order, MIGRATE_CMA);
}
#else
static inline struct page *__rmqueue_cma_fallback(struct zone *zone,
unsigned int order) { return NULL; }
#endif

之后是一般性的退化流程是__rmqueue_fallback这个函数.
说这个函数之前需要看一张表.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

/*
* This array describes the order lists are fallen back to when
* the free lists for the desirable migrate type are depleted
*/
static int fallbacks[MIGRATE_TYPES][4] = {
[MIGRATE_UNMOVABLE] = { MIGRATE_RECLAIMABLE, MIGRATE_MOVABLE, MIGRATE_TYPES },
[MIGRATE_RECLAIMABLE] = { MIGRATE_UNMOVABLE, MIGRATE_MOVABLE, MIGRATE_TYPES },
[MIGRATE_MOVABLE] = { MIGRATE_RECLAIMABLE, MIGRATE_UNMOVABLE, MIGRATE_TYPES },
#ifdef CONFIG_CMA
[MIGRATE_CMA] = { MIGRATE_TYPES }, /* Never used */
#endif
#ifdef CONFIG_MEMORY_ISOLATION
[MIGRATE_ISOLATE] = { MIGRATE_TYPES }, /* Never used */
#endif
};

这张表描述的是当一种类型无法满足时退化选择的迁移类型的一个顺序.接下来看函数的实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

/* Remove an element from the buddy allocator from the fallback list */
static inline struct page *
__rmqueue_fallback(struct zone *zone, unsigned int order, int start_migratetype)
{
struct free_area *area;
unsigned int current_order;
struct page *page;
int fallback_mt;
bool can_steal;

/* Find the largest possible block of pages in the other list */
for (current_order = MAX_ORDER-1;
current_order >= order && current_order <= MAX_ORDER-1;
--current_order) {
area = &(zone->free_area[current_order]);
/* 遍历fallback table 找到合适的fallback migratetype 要综合考虑: 是否有超过一定阈值的空闲页,并且是可以退化的页类型 */
fallback_mt = find_suitable_fallback(area, current_order,
start_migratetype, false, &can_steal);
if (fallback_mt == -1)
continue;

page = list_first_entry(&area->free_list[fallback_mt],
struct page, lru);
/* 从对应迁类型中"偷取"page TODO:具体如何迁移的 我感觉应该是很抽象的迁移就可以了 */
if (can_steal)
steal_suitable_fallback(zone, page, start_migratetype);

/* Remove the page from the freelists */
area->nr_free--;
list_del(&page->lru);
rmv_page_order(page);

expand(zone, page, order, current_order, area,
start_migratetype);
/*
* The pcppage_migratetype may differ from pageblock's
* migratetype depending on the decisions in
* find_suitable_fallback(). This is OK as long as it does not
* differ for MIGRATE_CMA pageblocks. Those can be used as
* fallback only via special __rmqueue_cma_fallback() function
*/
set_pcppage_migratetype(page, start_migratetype);

trace_mm_page_alloc_extfrag(page, order, current_order,
start_migratetype, fallback_mt);

return page;
}

return NULL;
}

和主要分配路径不同的事, 迭代器是从最大的阶数开始迭代的,这个代表的意思是说我尽量多迁移一点空间给你, 不然你还要迁移的时候我又分配一小块空间给你, 你的碎片间接导致了我的碎片产生, 所以从大到小开始迭代.

整个核心部分分析完以后, 按照从里向外分析的思路,这是基于__rmqueu的一套更高层次的函数, 再倒回来看alloc_pages的实现会发现非常容易理解.首先__alloc_pages调用__alloc_pages_nodemask. 这里补充一个结构体.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
* Structure for holding the mostly immutable allocation parameters passed
* between functions involved in allocations, including the alloc_pages*
* family of functions.
*
* nodemask, migratetype and high_zoneidx are initialized only once in
* __alloc_pages_nodemask() and then never change.
*
* zonelist, preferred_zone and classzone_idx are set first in
* __alloc_pages_nodemask() for the fast path, and might be later changed
* in __alloc_pages_slowpath(). All other functions pass the whole strucure
* by a const pointer.
*/
struct alloc_context {
struct zonelist *zonelist;
nodemask_t *nodemask;
struct zoneref *preferred_zoneref;
int migratetype;
enum zone_type high_zoneidx;
bool spread_dirty_pages;
};

这个结构体主要是一些内存分配的参数, 因为从上到下的调用都一直有用到这几个参数所以合成一个context向下传递.

__alloc_pages_nodemask第一步会检查cpusets的功能是否打开, 这个是一个cgroup的子模块, 如果没有设置nodemask就会用cpusets配置的cpuset_current_mems_allowed来限制在哪个node上分配, 这个也是在NUMA结构当中才会有用的. 之后会用到might_sleep_if,判断gfp_mask & __GFP_DIRECT_RECLAIM), 表示当前内存压力比较大需要直接回收内存, 会循环睡眠同步等待页可用, 而might_sleep_if是一个debug函数,标记当前函数在iftrue的时候表示可能会进入睡眠, 如果当前调用进入了一个不可睡眠的上下文就会报错. should_fail_alloc_page会做一些预检查, 一些无法分配的条件会直接报错.
接着获取read_mems_allowed_begin,这个会拿到current当前进程的cpusets允许的节点分配掩码, 这个结果的读取通过顺序锁(seqcount_t)进行, 如果在分配page期间这个值发生了改变(通过read_mems_allowed_retry判断), 那么读操作需要重新进行尝试.
调用first_zones_zonelist会从zonelist这个表示分配page的zone的优先顺序链表里获取第一个zoneref, zoneref是一个链表节点用于迭代之后的zone选项.
接着就会调用关键函数get_page_from_freelist从而分配到页.
分配完页如果失败会调用__alloc_pages_slowpath唤起swapd进程进行页回收从而获取可用的页, 再尝试调用get_page_from_free_list.
函数的调用图如下.

这是最外层的一个过程, 但是内核往往经过了很多实践才会有这么一套机制, 内核在这一层之前加了一套缓存, 来加速物理页的分配, get_page_from_free_list到伙伴分配器之间还有一层高速缓存. 这个结构是一个percpu结构, 每个cpu独有一份链表缓存分配的页,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct per_cpu_pages {
int count; /* number of pages in the list */
int high; /* high watermark, emptying needed */
int batch; /* chunk size for buddy add/remove */

/* Lists of pages, one per migrate type stored on the pcp-lists */
struct list_head lists[MIGRATE_PCPTYPES];
};

struct per_cpu_pageset {
struct per_cpu_pages pcp;
#ifdef CONFIG_NUMA
s8 expire;
#endif
#ifdef CONFIG_SMP
s8 stat_threshold;
s8 vm_stat_diff[NR_VM_ZONE_STAT_ITEMS];
#endif
};

每个zone都会有一个这个结构用于缓存页的分配, 中文一般就叫高速缓存.

缓存的过程是这样的, 首先通过for_next_zone_zonelist_nodemask遍历每个zone 会选择合适的zone, 选择合适的zone一笔带过但其实考虑了很多因素比如是否满足分配的wartermark, 是否需要均衡到别的zone当中, 是否被cpusets禁止了分配等等, 然后调用buffered_rmqueue这个函数, 这个函数就是构建对buddy分配器裹了一层高速缓存. 这个函数在只分配1页(order=0)的情况下,会判断是不是”冷”页, bool cold = ((gfp_flags & __GFP_COLD) != 0), 冷页的区别是会放到高速缓存的末尾, 相反热页会放到高速缓存的头部. 然后尝试获取高速缓存的page, 如果高速缓存为空就调用rmqueue_bulk分配一段batch大小的页块到高速缓存中继续尝试分配, 而rmqueue_bulk就是循环了batch次分配了order=0的page逐个添加到高速缓存当中. 如果需要分配的order不为0的话还是会直接从buddy-system当中分配不依靠高速缓存. 高速缓存主要是缓存单页的分配.

所以调整后的走高速缓存的主流程就如下图所示.

释放

还是从伙伴系统的最内部实现向外说, 释放对应的是__free_one_page, 指定释放\(2^{order}\)的物理页块, 地址为page, 可以说释放过程是拆分过程的一个逆过程.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
* Freeing function for a buddy system allocator.
*
* The concept of a buddy system is to maintain direct-mapped table
* (containing bit values) for memory blocks of various "orders".
* The bottom level table contains the map for the smallest allocatable
* units of memory (here, pages), and each level above it describes
* pairs of units from the levels below, hence, "buddies".
* At a high level, all that happens here is marking the table entry
* at the bottom level available, and propagating the changes upward
* as necessary, plus some accounting needed to play nicely with other
* parts of the VM system.
* At each level, we keep a list of pages, which are heads of continuous
* free pages of length of (1 << order) and marked with _mapcount
* PAGE_BUDDY_MAPCOUNT_VALUE. Page's order is recorded in page_private(page)
* field.
* So when we are allocating or freeing one, we can derive the state of the
* other. That is, if we allocate a small block, and both were
* free, the remainder of the region must be split into blocks.
* If a block is freed, and its buddy is also free, then this
* triggers coalescing into a block of larger size.
*
* -- nyc
*/

static inline void __free_one_page(struct page *page, /* 把page加回到free_list里面 */
unsigned long pfn,
struct zone *zone, unsigned int order,
int migratetype)
{
unsigned long page_idx;
unsigned long combined_idx;
unsigned long uninitialized_var(buddy_idx);
struct page *buddy;
unsigned int max_order;

max_order = min_t(unsigned int, MAX_ORDER, pageblock_order + 1); /* 获取最大的阶数 */

VM_BUG_ON(!zone_is_initialized(zone)); /* 检查wait_table是否存在来表示zone是否初始化过 里面用到了双感叹号表示强制1或者0 */
VM_BUG_ON_PAGE(page->flags & PAGE_FLAGS_CHECK_AT_PREP, page);

VM_BUG_ON(migratetype == -1);
if (likely(!is_migrate_isolate(migratetype))) /* 是isolate类型 */
__mod_zone_freepage_state(zone, 1 << order, migratetype);

page_idx = pfn & ((1 << MAX_ORDER) - 1); /* 获取page的下标 */

VM_BUG_ON_PAGE(page_idx & ((1 << order) - 1), page);
VM_BUG_ON_PAGE(bad_range(zone, page), page);

continue_merging:
while (order < max_order - 1) {
buddy_idx = __find_buddy_index(page_idx, order);/* 获取buddy的下标 */
buddy = page + (buddy_idx - page_idx); /* 根据相对距离得到buddy page */
if (!page_is_buddy(page, buddy, order)) /* 如果不是buddy就结束合并 */
goto done_merging;
/*
* Our buddy is free or it is CONFIG_DEBUG_PAGEALLOC guard page,
* merge with it and move up one order.
*/
if (page_is_guard(buddy)) {
clear_page_guard(zone, buddy, order, migratetype);
} else {
/* 把buddy从free_area当中移除 */
list_del(&buddy->lru);
zone->free_area[order].nr_free--;
rmv_page_order(buddy);
}
combined_idx = buddy_idx & page_idx;
page = page + (combined_idx - page_idx);
page_idx = combined_idx;
order++; /* 向上合并 */
}
if (max_order < MAX_ORDER) {
/* If we are here, it means order is >= pageblock_order.
* We want to prevent merge between freepages on isolate
* pageblock and normal pageblock. Without this, pageblock
* isolation could cause incorrect freepage or CMA accounting.
*
* We don't want to hit this code for the more frequent
* low-order merging.
*/
/* 在这里说明已经超出了pageblock的order, 可能在不同的migratetype的block边界了,这里再检查一次是不是isolate, 不允许节点间迁移的page, 如果是的话就要结束合并的迭代,不然继续向上合并 */
if (unlikely(has_isolate_pageblock(zone))) {
int buddy_mt;

buddy_idx = __find_buddy_index(page_idx, order);
buddy = page + (buddy_idx - page_idx);
buddy_mt = get_pageblock_migratetype(buddy);

if (migratetype != buddy_mt
&& (is_migrate_isolate(migratetype) ||
is_migrate_isolate(buddy_mt)))
goto done_merging;
}
max_order++;
goto continue_merging;
}

done_merging:
set_page_order(page, order);/* 结束合并自后设置合并之后的阶数 */

/*
* If this is not the largest possible page, check if the buddy
* of the next-highest order is free. If it is, it's possible
* that pages are being freed that will coalesce soon. In case,
* that is happening, add the free page to the tail of the list
* so it's less likely to be used soon and more likely to be merged
* as a higher order page
*/
if ((order < MAX_ORDER-2) && pfn_valid_within(page_to_pfn(buddy))) { /* 这个条件判断没有合并到最大块, 内核认为很有可能接下来这个块和其他free的块合并, 从减少碎片的角度来说会更倾向于放到链表的末尾, 让这个块的free状态保持久一点, 更有机会被合并成更大的块 */
struct page *higher_page, *higher_buddy;
combined_idx = buddy_idx & page_idx; /* 合并后的idx */
higher_page = page + (combined_idx - page_idx); /* 合并后的page */
buddy_idx = __find_buddy_index(combined_idx, order + 1); /* 向上寻找buddy */
higher_buddy = higher_page + (buddy_idx - combined_idx);
if (page_is_buddy(higher_page, higher_buddy, order + 1)) {
list_add_tail(&page->lru,
&zone->free_area[order].free_list[migratetype]);
goto out;
}
}

list_add(&page->lru, &zone->free_area[order].free_list[migratetype]);
/* 把page加入到对应的order当中 */
out:
zone->free_area[order].nr_free++; /* free_area nr_free 空闲块加1 */
}

这个函数就是从order开始迭代不断寻找自己的buddy, 寻找buddy的函数是__find_buddy_index, 首先要说的是page是以数组的形式存放在mem_map当中, 整个page是可以通过index线性索引的.要找到自己的buddy只要在对应的order位进行抑或即可.

1
2
3
4
5
6
7
static inline unsigned long
__find_buddy_index(unsigned long page_idx, unsigned int order)
{
/* idx 抑或 2^order order那位是0, 那么buddy应该是1
* order那位是1, 那么buddy应该是0 */
return page_idx ^ (1 << order);
}

然后层层递归把低层的free_area中的page块向高层移动,最后有一个优化, 内核认为刚释放的page很可能相关的buddy页也会释放, 所以把刚释放的页放到末尾让他更晚被分配出去从而得到更多的机会被合并成大块.

现在再从外层看物理页的分配.

入口函数是__free_pages, 其实知道了分配的过程对于释放的过程可以说是轻而易举就能理解的.首先这个函数put_page_testzero会减少引用计数, 如果到达0才真正执行释放操作.释放路径有两个一个是order为0和不为0的情况, 为0很好理解, 需要交回到高速缓存即可, 如果高速缓存满了再交回给buddy system, 不为0说明也没有缓存过,就直接调用free_one_page,这个函数会调用__free_one_page,最终还给buddy system. 所以这里主要看一下代缓存的路径, 先是调用free_hot_cold_page, 指定是热页也就是添加到高速缓存的头,如果计数器超过阈值pcp->count >= pcp->high调用free_pcppages_bulk交还给buddy system, 这个函数可以遍历高速缓存中的migratetype,然后根据指定的count从高速缓存中交还给buddy system的页, 当前路径是整个batch个的页都会还回去.

带高速缓存的释放的调用图如图所示.

总结

伙伴分配器的确是一个非常简洁的算法(我认为可以递归表示和迭代表示算法都美妙无比), 但是内核对于这样一个简单的算法也结合了很多实际工程的内容还有一些优化,比如迁移类型还有冷热页的标记和高速缓存等等. 充分理解内核的物理页分配对于接下来理解slab分配起和内核虚拟内存的相关内容有很好的帮助作用. 文末给出了许多参考链接, 我结合了前人的知识进行总结, 并且基于最新的4.7.2的内核重新走了一遍分析之路, 做了解释和补充, 对自己的学习是一个积累, 同时希望对大家有帮助.

参考

  1. 迁移类型 实现反碎片
  2. 页面迁移
  3. LVMM 的Physical Page Allocation 章节
  4. Linux 物理页面分配
  5. 顺序锁

qlang[1]是用Go实现的一套基于虚拟机实现的动态语言,这里主要分解一下虚拟机的实现如何嵌入到解释器当中的.

我们用tpl定义了一套语法,类似于yacc的语法,首先是运算优先级.

*代表匹配至少0个,/表示回调动作,|表示或,这是语法的第一部分,属于基本表达式:

1
2
3
4
5
6
7
8
9
10
11
12
term1 = factor *('*' factor/mul | '/' factor/quo | '%' factor/mod)

term2 = term1 *('+' term1/add | '-' term1/sub)

term31 = term2 *('<' term2/lt | '>' term2/gt | "==" term2/eq | "<=" term2/le | ">=" term2/ge | "!=" term2/ne)

term3 = term31 *("<-" term31/chin)

term4 = term3 *("&&"/_mute term3/_code/_unmute/and)

expr = term4 *("||"/_mute term4/_code/_unmute/or)

所以term1表示乘性运算,相似的term2表示加性运算,term3表示逻辑运算,term4表示与,expr表示表达式.
这里有一个特点就是优先级越高的运算在语法上写在了越前面,这是有原因的.

由上向下的递归推导可以看做是把输入的记号流转化成一颗语法树,从根节点按先序遍历进行.所以对应的也是寻找一个最左推导的过程.
递归方式的遍历也是一种实现,比如说如果我要匹配1+2那么就会构建一颗树形结构如下图的第一个.但是如果要匹配1+2*3这样的表达式就有可能出现问题,既可以是图中的第二种情况也可以是第三种情况.这里就关系到优先级的问题.因为递归向下的解析过程是深度优先的也就是意味着如果最先选择匹配,就能先形成语法结构,比如说,当输入是1+2*3时,如果先匹配加法,那么前三个token就是1+2就会构成一个结点,之后3就只能作为*的右边的因数加入到语法树中,但是如果乘法在语法顺序中靠前,那么匹配到1+的时候,就会去匹配term1这个语法,然后term1就会把乘法匹配完,这样就能构成下图中第三个语法树.这种优先级可以推而广之,同级优先的运算作为一个语法,高优先级的语法排在较前即可.

语法树

下面语句的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
sexpr = expr (
'='/tovar! expr/assign |
','/tovar! expr/tovar % ','/ARITY '=' expr % ','/ARITY /massign |
"++"/tovar/inc | "--"/tovar/dec |
"+="/tovar! expr/adda | "-="/tovar! expr/suba |
"*="/tovar! expr/mula | "/="/tovar! expr/quoa | "%="/tovar! expr/moda | 1/pop)

s = "if"/_mute! expr/_code body *("elif" expr/_code body)/_ARITY ?("else" body)/_ARITY/_unmute/if |
"switch"/_mute! ?(~'{' expr)/_code '{' swbody '}'/_unmute/switch |
"for"/_mute/_urange! fhead body/_unmute/for |
"return"! expr %= ','/ARITY /return |
"break" /brk |
"continue" /cont |
"include"! STRING/include |
"import"! (STRING ?("as" IDENT/name)/ARITY)/import |
"export"! IDENT/name % ','/ARITY /export |
"defer"/_mute! expr/_code/_unmute/defer |
"go"/_mute! expr/_code/_unmute/go |
sexpr

doc = ?s *(';' ?s)

body = '{' doc/_code '}'

fhead = (~'{' s)/_code %= ';'/_ARITY

frange = ?(IDENT/name % ','/ARITY '=')/ARITY "range" expr

swbody = *("case"! expr/_code ':' doc/_code)/_ARITY ?("default"! ':' doc/_code)/_ARITY

fnbody = '(' IDENT/name %= ','/ARITY ?"..."/ARITY ')' '{'/_mute doc/_code '}'/_unmute

afn = '{'/_mute doc/_code '}'/_unmute/afn

clsname = '(' IDENT/ref ')' | IDENT/ref

newargs = ?('(' expr %= ','/ARITY ')')/ARITY


语句主要支持的是赋值表达式,if语句,switch语句,for语句,return语句,break语句,continue语句,include语句,import语句,export语句,defer语句,go语句和单表达式构成的语句.

if语句类似于if expr body elif expr else body,
switch语句类似于switch expr { swbody },swbody的定义又是类似于case expr: body default: body的形式.
for语句类似于for stmt;stmt;stmt body的形式.
retur语句比较简答,对应的是return expr.
break,continue语句只有continuebreak. include,import语句则是include STRINGimport STRING的形式. defer语句是defer expr的形式. go语句是go expr`的形式,这里的go语句就是起一个routine进行执行.

从这里可以说一下这门语言的设计,类似于lua从简实现.因为继承了Go通道的特性所以语法中出现了类似<-的语法,并且支持类和成员函数.支持复合类型比如说slice,map和chan.

接着是单一元素的定义.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
classb = "fn"! IDENT/name fnbody ?';'/mfn

atom =
'(' expr %= ','/ARITY ?"..."/ARITY ?',' ')'/call |
'.' (IDENT|"class"|"new"|"recover"|"main")/mref |
'[' ?expr/ARITY ?':'/ARITY ?expr/ARITY ']'/index

factor =
INT/pushi |
FLOAT/pushf |
STRING/pushs |
CHAR/pushc |
(IDENT/ref | '('! expr ')' |
"fn"! (~'{' fnbody/fn | afn) | '[' expr %= ','/ARITY ?',' ']'/slice) *atom |
"new"! clsname newargs /new |
"range"! expr/_range |
"class"! '{' *classb/ARITY '}'/class |
"recover"! '(' ')'/recover |
"main"! afn |
'{'! (expr ':' expr) %= ','/ARITY ?',' '}'/map |
'!' factor/not |
'-' factor/neg |
"<-" factor/chout |
'+' factor

atom是个比较常见的定义,作为一个变量的atom,就是类似于fn(expr),var.selector或者indent[index]构成atom定义,作为元素的一部分.
整数(INT),浮点数(FLOAT),字符串(STRING),字符(CHAR),组合atom的元素(INDENT atom,fn{}atom,[expr,expr…]atom)等都可能是一个元素.
支持语法级别的new返回的对象,支持range,从panic中recover也可能是一个返回值,以及一些单操作符可以表达的字符串.

以上是整个语法的定义.qlang运行的环境是自行编写的虚拟机环境,实现一个虚拟机主要是要实现各种字节码的实现,还有对应的系统调用.
但是目前的实现没有定义中间状态的字节码,而是以一个库的形式,用接口的形式作为字节码执行的接口.

任何字节码的定义都要满足这样的接口,第一个参数是当前的栈,第二个参数是执行的上下文.

1
2
3
type Instr interface {
Exec(stk *Stack, ctx *Context)
}

对应的实现者为Class,iAnonymFn(匿名函数),iAs,

iAssign(赋值),iCall(调用),iDefer,isExport`,

iForRange,iFunc,iGo,iMacro,iMemberRef,

iModule,iMultiAssign,iMultiAssingFromSlice,

iOp1Assign,iOp3,iOpAssign,iPush,iRef,

iRem,iUnSet,iAnd,iCallFn,iCallFnv,

iCase,iGo,iChanIn,iClear,iJmp,

iJmpIfFalse,iNew,iOr,iPop,iPopEx,

iRecover,iReturn,等等这些字节码都是通过构造函数变作为Insrt接口返回的.

其实编程语言的虚拟机说白了就是一种提供字节码的运行环境,比如我定义add x1,x2sub x1,x2两条指令作为我的虚拟机的集合,那么我的虚拟寄字节码就支持加法和乘法,也就可以完成一个简易计算器的算式到字节码的转化,再把得到的字节码按照我们的需要进行转换.

比如LLVM[2]就是把自己定义的一套字节码(也就是类似于平台无关的虚拟汇编,其实这种汇编要稍微好用一点点,因为里面的寄存器是无限多个的,不需要自己考虑寄存器不够的情况),然后把这些字节码转化成平台相关的汇编,最后变成二进制文件.所以实现一套编程语言虚拟机就是要定义一套字节码的集合和对应的实现方式.

这里我们用Go实现虚拟机是通过Go来运行的,所以没有静态语言的性质,只是跑在Go的运行时上的虚拟机.这里我们举个例子看一下具体的实现.

现在以switch语法为例说一下运行流程
在C转汇编的过程中,switch case是通过跳转表来实现的,比如下面的C代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <stdio.h>

int f(int x)
{
switch(x)
{
case 1:
printf("1");
break;
case 2:
printf("2");
break;
default:
printf("default");
}
}
int main()
{
f(3);
}

转成汇编的话就是如下代码,f为switch case的部分的实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
	.file	"main.c"
.section .rodata
.LC0:
.string "default"
.text
.globl f
.type f, @function
f:
pushq %rbp // 保存栈base指针
movq %rsp, %rbp // 移动栈指针到rbp
subq $16, %rsp // 因为leaf function,可以开辟red zone[3] 128个字节
movl %edi, -4(%rbp) // 栈指针开始第4个字节,也就是第一个参数,0(%rbp)是callee保留的rbp.
movl -4(%rbp), %eax // 移动到eax中
cmpl $1, %eax //和1比较跳到L3
je .L3
cmpl $2, %eax //和2比较跳到L4
je .L4
jmp .L7 // default跳到L7
.L3:
movl $49, %edi // 放入参数'1'调用putchar,这里只打印一个字符,被优化成了putchar.
call putchar
jmp .L6
.L4:
movl $50, %edi // 放入参数'2'
call putchar
jmp .L6
.L7:
movl $.LC0, %edi // 让如.LC0,也就是字符串"default"的地址放入edi作为printf的参数
movl $0, %eax
call printf
.L6:
leave
ret
.size f, .-f
.globl main
.type main, @function
main:
pushq %rbp
movq %rsp, %rbp
movl $3, %edi
call f
popq %rbp
ret
.size main, .-main
.ident "GCC: (Ubuntu 4.8.4-2ubuntu1~14.04.1) 4.8.4"
.section .note.GNU-stack,"",@progbits

从汇编可以看出switch case其实本身其实是可以通过goto实现的,switch case只是goto的一个高级封装的实现技巧而已.如何放到虚拟机中
其实就是提供类似的goto机制来满足跳转的需求.

switch解析的时候首先注册了$switch的回调函数,如果匹配了"switch"/_mute! ?(~'{' expr)/_code '{' swbody '}'/_unmute/switch就会调用Compiler.Switch函数进行处理.

_mute会禁止回调函数的运行,除了_开头的回调函数,_unmute则是解开禁止.
_code的作用是把匹配到的语法记号流(tokens []tpl.Token)入栈.

swith boyd 是按照如下定义的:

swbody = *("case"! expr/_code ':' doc/_code)/_ARITY ?("default"! ':' doc/_code)/_ARITY

_ARITY获取的是语法匹配的次数,分别记录了casedefault匹配的次数,case可以匹配*次,default可以匹配?次.

然后可以看下Switch如何处理的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (p *Compiler) Switch(e interpreter.Engine) {

var defaultCode interface{}
// 之前push了一个 ? 的匹配次数, 如果是1那么就有default的代码, 所以把defaultCode pop出来.
defaultArity := p.popArity()
if defaultArity == 1 {
defaultCode, _ = p.gstk.Pop()
}
// 获取case的匹配次数
caseArity := p.popArity()
// case 中有个expression,case:后面有一个statment,所以乘2
casebr := p.gstk.PopNArgs(caseArity << 1) // 2 * caseArity
// 这switch:后面跟着的expression的代码取出
switchCode, _ := p.gstk.Pop()
// 保存老的块上下文
old := p.bctx
p.bctx = blockCtx{}
// switchCode有两种 , 一种是 switch , 一种是 switch expr.
// 这里处理的是switch {}的形式,每个case中都是条件表达式,就变成了if语句.
if switchCode == nil {
// 执行case branch,和default branch
p.doIf(e, casebr, defaultCode, caseArity)
p.bctx.MergeSw(&old, p.code.Len())
return
}
// 转换switchCode
// reserved2 是一组空的指令,用于最后填充跳转指令跳到switch body的末尾.
reserved2 := make([]exec.ReservedInstr, caseArity)
if err := e.EvalCode(p, "expr", switchCode); err != nil {
panic(err)
}
// 解析switchCode完毕,添加一行代码
p.CodeLine(switchCode)
for i := 0; i < caseArity; i++ {
caseCode := casebr[i<<1]
// 解析表达式
if err := e.EvalCode(p, "expr", caseCode); err != nil {
panic(err)
}
// 记录解析过的一行代码
p.CodeLine(caseCode)

// 保留指令一行空指令留待插入 case的跳转指令
reserved1 := p.code.Reserve()
bodyCode := casebr[(i<<1)+1]
// 解析块代码
bctx := evalDocCode(e, p, bodyCode)
// 把当前作用域中break,continue指令加入到p.bctx中
// 等最后到解析末尾再把跳转距离计算出来
bctx.MergeTo(&p.bctx)
// 把当前位置留空.
// 解析到了case :{}结尾作为跳转到结尾的指令的插入位置.
reserved2[i] = p.code.Reserve()
// 把reserved1保留的位置插入跳转到reserved2的保留的地址的地方.
// 相当于 Case delta,如果case 成功那么就跳到body的末尾,reserved2[i]
reserved1.Set(exec.Case(reserved2[i].Delta(reserved1)))
}
// 类似的解析default的case
p.code.Block(exec.Default)
bctx := evalDocCode(e, p, defaultCode)
bctx.MergeTo(&p.bctx)

end := p.code.Len()
for i := 0; i < caseArity; i++ {
// 设置跳转到末尾的指令
reserved2[i].Set(exec.Jmp(end - reserved2[i].Next()))
}
// 设置break指令的跳转地址.
// 并把旧bctx换回来,也就是说break,continue
// 跳转范围就终止在 switch 的作用域内.
p.bctx.MergeSw(&old, end)
}

比如下面的代码进行转化的话

1
2
3
4
5
6
7
8
9
10
11
12
13
x=1
switch(x){
case 1:
x=4
break
case 2:
x=2
break
case 3:
x=3
break
}

就跳到执行块的末尾,最后的翻译结果就是下面这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
==> 0000: Var &{x} // 变量x
==> 0001: Push &{1} // 压入1
==> 0002: AssignEx 0 // x=1
==> 0003: Ref &{x} // 引用x
==> 0004: Push &{1} // 压入1
==> 0005: Case 5 // case是自己定义的字节码,等于pop 1 再和当前栈顶的x比较,如果成功向下跳转5
==> 0006: Var &{x} // 引用x
==> 0007: Push &{4} // 压入4
==> 0008: AssignEx 0 // x=4
==> 0009: Jmp 16 // break 跳到结尾
==> 0010: Jmp 15 // case 不会继续执行,也是跳到结尾
==> 0011: Push &{2} // 后面是类似的
==> 0012: Case 5
==> 0013: Var &{x}
==> 0014: Push &{2}
==> 0015: AssignEx 0
==> 0016: Jmp 9
==> 0017: Jmp 8
==> 0018: Push &{3}
==> 0019: Case 5
==> 0020: Var &{x}
==> 0021: Push &{3}
==> 0022: AssignEx 0
==> 0023: Jmp 2
==> 0024: Jmp 1
==> 0025: Pop 0

比起汇编,我们定义的字节码稍微高级一点不需要构造跳转表,而是用Case指令替代,和栈顶的值比较,如果为true就顺序执行,不然就会跳转
相对距离的位置,到这里为止,我们的转换就结束了.

最后总结一下,现代编译器的实现已经非常方便了,前端有lex,yacc,后端有llvm,比如object-c和clang,以及rust的后端都是llvm,所以llvm还是很靠谱的,而前端手写或者用工具生成各有利弊,但是并不是什么决定因素.

而这里实现的qlang主要是优势是短小精悍无依赖,全部前后端自行实现,可以作为Golang的项目的嵌入脚本语言,类似于lua.

[1] https://github.com/qiniu/qlang
[2] http://llvm.org/
[3] http://eli.thegreenplace.net/2011/09/06/stack-frame-layout-on-x86-64/

每一种语言都会有一个定义良好的语法结构.函数是由申明和语句构成的,而语句又是由表达式构成的.
经常用来描述语法的是BNF[1].Go使用的是相应的变种,在Go的官方文档中有很详细的spec描述[2].一门语言的设计其实就在这份描述当中,这是一门语言的语法和规则的定义,是表面程序员可以接触到的部分,而运行时却可以改变,这相当于和程序员约定的接口,只要按照这个接口编写源代码,就能产生正常可以编译的二进制文件,但是最后的二进制文件如何运行,对于每条语法转换成了什么,有什么优化都是编译器优化和运行时的工作.所以一门语言必须有一个详尽的描述,这和一个网络协议一样,是非常重要的部分.

语法分析器也是有工具可以自动生成的,比如yacc[3].我在之前提到过使用工具的利弊,就不赘述了.

本文主要看一下Go的语法分析是如何进行.Go的parser接受的输入是源文件,内嵌了一个scanner,最后把scanner生成的token变成一颗抽象语法树(AST).
编译时的错误也是在这个时候报告的,但是大部分编译器编译时的错误系统并不是很完美,有时候报的错误文不对题,这主要是因为写对的方式有几种
但是写错的方式有很多种,编译器只能把一些错误进行归类,并且指出当前认为可疑的地方,并不能完完全全的知道到底是什么语法错误.这个需要结合给出的错误进行判断,clang作为一个C编译器做得好很多,这都是开发者不断地添加错误处理的结果,比gcc的报错完善很多.然而Go的编译时的错误处理也是秉承了gcc的风格,并不明确,但是会指出可疑的地方,在大多数场景下或者对语言标准熟悉的情况下也不是很麻烦.
下面看一下Go是怎么定义这些语法结构.这些结构都在go/ast当中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// All node types implement the Node interface.
type Node interface {
Pos() token.Pos // position of first character belonging to the node
End() token.Pos // position of first character immediately after the node
}

// All expression nodes implement the Expr interface.
type Expr interface {
Node
exprNode()
}

// All statement nodes implement the Stmt interface.
type Stmt interface {
Node
stmtNode()
}

// All declaration nodes implement the Decl interface.
type Decl interface {
Node
declNode()
}

语法有三个主体,表达式(expression),语句(statement),声明(declaration),Node是基类,用于标记该节点的位置的开始和结束.
而三个主体的函数没有实际意义,只是用三个interface来划分不同的语法单位,如果某个语法是Stmt的话,就实现一个空的stmtNode函数即可.
这样的好处是可以对语法单元进行comma,ok来判断类型,并且保证只有这些变量可以赋值给对应的interface.但是实际上这个划分不是很严格,比如

1
2
3
4
func (*ArrayType) exprNode()     {}
func (*StructType) exprNode() {}
func (*FuncType) exprNode() {}
func (*InterfaceType) exprNode() {}

就是类型,但是属于Expr,而真正的表达式比如

1
2
func (*BasicLit) exprNode()       {}
func (*FuncLit) exprNode() {}

是可以赋值给Exprt的.

了解了这个设计,再来看整个内容其实就是定义了源文件中可能出现的语法结构.列表如下,这个列表很长,扫一眼就可以,具体可以再回来看.

  1. 普通Node,不是特定语法结构,属于某个语法结构的一部分.
    • Comment 表示一行注释 // 或者 /* */
    • CommentGroup 表示多行注释
    • Field 表示结构体中的一个定义或者变量,或者函数签名当中的参数或者返回值
    • FieldList 表示以”{}”或者”()”包围的Filed列表
  2. Expression & Types (都划分成Expr接口)
    • BadExpr 用来表示错误表达式的占位符
    • Ident 比如报名,函数名,变量名
    • Ellipsis 省略号表达式,比如参数列表的最后一个可以写成arg...
    • BasicLit 基本字面值,数字或者字符串
    • FuncLit 函数定义
    • CompositeLit 构造类型,比如{1,2,3,4}
    • ParenExpr 括号表达式,被括号包裹的表达式
    • SelectorExpr 选择结构,类似于a.b的结构
    • IndexExpr 下标结构,类似这样的结构 expr[expr]
    • SliceExpr 切片表达式,类似这样 expr[low:mid:high]
    • TypeAssertExpr 类型断言类似于 X.(type)
    • CallExpr 调用类型,类似于 expr()
    • StarExpr *表达式,类似于 *X
    • UnaryExpr 一元表达式
    • BinaryExpr 二元表达式
    • KeyValueExp 键值表达式 key:value
    • ArrayType 数组类型
    • StructType 结构体类型
    • FuncType 函数类型
    • InterfaceType 接口类型
    • MapType map类型
    • ChanType 管道类型
  3. Statements
    • BadStmt 错误的语句
    • DeclStmt 在语句列表里的申明
    • EmptyStmt 空语句
    • LabeledStmt 标签语句类似于 indent:stmt
    • ExprStmt 包含单独的表达式语句
    • SendStmt chan发送语句
    • IncDecStmt 自增或者自减语句
    • AssignStmt 赋值语句
    • GoStmt Go语句
    • DeferStmt 延迟语句
    • ReturnStmt return 语句
    • BranchStmt 分支语句 例如break continue
    • BlockStmt 块语句 {} 包裹
    • IfStmt If 语句
    • CaseClause case 语句
    • SwitchStmt switch 语句
    • TypeSwitchStmt 类型switch 语句 switch x:=y.(type)
    • CommClause 发送或者接受的case语句,类似于 case x <-:
    • SelectStmt select 语句
    • ForStmt for 语句
    • RangeStmt range 语句
  4. Declarations
    • Spec type
      • Import Spec
      • Value Spec
      • Type Spec
    • BadDecl 错误申明
    • GenDecl 一般申明(和Spec相关,比如 import “a”,var a,type a)
    • FuncDecl 函数申明
  5. Files and Packages
    • File 代表一个源文件节点,包含了顶级元素.
    • Package 代表一个包,包含了很多文件.

上面就是整个源代码的所有组成元素,接下来就来看一下语法分析是如何进行的,也就是最后的AST是如何构建出来的.

先看一下parser结构体的定义,parser是以file为单位的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// The parser structure holds the parser's internal state.
type parser struct {
file *token.File
errors scanner.ErrorList // 解析过程中遇到的错误列表
scanner scanner.Scanner // 词法分析器.

// Tracing/debugging
mode Mode // parsing mode // 解析模式
trace bool // == (mode & Trace != 0)
indent int // indentation used for tracing output

// Comments 列表
comments []*ast.CommentGroup
leadComment *ast.CommentGroup // last lead comment
lineComment *ast.CommentGroup // last line comment

// Next token
pos token.Pos // token position
tok token.Token // one token look-ahead
lit string // token literal

// Error recovery
// (used to limit the number of calls to syncXXX functions
// w/o making scanning progress - avoids potential endless
// loops across multiple parser functions during error recovery)
syncPos token.Pos // last synchronization position 解析错误的同步点.
syncCnt int // number of calls to syncXXX without progress

// Non-syntactic parser control
// 非语法性的控制
// <0 在控制语句中, >= 在表达式中.
exprLev int // < 0: in control clause, >= 0: in expression
// 正在解析右值表达式
inRhs bool // if set, the parser is parsing a rhs expression

// Ordinary identifier scopes
pkgScope *ast.Scope // pkgScope.Outer == nil
topScope *ast.Scope // top-most scope; may be pkgScope
unresolved []*ast.Ident // unresolved identifiers
imports []*ast.ImportSpec // list of imports

// Label scopes
// (maintained by open/close LabelScope)
labelScope *ast.Scope // label scope for current function
targetStack [][]*ast.Ident // stack of unresolved labels
}

解析的入口是ParseFile,首先调用init,再调用parseFile进行解析.
整个解析是一个递归向下的过程也就是最low但是最实用的手写实现的方式.像yacc[4]生成的是我们编译里学的LALR[5]文法,牛逼的一逼,但是
gcc和Go都没用自动生成的解析器,也就是手写个几千行代码的事,所以为了更好的掌握编译器的细节,都选择了手写最简单的递归向下的方式.

通过init初始化scanner等.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (p *parser) init(fset *token.FileSet, filename string, src []byte, mode Mode) {
p.file = fset.AddFile(filename, -1, len(src))
var m scanner.Mode
if mode&ParseComments != 0 {
m = scanner.ScanComments
}
// 错误处理函数是在错误列表中添加错误.
eh := func(pos token.Position, msg string) { p.errors.Add(pos, msg) }
p.scanner.Init(p.file, src, eh, m)

p.mode = mode
p.trace = mode&Trace != 0 // for convenience (p.trace is used frequently)

p.next()
}

parseFile的简化流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
       // package clause
// 获取源文件开头的doc注释,从这里递归向下的解析开始了
doc := p.leadComment
// expect 从scanner获取一个token,并且返回位置pos.
pos := p.expect(token.PACKAGE)
// parseIdent 获取一个token并且转化为indent,如果不是报错.
ident := p.parseIdent()
if ident.Name == "_" && p.mode&DeclarationErrors != 0 {
p.error(p.pos, "invalid package name _")
}
// 作用域开始,标记解释器当前开始一个新的作用域
p.openScope()
// pkgScope 就是现在进入的作用域
p.pkgScope = p.topScope
// 解析 import 申明
for p.tok == token.IMPORT {
// parseGenDecl解析的是
// import (
// )
// 这样的结构,如果有括号就用parseImportSpec解析列表
// 没有就单独解析.
// 而parseImportSpec解析的是 一个可选的indent token和一个字符串token.
// 并且加入到imports列表中.
decls = append(decls, p.parseGenDecl(token.IMPORT, p.parseImportSpec))
}
// 解析全局的申明,包括函数申明
if p.mode&ImportsOnly == 0 {
// rest of package body
for p.tok != token.EOF {
decls = append(decls, p.parseDecl(syncDecl))
}
}
// 标记从当前作用域离开.
p.closeScope()
// 最后返回ast.File文件对象.
return &ast.File{
Doc: doc,
Package: pos,
Name: ident,
Decls: decls,
Scope: p.pkgScope,
Imports: p.imports,
Unresolved: p.unresolved[0:i],
Comments: p.comments,
}

看一下parseDecl主要是根据类型的不同调用不同的解析函数,parseValueSpec解析Value类型,parseTypeSpec解析Type类型,parseFuncDecl解析函数.
解析定义和解析类型的都是解析了,类似于var|type ( ident valueSpec|typeSpec)的token结构.因为parseFuncDecl里面也会解析这些内容,所以直接从函数解析来看也可以.
因为外一层的top scope其实就是相当于一个抽象的函数作用域而已,这样是为什么lennew这样的内嵌函数在函数内是可以做变量名的原因,因为可以在子作用域覆盖top作用域.整个解析过程简化过程如下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 解析一个func.
pos := p.expect(token.FUNC)
// 开一个新的作用域,topScope作为父Scope.
scope := ast.NewScope(p.topScope) // function scope
// 解析一个ident作为函数名
ident := p.parseIdent()
// 解析函数签名,也就是参数和返回值
params, results := p.parseSignature(scope)
// 再解析body
body = p.parseBody(scope)
// 最后返回函数申明.
decl := &ast.FuncDecl{
Doc: doc,
Recv: recv,
Name: ident,
Type: &ast.FuncType{
Func: pos,
Params: params,
Results: results,
},
Body: body,
}

解析参数和返回值就是解析(filed,filed)这样的格式,每个filed是indent type的token,最后构造成函数签名.然后来到parseBody,这个函数其实就是解析了左右花括号,然后向下开始解析Statement列表,类似于body -> { stmt_list },然后进入stmt_list的解析,不断地解析statement.

1
2
3
for p.tok != token.CASE && p.tok != token.DEFAULT && p.tok != token.RBRACE && p.tok != token.EOF {
list = append(list, p.parseStmt())
}

parseStmt最后会进入到语句的解析,然后根据不同的token选择进入不同的解析流程,比如看到var,type,const就是申明,碰到标识符和数字等等可能就是单独的表达式,
如果碰到go,就知道是一个go语句,如果看到defer和return都能判断出相应的语句并按规则解析,看到break等条件关键字就解析条件语句,看到{就解析块语句.都是可以递归去解析的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (p *parser) parseStmt() (s ast.Stmt) {
if p.trace {
defer un(trace(p, "Statement"))
}

switch p.tok {
case token.CONST, token.TYPE, token.VAR:
s = &ast.DeclStmt{Decl: p.parseDecl(syncStmt)}
case
// tokens that may start an expression
token.IDENT, token.INT, token.FLOAT, token.IMAG, token.CHAR, token.STRING, token.FUNC, token.LPAREN, // operands
token.LBRACK, token.STRUCT, token.MAP, token.CHAN, token.INTERFACE, // composite types
token.ADD, token.SUB, token.MUL, token.AND, token.XOR, token.ARROW, token.NOT: // unary operators
s, _ = p.parseSimpleStmt(labelOk)
// because of the required look-ahead, labeled statements are
// parsed by parseSimpleStmt - don't expect a semicolon after
// them
if _, isLabeledStmt := s.(*ast.LabeledStmt); !isLabeledStmt {
p.expectSemi()
}
case token.GO:
s = p.parseGoStmt()
case token.DEFER:
s = p.parseDeferStmt()
case token.RETURN:
s = p.parseReturnStmt()
case token.BREAK, token.CONTINUE, token.GOTO, token.FALLTHROUGH:
s = p.parseBranchStmt(p.tok)
case token.LBRACE:
s = p.parseBlockStmt()
...省略

举个例子看一下parseSimpleStmt()的简化流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
       // 解析左列表 一般是 l := r 或者 l1,l2 = r1,r2 或者 l <- r 或者 l++
x := p.parseLhsList()
switch p.tok {
case
token.DEFINE, token.ASSIGN, token.ADD_ASSIGN,
token.SUB_ASSIGN, token.MUL_ASSIGN, token.QUO_ASSIGN,
token.REM_ASSIGN, token.AND_ASSIGN, token.OR_ASSIGN,
token.XOR_ASSIGN, token.SHL_ASSIGN, token.SHR_ASSIGN, token.AND_NOT_ASSIGN:
// 如果看到range,range作为一种运算符按照range rhs来解析
// 如果没看到就按正常赋值语句解析 lhs op rhs 来解析op可以是上面那些token中的一种.
pos, tok := p.pos, p.tok
p.next()
var y []ast.Expr
isRange := false
if mode == rangeOk && p.tok == token.RANGE && (tok == token.DEFINE || tok == token.ASSIGN) {
pos := p.pos
p.next()
y = []ast.Expr{&ast.UnaryExpr{OpPos: pos, Op: token.RANGE, X: p.parseRhs()}}
isRange = true
} else {
y = p.parseRhsList()
}
as := &ast.AssignStmt{Lhs: x, TokPos: pos, Tok: tok, Rhs: y}

// 碰到":"找一个ident, 构成 goto: indent 之类的语句.
case token.COLON:
colon := p.pos
p.next()
if label, isIdent := x[0].(*ast.Ident); mode == labelOk && isIdent {
// Go spec: The scope of a label is the body of the function
// in which it is declared and excludes the body of any nested
// function.
stmt := &ast.LabeledStmt{Label: label, Colon: colon, Stmt: p.parseStmt()}
p.declare(stmt, nil, p.labelScope, ast.Lbl, label)
return stmt, false
}
// 碰到"<-",就构成 <- rhs 这样的语句.
case token.ARROW:
// send statement
arrow := p.pos
p.next()
y := p.parseRhs()
return &ast.SendStmt{Chan: x[0], Arrow: arrow, Value: y}, false

// 碰到"++"或者"--"就构成一个单独的自增语句.
case token.INC, token.DEC:
// increment or decrement
s := &ast.IncDecStmt{X: x[0], TokPos: p.pos, Tok: p.tok}
p.next()
return s, false
}

接下来就不一一解释每段代码了,具体情况具体看就可以.这里举个例子.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"go/ast"
"go/parser"
"go/token"
)

func main() {
fset := token.NewFileSet()
f, err := parser.ParseFile(fset, "", `
package main
func main(){
// comments
x:=1
go println(x)

}
`, parser.ParseComments)
if err != nil {
panic(err)
}
ast.Print(fset, f)
}

产生的结果是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
 0  *ast.File {
1 . Package: 2:1 |PACKAGE token
2 . Name: *ast.Ident { |IDENT token
3 . . NamePos: 2:9 |
4 . . Name: "main" |
5 . } |整个构成了顶部的 package main
6 . Decls: []ast.Decl (len = 1) { |最上层的申明列表
7 . . 0: *ast.FuncDecl { |func main的函数申明
8 . . . Name: *ast.Ident { |IDENT token
9 . . . . NamePos: 3:6 |
10 . . . . Name: "main" |
11 . . . . Obj: *ast.Object { |Objec是一个用于表达语法对象的结构
12 . . . . . Kind: func |表示之前存在过,Decl指向了7,也就是第7行的FuncDecl.
13 . . . . . Name: "main" |
14 . . . . . Decl: *(obj @ 7) |
15 . . . . } |
16 . . . } |
17 . . . Type: *ast.FuncType { |函数类型,也就是函数签名
18 . . . . Func: 3:1 |参数和返回值都是空的
19 . . . . Params: *ast.FieldList { |
20 . . . . . Opening: 3:10
21 . . . . . Closing: 3:11
22 . . . . }
23 . . . }
24 . . . Body: *ast.BlockStmt { |块语句,也就是main的body
25 . . . . Lbrace: 3:12
26 . . . . List: []ast.Stmt (len = 2) { |语句列表
27 . . . . . 0: *ast.AssignStmt { |赋值语句
28 . . . . . . Lhs: []ast.Expr (len = 1) { |左值是x
29 . . . . . . . 0: *ast.Ident {
30 . . . . . . . . NamePos: 5:2 |
31 . . . . . . . . Name: "x"
32 . . . . . . . . Obj: *ast.Object { |
33 . . . . . . . . . Kind: var
34 . . . . . . . . . Name: "x" |
35 . . . . . . . . . Decl: *(obj @ 27)
36 . . . . . . . . }
37 . . . . . . . } |
38 . . . . . . }
39 . . . . . . TokPos: 5:3 |:=和它的位置
40 . . . . . . Tok: :=
41 . . . . . . Rhs: []ast.Expr (len = 1) { |右边是一个数字类型的token
42 . . . . . . . 0: *ast.BasicLit {
43 . . . . . . . . ValuePos: 5:5
44 . . . . . . . . Kind: INT
45 . . . . . . . . Value: "1"
46 . . . . . . . }
47 . . . . . . }
48 . . . . . }
49 . . . . . 1: *ast.GoStmt { |接下来是go语句
50 . . . . . . Go: 6:2
51 . . . . . . Call: *ast.CallExpr { |一个调用表达式
52 . . . . . . . Fun: *ast.Ident { |IDENT token是println
53 . . . . . . . . NamePos: 6:5
54 . . . . . . . . Name: "println"
55 . . . . . . . }
56 . . . . . . . Lparen: 6:12 |左括号的位置
57 . . . . . . . Args: []ast.Expr (len = 1) { |参数列表
58 . . . . . . . . 0: *ast.Ident { |是一个符号INDENT,并且指向的是32行的x
59 . . . . . . . . . NamePos: 6:13
60 . . . . . . . . . Name: "x"
61 . . . . . . . . . Obj: *(obj @ 32)
62 . . . . . . . . }
63 . . . . . . . }
64 . . . . . . . Ellipsis: -
65 . . . . . . . Rparen: 6:14 |右括号的位置
66 . . . . . . }
67 . . . . . }
68 . . . . }
69 . . . . Rbrace: 8:1
70 . . . }
71 . . }
72 . }
73 . Scope: *ast.Scope { |最顶级的作用域
74 . . Objects: map[string]*ast.Object (len = 1) {
75 . . . "main": *(obj @ 11)
76 . . }
77 . }
78 . Unresolved: []*ast.Ident (len = 1) { |这里有个没有定义的符号println,是因为是内置符号,会另外处理
79 . . 0: *(obj @ 52) |从源文件上是表现不出来的.
80 . }
81 . Comments: []*ast.CommentGroup (len = 1) { |评论列表,以及位置和内容.
82 . . 0: *ast.CommentGroup {
83 . . . List: []*ast.Comment (len = 1) {
84 . . . . 0: *ast.Comment {
85 . . . . . Slash: 4:2
86 . . . . . Text: "// comments"
87 . . . . }
88 . . . }
89 . . }
90 . }
91 }

这就是Go的整个语法分析和最后产生的语法树的结构.

废话说了这么多其实实现很简单,问题是如何把一个语言的spec定义好,很重要,早期语言设计不是很固定的.都是慢慢尝试不断改进的过程.最早的一次spec文档[6]其实和现在差了很多很多.就是把TOKEN记号流从左至右匹配规则(可能会向前看几个token),然后递归解析语法树,最后得到AST.
我在我的字符画转换器里用的也是类似的方式[7],做了自顶向下递归解析语法的方式,但是错误处理都是速错,不会做错误恢复找到一个可以同步的节点继续分析.
所以这里补充一点,Go是如何进行错误处理的同步问题,寄希望于能够向使用者提供更多的错误.主要是parser当中的两个结构

1
2
syncPos token.Pos // last synchronization position
syncCnt int // number of calls to syncXXX without progress

syncPos错误的同步位置,也就类似于游戏的存档点,如果发生错误那就从这个地方开始跳过(BadStmt|BadExpr)继续解析,在每次完成语句,申明或者表达式的解析之后就会保存一个同步点.虽然这种继续解析的行为不一定能够给出很精确的错误提示,但的确够用了.当然如果错误实在太多了,从同步点恢复也没有太大意义,就会主动放弃,所以记录了没有成功解析而同步的次数.

因为之前造过轮子了,所以我发现其实编译器的前端用手写是一个很繁琐并且需要花很多时间去做的一件事情,如果语言有设计良好,那么也至少需要花实现的时间,如果设计不好,实现也要跟着修修补补,那就更麻烦,虽然整个编译器的前端也就不到万行代码,但是的确是很考验耐心的一件事情,而且用递归向下的方式解析也没什么效率问题,编译器编译慢一点也不是很要紧,所以有轮子还是用轮子吧,这只是一件苦力活,的确没什么高科技.

最后附带一个用Go实现的Go语法的子集的动态语言版本,只有几十行.

https://gist.github.com/ggaaooppeenngg/dff0fff8f0c9194d93c70550d50edbfa

参考:

  1. http://en.wikipedia.org/wiki/Backus%E2%80%93Naur_Form
  2. https://golang.org/ref/spec
  3. https://zh.wikipedia.org/wiki/Yacc
  4. https://zh.wikipedia.org/zh-cn/Yacc
  5. https://zh.wikipedia.org/wiki/LALR语法分析器
  6. https://github.com/golang/go/commit/18c5b488a3b2e218c0e0cf2a7d4820d9da93a554

词法分析一般是编译器的第一部分,而且词法分析很简单,就是一个有限状态机.
开始词法分析的过程就是把源文件转换成一组预先定义好的token的过程.
这一组被统一表现的token之后会被送入语法分析器进行语法解析,这里我们主要关注如何进行词法分析.

做词法分析就几种方法:

  1. 直接使用工具比如lex.
  2. 使用更低一层的正则表达式.
  3. 使用状态动作,构造一个状态机.

而真正实现一个语言的话,使用工具没有什么错,但是问题是,很难获得正确的错误提示.
工具生成的错误处理很弱.而且需要学习另一门规则或特定的语法.生成的代码可能性能不好,难以优化,但是用工具可以非常简单实现词法分析.
早期编译器的设计阶段都会使用lex来做词法分析器,比如gcc和Go都是这么做的,但是到了后期一个真正生产化的语言可能不能依赖生成的代码,而需要做自己特定的修改和优化,所以自己实现一个词法分析器就显得比较重要了.

正则表达被人诟病的一个话题就是效率问题,比如perl拥有功能最强大的正则表达式,但是整个正则表达式引擎的效率却很低,Go在这方面牺牲了一些正则表达式的特性来保证正则表达式的效率不至于过低,但是正则表达式对于大量文本处理体现的弱势却是很明显的.因为可能我们要处理的状态其实不需要一个繁重的正则表达来解决.

其实实现一个词法分析器很简单,而且这种技能是基本不会变的,如果写过一次,以后都是同样的实现方式.

先看一下Go的实现,在Go的源码下面go/token/token.go目录里面是这么定义token的.

1
2
// Token is the set of lexical tokens of the Go programming language.
type Token int

其实就是个枚举类型,对于每种类型的字面值都有对应的token.
实际上这个只能算是一个token的类型.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

// The list of tokens.
const (
// Special tokens
ILLEGAL Token = iota
EOF
COMMENT

literal_beg
// Identifiers and basic type literals
// (these tokens stand for classes of literals)
IDENT // main
INT // 12345
FLOAT // 123.45
IMAG // 123.45i
CHAR // 'a'
STRING // "abc"
// 省略
)

枚举所有可以碰到的token类型.

go/token/position.go 当中是关于token位置相关的定义.

1
2
3
4
5
6
7
8
9
10
11
12
13
// -----------------------------------------------------------------------------
// Positions

// Position describes an arbitrary source position
// including the file, line, and column location.
// A Position is valid if the line number is > 0.
//
type Position struct {
Filename string // filename, if any
Offset int // offset, starting at 0
Line int // line number, starting at 1
Column int // column number, starting at 1 (byte count)
}

这个很简单就是标示在文件中的位置,比较有意思的是Pos的定义type Pos int,这是位置标示的紧凑表示.接下来看看Pos和Position之间是如何转换的.

首先定义了一个FileSet,可以理解为把File的内容字节按顺序存放的一个大数组,而某个文件则属于数组的一个区间[base,base+size]中,base是文件的第一个字节在大数组中的位置,size是这个文件的大小,某个文件中的Pos是在[base,base+size]这个区间里的一个小标.

所以最后Pos能够压缩成一个整数来表示一个文件当中的位置,当需要使用的使用再从FileSet中转换出完整的Position对象.

go/token/serialize.go 是对FileSet序列化,这里就略过了.

所以整个go/token包只是对token的一些定义和转化,词法分析的部分在go/scanner当中.

scan的主流程如下,主体是一个switch case表示的状态机,
比如碰到字符那么扫描到不为字符为止就作为一个标识符,比如碰到数字那么可能按照扫描数字,然后向后看一次小数字再扫描数字,直到没有数字为止.
scan每次会返回一个被扫描的token,压缩表示的位置,和字面值的字符串,这样就能够把一个源文件转化成一个token的记号流,也就是tokenize或者lexical analysis的过程.

func (s *Scanner) Scan() (pos token.Pos, tok token.Token, lit string) {
scanAgain:
        s.skipWhitespace() 
        // current token start
        pos = s.file.Pos(s.offset)
        // determine token value
        insertSemi := false
        switch ch := s.ch; {
    /* 字符开头,开始扫描标识符 */
        case isLetter(ch):
                lit = s.scanIdentifier()
                if len(lit) > 1 {
                        // keywords are longer than one letter - avoid lookup otherwise
                        tok = token.Lookup(lit)
                        switch tok {
                        case token.IDENT, token.BREAK, token.CONTINUE, token.FALLTHROUGH, token.RETURN:
                                insertSemi = true
                        }
                } else {
                        insertSemi = true
                        tok = token.IDENT
                } 
    /* 数字开头,扫描数字 */
        case '0' <= ch && ch <= '9':
                insertSemi = true
                tok, lit = s.scanNumber(false)
        default:

1
2
3

看一下例子的结果.

func ExampleScanner_Scan() { // src is the input that we want to tokenize. // 需要记号化的源文件 src := []byte("cos(x) + 1i*sin(x) // Euler") // Initialize the scanner. var s scanner.Scanner fset := token.NewFileSet() // positions are relative to fset // 添加到文件集合中 file := fset.AddFile("", fset.Base(), len(src)) // register input "file" // 初始化scanner s.Init(file, src, nil /* no error handler */, scanner.ScanComments) // Repeated calls to Scan yield the token sequence found in the input. for { pos, tok, lit := s.Scan() if tok == token.EOF { break } fmt.Printf("%s\t%s\t%q\n", fset.Position(pos), tok, lit) } // 不断扫描就能得到如下结果 // 词法分析就是做这样一件事情. // output: // 1:1 IDENT "cos" // 1:4 ( "" // 1:5 IDENT "x" // 1:6 ) "" // 1:8 + "" // 1:10 IMAG "1i" // 1:12 * "" // 1:13 IDENT "sin" // 1:16 ( "" // 1:17 IDENT "x" // 1:18 ) "" // 1:20 ; "\n" // 1:20 COMMENT "// Euler" } ``` 我在我的数据结构字符画生成工具[1]里面就实现了一个词法分析器,方便我用简单的语法构造一个字符画,然后插入到注释中辅助解释. 唯一的不同的是,我使用了channel读取token记号,来增加并发,而go本身的记号化是串行的,当然,这点区别其实没有多大,而且这个设计 在Go的模板包里面使用了,Rob Pike也有过相关的演讲[2]. 1. https://github.com/ggaaooppeenngg/cpic/blob/master/lex.go 2. http://cuddle.googlecode.com/hg/talk/lex.html#landing-slide

commit message

git commit message 是我们经常要用到的东西,但是大部分人都写得很随便,翻 git log 看到的信息也是不明觉厉,这里结合其他资料阐述一下如何写好一个 commit message。

其实 git commit message 对于不同的项目有不同的要求。 比如 Angular 的 commit message
标题要加分类,比如是属于 refactor 还是 docs , 还有影响范围(例如 属于全部、*, 还是编译器 $compiler),但是在 Linux Kernel 和类似的 GNU style 的项目里面是类似这样的 commit header。

“mm, tracing: unify mm flags handling in tracepoints and printk”

如果没有子系统要求开头大写,如果有涉及的子系统就说明涉及的子系统并且跟上冒号做解释,冒号后的小写。这个根据具体的场景有不同的要求,但是核心就是简洁明了,另外要使用祈使句。

这样的好处,就能通过git log --pretty=format:%s来遍历所有的 log message,对于修改一目了然。

commit header

第一条规则因项目而异。
比如 Angular 的规则是:<type>(<scope>): <subject>

type(必需),scope(可选)和 subject(必需)

angular 要求的 type 有几种

  • feat: 新功能 (feature)
  • fix: 修补 bug
  • docs: 文档 (documentation)
  • style: 格式(不影响代码运行的变动)
  • refactor: 重构(即不是新增功能,也不是修改 bug 的代码变动)
  • test: 增加测试
  • chore: 构建过程或辅助工具的变动

scope 表示影响的范围,subject 开头小写,使用祈使句,结尾不用。号。

但是在 runc 里面要求比较简单,使用祈使句,并且开头大写就可以,结尾不带。号。

kernel 的 subject 如果带上子系统比如 (mm),就要 mm: 开头,然后不用大写。
不然也要开头大写。总之 subject 的部分要求根据项目不同有一些不同,但是 body 部分要求是类似的。

commit body

body 要和 header 空格一行,每行不要超过 72 个字符,并且对改动进行解释。

例如:

1
2
3
4
5
6
7
8
9
10
在 50 个字符内概括你的改动标题

更详细的解释,限制到 72 个字符,和标题隔开一行。
有什么副作用或者后遗症也要指出。

如果有任务管理器,可以在结尾指定。

Resolves: #123
See also: #456, #789

例子

Angular

1
2
3
4
5
6
7
8
fix($compile): couple of unit tests for IE9

Older IEs serialize html uppercased, but IE9 does not...
Would be better to expect case insensitive, unfortunately jasmine does
not allow to user regexps for throw expectations.

Closes #392
Breaks foo.bar api, foo.baz should be used instead

GNU style

1
2
3
4
5
6
7
8
9
10
11
Author: Aleksa Sarai <asarai@suse.de>
Date: Sun Mar 13 04:53:20 2016 +1100

libcontainer: add pids.max to PidsStats

In order to allow nice usage statistics (in terms of percentages and
other such data), add the value of pids.max to the PidsStats struct
returned from the pids cgroup controller.

Signed-off-by: Aleksa Sarai <asarai@suse.de>

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Author: Alexander Morozov <lk4d4@docker.com>
Date: Fri Mar 27 10:50:32 2015 -0700

Use syscall.Kill instead of p.cmd.Process.Kill

We need this to unmask syscall.ESRCH error, which handled in docker and
can be handled by other clients.

Closes #457

Signed-off-by: Alexander Morozov <lk4d4@docker.com>
Acked-by: Hugh Dickins <hughd@google.com>
Reviewed-by: Michal Hocko <mhocko@suse.com>

在进行 code review 的时候,一般会带上签名 Signed-off-by 表示自己对这段改动负责,matainer 也会查看这段代码如果理解这段代码并且进行了测试同样对这段代码负责也要签名,如果只是保证自己觉得代码无误要使用 Reviewed-by,如果表示只是知道这个带动就带上Acked-bygit commit -s 可以自动带上 Signed-off-by

总之,一个好的 commit message 是一个好习惯,如果项目从一开始打 commit message 就很随便,那 commit log 的意义就没有了,导致根本没人会看。如果有意义整个代码就有据可查了,看起来也很漂亮。

参考:

  1. Commit message 和 Change log 编写指南
  2. Submitting patches: the essential guide to getting your code into the kernel

本文主要讨论 timer heap 在 Go 中的管理,以及运行时对于时间是如何获取的问题,从而引出一个结论,我们对于 timer 的准确度可以有多大的依赖。

首先我们看一下 Go 是如何获取时间的,找到time.Now, 发现最终调用的是下面这个汇编函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

// func now() (sec int64, nsec int32)
TEXT time·now(SB),NOSPLIT,$16
// Be careful. We're calling a function with gcc calling convention here.
// We're guaranteed 128 bytes on entry, and we've taken 16, and the
// call uses another 8.
// That leaves 104 for the gettime code to use. Hope that's enough!
// 这里只能保证调用 gettime 函数的时候有 104 个字节给这个函数作为栈
// 因为返回值用了 16 个字节,这个函数本身会用 8 个字节,到调用 gettime 的时候只剩 104 个字节可以用。
MOVQ runtime·__vdso_clock_gettime_sym(SB), AX
CMPQ AX, $0
JEQ fallback
MOVL $0, DI // CLOCK_REALTIME
LEAQ 0(SP), SI
CALL AX
MOVQ 0(SP), AX // sec
MOVQ 8(SP), DX // nsec
MOVQ AX, sec+0(FP)
MOVL DX, nsec+8(FP)
RET
fallback:
LEAQ 0(SP), DI
MOVQ $0, SI
MOVQ runtime·__vdso_gettimeofday_sym(SB), AX
CALL AX
MOVQ 0(SP), AX // sec
MOVL 8(SP), DX // usec
IMULQ $1000, DX
MOVQ AX, sec+0(FP)
MOVL DX, nsec+8(FP)
RET

第一行TEXT time·now(SB),NOSPLIT,$16, 其中time·now(SB)表示函数now的地址,NOSPLIT标志不依赖参数,$16表示返回的内容是 16 个字节,Go 的汇编语法具体可以参考 Go asm 文档 [1].

首先是取出__vdso_clock_gettime_sym(SB)(这是一个符号,指向的其实是 clock_gettime 函数,man clock_gettime 可以看到 glibc 的定义)的地址,如果符号不为空的话就把栈顶的地址计算出来传入 SI(LEA 指令). DI 和 SI 分别是system call的前两个参数的寄存器,这个相当于调用clock_gettime(0,&ret). fallback 分支是当对应的符号没有初始化就退而求其次调用gettimeofday(man gettimeofday 可以看到 libc 的定义)这个函数。

Go 的函数调用确保会有至少 128 个字节的栈(注意不是 goroutine 的栈), 具体可以参考runtime/stack.go里的_StackSmall, 但是进入对应的 c 函数以后,栈的增长就不能够由 Go 控制了,所以剩下的 104 个字节要确保这个调用不会栈溢出.(不过一般不会,因为这两个获取时间的函数不复杂).

先说函数符号的事,查阅相关资料可以看出来,VDSO 是 Virtual Dynamic Shared Object, 就是内核提供的虚拟的.so, 这个.so 文件不在磁盘上,而是在内核里头,映射到了用户空间。具体细节可以看 so 上的一个回答 [2]. 其中一个链接中有一些描述 [3].

One way of obtaining a fast gettimeofday() is by writing the current time in a fixed place, on a page mapped into the memory of all applications, and updating this location on each clock interrupt.
These applications could then read this fixed location with a single instruction - no system call required.
Concerning gettimeofday(), a vsyscall version for the x86-64 is already part of the vanilla kernel.
Patches for i386 exist. (An example of the kind of timing differences: John Stultz reports on an experiment where he measures gettimeofday() and finds 1.67 us for the int 0x80 way, 1.24 us for the sysenter way, and 0.88 us for the vsyscall.)

简单来说就是一种加速系统调用的机制加兼容模式。像gettimeofday这样的函数如果像普通系统调用一样的话,有太多的上下文切换,特别是一些频繁获取时间的程序,所以一般gettimeofday是通过这种机制调用的. 单独在用户空间映射了一段地址,在里面的是内核暴露的一些系统调用,具体可能是 syscall 或者 int 80 或者 systenter 的方式,由内核决定更快的调用方式,防止出现 glibc 版本和 kernel 版本不兼容的问题.(vDSO 是 vsyscall 的一个升级版本,避免了一些安全问题,映射不再静态固定).

从内核中可以看出获取系统调用是由时间中断更新的,其调用栈如下 [5].

Hardware timer interrupt (generated by the Programmable Interrupt Timer - PIT)
-> tick_periodic();
-> do_timer(1);
-> update_wall_time();
-> timekeeping_update(tk, false);
-> update_vsyscall(tk);

update_wall_time使用的是时钟源 (clock source) 的时间,精度能达到 ns 级别。
但是一般 Linux kernel 的时间中断是 100HZ, 高的也有 1000HZ 的,也就是说这个时间一般是在 10ms 或者 1ms 中断处理时更新一次。
从操作系统的角度看时间粒度大概是 ms 级别. 但是,这个只是一个基准值. 每次获取时间的时候还是会取得时钟源
的时间(时钟源有很多种 [9],可能是硬件的计数器,也可能就是中断的 jiffy,一般是可以达到 ns). 获取时间的精度还是能到 us 和几百 ns 之间。
因为系统调用本身还要花时间,理论上更精确的时间就不是通过这个系统调用获得了,而是需要直接用汇编指令”rdtsc”读 CPU 的 cycle 来得到精确时间。

接下来说,获取时间的函数符号的寻找过程又涉及了 ELF 的一些内容,其实就是动态链接的过程,把.so 中的函数符号的地址解析到并且存入函数指针中,比如__vdso_clock_gettime_sym, 关于链接的内容可以阅读”程序员的自我修养”这本书 [4].

其他函数例如TEXT runtime·nanotime(SB),NOSPLIT,$16也是类似的过程. 这个函数就能获取时间。

看完时间获取的过程,再来看看 Go 运行时的部分,瞧瞧timer heap是如何管理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Package time knows the layout of this structure.
// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
// For GOOS=nacl, package syscall knows the layout of this structure.
// If this struct changes, adjust ../syscall/net_nacl.go:/runtimeTimer.
type timer struct {
i int // heap index

// Timer wakes up at when, and then at when+period, ... (period > 0 only)
// each time calling f(now, arg) in the timer goroutine, so f must be
// a well-behaved function and not block.
when int64
period int64
f func(interface{}, uintptr)
arg interface{}
seq uintptr
}

timer 是以 heap 的形式管理的,heap 是个完全二叉树,用数组就可以存下,i 是 heap 的 index.
when 是 goroutine 被唤醒的时间,period 是唤醒的间隙,下次唤醒的时间是 when+period, 依次类推。
调用函数f(now, arg), now 是时间戳。

1
2
3
4
5
6
7
8
9
var timers struct {
lock mutex
gp *g
created bool
sleeping bool
rescheduling bool
waitnote note
t []*timer
}

整个的 timer heap 由timers管理.gp 指向的是一个g的指针,也就是调度器当中的 G 结构,一个 goroutine 的状态维护的结构,指向的是时间管理器的单独的一个 goroutine, 这个不属于用户而是运行时启动的 goroutine.(当然只有使用了 timer 才会启动,不然不会有这个 goroutine).lock保证 timers 的线程安全.waitnote 是一个条件变量。

看一下addtimer函数,他是整个计时器开始的入口,当然本身只是加了锁。

1
2
3
4
5
6
func addtimer(t *timer) {
lock(&timers.lock)
addtimerLocked(t)
unlock(&timers.lock)
}

再看addtimerLocked可以看到就是入 heap 的过程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Add a timer to the heap and start or kick the timer proc.
// If the new timer is earlier than any of the others.
// Timers are locked.
func addtimerLocked(t *timer) {
// when must never be negative; otherwise timerproc will overflow
// during its delta calculation and never expire other runtime·timers.
if t.when < 0 {
t.when = 1<<63 - 1
}
t.i = len(timers.t)
timers.t = append(timers.t, t)
siftupTimer(t.i)
if t.i == 0 {
// siftup moved to top: new earliest deadline.
if timers.sleeping {
timers.sleeping = false
notewakeup(&timers.waitnote)
}
if timers.rescheduling {
timers.rescheduling = false
goready(timers.gp, 0)
}
}
if !timers.created {
timers.created = true
go timerproc()
}
}

先从下面的if !timers.created分支看起,如果 timers 对应没有创建就 go 一个 timerproc, timeproc 的定义如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// Timerproc runs the time-driven events.
// It sleeps until the next event in the timers heap.
// If addtimer inserts a new earlier event, addtimer1 wakes timerproc early.
func timerproc() {
timers.gp = getg()
for {
lock(&timers.lock)
timers.sleeping = false
now := nanotime()
delta := int64(-1)
for {
if len(timers.t) == 0 {
delta = -1
break
}
t := timers.t[0]
delta = t.when - now
if delta > 0 {
break
}
if t.period > 0 {
// leave in heap but adjust next time to fire
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(0)
} else {
// remove from heap
last := len(timers.t) - 1
if last > 0 {
timers.t[0] = timers.t[last]
timers.t[0].i = 0
}
timers.t[last] = nil
timers.t = timers.t[:last]
if last > 0 {
siftdownTimer(0)
}
t.i = -1 // mark as removed
}
f := t.f
arg := t.arg
seq := t.seq
unlock(&timers.lock)
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
f(arg, seq)
lock(&timers.lock)
}
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
timers.rescheduling = true
// 如果没有 timers 剩余,就让 G 进入 sleep 状态。
// goparkunlock 的作用是解开 G 和 M 的联系,让 goroutine sleep 而 M
// 寻找下一个 G 来执行。
goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
continue
}
// At least one timer pending. Sleep until then.
timers.sleeping = true
// 清零 waitnote.
noteclear(&timers.waitnote)
unlock(&timers.lock)
// 调用 M 结构对应的 OS-dependent, OS-thread 的信号量让 M 进入 sleep 状态。
// 只会在超时的时候或者条件变量 waitnote 触发才会被唤醒。
notetsleepg(&timers.waitnote, delta)
}
}

timeproc 的主体就是从最小堆中取出 timer 然后回调函数,如果 period 大于 0 就把 timer 的 when 修改并且存回 heap 并调整,如果小于 0 就直接删除(对应的分别是标准库的 ticker 和 timer), 进入 OS 信号量中睡眠等待下次处理,(当然可以被 waitnote 变量唤醒), 这样一直循环,说白了就是靠信号量的超时机制来实现了运行时的 sleep[8]. 然后当完全没有 timer 剩余的时候,G 结构表示的 goroutine 进入睡眠状态,承载 goroutine 的 M 结构所代表的 OS-thread 会寻找其它可运行的 goroutine 执行. 具体关于运行时调度的细节可以参考雨痕大叔的笔记 [7].

看完这一路 case, 回到addtimerLocked, 当加入一个新的timer时,会作依次检查,也就是说最新插入的timer是在堆顶的话,会唤醒睡眠的 timergorountine 开始从 heap 上检查过时的timer并执行. 这里的唤醒和之前的睡眠是两种对应的状态timers.sleeping是进入了 M 的 os 信号量睡眠。
timers.rescheduling是进入了 G 的调度睡眠,而 M 并没有睡眠,让 G 重新进入可运行状态. 时间超时和新 timer 的加入,两者结合成为了timer运行时的驱动力。

看完了 time 的实现,再回过头来回答文章最初的问题”timer 可以有多精确”, 其实和两个原因有关,一个是操作系统本身的时间粒度,一般是 us 级别的,时间基准更新是 ms 级别的,时间精度能到 us 级别,另外一个就是timer本身的 goroutine 的调度问题,如果运行时的负载过大或者操作系统本身的负载过大,会导致timer本身的 goroutine 响应不及时,导致 timer 触发并不及时,可能导致一个 20ms 的 timer 和一个 30ms 的 timer 之间像是并发的一样(这也是笔者碰到的问题,特别是一些被 cgroup 限制了的容器环境,cpu 时间分配很少的条件下), 所以有时候我们不能过分相信 timer 的时序来保证程序的正常运行.NewTimer的注释也强调了”NewTimer creates a new Timer that will send the current time on its channel after at least duration d.”, 没有人能保证 timer 按点执行,当然如果你的间隔离谱的大的话其实就可以忽略这方面的影响了:)


参考链接:

1.  Go Assembly https://golang.org/doc/asm
2.  http://stackoverflow.com/questions/19938324/what-are-vdso-and-vsyscall
3.  http://www.win.tue.nl/~aeb/linux/lk/lk-4.html
4.  http://book.douban.com/subject/3652388/
5.  http://linuxmogeb.blogspot.com/2013/10/how-does-clockgettime-work.html
6.  时钟源 http://blog.csdn.net/droidphone/article/details/7975694
7.  Go 源码剖析 https://github.com/qyuhen/book/blob/master
    /Go%201.5%20%E6%BA%90%E7%A0%81%E5%89%96%E6%9E%90.pdf
8.  带超时的信号量的实现 http://lxr.free-electrons.com/source/kernel/locking/semaphore.c#L204
9.  内核的时钟源 http://blog.csdn.net/droidphone/article/details/7975694

接着上篇文章阐述了VFS以后,这篇文章主要想讲述一下在内核当中如何创建一个文件系统.其实根据上一篇博客来说,我们的文件系统主要能够满足VFS的抽象,就可以在内核中构建一个自己的文件系统.一个文件系统满足的功能其实就是针对文件的增删改查,目录的管理,还有链接等等,这是从用户的角度来看,而文件系统本身也要有自己的状态信息,维护在超级块里,可以被挂载,然后向下要提交IO请求(一般是磁盘也可以是网络,甚至是内存).这里的实现我们选择在内存当中实现一个文件系统.

代码参考了《Linux内核探秘》[1],以及内核代码ramfs的部分[2],基于内存构建一个文件系统.完整代码可以在这里查看,代码是基于2.6.32的内核的,当中涉及了一些模块编程的内容可以参考”The Linux Kernel Module Programming Guide”[3]

为了实现一个文件系统,首先我们需要定义一个文件系统.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <linux/module.h>
#include <linux/fs.h>

static struct file_system_type au_fs_type = {
.owner = THIS_MODULE,
.name = "aufs",
};


static int __init aufs_init(void)
{
register_filesystem(&au_fs_type);
return 0;
}

static void __exit aufs_exit(void)
{
unregister_filesystem(&au_fs_type);
}

module_init(aufs_init);
module_exit(aufs_exit);
MODULE_LICENSE("GPL");

执行make,insmod aufs.ko,然后cat /proc/filesystems | grep aufs就能看到aufs名列其中,说明我们的文件系统已经注册到了内核当中.接下来我们需要挂载文件系统,但是挂载的过程中会导致panic,应为我们还没有定义文件系统super_block的获取和释放函数.
挂载文件系统的时候依赖这两个函数,不然就会导致空指针.接下来我们定义文件系统的两个接口.”kill_sb”使用的是内核函数kill_litter_super,它会对super_block的内容进行释放.”get_sb”这个接口调用了”aufs_get_sb”函数,这个函数也是调用了内核函数get_sb_nodev,这个函数会创建对应的super_block,这个函数针对的是不依赖/dev的文件系统,如果依赖/dev的话,需要调用别的函数,另外会根据/dev对应的设备获取super_block(比如说ext4会读对应的被格式化后的块设备的头来实例化超级块).我们需要传入一个函数指针用于填充空白的super_block,就是”aufs_fill_super”,然而”aufs_fill_super”也调用了内核函数.

看一下具体代码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static int aufs_fill_super(struct super_block *sb, void *data, int silent)
{
struct inode *inode = NULL;
struct dentry *root;
int err;

sb->s_maxbytes = MAX_LFS_FILESIZE;
sb->s_blocksize = PAGE_CACHE_SIZE;
sb->s_blocksize_bits = PAGE_CACHE_SHIFT;
sb->s_magic = AUFS_MAGIC;

inode = new_inode(sb);
if (!inode) {
err = -ENOMEM;
goto fail;
}
inode->i_mode = 0755;
inode->i_uid = current_fsuid();
inode->i_gid = current_fsgid();
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
inode->i_mode |= S_IFDIR;
inode->i_fop = &simple_dir_operations;
inode->i_op = &simple_dir_inode_operations;
// inc reference count for ".".
inc_nlink(inode);

root = d_alloc_root(inode);
sb->s_root = root;
if (!root) {
err = -ENOMEM;
goto fail;
}
return 0;
fail:
return err;
}

为了填充super_block,需要初始化sb以及创建根目录的inode和dentry.s_blocksize指定了文件系统的块大小,一般是一个PAGE_SIZE的大小,这里的PAGE_CACHE_SIZEPAGE_SIZE是一样的,PAGE_CACHE_SIZE_SHIFT是对应的位数,所以
s_blocksize_bits是块大小的bit位位数. 接着是inode初始化,new_inode为sb创建一个关联的inode结构体,并对inode初始化,包括uid,gid,i_mode.对应的i_fopi_op使用了内核默认的接口simple_dir(_inode)_operations,后面会仔细讨论,这里先加上方便展示代码,如果对应的接口未定义的话,初始化的时候文件系统根目录会出现不会被认作目录的情况.

接下来安装模块,然后挂载文件系统,mount -t aufs none tmp,因为我们的文件系统没有对应的设备类型所以参数会填none,对应的目录是tmp,这样tmp就成为了aufs的根目录,如果ls一把tmp,里面是什么都没有的,我们cd tmp && touch x返回的结果是不被允许,因为我们还没有定义对应的接口,不能创建文件.

我们继续,我们让这个文件系统可以创建目录,那我们需要定义目录inode的接口,一组inode_operations和一组file_operations.以下是实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static struct inode *aufs_get_inode(struct super_block *sb, int mode, dev_t dev)
{
struct inode *inode = new_inode(sb);
if (inode) {
inode->i_mode = mode;
inode->i_uid = current_fsuid();
inode->i_gid = current_fsgid();
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
switch (mode & S_IFMT) {
case S_IFDIR:
// TODO:
inode->i_op = &aufs_dir_inode_operations;
inode->i_fop = &simple_dir_operations;
/* i_nlink = 2 */
inc_nlink(inode);
}
}
return inode;
}

static int
aufs_mknod(struct inode *dir, struct dentry *dentry, int mode, dev_t dev)
{
struct inode *inode = aufs_get_inode(dir->i_sb, mode, dev);
int error = -ENOSPC;
if (inode) {
// inherits gid.
if (dir->i_mode & S_ISGID) {
inode->i_gid = dir->i_gid;
if (S_ISDIR(mode)) inode->i_mode |= S_ISGID;
}
d_instantiate(dentry, inode);
// get dentry reference count.
dget(dentry);
error = 0;
dir->i_mtime = dir->i_ctime = CURRENT_TIME;
}
return error;
}

static int
aufs_mkdir(struct inode *dir, struct dentry *dentry, int mode)
{
int reval;
retval = aufs_mknod(dir, dentry, mode | S_IFDIR, 0);
printk("aufs: mkdir");
if (!retval)
inc_nlink(dir); // .
return retval;
}

static int
aufs_create(struct inode *dir, struct dentry *dentry, int mode, struct nameidata *nd)
{
return aufs_mknod(dir, dentry, mode | S_IFREG, 0);
}
static const struct inode_operations aufs_dir_inode_operations = {
.create = aufs_create,
.lookup = simple_lookup, // get dentry.
.link = simple_link, // same inode, different dentry.
.unlink = simple_unlink,
.symlink = aufs_symlink, // 之后再讲,目前没有做mapping会panic.
.mkdir = aufs_mkdir,
.rmdir = simple_rmdir,
.mknod = aufs_mknod,
.rename = simple_rename, // exchange dentry and dir.
};

其实很简单,aufs_get_inode只创建目录类型的inode,并且赋值对应的函数指针.file_operations使用的默认接口,这个后面再提,inode_operations主要是inode的创建,aufs_create和aufs_mkdir都是对aufs_mknod针对不同mode的封装,aufs_symlink暂时不讲,因为inode还没有做mapping,软链的时候不可写会导致panic.进行上面类似的编译和挂载以后我们就能创建简单文件和目录了,但是创建的文件不能做任何操作,因为我们没有定义对应的接口.

挑个接口说一下,比如link接口就是创建了一个dentry指向了同一个inode,并且增加inode的引用计数,unlink就是把dentry删掉,inode保留.

1
2
3
4
5
6
7
8
9
10
11
int simple_link(struct dentry *old_dentry, struct inode *dir, struct dentry *dentry)                                
{
struct inode *inode = old_dentry->d_inode;

inode->i_ctime = dir->i_ctime = dir->i_mtime = CURRENT_TIME;
inc_nlink(inode);
atomic_inc(&inode->i_count);
dget(dentry);
d_instantiate(dentry, inode);
return 0;
}

软链有点复杂,所以放到后面讲.

当我们能够完成目录和文件的创建和删除之后,我们可以继续文件的读写了,换句话说我们要定义普通文件的inode的file_operations接口.
为了能够添加文件我们增加如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static const struct address_space_operations aufs_aops = {
.readpage = simple_readpage,
.write_begin = simple_write_begin,
.write_end = simple_write_end,
/* .set_page_dirty = __set_page_dirty_no_writeback, 内核私有函数 */
};

static const struct file_operations aufs_file_operations = {
.read = do_sync_read, // file read get mapping page and copy to userspace.
.aio_read = generic_file_aio_read,
.write = do_sync_write,
.aio_write = generic_file_aio_write,
.mmap = generic_file_mmap,
.fsync = simple_sync_file,
.splice_read = generic_file_splice_read,
.splice_write = generic_file_splice_write,
.llseek = generic_file_llseek,
};

static const struct inode_operations aufs_file_inode_operations = {
.getattr = simple_getattr,
};

并把aufs_get_inode改成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static struct inode *aufs_get_inode(struct super_block *sb, int mode, dev_t dev)
{
struct inode *inode = new_inode(sb);
if (inode) {
inode->i_mode = mode;
inode->i_uid = current_fsuid();
inode->i_gid = current_fsgid();
inode->i_mapping->a_ops = &aufs_aops;
mapping_set_gfp_mask(inode->i_mapping, GFP_HIGHUSER);
mapping_set_unevictable(inode->i_mapping);
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
switch (mode & S_IFMT) {
default:
init_special_inode(inode, mode, dev);
break;
case S_IFDIR:
inode->i_op = &aufs_dir_inode_operations;
inode->i_fop = &simple_dir_operations;
/* i_nlink = 2 for "." */
inc_nlink(inode);
break;
case S_IFREG:
inode->i_op = &aufs_file_inode_operations;
inode->i_fop = &aufs_file_operations;
break;
case S_IFLNK:
inode->i_op = &page_symlink_inode_operations;
break;
}
}
return inode;
}


这样以后我们就能对文件进行读写了,实际上文件的读写首先要依赖于mmap操作,把对应的页映射到虚拟内存当中来进行读写.编译并添加模块再挂载以后我们发现touch的文件可以读写了.
现在具体举一段代码路径分析一下,从read开始.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ssize_t do_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos)                                
{
struct iovec iov = { .iov_base = buf, .iov_len = len };
struct kiocb kiocb;
ssize_t ret;

init_sync_kiocb(&kiocb, filp);
kiocb.ki_pos = *ppos;
kiocb.ki_left = len;

for (;;) {
ret = filp->f_op->aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
if (ret != -EIOCBRETRY)
break;
wait_on_retry_sync_kiocb(&kiocb);
}

if (-EIOCBQUEUED == ret)
ret = wait_on_sync_kiocb(&kiocb);
*ppos = kiocb.ki_pos;
return ret;
}

read其实还是依赖了aio_read的接口,只不过加上了wait的部分,保证同步,kiocb是”kernel I/O control block”记录I/O的信息,这里标记了偏移和剩余量.
再看aio_read的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

ssize_t
generic_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
{
struct file *filp = iocb->ki_filp;
ssize_t retval;
unsigned long seg;
size_t count;
loff_t *ppos = &iocb->ki_pos;

count = 0;
retval = generic_segment_checks(iov, &nr_segs, &count, VERIFY_WRITE);
if (retval)
return retval;

/* coalesce the iovecs and go direct-to-BIO for O_DIRECT */
if (filp->f_flags & O_DIRECT) {
loff_t size;
struct address_space *mapping;
struct inode *inode;

mapping = filp->f_mapping;
inode = mapping->host;
if (!count)
goto out; /* skip atime */
size = i_size_read(inode);
if (pos < size) {
retval = filemap_write_and_wait_range(mapping, pos,
pos + iov_length(iov, nr_segs) - 1);
if (!retval) {
retval = mapping->a_ops->direct_IO(READ, iocb,
iov, pos, nr_segs);
}
if (retval > 0)
*ppos = pos + retval;
if (retval) {
file_accessed(filp);
goto out;
}
}
}

for (seg = 0; seg < nr_segs; seg++) {
read_descriptor_t desc;

desc.written = 0;
desc.arg.buf = iov[seg].iov_base;
desc.count = iov[seg].iov_len;
if (desc.count == 0)
continue;
desc.error = 0;
do_generic_file_read(filp, ppos, &desc, file_read_actor);
retval += desc.written;
if (desc.error) {
retval = retval ?: desc.error;
break;
}
if (desc.count > 0)
break;
}
out:
return retval;
}

struct iovec是一个数组每个元素是一段数据的开始和长度,这个结构和后面的io有关.
如果是不是DIRECT_IO的话,就会把iovector组装成read_descriptor_t传入do_generic_file_read当中.do_generic_file_read的读的具体过程是

1
2
3
4
5
6
7
8
9
10

struct address_space *mapping = filp->f_mapping;
...
for {
index = *ppos >> PAGE_CACHE_SHIFT; // 循环读取ppos,ppos每次都会更新,然后右移,相当于模一个页的大小,找到以页偏移的单位.
find_get_page(mapping,index); // 获取对应的page引用.
mapping->a_ops->readpage(filp, page); // 读取对应的页.
...
page_cache_release(page);
}

一般是通过mapping获取页缓存中的页并且读到用户空间中,在完成之后释放引用.读页的函数就是把page缓存刷掉.

1
2
3
4
5
6
7
8
int simple_readpage(struct file *file, struct page *page)                                 
{
clear_highpage(page);
flush_dcache_page(page);
SetPageUptodate(page);
unlock_page(page);
return 0;
}

获取页是通过mapping的radix_tree来找到对应的page引用.
写的过程也类似,同步写也调用了异步写的接口,最后把用户空间的数据拷贝到页当中.address_space_operations就是对应vma映射的接口.

其中page <-> virtual_address的转换依赖于 kmap把页转换成虚拟地址或者逻辑地址,然后对应的读写操作最后都变成读写虚拟内存,或者逻辑内存.

单就构造一个文件系统来说,目的已经达到了,但是凡事不能不求甚解,下一篇博客准备记录一下内存管理相关的内容.

  1. 《Linux 内核探秘》http://book.douban.com/subject/25817503/
  2. ramfs目录 http://lxr.free-electrons.com/source/fs/ramfs/
  3. 内核模块编程教程 http://www.tldp.org/LDP/lkmpg/2.6/html/

UNIX 的哲学之一就是一切皆文件,所以可以看出文件系统在操作系统层面是非常重要的,很多基本单元都是通过文件系统展开的,所以了解文件系统有利于分析整个操作系统的脉络。

在 Linux 当中文件系统千奇百种,比较常见的有 EXT3、EXT4,还有基于内存的 ramfs、tmpfs 和基于网络的 nfs,和基于用户态的 fuse,当然 fuse 应该不能完全的文件系统,只能算是一个能把文件系统实现放到用户态的模块,满足了内核文件系统的接口,他们都是文件系统的一种实现。这个 wiki 上列出了很多 Linux 的文件系统类型。

对于这些文件系统,Linux 做了一层抽象就是 VFS 虚拟文件系统,这个其实就是软件设计必然的过程,对于不同的实现规定统一的接口,也就是定义与实现分离,如果想要自己实现一个文件系统的话只要实现一个满足 VFS 层的文件系统就能加入到内核当中。所以其实内核的文件和我们普通理解的文件其实有点不一样,这里的文件更像是一个接口,只不过最初是从磁盘上的文件衍生过来的,最后抽象成了一种可以对接各种功能的接口。

下面就开始剖析 VFS 的主要内容。

VFS 有几个必不可少的元素,filesystem_type,super_block, dentry, inode, file, vfsmount, nameidata 等等。这里抛开具体的代码,先从概念入手,下一篇博文,我会实现一个简单的文件系统,再基于代码分析具体的概念。

首先是 filesystem_typesuper_block,这两者的关系有点类似于软件中的 classobject 的关系。在内核当中有一个全局的file_systems链表串接了所有的文件系统类型的代表filesystem_type,对于文件系统的注册和删除就是在链表当中增加和删除对应的节点。而super_block就是一个文件系统的实例。本身也是通过全局链表串连起来的。filesystem_type本身非常简单,定义了获取和删除super_block的接口,和一些共同的相关信息。

对于super_block来说超级块定义了文件系统的具体信息和对应文件系统的接口,比如write_super,alloc_inode,sync_fs等,这些都是有具体的文件系统实现的。所有的inode都链接到了super_block

对于文件系统来说挂载点是个很有意思的点,在内核当中挂载点用vfsmount表示,挂载点是文件系统之间的衔接部分,如果要添加一个新的文件系统势必要将文件系统挂载在某个目录下面使得文件系统生效,vfsmount就是这样一个接口。当文件系统挂载以后原目录将不可见。vfsmount的主要内容是vfsmount的拓扑关系以及指向的目录和super_block。所以从宏观的角度来说,整个文件系统的组织是如图这样的。

fs.png

这是文件系统本身这个结构在操作系统里的组织结构,接下来分析文件系统满足 VFS 要包含哪些内容。

第一个要说的就是 dentry,dentry 表现的是文件在文件系统中的树状关系,dentry 也要实现相应的接口,包括 d_deleted_released_compare等接口。
dentry 代表的是目录结构中的一个文件,而文件其实就是没有子目录的文件。dentry 链接到了超级快和父 dentry 和子 dentry 以及对应的 inode。

接下来是 inode, inode 本身代表的一个文件,保存的信息很多,包括文件的大小,创建时间,文件的块大小等参数,以及文件的读写缓存等信息,还要定义对应的针对文件的函数接口,包括增删改查等等。没有文件名,因为它代表的是文件的原信息,具体的路径的表示依赖dentry

dentry 和 inode 的关系是多对一的,即多个 dentry 可以指向同一个文件,这和 linux 当中的文件链接有关。

接下来就是 file,file 虽然叫 file 但是对应的却不像 inode 一样,它对应的是一个进程所打开的文件。例如两个不同的进程打开了磁盘上的同一个文件,那么他们对应的 inode 是相同的,这也是 inode 意义。但是不同进程之间的 file 不是同一个引用,file 本身的结构还是和文件操作有关的。

整个关系如图所示。

file.png

对应的文件目录如下

1
2
3
4
5
6
.
├── a
│   └── ab
└── b
├── bc -> ../a/ab
└── bd

图中展示了一个硬链接代表着 bc 和 ab 的 dentry 指向了同一个inode,硬链接是不同 dentry 指向同一个inode,这也是为什么硬链接不能夸文件系统,因为inode是属于特定文件系统的。
图中其实inode是串联在super_block上的,super_block维护了文件系统中`inodes,因为画上去太乱了所以省略了。

VFS 的整体结构就是这样,接下来简要地说几个具体内容:

所谓的打开文件描述符其实就是进程的 files 数组这个文件描述符表的下标,通过对应的 fd 就能找到 file 结构。例如 dentry,vfsmount 这样的结构都有一个 hashtable 来缓存搜索的内容,这样就能加开目录的遍历搜索。

inode 其实也有一个全局的 hashtable 用于快速查找,inode 本身能代表的东西很多,一切皆文件就体现在他身上,他既可以表示一个 socket,也可以表示一个管道,还可以表示块设备、字符设备,然后就是普通文件了。

以上讲的就是整个内核当中 VFS 层的抽象,并没有牵涉到具体的文件系统,在下一篇博客我将会实现一个简单的文件系统,不就具体的代码分析,来熟悉这里提到的这些概念。
其实了解了概念以后,就会给人一种不过如此的感觉,真正值得玩味的是下层的实现,这也是我后面的博客将会介绍的内容。比如基于磁盘的文件系统更多的是要关注 I\O 层的东西。

其实内核的 I\O 路径是这样的:user space -> VFS -> FS -> I\O layer -> I\O scheduler(optional) -> block_driver -> block_device, 一个 I\O 经过了这些才真正到达了对应的存储上。

一个用户态的系统调用先通过 VFS 找到对应的文件系统再向下传递 I\O,这是 I\O 的一般路径。所以说对于用户来说,一切都是操作文件了。