ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

我的一点观点

我个人觉得 nvidia 的 CUDA 太封闭了,不是很能明白这样封闭的产品怎么能够长久生存,仅仅是因为家大业大么,如果有家公司推出了八成性能的 GPU,但是整套开发的生态非常友好,是不是会像 Android 取代诺基亚一样,还是 nvidia 就是苹果,就算封闭环境也能保证强劲体验,这我也不好说了。

安装

CUDA+cudnn 装起来挺麻烦的,反正如果有错误的话,可以检查 CUDA samples 里面的 deviceQuery 是否成功,如果不成功可以用 strace 看一下少了什么东西,再想办法安装上去。检查 cudnn samples 是否成功也是一样的,里面有一个 mnistDNN 的例子。

Hello World!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <stdio.h>

__global__ void helloFromGPU (void)
{
printf("Hello world!\n");
}

int main(void)
{
printf("Hello World! from CPU\n");

helloFromGPU <<<1, 10>>>();
cudaDeviceReset();
return 0;
}

以上就是一段 GPU 的 Hello world,执行下面的代码可以看到我们在 GPU 上并行执行了十个 “Hello world!” 的打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
$ nvcc -arch sm_20 hello.cu -o hello
$ ./hello
Hello World! from CPU
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!

CUDA

CUDA 全称 Compute Unified Device Architecure,用于定义 GPU 的架构标准。GPU 的工作方式主要依赖于多核心的并行计算,CUDA 提供了方便的模型进行这种模式的编程,下面就会简单介绍一下 CUDA 的架构以及基于 GPU 的编程。

CUDA 一次启动的线程称为网格(grid),网格中包含块,每个块包含新程,是一个二维的模式,这张图片就说明得很清晰,首先是由 Grid 组成,然后每个 Block 有 shared memory,同 block 的线程可以访问 shared memory,不同的 Block 的线程只能访问全局的内存,这种结构也方便设计和实现并行算法。

cuda-arch

CUDA 的编译器 nvcc 是 gcc 的一个扩展,支持编写运行在 GPU 上的函数,其中的,<<x, y>> 扩展就是用来指定 block 和 thread 的数量的。

比如典型的俩个向量相加的例子:

1
2
3
4
void sumArrayOnHost(float *A, float *B, float *C, const int N) {
for (int i = 0; i < N; i++)
C[i] = A[i] + B[i]
}

但是 GPU 的核函数怎么写呢,是这样的。

1
2
3
4
__global__ void sumArrayOnGPU(float *A, float *N, float C) {
int i = threadIdx.x
C[i] = A[i] + B[i]
}

然后,如果 N 是 32 的话,可以如下调用,__global__ 表示这个函数可以在 host 调用也可以在 GPU 上调用,用个 32 个线程的 block 算这个向量和,N 隐性包含在了定义中。

1
sumArrayOnGPU<<1, 32>>(float *A, float *B, float *C)

线程组织

基于 CPU 和 GPU 的异构计算平台可以优势互补,CPU 负责处理逻辑复杂的串行程序,而 GPU 重点处理数据密集型的并行计算程序,从而发挥最大功效。

线程组织主要依据网格(现在网络这个词其实比较容易混淆,可以是神经网络可以是计算机网络,这里指的是线程的组织形式)模型。主要分 grid, block, thread,组织的方式就靠索引来决定,可以通过 block 索引和 thread 索引进行线程的定位。CUDA 提供了块内线程同步的方法,但是没有提供块间同步的原语。

线程束分化

线程束 = warp

CPU 有很强的分支预测的能力,会预加载指令,如果预测正确,执行代价就很小,但是 GPU 在这方面是很弱的,因为线程束当中如果执行分支出现不同,那出现分支的线程就会被禁止执行,这也是用 GPU 做并行编程的时候需要参考的一个很重要的因素,保证并行的线程出现线程束分化的情况尽量少。

下面就是线程束分化的例子。

并行归约问题

并行归约是指如何对并行问题进行归约,比如说相邻配对和交错配对。

设备管理

管理 GPU 主要是通过 CUDA API 或者 nvidia-smi 命令来获取。

SM(流式多处理器)

上面讲的是抽象上的分层,但是实际的物理层面承载 GPU 的是 SM。最早的 GPU 架构叫 Fermi,然后是 Kepler,然后才是 Tesla。一般说的 cm_20, sm_20 就指这种计算能力和架构,新款的 GPU 计算能力要更强一点。

内存模型

CUDA 的内存模型和 CPU 是类似的也是多级的结构。线程有局部内存,块之间有全局内存。GPU 和 CPU 都是主要采用 DRAM 来做主存,CPU 的一级缓存会用 SRAM 做。CPU 的多级缓存对用户来说不是很需要考虑,尽量屏蔽了其中的细节,但是 GPU 相对来说会把这种分级结构暴露给用户,这对编程来说也是一种新的挑战。

后面

用 GPU 计算一些计算密集型的程序,速度是真快,比 CPU 块很多,所以这也是为什么大量深度学习的应用都要通过 GPU 加速的原因。

本文通过裸写神经网络的方法,帮助理解神经网络的工作方式,直接在 klab 上查看就可以。

kube-dns 是 kubernetes 基于 DNS 的服务发现模块,主要由三个容器组成,分别是 dnsmasq, kube-dns, sidecar,整体的结构如图。

sidecar

sidecar 是一个监控健康模块,同时向外暴露metrics 记录,但是为啥叫三蹦子不知道。

接受的探测参数是

--probe=<label>,<server>,<dnsname>[,<interval_seconds>][,<type>]

例子如下

--probe=dnsmasq,127.0.0.1:53,kubernetes.default.svc.cluster.local,5,A

等于是每隔 5s 向127.0.0.1:53 进行 DNS 查询 kubernetes.default.svc.cluster.local 的 A 记录

对应的结构体是

dnsmasq

dnsmasq-nanny 是 dnsmasq 的保姆进程,dnsmasq 是一个简易的 DNS server。

dnsmasq-nanny “–” 后面是 dnsmasq 的参数,比如下面这个参数表示的是把 server=/cluster.local/127.0.0.1#10053 当作 dnsmasq 的配置,10053 是 kube-dns 的地址,也就是把 cluster.local 的域名拦截转到 kube-dns 进行解析,剩下的通过正常的域名解析流程。

--server=/cluster.local/127.0.0.1#10053

dnsmasq 简单来说扮演的是集群当中的一个传统 dns server 并且把集群内部的 dns 查询拦截到 kube-dns 当中通过中心化的方法进行 dns 查询,集群的 dns 查询主要依靠 kube-dns。

kube-dns

kube-dns 主要基于 skydns 来实现。

k8s.io/dns/pkg/dns/dns.goKubeDNS.Start 下面有 endpoints 和 services 的 controllers,会把 service 注册到 kube-dns 的 cache 当中 (k8s.io/dns/pkg/dns/treecache),这里有 k8s 域名命名规范

主要的实现方式是 skydns 接受一个后端实现。

KubeDNS.Records KubeDNS.ReverseRecord 基于 TreeCache 实现 DNS 记录存储的后端,从而使得 skydns 提供 DNS 服务。

总结

整体来说 kube-dns 还是一个比较简单的模块,基于 kube-apiserver 的一个控制器,提供中心化的 DNS 查询。

kube-controller-manager 可以认为是一个守护进程用于监视 apiserver 暴露的集群状态,并且不断地尝试把当前状态向集群的目标状态迁移。为了避免频繁查询 apiserver,apiserver 提供了 watch 接口用于监视资源的增加删除和更新,client-go 对此作了抽象,封装一层 informer 来表示本地 apiserver 状态的 cache。这个视频 有一个 Google 工程师讲解的 client-go 的详细内容,这篇 七牛前同事的文章介绍了 informer 的整体结构,写得也很好。

client 当中的 controller

处理事件的 controller 由几部分构成,首先是 Config 当中的可配置部分,下图是 controller 的关系,controller 实现了 Controller 接口。

controller 从 Queue 当中通过 Pop 获取对象交给 Process 回调处理,DeltaFIFO 和 FIFO 是类似的,只是 DeltaFIFO 可以处理删除事件,一般都用 DeltaFIFO。ListerWatcher 就是用客户端构造出来的,针对对应资源的 List Watch 方法的集合,List 用于获取最开始的对象获取,Watch 用于监控之后的变化,所有最开始的时候现有的对象会通过 List 传给 Add 回调,同步了当前状态以后再不断接受新的变化,但是 Watch 本身是有超时机制的,不能永久监听,所以再超时之后还会通过 List 方法,先同步一次再进行删除操作。Resync Period 表示把 cache 中的对象重新入队给回调函数处理,这种情况一般是可能你可能漏掉了更细操作,或者是之前的一些失败了。大部分情况用不到这个选项,可以非常相信 etcd 的功能。

调用路径

1
2
3
4
5
cache.NewListWatchFromClient -> listWatcher
cache.NewIndexer -> store
cache.NewLister(store) -> lister
cache.NewReflector
go refector.Run()

还有一个关键结构是 reflector,reflector 会把对象转化成对应的需要的对象并且 Add 到 Queue 当中去。

Informer 本身的框架是异步的,所以为了做并发控制就引入了 workqueue 的组件,workqueue 有 rate limit 的功能,并且能够合并更新操作。

注意不要修改传入的对象,因为他们要和 cache 一致,如果要写对象的话,需要使用 api.Scheme.Copy 这个函数,进行深度拷贝,所有的 k8s Object 都要支持深拷贝的方法。

kube-controller 当中的插件式 controller

k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go 中的NewControllerInitializers 函数有大部分 controller 的列表,bootstrapsignertokencleaner 是默认关闭的。

1
2
3
4
var ControllersDisabledByDefault = sets.NewString(
"bootstrapsigner",
"tokencleaner",
)

需要特殊初始化的是 serviceaccount-tok,另外的是 NewControllerInitializers 当中的 controller 了。所有的 controller 初始化函数都要满足如下接口。

1
2
3
4
// InitFunc is used to launch a particular controller.  It may run additional "should I activate checks".
// Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled.
type InitFunc func(ctx ControllerContext) (bool, error)

node controller

接下来看一个具体的 controller,startNodeController

startNodeController 首先解析 ClusterCIDRServiceCIDR 两个子网范围,下面是 NodeController 初始化需要的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
nodeController, err := nodecontroller.NewNodeController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Extensions().V1beta1().DaemonSets(),
ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("node-controller"),
ctx.Options.PodEvictionTimeout.Duration,
ctx.Options.NodeEvictionRate,
ctx.Options.SecondaryNodeEvictionRate,
ctx.Options.LargeClusterSizeThreshold,
ctx.Options.UnhealthyZoneThreshold,
ctx.Options.NodeMonitorGracePeriod.Duration,
ctx.Options.NodeStartupGracePeriod.Duration,
ctx.Options.NodeMonitorPeriod.Duration,
clusterCIDR,
serviceCIDR,
int(ctx.Options.NodeCIDRMaskSize),
ctx.Options.AllocateNodeCIDRs,
ipam.CIDRAllocatorType(ctx.Options.CIDRAllocatorType),
ctx.Options.EnableTaintManager,
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition),
)

InformerFactory 是用来构造具体 resource 的 informer 的工厂类型,构造了 pods, nodes, daemonsets 的 informer, 说明 node controller 需要 watch 这几种 resource 的变化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

glog.Infof("Starting node controller")
defer glog.Infof("Shutting down node controller")

if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}

// Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, wait.NeverStop)

if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}

if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
}

<-stopCh
}

现在看一下运行的时候是如何做的,首先要调用 controller.WaitForCacheSync 等待 node,pod,daemonSet 的 inofrmer 同步,这是因为在 kube-controller-manager 当中使用的子 controller 使用的 informer 都是共享型的,也就是多个 controller 之间共享一个 informer 的 cache,所以在开始的时候需要保证所有的 sharedInformerFactory 创建的 informers 之间的 cache 先等待一次一致。controller 的读基本上是从 cache 读的,只要写才会打到 etcd 里面,然后等待 cache 的更新回调。

nodeController 主要分成两部分,一部分是 monitorNodeStatus , 它首先从 informer 的 cache 当中 list 新添加的节点和删除的节点,和 newZoneRepresentations。node 是分 zone 的,这在单可用区的 cluster 当中是个空字符串,但是如果 labels 中有 failure-domain.beta.kubernetes.io/zonefailure-domain.beta.kubernetes.io/region 就会构成不同的可用区的划分, 这个适用于仅仅在一家云厂商分不同可用区的时候可以用到。这篇文档 描述了多 zone 的 cluster 的内容,没有 zone 有相应的可用状态,如果某个 zone 变成不可用需要把 pod 从这个 zone 当中剔除,所以 pod 的 failover 是以 zone 为单位的。

处理 node 比较啰嗦,tryUpdateNodeStatus 尝试获取当前的 conditions 更新并且获取 conditions。处理 node 过程主要是标记 node 为不可用的 node,或者把不可用的状态恢复过来。

接下来的就是处理 pod eviction 的部分,另外,这篇文档解释了一些 kubelet 支持的资源耗尽的情况下 kubelet 的剔除策略。

taintManager.Run 处理 taint 的逻辑,这篇文档 解释了 taint 和 toleration 的关系,以及基于 taint 的 eviction 策略。首先看 node 的更新,然后把上面不能 tolerate 的 pod 传给 handlePodUpdate,然后 pod 有更新也会 handlePodUpdate,在 pod 更新的时间中会让 node 抢占一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
for {
select {
case <-stopCh:
break
case nodeUpdate := <-tc.nodeUpdateChannel:
tc.handleNodeUpdate(nodeUpdate)
case podUpdate := <-tc.podUpdateChannel:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannel:
tc.handleNodeUpdate(nodeUpdate)
default:
break priority
}
}
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(podUpdate)
}
}

最后是 eviction 的部分,基于 eviction 的会把 pod 直接删除,基于 taint 只是打上标记,然后通过上面的 tiantManager 剔除。和 pod eviction 不同,taint eviction 是通过限制加 taint 的速率控制 raltelimit 的。

podGCController

pod GC Controller 只 watch pod 一种资源,比较简单。

k8s.io/kubernetes/pkg/controller/podgc/gc_controller.go 下。

gcc.gcOrphaned 删除 node 不存在的 pod,gcc.gcUnscheduledTerminating 删除正在终止,但是没有调度的 pod,gcc.gcTerminated 删除已经被终止的 pod。

其他 controller

其他 controller 也是类似的,从 apiserver 获取状态,并且向对应的状态迁移,这也是为什么 kubernetes 的命令和资源都是宣告式的原因。

kubernetes 概览

以下是 k8s 的整体架构,在 master 节点上主要是 kube-apiserver(整合了 kube-aggregator),还有 kube-scheduler,以及 kube-controller-manager,包括后端存储 etcd。

其中 kube-apiserver 是一个比较关键的部分,而且前期写得坑很多,导致这一部分虽然看起来是一个 API server 其实代码很复杂,特别冗余,而且目前对 kube-apiserver 还要做拆分,能够支持插入第三方的 apiserver,也就是又一个 aggregated apiserver 的 feature,也是和 kube-apiserver 和里面包的一层 genericserver 揉合在一起了,感觉一个大的系统 API server 越写越挫是一个通病,还好现在 k8s 迷途知返正在调整。

kube-apiserver

Kube-apiserver 可以是认为在 generic server 上封装的一层官方默认的 apiserver,有第三方需要的情况下,自己也可以在 generic server 上封装一层加入到集成模式中,这里主要介绍 kube-apiserver 的结构。

restful API

kube-apiserver 是一个 restful 服务,请求直接通过 HTTP 请求发送,例如创建一个 ubuntu 的 pod,用以下的 pod.yaml 文件。

1
2
3
4
5
6
7
8
9
10
11
apiVersion: v1
kind: Pod
metadata:
name: ubuntu1
labels:
name: ubuntu1
spec:
containers:
- name: ubuntu1
image: ubuntu
command: ["sleep", "1d"]

执行命令 kubectl create -f ./pod.yaml -v=8,可以看到对应的 POST 请求如下。

1
2
3
4
5
6
7
8
Request Body: {"apiVersion":"v1","kind":"Pod","metadata":{"labels":{"name":"ubuntu1"},"name":"ubuntu1","namespace":"default"},"spec":{"containers":[{"command":["sleep","1d"],"image":"ubuntu","name":"ubuntu1"}],"schedulerName":"default-scheduler"}}
curl -k -v -XPOST -H "Content-Type: application/json" -H "Accept: application/json" -H "User-Agent: kubectl/v1.7.5 (linux/amd64) kubernetes/17d7182" https://localhost:6443/api/v1/namespaces/default/pods
POST https://localhost:6443/api/v1/namespaces/default/pods 201 Created in 6 milliseconds
Response Headers:
Content-Type: application/json
Content-Length: 1208
Date: Wed, 18 Oct 2017 15:04:17 GMT
Response Body: {"kind":"Pod","apiVersion":"v1","metadata":{"name":"ubuntu1","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/ubuntu1","uid":"9c9af581-b415-11e7-8033-024d1ba659e8","resourceVersion":"486154","creationTimestamp":"2017-10-18T15:04:17Z","labels":{"name":"ubuntu1"}},"spec":{"volumes":[{"name":"default-token-p0980","secret":{"secretName":"default-token-p0980","defaultMode":420}}],"containers":[{"name":"ubuntu1","image":"ubuntu","command":["sleep","1d"],"resources":{},"volumeMounts":[{"name":"default-token-p0980","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","securityContext":{},"schedulerName":"default-scheduler","tolerations":[{"key":"node.kubernetes.io/not-ready","operator":"Exists","effect":"NoExecute","tolerationSeconds":300},{"key":"node.alpha.kubernetes.io/unreachable","operator":"Exists","effect":"NoExecute","tolerationSeconds":300}]},"status":{"phase":"Pending","qosClass":"BestEffort"}}

从 url path 里面可以看到几个划分,path 的分类大概有下面这几种。

路径上整体分成 group, version, resource, 作为核心 API group 的 core(包括 pod, node 之类的 resource),不带 group,直接接在 /api/ 后面,其他的 api group 则接在 /apis 后面。以 pod 为例,pod 对应的数据类型如下,这个数据结构和 POST 请求中的结构的参数是一致的。

如果是 job 的话则是在,pkg/apis/batch/v2alpha1/types.go,和 API 路径是对应的。例子当中 kubectl 加上 level 大于 8 的 log 就会打印请求和相应的 body,可以看到 request body 和上面的数据结构是一致的。这个请求会发送到 apiserver 进行处理并且返回存储之后的 pod。

重要结构体

Config

父结构,主要的配置内容,其中有一个结构 RESTOptionsGetter genericregistry.RESTOptionsGetter 是和 API 初始化相关的,这个接口的实现是在 k8s.io/apiserver/pkg/server/options/etcd.go 中的 storageFactoryRestOptionsFactory 实现的,对应的实现函数是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}

ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}

return ret, nil
}

APIGroupInfo

APIGroupInfo 主要定义了一个 API 组的相关信息,观察一下 APIGroupInfo 是如何初始化的。

k8s.io/pkg/master/master.go 当中,每个 Resource 都要提供自己的 Provider,比如说 storagerest 就在 k8s.io/kubernetes/pkg/registry/storage/rest/storage_storage.go 定义了 NewRESTStorage 方法。而默认的 resource 的 legacy provider 单独处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
if c.ExtraConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
}
m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
}

然后通过调用 k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProviderNewLegacyRESTStorage 来初始化基础对象的 apigroup info,比如初始化 podStorage,serviceStorage 和 nodeStorage 等等。legacy ApiGrouInfo 的 Scheme, ParamaterCodec, NegotiatedSerializer 都是用 "k8s.io/kubernetes/pkg/api" 包下的全局变量初始化的。

1
2
3
Scheme:                      api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,

然后合并成一个 restStorage 存入 apiGroupInfo 中。

1
2
3
4
5
6
7
8
9
10
11
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
...

举个例子 podStorage 就是用的 genericregistry.Store,这是一个通用的 etc 辅助结构,把 etcd 抽象成存储结构。

1
2
3
4
5
// REST implements a RESTStorage for pods
type REST struct {
*genericregistry.Store
proxyTransport http.RoundTripper
}

serialization

pkg/api.Codecs 是全局默认的 codec 来自下面这段代码。

1
2
3
4
func NewCodecFactory(scheme *runtime.Scheme) CodecFactory {
serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory)
return newCodecFactory(scheme, serializers)
}

默认具体定义了这几种 serilizer。

1
2
3
4
5
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []serializerType {
jsonSerializer := json.NewSerializer(mf, scheme, scheme, false)
jsonPrettySerializer := json.NewSerializer(mf, scheme, scheme, true)
yamlSerializer := json.NewYAMLSerializer(mf, scheme, scheme)
...

而且标准库的 json 有很严重的性能问题,换用了 json-iter 但是有很多标准库不兼容的问题,性能提升了大概 20% 但是没办法和进主线,我尝试在上面工作的了一段时间,改了两个问题还是有错,由于时间关系,暂时放弃了这个工作,相关的 issue 在这里

filters

首先通过 ./staging/src/k8s.io/apiserver/pkg/server/config.go 下的 DefaultBuildHandlerChain 构建 filters。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.RequestContextMapper, c.Authorizer, c.Serializer)
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer, c.Serializer)
if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) {
handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
} else {
handler = genericapifilters.WithLegacyAudit(handler, c.RequestContextMapper, c.LegacyAuditWriter)
}
failedHandler := genericapifilters.Unauthorized(c.RequestContextMapper, c.Serializer, c.SupportsBasicAuth)
if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) {
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicyChecker)
}
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
handler = genericfilters.WithPanicRecovery(handler)
return handler
}
panic recover

genericfilters.WithPanicRecovery 在 handler 的最外层对出现的 panic 恢复,并且打印每次请求的 log,所以你想观察 API 请求的情况可以 grep wrap.go 就能看到。

request context

apirequest.WithRequestContext 给 request 绑定一个 Context

RequestInfo

跟路 url 提取后续请求需要的 group, version, namespace, verb, resource 等信息。

WithTimeoutForNonLongRunningRequests

限制 API 调用时间,超时处理提前终止 write。

WithCORS

允许跨域访问。

authentication

k8s.io/apiserver/pkg/endpoints/filters/authentication.go 下。WithAuthentication 插入鉴权信息,例如证书鉴权,token 鉴权等,并且从鉴权信息当中获取 user 信息(可能是 service account 也可能是外部用户)user 身份是由 里面的几种方式确认的

authorization

检查是否有权限进行对应资源的操作。一种是 RBAC 一种是 Node。具体这两种方式可以看这个介绍,RBAC 主要是针对服务的,而 Node 模式主要是针对 kubelet 的。

impersonation

让用户伪装成其他用户,比如 admin 可以用普通用户的身份创建资源。

路由

通过 genericapiserver 的 InstallLegacyAPIGroup 就注册到路由当中。具体的做法就是根据 version, resource, sub resource, verb 等信息构造路由,然后用 go-restful 注册处理函数。比如说 GET

1
2
3
4
5
6
7
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)

handler 里面做的内容就是序列化,然后根据具体的要求(GET DELETE 等)到 etcd 中操作,当然本身还有一层缓存,这取决于 API 的 options 是希望更新还是直接读缓存(缓存会比 etcd 旧一些),比如对于 kubelet 会不断查询 node 信息,但是 kubelet 本身并不需要最新的信息,这个时候就会从缓存中读取。

性能调优

开启代理 kubectl proxy,就可以通过 localhost 直接访问 kube-apiserver HTTP 服务。然后执行 go tool pprof http://localhost:8001/debug/pprof/profile 可以获得 profile 结果,下图红色的部分就是调用耗时最多的部分。

除此之外,kube-apiserver 本身也暴露了很多 prometheus 的 metrics 但是往上现在没有现成的模板,只能根据自己的需求来在 prometheus 当作做 query。可以在 k8s.io/apiserver/pkg/endpoints/metrics/metrics.go 里面看到。

之前也说过,超时间调用时会打 log 的,在代码中保存了一些 trace 日志,可以通过 grep Trace来过滤。Trace[%d] 这样开头, %d 是一个 id 可以看到具体的 trace 信息。

最近因为 k8s 的默认调度器功能太丰富,太“高级”了,一些屌丝特性没有满足,所以前段时间自己魔改了一下满足了一些屌丝特性,暂时叫做乞丐调度器,顺便把默认的调度器代码翻了一下,这里对默认的代码做一下总结。

CreateScheduler

CreateScheduler 会从 policy file 当中获取算法的配置信息。接口k8s.io/kubernetes/plugin/pkg/scheduler.Configurator 定义了构造一个 scheduler 的配置信息。k8s.io/kubernetes/plugin/pkg/scheduler/factor.ConfigFactory 是这个接口的一个实现。c.Create 会把 AlgorithmProvider 配置的 PredicatePriority 的 key 当作参数传给,f.CreateFromKeys 这样主要对应的 key 注册了的话就会有对应的算法绑定到 scheduler 上面。目前有两种 Provider,一种是用默认 predicate 和 默认 priority 的 AlgorithmProvider,另一种是把 LeastRequestedPriority 换成 MostRequestedPriority 的自动伸缩友好的 AlgorithmProvider。

1
2
3
4
5
6
// Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
// by specifying flag.
factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities())
// Cluster autoscaler friendly scheduling algorithm.
factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, defaultPredicates(),
copyAndReplace(defaultPriorities(), "LeastRequestedPriority", "MostRequestedPriority"))

Scheduler Server

options.ScheduleServer 是服务端对应的配置结构,其中有几个成员。

  1. KubeSchedulerConfiguration 调度器的配置
  2. Master 表示 API server 的地址
  3. Kubeconfig k8s 配置文件的路径

func Run(s *options.SchedulerServer) error 会根据 Sechduler Server 来运行。EventBroadcaster 接受事件,并且把事件发送给事件处理者( EventSink watcher, log),startHTTP 主要是是 profiling 接口,心跳检测接口和 prometheus 的 instrumenting 接口。informerFactory,看起来是一个异步同步信息的 cache,平时调度是直接走 cache,更新的时候才会走 API。最后配置了选主的话会从 Etcd 拿到锁,并且拿到 Master 的锁。

k8s.io/kubernetes/plugin/pkg/scheduler.Scheduler.Run

初始化以后,Run 对应的是一个 0 秒循环的大 loop(相当于每次 loop 等于主动调用一次 Go runtime.Sched()),在每次循环当中都会调用 sched.scheduleOne,首先 NextPod 会同步等待一个 pod 变成 available 的状态,并且跳过正在被删除的 pod,然后调用 sched.schedule 走到具体的调度算法当中,整个过程是串行,没有批量调度 pod 的操作。在进行具体的调度算法之后,会得到一个可行的 node,如果调度失败的话会,并且调度失败的原因是找不到合适的 node 的话,就尝试 sched.preempt,这个的作用就是尝试在替换现有 pod 的情况下能够获得调度机会的策略,那么就抢占已经被调度的 pod,标记目标 pod 的 Annotation 然后踢出权重最低的那个 pod。如果成果获得一个可调度的节点,就通过把本地 cache 先更新到已经调度之后的状态,标记 pod 已经在要调度的 node 上,也就是调用 sched.assume 假设 pod 已经调度到了节点上,再异步的通过 ApiServer 的接口,sched.bind 让 pod 正在运行到 node 上。

sched.schedule

schedule 调用 algorithm/scheduler_interface.go下面定义的调度器的接口 Schedule。对应的实现在 core/generic_scheduler.go 下面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate
// Prioritizers returns a slice of priority config. This is exposed for
// testing.
Prioritizers() []PriorityConfig
}

Schedule 会根据调度算法得到一个合适的节点,而 Preempt 则是尝试抢占一个 pod 以获得调度到节点上的机会。PredicatesPriorities 则是两个重要的部分,Predicates 类似一个过滤器,对节点进行筛选,而 Priorities 则是对筛选出来的节点进行权重的排序,最后得到一个合适的调度节点。

算法工厂

算法工厂就是注册 PreciatePriority 的地方,之前已经说了可以通过 AlgorithmProvider 获得一组 PredicatePriority,比如 DefaultProvider 提供了默认的一套,如果不用 Provider,需要在 policy file 当中另外指定要使用的 PredicatePriority,不过目前好像没有用这种方式,还是通过 Provider 指定了一套要使用的算法 。注册 AlgorithmProvider 是通过factory.RegisterAlgorithmProvider,然后调用 NewGenericScheduler (在 k8s.io/kubernetes/plugin/pkg/scheduler/core/generic_scheduler.go 当中),初始化要用到的 predicates 和 priorities。

调度过程

Schedule 其实很简单,就是通过 findNodesThatFit,先根据 Predicate 过滤出合适的 Node,然后调用 PrioritizeNodes,用 Priorities 对 Node 根据算法的权重进行排序,因为每个 node 要走的流程是一样的并且最终结果相互没有影响,所以这个过程是并发的,这篇文章的图画的很好,示意很明显。

最后会得到一个最理想的节点,再通过 bing 告诉 API server 这个节点被选中了。

sched.preempt

抢占过程是在 pod 没有找到合适的节点情况下,如果能在踢出一个 pod 获得调度机会的情况下进行抢占。抢占算是一个比较新的特性,在 1.8 里面都是默认关掉的,要打开的话需要指定kube-scheduler--feature-gates=PodPriority=true 还有 apiserver 的 --runtime-config=scheduling.k8s.io/v1alpha1=true。可以通过添加 PriorityClass 把 pod 分权重,现在这个特性算是给 pod 也加上的权重。

1
2
3
4
5
6
7
8
apiVersion: v1
kind: PriorityClass
metadata:
name: high-priority
value: 1000000
globalDefault: false
description: "This priority class should be used for XYZ service pods only."

然后可以在spec当中指定 priorityClassName: high-priority,这样这么大的权重,这个 pod 就很难被抢占了,具体流程如下图。

podEligibleToPreemptOthers 主要判断如果 目标 pod 被标记为(通过 pod 的 annotation 标记)已经要抢占其他 pod,并且有一个优先级小于 目标 pod 的 pod 即将被删除 (p.DeletionTimestamp != nil ),就直接退出,因为这个时候这个被删除的 pod 其实在为目标 pod 腾出空间了,在下次调度的时候就会获得调度机会。nodesWherePreemptionMightHelp,类似于 schedule 的时候的 predicate 阶段,只不过多了一步是通过尝试移除 pod 跑一遍 predicates 看看这个节点能不能被通过。 selectNodesForPreemption 则和 priority 的阶段类似,把删除 pod 之后的可以通过的节点进行排序选出一个排名最高的节点。再通过 selectVictimsOnNode 把节点上的 pod 按照节点的 priority 排序选出“受害者”,越高越难被抢占。可以在 spec 里面设置这个值,选出了节点上的受害者以后,通过pickOneNodeForPreemption,主要的依据是拥有最低的最高 pod 权重的节点先被选出来,比如 node1 上 pod 的最高权重是 10,node2 上 pod 的最高权重是 8,那么 node2 被选中,如果有平局的话,最少的“受害者”先选,如果还平局,随机选一个。最后得到一个要被抢占的节点。

自定义调度器的方式

自定义调度器有三种方法。

第一种是通过添加 PredicatePriority 的方式,做微调,这种方式比较简单,只要定义好对应的函数并且通过函数工厂注册就可以。

第二种是使用自定义的调度器,具体的方法可以看官方文档,通过把 pod 的 spec.schedulerName 指向自定义的调度器就可以把调度任务转到自己实现的服务。

第三种是使用 extender,extender 本身和调度器的过程类似,接口是如下定义的,主要是针对一些不算受集群本身控制的资源,需要通过外部调用来进行调度的情况,相关文档在这里

性能测试

目前单机简单的测试条件下,1s 钟可以调度成功 450 左右的 pod,具体的性能参数还要慢慢挖掘。

flannel 是一个中心化的 overlay 容器网络,设计简单,容易理解,对于 k8s 来说,有一个假设:所有容器都可以和集群里任意其他容器或者节点通信,并且通信双方看到的对方的 IP 地址就是实际的地址,主要的好处就是不需要任何的端口映射和地址转换,拥有一张扁平的网络更容易管理,而且由于是基于 Etcd 的中心化的管理,所以对于一些 IP 变化异常频繁的场景来说,比一些去中心化的方案能够较及时同步网络拓扑关系。

IP 地址的管理

flannel 的 IP 地址是通过 Etcd 管理的,在 k8s 初始化的时候指定 pod 大网的网段 --pod-network-cidr=10.244.0.0/16,flanneld 可以直接通过 Etcd 管理,如果启动的时候指定了 --kube-subnet-mgr,可以直接通过 k8s 的 apiserver 来获得一个小网段的租期,通过 kubectl get <NodeName> -o jsonpath='{.spec.podCIDR}' 可以获取对应节点的 CIDR 表示的网段,flannel 是以节点为单元划分小网段的,每个节点上的 pod 在这个例子当中是划分一个 10.244.x.0/24 的网段,所以总共能分配 255 个节点,每个节点上可以分配 253 个 pod。结构如下图所示,每个节点上都会有一个 flanneld 用于管理自己网段的租期。

可以通过在 host 上 cat /run/flannel/subnet.env 查看同步下来的信息,例如:

1
2
3
4
FLANNEL_NETWORK=10.244.0.0/16
FLANNEL_SUBNET=10.244.0.1/24
FLANNEL_MTU=8951
FLANNEL_IPMASQ=true

说明当前节点分配的网段是 10.244.0.1/24。在每个节点上因为已经确定了网段,用 ipam 就可以管理这一范围 ip 地址的分配,所以本身 pod 的 IP 分配和中心 Etcd 没有太多联系。

基本工作原理

简单来说就是通过建立 VXLAN 隧道,通过 UDP 把 IP 封装一层直接送到对应的节点,实现了一个大的 VLAN。没有使用 IPoIP 或者 GRE 主要是因为一些云厂商比如 AWS 的安全策略只能支持 TCP/UDP/ICMP。

flannel 本身会创建一个类似下面这样配置的 CNI bridge 设备。

1
2
3
4
5
6
7
8
9
10
11
12
{
"name" : "cni0",
"type" : "bridge",
"mtu" : 8973,
"ipMasq" : true,
"isGateway" : true,
"ipam" : {
"type" : "host-local",
"subnet" : "10.244.0.1/24",
"routes" : [ { "dst" : "10.244.0.0/16" } ]
}
}

具体的网络拓扑图如下,所用的网段开头是 10.1,但是划分是一致的,图里面的 docker0 应该是 cni0,flannel0 应该是 flannel.1,这个命名的区别主要是不带点的是 UDP 封装,带点的是 vxlan 封装,图片比较早。

第一步,首先是从容器中(10.1.20.3)出来,走桥接去到 cni0 (10.1.20.1),通过brctl show 可以看到 cni0 接了两个容器的 veth。

第二步,然后根据路由规则,在宿主机上能够用ip route,找到一条走到大网段的路由。10.15.0.0/16 dev flannel.1,到达 flannel.1

第三步,flannel.1 会走 vxlan,这个是在内核实现的,如果用 UDP 封装就是在用户态实现的,用户态实现的等于把包从内核过了两遍,没有直接用 vxlan 封装的直接走内核效率高,所以基本上不会使用 UDP 封装。对应的 vxlan 配置可以通过 bridge fdb flannel.1 看到,没有一条隧道就会有一条这样的转发表。因为到达每个对应网段的信息是在 Etcd 上分配的 flannel.1 只要 watch 然后发现有更改的时候对应配置隧道指向对应容器网段的宿主机 IP 就可以。

1
8a:55:a7:e2:e9:18 dev flannel.1 dst 192.168.0.100 self permanent

第四步,走宿主机的网络到达对端,对端的 vxlan 收到以后会拆开封装,丢到协议栈里面。

第五步,根据路由 ip route,中的一条10.1.15.0/24 dev cni0 proto kernel scope link src 10.1.15.1,送到 cni0 然后再转发给容器 10.1.15.2, 最后就能完成 pod 跨界点的互通了。

优缺点

因为整个的网段分配是存在 Etcd 里面的,节点只要 watch 然后根据网段建隧道就可以,相对来说中心化的系统设计比较简单,而且对于 IP 地址变动能够及时反应,特别是节点和容器都有剧烈变化的时候(别问我为啥物理节点会有剧烈变化,创业公司玩法怎么省钱怎么来……),相比于去中心化的一些设计能够更快同步一些。当然建隧道是一个点对点的规模,也就是如果有 n 个节点建隧道的话,每个节点上都要建 n-1 条隧道。

一种改进方式是使用 host-gw 的后端方式,以及 ipvlan,不过目前 ipvlan 还没有支持,这里有一个各种后端实现的比较,vxlan 表现很最差,host-gw 的做法是不用隧道的方式,而是把路由信息直接写到节点上,直接打平到节点上,等于是节点之间是一个大网,每个节点上的小网段通过路由的方式和大网互通,将到达各个节点的网段的路由刷到节点上,而不是建 vxlan 隧道的方式,比如文中的例子,会有这样的路由。

1
2
10.1.15.0/24 via 192.168.0.100 dev eth0 
10.1.15.0/24 dev cni0 proto kernel scope link src 10.1.20.1

然而,由于 flannel 只能够修改各个主机的路由表,一旦主机直接隔了个其他路由设备,比如三层路由器,这个包就会在路由设备上被丢掉。这样一来,host-gw 的模式就只能用于二层直接可达的网络。

一般来说我们把字典树称作 trie,这是一个用于查找字符串的数据结构,举个简单的例子。下面就是一个把 b,abc,abd,bcd,abcd,efg,hii 这 6 个单词构造成字典树的例子。

在路由表中我们可以把子网看作一个二进制字符串,对于 IPv4 来说,字符串的长度就有 32 位。

以上面这个为例子,有编号 0 到 14 这几个二进制的字符串(字符集和只有 0 和 1),右边是他们的具体字符串,这样的字典树有一点像二叉树。如果把它画出来是这样的。

对这个树做压缩有一个办法就是把单节点作压缩,形成下面这个树,这个就是路径压缩,path compressed trie。

我们需要在压缩的地方进行标记,比如 skip 4 那里把一段 1011 压缩掉了。而 LC-trie 指的是 Level Compressed trie,这个树会把层级进行压缩,因为我们得到的这个字典树实际上是一个稀疏树,高度并不是平衡的,所以为了达到平衡,需要做的一件事情是把高度进行压缩,压缩之后变成下面这个形式,这样整棵树就会更加扁平。

这个树是有两种类型的节点,一种是 leaf,保存了路由具体的信息的叶子结点,一种是 trie node(tnode)保存了中间节点,子节点可能是 tnode 或者 leaf。trie 上有几个数据要存储,一个是 bits,这个表示的是子节点的选择度(这个怎么理解呢,就是我接下来的子节点是八叉了,因为我把原来的树压缩了,所以现在不需要二选一,现在直接来个八选一就可以),对于一个八叉的压缩来说,就要有一个 3 位的数字来存储,也就是 log(8),当然也可以能是 7 叉,但是必须是 2 的指数。而 pos 表示的是从那个 bit 开始,它的作用和 skip 类似只不过不是一个相对值,而是一个累加值,表示我们要从哪开始(从字符串的起始位置开始数)。

我们先看一下搜索的代码,其实很简单,就是不断匹配公共前缀直到直到找到叶子节点( bits == 0)。匹配前缀的方式比较 tricky,用异或进行确认。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/* rcu_read_lock needs to be hold by caller from readside */
static struct key_vector *fib_find_node(struct trie *t,
struct key_vector **tp, u32 key)
{
struct key_vector *pn, *n = t->kv;
unsigned long index = 0;

do {
pn = n;
n = get_child_rcu(n, index);

if (!n)
break;

index = get_cindex(key, n);

/* This bit of code is a bit tricky but it combines multiple
* checks into a single check. The prefix consists of the
* prefix plus zeros for the bits in the cindex. The index
* is the difference between the key and this value. From
* this we can actually derive several pieces of data.
* if (index >= (1ul << bits))
* we have a mismatch in skip bits and failed
* else
* we know the value is cindex
*
* This check is safe even if bits == KEYLENGTH due to the
* fact that we can only allocate a node with 32 bits if a
* long is greater than 32 bits.
*/
if (index >= (1ul << n->bits)) {
n = NULL;
break;
}

/* keep searching until we find a perfect match leaf or NULL */
} while (IS_TNODE(n));

*tp = pn;

return n;
}

具体看一下 get_cindex 匹配的方式,#define get_cindex(key, kv) (((key) ^ (kv)->key) >> (kv)->pos),对 tnode 和 被比较的 key 做异或,这个怎么理解呢,看下面的图的例子这是正确匹配以后的结果,灰色代表 0,蓝色代表 1,两个值进行异或的话,首先 pos 会被右移掉,然后 bits 的部分会原样保留,因为 tnode 的这部分都是 0。然后亲址的部分如果完全匹配的话结果就都是 0 ,但是如果不完全匹配的话,结果就会比 index 还要大,因为高位还有 1,所以这就是为什么 index >= (1ul << n->bits 能判断是否匹配的前缀的原因。

然后我们再看一下插入的流程。首先算出匹配到当前节点的子节点(有可能有,有可能没有)。

1
2
3
4
5
6
7
8
9
10
11
12

static int fib_insert_node(struct trie *t, struct key_vector *tp,
struct fib_alias *new, t_key key)
{
struct key_vector *n, *l;

l = leaf_new(key, new);
if (!l)
goto noleaf;

/* retrieve child from parent node */
n = get_child(tp, get_index(key, tp));

如果有子节点,就要创建一个新的 tnode,再把这个 key 给插入。

1
2
3
4
5
6
7
8
/* Case 2: n is a LEAF or a TNODE and the key doesn't match.
*
* Add a new tnode here
* first tnode need some special handling
* leaves us in position for handling as case 3
*/
if (n) {
struct key_vector *tn;

__fls find last set bit,就是找到 pos,然后扩展出有两个选择(2 的 1 次方)的 tnode。

1
2
3
tn = tnode_new(key, __fls(key ^ n->key), 1);
if (!tn)
goto notnode;

设置 tn 的 父节点为 tp,然后把 key 插入到 tn 当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
	/* initialize routes out of node */
NODE_INIT_PARENT(tn, tp);
put_child(tn, get_index(key, tn) ^ 1, n);

/* start adding routes into the node */
put_child_root(tp, key, tn);
node_set_parent(n, tn);

/* parent now has a NULL spot where the leaf can go */
tp = tn;
}

/* Case 3: n is NULL, and will just insert a new leaf */
node_push_suffix(tp, new->fa_slen);
NODE_INIT_PARENT(l, tp);
put_child_root(tp, key, l);

开始进行平衡调整树形。

1
2
3
4
5
6
7
8
	trie_rebalance(t, tp);

return 0;
notnode:
node_free(l);
noleaf:
return -ENOMEM;
}

再看一下树的高度是如何进行调整的,从当前节点一直向上压缩。

1
2
3
4
5
static void trie_rebalance(struct trie *t, struct key_vector *tn)
{
while (!IS_TRIE(tn))
tn = resize(t, tn);
}
1
2
3
4
5
6
#define MAX_WORK 10
static struct key_vector *resize(struct trie *t, struct key_vector *tn)
{
#ifdef CONFIG_IP_FIB_TRIE_STATS
struct trie_use_stats __percpu *stats = t->stats;
#endif

利用 container_of 获取父节点。

1
struct key_vector *tp = node_parent(tn);

获取子节点,初始化 max_work 为 10。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
unsigned long cindex = get_index(tn->key, tp);
int max_work = MAX_WORK;

pr_debug("In tnode_resize %p inflate_threshold=%d threshold=%d\n",
tn, inflate_threshold, halve_threshold);

/* track the tnode via the pointer from the parent instead of
* doing it ourselves. This way we can let RCU fully do its
* thing without us interfering
*/
BUG_ON(tn != get_child(tp, cindex));

/* Double as long as the resulting node has a number of
* nonempty nodes that are above the threshold.
*/

should_inflate 决定要不要压缩的依据是根据动态压缩算法来的(引用4),直观的来说就是高度超过了一个动态计算的阈值,并且还没压缩超过十次就会继续压缩。这个动态阈值的算法是用非空子节点的数目如果超过压缩之后子节点数目的一半就值得压缩。而 inflate 做的事情就把层级压缩一层,也就是把 children 的 children 按照 bits 的匹配放到 parent 的 new_children 当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
	while (should_inflate(tp, tn) && max_work) {
tp = inflate(t, tn);
if (!tp) {
#ifdef CONFIG_IP_FIB_TRIE_STATS
this_cpu_inc(stats->resize_node_skipped);
#endif
break;
}

max_work--;
tn = get_child(tp, cindex);
}

/* update parent in case inflate failed */
tp = node_parent(tn);

/* Return if at least one inflate is run */
if (max_work != MAX_WORK)
return tp;

到这里说明一次调整都没有发生,说明节点很稀疏,也就是把节点分开。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
	/* Halve as long as the number of empty children in this
* node is above threshold.
*/
while (should_halve(tp, tn) && max_work) {
tp = halve(t, tn);
if (!tp) {
#ifdef CONFIG_IP_FIB_TRIE_STATS
this_cpu_inc(stats->resize_node_skipped);
#endif
break;
}

max_work--;
tn = get_child(tp, cindex);
}

只有一个孩子,可以进行 path compress,没必要再多一个中间节点。

1
2
3
4
5
6
7
	/* Only one child remains */
if (should_collapse(tn))
return collapse(t, tn);

/* update parent in case halve failed */
return node_parent(tn);
}

整个的 LC-trie 的结构大致如此,主要用于路由表中路由规则的快速匹配,是在 3.6 之后引进的,摒弃了之前用哈希来查找路由表的算法。

引用

  1. The-Art-Of-Programming-By-July
  2. Fast address lookup for Internet routers
  3. LC-trie implementation notes
  4. Implementing a dynamic compressed trie

neighbor 在协议栈里指的是同一个 LAN 下面的邻居,也就是他们在 L3 上通过媒介或者点对点连接在了一起。邻居子系统同时也可以理解为一个 L2 和 L3 地址的转换器。邻居子系统的目的就在于上层协议不应该关心下层的地址信息,我发送过来的 IP 地址应该让下层来决定发送到哪个 MAC 地址。neighbor solicitation and neighbor advertisement,可以分别对应 ARP 的 request 请求和 ARP 的 reply 请求。

邻居子系统主要缓存两大块的内容,一个是 L3 到 L2 的映射解析的缓存,一个是 L2 头的缓存,缓存 L2 头的原因是大部分 L2 的头基本上是重复的,所以通过缓存头部可以加快协议的封装。

以下有几个相关结构体需要介绍一下。

struct neighbour 代表的是一个邻居的信息,比如 L2 和 L3 地址等。

struct neigh_table 代表的是一种邻居协议的接口(比如 ARP)。

struct neigh_params 代表的是邻居协议在每个设备上的不同参数。

struct neigh_ops 邻居对应的一些操作函数。

struct hh_cache 缓存 L2 的头部,不是所有的设备都支持头部缓存。

struct rttablestruct dst_enry, IPv4 的路由缓存信息是通过 struct rttable 缓存的。

这是 dst_entry, hh_cacheneighbour 之间的关系。

neighbor 结构是用 hash 存储的,key 是 L3 地址加设备(对应的 device 结构体)加一个随机值。hash 表通过 neigh_hash_allocneigh_hash_free 来分配和释放。neigh_lookup 就是通过 key 从 hash table 中找到对应的 neighbour 结构体,实现都不复杂,这里理解就可以了。

一般邻居子系统的缓存流程是这样的,如果 L3 的请求到达,地址解析缓存没有命中,把 L3 packet 入队,开始neighbor solicitation 等收到 neighbor advertisement 之后再出队并且发送。

邻居状态信息 NUD(Netowork Unreachablility) 状态

NUD state 本来是 IPv6 协议里的邻居关系的定义,但是在内核中沿用到了 IPv4 里面。

  • NUD_NONE 这个 neighbour 刚开始建立,没有相关状态
  • NUD_INCOMPLETE 发送了获取 L2 地址的请求(通过 ARP 或者其他协议),但是没收到回复,并且之前不存在老缓存
  • NUD_REACHABLE neighbour 地址被缓存了,并且是可达的
  • NUD_FAILED 因为发送地址解析请求失败了,标志邻居不可达
  • NUD_STALE NUD_DELAY NUD_PROBE 是解析请求确认邻居可达的过程中的中间状态
  • NUD_NOARP 标志不需要用解析协议(虽然是 NOARP,但是别的协议也用这个标记),这是一个特殊情况。
  • NUD_PERMANET 标志邻居的地址解析是静态配置的

这些是基本状态,根据这些基本状态组合了几个相对有语义的状态。

  • NUD_VALID 表示该地址会是一个有效的地址

    NUD_PERMANENT NUD_NOARP NUD_REACHABLE NUD_PROBE NUD_STALE NUD_DELAY

  • NUD_CONNECTED 是 NUD_VALID 的子集,去除了待决的中间状态

    NUD_PERMANENT NUD_NOARP NUD_REACHABLE

  • NUD_IN_TIMER 表示在这个状态下正在执行一个定时任务,一般是状态不明了的时候

    NUD_INCOMPLETE NUD_DELAY NUD_PROBE

邻居的可达性可以通过两点来确认,收到了一个地址解析协议的单播回复,或者通过外部信息确认(比如收到了这个邻居的 TCP 报文,当然这个 IP 可能不是自己的邻居,但是可以可以确定对应网关『作为邻居』的可达性)。

确认邻居信息

IP 层会调用ipv4_confirm_neigh 来确认映射地址,如果有 gateway 用 gateway,没有 gateway 开始查缓存。在 net/ipv4/ip_output.c 中的对应代码,就是查缓存的 neighbour 结构体,如果这个结构体不存在的话就要开始confirm 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
neigh = __ipv4_neigh_lookup_noref(dev, nexthop);

if (unlikely(!neigh))
neigh = __neigh_create(&arp_tbl, &nexthop, dev, false);
if (!IS_ERR(neigh)) {
int res;

sock_confirm_neigh(skb, neigh);
res = neigh_output(neigh, skb);

rcu_read_unlock_bh();
return res;
}

当 TCP 收到报文(比如对端的 SYN/ACK 时),这种外部信息说明其实这个节点是可达的(不是来自 gateway),也可以更新缓存。

另外,neigh_connectneigh_suspect 是两个状态转换时会调用的函数。

当 neigh 进入 NUD_REACHABLEneigh_connectneigh->output 的函数指向 connected_output 这个函数,它会在调用 dev_queue_xmit 之前填充 L2 头部,把包直接发出去。

当从 NUD_REACHBLE 转换成 NUD_STALE 或者 NUD_DELAYneigh_suspect 会强制进行可达性的确认,通过把 neighbor->output 指向 neigh_ops->output, 也就是 neigh_resolve_output,它会在调用 dev_queue_xmit 之前先把地址解析出来,等把地址解析完成以后再把缓存的包发送出去。

更新邻居信息

邻居信息更新的入口函数就是 neigh_update,这个函数定义如下。

1
2
int neigh_update(struct neighbour *neigh, const u8 *lladdr, u8 new,
u32 flags, u32 nlmsg_pid)

首先做一些预先检查,如果是管理员配置,原来的配置就不能是 PERMANENT 或者 NOARP 的。

1
2
3
if (!(flags & NEIGH_UPDATE_F_ADMIN) &&
(old & (NUD_NOARP | NUD_PERMANENT)))
goto out;

标记为 dead,然后 goto out

1
2
if (neigh->dead)
goto out;

如果更新为无效的标记的话,删除 timer,并且 supect 一下,如果是 NUD_CONNECTED 状态。
如果是需要标记为失败的 neigh(之前是 INCOMPLETE|NUD_PROBE),则调用 neigh_invalidate,让这个 neigh 无效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (!(new & NUD_VALID)) {
neigh_del_timer(neigh);
if (old & NUD_CONNECTED)
neigh_suspect(neigh);
neigh->nud_state = new;
err = 0;
notify = old & NUD_VALID;
if ((old & (NUD_INCOMPLETE | NUD_PROBE)) &&
(new & NUD_FAILED)) {
neigh_invalidate(neigh);
notify = 1;
}
goto out;
}

接下来是三个条件,一个是 device 没有硬件地址,用 neigh->ha,如果 lladdr 提供了并且老的是有效的,使用老的地址,如果 lladdr 没有提供,直接使用老的地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Compare new lladdr with cached one */
if (!dev->addr_len) {
/* First case: device needs no address. */
lladdr = neigh->ha;
} else if (lladdr) {
/* The second case: if something is already cached
and a new address is proposed:
- compare new & old
- if they are different, check override flag
*/
if ((old & NUD_VALID) &&
!memcmp(lladdr, neigh->ha, dev->addr_len))
lladdr = neigh->ha;
} else {
/* No address is supplied; if we know something,
use it, otherwise discard the request.
*/
err = -EINVAL;
if (!(old & NUD_VALID))
goto out;
lladdr = neigh->ha;
}

如果NUD_CONNECTED 更新 confirm 的时间,更新『更新』的时间。

1
2
3
if (new & NUD_CONNECTED)
neigh->confirmed = jiffies;
neigh->updated = jiffies;

NEIGH_UPDATE_F_OVERRIDE_ISROUTER 标记的是当前 neigh 是一个 router。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* If entry was valid and address is not changed,
do not change entry state, if new one is STALE.
*/
err = 0;
update_isrouter = flags & NEIGH_UPDATE_F_OVERRIDE_ISROUTER;
if (old & NUD_VALID) {
if (lladdr != neigh->ha && !(flags & NEIGH_UPDATE_F_OVERRIDE)) {
update_isrouter = 0;
if ((flags & NEIGH_UPDATE_F_WEAK_OVERRIDE) &&
(old & NUD_CONNECTED)) {
lladdr = neigh->ha;
new = NUD_STALE;
} else
goto out;
} else {
if (lladdr == neigh->ha && new == NUD_STALE &&
!(flags & NEIGH_UPDATE_F_ADMIN))
new = old;
}
}

如果是更新操作,删除老的 timer,如果需要 timer,更新新的 timer,并且设置新状态。

1
2
3
4
5
6
7
8
9
10
11
12
if (new != old) {
neigh_del_timer(neigh);
if (new & NUD_PROBE)
atomic_set(&neigh->probes, 0);
if (new & NUD_IN_TIMER)
neigh_add_timer(neigh, (jiffies +
((new & NUD_REACHABLE) ?
neigh->parms->reachable_time :
0)));
neigh->nud_state = new;
notify = 1;
}

更新 neigh->ha,如果 lladdrneigh->ha 不同的话。

1
2
3
4
5
6
7
8
9
10
11
12
if (lladdr != neigh->ha) {
write_seqlock(&neigh->ha_lock);
memcpy(&neigh->ha, lladdr, dev->addr_len);
write_sequnlock(&neigh->ha_lock);
neigh_update_hhs(neigh);
if (!(new & NUD_CONNECTED))
neigh->confirmed = jiffies -
(NEIGH_VAR(neigh->parms, BASE_REACHABLE_TIME) << 1);
notify = 1;
}
if (new == old)
goto out;

根据状态调用 connect 和 suspect

1
2
3
4
if (new & NUD_CONNECTED)
neigh_connect(neigh);
else
neigh_suspect(neigh);

如果之前老的不是 NUD_VALID,就会把 skb 从 arp_queue,并且释放 arp_queue。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
if (!(old & NUD_VALID)) {
struct sk_buff *skb;

/* Again: avoid dead loop if something went wrong */

while (neigh->nud_state & NUD_VALID &&
(skb = __skb_dequeue(&neigh->arp_queue)) != NULL) {
struct dst_entry *dst = skb_dst(skb);
struct neighbour *n2, *n1 = neigh;
write_unlock_bh(&neigh->lock);

rcu_read_lock();

/* Why not just use 'neigh' as-is? The problem is that
* things such as shaper, eql, and sch_teql can end up
* using alternative, different, neigh objects to output
* the packet in the output path. So what we need to do
* here is re-lookup the top-level neigh in the path so
* we can reinject the packet there.
*/
n2 = NULL;
if (dst) {
n2 = dst_neigh_lookup_skb(dst, skb);
if (n2)
n1 = n2;
}
n1->output(n1, skb);
if (n2)
neigh_release(n2);
rcu_read_unlock();

write_lock_bh(&neigh->lock);
}
__skb_queue_purge(&neigh->arp_queue);
neigh->arp_queue_len_bytes = 0;
}

更新 flag,发送 notify 通过 rtnetlink 通知 nlmsg_pid 对应的进程,

1
2
3
4
5
6
7
8
9
10
out:
if (update_isrouter) {
neigh->flags = (flags & NEIGH_UPDATE_F_ISROUTER) ?
(neigh->flags | NTF_ROUTER) :
(neigh->flags & ~NTF_ROUTER);
}
write_unlock_bh(&neigh->lock);

if (notify)
neigh_update_notify(neigh, nlmsg_pid);

这就是邻居信息的更新流程。

当然还有一个缓存项是 L2 header 的缓存这里就简单略过了。

ARP

接下来说一下 ARP 相关的内容,ARP 本身的格式其实很简单。

邻居子系统有一个比较关键的协议就是 ARP,这里简单介绍一下 ARP 协议,我们来看一下 ARP 的格式。

先是两个字节的 L2 类型,然后是两个字节的 L3 类型。一般 L2 是 1 (以太网),L3 是 0x0800 (IPV4),接着跟着的是 L2 类型的长度和 L3 类型的长度,这里 Ethernet 的 MAC 地址是 6 个字节,IPV4 的 IP 地址是 4 个字节,接着是两个字节的操作,1 表示请求,2 表示回复,然后是发送端 L2 的地址,L3 的地址,接着是接收端 L2 和 L3 的地址。整个翻译过来就是『我(MAC 是 0a:00:27:00:00:00 IP 是 192.168.56.1)广而告之(MAC 广播地址),问一下谁知道 192.168.56.102 的地址』。

gARP (gratuitous ARP)

听起来像是一个无理由的 ARP 请求,实际上是一种主动通知的 ARP 请求。gARP 本身不是一个查询请求,而是一个通知请求,主要运用于主动通知 L2 地址改变,重复地址发现(请求解析自己的地址,如果收到回复说明有地址重复)。

还有就是 VIP,一般的作用是在本地网路中有两台机器,一台作为备机,一台作为主机,当主机 failover 的时候,备机可以继续『冒充』主机的 IP 地址,具体的做法就是主动发送请求,解析的 MAC 和 IP 都和 source 一样,老的 server 肯定不会回答这个 ARP,交换机上已经没有这个端口的缓存,会进行广播,让所有的接收者都会更新自己的缓存。也就是发送了一个一去不复返的请求,让所有的邻居更新了自己的 ARP 缓存,从而替代了老 server 的 IP,这就是 VIP 通过 ARP 实现的 failover。

总结

邻居子系统很大一部分的作用就是解析和缓存地址映射,主要是通过 ARP 来完成,而且 ARP 本身也有很多使用的姿势,也就上面说到的 gARP,邻居子系统是沟通路由子系统,以及 L2 和 L3 的桥梁。

参考:

  1. Understanding Linux Network Internals

ip 分片的主体函数在 ip_fragment 当中,重组则在 ip_defrag 当中。第一个分片的标志 Offset 为 0,MF 为 1,之后的分片则是 Offset 非 0,MF 为 1,最后一个分片则是 Offset 非 0,但是 MF 为 0。以此来分别当前的 IP packet 是否是一个分片。从 IP 层向上层协议发送数据包的时候就会进行重组,比如在 ip_local_deliver 当中,调用了。说一句题外话, TCP 有 MSS ,保障 TCP message 不超过分片大小,这样是一种对底层协议有感知的行为。

1
2
3
4
if (ip_is_fragment(ip_hdr(skb))) {
if (ip_defrag(net, skb, IP_DEFRAG_LOCAL_DELIVER))
return 0;
}

IP 分片

ip_is_fragment 对应的条件就是 (iph->frag_off & htons(IP_MF | IP_OFFSET)) != 0;

ip_fragment 当中会碰到几种情况,一种是不需要分片的 IP packet,这种很好,省心,一种是需要分片的 IP packet,这种最操心,还有一种是已经按分片负载的长度分配好了 buffer 只要加个头就相当于分片完成了就也非常棒。要从头开始进行分配的情况属于慢速路径,而已经有 buffer 准好的,直接加个头就完事的属于快速路径,快速路径的内存拷贝代价更低。

ip_fragment 主要检查 IP 是否允许进行分片,不然的话就返回一个 ICMP 错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct iphdr *iph = ip_hdr(skb);

if ((iph->frag_off & htons(IP_DF)) == 0)
return ip_do_fragment(net, sk, skb, output);

if (unlikely(!skb->ignore_df ||
(IPCB(skb)->frag_max_size &&
IPCB(skb)->frag_max_size > mtu))) {
IP_INC_STATS(net, IPSTATS_MIB_FRAGFAILS);
icmp_send(skb, ICMP_DEST_UNREACH, ICMP_FRAG_NEEDED,
htonl(mtu));
kfree_skb(skb);
return -EMSGSIZE;
}

return ip_do_fragment(net, sk, skb, output);

然后进入到 ip_do_fragment 当中。我们先看一下慢速路径是如何处理。

首先知道 IP 头部的长度,已经负载 (left),然后当前的指针,已经链路层需要预留的长度。

1
2
3
4
5
6
7
8
slow_path:
iph = ip_hdr(skb);

left = skb->len - hlen; /* Space per frame */
ptr = hlen; /* Where to start from */

ll_rs = LL_RESERVED_SPACE(rt->dst.dev);

IP 的 offset,以及不是最后一个分片的标志位,这里是进行分片的,不知道为什么要获取一些重组时候需要的数据,TODO。

/*
 *    Fragment the datagram.
 */

offset = (ntohs(iph->frag_off) & IP_OFFSET) << 3;
not_last_frag = iph->frag_off & htons(IP_MF);

调整要分配的 skb_buff 的长度,首先不能超过 mtu,然后最后一段要按 8 对齐。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* Keep copying data until we run out.
*/

while (left > 0) {
len = left;
/* IF: it doesn't fit, use 'mtu' - the data space left */
if (len > mtu)
len = mtu;
/* IF: we are not sending up to and including the packet end
then align the next start on an eight byte boundary */
if (len < left) {
len &= ~7;
}

/* Allocate buffer */
skb2 = alloc_skb(len + hlen + ll_rs, GFP_ATOMIC);
if (!skb2) {
err = -ENOMEM;
goto fail;
}

设置分片的元数据,ip_copy_metadata 会拷贝优先级,协议类型,等辅助信息。然后保留 L2 的头部空间,接着在保留 IP 层的长度,然后设置网络头部,接着设置传输层头部的位置,就是一些初始化的动作。

1
2
3
4
5
6
7
8
9
/*
* Set up data on packet
*/

ip_copy_metadata(skb2, skb);
skb_reserve(skb2, ll_rs);
skb_put(skb2, len + hlen);
skb_reset_network_header(skb2);
skb2->transport_header = skb2->network_header + hlen;

设置对应 sk 为 owner

1
2
3
4
5
6
7
/*
* Charge the memory for the fragment to any owner
* it might possess
*/

if (skb->sk)
skb_set_owner_w(skb2, skb->sk);

拷贝网络层的头部

1
2
3
4
5
/*
* Copy the packet header into the new buffer.
*/

skb_copy_from_linear_data(skb, skb_network_header(skb2), hlen);

然后拷贝真正的负载,这里没有直接用 memcpy 的原因是,对应的空间不一定是连续的,它可能含有 frag_list,甚至是之前检查没有通过的快速路径到达了这里。

1
2
3
4
5
6
/*
* Copy a block of the IP datagram.
*/
if (skb_copy_bits(skb, ptr, skb_transport_header(skb2), len))
BUG();
left -= len;

设置 IP 头的偏移和分片标志。

1
2
3
4
5
6
7
8
/*
* Fill in the new header fields.
*/
iph = ip_hdr(skb2);
iph->frag_off = htons((offset >> 3));

if (IPCB(skb)->flags & IPSKB_FRAG_PMTU)
iph->frag_off |= htons(IP_DF);

如果是第一个分片就尝试更新 IP options。

1
2
3
4
5
6
7
8
/* ANK: dirty, but effective trick. Upgrade options only if
* the segment to be fragmented was THE FIRST (otherwise,
* options are already fixed) and make it ONCE
* on the initial skb, so that all the following fragments
* will inherit fixed options.
*/
if (offset == 0)
ip_options_fragment(skb);

最后修改位移,更新标记位,计算 checksum,然后送到 output。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* Added AC : If we are fragmenting a fragment that's not the
* last fragment then keep MF on each bit
*/
if (left > 0 || not_last_frag)
iph->frag_off |= htons(IP_MF);
ptr += len;
offset += len;

/*
* Put this fragment into the sending queue.
*/
iph->tot_len = htons(len + hlen);

ip_send_check(iph);

err = output(net, sk, skb2);
if (err)
goto fail;

IP_INC_STATS(net, IPSTATS_MIB_FRAGCREATES);

这个就是慢速路径的分片过程,快速路径的分片过程其实更简单,因为比较麻烦的事情已经在 ip_append_data 里面处理过了,在我上一篇文章里面有介绍这个过程,就是在上层调用 ip_append_data 的时候,会在主动的进行分段式的缓存,而不使用连续空间,每个分段式的换粗也会不超过分片的大小,这样每个缓存就可以直接用来做分片了。

现在再回头看快速路径,快速路径主要检查有没有 frag_list 也就是之前分配好的 buffer 列表。获取第一个 buffer (存在 frags 里面,不是 frag_list)的长度,如果比 mtu 大,或者不是 8 的倍数,或者已经是分段了,或者是一段 shared skb_buff (因为快速路径不会拷贝内存,慢速路径会会分配新的内存,不影响之前有人引用)都不行,要进入慢速路径。

1
2
3
4
5
6
7
8
9
10
if (skb_has_frag_list(skb)) {
struct sk_buff *frag, *frag2;
unsigned int first_len = skb_pagelen(skb);

if (first_len - hlen > mtu ||
((first_len - hlen) & 7) ||
ip_is_fragment(iph) ||
skb_cloned(skb))
goto slow_path;

首先保证每个 frag_list 里面 frag 不超过 mtu,然后不是最后一段需要是 8 的倍数,有足够的头部空间用来给新的 IP 分片用,然后 frag 的 buffer 也不能是 shared,最后绑定 sk 关系,减掉 skb 的 truesize。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
skb_walk_frags(skb, frag) {
/* Correct geometry. */
if (frag->len > mtu ||
((frag->len & 7) && frag->next) ||
skb_headroom(frag) < hlen)
goto slow_path_clean;

/* Partially cloned skb? */
if (skb_shared(frag))
goto slow_path_clean;

BUG_ON(frag->sk);
if (skb->sk) {
frag->sk = skb->sk;
frag->destructor = sock_wfree;
}
skb->truesize -= frag->truesize;
}

到这里就可以真的开始分片了,初始化头部信息,以及要用来分片的 frag。

1
2
3
4
5
6
7
8
9
err = 0;
offset = 0;
frag = skb_shinfo(skb)->frag_list;
skb_frag_list_init(skb);
skb->data_len = first_len - skb_headlen(skb);
skb->len = first_len;
iph->tot_len = htons(first_len);
iph->frag_off = htons(IP_MF);
ip_send_check(iph);

这个循环里面做的事情就更简单了,比起慢速路径来说,就是给每个原本没有头部的 buffer,加上头部变成真正的 fragment。保留空间,设置网络层头部,拷贝头部memcpy(skb_network_header(frag), iph, hlen);,拷贝原信息,如果是第一个分片更新 options,然后更新标记位,然后送到 output。直到 frag_list 被循环完,这就大功告成了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
for (;;) {
/* Prepare header of the next frame,
* before previous one went down. */
if (frag) {
frag->ip_summed = CHECKSUM_NONE;
skb_reset_transport_header(frag);
__skb_push(frag, hlen);
skb_reset_network_header(frag);
memcpy(skb_network_header(frag), iph, hlen);
iph = ip_hdr(frag);
iph->tot_len = htons(frag->len);
ip_copy_metadata(frag, skb);
if (offset == 0)
ip_options_fragment(frag);
offset += skb->len - hlen;
iph->frag_off = htons(offset>>3);
if (frag->next)
iph->frag_off |= htons(IP_MF);
/* Ready, complete checksum */
ip_send_check(iph);
}

err = output(net, sk, skb);

if (!err)
IP_INC_STATS(net, IPSTATS_MIB_FRAGCREATES);
if (err || !frag)
break;

skb = frag;
frag = skb->next;
skb->next = NULL;
}

IP 重组

重组一般发生在向上层协议栈传输的时候,不过有的路由器也有可能进行重组,可能要对整个 IP packet 进行校验等,一般情况下,转发不太会对 IP 进行重组。IP 重组讲起来也有些麻烦。

每个正在被重组的 IP packet 都会用一个 ipq 表示,这个 ipq 使用的是 hash table (inet_frags->hash) 的搜索结构,没有 ipq 由 源地址,目的地址,协议和 ID 确定,所以存在重复的可能。ip_defrag依赖两个函数一个是ip_find用于寻找 ipq 如果没有找到的话会自动创建一个,其次是用于入队的 ip_frag_queue ,进行重组的工作。sk_buff->cb 用于保存当前的 offset。对于分片的重组也会有超时机制,防止一个 ipq 停留太长的时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* Process an incoming IP datagram fragment. */
int ip_defrag(struct net *net, struct sk_buff *skb, u32 user)
{
struct net_device *dev = skb->dev ? : skb_dst(skb)->dev;
int vif = l3mdev_master_ifindex_rcu(dev);
struct ipq *qp;

__IP_INC_STATS(net, IPSTATS_MIB_REASMREQDS);
skb_orphan(skb);

/* Lookup (or create) queue header */
qp = ip_find(net, ip_hdr(skb), user, vif);
if (qp) {
int ret;

spin_lock(&qp->q.lock);

ret = ip_frag_queue(qp, skb);

spin_unlock(&qp->q.lock);
ipq_put(qp);
return ret;
}

__IP_INC_STATS(net, IPSTATS_MIB_REASMFAILS);
kfree_skb(skb);
return -ENOMEM;
}

ip_find 主要两个功能,根据原信息计算 hash 值,从net->ipv4.frags 的 hash 表当中寻找到对应的 ipq

1
2
3
4
hash = ipqhashfn(iph->id, iph->saddr, iph->daddr, iph->protocol);

q = inet_frag_find(&net->ipv4.frags, &ip4_frags, &arg, hash);

然后进入到 ip_frag_queue 当中首先检查,如果出现错误,就把 ipq 标记为可以被之后的垃圾回收清扫。

1
2
3
4
5
6
7
8
9
10
if (qp->q.flags & INET_FRAG_COMPLETE)
goto err;

if (!(IPCB(skb)->flags & IPSKB_FRAG_COMPLETE) &&
unlikely(ip_frag_too_far(qp)) &&
unlikely(err = ip_frag_reinit(qp))) {
ipq_kill(qp);
goto err;
}

获取 offset,flags 和头部。

1
2
3
4
5
6
7
ecn = ip4_frag_ecn(ip_hdr(skb)->tos);
offset = ntohs(ip_hdr(skb)->frag_off);
flags = offset & ~IP_OFFSET;
offset &= IP_OFFSET;
offset <<= 3; /* offset is in 8-byte chunks */
ihl = ip_hdrlen(skb);

计算这个追加的 fragment 会拷贝的位置的末尾在哪。

1
2
3
4
/* Determine the position of this fragment. */
end = offset + skb->len - skb_network_offset(skb) - ihl;
err = -EINVAL;

如果是最后一个 fragment,那么不应该超过 q.len,或者已经有了最后一个了,但是 endq.len 不一致,所以有一些 corruption。如果检查没问题,就更新q.flasg 标记为最后一个和把 end 赋值给q.len

1
2
3
4
5
6
7
8
9
10
11
/* Is this the final fragment? */
if ((flags & IP_MF) == 0) {
/* If we already have some bits beyond end
* or have different end, the segment is corrupted.
*/
if (end < qp->q.len ||
((qp->q.flags & INET_FRAG_LAST_IN) && end != qp->q.len))
goto err;
qp->q.flags |= INET_FRAG_LAST_IN;
qp->q.len = end;

如果不是最后一个,长度要与 8 对齐,然后更新 q.len

1
2
3
4
5
6
7
8
9
10
11
12
13
} else {
if (end&7) {
end &= ~7;
if (skb->ip_summed != CHECKSUM_UNNECESSARY)
skb->ip_summed = CHECKSUM_NONE;
}
if (end > qp->q.len) {
/* Some bits beyond end -> corruption. */
if (qp->q.flags & INET_FRAG_LAST_IN)
goto err;
qp->q.len = end;
}

剩下的就是从链表 q.fragments 当中中根据offset 寻找到要插入的位置,会先看一下表尾,再进行遍历。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
if (end == offset)
goto err;

err = -ENOMEM;
if (!pskb_pull(skb, skb_network_offset(skb) + ihl))
goto err;

err = pskb_trim_rcsum(skb, end - offset);
if (err)
goto err;

/* Find out which fragments are in front and at the back of us
* in the chain of fragments so far. We must know where to put
* this fragment, right?
*/
prev = qp->q.fragments_tail;
if (!prev || FRAG_CB(prev)->offset < offset) {
next = NULL;
goto found;
}
prev = NULL;
for (next = qp->q.fragments; next != NULL; next = next->next) {
if (FRAG_CB(next)->offset >= offset)
break; /* bingo! */
prev = next;
}

如果和前面的分组有重叠,就把重叠的部分去掉,CHECKSUM_NONE 可以使当前的校验和失效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (prev) {
int i = (FRAG_CB(prev)->offset + prev->len) - offset;

if (i > 0) {
offset += i;
err = -EINVAL;
if (end <= offset)
goto err;
err = -ENOMEM;
if (!pskb_pull(skb, i))
goto err;
if (skb->ip_summed != CHECKSUM_UNNECESSARY)
skb->ip_summed = CHECKSUM_NONE;
}
}

然后向后检查有没有重叠,并且把重叠的部分去掉,如果重叠的部分比 next 本身还要大,直接把 next 删掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
while (next && FRAG_CB(next)->offset < end) {
int i = end - FRAG_CB(next)->offset; /* overlap is 'i' bytes */

if (i < next->len) {
/* Eat head of the next overlapped fragment
* and leave the loop. The next ones cannot overlap.
*/
if (!pskb_pull(next, i))
goto err;
FRAG_CB(next)->offset += i;
qp->q.meat -= i;
if (next->ip_summed != CHECKSUM_UNNECESSARY)
next->ip_summed = CHECKSUM_NONE;
break;
} else {
struct sk_buff *free_it = next;

/* Old fragment is completely overridden with
* new one drop it.
*/
next = next->next;

if (prev)
prev->next = next;
else
qp->q.fragments = next;

qp->q.meat -= free_it->len;
sub_frag_mem_limit(qp->q.net, free_it->truesize);
kfree_skb(free_it);
}
}

剩下的就是插入链表,并且更新 ipq 的信息了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
skb->next = next;
if (!next)
qp->q.fragments_tail = skb;
if (prev)
prev->next = skb;
else
qp->q.fragments = skb;

dev = skb->dev;
if (dev) {
qp->iif = dev->ifindex;
skb->dev = NULL;
}
qp->q.stamp = skb->tstamp;
qp->q.meat += skb->len;
qp->ecn |= ecn;
add_frag_mem_limit(qp->q.net, skb->truesize);
if (offset == 0)
qp->q.flags |= INET_FRAG_FIRST_IN;

fragsize = skb->len + ihl;

if (fragsize > qp->q.max_size)
qp->q.max_size = fragsize;

if (ip_hdr(skb)->frag_off & htons(IP_DF) &&
fragsize > qp->max_df_size)
qp->max_df_size = fragsize;

然后如果,第一个包和最后一个包都收齐了的话,就尝试进行重组。

1
2
3
4
5
6
7
8
9
if (qp->q.flags == (INET_FRAG_FIRST_IN | INET_FRAG_LAST_IN) &&
qp->q.meat == qp->q.len) {
unsigned long orefdst = skb->_skb_refdst;

skb->_skb_refdst = 0UL;
err = ip_frag_reasm(qp, prev, dev);
skb->_skb_refdst = orefdst;
return err;
}

另外垃圾回收的过程,就是在内存超过阈值的时候,把超时的 ipq 从 hash 表当中剔除。内存阈值通过 ip_frag_mem获取。

1
2
3
4
int ip_frag_mem(struct net *net)
{
return sum_frag_mem_limit(&net->ipv4.frags);
}

总结

IP 分片与重组的整体流程大致如此,IP 面临的覆盖的现象,是由于不同的 packet 但是 hash 元素一样导致的。另一方面重叠处理一个是防止出现重叠包攻击导致内存溢出。还有就是具体的校验过程会丢给上层的协议来控制。