2 * Simulate Posix AIO using pthreads.
4 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
6 * Copyright (C) Volker Lendecke 2008
7 * Copyright (C) Jeremy Allison 2012
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 3 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #include "system/filesys.h"
26 #include "system/shmem.h"
27 #include "smbd/smbd.h"
28 #include "lib/pthreadpool/pthreadpool.h"
/*
 * Process-wide thread pool shared by all aio_pthread requests.
 * Created by init_aio_threadpool() via pthreadpool_init().
 */
31 static struct pthreadpool
*pool
;
/*
 * Job id source: create_private_data() copies the current value into
 * each new request and then post-increments it.
 * NOTE(review): this is a signed int; confirm it cannot overflow in a
 * long-lived process (signed overflow is undefined behaviour).
 */
32 static int aio_pthread_jobid
;
/*
 * Per-request bookkeeping for one emulated AIO operation.
 * Records live on pd_list; prev/next are the DLIST_* link fields.
 * Further members used in this file (jobid, write_command, ret_size,
 * ret_errno, cancelled) are declared after the ones below.
 */
34 struct aio_private_data
{
35 struct aio_private_data
*prev
, *next
;
/* The request's control block: supplies aio_fildes, aio_buf, aio_nbytes
   and aio_offset for the IO, and carries the struct aio_extra pointer in
   aio_sigevent.sigev_value.sival_ptr. */
37 SMB_STRUCT_AIOCB
*aiocb
;
44 /* Outstanding requests: appended by create_private_data(), unlinked by
   pd_destructor() when a record is freed. Searched linearly by
   find_private_data_by_jobid() and find_private_data_by_aiocb(). */
45 static struct aio_private_data
*pd_list
;
/* Forward declaration: init_aio_threadpool() registers this handler on
   the pool's signal fd before the function is defined. */
47 static void aio_pthread_handle_completion(struct event_context
*event_ctx
,
48 struct fd_event
*event
,
52 /************************************************************************
53 How many threads to initialize ?
54 100 per process seems insane as a default until you realize that
55 (a) Threads terminate after 1 second when idle.
56 (b) Throttling is done in SMB2 via the crediting algorithm.
57 (c) SMB1 clients are limited to max_mux (50) outstanding requests and
58 Windows clients don't use this anyway.
59 Essentially we want this to be unlimited unless smb.conf says different.

Returns the per-share parametric option "aio_pthread:aio num threads"
for the share of handle->conn, or 100 when the option is not set.
60 ***********************************************************************/
62 static int aio_get_num_threads(struct vfs_handle_struct
*handle
)
64 return lp_parm_int(SNUM(handle
->conn
),
65 "aio_pthread", "aio num threads", 100);
68 /************************************************************************
69 Ensure thread pool is initialized.

Creates the shared pool with aio_get_num_threads() threads. It then
watches pthreadpool_signal_fd() in the main server event context and
dispatches to aio_pthread_handle_completion(). If the fd cannot be
watched, the pool is destroyed again. Callers treat a false return as
failure.
NOTE(review): confirm `pool` is reset to NULL after
pthreadpool_destroy(); otherwise a later call would reuse freed memory.
70 ***********************************************************************/
72 static bool init_aio_threadpool(struct vfs_handle_struct
*handle
)
74 struct fd_event
*sock_event
= NULL
;
82 num_threads
= aio_get_num_threads(handle
);
83 ret
= pthreadpool_init(num_threads
, &pool
);
88 sock_event
= tevent_add_fd(server_event_context(),
90 pthreadpool_signal_fd(pool
),
92 aio_pthread_handle_completion
,
/* Roll back pool creation if completions could not be delivered. */
94 if (sock_event
== NULL
) {
95 pthreadpool_destroy(pool
);
100 DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
107 /************************************************************************
108 Worker function - core of the pthread aio engine.
109 This is the function that actually does the IO.

aio_pthread_read() and aio_pthread_write() queue this with
pthreadpool_add_job(), so it runs on a pool thread. It issues a blocking
pwrite (when write_command is set) or pread on pd->aiocb. If the fd
reports ESPIPE it falls back to a plain write/read. The byte count goes
into pd->ret_size. On -1, errno is saved in pd->ret_errno, because errno
is thread-local and is not visible to the main thread.
NOTE(review): create_private_data() starts ret_errno at EINPROGRESS and
aio_pthread_error_fn() returns it verbatim; confirm it is cleared to 0
when the IO succeeds.
110 ***********************************************************************/
112 static void aio_worker(void *private_data
)
114 struct aio_private_data
*pd
=
115 (struct aio_private_data
*)private_data
;
117 if (pd
->write_command
) {
118 pd
->ret_size
= sys_pwrite(pd
->aiocb
->aio_fildes
,
119 (const void *)pd
->aiocb
->aio_buf
,
120 pd
->aiocb
->aio_nbytes
,
121 pd
->aiocb
->aio_offset
);
122 if (pd
->ret_size
== -1 && errno
== ESPIPE
) {
123 /* Maintain the fiction that pipes can
124 be seeked (sought?) on. */
125 pd
->ret_size
= sys_write(pd
->aiocb
->aio_fildes
,
126 (const void *)pd
->aiocb
->aio_buf
,
127 pd
->aiocb
->aio_nbytes
);
/* Read request. */
130 pd
->ret_size
= sys_pread(pd
->aiocb
->aio_fildes
,
131 (void *)pd
->aiocb
->aio_buf
,
132 pd
->aiocb
->aio_nbytes
,
133 pd
->aiocb
->aio_offset
);
134 if (pd
->ret_size
== -1 && errno
== ESPIPE
) {
135 /* Maintain the fiction that pipes can
136 be seeked (sought?) on. */
137 pd
->ret_size
= sys_read(pd
->aiocb
->aio_fildes
,
138 (void *)pd
->aiocb
->aio_buf
,
139 pd
->aiocb
->aio_nbytes
);
/* Preserve the failing call's errno for the main thread. */
142 if (pd
->ret_size
== -1) {
143 pd
->ret_errno
= errno
;
149 /************************************************************************
150 Private data destructor.

create_private_data() installs this with talloc_set_destructor(). It
unlinks pd from pd_list whenever the record is freed, including when its
talloc parent (the request's struct aio_extra) is freed.
151 ***********************************************************************/
153 static int pd_destructor(struct aio_private_data
*pd
)
155 DLIST_REMOVE(pd_list
, pd
);
159 /************************************************************************
160 Create and initialize a private data struct.

Allocates a zeroed record as a talloc child of ctx and assigns it the
next job id. It marks the record EINPROGRESS, so aio_pthread_error_fn()
reports the request as in progress. It then installs pd_destructor() and
appends the record to pd_list. Callers treat a NULL return as failure.
161 ***********************************************************************/
163 static struct aio_private_data
*create_private_data(TALLOC_CTX
*ctx
,
164 SMB_STRUCT_AIOCB
*aiocb
)
166 struct aio_private_data
*pd
= talloc_zero(ctx
, struct aio_private_data
);
170 pd
->jobid
= aio_pthread_jobid
++;
173 pd
->ret_errno
= EINPROGRESS
;
174 talloc_set_destructor(pd
, pd_destructor
);
175 DLIST_ADD_END(pd_list
, pd
, struct aio_private_data
*);
179 /************************************************************************
180 Spin off a threadpool (if needed) and initiate a pread call.

VFS aio_read entry point. The struct aio_extra stored in
aiocb->aio_sigevent.sigev_value.sival_ptr becomes the talloc parent of
the request record. The record is therefore freed together with it, and
pd_destructor() unlinks it from pd_list. The actual pread runs later in
aio_worker().
181 ***********************************************************************/
183 static int aio_pthread_read(struct vfs_handle_struct
*handle
,
184 struct files_struct
*fsp
,
185 SMB_STRUCT_AIOCB
*aiocb
)
187 struct aio_extra
*aio_ex
= (struct aio_extra
*)aiocb
->aio_sigevent
.sigev_value
.sival_ptr
;
188 struct aio_private_data
*pd
= NULL
;
191 if (!init_aio_threadpool(handle
)) {
195 pd
= create_private_data(aio_ex
, aiocb
);
197 DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
/* The job id is the completion token: aio_pthread_handle_completion()
   maps it back to pd via find_private_data_by_jobid(). */
201 ret
= pthreadpool_add_job(pool
, pd
->jobid
, aio_worker
, (void *)pd
);
207 DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
208 "of %llu bytes at offset %llu\n",
210 (unsigned long long)pd
->aiocb
->aio_nbytes
,
211 (unsigned long long)pd
->aiocb
->aio_offset
));
216 /************************************************************************
217 Spin off a threadpool (if needed) and initiate a pwrite call.

VFS aio_write entry point. It mirrors aio_pthread_read(): the request
record is a talloc child of the struct aio_extra found in the aiocb's
sigevent value. Setting write_command makes aio_worker() use pwrite.
218 ***********************************************************************/
220 static int aio_pthread_write(struct vfs_handle_struct
*handle
,
221 struct files_struct
*fsp
,
222 SMB_STRUCT_AIOCB
*aiocb
)
224 struct aio_extra
*aio_ex
= (struct aio_extra
*)aiocb
->aio_sigevent
.sigev_value
.sival_ptr
;
225 struct aio_private_data
*pd
= NULL
;
228 if (!init_aio_threadpool(handle
)) {
232 pd
= create_private_data(aio_ex
, aiocb
);
234 DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
/* Selects the pwrite path in aio_worker(). */
238 pd
->write_command
= true;
/* The job id is the completion token: aio_pthread_handle_completion()
   maps it back to pd via find_private_data_by_jobid(). */
240 ret
= pthreadpool_add_job(pool
, pd
->jobid
, aio_worker
, (void *)pd
);
246 DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
247 "of %llu bytes at offset %llu\n",
249 (unsigned long long)pd
->aiocb
->aio_nbytes
,
250 (unsigned long long)pd
->aiocb
->aio_offset
));
255 /************************************************************************
256 Find the private data by jobid.

Linear scan of pd_list, so the cost grows with the number of
outstanding requests. Callers log "cannot find jobid" when no record
matches.
257 ***********************************************************************/
259 static struct aio_private_data
*find_private_data_by_jobid(int jobid
)
261 struct aio_private_data
*pd
;
263 for (pd
= pd_list
; pd
!= NULL
; pd
= pd
->next
) {
264 if (pd
->jobid
== jobid
) {
272 /************************************************************************
273 Callback when an IO completes.

fd handler on the pool's signal fd in the main server event context,
registered by init_aio_threadpool(). Wakeups without EVENT_FD_READ are
ignored. pthreadpool_finished_job() yields the finished job id; a
failure there ends in smb_panic(). The matching record is looked up,
and smbd_aio_complete_aio_ex() is called with the struct aio_extra taken
from its aiocb's sigevent value.
NOTE(review): confirm that records flagged `cancelled` by
aio_pthread_cancel() are not reported as completed here.
274 ***********************************************************************/
276 static void aio_pthread_handle_completion(struct event_context
*event_ctx
,
277 struct fd_event
*event
,
281 struct aio_extra
*aio_ex
= NULL
;
282 struct aio_private_data
*pd
= NULL
;
286 DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
289 if ((flags
& EVENT_FD_READ
) == 0) {
293 ret
= pthreadpool_finished_job(pool
, &jobid
);
295 smb_panic("aio_pthread_handle_completion");
299 pd
= find_private_data_by_jobid(jobid
);
301 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
306 aio_ex
= (struct aio_extra
*)pd
->aiocb
->aio_sigevent
.sigev_value
.sival_ptr
;
307 smbd_aio_complete_aio_ex(aio_ex
);
309 DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
314 /************************************************************************
315 Find the private data by aiocb.

Linear scan of pd_list comparing aiocb pointers (identity, not
contents). Used by aio_pthread_return_fn() and aio_pthread_error_fn().
316 ***********************************************************************/
318 static struct aio_private_data
*find_private_data_by_aiocb(SMB_STRUCT_AIOCB
*aiocb
)
320 struct aio_private_data
*pd
;
322 for (pd
= pd_list
; pd
!= NULL
; pd
= pd
->next
) {
323 if (pd
->aiocb
== aiocb
) {
331 /************************************************************************
332 Called to return the result of a completed AIO.
333 Should only be called if aio_error returns something other than EINPROGRESS.
335 Any other value - return from IO operation.

An aiocb with no matching record is logged at level 0 as EINVAL. When
the IO failed (ret_size == -1), errno is restored from ret_errno so the
caller sees the worker thread's error.
NOTE(review): presumably the function then returns ret_size; confirm.
336 ***********************************************************************/
338 static ssize_t
aio_pthread_return_fn(struct vfs_handle_struct
*handle
,
339 struct files_struct
*fsp
,
340 SMB_STRUCT_AIOCB
*aiocb
)
342 struct aio_private_data
*pd
= find_private_data_by_aiocb(aiocb
);
346 DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
352 if (pd
->ret_size
== -1) {
353 errno
= pd
->ret_errno
;
359 /************************************************************************
360 Called to check the result of an AIO.
362 EINPROGRESS - still in progress.
363 EINVAL - invalid aiocb.
364 ECANCELED - request was cancelled.
365 0 - request completed successfully.
366 Any other value - errno from IO operation.

For a known aiocb this is the record's ret_errno.
create_private_data() initialises it to EINPROGRESS, and aio_worker()
sets it from errno when the IO fails.
367 ***********************************************************************/
369 static int aio_pthread_error_fn(struct vfs_handle_struct
*handle
,
370 struct files_struct
*fsp
,
371 SMB_STRUCT_AIOCB
*aiocb
)
373 struct aio_private_data
*pd
= find_private_data_by_aiocb(aiocb
);
381 return pd
->ret_errno
;
384 /************************************************************************
385 Called to request the cancel of an AIO, or all of them on a specific
386 fsp if aiocb == NULL.

Cancellation is cooperative: matching records only get their
`cancelled` flag set, and the worker job is allowed to finish. The scan
skips records without an aiocb and records for a different fd than
fsp->fh->fd. When aiocb is given, it also skips records for other
aiocbs.
387 ***********************************************************************/
389 static int aio_pthread_cancel(struct vfs_handle_struct
*handle
,
390 struct files_struct
*fsp
,
391 SMB_STRUCT_AIOCB
*aiocb
)
393 struct aio_private_data
*pd
= NULL
;
395 for (pd
= pd_list
; pd
!= NULL
; pd
= pd
->next
) {
396 if (pd
->aiocb
== NULL
) {
399 if (pd
->aiocb
->aio_fildes
!= fsp
->fh
->fd
) {
402 if ((aiocb
!= NULL
) && (pd
->aiocb
!= aiocb
)) {
407 * We let the child do its job, but we discard the result when
411 pd
->cancelled
= true;
417 /************************************************************************
418 Callback for a previously detected job completion.

Immediate-event handler that aio_pthread_handle_suspend_completion()
schedules on the main server event context. It covers jobs that
finished while a suspend was waiting for other aiocbs. private_data is
presumably the talloc-allocated int job id from that handler. The
matching record is completed through smbd_aio_complete_aio_ex().
NOTE(review): confirm the job id allocation is freed on both the
"cannot find" path and the success path.
419 ***********************************************************************/
421 static void aio_pthread_handle_immediate(struct tevent_context
*ctx
,
422 struct tevent_immediate
*im
,
425 struct aio_extra
*aio_ex
= NULL
;
426 int *pjobid
= (int *)private_data
;
427 struct aio_private_data
*pd
= find_private_data_by_jobid(*pjobid
);
430 DEBUG(1, ("aio_pthread_handle_immediate cannot find jobid %d\n",
437 aio_ex
= (struct aio_extra
*)pd
->aiocb
->aio_sigevent
.sigev_value
.sival_ptr
;
438 smbd_aio_complete_aio_ex(aio_ex
);
442 /************************************************************************
443 Private data struct used in suspend completion code.

Shared between aio_pthread_suspend() and
aio_pthread_handle_suspend_completion(). It holds the aiocbs being
waited for, plus the num_entries/num_finished counters (also members of
this struct). The suspend loop compares those counters to decide when
to stop.
444 ***********************************************************************/
446 struct suspend_private
{
449 const SMB_STRUCT_AIOCB
* const *aiocb_array
;
452 /************************************************************************
453 Callback when an IO completes from a suspend call.

fd handler on the pool's signal fd inside aio_pthread_suspend()'s
private event loop. Each wakeup collects one finished job id into a
talloc-allocated int; a failure to allocate or to collect it panics.
Jobs for an aiocb in sp->aiocb_array are presumably counted in
sp->num_finished. Any other job is handed to
aio_pthread_handle_immediate() through an immediate event on the main
server event context, so its completion is not lost.
454 ***********************************************************************/
456 static void aio_pthread_handle_suspend_completion(struct event_context
*event_ctx
,
457 struct fd_event
*event
,
461 struct suspend_private
*sp
= (struct suspend_private
*)p
;
462 struct aio_private_data
*pd
= NULL
;
463 struct tevent_immediate
*im
= NULL
;
467 DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
470 if ((flags
& EVENT_FD_READ
) == 0) {
474 pjobid
= talloc_array(NULL
, int, 1);
475 if (pjobid
== NULL
) {
476 smb_panic("aio_pthread_handle_suspend_completion: no memory.");
479 if (pthreadpool_finished_job(pool
, pjobid
)) {
480 smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
484 pd
= find_private_data_by_jobid(*pjobid
);
/* NOTE(review): this message names aio_pthread_handle_completion, but it
   is emitted from aio_pthread_handle_suspend_completion. */
486 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
492 /* Is this a jobid with an aiocb we're interested in ? */
493 for (i
= 0; i
< sp
->num_entries
; i
++) {
494 if (sp
->aiocb_array
[i
] == pd
->aiocb
) {
501 /* Jobid completed we weren't waiting for.
502 We must reschedule this as an immediate event
503 on the main event context. */
504 im
= tevent_create_immediate(NULL
);
506 exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
509 DEBUG(10,("aio_pthread_handle_suspend_completion: "
510 "re-scheduling job id %d\n",
513 tevent_schedule_immediate(im
,
514 server_event_context(),
515 aio_pthread_handle_immediate
,
/*
 * Timer callback armed by aio_pthread_suspend() to bound its wait.
 * private_data presumably points at that function's timed_out flag,
 * which this callback sets before removing its timer.
 */
520 static void aio_pthread_suspend_timed_out(struct tevent_context
*event_ctx
,
521 struct tevent_timer
*te
,
525 bool *timed_out
= (bool *)private_data
;
526 /* Remove this timed event handler. */
531 /************************************************************************
532 Called to request everything to stop until all IO is completed.

Blocks until every aiocb in aiocb_array has finished. It runs a private
event loop, created on a talloc stackframe, that watches the pool's
signal fd with aio_pthread_handle_suspend_completion(). That handler
re-queues completions for other requests onto the main event context.
A timer (aio_pthread_suspend_timed_out()) bounds the wait using timeout.
NOTE(review): if watching the signal fd fails, the process-wide pool is
destroyed, which affects every outstanding request. Confirm `pool` is
also reset to NULL so init_aio_threadpool() can recreate it.
533 ***********************************************************************/
535 static int aio_pthread_suspend(struct vfs_handle_struct
*handle
,
536 struct files_struct
*fsp
,
537 const SMB_STRUCT_AIOCB
* const aiocb_array
[],
539 const struct timespec
*timeout
)
541 struct event_context
*ev
= NULL
;
542 struct fd_event
*sock_event
= NULL
;
544 struct suspend_private sp
;
545 bool timed_out
= false;
546 TALLOC_CTX
*frame
= talloc_stackframe();
548 /* This is a blocking call, and has to use a sub-event loop. */
549 ev
= event_context_init(frame
);
556 struct timeval tv
= convert_timespec_to_timeval(*timeout
);
557 struct tevent_timer
*te
= tevent_add_timer(ev
,
559 timeval_current_ofs(tv
.tv_sec
,
561 aio_pthread_suspend_timed_out
,
571 sp
.aiocb_array
= aiocb_array
;
574 sock_event
= tevent_add_fd(ev
,
576 pthreadpool_signal_fd(pool
),
578 aio_pthread_handle_suspend_completion
,
580 if (sock_event
== NULL
) {
581 pthreadpool_destroy(pool
);
586 * We're going to cheat here. We know that smbd/aio.c
587 * only calls this when it's waiting for every single
588 * outstanding call to finish on a close, so just wait
589 * individually for each IO to complete. We don't care
590 * what order they finish - only that they all do. JRA.
592 while (sp
.num_entries
!= sp
.num_finished
) {
593 if (tevent_loop_once(ev
) == -1) {
/*
 * VFS operation table: this module supplies only the AIO hooks
 * (read, write, return, cancel, error, suspend).
 */
611 static struct vfs_fn_pointers vfs_aio_pthread_fns
= {
612 .aio_read
= aio_pthread_read
,
613 .aio_write
= aio_pthread_write
,
614 .aio_return_fn
= aio_pthread_return_fn
,
615 .aio_cancel
= aio_pthread_cancel
,
616 .aio_error_fn
= aio_pthread_error_fn
,
617 .aio_suspend
= aio_pthread_suspend
,
/*
 * Module entry point: registers vfs_aio_pthread_fns under the module name
 * "aio_pthread" using SMB_VFS_INTERFACE_VERSION, and returns the status
 * from smb_register_vfs().
 */
620 NTSTATUS
vfs_aio_pthread_init(void);
621 NTSTATUS
vfs_aio_pthread_init(void)
623 return smb_register_vfs(SMB_VFS_INTERFACE_VERSION
,
624 "aio_pthread", &vfs_aio_pthread_fns
);