ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

kube-controller-manager 分析

kube-controller-manager 可以认为是一个守护进程用于监视 apiserver 暴露的集群状态,并且不断地尝试把当前状态向集群的目标状态迁移。为了避免频繁查询 apiserver,apiserver 提供了 watch 接口用于监视资源的增加删除和更新,client-go 对此作了抽象,封装一层 informer 来表示本地 apiserver 状态的 cache。这个视频 有一个 Google 工程师讲解的 client-go 的详细内容,这篇 七牛前同事的文章介绍了 informer 的整体结构,写得也很好。

client 当中的 controller

处理事件的 controller 由几部分构成,首先是 Config 当中的可配置部分,下图是 controller 的关系,controller 实现了 Controller 接口。

controller 从 Queue 当中通过 Pop 获取对象交给 Process 回调处理,DeltaFIFO 和 FIFO 是类似的,只是 DeltaFIFO 可以处理删除事件,一般都用 DeltaFIFO。ListerWatcher 就是用客户端构造出来的,针对对应资源的 List Watch 方法的集合,List 用于获取最开始的对象获取,Watch 用于监控之后的变化,所有最开始的时候现有的对象会通过 List 传给 Add 回调,同步了当前状态以后再不断接受新的变化,但是 Watch 本身是有超时机制的,不能永久监听,所以再超时之后还会通过 List 方法,先同步一次再进行删除操作。Resync Period 表示把 cache 中的对象重新入队给回调函数处理,这种情况一般是可能你可能漏掉了更细操作,或者是之前的一些失败了。大部分情况用不到这个选项,可以非常相信 etcd 的功能。

调用路径

1
2
3
4
5
cache.NewListWatchFromClient -> listWatcher
cache.NewIndexer -> store
cache.NewLister(store) -> lister
cache.NewReflector
go refector.Run()

还有一个关键结构是 reflector,reflector 会把对象转化成对应的需要的对象并且 Add 到 Queue 当中去。

Informer 本身的框架是异步的,所以为了做并发控制就引入了 workqueue 的组件,workqueue 有 rate limit 的功能,并且能够合并更新操作。

注意不要修改传入的对象,因为他们要和 cache 一致,如果要写对象的话,需要使用 api.Scheme.Copy 这个函数,进行深度拷贝,所有的 k8s Object 都要支持深拷贝的方法。

kube-controller 当中的插件式 controller

k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go 中的NewControllerInitializers 函数有大部分 controller 的列表,bootstrapsignertokencleaner 是默认关闭的。

1
2
3
4
var ControllersDisabledByDefault = sets.NewString(
"bootstrapsigner",
"tokencleaner",
)

需要特殊初始化的是 serviceaccount-tok,另外的是 NewControllerInitializers 当中的 controller 了。所有的 controller 初始化函数都要满足如下接口。

1
2
3
4
// InitFunc is used to launch a particular controller.  It may run additional "should I activate checks".
// Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled.
type InitFunc func(ctx ControllerContext) (bool, error)

node controller

接下来看一个具体的 controller,startNodeController

startNodeController 首先解析 ClusterCIDRServiceCIDR 两个子网范围,下面是 NodeController 初始化需要的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
nodeController, err := nodecontroller.NewNodeController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Extensions().V1beta1().DaemonSets(),
ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("node-controller"),
ctx.Options.PodEvictionTimeout.Duration,
ctx.Options.NodeEvictionRate,
ctx.Options.SecondaryNodeEvictionRate,
ctx.Options.LargeClusterSizeThreshold,
ctx.Options.UnhealthyZoneThreshold,
ctx.Options.NodeMonitorGracePeriod.Duration,
ctx.Options.NodeStartupGracePeriod.Duration,
ctx.Options.NodeMonitorPeriod.Duration,
clusterCIDR,
serviceCIDR,
int(ctx.Options.NodeCIDRMaskSize),
ctx.Options.AllocateNodeCIDRs,
ipam.CIDRAllocatorType(ctx.Options.CIDRAllocatorType),
ctx.Options.EnableTaintManager,
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition),
)

InformerFactory 是用来构造具体 resource 的 informer 的工厂类型,构造了 pods, nodes, daemonsets 的 informer, 说明 node controller 需要 watch 这几种 resource 的变化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

glog.Infof("Starting node controller")
defer glog.Infof("Shutting down node controller")

if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}

// Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, wait.NeverStop)

if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}

if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
}

<-stopCh
}

现在看一下运行的时候是如何做的,首先要调用 controller.WaitForCacheSync 等待 node,pod,daemonSet 的 inofrmer 同步,这是因为在 kube-controller-manager 当中使用的子 controller 使用的 informer 都是共享型的,也就是多个 controller 之间共享一个 informer 的 cache,所以在开始的时候需要保证所有的 sharedInformerFactory 创建的 informers 之间的 cache 先等待一次一致。controller 的读基本上是从 cache 读的,只要写才会打到 etcd 里面,然后等待 cache 的更新回调。

nodeController 主要分成两部分,一部分是 monitorNodeStatus , 它首先从 informer 的 cache 当中 list 新添加的节点和删除的节点,和 newZoneRepresentations。node 是分 zone 的,这在单可用区的 cluster 当中是个空字符串,但是如果 labels 中有 failure-domain.beta.kubernetes.io/zonefailure-domain.beta.kubernetes.io/region 就会构成不同的可用区的划分, 这个适用于仅仅在一家云厂商分不同可用区的时候可以用到。这篇文档 描述了多 zone 的 cluster 的内容,没有 zone 有相应的可用状态,如果某个 zone 变成不可用需要把 pod 从这个 zone 当中剔除,所以 pod 的 failover 是以 zone 为单位的。

处理 node 比较啰嗦,tryUpdateNodeStatus 尝试获取当前的 conditions 更新并且获取 conditions。处理 node 过程主要是标记 node 为不可用的 node,或者把不可用的状态恢复过来。

接下来的就是处理 pod eviction 的部分,另外,这篇文档解释了一些 kubelet 支持的资源耗尽的情况下 kubelet 的剔除策略。

taintManager.Run 处理 taint 的逻辑,这篇文档 解释了 taint 和 toleration 的关系,以及基于 taint 的 eviction 策略。首先看 node 的更新,然后把上面不能 tolerate 的 pod 传给 handlePodUpdate,然后 pod 有更新也会 handlePodUpdate,在 pod 更新的时间中会让 node 抢占一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
for {
select {
case <-stopCh:
break
case nodeUpdate := <-tc.nodeUpdateChannel:
tc.handleNodeUpdate(nodeUpdate)
case podUpdate := <-tc.podUpdateChannel:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannel:
tc.handleNodeUpdate(nodeUpdate)
default:
break priority
}
}
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(podUpdate)
}
}

最后是 eviction 的部分,基于 eviction 的会把 pod 直接删除,基于 taint 只是打上标记,然后通过上面的 tiantManager 剔除。和 pod eviction 不同,taint eviction 是通过限制加 taint 的速率控制 raltelimit 的。

podGCController

pod GC Controller 只 watch pod 一种资源,比较简单。

k8s.io/kubernetes/pkg/controller/podgc/gc_controller.go 下。

gcc.gcOrphaned 删除 node 不存在的 pod,gcc.gcUnscheduledTerminating 删除正在终止,但是没有调度的 pod,gcc.gcTerminated 删除已经被终止的 pod。

其他 controller

其他 controller 也是类似的,从 apiserver 获取状态,并且向对应的状态迁移,这也是为什么 kubernetes 的命令和资源都是宣告式的原因。