/*
 * Simulate Posix AIO using pthreads.
 *
 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
 *
 * Copyright (C) Volker Lendecke 2008
 * Copyright (C) Jeremy Allison 2012
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "includes.h"
#include "system/filesys.h"
#include "system/shmem.h"
#include "smbd/smbd.h"
#include "smbd/globals.h"
#include "lib/pthreadpool/pthreadpool.h"
#ifdef HAVE_LINUX_FALLOC_H
#include <linux/falloc.h>
#endif

struct aio_extra;
static struct pthreadpool *pool;
static int aio_pthread_jobid;
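
/*
 * Per-request state for an outstanding read/write job, shared between
 * the main smbd event loop and the worker thread servicing it.
 */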
struct aio_private_data {
        struct aio_private_data *prev, *next;
        int jobid;
        SMB_STRUCT_AIOCB *aiocb;
        ssize_t ret_size;
        int ret_errno;
        bool cancelled;
        bool write_command;
        bool flush_write;
};

/* List of outstanding requests we have. */
static struct aio_private_data *pd_list;

static void aio_pthread_handle_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p);
/************************************************************************
 Ensure thread pool is initialized.
***********************************************************************/

static bool init_aio_threadpool(struct event_context *ev_ctx,
                                struct pthreadpool **pp_pool,
                                void (*completion_fn)(struct event_context *,
                                                struct fd_event *,
                                                uint16,
                                                void *))
{
        struct fd_event *sock_event = NULL;
        int ret = 0;

        if (*pp_pool) {
                return true;
        }

        ret = pthreadpool_init(aio_pending_size, pp_pool);
        if (ret) {
                errno = ret;
                return false;
        }
        sock_event = tevent_add_fd(ev_ctx,
                                NULL,
                                pthreadpool_signal_fd(*pp_pool),
                                TEVENT_FD_READ,
                                completion_fn,
                                NULL);
        if (sock_event == NULL) {
                pthreadpool_destroy(*pp_pool);
                *pp_pool = NULL;
                return false;
        }

        DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
                  aio_pending_size));

        return true;
}
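
/*
 * Note: the pthreadpool signals job completion by making its signal fd
 * (pthreadpool_signal_fd()) readable. Registering that fd with tevent
 * above is what routes worker-thread completions back into the main
 * event loop, where completion_fn collects the finished jobid.
 */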
/************************************************************************
 Worker function - core of the pthread aio engine.
 This is the function that actually does the IO.
***********************************************************************/

static void aio_worker(void *private_data)
{
        struct aio_private_data *pd =
                (struct aio_private_data *)private_data;

        if (pd->write_command) {
                pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
                                (const void *)pd->aiocb->aio_buf,
                                pd->aiocb->aio_nbytes,
                                pd->aiocb->aio_offset);
                if (pd->ret_size == -1 && errno == ESPIPE) {
                        /* Maintain the fiction that pipes can
                           be seeked (sought?) on. */
                        pd->ret_size = sys_write(pd->aiocb->aio_fildes,
                                        (const void *)pd->aiocb->aio_buf,
                                        pd->aiocb->aio_nbytes);
                }
#if defined(HAVE_FSYNC)
                if (pd->ret_size != -1 && pd->flush_write) {
                        /*
                         * Optimization - flush if requested.
                         * Ignore error as upper layer will
                         * also do this.
                         */
                        (void)fsync(pd->aiocb->aio_fildes);
                }
#endif
        } else {
                pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
                                (void *)pd->aiocb->aio_buf,
                                pd->aiocb->aio_nbytes,
                                pd->aiocb->aio_offset);
                if (pd->ret_size == -1 && errno == ESPIPE) {
                        /* Maintain the fiction that pipes can
                           be seeked (sought?) on. */
                        pd->ret_size = sys_read(pd->aiocb->aio_fildes,
                                        (void *)pd->aiocb->aio_buf,
                                        pd->aiocb->aio_nbytes);
                }
        }

        if (pd->ret_size == -1) {
                pd->ret_errno = errno;
        } else {
                pd->ret_errno = 0;
        }
}
/************************************************************************
 Private data destructor.
***********************************************************************/

static int pd_destructor(struct aio_private_data *pd)
{
        DLIST_REMOVE(pd_list, pd);
        return 0;
}

/************************************************************************
 Create and initialize a private data struct.
***********************************************************************/

static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
                                        SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
        if (!pd) {
                return NULL;
        }
        pd->jobid = aio_pthread_jobid++;
        pd->aiocb = aiocb;
        pd->ret_size = -1;
        pd->ret_errno = EINPROGRESS;
        talloc_set_destructor(pd, pd_destructor);
        DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
        return pd;
}
/************************************************************************
 Spin off a threadpool (if needed) and initiate a pread call.
***********************************************************************/

static int aio_pthread_read(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
        struct aio_private_data *pd = NULL;
        int ret;

        if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
                                &pool,
                                aio_pthread_handle_completion)) {
                return -1;
        }

        pd = create_private_data(aio_ex, aiocb);
        if (pd == NULL) {
                DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
                return -1;
        }

        ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
        if (ret) {
                errno = ret;
                return -1;
        }

        DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
                "of %llu bytes at offset %llu\n",
                pd->jobid,
                (unsigned long long)pd->aiocb->aio_nbytes,
                (unsigned long long)pd->aiocb->aio_offset));

        return 0;
}
/************************************************************************
 Spin off a threadpool (if needed) and initiate a pwrite call.
***********************************************************************/

static int aio_pthread_write(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
        struct aio_private_data *pd = NULL;
        int ret;

        if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
                                &pool,
                                aio_pthread_handle_completion)) {
                return -1;
        }

        pd = create_private_data(aio_ex, aiocb);
        if (pd == NULL) {
                DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
                return -1;
        }

        pd->write_command = true;
        if (lp_strict_sync(SNUM(fsp->conn)) &&
                        (lp_syncalways(SNUM(fsp->conn)) ||
                                aio_write_through_requested(aio_ex))) {
                pd->flush_write = true;
        }

        ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
        if (ret) {
                errno = ret;
                return -1;
        }

        DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
                "of %llu bytes at offset %llu\n",
                pd->jobid,
                (unsigned long long)pd->aiocb->aio_nbytes,
                (unsigned long long)pd->aiocb->aio_offset));

        return 0;
}
/************************************************************************
 Find the private data by jobid.
***********************************************************************/

static struct aio_private_data *find_private_data_by_jobid(int jobid)
{
        struct aio_private_data *pd;

        for (pd = pd_list; pd != NULL; pd = pd->next) {
                if (pd->jobid == jobid) {
                        return pd;
                }
        }

        return NULL;
}
/************************************************************************
 Callback when an IO completes.
***********************************************************************/

static void aio_pthread_handle_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p)
{
        struct aio_extra *aio_ex = NULL;
        struct aio_private_data *pd = NULL;
        int jobid = 0;
        int ret;

        DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
                        (int)flags));

        if ((flags & EVENT_FD_READ) == 0) {
                return;
        }

        ret = pthreadpool_finished_job(pool, &jobid);
        if (ret) {
                smb_panic("aio_pthread_handle_completion");
                return;
        }

        pd = find_private_data_by_jobid(jobid);
        if (pd == NULL) {
                DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
                          jobid));
                return;
        }

        aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
        smbd_aio_complete_aio_ex(aio_ex);

        DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
                jobid ));
        TALLOC_FREE(aio_ex);
}
/************************************************************************
 Find the private data by aiocb.
***********************************************************************/

static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd;

        for (pd = pd_list; pd != NULL; pd = pd->next) {
                if (pd->aiocb == aiocb) {
                        return pd;
                }
        }

        return NULL;
}
/************************************************************************
 Called to return the result of a completed AIO.
 Should only be called if aio_error returns something other than EINPROGRESS.
 Returns:
        -1 on error (with errno set), otherwise the return from the
        IO operation.
***********************************************************************/
static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

        if (pd == NULL) {
                errno = EINVAL;
                DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
                return -1;
        }

        pd->aiocb = NULL;

        if (pd->cancelled) {
                errno = ECANCELED;
                return -1;
        }

        if (pd->ret_size == -1) {
                errno = pd->ret_errno;
        }

        return pd->ret_size;
}
/************************************************************************
 Called to check the result of an AIO.
 Returns:
        EINPROGRESS - still in progress.
        EINVAL - invalid aiocb.
        ECANCELED - request was cancelled.
        0 - request completed successfully.
        Any other value - errno from IO operation.
***********************************************************************/

static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

        if (pd == NULL) {
                return EINVAL;
        }
        if (pd->cancelled) {
                return ECANCELED;
        }
        return pd->ret_errno;
}
/************************************************************************
 Called to request the cancel of an AIO, or all of them on a specific
 fsp if aiocb == NULL.
***********************************************************************/

static int aio_pthread_cancel(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd = NULL;

        for (pd = pd_list; pd != NULL; pd = pd->next) {
                if (pd->aiocb == NULL) {
                        continue;
                }
                if (pd->aiocb->aio_fildes != fsp->fh->fd) {
                        continue;
                }
                if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
                        continue;
                }

                /*
                 * We let the child do its job, but we discard the result when
                 * it's finished.
                 */

                pd->cancelled = true;
        }

        return AIO_CANCELED;
}
/************************************************************************
 Callback for a previously detected job completion.
***********************************************************************/

static void aio_pthread_handle_immediate(struct tevent_context *ctx,
                                struct tevent_immediate *im,
                                void *private_data)
{
        struct aio_extra *aio_ex = NULL;
        struct aio_private_data *pd = (struct aio_private_data *)private_data;

        aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
        smbd_aio_complete_aio_ex(aio_ex);
        TALLOC_FREE(aio_ex);
}
/************************************************************************
 Private data struct used in suspend completion code.
***********************************************************************/

struct suspend_private {
        int num_entries;
        int num_finished;
        const SMB_STRUCT_AIOCB * const *aiocb_array;
};
/************************************************************************
 Callback when an IO completes from a suspend call.
***********************************************************************/

static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p)
{
        struct suspend_private *sp = (struct suspend_private *)p;
        struct aio_private_data *pd = NULL;
        struct tevent_immediate *im = NULL;
        int jobid;
        int i;

        DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
                        (int)flags));

        if ((flags & EVENT_FD_READ) == 0) {
                return;
        }

        if (pthreadpool_finished_job(pool, &jobid)) {
                smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
                return;
        }

        pd = find_private_data_by_jobid(jobid);
        if (pd == NULL) {
                DEBUG(1, ("aio_pthread_handle_suspend_completion cannot find jobid %d\n",
                          jobid));
                return;
        }

        /* Is this a jobid with an aiocb we're interested in ? */
        for (i = 0; i < sp->num_entries; i++) {
                if (sp->aiocb_array[i] == pd->aiocb) {
                        sp->num_finished++;
                        return;
                }
        }

        /* Jobid completed we weren't waiting for.
           We must reschedule this as an immediate event
           on the main event context. */
        im = tevent_create_immediate(NULL);
        if (!im) {
                exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
        }

        DEBUG(10,("aio_pthread_handle_suspend_completion: "
                        "re-scheduling job id %d\n",
                        jobid));

        tevent_schedule_immediate(im,
                        server_event_context(),
                        aio_pthread_handle_immediate,
                        (void *)pd);
}
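
/************************************************************************
 Timeout handler for aio_pthread_suspend. Sets the timed_out flag to
 break out of the sub-event loop below.
***********************************************************************/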
static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
                                struct tevent_timer *te,
                                struct timeval now,
                                void *private_data)
{
        bool *timed_out = (bool *)private_data;
        /* Remove this timed event handler. */
        TALLOC_FREE(te);
        *timed_out = true;
}
/************************************************************************
 Called to request everything to stop until all IO is completed.
***********************************************************************/

static int aio_pthread_suspend(struct vfs_handle_struct *handle,
                        struct files_struct *fsp,
                        const SMB_STRUCT_AIOCB * const aiocb_array[],
                        int n,
                        const struct timespec *timeout)
{
        struct event_context *ev = NULL;
        struct fd_event *sock_event = NULL;
        int ret = -1;
        struct suspend_private sp;
        bool timed_out = false;
        TALLOC_CTX *frame = talloc_stackframe();

        /* This is a blocking call, and has to use a sub-event loop. */
        ev = event_context_init(frame);
        if (ev == NULL) {
                errno = ENOMEM;
                goto out;
        }

        if (timeout) {
                struct timeval tv = convert_timespec_to_timeval(*timeout);
                struct tevent_timer *te = tevent_add_timer(ev,
                                        frame,
                                        timeval_current_ofs(tv.tv_sec,
                                                        tv.tv_usec),
                                        aio_pthread_suspend_timed_out,
                                        &timed_out);
                if (!te) {
                        errno = ENOMEM;
                        goto out;
                }
        }

        ZERO_STRUCT(sp);
        sp.num_entries = n;
        sp.aiocb_array = aiocb_array;
        sp.num_finished = 0;

        sock_event = tevent_add_fd(ev,
                                frame,
                                pthreadpool_signal_fd(pool),
                                TEVENT_FD_READ,
                                aio_pthread_handle_suspend_completion,
                                (void *)&sp);
        if (sock_event == NULL) {
                pthreadpool_destroy(pool);
                pool = NULL;
                goto out;
        }
        /*
         * We're going to cheat here. We know that smbd/aio.c
         * only calls this when it's waiting for every single
         * outstanding call to finish on a close, so just wait
         * individually for each IO to complete. We don't care
         * what order they finish - only that they all do. JRA.
         */
        while (sp.num_entries != sp.num_finished) {
                if (tevent_loop_once(ev) == -1) {
                        goto out;
                }

                if (timed_out) {
                        errno = EAGAIN;
                        goto out;
                }
        }

        ret = 0;

  out:

        TALLOC_FREE(frame);
        return ret;
}
#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)

/*
 * We must have openat() to do any thread-based
 * asynchronous opens. We also must be using
 * thread-specific credentials (Linux-only
 * for now).
 */

/*
 * NB. This threadpool is shared over all
 * instances of this VFS module in this
 * process, as is the current jobid.
 */

static struct pthreadpool *open_pool;
static int aio_pthread_open_jobid;
struct aio_open_private_data {
        struct aio_open_private_data *prev, *next;
        /* Inputs. */
        int jobid;
        int dir_fd;
        int flags;
        mode_t mode;
        uint64_t mid;
        bool in_progress;
        const char *fname;
        char *dname;
        struct smbd_server_connection *sconn;
        const struct security_unix_token *ux_tok;
        uint64_t initial_allocation_size;
        /* Returns. */
        int ret_fd;
        int ret_errno;
};

/* List of outstanding requests we have. */
static struct aio_open_private_data *open_pd_list;
/************************************************************************
 Find the open private data by jobid.
***********************************************************************/

static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
{
        struct aio_open_private_data *opd;

        for (opd = open_pd_list; opd != NULL; opd = opd->next) {
                if (opd->jobid == jobid) {
                        return opd;
                }
        }

        return NULL;
}

/************************************************************************
 Find the open private data by mid.
***********************************************************************/

static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid)
{
        struct aio_open_private_data *opd;

        for (opd = open_pd_list; opd != NULL; opd = opd->next) {
                if (opd->mid == mid) {
                        return opd;
                }
        }

        return NULL;
}
/************************************************************************
 Callback when an open completes.
***********************************************************************/

static void aio_open_handle_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p)
{
        struct aio_open_private_data *opd = NULL;
        int jobid = 0;
        int ret;

        DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
                        (int)flags));

        if ((flags & EVENT_FD_READ) == 0) {
                return;
        }

        ret = pthreadpool_finished_job(open_pool, &jobid);
        if (ret) {
                smb_panic("aio_open_handle_completion");
                /* notreached. */
                return;
        }

        opd = find_open_private_data_by_jobid(jobid);
        if (opd == NULL) {
                DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
                        jobid));
                smb_panic("aio_open_handle_completion - no jobid");
                /* notreached. */
                return;
        }

        DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
                "for file %s/%s completed\n",
                jobid,
                (unsigned long long)opd->mid,
                opd->dname,
                opd->fname));

        opd->in_progress = false;

        /* Find outstanding event and reschedule. */
        if (!schedule_deferred_open_message_smb(opd->sconn, opd->mid)) {
                /*
                 * Outstanding event didn't exist or was
                 * cancelled. Free up the fd and throw
                 * away the result.
                 */
                if (opd->ret_fd != -1) {
                        close(opd->ret_fd);
                        opd->ret_fd = -1;
                }
                TALLOC_FREE(opd);
        }
}
/*****************************************************************
 The core of the async open code - the worker function. Note we
 use the new openat() system call to avoid any problems with
 current working directory changes plus we change credentials
 on the thread to prevent any security race conditions.
*****************************************************************/

static void aio_open_worker(void *private_data)
{
        struct aio_open_private_data *opd =
                (struct aio_open_private_data *)private_data;

        /* Become the correct credential on this thread. */
        if (set_thread_credentials(opd->ux_tok->uid,
                                opd->ux_tok->gid,
                                (size_t)opd->ux_tok->ngroups,
                                opd->ux_tok->groups) != 0) {
                opd->ret_fd = -1;
                opd->ret_errno = errno;
                return;
        }

        opd->ret_fd = openat(opd->dir_fd,
                        opd->fname,
                        opd->flags,
                        opd->mode);

        if (opd->ret_fd == -1) {
                opd->ret_errno = errno;
        } else {
                /* Create was successful. */
                opd->ret_errno = 0;

#if defined(HAVE_LINUX_FALLOCATE)
                /*
                 * See if we can set the initial
                 * allocation size. We don't record
                 * the return for this as it's an
                 * optimization - the upper layer
                 * will also do this for us once
                 * the open returns.
                 */
                if (opd->initial_allocation_size) {
                        (void)fallocate(opd->ret_fd,
                                        FALLOC_FL_KEEP_SIZE,
                                        0,
                                        (off_t)opd->initial_allocation_size);
                }
#endif
        }
}
/************************************************************************
 Open private data destructor.
***********************************************************************/

static int opd_destructor(struct aio_open_private_data *opd)
{
        if (opd->dir_fd != -1) {
                close(opd->dir_fd);
        }
        DLIST_REMOVE(open_pd_list, opd);
        return 0;
}

/************************************************************************
 Create and initialize a private data struct for async open.
***********************************************************************/

static struct aio_open_private_data *create_private_open_data(const files_struct *fsp,
                                        int flags,
                                        mode_t mode)
{
        struct aio_open_private_data *opd = talloc_zero(NULL,
                                        struct aio_open_private_data);
        const char *fname = NULL;

        if (!opd) {
                return NULL;
        }

        opd->jobid = aio_pthread_open_jobid++;
        opd->dir_fd = -1;
        opd->ret_fd = -1;
        opd->ret_errno = EINPROGRESS;
        opd->flags = flags;
        opd->mode = mode;
        opd->mid = fsp->mid;
        opd->in_progress = true;
        opd->sconn = fsp->conn->sconn;
        opd->initial_allocation_size = fsp->initial_allocation_size;

        /* Copy our current credentials. */
        opd->ux_tok = copy_unix_token(opd, get_current_utok(fsp->conn));
        if (opd->ux_tok == NULL) {
                TALLOC_FREE(opd);
                return NULL;
        }

        /*
         * Copy the parent directory name and the
         * relative path within it.
         */
        if (parent_dirname(opd,
                        fsp->fsp_name->base_name,
                        &opd->dname,
                        &fname) == false) {
                TALLOC_FREE(opd);
                return NULL;
        }
        opd->fname = talloc_strdup(opd, fname);
        if (opd->fname == NULL) {
                TALLOC_FREE(opd);
                return NULL;
        }

#if defined(O_DIRECTORY)
        opd->dir_fd = open(opd->dname, O_RDONLY|O_DIRECTORY);
#else
        opd->dir_fd = open(opd->dname, O_RDONLY);
#endif
        if (opd->dir_fd == -1) {
                TALLOC_FREE(opd);
                return NULL;
        }

        talloc_set_destructor(opd, opd_destructor);
        DLIST_ADD_END(open_pd_list, opd, struct aio_open_private_data *);
        return opd;
}
/*****************************************************************
 Setup an async open.
*****************************************************************/

static int open_async(const files_struct *fsp,
                        int flags,
                        mode_t mode)
{
        struct aio_open_private_data *opd = NULL;
        int ret;

        if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
                        &open_pool,
                        aio_open_handle_completion)) {
                return -1;
        }

        opd = create_private_open_data(fsp, flags, mode);
        if (opd == NULL) {
                DEBUG(10, ("open_async: Could not create private data.\n"));
                return -1;
        }

        ret = pthreadpool_add_job(open_pool,
                                opd->jobid,
                                aio_open_worker,
                                (void *)opd);
        if (ret) {
                errno = ret;
                return -1;
        }

        DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
                (unsigned long long)opd->mid,
                opd->jobid,
                opd->dname,
                opd->fname));

        /* Cause the calling code to reschedule us. */
        errno = EINTR; /* Maps to NT_STATUS_RETRY. */
        return -1;
}
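
/*
 * Async open flow: open_async() fails the initial call with EINTR
 * (mapped to NT_STATUS_RETRY), causing smbd to defer the open request.
 * When the worker thread finishes, aio_open_handle_completion() calls
 * schedule_deferred_open_message_smb() to replay the open, and the
 * replayed call collects the result via find_completed_open() below.
 */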
/*****************************************************************
 Look for a matching SMB2 mid. If we find one, this is the rescheduled
 call after the async open completed, so just return the completed open.
*****************************************************************/
static bool find_completed_open(files_struct *fsp,
                                int *p_fd,
                                int *p_errno)
{
        struct aio_open_private_data *opd;

        opd = find_open_private_data_by_mid(fsp->mid);
        if (!opd) {
                return false;
        }

        if (opd->in_progress) {
                DEBUG(0,("find_completed_open: mid %llu "
                        "jobid %d still in progress for "
                        "file %s/%s. PANIC !\n",
                        (unsigned long long)opd->mid,
                        opd->jobid,
                        opd->dname,
                        opd->fname));
                /* Disaster ! This is an open timeout. Just panic. */
                smb_panic("find_completed_open - in_progress\n");
                /* notreached. */
                return false;
        }

        *p_fd = opd->ret_fd;
        *p_errno = opd->ret_errno;

        DEBUG(5,("find_completed_open: mid %llu returning "
                "fd = %d, errno = %d (%s) "
                "jobid (%d) for file %s\n",
                (unsigned long long)opd->mid,
                opd->ret_fd,
                opd->ret_errno,
                strerror(opd->ret_errno),
                opd->jobid,
                smb_fname_str_dbg(fsp->fsp_name)));

        /* Now we can free the opd. */
        TALLOC_FREE(opd);
        return true;
}
/*****************************************************************
 The core open function. Only go async on O_CREAT|O_EXCL
 opens to prevent any race conditions.
*****************************************************************/

static int aio_pthread_open_fn(vfs_handle_struct *handle,
                        struct smb_filename *smb_fname,
                        files_struct *fsp,
                        int flags,
                        mode_t mode)
{
        int my_errno = 0;
        int fd = -1;
        bool aio_allow_open = lp_parm_bool(
                SNUM(handle->conn), "aio_pthread", "aio open", false);

        if (smb_fname->stream_name) {
                /* Don't handle stream opens. */
                errno = ENOENT;
                return -1;
        }

        if (!aio_allow_open) {
                /* aio opens turned off. */
                return open(smb_fname->base_name, flags, mode);
        }

        if (!(flags & O_CREAT)) {
                /* Only creates matter. */
                return open(smb_fname->base_name, flags, mode);
        }

        if (!(flags & O_EXCL)) {
                /* Only creates with O_EXCL matter. */
                return open(smb_fname->base_name, flags, mode);
        }

        /*
         * See if this is a reentrant call - i.e. is this a
         * restart of an existing open that just completed.
         */

        if (find_completed_open(fsp,
                        &fd,
                        &my_errno)) {
                errno = my_errno;
                return fd;
        }

        /* Ok, it's a create exclusive call - pass it to a thread helper. */
        return open_async(fsp, flags, mode);
}
#endif
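
/************************************************************************
 Module connect function. Reads the module tunables from smb.conf.
***********************************************************************/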
static int aio_pthread_connect(vfs_handle_struct *handle, const char *service,
                                const char *user)
{
        /*********************************************************************
         * How many threads to initialize ?
         * 100 per process seems insane as a default until you realize that
         * (a) Threads terminate after 1 second when idle.
         * (b) Throttling is done in SMB2 via the crediting algorithm.
         * (c) SMB1 clients are limited to max_mux (50) outstanding
         *     requests and Windows clients don't use this anyway.
         * Essentially we want this to be unlimited unless smb.conf
         * says different.
         *********************************************************************/
        aio_pending_size = lp_parm_int(
                SNUM(handle->conn), "aio_pthread", "aio num threads", 100);
        return SMB_VFS_NEXT_CONNECT(handle, service, user);
}
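
/*
 * For reference, a minimal smb.conf fragment that would enable this
 * module with the parameters read above (share name is illustrative):
 *
 *   [data]
 *       vfs objects = aio_pthread
 *       aio_pthread:aio num threads = 20
 *       aio_pthread:aio open = yes
 */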
static struct vfs_fn_pointers vfs_aio_pthread_fns = {
        .connect_fn = aio_pthread_connect,
#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
        .open_fn = aio_pthread_open_fn,
#endif
        .aio_read_fn = aio_pthread_read,
        .aio_write_fn = aio_pthread_write,
        .aio_return_fn = aio_pthread_return_fn,
        .aio_cancel_fn = aio_pthread_cancel,
        .aio_error_fn = aio_pthread_error_fn,
        .aio_suspend_fn = aio_pthread_suspend,
};

NTSTATUS vfs_aio_pthread_init(void);
NTSTATUS vfs_aio_pthread_init(void)
{
        return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
                                "aio_pthread", &vfs_aio_pthread_fns);
}