kcc: Add corresponding methods for repsTo
[Samba.git] / source3 / modules / vfs_ceph.c
blob8e11dab852ecaf2e0185d670234563d30452725a
1 /*
2 Unix SMB/CIFS implementation.
3 Wrap disk only vfs functions to sidestep dodgy compilers.
4 Copyright (C) Tim Potter 1998
5 Copyright (C) Jeremy Allison 2007
6 Copyright (C) Brian Chrisman 2011 <bchrisman@gmail.com>
7 Copyright (C) Richard Sharpe 2011 <realrichardsharpe@gmail.com>
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 * This VFS only works with the libceph.so user-space client. It is not needed
25 * if you are using the kernel client or the FUSE client.
27 * Add the following smb.conf parameter to each share that will be hosted on
28 * Ceph:
30 * vfs objects = ceph [any others you need go here]
33 #include "includes.h"
34 #include "smbd/smbd.h"
35 #include <dirent.h>
36 #include <sys/statvfs.h>
37 #include "cephfs/libcephfs.h"
38 #include "smbprofile.h"
39 #include "modules/posixacl_xattr.h"
41 #undef DBGC_CLASS
42 #define DBGC_CLASS DBGC_VFS
/*
 * Fallback for libcephfs headers that do not provide the version macros:
 * define the packing macro ourselves and report version 0.0.0.
 *
 * Every macro argument is parenthesised so that expression arguments
 * (e.g. "a | b") are packed as a whole.
 */
#ifndef LIBCEPHFS_VERSION
#define LIBCEPHFS_VERSION(maj, min, extra) \
	(((maj) << 16) + ((min) << 8) + (extra))
#define LIBCEPHFS_VERSION_CODE LIBCEPHFS_VERSION(0, 0, 0)
#endif
50 * Use %llu whenever we have a 64bit unsigned int, and cast to (long long unsigned)
/* The argument is parenthesised so that llu(a + b) casts the whole expression. */
#define llu(_var) ((long long unsigned)(_var))
55 * Note, libceph's return code model is to return -errno! So we have to convert
56 * to what Samba expects, which is to set errno to -return and return -1
/*
 * Return a libcephfs result from the calling function in errno/-1 form:
 * a negative value sets errno to its negation and returns -1; any other
 * value is returned unchanged with errno cleared.
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly
 * one statement and stays correct after an unbraced if/else. _res is
 * evaluated more than once, so pass a plain variable or a constant.
 */
#define WRAP_RETURN(_res) \
	do { \
		errno = 0; \
		if ((_res) < 0) { \
			errno = -(_res); \
			return -1; \
		} \
		return (_res); \
	} while (0)
67 * We mount only one file system and then all shares are assumed to be in that.
68 * FIXME: If we want to support more than one FS, then we have to deal with
69 * this differently.
71 * So, cmount tells us if we have been this way before and whether
72 * we need to mount ceph and cmount_cnt tells us how many times we have
73 * connected
/* The one libcephfs mount handle; NULL until the first successful connect. */
static struct ceph_mount_info *cmount = NULL;
/* Number of connections currently using cmount. */
static uint32_t cmount_cnt = 0;
78 /* Check for NULL pointer parameters in cephwrap_* functions */
80 /* We don't want to have NULL function pointers lying around. Someone
81 is sure to try and execute them. These stubs are used to prevent
82 this possibility. */
84 static int cephwrap_connect(struct vfs_handle_struct *handle, const char *service, const char *user)
86 int ret;
87 char buf[256];
89 const char * conf_file;
91 if (cmount) {
92 handle->data = cmount; /* We have been here before */
93 cmount_cnt++;
94 return 0;
97 conf_file = lp_parm_const_string(SNUM(handle->conn), "ceph", "config_file", NULL);
99 DEBUG(2, ( "[CEPH] calling: ceph_create\n" ));
100 ret = ceph_create(&cmount, NULL);
101 if (ret)
102 goto err_out;
104 if (conf_file) {
105 /* Override the config file */
106 DEBUG(2, ( "[CEPH] calling: ceph_conf_read_file\n" ));
107 ret = ceph_conf_read_file(cmount, conf_file);
108 } else {
110 DEBUG(2, ( "[CEPH] calling: ceph_conf_read_file with %s\n", conf_file));
111 ret = ceph_conf_read_file(cmount, NULL);
114 if (ret)
115 goto err_out;
117 DEBUG(2, ( "[CEPH] calling: ceph_conf_get\n" ));
118 ret = ceph_conf_get(cmount, "log file", buf, sizeof(buf));
119 if (ret < 0)
120 goto err_out;
122 DEBUG(2, ("[CEPH] calling: ceph_mount\n"));
123 ret = ceph_mount(cmount, NULL);
124 if (ret < 0)
125 goto err_out;
129 * encode mount context/state into our vfs/connection holding structure
130 * cmount is a ceph_mount_t*
132 handle->data = cmount;
133 cmount_cnt++;
135 return 0;
137 err_out:
139 * Handle the error correctly. Ceph returns -errno.
141 DEBUG(2, ("[CEPH] Error return: %s\n", strerror(-ret)));
142 WRAP_RETURN(ret);
145 static void cephwrap_disconnect(struct vfs_handle_struct *handle)
147 if (!cmount) {
148 DEBUG(0, ("[CEPH] Error, ceph not mounted\n"));
149 return;
152 /* Should we unmount/shutdown? Only if the last disconnect? */
153 if (--cmount_cnt) {
154 DEBUG(10, ("[CEPH] Not shuting down CEPH because still more connections\n"));
155 return;
158 ceph_shutdown(cmount);
160 cmount = NULL; /* Make it safe */
163 /* Disk operations */
165 static uint64_t cephwrap_disk_free(struct vfs_handle_struct *handle,
166 const char *path, uint64_t *bsize,
167 uint64_t *dfree, uint64_t *dsize)
169 struct statvfs statvfs_buf;
170 int ret;
172 if (!(ret = ceph_statfs(handle->data, path, &statvfs_buf))) {
174 * Provide all the correct values.
176 *bsize = statvfs_buf.f_bsize;
177 *dfree = statvfs_buf.f_bavail;
178 *dsize = statvfs_buf.f_blocks;
179 DEBUG(10, ("[CEPH] bsize: %llu, dfree: %llu, dsize: %llu\n",
180 llu(*bsize), llu(*dfree), llu(*dsize)));
181 return *dfree;
182 } else {
183 DEBUG(10, ("[CEPH] ceph_statfs returned %d\n", ret));
184 WRAP_RETURN(ret);
188 static int cephwrap_get_quota(struct vfs_handle_struct *handle,
189 const char *path, enum SMB_QUOTA_TYPE qtype,
190 unid_t id, SMB_DISK_QUOTA *qt)
192 /* libceph: Ceph does not implement this */
193 #if 0
194 /* was ifdef HAVE_SYS_QUOTAS */
195 int ret;
197 ret = ceph_get_quota(handle->conn->connectpath, qtype, id, qt);
199 if (ret) {
200 errno = -ret;
201 ret = -1;
204 return ret;
205 #else
206 errno = ENOSYS;
207 return -1;
208 #endif
211 static int cephwrap_set_quota(struct vfs_handle_struct *handle, enum SMB_QUOTA_TYPE qtype, unid_t id, SMB_DISK_QUOTA *qt)
213 /* libceph: Ceph does not implement this */
214 #if 0
215 /* was ifdef HAVE_SYS_QUOTAS */
216 int ret;
218 ret = ceph_set_quota(handle->conn->connectpath, qtype, id, qt);
219 if (ret) {
220 errno = -ret;
221 ret = -1;
224 return ret;
225 #else
226 WRAP_RETURN(-ENOSYS);
227 #endif
230 static int cephwrap_statvfs(struct vfs_handle_struct *handle, const char *path, vfs_statvfs_struct *statbuf)
232 struct statvfs statvfs_buf;
233 int ret;
235 ret = ceph_statfs(handle->data, path, &statvfs_buf);
236 if (ret < 0) {
237 WRAP_RETURN(ret);
238 } else {
239 statbuf->OptimalTransferSize = statvfs_buf.f_frsize;
240 statbuf->BlockSize = statvfs_buf.f_bsize;
241 statbuf->TotalBlocks = statvfs_buf.f_blocks;
242 statbuf->BlocksAvail = statvfs_buf.f_bfree;
243 statbuf->UserBlocksAvail = statvfs_buf.f_bavail;
244 statbuf->TotalFileNodes = statvfs_buf.f_files;
245 statbuf->FreeFileNodes = statvfs_buf.f_ffree;
246 statbuf->FsIdentifier = statvfs_buf.f_fsid;
247 DEBUG(10, ("[CEPH] f_bsize: %ld, f_blocks: %ld, f_bfree: %ld, f_bavail: %ld\n",
248 (long int)statvfs_buf.f_bsize, (long int)statvfs_buf.f_blocks,
249 (long int)statvfs_buf.f_bfree, (long int)statvfs_buf.f_bavail));
251 return ret;
254 /* Directory operations */
256 static DIR *cephwrap_opendir(struct vfs_handle_struct *handle,
257 const struct smb_filename *smb_fname,
258 const char *mask, uint32_t attr)
260 int ret = 0;
261 struct ceph_dir_result *result;
262 DEBUG(10, ("[CEPH] opendir(%p, %s)\n", handle, smb_fname->base_name));
264 /* Returns NULL if it does not exist or there are problems ? */
265 ret = ceph_opendir(handle->data, smb_fname->base_name, &result);
266 if (ret < 0) {
267 result = NULL;
268 errno = -ret; /* We return result which is NULL in this case */
271 DEBUG(10, ("[CEPH] opendir(...) = %d\n", ret));
272 return (DIR *) result;
275 static DIR *cephwrap_fdopendir(struct vfs_handle_struct *handle,
276 struct files_struct *fsp,
277 const char *mask,
278 uint32_t attributes)
280 int ret = 0;
281 struct ceph_dir_result *result;
282 DEBUG(10, ("[CEPH] fdopendir(%p, %p)\n", handle, fsp));
284 ret = ceph_opendir(handle->data, fsp->fsp_name->base_name, &result);
285 if (ret < 0) {
286 result = NULL;
287 errno = -ret; /* We return result which is NULL in this case */
290 DEBUG(10, ("[CEPH] fdopendir(...) = %d\n", ret));
291 return (DIR *) result;
294 static struct dirent *cephwrap_readdir(struct vfs_handle_struct *handle,
295 DIR *dirp,
296 SMB_STRUCT_STAT *sbuf)
298 struct dirent *result;
300 DEBUG(10, ("[CEPH] readdir(%p, %p)\n", handle, dirp));
301 result = ceph_readdir(handle->data, (struct ceph_dir_result *) dirp);
302 DEBUG(10, ("[CEPH] readdir(...) = %p\n", result));
304 /* Default Posix readdir() does not give us stat info.
305 * Set to invalid to indicate we didn't return this info. */
306 if (sbuf)
307 SET_STAT_INVALID(*sbuf);
308 return result;
311 static void cephwrap_seekdir(struct vfs_handle_struct *handle, DIR *dirp, long offset)
313 DEBUG(10, ("[CEPH] seekdir(%p, %p, %ld)\n", handle, dirp, offset));
314 ceph_seekdir(handle->data, (struct ceph_dir_result *) dirp, offset);
317 static long cephwrap_telldir(struct vfs_handle_struct *handle, DIR *dirp)
319 long ret;
320 DEBUG(10, ("[CEPH] telldir(%p, %p)\n", handle, dirp));
321 ret = ceph_telldir(handle->data, (struct ceph_dir_result *) dirp);
322 DEBUG(10, ("[CEPH] telldir(...) = %ld\n", ret));
323 WRAP_RETURN(ret);
326 static void cephwrap_rewinddir(struct vfs_handle_struct *handle, DIR *dirp)
328 DEBUG(10, ("[CEPH] rewinddir(%p, %p)\n", handle, dirp));
329 ceph_rewinddir(handle->data, (struct ceph_dir_result *) dirp);
332 static int cephwrap_mkdir(struct vfs_handle_struct *handle,
333 const struct smb_filename *smb_fname,
334 mode_t mode)
336 int result;
337 bool has_dacl = False;
338 char *parent = NULL;
339 const char *path = smb_fname->base_name;
341 DEBUG(10, ("[CEPH] mkdir(%p, %s)\n", handle, path));
343 if (lp_inherit_acls(SNUM(handle->conn))
344 && parent_dirname(talloc_tos(), path, &parent, NULL)
345 && (has_dacl = directory_has_default_acl(handle->conn, parent)))
346 mode = 0777;
348 TALLOC_FREE(parent);
350 result = ceph_mkdir(handle->data, path, mode);
353 * Note. This order is important
355 if (result) {
356 WRAP_RETURN(result);
357 } else if (result == 0 && !has_dacl) {
359 * We need to do this as the default behavior of POSIX ACLs
360 * is to set the mask to be the requested group permission
361 * bits, not the group permission bits to be the requested
362 * group permission bits. This is not what we want, as it will
363 * mess up any inherited ACL bits that were set. JRA.
365 int saved_errno = errno; /* We may get ENOSYS */
366 if ((SMB_VFS_CHMOD_ACL(handle->conn, smb_fname, mode) == -1) &&
367 (errno == ENOSYS)) {
368 errno = saved_errno;
372 return result;
375 static int cephwrap_rmdir(struct vfs_handle_struct *handle,
376 const struct smb_filename *smb_fname)
378 int result;
380 DEBUG(10, ("[CEPH] rmdir(%p, %s)\n", handle, smb_fname->base_name));
381 result = ceph_rmdir(handle->data, smb_fname->base_name);
382 DEBUG(10, ("[CEPH] rmdir(...) = %d\n", result));
383 WRAP_RETURN(result);
386 static int cephwrap_closedir(struct vfs_handle_struct *handle, DIR *dirp)
388 int result;
390 DEBUG(10, ("[CEPH] closedir(%p, %p)\n", handle, dirp));
391 result = ceph_closedir(handle->data, (struct ceph_dir_result *) dirp);
392 DEBUG(10, ("[CEPH] closedir(...) = %d\n", result));
393 WRAP_RETURN(result);
396 /* File operations */
398 static int cephwrap_open(struct vfs_handle_struct *handle,
399 struct smb_filename *smb_fname,
400 files_struct *fsp, int flags, mode_t mode)
402 int result = -ENOENT;
403 DEBUG(10, ("[CEPH] open(%p, %s, %p, %d, %d)\n", handle, smb_fname_str_dbg(smb_fname), fsp, flags, mode));
405 if (smb_fname->stream_name) {
406 goto out;
409 result = ceph_open(handle->data, smb_fname->base_name, flags, mode);
410 out:
411 DEBUG(10, ("[CEPH] open(...) = %d\n", result));
412 WRAP_RETURN(result);
415 static int cephwrap_close(struct vfs_handle_struct *handle, files_struct *fsp)
417 int result;
419 DEBUG(10, ("[CEPH] close(%p, %p)\n", handle, fsp));
420 result = ceph_close(handle->data, fsp->fh->fd);
421 DEBUG(10, ("[CEPH] close(...) = %d\n", result));
423 WRAP_RETURN(result);
426 static ssize_t cephwrap_read(struct vfs_handle_struct *handle, files_struct *fsp, void *data, size_t n)
428 ssize_t result;
430 DEBUG(10, ("[CEPH] read(%p, %p, %p, %llu)\n", handle, fsp, data, llu(n)));
432 /* Using -1 for the offset means read/write rather than pread/pwrite */
433 result = ceph_read(handle->data, fsp->fh->fd, data, n, -1);
434 DEBUG(10, ("[CEPH] read(...) = %llu\n", llu(result)));
435 WRAP_RETURN(result);
438 static ssize_t cephwrap_pread(struct vfs_handle_struct *handle, files_struct *fsp, void *data,
439 size_t n, off_t offset)
441 ssize_t result;
443 DEBUG(10, ("[CEPH] pread(%p, %p, %p, %llu, %llu)\n", handle, fsp, data, llu(n), llu(offset)));
445 result = ceph_read(handle->data, fsp->fh->fd, data, n, offset);
446 DEBUG(10, ("[CEPH] pread(...) = %llu\n", llu(result)));
447 WRAP_RETURN(result);
451 static ssize_t cephwrap_write(struct vfs_handle_struct *handle, files_struct *fsp, const void *data, size_t n)
453 ssize_t result;
455 DEBUG(10, ("[CEPH] write(%p, %p, %p, %llu)\n", handle, fsp, data, llu(n)));
457 result = ceph_write(handle->data, fsp->fh->fd, data, n, -1);
459 DEBUG(10, ("[CEPH] write(...) = %llu\n", llu(result)));
460 if (result < 0) {
461 WRAP_RETURN(result);
463 fsp->fh->pos += result;
464 return result;
467 static ssize_t cephwrap_pwrite(struct vfs_handle_struct *handle, files_struct *fsp, const void *data,
468 size_t n, off_t offset)
470 ssize_t result;
472 DEBUG(10, ("[CEPH] pwrite(%p, %p, %p, %llu, %llu)\n", handle, fsp, data, llu(n), llu(offset)));
473 result = ceph_write(handle->data, fsp->fh->fd, data, n, offset);
474 DEBUG(10, ("[CEPH] pwrite(...) = %llu\n", llu(result)));
475 WRAP_RETURN(result);
478 static off_t cephwrap_lseek(struct vfs_handle_struct *handle, files_struct *fsp, off_t offset, int whence)
480 off_t result = 0;
482 DEBUG(10, ("[CEPH] cephwrap_lseek\n"));
483 /* Cope with 'stat' file opens. */
484 if (fsp->fh->fd != -1) {
485 result = ceph_lseek(handle->data, fsp->fh->fd, offset, whence);
487 WRAP_RETURN(result);
490 static ssize_t cephwrap_sendfile(struct vfs_handle_struct *handle, int tofd, files_struct *fromfsp, const DATA_BLOB *hdr,
491 off_t offset, size_t n)
494 * We cannot support sendfile because libceph is in user space.
496 DEBUG(10, ("[CEPH] cephwrap_sendfile\n"));
497 errno = ENOTSUP;
498 return -1;
501 static ssize_t cephwrap_recvfile(struct vfs_handle_struct *handle,
502 int fromfd,
503 files_struct *tofsp,
504 off_t offset,
505 size_t n)
508 * We cannot support recvfile because libceph is in user space.
510 DEBUG(10, ("[CEPH] cephwrap_recvfile\n"));
511 errno=ENOTSUP;
512 return -1;
515 static int cephwrap_rename(struct vfs_handle_struct *handle,
516 const struct smb_filename *smb_fname_src,
517 const struct smb_filename *smb_fname_dst)
519 int result = -1;
520 DEBUG(10, ("[CEPH] cephwrap_rename\n"));
521 if (smb_fname_src->stream_name || smb_fname_dst->stream_name) {
522 errno = ENOENT;
523 return result;
526 result = ceph_rename(handle->data, smb_fname_src->base_name, smb_fname_dst->base_name);
527 WRAP_RETURN(result);
530 static int cephwrap_fsync(struct vfs_handle_struct *handle, files_struct *fsp)
532 int result;
533 DEBUG(10, ("[CEPH] cephwrap_fsync\n"));
534 result = ceph_fsync(handle->data, fsp->fh->fd, false);
535 WRAP_RETURN(result);
538 static int cephwrap_stat(struct vfs_handle_struct *handle,
539 struct smb_filename *smb_fname)
541 int result = -1;
542 struct stat stbuf;
544 DEBUG(10, ("[CEPH] stat(%p, %s)\n", handle, smb_fname_str_dbg(smb_fname)));
546 if (smb_fname->stream_name) {
547 errno = ENOENT;
548 return result;
551 result = ceph_stat(handle->data, smb_fname->base_name, (struct stat *) &stbuf);
552 DEBUG(10, ("[CEPH] stat(...) = %d\n", result));
553 if (result < 0) {
554 WRAP_RETURN(result);
555 } else {
556 DEBUG(10, ("[CEPH]\tstbuf = {dev = %llu, ino = %llu, mode = 0x%x, nlink = %llu, "
557 "uid = %d, gid = %d, rdev = %llu, size = %llu, blksize = %llu, "
558 "blocks = %llu, atime = %llu, mtime = %llu, ctime = %llu}\n",
559 llu(stbuf.st_dev), llu(stbuf.st_ino), stbuf.st_mode, llu(stbuf.st_nlink),
560 stbuf.st_uid, stbuf.st_gid, llu(stbuf.st_rdev), llu(stbuf.st_size), llu(stbuf.st_blksize),
561 llu(stbuf.st_blocks), llu(stbuf.st_atime), llu(stbuf.st_mtime), llu(stbuf.st_ctime)));
563 init_stat_ex_from_stat(
564 &smb_fname->st, &stbuf,
565 lp_fake_directory_create_times(SNUM(handle->conn)));
566 DEBUG(10, ("[CEPH] mode = 0x%x\n", smb_fname->st.st_ex_mode));
567 return result;
570 static int cephwrap_fstat(struct vfs_handle_struct *handle, files_struct *fsp, SMB_STRUCT_STAT *sbuf)
572 int result = -1;
573 struct stat stbuf;
575 DEBUG(10, ("[CEPH] fstat(%p, %d)\n", handle, fsp->fh->fd));
576 result = ceph_fstat(handle->data, fsp->fh->fd, (struct stat *) &stbuf);
577 DEBUG(10, ("[CEPH] fstat(...) = %d\n", result));
578 if (result < 0) {
579 WRAP_RETURN(result);
580 } else {
581 DEBUG(10, ("[CEPH]\tstbuf = {dev = %llu, ino = %llu, mode = 0x%x, nlink = %llu, "
582 "uid = %d, gid = %d, rdev = %llu, size = %llu, blksize = %llu, "
583 "blocks = %llu, atime = %llu, mtime = %llu, ctime = %llu}\n",
584 llu(stbuf.st_dev), llu(stbuf.st_ino), stbuf.st_mode, llu(stbuf.st_nlink),
585 stbuf.st_uid, stbuf.st_gid, llu(stbuf.st_rdev), llu(stbuf.st_size), llu(stbuf.st_blksize),
586 llu(stbuf.st_blocks), llu(stbuf.st_atime), llu(stbuf.st_mtime), llu(stbuf.st_ctime)));
589 init_stat_ex_from_stat(
590 sbuf, &stbuf,
591 lp_fake_directory_create_times(SNUM(handle->conn)));
592 DEBUG(10, ("[CEPH] mode = 0x%x\n", sbuf->st_ex_mode));
593 return result;
596 static int cephwrap_lstat(struct vfs_handle_struct *handle,
597 struct smb_filename *smb_fname)
599 int result = -1;
600 struct stat stbuf;
602 DEBUG(10, ("[CEPH] lstat(%p, %s)\n", handle, smb_fname_str_dbg(smb_fname)));
604 if (smb_fname->stream_name) {
605 errno = ENOENT;
606 return result;
609 result = ceph_lstat(handle->data, smb_fname->base_name, &stbuf);
610 DEBUG(10, ("[CEPH] lstat(...) = %d\n", result));
611 if (result < 0) {
612 WRAP_RETURN(result);
614 init_stat_ex_from_stat(
615 &smb_fname->st, &stbuf,
616 lp_fake_directory_create_times(SNUM(handle->conn)));
617 return result;
620 static int cephwrap_unlink(struct vfs_handle_struct *handle,
621 const struct smb_filename *smb_fname)
623 int result = -1;
625 DEBUG(10, ("[CEPH] unlink(%p, %s)\n", handle, smb_fname_str_dbg(smb_fname)));
626 if (smb_fname->stream_name) {
627 errno = ENOENT;
628 return result;
630 result = ceph_unlink(handle->data, smb_fname->base_name);
631 DEBUG(10, ("[CEPH] unlink(...) = %d\n", result));
632 WRAP_RETURN(result);
635 static int cephwrap_chmod(struct vfs_handle_struct *handle,
636 const struct smb_filename *smb_fname,
637 mode_t mode)
639 int result;
641 DEBUG(10, ("[CEPH] chmod(%p, %s, %d)\n",
642 handle,
643 smb_fname->base_name,
644 mode));
647 * We need to do this due to the fact that the default POSIX ACL
648 * chmod modifies the ACL *mask* for the group owner, not the
649 * group owner bits directly. JRA.
654 int saved_errno = errno; /* We might get ENOSYS */
655 result = SMB_VFS_CHMOD_ACL(handle->conn,
656 smb_fname,
657 mode);
658 if (result == 0) {
659 return result;
661 /* Error - return the old errno. */
662 errno = saved_errno;
665 result = ceph_chmod(handle->data, smb_fname->base_name, mode);
666 DEBUG(10, ("[CEPH] chmod(...) = %d\n", result));
667 WRAP_RETURN(result);
670 static int cephwrap_fchmod(struct vfs_handle_struct *handle, files_struct *fsp, mode_t mode)
672 int result;
674 DEBUG(10, ("[CEPH] fchmod(%p, %p, %d)\n", handle, fsp, mode));
677 * We need to do this due to the fact that the default POSIX ACL
678 * chmod modifies the ACL *mask* for the group owner, not the
679 * group owner bits directly. JRA.
683 int saved_errno = errno; /* We might get ENOSYS */
684 if ((result = SMB_VFS_FCHMOD_ACL(fsp, mode)) == 0) {
685 return result;
687 /* Error - return the old errno. */
688 errno = saved_errno;
691 #if defined(HAVE_FCHMOD)
692 result = ceph_fchmod(handle->data, fsp->fh->fd, mode);
693 DEBUG(10, ("[CEPH] fchmod(...) = %d\n", result));
694 WRAP_RETURN(result);
695 #else
696 errno = ENOSYS;
697 #endif
698 return -1;
701 static int cephwrap_chown(struct vfs_handle_struct *handle,
702 const struct smb_filename *smb_fname,
703 uid_t uid,
704 gid_t gid)
706 int result;
707 DEBUG(10, ("[CEPH] chown(%p, %s, %d, %d)\n",
708 handle,
709 smb_fname->base_name,
710 uid,
711 gid));
712 result = ceph_chown(handle->data, smb_fname->base_name, uid, gid);
713 DEBUG(10, ("[CEPH] chown(...) = %d\n", result));
714 WRAP_RETURN(result);
717 static int cephwrap_fchown(struct vfs_handle_struct *handle, files_struct *fsp, uid_t uid, gid_t gid)
719 int result;
720 #ifdef HAVE_FCHOWN
722 DEBUG(10, ("[CEPH] fchown(%p, %p, %d, %d)\n", handle, fsp, uid, gid));
723 result = ceph_fchown(handle->data, fsp->fh->fd, uid, gid);
724 DEBUG(10, ("[CEPH] fchown(...) = %d\n", result));
725 WRAP_RETURN(result);
726 #else
727 errno = ENOSYS;
728 result = -1;
729 #endif
730 return result;
733 static int cephwrap_lchown(struct vfs_handle_struct *handle,
734 const struct smb_filename *smb_fname,
735 uid_t uid,
736 gid_t gid)
738 int result;
739 DEBUG(10, ("[CEPH] lchown(%p, %s, %d, %d)\n",
740 handle,
741 smb_fname->base_name,
742 uid,
743 gid));
744 result = ceph_lchown(handle->data, smb_fname->base_name, uid, gid);
745 DEBUG(10, ("[CEPH] lchown(...) = %d\n", result));
746 WRAP_RETURN(result);
749 static int cephwrap_chdir(struct vfs_handle_struct *handle, const char *path)
751 int result = -1;
752 DEBUG(10, ("[CEPH] chdir(%p, %s)\n", handle, path));
754 * If the path is just / use chdir because Ceph is below / and
755 * cannot deal with changing directory above its mount point
757 if (path && !strcmp(path, "/"))
758 return chdir(path);
760 result = ceph_chdir(handle->data, path);
761 DEBUG(10, ("[CEPH] chdir(...) = %d\n", result));
762 WRAP_RETURN(result);
765 static char *cephwrap_getwd(struct vfs_handle_struct *handle)
767 const char *cwd = ceph_getcwd(handle->data);
768 DEBUG(10, ("[CEPH] getwd(%p) = %s\n", handle, cwd));
769 return SMB_STRDUP(cwd);
772 static int cephwrap_ntimes(struct vfs_handle_struct *handle,
773 const struct smb_filename *smb_fname,
774 struct smb_file_time *ft)
776 struct utimbuf buf;
777 int result;
779 if (null_timespec(ft->atime)) {
780 buf.actime = smb_fname->st.st_ex_atime.tv_sec;
781 } else {
782 buf.actime = ft->atime.tv_sec;
784 if (null_timespec(ft->mtime)) {
785 buf.modtime = smb_fname->st.st_ex_mtime.tv_sec;
786 } else {
787 buf.modtime = ft->mtime.tv_sec;
789 if (!null_timespec(ft->create_time)) {
790 set_create_timespec_ea(handle->conn, smb_fname,
791 ft->create_time);
793 if (buf.actime == smb_fname->st.st_ex_atime.tv_sec &&
794 buf.modtime == smb_fname->st.st_ex_mtime.tv_sec) {
795 return 0;
798 result = ceph_utime(handle->data, smb_fname->base_name, &buf);
799 DEBUG(10, ("[CEPH] ntimes(%p, %s, {%ld, %ld, %ld, %ld}) = %d\n", handle, smb_fname_str_dbg(smb_fname),
800 ft->mtime.tv_sec, ft->atime.tv_sec, ft->ctime.tv_sec,
801 ft->create_time.tv_sec, result));
802 return result;
805 static int strict_allocate_ftruncate(struct vfs_handle_struct *handle, files_struct *fsp, off_t len)
807 off_t space_to_write;
808 uint64_t space_avail;
809 uint64_t bsize,dfree,dsize;
810 int ret;
811 NTSTATUS status;
812 SMB_STRUCT_STAT *pst;
814 status = vfs_stat_fsp(fsp);
815 if (!NT_STATUS_IS_OK(status)) {
816 return -1;
818 pst = &fsp->fsp_name->st;
820 #ifdef S_ISFIFO
821 if (S_ISFIFO(pst->st_ex_mode))
822 return 0;
823 #endif
825 if (pst->st_ex_size == len)
826 return 0;
828 /* Shrink - just ftruncate. */
829 if (pst->st_ex_size > len)
830 return ftruncate(fsp->fh->fd, len);
832 space_to_write = len - pst->st_ex_size;
834 /* for allocation try fallocate first. This can fail on some
835 platforms e.g. when the filesystem doesn't support it and no
836 emulation is being done by the libc (like on AIX with JFS1). In that
837 case we do our own emulation. fallocate implementations can
838 return ENOTSUP or EINVAL in cases like that. */
839 ret = SMB_VFS_FALLOCATE(fsp, 0, pst->st_ex_size, space_to_write);
840 if (ret == -1 && errno == ENOSPC) {
841 return -1;
843 if (ret == 0) {
844 return 0;
846 DEBUG(10,("[CEPH] strict_allocate_ftruncate: SMB_VFS_FALLOCATE failed with "
847 "error %d. Falling back to slow manual allocation\n", errno));
849 /* available disk space is enough or not? */
850 space_avail = get_dfree_info(fsp->conn,
851 fsp->fsp_name->base_name,
852 &bsize, &dfree, &dsize);
853 /* space_avail is 1k blocks */
854 if (space_avail == (uint64_t)-1 ||
855 ((uint64_t)space_to_write/1024 > space_avail) ) {
856 errno = ENOSPC;
857 return -1;
860 /* Write out the real space on disk. */
861 return vfs_slow_fallocate(fsp, pst->st_ex_size, space_to_write);
864 static int cephwrap_ftruncate(struct vfs_handle_struct *handle, files_struct *fsp, off_t len)
866 int result = -1;
867 SMB_STRUCT_STAT st;
868 char c = 0;
869 off_t currpos;
871 DEBUG(10, ("[CEPH] ftruncate(%p, %p, %llu\n", handle, fsp, llu(len)));
873 if (lp_strict_allocate(SNUM(fsp->conn))) {
874 result = strict_allocate_ftruncate(handle, fsp, len);
875 return result;
878 /* we used to just check HAVE_FTRUNCATE_EXTEND and only use
879 sys_ftruncate if the system supports it. Then I discovered that
880 you can have some filesystems that support ftruncate
881 expansion and some that don't! On Linux fat can't do
882 ftruncate extend but ext2 can. */
884 result = ceph_ftruncate(handle->data, fsp->fh->fd, len);
885 if (result == 0)
886 goto done;
888 /* According to W. R. Stevens advanced UNIX prog. Pure 4.3 BSD cannot
889 extend a file with ftruncate. Provide alternate implementation
890 for this */
891 currpos = SMB_VFS_LSEEK(fsp, 0, SEEK_CUR);
892 if (currpos == -1) {
893 goto done;
896 /* Do an fstat to see if the file is longer than the requested
897 size in which case the ftruncate above should have
898 succeeded or shorter, in which case seek to len - 1 and
899 write 1 byte of zero */
900 if (SMB_VFS_FSTAT(fsp, &st) == -1) {
901 goto done;
904 #ifdef S_ISFIFO
905 if (S_ISFIFO(st.st_ex_mode)) {
906 result = 0;
907 goto done;
909 #endif
911 if (st.st_ex_size == len) {
912 result = 0;
913 goto done;
916 if (st.st_ex_size > len) {
917 /* the sys_ftruncate should have worked */
918 goto done;
921 if (SMB_VFS_LSEEK(fsp, len-1, SEEK_SET) != len -1)
922 goto done;
924 if (SMB_VFS_WRITE(fsp, &c, 1)!=1)
925 goto done;
927 /* Seek to where we were */
928 if (SMB_VFS_LSEEK(fsp, currpos, SEEK_SET) != currpos)
929 goto done;
930 result = 0;
932 done:
934 return result;
937 static bool cephwrap_lock(struct vfs_handle_struct *handle, files_struct *fsp, int op, off_t offset, off_t count, int type)
939 DEBUG(10, ("[CEPH] lock\n"));
940 return true;
943 static int cephwrap_kernel_flock(struct vfs_handle_struct *handle, files_struct *fsp,
944 uint32_t share_mode, uint32_t access_mask)
946 DEBUG(10, ("[CEPH] kernel_flock\n"));
948 * We must return zero here and pretend all is good.
949 * One day we might have this in CEPH.
951 return 0;
954 static bool cephwrap_getlock(struct vfs_handle_struct *handle, files_struct *fsp, off_t *poffset, off_t *pcount, int *ptype, pid_t *ppid)
956 DEBUG(10, ("[CEPH] getlock returning false and errno=0\n"));
958 errno = 0;
959 return false;
963 * We cannot let this fall through to the default, because the file might only
964 * be accessible from libceph (which is a user-space client) but the fd might
965 * be for some file the kernel knows about.
967 static int cephwrap_linux_setlease(struct vfs_handle_struct *handle, files_struct *fsp,
968 int leasetype)
970 int result = -1;
972 DEBUG(10, ("[CEPH] linux_setlease\n"));
973 errno = ENOSYS;
974 return result;
977 static int cephwrap_symlink(struct vfs_handle_struct *handle, const char *oldpath, const char *newpath)
979 int result = -1;
980 DEBUG(10, ("[CEPH] symlink(%p, %s, %s)\n", handle, oldpath, newpath));
981 result = ceph_symlink(handle->data, oldpath, newpath);
982 DEBUG(10, ("[CEPH] symlink(...) = %d\n", result));
983 WRAP_RETURN(result);
986 static int cephwrap_readlink(struct vfs_handle_struct *handle, const char *path, char *buf, size_t bufsiz)
988 int result = -1;
989 DEBUG(10, ("[CEPH] readlink(%p, %s, %p, %llu)\n", handle, path, buf, llu(bufsiz)));
990 result = ceph_readlink(handle->data, path, buf, bufsiz);
991 DEBUG(10, ("[CEPH] readlink(...) = %d\n", result));
992 WRAP_RETURN(result);
995 static int cephwrap_link(struct vfs_handle_struct *handle, const char *oldpath, const char *newpath)
997 int result = -1;
998 DEBUG(10, ("[CEPH] link(%p, %s, %s)\n", handle, oldpath, newpath));
999 result = ceph_link(handle->data, oldpath, newpath);
1000 DEBUG(10, ("[CEPH] link(...) = %d\n", result));
1001 WRAP_RETURN(result);
1004 static int cephwrap_mknod(struct vfs_handle_struct *handle, const char *pathname, mode_t mode, SMB_DEV_T dev)
1006 int result = -1;
1007 DEBUG(10, ("[CEPH] mknod(%p, %s)\n", handle, pathname));
1008 result = ceph_mknod(handle->data, pathname, mode, dev);
1009 DEBUG(10, ("[CEPH] mknod(...) = %d\n", result));
1010 WRAP_RETURN(result);
1014 * This is a simple version of real-path ... a better version is needed to
1015 * ask libceph about symbolic links.
1017 static char *cephwrap_realpath(struct vfs_handle_struct *handle, const char *path)
1019 char *result;
1020 size_t len = strlen(path);
1022 result = SMB_MALLOC_ARRAY(char, PATH_MAX+1);
1023 if (len && (path[0] == '/')) {
1024 int r = asprintf(&result, "%s", path);
1025 if (r < 0) return NULL;
1026 } else if ((len >= 2) && (path[0] == '.') && (path[1] == '/')) {
1027 if (len == 2) {
1028 int r = asprintf(&result, "%s",
1029 handle->conn->connectpath);
1030 if (r < 0) return NULL;
1031 } else {
1032 int r = asprintf(&result, "%s/%s",
1033 handle->conn->connectpath, &path[2]);
1034 if (r < 0) return NULL;
1036 } else {
1037 int r = asprintf(&result, "%s/%s",
1038 handle->conn->connectpath, path);
1039 if (r < 0) return NULL;
1041 DEBUG(10, ("[CEPH] realpath(%p, %s) = %s\n", handle, path, result));
1042 return result;
/* File flags are not supported: always fails with ENOSYS. */
static int cephwrap_chflags(struct vfs_handle_struct *handle, const char *path,
			   unsigned int flags)
{
	const int failed = -1;

	errno = ENOSYS;
	return failed;
}
1052 static int cephwrap_get_real_filename(struct vfs_handle_struct *handle,
1053 const char *path,
1054 const char *name,
1055 TALLOC_CTX *mem_ctx,
1056 char **found_name)
1059 * Don't fall back to get_real_filename so callers can differentiate
1060 * between a full directory scan and an actual case-insensitive stat.
1062 errno = EOPNOTSUPP;
1063 return -1;
1066 static const char *cephwrap_connectpath(struct vfs_handle_struct *handle,
1067 const char *fname)
1069 return handle->conn->connectpath;
1072 /****************************************************************
1073 Extended attribute operations.
1074 *****************************************************************/
1076 static ssize_t cephwrap_getxattr(struct vfs_handle_struct *handle,const char *path, const char *name, void *value, size_t size)
1078 int ret;
1079 DEBUG(10, ("[CEPH] getxattr(%p, %s, %s, %p, %llu)\n", handle, path, name, value, llu(size)));
1080 ret = ceph_getxattr(handle->data, path, name, value, size);
1081 DEBUG(10, ("[CEPH] getxattr(...) = %d\n", ret));
1082 if (ret < 0) {
1083 WRAP_RETURN(ret);
1084 } else {
1085 return (ssize_t)ret;
1089 static ssize_t cephwrap_fgetxattr(struct vfs_handle_struct *handle, struct files_struct *fsp, const char *name, void *value, size_t size)
1091 int ret;
1092 DEBUG(10, ("[CEPH] fgetxattr(%p, %p, %s, %p, %llu)\n", handle, fsp, name, value, llu(size)));
1093 #if LIBCEPHFS_VERSION_CODE >= LIBCEPHFS_VERSION(0, 94, 0)
1094 ret = ceph_fgetxattr(handle->data, fsp->fh->fd, name, value, size);
1095 #else
1096 ret = ceph_getxattr(handle->data, fsp->fsp_name->base_name, name, value, size);
1097 #endif
1098 DEBUG(10, ("[CEPH] fgetxattr(...) = %d\n", ret));
1099 if (ret < 0) {
1100 WRAP_RETURN(ret);
1101 } else {
1102 return (ssize_t)ret;
1106 static ssize_t cephwrap_listxattr(struct vfs_handle_struct *handle, const char *path, char *list, size_t size)
1108 int ret;
1109 DEBUG(10, ("[CEPH] listxattr(%p, %s, %p, %llu)\n", handle, path, list, llu(size)));
1110 ret = ceph_listxattr(handle->data, path, list, size);
1111 DEBUG(10, ("[CEPH] listxattr(...) = %d\n", ret));
1112 if (ret < 0) {
1113 WRAP_RETURN(ret);
1114 } else {
1115 return (ssize_t)ret;
#if 0
/*
 * llistxattr wrapper around ceph_llistxattr(). Compiled out: the
 * function is not referenced from the ceph_fns table in this file.
 *
 * A non-negative result is returned unchanged; a negative one goes
 * through WRAP_RETURN (errno set from the libceph code, -1 returned).
 */
static ssize_t cephwrap_llistxattr(struct vfs_handle_struct *handle, const char *path, char *list, size_t size)
{
	int ret;
	DEBUG(10, ("[CEPH] llistxattr(%p, %s, %p, %llu)\n", handle, path, list, llu(size)));
	ret = ceph_llistxattr(handle->data, path, list, size);
	/* Label the result with this function's name, not listxattr's. */
	DEBUG(10, ("[CEPH] llistxattr(...) = %d\n", ret));
	if (ret < 0) {
		WRAP_RETURN(ret);
	} else {
		return (ssize_t)ret;
	}
}
#endif
1134 static ssize_t cephwrap_flistxattr(struct vfs_handle_struct *handle, struct files_struct *fsp, char *list, size_t size)
1136 int ret;
1137 DEBUG(10, ("[CEPH] flistxattr(%p, %p, %s, %llu)\n", handle, fsp, list, llu(size)));
1138 #if LIBCEPHFS_VERSION_CODE >= LIBCEPHFS_VERSION(0, 94, 0)
1139 ret = ceph_flistxattr(handle->data, fsp->fh->fd, list, size);
1140 #else
1141 ret = ceph_listxattr(handle->data, fsp->fsp_name->base_name, list, size);
1142 #endif
1143 DEBUG(10, ("[CEPH] flistxattr(...) = %d\n", ret));
1144 if (ret < 0) {
1145 WRAP_RETURN(ret);
1146 } else {
1147 return (ssize_t)ret;
1151 static int cephwrap_removexattr(struct vfs_handle_struct *handle, const char *path, const char *name)
1153 int ret;
1154 DEBUG(10, ("[CEPH] removexattr(%p, %s, %s)\n", handle, path, name));
1155 ret = ceph_removexattr(handle->data, path, name);
1156 DEBUG(10, ("[CEPH] removexattr(...) = %d\n", ret));
1157 WRAP_RETURN(ret);
1160 static int cephwrap_fremovexattr(struct vfs_handle_struct *handle, struct files_struct *fsp, const char *name)
1162 int ret;
1163 DEBUG(10, ("[CEPH] fremovexattr(%p, %p, %s)\n", handle, fsp, name));
1164 #if LIBCEPHFS_VERSION_CODE >= LIBCEPHFS_VERSION(0, 94, 0)
1165 ret = ceph_fremovexattr(handle->data, fsp->fh->fd, name);
1166 #else
1167 ret = ceph_removexattr(handle->data, fsp->fsp_name->base_name, name);
1168 #endif
1169 DEBUG(10, ("[CEPH] fremovexattr(...) = %d\n", ret));
1170 WRAP_RETURN(ret);
1173 static int cephwrap_setxattr(struct vfs_handle_struct *handle, const char *path, const char *name, const void *value, size_t size, int flags)
1175 int ret;
1176 DEBUG(10, ("[CEPH] setxattr(%p, %s, %s, %p, %llu, %d)\n", handle, path, name, value, llu(size), flags));
1177 ret = ceph_setxattr(handle->data, path, name, value, size, flags);
1178 DEBUG(10, ("[CEPH] setxattr(...) = %d\n", ret));
1179 WRAP_RETURN(ret);
1182 static int cephwrap_fsetxattr(struct vfs_handle_struct *handle, struct files_struct *fsp, const char *name, const void *value, size_t size, int flags)
1184 int ret;
1185 DEBUG(10, ("[CEPH] fsetxattr(%p, %p, %s, %p, %llu, %d)\n", handle, fsp, name, value, llu(size), flags));
1186 #if LIBCEPHFS_VERSION_CODE >= LIBCEPHFS_VERSION(0, 94, 0)
1187 ret = ceph_fsetxattr(handle->data, fsp->fh->fd,
1188 name, value, size, flags);
1189 #else
1190 ret = ceph_setxattr(handle->data, fsp->fsp_name->base_name, name, value, size, flags);
1191 #endif
1192 DEBUG(10, ("[CEPH] fsetxattr(...) = %d\n", ret));
1193 WRAP_RETURN(ret);
/*
 * aio_force VFS hook.
 *
 * Logs the call at debug level 10, sets errno to ENOTSUP and returns
 * false for every handle/fsp.
 */
static bool cephwrap_aio_force(struct vfs_handle_struct *handle,
			       struct files_struct *fsp)
{
	const bool forced = false;

	/*
	 * We do not support AIO yet.
	 */
	DEBUG(10, ("[CEPH] cephwrap_aio_force(%p, %p) = false (errno = ENOTSUP)\n",
		   handle, fsp));

	errno = ENOTSUP;
	return forced;
}
1208 static bool cephwrap_is_offline(struct vfs_handle_struct *handle,
1209 const struct smb_filename *fname,
1210 SMB_STRUCT_STAT *sbuf)
1212 return false;
/*
 * set_offline VFS hook.
 *
 * Neither argument is consulted: the call always fails, leaving errno
 * set to ENOTSUP.
 *
 * Returns -1 unconditionally.
 */
static int cephwrap_set_offline(struct vfs_handle_struct *handle,
				const struct smb_filename *fname)
{
	const int failure = -1;

	errno = ENOTSUP;
	return failure;
}
1222 static struct vfs_fn_pointers ceph_fns = {
1223 /* Disk operations */
1225 .connect_fn = cephwrap_connect,
1226 .disconnect_fn = cephwrap_disconnect,
1227 .disk_free_fn = cephwrap_disk_free,
1228 .get_quota_fn = cephwrap_get_quota,
1229 .set_quota_fn = cephwrap_set_quota,
1230 .statvfs_fn = cephwrap_statvfs,
1232 /* Directory operations */
1234 .opendir_fn = cephwrap_opendir,
1235 .fdopendir_fn = cephwrap_fdopendir,
1236 .readdir_fn = cephwrap_readdir,
1237 .seekdir_fn = cephwrap_seekdir,
1238 .telldir_fn = cephwrap_telldir,
1239 .rewind_dir_fn = cephwrap_rewinddir,
1240 .mkdir_fn = cephwrap_mkdir,
1241 .rmdir_fn = cephwrap_rmdir,
1242 .closedir_fn = cephwrap_closedir,
1244 /* File operations */
1246 .open_fn = cephwrap_open,
1247 .close_fn = cephwrap_close,
1248 .read_fn = cephwrap_read,
1249 .pread_fn = cephwrap_pread,
1250 .write_fn = cephwrap_write,
1251 .pwrite_fn = cephwrap_pwrite,
1252 .lseek_fn = cephwrap_lseek,
1253 .sendfile_fn = cephwrap_sendfile,
1254 .recvfile_fn = cephwrap_recvfile,
1255 .rename_fn = cephwrap_rename,
1256 .fsync_fn = cephwrap_fsync,
1257 .stat_fn = cephwrap_stat,
1258 .fstat_fn = cephwrap_fstat,
1259 .lstat_fn = cephwrap_lstat,
1260 .unlink_fn = cephwrap_unlink,
1261 .chmod_fn = cephwrap_chmod,
1262 .fchmod_fn = cephwrap_fchmod,
1263 .chown_fn = cephwrap_chown,
1264 .fchown_fn = cephwrap_fchown,
1265 .lchown_fn = cephwrap_lchown,
1266 .chdir_fn = cephwrap_chdir,
1267 .getwd_fn = cephwrap_getwd,
1268 .ntimes_fn = cephwrap_ntimes,
1269 .ftruncate_fn = cephwrap_ftruncate,
1270 .lock_fn = cephwrap_lock,
1271 .kernel_flock_fn = cephwrap_kernel_flock,
1272 .linux_setlease_fn = cephwrap_linux_setlease,
1273 .getlock_fn = cephwrap_getlock,
1274 .symlink_fn = cephwrap_symlink,
1275 .readlink_fn = cephwrap_readlink,
1276 .link_fn = cephwrap_link,
1277 .mknod_fn = cephwrap_mknod,
1278 .realpath_fn = cephwrap_realpath,
1279 .chflags_fn = cephwrap_chflags,
1280 .get_real_filename_fn = cephwrap_get_real_filename,
1281 .connectpath_fn = cephwrap_connectpath,
1283 /* EA operations. */
1284 .getxattr_fn = cephwrap_getxattr,
1285 .fgetxattr_fn = cephwrap_fgetxattr,
1286 .listxattr_fn = cephwrap_listxattr,
1287 .flistxattr_fn = cephwrap_flistxattr,
1288 .removexattr_fn = cephwrap_removexattr,
1289 .fremovexattr_fn = cephwrap_fremovexattr,
1290 .setxattr_fn = cephwrap_setxattr,
1291 .fsetxattr_fn = cephwrap_fsetxattr,
1293 /* Posix ACL Operations */
1294 .sys_acl_get_file_fn = posixacl_xattr_acl_get_file,
1295 .sys_acl_get_fd_fn = posixacl_xattr_acl_get_fd,
1296 .sys_acl_blob_get_file_fn = posix_sys_acl_blob_get_file,
1297 .sys_acl_blob_get_fd_fn = posix_sys_acl_blob_get_fd,
1298 .sys_acl_set_file_fn = posixacl_xattr_acl_set_file,
1299 .sys_acl_set_fd_fn = posixacl_xattr_acl_set_fd,
1300 .sys_acl_delete_def_file_fn = posixacl_xattr_acl_delete_def_file,
1302 /* aio operations */
1303 .aio_force_fn = cephwrap_aio_force,
1305 /* offline operations */
1306 .is_offline_fn = cephwrap_is_offline,
1307 .set_offline_fn = cephwrap_set_offline
1310 NTSTATUS vfs_ceph_init(void);
1311 NTSTATUS vfs_ceph_init(void)
1313 return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
1314 "ceph", &ceph_fns);