Add an optimization to pthread aio writes to also do fsync if requested.
[Samba.git] / source3 / modules / vfs_aio_pthread.c
blob4525beb818121dfeb8458ed146754c0f9dfe759d
1 /*
2 * Simulate Posix AIO using pthreads.
4 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
6 * Copyright (C) Volker Lendecke 2008
7 * Copyright (C) Jeremy Allison 2012
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 3 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 #include "includes.h"
25 #include "system/filesys.h"
26 #include "system/shmem.h"
27 #include "smbd/smbd.h"
28 #include "smbd/globals.h"
29 #include "lib/pthreadpool/pthreadpool.h"
/* Opaque per-request context declared in smbd/aio.c; used only by pointer. */
struct aio_extra;

/* Threadpool shared by all read/write jobs queued by this module. */
static struct pthreadpool *pool;

/* Monotonically increasing id assigned to each queued IO job. */
static int aio_pthread_jobid;
35 struct aio_private_data {
36 struct aio_private_data *prev, *next;
37 int jobid;
38 SMB_STRUCT_AIOCB *aiocb;
39 ssize_t ret_size;
40 int ret_errno;
41 bool cancelled;
42 bool write_command;
43 bool flush_write;
46 /* List of outstanding requests we have. */
47 static struct aio_private_data *pd_list;
49 static void aio_pthread_handle_completion(struct event_context *event_ctx,
50 struct fd_event *event,
51 uint16 flags,
52 void *p);
55 /************************************************************************
56 Ensure thread pool is initialized.
57 ***********************************************************************/
59 static bool init_aio_threadpool(struct event_context *ev_ctx,
60 struct pthreadpool **pp_pool,
61 void (*completion_fn)(struct event_context *,
62 struct fd_event *,
63 uint16,
64 void *))
66 struct fd_event *sock_event = NULL;
67 int ret = 0;
69 if (*pp_pool) {
70 return true;
73 ret = pthreadpool_init(aio_pending_size, pp_pool);
74 if (ret) {
75 errno = ret;
76 return false;
78 sock_event = tevent_add_fd(ev_ctx,
79 NULL,
80 pthreadpool_signal_fd(*pp_pool),
81 TEVENT_FD_READ,
82 completion_fn,
83 NULL);
84 if (sock_event == NULL) {
85 pthreadpool_destroy(*pp_pool);
86 *pp_pool = NULL;
87 return false;
90 DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
91 aio_pending_size));
93 return true;
97 /************************************************************************
98 Worker function - core of the pthread aio engine.
99 This is the function that actually does the IO.
100 ***********************************************************************/
102 static void aio_worker(void *private_data)
104 struct aio_private_data *pd =
105 (struct aio_private_data *)private_data;
107 if (pd->write_command) {
108 pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
109 (const void *)pd->aiocb->aio_buf,
110 pd->aiocb->aio_nbytes,
111 pd->aiocb->aio_offset);
112 if (pd->ret_size == -1 && errno == ESPIPE) {
113 /* Maintain the fiction that pipes can
114 be seeked (sought?) on. */
115 pd->ret_size = sys_write(pd->aiocb->aio_fildes,
116 (const void *)pd->aiocb->aio_buf,
117 pd->aiocb->aio_nbytes);
119 if (pd->ret_size != -1 && pd->flush_write) {
121 * Optimization - flush if requested.
122 * Ignore error as upper layer will
123 * also do this.
125 (void)fsync(pd->aiocb->aio_fildes);
127 } else {
128 pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
129 (void *)pd->aiocb->aio_buf,
130 pd->aiocb->aio_nbytes,
131 pd->aiocb->aio_offset);
132 if (pd->ret_size == -1 && errno == ESPIPE) {
133 /* Maintain the fiction that pipes can
134 be seeked (sought?) on. */
135 pd->ret_size = sys_read(pd->aiocb->aio_fildes,
136 (void *)pd->aiocb->aio_buf,
137 pd->aiocb->aio_nbytes);
140 if (pd->ret_size == -1) {
141 pd->ret_errno = errno;
142 } else {
143 pd->ret_errno = 0;
147 /************************************************************************
148 Private data destructor.
149 ***********************************************************************/
151 static int pd_destructor(struct aio_private_data *pd)
153 DLIST_REMOVE(pd_list, pd);
154 return 0;
157 /************************************************************************
158 Create and initialize a private data struct.
159 ***********************************************************************/
161 static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
162 SMB_STRUCT_AIOCB *aiocb)
164 struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
165 if (!pd) {
166 return NULL;
168 pd->jobid = aio_pthread_jobid++;
169 pd->aiocb = aiocb;
170 pd->ret_size = -1;
171 pd->ret_errno = EINPROGRESS;
172 talloc_set_destructor(pd, pd_destructor);
173 DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
174 return pd;
177 /************************************************************************
178 Spin off a threadpool (if needed) and initiate a pread call.
179 ***********************************************************************/
181 static int aio_pthread_read(struct vfs_handle_struct *handle,
182 struct files_struct *fsp,
183 SMB_STRUCT_AIOCB *aiocb)
185 struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
186 struct aio_private_data *pd = NULL;
187 int ret;
189 if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
190 &pool,
191 aio_pthread_handle_completion)) {
192 return -1;
195 pd = create_private_data(aio_ex, aiocb);
196 if (pd == NULL) {
197 DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
198 return -1;
201 ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
202 if (ret) {
203 errno = ret;
204 return -1;
207 DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
208 "of %llu bytes at offset %llu\n",
209 pd->jobid,
210 (unsigned long long)pd->aiocb->aio_nbytes,
211 (unsigned long long)pd->aiocb->aio_offset));
213 return 0;
216 /************************************************************************
217 Spin off a threadpool (if needed) and initiate a pwrite call.
218 ***********************************************************************/
220 static int aio_pthread_write(struct vfs_handle_struct *handle,
221 struct files_struct *fsp,
222 SMB_STRUCT_AIOCB *aiocb)
224 struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
225 struct aio_private_data *pd = NULL;
226 int ret;
228 if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
229 &pool,
230 aio_pthread_handle_completion)) {
231 return -1;
234 pd = create_private_data(aio_ex, aiocb);
235 if (pd == NULL) {
236 DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
237 return -1;
240 pd->write_command = true;
241 if (lp_strict_sync(SNUM(fsp->conn)) &&
242 (lp_syncalways(SNUM(fsp->conn)) ||
243 aio_write_through_requested(aio_ex))) {
244 pd->flush_write = true;
248 ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
249 if (ret) {
250 errno = ret;
251 return -1;
254 DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
255 "of %llu bytes at offset %llu\n",
256 pd->jobid,
257 (unsigned long long)pd->aiocb->aio_nbytes,
258 (unsigned long long)pd->aiocb->aio_offset));
260 return 0;
263 /************************************************************************
264 Find the private data by jobid.
265 ***********************************************************************/
267 static struct aio_private_data *find_private_data_by_jobid(int jobid)
269 struct aio_private_data *pd;
271 for (pd = pd_list; pd != NULL; pd = pd->next) {
272 if (pd->jobid == jobid) {
273 return pd;
277 return NULL;
280 /************************************************************************
281 Callback when an IO completes.
282 ***********************************************************************/
284 static void aio_pthread_handle_completion(struct event_context *event_ctx,
285 struct fd_event *event,
286 uint16 flags,
287 void *p)
289 struct aio_extra *aio_ex = NULL;
290 struct aio_private_data *pd = NULL;
291 int jobid = 0;
292 int ret;
294 DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
295 (int)flags));
297 if ((flags & EVENT_FD_READ) == 0) {
298 return;
301 ret = pthreadpool_finished_job(pool, &jobid);
302 if (ret) {
303 smb_panic("aio_pthread_handle_completion");
304 return;
307 pd = find_private_data_by_jobid(jobid);
308 if (pd == NULL) {
309 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
310 jobid));
311 return;
314 aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
315 smbd_aio_complete_aio_ex(aio_ex);
317 DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
318 jobid ));
319 TALLOC_FREE(aio_ex);
322 /************************************************************************
323 Find the private data by aiocb.
324 ***********************************************************************/
326 static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
328 struct aio_private_data *pd;
330 for (pd = pd_list; pd != NULL; pd = pd->next) {
331 if (pd->aiocb == aiocb) {
332 return pd;
336 return NULL;
339 /************************************************************************
340 Called to return the result of a completed AIO.
341 Should only be called if aio_error returns something other than EINPROGRESS.
342 Returns:
343 Any other value - return from IO operation.
344 ***********************************************************************/
346 static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
347 struct files_struct *fsp,
348 SMB_STRUCT_AIOCB *aiocb)
350 struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
352 if (pd == NULL) {
353 errno = EINVAL;
354 DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
355 return -1;
358 pd->aiocb = NULL;
360 if (pd->cancelled) {
361 errno = ECANCELED;
362 return -1;
365 if (pd->ret_size == -1) {
366 errno = pd->ret_errno;
369 return pd->ret_size;
372 /************************************************************************
373 Called to check the result of an AIO.
374 Returns:
375 EINPROGRESS - still in progress.
376 EINVAL - invalid aiocb.
377 ECANCELED - request was cancelled.
378 0 - request completed successfully.
379 Any other value - errno from IO operation.
380 ***********************************************************************/
382 static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
383 struct files_struct *fsp,
384 SMB_STRUCT_AIOCB *aiocb)
386 struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
388 if (pd == NULL) {
389 return EINVAL;
391 if (pd->cancelled) {
392 return ECANCELED;
394 return pd->ret_errno;
397 /************************************************************************
398 Called to request the cancel of an AIO, or all of them on a specific
399 fsp if aiocb == NULL.
400 ***********************************************************************/
402 static int aio_pthread_cancel(struct vfs_handle_struct *handle,
403 struct files_struct *fsp,
404 SMB_STRUCT_AIOCB *aiocb)
406 struct aio_private_data *pd = NULL;
408 for (pd = pd_list; pd != NULL; pd = pd->next) {
409 if (pd->aiocb == NULL) {
410 continue;
412 if (pd->aiocb->aio_fildes != fsp->fh->fd) {
413 continue;
415 if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
416 continue;
420 * We let the child do its job, but we discard the result when
421 * it's finished.
424 pd->cancelled = true;
427 return AIO_CANCELED;
430 /************************************************************************
431 Callback for a previously detected job completion.
432 ***********************************************************************/
434 static void aio_pthread_handle_immediate(struct tevent_context *ctx,
435 struct tevent_immediate *im,
436 void *private_data)
438 struct aio_extra *aio_ex = NULL;
439 struct aio_private_data *pd = (struct aio_private_data *)private_data;
441 aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
442 smbd_aio_complete_aio_ex(aio_ex);
443 TALLOC_FREE(aio_ex);
446 /************************************************************************
447 Private data struct used in suspend completion code.
448 ***********************************************************************/
450 struct suspend_private {
451 int num_entries;
452 int num_finished;
453 const SMB_STRUCT_AIOCB * const *aiocb_array;
456 /************************************************************************
457 Callback when an IO completes from a suspend call.
458 ***********************************************************************/
460 static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
461 struct fd_event *event,
462 uint16 flags,
463 void *p)
465 struct suspend_private *sp = (struct suspend_private *)p;
466 struct aio_private_data *pd = NULL;
467 struct tevent_immediate *im = NULL;
468 int jobid;
469 int i;
471 DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
472 (int)flags));
474 if ((flags & EVENT_FD_READ) == 0) {
475 return;
478 if (pthreadpool_finished_job(pool, &jobid)) {
479 smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
480 return;
483 pd = find_private_data_by_jobid(jobid);
484 if (pd == NULL) {
485 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
486 jobid));
487 return;
490 /* Is this a jobid with an aiocb we're interested in ? */
491 for (i = 0; i < sp->num_entries; i++) {
492 if (sp->aiocb_array[i] == pd->aiocb) {
493 sp->num_finished++;
494 return;
498 /* Jobid completed we weren't waiting for.
499 We must reschedule this as an immediate event
500 on the main event context. */
501 im = tevent_create_immediate(NULL);
502 if (!im) {
503 exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
506 DEBUG(10,("aio_pthread_handle_suspend_completion: "
507 "re-scheduling job id %d\n",
508 jobid));
510 tevent_schedule_immediate(im,
511 server_event_context(),
512 aio_pthread_handle_immediate,
513 (void *)pd);
517 static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
518 struct tevent_timer *te,
519 struct timeval now,
520 void *private_data)
522 bool *timed_out = (bool *)private_data;
523 /* Remove this timed event handler. */
524 TALLOC_FREE(te);
525 *timed_out = true;
528 /************************************************************************
529 Called to request everything to stop until all IO is completed.
530 ***********************************************************************/
532 static int aio_pthread_suspend(struct vfs_handle_struct *handle,
533 struct files_struct *fsp,
534 const SMB_STRUCT_AIOCB * const aiocb_array[],
535 int n,
536 const struct timespec *timeout)
538 struct event_context *ev = NULL;
539 struct fd_event *sock_event = NULL;
540 int ret = -1;
541 struct suspend_private sp;
542 bool timed_out = false;
543 TALLOC_CTX *frame = talloc_stackframe();
545 /* This is a blocking call, and has to use a sub-event loop. */
546 ev = event_context_init(frame);
547 if (ev == NULL) {
548 errno = ENOMEM;
549 goto out;
552 if (timeout) {
553 struct timeval tv = convert_timespec_to_timeval(*timeout);
554 struct tevent_timer *te = tevent_add_timer(ev,
555 frame,
556 timeval_current_ofs(tv.tv_sec,
557 tv.tv_usec),
558 aio_pthread_suspend_timed_out,
559 &timed_out);
560 if (!te) {
561 errno = ENOMEM;
562 goto out;
566 ZERO_STRUCT(sp);
567 sp.num_entries = n;
568 sp.aiocb_array = aiocb_array;
569 sp.num_finished = 0;
571 sock_event = tevent_add_fd(ev,
572 frame,
573 pthreadpool_signal_fd(pool),
574 TEVENT_FD_READ,
575 aio_pthread_handle_suspend_completion,
576 (void *)&sp);
577 if (sock_event == NULL) {
578 pthreadpool_destroy(pool);
579 pool = NULL;
580 goto out;
583 * We're going to cheat here. We know that smbd/aio.c
584 * only calls this when it's waiting for every single
585 * outstanding call to finish on a close, so just wait
586 * individually for each IO to complete. We don't care
587 * what order they finish - only that they all do. JRA.
589 while (sp.num_entries != sp.num_finished) {
590 if (tevent_loop_once(ev) == -1) {
591 goto out;
594 if (timed_out) {
595 errno = EAGAIN;
596 goto out;
600 ret = 0;
602 out:
604 TALLOC_FREE(frame);
605 return ret;
608 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
610 * We must have openat() to do any thread-based
611 * asynchronous opens. We also must be using
612 * thread-specific credentials (Linux-only
613 * for now).
617 * NB. This threadpool is shared over all
618 * instances of this VFS module in this
619 * process, as is the current jobid.
622 static struct pthreadpool *open_pool;
623 static int aio_pthread_open_jobid;
625 struct aio_open_private_data {
626 struct aio_open_private_data *prev, *next;
627 /* Inputs. */
628 int jobid;
629 int dir_fd;
630 int flags;
631 mode_t mode;
632 uint64_t mid;
633 bool in_progress;
634 const char *fname;
635 char *dname;
636 struct smbd_server_connection *sconn;
637 const struct security_unix_token *ux_tok;
638 /* Returns. */
639 int ret_fd;
640 int ret_errno;
643 /* List of outstanding requests we have. */
644 static struct aio_open_private_data *open_pd_list;
646 /************************************************************************
647 Find the open private data by jobid.
648 ***********************************************************************/
650 static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
652 struct aio_open_private_data *opd;
654 for (opd = open_pd_list; opd != NULL; opd = opd->next) {
655 if (opd->jobid == jobid) {
656 return opd;
660 return NULL;
663 /************************************************************************
664 Find the open private data by mid.
665 ***********************************************************************/
667 static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid)
669 struct aio_open_private_data *opd;
671 for (opd = open_pd_list; opd != NULL; opd = opd->next) {
672 if (opd->mid == mid) {
673 return opd;
677 return NULL;
680 /************************************************************************
681 Callback when an open completes.
682 ***********************************************************************/
684 static void aio_open_handle_completion(struct event_context *event_ctx,
685 struct fd_event *event,
686 uint16 flags,
687 void *p)
689 struct aio_open_private_data *opd = NULL;
690 int jobid = 0;
691 int ret;
693 DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
694 (int)flags));
696 if ((flags & EVENT_FD_READ) == 0) {
697 return;
700 ret = pthreadpool_finished_job(open_pool, &jobid);
701 if (ret) {
702 smb_panic("aio_open_handle_completion");
703 /* notreached. */
704 return;
707 opd = find_open_private_data_by_jobid(jobid);
708 if (opd == NULL) {
709 DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
710 jobid));
711 smb_panic("aio_open_handle_completion - no jobid");
712 /* notreached. */
713 return;
716 DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
717 "for file %s/%s completed\n",
718 jobid,
719 (unsigned long long)opd->mid,
720 opd->dname,
721 opd->fname));
723 opd->in_progress = false;
725 /* Find outstanding event and reschdule. */
726 if (!schedule_deferred_open_message_smb(opd->sconn, opd->mid)) {
728 * Outstanding event didn't exist or was
729 * cancelled. Free up the fd and throw
730 * away the result.
732 if (opd->ret_fd != -1) {
733 close(opd->ret_fd);
734 opd->ret_fd = -1;
736 TALLOC_FREE(opd);
740 /*****************************************************************
741 The core of the async open code - the worker function. Note we
742 use the new openat() system call to avoid any problems with
743 current working directory changes plus we change credentials
744 on the thread to prevent any security race conditions.
745 *****************************************************************/
747 static void aio_open_worker(void *private_data)
749 struct aio_open_private_data *opd =
750 (struct aio_open_private_data *)private_data;
752 /* Become the correct credential on this thread. */
753 if (set_thread_credentials(opd->ux_tok->uid,
754 opd->ux_tok->gid,
755 (size_t)opd->ux_tok->ngroups,
756 opd->ux_tok->groups) != 0) {
757 opd->ret_fd = -1;
758 opd->ret_errno = errno;
759 return;
762 opd->ret_fd = openat(opd->dir_fd,
763 opd->fname,
764 opd->flags,
765 opd->mode);
767 if (opd->ret_fd == -1) {
768 opd->ret_errno = errno;
769 } else {
770 /* Create was successful. */
771 opd->ret_errno = 0;
775 /************************************************************************
776 Open private data destructor.
777 ***********************************************************************/
779 static int opd_destructor(struct aio_open_private_data *opd)
781 if (opd->dir_fd != -1) {
782 close(opd->dir_fd);
784 DLIST_REMOVE(open_pd_list, opd);
785 return 0;
788 /************************************************************************
789 Create and initialize a private data struct for async open.
790 ***********************************************************************/
792 static struct aio_open_private_data *create_private_open_data(const files_struct *fsp,
793 int flags,
794 mode_t mode)
796 struct aio_open_private_data *opd = talloc_zero(NULL,
797 struct aio_open_private_data);
798 const char *fname = NULL;
800 if (!opd) {
801 return NULL;
804 opd->jobid = aio_pthread_open_jobid++;
805 opd->dir_fd = -1;
806 opd->ret_fd = -1;
807 opd->ret_errno = EINPROGRESS;
808 opd->flags = flags;
809 opd->mode = mode;
810 opd->mid = fsp->mid;
811 opd->in_progress = true;
812 opd->sconn = fsp->conn->sconn;
814 /* Copy our current credentials. */
815 opd->ux_tok = copy_unix_token(opd, get_current_utok(fsp->conn));
816 if (opd->ux_tok == NULL) {
817 TALLOC_FREE(opd);
818 return NULL;
822 * Copy the parent directory name and the
823 * relative path within it.
825 if (parent_dirname(opd,
826 fsp->fsp_name->base_name,
827 &opd->dname,
828 &fname) == false) {
829 TALLOC_FREE(opd);
830 return NULL;
832 opd->fname = talloc_strdup(opd, fname);
833 if (opd->fname == NULL) {
834 TALLOC_FREE(opd);
835 return NULL;
838 #if defined(O_DIRECTORY)
839 opd->dir_fd = open(opd->dname, O_RDONLY|O_DIRECTORY);
840 #else
841 opd->dir_fd = open(opd->dname, O_RDONLY);
842 #endif
843 if (opd->dir_fd == -1) {
844 TALLOC_FREE(opd);
845 return NULL;
848 talloc_set_destructor(opd, opd_destructor);
849 DLIST_ADD_END(open_pd_list, opd, struct aio_open_private_data *);
850 return opd;
853 /*****************************************************************
854 Setup an async open.
855 *****************************************************************/
857 static int open_async(const files_struct *fsp,
858 int flags,
859 mode_t mode)
861 struct aio_open_private_data *opd = NULL;
862 int ret;
864 if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
865 &open_pool,
866 aio_open_handle_completion)) {
867 return -1;
870 opd = create_private_open_data(fsp, flags, mode);
871 if (opd == NULL) {
872 DEBUG(10, ("open_async: Could not create private data.\n"));
873 return -1;
876 ret = pthreadpool_add_job(open_pool,
877 opd->jobid,
878 aio_open_worker,
879 (void *)opd);
880 if (ret) {
881 errno = ret;
882 return -1;
885 DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
886 (unsigned long long)opd->mid,
887 opd->jobid,
888 opd->dname,
889 opd->fname));
891 /* Cause the calling code to reschedule us. */
892 errno = EINTR; /* Maps to NT_STATUS_RETRY. */
893 return -1;
896 /*****************************************************************
897 Look for a matching SMB2 mid. If we find it we're rescheduled,
898 just return the completed open.
899 *****************************************************************/
901 static bool find_completed_open(files_struct *fsp,
902 int *p_fd,
903 int *p_errno)
905 struct aio_open_private_data *opd;
907 opd = find_open_private_data_by_mid(fsp->mid);
908 if (!opd) {
909 return false;
912 if (opd->in_progress) {
913 DEBUG(0,("find_completed_open: mid %llu "
914 "jobid %d still in progress for "
915 "file %s/%s. PANIC !\n",
916 (unsigned long long)opd->mid,
917 opd->jobid,
918 opd->dname,
919 opd->fname));
920 /* Disaster ! This is an open timeout. Just panic. */
921 smb_panic("find_completed_open - in_progress\n");
922 /* notreached. */
923 return false;
926 *p_fd = opd->ret_fd;
927 *p_errno = opd->ret_errno;
929 DEBUG(5,("find_completed_open: mid %llu returning "
930 "fd = %d, errno = %d (%s) "
931 "jobid (%d) for file %s\n",
932 (unsigned long long)opd->mid,
933 opd->ret_fd,
934 opd->ret_errno,
935 strerror(opd->ret_errno),
936 opd->jobid,
937 smb_fname_str_dbg(fsp->fsp_name)));
939 /* Now we can free the opd. */
940 TALLOC_FREE(opd);
941 return true;
944 /*****************************************************************
945 The core open function. Only go async on O_CREAT|O_EXCL
946 opens to prevent any race conditions.
947 *****************************************************************/
949 static int aio_pthread_open_fn(vfs_handle_struct *handle,
950 struct smb_filename *smb_fname,
951 files_struct *fsp,
952 int flags,
953 mode_t mode)
955 int my_errno = 0;
956 int fd = -1;
957 bool aio_allow_open = lp_parm_bool(
958 SNUM(handle->conn), "aio_pthread", "aio open", false);
960 if (smb_fname->stream_name) {
961 /* Don't handle stream opens. */
962 errno = ENOENT;
963 return -1;
966 if (!aio_allow_open) {
967 /* aio opens turned off. */
968 return open(smb_fname->base_name, flags, mode);
971 if (!(flags & O_CREAT)) {
972 /* Only creates matter. */
973 return open(smb_fname->base_name, flags, mode);
976 if (!(flags & O_EXCL)) {
977 /* Only creates with O_EXCL matter. */
978 return open(smb_fname->base_name, flags, mode);
982 * See if this is a reentrant call - i.e. is this a
983 * restart of an existing open that just completed.
986 if (find_completed_open(fsp,
987 &fd,
988 &my_errno)) {
989 errno = my_errno;
990 return fd;
993 /* Ok, it's a create exclusive call - pass it to a thread helper. */
994 return open_async(fsp, flags, mode);
996 #endif
998 static int aio_pthread_connect(vfs_handle_struct *handle, const char *service,
999 const char *user)
1001 /*********************************************************************
1002 * How many threads to initialize ?
1003 * 100 per process seems insane as a default until you realize that
1004 * (a) Threads terminate after 1 second when idle.
1005 * (b) Throttling is done in SMB2 via the crediting algorithm.
1006 * (c) SMB1 clients are limited to max_mux (50) outstanding
1007 * requests and Windows clients don't use this anyway.
1008 * Essentially we want this to be unlimited unless smb.conf
1009 * says different.
1010 *********************************************************************/
1011 aio_pending_size = lp_parm_int(
1012 SNUM(handle->conn), "aio_pthread", "aio num threads", 100);
1013 return SMB_VFS_NEXT_CONNECT(handle, service, user);
1016 static struct vfs_fn_pointers vfs_aio_pthread_fns = {
1017 .connect_fn = aio_pthread_connect,
1018 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
1019 .open_fn = aio_pthread_open_fn,
1020 #endif
1021 .aio_read_fn = aio_pthread_read,
1022 .aio_write_fn = aio_pthread_write,
1023 .aio_return_fn = aio_pthread_return_fn,
1024 .aio_cancel_fn = aio_pthread_cancel,
1025 .aio_error_fn = aio_pthread_error_fn,
1026 .aio_suspend_fn = aio_pthread_suspend,
1029 NTSTATUS vfs_aio_pthread_init(void);
1030 NTSTATUS vfs_aio_pthread_init(void)
1032 return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
1033 "aio_pthread", &vfs_aio_pthread_fns);