glibc.git: rtkaio/sysdeps/unix/sysv/linux/kaio_misc.c (blob 76e0c430b7d4ba5083c45f5884bef74009ca0e01)
/* Handle general operations.
   Copyright (C) 1997,1998,1999,2000,2001,2002,2003,2006
	Free Software Foundation, Inc.
   This file is part of the GNU C Library.
   Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.

   The GNU C Library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 2.1 of the License, or (at your option) any later version.

   The GNU C Library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with the GNU C Library; if not, write to the Free
   Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
   02111-1307 USA.  */
#include <kaio_misc.h>

#ifndef USE_KAIO
#include <aio_misc.c>
#else

#include <aio.h>
#include <assert.h>
#include <atomic.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/sysmacros.h>
#ifndef aio_create_helper_thread
# define aio_create_helper_thread __aio_create_helper_thread

extern inline int
__aio_create_helper_thread (pthread_t *threadp, void *(*tf) (void *), void *arg)
{
  pthread_attr_t attr;

  /* Make sure the thread is created detached.  */
  pthread_attr_init (&attr);
  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);

  int ret = pthread_create (threadp, &attr, tf, arg);

  (void) pthread_attr_destroy (&attr);
  return ret;
}
#endif
static void add_request_to_runlist (struct requestlist *newrequest)
     internal_function;
static int add_request_to_list (struct requestlist *newrequest, int fildes,
                                int prio)
     internal_function;
static void * handle_kernel_aio (void *arg);
static void kernel_callback (kctx_t ctx, struct kiocb *kiocb, long res,
                             long res2);
/* Pool of request list entries.  */
static struct requestlist **pool;

/* Number of total and allocated pool entries.  */
static size_t pool_max_size;
static size_t pool_size;

/* Kernel AIO context.  */
kctx_t __aio_kioctx = KCTX_NONE;
int __have_no_kernel_aio;
int __kernel_thread_started;

/* We implement a two-dimensional array but allocate each row separately.
   The macro below determines how many entries should be used per row.
   It should preferably be a power of two.  */
#define ENTRIES_PER_ROW	32

/* How many rows we allocate at once.  */
#define ROWS_STEP	8

/* List of available entries.  */
static struct requestlist *freelist;

/* List of requests waiting to be processed.  */
static struct requestlist *runlist;

/* List of all currently processed requests.  */
static struct requestlist *requests, *krequests;

/* Number of threads currently running.  */
static int nthreads;

/* Number of threads waiting for work to arrive.  */
static int idle_thread_count;
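
/* Overview of the bookkeeping above: request list entries are allocated
   from the rows of POOL and recycled through FREELIST.  REQUESTS chains
   the entries processed by the helper threads and KREQUESTS those
   submitted to the kernel AIO context; both lists are ordered by file
   descriptor, with a per-descriptor chain ordered by priority hanging
   off each head entry.  RUNLIST queues user-space requests that are
   waiting for a helper thread to pick them up.  */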
/* These are the values used to optimize the use of AIO.  The user can
   overwrite them by using the `aio_init' function.  */
static struct aioinit optim =
{
  20,	/* int aio_threads;	Maximal number of threads.  */
  64,	/* int aio_num;		Number of expected simultaneous requests.  */
  0,	/* int aio_locks;	Not used.  */
  0,	/* int aio_usedba;	Not used.  */
  0,	/* int aio_debug;	Not used.  */
  0,	/* int aio_numusers;	Not used.  */
  1,	/* int aio_idle_time;	Seconds before an idle thread terminates.  */
  0	/* int aio_reserved;  */
};
/* Since the list is global we need a mutex protecting it.  */
pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

/* When you add a request to the list and there are idle threads present,
   you signal this condition variable.  When a thread finishes work, it waits
   on this condition variable for a time before it actually exits.  */
pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
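
/* All of the lists above are meant to be manipulated only while
   __aio_requests_mutex is held; the kernel completion callbacks
   dispatched from io_getevents below likewise run with the mutex
   locked.  The mutex is recursive, which appears intended to let those
   callbacks re-enter the list helpers safely.  */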
/* Functions to handle request list pool.  */
static struct requestlist *
get_elem (void)
{
  struct requestlist *result;

  if (freelist == NULL)
    {
      struct requestlist *new_row;
      int cnt;

      assert (sizeof (struct aiocb) == sizeof (struct aiocb64));

      if (pool_size + 1 >= pool_max_size)
        {
          size_t new_max_size = pool_max_size + ROWS_STEP;
          struct requestlist **new_tab;

          new_tab = (struct requestlist **)
            realloc (pool, new_max_size * sizeof (struct requestlist *));

          if (new_tab == NULL)
            return NULL;

          pool_max_size = new_max_size;
          pool = new_tab;
        }

      /* Allocate the new row.  */
      cnt = pool_size == 0 ? optim.aio_num : ENTRIES_PER_ROW;
      new_row = (struct requestlist *) calloc (cnt,
                                               sizeof (struct requestlist));
      if (new_row == NULL)
        return NULL;

      pool[pool_size++] = new_row;

      /* Put all the new entries in the freelist.  */
      do
        {
          new_row->next_prio = freelist;
          freelist = new_row++;
        }
      while (--cnt > 0);
    }

  result = freelist;
  freelist = freelist->next_prio;

  return result;
}
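
/* Both get_elem and __aio_free_request assume that the caller already
   holds __aio_requests_mutex; neither does any locking of its own.  */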
void
internal_function
__aio_free_request (struct requestlist *elem)
{
  elem->running = no;
  elem->next_prio = freelist;
  freelist = elem;
}
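
/* Find the request list entry for ELEM, looking first in the list of
   kernel-submitted requests and then in the list handled by the helper
   threads.  Returns NULL if the aiocb is not queued at all.  */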
struct requestlist *
internal_function
__aio_find_req (aiocb_union *elem)
{
  struct requestlist *runp;
  int fildes = elem->aiocb.aio_fildes;
  int i;

  for (i = 0; i < 2; i++)
    {
      runp = i ? requests : krequests;

      while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
        runp = runp->next_fd;

      if (runp != NULL)
        {
          if (runp->aiocbp->aiocb.aio_fildes != fildes)
            runp = NULL;
          else
            while (runp != NULL && runp->aiocbp != elem)
              runp = runp->next_prio;
          if (runp != NULL)
            return runp;
        }
    }

  return NULL;
}
struct requestlist *
internal_function
__aio_find_req_fd (int fildes)
{
  struct requestlist *runp = requests;

  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
    runp = runp->next_fd;

  return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
          ? runp : NULL);
}
struct requestlist *
internal_function
__aio_find_kreq_fd (int fildes)
{
  struct requestlist *runp = krequests;

  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
    runp = runp->next_fd;

  return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
          ? runp : NULL);
}
void
internal_function
__aio_remove_request (struct requestlist *last, struct requestlist *req,
                      int all)
{
  assert (req->running == yes || req->running == queued
          || req->running == done);
  assert (req->kioctx == KCTX_NONE);

  if (last != NULL)
    last->next_prio = all ? NULL : req->next_prio;
  else
    {
      if (all || req->next_prio == NULL)
        {
          if (req->last_fd != NULL)
            req->last_fd->next_fd = req->next_fd;
          else
            requests = req->next_fd;
          if (req->next_fd != NULL)
            req->next_fd->last_fd = req->last_fd;
        }
      else
        {
          if (req->last_fd != NULL)
            req->last_fd->next_fd = req->next_prio;
          else
            requests = req->next_prio;

          if (req->next_fd != NULL)
            req->next_fd->last_fd = req->next_prio;

          req->next_prio->last_fd = req->last_fd;
          req->next_prio->next_fd = req->next_fd;

          /* Mark this entry as runnable.  */
          req->next_prio->running = yes;
        }
    }

  if (req->running == yes)
    {
      struct requestlist *runp = runlist;

      last = NULL;
      while (runp != NULL)
        {
          if (runp == req)
            {
              if (last == NULL)
                runlist = runp->next_run;
              else
                last->next_run = runp->next_run;
              break;
            }
          last = runp;
          runp = runp->next_run;
        }
    }
}
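
/* Unlink REQ from the list of kernel-submitted requests.  Unlike the
   user request list above, the per-descriptor priority chain here is
   doubly linked through prev_prio, so no predecessor pointer has to be
   passed in.  */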
void
internal_function
__aio_remove_krequest (struct requestlist *req)
{
  assert (req->running == yes || req->running == queued
          || req->running == done);
  assert (req->kioctx != KCTX_NONE);

  if (req->prev_prio != NULL)
    {
      req->prev_prio->next_prio = req->next_prio;
      if (req->next_prio != NULL)
        req->next_prio->prev_prio = req->prev_prio;
    }
  else if (req->next_prio == NULL)
    {
      if (req->last_fd != NULL)
        req->last_fd->next_fd = req->next_fd;
      else
        krequests = req->next_fd;
      if (req->next_fd != NULL)
        req->next_fd->last_fd = req->last_fd;
    }
  else
    {
      if (req->last_fd != NULL)
        req->last_fd->next_fd = req->next_prio;
      else
        krequests = req->next_prio;
      if (req->next_fd != NULL)
        req->next_fd->last_fd = req->next_prio;

      req->next_prio->prev_prio = NULL;
      req->next_prio->last_fd = req->last_fd;
      req->next_prio->next_fd = req->next_fd;
    }
}
/* The thread handler.  */
static void *handle_fildes_io (void *arg);
static int wait_for_kernel_requests (int fildes);
/* User optimization.  */
void
__aio_init (const struct aioinit *init)
{
  /* Get the mutex.  */
  pthread_mutex_lock (&__aio_requests_mutex);

  /* Only allow writing new values if the table is not yet allocated.  */
  if (pool == NULL)
    {
      optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
      optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
                       ? ENTRIES_PER_ROW
                       : init->aio_num & ~ENTRIES_PER_ROW);
    }

  if (init->aio_idle_time != 0)
    optim.aio_idle_time = init->aio_idle_time;

  /* Release the mutex.  */
  pthread_mutex_unlock (&__aio_requests_mutex);
}
weak_alias (__aio_init, aio_init)
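
/* Completion callback for kernel AIO requests.  The kiocb is embedded
   in the request list entry, so the entry is recovered with a simple
   cast.  A negative RES in the range (-1000, 0) is treated as a negated
   errno value; the result and error code are stored, waiters are
   notified and the entry is released.  */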
static void
kernel_callback (kctx_t ctx, struct kiocb *kiocb, long res, long res2)
{
  struct requestlist *req = (struct requestlist *)kiocb;
  long errcode = 0;

  if (res < 0 && res > -1000)
    {
      errcode = -res;
      res = -1;
    }
  req->aiocbp->aiocb.__return_value = res;
  atomic_write_barrier ();
  req->aiocbp->aiocb.__error_code = errcode;
  __aio_notify (req);
  assert (req->running == allocated);
  req->running = done;
  __aio_remove_krequest (req);
  __aio_free_request (req);
}
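
/* Poll the kernel AIO context without blocking (the timeout is zero)
   and dispatch the completion callbacks of any finished requests, up to
   ten events per io_getevents call, until a batch comes back short.  */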
void
internal_function
__aio_read_one_event (void)
{
  struct kio_event ev[10];
  struct timespec ts;
  int count, i;

  if (__aio_kioctx == KCTX_NONE)
    return;
  ts.tv_sec = 0;
  ts.tv_nsec = 0;
  do
    {
      INTERNAL_SYSCALL_DECL (err);
      count = INTERNAL_SYSCALL (io_getevents, err, 5, __aio_kioctx, 0, 10,
                                ev, &ts);
      if (INTERNAL_SYSCALL_ERROR_P (count, err) || count == 0)
        break;
      pthread_mutex_lock (&__aio_requests_mutex);
      for (i = 0; i < count; i++)
        {
          void (*cb)(kctx_t, struct kiocb *, long, long);

          cb = (void *) (uintptr_t) ev[i].kioe_data;
          cb (__aio_kioctx, (struct kiocb *) (uintptr_t) ev[i].kioe_obj,
              ev[i].kioe_res, ev[i].kioe_res2);
        }
      pthread_mutex_unlock (&__aio_requests_mutex);
    }
  while (count == 10);
}
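
/* Wait for at least one kernel AIO completion.  The request mutex is
   dropped while blocking in io_getevents and re-acquired before the
   callbacks run.  Only the first round uses the caller's TIMESPEC; once
   a full batch has been reaped, further rounds poll with a zero
   timeout.  Returns ETIMEDOUT only if the caller-supplied timeout
   expired before any event arrived, otherwise 0.  */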
int
internal_function
__aio_wait_for_events (kctx_t kctx, const struct timespec *timespec)
{
  int ret, i;
  struct kio_event ev[10];
  struct timespec ts;
  INTERNAL_SYSCALL_DECL (err);

  pthread_mutex_unlock (&__aio_requests_mutex);
  ts.tv_sec = 0;
  ts.tv_nsec = 0;
  do
    {
      ret = INTERNAL_SYSCALL (io_getevents, err, 5, kctx, 1, 10, ev,
                              timespec);
      if (INTERNAL_SYSCALL_ERROR_P (ret, err) || ret == 0)
        break;

      pthread_mutex_lock (&__aio_requests_mutex);
      for (i = 0; i < ret; i++)
        {
          void (*cb)(kctx_t, struct kiocb *, long, long);

          cb = (void *) (uintptr_t) ev[i].kioe_data;
          cb (kctx, (struct kiocb *) (uintptr_t) ev[i].kioe_obj,
              ev[i].kioe_res, ev[i].kioe_res2);
        }
      if (ret < 10)
        return 0;
      pthread_mutex_unlock (&__aio_requests_mutex);
      timespec = &ts;
    }
  while (1);

  pthread_mutex_lock (&__aio_requests_mutex);
  return (timespec != &ts
          && INTERNAL_SYSCALL_ERROR_P (ret, err)
          && INTERNAL_SYSCALL_ERRNO (ret, err) == ETIMEDOUT) ? ETIMEDOUT : 0;
}
int
internal_function
__aio_create_kernel_thread (void)
{
  pthread_t thid;

  if (__kernel_thread_started)
    return 0;

  if (aio_create_helper_thread (&thid, handle_kernel_aio, NULL) != 0)
    return -1;
  __kernel_thread_started = 1;
  return 0;
}
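
/* Body of the singleton event thread started by
   __aio_create_kernel_thread: it blocks in io_getevents indefinitely
   and dispatches completion callbacks under the request mutex as
   events arrive.  */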
static void *
handle_kernel_aio (void *arg __attribute__((unused)))
{
  int ret, i;
  INTERNAL_SYSCALL_DECL (err);
  struct kio_event ev[10];

  for (;;)
    {
      ret = INTERNAL_SYSCALL (io_getevents, err, 5, __aio_kioctx, 1, 10, ev,
                              NULL);
      if (INTERNAL_SYSCALL_ERROR_P (ret, err) || ret == 0)
        continue;
      pthread_mutex_lock (&__aio_requests_mutex);
      for (i = 0; i < ret; i++)
        {
          void (*cb)(kctx_t, struct kiocb *, long, long);

          cb = (void *) (uintptr_t) ev[i].kioe_data;
          cb (__aio_kioctx, (struct kiocb *) (uintptr_t) ev[i].kioe_obj,
              ev[i].kioe_res, ev[i].kioe_res2);
        }
      pthread_mutex_unlock (&__aio_requests_mutex);
    }
  return NULL;
}
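
/* Insert NEWP into the request list that matches its kioctx.  Returns
   `queued' when another request for the same descriptor is already
   being worked on (the new entry is merely chained in by priority) and
   `yes' when a new per-descriptor chain had to be started.  */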
static int
internal_function
add_request_to_list (struct requestlist *newp, int fildes, int prio)
{
  struct requestlist *last, *runp, *reqs;

  last = NULL;
  reqs = newp->kioctx != KCTX_NONE ? krequests : requests;
  runp = reqs;

  /* First look whether the current file descriptor is currently
     worked with.  */
  while (runp != NULL
         && runp->aiocbp->aiocb.aio_fildes < fildes)
    {
      last = runp;
      runp = runp->next_fd;
    }

  if (runp != NULL
      && runp->aiocbp->aiocb.aio_fildes == fildes)
    {
      /* The current file descriptor is worked on.  It makes no sense
         to start another thread since this new thread would fight
         with the running thread for the resources.  But we also cannot
         say that the thread processing this descriptor shall immediately
         after finishing the current job process this request if there
         are other threads in the running queue which have a higher
         priority.  */

      /* Simply enqueue it after the running one according to the
         priority.  */
      while (runp->next_prio != NULL
             && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
        runp = runp->next_prio;

      newp->next_prio = runp->next_prio;
      runp->next_prio = newp;
      if (newp->kioctx != KCTX_NONE)
        {
          newp->prev_prio = runp;
          if (newp->next_prio != NULL)
            newp->next_prio->prev_prio = newp;
        }

      return queued;
    }
  else
    {
      /* Enqueue this request for a new descriptor.  */
      if (last == NULL)
        {
          newp->last_fd = NULL;
          newp->next_fd = reqs;
          if (reqs != NULL)
            reqs->last_fd = newp;
          if (newp->kioctx != KCTX_NONE)
            krequests = newp;
          else
            requests = newp;
        }
      else
        {
          newp->next_fd = last->next_fd;
          newp->last_fd = last;
          last->next_fd = newp;
          if (newp->next_fd != NULL)
            newp->next_fd->last_fd = newp;
        }

      newp->next_prio = NULL;
      if (newp->kioctx != KCTX_NONE)
        newp->prev_prio = NULL;
      return yes;
    }
}
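
/* Queue NEWP for processing by the helper threads.  If its descriptor
   was idle, a new thread may be started (within optim.aio_threads);
   otherwise the request is put on the run queue and an idle thread, if
   any, is woken.  Returns 0 on success and -1 if no thread could be
   created while none is running.  */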
static int
internal_function
__aio_enqueue_user_request (struct requestlist *newp)
{
  int result = 0;
  int running = add_request_to_list (newp, newp->aiocbp->aiocb.aio_fildes,
                                     newp->aiocbp->aiocb.__abs_prio);

  if (running == yes)
    {
      /* We try to create a new thread for this file descriptor.  The
         function which gets called will handle all available requests
         for this descriptor and when all are processed it will
         terminate.

         If no new thread can be created or if the specified limit of
         threads for AIO is reached we queue the request.  */

      /* See if we need to and are able to create a thread.  */
      if (nthreads < optim.aio_threads && idle_thread_count == 0)
        {
          pthread_t thid;

          running = newp->running = allocated;

          /* Now try to start a thread.  */
          if (aio_create_helper_thread (&thid, handle_fildes_io, newp) == 0)
            /* We managed to enqueue the request.  All errors which can
               happen now can be recognized by calls to `aio_return' and
               `aio_error'.  */
            ++nthreads;
          else
            {
              /* Reset the running flag.  The new request is not running.  */
              running = newp->running = yes;

              if (nthreads == 0)
                /* We cannot create a thread in the moment and there is
                   also no thread running.  This is a problem.  `errno' is
                   set to EAGAIN if this is only a temporary problem.  */
                result = -1;
            }
        }
    }

  /* Enqueue the request in the run queue if it is not yet running.  */
  if (running == yes && result == 0)
    {
      add_request_to_runlist (newp);

      /* If there is a thread waiting for work, then let it know that we
         have just given it something to do.  */
      if (idle_thread_count > 0)
        pthread_cond_signal (&__aio_new_request_notification);
    }

  if (result == 0)
    newp->running = running;
  return result;
}
/* The main function of the async I/O handling.  It enqueues requests
   and if necessary starts and handles threads.  */
struct requestlist *
internal_function
__aio_enqueue_request_ctx (aiocb_union *aiocbp, int operation, kctx_t kctx)
{
  int policy, prio;
  struct sched_param param;
  struct requestlist *newp;
  int op = (operation & 0xffff);

  if (op == LIO_SYNC || op == LIO_DSYNC)
    {
      aiocbp->aiocb.aio_reqprio = 0;
      /* FIXME: Kernel doesn't support sync yet.  */
      operation &= ~LIO_KTHREAD;
      kctx = KCTX_NONE;
    }
  else if (aiocbp->aiocb.aio_reqprio < 0
           || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
    {
      /* Invalid priority value.  */
      __set_errno (EINVAL);
      aiocbp->aiocb.__error_code = EINVAL;
      aiocbp->aiocb.__return_value = -1;
      return NULL;
    }

  if ((operation & LIO_KTHREAD) || kctx != KCTX_NONE)
    {
      /* io_* is only really asynchronous for O_DIRECT or /dev/raw*.  */
      int fl = __fcntl (aiocbp->aiocb.aio_fildes, F_GETFL);
      if (fl < 0 || (fl & O_DIRECT) == 0)
        {
          struct stat64 st;
          if (__fxstat64 (_STAT_VER, aiocbp->aiocb.aio_fildes, &st) < 0
              || ! S_ISCHR (st.st_mode)
              || major (st.st_rdev) != 162)
            {
              operation &= ~LIO_KTHREAD;
              kctx = KCTX_NONE;
            }
        }
    }

  /* Compute priority for this request.  */
  pthread_getschedparam (pthread_self (), &policy, &param);
  prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;

  /* Get the mutex.  */
  pthread_mutex_lock (&__aio_requests_mutex);

  if (operation & LIO_KTHREAD)
    {
      if (__aio_kioctx == KCTX_NONE && !__have_no_kernel_aio)
        {
          int res;
          INTERNAL_SYSCALL_DECL (err);

          __aio_kioctx = 0;
          do
            res = INTERNAL_SYSCALL (io_setup, err, 2, 1024, &__aio_kioctx);
          while (INTERNAL_SYSCALL_ERROR_P (res, err)
                 && INTERNAL_SYSCALL_ERRNO (res, err) == EINTR);
          if (INTERNAL_SYSCALL_ERROR_P (res, err))
            {
              __have_no_kernel_aio = 1;
              __aio_kioctx = KCTX_NONE;
            }
        }

      kctx = __aio_kioctx;

      if (kctx != KCTX_NONE && !__kernel_thread_started
          && ((operation & LIO_KTHREAD_REQUIRED)
              || aiocbp->aiocb.aio_sigevent.sigev_notify != SIGEV_NONE))
        {
          if (__aio_create_kernel_thread () < 0)
            kctx = KCTX_NONE;
        }
    }

  /* Get a new element for the waiting list.  */
  newp = get_elem ();
  if (newp == NULL)
    {
      pthread_mutex_unlock (&__aio_requests_mutex);
      __set_errno (EAGAIN);
      return NULL;
    }
  newp->aiocbp = aiocbp;
#ifdef BROKEN_THREAD_SIGNALS
  newp->caller_pid = (aiocbp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL
                      ? getpid () : 0);
#endif
  newp->waiting = NULL;
  newp->kioctx = kctx;

  aiocbp->aiocb.__abs_prio = prio;
  aiocbp->aiocb.__policy = policy;
  aiocbp->aiocb.aio_lio_opcode = op;
  aiocbp->aiocb.__error_code = EINPROGRESS;
  aiocbp->aiocb.__return_value = 0;

  if (newp->kioctx != KCTX_NONE)
    {
      int res;
      INTERNAL_SYSCALL_DECL (err);

      aiocb_union *aiocbp = newp->aiocbp;
      struct kiocb *kiocbs[] __attribute__((unused)) = { &newp->kiocb };

      newp->kiocb.kiocb_data = (uintptr_t) kernel_callback;
      switch (op & 127)
        {
        case LIO_READ: newp->kiocb.kiocb_lio_opcode = IO_CMD_PREAD; break;
        case LIO_WRITE: newp->kiocb.kiocb_lio_opcode = IO_CMD_PWRITE; break;
        case LIO_SYNC:
        case LIO_DSYNC: newp->kiocb.kiocb_lio_opcode = IO_CMD_FSYNC; break;
        }
      if (op & 128)
        newp->kiocb.kiocb_offset = aiocbp->aiocb64.aio_offset;
      else
        newp->kiocb.kiocb_offset = aiocbp->aiocb.aio_offset;
      newp->kiocb.kiocb_fildes = aiocbp->aiocb.aio_fildes;
      newp->kiocb.kiocb_buf = (uintptr_t) aiocbp->aiocb.aio_buf;
      newp->kiocb.kiocb_nbytes = aiocbp->aiocb.aio_nbytes;
      /* FIXME.  */
      newp->kiocb.kiocb_req_prio = 0;
      res = INTERNAL_SYSCALL (io_submit, err, 3, newp->kioctx, 1, kiocbs);
      if (! INTERNAL_SYSCALL_ERROR_P (res, err))
        {
          newp->running = allocated;
          add_request_to_list (newp, aiocbp->aiocb.aio_fildes, prio);
          /* Release the mutex.  */
          pthread_mutex_unlock (&__aio_requests_mutex);
          return newp;
        }
      newp->kioctx = KCTX_NONE;
    }

  if (__aio_enqueue_user_request (newp))
    {
      /* Something went wrong.  */
      __aio_free_request (newp);
      newp = NULL;
    }

  /* Release the mutex.  */
  pthread_mutex_unlock (&__aio_requests_mutex);

  return newp;
}
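
/* Used by the LIO_SYNC/LIO_DSYNC path in handle_fildes_io below:
   before calling fsync/fdatasync, block until every kernel-submitted
   request still in flight for FILDES has completed, hooking a waitlist
   entry onto each of them.  Returns -1 if the kernel event thread
   could not be started.  */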
static int
wait_for_kernel_requests (int fildes)
{
  pthread_mutex_lock (&__aio_requests_mutex);

  struct requestlist *kreq = __aio_find_kreq_fd (fildes), *req;
  int nent = 0;
  int ret = 0;

  req = kreq;
  while (req)
    {
      if (req->running == allocated)
        ++nent;
      req = req->next_prio;
    }

  if (nent)
    {
      if (__aio_create_kernel_thread () < 0)
        {
          pthread_mutex_unlock (&__aio_requests_mutex);
          return -1;
        }

#ifndef DONT_NEED_AIO_MISC_COND
      pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#endif
      struct waitlist waitlist[nent];
      int cnt = 0;

      while (kreq)
        {
          if (kreq->running == allocated)
            {
#ifndef DONT_NEED_AIO_MISC_COND
              waitlist[cnt].cond = &cond;
#endif
              waitlist[cnt].result = NULL;
              waitlist[cnt].next = kreq->waiting;
              waitlist[cnt].counterp = &nent;
              waitlist[cnt].sigevp = NULL;
#ifdef BROKEN_THREAD_SIGNALS
              waitlist[cnt].caller_pid = 0;	/* Not needed.  */
#endif
              kreq->waiting = &waitlist[cnt++];
            }
          kreq = kreq->next_prio;
        }

#ifdef DONT_NEED_AIO_MISC_COND
      AIO_MISC_WAIT (ret, nent, NULL, 0);
#else
      do
        pthread_cond_wait (&cond, &__aio_requests_mutex);
      while (nent);

      pthread_cond_destroy (&cond);
#endif
    }

  pthread_mutex_unlock (&__aio_requests_mutex);
  return ret;
}
static void *
handle_fildes_io (void *arg)
{
  pthread_t self = pthread_self ();
  struct sched_param param;
  struct requestlist *runp = (struct requestlist *) arg;
  aiocb_union *aiocbp;
  int policy;
  int fildes;

  pthread_getschedparam (self, &policy, &param);

  do
    {
      /* If runp is NULL, then we were created to service the work queue
         in general, not to handle any particular request.  In that case we
         skip the "do work" stuff on the first pass, and go directly to the
         "get work off the work queue" part of this loop, which is near the
         end.  */
      if (runp == NULL)
        pthread_mutex_lock (&__aio_requests_mutex);
      else
        {
          /* Hopefully this request is marked as running.  */
          assert (runp->running == allocated);

          /* Update our variables.  */
          aiocbp = runp->aiocbp;
          fildes = aiocbp->aiocb.aio_fildes;

          /* Change the priority to the requested value (if necessary).  */
          if (aiocbp->aiocb.__abs_prio != param.sched_priority
              || aiocbp->aiocb.__policy != policy)
            {
              param.sched_priority = aiocbp->aiocb.__abs_prio;
              policy = aiocbp->aiocb.__policy;
              pthread_setschedparam (self, policy, &param);
            }

          /* Process request pointed to by RUNP.  We must not be disturbed
             by signals.  */
          if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
            {
              if (aiocbp->aiocb.aio_lio_opcode & 128)
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
                                                 aiocbp->aiocb64.aio_buf,
                                                 aiocbp->aiocb64.aio_nbytes,
                                                 aiocbp->aiocb64.aio_offset));
              else
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (pread (fildes,
                                             (void *) aiocbp->aiocb.aio_buf,
                                             aiocbp->aiocb.aio_nbytes,
                                             aiocbp->aiocb.aio_offset));

              if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
                /* The Linux kernel is different from others.  It returns
                   ESPIPE if using pread on a socket.  Other platforms
                   simply ignore the offset parameter and behave like
                   read.  */
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (read (fildes,
                                            (void *) aiocbp->aiocb64.aio_buf,
                                            aiocbp->aiocb64.aio_nbytes));
            }
          else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
            {
              if (aiocbp->aiocb.aio_lio_opcode & 128)
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
                                                  aiocbp->aiocb64.aio_buf,
                                                  aiocbp->aiocb64.aio_nbytes,
                                                  aiocbp->aiocb64.aio_offset));
              else
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (__libc_pwrite (fildes, (const void *)
                                                     aiocbp->aiocb.aio_buf,
                                                     aiocbp->aiocb.aio_nbytes,
                                                     aiocbp->aiocb.aio_offset));

              if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
                /* The Linux kernel is different from others.  It returns
                   ESPIPE if using pwrite on a socket.  Other platforms
                   simply ignore the offset parameter and behave like
                   write.  */
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (write (fildes,
                                             (void *) aiocbp->aiocb64.aio_buf,
                                             aiocbp->aiocb64.aio_nbytes));
            }
          else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC
                   || aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
            {
              if (wait_for_kernel_requests (fildes) < 0)
                {
                  aiocbp->aiocb.__return_value = -1;
                  __set_errno (ENOMEM);
                }
              else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (fdatasync (fildes));
              else
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (fsync (fildes));
            }
          else
            {
              /* This is an invalid opcode.  */
              aiocbp->aiocb.__return_value = -1;
              __set_errno (EINVAL);
            }

          /* Get the mutex.  */
          pthread_mutex_lock (&__aio_requests_mutex);

          /* In theory we would need here a write memory barrier since the
             callers test using aio_error() whether the request finished
             and once this value != EINPROGRESS the field __return_value
             must be committed to memory.

             But since the pthread_mutex_lock call involves write memory
             barriers as well it is not necessary.  */

          if (aiocbp->aiocb.__return_value == -1)
            aiocbp->aiocb.__error_code = errno;
          else
            aiocbp->aiocb.__error_code = 0;

          /* Send the signal to notify about finished processing of the
             request.  */
          __aio_notify (runp);

          /* For debugging purposes we reset the running flag of the
             finished request.  */
          assert (runp->running == allocated);
          runp->running = done;

          /* Now dequeue the current request.  */
          __aio_remove_request (NULL, runp, 0);
          if (runp->next_prio != NULL)
            add_request_to_runlist (runp->next_prio);

          /* Free the old element.  */
          __aio_free_request (runp);
        }

      runp = runlist;

      /* If the runlist is empty, then we sleep for a while, waiting for
         something to arrive in it.  */
      if (runp == NULL && optim.aio_idle_time >= 0)
        {
          struct timeval now;
          struct timespec wakeup_time;

          ++idle_thread_count;
          gettimeofday (&now, NULL);
          wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
          wakeup_time.tv_nsec = now.tv_usec * 1000;
          if (wakeup_time.tv_nsec > 1000000000)
            {
              wakeup_time.tv_nsec -= 1000000000;
              ++wakeup_time.tv_sec;
            }
          pthread_cond_timedwait (&__aio_new_request_notification,
                                  &__aio_requests_mutex,
                                  &wakeup_time);
          --idle_thread_count;
          runp = runlist;
        }

      if (runp == NULL)
        --nthreads;
      else
        {
          assert (runp->running == yes);
          runp->running = allocated;
          runlist = runp->next_run;

          /* If we have a request to process, and there's still another in
             the run list, then we need to either wake up or create a new
             thread to service the request that is still in the run list.  */
          if (runlist != NULL)
            {
              /* There are at least two items in the work queue to work on.
                 If there are other idle threads, then we should wake them
                 up for these other work elements; otherwise, we should try
                 to create a new thread.  */
              if (idle_thread_count > 0)
                pthread_cond_signal (&__aio_new_request_notification);
              else if (nthreads < optim.aio_threads)
                {
                  pthread_t thid;

                  /* Now try to start a thread.  If we fail, no big deal,
                     because we know that there is at least one thread (us)
                     that is working on AIO operations.  */
                  if (aio_create_helper_thread (&thid, handle_fildes_io, NULL)
                      == 0)
                    ++nthreads;
                }
            }
        }

      /* Release the mutex.  */
      pthread_mutex_unlock (&__aio_requests_mutex);
    }
  while (runp != NULL);

  return NULL;
}
/* Free allocated resources.  */
libc_freeres_fn (free_res)
{
  size_t row;

  for (row = 0; row < pool_max_size; ++row)
    free (pool[row]);

  free (pool);
}
/* Add newrequest to the runlist.  The __abs_prio flag of newrequest must
   be correctly set to do this.  Also, you had better set newrequest's
   "running" flag to "yes" before you release your lock or you'll throw an
   assertion.  */
static void
internal_function
add_request_to_runlist (struct requestlist *newrequest)
{
  int prio = newrequest->aiocbp->aiocb.__abs_prio;
  struct requestlist *runp;

  if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
    {
      newrequest->next_run = runlist;
      runlist = newrequest;
    }
  else
    {
      runp = runlist;

      while (runp->next_run != NULL
             && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
        runp = runp->next_run;

      newrequest->next_run = runp->next_run;
      runp->next_run = newrequest;
    }
}

#endif