ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

行星运动我们都是按万有引力定律算的,实际上按照爱因斯坦的说法,万有引力不是惯性力是时空弯曲的效果,但是按照爱因斯坦的公式算起来特别复杂,所以现在航天轨道的设计还是按万有引力定律在算。

牛顿万有引力定律

牛顿万有引力定律:任意两个质点由通过连心线方向上的力相互吸引。该吸引力的大小与它们的质量乘积成正比,与它们距离的平方成反比,G 是万有引力常数。

$$ F=-\frac{G M \cdot m}{r^2} \cdot \frac{\vec{r}}{r} $$

多体问题

多体问题,是所有吸引力的叠加,这里排除物体相撞的情况,因为宇宙非常大,两个天体之间的距离非常远,暂时不考虑这种情况,通过加入一个小常量可以在计算上忽略这个问题。

$$ F=-\frac{G M m r}{(r^2 + \epsilon^2)^{\frac{3}{2}}} $$

这样累加的时候自己也可以加进去,因为 r 是 0,那一项相当于没算。

$$ F_j= \sum_{i=1}^{n} F_i $$

对于 N body 有很多的稍微近似的算法,多是基于实际的物理场景的,比如大部分天体是属于一个星系的,每个星系之间都非常远,所以可以将一些星系作为整体计算,构建一颗树,树中的某些节点是其所有叶子的质心,这样就可以减少计算量,但是放到 GPU 的场景下一个 O(N^2) 的暴力算法利用 GPU 的并行能力可以非常快的算出来。

Python 暴力解

下面是 N-Body 暴力实现的例子,网上有个各种语言实验的,比较好用来测试准确性,来自这里,就是两层循环。

1
2
3
4
5
6
7
8
9
10
11
12
cat <<EOF >> nbody.txt
0.01 3 20
1
0 0 0
0.01 0 0
0.1
1 1 0
0 0 0.02
0.001
0 1 1
0.01 -0.01 -0.01
EOF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import math

class Vector:
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z

def __add__(self, other):
return Vector(self.x + other.x, self.y + other.y, self.z + other.z)

def __sub__(self, other):
return Vector(self.x - other.x, self.y - other.y, self.z - other.z)

def __mul__(self, other):
return Vector(self.x * other, self.y * other, self.z * other)

def __div__(self, other):
return Vector(self.x / other, self.y / other, self.z / other)

def __eq__(self, other):
if isinstance(other, Vector):
return self.x == other.x and self.y == other.y and self.z == other.z
return False

def __ne__(self, other):
return not self.__eq__(other)

def __str__(self):
return '({x}, {y}, {z})'.format(x=self.x, y=self.y, z=self.z)

def abs(self):
return math.sqrt(self.x*self.x + self.y*self.y + self.z*self.z)

origin = Vector(0, 0, 0)

class NBody:
def __init__(self, fileName):
with open(fileName, "r") as fh:
lines = fh.readlines()
gbt = lines[0].split()
self.gc = float(gbt[0])
self.bodies = int(gbt[1])
self.timeSteps = int(gbt[2])
self.masses = [0.0 for i in range(self.bodies)]
self.positions = [origin for i in range(self.bodies)]
self.velocities = [origin for i in range(self.bodies)]
self.accelerations = [origin for i in range(self.bodies)]
for i in range(self.bodies):
self.masses[i] = float(lines[i*3 + 1])
self.positions[i] = self.__decompose(lines[i*3 + 2])
self.velocities[i] = self.__decompose(lines[i*3 + 3])

print("Contents of", fileName)
for line in lines:
print(line.rstrip())
print
print("Body : x y z |")
print(" vx vy vz")

def __decompose(self, line):
xyz = line.split()
x = float(xyz[0])
y = float(xyz[1])
z = float(xyz[2])
return Vector(x, y, z)

def __computeAccelerations(self):
for i in range(self.bodies):
self.accelerations[i] = origin
for j in range(self.bodies):
if i != j:
temp = self.gc * self.masses[j] / math.pow((self.positions[i] - self.positions[j]).abs(), 3)
self.accelerations[i] += (self.positions[j] - self.positions[i]) * temp
return None

def __computePositions(self):
for i in range(self.bodies):
self.positions[i] += self.velocities[i] + self.accelerations[i] * 0.5
return None

def __computeVelocities(self):
for i in range(self.bodies):
self.velocities[i] += self.accelerations[i]
return None

def __resolveCollisions(self):
for i in range(self.bodies):
for j in range(self.bodies):
if self.positions[i] == self.positions[j]:
(self.velocities[i], self.velocities[j]) = (self.velocities[j], self.velocities[i])
return None

def simulate(self):
self.__computeAccelerations()
self.__computePositions()
self.__computeVelocities()
self.__resolveCollisions()
return None

def printResults(self):
fmt = "Body %d : % 8.6f % 8.6f % 8.6f | % 8.6f % 8.6f % 8.6f"
for i in range(self.bodies):
print(fmt % (i+1, self.positions[i].x, self.positions[i].y, self.positions[i].z, self.velocities[i].x, self.velocities[i].y, self.velocities[i].z))
return None

nb = NBody("nbody.txt")
for i in range(nb.timeSteps):
print("\nCycle %d" % (i + 1))
nb.simulate()
nb.printResults()

CUDA 并行计算

在 CUDA 中并行化首先要理解 CUDA 的层次结构,CUDA 有 grid 和 block 对线程做划分。首先设计一个 computation tile,相当于一个 block 的计算。

左侧表示一个 block 中的 p 个 body 执行 p 次,最后就能更新所有 body 的加速度,这里的内存占用是 O(p) 的不是 O(p^2),浅绿色表示的是串行执行的流,官方的文档没有更新,对应 CUDA 10 的例子里是这样的,vec3 存的是 x/y/z 轴的加速度,vec4 存的是坐标加质量,就是计算 i 受到 j 的加速度,这里没有并行设计是串行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template <typename T>
__device__ typename vec3<T>::Type
bodyBodyInteraction(typename vec3<T>::Type ai,
typename vec4<T>::Type bi,
typename vec4<T>::Type bj)
{
typename vec3<T>::Type r;

// r_ij [3 FLOPS]
r.x = bj.x - bi.x;
r.y = bj.y - bi.y;
r.z = bj.z - bi.z;

// distSqr = dot(r_ij, r_ij) + EPS^2 [6 FLOPS]
T distSqr = r.x * r.x + r.y * r.y + r.z * r.z;
distSqr += getSofteningSquared<T>();

// invDistCube =1/distSqr^(3/2) [4 FLOPS (2 mul, 1 sqrt, 1 inv)]
T invDist = rsqrt_T(distSqr);
T invDistCube = invDist * invDist * invDist;

// s = m_j * invDistCube [1 FLOP]
T s = bj.w * invDistCube;

// a_i = a_i + s * r_ij [6 FLOPS]
ai.x += r.x * s;
ai.y += r.y * s;
ai.z += r.z * s;

return ai;
}

block 的 tile computation 就是串行计算 p 次 p 个 body 的加速度。

1
2
3
4
for (unsigned int counter = 0; counter < blockDim.x; counter++)
{
acc = bodyBodyInteraction<T>(acc, bodyPos, sharedPos[counter]);
}

每次计算完了以后要对线程做共享内存的同步

黑实线就是一次 block 共享内存同步的边界,也就是一次 tile compuation,图中是 p 个 body 按时间串行执行的流程,超出 p 的时间计算的不是 p 中的 body,看下面这张图。

也就是按照时间串行的计算每个 body 对一个 body 的加速度,然后 N/p 个 block 并行,每个 block 有 p 个线程并行,没 p 次计算每个 block 要同步一次,但是多个 block 之间不需要同步,所以每个并行线程的主体是这样的要在 tile computation 之间包一层同步的调用 cg::sync(ca),同步一次 block 中线程的共享内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename T>
__device__ typename vec3<T>::Type
computeBodyAccel(typename vec4<T>::Type bodyPos,
typename vec4<T>::Type *positions,
int numTiles, cg::thread_block cta)
{
typename vec4<T>::Type *sharedPos = SharedMemory<typename vec4<T>::Type>();

typename vec3<T>::Type acc = {0.0f, 0.0f, 0.0f};

for (int tile = 0; tile < numTiles; tile++)
{
sharedPos[threadIdx.x] = positions[tile * blockDim.x + threadIdx.x];

cg::sync(cta);
// This is the "tile_calculation" from the GPUG3 article.
#pragma unroll 128

for (unsigned int counter = 0; counter < blockDim.x; counter++)
{
acc = bodyBodyInteraction<T>(acc, bodyPos, sharedPos[counter]);
}

cg::sync(cta);
}

return acc;
}

从 GPU 的角度来看,时间复杂度是 O(N),空间复杂度也是 O(N),但前提是要开 N 个线程同时计算。

参考资料

n body nvidia

O(N) 算法计算天体运动

n body problem

模型的保存分三种类型

  1. 知道模型结构,单纯保存变量
  2. 不知道模型结构,保存模型和变量
  3. 不需要再改变量,只要常量化的模型(“冻结”)

第一种用于训练的存档,并且临时恢复,这个时候用户是把训练需要的网络结构在代码里面构造好了的,只是在一定的时间下需要暂时保存网络中的变量,为了在崩溃之后继续训练。所以自然而然会有一个问题,如果我用 Python 写的代码,需要在 C++ 当中恢复,我需要知道你的模型结构,才能恢复,这个最蠢的办法是用 C++ 把你的网络结构再构造一遍,但我们按照统一的协议(比如 Protobuf)确定网络结构,就可以直接从标准序列化的数据中解析网络结构,这就是第二种情况,独立于语言,模型和变量一起保存的情况。然后如果碰到我们不需要再训练了,比如只是把这个模型进行部署,不需要改变相关的变量,那么其实只要一个带常量的模型就可以,这就是第三种情况,把变量冻结的正向传播模型。接下来会依次解释这几种情况的工作方式。

除了这些以外,针对用于服务的模型还可以做很多的优化。

存档

存档只是单纯的保存变量,并且能够恢复,可以在一定的迭代次数以后保存变量,并且从任意一个存档开始重新训练。以两个变量加减 1 为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import tensorflow as tf

# Create some variables.
v1 = tf.get_variable("v1", shape=[3], initializer = tf.zeros_initializer)
v2 = tf.get_variable("v2", shape=[5], initializer = tf.zeros_initializer)

inc_v1 = v1.assign(v1+1)
dec_v2 = v2.assign(v2-1)

# Add an op to initialize the variables.
init_op = tf.global_variables_initializer()

# Add ops to save and restore all the variables.
saver = tf.train.Saver()

# Later, launch the model, initialize the variables, do some work, and save the
# variables to disk.
with tf.Session() as sess:
sess.run(init_op)
# Do some work with the model.
inc_v1.op.run()
dec_v2.op.run()
# Save the variables to disk.
save_path = saver.save(sess, "/tmp/tf-test/model.ckpt")
print("Model saved in path: %s" % save_path)

可以在 /tmp/tf-test 下面看到这几个文件 checkpoint model.ckpt.data-00000-of-00001 model.ckpt.index model.ckpt.meta

可以通过脚本观察保存的变量 python $tensorflow-src/tensorflow/python/tools/inspect_checkpoint.py --file_name=/tmp/tf-test/model.ckpt --all_tensors

得到保存的变量的内容,注意 model.ckpt 这个只是文件前缀。

1
2
3
4
tensor_name:  v1
[1. 1. 1.]
tensor_name: v2
[-1. -1. -1. -1. -1.]

如果要恢复的话,可以通过下面的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import tensorflow as tf

# Create some variables.
v1 = tf.get_variable("v1", shape=[3])
v2 = tf.get_variable("v2", shape=[5])

# Add ops to save and restore all the variables.
saver = tf.train.Saver()

# Later, launch the model, use the saver to restore variables from disk, and
# do some work with the model.
with tf.Session() as sess:
# Restore variables from disk.
saver.restore(sess, "/tmp/tf-test/model.ckpt")
print("Model restored.")
# Check the values of the variables
print("v1 : %s" % v1.eval())
print("v2 : %s" % v2.eval())

得到一样的效果

1
2
v1 : [1. 1. 1.]
v2 : [-1. -1. -1. -1. -1.]

具体来说 .meta 对应的是 MetaGraph 和 SaverGraph,.index 对应的是变量值的位置,key 是变量名,value 是变量保存的入口定义,data 变量的值具体保存的文件。这是恢复代码中已经原样构造出了 Graph,如果没有构造的化,需要通过 tf.train.import_meta_graph('/tmp/model.ckpt.meta') 来加载,但是存档保存的信息比较单一,Tensorflow 提供了一个更丰富的 API 来使用。

保存

SavedModelBuilder 保存的 API 比较丰富,能够保存多个 MetaGraph 和 Variables 的组合,除此之外还能附带 assets,并且要指定模型签名,simple_saved 的方法是一个简单版本的调用,适用于 Predict API。这里要展开一下 GraphDef, MetaGraphDef, SignatureDef, tags 这些东西的概念。对于 MetaGraph,这篇文章解释得很清楚。SignatureDef 是对应了一种图的输入和输出,可以依据这个进行 serving API 的调用,类似于函数签名,相对于一个接口的定义。

tensorflow_serving 自己给了个例子,执行 python mnist_saved_model.py /tmp/tf-test-2 以后可以获得一个目录,下面有版本 1 的模型数据,执行 saved_model_cli show --dir /tmp/tf-test-2/1 可以查看对应的签名。可以看到对应的层级关系,默认用于服务的模型会打上 serve 的标签,函数签名有两个,分别对应了 predict 和 classify 的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['predict_images']:
The given SavedModel SignatureDef contains the following input(s):
inputs['images'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 784)
name: x:0
The given SavedModel SignatureDef contains the following output(s):
outputs['scores'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: y:0
Method name is: tensorflow/serving/predict

signature_def['serving_default']:
The given SavedModel SignatureDef contains the following input(s):
inputs['inputs'] tensor_info:
dtype: DT_STRING
shape: unknown_rank
name: tf_example:0
The given SavedModel SignatureDef contains the following output(s):
outputs['classes'] tensor_info:
dtype: DT_STRING
shape: (-1, 10)
name: index_to_string_Lookup:0
outputs['scores'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: TopKV2:0
Method name is: tensorflow/serving/classify

可以参考 tensorflow 的 REST API,比如 GET http://host:port/v1/models/${MODEL_NAME}[/versions/${MODEL_VERSION}] 其实对应这个例子就是 GET http://host:port/v1/models/tf-test-2/versions/1,然后感觉函数签名不同的 method name,可以调用不同的 request,比如 POST http://host:port/v1/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]:predict 这个格式,如果输入和输出对应的是 imagesscores 那么就对应了第一个签名。

冻结

冻结的情况就是变量不再需要修改,直接把变量转化成常量保存成单一的模型,方便在部署的场景下使用。
冻结模型的代码在这里,他的主要流程如下

  1. 清除所有 Op 中的 device,让原来在指定 CPU/GPU/节点 上的 Op 不再绑定。
  2. 通过 graph_util.convert_variables_to_constants 将所有的 Variable eval 一次,把变量的 Op 的结果拿到,替换成 constant

优化

除了冻结模型以外,还可以删减一些多余的节点,比如 Summary 节点或者 Identity 节点,甚者把 16bit 的浮点数权重修改为 8bit 的浮点数权重(这个在 Tensorflow Lite 里很有用)。这篇文章 列出了详细的优化方式,主要是靠 transform_graph 这个工具,地址在,他有很详细的柴剪列表,并且可以自己编写裁剪函数,充分做到模型在部署环节的“纯净化”,调用方式也很简单。

1
2
3
4
5
6
7
8
9
10
transform_graph \
--in_graph=tensorflow_inception_graph.pb \
--out_graph=optimized_inception_graph.pb \
--inputs='Mul:0' \
--outputs='softmax:0' \
--transforms='
strip_unused_nodes(type=float, shape="1,299,299,3")
remove_nodes(op=Identity, op=CheckNumerics)
fold_old_batch_norms
'

transforms里面加入你想进行优化的 transformer 和对应的参数即可,在科赛上也有在线可以跑的notebook

参考

  1. Cloud ML Engine

我照着 raft 论文重新过了一遍 etcd/raft 的代码,主要的文件是 etcd 下面的 raft.go。对照这个代码重新梳理一遍也算是深入理解一下 raft 算法。接下来会包含两个视频一个是选举相关的内容,一个是日志复制的内容,我 walktrough 的时候默认是大致读过论文的,对一些机制和字段都有了解,没有具体解析每个字段的来历,并且把一些问题按照代码的顺序重新理了一遍。

第一部分是关于选举的,raft 本身是一个 quorum based 的算法,一致性要靠“大多数人”的同意,并且为了简单和不必要的竞争,raft 只有一个主节点,在收到大多数人的投票以后成为主节点。

第二部分是关于日志复制的,raft 需要让主节点把客户端的请求复制到大多数节点上才能算达成一致,并且 commit,下面介绍的是复制的机制,并且 ConfChange 和 Snapshot 也是走这个流程达成一致的。

之后的 walkthrough 应该会慢慢补上,关于一些其他的逻辑和具体的一些函数的细节部分会在后面放出。

mvcc

v2 版本的实现存在 watcher 丢失或者丢失事件的问题,导致客户端要重新获取一遍。新版本是有本地嵌入式的数据库来避免这些问题。
mvcc 是 etcd v3 版本的存储实现主要有两个部分,一个是 backend 的 boltDB 和内存索引 key index。

BoltDB 是一个 B+ 树的嵌入式 KV store,相对于 leveldb 适用于写多读少的情况,我之前的文章简单介绍了一下 leveldb 的设计,BoltDB 比较适合读多写少,或者存在大范围 scan 的情况下比较合适,还有类似 levedb 的产品 badger,是纯 Go 实现的,在多写的情况下 leveldb 的衍生品表现更好。

BatchTx 就是收集很多的 call,然后在一个 Update 里面完成。

backend 里面存的 key 是 revision ,其中 main revision 是事务编号,sub revision 是事务中操作的编号,etcd 在 boltdb 上还做了一层缓存,所以多了一些锁机制。
内存索引中存的是真正的 key 到 revison 的索引。

key_index -> tx_buffer -> boltdb.tx

调用路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
kv:
WriteView 是一些直接操作
ReadView
Read() TxnRead
Write() TxnWrite 是拿到对应的 transaction

revision:
main
sub
kvstore:
readView readView{kv} 用了 Read() Read/Write 是 kvstore 自己实现的用于拿 backend 的和自己的各种锁。(kvstore\_txn.go)
writeView writeView{kv} 用了 Write()
treeIndex:
keyIndex


writeView:
拿到 kv 的 Write(storeTxnWrite): 首先查 tw.s.kvindex.Get 然后再 backend.Tx UnsageSeqPut,然后更新 keyindex,append changes,处理 lease (old detach new attach)
readView
storeTxnRead: keyindex Revisons -> tx.UnsafeRange

lessor lesor(重音在后面)
leaseBucket 在单独的一个 bucket 里面

BPF

BPF (Berkeley Packet Filter) 最早是用在 tcpdump 里面的,比如 tcpdump tcp and dst port 80 这样的过滤规则会单独复制 tcp 协议并且目的端口是 80 的包到用户态。整个实现是基于内核中的一个虚拟机来实现的,通过翻译 BPF 规则到字节码运行到内核中的虚拟机当中。最早的论文是这篇,这篇论文我大概翻了一下,主要讲的是原本的基于栈的过滤太重了,而 BPF 是一套能充分利用 CPU 寄存器,动态注册 filter 的虚拟机实现,相对于基于内存的实现更高效,不过那个时候的内存比较小才几十兆。bpf 会从链路层复制 pakcet 并根据 filter 的规则选择抛弃或者复制,字节码是这样的,具体语法就不介绍了,一般也不会去直接写这些字节码,然后通过内核中实现的一个虚拟机翻译这些字节码,注册过滤规则,这样不修改内核的虚拟机也能实现很多功能。

在 Linux 中对应的 API 是

1
2
3
socket(SOCK_RAW)
bind(iface)
setsockopt(SO_ATTACH_FILTER)

下面是一个低层级的 demo,首先 ethernet header 的十二个字节记录了 ip 的协议,ip 的第9个字节记录 tcp 的协议,如果协议编号不匹配都跳到最后 reject,然后在到 tcp 的第二个字节是 port 看看是不是 80,都满足的话就 accept。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <net/if.h>
#include <net/ethernet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <netpacket/packet.h>
#include <linux/filter.h>

#define OP_LDH (BPF_LD | BPF_H | BPF_ABS)
#define OP_LDB (BPF_LD | BPF_B | BPF_ABS)
#define OP_JEQ (BPF_JMP | BPF_JEQ | BPF_K)
#define OP_RET (BPF_RET | BPF_K)

// Filter TCP segments to port 80
static struct sock_filter bpfcode[8] = {
{ OP_LDH, 0, 0, 12 }, // ldh [12]
{ OP_JEQ, 0, 5, ETH_P_IP }, // jeq #0x800, L2, L8
{ OP_LDB, 0, 0, 23 }, // ldb [23] # 14 bytes of ethernet header + 9 bytes in IP header until the protocol
{ OP_JEQ, 0, 3, IPPROTO_TCP }, // jeq #0x6, L4, L8
{ OP_LDH, 0, 0, 36 }, // ldh [36] # 14 bytes of ethernet header + 20 bytes of IP header (we assume no options) + 2 bytes of offset until the port
{ OP_JEQ, 0, 1, 80 }, // jeq #0x50, L6, L8
{ OP_RET, 0, 0, -1, }, // ret #0xffffffff # (accept)
{ OP_RET, 0, 0, 0 }, // ret #0x0 # (reject)

};

int main(int argc, char **argv)
{
int sock;
int n;
char buf[2000];
struct sockaddr_ll addr;
struct packet_mreq mreq;
struct iphdr *ip;
char saddr_str[INET_ADDRSTRLEN], daddr_str[INET_ADDRSTRLEN];
char *proto_str;
char *name;
struct sock_fprog bpf = { 8, bpfcode };

if (argc != 2) {
printf("Usage: %s ifname\n", argv[0]);
return 1;
}

name = argv[1];

sock = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
if (sock < 0) {
perror("socket");
return 1;
}

memset(&addr, 0, sizeof(addr));
addr.sll_ifindex = if_nametoindex(name);
addr.sll_family = AF_PACKET;
addr.sll_protocol = htons(ETH_P_ALL);

if (bind(sock, (struct sockaddr *) &addr, sizeof(addr))) {
perror("bind");
return 1;
}

if (setsockopt(sock, SOL_SOCKET, SO_ATTACH_FILTER, &bpf, sizeof(bpf))) {
perror("setsockopt ATTACH_FILTER");
return 1;
}

memset(&mreq, 0, sizeof(mreq));
mreq.mr_type = PACKET_MR_PROMISC;
mreq.mr_ifindex = if_nametoindex(name);

if (setsockopt(sock, SOL_PACKET,
PACKET_ADD_MEMBERSHIP, (char *)&mreq, sizeof(mreq))) {
perror("setsockopt MR_PROMISC");
return 1;
}

for (;;) {
n = recv(sock, buf, sizeof(buf), 0);
if (n < 1) {
perror("recv");
return 0;
}

ip = (struct iphdr *)(buf + sizeof(struct ether_header));

inet_ntop(AF_INET, &ip->saddr, saddr_str, sizeof(saddr_str));
inet_ntop(AF_INET, &ip->daddr, daddr_str, sizeof(daddr_str));

switch (ip->protocol) {
#define PTOSTR(_p,_str) \
case _p: proto_str = _str; break

PTOSTR(IPPROTO_ICMP, "icmp");
PTOSTR(IPPROTO_TCP, "tcp");
PTOSTR(IPPROTO_UDP, "udp");
default:
proto_str = "";
break;
}

printf("IPv%d proto=%d(%s) src=%s dst=%s\n",
ip->version, ip->protocol, proto_str, saddr_str, daddr_str);
}

return 0;
}

执行 curl "http://www.baidu.com",结果如下:

1
2
3
4
5
6
7
8
sudo ./filter ens3
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244

这些低级别的操作都封装在了 libpcap 里面,一般不太会自己这么写。

eBPF

eBPF 是 extended BPF 具有更强大的功能。老的 BPF 现在叫 cBPF (classic BPF)。

首先是字节码的指令集更加丰富了,并且现在有了 64 位的寄存器(相较于上古时期的 32 位的CPU),有了 JIT mapping 技术和 LLVM 的后端。JIT 指的的是 Just In Time,实时编译。

一般的 ePBF 的工作流是编写一个 C 的子集(比如没有循环),通过 LLVM 编译到字节码,然成生成 ELF 文件,然后 JIT 编译进内核。

eBPF 一个最重要的功能是可以做到动态跟踪(dynamic tracing),可以不修改程序直接监控一个正在运行的进程。

在 eBPF 之前

在 ebpf 之前,为了实现同样的功能,要在执行的指令中嵌入 hook,并且支持跳到 inspect 函数,然后再恢复执行,这个流程和 debugger 非常相似,这是用 kprobe 来实现,kprobe 是 2007 年引入内核的。比如下面的例子,把 Instruction 3 改成跳转指令,然后再执行 Instruction 3,然后再跳转回去。

使用 kprobe 需要通过编译 kernel module 注册到内核当中,非常麻烦,等于是直接动内核的代码很容易引起内核 panic,而且每个内核版本都不一样,函数符号和位移是有区别的,对每个版本的内核都要编译一个对应的版本的 module。为了解决这个问题引入了一些静态的稳定的 trace point,不会因为版本而改变的地方可以插入 kprobe,但这样就限制了 kprobe 可以探测的范围。

有了 eBPF

有了 eBPF,就可以将用户态的程序插入到内核中,不用编写内核模块了,但是问题并没有改善,内核版本带来的问题还是没有解决。

eBPF 的 kprobe 一种方式时候 mapping,映射 kprobe 的数据到用户态程序,比如发包数,然后用户态程序定期检查这个映射进行统计。

另一种方式是 event (perf_events),如果 kprobe 向用户态程序发送事件来进行统计,这样不同轮询,直接异步计算就可以。

低级别的 API,这个只有 Linux 有

1
2
3
4
5
bpf() 系统调用
BPF_PROG_LOAD 加载 BPF 字节码
BPF_PROG_TYPE_SOCKET_FILTER
BPF_PROG_TYPE_KPROBE
BPF_MAP_* map 映射到 BPF 当中
1
perf_event_open() + ioctl(PERF_EVENT_IOC_SET_BPF)

高级别的接口是 bcc(BPFcompiler collection),转换 c 到 LLVM-epbf 后端,并且前端是 python 的。可以实现动态加载 eBPF 字节码到内核中。

weave scope 就是用 bcc 实现的 HTTP stats 的统计。

这里可以看到程序的主体,这里 hook 了内核函数 skb_copy_datagram_iter,这个函数有一个 tracepoint trace_skb_copy_datagram_iovec。在内核代码里面对应的是下面这段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* skb_copy_datagram_iter - Copy a datagram to an iovec iterator.
* @skb: buffer to copy
* @offset: offset in the buffer to start copying from
* @to: iovec iterator to copy to
* @len: amount of data to copy from buffer to iovec
*/
int skb_copy_datagram_iter(const struct sk_buff *skb, int offset,
struct iov_iter *to, int len)
{
int start = skb_headlen(skb);
int i, copy = start - offset, start_off = offset, n;
struct sk_buff *frag_iter;

trace_skb_copy_datagram_iovec(skb, len);

这里对这个 hook 注册了程序,具体的代码就不展示了,主要是根据协议统计这个 HTTP 的大小,方法等信息。

1
2
3
4
5
6
7
8
9
10

/* skb_copy_datagram_iter() (Kernels >= 3.19) is in charge of copying socket
* buffers from kernel to userspace.
*
* skb_copy_datagram_iter() has an associated tracepoint
* (trace_skb_copy_datagram_iovec), which would be more stable than a kprobe but
* it lacks the offset argument.
*/
int kprobe__skb_copy_datagram_iter(struct pt_regs *ctx, const struct sk_buff *skb, int offset, void *unused_iovec, int len)
{

比如判断 method 是不是 DELETE 的是实现就比较蠢,是因为 eBPF 不支持循环,只能这么实现才能把 c 代码翻译成字节码。

1
2
3
4
5
case 'D':
if ((data[1] != 'E') || (data[2] != 'L') || (data[3] != 'E') || (data[4] != 'T') || (data[5] != 'E') || (data[6] != ' ')) {
return 0;
}
break;

除了 bcc 之外,waeve 使用了 gobpf,一个 bpf 的 go binding,并且通过建立 tcp 连接来猜测内核的数据结构,以达到内核版本无关,这个项目 tcptracer-bpf 还在开发中。

eBPF 的其他应用

还有一个比较大头的基于 eBPF 的是 cilium,一套比较完整的网络解决方案,用 eBPF 实现了 NAT,L3/L4 负载均衡,连接记录等等功能。比如访问控制,一般的 iptables 都是 drop 或者 rst,要过整个协议栈,但是 eBPF 可以在 connect 的时候就拦截然后返回 EACCESS,这样就不用过协议栈了。cilium 一个优化就是通过 XDP ,利用类似 DPDK 的加速方案,hook 到驱动层中,让 eBPF 可以直接使用 DMA 的缓冲,优化负载均衡。

BPF/XDP allows for a 10x improvement in load balancing over IPVS for L3/L4 traffic.

现在 k8s 最新的 lb 方案是基于 ipvs 的,我在 kube-proxy 分析 里面有提到过,已经比原来的 iptables 提高很多了,现在有了 eBPF 加 XDP 的硬件加速可以实现更高的提升,facebook 的 katran L4 负载均衡器的实现也是类似的。

cilium 在我看来基本上是 k8s 网络的一个大方向吧,只不过包括 eBPF 和 XDP 对硬件和内核版本都要比较新,是一个要持续关注的更新。

性能调优

Velocity 2017: Performance Analysis Superpowers with Linux eBPF里,Brendan Gregg (Netflix 的性能调优专家) 提到,性能调优也是 eBPF 的一个大头。用于网络监控其实只是 hook 在了协议栈的函数上,如果 hook 在别的地方可以有更多的统计维度。比如 bcc 官方的例子就是统计 IO Size 的大小的分布,更多关于基于 eBPF 的性能调优可以参考他的 blog,他给出了更详细的关于 eBPF 的解释,里面有一些列 Linux 性能调优的内容。

1
2
3
4
5
6
7
8
9
10
11
12
# ./bitehist.py
Tracing... Hit Ctrl-C to end.
^C
kbytes : count distribution
0 -> 1 : 3 | |
2 -> 3 : 0 | |
4 -> 7 : 211 |********** |
8 -> 15 : 0 | |
16 -> 31 : 0 | |
32 -> 63 : 0 | |
64 -> 127 : 1 | |
128 -> 255 : 800 |**************************************|

安全

在安全方面有 seccomp,可以实现限制 Linux 的系统调用,而 seccompe-bpf 则是通过 bpf 支持更强大的过滤和匹配功能,k8s pod 里面的 SecurityContext 就有 seccomp 实现的部分。

cgroup

在 cgroup 上有一个小原型,cgnet 获取 cgroup 的网络统计信息到 prometheus,也是基于 eBPF 的。

参考:

  1. Infrastructure 2017 - Alfonso Acosta - High-performance Linux monitoring with eBPF

  2. Using bpf in kubernetes

  3. Cilium: Networking and security for containers with BPF and XDP

csi 是一个标准的容器存储接口,规定了如何实现一个容器的存储接口,CSI 本身的定义是基于 gRPC 的,所以有一套样例库可以使用,这里分析一下 kuberntes 实现 csi 的方式,为了兼容 CSI kubernete 其实搞得挺绕的,目前这个 CSI 还是定制中包括后期的 Snapshot 的接口怎么设计等等还在讨论中。kubernetes CSI 主要基于几个外部组件和内部功能的一些改动。

CSI-Driver

这里规定了 CSI 的标准,定义了三个 Service,也就是 RPC 的集合,但是没规定怎么写,目前看到的实现都是把这三个 service 都写在一起,比较方便,然后部署的时候有些区别将就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
service Identity {
rpc GetPluginInfo(GetPluginInfoRequest)
returns (GetPluginInfoResponse) {}

rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
returns (GetPluginCapabilitiesResponse) {}

rpc Probe (ProbeRequest)
returns (ProbeResponse) {}
}

service Controller {
rpc CreateVolume (CreateVolumeRequest)
returns (CreateVolumeResponse) {}

rpc DeleteVolume (DeleteVolumeRequest)
returns (DeleteVolumeResponse) {}

rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
returns (ControllerPublishVolumeResponse) {}

rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
returns (ControllerUnpublishVolumeResponse) {}

rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
returns (ValidateVolumeCapabilitiesResponse) {}

rpc ListVolumes (ListVolumesRequest)
returns (ListVolumesResponse) {}

rpc GetCapacity (GetCapacityRequest)
returns (GetCapacityResponse) {}

rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
returns (ControllerGetCapabilitiesResponse) {}

rpc CreateSnapshot (CreateSnapshotRequest)
returns (CreateSnapshotResponse) {}

rpc DeleteSnapshot (DeleteSnapshotRequest)
returns (DeleteSnapshotResponse) {}

rpc ListSnapshots (ListSnapshotsRequest)
returns (ListSnapshotsResponse) {}
}

service Node {
rpc NodeStageVolume (NodeStageVolumeRequest)
returns (NodeStageVolumeResponse) {}

rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
returns (NodeUnstageVolumeResponse) {}

rpc NodePublishVolume (NodePublishVolumeRequest)
returns (NodePublishVolumeResponse) {}

rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
returns (NodeUnpublishVolumeResponse) {}

rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest)
returns (NodeGetVolumeStatsResponse) {}

// NodeGetId is being deprecated in favor of NodeGetInfo and will be
// removed in CSI 1.0. Existing drivers, however, may depend on this
// RPC call and hence this RPC call MUST be implemented by the CSI
// plugin prior to v1.0.
rpc NodeGetId (NodeGetIdRequest)
returns (NodeGetIdResponse) {
option deprecated = true;
}

rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
returns (NodeGetCapabilitiesResponse) {}

// Prior to CSI 1.0 - CSI plugins MUST implement both NodeGetId and
// NodeGetInfo RPC calls.
rpc NodeGetInfo (NodeGetInfoRequest)
returns (NodeGetInfoResponse) {}
}

比如说 GetPluginInfo 就是用来获取 driver 的 name 等信息的,NodePublishVolume 大部分情况下就是在节点上挂载文件系统,CreateVolume 这个如果对应的是 ebs 这种块存储可能就是在 API 里面建一个 ebs,如果对应的是 glusterfs 这种文件系统存储可能就是建一个 volume,然后 ControllerPublishVolume 对应 ebs 就是把 ebs 和 instance 绑定,然后调用节点的 NodePublishVolume 来挂载,如果是文件存储,可能就不需要 ``ControllerPublishVolume` 了,因为不需要绑定快设备到机器上,直接挂到网络接口就可以,这一套标准的目的一个是为了兼容现有的存储方案,一个是为了让一些私有的 provider 能够比较容易的实现一套方案,而不需要做过多的迁移,甚至厂商都不需要开源代码,如果是要实现 in-tree 的存储代码肯定是要开源的,因为 kubernetes 是开源的。

device-driver-registrar

kubernetes 实现 csi 的兼容,首先需要一个外部组件 devide-driver-registrar,初始化的时候通过 csi-sock 的 RPC 获取 driver name 和 node id。

主要功能给 node 打上下面类似的 annotations,dirver 对应的是 csi driver 的名字,name 对应的是 driver 的 NodeId 基本上就是 k8s 的 node name。这样可以让 ControllerPublishVolume 调用能够获取 nodeid 到 storage nodeid 的映射,理论上一样的就可以感觉。

1
csi.volume.kubernetes.io/nodeid: "{ "driver1": "name1", "driver2": "name2" }

他有两个模式,一个模式是自己给 node 打上这个 annotation,并且在退出的时候把这个 annotation 去掉。

另一个模式是交给 kubelet 的 pluginswatcher 来管理, kubelet 自己会根据 device-driver-registrar 提供的 unix domain socket 然后调用 gRPC 从 registrar 获取 NodeId 和 DriverName 自己把 annotation 打上。

搜索这条路径下的 socket/var/lib/kubelet/plugins/[SanitizedCSIDriverName]/csi.sock,然后就可以自动连接 registrar 拿到 NodeId 和 DriverName。

所以 device-driver-registar 主要是注册 Node annotation 的。

external-attacher

监听 VolumeAttachments 和 PersistentVolumes 两个对象,这是和 kube-controller-manager 之间的桥梁。

实现中最后会调用 SyncNewOrUpdatedVolumeAttachment 来同步,调用 csi dirver 的 Attach 函数。

in-tree 的 attach/detach-controller

在 CSI 中扮演的角色是创建 VolumeAttachment,然后等待他的 VolumeAttachment 的 attached 的状态。

attach-controller 会创建 VolumeAttatchment.Spec.Attacher 指向的是 external-attacher

external-provisoner

Static VolumeDynamic Volume 的区别是,有一个 PersistentVolumeClaim 这个会根据 claim 自动分配 PersistentVolume,不然就要自己手动创建,然后 pod 要指定这个手动创建的 volume。

external-provisoner 就是提供支持 PersistentVolumeClaim 的,一般的 provisioner 要实现 Provision 和 Delete 的接口。主要是根据 PVC 创建 PV,这是 Provisioner 的接口的定义了,不是 CSI spec 里的,这里顺带介绍一下。

external-provisoner 看到 pvc 调用 driver 的 CreateVolume,完成以后就会创建 PersistenVolume,并且绑定到 pvc。

kubelet volume manager

kubelet 有一个 volume manager 来管理 volume 的 mount/attach 操作。

desiredStateOfWorld 是从 podManager 同步的理想状态。

actualStateOfWorld 是目前 kubelet 的上运行的 pod 的状态。

每次 volume manager 需要把 actualStateOfWorld 中 volume 的状态同步到 desired 指定的状态。

volume Manager 有两个 goroutine 一个是同步状态,一个 reconciler.reconcile

rc.operationExecutor.MountVolume 会执行 MountVolume 的操作。

-> oe.operationGenerator.GenerateMountVolumeFunc

-> 首先根据 og.volumePluginMgr.FindPluginBySpec 找到对应的 VolumePlugin

-> 然后调用 volumePlugin.NewMounter

-> 然后拿到 og.volumePluginMgr.FindAttachablePluginBySpec attachableplugin

-> volumeMounter.SetUp(fsGroup) 做 mount

volume plugin

csi volume plugin 是一个 in-tree volume,以后应该会逐步迁移到都使用 csi,而不会再有 in-tree volume plugin 了。

1
2
3
func (c *csiMountMgr) SetUp(fsGroup *int64) error {
return c.SetUpAt(c.GetPath(), fsGroup)
}

csi 的 mounter 调用了 NodePublish 函数。stagingTargetPath 和 targetPath 都是自动生成的。

SetUp/TearDown 的调用会执行 in-tree CSI plugin 的接口(这又是 in-tree volume plugin 的定义,确实挺绕的),对应的是 NodePublishVolumeNodeUnpublishVolume ,这个会通过 unix domain socket 直接调用 csi driver。

总结一个简单的具体流程

首先管理员创建 StorageClass 指向 external-provisioner,然后用户创建指向这个 StorageClass 的 pvc,然后 kube-controller-manager 里的 persistent volume manager 会把这个 pvc 打上 volume.beta.kubernetes.io/storage-provisioner 的 annotation。

externla-provisioner 看到这个 pvc 带有自己的 annotation 以后,拿到 StorageClass 提供的一些参数,并且根据 StorageClass 和 pvc 调用 CSI driver 的 CreateVolume,创建成功以后创建 pv,并且把这个 pv 绑定到对应的 pvc。

然后 kube-controller-manager 看到有 pod 含有对应的 pvc,就用调用 in-tree 的 CSI plugin 的 attach 方法。

in-tree 的 CSI plugin 实际上会创建一个 VolumeAttachment 的 object,等待这个 VolumeAttachment 被完成。

external-controller 看到 VolumeAttachment,开始 attach 一个 volume,实际上调用 CSI driver 的 ControllerPublish,成功以后更新 VoluemAttachment 以后就知道这个 Volume Attach 成功了,然后让 attach/detach-controller (kube-controller-manager) 知道这个 attach 完成。

接下来就到 kubelet 了,kubelet 看到 volume in pod 以后就会调用 in-tree 的 csi plugin WaitForAttach,然后等待 Attach 成功,之后就会调用 daemonset 里面的 csi driver 的 NodePublishVolume 做挂载操作。

整体的流程是这样的,需要反复多看几遍 kubernetes-csi 的 document,加深理解。

kube-proxy 是一层代理,主要用于实现 Service 这个 Object。

Service 有一个 ClusterIP,这是一个虚拟 IP 应对 Service 后面的 pod,pod 的销毁和创建会有 ip 不固定的问题。除了能够屏蔽后端的 pod 的 IP 之外,Service 还可以有负载均衡的作用。

Endpoint 维护的是 Service 和 Pod 的映射关系,如果 pods 变动,Endpoint 也会变动。Service 本身指定义了 ClusterIP 和 port 的映射。Endpoints 记录了 pod 的 ip 和端口。

主要的访问方式是通过 NAT 实现的,如果使用 iptables 的话,就设置匹配目标端口以后,DNAT 转发到目的地址。

我看代码的时候,发现有个函数叫 birth cry 挺有意思的,表示打程序启动的一条 log。

Kube-proxy 主要监听两个对象,一个是 Service,一个是 Endpoint

这些 Objects 对应的处理函数有不同的实现,现在先看一下 iptables 的实现

当 Service 有更新的时候就会更新对应的 iptables 规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

glog.Info("Starting service config controller")
defer glog.Info("Shutting down service config controller")

if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
return
}

for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnServiceSynced()")
c.eventHandlers[i].OnServiceSynced()
}

<-stopCh
}

会调用 OnServiceSynced,然后具体调用实现者的方法。

Proxier

Netfilter

Netfilter 是内核模块的控制代码,ipvs 和 iptables 都是基于 Netfilter 实现的。

iptables

用 iptables 配置 DNAT ,然后再用概率模块做负载均衡。kube-proxy iptables 的实现主要依赖于 filter 和 nat。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Masquerade
-A KUBE-MARK-DROP -j MARK --set-xmark 0x8000/0x8000
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE

# clusterIP and publicIP
-A KUBE-SERVICES ! -s 10.244.0.0/16 -d 10.98.154.163/32 -p tcp -m comment --comment "default/nginx: cluster IP" -m tcp --dport 80 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -d 10.98.154.163/32 -p tcp -m comment --comment "default/nginx: cluster IP" -m tcp --dport 80 -j KUBE-SVC-4N57TFCL4MD7ZTDA
-A KUBE-SERVICES -d 12.12.12.12/32 -p tcp -m comment --comment "default/nginx: loadbalancer IP" -m tcp --dport 80 -j KUBE-FW-4N57TFCL4MD7ZTDA

# Masq for publicIP
-A KUBE-FW-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx: loadbalancer IP" -j KUBE-MARK-MASQ
-A KUBE-FW-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx: loadbalancer IP" -j KUBE-SVC-4N57TFCL4MD7ZTDA
-A KUBE-FW-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx: loadbalancer IP" -j KUBE-MARK-DROP

# Masq for nodePort
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/nginx:" -m tcp --dport 30938 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/nginx:" -m tcp --dport 30938 -j KUBE-SVC-4N57TFCL4MD7ZTDA

# load balance for each endpoints
-A KUBE-SVC-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx:" -m statistic --mode random --probability 0.33332999982 -j KUBE-SEP-UXHBWR5XIMVGXW3H
-A KUBE-SVC-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx:" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-TOYRWPNILILHH3OR
-A KUBE-SVC-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx:" -j KUBE-SEP-6QCC2MHJZP35QQAR

# endpoint #1
-A KUBE-SEP-6QCC2MHJZP35QQAR -s 10.244.3.4/32 -m comment --comment "default/nginx:" -j KUBE-MARK-MASQ
-A KUBE-SEP-6QCC2MHJZP35QQAR -p tcp -m comment --comment "default/nginx:" -m tcp -j DNAT --to-destination 10.244.3.4:80

# endpoint #2
-A KUBE-SEP-TOYRWPNILILHH3OR -s 10.244.2.4/32 -m comment --comment "default/nginx:" -j KUBE-MARK-MASQ
-A KUBE-SEP-TOYRWPNILILHH3OR -p tcp -m comment --comment "default/nginx:" -m tcp -j DNAT --to-destination 10.244.2.4:80

# endpoint #3
-A KUBE-SEP-UXHBWR5XIMVGXW3H -s 10.244.1.2/32 -m comment --comment "default/nginx:" -j KUBE-MARK-MASQ
-A KUBE-SEP-UXHBWR5XIMVGXW3H -p tcp -m comment --comment "default/nginx:" -m tcp -j DNAT --to-destination 10.244.1.2:80

这是一个实例,首先通过 clusterIP 的 -d 匹配目的地址,还有 dport 匹配目的端口,转入对应的 SVC 链。

SVC 联通过 -m statistic 模块负载均衡到不同的 Endpoint 链上,每个 Endpoint 链又有一个 DNAT 到对应的 pod

的 IP:port 上,总的来说就是用 DNAT 实现 clusterIP 的代理,用 statistic 模块实现按概率的负载均衡(按概率进入不同的 Endpoint 链当中)。

ipvs

ipvs 有相对更好的性能,更复杂的负载均衡的算法以及健康状态检查等。

可以看到 kube-ipvs0 上的 10.102.128.4 对应的 Service 的 ClusterIP,而 RealServer 映射到了不同的 Endpoint 上。

iptables 的匹配是 hash 的,而不同根据一个链一个链的匹配,效率更高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# kubectl describe svc nginx-service
Name: nginx-service
...
Type: ClusterIP
IP: 10.102.128.4
Port: http 3080/TCP
Endpoints: 10.244.0.235:8080,10.244.1.237:8080
Session Affinity: None

# ip addr
...
73: kube-ipvs0: <BROADCAST,NOARP> mtu 1500 qdisc noop state DOWN qlen 1000
link/ether 1a:ce:f5:5f:c1:4d brd ff:ff:ff:ff:ff:ff
inet 10.102.128.4/32 scope global kube-ipvs0
valid_lft forever preferred_lft forever

# ipvsadm -ln
IP Virtual Server version 1.2.1 (size=4096)
Prot LocalAddress:Port Scheduler Flags
-> RemoteAddress:Port Forward Weight ActiveConn InActConn
TCP 10.102.128.4:3080 rr
-> 10.244.0.235:8080 Masq 1 0 0
-> 10.244.1.237:8080 Masq 1 0 0

glusterfs 主要有以下几个组件

  • gluster 命令行。
  • glusterd 管理进程。
  • glusterfsd 服务进程。
  • glusterfs 客户端 fuse 进程。

glusterfs 是靠 translator 做分层设计的,类似于可插拔的模块的概念,对应的结构体是 xlator_t,因为是 C 写的,一些面向的对象的写法也是通过 xlator 实现的,可以理解 xlator 是类,具体的实现是对象。比如 protocol/client 是最后的 xlator 从客户端发送到网络,对应的 storage/posix 是服务端接受的最后一层交给服务器上的文件系统。每个 xlator 都会编译成一个单独的 .so 文件。通过指定配置文件可以把不同的 xlator 进行组装,加载 .so 以后,通过dlsym 查找 xlator 的符号(函数表),相当于获取 xlator 的公开接口。配置文件中的 volumesubvolume 对应了 xlator 的组织关系。

volfile 会配置在 /var/lib/glusterd/vols/ 下面,对应的文件名是 volfile-id 选项指定的。xlator_t 定义在 libglusterfs/src/xlator.h 里面。下面这张图就可以看到各个 translator 的关系。

glusterfsd 最开始是 protocol/server 最后是 storage/posix 两个 xlator 作为开头和结束。

glusterfs (client) 是以protocol/client 结尾。

那个 write-behind 应该对应的是缓存策略里面的 write back,异步写,而不是直写。

这种设计的好处是方便扩展,你只要填补 xlator 的实现,编译成 .so ,放到链接目录下面,就可以加载进去,不用改整个 glusterfs 的代码。

glusterfs 的整个应用的结构也可以看一下。

用户态是 fuse 实现的文件系统,通过网络协议走到 server,server 本身用的是本地的文件系统,glusterfs 推荐使用 xfs。

下面举个详细的例子。

writev 举例,(write 的本质也是 writev,这里的 v 指的是内存向量,写的时候用链表表示的内存向量可以减少内存的拷贝,因为要分配一段连续的内存,然后拷到一起很没有效率,而且系统调用本身也支持这个系统调用,走到底层是硬件支持的,硬件不支持,也能 hook 帮你再拷贝到一起,但上层就不用管这些细节了),他们的调用关系通过 STACK_WINDSTACK_UNWIND 实现,对于所有的 xlator 的 op 都是这种调用关系,本身一层结束以后通过调用 STACK_WIND 调用下一层对应的 op,然后在调用完成之后通过 STACK_UNWIND 回调 op_cbk,并且这种调用关系是树状的。

这张图解释挺好的,说明了 xlator 向下传递的过程,通过 STACK_WIND 调用下一层,通过 STACK_UNWIND 调用上一层的 cbk。

glusterfsd 每个 volume 都会起一个线程来处理。

./xlators/mount/fuse/src/fuse-bridge.c 下面是 fuse 的接口,这是客户端的入口,作为文件系统的“桥接点”,大概是为什么叫 bridge 的原因吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static fuse_handler_t *fuse_std_ops[FUSE_OP_HIGH] = {
[FUSE_LOOKUP] = fuse_lookup,
[FUSE_FORGET] = fuse_forget,
[FUSE_GETATTR] = fuse_getattr,
[FUSE_SETATTR] = fuse_setattr,
[FUSE_READLINK] = fuse_readlink,
[FUSE_SYMLINK] = fuse_symlink,
[FUSE_MKNOD] = fuse_mknod,
[FUSE_MKDIR] = fuse_mkdir,
[FUSE_UNLINK] = fuse_unlink,
[FUSE_RMDIR] = fuse_rmdir,
[FUSE_RENAME] = fuse_rename,
[FUSE_LINK] = fuse_link,
[FUSE_OPEN] = fuse_open,
[FUSE_READ] = fuse_readv,
[FUSE_WRITE] = fuse_write,
[FUSE_STATFS] = fuse_statfs,
[FUSE_RELEASE] = fuse_release,
[FUSE_FSYNC] = fuse_fsync,

看一下 fuse_write,传了一个 xlator_t ,这个东西是 glusterfs 的分层设计的核心,每个 fuse_in_header_tmsg 还有 iobuf 都是 fuse 的 API。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static void
fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg,
struct iobuf *iobuf)
{
/* WRITE is special, metadata is attached to in_header,
* and msg is the payload as-is.
*/
struct fuse_write_in *fwi = (struct fuse_write_in *)
(finh + 1);

fuse_state_t *state = NULL;
fd_t *fd = NULL;
#if FUSE_KERNEL_MINOR_VERSION >= 9
fuse_private_t *priv = NULL;
priv = this->private;
#endif

GET_STATE (this, finh, state);
fd = FH_TO_FD (fwi->fh);
state->fd = fd;
state->size = fwi->size;
state->off = fwi->offset;

/* lets ignore 'fwi->write_flags', but just consider 'fwi->flags' */
#if FUSE_KERNEL_MINOR_VERSION >= 9
state->io_flags = fwi->flags;
#else
state->io_flags = fwi->write_flags;
#endif
/* TODO: may need to handle below flag
(fwi->write_flags & FUSE_WRITE_CACHE);
*/


fuse_resolve_fd_init (state, &state->resolve, fd);

/* See comment by similar code in fuse_settatr */
#if FUSE_KERNEL_MINOR_VERSION >= 9
priv = this->private;
if (priv->proto_minor >= 9 && fwi->write_flags & FUSE_WRITE_LOCKOWNER)
state->lk_owner = fwi->lock_owner;
#endif

state->vector.iov_base = msg;
state->vector.iov_len = fwi->size;
state->iobuf = iobuf;

fuse_resolve_and_resume (state, fuse_write_resume);

return;
}

首先是传入了写入文件的描述符,要写的内存的段的地址和大小,iobuf 这个结构体和内核里的 iobuf 是查不多的。然后进到 fuse_resolve_and_resume,这个函数主要是解析一些文件的元数据,并且返回到 writev_resume 这个回调函数上,解析的函数在 ./xlators/mount/fuse/src/fuse-resolve.c 下面,这整个是个状态机的写法,主要是解析 fd,父路径,inode 等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int
fuse_resolve (fuse_state_t *state)
{
fuse_resolve_t *resolve = NULL;

resolve = state->resolve_now;

if (resolve->fd) {

fuse_resolve_fd (state);

} else if (!gf_uuid_is_null (resolve->pargfid)) {

fuse_resolve_parent (state);

} else if (!gf_uuid_is_null (resolve->gfid)) {

fuse_resolve_inode (state);

} else {
fuse_resolve_all (state);
}

return 0;
}

到了回到调 fuse_write_resume 上,包了一层引用计数就往下传了。

1
2
3
4
5
6
7
8
9
10
11
iobref_add (iobref, state->iobuf);

gf_log ("glusterfs-fuse", GF_LOG_TRACE,
"%"PRIu64": WRITE (%p, size=%"GF_PRI_SIZET", offset=%"PRId64")",
state->finh->unique, state->fd, state->size, state->off);

FUSE_FOP (state, fuse_writev_cbk, GF_FOP_WRITE, writev, state->fd,
&state->vector, 1, state->off, state->io_flags, iobref,
state->xdata);

iobref_unref (iobref);

fuse_writev_cbk 里面 send_fuse_objiobuf发送出去。send_fuse_datasend_fuse_iov

然后走整个的 xlator 的 writev 调用链,到 client 上,这里最主要的就是 submit_request 开始走网络 rpc 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int32_t
client3_3_writev (call_frame_t *frame, xlator_t *this, void *data)
{
clnt_args_t *args = NULL;
clnt_conf_t *conf = NULL;
gfs3_write_req req = {{0,},};
int op_errno = ESTALE;
int ret = 0;

if (!frame || !this || !data)
goto unwind;

args = data;
conf = this->private;

ret = client_pre_writev (this, &req, args->fd, args->size,
args->offset, args->flags, &args->xdata);

if (ret) {
op_errno = -ret;
goto unwind;
}

ret = client_fd_fop_prepare_local (frame, args->fd, req.fd);
if (ret) {
op_errno = -ret;
goto unwind;
}
ret = client_submit_request (this, &req, frame, conf->fops,
GFS3_OP_WRITE, client3_3_writev_cbk,
args->iobref, args->vector, args->count,
NULL, 0, NULL,
(xdrproc_t)xdr_gfs3_write_req);
if (ret) {
/*
* If the lower layers fail to submit a request, they'll also
* do the unwind for us (see rpc_clnt_submit), so don't unwind
* here in such cases.
*/
gf_msg (this->name, GF_LOG_WARNING, 0, PC_MSG_FOP_SEND_FAILED,
"failed to send the fop");
}

GF_FREE (req.xdata.xdata_val);

return 0;

unwind:
CLIENT_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);
GF_FREE (req.xdata.xdata_val);

return 0;
}

回调就会开始释放内存等等操作,这里就略过了,这里回到 fuse 的 writev_cbk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static int
fuse_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *stbuf, struct iatt *postbuf, dict_t *xdata)
{
fuse_state_t *state = NULL;
fuse_in_header_t *finh = NULL;
struct fuse_write_out fwo = {0, };

state = frame->root->state;
finh = state->finh;

fuse_log_eh_fop(this, state, frame, op_ret, op_errno);

if (op_ret >= 0) {
gf_log ("glusterfs-fuse", GF_LOG_TRACE,
"%"PRIu64": WRITE => %d/%"GF_PRI_SIZET",%"PRId64"/%"PRIu64,
frame->root->unique,
op_ret, state->size, state->off, stbuf->ia_size);

fwo.size = op_ret;
send_fuse_obj (this, finh, &fwo);
} else {
gf_log ("glusterfs-fuse", GF_LOG_WARNING,
"%"PRIu64": WRITE => -1 gfid=%s fd=%p (%s)",
frame->root->unique,
(state->fd && state->fd->inode) ?
uuid_utoa (state->fd->inode->gfid) : "nil", state->fd,
strerror (op_errno));

send_fuse_err (this, finh, op_errno);
}

free_fuse_state (state);
STACK_DESTROY (frame->root);

return 0;
}

send_fuse_obj 是一个宏实际上是 send_fuse_data,把 fuse_write_out 传给 fuse 告诉 fuse 写出了多少,或者返回错误给 fuse。

1
2
#define send_fuse_obj(this, finh, obj) \
send_fuse_data (this, finh, obj, sizeof (*(obj)))

整个的分析过程大概是从客户端的角度来看的,glusterfs 比较重要的一个 xlator 就是 dht xlator,distributed hash table,这个 xlator 决定了文件的分布。

Tensor 的用户 API 可以参考这里,这里做一下简单介绍。Tensor 是各种维度的向量和矩阵的统称,分 Tensor 和 SparseTensor。和 Tensor 不同,SparseTensor 存的是值以及值对应的 index,而 Tensor 存的是完整的矩阵。

举个例子。

1
2
3
4
5
6
7
8
import tensorflow as tf
a = tf.constant([1, 1])
b = tf.constant([2, 2])
c = tf.add(a, b)
sess = tf.InteractiveSession()
print("a[0]=%s, a[1]=%s" % (a[0].eval(), a[1].eval()))
print("c = %s" % c.eval())
sess.close()

输出对应如下。

1
2
a[0]=1, a[1]=1
c = [3 3]

Tensor 只有 eval 以后才能获得结果,是懒计算的。

Tensor 的实现

Tensor (tensorflow/tensorflow/core/framework/tensor.h) 依赖 TensorShape(tensorflow/tensorflow/tensorflow/core/framework/tensor_shape.h) 和 TensorBuffer (tensorflow/tensorflow/core/framework/tensor.h) 两个成员。

TensorShape 主要负责记录张量的形状。

TensorBuffer 主要负责管理 Tensor 的内存,TensorBuffer 继承自 RefCounted (tensorflow/tensorflow/core/lib/core/refcount.h),具有引用计数的功能,用于对内存进行管理。

1
2
3
4
5
// Interface to access the raw ref-counted data buffer.
class TensorBuffer : public core::RefCounted {
public:
~TensorBuffer() override {}
...

他们的对应的关系如下。

Tensor 从下往上看,其实就是一个带”形状“的内存,和 NumPy 的数组是差不多的。

OpKernel

对于一个线性回归来说,是最简单也最好理解的模型,方便分析底层的代码实现。

$$ Y=XW+b $$

损失函数用平方差定义的,优化器是提督下降,这样一个模型可以用一下的 Python 代码实现,这个代码片段是截取的,如果要完整运行这个例子可以在这里复现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import tensorflow as tf
sess = tf.InteractiveSession()
x = tf.placeholder(tf.float32,[None, x_train.shape[1]])
y = tf.placeholder(tf.float32,[None, 1])
w = tf.Variable(tf.zeros([x_train.shape[1],1]))
b = tf.Variable(tf.zeros([1])) # placeholder 不用加 None
pred = tf.add(tf.matmul(x, w), b)

init = tf.global_variables_initializer()
cost = tf.reduce_mean(tf.square(y - pred))
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost)
epochs = 3000

init.run()

for epoch in range(0, epochs):
optimizer.run(feed_dict={x:x_train, y:y_train})
c = cost.eval(feed_dict = {x:x_train,y:y_train})
if epoch%100 == 0:
print_percentage(int(c*100))

print('\nEpoch: {0}, Error: {1}'.format(epoch+1, c))

b_value = b.eval()
w_value = w.eval()

# Predicted Labels
y_pred = pred.eval(feed_dict={x: x_test})

# Mean Squared Error
mse = tf.reduce_mean(tf.square(y_pred - y_test))
print("MSE", mse.eval())
sess.close()

对应的训练结果。

1
2
Epoch: 3000, Error: 0.25882452726364136
MSE 0.30620116674806463

可以看到,线性回归的模型主要依赖的两个 Operation 分别是 tf.addtf.matmul,其他的复杂模型也是类似的逻辑,对应的 OpKernel 分别是 AddOpMatMulOp,这里可以看一下具体的实现。

如果有源代码,可以连着源代码用 bazel 编译,可以参照这里自己编写一个 Op。

MatMulOp 的实现在 /tensorflow/core/kernels/matmul_op.cc 下面,定义在tensorflow/tensorflow/core/ops/math_ops.cc 下面。AddOp/tensorflow/core/kernels/matmul_op.cc,实现在 /tensorflow/core/kernels/cwise_op_add_1.cc 下面,依赖 /tensorflow/tensorflow/core/kernels/cwise_ops_common.hcommon 的定义。

Add 用的是 Eigenadd /tensorflow/tensorflow/core/kernels/cwise_ops.h,依赖third_party/Eigen/src/Core/functors/BinaryFunctors.h

举个例子,看一下 MatMulOpMatMulOp 的构造函数里面有一个 OpKernelConstruction 可以初始化 OpKernel,通过 OpKernel 可以获得这个 Op 的参数比如transpose_a 等等。

1
2
3
4
5
6
7
8
9
10
11
12
template <typename Device, typename T, bool USE_CUBLAS>
class MatMulOp : public OpKernel {
public:
explicit MatMulOp(OpKernelConstruction* ctx)
: OpKernel(ctx), algorithms_set_already_(false) { // 在执行构造函数之前,执行两个成员的构造函数
OP_REQUIRES_OK(ctx, ctx->GetAttr("transpose_a", &transpose_a_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("transpose_b", &transpose_b_));

LaunchMatMul<Device, T, USE_CUBLAS>::GetBlasGemmAlgorithm(
ctx, &algorithms_, &algorithms_set_already_);
use_autotune_ = MatmulAutotuneEnable();
}

每个 OpKernel 都要实现一个 Compute 函数,可以看到这个 Compute 函数首先检查了两个 Tensor 是否是矩阵,然后检查两个矩阵的形状是否符合矩阵相乘的条件,然后根据形状分配 TensorShape 并且根据 TensorShape 分配新的 Tensor (其实顺便分配的 TensorBuffer 的内存空间)。然后通过 LaunchMatMul 真正执行相乘操作,因为这个计算过程,可能是用了 GPU,所以模版是带 Device 的(GPUDevice/CPUDevice)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void Compute(OpKernelContext* ctx) override {
const Tensor& a = ctx->input(0);
const Tensor& b = ctx->input(1);

// Check that the dimensions of the two matrices are valid.
OP_REQUIRES(ctx, TensorShapeUtils::IsMatrix(a.shape()),
errors::InvalidArgument("In[0] is not a matrix"));
OP_REQUIRES(ctx, TensorShapeUtils::IsMatrix(b.shape()),
errors::InvalidArgument("In[1] is not a matrix"));
Eigen::array<Eigen::IndexPair<Eigen::DenseIndex>, 1> dim_pair;
dim_pair[0].first = transpose_a_ ? 0 : 1;
dim_pair[0].second = transpose_b_ ? 1 : 0;

OP_REQUIRES(
ctx, a.dim_size(dim_pair[0].first) == b.dim_size(dim_pair[0].second),
errors::InvalidArgument(
"Matrix size-incompatible: In[0]: ", a.shape().DebugString(),
", In[1]: ", b.shape().DebugString()));
int a_dim_remaining = 1 - dim_pair[0].first;
int b_dim_remaining = 1 - dim_pair[0].second;
TensorShape out_shape(
{a.dim_size(a_dim_remaining), b.dim_size(b_dim_remaining)});
Tensor* out = nullptr;
OP_REQUIRES_OK(ctx, ctx->allocate_output(0, out_shape, &out));

if (out->NumElements() == 0) {
// If a has shape [0, x] or b has shape [x, 0], the output shape
// is a 0-element matrix, so there is nothing to do.
return;
}

if (a.NumElements() == 0 || b.NumElements() == 0) {
// If a has shape [x, 0] and b has shape [0, y], the
// output shape is [x, y] where x and y are non-zero, so we fill
// the output with zeros.
functor::SetZeroFunctor<Device, T> f;
f(ctx->eigen_device<Device>(), out->flat<T>());
return;
}

LaunchMatMul<Device, T, USE_CUBLAS>::launch(
ctx, a, b, dim_pair, &algorithms_, use_autotune_, out);
}

LaunchMatMul 继承自 LaunchMatMulBase,在 LaunchMatMulBase 当中调用了 functor::MatMulFunctor,这个 functor 主要就会执行乘法操作,在这之前会检查一下是否其中一个元素是 vector,这样可以直接优化算出来,而不用 Eigen 库来算,这样更快,这个目前看到的是 CPU 的路径。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template <typename Device, typename T>
struct LaunchMatMulBase {
#if GOOGLE_CUDA
typedef se::blas::AlgorithmType AlgorithmType;
#else
typedef int64 AlgorithmType;
#endif // GOOGLE_CUDA

static void launch(
OpKernelContext* ctx, const Tensor& a, const Tensor& b,
const Eigen::array<Eigen::IndexPair<Eigen::DenseIndex>, 1>& dim_pair,
std::vector<AlgorithmType>* algorithms, bool use_aututone, Tensor* out) {
#ifndef TENSORFLOW_USE_SYCL
// An explicit vector-matrix multiply is much better optimized than an
// implicit one and this is a bottleneck during non-batched inference.
bool was_vector = ExplicitVectorMatrixOptimization<T>(a, b, dim_pair, out);
if (!was_vector) {
#endif // TENSORFLOW_USE_SYCL
functor::MatMulFunctor<Device, T>()(ctx->eigen_device<Device>(),
out->matrix<T>(), a.matrix<T>(),
b.matrix<T>(), dim_pair);
#ifndef TENSORFLOW_USE_SYCL
}
#endif // TENSORFLOW_USE_SYCL
}

static void GetBlasGemmAlgorithm(OpKernelConstruction* ctx,
std::vector<int64>* algorithms,
bool* algorithm_set_flag) {}
};

MatMulFunctor 在设备 d 上计算矩阵相乘的结果,其中调用的是 MatMul<CPUDevice>

1
2
3
4
5
6
template <typename Device, typename In0, typename In1, typename Out,
typename DimPair>
void MatMul(const Device& d, Out out, In0 in0, In1 in1,
const DimPair& dim_pair) {
out.device(d) = in0.contract(in1, dim_pair);
}

这里的 contract 调用的是 TensorContractionOp (third_party/unsupported/Eigen/CXX11/src/Tensor/TensorContraction.h),跟之前说的一样,这个 Op 是计算图的一部分,要通过 eval 来做计算,计算结果是 eval 驱动的。TensorContractionOp 的构造函数就是,这负责构建左表达式和右表达式。

1
2
3
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorContractionOp(
const LhsXprType& lhs, const RhsXprType& rhs, const Indices& dims)
: m_lhs_xpr(lhs), m_rhs_xpr(rhs), m_indices(dims) {}

真正的计算过程在 TensorContractionEvaluatorBase 里面,真正执行计算过程,计算细节就省略了主要是矩阵相乘。

CUDA

如果条件编译 GOOGLE_CUDA 的话,会使用 GPU 的代码,对应会调用到 steam executor,这个以后具体分析。

总结

Tensorflow 基于图模型的,并且是懒计算的,通过扩展可以自己用 C++ 实现新的 Op,并且也可以观察默认自带的 OpKernel 是如何实现的,对于理解 Tensorflow 的工作流程会有很大的帮助。Tensorflow 本身依赖了 Eigen,CUDA 等线性代数库或者 GPU 计算库,要看懂代码还是要多学一点线代的知识,比如 Contraction 这个概念我也是第一次晓得。

参考文献

  1. 深入理解 Tensorflow 架构设计与原理实现

Raft 的作者其实对 Paxos 研究得很深,毕竟 Paxos 基本上就是共识算法的代名词,建议在了解 Raft 之前先看看 Paxos,因为 Paxos 里面有一段从单点开始完整的推断共识算法的正确性和必要性的过程,方便建立一种对共识算法上直觉的认同。Raft 论文最好看作者的博士论文,那个比较完整。

leader election

Raft 有 Term 的概念,把时间每分成了一个个的任期,每个任期开始是选举过程。Raft server 有三种角色,Candidate 就是选举中的状态,Leader 是当选的状态,Follower 是服从并选择接受 Leader 的 Log 的状态。

其实可以把选主看作一个 basic Paxos,大家对 leader=? 达成一致,触发的条件是超时和初始化的时候,term 就是类似于 Paxos 的 propose number,和 basic Paxos 不一样的是每个 term 只能投一个人,所以每个 term 不会有冲突,只有一个人会当选,但是会有平票的问题,大家用同一个 term 选举的情况下就会发生(比如说大家都投自己,谁都不接受谁),所以为了减少冲突,把重试的时间随机化,快速超时的人能拿到高 term,并且其他慢速超时的 candidate 会服从这个先拿到下一个 term 的 leader。成为 leader 的标志是收到了大部分人的服从(3/5 或者 2/3)。

log replication

log 不一致的就以上几种情况,可能少了 entry,可能多了没 commited 的 entry,可能又少了 entry 还多了没 commited 的 entry,log 不一致的情况就这几种。index term 能唯一确定一个 entry,如果一个 entry 确定,之前的也确定了。基于上面的性质,如果 entry commited 之前的就是都 commited 了。这个通过 append 的 prev index prev term,来保证这个特性。解决方法是在 AppendRequest 里面带上新的 entry 前的 index 和 term(在 leader 那里存的前一个),如果这个 entry 不在 follower 里面的话,就拒绝追加新 entry,然后 leader 就要用前一个 index 尝试,直到开始出现 match 的点,有点像 git tree 里面开始产生分支的那个 base,这个过程一直减少自己保存的 follower 的 nextIndex,一直到获得和自己 match 的最后一个 index 开始追加复制自己的 log。也就是说这样会强迫所有的 follower 复制主节点的 log,这里有一个按 term 回溯 base 的方法,但是这个优化作者认为有点多此一举,毕竟错误发生的情况比较少。这里会有一个问题(啥问题),后面会通过选举的时候加上限制解决。

这个问题就是,新的 leader 可能没有之前 commited 的 log,然后修改了其他 follower 的 log,导致每个 state machine 执行的命令是不一致的,所以要加一个约束,在投票的时候,新的 leader 必须包含之前 commited 的 log,这个保证就需要通过定义 entry 的 update-to-date,在投票的时候拒绝比自己老的 leader 当选。

safety

这个问题被称作 safety,为了解决这个问题 update-to-date 怎么定义呢?比如下面这个例子,如果 S3 挂了,其实 index 5 已经 commited 了,但是没有办法确定 index 5 的这个 log 是否 commited 了。所以要选个最可能包含所有 log 的做 leader。

先比 term,再比 log 数量,按顺序比,有点像字符串比较,这样可以确保 leader 有其他 follower 的 entry。没有大多数人的又新又全就无法成为 leader。

commit from previous entry

如果一个 leader 在 commit 之前 crash 了,并且新的 leader 没有已经在大多数节点上的 entry 可能会覆盖掉本该 commit 的 entry。

比如这图,如果是 (d1) 的情况,3 就会把 2 盖掉,但是实际上 2 本该 commited 了,但是在 commit 之前 S5 可能在 term 5 成为了主(这个没有违背上面的 upda-to-date 因为 3 确实比 2 要新,并且可以从 S2, S3, S4 那里获得选票)。

光看数量,没办法替之前 term 的 entry commit,因为 d1 的情况,可能就把 2 盖掉了。只有 term 4 的时候 4 commit 了(当前的 term 可以 commit)才能替之前 term 的 entry commit(实际上间接 commit 了,这个可以证明),这样 term 3 就不会成为主,就不可能盖掉 2。

总结一下,在 Raft 中 leader 不会 commit 之前 term 的 entry,只 commit 当前 term 的 entry。当前 term 的 entry commit 了,之前 term 的 entry 就间接 commit 了。

持久化数据

当前的 term 和 vote 。

membership change

raft conf change 有几个主要阶段,其中 C(old)+c(new) 的情况下,需要服从两个 conf 的大多数,并且,每次只能一个一个的增加 server 和 减少 server。

参考文献

Paxos 是什么

Paxos is a mechanism for achieving consensus on a single value over unreliable communication channels.

Paxos 就是一个在不稳定的网络环境下建立的对一个值的共识。我们说的 Multi-Paxos 是指对多个值的共识,不过我们先一步一步来。

在 Paxos 中是没有 leader 这个概念的,所以相对来说会比较慢,因为谁都可以处理请求并且把自己的 peer 盖掉导致冲突,达成一致的过程会更长。

Paxos 只抵抗机器崩溃,网络异常,不抵抗恶意行为的节点,或者说使用不同协议的参与者,或者说参与者不能撒谎,所以经典的 Paxos 不抵抗拜占庭问题(也有针对拜占庭问题的 Paxos 带了 verify 的过程)。

Paxos 论文里面描述的算法,其实不是很清晰,一般人看了都会有疑问,所以后来很多人对这个算法做了补充和解释,包括作者本人。

基本需求

第一个是安全性,就是只有一个值会被选择,并且节点对这个值不会主动知道,而是在值被选择以后被动学习知道这个值被选择的(这个原文有点难懂我换成了自己的话解释了一下)。
第二个是活性,也就是值最终会被选择,并且所有节点会最终学习到这个值被选择了。

解决方案

首先单点可以排除,因为单点虽然是最容易解决一致性问题的,但是如果单点挂了,整个就不可用了,所以显然不能依靠单点。

那如果每个节点就接受自己接受到的第一个值,也会有平票问题(split votes)。redblue 对等的,那到底谁被选择了呢,所以来了就接受的策略并不可行。

Paxos 是 quorum based 的,表示的一致性协议是少数服从多数的,在大多数节点都接受了这个值以后,这个值就被选择了。为了让节点可以接受多个值,多个值之间需要区分,所以就有了提交号,这个号码是单增的,拒绝掉小号的提交,并且当一个值已经被选择,那么之后的提交都要提交这个值,这样做的目的是让提交者知道这个值被学习了,是大家认可的一个值。有了少数服从多数原则,就会碰到冲突的问题。

这种情况就是下面这样,从时间上来讲,red 已经被选择了,如果 S3 能够拒绝 red 的提交,那么 S3,S4,S5就可以拿着 red 重试,并且知道 red 已经被大多数人接受了,而知道冲突的 S3 就会决绝这次请求,这个点上就处理了两个值(接受了一个拒绝了一个)。

序列号可以帮助我们区分优先关系,这样因为网络问题延迟的请求就可以被处理。下面这种情况,red 虽然先提交,但是并不是先被大多数接受的(存在网络延迟),这个时候blue已经被选择了(被大多数人接受),我们需要提交 red 的提交意识到自己已经太“老”了,而触发这就是接受了 blueS3

所以你会发现,矛盾的点其实就是这个 S3,也就是少数服从多数原则,能保证任意的大多数都是有交集的。交集中的点会发现矛盾和之前接受的值有矛盾选择拒绝。

问题:三节点的容忍度是 1,四节点的容忍度是多少?

答案:也是 1,因为要形成发现矛盾的交集对于 4 来说,要达到 3/4,才能构成大多数,这就是为什么集群选单数的原因,因为双数从算法的角度来说没什么帮助。

接下来看具体的算法。

其实,从朴素角度来说,经典 Paxos 看起来就是一个两阶段提交的过程,首先是准备阶段,选择一个提交号 n,提交 prepare(n),接受者需要返回自己接受的值,和已经接受的提交号。当从大多数收到回复以后就可以做判断了,如果有返回接受值,选择提交号最大的值进行下一阶段(这个行为对应的是发现有值可能被接受了,尝试服从或者学习这个接受),不然就可以用自己的值进行下一阶段。

下一阶段就是 accept(value,n),如果接受者发现自己目前收到的n,没有比accpet给的n大,就接受这个值,并且更新自己的n,否则就拒绝(这里就保证提交者能够发现自己变老了或者被拒绝了)。
如果接受者发现提交号大于自己当前的最大提交号,就接受这个值,不然就拒绝。当提交者从大多数人那里接受到返回以后发现有拒绝的情况,就进行重试拿一个新的n开始,否则这个值就被接受了。

Basic Paxos value 就是设置一次,不存在再设次一次的情况。

总结起来就是如图:

那这样的一个二阶段提交,看看能不能解决前面的问题,主要有三种可能。
注意,这里的例子是原文里的例子,一个变量的取值是X或者Y,然后3.1 表示 S1提交的提交号为 3 的提交,这是我们定义的 message ID。

第一种值被选择了,后者意识到了X,放弃 YX 提交,也就意味着提交者学习到了这个值已经被选择了。

第二种情况,值没有被选择,但是交集的部分看到了一个被选择的值,也会选择放弃YX提交,虽然X暂时没有被选择(被大多数人接受),但是可以保证两个提交都成功。

第三种情况,值没有被选择,同时交集的部分也没有发现被选择的值,如果已经做了 promies 这个时候就会拒绝老的提交。比如下面在 S3 accept(X) 的时候就会放弃提交并且进行重试(S1 S2 也会一起重试), 并且在重试的时候覆盖掉原先接受的x)。

看起来所有的问题都解决了,但是活性问题无法保证。这个情况发生在提交之间相互阻塞的情况,S3 S4 S5 拿着更高的提交号导致 S1 S2 S3 的 accept 被拒绝重新进行提交,又把 S3 S4 S5 给拒绝了。

解决这个问题的办法就是把重试时间进行一些随机化,减少这种巧合发生,或者把重试的时间指数增长等等。

Multi-Paxos

到此 classic Paxos 算是告一段落。那 Paxos 有什么问题呢。首先是活性的问题,这个在后面可以通过选主的方式解决。其次是学习是通过 propose 获知值的,不然无法知道一个值是否被接受了,要走一遍整套的 Paxos 协议。

Multi-Paxos 新增的问题是如何选择 log entry,并且用选主的方式减少冲突,以及减少 prepare 的请求。

以下图为例。深色框表示这个 log 已经被选择(怎么确定选择后面会提到)。寻找 log entry 的方式就是寻找第一个没有被选择的 log,尝试执行 basic Paxos,如果有值被选择会尝试用这个 log 提交帮助这个 log 被选择,这个过程和 basic paxos 是一致的,但是在此之后就要继续寻找下一个 log 尝试进行我们的尝试。

下图中(只看 S1 和 S2 这两行),第一次会找到最后一个没有没学习的 log,也就是 index=3 的 log,但是发现了 s1cmp,就会服从这个 cmp,然后到 index=4 发现了 s2sub,也选择服从,并且学习到了 index=4 的 log 应该是 sub,最后到了 index=5 才进行插入。

这样的情况下,Paxos 可以接受并发的请求,而 Raft 却规定了对 log 只能 append,不能 3 4 5 都能同时处理,简化了实现。反正只要保证了 log 的顺序一致,状态机的最终状态都是一致的。

接下来的是对于性能的一些提升,一个是选主避免大面积冲突,另一个是优化二阶段提交的次数。解决方法是选出一个主,保证一次只有一个提交者。另外对于 prepare 是对整个 log 进行 prepare,而不是单个 log entry,这样大多数情况下,大部分 log 都可以一次性就被选择。

可以通过 lease based 选主。 lamport 提出的方式比较简单,节点之间维持T间隔的心跳,2T 之内没有收到更高编号的主的心跳就成为主,非主则转发请求给主,这样还是不会避免同时出现两主的情况,两主会有冲突,但 Paxos 就算有两主还是能正常运行,毕竟有主只是优化方式。

接下来我们看看优化提交的过程。回忆一下,prepare 的作用,其中一点是帮助我们发现冲突,知道有值可能被选择了,另外一点是拒绝老的提交,让他们发现自己变“老”了。

我们可以改变提交号的意义,让他代表整个 log,也就是所有 log entry 都用一个提交号,这样接受者可以通过返回 noMoreAccepted 让提交者意识到在当前 log entry 之后的 log 都没有 accpeted 是可以被“锁”住的,然后如果大多数节点返回 noMoreAccepted,就可以跳过之后的 prepare 直到 accept 被拒绝。这样后续的 accept 操作就可以不用 prepare 在一趟之内解决,所以二阶段提交的第一阶段的 promise 落在了整个 log 上。这个情况下 发起者的 accept 一直顺序进行就可以,问题发生在有其他主(之前说到的会临时有多主的情况),又提交了 prepare,然后这个提交号更高,把后一段锁住了,accept 就会发现冲突学习新的值。

补充 1 持续发送 accept 请求,让所有的节点都同步这个 log。
补充 2 让 proposer 带上 firstUnchosenIndex 让 acceptor 知道大多数可以确认被选择的 log。

但是还有问题,例如下面这张图,对于所有小于 firstUnchosenIndex 的 i 来说,如果 accpetedProposal[i] 的提交号和 request 的 proposal 一样的话,就可以确认 log[i] 是被选择了的,并且标记 acceptedProposal[i] = 无限。但是 2.5 确实来自之前的 leader 的,貌似无法被标记为选择了。这需要我们进一步修改这个协议。

这个时候需要在 accept 返回的时候带上自己的 firtUnchosenIndex,如果 proposer 的比这个大,可以把 acceptor 直接补齐。用 success 命令,让 acceptor 直接更改index 的这个值, 并且继续返回 firtUnchosenIndex,让 proposer 弥补。

最后一部分是配置改变,degree of replication,也就是要保证同时只有一个 conf 生效,可以用 a 个之前的 log 做 conf,这样在使用这个 conf 之前,conf 已经发生了改变。但是同时限制住了并发度(只能为a)。直觉上讲就是约定一个 index 在某个时间点一次性切换。

参考文献