CUDA 的内存模型和 CPU 类似,也是多级的结构:每个线程有自己的局部内存,块内线程共享 shared memory,块之间则通过全局内存交换数据。GPU 和 CPU 都主要采用 DRAM 来做主存,CPU 的一级缓存会用 SRAM 做。CPU 的多级缓存对用户来说基本不需要考虑,硬件尽量屏蔽了其中的细节;但 GPU 相对来说会把这种分级结构暴露给用户,这对编程来说也是一种新的挑战。
后面用 GPU 计算一些计算密集型的程序,速度是真快,比 CPU 快很多,所以这也是为什么大量深度学习应用都要通过 GPU 加速的原因。
// InitFunc is used to launch a particular controller. It may run additional "should I activate checks".
// Any error returned will cause the controller process to `Fatal`.
// The bool indicates whether the controller was enabled.
type InitFunc func(ctx ControllerContext) (bool, error)
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	glog.Infof("Starting node controller")
	defer glog.Infof("Shutting down node controller")

	if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
		return
	}

	// Incorporate the results of node status pushed from kubelet to master.
	go wait.Until(func() {
		if err := nc.monitorNodeStatus(); err != nil {
			glog.Errorf("Error monitoring node status: %v", err)
		}
	}, nc.nodeMonitorPeriod, wait.NeverStop)

	if nc.runTaintManager {
		go nc.taintManager.Run(wait.NeverStop)
	}

	if nc.useTaintBasedEvictions {
		// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
		// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
		go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
	} else {
		// Managing eviction of nodes:
		// When we delete pods off a node, if the node was not empty at the time we then
		// queue an eviction watcher. If we hit an error, retry deletion.
		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
	}
nodeController 主要分成两部分,一部分是 monitorNodeStatus,它首先从 informer 的 cache 当中 list 新添加的节点、删除的节点,以及 newZoneRepresentations。node 是分 zone 的,在单可用区的 cluster 当中 zone 是个空字符串;如果 labels 中有 failure-domain.beta.kubernetes.io/zone 和 failure-domain.beta.kubernetes.io/region,就会构成不同可用区的划分,这适用于在同一家云厂商内划分多个可用区的场景。这篇文档描述了多 zone cluster 的内容。每个 zone 有相应的可用状态,如果某个 zone 变成不可用,需要把 pod 从这个 zone 当中剔除,所以 pod 的 failover 是以 zone 为单位的。
	for {
		select {
		case <-stopCh:
			break
		case nodeUpdate := <-tc.nodeUpdateChannel:
			tc.handleNodeUpdate(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannel:
			// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannel:
					tc.handleNodeUpdate(nodeUpdate)
				default:
					break priority
				}
			}
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
		}
	}
最后是 eviction 的部分。基于 pod eviction 的方式会把 pod 直接删除;基于 taint 的方式只是打上标记,再由上面的 taintManager 负责剔除。和 pod eviction 不同,taint eviction 是通过限制加 taint 的速率来做 rate limit 的。
路径上整体分成 group, version, resource。作为核心 API group 的 core(包括 pod, node 之类的 resource)不带 group,直接接在 /api/ 后面;其他的 API group 则接在 /apis 后面。以 pod 为例,pod 对应的数据类型如下,这个数据结构和 POST 请求 body 中的结构是一致的。
如果是 job 的话则是在 pkg/apis/batch/v2alpha1/types.go,和 API 路径是对应的。例子当中 kubectl 加上 level 大于 8 的 log 就会打印请求和响应的 body,可以看到 request body 和上面的数据结构是一致的。这个请求会发送到 apiserver 进行处理,并且返回存储之后的 pod。
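这条路径拼接规则可以用一小段独立的 C 代码示意(api_path 这个函数名是为演示假设的,并非 apiserver 或 client 的真实实现):

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* 按正文描述拼接 API 路径:core group(group 为空串)接在 /api/ 后面,
 * 其他 group 接在 /apis/ 后面。api_path 是为演示假设的函数名。 */
static void api_path(char *buf, size_t n, const char *group,
                     const char *version, const char *ns, const char *resource)
{
    if (group[0] == '\0')
        snprintf(buf, n, "/api/%s/namespaces/%s/%s", version, ns, resource);
    else
        snprintf(buf, n, "/apis/%s/%s/namespaces/%s/%s",
                 group, version, ns, resource);
}
```

core group 传空字符串就得到 /api/v1/... 形式的路径,batch 之类的 group 则会落在 /apis 下。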
重要结构体
Config
父结构,主要的配置内容,其中有一个结构 RESTOptionsGetter genericregistry.RESTOptionsGetter 是和 API 初始化相关的,这个接口的实现是在 k8s.io/apiserver/pkg/server/options/etcd.go 中的 storageFactoryRestOptionsFactory 实现的,对应的实现函数是
// Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
// by specifying flag.
factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities())
// Cluster autoscaler friendly scheduling algorithm.
factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, defaultPredicates(),
	copyAndReplace(defaultPriorities(), "LeastRequestedPriority", "MostRequestedPriority"))
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {
	Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
	// Preempt receives scheduling errors for a pod and tries to create room for
	// the pod by preempting lower priority pods if possible.
	// It returns the node where preemption happened, a list of preempted pods, and error if any.
	Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
	// Predicates() returns a pointer to a map of predicate functions. This is
	// exposed for testing.
	Predicates() map[string]FitPredicate
	// Prioritizers returns a slice of priority config. This is exposed for
	// testing.
	Prioritizers() []PriorityConfig
}
抢占发生在 pod 没有找到合适节点的情况下:如果踢掉某个节点上一些低优先级的 pod 就能获得调度机会,就尝试抢占。抢占算是一个比较新的特性,在 1.8 里面默认是关掉的,要打开的话需要指定 kube-scheduler 的 --feature-gates=PodPriority=true,还有 apiserver 的 --runtime-config=scheduling.k8s.io/v1alpha1=true。可以通过添加 PriorityClass 给 pod 分权重,这个特性算是给 pod 也加上了权重。
apiVersion: v1
kind: PriorityClass
metadata:
  name: high-priority
value: 1000000
globalDefault: false
description: "This priority class should be used for XYZ service pods only."
然后可以在 spec 当中指定 priorityClassName: high-priority。有这么大的权重,这个 pod 就很难被抢占了。具体流程如下图。
podEligibleToPreemptOthers 主要判断:如果目标 pod 已经被标记为要抢占其他 pod(通过 pod 的 annotation 标记),并且有一个优先级低于目标 pod 的 pod 即将被删除(p.DeletionTimestamp != nil),就直接退出,因为这个被删除的 pod 其实已经在为目标 pod 腾出空间了,下次调度时就会获得调度机会。nodesWherePreemptionMightHelp 类似于 schedule 时的 predicate 阶段,只不过多了一步:尝试移除节点上的 pod 之后再跑一遍 predicates,看看这个节点能不能通过。selectNodesForPreemption 则和 priority 阶段类似,把删除 pod 之后可以通过的节点整理出来,再通过 selectVictimsOnNode 把节点上的 pod 按照 priority 排序选出"受害者",priority 越高越难被抢占(这个值可以在 spec 里面设置)。选出各节点上的受害者以后,通过 pickOneNodeForPreemption 选出最终要被抢占的节点,主要依据是"受害者的最高权重"最低的节点先被选出来:比如 node1 上受害者 pod 的最高权重是 10,node2 上是 8,那么 node2 被选中;如果平局,"受害者"最少的先选;如果还平局,随机选一个。
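pickOneNodeForPreemption 的选择规则可以用一段简化的 C 代码示意(结构体和函数名都是为演示假设的,并非 kube-scheduler 的真实实现,也省略了最后"随机选一个"的平局处理):

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* 每个候选节点记录:受害者 pod 的最高优先级、受害者数量(演示用结构) */
struct node_victims {
    const char *name;
    int max_priority;   /* 该节点上"受害者"的最高优先级 */
    int num_victims;    /* "受害者"的数量 */
};

/* 先比最高优先级(越低越好),平局时比受害者数量(越少越好) */
static const char *pick_node(const struct node_victims *nodes, size_t n)
{
    if (n == 0)
        return NULL;
    const struct node_victims *best = &nodes[0];
    for (size_t i = 1; i < n; i++) {
        if (nodes[i].max_priority < best->max_priority ||
            (nodes[i].max_priority == best->max_priority &&
             nodes[i].num_victims < best->num_victims))
            best = &nodes[i];
    }
    return best->name;
}
```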
/* rcu_read_lock needs to be hold by caller from readside */
static struct key_vector *fib_find_node(struct trie *t,
					struct key_vector **tp, u32 key)
{
	struct key_vector *pn, *n = t->kv;
	unsigned long index = 0;

	do {
		pn = n;
		n = get_child_rcu(n, index);

		if (!n)
			break;

		index = get_cindex(key, n);

		/* This bit of code is a bit tricky but it combines multiple
		 * checks into a single check. The prefix consists of the
		 * prefix plus zeros for the bits in the cindex. The index
		 * is the difference between the key and this value. From
		 * this we can actually derive several pieces of data.
		 *   if (index >= (1ul << bits))
		 *     we have a mismatch in skip bits and failed
		 *   else
		 *     we know the value is cindex
		 *
		 * This check is safe even if bits == KEYLENGTH due to the
		 * fact that we can only allocate a node with 32 bits if a
		 * long is greater than 32 bits.
		 */
		if (index >= (1ul << n->bits)) {
			n = NULL;
			break;
		}

		/* keep searching until we find a perfect match leaf or NULL */
	} while (IS_TNODE(n));

	/* retrieve child from parent node */
	n = get_child(tp, get_index(key, tp));
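注释里描述的这个"一次检查"技巧可以用一段独立的 C 代码验证(字段名仿照内核的 key_vector,但这是简化的演示,key 固定为 32 位):

```c
#include <assert.h>

/* 简化的 tnode:key 是前缀,pos 是跳过的低位数,bits 决定子节点数 2^bits */
struct tnode {
    unsigned int key;
    unsigned char pos, bits;
};

/* index = (key ^ n->key) >> n->pos:
 * 若 index < (1u << bits),说明 skip bits 全部匹配,index 就是 cindex;
 * 若 index >= (1u << bits),说明前缀不匹配,查找失败 */
static unsigned int get_cindex_demo(unsigned int key, const struct tnode *n)
{
    return (key ^ n->key) >> n->pos;
}
```

一次移位比较同时完成了"前缀是否匹配"和"落到哪个 child"两件事,这正是注释里说的 trick。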
如果有子节点,就要创建一个新的 tnode,再把这个 key 给插入。
	/* Case 2: n is a LEAF or a TNODE and the key doesn't match.
	 *
	 *  Add a new tnode here
	 *  first tnode need some special handling
	 *  leaves us in position for handling as case 3
	 */
	if (n) {
		struct key_vector *tn;
__fls 是 find last set bit,即找到最高的置位比特,也就是找到 pos,然后扩展出一个有两个子节点(2 的 1 次方)的 tnode。
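__fls 的语义可以用 GCC 内建函数写一个可移植的示意版(word 不能为 0;内核版本是体系结构相关的实现):

```c
#include <assert.h>

/* 返回最高置位比特的下标(0 基),等价于内核 __fls 的语义 */
static unsigned long demo_fls(unsigned long word)
{
    /* __builtin_clzl 对 0 未定义,调用者需保证 word != 0 */
    return (sizeof(unsigned long) * 8 - 1) - __builtin_clzl(word);
}
```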
/* track the tnode via the pointer from the parent instead of * doing it ourselves. This way we can let RCU fully do its * thing without us interfering */ BUG_ON(tn != get_child(tp, cindex));
/* Double as long as the resulting node has a number of * nonempty nodes that are above the threshold. */
should_inflate 决定要不要做层级压缩(inflate),依据是动态的压缩算法(引用4):直观地说,就是满足一个动态计算的阈值,并且调整次数还没超过十次(MAX_WORK),就会继续 inflate。这个动态阈值的判断是:如果非空子节点的数目超过扩展之后子节点数目的一半,就值得 inflate。而 inflate 做的事情就是把层级压缩一层,也就是把 children 的 children 按照 bits 的匹配放到 parent 的 new_children 当中。
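这个"非空子节点超过扩展后一半"的判据可以写成一个简化的示意函数(并非内核 should_inflate 的精确实现,内核里阈值还会随 inflate/halve 的历史动态调整):

```c
#include <assert.h>

/* inflate 会把子节点数从 2^bits 扩到 2^(bits+1);
 * 简化判据:非空子节点数超过扩展后容量的一半才值得 inflate */
static int should_inflate_demo(unsigned long nonempty, unsigned char bits)
{
    unsigned long inflated = 1ul << (bits + 1); /* 扩展后的子节点容量 */
    return nonempty > inflated / 2;
}
```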
	while (should_inflate(tp, tn) && max_work) {
		tp = inflate(t, tn);
		if (!tp) {
#ifdef CONFIG_IP_FIB_TRIE_STATS
			this_cpu_inc(stats->resize_node_skipped);
#endif
			break;
		}

		max_work--;
		tn = get_child(tp, cindex);
	}

	/* update parent in case inflate failed */
	tp = node_parent(tn);

	/* Return if at least one inflate is run */
	if (max_work != MAX_WORK)
		return tp;
走到这里说明一次 inflate 都没有发生,节点可能很稀疏,接下来检查是否需要 halve,也就是把节点分开。
	/* Halve as long as the number of empty children in this
	 * node is above threshold.
	 */
	while (should_halve(tp, tn) && max_work) {
		tp = halve(t, tn);
		if (!tp) {
#ifdef CONFIG_IP_FIB_TRIE_STATS
			this_cpu_inc(stats->resize_node_skipped);
#endif
			break;
		}

		max_work--;
		tn = get_child(tp, cindex);
	}
只有一个孩子,可以进行 path compress,没必要再多一个中间节点。
	/* Only one child remains */
	if (should_collapse(tn))
		return collapse(t, tn);

	/* update parent in case halve failed */
	return node_parent(tn);
}
	/* Compare new lladdr with cached one */
	if (!dev->addr_len) {
		/* First case: device needs no address. */
		lladdr = neigh->ha;
	} else if (lladdr) {
		/* The second case: if something is already cached
		   and a new address is proposed:
		   - compare new & old
		   - if they are different, check override flag
		 */
		if ((old & NUD_VALID) &&
		    !memcmp(lladdr, neigh->ha, dev->addr_len))
			lladdr = neigh->ha;
	} else {
		/* No address is supplied; if we know something,
		   use it, otherwise discard the request.
		 */
		err = -EINVAL;
		if (!(old & NUD_VALID))
			goto out;
		lladdr = neigh->ha;
	}
如果新状态是 NUD_CONNECTED,就更新 confirmed 时间戳;无论如何都更新 updated 时间戳。
	if (new & NUD_CONNECTED)
		neigh->confirmed = jiffies;
	neigh->updated = jiffies;

	/* If entry was valid and address is not changed,
	   do not change entry state, if new one is STALE.
	 */
	err = 0;
	update_isrouter = flags & NEIGH_UPDATE_F_OVERRIDE_ISROUTER;
	if (old & NUD_VALID) {
		if (lladdr != neigh->ha && !(flags & NEIGH_UPDATE_F_OVERRIDE)) {
			update_isrouter = 0;
			if ((flags & NEIGH_UPDATE_F_WEAK_OVERRIDE) &&
			    (old & NUD_CONNECTED)) {
				lladdr = neigh->ha;
				new = NUD_STALE;
			} else
				goto out;
		} else {
			if (lladdr == neigh->ha && new == NUD_STALE &&
			    !(flags & NEIGH_UPDATE_F_ADMIN))
				new = old;
		}
	}
如果状态发生变化,先删除老的 timer;如果新状态需要 timer,就添加新的 timer,并且设置新状态。
	if (new != old) {
		neigh_del_timer(neigh);
		if (new & NUD_PROBE)
			atomic_set(&neigh->probes, 0);
		if (new & NUD_IN_TIMER)
			neigh_add_timer(neigh, (jiffies +
						((new & NUD_REACHABLE) ?
						 neigh->parms->reachable_time :
						 0)));
		neigh->nud_state = new;
		notify = 1;
	}
如果 lladdr 和 neigh->ha 不同,就更新 neigh->ha。
	if (lladdr != neigh->ha) {
		write_seqlock(&neigh->ha_lock);
		memcpy(&neigh->ha, lladdr, dev->addr_len);
		write_sequnlock(&neigh->ha_lock);
		neigh_update_hhs(neigh);
		if (!(new & NUD_CONNECTED))
			neigh->confirmed = jiffies -
				      (NEIGH_VAR(neigh->parms, BASE_REACHABLE_TIME) << 1);
		notify = 1;
	}
	if (new == old)
		goto out;
根据新状态调用 neigh_connect 或者 neigh_suspect。
	if (new & NUD_CONNECTED)
		neigh_connect(neigh);
	else
		neigh_suspect(neigh);
	/* Why not just use 'neigh' as-is?  The problem is that
	 * things such as shaper, eql, and sch_teql can end up
	 * using alternative, different, neigh objects to output
	 * the packet in the output path.  So what we need to do
	 * here is re-lookup the top-level neigh in the path so
	 * we can reinject the packet there.
	 */
	n2 = NULL;
	if (dst) {
		n2 = dst_neigh_lookup_skb(dst, skb);
		if (n2)
			n1 = n2;
	}
	n1->output(n1, skb);
	if (n2)
		neigh_release(n2);
	rcu_read_unlock();
还有就是 VIP。一般的用法是本地网络中有两台机器,一台作为主机,一台作为备机,当主机 fail 的时候,备机可以继续『冒充』主机的 IP 地址(VIP)。具体的做法就是备机主动发送 ARP 请求,请求中要解析的目标 IP 和 sender IP 一样,都是这个 VIP。老的 server 肯定不会回答这个 ARP;交换机上已经没有这个端口的缓存,会进行广播,让所有的接收者都更新自己的缓存。也就是发送了一个『一去不复返』的请求,让所有邻居更新了自己的 ARP 缓存,从而接管了老 server 的 IP,这就是 VIP 通过 gratuitous ARP 实现的 failover。
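这种 gratuitous ARP 报文可以按 RFC 826 的字段布局示意一下(只演示填包,结构体是为演示定义的,字节序转换和 raw socket 发送都省略了):

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* ARP 报文主体(演示用,未做网络字节序转换) */
struct arp_pkt {
    uint16_t htype, ptype;  /* 硬件类型 / 协议类型 */
    uint8_t  hlen, plen;
    uint16_t oper;          /* 1 = request */
    uint8_t  sha[6];        /* sender MAC:接管方(备机)的 MAC */
    uint32_t spa;           /* sender IP:VIP */
    uint8_t  tha[6];        /* target MAC:请求中为全零 */
    uint32_t tpa;           /* target IP:同样是 VIP */
};

static void fill_gratuitous_arp(struct arp_pkt *p,
                                const uint8_t mac[6], uint32_t vip)
{
    memset(p, 0, sizeof(*p));
    p->htype = 1;      /* Ethernet */
    p->ptype = 0x0800; /* IPv4 */
    p->hlen = 6;
    p->plen = 4;
    p->oper = 1;
    memcpy(p->sha, mac, 6);
    p->spa = vip; /* sender IP 和 target IP 都填 VIP,这正是 gratuitous 的含义 */
    p->tpa = vip;
}
```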
	while (left > 0) {
		len = left;
		/* IF: it doesn't fit, use 'mtu' - the data space left */
		if (len > mtu)
			len = mtu;
		/* IF: we are not sending up to and including the packet end
		   then align the next start on an eight byte boundary */
		if (len < left) {
			len &= ~7;
		}

		/*
		 *	Copy a block of the IP datagram.
		 */
		if (skb_copy_bits(skb, ptr, skb_transport_header(skb2), len))
			BUG();
		left -= len;
设置 IP 头的偏移和分片标志。
		/*
		 *	Fill in the new header fields.
		 */
		iph = ip_hdr(skb2);
		iph->frag_off = htons((offset >> 3));

		if (IPCB(skb)->flags & IPSKB_FRAG_PMTU)
			iph->frag_off |= htons(IP_DF);
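frag_off 字段的编码可以用一个小函数演示:低 13 位是以 8 字节为单位的偏移(所以 offset 要右移 3 位),高 3 位是标志位(这里只演示 IP_MF,htons 字节序转换省略):

```c
#include <assert.h>
#include <stdint.h>

#define DEMO_IP_MF 0x2000 /* More Fragments 标志 */

/* byte_offset 必须是 8 的倍数(除最后一个分片外内核会保证这一点) */
static uint16_t frag_off_field(unsigned int byte_offset, int more_frags)
{
    uint16_t v = (uint16_t)(byte_offset >> 3); /* 偏移以 8 字节为单位 */
    if (more_frags)
        v |= DEMO_IP_MF;
    return v;
}
```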
如果是第一个分片就尝试更新 IP options。
		/* ANK: dirty, but effective trick. Upgrade options only if
		 * the segment to be fragmented was THE FIRST (otherwise,
		 * options are already fixed) and make it ONCE
		 * on the initial skb, so that all the following fragments
		 * will inherit fixed options.
		 */
		if (offset == 0)
			ip_options_fragment(skb);

		/*
		 *	Added AC : If we are fragmenting a fragment that's not the
		 *		   last fragment then keep MF on each bit
		 */
		if (left > 0 || not_last_frag)
			iph->frag_off |= htons(IP_MF);
		ptr += len;
		offset += len;

		/*
		 *	Put this fragment into the sending queue.
		 */
		iph->tot_len = htons(len + hlen);
	/* Determine the position of this fragment. */
	end = offset + skb->len - skb_network_offset(skb) - ihl;
	err = -EINVAL;
如果是最后一个 fragment,end 不应该小于 q.len;如果之前已经收到过最后一个分片,但 end 和 q.len 不一致,说明出现了 corruption。如果检查没问题,就在 q.flags 上标记已收到最后一个分片(INET_FRAG_LAST_IN),并把 end 赋值给 q.len。
	/* Is this the final fragment? */
	if ((flags & IP_MF) == 0) {
		/* If we already have some bits beyond end
		 * or have different end, the segment is corrupted.
		 */
		if (end < qp->q.len ||
		    ((qp->q.flags & INET_FRAG_LAST_IN) && end != qp->q.len))
			goto err;
		qp->q.flags |= INET_FRAG_LAST_IN;
		qp->q.len = end;
如果不是最后一个分片,end 要与 8 字节对齐,然后更新 q.len。
	} else {
		if (end&7) {
			end &= ~7;
			if (skb->ip_summed != CHECKSUM_UNNECESSARY)
				skb->ip_summed = CHECKSUM_NONE;
		}
		if (end > qp->q.len) {
			/* Some bits beyond end -> corruption. */
			if (qp->q.flags & INET_FRAG_LAST_IN)
				goto err;
			qp->q.len = end;
		}

	err = -ENOMEM;
	if (!pskb_pull(skb, skb_network_offset(skb) + ihl))
		goto err;

	err = pskb_trim_rcsum(skb, end - offset);
	if (err)
		goto err;
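上面最后分片 / 非最后分片两种情况下对 end 和 q.len 的检查,可以抽成一个独立的小函数来演示(update_qlen 是为演示假设的简化版,返回 -1 表示检测到 corruption):

```c
#include <assert.h>

/* is_last 表示当前分片没有 MF 标志;have_last 表示之前已收到过最后分片。
 * 返回更新后的 q.len(简化示意,非内核代码)。 */
static int update_qlen(int q_len, int have_last, int end, int is_last)
{
    if (is_last) {
        /* 最后一个分片确定总长:end 不能小于已知长度,
         * 也不能和已记录的最后分片位置矛盾 */
        if (end < q_len || (have_last && end != q_len))
            return -1;
        return end;
    }
    end &= ~7; /* 非最后分片的 end 必须 8 字节对齐 */
    if (end > q_len) {
        if (have_last) /* 数据超出了已确定的总长 */
            return -1;
        return end;
    }
    return q_len;
}
```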
	/* Find out which fragments are in front and at the back of us
	 * in the chain of fragments so far.  We must know where to put
	 * this fragment, right?
	 */
	prev = qp->q.fragments_tail;
	if (!prev || FRAG_CB(prev)->offset < offset) {
		next = NULL;
		goto found;
	}
	prev = NULL;
	for (next = qp->q.fragments; next != NULL; next = next->next) {
		if (FRAG_CB(next)->offset >= offset)
			break;	/* bingo! */
		prev = next;
	}
如果和前面的分片有重叠,就把当前分片头部重叠的部分去掉,并把 ip_summed 设为 CHECKSUM_NONE,使已有的校验和失效。
	if (prev) {
		int i = (FRAG_CB(prev)->offset + prev->len) - offset;

		if (i > 0) {
			offset += i;
			err = -EINVAL;
			if (end <= offset)
				goto err;
			err = -ENOMEM;
			if (!pskb_pull(skb, i))
				goto err;
			if (skb->ip_summed != CHECKSUM_UNNECESSARY)
				skb->ip_summed = CHECKSUM_NONE;
		}
	}
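这段前向重叠裁剪的算术可以抽出来单独验证(trim_front_overlap 是为演示假设的函数,内核里的 pskb_pull 对应这里的 offset 前移):

```c
#include <assert.h>

/* prev 片段覆盖到 prev_offset + prev_len;若新片段 [offset, end) 的头部
 * 落在其中,就把重叠的 i 字节裁掉,offset 相应前移。
 * 返回调整后的 offset,新片段完全被覆盖时返回 -1 表示丢弃。 */
static int trim_front_overlap(int prev_offset, int prev_len,
                              int offset, int end)
{
    int i = (prev_offset + prev_len) - offset; /* 重叠字节数 */
    if (i > 0) {
        offset += i;
        if (end <= offset)
            return -1; /* 新片段完全被 prev 覆盖 */
    }
    return offset;
}
```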
然后向后检查有没有重叠,并且把重叠的部分去掉,如果重叠的部分比 next 本身还要大,直接把 next 删掉。
	while (next && FRAG_CB(next)->offset < end) {
		int i = end - FRAG_CB(next)->offset; /* overlap is 'i' bytes */

		if (i < next->len) {
			/* Eat head of the next overlapped fragment
			 * and leave the loop. The next ones cannot overlap.
			 */
			if (!pskb_pull(next, i))
				goto err;
			FRAG_CB(next)->offset += i;
			qp->q.meat -= i;
			if (next->ip_summed != CHECKSUM_UNNECESSARY)
				next->ip_summed = CHECKSUM_NONE;
			break;
		} else {
			struct sk_buff *free_it = next;

			/* Old fragment is completely overridden with
			 * new one drop it.
			 */
			next = next->next;

			if (prev)
				prev->next = next;
			else
				qp->q.fragments = next;