/*
 * Simulate Posix AIO using pthreads.
 *
 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
 *
 * Copyright (C) Volker Lendecke 2008
 * Copyright (C) Jeremy Allison 2012
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include "includes.h"
#include "system/filesys.h"
#include "system/shmem.h"
#include "smbd/smbd.h"
#include "smbd/globals.h"
#include "lib/pthreadpool/pthreadpool.h"
#include "lib/asys/asys.h"
#include "lib/util/tevent_unix.h"
#ifdef HAVE_LINUX_FALLOC_H
#include <linux/falloc.h>
#endif
/*
 * Process-wide asys context used for all pread/pwrite jobs, plus the
 * tevent fd-watch that drains completed results. Both are created
 * lazily on first connect (see aio_pthread_connect). static: file
 * scope only, not part of the module's public interface.
 */
static struct asys_context *asys_ctx;
static struct tevent_fd *asys_fde;
/*
 * Per-request state for an async pread/pwrite. "req" back-pointer is
 * needed so the destructor can cancel the in-flight asys job; ret/err
 * carry the result filled in by aio_pthread_finished and read back by
 * aio_pthread_recv.
 */
struct aio_pthread_state {
	struct tevent_req *req;	/* owning request, used for cancellation */
	ssize_t ret;		/* syscall return value (-1 on failure) */
	int err;		/* errno when ret == -1 */
};
45 static int aio_pthread_state_destructor(struct aio_pthread_state
*s
)
47 asys_cancel(asys_ctx
, s
->req
);
51 static struct tevent_req
*aio_pthread_pread_send(
52 struct vfs_handle_struct
*handle
,
53 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
54 struct files_struct
*fsp
, void *data
, size_t n
, off_t offset
)
56 struct tevent_req
*req
;
57 struct aio_pthread_state
*state
;
60 req
= tevent_req_create(mem_ctx
, &state
, struct aio_pthread_state
);
66 ret
= asys_pread(asys_ctx
, fsp
->fh
->fd
, data
, n
, offset
, req
);
68 tevent_req_error(req
, ret
);
69 return tevent_req_post(req
, ev
);
71 talloc_set_destructor(state
, aio_pthread_state_destructor
);
76 static struct tevent_req
*aio_pthread_pwrite_send(
77 struct vfs_handle_struct
*handle
,
78 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
79 struct files_struct
*fsp
, const void *data
, size_t n
, off_t offset
)
81 struct tevent_req
*req
;
82 struct aio_pthread_state
*state
;
85 req
= tevent_req_create(mem_ctx
, &state
, struct aio_pthread_state
);
91 ret
= asys_pwrite(asys_ctx
, fsp
->fh
->fd
, data
, n
, offset
, req
);
93 tevent_req_error(req
, ret
);
94 return tevent_req_post(req
, ev
);
96 talloc_set_destructor(state
, aio_pthread_state_destructor
);
101 static void aio_pthread_finished(struct tevent_context
*ev
,
102 struct tevent_fd
*fde
,
103 uint16_t flags
, void *p
)
105 struct tevent_req
*req
;
106 struct aio_pthread_state
*state
;
112 if ((flags
& TEVENT_FD_READ
) == 0) {
116 res
= asys_result(asys_ctx
, &ret
, &err
, &private_data
);
117 if (res
== ECANCELED
) {
122 DEBUG(1, ("asys_result returned %s\n", strerror(res
)));
126 req
= talloc_get_type_abort(private_data
, struct tevent_req
);
127 state
= tevent_req_data(req
, struct aio_pthread_state
);
129 talloc_set_destructor(state
, NULL
);
133 tevent_req_done(req
);
136 static ssize_t
aio_pthread_recv(struct tevent_req
*req
, int *err
)
138 struct aio_pthread_state
*state
= tevent_req_data(
139 req
, struct aio_pthread_state
);
141 if (tevent_req_is_unix_error(req
, err
)) {
149 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
/************************************************************************
 Ensure thread pool is initialized.
***********************************************************************/
155 static bool init_aio_threadpool(struct event_context
*ev_ctx
,
156 struct pthreadpool
**pp_pool
,
157 void (*completion_fn
)(struct event_context
*,
162 struct fd_event
*sock_event
= NULL
;
169 ret
= pthreadpool_init(aio_pending_size
, pp_pool
);
174 sock_event
= tevent_add_fd(ev_ctx
,
176 pthreadpool_signal_fd(*pp_pool
),
180 if (sock_event
== NULL
) {
181 pthreadpool_destroy(*pp_pool
);
186 DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
/*
 * We must have openat() to do any thread-based
 * asynchronous opens. We also must be using
 * thread-specific credentials (Linux-only
 * for now).
 */

/*
 * NB. This threadpool is shared over all
 * instances of this VFS module in this
 * process, as is the current jobid.
 */
/* Shared thread pool for async opens and the monotonically
 * increasing jobid handed to pthreadpool_add_job. */
static struct pthreadpool *open_pool;
static int aio_pthread_open_jobid;
/*
 * Bookkeeping for one outstanding async open, linked into
 * open_pd_list. Inputs are filled by create_private_open_data, the
 * worker thread writes the "Returns" members, and the completion
 * handler / find_completed_open consume them on the main thread.
 */
struct aio_open_private_data {
	struct aio_open_private_data *prev, *next;
	/* Inputs. */
	int jobid;
	int dir_fd;
	int flags;
	mode_t mode;
	uint64_t mid;
	bool in_progress;
	const char *fname;
	char *dname;
	struct smbd_server_connection *sconn;
	const struct security_unix_token *ux_tok;
	uint64_t initial_allocation_size;
	/* Returns. */
	int ret_fd;
	int ret_errno;
};
/* List of outstanding requests we have. */
static struct aio_open_private_data *open_pd_list;
/************************************************************************
 Find the open private data by jobid.
***********************************************************************/
234 static struct aio_open_private_data
*find_open_private_data_by_jobid(int jobid
)
236 struct aio_open_private_data
*opd
;
238 for (opd
= open_pd_list
; opd
!= NULL
; opd
= opd
->next
) {
239 if (opd
->jobid
== jobid
) {
/************************************************************************
 Find the open private data by mid.
***********************************************************************/
251 static struct aio_open_private_data
*find_open_private_data_by_mid(uint64_t mid
)
253 struct aio_open_private_data
*opd
;
255 for (opd
= open_pd_list
; opd
!= NULL
; opd
= opd
->next
) {
256 if (opd
->mid
== mid
) {
/************************************************************************
 Callback when an open completes.
***********************************************************************/
268 static void aio_open_handle_completion(struct event_context
*event_ctx
,
269 struct fd_event
*event
,
273 struct aio_open_private_data
*opd
= NULL
;
277 DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
280 if ((flags
& EVENT_FD_READ
) == 0) {
284 ret
= pthreadpool_finished_job(open_pool
, &jobid
);
286 smb_panic("aio_open_handle_completion");
291 opd
= find_open_private_data_by_jobid(jobid
);
293 DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
295 smb_panic("aio_open_handle_completion - no jobid");
300 DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
301 "for file %s/%s completed\n",
303 (unsigned long long)opd
->mid
,
307 opd
->in_progress
= false;
309 /* Find outstanding event and reschdule. */
310 if (!schedule_deferred_open_message_smb(opd
->sconn
, opd
->mid
)) {
312 * Outstanding event didn't exist or was
313 * cancelled. Free up the fd and throw
316 if (opd
->ret_fd
!= -1) {
/*****************************************************************
 The core of the async open code - the worker function. Note we
 use the new openat() system call to avoid any problems with
 current working directory changes plus we change credentials
 on the thread to prevent any security race conditions.
*****************************************************************/
331 static void aio_open_worker(void *private_data
)
333 struct aio_open_private_data
*opd
=
334 (struct aio_open_private_data
*)private_data
;
336 /* Become the correct credential on this thread. */
337 if (set_thread_credentials(opd
->ux_tok
->uid
,
339 (size_t)opd
->ux_tok
->ngroups
,
340 opd
->ux_tok
->groups
) != 0) {
342 opd
->ret_errno
= errno
;
346 opd
->ret_fd
= openat(opd
->dir_fd
,
351 if (opd
->ret_fd
== -1) {
352 opd
->ret_errno
= errno
;
354 /* Create was successful. */
357 #if defined(HAVE_LINUX_FALLOCATE)
359 * See if we can set the initial
360 * allocation size. We don't record
361 * the return for this as it's an
362 * optimization - the upper layer
363 * will also do this for us once
366 if (opd
->initial_allocation_size
) {
367 (void)fallocate(opd
->ret_fd
,
370 (off_t
)opd
->initial_allocation_size
);
/************************************************************************
 Open private data destructor.
***********************************************************************/
380 static int opd_destructor(struct aio_open_private_data
*opd
)
382 if (opd
->dir_fd
!= -1) {
385 DLIST_REMOVE(open_pd_list
, opd
);
/************************************************************************
 Create and initialize a private data struct for async open.
***********************************************************************/
393 static struct aio_open_private_data
*create_private_open_data(const files_struct
*fsp
,
397 struct aio_open_private_data
*opd
= talloc_zero(NULL
,
398 struct aio_open_private_data
);
399 const char *fname
= NULL
;
405 opd
->jobid
= aio_pthread_open_jobid
++;
408 opd
->ret_errno
= EINPROGRESS
;
412 opd
->in_progress
= true;
413 opd
->sconn
= fsp
->conn
->sconn
;
414 opd
->initial_allocation_size
= fsp
->initial_allocation_size
;
416 /* Copy our current credentials. */
417 opd
->ux_tok
= copy_unix_token(opd
, get_current_utok(fsp
->conn
));
418 if (opd
->ux_tok
== NULL
) {
424 * Copy the parent directory name and the
425 * relative path within it.
427 if (parent_dirname(opd
,
428 fsp
->fsp_name
->base_name
,
434 opd
->fname
= talloc_strdup(opd
, fname
);
435 if (opd
->fname
== NULL
) {
440 #if defined(O_DIRECTORY)
441 opd
->dir_fd
= open(opd
->dname
, O_RDONLY
|O_DIRECTORY
);
443 opd
->dir_fd
= open(opd
->dname
, O_RDONLY
);
445 if (opd
->dir_fd
== -1) {
450 talloc_set_destructor(opd
, opd_destructor
);
451 DLIST_ADD_END(open_pd_list
, opd
, struct aio_open_private_data
*);
/*****************************************************************
 Setup an async open.
*****************************************************************/
459 static int open_async(const files_struct
*fsp
,
463 struct aio_open_private_data
*opd
= NULL
;
466 if (!init_aio_threadpool(fsp
->conn
->sconn
->ev_ctx
,
468 aio_open_handle_completion
)) {
472 opd
= create_private_open_data(fsp
, flags
, mode
);
474 DEBUG(10, ("open_async: Could not create private data.\n"));
478 ret
= pthreadpool_add_job(open_pool
,
487 DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
488 (unsigned long long)opd
->mid
,
493 /* Cause the calling code to reschedule us. */
494 errno
= EINTR
; /* Maps to NT_STATUS_RETRY. */
/*****************************************************************
 Look for a matching SMB2 mid. If we find it we're rescheduled,
 just return the completed open.
*****************************************************************/
503 static bool find_completed_open(files_struct
*fsp
,
507 struct aio_open_private_data
*opd
;
509 opd
= find_open_private_data_by_mid(fsp
->mid
);
514 if (opd
->in_progress
) {
515 DEBUG(0,("find_completed_open: mid %llu "
516 "jobid %d still in progress for "
517 "file %s/%s. PANIC !\n",
518 (unsigned long long)opd
->mid
,
522 /* Disaster ! This is an open timeout. Just panic. */
523 smb_panic("find_completed_open - in_progress\n");
529 *p_errno
= opd
->ret_errno
;
531 DEBUG(5,("find_completed_open: mid %llu returning "
532 "fd = %d, errno = %d (%s) "
533 "jobid (%d) for file %s\n",
534 (unsigned long long)opd
->mid
,
537 strerror(opd
->ret_errno
),
539 smb_fname_str_dbg(fsp
->fsp_name
)));
541 /* Now we can free the opd. */
/*****************************************************************
 The core open function. Only go async on O_CREAT|O_EXCL
 opens to prevent any race conditions.
*****************************************************************/
551 static int aio_pthread_open_fn(vfs_handle_struct
*handle
,
552 struct smb_filename
*smb_fname
,
559 bool aio_allow_open
= lp_parm_bool(
560 SNUM(handle
->conn
), "aio_pthread", "aio open", false);
562 if (smb_fname
->stream_name
) {
563 /* Don't handle stream opens. */
568 if (!aio_allow_open
) {
569 /* aio opens turned off. */
570 return open(smb_fname
->base_name
, flags
, mode
);
573 if (!(flags
& O_CREAT
)) {
574 /* Only creates matter. */
575 return open(smb_fname
->base_name
, flags
, mode
);
578 if (!(flags
& O_EXCL
)) {
579 /* Only creates with O_EXCL matter. */
580 return open(smb_fname
->base_name
, flags
, mode
);
584 * See if this is a reentrant call - i.e. is this a
585 * restart of an existing open that just completed.
588 if (find_completed_open(fsp
,
595 /* Ok, it's a create exclusive call - pass it to a thread helper. */
596 return open_async(fsp
, flags
, mode
);
600 static int aio_pthread_connect(vfs_handle_struct
*handle
, const char *service
,
603 /*********************************************************************
604 * How many threads to initialize ?
605 * 100 per process seems insane as a default until you realize that
606 * (a) Threads terminate after 1 second when idle.
607 * (b) Throttling is done in SMB2 via the crediting algorithm.
608 * (c) SMB1 clients are limited to max_mux (50) outstanding
609 * requests and Windows clients don't use this anyway.
610 * Essentially we want this to be unlimited unless smb.conf
612 *********************************************************************/
613 aio_pending_size
= lp_parm_int(
614 SNUM(handle
->conn
), "aio_pthread", "aio num threads", 100);
616 if (asys_ctx
== NULL
) {
619 ret
= asys_context_init(&asys_ctx
, aio_pending_size
);
621 DEBUG(1, ("asys_context_init failed: %s\n",
626 asys_fde
= tevent_add_fd(handle
->conn
->sconn
->ev_ctx
, NULL
,
627 asys_signalfd(asys_ctx
),
628 TEVENT_FD_READ
, aio_pthread_finished
,
630 if (asys_fde
== NULL
) {
631 DEBUG(1, ("tevent_add_fd failed\n"));
632 asys_context_destroy(asys_ctx
);
637 return SMB_VFS_NEXT_CONNECT(handle
, service
, user
);
640 static struct vfs_fn_pointers vfs_aio_pthread_fns
= {
641 .connect_fn
= aio_pthread_connect
,
642 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
643 .open_fn
= aio_pthread_open_fn
,
645 .pread_send_fn
= aio_pthread_pread_send
,
646 .pread_recv_fn
= aio_pthread_recv
,
647 .pwrite_send_fn
= aio_pthread_pwrite_send
,
648 .pwrite_recv_fn
= aio_pthread_recv
,
651 NTSTATUS
vfs_aio_pthread_init(void);
652 NTSTATUS
vfs_aio_pthread_init(void)
654 return smb_register_vfs(SMB_VFS_INTERFACE_VERSION
,
655 "aio_pthread", &vfs_aio_pthread_fns
);