ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

背景

Horovod 是一个兼容主流计算框架的分布式机器学习训练框架,主要基于的算法是 AllReduce,这个是 baidu-research 在17年做的一个实现,这个东西原来是高性能计算范畴里的东西应用了 MPI 并行计算接口来实现,这是并行计算里的一个框架,已经很老了,这里有一个介绍 MPI 的 tutorial 写的比较好。

在介绍 horovod 的之前需要解释一下 AllReduce。在 MapReduce 里面 reduce 被翻译成了规约,在上面提到的 MPI tutorial 里面的解释是

Reduce is a classic concept from functional programming. Data reduction involves reducing a set of numbers into a smaller set of numbers via a function. For example, let’s say we have a list of numbers [1, 2, 3, 4, 5]. Reducing this list of numbers with the sum function would produce sum([1, 2, 3, 4, 5]) = 15. Similarly, the multiplication reduction would yield multiply([1, 2, 3, 4, 5]) = 120.

就是说把一个大的集合“缩减”成了小的集合,这里要注意的是这种缩减的计算是要满足交换律的,也就是减法或者除法是不行的,因为在并行计算当中不太好去控制计算的顺序。Reduce 就是这个意思,具体到 MPI_Reduce 就是把不同节点的数字“缩减”到一个节点上,支持的计算方式有加法乘法和取大小值等。

教程中给出的 Reduce 是求和。

AllReduce 就是在每个节点都获得 Reduce 的结果

基于这个标准就有很多的 All-Reduce 的实现,比如 Ring-Reduce,这个实现分两部分,一部分是 Scatter-Reduce 另一部分是 All-Gather。最早是在这篇 post里提到的。这个算法的好处是可以摆脱之前 PS 非常依赖 Parameter-Server 的带宽,Parameter-Server 的带宽会成为计算瓶颈的问题,而 AllReduce 可以让每个节点在带宽传输中的位置是对等的,并且减少传输次数。具体的算法可以看文章的解释,scatter-reduce 就是让每个节点有 K/N 的一个 reduce(也就是 sum),然后把自己的一个 K/N 的 reduce 再传递给其他节点,每个节点只和自己相邻的节点通信。

In the system we described, each of the N GPUs will send and receive values N-1 times for the scatter-reduce, and N-1 times for the allgather. Each time, the GPUs will send K / N values, where K is the total number of values in array being summed across the different GPUs. Therefore, the total amount of data transferred to and from every GPU is

Data Transferred=2(N−1)KN

数据传输量在 N 比较大的时候越没有影响,这就消弭了多节点给 Parameter-Server 造成的瓶颈。

还有一些其他术语,假设有 4 台 4 卡的 GPU 服务器。size 是工作进程(GPU)的数量(6),rank 是所有工作进程的 id(0-15),local rank 是当前服务器上的 id(0-3)。

Horovod 的介绍

使用 horovod 有一定的侵入性,代码需要一定的修改才能变成适配分布式训练,但是有一个好处就是适配的成本不高,并且 horovod 提供的各种框架的支持可以让 horovod 比较好的在各个框架的基础上使用,他支持 tensorflow/keras/mxnet/pytorch,MPI 的实现也有很多,比如 OpenMPI 还有 Nvidia 的 NCCL,还有 facebook 的 gloo,他们都实现了一种并行计算的通信和计算方式。而且 horovod 的本身的实现也很简单。

使用

Keras 用 ResNet50 训练 ImageNet 为例,主要侵入了几部分 hvd.init() 这个是 MPI 的初始化,让并行进程能够知道自己的 rank/local_rank 等信息。

第二部根据 local_rank(相当于单节点上的第n张卡),并且设置不占用全部显存,按需分配(可能因内没有统一管理导致显存碎片),然后传递给 keras 设置 session。

1
2
3
4
5
# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))

然后在 rank 0 上恢复一个 checkpoint 并且广播给其他节点,这里的 broadcast 后面会介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
resume_from_epoch = try_epoch
break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(resume_from_epoch, 0, name='resume_from_epoch')

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

设定传输的压缩函数,具体的压缩后面会提到,然后要么从之前的模型恢复要么重新训练。关键的 wrapper 在 opt 上,会给本地的 opt 包装一个 DistributedOptimizer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast both model and optimizer weights
# to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
model = hvd.load_model(args.checkpoint_format.format(epoch=resume_from_epoch),
compression=compression)
else:
# ResNet-50 model that is included with Keras is optimized for inference.
# Add L2 weight decay & adjust BN settings.
model_config = model.get_config()
for layer, layer_config in zip(model.layers, model_config['layers']):
if hasattr(layer, 'kernel_regularizer'):
regularizer = keras.regularizers.l2(args.wd)
layer_config['config']['kernel_regularizer'] = \
{'class_name': regularizer.__class__.__name__,
'config': regularizer.get_config()}
if type(layer) == keras.layers.BatchNormalization:
layer_config['config']['momentum'] = 0.9
layer_config['config']['epsilon'] = 1e-5

model = keras.models.Model.from_config(model_config)

# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.SGD(lr=args.base_lr * hvd.size(),
momentum=args.momentum)

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt, compression=compression)

model.compile(loss=keras.losses.categorical_crossentropy,
optimizer=opt,
metrics=['accuracy', 'top_k_categorical_accuracy'])

然后设置一些回调函数,hvd.callbacks.BroadcastGlobalVariablesCallback(0) 保证的是 rank 0 上的所有参数只在 rank 0 初始化,然后广播给其他节点,后面是学习率 decay 的设置和一些统计信息的回调打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),

# Horovod: average metrics among workers at the end of every epoch.
#
# Note: This callback must be in the list before the ReduceLROnPlateau,
# TensorBoard, or other metrics-based callbacks.
hvd.callbacks.MetricAverageCallback(),

# Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=args.warmup_epochs, verbose=verbose),

# Horovod: after the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
hvd.callbacks.LearningRateScheduleCallback(start_epoch=args.warmup_epochs, end_epoch=30, multiplier=1.),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=30, end_epoch=60, multiplier=1e-1),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=60, end_epoch=80, multiplier=1e-2),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=80, multiplier=1e-3),
]

最后直接用 allreduce 计算一个 evaluation score。

1
2
# Evaluate the model on the full data set.
score = hvd.allreduce(model.evaluate_generator(input_fn(False, args.train_dir, args.val_batch_size),NUM_IMAGES['validation']))

实现

适配层和压缩算法

horovod 的实现主要分几部分,第一部分是一个适配层,用于兼容各种框架,比如 tensorflow 的适配就是实现一个新的 Op,这个可以参考 add new op,里面规范了 Tensorflow 自定义算子的实现。

请注意,生成的函数将获得一个蛇形名称(以符合 PEP8)。因此,如果您的操作在 C++ 文件中命名为 ZeroOut,则 Python 函数将称为 zero_out。

C++ 的定义是驼峰的,生成出来的 python 函数是下划线小写的,所以最后对应的是,适配Op的代码在 horovod/tensorflow 目录下面

C++ Python
HorovodAllgather horovod_allgather
HorovodAllreduce horovod_allreduce
HorovodBroadcast horovod_broadcast

另外在适配层可以加入一些压缩算法(在 horovod/[framework]/compression.py),我觉得压缩算法和框架无关的,放到适配层下面可能有别的原因,比如 tensorflow 默认带了一个 float16 压缩,具体的其他压缩算法比如3LC,可以通过有损压缩或者无损压缩提高带宽利用率。

统一层

这一层的实现是统一的,所有的适配层最后都是发出一些 Op+Tensor 的 Message 到队列中,后台初始化的时候会有一个专门的线程专门消费这个队列。他有一个同步消息的过程,相当于这个 tensor 在所有节点上都就绪以后就可以开始计算了,主体的流程是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// The coordinator currently follows a master-worker paradigm. Rank zero acts
// as the master (the "coordinator"), whereas all other ranks are simply
// workers. Each rank runs its own background thread which progresses in ticks.
// In each tick, the following actions happen:
//
// a) The workers send a Request to the coordinator, indicating what
// they would like to do (which tensor they would like to gather and
// reduce, as well as their shape and type). They repeat this for every
// tensor that they would like to operate on.
//
// b) The workers send an empty "DONE" message to the coordinator to
// indicate that there are no more tensors they wish to operate on.
//
// c) The coordinator receives the Requests from the workers, as well
// as from its own TensorFlow ops, and stores them in a [request table]. The
// coordinator continues to receive Request messages until it has
// received MPI_SIZE number of empty "DONE" messages.
//
// d) The coordinator finds all tensors that are ready to be reduced,
// gathered, or all operations that result in an error. For each of those,
// it sends a Response to all the workers. When no more Responses
// are available, it sends a "DONE" response to the workers. If the process
// is being shutdown, it instead sends a "SHUTDOWN" response.
//
// e) The workers listen for Response messages, processing each one by
// doing the required reduce or gather, until they receive a "DONE"
// response from the coordinator. At that point, the tick ends.
// If instead of "DONE" they receive "SHUTDOWN", they exit their background
// loop.

简单来讲就是说 coordinator 集 size 个 request DONE,然后找出就绪的 tensor (在 message_table 里面查找)构造出一个 read_to_reduce 的列表,然后发出 size 个 request 告知进程进行计算,然后 worker 接受到 response 开始真正的计算过程(通过 op_manager 具体执行)。

这是整体同步的过程,如果打开 horovod 的 trace log(HOROVOD_LOG_LEVEL=trace) 就能看到同步的过程。horovod 的主要 Op 除了 AllReduce 之外还有 allgather 和 broadcast。

算子实现层

具体的 op 在 common/op 可以看到有 NCCL/Gloo/MPI 等等的,这些由 op_manager 管理,他会根据优先级找到可以用来计算的 op 进行计算,比如 MPI 用的就是 MPI_Allreduce,具体 scatter-gather 和 all-gather openMPI 有现成的实现,NCCL 就直接调用 ncclAllReduce,比较新的 nccl 也支持跨节点的 allreduce 了,不用自己再套一层。

除了 allreduce 之外,还有两个比较重要的算子。

allgather 主要是比 allreduce 少一层 reduce,所有数据被发送到所有进程就可以。allreduce 的第二步就是把每个进程的 scatter-reduce 的 reduce 结果发送到所有进程。

broadcast 的作用是一对多的广播,主要是把初始化的参数同步给其他进程的时候使用。

总结一下 SysML 2019 的一些论文。

TICTAC

问题背景是解决分布式训练的 scale 问题。
如图,网络带宽和传输的顺序是关键因素,网络带宽很好理解,如果 Best 要提高只能加带宽,同时传输顺序如图会影响计算时间。

Communication to Computation Ratio 传输计算比,如果比值大说明效率高

  1. 增大 BatchSize,但是会过拟合(贾扬清的 Train ImageNet in 1 Hour),减小参数精度(FP16,腾讯绝艺 1024 张V100)。

  2. Overlay Coeffiecient 提供传输和计算的重合率,是 TicTac 的优化方向。

计算最优传输依赖,这是个 NP 问题,需要找近似解。recv op 是图的 root,可行的拓扑排序有很多种,解决方案就是找到近似优化排序。(原问题是 NP 问题)

几种符号

op.P 直接 Op 执行计算时间
op.M Op传输时间
op.M+ 触发计算 op 的最小传输时间

Tic

设 Communication Time 对于每个 recv op 来说都相等,从直觉上解释就是计算执行 recv op 的优先度。
如图计算对应的值

1
2
3
4
5
6
case 1
A.M = B.M = 1
case 2
A.M = B.M = 1
C.M = 1+1
D.M = 1+1+1

越小优先级越高,这个可以在 DAG 中静态算出

Tac

Communication Time 对于每个 recv op 来说有对应的时间,对应的算法。

系统设计

Time Oracle: Tensorflow metrics 计算 op time

Ordering Wizard: 计算静态依赖优先级

Enforcing: 修改 Tensorflow gRPC 子模块

P3

PRIORITY-BASED PARAMETER PROPAGATION FOR DISTRIBUTED DNN TRAINING

和上一篇论文类似,也是基于传输优化的思路,因为网络带宽基本上就是一个硬件问题。一些优化手段是用了有损压缩或者降低收敛时准确率的方式。还是通过提高 overlap 来实现。
直观的现象是如图 L4 的一次正向传播和反向传播之间的间隔很大。

解决方法:带优先级的参数切分

在 MXNet 里面,worker 算完当层的梯度
就会提一个 pull request 当其他 work 同步了这个梯度。
问题在于发送的顺序和反向传播的顺序一致,并且颗粒度
是以 layer 为单位的。

根据领域知识将大 layer 分片成小 slice。
对每个 slice 进行权重排序,优先传输权重高的 slice。
被称为 Priority-Based Parameter Propagation

如图如果参数传输有优先级能够被中断就能减少 delay

大部分模型每层的参数是不平衡的,特别是全连接的参数非常大。

对每一层切分以后利用 TCP 的全双工可以达到流水线的效果。

P3 的设计

参数切片:根据领域知识和实验选择大小(50000)
参数优先级:下一层先需要的先发送,可以抢占低优先级的

我个人觉得简单,这个比上一个更好理解也简单。

BlueConnect

主要是一个对分层(原本的一层或者两层)的 All-Reduce 的泛化算法。
首先使用递归的方法可以减少 all-gather 的传输次数。

这个只要三次 logp

ring reduce 要7次 (p-1)

问题背景

多卡之间的带宽很高 32GB/s,不同网络拓扑下,最慢的会成为瓶颈。
分两层可以解决网络和总线的带宽差异。

对于二层的一个泛化,

三层的 reduce-scatter 的例子,reduce-gather 是反的。

BEYOND DATA AND MODEL PARALLELISM FOR DEEP NEURAL NETWORKS

这个主要是提到了并行程度除了数据和模型之外可以更细粒度到参数和属性,也就是模型的小划分和样本的小划分。
SOAP= Sample Attribute Operator Parameter

全连接层有大量的参数,造成传输瓶颈。
领域特定的优化:RNN 和 CNN 用 data 并行,最后全连接用 model 并行。

搜索策略

找到最优并行策略是 NP 问题。
Execution Optimizer 通过找到最小 cost 的
并行策略寻找较优解。
Cost = tensor size/ bandwidth

随机从搜索空间选取策略,依据 MCMC
最够贪心,又不会太局部贪心。

总的来有点和 AutoML 类似,在一个搜索空间选择一个策略使得 cost 最低,一个是原本的 cost 函数,这个是
计算本身执行的时间。目前主流框架也不支持 SOAP 级别的并行划分,没手调的好,但很接近。

可以看到搜索出来的结果全连接基本上是单卡算的

行星运动我们都是按万有引力定律算的,实际上按照爱因斯坦的说法,万有引力不是惯性力是时空弯曲的效果,但是按照爱因斯坦的公式算起来特别复杂,所以现在航天轨道的设计还是按万有引力定律在算。

牛顿万有引力定律

牛顿万有引力定律:任意两个质点由通过连心线方向上的力相互吸引。该吸引力的大小与它们的质量乘积成正比,与它们距离的平方成反比,G 是万有引力常数。

$$ F=-\frac{G M \cdot m}{r^2} \cdot \frac{\vec{r}}{r} $$

多体问题

多体问题,是所有吸引力的叠加,这里排除物体相撞的情况,因为宇宙非常大,两个天体之间的距离非常远,暂时不考虑这种情况,通过加入一个小常量可以在计算上忽略这个问题。

$$ F=-\frac{G M m r}{(r^2 + \epsilon^2)^{\frac{3}{2}}} $$

这样累加的时候自己也可以加进去,因为 r 是 0,那一项相当于没算。

$$ F_j= \sum_{i=1}^{n} F_i $$

对于 N body 有很多的稍微近似的算法,多是基于实际的物理场景的,比如大部分天体是属于一个星系的,每个星系之间都非常远,所以可以将一些星系作为整体计算,构建一颗树,树中的某些节点是其所有叶子的质心,这样就可以减少计算量,但是放到 GPU 的场景下一个 O(N^2) 的暴力算法利用 GPU 的并行能力可以非常快的算出来。

Python 暴力解

下面是 N-Body 暴力实现的例子,网上有个各种语言实验的,比较好用来测试准确性,来自这里,就是两层循环。

1
2
3
4
5
6
7
8
9
10
11
12
cat <<EOF >> nbody.txt
0.01 3 20
1
0 0 0
0.01 0 0
0.1
1 1 0
0 0 0.02
0.001
0 1 1
0.01 -0.01 -0.01
EOF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import math

class Vector:
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z

def __add__(self, other):
return Vector(self.x + other.x, self.y + other.y, self.z + other.z)

def __sub__(self, other):
return Vector(self.x - other.x, self.y - other.y, self.z - other.z)

def __mul__(self, other):
return Vector(self.x * other, self.y * other, self.z * other)

def __div__(self, other):
return Vector(self.x / other, self.y / other, self.z / other)

def __eq__(self, other):
if isinstance(other, Vector):
return self.x == other.x and self.y == other.y and self.z == other.z
return False

def __ne__(self, other):
return not self.__eq__(other)

def __str__(self):
return '({x}, {y}, {z})'.format(x=self.x, y=self.y, z=self.z)

def abs(self):
return math.sqrt(self.x*self.x + self.y*self.y + self.z*self.z)

origin = Vector(0, 0, 0)

class NBody:
def __init__(self, fileName):
with open(fileName, "r") as fh:
lines = fh.readlines()
gbt = lines[0].split()
self.gc = float(gbt[0])
self.bodies = int(gbt[1])
self.timeSteps = int(gbt[2])
self.masses = [0.0 for i in range(self.bodies)]
self.positions = [origin for i in range(self.bodies)]
self.velocities = [origin for i in range(self.bodies)]
self.accelerations = [origin for i in range(self.bodies)]
for i in range(self.bodies):
self.masses[i] = float(lines[i*3 + 1])
self.positions[i] = self.__decompose(lines[i*3 + 2])
self.velocities[i] = self.__decompose(lines[i*3 + 3])

print("Contents of", fileName)
for line in lines:
print(line.rstrip())
print
print("Body : x y z |")
print(" vx vy vz")

def __decompose(self, line):
xyz = line.split()
x = float(xyz[0])
y = float(xyz[1])
z = float(xyz[2])
return Vector(x, y, z)

def __computeAccelerations(self):
for i in range(self.bodies):
self.accelerations[i] = origin
for j in range(self.bodies):
if i != j:
temp = self.gc * self.masses[j] / math.pow((self.positions[i] - self.positions[j]).abs(), 3)
self.accelerations[i] += (self.positions[j] - self.positions[i]) * temp
return None

def __computePositions(self):
for i in range(self.bodies):
self.positions[i] += self.velocities[i] + self.accelerations[i] * 0.5
return None

def __computeVelocities(self):
for i in range(self.bodies):
self.velocities[i] += self.accelerations[i]
return None

def __resolveCollisions(self):
for i in range(self.bodies):
for j in range(self.bodies):
if self.positions[i] == self.positions[j]:
(self.velocities[i], self.velocities[j]) = (self.velocities[j], self.velocities[i])
return None

def simulate(self):
self.__computeAccelerations()
self.__computePositions()
self.__computeVelocities()
self.__resolveCollisions()
return None

def printResults(self):
fmt = "Body %d : % 8.6f % 8.6f % 8.6f | % 8.6f % 8.6f % 8.6f"
for i in range(self.bodies):
print(fmt % (i+1, self.positions[i].x, self.positions[i].y, self.positions[i].z, self.velocities[i].x, self.velocities[i].y, self.velocities[i].z))
return None

nb = NBody("nbody.txt")
for i in range(nb.timeSteps):
print("\nCycle %d" % (i + 1))
nb.simulate()
nb.printResults()

CUDA 并行计算

在 CUDA 中并行化首先要理解 CUDA 的层次结构,CUDA 有 grid 和 block 对线程做划分。首先设计一个 computation tile,相当于一个 block 的计算。

左侧表示一个 block 中的 p 个 body 执行 p 次,最后就能更新所有 body 的加速度,这里的内存占用是 O(p) 的不是 O(p^2),浅绿色表示的是串行执行的流,官方的文档没有更新,对应 CUDA 10 的例子里是这样的,vec3 存的是 x/y/z 轴的加速度,vec4 存的是坐标加质量,就是计算 i 受到 j 的加速度,这里没有并行设计是串行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template <typename T>
__device__ typename vec3<T>::Type
bodyBodyInteraction(typename vec3<T>::Type ai,
typename vec4<T>::Type bi,
typename vec4<T>::Type bj)
{
typename vec3<T>::Type r;

// r_ij [3 FLOPS]
r.x = bj.x - bi.x;
r.y = bj.y - bi.y;
r.z = bj.z - bi.z;

// distSqr = dot(r_ij, r_ij) + EPS^2 [6 FLOPS]
T distSqr = r.x * r.x + r.y * r.y + r.z * r.z;
distSqr += getSofteningSquared<T>();

// invDistCube =1/distSqr^(3/2) [4 FLOPS (2 mul, 1 sqrt, 1 inv)]
T invDist = rsqrt_T(distSqr);
T invDistCube = invDist * invDist * invDist;

// s = m_j * invDistCube [1 FLOP]
T s = bj.w * invDistCube;

// a_i = a_i + s * r_ij [6 FLOPS]
ai.x += r.x * s;
ai.y += r.y * s;
ai.z += r.z * s;

return ai;
}

block 的 tile computation 就是串行计算 p 次 p 个 body 的加速度。

1
2
3
4
for (unsigned int counter = 0; counter < blockDim.x; counter++)
{
acc = bodyBodyInteraction<T>(acc, bodyPos, sharedPos[counter]);
}

每次计算完了以后要对线程做共享内存的同步

黑实线就是一次 block 共享内存同步的边界,也就是一次 tile compuation,图中是 p 个 body 按时间串行执行的流程,超出 p 的时间计算的不是 p 中的 body,看下面这张图。

也就是按照时间串行的计算每个 body 对一个 body 的加速度,然后 N/p 个 block 并行,每个 block 有 p 个线程并行,没 p 次计算每个 block 要同步一次,但是多个 block 之间不需要同步,所以每个并行线程的主体是这样的要在 tile computation 之间包一层同步的调用 cg::sync(ca),同步一次 block 中线程的共享内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename T>
__device__ typename vec3<T>::Type
computeBodyAccel(typename vec4<T>::Type bodyPos,
typename vec4<T>::Type *positions,
int numTiles, cg::thread_block cta)
{
typename vec4<T>::Type *sharedPos = SharedMemory<typename vec4<T>::Type>();

typename vec3<T>::Type acc = {0.0f, 0.0f, 0.0f};

for (int tile = 0; tile < numTiles; tile++)
{
sharedPos[threadIdx.x] = positions[tile * blockDim.x + threadIdx.x];

cg::sync(cta);
// This is the "tile_calculation" from the GPUG3 article.
#pragma unroll 128

for (unsigned int counter = 0; counter < blockDim.x; counter++)
{
acc = bodyBodyInteraction<T>(acc, bodyPos, sharedPos[counter]);
}

cg::sync(cta);
}

return acc;
}

从 GPU 的角度来看,时间复杂度是 O(N),空间复杂度也是 O(N),但前提是要开 N 个线程同时计算。

参考资料

n body nvidia

O(N) 算法计算天体运动

n body problem

模型的保存分三种类型

  1. 知道模型结构,单纯保存变量
  2. 不知道模型结构,保存模型和变量
  3. 不需要再改变量,只要常量化的模型(“冻结”)

第一种用于训练的存档,并且临时恢复,这个时候用户是把训练需要的网络结构在代码里面构造好了的,只是在一定的时间下需要暂时保存网络中的变量,为了在崩溃之后继续训练。所以自然而然会有一个问题,如果我用 Python 写的代码,需要在 C++ 当中恢复,我需要知道你的模型结构,才能恢复,这个最蠢的办法是用 C++ 把你的网络结构再构造一遍,但我们按照统一的协议(比如 Protobuf)确定网络结构,就可以直接从标准序列化的数据中解析网络结构,这就是第二种情况,独立于语言,模型和变量一起保存的情况。然后如果碰到我们不需要再训练了,比如只是把这个模型进行部署,不需要改变相关的变量,那么其实只要一个带常量的模型就可以,这就是第三种情况,把变量冻结的正向传播模型。接下来会依次解释这几种情况的工作方式。

除了这些以外,针对用于服务的模型还可以做很多的优化。

存档

存档只是单纯的保存变量,并且能够恢复,可以在一定的迭代次数以后保存变量,并且从任意一个存档开始重新训练。以两个变量加减 1 为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import tensorflow as tf

# Create some variables.
v1 = tf.get_variable("v1", shape=[3], initializer = tf.zeros_initializer)
v2 = tf.get_variable("v2", shape=[5], initializer = tf.zeros_initializer)

inc_v1 = v1.assign(v1+1)
dec_v2 = v2.assign(v2-1)

# Add an op to initialize the variables.
init_op = tf.global_variables_initializer()

# Add ops to save and restore all the variables.
saver = tf.train.Saver()

# Later, launch the model, initialize the variables, do some work, and save the
# variables to disk.
with tf.Session() as sess:
sess.run(init_op)
# Do some work with the model.
inc_v1.op.run()
dec_v2.op.run()
# Save the variables to disk.
save_path = saver.save(sess, "/tmp/tf-test/model.ckpt")
print("Model saved in path: %s" % save_path)

可以在 /tmp/tf-test 下面看到这几个文件 checkpoint model.ckpt.data-00000-of-00001 model.ckpt.index model.ckpt.meta

可以通过脚本观察保存的变量 python $tensorflow-src/tensorflow/python/tools/inspect_checkpoint.py --file_name=/tmp/tf-test/model.ckpt --all_tensors

得到保存的变量的内容,注意 model.ckpt 这个只是文件前缀。

1
2
3
4
tensor_name:  v1
[1. 1. 1.]
tensor_name: v2
[-1. -1. -1. -1. -1.]

如果要恢复的话,可以通过下面的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import tensorflow as tf

# Create some variables.
v1 = tf.get_variable("v1", shape=[3])
v2 = tf.get_variable("v2", shape=[5])

# Add ops to save and restore all the variables.
saver = tf.train.Saver()

# Later, launch the model, use the saver to restore variables from disk, and
# do some work with the model.
with tf.Session() as sess:
# Restore variables from disk.
saver.restore(sess, "/tmp/tf-test/model.ckpt")
print("Model restored.")
# Check the values of the variables
print("v1 : %s" % v1.eval())
print("v2 : %s" % v2.eval())

得到一样的效果

1
2
v1 : [1. 1. 1.]
v2 : [-1. -1. -1. -1. -1.]

具体来说 .meta 对应的是 MetaGraph 和 SaverGraph,.index 对应的是变量值的位置,key 是变量名,value 是变量保存的入口定义,data 变量的值具体保存的文件。这是恢复代码中已经原样构造出了 Graph,如果没有构造的化,需要通过 tf.train.import_meta_graph('/tmp/model.ckpt.meta') 来加载,但是存档保存的信息比较单一,Tensorflow 提供了一个更丰富的 API 来使用。

保存

SavedModelBuilder 保存的 API 比较丰富,能够保存多个 MetaGraph 和 Variables 的组合,除此之外还能附带 assets,并且要指定模型签名,simple_saved 的方法是一个简单版本的调用,适用于 Predict API。这里要展开一下 GraphDef, MetaGraphDef, SignatureDef, tags 这些东西的概念。对于 MetaGraph,这篇文章解释得很清楚。SignatureDef 是对应了一种图的输入和输出,可以依据这个进行 serving API 的调用,类似于函数签名,相对于一个接口的定义。

tensorflow_serving 自己给了个例子,执行 python mnist_saved_model.py /tmp/tf-test-2 以后可以获得一个目录,下面有版本 1 的模型数据,执行 saved_model_cli show --dir /tmp/tf-test-2/1 可以查看对应的签名。可以看到对应的层级关系,默认用于服务的模型会打上 serve 的标签,函数签名有两个,分别对应了 predict 和 classify 的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['predict_images']:
The given SavedModel SignatureDef contains the following input(s):
inputs['images'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 784)
name: x:0
The given SavedModel SignatureDef contains the following output(s):
outputs['scores'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: y:0
Method name is: tensorflow/serving/predict

signature_def['serving_default']:
The given SavedModel SignatureDef contains the following input(s):
inputs['inputs'] tensor_info:
dtype: DT_STRING
shape: unknown_rank
name: tf_example:0
The given SavedModel SignatureDef contains the following output(s):
outputs['classes'] tensor_info:
dtype: DT_STRING
shape: (-1, 10)
name: index_to_string_Lookup:0
outputs['scores'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 10)
name: TopKV2:0
Method name is: tensorflow/serving/classify

可以参考 tensorflow 的 REST API,比如 GET http://host:port/v1/models/${MODEL_NAME}[/versions/${MODEL_VERSION}] 其实对应这个例子就是 GET http://host:port/v1/models/tf-test-2/versions/1,然后感觉函数签名不同的 method name,可以调用不同的 request,比如 POST http://host:port/v1/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]:predict 这个格式,如果输入和输出对应的是 imagesscores 那么就对应了第一个签名。

冻结

冻结的情况就是变量不再需要修改,直接把变量转化成常量保存成单一的模型,方便在部署的场景下使用。
冻结模型的代码在这里,他的主要流程如下

  1. 清除所有 Op 中的 device,让原来在指定 CPU/GPU/节点 上的 Op 不再绑定。
  2. 通过 graph_util.convert_variables_to_constants 将所有的 Variable eval 一次,把变量的 Op 的结果拿到,替换成 constant

优化

除了冻结模型以外,还可以删减一些多余的节点,比如 Summary 节点或者 Identity 节点,甚者把 16bit 的浮点数权重修改为 8bit 的浮点数权重(这个在 Tensorflow Lite 里很有用)。这篇文章 列出了详细的优化方式,主要是靠 transform_graph 这个工具,地址在,他有很详细的柴剪列表,并且可以自己编写裁剪函数,充分做到模型在部署环节的“纯净化”,调用方式也很简单。

1
2
3
4
5
6
7
8
9
10
transform_graph \
--in_graph=tensorflow_inception_graph.pb \
--out_graph=optimized_inception_graph.pb \
--inputs='Mul:0' \
--outputs='softmax:0' \
--transforms='
strip_unused_nodes(type=float, shape="1,299,299,3")
remove_nodes(op=Identity, op=CheckNumerics)
fold_old_batch_norms
'

transforms里面加入你想进行优化的 transformer 和对应的参数即可,在科赛上也有在线可以跑的notebook

参考

  1. Cloud ML Engine

我照着 raft 论文重新过了一遍 etcd/raft 的代码,主要的文件是 etcd 下面的 raft.go。对照这个代码重新梳理一遍也算是深入理解一下 raft 算法。接下来会包含两个视频一个是选举相关的内容,一个是日志复制的内容,我 walktrough 的时候默认是大致读过论文的,对一些机制和字段都有了解,没有具体解析每个字段的来历,并且把一些问题按照代码的顺序重新理了一遍。

第一部分是关于选举的,raft 本身是一个 quorum based 的算法,一致性要靠“大多数人”的同意,并且为了简单和不必要的竞争,raft 只有一个主节点,在收到大多数人的投票以后成为主节点。

第二部分是关于日志复制的,raft 需要让主节点把客户端的请求复制到大多数节点上才能算达成一致,并且 commit,下面介绍的是复制的机制,并且 ConfChange 和 Snapshot 也是走这个流程达成一致的。

之后的 walkthrough 应该会慢慢补上,关于一些其他的逻辑和具体的一些函数的细节部分会在后面放出。

mvcc

v2 版本的实现存在 watcher 丢失或者丢失事件的问题,导致客户端要重新获取一遍。新版本是有本地嵌入式的数据库来避免这些问题。
mvcc 是 etcd v3 版本的存储实现主要有两个部分,一个是 backend 的 boltDB 和内存索引 key index。

BoltDB 是一个 B+ 树的嵌入式 KV store,相对于 leveldb 适用于写多读少的情况,我之前的文章简单介绍了一下 leveldb 的设计,BoltDB 比较适合读多写少,或者存在大范围 scan 的情况下比较合适,还有类似 levedb 的产品 badger,是纯 Go 实现的,在多写的情况下 leveldb 的衍生品表现更好。

BatchTx 就是收集很多的 call,然后在一个 Update 里面完成。

backend 里面存的 key 是 revision ,其中 main revision 是事务编号,sub revision 是事务中操作的编号,etcd 在 boltdb 上还做了一层缓存,所以多了一些锁机制。
内存索引中存的是真正的 key 到 revison 的索引。

key_index -> tx_buffer -> boltdb.tx

调用路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
kv:
WriteView 是一些直接操作
ReadView
Read() TxnRead
Write() TxnWrite 是拿到对应的 transaction

revision:
main
sub
kvstore:
readView readView{kv} 用了 Read() Read/Write 是 kvstore 自己实现的用于拿 backend 的和自己的各种锁。(kvstore\_txn.go)
writeView writeView{kv} 用了 Write()
treeIndex:
keyIndex


writeView:
拿到 kv 的 Write(storeTxnWrite): 首先查 tw.s.kvindex.Get 然后再 backend.Tx UnsageSeqPut,然后更新 keyindex,append changes,处理 lease (old detach new attach)
readView
storeTxnRead: keyindex Revisons -> tx.UnsafeRange

lessor lesor(重音在后面)
leaseBucket 在单独的一个 bucket 里面

BPF

BPF (Berkeley Packet Filter) 最早是用在 tcpdump 里面的,比如 tcpdump tcp and dst port 80 这样的过滤规则会单独复制 tcp 协议并且目的端口是 80 的包到用户态。整个实现是基于内核中的一个虚拟机来实现的,通过翻译 BPF 规则到字节码运行到内核中的虚拟机当中。最早的论文是这篇,这篇论文我大概翻了一下,主要讲的是原本的基于栈的过滤太重了,而 BPF 是一套能充分利用 CPU 寄存器,动态注册 filter 的虚拟机实现,相对于基于内存的实现更高效,不过那个时候的内存比较小才几十兆。bpf 会从链路层复制 pakcet 并根据 filter 的规则选择抛弃或者复制,字节码是这样的,具体语法就不介绍了,一般也不会去直接写这些字节码,然后通过内核中实现的一个虚拟机翻译这些字节码,注册过滤规则,这样不修改内核的虚拟机也能实现很多功能。

在 Linux 中对应的 API 是

1
2
3
socket(SOCK_RAW)
bind(iface)
setsockopt(SO_ATTACH_FILTER)

下面是一个低层级的 demo,首先 ethernet header 的十二个字节记录了 ip 的协议,ip 的第9个字节记录 tcp 的协议,如果协议编号不匹配都跳到最后 reject,然后在到 tcp 的第二个字节是 port 看看是不是 80,都满足的话就 accept。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <net/if.h>
#include <net/ethernet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <netpacket/packet.h>
#include <linux/filter.h>

#define OP_LDH (BPF_LD | BPF_H | BPF_ABS)
#define OP_LDB (BPF_LD | BPF_B | BPF_ABS)
#define OP_JEQ (BPF_JMP | BPF_JEQ | BPF_K)
#define OP_RET (BPF_RET | BPF_K)

// Filter TCP segments to port 80
static struct sock_filter bpfcode[8] = {
{ OP_LDH, 0, 0, 12 }, // ldh [12]
{ OP_JEQ, 0, 5, ETH_P_IP }, // jeq #0x800, L2, L8
{ OP_LDB, 0, 0, 23 }, // ldb [23] # 14 bytes of ethernet header + 9 bytes in IP header until the protocol
{ OP_JEQ, 0, 3, IPPROTO_TCP }, // jeq #0x6, L4, L8
{ OP_LDH, 0, 0, 36 }, // ldh [36] # 14 bytes of ethernet header + 20 bytes of IP header (we assume no options) + 2 bytes of offset until the port
{ OP_JEQ, 0, 1, 80 }, // jeq #0x50, L6, L8
{ OP_RET, 0, 0, -1, }, // ret #0xffffffff # (accept)
{ OP_RET, 0, 0, 0 }, // ret #0x0 # (reject)

};

int main(int argc, char **argv)
{
int sock;
int n;
char buf[2000];
struct sockaddr_ll addr;
struct packet_mreq mreq;
struct iphdr *ip;
char saddr_str[INET_ADDRSTRLEN], daddr_str[INET_ADDRSTRLEN];
char *proto_str;
char *name;
struct sock_fprog bpf = { 8, bpfcode };

if (argc != 2) {
printf("Usage: %s ifname\n", argv[0]);
return 1;
}

name = argv[1];

sock = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
if (sock < 0) {
perror("socket");
return 1;
}

memset(&addr, 0, sizeof(addr));
addr.sll_ifindex = if_nametoindex(name);
addr.sll_family = AF_PACKET;
addr.sll_protocol = htons(ETH_P_ALL);

if (bind(sock, (struct sockaddr *) &addr, sizeof(addr))) {
perror("bind");
return 1;
}

if (setsockopt(sock, SOL_SOCKET, SO_ATTACH_FILTER, &bpf, sizeof(bpf))) {
perror("setsockopt ATTACH_FILTER");
return 1;
}

memset(&mreq, 0, sizeof(mreq));
mreq.mr_type = PACKET_MR_PROMISC;
mreq.mr_ifindex = if_nametoindex(name);

if (setsockopt(sock, SOL_PACKET,
PACKET_ADD_MEMBERSHIP, (char *)&mreq, sizeof(mreq))) {
perror("setsockopt MR_PROMISC");
return 1;
}

for (;;) {
n = recv(sock, buf, sizeof(buf), 0);
if (n < 1) {
perror("recv");
return 0;
}

ip = (struct iphdr *)(buf + sizeof(struct ether_header));

inet_ntop(AF_INET, &ip->saddr, saddr_str, sizeof(saddr_str));
inet_ntop(AF_INET, &ip->daddr, daddr_str, sizeof(daddr_str));

switch (ip->protocol) {
#define PTOSTR(_p,_str) \
case _p: proto_str = _str; break

PTOSTR(IPPROTO_ICMP, "icmp");
PTOSTR(IPPROTO_TCP, "tcp");
PTOSTR(IPPROTO_UDP, "udp");
default:
proto_str = "";
break;
}

printf("IPv%d proto=%d(%s) src=%s dst=%s\n",
ip->version, ip->protocol, proto_str, saddr_str, daddr_str);
}

return 0;
}

执行 curl "http://www.baidu.com",结果如下:

1
2
3
4
5
6
7
8
sudo ./filter ens3
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244
IPv4 proto=6(tcp) src=172.31.3.210 dst=220.181.112.244

这些低级别的操作都封装在了 libpcap 里面,一般不太会自己这么写。

eBPF

eBPF 是 extended BPF 具有更强大的功能。老的 BPF 现在叫 cBPF (classic BPF)。

首先是字节码的指令集更加丰富了,并且现在有了 64 位的寄存器(相较于上古时期的 32 位的CPU),有了 JIT mapping 技术和 LLVM 的后端。JIT 指的的是 Just In Time,实时编译。

一般的 ePBF 的工作流是编写一个 C 的子集(比如没有循环),通过 LLVM 编译到字节码,然成生成 ELF 文件,然后 JIT 编译进内核。

eBPF 一个最重要的功能是可以做到动态跟踪(dynamic tracing),可以不修改程序直接监控一个正在运行的进程。

在 eBPF 之前

在 ebpf 之前,为了实现同样的功能,要在执行的指令中嵌入 hook,并且支持跳到 inspect 函数,然后再恢复执行,这个流程和 debugger 非常相似,这是用 kprobe 来实现,kprobe 是 2007 年引入内核的。比如下面的例子,把 Instruction 3 改成跳转指令,然后再执行 Instruction 3,然后再跳转回去。

使用 kprobe 需要通过编译 kernel module 注册到内核当中,非常麻烦,等于是直接动内核的代码很容易引起内核 panic,而且每个内核版本都不一样,函数符号和位移是有区别的,对每个版本的内核都要编译一个对应的版本的 module。为了解决这个问题引入了一些静态的稳定的 trace point,不会因为版本而改变的地方可以插入 kprobe,但这样就限制了 kprobe 可以探测的范围。

有了 eBPF

有了 eBPF,就可以将用户态的程序插入到内核中,不用编写内核模块了,但是问题并没有改善,内核版本带来的问题还是没有解决。

eBPF 的 kprobe 一种方式时候 mapping,映射 kprobe 的数据到用户态程序,比如发包数,然后用户态程序定期检查这个映射进行统计。

另一种方式是 event (perf_events),如果 kprobe 向用户态程序发送事件来进行统计,这样不同轮询,直接异步计算就可以。

低级别的 API,这个只有 Linux 有

1
2
3
4
5
bpf() 系统调用
BPF_PROG_LOAD 加载 BPF 字节码
BPF_PROG_TYPE_SOCKET_FILTER
BPF_PROG_TYPE_KPROBE
BPF_MAP_* map 映射到 BPF 当中
1
perf_event_open() + ioctl(PERF_EVENT_IOC_SET_BPF)

高级别的接口是 bcc(BPFcompiler collection),转换 c 到 LLVM-epbf 后端,并且前端是 python 的。可以实现动态加载 eBPF 字节码到内核中。

weave scope 就是用 bcc 实现的 HTTP stats 的统计。

这里可以看到程序的主体,这里 hook 了内核函数 skb_copy_datagram_iter,这个函数有一个 tracepoint trace_skb_copy_datagram_iovec。在内核代码里面对应的是下面这段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* skb_copy_datagram_iter - Copy a datagram to an iovec iterator.
* @skb: buffer to copy
* @offset: offset in the buffer to start copying from
* @to: iovec iterator to copy to
* @len: amount of data to copy from buffer to iovec
*/
int skb_copy_datagram_iter(const struct sk_buff *skb, int offset,
struct iov_iter *to, int len)
{
int start = skb_headlen(skb);
int i, copy = start - offset, start_off = offset, n;
struct sk_buff *frag_iter;

trace_skb_copy_datagram_iovec(skb, len);

这里对这个 hook 注册了程序,具体的代码就不展示了,主要是根据协议统计这个 HTTP 的大小,方法等信息。

1
2
3
4
5
6
7
8
9
10

/* skb_copy_datagram_iter() (Kernels >= 3.19) is in charge of copying socket
* buffers from kernel to userspace.
*
* skb_copy_datagram_iter() has an associated tracepoint
* (trace_skb_copy_datagram_iovec), which would be more stable than a kprobe but
* it lacks the offset argument.
*/
int kprobe__skb_copy_datagram_iter(struct pt_regs *ctx, const struct sk_buff *skb, int offset, void *unused_iovec, int len)
{

比如判断 method 是不是 DELETE 的是实现就比较蠢,是因为 eBPF 不支持循环,只能这么实现才能把 c 代码翻译成字节码。

1
2
3
4
5
case 'D':
if ((data[1] != 'E') || (data[2] != 'L') || (data[3] != 'E') || (data[4] != 'T') || (data[5] != 'E') || (data[6] != ' ')) {
return 0;
}
break;

除了 bcc 之外,waeve 使用了 gobpf,一个 bpf 的 go binding,并且通过建立 tcp 连接来猜测内核的数据结构,以达到内核版本无关,这个项目 tcptracer-bpf 还在开发中。

eBPF 的其他应用

还有一个比较大头的基于 eBPF 的是 cilium,一套比较完整的网络解决方案,用 eBPF 实现了 NAT,L3/L4 负载均衡,连接记录等等功能。比如访问控制,一般的 iptables 都是 drop 或者 rst,要过整个协议栈,但是 eBPF 可以在 connect 的时候就拦截然后返回 EACCESS,这样就不用过协议栈了。cilium 一个优化就是通过 XDP ,利用类似 DPDK 的加速方案,hook 到驱动层中,让 eBPF 可以直接使用 DMA 的缓冲,优化负载均衡。

BPF/XDP allows for a 10x improvement in load balancing over IPVS for L3/L4 traffic.

现在 k8s 最新的 lb 方案是基于 ipvs 的,我在 kube-proxy 分析 里面有提到过,已经比原来的 iptables 提高很多了,现在有了 eBPF 加 XDP 的硬件加速可以实现更高的提升,facebook 的 katran L4 负载均衡器的实现也是类似的。

cilium 在我看来基本上是 k8s 网络的一个大方向吧,只不过包括 eBPF 和 XDP 对硬件和内核版本都要比较新,是一个要持续关注的更新。

性能调优

Velocity 2017: Performance Analysis Superpowers with Linux eBPF里,Brendan Gregg (Netflix 的性能调优专家) 提到,性能调优也是 eBPF 的一个大头。用于网络监控其实只是 hook 在了协议栈的函数上,如果 hook 在别的地方可以有更多的统计维度。比如 bcc 官方的例子就是统计 IO Size 的大小的分布,更多关于基于 eBPF 的性能调优可以参考他的 blog,他给出了更详细的关于 eBPF 的解释,里面有一些列 Linux 性能调优的内容。

1
2
3
4
5
6
7
8
9
10
11
12
# ./bitehist.py
Tracing... Hit Ctrl-C to end.
^C
kbytes : count distribution
0 -> 1 : 3 | |
2 -> 3 : 0 | |
4 -> 7 : 211 |********** |
8 -> 15 : 0 | |
16 -> 31 : 0 | |
32 -> 63 : 0 | |
64 -> 127 : 1 | |
128 -> 255 : 800 |**************************************|

安全

在安全方面有 seccomp,可以实现限制 Linux 的系统调用,而 seccompe-bpf 则是通过 bpf 支持更强大的过滤和匹配功能,k8s pod 里面的 SecurityContext 就有 seccomp 实现的部分。

cgroup

在 cgroup 上有一个小原型,cgnet 获取 cgroup 的网络统计信息到 prometheus,也是基于 eBPF 的。

参考:

  1. Infrastructure 2017 - Alfonso Acosta - High-performance Linux monitoring with eBPF

  2. Using bpf in kubernetes

  3. Cilium: Networking and security for containers with BPF and XDP

csi 是一个标准的容器存储接口,规定了如何实现一个容器的存储接口,CSI 本身的定义是基于 gRPC 的,所以有一套样例库可以使用,这里分析一下 kuberntes 实现 csi 的方式,为了兼容 CSI kubernete 其实搞得挺绕的,目前这个 CSI 还是定制中包括后期的 Snapshot 的接口怎么设计等等还在讨论中。kubernetes CSI 主要基于几个外部组件和内部功能的一些改动。

CSI-Driver

这里规定了 CSI 的标准,定义了三个 Service,也就是 RPC 的集合,但是没规定怎么写,目前看到的实现都是把这三个 service 都写在一起,比较方便,然后部署的时候有些区别将就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
service Identity {
rpc GetPluginInfo(GetPluginInfoRequest)
returns (GetPluginInfoResponse) {}

rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
returns (GetPluginCapabilitiesResponse) {}

rpc Probe (ProbeRequest)
returns (ProbeResponse) {}
}

service Controller {
rpc CreateVolume (CreateVolumeRequest)
returns (CreateVolumeResponse) {}

rpc DeleteVolume (DeleteVolumeRequest)
returns (DeleteVolumeResponse) {}

rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
returns (ControllerPublishVolumeResponse) {}

rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
returns (ControllerUnpublishVolumeResponse) {}

rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
returns (ValidateVolumeCapabilitiesResponse) {}

rpc ListVolumes (ListVolumesRequest)
returns (ListVolumesResponse) {}

rpc GetCapacity (GetCapacityRequest)
returns (GetCapacityResponse) {}

rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
returns (ControllerGetCapabilitiesResponse) {}

rpc CreateSnapshot (CreateSnapshotRequest)
returns (CreateSnapshotResponse) {}

rpc DeleteSnapshot (DeleteSnapshotRequest)
returns (DeleteSnapshotResponse) {}

rpc ListSnapshots (ListSnapshotsRequest)
returns (ListSnapshotsResponse) {}
}

service Node {
rpc NodeStageVolume (NodeStageVolumeRequest)
returns (NodeStageVolumeResponse) {}

rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
returns (NodeUnstageVolumeResponse) {}

rpc NodePublishVolume (NodePublishVolumeRequest)
returns (NodePublishVolumeResponse) {}

rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
returns (NodeUnpublishVolumeResponse) {}

rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest)
returns (NodeGetVolumeStatsResponse) {}

// NodeGetId is being deprecated in favor of NodeGetInfo and will be
// removed in CSI 1.0. Existing drivers, however, may depend on this
// RPC call and hence this RPC call MUST be implemented by the CSI
// plugin prior to v1.0.
rpc NodeGetId (NodeGetIdRequest)
returns (NodeGetIdResponse) {
option deprecated = true;
}

rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
returns (NodeGetCapabilitiesResponse) {}

// Prior to CSI 1.0 - CSI plugins MUST implement both NodeGetId and
// NodeGetInfo RPC calls.
rpc NodeGetInfo (NodeGetInfoRequest)
returns (NodeGetInfoResponse) {}
}

比如说 GetPluginInfo 就是用来获取 driver 的 name 等信息的,NodePublishVolume 大部分情况下就是在节点上挂载文件系统,CreateVolume 这个如果对应的是 ebs 这种块存储可能就是在 API 里面建一个 ebs,如果对应的是 glusterfs 这种文件系统存储可能就是建一个 volume,然后 ControllerPublishVolume 对应 ebs 就是把 ebs 和 instance 绑定,然后调用节点的 NodePublishVolume 来挂载,如果是文件存储,可能就不需要 ``ControllerPublishVolume` 了,因为不需要绑定快设备到机器上,直接挂到网络接口就可以,这一套标准的目的一个是为了兼容现有的存储方案,一个是为了让一些私有的 provider 能够比较容易的实现一套方案,而不需要做过多的迁移,甚至厂商都不需要开源代码,如果是要实现 in-tree 的存储代码肯定是要开源的,因为 kubernetes 是开源的。

device-driver-registrar

kubernetes 实现 csi 的兼容,首先需要一个外部组件 devide-driver-registrar,初始化的时候通过 csi-sock 的 RPC 获取 driver name 和 node id。

主要功能给 node 打上下面类似的 annotations,dirver 对应的是 csi driver 的名字,name 对应的是 driver 的 NodeId 基本上就是 k8s 的 node name。这样可以让 ControllerPublishVolume 调用能够获取 nodeid 到 storage nodeid 的映射,理论上一样的就可以感觉。

1
csi.volume.kubernetes.io/nodeid: "{ "driver1": "name1", "driver2": "name2" }

他有两个模式,一个模式是自己给 node 打上这个 annotation,并且在退出的时候把这个 annotation 去掉。

另一个模式是交给 kubelet 的 pluginswatcher 来管理, kubelet 自己会根据 device-driver-registrar 提供的 unix domain socket 然后调用 gRPC 从 registrar 获取 NodeId 和 DriverName 自己把 annotation 打上。

搜索这条路径下的 socket/var/lib/kubelet/plugins/[SanitizedCSIDriverName]/csi.sock,然后就可以自动连接 registrar 拿到 NodeId 和 DriverName。

所以 device-driver-registar 主要是注册 Node annotation 的。

external-attacher

监听 VolumeAttachments 和 PersistentVolumes 两个对象,这是和 kube-controller-manager 之间的桥梁。

实现中最后会调用 SyncNewOrUpdatedVolumeAttachment 来同步,调用 csi dirver 的 Attach 函数。

in-tree 的 attach/detach-controller

在 CSI 中扮演的角色是创建 VolumeAttachment,然后等待他的 VolumeAttachment 的 attached 的状态。

attach-controller 会创建 VolumeAttatchment.Spec.Attacher 指向的是 external-attacher

external-provisoner

Static VolumeDynamic Volume 的区别是,有一个 PersistentVolumeClaim 这个会根据 claim 自动分配 PersistentVolume,不然就要自己手动创建,然后 pod 要指定这个手动创建的 volume。

external-provisoner 就是提供支持 PersistentVolumeClaim 的,一般的 provisioner 要实现 Provision 和 Delete 的接口。主要是根据 PVC 创建 PV,这是 Provisioner 的接口的定义了,不是 CSI spec 里的,这里顺带介绍一下。

external-provisoner 看到 pvc 调用 driver 的 CreateVolume,完成以后就会创建 PersistenVolume,并且绑定到 pvc。

kubelet volume manager

kubelet 有一个 volume manager 来管理 volume 的 mount/attach 操作。

desiredStateOfWorld 是从 podManager 同步的理想状态。

actualStateOfWorld 是目前 kubelet 的上运行的 pod 的状态。

每次 volume manager 需要把 actualStateOfWorld 中 volume 的状态同步到 desired 指定的状态。

volume Manager 有两个 goroutine 一个是同步状态,一个 reconciler.reconcile

rc.operationExecutor.MountVolume 会执行 MountVolume 的操作。

-> oe.operationGenerator.GenerateMountVolumeFunc

-> 首先根据 og.volumePluginMgr.FindPluginBySpec 找到对应的 VolumePlugin

-> 然后调用 volumePlugin.NewMounter

-> 然后拿到 og.volumePluginMgr.FindAttachablePluginBySpec attachableplugin

-> volumeMounter.SetUp(fsGroup) 做 mount

volume plugin

csi volume plugin 是一个 in-tree volume,以后应该会逐步迁移到都使用 csi,而不会再有 in-tree volume plugin 了。

1
2
3
func (c *csiMountMgr) SetUp(fsGroup *int64) error {
return c.SetUpAt(c.GetPath(), fsGroup)
}

csi 的 mounter 调用了 NodePublish 函数。stagingTargetPath 和 targetPath 都是自动生成的。

SetUp/TearDown 的调用会执行 in-tree CSI plugin 的接口(这又是 in-tree volume plugin 的定义,确实挺绕的),对应的是 NodePublishVolumeNodeUnpublishVolume ,这个会通过 unix domain socket 直接调用 csi driver。

总结一个简单的具体流程

首先管理员创建 StorageClass 指向 external-provisioner,然后用户创建指向这个 StorageClass 的 pvc,然后 kube-controller-manager 里的 persistent volume manager 会把这个 pvc 打上 volume.beta.kubernetes.io/storage-provisioner 的 annotation。

externla-provisioner 看到这个 pvc 带有自己的 annotation 以后,拿到 StorageClass 提供的一些参数,并且根据 StorageClass 和 pvc 调用 CSI driver 的 CreateVolume,创建成功以后创建 pv,并且把这个 pv 绑定到对应的 pvc。

然后 kube-controller-manager 看到有 pod 含有对应的 pvc,就用调用 in-tree 的 CSI plugin 的 attach 方法。

in-tree 的 CSI plugin 实际上会创建一个 VolumeAttachment 的 object,等待这个 VolumeAttachment 被完成。

external-controller 看到 VolumeAttachment,开始 attach 一个 volume,实际上调用 CSI driver 的 ControllerPublish,成功以后更新 VoluemAttachment 以后就知道这个 Volume Attach 成功了,然后让 attach/detach-controller (kube-controller-manager) 知道这个 attach 完成。

接下来就到 kubelet 了,kubelet 看到 volume in pod 以后就会调用 in-tree 的 csi plugin WaitForAttach,然后等待 Attach 成功,之后就会调用 daemonset 里面的 csi driver 的 NodePublishVolume 做挂载操作。

整体的流程是这样的,需要反复多看几遍 kubernetes-csi 的 document,加深理解。

kube-proxy 是一层代理,主要用于实现 Service 这个 Object。

Service 有一个 ClusterIP,这是一个虚拟 IP 应对 Service 后面的 pod,pod 的销毁和创建会有 ip 不固定的问题。除了能够屏蔽后端的 pod 的 IP 之外,Service 还可以有负载均衡的作用。

Endpoint 维护的是 Service 和 Pod 的映射关系,如果 pods 变动,Endpoint 也会变动。Service 本身指定义了 ClusterIP 和 port 的映射。Endpoints 记录了 pod 的 ip 和端口。

主要的访问方式是通过 NAT 实现的,如果使用 iptables 的话,就设置匹配目标端口以后,DNAT 转发到目的地址。

我看代码的时候,发现有个函数叫 birth cry 挺有意思的,表示打程序启动的一条 log。

Kube-proxy 主要监听两个对象,一个是 Service,一个是 Endpoint

这些 Objects 对应的处理函数有不同的实现,现在先看一下 iptables 的实现

当 Service 有更新的时候就会更新对应的 iptables 规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

glog.Info("Starting service config controller")
defer glog.Info("Shutting down service config controller")

if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
return
}

for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnServiceSynced()")
c.eventHandlers[i].OnServiceSynced()
}

<-stopCh
}

会调用 OnServiceSynced,然后具体调用实现者的方法。

Proxier

Netfilter

Netfilter 是内核模块的控制代码,ipvs 和 iptables 都是基于 Netfilter 实现的。

iptables

用 iptables 配置 DNAT ,然后再用概率模块做负载均衡。kube-proxy iptables 的实现主要依赖于 filter 和 nat。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Masquerade
-A KUBE-MARK-DROP -j MARK --set-xmark 0x8000/0x8000
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE

# clusterIP and publicIP
-A KUBE-SERVICES ! -s 10.244.0.0/16 -d 10.98.154.163/32 -p tcp -m comment --comment "default/nginx: cluster IP" -m tcp --dport 80 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -d 10.98.154.163/32 -p tcp -m comment --comment "default/nginx: cluster IP" -m tcp --dport 80 -j KUBE-SVC-4N57TFCL4MD7ZTDA
-A KUBE-SERVICES -d 12.12.12.12/32 -p tcp -m comment --comment "default/nginx: loadbalancer IP" -m tcp --dport 80 -j KUBE-FW-4N57TFCL4MD7ZTDA

# Masq for publicIP
-A KUBE-FW-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx: loadbalancer IP" -j KUBE-MARK-MASQ
-A KUBE-FW-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx: loadbalancer IP" -j KUBE-SVC-4N57TFCL4MD7ZTDA
-A KUBE-FW-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx: loadbalancer IP" -j KUBE-MARK-DROP

# Masq for nodePort
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/nginx:" -m tcp --dport 30938 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/nginx:" -m tcp --dport 30938 -j KUBE-SVC-4N57TFCL4MD7ZTDA

# load balance for each endpoints
-A KUBE-SVC-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx:" -m statistic --mode random --probability 0.33332999982 -j KUBE-SEP-UXHBWR5XIMVGXW3H
-A KUBE-SVC-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx:" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-TOYRWPNILILHH3OR
-A KUBE-SVC-4N57TFCL4MD7ZTDA -m comment --comment "default/nginx:" -j KUBE-SEP-6QCC2MHJZP35QQAR

# endpoint #1
-A KUBE-SEP-6QCC2MHJZP35QQAR -s 10.244.3.4/32 -m comment --comment "default/nginx:" -j KUBE-MARK-MASQ
-A KUBE-SEP-6QCC2MHJZP35QQAR -p tcp -m comment --comment "default/nginx:" -m tcp -j DNAT --to-destination 10.244.3.4:80

# endpoint #2
-A KUBE-SEP-TOYRWPNILILHH3OR -s 10.244.2.4/32 -m comment --comment "default/nginx:" -j KUBE-MARK-MASQ
-A KUBE-SEP-TOYRWPNILILHH3OR -p tcp -m comment --comment "default/nginx:" -m tcp -j DNAT --to-destination 10.244.2.4:80

# endpoint #3
-A KUBE-SEP-UXHBWR5XIMVGXW3H -s 10.244.1.2/32 -m comment --comment "default/nginx:" -j KUBE-MARK-MASQ
-A KUBE-SEP-UXHBWR5XIMVGXW3H -p tcp -m comment --comment "default/nginx:" -m tcp -j DNAT --to-destination 10.244.1.2:80

这是一个实例,首先通过 clusterIP 的 -d 匹配目的地址,还有 dport 匹配目的端口,转入对应的 SVC 链。

SVC 联通过 -m statistic 模块负载均衡到不同的 Endpoint 链上,每个 Endpoint 链又有一个 DNAT 到对应的 pod

的 IP:port 上,总的来说就是用 DNAT 实现 clusterIP 的代理,用 statistic 模块实现按概率的负载均衡(按概率进入不同的 Endpoint 链当中)。

ipvs

ipvs 有相对更好的性能,更复杂的负载均衡的算法以及健康状态检查等。

可以看到 kube-ipvs0 上的 10.102.128.4 对应的 Service 的 ClusterIP,而 RealServer 映射到了不同的 Endpoint 上。

iptables 的匹配是 hash 的,而不同根据一个链一个链的匹配,效率更高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# kubectl describe svc nginx-service
Name: nginx-service
...
Type: ClusterIP
IP: 10.102.128.4
Port: http 3080/TCP
Endpoints: 10.244.0.235:8080,10.244.1.237:8080
Session Affinity: None

# ip addr
...
73: kube-ipvs0: <BROADCAST,NOARP> mtu 1500 qdisc noop state DOWN qlen 1000
link/ether 1a:ce:f5:5f:c1:4d brd ff:ff:ff:ff:ff:ff
inet 10.102.128.4/32 scope global kube-ipvs0
valid_lft forever preferred_lft forever

# ipvsadm -ln
IP Virtual Server version 1.2.1 (size=4096)
Prot LocalAddress:Port Scheduler Flags
-> RemoteAddress:Port Forward Weight ActiveConn InActConn
TCP 10.102.128.4:3080 rr
-> 10.244.0.235:8080 Masq 1 0 0
-> 10.244.1.237:8080 Masq 1 0 0

glusterfs 主要有以下几个组件

  • gluster 命令行。
  • glusterd 管理进程。
  • glusterfsd 服务进程。
  • glusterfs 客户端 fuse 进程。

glusterfs 是靠 translator 做分层设计的,类似于可插拔的模块的概念,对应的结构体是 xlator_t,因为是 C 写的,一些面向的对象的写法也是通过 xlator 实现的,可以理解 xlator 是类,具体的实现是对象。比如 protocol/client 是最后的 xlator 从客户端发送到网络,对应的 storage/posix 是服务端接受的最后一层交给服务器上的文件系统。每个 xlator 都会编译成一个单独的 .so 文件。通过指定配置文件可以把不同的 xlator 进行组装,加载 .so 以后,通过dlsym 查找 xlator 的符号(函数表),相当于获取 xlator 的公开接口。配置文件中的 volumesubvolume 对应了 xlator 的组织关系。

volfile 会配置在 /var/lib/glusterd/vols/ 下面,对应的文件名是 volfile-id 选项指定的。xlator_t 定义在 libglusterfs/src/xlator.h 里面。下面这张图就可以看到各个 translator 的关系。

glusterfsd 最开始是 protocol/server 最后是 storage/posix 两个 xlator 作为开头和结束。

glusterfs (client) 是以protocol/client 结尾。

那个 write-behind 应该对应的是缓存策略里面的 write back,异步写,而不是直写。

这种设计的好处是方便扩展,你只要填补 xlator 的实现,编译成 .so ,放到链接目录下面,就可以加载进去,不用改整个 glusterfs 的代码。

glusterfs 的整个应用的结构也可以看一下。

用户态是 fuse 实现的文件系统,通过网络协议走到 server,server 本身用的是本地的文件系统,glusterfs 推荐使用 xfs。

下面举个详细的例子。

writev 举例,(write 的本质也是 writev,这里的 v 指的是内存向量,写的时候用链表表示的内存向量可以减少内存的拷贝,因为要分配一段连续的内存,然后拷到一起很没有效率,而且系统调用本身也支持这个系统调用,走到底层是硬件支持的,硬件不支持,也能 hook 帮你再拷贝到一起,但上层就不用管这些细节了),他们的调用关系通过 STACK_WINDSTACK_UNWIND 实现,对于所有的 xlator 的 op 都是这种调用关系,本身一层结束以后通过调用 STACK_WIND 调用下一层对应的 op,然后在调用完成之后通过 STACK_UNWIND 回调 op_cbk,并且这种调用关系是树状的。

这张图解释挺好的,说明了 xlator 向下传递的过程,通过 STACK_WIND 调用下一层,通过 STACK_UNWIND 调用上一层的 cbk。

glusterfsd 每个 volume 都会起一个线程来处理。

./xlators/mount/fuse/src/fuse-bridge.c 下面是 fuse 的接口,这是客户端的入口,作为文件系统的“桥接点”,大概是为什么叫 bridge 的原因吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static fuse_handler_t *fuse_std_ops[FUSE_OP_HIGH] = {
[FUSE_LOOKUP] = fuse_lookup,
[FUSE_FORGET] = fuse_forget,
[FUSE_GETATTR] = fuse_getattr,
[FUSE_SETATTR] = fuse_setattr,
[FUSE_READLINK] = fuse_readlink,
[FUSE_SYMLINK] = fuse_symlink,
[FUSE_MKNOD] = fuse_mknod,
[FUSE_MKDIR] = fuse_mkdir,
[FUSE_UNLINK] = fuse_unlink,
[FUSE_RMDIR] = fuse_rmdir,
[FUSE_RENAME] = fuse_rename,
[FUSE_LINK] = fuse_link,
[FUSE_OPEN] = fuse_open,
[FUSE_READ] = fuse_readv,
[FUSE_WRITE] = fuse_write,
[FUSE_STATFS] = fuse_statfs,
[FUSE_RELEASE] = fuse_release,
[FUSE_FSYNC] = fuse_fsync,

看一下 fuse_write,传了一个 xlator_t ,这个东西是 glusterfs 的分层设计的核心,每个 fuse_in_header_tmsg 还有 iobuf 都是 fuse 的 API。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static void
fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg,
struct iobuf *iobuf)
{
/* WRITE is special, metadata is attached to in_header,
* and msg is the payload as-is.
*/
struct fuse_write_in *fwi = (struct fuse_write_in *)
(finh + 1);

fuse_state_t *state = NULL;
fd_t *fd = NULL;
#if FUSE_KERNEL_MINOR_VERSION >= 9
fuse_private_t *priv = NULL;
priv = this->private;
#endif

GET_STATE (this, finh, state);
fd = FH_TO_FD (fwi->fh);
state->fd = fd;
state->size = fwi->size;
state->off = fwi->offset;

/* lets ignore 'fwi->write_flags', but just consider 'fwi->flags' */
#if FUSE_KERNEL_MINOR_VERSION >= 9
state->io_flags = fwi->flags;
#else
state->io_flags = fwi->write_flags;
#endif
/* TODO: may need to handle below flag
(fwi->write_flags & FUSE_WRITE_CACHE);
*/


fuse_resolve_fd_init (state, &state->resolve, fd);

/* See comment by similar code in fuse_settatr */
#if FUSE_KERNEL_MINOR_VERSION >= 9
priv = this->private;
if (priv->proto_minor >= 9 && fwi->write_flags & FUSE_WRITE_LOCKOWNER)
state->lk_owner = fwi->lock_owner;
#endif

state->vector.iov_base = msg;
state->vector.iov_len = fwi->size;
state->iobuf = iobuf;

fuse_resolve_and_resume (state, fuse_write_resume);

return;
}

首先是传入了写入文件的描述符,要写的内存的段的地址和大小,iobuf 这个结构体和内核里的 iobuf 是查不多的。然后进到 fuse_resolve_and_resume,这个函数主要是解析一些文件的元数据,并且返回到 writev_resume 这个回调函数上,解析的函数在 ./xlators/mount/fuse/src/fuse-resolve.c 下面,这整个是个状态机的写法,主要是解析 fd,父路径,inode 等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int
fuse_resolve (fuse_state_t *state)
{
fuse_resolve_t *resolve = NULL;

resolve = state->resolve_now;

if (resolve->fd) {

fuse_resolve_fd (state);

} else if (!gf_uuid_is_null (resolve->pargfid)) {

fuse_resolve_parent (state);

} else if (!gf_uuid_is_null (resolve->gfid)) {

fuse_resolve_inode (state);

} else {
fuse_resolve_all (state);
}

return 0;
}

到了回到调 fuse_write_resume 上,包了一层引用计数就往下传了。

1
2
3
4
5
6
7
8
9
10
11
iobref_add (iobref, state->iobuf);

gf_log ("glusterfs-fuse", GF_LOG_TRACE,
"%"PRIu64": WRITE (%p, size=%"GF_PRI_SIZET", offset=%"PRId64")",
state->finh->unique, state->fd, state->size, state->off);

FUSE_FOP (state, fuse_writev_cbk, GF_FOP_WRITE, writev, state->fd,
&state->vector, 1, state->off, state->io_flags, iobref,
state->xdata);

iobref_unref (iobref);

fuse_writev_cbk 里面 send_fuse_objiobuf发送出去。send_fuse_datasend_fuse_iov

然后走整个的 xlator 的 writev 调用链,到 client 上,这里最主要的就是 submit_request 开始走网络 rpc 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int32_t
client3_3_writev (call_frame_t *frame, xlator_t *this, void *data)
{
clnt_args_t *args = NULL;
clnt_conf_t *conf = NULL;
gfs3_write_req req = {{0,},};
int op_errno = ESTALE;
int ret = 0;

if (!frame || !this || !data)
goto unwind;

args = data;
conf = this->private;

ret = client_pre_writev (this, &req, args->fd, args->size,
args->offset, args->flags, &args->xdata);

if (ret) {
op_errno = -ret;
goto unwind;
}

ret = client_fd_fop_prepare_local (frame, args->fd, req.fd);
if (ret) {
op_errno = -ret;
goto unwind;
}
ret = client_submit_request (this, &req, frame, conf->fops,
GFS3_OP_WRITE, client3_3_writev_cbk,
args->iobref, args->vector, args->count,
NULL, 0, NULL,
(xdrproc_t)xdr_gfs3_write_req);
if (ret) {
/*
* If the lower layers fail to submit a request, they'll also
* do the unwind for us (see rpc_clnt_submit), so don't unwind
* here in such cases.
*/
gf_msg (this->name, GF_LOG_WARNING, 0, PC_MSG_FOP_SEND_FAILED,
"failed to send the fop");
}

GF_FREE (req.xdata.xdata_val);

return 0;

unwind:
CLIENT_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);
GF_FREE (req.xdata.xdata_val);

return 0;
}

回调就会开始释放内存等等操作,这里就略过了,这里回到 fuse 的 writev_cbk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static int
fuse_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *stbuf, struct iatt *postbuf, dict_t *xdata)
{
fuse_state_t *state = NULL;
fuse_in_header_t *finh = NULL;
struct fuse_write_out fwo = {0, };

state = frame->root->state;
finh = state->finh;

fuse_log_eh_fop(this, state, frame, op_ret, op_errno);

if (op_ret >= 0) {
gf_log ("glusterfs-fuse", GF_LOG_TRACE,
"%"PRIu64": WRITE => %d/%"GF_PRI_SIZET",%"PRId64"/%"PRIu64,
frame->root->unique,
op_ret, state->size, state->off, stbuf->ia_size);

fwo.size = op_ret;
send_fuse_obj (this, finh, &fwo);
} else {
gf_log ("glusterfs-fuse", GF_LOG_WARNING,
"%"PRIu64": WRITE => -1 gfid=%s fd=%p (%s)",
frame->root->unique,
(state->fd && state->fd->inode) ?
uuid_utoa (state->fd->inode->gfid) : "nil", state->fd,
strerror (op_errno));

send_fuse_err (this, finh, op_errno);
}

free_fuse_state (state);
STACK_DESTROY (frame->root);

return 0;
}

send_fuse_obj 是一个宏实际上是 send_fuse_data,把 fuse_write_out 传给 fuse 告诉 fuse 写出了多少,或者返回错误给 fuse。

1
2
#define send_fuse_obj(this, finh, obj) \
send_fuse_data (this, finh, obj, sizeof (*(obj)))

整个的分析过程大概是从客户端的角度来看的,glusterfs 比较重要的一个 xlator 就是 dht xlator,distributed hash table,这个 xlator 决定了文件的分布。

Tensor 的用户 API 可以参考这里,这里做一下简单介绍。Tensor 是各种维度的向量和矩阵的统称,分 Tensor 和 SparseTensor。和 Tensor 不同,SparseTensor 存的是值以及值对应的 index,而 Tensor 存的是完整的矩阵。

举个例子。

1
2
3
4
5
6
7
8
import tensorflow as tf
a = tf.constant([1, 1])
b = tf.constant([2, 2])
c = tf.add(a, b)
sess = tf.InteractiveSession()
print("a[0]=%s, a[1]=%s" % (a[0].eval(), a[1].eval()))
print("c = %s" % c.eval())
sess.close()

输出对应如下。

1
2
a[0]=1, a[1]=1
c = [3 3]

Tensor 只有 eval 以后才能获得结果,是懒计算的。

Tensor 的实现

Tensor (tensorflow/tensorflow/core/framework/tensor.h) 依赖 TensorShape(tensorflow/tensorflow/tensorflow/core/framework/tensor_shape.h) 和 TensorBuffer (tensorflow/tensorflow/core/framework/tensor.h) 两个成员。

TensorShape 主要负责记录张量的形状。

TensorBuffer 主要负责管理 Tensor 的内存,TensorBuffer 继承自 RefCounted (tensorflow/tensorflow/core/lib/core/refcount.h),具有引用计数的功能,用于对内存进行管理。

1
2
3
4
5
// Interface to access the raw ref-counted data buffer.
class TensorBuffer : public core::RefCounted {
public:
~TensorBuffer() override {}
...

他们的对应的关系如下。

Tensor 从下往上看,其实就是一个带”形状“的内存,和 NumPy 的数组是差不多的。

OpKernel

对于一个线性回归来说,是最简单也最好理解的模型,方便分析底层的代码实现。

$$ Y=XW+b $$

损失函数用平方差定义的,优化器是提督下降,这样一个模型可以用一下的 Python 代码实现,这个代码片段是截取的,如果要完整运行这个例子可以在这里复现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import tensorflow as tf
sess = tf.InteractiveSession()
x = tf.placeholder(tf.float32,[None, x_train.shape[1]])
y = tf.placeholder(tf.float32,[None, 1])
w = tf.Variable(tf.zeros([x_train.shape[1],1]))
b = tf.Variable(tf.zeros([1])) # placeholder 不用加 None
pred = tf.add(tf.matmul(x, w), b)

init = tf.global_variables_initializer()
cost = tf.reduce_mean(tf.square(y - pred))
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost)
epochs = 3000

init.run()

for epoch in range(0, epochs):
optimizer.run(feed_dict={x:x_train, y:y_train})
c = cost.eval(feed_dict = {x:x_train,y:y_train})
if epoch%100 == 0:
print_percentage(int(c*100))

print('\nEpoch: {0}, Error: {1}'.format(epoch+1, c))

b_value = b.eval()
w_value = w.eval()

# Predicted Labels
y_pred = pred.eval(feed_dict={x: x_test})

# Mean Squared Error
mse = tf.reduce_mean(tf.square(y_pred - y_test))
print("MSE", mse.eval())
sess.close()

对应的训练结果。

1
2
Epoch: 3000, Error: 0.25882452726364136
MSE 0.30620116674806463

可以看到,线性回归的模型主要依赖的两个 Operation 分别是 tf.addtf.matmul,其他的复杂模型也是类似的逻辑,对应的 OpKernel 分别是 AddOpMatMulOp,这里可以看一下具体的实现。

如果有源代码,可以连着源代码用 bazel 编译,可以参照这里自己编写一个 Op。

MatMulOp 的实现在 /tensorflow/core/kernels/matmul_op.cc 下面,定义在tensorflow/tensorflow/core/ops/math_ops.cc 下面。AddOp/tensorflow/core/kernels/matmul_op.cc,实现在 /tensorflow/core/kernels/cwise_op_add_1.cc 下面,依赖 /tensorflow/tensorflow/core/kernels/cwise_ops_common.hcommon 的定义。

Add 用的是 Eigenadd /tensorflow/tensorflow/core/kernels/cwise_ops.h,依赖third_party/Eigen/src/Core/functors/BinaryFunctors.h

举个例子,看一下 MatMulOpMatMulOp 的构造函数里面有一个 OpKernelConstruction 可以初始化 OpKernel,通过 OpKernel 可以获得这个 Op 的参数比如transpose_a 等等。

1
2
3
4
5
6
7
8
9
10
11
12
template <typename Device, typename T, bool USE_CUBLAS>
class MatMulOp : public OpKernel {
public:
explicit MatMulOp(OpKernelConstruction* ctx)
: OpKernel(ctx), algorithms_set_already_(false) { // 在执行构造函数之前,执行两个成员的构造函数
OP_REQUIRES_OK(ctx, ctx->GetAttr("transpose_a", &transpose_a_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("transpose_b", &transpose_b_));

LaunchMatMul<Device, T, USE_CUBLAS>::GetBlasGemmAlgorithm(
ctx, &algorithms_, &algorithms_set_already_);
use_autotune_ = MatmulAutotuneEnable();
}

每个 OpKernel 都要实现一个 Compute 函数,可以看到这个 Compute 函数首先检查了两个 Tensor 是否是矩阵,然后检查两个矩阵的形状是否符合矩阵相乘的条件,然后根据形状分配 TensorShape 并且根据 TensorShape 分配新的 Tensor (其实顺便分配的 TensorBuffer 的内存空间)。然后通过 LaunchMatMul 真正执行相乘操作,因为这个计算过程,可能是用了 GPU,所以模版是带 Device 的(GPUDevice/CPUDevice)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void Compute(OpKernelContext* ctx) override {
const Tensor& a = ctx->input(0);
const Tensor& b = ctx->input(1);

// Check that the dimensions of the two matrices are valid.
OP_REQUIRES(ctx, TensorShapeUtils::IsMatrix(a.shape()),
errors::InvalidArgument("In[0] is not a matrix"));
OP_REQUIRES(ctx, TensorShapeUtils::IsMatrix(b.shape()),
errors::InvalidArgument("In[1] is not a matrix"));
Eigen::array<Eigen::IndexPair<Eigen::DenseIndex>, 1> dim_pair;
dim_pair[0].first = transpose_a_ ? 0 : 1;
dim_pair[0].second = transpose_b_ ? 1 : 0;

OP_REQUIRES(
ctx, a.dim_size(dim_pair[0].first) == b.dim_size(dim_pair[0].second),
errors::InvalidArgument(
"Matrix size-incompatible: In[0]: ", a.shape().DebugString(),
", In[1]: ", b.shape().DebugString()));
int a_dim_remaining = 1 - dim_pair[0].first;
int b_dim_remaining = 1 - dim_pair[0].second;
TensorShape out_shape(
{a.dim_size(a_dim_remaining), b.dim_size(b_dim_remaining)});
Tensor* out = nullptr;
OP_REQUIRES_OK(ctx, ctx->allocate_output(0, out_shape, &out));

if (out->NumElements() == 0) {
// If a has shape [0, x] or b has shape [x, 0], the output shape
// is a 0-element matrix, so there is nothing to do.
return;
}

if (a.NumElements() == 0 || b.NumElements() == 0) {
// If a has shape [x, 0] and b has shape [0, y], the
// output shape is [x, y] where x and y are non-zero, so we fill
// the output with zeros.
functor::SetZeroFunctor<Device, T> f;
f(ctx->eigen_device<Device>(), out->flat<T>());
return;
}

LaunchMatMul<Device, T, USE_CUBLAS>::launch(
ctx, a, b, dim_pair, &algorithms_, use_autotune_, out);
}

LaunchMatMul 继承自 LaunchMatMulBase,在 LaunchMatMulBase 当中调用了 functor::MatMulFunctor,这个 functor 主要就会执行乘法操作,在这之前会检查一下是否其中一个元素是 vector,这样可以直接优化算出来,而不用 Eigen 库来算,这样更快,这个目前看到的是 CPU 的路径。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template <typename Device, typename T>
struct LaunchMatMulBase {
#if GOOGLE_CUDA
typedef se::blas::AlgorithmType AlgorithmType;
#else
typedef int64 AlgorithmType;
#endif // GOOGLE_CUDA

static void launch(
OpKernelContext* ctx, const Tensor& a, const Tensor& b,
const Eigen::array<Eigen::IndexPair<Eigen::DenseIndex>, 1>& dim_pair,
std::vector<AlgorithmType>* algorithms, bool use_aututone, Tensor* out) {
#ifndef TENSORFLOW_USE_SYCL
// An explicit vector-matrix multiply is much better optimized than an
// implicit one and this is a bottleneck during non-batched inference.
bool was_vector = ExplicitVectorMatrixOptimization<T>(a, b, dim_pair, out);
if (!was_vector) {
#endif // TENSORFLOW_USE_SYCL
functor::MatMulFunctor<Device, T>()(ctx->eigen_device<Device>(),
out->matrix<T>(), a.matrix<T>(),
b.matrix<T>(), dim_pair);
#ifndef TENSORFLOW_USE_SYCL
}
#endif // TENSORFLOW_USE_SYCL
}

static void GetBlasGemmAlgorithm(OpKernelConstruction* ctx,
std::vector<int64>* algorithms,
bool* algorithm_set_flag) {}
};

MatMulFunctor 在设备 d 上计算矩阵相乘的结果,其中调用的是 MatMul<CPUDevice>

1
2
3
4
5
6
template <typename Device, typename In0, typename In1, typename Out,
typename DimPair>
void MatMul(const Device& d, Out out, In0 in0, In1 in1,
const DimPair& dim_pair) {
out.device(d) = in0.contract(in1, dim_pair);
}

这里的 contract 调用的是 TensorContractionOp (third_party/unsupported/Eigen/CXX11/src/Tensor/TensorContraction.h),跟之前说的一样,这个 Op 是计算图的一部分,要通过 eval 来做计算,计算结果是 eval 驱动的。TensorContractionOp 的构造函数就是,这负责构建左表达式和右表达式。

1
2
3
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorContractionOp(
const LhsXprType& lhs, const RhsXprType& rhs, const Indices& dims)
: m_lhs_xpr(lhs), m_rhs_xpr(rhs), m_indices(dims) {}

真正的计算过程在 TensorContractionEvaluatorBase 里面,真正执行计算过程,计算细节就省略了主要是矩阵相乘。

CUDA

如果条件编译 GOOGLE_CUDA 的话,会使用 GPU 的代码,对应会调用到 steam executor,这个以后具体分析。

总结

Tensorflow 基于图模型的,并且是懒计算的,通过扩展可以自己用 C++ 实现新的 Op,并且也可以观察默认自带的 OpKernel 是如何实现的,对于理解 Tensorflow 的工作流程会有很大的帮助。Tensorflow 本身依赖了 Eigen,CUDA 等线性代数库或者 GPU 计算库,要看懂代码还是要多学一点线代的知识,比如 Contraction 这个概念我也是第一次晓得。

参考文献

  1. 深入理解 Tensorflow 架构设计与原理实现