Allow init_aio_threadpool() to be setup for different threadpool handles with differe...
source3/modules/vfs_aio_pthread.c
/*
 * Simulate Posix AIO using pthreads.
 *
 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
 *
 * Copyright (C) Volker Lendecke 2008
 * Copyright (C) Jeremy Allison 2012
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include "includes.h"
#include "system/filesys.h"
#include "system/shmem.h"
#include "smbd/smbd.h"
#include "smbd/globals.h"
#include "lib/pthreadpool/pthreadpool.h"

struct aio_extra;
static struct pthreadpool *pool;
static int aio_pthread_jobid;
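/*
 * Note: "pool" is a single, process-wide threadpool (a static in this
 * module), and "aio_pthread_jobid" is a monotonically increasing counter
 * whose value is used to match completed jobs back to the outstanding
 * request state kept on pd_list below.
 */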
struct aio_private_data {
	struct aio_private_data *prev, *next;
	int jobid;
	SMB_STRUCT_AIOCB *aiocb;
	ssize_t ret_size;
	int ret_errno;
	bool cancelled;
	bool write_command;
};

/* List of outstanding requests we have. */
static struct aio_private_data *pd_list;

static void aio_pthread_handle_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p);
/************************************************************************
 Ensure thread pool is initialized.
***********************************************************************/

static bool init_aio_threadpool(struct event_context *ev_ctx,
				struct pthreadpool **pp_pool,
				void (*completion_fn)(struct event_context *,
						struct fd_event *,
						uint16,
						void *))
{
	struct fd_event *sock_event = NULL;
	int ret = 0;

	if (*pp_pool) {
		return true;
	}

	ret = pthreadpool_init(aio_pending_size, pp_pool);
	if (ret) {
		errno = ret;
		return false;
	}
	sock_event = tevent_add_fd(ev_ctx,
				NULL,
				pthreadpool_signal_fd(*pp_pool),
				TEVENT_FD_READ,
				completion_fn,
				NULL);
	if (sock_event == NULL) {
		pthreadpool_destroy(*pp_pool);
		*pp_pool = NULL;
		return false;
	}

	DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
		  aio_pending_size));

	return true;
}
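/*
 * The wiring above is the heart of the module: pthreadpool_signal_fd()
 * returns a file descriptor that becomes readable whenever a worker
 * thread finishes a job, and hooking that fd into the tevent loop with
 * completion_fn means all completion handling runs in the main smbd
 * event loop, never in the worker threads themselves.
 */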
/************************************************************************
 Worker function - core of the pthread aio engine.
 This is the function that actually does the IO.
***********************************************************************/

static void aio_worker(void *private_data)
{
	struct aio_private_data *pd =
			(struct aio_private_data *)private_data;

	if (pd->write_command) {
		pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
				(const void *)pd->aiocb->aio_buf,
				pd->aiocb->aio_nbytes,
				pd->aiocb->aio_offset);
		if (pd->ret_size == -1 && errno == ESPIPE) {
			/* Maintain the fiction that pipes can
			   be seeked (sought?) on. */
			pd->ret_size = sys_write(pd->aiocb->aio_fildes,
					(const void *)pd->aiocb->aio_buf,
					pd->aiocb->aio_nbytes);
		}
	} else {
		pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
				(void *)pd->aiocb->aio_buf,
				pd->aiocb->aio_nbytes,
				pd->aiocb->aio_offset);
		if (pd->ret_size == -1 && errno == ESPIPE) {
			/* Maintain the fiction that pipes can
			   be seeked (sought?) on. */
			pd->ret_size = sys_read(pd->aiocb->aio_fildes,
					(void *)pd->aiocb->aio_buf,
					pd->aiocb->aio_nbytes);
		}
	}

	if (pd->ret_size == -1) {
		pd->ret_errno = errno;
	} else {
		pd->ret_errno = 0;
	}
}
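/*
 * aio_worker() runs on a helper thread. It only writes into its own
 * aio_private_data (and the thread-local errno); the main thread reads
 * the results only after the pool has signalled completion. The ESPIPE
 * fallback keeps the pread/pwrite path working on non-seekable
 * descriptors.
 */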
/************************************************************************
 Private data destructor.
***********************************************************************/

static int pd_destructor(struct aio_private_data *pd)
{
	DLIST_REMOVE(pd_list, pd);
	return 0;
}

/************************************************************************
 Create and initialize a private data struct.
***********************************************************************/

static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
					SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
	if (!pd) {
		return NULL;
	}
	pd->jobid = aio_pthread_jobid++;
	pd->aiocb = aiocb;
	pd->ret_size = -1;
	pd->ret_errno = EINPROGRESS;
	talloc_set_destructor(pd, pd_destructor);
	DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
	return pd;
}
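/*
 * The private data is allocated as a talloc child of the caller's
 * aio_extra, so when that request is freed the destructor above
 * automatically unlinks the entry from pd_list.
 */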
/************************************************************************
 Spin off a threadpool (if needed) and initiate a pread call.
***********************************************************************/

static int aio_pthread_read(struct vfs_handle_struct *handle,
				struct files_struct *fsp,
				SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
	struct aio_private_data *pd = NULL;
	int ret;

	if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
				&pool,
				aio_pthread_handle_completion)) {
		return -1;
	}

	pd = create_private_data(aio_ex, aiocb);
	if (pd == NULL) {
		DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
		return -1;
	}

	ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
	if (ret) {
		errno = ret;
		return -1;
	}

	DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
		"of %llu bytes at offset %llu\n",
		pd->jobid,
		(unsigned long long)pd->aiocb->aio_nbytes,
		(unsigned long long)pd->aiocb->aio_offset));

	return 0;
}
/************************************************************************
 Spin off a threadpool (if needed) and initiate a pwrite call.
***********************************************************************/

static int aio_pthread_write(struct vfs_handle_struct *handle,
				struct files_struct *fsp,
				SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
	struct aio_private_data *pd = NULL;
	int ret;

	if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
				&pool,
				aio_pthread_handle_completion)) {
		return -1;
	}

	pd = create_private_data(aio_ex, aiocb);
	if (pd == NULL) {
		DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
		return -1;
	}

	pd->write_command = true;

	ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
	if (ret) {
		errno = ret;
		return -1;
	}

	DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
		"of %llu bytes at offset %llu\n",
		pd->jobid,
		(unsigned long long)pd->aiocb->aio_nbytes,
		(unsigned long long)pd->aiocb->aio_offset));

	return 0;
}
/************************************************************************
 Find the private data by jobid.
***********************************************************************/

static struct aio_private_data *find_private_data_by_jobid(int jobid)
{
	struct aio_private_data *pd;

	for (pd = pd_list; pd != NULL; pd = pd->next) {
		if (pd->jobid == jobid) {
			return pd;
		}
	}

	return NULL;
}
/************************************************************************
 Callback when an IO completes.
***********************************************************************/

static void aio_pthread_handle_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p)
{
	struct aio_extra *aio_ex = NULL;
	struct aio_private_data *pd = NULL;
	int jobid = 0;
	int ret;

	DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
			(int)flags));

	if ((flags & EVENT_FD_READ) == 0) {
		return;
	}

	ret = pthreadpool_finished_job(pool, &jobid);
	if (ret) {
		smb_panic("aio_pthread_handle_completion");
		return;
	}

	pd = find_private_data_by_jobid(jobid);
	if (pd == NULL) {
		DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
			  jobid));
		return;
	}

	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
	smbd_aio_complete_aio_ex(aio_ex);

	DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
		jobid ));
	TALLOC_FREE(aio_ex);
}
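/*
 * Completion path: the signal fd becomes readable, pthreadpool_finished_job()
 * hands back the jobid of one finished job, the jobid is mapped to its
 * aio_private_data, and the original request is completed via
 * smbd_aio_complete_aio_ex(). Freeing aio_ex also frees pd (it is a talloc
 * child), which removes the entry from pd_list.
 */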
/************************************************************************
 Find the private data by aiocb.
***********************************************************************/

static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd;

	for (pd = pd_list; pd != NULL; pd = pd->next) {
		if (pd->aiocb == aiocb) {
			return pd;
		}
	}

	return NULL;
}
/************************************************************************
 Called to return the result of a completed AIO.
 Should only be called if aio_error returns something other than EINPROGRESS.
 Returns:
	Any other value - return from IO operation.
***********************************************************************/

static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
				struct files_struct *fsp,
				SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

	if (pd == NULL) {
		errno = EINVAL;
		DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
		return -1;
	}

	pd->aiocb = NULL;

	if (pd->cancelled) {
		errno = ECANCELED;
		return -1;
	}

	if (pd->ret_size == -1) {
		errno = pd->ret_errno;
	}

	return pd->ret_size;
}
/************************************************************************
 Called to check the result of an AIO.
 Returns:
	EINPROGRESS - still in progress.
	EINVAL - invalid aiocb.
	ECANCELED - request was cancelled.
	0 - request completed successfully.
	Any other value - errno from IO operation.
***********************************************************************/

static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
			struct files_struct *fsp,
			SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

	if (pd == NULL) {
		return EINVAL;
	}
	if (pd->cancelled) {
		return ECANCELED;
	}
	return pd->ret_errno;
}
/************************************************************************
 Called to request the cancel of an AIO, or all of them on a specific
 fsp if aiocb == NULL.
***********************************************************************/

static int aio_pthread_cancel(struct vfs_handle_struct *handle,
			struct files_struct *fsp,
			SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = NULL;

	for (pd = pd_list; pd != NULL; pd = pd->next) {
		if (pd->aiocb == NULL) {
			continue;
		}
		if (pd->aiocb->aio_fildes != fsp->fh->fd) {
			continue;
		}
		if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
			continue;
		}

		/*
		 * We let the child do its job, but we discard the result when
		 * it's finished.
		 */

		pd->cancelled = true;
	}

	return AIO_CANCELED;
}
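/*
 * Cancellation is advisory only: the worker thread is never interrupted,
 * the request is merely flagged so its result is discarded once the job
 * finishes, and AIO_CANCELED is always returned to the caller.
 */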
/************************************************************************
 Callback for a previously detected job completion.
***********************************************************************/

static void aio_pthread_handle_immediate(struct tevent_context *ctx,
				struct tevent_immediate *im,
				void *private_data)
{
	struct aio_extra *aio_ex = NULL;
	struct aio_private_data *pd = (struct aio_private_data *)private_data;

	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
	smbd_aio_complete_aio_ex(aio_ex);
	TALLOC_FREE(aio_ex);
}
/************************************************************************
 Private data struct used in suspend completion code.
***********************************************************************/

struct suspend_private {
	int num_entries;
	int num_finished;
	const SMB_STRUCT_AIOCB * const *aiocb_array;
};
/************************************************************************
 Callback when an IO completes from a suspend call.
***********************************************************************/

static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p)
{
	struct suspend_private *sp = (struct suspend_private *)p;
	struct aio_private_data *pd = NULL;
	struct tevent_immediate *im = NULL;
	int jobid;
	int i;

	DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
			(int)flags));

	if ((flags & EVENT_FD_READ) == 0) {
		return;
	}

	if (pthreadpool_finished_job(pool, &jobid)) {
		smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
		return;
	}

	pd = find_private_data_by_jobid(jobid);
	if (pd == NULL) {
		DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
			  jobid));
		return;
	}

	/* Is this a jobid with an aiocb we're interested in ? */
	for (i = 0; i < sp->num_entries; i++) {
		if (sp->aiocb_array[i] == pd->aiocb) {
			sp->num_finished++;
			return;
		}
	}

	/* Jobid completed we weren't waiting for.
	   We must reschedule this as an immediate event
	   on the main event context. */
	im = tevent_create_immediate(NULL);
	if (!im) {
		exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
	}

	DEBUG(10,("aio_pthread_handle_suspend_completion: "
			"re-scheduling job id %d\n",
			jobid));

	tevent_schedule_immediate(im,
			server_event_context(),
			aio_pthread_handle_immediate,
			(void *)pd);
}
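/*
 * A job that finishes during a suspend but is not in the aiocb set being
 * waited on cannot be completed from inside the nested event loop, so it
 * is re-queued as an immediate event on the main event context and handled
 * by aio_pthread_handle_immediate() once the main loop runs again.
 */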
static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
					struct tevent_timer *te,
					struct timeval now,
					void *private_data)
{
	bool *timed_out = (bool *)private_data;
	/* Remove this timed event handler. */
	TALLOC_FREE(te);
	*timed_out = true;
}
/************************************************************************
 Called to request everything to stop until all IO is completed.
***********************************************************************/

static int aio_pthread_suspend(struct vfs_handle_struct *handle,
			struct files_struct *fsp,
			const SMB_STRUCT_AIOCB * const aiocb_array[],
			int n,
			const struct timespec *timeout)
{
	struct event_context *ev = NULL;
	struct fd_event *sock_event = NULL;
	int ret = -1;
	struct suspend_private sp;
	bool timed_out = false;
	TALLOC_CTX *frame = talloc_stackframe();

	/* This is a blocking call, and has to use a sub-event loop. */
	ev = event_context_init(frame);
	if (ev == NULL) {
		errno = ENOMEM;
		goto out;
	}

	if (timeout) {
		struct timeval tv = convert_timespec_to_timeval(*timeout);
		struct tevent_timer *te = tevent_add_timer(ev,
						frame,
						timeval_current_ofs(tv.tv_sec,
								    tv.tv_usec),
						aio_pthread_suspend_timed_out,
						&timed_out);
		if (!te) {
			errno = ENOMEM;
			goto out;
		}
	}

	ZERO_STRUCT(sp);
	sp.num_entries = n;
	sp.aiocb_array = aiocb_array;
	sp.num_finished = 0;

	sock_event = tevent_add_fd(ev,
				frame,
				pthreadpool_signal_fd(pool),
				TEVENT_FD_READ,
				aio_pthread_handle_suspend_completion,
				(void *)&sp);
	if (sock_event == NULL) {
		pthreadpool_destroy(pool);
		pool = NULL;
		goto out;
	}
	/*
	 * We're going to cheat here. We know that smbd/aio.c
	 * only calls this when it's waiting for every single
	 * outstanding call to finish on a close, so just wait
	 * individually for each IO to complete. We don't care
	 * what order they finish - only that they all do. JRA.
	 */
	while (sp.num_entries != sp.num_finished) {
		if (tevent_loop_once(ev) == -1) {
			goto out;
		}

		if (timed_out) {
			errno = EAGAIN;
			goto out;
		}
	}

	ret = 0;

  out:

	TALLOC_FREE(frame);
	return ret;
}
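/*
 * aio_pthread_suspend() deliberately blocks: it runs a private tevent
 * context and loops until every aiocb in the array has finished, or the
 * optional timeout fires, in which case it fails with EAGAIN.
 */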
static int aio_pthread_connect(vfs_handle_struct *handle, const char *service,
			       const char *user)
{
	/*********************************************************************
	 * How many threads to initialize ?
	 * 100 per process seems insane as a default until you realize that
	 * (a) Threads terminate after 1 second when idle.
	 * (b) Throttling is done in SMB2 via the crediting algorithm.
	 * (c) SMB1 clients are limited to max_mux (50) outstanding
	 *     requests and Windows clients don't use this anyway.
	 * Essentially we want this to be unlimited unless smb.conf
	 * says different.
	 *********************************************************************/
	aio_pending_size = lp_parm_int(
		SNUM(handle->conn), "aio_pthread", "aio num threads", 100);
	return SMB_VFS_NEXT_CONNECT(handle, service, user);
}
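/*
 * A minimal, illustrative smb.conf fragment for this module. The share
 * name and thread count below are example values, not defaults; only the
 * "aio_pthread:aio num threads" parameter name comes from the
 * lp_parm_int() call above:
 *
 *	[data]
 *		vfs objects = aio_pthread
 *		aio_pthread:aio num threads = 20
 */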
static struct vfs_fn_pointers vfs_aio_pthread_fns = {
	.connect_fn = aio_pthread_connect,
	.aio_read_fn = aio_pthread_read,
	.aio_write_fn = aio_pthread_write,
	.aio_return_fn = aio_pthread_return_fn,
	.aio_cancel_fn = aio_pthread_cancel,
	.aio_error_fn = aio_pthread_error_fn,
	.aio_suspend_fn = aio_pthread_suspend,
};

NTSTATUS vfs_aio_pthread_init(void);
NTSTATUS vfs_aio_pthread_init(void)
{
	return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
				"aio_pthread", &vfs_aio_pthread_fns);
}