ggaaooppeenngg

为什么计算机科学是无限的但生命是有限的

glusterfs writev 的主要流程单分析

glusterfs 主要有以下几个组件

  • gluster 命令行。
  • glusterd 管理进程。
  • glusterfsd 服务进程。
  • glusterfs 客户端 fuse 进程。

glusterfs 是靠 translator 做分层设计的,类似于可插拔的模块的概念,对应的结构体是 xlator_t,因为是 C 写的,一些面向的对象的写法也是通过 xlator 实现的,可以理解 xlator 是类,具体的实现是对象。比如 protocol/client 是最后的 xlator 从客户端发送到网络,对应的 storage/posix 是服务端接受的最后一层交给服务器上的文件系统。每个 xlator 都会编译成一个单独的 .so 文件。通过指定配置文件可以把不同的 xlator 进行组装,加载 .so 以后,通过dlsym 查找 xlator 的符号(函数表),相当于获取 xlator 的公开接口。配置文件中的 volumesubvolume 对应了 xlator 的组织关系。

volfile 会配置在 /var/lib/glusterd/vols/ 下面,对应的文件名是 volfile-id 选项指定的。xlator_t 定义在 libglusterfs/src/xlator.h 里面。下面这张图就可以看到各个 translator 的关系。

glusterfsd 最开始是 protocol/server 最后是 storage/posix 两个 xlator 作为开头和结束。

glusterfs (client) 是以protocol/client 结尾。

那个 write-behind 应该对应的是缓存策略里面的 write back,异步写,而不是直写。

这种设计的好处是方便扩展,你只要填补 xlator 的实现,编译成 .so ,放到链接目录下面,就可以加载进去,不用改整个 glusterfs 的代码。

glusterfs 的整个应用的结构也可以看一下。

用户态是 fuse 实现的文件系统,通过网络协议走到 server,server 本身用的是本地的文件系统,glusterfs 推荐使用 xfs。

下面举个详细的例子。

writev 举例,(write 的本质也是 writev,这里的 v 指的是内存向量,写的时候用链表表示的内存向量可以减少内存的拷贝,因为要分配一段连续的内存,然后拷到一起很没有效率,而且系统调用本身也支持这个系统调用,走到底层是硬件支持的,硬件不支持,也能 hook 帮你再拷贝到一起,但上层就不用管这些细节了),他们的调用关系通过 STACK_WINDSTACK_UNWIND 实现,对于所有的 xlator 的 op 都是这种调用关系,本身一层结束以后通过调用 STACK_WIND 调用下一层对应的 op,然后在调用完成之后通过 STACK_UNWIND 回调 op_cbk,并且这种调用关系是树状的。

这张图解释挺好的,说明了 xlator 向下传递的过程,通过 STACK_WIND 调用下一层,通过 STACK_UNWIND 调用上一层的 cbk。

glusterfsd 每个 volume 都会起一个线程来处理。

./xlators/mount/fuse/src/fuse-bridge.c 下面是 fuse 的接口,这是客户端的入口,作为文件系统的“桥接点”,大概是为什么叫 bridge 的原因吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static fuse_handler_t *fuse_std_ops[FUSE_OP_HIGH] = {
[FUSE_LOOKUP] = fuse_lookup,
[FUSE_FORGET] = fuse_forget,
[FUSE_GETATTR] = fuse_getattr,
[FUSE_SETATTR] = fuse_setattr,
[FUSE_READLINK] = fuse_readlink,
[FUSE_SYMLINK] = fuse_symlink,
[FUSE_MKNOD] = fuse_mknod,
[FUSE_MKDIR] = fuse_mkdir,
[FUSE_UNLINK] = fuse_unlink,
[FUSE_RMDIR] = fuse_rmdir,
[FUSE_RENAME] = fuse_rename,
[FUSE_LINK] = fuse_link,
[FUSE_OPEN] = fuse_open,
[FUSE_READ] = fuse_readv,
[FUSE_WRITE] = fuse_write,
[FUSE_STATFS] = fuse_statfs,
[FUSE_RELEASE] = fuse_release,
[FUSE_FSYNC] = fuse_fsync,

看一下 fuse_write,传了一个 xlator_t ,这个东西是 glusterfs 的分层设计的核心,每个 fuse_in_header_tmsg 还有 iobuf 都是 fuse 的 API。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static void
fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg,
struct iobuf *iobuf)
{
/* WRITE is special, metadata is attached to in_header,
* and msg is the payload as-is.
*/
struct fuse_write_in *fwi = (struct fuse_write_in *)
(finh + 1);

fuse_state_t *state = NULL;
fd_t *fd = NULL;
#if FUSE_KERNEL_MINOR_VERSION >= 9
fuse_private_t *priv = NULL;
priv = this->private;
#endif

GET_STATE (this, finh, state);
fd = FH_TO_FD (fwi->fh);
state->fd = fd;
state->size = fwi->size;
state->off = fwi->offset;

/* lets ignore 'fwi->write_flags', but just consider 'fwi->flags' */
#if FUSE_KERNEL_MINOR_VERSION >= 9
state->io_flags = fwi->flags;
#else
state->io_flags = fwi->write_flags;
#endif
/* TODO: may need to handle below flag
(fwi->write_flags & FUSE_WRITE_CACHE);
*/


fuse_resolve_fd_init (state, &state->resolve, fd);

/* See comment by similar code in fuse_settatr */
#if FUSE_KERNEL_MINOR_VERSION >= 9
priv = this->private;
if (priv->proto_minor >= 9 && fwi->write_flags & FUSE_WRITE_LOCKOWNER)
state->lk_owner = fwi->lock_owner;
#endif

state->vector.iov_base = msg;
state->vector.iov_len = fwi->size;
state->iobuf = iobuf;

fuse_resolve_and_resume (state, fuse_write_resume);

return;
}

首先是传入了写入文件的描述符,要写的内存的段的地址和大小,iobuf 这个结构体和内核里的 iobuf 是查不多的。然后进到 fuse_resolve_and_resume,这个函数主要是解析一些文件的元数据,并且返回到 writev_resume 这个回调函数上,解析的函数在 ./xlators/mount/fuse/src/fuse-resolve.c 下面,这整个是个状态机的写法,主要是解析 fd,父路径,inode 等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int
fuse_resolve (fuse_state_t *state)
{
fuse_resolve_t *resolve = NULL;

resolve = state->resolve_now;

if (resolve->fd) {

fuse_resolve_fd (state);

} else if (!gf_uuid_is_null (resolve->pargfid)) {

fuse_resolve_parent (state);

} else if (!gf_uuid_is_null (resolve->gfid)) {

fuse_resolve_inode (state);

} else {
fuse_resolve_all (state);
}

return 0;
}

到了回到调 fuse_write_resume 上,包了一层引用计数就往下传了。

1
2
3
4
5
6
7
8
9
10
11
iobref_add (iobref, state->iobuf);

gf_log ("glusterfs-fuse", GF_LOG_TRACE,
"%"PRIu64": WRITE (%p, size=%"GF_PRI_SIZET", offset=%"PRId64")",
state->finh->unique, state->fd, state->size, state->off);

FUSE_FOP (state, fuse_writev_cbk, GF_FOP_WRITE, writev, state->fd,
&state->vector, 1, state->off, state->io_flags, iobref,
state->xdata);

iobref_unref (iobref);

fuse_writev_cbk 里面 send_fuse_objiobuf发送出去。send_fuse_datasend_fuse_iov

然后走整个的 xlator 的 writev 调用链,到 client 上,这里最主要的就是 submit_request 开始走网络 rpc 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int32_t
client3_3_writev (call_frame_t *frame, xlator_t *this, void *data)
{
clnt_args_t *args = NULL;
clnt_conf_t *conf = NULL;
gfs3_write_req req = {{0,},};
int op_errno = ESTALE;
int ret = 0;

if (!frame || !this || !data)
goto unwind;

args = data;
conf = this->private;

ret = client_pre_writev (this, &req, args->fd, args->size,
args->offset, args->flags, &args->xdata);

if (ret) {
op_errno = -ret;
goto unwind;
}

ret = client_fd_fop_prepare_local (frame, args->fd, req.fd);
if (ret) {
op_errno = -ret;
goto unwind;
}
ret = client_submit_request (this, &req, frame, conf->fops,
GFS3_OP_WRITE, client3_3_writev_cbk,
args->iobref, args->vector, args->count,
NULL, 0, NULL,
(xdrproc_t)xdr_gfs3_write_req);
if (ret) {
/*
* If the lower layers fail to submit a request, they'll also
* do the unwind for us (see rpc_clnt_submit), so don't unwind
* here in such cases.
*/
gf_msg (this->name, GF_LOG_WARNING, 0, PC_MSG_FOP_SEND_FAILED,
"failed to send the fop");
}

GF_FREE (req.xdata.xdata_val);

return 0;

unwind:
CLIENT_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);
GF_FREE (req.xdata.xdata_val);

return 0;
}

回调就会开始释放内存等等操作,这里就略过了,这里回到 fuse 的 writev_cbk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static int
fuse_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *stbuf, struct iatt *postbuf, dict_t *xdata)
{
fuse_state_t *state = NULL;
fuse_in_header_t *finh = NULL;
struct fuse_write_out fwo = {0, };

state = frame->root->state;
finh = state->finh;

fuse_log_eh_fop(this, state, frame, op_ret, op_errno);

if (op_ret >= 0) {
gf_log ("glusterfs-fuse", GF_LOG_TRACE,
"%"PRIu64": WRITE => %d/%"GF_PRI_SIZET",%"PRId64"/%"PRIu64,
frame->root->unique,
op_ret, state->size, state->off, stbuf->ia_size);

fwo.size = op_ret;
send_fuse_obj (this, finh, &fwo);
} else {
gf_log ("glusterfs-fuse", GF_LOG_WARNING,
"%"PRIu64": WRITE => -1 gfid=%s fd=%p (%s)",
frame->root->unique,
(state->fd && state->fd->inode) ?
uuid_utoa (state->fd->inode->gfid) : "nil", state->fd,
strerror (op_errno));

send_fuse_err (this, finh, op_errno);
}

free_fuse_state (state);
STACK_DESTROY (frame->root);

return 0;
}

send_fuse_obj 是一个宏实际上是 send_fuse_data,把 fuse_write_out 传给 fuse 告诉 fuse 写出了多少,或者返回错误给 fuse。

1
2
#define send_fuse_obj(this, finh, obj) \
send_fuse_data (this, finh, obj, sizeof (*(obj)))

整个的分析过程大概是从客户端的角度来看的,glusterfs 比较重要的一个 xlator 就是 dht xlator,distributed hash table,这个 xlator 决定了文件的分布。