ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

本文主要讨论 timer heap 在 Go 中的管理,以及运行时对于时间是如何获取的问题,从而引出一个结论,我们对于 timer 的准确度可以有多大的依赖。

首先我们看一下 Go 是如何获取时间的,找到time.Now, 发现最终调用的是下面这个汇编函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

// func now() (sec int64, nsec int32)
TEXT time·now(SB),NOSPLIT,$16
// Be careful. We're calling a function with gcc calling convention here.
// We're guaranteed 128 bytes on entry, and we've taken 16, and the
// call uses another 8.
// That leaves 104 for the gettime code to use. Hope that's enough!
// 这里只能保证调用 gettime 函数的时候有 104 个字节给这个函数作为栈
// 因为返回值用了 16 个字节,这个函数本身会用 8 个字节,到调用 gettime 的时候只剩 104 个字节可以用。
MOVQ runtime·__vdso_clock_gettime_sym(SB), AX
CMPQ AX, $0
JEQ fallback
MOVL $0, DI // CLOCK_REALTIME
LEAQ 0(SP), SI
CALL AX
MOVQ 0(SP), AX // sec
MOVQ 8(SP), DX // nsec
MOVQ AX, sec+0(FP)
MOVL DX, nsec+8(FP)
RET
fallback:
LEAQ 0(SP), DI
MOVQ $0, SI
MOVQ runtime·__vdso_gettimeofday_sym(SB), AX
CALL AX
MOVQ 0(SP), AX // sec
MOVL 8(SP), DX // usec
IMULQ $1000, DX
MOVQ AX, sec+0(FP)
MOVL DX, nsec+8(FP)
RET

第一行TEXT time·now(SB),NOSPLIT,$16, 其中time·now(SB)表示函数now的地址,NOSPLIT标志不依赖参数,$16表示返回的内容是 16 个字节,Go 的汇编语法具体可以参考 Go asm 文档 [1].

首先是取出__vdso_clock_gettime_sym(SB)(这是一个符号,指向的其实是 clock_gettime 函数,man clock_gettime 可以看到 glibc 的定义)的地址,如果符号不为空的话就把栈顶的地址计算出来传入 SI(LEA 指令). DI 和 SI 分别是system call的前两个参数的寄存器,这个相当于调用clock_gettime(0,&ret). fallback 分支是当对应的符号没有初始化就退而求其次调用gettimeofday(man gettimeofday 可以看到 libc 的定义)这个函数。

Go 的函数调用确保会有至少 128 个字节的栈(注意不是 goroutine 的栈), 具体可以参考runtime/stack.go里的_StackSmall, 但是进入对应的 c 函数以后,栈的增长就不能够由 Go 控制了,所以剩下的 104 个字节要确保这个调用不会栈溢出.(不过一般不会,因为这两个获取时间的函数不复杂).

先说函数符号的事,查阅相关资料可以看出来,VDSO 是 Virtual Dynamic Shared Object, 就是内核提供的虚拟的.so, 这个.so 文件不在磁盘上,而是在内核里头,映射到了用户空间。具体细节可以看 so 上的一个回答 [2]. 其中一个链接中有一些描述 [3].

One way of obtaining a fast gettimeofday() is by writing the current time in a fixed place, on a page mapped into the memory of all applications, and updating this location on each clock interrupt.
These applications could then read this fixed location with a single instruction - no system call required.
Concerning gettimeofday(), a vsyscall version for the x86-64 is already part of the vanilla kernel.
Patches for i386 exist. (An example of the kind of timing differences: John Stultz reports on an experiment where he measures gettimeofday() and finds 1.67 us for the int 0x80 way, 1.24 us for the sysenter way, and 0.88 us for the vsyscall.)

简单来说就是一种加速系统调用的机制加兼容模式。像gettimeofday这样的函数如果像普通系统调用一样的话,有太多的上下文切换,特别是一些频繁获取时间的程序,所以一般gettimeofday是通过这种机制调用的. 单独在用户空间映射了一段地址,在里面的是内核暴露的一些系统调用,具体可能是 syscall 或者 int 80 或者 systenter 的方式,由内核决定更快的调用方式,防止出现 glibc 版本和 kernel 版本不兼容的问题.(vDSO 是 vsyscall 的一个升级版本,避免了一些安全问题,映射不再静态固定).

从内核中可以看出获取系统调用是由时间中断更新的,其调用栈如下 [5].

Hardware timer interrupt (generated by the Programmable Interrupt Timer - PIT)
-> tick_periodic();
-> do_timer(1);
-> update_wall_time();
-> timekeeping_update(tk, false);
-> update_vsyscall(tk);

update_wall_time使用的是时钟源 (clock source) 的时间,精度能达到 ns 级别。
但是一般 Linux kernel 的时间中断是 100HZ, 高的也有 1000HZ 的,也就是说这个时间一般是在 10ms 或者 1ms 中断处理时更新一次。
从操作系统的角度看时间粒度大概是 ms 级别. 但是,这个只是一个基准值. 每次获取时间的时候还是会取得时钟源
的时间(时钟源有很多种 [9],可能是硬件的计数器,也可能就是中断的 jiffy,一般是可以达到 ns). 获取时间的精度还是能到 us 和几百 ns 之间。
因为系统调用本身还要花时间,理论上更精确的时间就不是通过这个系统调用获得了,而是需要直接用汇编指令”rdtsc”读 CPU 的 cycle 来得到精确时间。

接下来说,获取时间的函数符号的寻找过程又涉及了 ELF 的一些内容,其实就是动态链接的过程,把.so 中的函数符号的地址解析到并且存入函数指针中,比如__vdso_clock_gettime_sym, 关于链接的内容可以阅读”程序员的自我修养”这本书 [4].

其他函数例如TEXT runtime·nanotime(SB),NOSPLIT,$16也是类似的过程. 这个函数就能获取时间。

看完时间获取的过程,再来看看 Go 运行时的部分,瞧瞧timer heap是如何管理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Package time knows the layout of this structure.
// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
// For GOOS=nacl, package syscall knows the layout of this structure.
// If this struct changes, adjust ../syscall/net_nacl.go:/runtimeTimer.
type timer struct {
i int // heap index

// Timer wakes up at when, and then at when+period, ... (period > 0 only)
// each time calling f(now, arg) in the timer goroutine, so f must be
// a well-behaved function and not block.
when int64
period int64
f func(interface{}, uintptr)
arg interface{}
seq uintptr
}

timer 是以 heap 的形式管理的,heap 是个完全二叉树,用数组就可以存下,i 是 heap 的 index.
when 是 goroutine 被唤醒的时间,period 是唤醒的间隙,下次唤醒的时间是 when+period, 依次类推。
调用函数f(now, arg), now 是时间戳。

1
2
3
4
5
6
7
8
9
var timers struct {
lock mutex
gp *g
created bool
sleeping bool
rescheduling bool
waitnote note
t []*timer
}

整个的 timer heap 由timers管理.gp 指向的是一个g的指针,也就是调度器当中的 G 结构,一个 goroutine 的状态维护的结构,指向的是时间管理器的单独的一个 goroutine, 这个不属于用户而是运行时启动的 goroutine.(当然只有使用了 timer 才会启动,不然不会有这个 goroutine).lock保证 timers 的线程安全.waitnote 是一个条件变量。

看一下addtimer函数,他是整个计时器开始的入口,当然本身只是加了锁。

1
2
3
4
5
6
func addtimer(t *timer) {
lock(&timers.lock)
addtimerLocked(t)
unlock(&timers.lock)
}

再看addtimerLocked可以看到就是入 heap 的过程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Add a timer to the heap and start or kick the timer proc.
// If the new timer is earlier than any of the others.
// Timers are locked.
func addtimerLocked(t *timer) {
// when must never be negative; otherwise timerproc will overflow
// during its delta calculation and never expire other runtime·timers.
if t.when < 0 {
t.when = 1<<63 - 1
}
t.i = len(timers.t)
timers.t = append(timers.t, t)
siftupTimer(t.i)
if t.i == 0 {
// siftup moved to top: new earliest deadline.
if timers.sleeping {
timers.sleeping = false
notewakeup(&timers.waitnote)
}
if timers.rescheduling {
timers.rescheduling = false
goready(timers.gp, 0)
}
}
if !timers.created {
timers.created = true
go timerproc()
}
}

先从下面的if !timers.created分支看起,如果 timers 对应没有创建就 go 一个 timerproc, timeproc 的定义如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// Timerproc runs the time-driven events.
// It sleeps until the next event in the timers heap.
// If addtimer inserts a new earlier event, addtimer1 wakes timerproc early.
func timerproc() {
timers.gp = getg()
for {
lock(&timers.lock)
timers.sleeping = false
now := nanotime()
delta := int64(-1)
for {
if len(timers.t) == 0 {
delta = -1
break
}
t := timers.t[0]
delta = t.when - now
if delta > 0 {
break
}
if t.period > 0 {
// leave in heap but adjust next time to fire
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(0)
} else {
// remove from heap
last := len(timers.t) - 1
if last > 0 {
timers.t[0] = timers.t[last]
timers.t[0].i = 0
}
timers.t[last] = nil
timers.t = timers.t[:last]
if last > 0 {
siftdownTimer(0)
}
t.i = -1 // mark as removed
}
f := t.f
arg := t.arg
seq := t.seq
unlock(&timers.lock)
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
f(arg, seq)
lock(&timers.lock)
}
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
timers.rescheduling = true
// 如果没有 timers 剩余,就让 G 进入 sleep 状态。
// goparkunlock 的作用是解开 G 和 M 的联系,让 goroutine sleep 而 M
// 寻找下一个 G 来执行。
goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
continue
}
// At least one timer pending. Sleep until then.
timers.sleeping = true
// 清零 waitnote.
noteclear(&timers.waitnote)
unlock(&timers.lock)
// 调用 M 结构对应的 OS-dependent, OS-thread 的信号量让 M 进入 sleep 状态。
// 只会在超时的时候或者条件变量 waitnote 触发才会被唤醒。
notetsleepg(&timers.waitnote, delta)
}
}

timeproc 的主体就是从最小堆中取出 timer 然后回调函数,如果 period 大于 0 就把 timer 的 when 修改并且存回 heap 并调整,如果小于 0 就直接删除(对应的分别是标准库的 ticker 和 timer), 进入 OS 信号量中睡眠等待下次处理,(当然可以被 waitnote 变量唤醒), 这样一直循环,说白了就是靠信号量的超时机制来实现了运行时的 sleep[8]. 然后当完全没有 timer 剩余的时候,G 结构表示的 goroutine 进入睡眠状态,承载 goroutine 的 M 结构所代表的 OS-thread 会寻找其它可运行的 goroutine 执行. 具体关于运行时调度的细节可以参考雨痕大叔的笔记 [7].

看完这一路 case, 回到addtimerLocked, 当加入一个新的timer时,会作依次检查,也就是说最新插入的timer是在堆顶的话,会唤醒睡眠的 timergorountine 开始从 heap 上检查过时的timer并执行. 这里的唤醒和之前的睡眠是两种对应的状态timers.sleeping是进入了 M 的 os 信号量睡眠。
timers.rescheduling是进入了 G 的调度睡眠,而 M 并没有睡眠,让 G 重新进入可运行状态. 时间超时和新 timer 的加入,两者结合成为了timer运行时的驱动力。

看完了 time 的实现,再回过头来回答文章最初的问题”timer 可以有多精确”, 其实和两个原因有关,一个是操作系统本身的时间粒度,一般是 us 级别的,时间基准更新是 ms 级别的,时间精度能到 us 级别,另外一个就是timer本身的 goroutine 的调度问题,如果运行时的负载过大或者操作系统本身的负载过大,会导致timer本身的 goroutine 响应不及时,导致 timer 触发并不及时,可能导致一个 20ms 的 timer 和一个 30ms 的 timer 之间像是并发的一样(这也是笔者碰到的问题,特别是一些被 cgroup 限制了的容器环境,cpu 时间分配很少的条件下), 所以有时候我们不能过分相信 timer 的时序来保证程序的正常运行.NewTimer的注释也强调了”NewTimer creates a new Timer that will send the current time on its channel after at least duration d.”, 没有人能保证 timer 按点执行,当然如果你的间隔离谱的大的话其实就可以忽略这方面的影响了:)


参考链接:

1.  Go Assembly https://golang.org/doc/asm
2.  http://stackoverflow.com/questions/19938324/what-are-vdso-and-vsyscall
3.  http://www.win.tue.nl/~aeb/linux/lk/lk-4.html
4.  http://book.douban.com/subject/3652388/
5.  http://linuxmogeb.blogspot.com/2013/10/how-does-clockgettime-work.html
6.  时钟源 http://blog.csdn.net/droidphone/article/details/7975694
7.  Go 源码剖析 https://github.com/qyuhen/book/blob/master
    /Go%201.5%20%E6%BA%90%E7%A0%81%E5%89%96%E6%9E%90.pdf
8.  带超时的信号量的实现 http://lxr.free-electrons.com/source/kernel/locking/semaphore.c#L204
9.  内核的时钟源 http://blog.csdn.net/droidphone/article/details/7975694

接着上篇文章阐述了VFS以后,这篇文章主要想讲述一下在内核当中如何创建一个文件系统.其实根据上一篇博客来说,我们的文件系统主要能够满足VFS的抽象,就可以在内核中构建一个自己的文件系统.一个文件系统满足的功能其实就是针对文件的增删改查,目录的管理,还有链接等等,这是从用户的角度来看,而文件系统本身也要有自己的状态信息,维护在超级块里,可以被挂载,然后向下要提交IO请求(一般是磁盘也可以是网络,甚至是内存).这里的实现我们选择在内存当中实现一个文件系统.

代码参考了《Linux内核探秘》[1],以及内核代码ramfs的部分[2],基于内存构建一个文件系统.完整代码可以在这里查看,代码是基于2.6.32的内核的,当中涉及了一些模块编程的内容可以参考”The Linux Kernel Module Programming Guide”[3]

为了实现一个文件系统,首先我们需要定义一个文件系统.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <linux/module.h>
#include <linux/fs.h>

static struct file_system_type au_fs_type = {
.owner = THIS_MODULE,
.name = "aufs",
};


static int __init aufs_init(void)
{
register_filesystem(&au_fs_type);
return 0;
}

static void __exit aufs_exit(void)
{
unregister_filesystem(&au_fs_type);
}

module_init(aufs_init);
module_exit(aufs_exit);
MODULE_LICENSE("GPL");

执行make,insmod aufs.ko,然后cat /proc/filesystems | grep aufs就能看到aufs名列其中,说明我们的文件系统已经注册到了内核当中.接下来我们需要挂载文件系统,但是挂载的过程中会导致panic,应为我们还没有定义文件系统super_block的获取和释放函数.
挂载文件系统的时候依赖这两个函数,不然就会导致空指针.接下来我们定义文件系统的两个接口.”kill_sb”使用的是内核函数kill_litter_super,它会对super_block的内容进行释放.”get_sb”这个接口调用了”aufs_get_sb”函数,这个函数也是调用了内核函数get_sb_nodev,这个函数会创建对应的super_block,这个函数针对的是不依赖/dev的文件系统,如果依赖/dev的话,需要调用别的函数,另外会根据/dev对应的设备获取super_block(比如说ext4会读对应的被格式化后的块设备的头来实例化超级块).我们需要传入一个函数指针用于填充空白的super_block,就是”aufs_fill_super”,然而”aufs_fill_super”也调用了内核函数.

看一下具体代码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static int aufs_fill_super(struct super_block *sb, void *data, int silent)
{
struct inode *inode = NULL;
struct dentry *root;
int err;

sb->s_maxbytes = MAX_LFS_FILESIZE;
sb->s_blocksize = PAGE_CACHE_SIZE;
sb->s_blocksize_bits = PAGE_CACHE_SHIFT;
sb->s_magic = AUFS_MAGIC;

inode = new_inode(sb);
if (!inode) {
err = -ENOMEM;
goto fail;
}
inode->i_mode = 0755;
inode->i_uid = current_fsuid();
inode->i_gid = current_fsgid();
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
inode->i_mode |= S_IFDIR;
inode->i_fop = &simple_dir_operations;
inode->i_op = &simple_dir_inode_operations;
// inc reference count for ".".
inc_nlink(inode);

root = d_alloc_root(inode);
sb->s_root = root;
if (!root) {
err = -ENOMEM;
goto fail;
}
return 0;
fail:
return err;
}

为了填充super_block,需要初始化sb以及创建根目录的inode和dentry.s_blocksize指定了文件系统的块大小,一般是一个PAGE_SIZE的大小,这里的PAGE_CACHE_SIZEPAGE_SIZE是一样的,PAGE_CACHE_SIZE_SHIFT是对应的位数,所以
s_blocksize_bits是块大小的bit位位数. 接着是inode初始化,new_inode为sb创建一个关联的inode结构体,并对inode初始化,包括uid,gid,i_mode.对应的i_fopi_op使用了内核默认的接口simple_dir(_inode)_operations,后面会仔细讨论,这里先加上方便展示代码,如果对应的接口未定义的话,初始化的时候文件系统根目录会出现不会被认作目录的情况.

接下来安装模块,然后挂载文件系统,mount -t aufs none tmp,因为我们的文件系统没有对应的设备类型所以参数会填none,对应的目录是tmp,这样tmp就成为了aufs的根目录,如果ls一把tmp,里面是什么都没有的,我们cd tmp && touch x返回的结果是不被允许,因为我们还没有定义对应的接口,不能创建文件.

我们继续,我们让这个文件系统可以创建目录,那我们需要定义目录inode的接口,一组inode_operations和一组file_operations.以下是实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static struct inode *aufs_get_inode(struct super_block *sb, int mode, dev_t dev)
{
struct inode *inode = new_inode(sb);
if (inode) {
inode->i_mode = mode;
inode->i_uid = current_fsuid();
inode->i_gid = current_fsgid();
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
switch (mode & S_IFMT) {
case S_IFDIR:
// TODO:
inode->i_op = &aufs_dir_inode_operations;
inode->i_fop = &simple_dir_operations;
/* i_nlink = 2 */
inc_nlink(inode);
}
}
return inode;
}

static int
aufs_mknod(struct inode *dir, struct dentry *dentry, int mode, dev_t dev)
{
struct inode *inode = aufs_get_inode(dir->i_sb, mode, dev);
int error = -ENOSPC;
if (inode) {
// inherits gid.
if (dir->i_mode & S_ISGID) {
inode->i_gid = dir->i_gid;
if (S_ISDIR(mode)) inode->i_mode |= S_ISGID;
}
d_instantiate(dentry, inode);
// get dentry reference count.
dget(dentry);
error = 0;
dir->i_mtime = dir->i_ctime = CURRENT_TIME;
}
return error;
}

static int
aufs_mkdir(struct inode *dir, struct dentry *dentry, int mode)
{
int reval;
retval = aufs_mknod(dir, dentry, mode | S_IFDIR, 0);
printk("aufs: mkdir");
if (!retval)
inc_nlink(dir); // .
return retval;
}

static int
aufs_create(struct inode *dir, struct dentry *dentry, int mode, struct nameidata *nd)
{
return aufs_mknod(dir, dentry, mode | S_IFREG, 0);
}
static const struct inode_operations aufs_dir_inode_operations = {
.create = aufs_create,
.lookup = simple_lookup, // get dentry.
.link = simple_link, // same inode, different dentry.
.unlink = simple_unlink,
.symlink = aufs_symlink, // 之后再讲,目前没有做mapping会panic.
.mkdir = aufs_mkdir,
.rmdir = simple_rmdir,
.mknod = aufs_mknod,
.rename = simple_rename, // exchange dentry and dir.
};

其实很简单,aufs_get_inode只创建目录类型的inode,并且赋值对应的函数指针.file_operations使用的默认接口,这个后面再提,inode_operations主要是inode的创建,aufs_create和aufs_mkdir都是对aufs_mknod针对不同mode的封装,aufs_symlink暂时不讲,因为inode还没有做mapping,软链的时候不可写会导致panic.进行上面类似的编译和挂载以后我们就能创建简单文件和目录了,但是创建的文件不能做任何操作,因为我们没有定义对应的接口.

挑个接口说一下,比如link接口就是创建了一个dentry指向了同一个inode,并且增加inode的引用计数,unlink就是把dentry删掉,inode保留.

1
2
3
4
5
6
7
8
9
10
11
int simple_link(struct dentry *old_dentry, struct inode *dir, struct dentry *dentry)                                
{
struct inode *inode = old_dentry->d_inode;

inode->i_ctime = dir->i_ctime = dir->i_mtime = CURRENT_TIME;
inc_nlink(inode);
atomic_inc(&inode->i_count);
dget(dentry);
d_instantiate(dentry, inode);
return 0;
}

软链有点复杂,所以放到后面讲.

当我们能够完成目录和文件的创建和删除之后,我们可以继续文件的读写了,换句话说我们要定义普通文件的inode的file_operations接口.
为了能够添加文件我们增加如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static const struct address_space_operations aufs_aops = {
.readpage = simple_readpage,
.write_begin = simple_write_begin,
.write_end = simple_write_end,
/* .set_page_dirty = __set_page_dirty_no_writeback, 内核私有函数 */
};

static const struct file_operations aufs_file_operations = {
.read = do_sync_read, // file read get mapping page and copy to userspace.
.aio_read = generic_file_aio_read,
.write = do_sync_write,
.aio_write = generic_file_aio_write,
.mmap = generic_file_mmap,
.fsync = simple_sync_file,
.splice_read = generic_file_splice_read,
.splice_write = generic_file_splice_write,
.llseek = generic_file_llseek,
};

static const struct inode_operations aufs_file_inode_operations = {
.getattr = simple_getattr,
};

并把aufs_get_inode改成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static struct inode *aufs_get_inode(struct super_block *sb, int mode, dev_t dev)
{
struct inode *inode = new_inode(sb);
if (inode) {
inode->i_mode = mode;
inode->i_uid = current_fsuid();
inode->i_gid = current_fsgid();
inode->i_mapping->a_ops = &aufs_aops;
mapping_set_gfp_mask(inode->i_mapping, GFP_HIGHUSER);
mapping_set_unevictable(inode->i_mapping);
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
switch (mode & S_IFMT) {
default:
init_special_inode(inode, mode, dev);
break;
case S_IFDIR:
inode->i_op = &aufs_dir_inode_operations;
inode->i_fop = &simple_dir_operations;
/* i_nlink = 2 for "." */
inc_nlink(inode);
break;
case S_IFREG:
inode->i_op = &aufs_file_inode_operations;
inode->i_fop = &aufs_file_operations;
break;
case S_IFLNK:
inode->i_op = &page_symlink_inode_operations;
break;
}
}
return inode;
}


这样以后我们就能对文件进行读写了,实际上文件的读写首先要依赖于mmap操作,把对应的页映射到虚拟内存当中来进行读写.编译并添加模块再挂载以后我们发现touch的文件可以读写了.
现在具体举一段代码路径分析一下,从read开始.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ssize_t do_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos)                                
{
struct iovec iov = { .iov_base = buf, .iov_len = len };
struct kiocb kiocb;
ssize_t ret;

init_sync_kiocb(&kiocb, filp);
kiocb.ki_pos = *ppos;
kiocb.ki_left = len;

for (;;) {
ret = filp->f_op->aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
if (ret != -EIOCBRETRY)
break;
wait_on_retry_sync_kiocb(&kiocb);
}

if (-EIOCBQUEUED == ret)
ret = wait_on_sync_kiocb(&kiocb);
*ppos = kiocb.ki_pos;
return ret;
}

read其实还是依赖了aio_read的接口,只不过加上了wait的部分,保证同步,kiocb是”kernel I/O control block”记录I/O的信息,这里标记了偏移和剩余量.
再看aio_read的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

ssize_t
generic_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
{
struct file *filp = iocb->ki_filp;
ssize_t retval;
unsigned long seg;
size_t count;
loff_t *ppos = &iocb->ki_pos;

count = 0;
retval = generic_segment_checks(iov, &nr_segs, &count, VERIFY_WRITE);
if (retval)
return retval;

/* coalesce the iovecs and go direct-to-BIO for O_DIRECT */
if (filp->f_flags & O_DIRECT) {
loff_t size;
struct address_space *mapping;
struct inode *inode;

mapping = filp->f_mapping;
inode = mapping->host;
if (!count)
goto out; /* skip atime */
size = i_size_read(inode);
if (pos < size) {
retval = filemap_write_and_wait_range(mapping, pos,
pos + iov_length(iov, nr_segs) - 1);
if (!retval) {
retval = mapping->a_ops->direct_IO(READ, iocb,
iov, pos, nr_segs);
}
if (retval > 0)
*ppos = pos + retval;
if (retval) {
file_accessed(filp);
goto out;
}
}
}

for (seg = 0; seg < nr_segs; seg++) {
read_descriptor_t desc;

desc.written = 0;
desc.arg.buf = iov[seg].iov_base;
desc.count = iov[seg].iov_len;
if (desc.count == 0)
continue;
desc.error = 0;
do_generic_file_read(filp, ppos, &desc, file_read_actor);
retval += desc.written;
if (desc.error) {
retval = retval ?: desc.error;
break;
}
if (desc.count > 0)
break;
}
out:
return retval;
}

struct iovec是一个数组每个元素是一段数据的开始和长度,这个结构和后面的io有关.
如果是不是DIRECT_IO的话,就会把iovector组装成read_descriptor_t传入do_generic_file_read当中.do_generic_file_read的读的具体过程是

1
2
3
4
5
6
7
8
9
10

struct address_space *mapping = filp->f_mapping;
...
for {
index = *ppos >> PAGE_CACHE_SHIFT; // 循环读取ppos,ppos每次都会更新,然后右移,相当于模一个页的大小,找到以页偏移的单位.
find_get_page(mapping,index); // 获取对应的page引用.
mapping->a_ops->readpage(filp, page); // 读取对应的页.
...
page_cache_release(page);
}

一般是通过mapping获取页缓存中的页并且读到用户空间中,在完成之后释放引用.读页的函数就是把page缓存刷掉.

1
2
3
4
5
6
7
8
int simple_readpage(struct file *file, struct page *page)                                 
{
clear_highpage(page);
flush_dcache_page(page);
SetPageUptodate(page);
unlock_page(page);
return 0;
}

获取页是通过mapping的radix_tree来找到对应的page引用.
写的过程也类似,同步写也调用了异步写的接口,最后把用户空间的数据拷贝到页当中.address_space_operations就是对应vma映射的接口.

其中page <-> virtual_address的转换依赖于 kmap把页转换成虚拟地址或者逻辑地址,然后对应的读写操作最后都变成读写虚拟内存,或者逻辑内存.

单就构造一个文件系统来说,目的已经达到了,但是凡事不能不求甚解,下一篇博客准备记录一下内存管理相关的内容.

  1. 《Linux 内核探秘》http://book.douban.com/subject/25817503/
  2. ramfs目录 http://lxr.free-electrons.com/source/fs/ramfs/
  3. 内核模块编程教程 http://www.tldp.org/LDP/lkmpg/2.6/html/

UNIX 的哲学之一就是一切皆文件,所以可以看出文件系统在操作系统层面是非常重要的,很多基本单元都是通过文件系统展开的,所以了解文件系统有利于分析整个操作系统的脉络。

在 Linux 当中文件系统千奇百种,比较常见的有 EXT3、EXT4,还有基于内存的 ramfs、tmpfs 和基于网络的 nfs,和基于用户态的 fuse,当然 fuse 应该不能完全的文件系统,只能算是一个能把文件系统实现放到用户态的模块,满足了内核文件系统的接口,他们都是文件系统的一种实现。这个 wiki 上列出了很多 Linux 的文件系统类型。

对于这些文件系统,Linux 做了一层抽象就是 VFS 虚拟文件系统,这个其实就是软件设计必然的过程,对于不同的实现规定统一的接口,也就是定义与实现分离,如果想要自己实现一个文件系统的话只要实现一个满足 VFS 层的文件系统就能加入到内核当中。所以其实内核的文件和我们普通理解的文件其实有点不一样,这里的文件更像是一个接口,只不过最初是从磁盘上的文件衍生过来的,最后抽象成了一种可以对接各种功能的接口。

下面就开始剖析 VFS 的主要内容。

VFS 有几个必不可少的元素,filesystem_type,super_block, dentry, inode, file, vfsmount, nameidata 等等。这里抛开具体的代码,先从概念入手,下一篇博文,我会实现一个简单的文件系统,再基于代码分析具体的概念。

首先是 filesystem_typesuper_block,这两者的关系有点类似于软件中的 classobject 的关系。在内核当中有一个全局的file_systems链表串接了所有的文件系统类型的代表filesystem_type,对于文件系统的注册和删除就是在链表当中增加和删除对应的节点。而super_block就是一个文件系统的实例。本身也是通过全局链表串连起来的。filesystem_type本身非常简单,定义了获取和删除super_block的接口,和一些共同的相关信息。

对于super_block来说超级块定义了文件系统的具体信息和对应文件系统的接口,比如write_super,alloc_inode,sync_fs等,这些都是有具体的文件系统实现的。所有的inode都链接到了super_block

对于文件系统来说挂载点是个很有意思的点,在内核当中挂载点用vfsmount表示,挂载点是文件系统之间的衔接部分,如果要添加一个新的文件系统势必要将文件系统挂载在某个目录下面使得文件系统生效,vfsmount就是这样一个接口。当文件系统挂载以后原目录将不可见。vfsmount的主要内容是vfsmount的拓扑关系以及指向的目录和super_block。所以从宏观的角度来说,整个文件系统的组织是如图这样的。

fs.png

这是文件系统本身这个结构在操作系统里的组织结构,接下来分析文件系统满足 VFS 要包含哪些内容。

第一个要说的就是 dentry,dentry 表现的是文件在文件系统中的树状关系,dentry 也要实现相应的接口,包括 d_deleted_released_compare等接口。
dentry 代表的是目录结构中的一个文件,而文件其实就是没有子目录的文件。dentry 链接到了超级快和父 dentry 和子 dentry 以及对应的 inode。

接下来是 inode, inode 本身代表的一个文件,保存的信息很多,包括文件的大小,创建时间,文件的块大小等参数,以及文件的读写缓存等信息,还要定义对应的针对文件的函数接口,包括增删改查等等。没有文件名,因为它代表的是文件的原信息,具体的路径的表示依赖dentry

dentry 和 inode 的关系是多对一的,即多个 dentry 可以指向同一个文件,这和 linux 当中的文件链接有关。

接下来就是 file,file 虽然叫 file 但是对应的却不像 inode 一样,它对应的是一个进程所打开的文件。例如两个不同的进程打开了磁盘上的同一个文件,那么他们对应的 inode 是相同的,这也是 inode 意义。但是不同进程之间的 file 不是同一个引用,file 本身的结构还是和文件操作有关的。

整个关系如图所示。

file.png

对应的文件目录如下

1
2
3
4
5
6
.
├── a
│   └── ab
└── b
├── bc -> ../a/ab
└── bd

图中展示了一个硬链接代表着 bc 和 ab 的 dentry 指向了同一个inode,硬链接是不同 dentry 指向同一个inode,这也是为什么硬链接不能夸文件系统,因为inode是属于特定文件系统的。
图中其实inode是串联在super_block上的,super_block维护了文件系统中`inodes,因为画上去太乱了所以省略了。

VFS 的整体结构就是这样,接下来简要地说几个具体内容:

所谓的打开文件描述符其实就是进程的 files 数组这个文件描述符表的下标,通过对应的 fd 就能找到 file 结构。例如 dentry,vfsmount 这样的结构都有一个 hashtable 来缓存搜索的内容,这样就能加开目录的遍历搜索。

inode 其实也有一个全局的 hashtable 用于快速查找,inode 本身能代表的东西很多,一切皆文件就体现在他身上,他既可以表示一个 socket,也可以表示一个管道,还可以表示块设备、字符设备,然后就是普通文件了。

以上讲的就是整个内核当中 VFS 层的抽象,并没有牵涉到具体的文件系统,在下一篇博客我将会实现一个简单的文件系统,不就具体的代码分析,来熟悉这里提到的这些概念。
其实了解了概念以后,就会给人一种不过如此的感觉,真正值得玩味的是下层的实现,这也是我后面的博客将会介绍的内容。比如基于磁盘的文件系统更多的是要关注 I\O 层的东西。

其实内核的 I\O 路径是这样的:user space -> VFS -> FS -> I\O layer -> I\O scheduler(optional) -> block_driver -> block_device, 一个 I\O 经过了这些才真正到达了对应的存储上。

一个用户态的系统调用先通过 VFS 找到对应的文件系统再向下传递 I\O,这是 I\O 的一般路径。所以说对于用户来说,一切都是操作文件了。

首先抛出一个问题,在Go中当我们想实现一个集合的时候,可以用map来实现.而map本身就可以通过”comma ok”机制来获取该建是否存在,例如_ , ok := map["key"],如果没有对应的值,ok为false,以此就可以实现集合.有时候我们会选择map[string]bool这类方式来定义这个集合,但是因为有了”comma ok”这个语法,还可以定义成map[string]struct{}的形式,值不再占用内存.

后者可以表示两种状态有或者无,而前者其实有三种状态,有的时候表示true或者false,或者没有.
很多时候我们会选择map[string]struct{}来表示集合的实现,但是这样真得值得么?

这里要从map的实现说起.map的实现是一个hash表.表结构由两个结构体表示,分别是hmap和bmap,前者表示这个map结构,后者表示了map的hash表下的bucket结构.

一切要从map的实现开始讲起.

map是由桶数组组成的,每个桶的表示如下.

1
2
3
4
5
6
7
8
// A bucket for a Go map.
type bmap struct {
tophash [bucketCnt]uint8
// 这里的bucetCnt是8,是个固定值,每个桶跟8个k-v对.
// 先是8个key,后是8个value.
// 最后是一个overflow指针指向串联的bucket.
}

而hmap表示如下,其实就是一个头信息.

1
2
3
4
5
6
7
8
9
10
11
12
13
// A header for a Go map.
type hmap struct {
flags uint8 // 一些标志j
B uint8 // bucket数量的log_2
hash0 uint32 // hash 种子

buckets unsafe.Pointer // buckets 数组的指针.
oldbuckets unsafe.Pointer // 增长时需要被替换的数组的指针.
nevacuate uintptr // 被提升的桶的数量(增长时,桶会从oldbuckets移到buckets当中)

overflow *[2]*[]*bmap // 指向串联桶的指针.
}

bmap这个结构类似于C的定义,后面其实还有一些成员,但是需要动态申请(runtime自己的malloc),没有定义.
一个bmap会有8个字节的tophash用于定位到桶中对应的entry.每个entry表示一个k-v,这个tophash是key的hash的高位字节.
而定位桶用的是hash的低位字节.在go中每个类型都会有自己的hash方法.

为了防止对齐问题,所以先排8个key,再排8个value.举个例子如果是map[int8]int64,那么k-v排在一起的话,就会空7个字节,非常浪费.
但是先排8个int8的话就不会出现对齐的问题.最后一个结构是桶指针,指向串联的桶.

而整个hmap是一个bmap的数组,主要是管理信息.

内存分布如图.

hmap

hmap的增长是依赖于负载系数的,在go里面负载系数(loadFactor)是6.5,这个值是一个通过测试得到的比较理想的一个值.
这个值的意思表示的是,每个桶平均装下的entry数量是6.5个,之前我们提到了每个桶的大小是8.也就是说bucket一般都不会装满.

如果要负载系数高,也就是桶尽量装满,就会导致hash碰撞率较高(可以hash到的空间不大),这样会产生过多的overflow的bucket.
如果要负载系数低,hash碰撞率比较低,这样会使得空间很大,导致真正利用率(存入的数据/全部bucket空间)相对变小.
所以综合情况负载系数6.5是一个比较理想的值,这也是go现在采用的值.

这个可以通过决定增长的关键代码发现:

1
2
for ; hint > bucketCnt && float32(hint) > loadFactor*float32(uintptr(1)<<B); B++ {
}

2^B是桶的数量,hint是申请的map的大小,bucketCnt就是8,因为预先会分配一个桶,如果一个桶都不会超过的话就不增加了.
关键是hint要保证大于负载系数*桶的数量,换句话说要保证平均每个桶装6.5个k-v能容得下hint这么多对k-v.
上面说得是静态分配,动态增长的时候oldbuckets是buckets的一半,也就是翻倍增长.

hmap在增长的时候会把bueckets变成oldbuckets然后再申请新的buckets.buckets中的k-v是不会移动到别的桶当中去的.
这样保证了遍历时候的一致性.hashmap按照range遍历的时候是按bucket数组的一个bucket开始然后bucket的串联bucket再回到
bucket数组的下个元素依次遍历.

删除非常简单,仅仅是把对应的key和value置为空.

现在把map的实现说清楚以后我们可以算一笔账.假设我们的map定义为map[string]struct{}{},
在64bit的操作系统下面一个桶的大小是 8 + 816 + 80 + 8 = 144个字节(string 是常量只含一个指针和一个len值).
如果是map[string]bool{},那么一个桶的大小是 8 + 816 + 81 = 152个字节.

换算下来节省的空间大概是5.2%,考虑到负载系数是6.5,换成百分比是81.25%这个程度,省8个字节的事情完全是多余的.

与其牺牲语义取巧节省这几个字节不如定义一个表示清晰的map来的更直接.
所以我的结论是map[string]struct{}并不可取.

今天hack标准库里的debug/elf,里面有一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

// A Section represents a single section in an ELF file.
type Section struct {
SectionHeader

// Embed ReaderAt for ReadAt method.
// Do not embed SectionReader directly
// to avoid having Read and Seek.
// If a client wants Read and Seek it must use
// Open() to avoid fighting over the seek offset
// with other clients.
io.ReaderAt
sr *io.SectionReader
}

这个结构很特别的一点是为了能够只使用SectionReader的ReadAt方法而不使用其他方法防止Seek出现竞争问题把一个interface io.ReaderAt包含了进来.
io.ReaderAt是包含ReadAt方法的interface,以此目的来达到屏蔽其他方法的目的.如果真的要使用Read+Seek需要另外再调用Open函数.

1
2
3
// Open returns a new ReadSeeker reading the ELF section.
func (s *Section) Open() io.ReadSeeker { return io.NewSectionReader(s.sr, 0, 1<<63-1) }

这里可以看到就是new了一个SectionReader出来用,这样就防止了竞争的问题.

实现的基础就是私有结构的方法是不暴露的,而共有借口的方法是可以暴露的,这样怎么做到呢?
我把代码做了一个抽象,写了一个demo如下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

package main

import (
"fmt"
)

type S struct {
MethodsMask
i Implementer
}

func NewS() *S {
s := new(S)
s.MethodsMask = s.i
return s
}

type MethodsMask interface {
F1()
}
type Implementer struct {
}

func (i Implementer) F1() {
fmt.Println("F1")
}

func (i Implementer) F2() {
fmt.Println("F2")
}

func main() {
s := NewS()
s.F1()
}

这个程序运行的结果就是F1,注意最关键的部分就在于函数NewS.
把实现者赋值给了接口,因为接口是公开的,所以对应的方法就变成共有的了.
而除此之外的方法还是不能被调用.这样就把一个实现者裁剪了.
当然了,这样的裁剪主要还是为了Seek的竞争.

我觉得适用的场景就是两个借口的并集构成了一个结构体的方法,但是这两个借口泾渭分明的时候就可以这样做.
就像标准库里面分成了io.ReaderAtio.ReadSeeker一样.

很意外地在九月底结束了在阿里的实习,五个月的时间的确收获了很多.其中被实习离职然后惨淡度过十月份先按下不表.主要想总结一下自己第一次工作之后
对于整个过程的思考.

当初兴冲冲的来到内核组,的确是因为觉得搞内核很厉害,感觉很有前途,所以充满期待地就来了.

先对工作做一个总结吧.第一个任务就是在ksplice [1] 上面搭建一个热升级补丁的工具,当然通过这个主要是使用了一些RPC的调用,在多个主机上面制作热升级的补丁,完成了一个简单的小型分布式系统,当然很简单就是,完全是一个单点的架构.这当中看了一个MIT公开课的讲义,算是一个初体验吧,也算是揭开了分布式系统的面纱,不再感到那么神秘.之后开始接触Bcache [2] ,这个做在内核态中的块设备缓存,当然这个部分对于内核初学者来说的确很难看懂,其中C语言的知识也好,内核的知识也好都不得不恶补了很多,基本上市面上那些带缩写的内核的书籍我都看了个遍,然而深入度还是欠缺.这段时间基本没干什么活,主要是学习,也过得比较轻松.最后是参与到libvirt[3]/QEMU[4] 这一套东西里面,主要是做了一下IO路径的切换,从QEMU->分布式网络存储转化为了QEMU->vhost-blk->Bcache->分布式网络存储的一个热迁移方案的解决,这里面主要是做了一些C的用户态的工具,熟悉了一下C语言,也开始了解了虚拟化的一些内容,当我正在准备从最基础的 vring [5] 部分从里往外看的时候我离开了阿里.

公司内部的一些事情不是我想说的重点.然而失业的这段时间也是疲于奔命,一个是重新找实习还有一个就是做点兼职填补一下经济空白.之前工作的时候也是上完班就回来看看书补补内核的知识,或者看看B站虚度光阴.好像没有碰到一个契机来反思自己.

总结的说,实习的这段经历让我感到失落,工作本身磨灭了我对编程的热情,我不太能总结,但包括了一些因素.首先是工作本身很乏味,不管是作热升级补丁也好,参与热迁移的方案也好,都是在做工具,但是在我眼里,做工具只是一个时间问题,我希望这个过程有一定程度是个未知数,是一片可以探索和发现的领域,然后工作却让我感觉按部就班做完就好了的颓废感.这一点让我感觉工作本身成为了一件机械性的重复性的劳动.

其次是不管是libvirt也好QEMU也罢或者是Bcache这些东西,都是fork出来的,很多东西成为了组内的问题,不再变得open了,也就是说很多问题的确只能问组里人可能才能知道,因为组里的分支和上游还有蛮多区别的.我自己还是一个蛮有自信解决问题的人,但是这样导致一个很糟糕的情况是,当我们做的东西不open的时候,我们讨论问题的范围也会缩小,也就是说我问问题的话只能问组里的人,因为很多改动都是为了适应业务的需求,而且内部的代码拿出去问也的确不可能.而且组里的人都有工作都很忙,可能一些比较直接的问题比较好回答,具体的问题还是要靠我自己探索,这样我就感觉自己陷入了一个死循环.让我感觉自己的活动范围瞬间小了很多.当然这也跟我涉及东西的确有点复杂有关.

还有就是一些零碎的东西了,我记得有一次和师兄讨论问题,主要是在跟我讲分布式存储上加缓存的业务的时候,后面有个人说”讨论这么热烈,不就是个实习生么”,当时我听在耳里,嘴上没有说什么,心里却五味杂陈.当时我想,换句话说在大部分人眼里我只是个实习生罢了.写到这里也是感觉自己不够争气呀.

这是我实习的一些感受.我对比了一下我在学校的状态,感觉有些不同.

首先是我已经修完所有的课程了,有大把空余的时间钻研自己感兴趣的东西,这也是我为什么单独写了一个OJ,从头到尾自己维护的原因,而且我把写在我的简历最前面是因为这个过程是我觉得最有意思的一件事情,从未知到已知,不断地请教别人,不断地尝试和寻找解决方案,最开始做沙箱的那段时间我是快乐的,因为我编程的同时感觉收获很大,而且这个东西从头到尾都是我自己写的,所以我写起来也很舒服,不像上班的时候一样光看代码就要看半天.直到后来把上层的WEB的内容写了个大概我的热情就开始渐渐消减,然后我又开始寻找新的可以钻研的东西,做沙箱的过程中让我真正体会到真正了解操作系统多么重要,不是调用那些系统调用这么简单,我记得有一次发邮件和别人讨论,因为当时那个人是也做了一个类似的东西,但是没有那么完全,我借鉴了他的代码,后来发邮件和他聊,他也在做类似的东西,但是是闭源的不方便拿给我看.但是他跟我聊了一些他的手段提到了 docker [6] 也提到准备利用 namespace[7]/cgroup[8] 做一个类似 docker 的沙箱的想法,当时我就觉得”卧槽,确实牛逼,这些我都不懂,我还有很长的路要走呀”.

还有就是之前提到的,我可以自由地在segmenfault或者stackoverflow这些网站上面问问题,因为代码都在那,有问题就可以直接问就是了.我感觉大学学到东西最多的地方的确就是网络.这是说我自己写着玩的东西,如果一个项目和 upstream 越走越远的话,很产生很多的问题,而这些问题只会变得越来越私有,不再适合公开讨论了.

另外就是自由度上,在学校随便睡到几点,起来之后开会儿书,更新几段代码再打打球一天就过去了轻松无比.然而在学校我也感受到了一个瓶颈.那就是学而不精.
很多东西想深入却没有门道,就举 Bcache 这个例子吧,不在工业条件下根本没办法研究这些内容,好歹有两块硬盘吧,一块得是快点的SSD才有效果吧,又或者是简单的搭个集群,这件事一个笔记本很难体验到,开个虚拟机都卡的飞起.还有就是没有一个工业环境,很多时候不知道一个更大数量级的业务到底是什么情况,还是只能用一台服务器起个进程跑服务的状态.这都是制约,所以这也促使着我想去公司看看.因为我本身就打算本科毕业就工作,所以偏纯科研的内容我的确没有什么准备.特别是大数据现在这么火的时代,我反倒不太喜欢去搞数据挖掘什么的,我更对底层支撑的分布式服务,如何让整个分布式的服务器更好更快地支撑上面的运算需求更感兴趣.

除此之外,在学校经常在社区里面说话什么的, github也经常更新(当然进了公司以后才发现github很绿并没什么卵用).但是毕竟做程序员就是搞技术活的,口碑也很重要.光说不练假把式,代码亮出来几斤几两就知道了.当然这里面还有一层自我满足的东西在里面,自己写的东西有人观赏,甚至有人学习,更加的是有人用,都是很有成就感的一件事情.而且成就感这东西的确占了我对编程兴趣的一大半,这个东西在公司就感觉不到.那个时候就是一顿恶补,但是内核部分的代码还是不能很好的掌握,自己也懒得更新博客,因为一个模块都没有仔仔细细地搞明白,的确没什么好写的,写学习总结又太懒.更新github的代码更是别提了,基本没闲情了.

最后做一下总结,学校的自由让我保持着对编程的热情,但是有限的条件和机会让我感觉不到快速进步的空间.然而公司虽然给了很大的环境却让我陷入了一个枯燥和禁闭的状态.

在这里我对工作又有了一个新展望: 开放,探索,自由.

索引:

最大子数组之和问题是一道经典的面试题.
这里对这个问题的迭代解法做一个证明.

先给出C的伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

int max_sub_array(int* arr, int len
{
int temp = 0, max = arr[0];
int i;
for(i = 0; i < len; i++) {
temp += a[i];
if(temp < 0) {
temp = 0;
}
if(temp > max) {
max = temp;
}
}
return int
}

算法的思路是这样的:

所以用\(temp\)存储\(A[p..r]\),只要\(A[p..r]\)求和是负数的话,那么下一个\(A[r+1]\)开始,\(temp\)重新清零开始求和,只要\(temp\)比和大,就更新这个和.

这个算法的思路证明如下:

对于\(A[p..r]\),如果\(A[p..r]\)求和为负数的话,那么数组\(A[p..r+1]\)的最大子数组肯定不会是\(A[p..r+1]\)
因为\(A[p..r] + A[r+1] < A[r+1]\)的.

对数组的所有元素进行一个划分,对于\(A[i]\),如果能够使得 \(temp>0 and temp+A[i]<0\),那么这个元素就是一个划分的边界.
这样就会形成很多个区间.

接下来要证明

  1. 最大子数组一定在划分块内
  2. 一定存在首元素是划分块的首元素的最大子数组

对于第一个点,假设有一个元素\(A[i]\)在划分区间\(A[p..r]\)内,那么\(A[p..i-1] > 0 and A[i..r] < 0\)一定成立,这个可以用反证法.

如果一个最大子数组横跨了这些区域假设这个最大子数组为\(A[i..j]\),并且\(A[r]\)最大子数组中最后一个处于分界的元素.
那么\(A[i..r] < 0\)一定成立,所以最大子数组不可能有这段部分,因此最大子数组只会处于划分内.

对于第二点,假设\(A[i..j]\)处于划分\(A[p..q]\)当中,那么\(A[p..i-1]+A[i..j] >= a[i..j]\)一定成立,所以最大子数组
的开始元素也是划分的开始元素.

综合起来就能证明这个算法的正确性.

实际上抽象地理解就是把数组划分成了恰好多加一个元素和就变负数的N个划分,而最大子数组不可能超过这个划分,
因此在计算划分当中的最大值就能得到最大子数组.产生一个新的划分的时候如果碰到负数肯定是独自成立了一个划分,如果碰到正数
就会作为新的划分的开始,所以这个划分的第一个元素会成为最大子数组的第一个元素,然后遍历每次包含一个元素以后的和就能确定最大值,最终获得最大子数组.

普通配置

安装 arp-scan,可以通过ARP扫描获得IP到MAC的映射表.

执行 arp-scan --interface wlan0 --localnet(需要root权限),interface对应的是接入的网卡,这里我连接的是WIFI,所以选择wlan0.
结果是一堆表

1
2
3
4
5
6
7
8
9
Interface: wlan0, datalink type: EN10MB (Ethernet)
Starting arp-scan 1.8.1 with 256 hosts (http://www.nta-monitor.com/tools/arp-scan/)
192.168.0.1 50:bd:5f:5f:02:be (Unknown)
192.168.0.100 8c:21:0a:7f:1a:bf (Unknown)
192.168.0.102 74:e5:43:db:f4:81 (Unknown)
192.168.0.107 74:51:ba:c5:6f:4f (Unknown)
192.168.0.103 8c:be:be:9f:c7:ec (Unknown) 192.168.0.114 b8:27:eb:63:d8:74 (Unknown)
192.168.0.108 f8:a4:5f:ef:a4:b2 (Unknown)

在里面没有看到名字,但是树莓派的MAC都是b8:27:eb开头的.
所以可以执行arp-scan --interface wlan0 --localnet | grep b8:27:eb就可以看到树莓派对应的IP.

ssh在树莓派上是默认开启了的,这个时候可以通过ssh pi@ipssh到树莓派上,密码是默认的__raspberry__.

这是在没有配置静态IP的情况下,如果不想每次都通过动态IP登陆的话,可以设置树莓派的静态IP。

通过编辑 /etc/network/interfaces 可以看到这样一行iface eth0 inet dhcp,这表示以太网的ip是动态分配的,
现在把这行换成iface eth0 inet static然后再进行我们的IP配置.

1
2
3
4
5
6
7
8
9
10
11
12
13
...省略
iface eth0 inet static
#my static IP
address 192.168.0.118 // 自己设置的静态IP
#my gateway IP
gateway 192.168.0.1 // 网关 netstat -nr 可以通过路由表查看网关.
netmask 255.255.255.0 // 子网掩码 可以通过ifconfig 查看子网掩码,一般都是255.255.255.0
#my network address "family"
network 192.168.0.0 // 表示网段. netstat -nr 也可以看到.
broadcast 192.168.0.255 // 表示广播网络. ifconfig可以查看,最后一项一定是255.

...省略

比如我不想记住这个IP,可以编辑/etc/hosts文件,来给IP绑定一个名字.

1
192.168.0.118 pi

这样以后就可以通过 ssh pi@pi 来登陆树莓派了.

高潮来了:

之前配置树莓派的时候把静态IP设置了,然后来北京实习之后没有带显示屏,所以只能通过一根网线来操作,网线还是我从公司上拔下来的.
其实奇淫巧计还是蛮多了,一种是直接把SD卡拔下来在电脑上改文件,但是之前我以为是操作系统崩掉了,所以傻逼一直想连接上去,最后发现这个地址: (http://embeddedday.com/projects/raspberry-pi/basics/direct-connection-to-pc

真是たすかた呀,boot/下面的cmdline.txt最后添加一段ip设置,可以设置一个静态IP(当时只是想连接上去,其实直接改网络配置文件就好了)然后把网线和树莓派直连ssh上去,最后连接上去了,然后果断把我的静态IP配置回去,我又买了一个USB无线网卡,记录一下无线配置.

1
2
3
4
5
6
7
8
allow-hotplug wlan0
auto wlan0
iface wlan0 inet static
address 192.168.99.226
netmask 255.255.255.0
gateway 192.168.99.1
wpa-ssid "id"
wpa-psk "password"

之前一直小看了heap觉得这个数据结构比较简单,等串讲的时候发现其实并不是的,居然为了heap的插入删除争论了很久.
这里特地记录一下,heap的性质是递归的父结点比子节点大,说白了就是一个优先队列.
push,pop都只要分别做一次向上调整和向下调整.

但之前我的知识体系好像调整要有一次down再一次up.
这是在真正调整的时候用的比如替换任意一个元素或者删除任意一个元素.
替换很好理解就是引入一个元素可能很大也可能很小,要么向上要么向下.
而我卡住的地方是删除,老是觉得把最后一个结点换上来应该是不用向上调整只要向下的,
因为这个定义是递归的,但是不是的,虽然在删除处i下面的子树可以,但是如果最后一个结点不在i的子树下面,比如另外一个i的兄弟子树下面,整棵树都比i小的话(这里指最小堆),那么其实情况和替换是类似的,所以任意删除和替换才是会把down up两个函数都写入,但都是只执行一次.

之前用jekyll在Github上面搭建了一个博客,但是发现语法高亮什么的,并不完全是markdown格式的,而且迁移起来也奇奇怪怪的,于是决定换成hexo来搭建。另外一个原因是想在博客里面写一些数学公式等等,但是却发现没有什么主题是支持的,然后就想自己改一个,结果发现自己改的实在太丑,不想用,当然,作为工程师型的人,丑不丑倒是无所谓,主要是配来配置去把耐心给消耗了,最后决定用别人写好的主题,选择了next漂亮而且支持数学公式等等(当然数学公式的支持就是添加一个js库MathJax就可以搞定了,也不是什么难事)。

hexo的部署可以参考官网和一些网上的资料,已经有很多了。这里主要记录一下hexo的一些主要使用方法。

写作

  1. 草稿

    通过hexo new [layout] <title>这样的命令就可以生成对应的文件,比如草稿,
    hexo draft “draft”(如果题目有空格,要带双引号,layout默认是post,这
    些layout都存在_scaffolds_下面。可以通过hexo设置是否在网站上显示草稿,我
    设置的是显示(设置*_config里面的render_draftstrue*即可)。

  2. 发布

    发布可以选择草稿发布,也可以选择直接发布新文章,对应的命令分别是hexo publish [layout] <title>hexo new [post] <title>
    这个时候生成的静态文件里面就可以看到新的文章了。

  3. 前缀

    tags和categories里面的内容,按照[content1,content2,…]的格式就可以分开显示在标签栏里面。

  4. 特殊插入

    tag-plugins中可以找到一些特殊需要的插入,比如插入gist,twitter的引用或者youtube视频等等。

  5. i18n

    支持国际化,在*_config.yml*下面设成

     language:
     - zh-CN
     - en
    

    设置*_config.yml里面的i18n_dir可以根据路径前缀判断对应的语言。
    在发布的时候带上
    –lang*选项设置语言,比如hexo new "Hello World" --lang tw就可以针对不同的语言产生对应目录。

  6. 数学公式

    数学公式主要依赖于MathJax,在编辑文本的时候$$...$$\[...\]用来表示数学公式,\\(...\\)用来表示行内数学公式,比如\\(a^2+b^2=c^2\\)的效果是\(a^2+b^2=c^2\)。而$$ a^2+b^2=c^2 $$,就会另起一行。

    $$ a^2+b^2=c^2 $$

    具体的使用可以在这里查看
    注意markdown的一些符号,需要在公式里面转义,省的被markdown语法误解。