ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

BPF

BPF (Berkeley Packet Filter) 最早是用在 tcpdump 里面的,比如 tcpdump tcp and dst port 80 这样的过滤规则会单独复制 tcp 协议并且目的端口是 80 的包到用户态。整个实现是基于内核中的一个虚拟机来实现的,通过翻译 BPF 规则到字节码运行到内核中的虚拟机当中。最早的论文是这篇,这篇论文我大概翻了一下,主要讲的是原本的基于栈的过滤太重了,而 BPF 是一套能充分利用 CPU 寄存器,动态注册 filter 的虚拟机实现,相对于基于内存的实现更高效,不过那个时候的内存比较小才几十兆。bpf 会从链路层复制 pakcet 并根据 filter 的规则选择抛弃或者复制,字节码是这样的,具体语法就不介绍了,一般也不会去直接写这些字节码,然后通过内核中实现的一个虚拟机翻译这些字节码,注册过滤规则,这样不修改内核的虚拟机也能实现很多功能。

在 Linux 中对应的 API 是

1
2
3
socket(SOCK_RAW)
bind(iface)
setsockopt(SO_ATTACH_FILTER)

下面是一个低层级的 demo,首先 ethernet header 的十二个字节记录了 ip 的协议,ip 的第9个字节记录 tcp 的协议,如果协议编号不匹配都跳到最后 reject,然后在到 tcp 的第二个字节是 port 看看是不是 80,都满足的话就 accept。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <net/if.h>
#include <net/ethernet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <netpacket/packet.h>
#include <linux/filter.h>

#define OP_LDH (BPF_LD | BPF_H | BPF_ABS)
#define OP_LDB (BPF_LD | BPF_B | BPF_ABS)
#define OP_JEQ (BPF_JMP | BPF_JEQ | BPF_K)
#define OP_RET (BPF_RET | BPF_K)

// Filter TCP segments to port 80
static struct sock_filter bpfcode[8] = {
{ OP_LDH, 0, 0, 12 }, // ldh [12]
{ OP_JEQ, 0, 5, ETH_P_IP }, // jeq #0x800, L2, L8
{ OP_LDB, 0, 0, 23 }, // ldb [23] # 14 bytes of ethernet header + 9 bytes in IP header until the protocol
{ OP_JEQ, 0, 3, IPPROTO_TCP }, // jeq #0x6, L4, L8
{ OP_LDH, 0, 0, 36 }, // ldh [36] # 14 bytes of ethernet header + 20 bytes of IP header (we assume no options) + 2 bytes of offset until the port
{ OP_JEQ, 0, 1, 80 }, // jeq #0x50, L6, L8
{ OP_RET, 0, 0, -1, }, // ret #0xffffffff # (accept)
{ OP_RET, 0, 0, 0 }, // ret #0x0 # (reject)

};

int main(int argc, char **argv)
{
int sock;
int n;
char buf[2000];
struct sockaddr_ll addr;
struct packet_mreq mreq;
struct iphdr *ip;
char saddr_str[INET_ADDRSTRLEN], daddr_str[INET_ADDRSTRLEN];
char *proto_str;
char *name;
struct sock_fprog bpf = { 8, bpfcode };

if (argc != 2) {
printf("Usage: %s ifname\n", argv[0]);
return 1;
}

name = argv[1];

sock = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
if (sock < 0) {
perror("socket");
return 1;
}

memset(&addr, 0, sizeof(addr));
addr.sll_ifindex = if_nametoindex(name);
addr.sll_family = AF_PACKET;
addr.sll_protocol = htons(ETH_P_ALL);

if (bind(sock, (struct sockaddr *) &addr, sizeof(addr))) {
perror("bind");
return 1;
}

if (setsockopt(sock, SOL_SOCKET, SO_ATTACH_FILTER, &bpf, sizeof(bpf))) {
perror("setsockopt ATTACH_FILTER");
return 1;
}

memset(&mreq, 0, sizeof(mreq));
mreq.mr_type = PACKET_MR_PROMISC;
mreq.mr_ifindex = if_nametoindex(name);

if (setsockopt(sock, SOL_PACKET,
PACKET_ADD_MEMBERSHIP, (char *)&mreq, sizeof(mreq))) {
perror("setsockopt MR_PROMISC");
return 1;
}

for (;;) {
n = recv(sock, buf, sizeof(buf), 0);
if (n < 1) {
perror("recv");
return 0;
}

ip = (struct iphdr *)(buf + sizeof(struct ether_header));

inet_ntop(AF_INET, &ip->saddr, saddr_str, sizeof(saddr_str));
inet_ntop(AF_INET, &ip->daddr, daddr_str, sizeof(daddr_str));

switch (ip->protocol) {
#define PTOSTR(_p,_str) \
case _p: proto_str = _str; break

PTOSTR(IPPROTO_ICMP, "icmp");
PTOSTR(IPPROTO_TCP, "tcp");
PTOSTR(IPPROTO_UDP, "udp");
default:
proto_str = "";
break;
}

printf("IPv%d proto=%d(%s) src=%s dst=%s\n",
ip->version, ip->protocol, proto_str, saddr_str, daddr_str);
}

return 0;
}

执行 curl "http://www.baidu.com",结果如下:

1
2
3
4
5
6
7
8
sudo ./filter ens3
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244

这些低级别的操作都封装在了 libpcap 里面,一般不太会自己这么写。

eBPF

eBPF 是 extended BPF 具有更强大的功能。老的 BPF 现在叫 cBPF (classic BPF)。

首先是字节码的指令集更加丰富了,并且现在有了 64 位的寄存器(相较于上古时期的 32 位的CPU),有了 JIT mapping 技术和 LLVM 的后端。JIT 指的的是 Just In Time,实时编译。

一般的 ePBF 的工作流是编写一个 C 的子集(比如没有循环),通过 LLVM 编译到字节码,然成生成 ELF 文件,然后 JIT 编译进内核。

eBPF 一个最重要的功能是可以做到动态跟踪(dynamic tracing),可以不修改程序直接监控一个正在运行的进程。

在 eBPF 之前

在 ebpf 之前,为了实现同样的功能,要在执行的指令中嵌入 hook,并且支持跳到 inspect 函数,然后再恢复执行,这个流程和 debugger 非常相似,这是用 kprobe 来实现,kprobe 是 2007 年引入内核的。比如下面的例子,把 Instruction 3 改成跳转指令,然后再执行 Instruction 3,然后再跳转回去。

使用 kprobe 需要通过编译 kernel module 注册到内核当中,非常麻烦,等于是直接动内核的代码很容易引起内核 panic,而且每个内核版本都不一样,函数符号和位移是有区别的,对每个版本的内核都要编译一个对应的版本的 module。为了解决这个问题引入了一些静态的稳定的 trace point,不会因为版本而改变的地方可以插入 kprobe,但这样就限制了 kprobe 可以探测的范围。

有了 eBPF

有了 eBPF,就可以将用户态的程序插入到内核中,不用编写内核模块了,但是问题并没有改善,内核版本带来的问题还是没有解决。

eBPF 的 kprobe 一种方式时候 mapping,映射 kprobe 的数据到用户态程序,比如发包数,然后用户态程序定期检查这个映射进行统计。

另一种方式是 event (perf_events),如果 kprobe 向用户态程序发送事件来进行统计,这样不同轮询,直接异步计算就可以。

低级别的 API,这个只有 Linux 有

1
2
3
4
5
bpf() 系统调用
BPF_PROG_LOAD 加载 BPF 字节码
BPF_PROG_TYPE_SOCKET_FILTER
BPF_PROG_TYPE_KPROBE
BPF_MAP_* map 映射到 BPF 当中
1
perf_event_open() + ioctl(PERF_EVENT_IOC_SET_BPF)

高级别的接口是 bcc(BPFcompiler collection),转换 c 到 LLVM-epbf 后端,并且前端是 python 的。可以实现动态加载 eBPF 字节码到内核中。

weave scope 就是用 bcc 实现的 HTTP stats 的统计。

这里可以看到程序的主体,这里 hook 了内核函数 skb_copy_datagram_iter,这个函数有一个 tracepoint trace_skb_copy_datagram_iovec。在内核代码里面对应的是下面这段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* skb_copy_datagram_iter - Copy a datagram to an iovec iterator.
* @skb: buffer to copy
* @offset: offset in the buffer to start copying from
* @to: iovec iterator to copy to
* @len: amount of data to copy from buffer to iovec
*/
int skb_copy_datagram_iter(const struct sk_buff *skb, int offset,
struct iov_iter *to, int len)
{
int start = skb_headlen(skb);
int i, copy = start - offset, start_off = offset, n;
struct sk_buff *frag_iter;

trace_skb_copy_datagram_iovec(skb, len);

这里对这个 hook 注册了程序,具体的代码就不展示了,主要是根据协议统计这个 HTTP 的大小,方法等信息。

1
2
3
4
5
6
7
8
9
10

/* skb_copy_datagram_iter() (Kernels >= 3.19) is in charge of copying socket
* buffers from kernel to userspace.
*
* skb_copy_datagram_iter() has an associated tracepoint
* (trace_skb_copy_datagram_iovec), which would be more stable than a kprobe but
* it lacks the offset argument.
*/
int kprobe__skb_copy_datagram_iter(struct pt_regs *ctx, const struct sk_buff *skb, int offset, void *unused_iovec, int len)
{

比如判断 method 是不是 DELETE 的是实现就比较蠢,是因为 eBPF 不支持循环,只能这么实现才能把 c 代码翻译成字节码。

1
2
3
4
5
case 'D':
if ((data[1] != 'E') || (data[2] != 'L') || (data[3] != 'E') || (data[4] != 'T') || (data[5] != 'E') || (data[6] != ' ')) {
return 0;
}
break;

除了 bcc 之外,waeve 使用了 gobpf,一个 bpf 的 go binding,并且通过建立 tcp 连接来猜测内核的数据结构,以达到内核版本无关,这个项目 tcptracer-bpf 还在开发中。

eBPF 的其他应用

还有一个比较大头的基于 eBPF 的是 cilium,一套比较完整的网络解决方案,用 eBPF 实现了 NAT,L3/L4 负载均衡,连接记录等等功能。比如访问控制,一般的 iptables 都是 drop 或者 rst,要过整个协议栈,但是 eBPF 可以在 connect 的时候就拦截然后返回 EACCESS,这样就不用过协议栈了。cilium 一个优化就是通过 XDP ,利用类似 DPDK 的加速方案,hook 到驱动层中,让 eBPF 可以直接使用 DMA 的缓冲,优化负载均衡。

BPF/XDP allows for a 10x improvement in load balancing over IPVS for L3/L4 traffic.

现在 k8s 最新的 lb 方案是基于 ipvs 的,我在 kube-proxy 分析 里面有提到过,已经比原来的 iptables 提高很多了,现在有了 eBPF 加 XDP 的硬件加速可以实现更高的提升,facebook 的 katran L4 负载均衡器的实现也是类似的。

cilium 在我看来基本上是 k8s 网络的一个大方向吧,只不过包括 eBPF 和 XDP 对硬件和内核版本都要比较新,是一个要持续关注的更新。

性能调优

Velocity 2017: Performance Analysis Superpowers with Linux eBPF里,Brendan Gregg (Netflix 的性能调优专家) 提到,性能调优也是 eBPF 的一个大头。用于网络监控其实只是 hook 在了协议栈的函数上,如果 hook 在别的地方可以有更多的统计维度。比如 bcc 官方的例子就是统计 IO Size 的大小的分布,更多关于基于 eBPF 的性能调优可以参考他的 blog,他给出了更详细的关于 eBPF 的解释,里面有一些列 Linux 性能调优的内容。

1
2
3
4
5
6
7
8
9
10
11
12
# ./bitehist.py
Tracing... Hit Ctrl-C to end.
^C
kbytes : count distribution
0 -> 1 : 3 | |
2 -> 3 : 0 | |
4 -> 7 : 211 |********** |
8 -> 15 : 0 | |
16 -> 31 : 0 | |
32 -> 63 : 0 | |
64 -> 127 : 1 | |
128 -> 255 : 800 |**************************************|

安全

在安全方面有 seccomp,可以实现限制 Linux 的系统调用,而 seccompe-bpf 则是通过 bpf 支持更强大的过滤和匹配功能,k8s pod 里面的 SecurityContext 就有 seccomp 实现的部分。

cgroup

在 cgroup 上有一个小原型,cgnet 获取 cgroup 的网络统计信息到 prometheus,也是基于 eBPF 的。

参考:

  1. Infrastructure 2017 - Alfonso Acosta - High-performance Linux monitoring with eBPF

  2. Using bpf in kubernetes

  3. Cilium: Networking and security for containers with BPF and XDP

csi 是一个标准的容器存储接口,规定了如何实现一个容器的存储接口,CSI 本身的定义是基于 gRPC 的,所以有一套样例库可以使用,这里分析一下 kuberntes 实现 csi 的方式,为了兼容 CSI kubernete 其实搞得挺绕的,目前这个 CSI 还是定制中包括后期的 Snapshot 的接口怎么设计等等还在讨论中。kubernetes CSI 主要基于几个外部组件和内部功能的一些改动。

CSI-Driver

这里规定了 CSI 的标准,定义了三个 Service,也就是 RPC 的集合,但是没规定怎么写,目前看到的实现都是把这三个 service 都写在一起,比较方便,然后部署的时候有些区别将就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
service Identity {
rpc GetPluginInfo(GetPluginInfoRequest)
returns (GetPluginInfoResponse) {}

rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
returns (GetPluginCapabilitiesResponse) {}

rpc Probe (ProbeRequest)
returns (ProbeResponse) {}
}

service Controller {
rpc CreateVolume (CreateVolumeRequest)
returns (CreateVolumeResponse) {}

rpc DeleteVolume (DeleteVolumeRequest)
returns (DeleteVolumeResponse) {}

rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
returns (ControllerPublishVolumeResponse) {}

rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
returns (ControllerUnpublishVolumeResponse) {}

rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
returns (ValidateVolumeCapabilitiesResponse) {}

rpc ListVolumes (ListVolumesRequest)
returns (ListVolumesResponse) {}

rpc GetCapacity (GetCapacityRequest)
returns (GetCapacityResponse) {}

rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
returns (ControllerGetCapabilitiesResponse) {}

rpc CreateSnapshot (CreateSnapshotRequest)
returns (CreateSnapshotResponse) {}

rpc DeleteSnapshot (DeleteSnapshotRequest)
returns (DeleteSnapshotResponse) {}

rpc ListSnapshots (ListSnapshotsRequest)
returns (ListSnapshotsResponse) {}
}

service Node {
rpc NodeStageVolume (NodeStageVolumeRequest)
returns (NodeStageVolumeResponse) {}

rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
returns (NodeUnstageVolumeResponse) {}

rpc NodePublishVolume (NodePublishVolumeRequest)
returns (NodePublishVolumeResponse) {}

rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
returns (NodeUnpublishVolumeResponse) {}

rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest)
returns (NodeGetVolumeStatsResponse) {}

// NodeGetId is being deprecated in favor of NodeGetInfo and will be
// removed in CSI 1.0. Existing drivers, however, may depend on this
// RPC call and hence this RPC call MUST be implemented by the CSI
// plugin prior to v1.0.
rpc NodeGetId (NodeGetIdRequest)
returns (NodeGetIdResponse) {
option deprecated = true;
}

rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
returns (NodeGetCapabilitiesResponse) {}

// Prior to CSI 1.0 - CSI plugins MUST implement both NodeGetId and
// NodeGetInfo RPC calls.
rpc NodeGetInfo (NodeGetInfoRequest)
returns (NodeGetInfoResponse) {}
}

比如说 GetPluginInfo 就是用来获取 driver 的 name 等信息的,NodePublishVolume 大部分情况下就是在节点上挂载文件系统,CreateVolume 这个如果对应的是 ebs 这种块存储可能就是在 API 里面建一个 ebs,如果对应的是 glusterfs 这种文件系统存储可能就是建一个 volume,然后 ControllerPublishVolume 对应 ebs 就是把 ebs 和 instance 绑定,然后调用节点的 NodePublishVolume 来挂载,如果是文件存储,可能就不需要 ``ControllerPublishVolume` 了,因为不需要绑定快设备到机器上,直接挂到网络接口就可以,这一套标准的目的一个是为了兼容现有的存储方案,一个是为了让一些私有的 provider 能够比较容易的实现一套方案,而不需要做过多的迁移,甚至厂商都不需要开源代码,如果是要实现 in-tree 的存储代码肯定是要开源的,因为 kubernetes 是开源的。

device-driver-registrar

kubernetes 实现 csi 的兼容,首先需要一个外部组件 devide-driver-registrar,初始化的时候通过 csi-sock 的 RPC 获取 driver name 和 node id。

主要功能给 node 打上下面类似的 annotations,dirver 对应的是 csi driver 的名字,name 对应的是 driver 的 NodeId 基本上就是 k8s 的 node name。这样可以让 ControllerPublishVolume 调用能够获取 nodeid 到 storage nodeid 的映射,理论上一样的就可以感觉。

1
csi.volume.kubernetes.io/nodeid: "{ "driver1": "name1", "driver2": "name2" }

他有两个模式,一个模式是自己给 node 打上这个 annotation,并且在退出的时候把这个 annotation 去掉。

另一个模式是交给 kubelet 的 pluginswatcher 来管理, kubelet 自己会根据 device-driver-registrar 提供的 unix domain socket 然后调用 gRPC 从 registrar 获取 NodeId 和 DriverName 自己把 annotation 打上。

搜索这条路径下的 socket/var/lib/kubelet/plugins/[SanitizedCSIDriverName]/csi.sock,然后就可以自动连接 registrar 拿到 NodeId 和 DriverName。

所以 device-driver-registar 主要是注册 Node annotation 的。

external-attacher

监听 VolumeAttachments 和 PersistentVolumes 两个对象,这是和 kube-controller-manager 之间的桥梁。

实现中最后会调用 SyncNewOrUpdatedVolumeAttachment 来同步,调用 csi dirver 的 Attach 函数。

in-tree 的 attach/detach-controller

在 CSI 中扮演的角色是创建 VolumeAttachment,然后等待他的 VolumeAttachment 的 attached 的状态。

attach-controller 会创建 VolumeAttatchment.Spec.Attacher 指向的是 external-attacher

external-provisoner

Static VolumeDynamic Volume 的区别是,有一个 PersistentVolumeClaim 这个会根据 claim 自动分配 PersistentVolume,不然就要自己手动创建,然后 pod 要指定这个手动创建的 volume。

external-provisoner 就是提供支持 PersistentVolumeClaim 的,一般的 provisioner 要实现 Provision 和 Delete 的接口。主要是根据 PVC 创建 PV,这是 Provisioner 的接口的定义了,不是 CSI spec 里的,这里顺带介绍一下。

external-provisoner 看到 pvc 调用 driver 的 CreateVolume,完成以后就会创建 PersistenVolume,并且绑定到 pvc。

kubelet volume manager

kubelet 有一个 volume manager 来管理 volume 的 mount/attach 操作。

desiredStateOfWorld 是从 podManager 同步的理想状态。

actualStateOfWorld 是目前 kubelet 的上运行的 pod 的状态。

每次 volume manager 需要把 actualStateOfWorld 中 volume 的状态同步到 desired 指定的状态。

volume Manager 有两个 goroutine 一个是同步状态,一个 reconciler.reconcile

rc.operationExecutor.MountVolume 会执行 MountVolume 的操作。

-> oe.operationGenerator.GenerateMountVolumeFunc

-> 首先根据 og.volumePluginMgr.FindPluginBySpec 找到对应的 VolumePlugin

-> 然后调用 volumePlugin.NewMounter

-> 然后拿到 og.volumePluginMgr.FindAttachablePluginBySpec attachableplugin

-> volumeMounter.SetUp(fsGroup) 做 mount

volume plugin

csi volume plugin 是一个 in-tree volume,以后应该会逐步迁移到都使用 csi,而不会再有 in-tree volume plugin 了。

1
2
3
func (c *csiMountMgr) SetUp(fsGroup *int64) error {
return c.SetUpAt(c.GetPath(), fsGroup)
}

csi 的 mounter 调用了 NodePublish 函数。stagingTargetPath 和 targetPath 都是自动生成的。

SetUp/TearDown 的调用会执行 in-tree CSI plugin 的接口(这又是 in-tree volume plugin 的定义,确实挺绕的),对应的是 NodePublishVolumeNodeUnpublishVolume ,这个会通过 unix domain socket 直接调用 csi driver。

总结一个简单的具体流程

首先管理员创建 StorageClass 指向 external-provisioner,然后用户创建指向这个 StorageClass 的 pvc,然后 kube-controller-manager 里的 persistent volume manager 会把这个 pvc 打上 volume.beta.kubernetes.io/storage-provisioner 的 annotation。

externla-provisioner 看到这个 pvc 带有自己的 annotation 以后,拿到 StorageClass 提供的一些参数,并且根据 StorageClass 和 pvc 调用 CSI driver 的 CreateVolume,创建成功以后创建 pv,并且把这个 pv 绑定到对应的 pvc。

然后 kube-controller-manager 看到有 pod 含有对应的 pvc,就用调用 in-tree 的 CSI plugin 的 attach 方法。

in-tree 的 CSI plugin 实际上会创建一个 VolumeAttachment 的 object,等待这个 VolumeAttachment 被完成。

external-controller 看到 VolumeAttachment,开始 attach 一个 volume,实际上调用 CSI driver 的 ControllerPublish,成功以后更新 VoluemAttachment 以后就知道这个 Volume Attach 成功了,然后让 attach/detach-controller (kube-controller-manager) 知道这个 attach 完成。

接下来就到 kubelet 了,kubelet 看到 volume in pod 以后就会调用 in-tree 的 csi plugin WaitForAttach,然后等待 Attach 成功,之后就会调用 daemonset 里面的 csi driver 的 NodePublishVolume 做挂载操作。

整体的流程是这样的,需要反复多看几遍 kubernetes-csi 的 document,加深理解。

kube-proxy 是一层代理,主要用于实现 Service 这个 Object。

Service 有一个 ClusterIP,这是一个虚拟 IP 应对 Service 后面的 pod,pod 的销毁和创建会有 ip 不固定的问题。除了能够屏蔽后端的 pod 的 IP 之外,Service 还可以有负载均衡的作用。

Endpoint 维护的是 Service 和 Pod 的映射关系,如果 pods 变动,Endpoint 也会变动。Service 本身指定义了 ClusterIP 和 port 的映射。Endpoints 记录了 pod 的 ip 和端口。

主要的访问方式是通过 NAT 实现的,如果使用 iptables 的话,就设置匹配目标端口以后,DNAT 转发到目的地址。

我看代码的时候,发现有个函数叫 birth cry 挺有意思的,表示打程序启动的一条 log。

Kube-proxy 主要监听两个对象,一个是 Service,一个是 Endpoint

这些 Objects 对应的处理函数有不同的实现,现在先看一下 iptables 的实现

当 Service 有更新的时候就会更新对应的 iptables 规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

glog.Info("Starting service config controller")
defer glog.Info("Shutting down service config controller")

if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
return
}

for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnServiceSynced()")
c.eventHandlers[i].OnServiceSynced()
}

<-stopCh
}

会调用 OnServiceSynced,然后具体调用实现者的方法。

Proxier

Netfilter

Netfilter 是内核模块的控制代码,ipvs 和 iptables 都是基于 Netfilter 实现的。

iptables

用 iptables 配置 DNAT ,然后再用概率模块做负载均衡。kube-proxy iptables 的实现主要依赖于 filter 和 nat。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Masquerade
-A KUBE-MARK-DROP -j MARK --set-xmark 0x8000/0x8000
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE

# clusterIP and publicIP
-A KUBE-SERVICES ! -s 10.244.0.0/16 -d 10.98.154.163/32 -p tcp -m comment --comment "default/nginx: cluster IP" -m tcp --dport 80 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -d 10.98.154.163/32 -p tcp -m comment --comment "default/nginx: cluster IP" -m tcp --dport 80 -j KUBE-SVC-4N57TFCL4MD7ZTDA
-A KUBE-SERVICES -d 12.12.12.12/32 -p tcp -m comment --comment "default/nginx: loadbalancer IP" -m tcp --dport 80 -j KUBE-FW-4N57TFCL4MD7ZTDA

# Masq for publicIP
-A KUBE-FW-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx: loadbalancer IP" -j KUBE-MARK-MASQ
-A KUBE-FW-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx: loadbalancer IP" -j KUBE-SVC-4N57TFCL4MD7ZTDA
-A KUBE-FW-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx: loadbalancer IP" -j KUBE-MARK-DROP

# Masq for nodePort
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/nginx:" -m tcp --dport 30938 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/nginx:" -m tcp --dport 30938 -j KUBE-SVC-4N57TFCL4MD7ZTDA

# load balance for each endpoints
-A KUBE-SVC-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx:" -m statistic --mode random --probability 0.33332999982 -j KUBE-SEP-UXHBWR5XIMVGXW3H
-A KUBE-SVC-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx:" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-TOYRWPNILILHH3OR
-A KUBE-SVC-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx:" -j KUBE-SEP-6QCC2MHJZP35QQAR

# endpoint #1
-A KUBE-SEP-6QCC2MHJZP35QQAR -s 10.244.3.4/32 -m comment --comment "default/nginx:" -j KUBE-MARK-MASQ
-A KUBE-SEP-6QCC2MHJZP35QQAR -p tcp -m comment --comment "default/nginx:" -m tcp -j DNAT --to-destination 10.244.3.4:80

# endpoint #2
-A KUBE-SEP-TOYRWPNILILHH3OR -s 10.244.2.4/32 -m comment --comment "default/nginx:" -j KUBE-MARK-MASQ
-A KUBE-SEP-TOYRWPNILILHH3OR -p tcp -m comment --comment "default/nginx:" -m tcp -j DNAT --to-destination 10.244.2.4:80

# endpoint #3
-A KUBE-SEP-UXHBWR5XIMVGXW3H -s 10.244.1.2/32 -m comment --comment "default/nginx:" -j KUBE-MARK-MASQ
-A KUBE-SEP-UXHBWR5XIMVGXW3H -p tcp -m comment --comment "default/nginx:" -m tcp -j DNAT --to-destination 10.244.1.2:80

这是一个实例,首先通过 clusterIP 的 -d 匹配目的地址,还有 dport 匹配目的端口,转入对应的 SVC 链。

SVC 联通过 -m statistic 模块负载均衡到不同的 Endpoint 链上,每个 Endpoint 链又有一个 DNAT 到对应的 pod

的 IP:port 上,总的来说就是用 DNAT 实现 clusterIP 的代理,用 statistic 模块实现按概率的负载均衡(按概率进入不同的 Endpoint 链当中)。

ipvs

ipvs 有相对更好的性能,更复杂的负载均衡的算法以及健康状态检查等。

可以看到 kube-ipvs0 上的 10.102.128.4 对应的 Service 的 ClusterIP,而 RealServer 映射到了不同的 Endpoint 上。

iptables 的匹配是 hash 的,而不同根据一个链一个链的匹配,效率更高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# kubectl describe svc nginx-service
Name: nginx-service
...
Type: ClusterIP
IP: 10.102.128.4
Port: http 3080/TCP
Endpoints: 10.244.0.235:8080,10.244.1.237:8080
Session Affinity: None

# ip addr
...
73: kube-ipvs0: <BROADCAST,NOARP> mtu 1500 qdisc noop state DOWN qlen 1000
link/ether 1a:ce:f5:5f:c1:4d brd ff:ff:ff:ff:ff:ff
inet 10.102.128.4/32 scope global kube-ipvs0
valid_lft forever preferred_lft forever

# ipvsadm -ln
IP Virtual Server version 1.2.1 (size=4096)
Prot LocalAddress:Port Scheduler Flags
-> RemoteAddress:Port Forward Weight ActiveConn InActConn
TCP 10.102.128.4:3080 rr
-> 10.244.0.235:8080 Masq 1 0 0
-> 10.244.1.237:8080 Masq 1 0 0

glusterfs 主要有以下几个组件

  • gluster 命令行。
  • glusterd 管理进程。
  • glusterfsd 服务进程。
  • glusterfs 客户端 fuse 进程。

glusterfs 是靠 translator 做分层设计的,类似于可插拔的模块的概念,对应的结构体是 xlator_t,因为是 C 写的,一些面向的对象的写法也是通过 xlator 实现的,可以理解 xlator 是类,具体的实现是对象。比如 protocol/client 是最后的 xlator 从客户端发送到网络,对应的 storage/posix 是服务端接受的最后一层交给服务器上的文件系统。每个 xlator 都会编译成一个单独的 .so 文件。通过指定配置文件可以把不同的 xlator 进行组装,加载 .so 以后,通过dlsym 查找 xlator 的符号(函数表),相当于获取 xlator 的公开接口。配置文件中的 volumesubvolume 对应了 xlator 的组织关系。

volfile 会配置在 /var/lib/glusterd/vols/ 下面,对应的文件名是 volfile-id 选项指定的。xlator_t 定义在 libglusterfs/src/xlator.h 里面。下面这张图就可以看到各个 translator 的关系。

glusterfsd 最开始是 protocol/server 最后是 storage/posix 两个 xlator 作为开头和结束。

glusterfs (client) 是以protocol/client 结尾。

那个 write-behind 应该对应的是缓存策略里面的 write back,异步写,而不是直写。

这种设计的好处是方便扩展,你只要填补 xlator 的实现,编译成 .so ,放到链接目录下面,就可以加载进去,不用改整个 glusterfs 的代码。

glusterfs 的整个应用的结构也可以看一下。

用户态是 fuse 实现的文件系统,通过网络协议走到 server,server 本身用的是本地的文件系统,glusterfs 推荐使用 xfs。

下面举个详细的例子。

writev 举例,(write 的本质也是 writev,这里的 v 指的是内存向量,写的时候用链表表示的内存向量可以减少内存的拷贝,因为要分配一段连续的内存,然后拷到一起很没有效率,而且系统调用本身也支持这个系统调用,走到底层是硬件支持的,硬件不支持,也能 hook 帮你再拷贝到一起,但上层就不用管这些细节了),他们的调用关系通过 STACK_WINDSTACK_UNWIND 实现,对于所有的 xlator 的 op 都是这种调用关系,本身一层结束以后通过调用 STACK_WIND 调用下一层对应的 op,然后在调用完成之后通过 STACK_UNWIND 回调 op_cbk,并且这种调用关系是树状的。

这张图解释挺好的,说明了 xlator 向下传递的过程,通过 STACK_WIND 调用下一层,通过 STACK_UNWIND 调用上一层的 cbk。

glusterfsd 每个 volume 都会起一个线程来处理。

./xlators/mount/fuse/src/fuse-bridge.c 下面是 fuse 的接口,这是客户端的入口,作为文件系统的“桥接点”,大概是为什么叫 bridge 的原因吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static fuse_handler_t *fuse_std_ops[FUSE_OP_HIGH] = {
[FUSE_LOOKUP] = fuse_lookup,
[FUSE_FORGET] = fuse_forget,
[FUSE_GETATTR] = fuse_getattr,
[FUSE_SETATTR] = fuse_setattr,
[FUSE_READLINK] = fuse_readlink,
[FUSE_SYMLINK] = fuse_symlink,
[FUSE_MKNOD] = fuse_mknod,
[FUSE_MKDIR] = fuse_mkdir,
[FUSE_UNLINK] = fuse_unlink,
[FUSE_RMDIR] = fuse_rmdir,
[FUSE_RENAME] = fuse_rename,
[FUSE_LINK] = fuse_link,
[FUSE_OPEN] = fuse_open,
[FUSE_READ] = fuse_readv,
[FUSE_WRITE] = fuse_write,
[FUSE_STATFS] = fuse_statfs,
[FUSE_RELEASE] = fuse_release,
[FUSE_FSYNC] = fuse_fsync,

看一下 fuse_write,传了一个 xlator_t ,这个东西是 glusterfs 的分层设计的核心,每个 fuse_in_header_tmsg 还有 iobuf 都是 fuse 的 API。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static void
fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg,
struct iobuf *iobuf)
{
/* WRITE is special, metadata is attached to in_header,
* and msg is the payload as-is.
*/
struct fuse_write_in *fwi = (struct fuse_write_in *)
(finh + 1);

fuse_state_t *state = NULL;
fd_t *fd = NULL;
#if FUSE_KERNEL_MINOR_VERSION >= 9
fuse_private_t *priv = NULL;
priv = this->private;
#endif

GET_STATE (this, finh, state);
fd = FH_TO_FD (fwi->fh);
state->fd = fd;
state->size = fwi->size;
state->off = fwi->offset;

/* lets ignore 'fwi->write_flags', but just consider 'fwi->flags' */
#if FUSE_KERNEL_MINOR_VERSION >= 9
state->io_flags = fwi->flags;
#else
state->io_flags = fwi->write_flags;
#endif
/* TODO: may need to handle below flag
(fwi->write_flags & FUSE_WRITE_CACHE);
*/


fuse_resolve_fd_init (state, &state->resolve, fd);

/* See comment by similar code in fuse_settatr */
#if FUSE_KERNEL_MINOR_VERSION >= 9
priv = this->private;
if (priv->proto_minor >= 9 && fwi->write_flags & FUSE_WRITE_LOCKOWNER)
state->lk_owner = fwi->lock_owner;
#endif

state->vector.iov_base = msg;
state->vector.iov_len = fwi->size;
state->iobuf = iobuf;

fuse_resolve_and_resume (state, fuse_write_resume);

return;
}

首先是传入了写入文件的描述符,要写的内存的段的地址和大小,iobuf 这个结构体和内核里的 iobuf 是查不多的。然后进到 fuse_resolve_and_resume,这个函数主要是解析一些文件的元数据,并且返回到 writev_resume 这个回调函数上,解析的函数在 ./xlators/mount/fuse/src/fuse-resolve.c 下面,这整个是个状态机的写法,主要是解析 fd,父路径,inode 等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int
fuse_resolve (fuse_state_t *state)
{
fuse_resolve_t *resolve = NULL;

resolve = state->resolve_now;

if (resolve->fd) {

fuse_resolve_fd (state);

} else if (!gf_uuid_is_null (resolve->pargfid)) {

fuse_resolve_parent (state);

} else if (!gf_uuid_is_null (resolve->gfid)) {

fuse_resolve_inode (state);

} else {
fuse_resolve_all (state);
}

return 0;
}

到了回到调 fuse_write_resume 上,包了一层引用计数就往下传了。

1
2
3
4
5
6
7
8
9
10
11
iobref_add (iobref, state->iobuf);

gf_log ("glusterfs-fuse", GF_LOG_TRACE,
"%"PRIu64": WRITE (%p, size=%"GF_PRI_SIZET", offset=%"PRId64")",
state->finh->unique, state->fd, state->size, state->off);

FUSE_FOP (state, fuse_writev_cbk, GF_FOP_WRITE, writev, state->fd,
&state->vector, 1, state->off, state->io_flags, iobref,
state->xdata);

iobref_unref (iobref);

fuse_writev_cbk 里面 send_fuse_objiobuf发送出去。send_fuse_datasend_fuse_iov

然后走整个的 xlator 的 writev 调用链,到 client 上,这里最主要的就是 submit_request 开始走网络 rpc 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int32_t
client3_3_writev (call_frame_t *frame, xlator_t *this, void *data)
{
clnt_args_t *args = NULL;
clnt_conf_t *conf = NULL;
gfs3_write_req req = {{0,},};
int op_errno = ESTALE;
int ret = 0;

if (!frame || !this || !data)
goto unwind;

args = data;
conf = this->private;

ret = client_pre_writev (this, &req, args->fd, args->size,
args->offset, args->flags, &args->xdata);

if (ret) {
op_errno = -ret;
goto unwind;
}

ret = client_fd_fop_prepare_local (frame, args->fd, req.fd);
if (ret) {
op_errno = -ret;
goto unwind;
}
ret = client_submit_request (this, &req, frame, conf->fops,
GFS3_OP_WRITE, client3_3_writev_cbk,
args->iobref, args->vector, args->count,
NULL, 0, NULL,
(xdrproc_t)xdr_gfs3_write_req);
if (ret) {
/*
* If the lower layers fail to submit a request, they'll also
* do the unwind for us (see rpc_clnt_submit), so don't unwind
* here in such cases.
*/
gf_msg (this->name, GF_LOG_WARNING, 0, PC_MSG_FOP_SEND_FAILED,
"failed to send the fop");
}

GF_FREE (req.xdata.xdata_val);

return 0;

unwind:
CLIENT_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);
GF_FREE (req.xdata.xdata_val);

return 0;
}

回调就会开始释放内存等等操作,这里就略过了,这里回到 fuse 的 writev_cbk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static int
fuse_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *stbuf, struct iatt *postbuf, dict_t *xdata)
{
fuse_state_t *state = NULL;
fuse_in_header_t *finh = NULL;
struct fuse_write_out fwo = {0, };

state = frame->root->state;
finh = state->finh;

fuse_log_eh_fop(this, state, frame, op_ret, op_errno);

if (op_ret >= 0) {
gf_log ("glusterfs-fuse", GF_LOG_TRACE,
"%"PRIu64": WRITE => %d/%"GF_PRI_SIZET",%"PRId64"/%"PRIu64,
frame->root->unique,
op_ret, state->size, state->off, stbuf->ia_size);

fwo.size = op_ret;
send_fuse_obj (this, finh, &fwo);
} else {
gf_log ("glusterfs-fuse", GF_LOG_WARNING,
"%"PRIu64": WRITE => -1 gfid=%s fd=%p (%s)",
frame->root->unique,
(state->fd && state->fd->inode) ?
uuid_utoa (state->fd->inode->gfid) : "nil", state->fd,
strerror (op_errno));

send_fuse_err (this, finh, op_errno);
}

free_fuse_state (state);
STACK_DESTROY (frame->root);

return 0;
}

send_fuse_obj 是一个宏实际上是 send_fuse_data,把 fuse_write_out 传给 fuse 告诉 fuse 写出了多少,或者返回错误给 fuse。

1
2
#define send_fuse_obj(this, finh, obj) \
send_fuse_data (this, finh, obj, sizeof (*(obj)))

整个的分析过程大概是从客户端的角度来看的,glusterfs 比较重要的一个 xlator 就是 dht xlator,distributed hash table,这个 xlator 决定了文件的分布。

Tensor 的用户 API 可以参考这里,这里做一下简单介绍。Tensor 是各种维度的向量和矩阵的统称,分 Tensor 和 SparseTensor。和 Tensor 不同,SparseTensor 存的是值以及值对应的 index,而 Tensor 存的是完整的矩阵。

举个例子。

1
2
3
4
5
6
7
8
import tensorflow as tf
a = tf.constant([1, 1])
b = tf.constant([2, 2])
c = tf.add(a, b)
sess = tf.InteractiveSession()
print("a[0]=%s, a[1]=%s" % (a[0].eval(), a[1].eval()))
print("c = %s" % c.eval())
sess.close()

输出对应如下。

1
2
a[0]=1, a[1]=1
c = [3 3]

Tensor 只有 eval 以后才能获得结果,是懒计算的。

Tensor 的实现

Tensor (tensorflow/tensorflow/core/framework/tensor.h) 依赖 TensorShape(tensorflow/tensorflow/tensorflow/core/framework/tensor_shape.h) 和 TensorBuffer (tensorflow/tensorflow/core/framework/tensor.h) 两个成员。

TensorShape 主要负责记录张量的形状。

TensorBuffer 主要负责管理 Tensor 的内存,TensorBuffer 继承自 RefCounted (tensorflow/tensorflow/core/lib/core/refcount.h),具有引用计数的功能,用于对内存进行管理。

1
2
3
4
5
// Interface to access the raw ref-counted data buffer.
class TensorBuffer : public core::RefCounted {
public:
~TensorBuffer() override {}
...

他们的对应的关系如下。

Tensor 从下往上看,其实就是一个带”形状“的内存,和 NumPy 的数组是差不多的。

OpKernel

对于一个线性回归来说,是最简单也最好理解的模型,方便分析底层的代码实现。

$$ Y=XW+b $$

损失函数用平方差定义的,优化器是提督下降,这样一个模型可以用一下的 Python 代码实现,这个代码片段是截取的,如果要完整运行这个例子可以在这里复现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import tensorflow as tf
sess = tf.InteractiveSession()
x = tf.placeholder(tf.float32,[None, x_train.shape[1]])
y = tf.placeholder(tf.float32,[None, 1])
w = tf.Variable(tf.zeros([x_train.shape[1],1]))
b = tf.Variable(tf.zeros([1])) # placeholder 不用加 None
pred = tf.add(tf.matmul(x, w), b)

init = tf.global_variables_initializer()
cost = tf.reduce_mean(tf.square(y - pred))
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost)
epochs = 3000

init.run()

for epoch in range(0, epochs):
optimizer.run(feed_dict={x:x_train, y:y_train})
c = cost.eval(feed_dict = {x:x_train,y:y_train})
if epoch%100 == 0:
print_percentage(int(c*100))

print('\nEpoch: {0}, Error: {1}'.format(epoch+1, c))

b_value = b.eval()
w_value = w.eval()

# Predicted Labels
y_pred = pred.eval(feed_dict={x: x_test})

# Mean Squared Error
mse = tf.reduce_mean(tf.square(y_pred - y_test))
print("MSE", mse.eval())
sess.close()

对应的训练结果。

1
2
Epoch: 3000, Error: 0.25882452726364136
MSE 0.30620116674806463

可以看到,线性回归的模型主要依赖的两个 Operation 分别是 tf.addtf.matmul,其他的复杂模型也是类似的逻辑,对应的 OpKernel 分别是 AddOpMatMulOp,这里可以看一下具体的实现。

如果有源代码,可以连着源代码用 bazel 编译,可以参照这里自己编写一个 Op。

MatMulOp 的实现在 /tensorflow/core/kernels/matmul_op.cc 下面,定义在tensorflow/tensorflow/core/ops/math_ops.cc 下面。AddOp/tensorflow/core/kernels/matmul_op.cc,实现在 /tensorflow/core/kernels/cwise_op_add_1.cc 下面,依赖 /tensorflow/tensorflow/core/kernels/cwise_ops_common.hcommon 的定义。

Add 用的是 Eigenadd /tensorflow/tensorflow/core/kernels/cwise_ops.h,依赖third_party/Eigen/src/Core/functors/BinaryFunctors.h

举个例子,看一下 MatMulOpMatMulOp 的构造函数里面有一个 OpKernelConstruction 可以初始化 OpKernel,通过 OpKernel 可以获得这个 Op 的参数比如transpose_a 等等。

1
2
3
4
5
6
7
8
9
10
11
12
template <typename Device, typename T, bool USE_CUBLAS>
class MatMulOp : public OpKernel {
public:
explicit MatMulOp(OpKernelConstruction* ctx)
: OpKernel(ctx), algorithms_set_already_(false) { // 在执行构造函数之前,执行两个成员的构造函数
OP_REQUIRES_OK(ctx, ctx->GetAttr("transpose_a", &transpose_a_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("transpose_b", &transpose_b_));

LaunchMatMul<Device, T, USE_CUBLAS>::GetBlasGemmAlgorithm(
ctx, &algorithms_, &algorithms_set_already_);
use_autotune_ = MatmulAutotuneEnable();
}

每个 OpKernel 都要实现一个 Compute 函数,可以看到这个 Compute 函数首先检查了两个 Tensor 是否是矩阵,然后检查两个矩阵的形状是否符合矩阵相乘的条件,然后根据形状分配 TensorShape 并且根据 TensorShape 分配新的 Tensor (其实顺便分配的 TensorBuffer 的内存空间)。然后通过 LaunchMatMul 真正执行相乘操作,因为这个计算过程,可能是用了 GPU,所以模版是带 Device 的(GPUDevice/CPUDevice)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void Compute(OpKernelContext* ctx) override {
const Tensor& a = ctx->input(0);
const Tensor& b = ctx->input(1);

// Check that the dimensions of the two matrices are valid.
OP_REQUIRES(ctx, TensorShapeUtils::IsMatrix(a.shape()),
errors::InvalidArgument("In[0] is not a matrix"));
OP_REQUIRES(ctx, TensorShapeUtils::IsMatrix(b.shape()),
errors::InvalidArgument("In[1] is not a matrix"));
Eigen::array<Eigen::IndexPair<Eigen::DenseIndex>, 1> dim_pair;
dim_pair[0].first = transpose_a_ ? 0 : 1;
dim_pair[0].second = transpose_b_ ? 1 : 0;

OP_REQUIRES(
ctx, a.dim_size(dim_pair[0].first) == b.dim_size(dim_pair[0].second),
errors::InvalidArgument(
"Matrix size-incompatible: In[0]: ", a.shape().DebugString(),
", In[1]: ", b.shape().DebugString()));
int a_dim_remaining = 1 - dim_pair[0].first;
int b_dim_remaining = 1 - dim_pair[0].second;
TensorShape out_shape(
{a.dim_size(a_dim_remaining), b.dim_size(b_dim_remaining)});
Tensor* out = nullptr;
OP_REQUIRES_OK(ctx, ctx->allocate_output(0, out_shape, &out));

if (out->NumElements() == 0) {
// If a has shape [0, x] or b has shape [x, 0], the output shape
// is a 0-element matrix, so there is nothing to do.
return;
}

if (a.NumElements() == 0 || b.NumElements() == 0) {
// If a has shape [x, 0] and b has shape [0, y], the
// output shape is [x, y] where x and y are non-zero, so we fill
// the output with zeros.
functor::SetZeroFunctor<Device, T> f;
f(ctx->eigen_device<Device>(), out->flat<T>());
return;
}

LaunchMatMul<Device, T, USE_CUBLAS>::launch(
ctx, a, b, dim_pair, &algorithms_, use_autotune_, out);
}

LaunchMatMul 继承自 LaunchMatMulBase,在 LaunchMatMulBase 当中调用了 functor::MatMulFunctor,这个 functor 主要就会执行乘法操作,在这之前会检查一下是否其中一个元素是 vector,这样可以直接优化算出来,而不用 Eigen 库来算,这样更快,这个目前看到的是 CPU 的路径。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template <typename Device, typename T>
struct LaunchMatMulBase {
#if GOOGLE_CUDA
typedef se::blas::AlgorithmType AlgorithmType;
#else
typedef int64 AlgorithmType;
#endif // GOOGLE_CUDA

static void launch(
OpKernelContext* ctx, const Tensor& a, const Tensor& b,
const Eigen::array<Eigen::IndexPair<Eigen::DenseIndex>, 1>& dim_pair,
std::vector<AlgorithmType>* algorithms, bool use_aututone, Tensor* out) {
#ifndef TENSORFLOW_USE_SYCL
// An explicit vector-matrix multiply is much better optimized than an
// implicit one and this is a bottleneck during non-batched inference.
bool was_vector = ExplicitVectorMatrixOptimization<T>(a, b, dim_pair, out);
if (!was_vector) {
#endif // TENSORFLOW_USE_SYCL
functor::MatMulFunctor<Device, T>()(ctx->eigen_device<Device>(),
out->matrix<T>(), a.matrix<T>(),
b.matrix<T>(), dim_pair);
#ifndef TENSORFLOW_USE_SYCL
}
#endif // TENSORFLOW_USE_SYCL
}

static void GetBlasGemmAlgorithm(OpKernelConstruction* ctx,
std::vector<int64>* algorithms,
bool* algorithm_set_flag) {}
};

MatMulFunctor 在设备 d 上计算矩阵相乘的结果,其中调用的是 MatMul<CPUDevice>

1
2
3
4
5
6
template <typename Device, typename In0, typename In1, typename Out,
typename DimPair>
void MatMul(const Device& d, Out out, In0 in0, In1 in1,
const DimPair& dim_pair) {
out.device(d) = in0.contract(in1, dim_pair);
}

这里的 contract 调用的是 TensorContractionOp (third_party/unsupported/Eigen/CXX11/src/Tensor/TensorContraction.h),跟之前说的一样,这个 Op 是计算图的一部分,要通过 eval 来做计算,计算结果是 eval 驱动的。TensorContractionOp 的构造函数就是,这负责构建左表达式和右表达式。

1
2
3
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorContractionOp(
const LhsXprType& lhs, const RhsXprType& rhs, const Indices& dims)
: m_lhs_xpr(lhs), m_rhs_xpr(rhs), m_indices(dims) {}

真正的计算过程在 TensorContractionEvaluatorBase 里面,真正执行计算过程,计算细节就省略了主要是矩阵相乘。

CUDA

如果条件编译 GOOGLE_CUDA 的话,会使用 GPU 的代码,对应会调用到 steam executor,这个以后具体分析。

总结

Tensorflow 基于图模型的,并且是懒计算的,通过扩展可以自己用 C++ 实现新的 Op,并且也可以观察默认自带的 OpKernel 是如何实现的,对于理解 Tensorflow 的工作流程会有很大的帮助。Tensorflow 本身依赖了 Eigen,CUDA 等线性代数库或者 GPU 计算库,要看懂代码还是要多学一点线代的知识,比如 Contraction 这个概念我也是第一次晓得。

参考文献

  1. 深入理解 Tensorflow 架构设计与原理实现

Raft 的作者其实对 Paxos 研究得很深,毕竟 Paxos 基本上就是共识算法的代名词,建议在了解 Raft 之前先看看 Paxos,因为 Paxos 里面有一段从单点开始完整的推断共识算法的正确性和必要性的过程,方便建立一种对共识算法上直觉的认同。Raft 论文最好看作者的博士论文,那个比较完整。

leader election

Raft 有 Term 的概念,把时间每分成了一个个的任期,每个任期开始是选举过程。Raft server 有三种角色,Candidate 就是选举中的状态,Leader 是当选的状态,Follower 是服从并选择接受 Leader 的 Log 的状态。

其实可以把选主看作一个 basic Paxos,大家对 leader=? 达成一致,触发的条件是超时和初始化的时候,term 就是类似于 Paxos 的 propose number,和 basic Paxos 不一样的是每个 term 只能投一个人,所以每个 term 不会有冲突,只有一个人会当选,但是会有平票的问题,大家用同一个 term 选举的情况下就会发生(比如说大家都投自己,谁都不接受谁),所以为了减少冲突,把重试的时间随机化,快速超时的人能拿到高 term,并且其他慢速超时的 candidate 会服从这个先拿到下一个 term 的 leader。成为 leader 的标志是收到了大部分人的服从(3/5 或者 2/3)。

log replication

log 不一致的就以上几种情况,可能少了 entry,可能多了没 commited 的 entry,可能又少了 entry 还多了没 commited 的 entry,log 不一致的情况就这几种。index term 能唯一确定一个 entry,如果一个 entry 确定,之前的也确定了。基于上面的性质,如果 entry commited 之前的就是都 commited 了。这个通过 append 的 prev index prev term,来保证这个特性。解决方法是在 AppendRequest 里面带上新的 entry 前的 index 和 term(在 leader 那里存的前一个),如果这个 entry 不在 follower 里面的话,就拒绝追加新 entry,然后 leader 就要用前一个 index 尝试,直到开始出现 match 的点,有点像 git tree 里面开始产生分支的那个 base,这个过程一直减少自己保存的 follower 的 nextIndex,一直到获得和自己 match 的最后一个 index 开始追加复制自己的 log。也就是说这样会强迫所有的 follower 复制主节点的 log,这里有一个按 term 回溯 base 的方法,但是这个优化作者认为有点多此一举,毕竟错误发生的情况比较少。这里会有一个问题(啥问题),后面会通过选举的时候加上限制解决。

这个问题就是,新的 leader 可能没有之前 commited 的 log,然后修改了其他 follower 的 log,导致每个 state machine 执行的命令是不一致的,所以要加一个约束,在投票的时候,新的 leader 必须包含之前 commited 的 log,这个保证就需要通过定义 entry 的 update-to-date,在投票的时候拒绝比自己老的 leader 当选。

safety

这个问题被称作 safety,为了解决这个问题 update-to-date 怎么定义呢?比如下面这个例子,如果 S3 挂了,其实 index 5 已经 commited 了,但是没有办法确定 index 5 的这个 log 是否 commited 了。所以要选个最可能包含所有 log 的做 leader。

先比 term,再比 log 数量,按顺序比,有点像字符串比较,这样可以确保 leader 有其他 follower 的 entry。没有大多数人的又新又全就无法成为 leader。

commit from previous entry

如果一个 leader 在 commit 之前 crash 了,并且新的 leader 没有已经在大多数节点上的 entry 可能会覆盖掉本该 commit 的 entry。

比如这图,如果是 (d1) 的情况,3 就会把 2 盖掉,但是实际上 2 本该 commited 了,但是在 commit 之前 S5 可能在 term 5 成为了主(这个没有违背上面的 upda-to-date 因为 3 确实比 2 要新,并且可以从 S2, S3, S4 那里获得选票)。

光看数量,没办法替之前 term 的 entry commit,因为 d1 的情况,可能就把 2 盖掉了。只有 term 4 的时候 4 commit 了(当前的 term 可以 commit)才能替之前 term 的 entry commit(实际上间接 commit 了,这个可以证明),这样 term 3 就不会成为主,就不可能盖掉 2。

总结一下,在 Raft 中 leader 不会 commit 之前 term 的 entry,只 commit 当前 term 的 entry。当前 term 的 entry commit 了,之前 term 的 entry 就间接 commit 了。

持久化数据

当前的 term 和 vote 。

membership change

raft conf change 有几个主要阶段,其中 C(old)+c(new) 的情况下,需要服从两个 conf 的大多数,并且,每次只能一个一个的增加 server 和 减少 server。

参考文献

Paxos 是什么

Paxos is a mechanism for achieving consensus on a single value over unreliable communication channels.

Paxos 就是一个在不稳定的网络环境下建立的对一个值的共识。我们说的 Multi-Paxos 是指对多个值的共识,不过我们先一步一步来。

在 Paxos 中是没有 leader 这个概念的,所以相对来说会比较慢,因为谁都可以处理请求并且把自己的 peer 盖掉导致冲突,达成一致的过程会更长。

Paxos 只抵抗机器崩溃,网络异常,不抵抗恶意行为的节点,或者说使用不同协议的参与者,或者说参与者不能撒谎,所以经典的 Paxos 不抵抗拜占庭问题(也有针对拜占庭问题的 Paxos 带了 verify 的过程)。

Paxos 论文里面描述的算法,其实不是很清晰,一般人看了都会有疑问,所以后来很多人对这个算法做了补充和解释,包括作者本人。

基本需求

第一个是安全性,就是只有一个值会被选择,并且节点对这个值不会主动知道,而是在值被选择以后被动学习知道这个值被选择的(这个原文有点难懂我换成了自己的话解释了一下)。
第二个是活性,也就是值最终会被选择,并且所有节点会最终学习到这个值被选择了。

解决方案

首先单点可以排除,因为单点虽然是最容易解决一致性问题的,但是如果单点挂了,整个就不可用了,所以显然不能依靠单点。

那如果每个节点就接受自己接受到的第一个值,也会有平票问题(split votes)。redblue 对等的,那到底谁被选择了呢,所以来了就接受的策略并不可行。

Paxos 是 quorum based 的,表示的一致性协议是少数服从多数的,在大多数节点都接受了这个值以后,这个值就被选择了。为了让节点可以接受多个值,多个值之间需要区分,所以就有了提交号,这个号码是单增的,拒绝掉小号的提交,并且当一个值已经被选择,那么之后的提交都要提交这个值,这样做的目的是让提交者知道这个值被学习了,是大家认可的一个值。有了少数服从多数原则,就会碰到冲突的问题。

这种情况就是下面这样,从时间上来讲,red 已经被选择了,如果 S3 能够拒绝 red 的提交,那么 S3,S4,S5就可以拿着 red 重试,并且知道 red 已经被大多数人接受了,而知道冲突的 S3 就会决绝这次请求,这个点上就处理了两个值(接受了一个拒绝了一个)。

序列号可以帮助我们区分优先关系,这样因为网络问题延迟的请求就可以被处理。下面这种情况,red 虽然先提交,但是并不是先被大多数接受的(存在网络延迟),这个时候blue已经被选择了(被大多数人接受),我们需要提交 red 的提交意识到自己已经太“老”了,而触发这就是接受了 blueS3

所以你会发现,矛盾的点其实就是这个 S3,也就是少数服从多数原则,能保证任意的大多数都是有交集的。交集中的点会发现矛盾和之前接受的值有矛盾选择拒绝。

问题:三节点的容忍度是 1,四节点的容忍度是多少?

答案:也是 1,因为要形成发现矛盾的交集对于 4 来说,要达到 3/4,才能构成大多数,这就是为什么集群选单数的原因,因为双数从算法的角度来说没什么帮助。

接下来看具体的算法。

其实,从朴素角度来说,经典 Paxos 看起来就是一个两阶段提交的过程,首先是准备阶段,选择一个提交号 n,提交 prepare(n),接受者需要返回自己接受的值,和已经接受的提交号。当从大多数收到回复以后就可以做判断了,如果有返回接受值,选择提交号最大的值进行下一阶段(这个行为对应的是发现有值可能被接受了,尝试服从或者学习这个接受),不然就可以用自己的值进行下一阶段。

下一阶段就是 accept(value,n),如果接受者发现自己目前收到的n,没有比accpet给的n大,就接受这个值,并且更新自己的n,否则就拒绝(这里就保证提交者能够发现自己变老了或者被拒绝了)。
如果接受者发现提交号大于自己当前的最大提交号,就接受这个值,不然就拒绝。当提交者从大多数人那里接受到返回以后发现有拒绝的情况,就进行重试拿一个新的n开始,否则这个值就被接受了。

Basic Paxos value 就是设置一次,不存在再设次一次的情况。

总结起来就是如图:

那这样的一个二阶段提交,看看能不能解决前面的问题,主要有三种可能。
注意,这里的例子是原文里的例子,一个变量的取值是X或者Y,然后3.1 表示 S1提交的提交号为 3 的提交,这是我们定义的 message ID。

第一种值被选择了,后者意识到了X,放弃 YX 提交,也就意味着提交者学习到了这个值已经被选择了。

第二种情况,值没有被选择,但是交集的部分看到了一个被选择的值,也会选择放弃YX提交,虽然X暂时没有被选择(被大多数人接受),但是可以保证两个提交都成功。

第三种情况,值没有被选择,同时交集的部分也没有发现被选择的值,如果已经做了 promies 这个时候就会拒绝老的提交。比如下面在 S3 accept(X) 的时候就会放弃提交并且进行重试(S1 S2 也会一起重试), 并且在重试的时候覆盖掉原先接受的x)。

看起来所有的问题都解决了,但是活性问题无法保证。这个情况发生在提交之间相互阻塞的情况,S3 S4 S5 拿着更高的提交号导致 S1 S2 S3 的 accept 被拒绝重新进行提交,又把 S3 S4 S5 给拒绝了。

解决这个问题的办法就是把重试时间进行一些随机化,减少这种巧合发生,或者把重试的时间指数增长等等。

Multi-Paxos

到此 classic Paxos 算是告一段落。那 Paxos 有什么问题呢。首先是活性的问题,这个在后面可以通过选主的方式解决。其次是学习是通过 propose 获知值的,不然无法知道一个值是否被接受了,要走一遍整套的 Paxos 协议。

Multi-Paxos 新增的问题是如何选择 log entry,并且用选主的方式减少冲突,以及减少 prepare 的请求。

以下图为例。深色框表示这个 log 已经被选择(怎么确定选择后面会提到)。寻找 log entry 的方式就是寻找第一个没有被选择的 log,尝试执行 basic Paxos,如果有值被选择会尝试用这个 log 提交帮助这个 log 被选择,这个过程和 basic paxos 是一致的,但是在此之后就要继续寻找下一个 log 尝试进行我们的尝试。

下图中(只看 S1 和 S2 这两行),第一次会找到最后一个没有没学习的 log,也就是 index=3 的 log,但是发现了 s1cmp,就会服从这个 cmp,然后到 index=4 发现了 s2sub,也选择服从,并且学习到了 index=4 的 log 应该是 sub,最后到了 index=5 才进行插入。

这样的情况下,Paxos 可以接受并发的请求,而 Raft 却规定了对 log 只能 append,不能 3 4 5 都能同时处理,简化了实现。反正只要保证了 log 的顺序一致,状态机的最终状态都是一致的。

接下来的是对于性能的一些提升,一个是选主避免大面积冲突,另一个是优化二阶段提交的次数。解决方法是选出一个主,保证一次只有一个提交者。另外对于 prepare 是对整个 log 进行 prepare,而不是单个 log entry,这样大多数情况下,大部分 log 都可以一次性就被选择。

可以通过 lease based 选主。 lamport 提出的方式比较简单,节点之间维持T间隔的心跳,2T 之内没有收到更高编号的主的心跳就成为主,非主则转发请求给主,这样还是不会避免同时出现两主的情况,两主会有冲突,但 Paxos 就算有两主还是能正常运行,毕竟有主只是优化方式。

接下来我们看看优化提交的过程。回忆一下,prepare 的作用,其中一点是帮助我们发现冲突,知道有值可能被选择了,另外一点是拒绝老的提交,让他们发现自己变“老”了。

我们可以改变提交号的意义,让他代表整个 log,也就是所有 log entry 都用一个提交号,这样接受者可以通过返回 noMoreAccepted 让提交者意识到在当前 log entry 之后的 log 都没有 accpeted 是可以被“锁”住的,然后如果大多数节点返回 noMoreAccepted,就可以跳过之后的 prepare 直到 accept 被拒绝。这样后续的 accept 操作就可以不用 prepare 在一趟之内解决,所以二阶段提交的第一阶段的 promise 落在了整个 log 上。这个情况下 发起者的 accept 一直顺序进行就可以,问题发生在有其他主(之前说到的会临时有多主的情况),又提交了 prepare,然后这个提交号更高,把后一段锁住了,accept 就会发现冲突学习新的值。

补充 1 持续发送 accept 请求,让所有的节点都同步这个 log。
补充 2 让 proposer 带上 firstUnchosenIndex 让 acceptor 知道大多数可以确认被选择的 log。

但是还有问题,例如下面这张图,对于所有小于 firstUnchosenIndex 的 i 来说,如果 accpetedProposal[i] 的提交号和 request 的 proposal 一样的话,就可以确认 log[i] 是被选择了的,并且标记 acceptedProposal[i] = 无限。但是 2.5 确实来自之前的 leader 的,貌似无法被标记为选择了。这需要我们进一步修改这个协议。

这个时候需要在 accept 返回的时候带上自己的 firtUnchosenIndex,如果 proposer 的比这个大,可以把 acceptor 直接补齐。用 success 命令,让 acceptor 直接更改index 的这个值, 并且继续返回 firtUnchosenIndex,让 proposer 弥补。

最后一部分是配置改变,degree of replication,也就是要保证同时只有一个 conf 生效,可以用 a 个之前的 log 做 conf,这样在使用这个 conf 之前,conf 已经发生了改变。但是同时限制住了并发度(只能为a)。直觉上讲就是约定一个 index 在某个时间点一次性切换。

参考文献

最新的 kubeadm 的设计文档在这里,如果把里面设计大概看一遍就能够理解里面的流程了,可以说设计的还是很缜密的,并且大大简化了 k8s 的运维工作。

kubeadm 安全设施

主要解释 kubeadm init 和 kubeadm join 的过程和实现

kubeadm init

  1. 首先进行 preflight-checks ,检查系统是否满足初始化的状态。
  2. 创建自签名的 CA,并且生成和签发各个 component 的私钥和证书 (/etc/kubernetes/pki),如果文件已存在就不会再生成了,比如要给 apiserver 添加域名可以重新签发一个证书,然后重启就好了。
  3. 写入各个服务的配置文件,以及一个 admin.conf (/etc/kubernetes/)
  4. 配置 kubelet 的动态配置加载 (disable by default)
  5. 配置静态 pod (/etc/kubernetes/manifests)
  6. 给 master 添加 taint 和 label,让其他 pod 默认不会运行在 master 上
  7. 生成用于让其他 kubelet 加入的 token
  8. 配置用 token 加入的可以自动确认 CSR(也就是用 CA 自动签 kubelet 的证书)
  9. 设置 kube-dns
  10. 检查 self-hosting,如果设置了,就把 static pod 转成 daemonset

是否使用外部 CA 的条件是,目录下有 CA 证书,但是没有 CA 私钥。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if res, _ := certsphase.UsingExternalCA(i.cfg); !res {

// PHASE 1: Generate certificates
if err := certsphase.CreatePKIAssets(i.cfg); err != nil {
return err
}

// PHASE 2: Generate kubeconfig files for the admin and the kubelet
if err := kubeconfigphase.CreateInitKubeConfigFiles(kubeConfigDir, i.cfg); err != nil {
return err
}

} else {
fmt.Println("[externalca] The file 'ca.key' was not found, yet all other certificates are present. Using external CA mode - certificates or kubeconfig will not be generated.")
}

具体的生成 PKI 相关的配置的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// CreatePKIAssets will create and write to disk all PKI assets necessary to establish the control plane.
// If the PKI assets already exists in the target folder, they are used only if evaluated equal; otherwise an error is returned.
func CreatePKIAssets(cfg *kubeadmapi.MasterConfiguration) error {

certActions := []func(cfg *kubeadmapi.MasterConfiguration) error{
CreateCACertAndKeyFiles,
CreateAPIServerCertAndKeyFiles,
CreateAPIServerKubeletClientCertAndKeyFiles,
CreateEtcdCACertAndKeyFiles,
CreateEtcdServerCertAndKeyFiles,
CreateEtcdPeerCertAndKeyFiles,
CreateEtcdHealthcheckClientCertAndKeyFiles,
CreateAPIServerEtcdClientCertAndKeyFiles,
CreateServiceAccountKeyAndPublicKeyFiles,
CreateFrontProxyCACertAndKeyFiles,
CreateFrontProxyClientCertAndKeyFiles,
}

for _, action := range certActions {
err := action(cfg)
if err != nil {
return err
}
}

fmt.Printf("[certificates] Valid certificates and keys now exist in %q\n", cfg.CertificatesDir)

return nil
}

列表中的函数都是用来生成所有证书和私钥的,主要依靠 k8s.io/kubernetes/cmd/kubeadm/app/phases/certs/pkiutil 来完成。私钥很容易生成,没什么需要特别配置的,主要看 CA 的证书里面配了啥,因为这个跟 k8s 的鉴权有关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// NewSelfSignedCACert creates a CA certificate
func NewSelfSignedCACert(cfg Config, key *rsa.PrivateKey) (*x509.Certificate, error) {
now := time.Now()
tmpl := x509.Certificate{
SerialNumber: new(big.Int).SetInt64(0),
Subject: pkix.Name{
CommonName: cfg.CommonName,
Organization: cfg.Organization,
},
NotBefore: now.UTC(),
NotAfter: now.Add(duration365d * 10).UTC(),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
IsCA: true,
}

certDERBytes, err := x509.CreateCertificate(cryptorand.Reader, &tmpl, &tmpl, key.Public(), key)
if err != nil {
return nil, err
}
return x509.ParseCertificate(certDERBytes)
}

openssl x509 -in /etc/kubernetes/pki/ca.crt -text -noout 可以查看 ca 证书里面的内容,和我们看到的配置是一致的。

然后我们再看一下 apiserver 的私钥和证书是怎么签的,首先把生成的 CA 证书和私钥加载进来,然后生成私钥并且签出自己的证书。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func CreateAPIServerCertAndKeyFiles(cfg *kubeadmapi.MasterConfiguration) error {

caCert, caKey, err := loadCertificateAuthority(cfg.CertificatesDir, kubeadmconstants.CACertAndKeyBaseName)
if err != nil {
return err
}

apiCert, apiKey, err := NewAPIServerCertAndKey(cfg, caCert, caKey)
if err != nil {
return err
}

return writeCertificateFilesIfNotExist(
cfg.CertificatesDir,
kubeadmconstants.APIServerCertAndKeyBaseName,
caCert,
apiCert,
apiKey,
)
}

这里比较重要,因为 SAN 这个是用来匹配域名的,如果这里没写好,HTTPS 是拒绝访问的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// NewAPIServerCertAndKey generate certificate for apiserver, signed by the given CA.
func NewAPIServerCertAndKey(cfg *kubeadmapi.MasterConfiguration, caCert *x509.Certificate, caKey *rsa.PrivateKey) (*x509.Certificate, *rsa.PrivateKey, error) {

altNames, err := pkiutil.GetAPIServerAltNames(cfg)
if err != nil {
return nil, nil, fmt.Errorf("failure while composing altnames for API server: %v", err)
}

config := certutil.Config{
CommonName: kubeadmconstants.APIServerCertCommonName,
AltNames: *altNames,
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
}
apiCert, apiKey, err := pkiutil.NewCertAndKey(caCert, caKey, config)
if err != nil {
return nil, nil, fmt.Errorf("failure while creating API server key and certificate: %v", err)
}

return apiCert, apiKey, nil
}

apiserver 的证书也是对的,马赛克的部分是我自己配的一些 IP 和 域名,和 kubeadm 配的一些 kube-dns 用的 overlay 网络上的 master 域名,其他 master 上的 components 也是类似的,在配高可用的时候要把每个 master 节点上的域名和 IP 都配上。kubelet 的证书也要配对,就在 join 里面介绍了,这个会用来鉴权 node 的身份。

其他部分的代码和设计文档的描述是一致的,所以没什么好看的,感觉一个好的设计文档是非常重要的,代码只是设计的实现,如果设计本身就可读性很强,阅读代码只是辅助理解一些细节和找 BUG 用的而已。

kubeadm join

  1. 首先用 token 鉴权的方式获取 apiserver 的 CA 证书,并且通过 SHA256 验证。
  2. 加载动态配置,如果 master 上有的话。
  3. TLS 初始化,首先用 token 鉴权的方式把自己的 CSR 发给 apiserver,然后签发自己的证书,这个证书要用来检查 node 的身份的。
  4. 配置 kubelet 和 server 开始建立连接。

新版本的 kubelet 有些变动,支持把一些 featuregate 配置写到文件里面,比如这个阻止 fork bomb 的配置,新版本只能用配置文件写了,不能通过参数配置。

kubeadm 的详细过程如下:

首先会把 flags 传入 NodeConfiguration 中,开始 AddJoinConfigFlags(cmd.PersistentFlags(),如果没有 nodeNamecfg 默认用 host 的 name 并且小写化通过 GetHostname获得,和在宿主机上执行 hostname 是一致的。在初始化之前先 尝试启动 TryStartKubelet 过程,然后把 token 和 server 写入(证书 data 是怎么生成的?),先配置kubelet-bootstrap-config

整体流程如下,token 验证是基于 JWT 的,所以 token 的格式是 "^([a-z0-9]{6})\\.([a-z0-9]{16})$",分两部分,tokenID.tokenSecret

1
2
3
4
5
6
7
8
discovery.For
->GetValidatedClusterInfoObject 这个是获取 CA 证书的过程
->token.RetrieveValidatedClusterInfo 这一步是 JWT 验证
->tokenutil.ParseToken 得到 tokenID 和 tokenSecret
->pubKeyPins.Allow 加载用于检验 CA 证书的 HASH 值
->buildInsecureBootstrapKubeConfig (用 token-bootstrap-client 身份)获取信息
-> 从kube-public 获取 configmap (和 kubectl describe configmap cluster-info -n kube-public 中的信息是一致的)这个configmap 里面包含 JWS 签名和 master 的 CA 证书,获取对应 tokenID 的 jws token,验证 token 成功,并且验证 CA 的证书 hash,如果通过说明这个 master 是可信的,然后拿到 CA 证书以后开始构建自己的证书,建立 secure config。
->

kubeconfigutil.WriteToDisk 会把 bootstrap-kubelet.conf 写入到配置目录中,kubeadm 的任务就完成了,之前的 kubeadm 会代替 kubelet 生成 kubelet.conf(其中用的是公钥鉴权),现在移走了,kubelet 启动的时候会尝试使用这个配置文件建立 HTTPS 的鉴权配置文件。

可以看一下 kubelet 用这个配置给 master 发 CSR 以后得到了什么,可以看到是生成了自己的证书的。

并且这个证书里面的 SAN 也是用来进行 Node 鉴权的身份确认的信息。用 openssl x509 -in /var/lib/kubelet/pki/kubelet-client.crt -text -noout查看信息。

红线部分就是 Node 鉴权的信息,基于这个确认 node 的身份。另外一个是 Role Based Access Control,那个主要是配给 pod,限制 pod 行为用的,类似于 linux 的 chmod

kubeadm 高可用配置

如果没有安全配置,其实高可用挺好配置的,现在加入了安全配置,虽然麻烦,但是还是能理解 k8s 的良苦用心的,理解了整个鉴权的过程以后就可以做,现在支持配置文件初始化其实更好配置了。可以基于这个文档配置,算了我还是不总结了,看懂上面的,照着配置就好了。主要是自己生成 CA,他们用的 cfsslcfssljson,这个工具比 openssl 好用一点,比较容易配置。生成 ca 证书和私钥以后,就可以构建 HTTPS 的 etcd。

先创建 ca 的配置文件 ca-config.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{
"signing": {
"default": {
"expiry": "43800h"
},
"profiles": {
"server": {
"expiry": "43800h",
"usages": [
"signing",
"key encipherment",
"server auth",
"client auth"
]
},
"client": {
"expiry": "43800h",
"usages": [
"signing",
"key encipherment",
"client auth"
]
},
"peer": {
"expiry": "43800h",
"usages": [
"signing",
"key encipherment",
"server auth",
"client auth"
]
}
}
}
}

然后生成用于自签名的 csr 的配置文件 ca-csr.json,用于签发自签名的 CA 证书。

1
2
3
4
5
6
7
{
"CN": "etcd",
"key": {
"algo": "rsa",
"size": 2048
}
}

结果就是目录下面生成了,ca-key.pemca.pem,这个命令不太按规则,对应的叫 ca.keyca.crtpem 是密钥保存的格式。

接下来用 client.json 获得自己的私钥和通过 ca 签的证书。

1
2
3
4
5
6
7
{
"CN": "client",
"key": {
"algo": "ecdsa",
"size": 256
}
}

执行 cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -profile=client client.json | cfssljson -bare client,生成了 client-key.pem,client.pem,分别是私钥和证书,把文件 ca.pem,ca-key.pem, client.pem,client-key.pem,ca-config.json, 拷贝到每台 master 机器上面,一般放到 /etc/kubernetes/pki/ 下面。

然后

1
2
3
4
5
6
7
cfssl print-defaults csr > config.json
sed -i '0,/CN/{s/example\.net/'"$PEER_NAME"'/}' config.json
sed -i 's/www\.example\.net/'"$PRIVATE_IP"'/' config.json
sed -i 's/example\.net/'"$PEER_NAME"'/' config.json

cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -profile=server config.json | cfssljson -bare server
cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -profile=peer config.json | cfssljson -bare peer

签出出两对密钥和证书,把每台的机器的 域名 和 IP 替换掉示例的 example 配置,就是这些地方可以改成自己的域名和地址,这个会被用来 check。

可以放到 kubelet 的 static pod 里面,启动 etcd,两个证书分别是用作 server 验证和 client 验证的,server 让别人访问的时候相信你,peer 是你访问别人的时候让别人相信你。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
cat >/etc/kubernetes/manifests/etcd.yaml <<EOF
apiVersion: v1
kind: Pod
metadata:
labels:
component: etcd
tier: control-plane
name: <podname>
namespace: kube-system
spec:
containers:
- command:
- etcd --name ${PEER_NAME} \
- --data-dir /var/lib/etcd \
- --listen-client-urls https://${PRIVATE_IP}:2379 \
- --advertise-client-urls https://${PRIVATE_IP}:2379 \
- --listen-peer-urls https://${PRIVATE_IP}:2380 \
- --initial-advertise-peer-urls https://${PRIVATE_IP}:2380 \
- --cert-file=/certs/server.pem \
- --key-file=/certs/server-key.pem \
- --client-cert-auth \
- --trusted-ca-file=/certs/ca.pem \
- --peer-cert-file=/certs/peer.pem \
- --peer-key-file=/certs/peer-key.pem \
- --peer-client-cert-auth \
- --peer-trusted-ca-file=/certs/ca.pem \
- --initial-cluster etcd0=https://<etcd0-ip-address>:2380,etcd1=https://<etcd1-ip-address>:2380,etcd2=https://<etcd2-ip-address>:2380 \
- --initial-cluster-token my-etcd-token \
- --initial-cluster-state new
image: k8s.gcr.io/etcd-amd64:3.1.10
livenessProbe:
httpGet:
path: /health
port: 2379
scheme: HTTP
initialDelaySeconds: 15
timeoutSeconds: 15
name: etcd
env:
- name: PUBLIC_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: PRIVATE_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: PEER_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
volumeMounts:
- mountPath: /var/lib/etcd
name: etcd
- mountPath: /certs
name: certs
hostNetwork: true
volumes:
- hostPath:
path: /var/lib/etcd
type: DirectoryOrCreate
name: etcd
- hostPath:
path: /etc/kubernetes/pki/etcd
name: certs
EOF

然后每个节点的 master init 的配置文件按照下面这个配置,client.pem 是用来让 etcd 相信 apiserver 的。private-ipload-balancer-ip 都是要写到证书的 SAN 里面的,不然用这些 ip 是访问不了 apiserver 的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
apiVersion: kubeadm.k8s.io/v1alpha1
kind: MasterConfiguration
api:
advertiseAddress: <private-ip>
etcd:
endpoints:
- https://<etcd0-ip-address>:2379
- https://<etcd1-ip-address>:2379
- https://<etcd2-ip-address>:2379
caFile: /etc/kubernetes/pki/etcd/ca.pem
certFile: /etc/kubernetes/pki/etcd/client.pem
keyFile: /etc/kubernetes/pki/etcd/client-key.pem
networking:
podSubnet: <podCIDR>
apiServerCertSANs:
- <load-balancer-ip>
apiServerExtraArgs:
apiserver-count: "3"
EOF

至于怎么做 loadbalance 可以用七层的也可以用四层的,七层把证书配到负载均衡服务上,四层的就不用,自己在裸机器上做可以用 vip + nginx 做一个四层的,也可以把证书放到 nginx 上做七层的,但是在云环境下,都不怎么支持自己配置 vip,需要用云厂商的 lb 服务,这个就看具体提供商的服务怎么配了。

首先公钥念“gongyue”,而不是公钥,拼音打字打多了就发现得这么读。

HTTPS 是基于 SSL/TLS 的安全 HTTP 协议,其实 HTTPS 主要还是看安全套接字层,在这之上还是一个 HTTP 协议,这里主要总结一下我们现在主流用到的加密体系,比如我们经常看到的 .pem.crt.key.csr 还有 CA 啊之类的是啥东西,并且这些东西都如何工作和应用的。

在密码学里面,有几个角色,类似于中国的甲乙丙丁,一个是 Alice 和 Bob,这是正常通信的两个人,还有一个是 Eve,是信道上具备窃听能力的人,另外一个是 Mallory,这个人可以妨碍网络流量,主动攻击。

对称加密

其实最简单的加密算法就是对称加密

Alice 和 Bob 有一个共有的密码,也就是只有两个人拥有,相互传输的密文只能通过这个密钥解开。

首先加密需要基于密钥,凯撒密码可不可以,可以是可以,但是被人知道算法以后,就可以被所有人破解,并且优秀的加密算法应该被人验证,如果不是公开的算法就没有办法验证,当然一些保守的加密方法会不公开,这样其实也很难解密,但是互联网的加密协议很显然是用在千家万户的,所以一定要是经得起推敲的加密算法。

分组密码

分组密码的作用是,一般会用 128 位一个分组,这样加密的好处是即使一个小的变化也会导致输出大量的变化,这样攻击者很难通过出现频率分析加密方式(比如 HTTP 开头都是一样的,所以使用顺序加密,很容易用 HTTP 的通用开头做输入得出加密方法)。但是分组密码的小影响会导致大改变的特性导致攻击者没办法这么做。

(如果攻击者把有的流量都记录下来,等有一天通过方法获得密钥就能解开这些数据了,可能是未来算力提高,或者通过法律手段,斯诺登的加密信箱就是 FBI 让加密邮箱公司强制提供的)

哈希函数

哈希函数其实很熟悉了,解释一下 MAC。

MAC

MAC 是 message authetication code,是密钥的哈希函数,因为普通哈希函数,如果 Mallory 可以直接用假的数据用哈希算出结果发给 Bob,缺少身份验证,MAC 就是带密钥的哈希函数,HMAC 其实就是把密钥和消息组合在一起的协议。

非对称加密

非对称加密又叫做公钥加密,对称加密固然好,但是对称密钥在团体中使用的话,大家都要共享,密钥给来给去的很容易出问题。对称加密就没有这个问题,可以方便传播,对称密钥分私钥和公钥,一个用于加密一个用于解密,私钥加密的数据只能用公钥解密。公钥的出现使得密钥可以大范围传播。

数字签名

数字签名主要验证消息的真实性,对消息进行验证。主要是对消息进行哈希,然后用私钥加密,追加到文档中做身份验证,这样用公钥解的开的话就能证明消息的发送者。

TLS 的具体协议

PKI 公钥基础设施

PKI 主要是用来保证公钥的可信,通过中间的权威机构(CA)签发的公钥也就是证书才能被认可是合法的证书。

证书

证书包含了版本、序列号、签名算法、颁发者、有效期、使用者、公钥。证书也有证书链,比如 root CA 可以签发 中间 CA 的证书。根证书一般是跟着操作系统一起造就装好了的,大公司的操作系统和浏览器都有自己的根证书库,自带就在电脑上。

应用

RSA 是目前最广泛应用的密钥算法,破解 1024 位的 RSA 密钥的成本大约是 1000 万人民币,现在一般用 2048 位的 RSA,基本无解。

其实公钥体系已经很完全了,但是大部分出问题都情况是私钥泄漏,管理自己的私钥非常重要。

1. 不要用 CA 生成的私钥,尽量自己生成。
2. 不要用刚开机的机器生成随机数,这台机器获得的外界熵不够多。
3. 定期更换私钥
4. 不要随意传播私钥
5. 安全存储私钥,这个主要是中间 CA 可能要用,普通服务直接换私钥就可以。

下面就用 OpenSSL 进行一些实验。

首先我们生成一个 RSA 密钥,前面提到了最好用 2048 位的,用 AES-128 算法来加密保存,会让你输入密码,这个密钥基于这个密码保存,这个文件的格式叫 PEM 所以如果看到 PEM 格式的文件就知道是私钥了。

1
2
3
4
5
6
7
$openssl genrsa -aes128 -out test.key 2048
Generating RSA private key, 2048 bit long modulus
...+++
.............................+++
e is 65537 (0x10001)
Enter pass phrase for test.key:
Verifying - Enter pass phrase for test.key:

可以看这个文件,看起来一通乱七八糟的东西,但是根据开头的信息,我们有办法用密码把私钥解析出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$cat test.key
-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: AES-128-CBC,52551A2438582E22358335433B7BAEE0

GiVITGQMbkGxzBYestzFNX4KMUgP84A2p49mOBozQWkAol+zU5llumFXrj/bzATq
OiH01+UN7sqKXD+lNwXsvL2bhhWFGe4h80CbCaVhPYnAtNtxU4HNTbAnCFZPmTHg
wC5JCHlJvt6TGIOeOeyTj9qnCeIKd1XJGYypG8syzzaKuNbSwlKN8DrUKGJP/w/8
gCPKT2AA2l53ysxBI6i2jHAxQSs1Y+K1jFrZjgObT3QDN4eqT1Io/waSDAiB8tkl
rW35ZYO0Toe3iJgTOp325v5dvC6mCOvL0QAQWDz7y239l8fAdyDJUO2tSzjXfIii
FM12NaHfw5m+GcT6brqlwbAOL6BSMX8Q+Dj/fdeDoPOF+pKnG4AGW3LPEY6LJgq0
pkExHJ5cl/nEL2Q2H3yekvPjW40XDZfjyQSivwEJGhsmAq3+tfis/7h5f5Rg/2yh
Vb8kkPQTu0SofS1VFkSQN6iJe5A3imjMkacKDoKtafORR1lWu0noaOqr757orcgI
hK847ijdFdwjstycrGBdN1INr0Yx/FnyaPYNR2XZhWD4uQwZ1tMVge5duw8jgHrH
ZwCeBAgfpSynbMy/1GTru1sO2T6qpZj9pfao59jVcN1fka+YKI5oEdxFFc97wcgk
XVX9lVrcPrI0bMyuhje5Vi04HbOxpd4GmtnNPKTwtmMbDsSVVIX+hlFltSBl+0in
PsKlUeBGjAmUapT3x1v7OP5K7K8YvszHMehbctDS2E9bZstCwhnsMog3b+Jxhw40
gXsOc6Vb3kJljkPXu6k7qGGkqzVqUuUMSaWlE87s5Cm4ZyS8c5IPQvmQk4S/1AJP
v5sD3TObRlJAIw0MEItPY4daQBMyIXPr+UXUAKfMoFK8bXG9aNpACKm/pQ1pVMj1
eE0+lzQ1UZUVM0GotBZGce+TtbrU/I/dbhGLA2KKyoohsCSH2yV+wGIMrmmbUiqH
a46FYwJLtFdDZ9m3ZVj1KCMza9/B2ylIetCX98C3/fVd81L3rxpSmbpWvzWYhw+Y
205t8p26WUloAQrkP+kqw0HDsiDeEU2QilvNwtA5qlfd8666rJEJpBg50jM8Sb0R
JYsG8Mc2a9CNpXt5pVi2kHdoRRkiSeGDh9xTnOlvnCc77p4MMlvcC3bqHwGe85ON
0q5xVJMyLwzAXR82X0FunRiiDNBO3Pg6k+UBALmLNrgieet57Jdva90OnkEA0Ihg
sYHHnfsMMJ6EnW4mNov8tdVi7mExK8K+RRe1vu+zAqzx/UuKkjekW0NdzQjHv4Hs
mX/9XzVE2cLG9GLWVojyTS5XUg16NBn7qM3HRQmQoAgXHlfCJUx33xMjTL3glbNH
65jpvG1S7l6U45BGMI3d2eQ0XfiSUHOHh/zHOIGRl7JcHyoK082IzivPbHkrlolv
o1E+4sgpFc09rQYgcW2cb6Nr3H3aYOlC86iPn4Ecxj8M/wznH/JNjjfoKKhmFWcU
EtmrreXjR8QlUCXCHZCA3DOi/MujcPU0qGZkaz+Ttii4NACZoHpedL+XQ9EcXci7
xkFO4NxsezDunpvXNHgkzES+as5lNxSa4wC4tUbC9h4zhgbeiOGBSTL8Aq0MxwX3
-----END RSA PRIVATE KEY-----

解析私钥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
$openssl rsa -text -in test.key
Enter pass phrase for test.key:
Private-Key: (2048 bit)
modulus:
00:a0:18:a6:70:ce:65:64:ea:f1:43:30:35:ff:cf:
b4:5a:8e:20:52:04:84:bf:bf:3a:b4:a7:65:94:a5:
df:b5:14:09:61:ba:79:8a:43:d1:fe:cc:d9:96:d3:
81:5b:82:d7:63:e1:e9:6f:30:5e:b0:3f:fe:65:c4:
e1:d5:b2:e3:ce:ba:fc:7b:5e:b2:83:f0:9f:3f:c2:
15:39:b4:fa:e7:3c:ff:42:96:e7:a6:7a:29:e1:ba:
0b:c7:99:aa:ac:07:2a:2b:74:b3:f8:10:8d:0f:91:
44:a4:fa:48:c1:aa:88:0e:86:ff:c1:da:59:c8:dd:
32:d5:5b:15:f4:80:3e:2f:d7:d3:92:09:63:54:d0:
01:46:78:cd:5c:5d:f1:1c:ad:a7:ab:84:5c:86:e1:
25:69:6d:6c:6c:df:90:5f:af:ca:6f:43:17:50:05:
b0:77:3d:92:e9:e7:4c:66:c3:58:08:96:60:7a:16:
02:d3:6f:56:cb:df:41:69:eb:83:f3:28:b7:82:0a:
c2:c6:b4:a3:6e:f1:2d:7f:ec:ea:87:7b:94:4b:8b:
b8:e1:72:0d:00:c1:8d:9f:cc:03:32:de:74:6e:26:
29:0b:4f:f4:41:93:1c:9c:ae:22:41:81:71:b6:9c:
8c:17:15:63:d5:86:ce:74:b2:99:fb:7f:ff:37:c8:
03:7b
publicExponent: 65537 (0x10001)
privateExponent:
1a:ae:40:fe:c7:c6:ea:1c:a5:7c:97:0a:48:c9:aa:
ba:f4:b8:ba:32:7a:95:22:1f:7c:7f:f1:53:e6:98:
f3:aa:95:2d:ae:50:17:14:da:68:66:67:54:d5:86:
d7:63:64:d6:06:8e:4a:b3:7a:f4:50:95:eb:0b:f6:
bf:10:83:1a:ae:da:e9:0c:8d:1f:a3:f8:46:3d:e8:
1f:a7:e3:b0:a9:df:b8:8f:41:a7:e2:f0:1b:e8:4f:
92:42:2f:c9:5f:a0:4d:81:b3:84:81:ed:a0:4c:8b:
6e:1b:30:08:e6:8c:aa:2f:21:6c:83:21:37:72:75:
c8:4c:d7:c9:d9:9d:83:87:67:05:4d:6d:28:72:71:
63:3b:b6:82:f8:42:0f:94:af:f2:b1:d8:c5:d3:3f:
50:bf:13:61:b2:0b:de:6b:34:42:cd:29:27:04:c9:
ff:49:14:75:d0:d5:e5:5c:4b:29:a1:95:c3:c5:e5:
34:46:9e:81:d4:9d:c3:c4:06:c9:96:90:39:90:fb:
db:06:77:fa:46:73:38:60:0e:e3:40:7b:d0:5d:a0:
97:0d:6e:0b:39:d6:99:63:a6:ee:67:b7:94:35:e2:
63:cf:02:a1:eb:0a:f0:50:99:6f:30:ae:6b:ef:1e:
14:a0:1a:f4:8e:ed:cd:81:bf:3d:2b:9d:b5:9e:b8:
21
prime1:
00:cc:ff:e2:f4:39:5d:33:de:96:15:e6:7c:d2:e6:
a3:56:a9:6a:09:0c:e9:26:94:36:41:92:b9:db:c9:
09:20:28:9d:bc:c6:76:60:88:93:97:81:16:86:da:
4d:65:0e:87:ec:ef:15:6d:c9:06:f7:99:12:eb:4a:
a6:7e:49:9d:1a:68:ca:35:57:5c:4b:2f:32:2e:e4:
76:87:a5:02:94:27:1a:1f:38:28:58:77:68:2d:5d:
fa:c2:fd:c4:09:80:e4:eb:14:84:cc:73:06:96:4b:
08:8e:da:1c:55:38:8d:8c:7f:19:01:fa:54:b3:62:
d4:cb:4c:df:01:e0:02:d8:c3
prime2:
00:c7:ec:f1:2b:83:26:d1:35:e3:55:00:3e:a9:7d:
2e:f0:68:4d:27:77:3f:d5:1c:99:ef:a0:98:3c:fd:
fd:d7:8f:51:f5:82:e7:8f:37:34:a7:1a:1f:c4:83:
44:ab:11:62:54:7a:5e:5c:a4:7f:d6:dd:f8:45:3c:
b6:bc:1e:b5:56:df:60:65:66:aa:43:82:f5:7a:7c:
72:3b:3d:fe:33:d4:27:b2:c5:9a:07:36:b4:ca:bc:
1d:a7:7a:5f:9c:1a:75:2b:2c:57:97:5a:b8:a9:de:
0e:8a:8c:84:ff:51:e9:12:e9:d4:8b:bf:de:5f:98:
52:9c:08:55:42:e1:70:be:e9
exponent1:
15:76:dd:7e:90:db:0f:69:48:f1:b6:16:6f:c6:b2:
67:8a:89:8d:b5:0a:5c:7d:bc:48:95:62:5c:7e:ea:
33:b1:cd:02:4d:0d:6c:02:20:e2:06:24:23:ae:8b:
d7:fe:f3:80:7d:70:12:f4:af:84:11:45:07:d9:e3:
20:e9:f8:47:21:9d:ba:84:11:27:d6:23:3d:01:b2:
df:75:09:96:15:9a:08:96:ca:b2:a8:9e:01:d2:0b:
45:8b:68:91:4e:2b:a9:e9:96:16:0a:1d:30:73:5e:
cc:06:4e:5d:25:f4:bc:37:3a:99:18:6a:f1:f5:71:
2e:70:38:11:6c:31:20:1d
exponent2:
00:a6:f0:8f:21:4a:4e:6b:7b:97:ec:2e:5c:24:a2:
c7:43:2f:94:dd:53:92:15:9d:e0:5c:5b:b9:43:94:
c3:15:f0:32:fb:d2:e7:10:8b:84:87:d4:24:9a:af:
11:f3:d6:7c:49:16:35:1d:1e:af:30:f8:00:8b:af:
fa:d6:72:bd:f1:60:6c:d9:bf:34:85:53:21:2f:ba:
22:98:9d:57:5a:67:d9:0e:4a:3a:27:b3:e2:9b:37:
21:7b:eb:8f:52:86:35:38:6b:ba:68:43:f4:d6:c2:
f9:59:6f:a4:ce:9d:d3:05:5c:03:82:fe:1f:ed:aa:
ff:b0:12:b5:3f:37:88:31:a1
coefficient:
09:97:9e:dc:20:fe:c5:e2:34:47:d8:64:de:bb:ad:
70:65:4d:08:49:c8:cf:28:40:f6:87:43:09:c9:63:
bc:d8:cd:11:53:78:ba:ad:1a:f0:8b:e7:fa:1c:5f:
c9:9d:5f:ae:e1:2a:7f:87:7a:7f:1a:e3:c8:b5:8d:
eb:b2:af:18:c6:1e:07:43:f0:e7:be:4e:bc:c6:1b:
77:b8:43:36:58:3a:b5:8a:2c:f7:76:37:c7:97:4c:
8c:fd:47:71:09:f8:76:fe:8d:0f:e1:3a:30:56:5c:
2b:70:60:9d:fa:53:74:8a:db:b9:04:78:ce:1c:1d:
28:ca:78:81:53:07:de:5e
writing RSA key
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAoBimcM5lZOrxQzA1/8+0Wo4gUgSEv786tKdllKXftRQJYbp5
ikPR/szZltOBW4LXY+HpbzBesD/+ZcTh1bLjzrr8e16yg/CfP8IVObT65zz/Qpbn
pnop4boLx5mqrAcqK3Sz+BCND5FEpPpIwaqIDob/wdpZyN0y1VsV9IA+L9fTkglj
VNABRnjNXF3xHK2nq4RchuElaW1sbN+QX6/Kb0MXUAWwdz2S6edMZsNYCJZgehYC
029Wy99BaeuD8yi3ggrCxrSjbvEtf+zqh3uUS4u44XINAMGNn8wDMt50biYpC0/0
QZMcnK4iQYFxtpyMFxVj1YbOdLKZ+3//N8gDewIDAQABAoIBABquQP7HxuocpXyX
CkjJqrr0uLoyepUiH3x/8VPmmPOqlS2uUBcU2mhmZ1TVhtdjZNYGjkqzevRQlesL
9r8Qgxqu2ukMjR+j+EY96B+n47Cp37iPQafi8BvoT5JCL8lfoE2Bs4SB7aBMi24b
MAjmjKovIWyDITdydchM18nZnYOHZwVNbShycWM7toL4Qg+Ur/Kx2MXTP1C/E2Gy
C95rNELNKScEyf9JFHXQ1eVcSymhlcPF5TRGnoHUncPEBsmWkDmQ+9sGd/pGczhg
DuNAe9BdoJcNbgs51pljpu5nt5Q14mPPAqHrCvBQmW8wrmvvHhSgGvSO7c2Bvz0r
nbWeuCECgYEAzP/i9DldM96WFeZ80uajVqlqCQzpJpQ2QZK528kJICidvMZ2YIiT
l4EWhtpNZQ6H7O8VbckG95kS60qmfkmdGmjKNVdcSy8yLuR2h6UClCcaHzgoWHdo
LV36wv3ECYDk6xSEzHMGlksIjtocVTiNjH8ZAfpUs2LUy0zfAeAC2MMCgYEAx+zx
K4Mm0TXjVQA+qX0u8GhNJ3c/1RyZ76CYPP39149R9YLnjzc0pxofxINEqxFiVHpe
XKR/1t34RTy2vB61Vt9gZWaqQ4L1enxyOz3+M9QnssWaBza0yrwdp3pfnBp1KyxX
l1q4qd4OioyE/1HpEunUi7/eX5hSnAhVQuFwvukCgYAVdt1+kNsPaUjxthZvxrJn
iomNtQpcfbxIlWJcfuozsc0CTQ1sAiDiBiQjrovX/vOAfXAS9K+EEUUH2eMg6fhH
IZ26hBEn1iM9AbLfdQmWFZoIlsqyqJ4B0gtFi2iRTiup6ZYWCh0wc17MBk5dJfS8
NzqZGGrx9XEucDgRbDEgHQKBgQCm8I8hSk5re5fsLlwkosdDL5TdU5IVneBcW7lD
lMMV8DL70ucQi4SH1CSarxHz1nxJFjUdHq8w+ACLr/rWcr3xYGzZvzSFUyEvuiKY
nVdaZ9kOSjons+KbNyF7649ShjU4a7poQ/TWwvlZb6TOndMFXAOC/h/tqv+wErU/
N4gxoQKBgAmXntwg/sXiNEfYZN67rXBlTQhJyM8oQPaHQwnJY7zYzRFTeLqtGvCL
5/ocX8mdX67hKn+Hen8a48i1jeuyrxjGHgdD8Oe+TrzGG3e4QzZYOrWKLPd2N8eX
TIz9R3EJ+Hb+jQ/hOjBWXCtwYJ36U3SK27kEeM4cHSjKeIFTB95e
-----END RSA PRIVATE KEY-----

这个私钥是我自己生成的,没用来干什么,所以直接展示了,但是生产环境的私钥要妥善保管,现在我们生成共钥,输入密码,读取密钥,然后 -pubout 表示生成公钥。

1
2
3
4
5
6
7
8
9
10
11
12
13
$openssl rsa -in test.key -pubout -out test-public.key
Enter pass phrase for test.key:
writing RSA key
$cat test-public.key
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoBimcM5lZOrxQzA1/8+0
Wo4gUgSEv786tKdllKXftRQJYbp5ikPR/szZltOBW4LXY+HpbzBesD/+ZcTh1bLj
zrr8e16yg/CfP8IVObT65zz/Qpbnpnop4boLx5mqrAcqK3Sz+BCND5FEpPpIwaqI
Dob/wdpZyN0y1VsV9IA+L9fTkgljVNABRnjNXF3xHK2nq4RchuElaW1sbN+QX6/K
b0MXUAWwdz2S6edMZsNYCJZgehYC029Wy99BaeuD8yi3ggrCxrSjbvEtf+zqh3uU
S4u44XINAMGNn8wDMt50biYpC0/0QZMcnK4iQYFxtpyMFxVj1YbOdLKZ+3//N8gD
ewIDAQAB
-----END PUBLIC KEY-----

创建证书需要发起 CSR(certificate signing request),到 CA 那里,这个 csr 包含了申请者的信息和申请者的公钥。下面就是创建 csr 的命令,其中比较重要的是要配置好 Common Name,这个会拿来和访问的 host 进行匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$openssl req -new -key test.key -out test.csr
Enter pass phrase for test.key:
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) []:CN
State or Province Name (full name) []:SH
Locality Name (eg, city) []:SH
Organization Name (eg, company) []:.
Organizational Unit Name (eg, section) []:.
Common Name (eg, fully qualified host name) []:www.example.com
Email Address []:

Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:1234

可以查看里面的信息是否正确,req 表示处理 csr 文件,-text 一般是用于展示文件内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
$openssl req -text -in test.csr -noout
Certificate Request:
Data:
Version: 0 (0x0)
Subject: C=CN, ST=SH, L=SH, CN=www.example.com
Subject Public Key Info:
Public Key Algorithm: rsaEncryption
Public-Key: (2048 bit)
Modulus:
00:a0:18:a6:70:ce:65:64:ea:f1:43:30:35:ff:cf:
b4:5a:8e:20:52:04:84:bf:bf:3a:b4:a7:65:94:a5:
df:b5:14:09:61:ba:79:8a:43:d1:fe:cc:d9:96:d3:
81:5b:82:d7:63:e1:e9:6f:30:5e:b0:3f:fe:65:c4:
e1:d5:b2:e3:ce:ba:fc:7b:5e:b2:83:f0:9f:3f:c2:
15:39:b4:fa:e7:3c:ff:42:96:e7:a6:7a:29:e1:ba:
0b:c7:99:aa:ac:07:2a:2b:74:b3:f8:10:8d:0f:91:
44:a4:fa:48:c1:aa:88:0e:86:ff:c1:da:59:c8:dd:
32:d5:5b:15:f4:80:3e:2f:d7:d3:92:09:63:54:d0:
01:46:78:cd:5c:5d:f1:1c:ad:a7:ab:84:5c:86:e1:
25:69:6d:6c:6c:df:90:5f:af:ca:6f:43:17:50:05:
b0:77:3d:92:e9:e7:4c:66:c3:58:08:96:60:7a:16:
02:d3:6f:56:cb:df:41:69:eb:83:f3:28:b7:82:0a:
c2:c6:b4:a3:6e:f1:2d:7f:ec:ea:87:7b:94:4b:8b:
b8:e1:72:0d:00:c1:8d:9f:cc:03:32:de:74:6e:26:
29:0b:4f:f4:41:93:1c:9c:ae:22:41:81:71:b6:9c:
8c:17:15:63:d5:86:ce:74:b2:99:fb:7f:ff:37:c8:
03:7b
Exponent: 65537 (0x10001)
Attributes:
challengePassword :unable to print attribute
Signature Algorithm: sha256WithRSAEncryption
33:83:fa:d3:a1:7d:1b:5c:cc:cb:b1:19:99:79:e4:b8:29:fc:
0e:ac:e6:40:f5:13:f0:d7:f7:2b:67:d4:32:39:78:3f:0b:f0:
5e:2c:f4:5c:c1:14:f0:f7:82:5d:1e:c5:bf:00:3e:87:d2:b5:
ed:a7:46:75:70:da:db:53:f1:19:37:15:63:09:63:a8:4d:74:
19:ed:c5:3a:50:7b:db:5a:68:f0:88:37:54:23:0d:bb:4d:c3:
b6:1a:3f:1d:93:24:17:f3:c5:66:c8:9c:43:67:e8:3b:cc:48:
20:8e:9e:da:a6:a0:48:90:6d:b1:bc:ff:0d:39:62:7b:8c:5c:
cb:ec:ce:e1:de:0c:f3:5b:51:3e:5c:ab:ad:6f:f5:96:9c:e5:
12:9e:1b:a7:27:90:fe:d3:9f:f9:c2:9d:7e:b5:62:ac:f9:45:
33:6a:a7:b5:c2:ab:b7:18:a8:a6:91:15:26:27:a4:c9:84:26:
88:85:3e:68:99:8c:f4:c6:32:8d:61:71:83:cb:86:96:92:2e:
c7:bc:76:e0:59:82:e8:fe:47:39:da:f0:57:72:f7:59:c4:ba:
7a:51:23:13:bc:8c:75:07:d7:2d:cf:2b:69:07:20:80:27:6d:
6d:ae:cb:27:5d:ef:0c:92:99:a4:02:45:5b:58:ac:e9:71:1e:
ee:5f:54:78

可以看到里面的信息,还有签名算法,以及公钥等等。

我们可以用自己的私钥给自己签名,比如 x509 是证书的格式。

1
2
3
4
5
$openssl x509 -req -days 365 -in test.csr -signkey test.key -out test.crt
Signature ok
subject=/C=CN/ST=SH/L=SH/CN=www.example.com
Getting Private key
Enter pass phrase for test.key:

也可以把两步结合起来,直接创建自签名的证书,openssl req -new -x509 -days 365 -key test.key -out test.crt,如果不想要交互式的可以直接

1
2
openssl req -new -x509 -days 365 -key test.key -out test.crt \
-subj "/C=CN/L=BJ/O=HaiDian/CN=www.example.com"

CN 只能写一个,虽然可以写泛域名,但是要支持多个域名可以通过扩展字段 SAN(Subject Alternative Name)来解决。

我的一点观点

我个人觉得 nvidia 的 CUDA 太封闭了,不是很能明白这样封闭的产品怎么能够长久生存,仅仅是因为家大业大么,如果有家公司推出了八成性能的 GPU,但是整套开发的生态非常友好,是不是会像 Android 取代诺基亚一样,还是 nvidia 就是苹果,就算封闭环境也能保证强劲体验,这我也不好说了。

安装

CUDA+cudnn 装起来挺麻烦的,反正如果有错误的话,可以检查 CUDA samples 里面的 deviceQuery 是否成功,如果不成功可以用 strace 看一下少了什么东西,再想办法安装上去。检查 cudnn samples 是否成功也是一样的,里面有一个 mnistDNN 的例子。

Hello World!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <stdio.h>

__global__ void helloFromGPU (void)
{
printf("Hello world!\n");
}

int main(void)
{
printf("Hello World! from CPU\n");

helloFromGPU <<<1, 10>>>();
cudaDeviceReset();
return 0;
}

以上就是一段 GPU 的 Hello world,执行下面的代码可以看到我们在 GPU 上并行执行了十个 “Hello world!” 的打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
$ nvcc -arch sm_20 hello.cu -o hello
$ ./hello
Hello World! from CPU
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!
Hello world!

CUDA

CUDA 全称 Compute Unified Device Architecure,用于定义 GPU 的架构标准。GPU 的工作方式主要依赖于多核心的并行计算,CUDA 提供了方便的模型进行这种模式的编程,下面就会简单介绍一下 CUDA 的架构以及基于 GPU 的编程。

CUDA 一次启动的线程称为网格(grid),网格中包含块,每个块包含新程,是一个二维的模式,这张图片就说明得很清晰,首先是由 Grid 组成,然后每个 Block 有 shared memory,同 block 的线程可以访问 shared memory,不同的 Block 的线程只能访问全局的内存,这种结构也方便设计和实现并行算法。

cuda-arch

CUDA 的编译器 nvcc 是 gcc 的一个扩展,支持编写运行在 GPU 上的函数,其中的,<<x, y>> 扩展就是用来指定 block 和 thread 的数量的。

比如典型的俩个向量相加的例子:

1
2
3
4
void sumArrayOnHost(float *A, float *B, float *C, const int N) {
for (int i = 0; i < N; i++)
C[i] = A[i] + B[i]
}

但是 GPU 的核函数怎么写呢,是这样的。

1
2
3
4
__global__ void sumArrayOnGPU(float *A, float *N, float C) {
int i = threadIdx.x
C[i] = A[i] + B[i]
}

然后,如果 N 是 32 的话,可以如下调用,__global__ 表示这个函数可以在 host 调用也可以在 GPU 上调用,用个 32 个线程的 block 算这个向量和,N 隐性包含在了定义中。

1
sumArrayOnGPU<<1, 32>>(float *A, float *B, float *C)

线程组织

基于 CPU 和 GPU 的异构计算平台可以优势互补,CPU 负责处理逻辑复杂的串行程序,而 GPU 重点处理数据密集型的并行计算程序,从而发挥最大功效。

线程组织主要依据网格(现在网络这个词其实比较容易混淆,可以是神经网络可以是计算机网络,这里指的是线程的组织形式)模型。主要分 grid, block, thread,组织的方式就靠索引来决定,可以通过 block 索引和 thread 索引进行线程的定位。CUDA 提供了块内线程同步的方法,但是没有提供块间同步的原语。

线程束分化

线程束 = warp

CPU 有很强的分支预测的能力,会预加载指令,如果预测正确,执行代价就很小,但是 GPU 在这方面是很弱的,因为线程束当中如果执行分支出现不同,那出现分支的线程就会被禁止执行,这也是用 GPU 做并行编程的时候需要参考的一个很重要的因素,保证并行的线程出现线程束分化的情况尽量少。

下面就是线程束分化的例子。

并行归约问题

并行归约是指如何对并行问题进行归约,比如说相邻配对和交错配对。

设备管理

管理 GPU 主要是通过 CUDA API 或者 nvidia-smi 命令来获取。

SM(流式多处理器)

上面讲的是抽象上的分层,但是实际的物理层面承载 GPU 的是 SM。最早的 GPU 架构叫 Fermi,然后是 Kepler,然后才是 Tesla。一般说的 cm_20, sm_20 就指这种计算能力和架构,新款的 GPU 计算能力要更强一点。

内存模型

CUDA 的内存模型和 CPU 是类似的也是多级的结构。线程有局部内存,块之间有全局内存。GPU 和 CPU 都是主要采用 DRAM 来做主存,CPU 的一级缓存会用 SRAM 做。CPU 的多级缓存对用户来说不是很需要考虑,尽量屏蔽了其中的细节,但是 GPU 相对来说会把这种分级结构暴露给用户,这对编程来说也是一种新的挑战。

后面

用 GPU 计算一些计算密集型的程序,速度是真快,比 CPU 块很多,所以这也是为什么大量深度学习的应用都要通过 GPU 加速的原因。