s3:vfs_aio_pthread: Convert to libasys
/*
 * Simulate Posix AIO using pthreads.
 *
 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
 *
 * Copyright (C) Volker Lendecke 2008
 * Copyright (C) Jeremy Allison 2012
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include "includes.h"
#include "system/filesys.h"
#include "system/shmem.h"
#include "smbd/smbd.h"
#include "smbd/globals.h"
#include "lib/pthreadpool/pthreadpool.h"
#include "lib/asys/asys.h"
#include "lib/util/tevent_unix.h"
#ifdef HAVE_LINUX_FALLOC_H
#include <linux/falloc.h>
#endif
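
/*
 * NB. Like the open threadpool below, this asys context is shared by
 * all instances of this module within one smbd process. It is set up
 * once in aio_pthread_connect() and results come back through the
 * signal fd registered with tevent there.
 */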
static struct asys_context *asys_ctx;
static struct tevent_fd *asys_fde;
struct aio_pthread_state {
	struct tevent_req *req;
	ssize_t ret;
	int err;
};
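
/*
 * If the request is talloc_free'd while the asys job is still in
 * flight, cancel the job so its completion can never touch freed
 * memory.
 */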
static int aio_pthread_state_destructor(struct aio_pthread_state *s)
{
	asys_cancel(asys_ctx, s->req);
	return 0;
}
static struct tevent_req *aio_pthread_pread_send(
	struct vfs_handle_struct *handle,
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct files_struct *fsp, void *data, size_t n, off_t offset)
{
	struct tevent_req *req;
	struct aio_pthread_state *state;
	int ret;

	req = tevent_req_create(mem_ctx, &state, struct aio_pthread_state);
	if (req == NULL) {
		return NULL;
	}
	state->req = req;

	ret = asys_pread(asys_ctx, fsp->fh->fd, data, n, offset, req);
	if (ret != 0) {
		tevent_req_error(req, ret);
		return tevent_req_post(req, ev);
	}
	talloc_set_destructor(state, aio_pthread_state_destructor);

	return req;
}
static struct tevent_req *aio_pthread_pwrite_send(
	struct vfs_handle_struct *handle,
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct files_struct *fsp, const void *data, size_t n, off_t offset)
{
	struct tevent_req *req;
	struct aio_pthread_state *state;
	int ret;

	req = tevent_req_create(mem_ctx, &state, struct aio_pthread_state);
	if (req == NULL) {
		return NULL;
	}
	state->req = req;

	ret = asys_pwrite(asys_ctx, fsp->fh->fd, data, n, offset, req);
	if (ret != 0) {
		tevent_req_error(req, ret);
		return tevent_req_post(req, ev);
	}
	talloc_set_destructor(state, aio_pthread_state_destructor);

	return req;
}
static void aio_pthread_finished(struct tevent_context *ev,
				 struct tevent_fd *fde,
				 uint16_t flags, void *p)
{
	struct tevent_req *req;
	struct aio_pthread_state *state;
	int res;
	ssize_t ret;
	int err;
	void *private_data;

	if ((flags & TEVENT_FD_READ) == 0) {
		return;
	}

	res = asys_result(asys_ctx, &ret, &err, &private_data);
	if (res == ECANCELED) {
		return;
	}

	if (res != 0) {
		DEBUG(1, ("asys_result returned %s\n", strerror(res)));
		return;
	}

	req = talloc_get_type_abort(private_data, struct tevent_req);
	state = tevent_req_data(req, struct aio_pthread_state);

	talloc_set_destructor(state, NULL);

	state->ret = ret;
	state->err = err;
	tevent_req_done(req);
}
static ssize_t aio_pthread_recv(struct tevent_req *req, int *err)
{
	struct aio_pthread_state *state = tevent_req_data(
		req, struct aio_pthread_state);

	if (tevent_req_is_unix_error(req, err)) {
		return -1;
	}
	*err = state->err;
	return state->ret;
}
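
/*
 * A minimal sketch of the calling convention for the send/recv pairs
 * above (hypothetical caller, assuming a running tevent loop):
 *
 *	req = aio_pthread_pread_send(handle, mem_ctx, ev, fsp,
 *				     buf, n, offset);
 *	(the event loop runs until aio_pthread_finished() marks req done)
 *	nread = aio_pthread_recv(req, &err);
 *	if (nread == -1) {
 *		errno = err;
 *	}
 */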
#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)

/************************************************************************
 Ensure thread pool is initialized.
***********************************************************************/
static bool init_aio_threadpool(struct tevent_context *ev_ctx,
				struct pthreadpool **pp_pool,
				void (*completion_fn)(struct tevent_context *,
						      struct tevent_fd *,
						      uint16_t,
						      void *))
{
	struct tevent_fd *sock_event = NULL;
	int ret = 0;

	if (*pp_pool) {
		return true;
	}

	ret = pthreadpool_init(aio_pending_size, pp_pool);
	if (ret) {
		errno = ret;
		return false;
	}
	sock_event = tevent_add_fd(ev_ctx,
				   NULL,
				   pthreadpool_signal_fd(*pp_pool),
				   TEVENT_FD_READ,
				   completion_fn,
				   NULL);
	if (sock_event == NULL) {
		pthreadpool_destroy(*pp_pool);
		*pp_pool = NULL;
		return false;
	}

	DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
		  aio_pending_size));

	return true;
}
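
/*
 * The handshake above: pthreadpool_signal_fd() becomes readable when a
 * worker thread finishes a job, the tevent loop then invokes
 * completion_fn, which collects the finished jobid via
 * pthreadpool_finished_job() (see aio_open_handle_completion() below).
 */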
/*
 * We must have openat() to do any thread-based
 * asynchronous opens. We also must be using
 * thread-specific credentials (Linux-only
 * for now).
 *
 * NB. This threadpool is shared over all
 * instances of this VFS module in this
 * process, as is the current jobid.
 */
static struct pthreadpool *open_pool;
static int aio_pthread_open_jobid;
struct aio_open_private_data {
	struct aio_open_private_data *prev, *next;
	/* Inputs. */
	int jobid;
	int dir_fd;
	int flags;
	mode_t mode;
	uint64_t mid;
	bool in_progress;
	const char *fname;
	char *dname;
	struct smbd_server_connection *sconn;
	const struct security_unix_token *ux_tok;
	uint64_t initial_allocation_size;
	/* Returns. */
	int ret_fd;
	int ret_errno;
};
/* List of outstanding requests we have. */
static struct aio_open_private_data *open_pd_list;
/************************************************************************
 Find the open private data by jobid.
***********************************************************************/

static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
{
	struct aio_open_private_data *opd;

	for (opd = open_pd_list; opd != NULL; opd = opd->next) {
		if (opd->jobid == jobid) {
			return opd;
		}
	}

	return NULL;
}
/************************************************************************
 Find the open private data by mid.
***********************************************************************/

static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid)
{
	struct aio_open_private_data *opd;

	for (opd = open_pd_list; opd != NULL; opd = opd->next) {
		if (opd->mid == mid) {
			return opd;
		}
	}

	return NULL;
}
/************************************************************************
 Callback when an open completes.
***********************************************************************/

static void aio_open_handle_completion(struct tevent_context *event_ctx,
				       struct tevent_fd *event,
				       uint16_t flags,
				       void *p)
{
	struct aio_open_private_data *opd = NULL;
	int jobid = 0;
	int ret;

	DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
		   (int)flags));

	if ((flags & TEVENT_FD_READ) == 0) {
		return;
	}

	ret = pthreadpool_finished_job(open_pool, &jobid);
	if (ret) {
		smb_panic("aio_open_handle_completion");
		/* notreached. */
		return;
	}

	opd = find_open_private_data_by_jobid(jobid);
	if (opd == NULL) {
		DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
			  jobid));
		smb_panic("aio_open_handle_completion - no jobid");
		/* notreached. */
		return;
	}

	DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
		  "for file %s/%s completed\n",
		  jobid,
		  (unsigned long long)opd->mid,
		  opd->dname,
		  opd->fname));

	opd->in_progress = false;

	/* Find outstanding event and reschedule. */
	if (!schedule_deferred_open_message_smb(opd->sconn, opd->mid)) {
		/*
		 * Outstanding event didn't exist or was
		 * cancelled. Free up the fd and throw
		 * away the result.
		 */
		if (opd->ret_fd != -1) {
			close(opd->ret_fd);
			opd->ret_fd = -1;
		}
		TALLOC_FREE(opd);
	}
}
/*****************************************************************
 The core of the async open code - the worker function. Note we
 use the new openat() system call to avoid any problems with
 current working directory changes plus we change credentials
 on the thread to prevent any security race conditions.
*****************************************************************/

static void aio_open_worker(void *private_data)
{
	struct aio_open_private_data *opd =
		(struct aio_open_private_data *)private_data;

	/* Become the correct credential on this thread. */
	if (set_thread_credentials(opd->ux_tok->uid,
				   opd->ux_tok->gid,
				   (size_t)opd->ux_tok->ngroups,
				   opd->ux_tok->groups) != 0) {
		opd->ret_fd = -1;
		opd->ret_errno = errno;
		return;
	}

	opd->ret_fd = openat(opd->dir_fd,
			     opd->fname,
			     opd->flags,
			     opd->mode);

	if (opd->ret_fd == -1) {
		opd->ret_errno = errno;
	} else {
		/* Create was successful. */
		opd->ret_errno = 0;

#if defined(HAVE_LINUX_FALLOCATE)
		/*
		 * See if we can set the initial
		 * allocation size. We don't record
		 * the return for this as it's an
		 * optimization - the upper layer
		 * will also do this for us once
		 * the open returns.
		 */
		if (opd->initial_allocation_size) {
			(void)fallocate(opd->ret_fd,
					FALLOC_FL_KEEP_SIZE,
					0,
					(off_t)opd->initial_allocation_size);
		}
#endif
	}
}
/************************************************************************
 Open private data destructor.
***********************************************************************/

static int opd_destructor(struct aio_open_private_data *opd)
{
	if (opd->dir_fd != -1) {
		close(opd->dir_fd);
	}
	DLIST_REMOVE(open_pd_list, opd);
	return 0;
}
/************************************************************************
 Create and initialize a private data struct for async open.
***********************************************************************/

static struct aio_open_private_data *create_private_open_data(const files_struct *fsp,
							       int flags,
							       mode_t mode)
{
	struct aio_open_private_data *opd = talloc_zero(NULL,
					struct aio_open_private_data);
	const char *fname = NULL;

	if (!opd) {
		return NULL;
	}

	opd->jobid = aio_pthread_open_jobid++;
	opd->dir_fd = -1;
	opd->ret_fd = -1;
	opd->ret_errno = EINPROGRESS;
	opd->flags = flags;
	opd->mode = mode;
	opd->mid = fsp->mid;
	opd->in_progress = true;
	opd->sconn = fsp->conn->sconn;
	opd->initial_allocation_size = fsp->initial_allocation_size;

	/* Copy our current credentials. */
	opd->ux_tok = copy_unix_token(opd, get_current_utok(fsp->conn));
	if (opd->ux_tok == NULL) {
		TALLOC_FREE(opd);
		return NULL;
	}

	/*
	 * Copy the parent directory name and the
	 * relative path within it.
	 */
	if (parent_dirname(opd,
			   fsp->fsp_name->base_name,
			   &opd->dname,
			   &fname) == false) {
		TALLOC_FREE(opd);
		return NULL;
	}
	opd->fname = talloc_strdup(opd, fname);
	if (opd->fname == NULL) {
		TALLOC_FREE(opd);
		return NULL;
	}

#if defined(O_DIRECTORY)
	opd->dir_fd = open(opd->dname, O_RDONLY|O_DIRECTORY);
#else
	opd->dir_fd = open(opd->dname, O_RDONLY);
#endif
	if (opd->dir_fd == -1) {
		TALLOC_FREE(opd);
		return NULL;
	}

	talloc_set_destructor(opd, opd_destructor);
	DLIST_ADD_END(open_pd_list, opd, struct aio_open_private_data *);
	return opd;
}
/*****************************************************************
 Setup an async open.
*****************************************************************/

static int open_async(const files_struct *fsp,
		      int flags,
		      mode_t mode)
{
	struct aio_open_private_data *opd = NULL;
	int ret;

	if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
				 &open_pool,
				 aio_open_handle_completion)) {
		return -1;
	}

	opd = create_private_open_data(fsp, flags, mode);
	if (opd == NULL) {
		DEBUG(10, ("open_async: Could not create private data.\n"));
		return -1;
	}

	ret = pthreadpool_add_job(open_pool,
				  opd->jobid,
				  aio_open_worker,
				  (void *)opd);
	if (ret) {
		errno = ret;
		return -1;
	}

	DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
		 (unsigned long long)opd->mid,
		 opd->jobid,
		 opd->dname,
		 opd->fname));

	/* Cause the calling code to reschedule us. */
	errno = EINTR; /* Maps to NT_STATUS_RETRY. */
	return -1;
}
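
/*
 * To recap the async open flow: open_async() queues the job and fails
 * the open with EINTR (NT_STATUS_RETRY), so the SMB layer defers the
 * request. When the worker completes, aio_open_handle_completion()
 * reschedules the deferred open, which re-enters aio_pthread_open_fn()
 * below; find_completed_open() then matches on the mid and hands back
 * the finished fd.
 */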
/*****************************************************************
 Look for a matching SMB2 mid. If we find it we're rescheduled,
 just return the completed open.
*****************************************************************/

static bool find_completed_open(files_struct *fsp,
				int *p_fd,
				int *p_errno)
{
	struct aio_open_private_data *opd;

	opd = find_open_private_data_by_mid(fsp->mid);
	if (!opd) {
		return false;
	}

	if (opd->in_progress) {
		DEBUG(0,("find_completed_open: mid %llu "
			 "jobid %d still in progress for "
			 "file %s/%s. PANIC !\n",
			 (unsigned long long)opd->mid,
			 opd->jobid,
			 opd->dname,
			 opd->fname));
		/* Disaster ! This is an open timeout. Just panic. */
		smb_panic("find_completed_open - in_progress\n");
		/* notreached. */
		return false;
	}

	*p_fd = opd->ret_fd;
	*p_errno = opd->ret_errno;

	DEBUG(5,("find_completed_open: mid %llu returning "
		 "fd = %d, errno = %d (%s) "
		 "jobid (%d) for file %s\n",
		 (unsigned long long)opd->mid,
		 opd->ret_fd,
		 opd->ret_errno,
		 strerror(opd->ret_errno),
		 opd->jobid,
		 smb_fname_str_dbg(fsp->fsp_name)));

	/* Now we can free the opd. */
	TALLOC_FREE(opd);
	return true;
}
/*****************************************************************
 The core open function. Only go async on O_CREAT|O_EXCL
 opens to prevent any race conditions.
*****************************************************************/

static int aio_pthread_open_fn(vfs_handle_struct *handle,
			       struct smb_filename *smb_fname,
			       files_struct *fsp,
			       int flags,
			       mode_t mode)
{
	int my_errno = 0;
	int fd = -1;
	bool aio_allow_open = lp_parm_bool(
		SNUM(handle->conn), "aio_pthread", "aio open", false);

	if (smb_fname->stream_name) {
		/* Don't handle stream opens. */
		errno = ENOENT;
		return -1;
	}

	if (!aio_allow_open) {
		/* aio opens turned off. */
		return open(smb_fname->base_name, flags, mode);
	}

	if (!(flags & O_CREAT)) {
		/* Only creates matter. */
		return open(smb_fname->base_name, flags, mode);
	}

	if (!(flags & O_EXCL)) {
		/* Only creates with O_EXCL matter. */
		return open(smb_fname->base_name, flags, mode);
	}

	/*
	 * See if this is a reentrant call - i.e. is this a
	 * restart of an existing open that just completed.
	 */

	if (find_completed_open(fsp,
				&fd,
				&my_errno)) {
		errno = my_errno;
		return fd;
	}

	/* Ok, it's a create exclusive call - pass it to a thread helper. */
	return open_async(fsp, flags, mode);
}
#endif
static int aio_pthread_connect(vfs_handle_struct *handle, const char *service,
			       const char *user)
{
	/*********************************************************************
	 * How many threads to initialize ?
	 * 100 per process seems insane as a default until you realize that
	 * (a) Threads terminate after 1 second when idle.
	 * (b) Throttling is done in SMB2 via the crediting algorithm.
	 * (c) SMB1 clients are limited to max_mux (50) outstanding
	 *     requests and Windows clients don't use this anyway.
	 * Essentially we want this to be unlimited unless smb.conf
	 * says different.
	 *********************************************************************/
	aio_pending_size = lp_parm_int(
		SNUM(handle->conn), "aio_pthread", "aio num threads", 100);

	if (asys_ctx == NULL) {
		int ret;

		ret = asys_context_init(&asys_ctx, aio_pending_size);
		if (ret != 0) {
			DEBUG(1, ("asys_context_init failed: %s\n",
				  strerror(ret)));
			return -1;
		}

		asys_fde = tevent_add_fd(handle->conn->sconn->ev_ctx, NULL,
					 asys_signalfd(asys_ctx),
					 TEVENT_FD_READ, aio_pthread_finished,
					 NULL);
		if (asys_fde == NULL) {
			DEBUG(1, ("tevent_add_fd failed\n"));
			asys_context_destroy(asys_ctx);
			asys_ctx = NULL;
			return -1;
		}
	}
	return SMB_VFS_NEXT_CONNECT(handle, service, user);
}
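
/*
 * Illustrative smb.conf fragment wiring up the two module parameters
 * read above (the share name is hypothetical):
 *
 *	[data]
 *		vfs objects = aio_pthread
 *		aio_pthread:aio num threads = 100
 *		aio_pthread:aio open = yes
 */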
static struct vfs_fn_pointers vfs_aio_pthread_fns = {
	.connect_fn = aio_pthread_connect,
#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
	.open_fn = aio_pthread_open_fn,
#endif
	.pread_send_fn = aio_pthread_pread_send,
	.pread_recv_fn = aio_pthread_recv,
	.pwrite_send_fn = aio_pthread_pwrite_send,
	.pwrite_recv_fn = aio_pthread_recv,
};
NTSTATUS vfs_aio_pthread_init(void);
NTSTATUS vfs_aio_pthread_init(void)
{
	return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
				"aio_pthread", &vfs_aio_pthread_fns);
}