/* Handle general operations.
   Copyright (C) 1997, 1998, 1999 Free Software Foundation, Inc.
   This file is part of the GNU C Library.
   Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.

   The GNU C Library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Library General Public License as
   published by the Free Software Foundation; either version 2 of the
   License, or (at your option) any later version.

   The GNU C Library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Library General Public License for more details.

   You should have received a copy of the GNU Library General Public
   License along with the GNU C Library; see the file COPYING.LIB.  If not,
   write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
   Boston, MA 02111-1307, USA.  */

#include <aio.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>

#include "aio_misc.h"

/* Pool of request list entries.  */
static struct requestlist **pool;

/* Number of total and allocated pool entries.  */
static size_t pool_tab_size;
static size_t pool_size;

/* We implement a two-dimensional array but allocate each row separately.
   The macro below determines how many entries should be used per row.
   It should preferably be a power of two.  */
#define ENTRIES_PER_ROW	16

/* The row table is incremented in units of this.  */
#define ROW_STEP	8
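
/* Illustrative sketch (not part of the original file): with rows of
   ENTRIES_PER_ROW entries each, pool entry I lives at row
   I / ENTRIES_PER_ROW, column I % ENTRIES_PER_ROW.  A hypothetical
   accessor would read:

     static struct requestlist *
     pool_entry (size_t i)
     {
       return &pool[i / ENTRIES_PER_ROW][i % ENTRIES_PER_ROW];
     }

   Keeping ENTRIES_PER_ROW a power of two lets the compiler turn the
   division and modulo into a shift and a mask.  */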

/* List of available entries.  */
static struct requestlist *freelist;

/* List of requests waiting to be processed.  */
static struct requestlist *runlist;

/* List of all currently processed requests, ordered by file
   descriptor.  */
static struct requestlist *requests;

/* Number of threads currently running.  */
static int nthreads;


/* These are the values used to optimize the use of AIO.  The user can
   overwrite them by using the `aio_init' function.  */
static struct aioinit optim =
{
  20,	/* int aio_threads;	Maximal number of threads.  */
  256,	/* int aio_num;		Number of expected simultaneous requests.  */
  0,
  0,
  0,
  0,
  { 0, }
};


/* Since the list is global we need a mutex protecting it.  */
pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;


/* Functions to handle request list pool.  */
static struct requestlist *
get_elem (void)
{
  struct requestlist *result;

  if (freelist == NULL)
    {
      struct requestlist *new_row;
      size_t new_size;

      /* Compute new size.  */
      new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;

      if ((new_size / ENTRIES_PER_ROW) >= pool_tab_size)
        {
          size_t new_tab_size = new_size / ENTRIES_PER_ROW;
          struct requestlist **new_tab;

          new_tab = (struct requestlist **)
            realloc (pool, (new_tab_size * sizeof (struct requestlist *)));

          if (new_tab == NULL)
            return NULL;

          pool_tab_size = new_tab_size;
          pool = new_tab;
        }

      if (pool_size == 0)
        {
          size_t cnt;

          new_row = (struct requestlist *)
            calloc (new_size, sizeof (struct requestlist));

          if (new_row == NULL)
            return NULL;

          for (cnt = 0; cnt < new_size / ENTRIES_PER_ROW; ++cnt)
            pool[cnt] = &new_row[cnt * ENTRIES_PER_ROW];
        }
      else
        {
          /* Allocate one new row.  */
          new_row = (struct requestlist *)
            calloc (ENTRIES_PER_ROW, sizeof (struct requestlist));
          if (new_row == NULL)
            return NULL;

          /* The new row is the last one in the (possibly just grown)
             table; NEW_SIZE / ENTRIES_PER_ROW counts one row past it.  */
          pool[new_size / ENTRIES_PER_ROW - 1] = new_row;
        }

      /* Put all the new entries in the freelist.  */
      do
        {
          new_row->next_prio = freelist;
          freelist = new_row++;
        }
      while (++pool_size < new_size);
    }

  result = freelist;
  freelist = freelist->next_prio;

  return result;
}


void
internal_function
__aio_free_request (struct requestlist *elem)
{
  elem->running = no;
  elem->next_prio = freelist;
  freelist = elem;
}


struct requestlist *
internal_function
__aio_find_req (aiocb_union *elem)
{
  struct requestlist *runp = requests;
  int fildes = elem->aiocb.aio_fildes;

  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
    runp = runp->next_fd;

  if (runp != NULL)
    {
      if (runp->aiocbp->aiocb.aio_fildes != fildes)
        runp = NULL;
      else
        while (runp != NULL && runp->aiocbp != elem)
          runp = runp->next_prio;
    }

  return runp;
}


struct requestlist *
internal_function
__aio_find_req_fd (int fildes)
{
  struct requestlist *runp = requests;

  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
    runp = runp->next_fd;

  return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
          ? runp : NULL);
}
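
/* Illustrative sketch (not part of the original file): the REQUESTS
   list walked by the two lookup functions above is two-dimensional.
   The NEXT_FD chain links one entry per file descriptor, sorted by
   ascending descriptor number; off each of these hangs a NEXT_PRIO
   chain with the remaining requests for that descriptor, ordered by
   descending __abs_prio:

     requests --> [fd 3] --next_fd--> [fd 7] --next_fd--> [fd 9]
                     |                   |
                 next_prio           next_prio
                     v                   v
                  [fd 3]              [fd 7]

   This is why both functions first scan along NEXT_FD and only then
   descend into NEXT_PRIO.  */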


/* The thread handler.  */
static void *handle_fildes_io (void *arg);


/* User optimization.  */
void
__aio_init (const struct aioinit *init)
{
  /* Get the mutex.  */
  pthread_mutex_lock (&__aio_requests_mutex);

  /* Only allow writing new values if the table is not yet allocated.  */
  if (pool == NULL)
    {
      optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
      /* Round AIO_NUM down to a multiple of ENTRIES_PER_ROW; masking
         with ~(ENTRIES_PER_ROW - 1) clears the low bits.  */
      optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
                       ? ENTRIES_PER_ROW
                       : init->aio_num & ~(ENTRIES_PER_ROW - 1));
    }

  /* Release the mutex.  */
  pthread_mutex_unlock (&__aio_requests_mutex);
}

weak_alias (__aio_init, aio_init)
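
/* Usage sketch (illustrative, not part of the original file): a program
   may tune these knobs once, before issuing its first request; only the
   documented <aio.h> interface is assumed here.

     struct aioinit init = { 0, };
     init.aio_threads = 4;	(limit the pool to four worker threads)
     init.aio_num = 64;		(expect at most 64 concurrent requests)
     aio_init (&init);

   As the POOL == NULL check above shows, calls made after the request
   pool has been allocated are silently ignored.  */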


/* The main function of the async I/O handling.  It enqueues requests
   and if necessary starts and handles threads.  */
struct requestlist *
internal_function
__aio_enqueue_request (aiocb_union *aiocbp, int operation)
{
  int result = 0;
  int policy, prio;
  struct sched_param param;
  struct requestlist *last, *runp, *newp;
  int running = no;

  if (aiocbp->aiocb.aio_reqprio < 0
      || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
    {
      /* Invalid priority value.  */
      __set_errno (EINVAL);
      aiocbp->aiocb.__error_code = EINVAL;
      aiocbp->aiocb.__return_value = -1;
      return NULL;
    }

  /* Compute priority for this request.  */
  pthread_getschedparam (pthread_self (), &policy, &param);
  prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;

  /* Get the mutex.  */
  pthread_mutex_lock (&__aio_requests_mutex);

  last = NULL;
  runp = requests;
  /* First check whether a request for the current file descriptor is
     already being worked on.  */
  while (runp != NULL
         && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
    {
      last = runp;
      runp = runp->next_fd;
    }

  /* Get a new element for the waiting list.  */
  newp = get_elem ();
  if (newp == NULL)
    {
      pthread_mutex_unlock (&__aio_requests_mutex);
      __set_errno (EAGAIN);
      return NULL;
    }
  newp->aiocbp = aiocbp;
  newp->caller_pid = (aiocbp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL
                      ? getpid () : 0);
  newp->waiting = NULL;

  aiocbp->aiocb.__abs_prio = prio;
  aiocbp->aiocb.__policy = policy;
  aiocbp->aiocb.aio_lio_opcode = operation;
  aiocbp->aiocb.__error_code = EINPROGRESS;
  aiocbp->aiocb.__return_value = 0;

  if (runp != NULL
      && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
    {
      /* The current file descriptor is worked on.  It makes no sense
         to start another thread since this new thread would fight
         with the running thread for the resources.  But we also cannot
         say that the thread processing this descriptor shall immediately
         after finishing the current job process this request if there
         are other threads in the running queue which have a higher
         priority.  */

      /* Simply enqueue it after the running one according to the
         priority.  */
      while (runp->next_prio != NULL
             && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
        runp = runp->next_prio;

      newp->next_prio = runp->next_prio;
      runp->next_prio = newp;

      running = queued;
    }
  else
    {
      /* Enqueue this request for a new descriptor.  */
      if (last == NULL)
        {
          newp->last_fd = NULL;
          newp->next_fd = requests;
          if (requests != NULL)
            requests->last_fd = newp;
          requests = newp;
        }
      else
        {
          newp->next_fd = last->next_fd;
          newp->last_fd = last;
          last->next_fd = newp;
          if (newp->next_fd != NULL)
            newp->next_fd->last_fd = newp;
        }

      newp->next_prio = NULL;
    }

  if (running == no)
    {
      /* We try to create a new thread for this file descriptor.  The
         function which gets called will handle all available requests
         for this descriptor and when all are processed it will
         terminate.

         If no new thread can be created or if the specified limit of
         threads for AIO is reached we queue the request.  */

      /* See if we can create a thread.  */
      if (nthreads < optim.aio_threads)
        {
          pthread_t thid;
          pthread_attr_t attr;

          /* Make sure the thread is created detached.  */
          pthread_attr_init (&attr);
          pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);

          /* Now try to start a thread.  */
          if (pthread_create (&thid, &attr, handle_fildes_io, newp) == 0)
            {
              /* We managed to enqueue the request.  All errors which can
                 happen now can be recognized by calls to `aio_return' and
                 `aio_error'.  */
              running = allocated;
              ++nthreads;
            }
          else if (nthreads == 0)
            /* We cannot create a thread at the moment and there is
               also no thread running.  This is a problem.  `errno' is
               set to EAGAIN if this is only a temporary problem.  */
            result = -1;
        }
    }

  /* Enqueue the request in the run queue if it is not yet running.  */
  if (running < yes && result == 0)
    {
      if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
        {
          newp->next_run = runlist;
          runlist = newp;
        }
      else
        {
          runp = runlist;

          while (runp->next_run != NULL
                 && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
            runp = runp->next_run;

          newp->next_run = runp->next_run;
          runp->next_run = newp;
        }
    }

  if (result == 0)
    newp->running = running;
  else
    {
      /* Something went wrong.  */
      __aio_free_request (newp);
      newp = NULL;
    }

  /* Release the mutex.  */
  pthread_mutex_unlock (&__aio_requests_mutex);

  return newp;
}
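
/* Usage sketch (illustrative, not part of the original file): public
   entry points such as aio_read and aio_write are thin wrappers that
   end up in __aio_enqueue_request.  A caller only sees the POSIX
   interface; FD below is assumed to be an open descriptor.

     struct aiocb cb = { 0, };
     char buf[512];

     cb.aio_fildes = fd;
     cb.aio_buf = buf;
     cb.aio_nbytes = sizeof buf;
     cb.aio_offset = 0;

     if (aio_read (&cb) == 0)
       {
         while (aio_error (&cb) == EINPROGRESS)
           ;	(or block in aio_suspend instead of spinning)
         ssize_t n = aio_return (&cb);
       }

   A failed enqueue (a NULL return above) surfaces to the caller as
   aio_read returning -1 with errno set, e.g. to EAGAIN.  */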


static void *
handle_fildes_io (void *arg)
{
  pthread_t self = pthread_self ();
  struct sched_param param;
  struct requestlist *runp = (struct requestlist *) arg;
  aiocb_union *aiocbp;
  int policy;
  int fildes;

  pthread_getschedparam (self, &policy, &param);

  do
    {
      /* Update our variables.  */
      aiocbp = runp->aiocbp;
      fildes = aiocbp->aiocb.aio_fildes;

      /* Change the priority to the requested value (if necessary).  */
      if (aiocbp->aiocb.__abs_prio != param.sched_priority
          || aiocbp->aiocb.__policy != policy)
        {
          param.sched_priority = aiocbp->aiocb.__abs_prio;
          policy = aiocbp->aiocb.__policy;
          pthread_setschedparam (self, policy, &param);
        }

      /* Process request pointed to by RUNP.  We must not be disturbed
         by signals.  */
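      /* The low seven bits of AIO_LIO_OPCODE encode the LIO_* operation;
         bit 128 is assumed to be set by the 64-bit (aiocb64) entry
         points, which is why the dispatch below masks with 127 and
         tests bit 128.  */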
      if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
        {
          if (aiocbp->aiocb.aio_lio_opcode & 128)
            aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (__pread64 (fildes,
                                             (void *) aiocbp->aiocb64.aio_buf,
                                             aiocbp->aiocb64.aio_nbytes,
                                             aiocbp->aiocb64.aio_offset));
          else
            aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (pread (fildes,
                                         (void *) aiocbp->aiocb.aio_buf,
                                         aiocbp->aiocb.aio_nbytes,
                                         aiocbp->aiocb.aio_offset));

          if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
            /* The Linux kernel is different from others.  It returns
               ESPIPE if using pread on a socket.  Other platforms
               simply ignore the offset parameter and behave like
               read.  */
            aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (read (fildes,
                                        (void *) aiocbp->aiocb64.aio_buf,
                                        aiocbp->aiocb64.aio_nbytes));
        }
      else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
        {
          if (aiocbp->aiocb.aio_lio_opcode & 128)
            aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (__pwrite64 (fildes,
                                              (const void *)
                                              aiocbp->aiocb64.aio_buf,
                                              aiocbp->aiocb64.aio_nbytes,
                                              aiocbp->aiocb64.aio_offset));
          else
            aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (pwrite (fildes,
                                          (const void *)
                                          aiocbp->aiocb.aio_buf,
                                          aiocbp->aiocb.aio_nbytes,
                                          aiocbp->aiocb.aio_offset));

          if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
            /* The Linux kernel is different from others.  It returns
               ESPIPE if using pwrite on a socket.  Other platforms
               simply ignore the offset parameter and behave like
               write.  */
            aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (write (fildes,
                                         (void *) aiocbp->aiocb64.aio_buf,
                                         aiocbp->aiocb64.aio_nbytes));
        }
      else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
        aiocbp->aiocb.__return_value =
          TEMP_FAILURE_RETRY (fdatasync (fildes));
      else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
        aiocbp->aiocb.__return_value =
          TEMP_FAILURE_RETRY (fsync (fildes));
      else
        {
          /* This is an invalid opcode.  */
          aiocbp->aiocb.__return_value = -1;
          __set_errno (EINVAL);
        }

      /* Get the mutex.  */
      pthread_mutex_lock (&__aio_requests_mutex);

      if (aiocbp->aiocb.__return_value == -1)
        aiocbp->aiocb.__error_code = errno;
      else
        aiocbp->aiocb.__error_code = 0;

      /* Send the signal to notify about finished processing of the
         request.  */
      __aio_notify (runp);

      /* Now dequeue the current request.  */
      if (runp->next_prio == NULL)
        {
          /* No outstanding request for this descriptor.  Remove this
             descriptor from the list.  */
          if (runp->next_fd != NULL)
            runp->next_fd->last_fd = runp->last_fd;
          if (runp->last_fd != NULL)
            runp->last_fd->next_fd = runp->next_fd;
          else
            requests = runp->next_fd;
        }
      else
        {
          runp->next_prio->last_fd = runp->last_fd;
          runp->next_prio->next_fd = runp->next_fd;
          runp->next_prio->running = yes;
          if (runp->next_fd != NULL)
            runp->next_fd->last_fd = runp->next_prio;
          if (runp->last_fd != NULL)
            runp->last_fd->next_fd = runp->next_prio;
          else
            requests = runp->next_prio;
        }

      /* Free the old element.  */
      __aio_free_request (runp);

      runp = runlist;
      if (runp != NULL)
        {
          /* We must not run requests which are not marked `running'.  */
          if (runp->running == yes)
            runlist = runp->next_run;
          else
            {
              struct requestlist *old;

              do
                {
                  old = runp;
                  runp = runp->next_run;
                }
              while (runp != NULL && runp->running != yes);

              if (runp != NULL)
                old->next_run = runp->next_run;
            }
        }

      /* If no request to work on we will stop the thread.  */
      if (runp == NULL)
        --nthreads;
      else
        runp->running = allocated;

      /* Release the mutex.  */
      pthread_mutex_unlock (&__aio_requests_mutex);
    }
  while (runp != NULL);

  pthread_exit (NULL);
}
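
/* Illustrative summary (not part of the original file), assuming the
   state constants from aio_misc.h order as no < queued < yes <
   allocated:

     no		not enqueued yet (or just freed)
     queued	waiting behind another request for the same descriptor
     yes	runnable, sitting in RUNLIST
     allocated	a worker thread is executing it

   Each worker loops in handle_fildes_io: execute one request, dequeue
   it, promote the next request on the same descriptor to `yes', then
   pull the best `yes' entry off RUNLIST, or decrement NTHREADS and
   exit when none is left.  */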


/* Free allocated resources.  */
static void
__attribute__ ((unused))
free_res (void)
{
  size_t row;

  /* Nothing is to be done if no request was ever allocated.  */
  if (pool == NULL)
    return;

  /* The first block of rows as specified in OPTIM is allocated in
     one chunk.  */
  free (pool[0]);

  for (row = optim.aio_num / ENTRIES_PER_ROW; row < pool_tab_size; ++row)
    free (pool[row]);

  free (pool);
}

text_set_element (__libc_subfreeres, free_res);