source3/modules/vfs_aio_pthread.c
/*
 * Simulate Posix AIO using pthreads.
 *
 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
 *
 * Copyright (C) Volker Lendecke 2008
 * Copyright (C) Jeremy Allison 2012
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include "includes.h"
#include "system/filesys.h"
#include "system/shmem.h"
#include "smbd/smbd.h"
#include "lib/pthreadpool/pthreadpool.h"
struct aio_extra;
static struct pthreadpool *pool;
static int aio_pthread_jobid;
struct aio_private_data {
	struct aio_private_data *prev, *next;
	int jobid;
	SMB_STRUCT_AIOCB *aiocb;
	ssize_t ret_size;
	int ret_errno;
	bool cancelled;
	bool write_command;
};

/* List of outstanding requests we have. */
static struct aio_private_data *pd_list;
static void aio_pthread_handle_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p);
/************************************************************************
 How many threads to initialize ?
 100 per process seems insane as a default until you realize that
 (a) Threads terminate after 1 second when idle.
 (b) Throttling is done in SMB2 via the crediting algorithm.
 (c) SMB1 clients are limited to max_mux (50) outstanding requests and
     Windows clients don't use this anyway.
 Essentially we want this to be unlimited unless smb.conf says different.
***********************************************************************/
static int aio_get_num_threads(struct vfs_handle_struct *handle)
{
	return lp_parm_int(SNUM(handle->conn),
			   "aio_pthread", "aio num threads", 100);
}
/************************************************************************
 Ensure thread pool is initialized.
***********************************************************************/
static bool init_aio_threadpool(struct vfs_handle_struct *handle)
{
	struct fd_event *sock_event = NULL;
	int ret = 0;
	int num_threads;

	if (pool) {
		return true;
	}

	num_threads = aio_get_num_threads(handle);
	ret = pthreadpool_init(num_threads, &pool);
	if (ret) {
		errno = ret;
		return false;
	}
	sock_event = tevent_add_fd(server_event_context(),
				NULL,
				pthreadpool_signal_fd(pool),
				TEVENT_FD_READ,
				aio_pthread_handle_completion,
				NULL);
	if (sock_event == NULL) {
		pthreadpool_destroy(pool);
		pool = NULL;
		return false;
	}

	DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
		  num_threads));

	return true;
}
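
/*
 * Design note on the code above: pthreadpool_signal_fd() returns a file
 * descriptor that becomes readable whenever a worker thread finishes a
 * job. Registering it with tevent means completed jobs are collected by
 * aio_pthread_handle_completion() (via pthreadpool_finished_job()) from
 * the main smbd event loop, so no signal handlers or polling are needed.
 */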
/************************************************************************
 Worker function - core of the pthread aio engine.
 This is the function that actually does the IO.
***********************************************************************/
static void aio_worker(void *private_data)
{
	struct aio_private_data *pd =
			(struct aio_private_data *)private_data;

	if (pd->write_command) {
		pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
				(const void *)pd->aiocb->aio_buf,
				pd->aiocb->aio_nbytes,
				pd->aiocb->aio_offset);
		if (pd->ret_size == -1 && errno == ESPIPE) {
			/* Maintain the fiction that pipes can
			   be seeked (sought?) on. */
			pd->ret_size = sys_write(pd->aiocb->aio_fildes,
					(const void *)pd->aiocb->aio_buf,
					pd->aiocb->aio_nbytes);
		}
	} else {
		pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
				(void *)pd->aiocb->aio_buf,
				pd->aiocb->aio_nbytes,
				pd->aiocb->aio_offset);
		if (pd->ret_size == -1 && errno == ESPIPE) {
			/* Maintain the fiction that pipes can
			   be seeked (sought?) on. */
			pd->ret_size = sys_read(pd->aiocb->aio_fildes,
					(void *)pd->aiocb->aio_buf,
					pd->aiocb->aio_nbytes);
		}
	}
	if (pd->ret_size == -1) {
		pd->ret_errno = errno;
	} else {
		pd->ret_errno = 0;
	}
}
/************************************************************************
 Private data destructor.
***********************************************************************/
static int pd_destructor(struct aio_private_data *pd)
{
	DLIST_REMOVE(pd_list, pd);
	return 0;
}
/************************************************************************
 Create and initialize a private data struct.
***********************************************************************/
static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
					SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
	if (!pd) {
		return NULL;
	}
	pd->jobid = aio_pthread_jobid++;
	pd->aiocb = aiocb;
	pd->ret_size = -1;
	pd->ret_errno = EINPROGRESS;
	talloc_set_destructor(pd, pd_destructor);
	DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
	return pd;
}
/************************************************************************
 Spin off a threadpool (if needed) and initiate a pread call.
***********************************************************************/
static int aio_pthread_read(struct vfs_handle_struct *handle,
				struct files_struct *fsp,
				SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
	struct aio_private_data *pd = NULL;
	int ret;

	if (!init_aio_threadpool(handle)) {
		return -1;
	}

	pd = create_private_data(aio_ex, aiocb);
	if (pd == NULL) {
		DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
		return -1;
	}

	ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
	if (ret) {
		errno = ret;
		return -1;
	}

	DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
		"of %llu bytes at offset %llu\n",
		pd->jobid,
		(unsigned long long)pd->aiocb->aio_nbytes,
		(unsigned long long)pd->aiocb->aio_offset));

	return 0;
}
/************************************************************************
 Spin off a threadpool (if needed) and initiate a pwrite call.
***********************************************************************/
static int aio_pthread_write(struct vfs_handle_struct *handle,
				struct files_struct *fsp,
				SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
	struct aio_private_data *pd = NULL;
	int ret;

	if (!init_aio_threadpool(handle)) {
		return -1;
	}

	pd = create_private_data(aio_ex, aiocb);
	if (pd == NULL) {
		DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
		return -1;
	}

	pd->write_command = true;

	ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
	if (ret) {
		errno = ret;
		return -1;
	}

	DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
		"of %llu bytes at offset %llu\n",
		pd->jobid,
		(unsigned long long)pd->aiocb->aio_nbytes,
		(unsigned long long)pd->aiocb->aio_offset));

	return 0;
}
/************************************************************************
 Find the private data by jobid.
***********************************************************************/
static struct aio_private_data *find_private_data_by_jobid(int jobid)
{
	struct aio_private_data *pd;

	for (pd = pd_list; pd != NULL; pd = pd->next) {
		if (pd->jobid == jobid) {
			return pd;
		}
	}
	return NULL;
}
/************************************************************************
 Callback when an IO completes.
***********************************************************************/
static void aio_pthread_handle_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p)
{
	struct aio_extra *aio_ex = NULL;
	struct aio_private_data *pd = NULL;
	int jobid = 0;
	int ret;

	DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
			(int)flags));

	if ((flags & EVENT_FD_READ) == 0) {
		return;
	}

	ret = pthreadpool_finished_job(pool, &jobid);
	if (ret) {
		smb_panic("aio_pthread_handle_completion");
		return;
	}

	pd = find_private_data_by_jobid(jobid);
	if (pd == NULL) {
		DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
			  jobid));
		return;
	}

	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
	smbd_aio_complete_aio_ex(aio_ex);

	DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
		jobid ));
	TALLOC_FREE(aio_ex);
}
/************************************************************************
 Find the private data by aiocb.
***********************************************************************/
static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd;

	for (pd = pd_list; pd != NULL; pd = pd->next) {
		if (pd->aiocb == aiocb) {
			return pd;
		}
	}
	return NULL;
}
/************************************************************************
 Called to return the result of a completed AIO.
 Should only be called if aio_error returns something other than EINPROGRESS.
 Returns:
	-1 - IO operation failed (errno is set from the operation).
	Any other value - return from IO operation.
***********************************************************************/
static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
			struct files_struct *fsp,
			SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

	if (pd == NULL) {
		errno = EINVAL;
		DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
		return -1;
	}

	pd->aiocb = NULL;

	if (pd->ret_size == -1) {
		errno = pd->ret_errno;
	}

	return pd->ret_size;
}
/************************************************************************
 Called to check the result of an AIO.
 Returns:
	EINPROGRESS - still in progress.
	EINVAL - invalid aiocb.
	ECANCELED - request was cancelled.
	0 - request completed successfully.
	Any other value - errno from IO operation.
***********************************************************************/
static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
			struct files_struct *fsp,
			SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

	if (pd == NULL) {
		return EINVAL;
	}
	if (pd->cancelled) {
		return ECANCELED;
	}
	return pd->ret_errno;
}
/************************************************************************
 Called to request the cancel of an AIO, or all of them on a specific
 fsp if aiocb == NULL.
***********************************************************************/
static int aio_pthread_cancel(struct vfs_handle_struct *handle,
			struct files_struct *fsp,
			SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = NULL;

	for (pd = pd_list; pd != NULL; pd = pd->next) {
		if (pd->aiocb == NULL) {
			continue;
		}
		if (pd->aiocb->aio_fildes != fsp->fh->fd) {
			continue;
		}
		if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
			continue;
		}

		/*
		 * We let the child do its job, but we discard the result when
		 * it's finished.
		 */

		pd->cancelled = true;
	}

	return AIO_CANCELED;
}
/************************************************************************
 Callback for a previously detected job completion.
***********************************************************************/
static void aio_pthread_handle_immediate(struct tevent_context *ctx,
				struct tevent_immediate *im,
				void *private_data)
{
	struct aio_extra *aio_ex = NULL;
	int *pjobid = (int *)private_data;
	struct aio_private_data *pd = find_private_data_by_jobid(*pjobid);

	if (pd == NULL) {
		DEBUG(1, ("aio_pthread_handle_immediate cannot find jobid %d\n",
			  *pjobid));
		TALLOC_FREE(pjobid);
		return;
	}

	TALLOC_FREE(pjobid);
	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
	smbd_aio_complete_aio_ex(aio_ex);
	TALLOC_FREE(aio_ex);
}
/************************************************************************
 Private data struct used in suspend completion code.
***********************************************************************/
struct suspend_private {
	int num_entries;
	int num_finished;
	const SMB_STRUCT_AIOCB * const *aiocb_array;
};
/************************************************************************
 Callback when an IO completes from a suspend call.
***********************************************************************/
static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p)
{
	struct suspend_private *sp = (struct suspend_private *)p;
	struct aio_private_data *pd = NULL;
	struct tevent_immediate *im = NULL;
	int *pjobid = NULL;
	int i;

	DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
			(int)flags));

	if ((flags & EVENT_FD_READ) == 0) {
		return;
	}

	pjobid = talloc_array(NULL, int, 1);
	if (pjobid == NULL) {
		smb_panic("aio_pthread_handle_suspend_completion: no memory.");
	}

	if (pthreadpool_finished_job(pool, pjobid)) {
		smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
		return;
	}

	pd = find_private_data_by_jobid(*pjobid);
	if (pd == NULL) {
		DEBUG(1, ("aio_pthread_handle_suspend_completion cannot find jobid %d\n",
			  *pjobid));
		TALLOC_FREE(pjobid);
		return;
	}

	/* Is this a jobid with an aiocb we're interested in ? */
	for (i = 0; i < sp->num_entries; i++) {
		if (sp->aiocb_array[i] == pd->aiocb) {
			sp->num_finished++;
			TALLOC_FREE(pjobid);
			return;
		}
	}

	/* Jobid completed we weren't waiting for.
	   We must reschedule this as an immediate event
	   on the main event context. */
	im = tevent_create_immediate(NULL);
	if (!im) {
		exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
	}

	DEBUG(10,("aio_pthread_handle_suspend_completion: "
			"re-scheduling job id %d\n",
			*pjobid));

	tevent_schedule_immediate(im,
			server_event_context(),
			aio_pthread_handle_immediate,
			(void *)pjobid);
}
static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
					struct tevent_timer *te,
					struct timeval now,
					void *private_data)
{
	bool *timed_out = (bool *)private_data;
	/* Remove this timed event handler. */
	TALLOC_FREE(te);
	*timed_out = true;
}
/************************************************************************
 Called to request everything to stop until all IO is completed.
***********************************************************************/
static int aio_pthread_suspend(struct vfs_handle_struct *handle,
			struct files_struct *fsp,
			const SMB_STRUCT_AIOCB * const aiocb_array[],
			int n,
			const struct timespec *timeout)
{
	struct event_context *ev = NULL;
	struct fd_event *sock_event = NULL;
	int ret = -1;
	struct suspend_private sp;
	bool timed_out = false;
	TALLOC_CTX *frame = talloc_stackframe();

	/* This is a blocking call, and has to use a sub-event loop. */
	ev = event_context_init(frame);
	if (ev == NULL) {
		errno = ENOMEM;
		goto out;
	}

	if (timeout) {
		struct timeval tv = convert_timespec_to_timeval(*timeout);
		struct tevent_timer *te = tevent_add_timer(ev,
						frame,
						timeval_current_ofs(tv.tv_sec,
								    tv.tv_usec),
						aio_pthread_suspend_timed_out,
						&timed_out);
		if (!te) {
			errno = ENOMEM;
			goto out;
		}
	}

	ZERO_STRUCT(sp);
	sp.num_entries = n;
	sp.aiocb_array = aiocb_array;
	sp.num_finished = 0;

	sock_event = tevent_add_fd(ev,
				frame,
				pthreadpool_signal_fd(pool),
				TEVENT_FD_READ,
				aio_pthread_handle_suspend_completion,
				(void *)&sp);
	if (sock_event == NULL) {
		pthreadpool_destroy(pool);
		pool = NULL;
		goto out;
	}
	/*
	 * We're going to cheat here. We know that smbd/aio.c
	 * only calls this when it's waiting for every single
	 * outstanding call to finish on a close, so just wait
	 * individually for each IO to complete. We don't care
	 * what order they finish - only that they all do. JRA.
	 */
	while (sp.num_entries != sp.num_finished) {
		if (tevent_loop_once(ev) == -1) {
			goto out;
		}

		if (timed_out) {
			errno = EAGAIN;
			goto out;
		}
	}

	ret = 0;

  out:

	TALLOC_FREE(frame);
	return ret;
}
static struct vfs_fn_pointers vfs_aio_pthread_fns = {
	.aio_read = aio_pthread_read,
	.aio_write = aio_pthread_write,
	.aio_return_fn = aio_pthread_return_fn,
	.aio_cancel = aio_pthread_cancel,
	.aio_error_fn = aio_pthread_error_fn,
	.aio_suspend = aio_pthread_suspend,
};
NTSTATUS vfs_aio_pthread_init(void);
NTSTATUS vfs_aio_pthread_init(void)
{
	return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
				"aio_pthread", &vfs_aio_pthread_fns);
}