ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

Tensorflow 的 Tensor 和 OpKernel 分析

Tensor 的用户 API 可以参考这里,这里做一下简单介绍。Tensor 是各种维度的向量和矩阵的统称,分 Tensor 和 SparseTensor。和 Tensor 不同,SparseTensor 存的是值以及值对应的 index,而 Tensor 存的是完整的矩阵。

举个例子。

1
2
3
4
5
6
7
8
import tensorflow as tf
a = tf.constant([1, 1])
b = tf.constant([2, 2])
c = tf.add(a, b)
sess = tf.InteractiveSession()
print("a[0]=%s, a[1]=%s" % (a[0].eval(), a[1].eval()))
print("c = %s" % c.eval())
sess.close()

输出对应如下。

1
2
a[0]=1, a[1]=1
c = [3 3]

Tensor 只有 eval 以后才能获得结果,是懒计算的。

Tensor 的实现

Tensor (tensorflow/tensorflow/core/framework/tensor.h) 依赖 TensorShape(tensorflow/tensorflow/tensorflow/core/framework/tensor_shape.h) 和 TensorBuffer (tensorflow/tensorflow/core/framework/tensor.h) 两个成员。

TensorShape 主要负责记录张量的形状。

TensorBuffer 主要负责管理 Tensor 的内存,TensorBuffer 继承自 RefCounted (tensorflow/tensorflow/core/lib/core/refcount.h),具有引用计数的功能,用于对内存进行管理。

1
2
3
4
5
// Interface to access the raw ref-counted data buffer.
class TensorBuffer : public core::RefCounted {
public:
~TensorBuffer() override {}
...

他们的对应的关系如下。

Tensor 从下往上看,其实就是一个带”形状“的内存,和 NumPy 的数组是差不多的。

OpKernel

对于一个线性回归来说,是最简单也最好理解的模型,方便分析底层的代码实现。

$$ Y=XW+b $$

损失函数用平方差定义的,优化器是提督下降,这样一个模型可以用一下的 Python 代码实现,这个代码片段是截取的,如果要完整运行这个例子可以在这里复现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import tensorflow as tf
sess = tf.InteractiveSession()
x = tf.placeholder(tf.float32,[None, x_train.shape[1]])
y = tf.placeholder(tf.float32,[None, 1])
w = tf.Variable(tf.zeros([x_train.shape[1],1]))
b = tf.Variable(tf.zeros([1])) # placeholder 不用加 None
pred = tf.add(tf.matmul(x, w), b)

init = tf.global_variables_initializer()
cost = tf.reduce_mean(tf.square(y - pred))
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost)
epochs = 3000

init.run()

for epoch in range(0, epochs):
optimizer.run(feed_dict={x:x_train, y:y_train})
c = cost.eval(feed_dict = {x:x_train,y:y_train})
if epoch%100 == 0:
print_percentage(int(c*100))

print('\nEpoch: {0}, Error: {1}'.format(epoch+1, c))

b_value = b.eval()
w_value = w.eval()

# Predicted Labels
y_pred = pred.eval(feed_dict={x: x_test})

# Mean Squared Error
mse = tf.reduce_mean(tf.square(y_pred - y_test))
print("MSE", mse.eval())
sess.close()

对应的训练结果。

1
2
Epoch: 3000, Error: 0.25882452726364136
MSE 0.30620116674806463

可以看到,线性回归的模型主要依赖的两个 Operation 分别是 tf.addtf.matmul,其他的复杂模型也是类似的逻辑,对应的 OpKernel 分别是 AddOpMatMulOp,这里可以看一下具体的实现。

如果有源代码,可以连着源代码用 bazel 编译,可以参照这里自己编写一个 Op。

MatMulOp 的实现在 /tensorflow/core/kernels/matmul_op.cc 下面,定义在tensorflow/tensorflow/core/ops/math_ops.cc 下面。AddOp/tensorflow/core/kernels/matmul_op.cc,实现在 /tensorflow/core/kernels/cwise_op_add_1.cc 下面,依赖 /tensorflow/tensorflow/core/kernels/cwise_ops_common.hcommon 的定义。

Add 用的是 Eigenadd /tensorflow/tensorflow/core/kernels/cwise_ops.h,依赖third_party/Eigen/src/Core/functors/BinaryFunctors.h

举个例子,看一下 MatMulOpMatMulOp 的构造函数里面有一个 OpKernelConstruction 可以初始化 OpKernel,通过 OpKernel 可以获得这个 Op 的参数比如transpose_a 等等。

1
2
3
4
5
6
7
8
9
10
11
12
template <typename Device, typename T, bool USE_CUBLAS>
class MatMulOp : public OpKernel {
public:
explicit MatMulOp(OpKernelConstruction* ctx)
: OpKernel(ctx), algorithms_set_already_(false) { // 在执行构造函数之前,执行两个成员的构造函数
OP_REQUIRES_OK(ctx, ctx->GetAttr("transpose_a", &transpose_a_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("transpose_b", &transpose_b_));

LaunchMatMul<Device, T, USE_CUBLAS>::GetBlasGemmAlgorithm(
ctx, &algorithms_, &algorithms_set_already_);
use_autotune_ = MatmulAutotuneEnable();
}

每个 OpKernel 都要实现一个 Compute 函数,可以看到这个 Compute 函数首先检查了两个 Tensor 是否是矩阵,然后检查两个矩阵的形状是否符合矩阵相乘的条件,然后根据形状分配 TensorShape 并且根据 TensorShape 分配新的 Tensor (其实顺便分配的 TensorBuffer 的内存空间)。然后通过 LaunchMatMul 真正执行相乘操作,因为这个计算过程,可能是用了 GPU,所以模版是带 Device 的(GPUDevice/CPUDevice)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void Compute(OpKernelContext* ctx) override {
const Tensor& a = ctx->input(0);
const Tensor& b = ctx->input(1);

// Check that the dimensions of the two matrices are valid.
OP_REQUIRES(ctx, TensorShapeUtils::IsMatrix(a.shape()),
errors::InvalidArgument("In[0] is not a matrix"));
OP_REQUIRES(ctx, TensorShapeUtils::IsMatrix(b.shape()),
errors::InvalidArgument("In[1] is not a matrix"));
Eigen::array<Eigen::IndexPair<Eigen::DenseIndex>, 1> dim_pair;
dim_pair[0].first = transpose_a_ ? 0 : 1;
dim_pair[0].second = transpose_b_ ? 1 : 0;

OP_REQUIRES(
ctx, a.dim_size(dim_pair[0].first) == b.dim_size(dim_pair[0].second),
errors::InvalidArgument(
"Matrix size-incompatible: In[0]: ", a.shape().DebugString(),
", In[1]: ", b.shape().DebugString()));
int a_dim_remaining = 1 - dim_pair[0].first;
int b_dim_remaining = 1 - dim_pair[0].second;
TensorShape out_shape(
{a.dim_size(a_dim_remaining), b.dim_size(b_dim_remaining)});
Tensor* out = nullptr;
OP_REQUIRES_OK(ctx, ctx->allocate_output(0, out_shape, &out));

if (out->NumElements() == 0) {
// If a has shape [0, x] or b has shape [x, 0], the output shape
// is a 0-element matrix, so there is nothing to do.
return;
}

if (a.NumElements() == 0 || b.NumElements() == 0) {
// If a has shape [x, 0] and b has shape [0, y], the
// output shape is [x, y] where x and y are non-zero, so we fill
// the output with zeros.
functor::SetZeroFunctor<Device, T> f;
f(ctx->eigen_device<Device>(), out->flat<T>());
return;
}

LaunchMatMul<Device, T, USE_CUBLAS>::launch(
ctx, a, b, dim_pair, &algorithms_, use_autotune_, out);
}

LaunchMatMul 继承自 LaunchMatMulBase,在 LaunchMatMulBase 当中调用了 functor::MatMulFunctor,这个 functor 主要就会执行乘法操作,在这之前会检查一下是否其中一个元素是 vector,这样可以直接优化算出来,而不用 Eigen 库来算,这样更快,这个目前看到的是 CPU 的路径。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template <typename Device, typename T>
struct LaunchMatMulBase {
#if GOOGLE_CUDA
typedef se::blas::AlgorithmType AlgorithmType;
#else
typedef int64 AlgorithmType;
#endif // GOOGLE_CUDA

static void launch(
OpKernelContext* ctx, const Tensor& a, const Tensor& b,
const Eigen::array<Eigen::IndexPair<Eigen::DenseIndex>, 1>& dim_pair,
std::vector<AlgorithmType>* algorithms, bool use_aututone, Tensor* out) {
#ifndef TENSORFLOW_USE_SYCL
// An explicit vector-matrix multiply is much better optimized than an
// implicit one and this is a bottleneck during non-batched inference.
bool was_vector = ExplicitVectorMatrixOptimization<T>(a, b, dim_pair, out);
if (!was_vector) {
#endif // TENSORFLOW_USE_SYCL
functor::MatMulFunctor<Device, T>()(ctx->eigen_device<Device>(),
out->matrix<T>(), a.matrix<T>(),
b.matrix<T>(), dim_pair);
#ifndef TENSORFLOW_USE_SYCL
}
#endif // TENSORFLOW_USE_SYCL
}

static void GetBlasGemmAlgorithm(OpKernelConstruction* ctx,
std::vector<int64>* algorithms,
bool* algorithm_set_flag) {}
};

MatMulFunctor 在设备 d 上计算矩阵相乘的结果,其中调用的是 MatMul<CPUDevice>

1
2
3
4
5
6
template <typename Device, typename In0, typename In1, typename Out,
typename DimPair>
void MatMul(const Device& d, Out out, In0 in0, In1 in1,
const DimPair& dim_pair) {
out.device(d) = in0.contract(in1, dim_pair);
}

这里的 contract 调用的是 TensorContractionOp (third_party/unsupported/Eigen/CXX11/src/Tensor/TensorContraction.h),跟之前说的一样,这个 Op 是计算图的一部分,要通过 eval 来做计算,计算结果是 eval 驱动的。TensorContractionOp 的构造函数就是,这负责构建左表达式和右表达式。

1
2
3
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorContractionOp(
const LhsXprType& lhs, const RhsXprType& rhs, const Indices& dims)
: m_lhs_xpr(lhs), m_rhs_xpr(rhs), m_indices(dims) {}

真正的计算过程在 TensorContractionEvaluatorBase 里面,真正执行计算过程,计算细节就省略了主要是矩阵相乘。

CUDA

如果条件编译 GOOGLE_CUDA 的话,会使用 GPU 的代码,对应会调用到 steam executor,这个以后具体分析。

总结

Tensorflow 基于图模型的,并且是懒计算的,通过扩展可以自己用 C++ 实现新的 Op,并且也可以观察默认自带的 OpKernel 是如何实现的,对于理解 Tensorflow 的工作流程会有很大的帮助。Tensorflow 本身依赖了 Eigen,CUDA 等线性代数库或者 GPU 计算库,要看懂代码还是要多学一点线代的知识,比如 Contraction 这个概念我也是第一次晓得。

参考文献

  1. 深入理解 Tensorflow 架构设计与原理实现