Update.
[glibc.git] / rt / aio_misc.c
blob014870566214b877d33885270cbfeba08765f28d
1 /* Handle general operations.
2 Copyright (C) 1997, 1998 Free Software Foundation, Inc.
3 This file is part of the GNU C Library.
4 Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
6 The GNU C Library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Library General Public License as
8 published by the Free Software Foundation; either version 2 of the
9 License, or (at your option) any later version.
11 The GNU C Library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Library General Public License for more details.
16 You should have received a copy of the GNU Library General Public
17 License along with the GNU C Library; see the file COPYING.LIB. If not,
18 write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 Boston, MA 02111-1307, USA. */
21 #include <aio.h>
22 #include <errno.h>
23 #include <limits.h>
24 #include <pthread.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
29 #include "aio_misc.h"
31 /* Pool of request list entries. */
32 static struct requestlist **pool;
34 /* Number of total and allocated pool entries. */
35 static size_t pool_tab_size;
36 static size_t pool_size;
38 /* We implement a two dimensional array but allocate each row separately.
39 The macro below determines how many entries should be used per row.
40 It should better be a power of two. */
41 #define ENTRIES_PER_ROW 16
43 /* The row table is incremented in units of this. */
44 #define ROW_STEP 8
46 /* List of available entries. */
47 static struct requestlist *freelist;
49 /* List of request waiting to be processed. */
50 static struct requestlist *runlist;
52 /* Structure list of all currently processed requests. */
53 static struct requestlist *requests;
55 /* Number of threads currently running. */
56 static int nthreads;
59 /* These are the values used to optimize the use of AIO. The user can
60 overwrite them by using the `aio_init' function. */
61 static struct aioinit optim =
63 20, /* int aio_threads; Maximal number of threads. */
64 256, /* int aio_num; Number of expected simultanious requests. */
69 { 0, }
73 /* Since the list is global we need a mutex protecting it. */
74 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
77 /* Functions to handle request list pool. */
78 static struct requestlist *
79 get_elem (void)
81 struct requestlist *result;
83 if (freelist == NULL)
85 struct requestlist *new_row;
86 size_t new_size;
88 /* Compute new size. */
89 new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;
91 if ((new_size / ENTRIES_PER_ROW) >= pool_tab_size)
93 size_t new_tab_size = new_size / ENTRIES_PER_ROW;
94 struct requestlist **new_tab;
96 new_tab = (struct requestlist **)
97 realloc (pool, (new_tab_size * sizeof (struct requestlist *)));
99 if (new_tab == NULL)
100 return NULL;
102 pool_tab_size = new_tab_size;
103 pool = new_tab;
106 if (pool_size == 0)
108 size_t cnt;
110 new_row = (struct requestlist *)
111 calloc (new_size, sizeof (struct requestlist));
113 if (new_row == NULL)
114 return NULL;
116 for (cnt = 0; cnt < new_size / ENTRIES_PER_ROW; ++cnt)
117 pool[cnt] = &new_row[cnt * ENTRIES_PER_ROW];
119 else
121 /* Allocat one new row. */
122 new_row = (struct requestlist *)
123 calloc (ENTRIES_PER_ROW, sizeof (struct requestlist));
124 if (new_row == NULL)
125 return NULL;
127 pool[new_size / ENTRIES_PER_ROW] = new_row;
130 /* Put all the new entries in the freelist. */
133 new_row->next_prio = freelist;
134 freelist = new_row++;
136 while (++pool_size < new_size);
139 result = freelist;
140 freelist = freelist->next_prio;
142 return result;
146 void
147 __aio_free_request (struct requestlist *elem)
149 elem->running = no;
150 elem->next_prio = freelist;
151 freelist = elem;
155 struct requestlist *
156 __aio_find_req (aiocb_union *elem)
158 struct requestlist *runp = requests;
159 int fildes = elem->aiocb.aio_fildes;
161 while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
162 runp = runp->next_fd;
164 if (runp != NULL)
165 if (runp->aiocbp->aiocb.aio_fildes != fildes)
166 runp = NULL;
167 else
168 while (runp != NULL && runp->aiocbp != elem)
169 runp = runp->next_prio;
171 return runp;
175 struct requestlist *
176 __aio_find_req_fd (int fildes)
178 struct requestlist *runp = requests;
180 while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
181 runp = runp->next_fd;
183 return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
184 ? runp : NULL);
188 /* The thread handler.  Declared ahead of its definition because
   __aio_enqueue_request passes it to pthread_create as the start
   routine of a new worker thread. */
189 static void *handle_fildes_io (void *arg);
192 /* User optimization. */
193 void
194 __aio_init (const struct aioinit *init)
196 /* Get the mutex. */
197 pthread_mutex_lock (&__aio_requests_mutex);
199 /* Only allow writing new values if the table is not yet allocated. */
200 if (pool == NULL)
202 optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
203 optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
204 ? ENTRIES_PER_ROW
205 : init->aio_num & ~ENTRIES_PER_ROW);
208 /* Release the mutex. */
209 pthread_mutex_unlock (&__aio_requests_mutex);
211 weak_alias (__aio_init, aio_init)
214 /* The main function of the async I/O handling. It enqueues requests
215 and if necessary starts and handles threads. */
216 struct requestlist *
217 __aio_enqueue_request (aiocb_union *aiocbp, int operation)
219 int result = 0;
220 int policy, prio;
221 struct sched_param param;
222 struct requestlist *last, *runp, *newp;
223 int running = no;
225 if (aiocbp->aiocb.aio_reqprio < 0
226 || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
228 /* Invalid priority value. */
229 __set_errno (EINVAL);
230 aiocbp->aiocb.__error_code = EINVAL;
231 aiocbp->aiocb.__return_value = -1;
232 return NULL;
235 /* Compute priority for this request. */
236 pthread_getschedparam (pthread_self (), &policy, &param);
237 prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
239 /* Get the mutex. */
240 pthread_mutex_lock (&__aio_requests_mutex);
242 last = NULL;
243 runp = requests;
244 /* First look whether the current file descriptor is currently
245 worked with. */
246 while (runp != NULL
247 && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
249 last = runp;
250 runp = runp->next_fd;
253 /* Get a new element for the waiting list. */
254 newp = get_elem ();
255 if (newp == NULL)
257 __set_errno (EAGAIN);
258 pthread_mutex_unlock (&__aio_requests_mutex);
259 return NULL;
261 newp->aiocbp = aiocbp;
262 newp->waiting = NULL;
264 aiocbp->aiocb.__abs_prio = prio;
265 aiocbp->aiocb.__policy = policy;
266 aiocbp->aiocb.aio_lio_opcode = operation;
267 aiocbp->aiocb.__error_code = EINPROGRESS;
268 aiocbp->aiocb.__return_value = 0;
270 if (runp != NULL
271 && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
273 /* The current file descriptor is worked on. It makes no sense
274 to start another thread since this new thread would fight
275 with the running thread for the resources. But we also cannot
276 say that the thread processing this desriptor shall immediately
277 after finishing the current job process this request if there
278 are other threads in the running queue which have a higher
279 priority. */
281 /* Simply enqueue it after the running one according to the
282 priority. */
283 while (runp->next_prio != NULL
284 && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
285 runp = runp->next_prio;
287 newp->next_prio = runp->next_prio;
288 runp->next_prio = newp;
290 running = queued;
292 else
294 /* Enqueue this request for a new descriptor. */
295 if (last == NULL)
297 newp->last_fd = NULL;
298 newp->next_fd = requests;
299 if (requests != NULL)
300 requests->last_fd = newp;
301 requests = newp;
303 else
305 newp->next_fd = last->next_fd;
306 newp->last_fd = last;
307 last->next_fd = newp;
308 if (newp->next_fd != NULL)
309 newp->next_fd->last_fd = newp;
312 newp->next_prio = NULL;
315 if (running == no)
317 /* We try to create a new thread for this file descriptor. The
318 function which gets called will handle all available requests
319 for this descriptor and when all are processed it will
320 terminate.
322 If no new thread can be created or if the specified limit of
323 threads for AIO is reached we queue the request. */
325 /* See if we can create a thread. */
326 if (nthreads < optim.aio_threads)
328 pthread_t thid;
329 pthread_attr_t attr;
331 /* Make sure the thread is created detached. */
332 pthread_attr_init (&attr);
333 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
335 /* Now try to start a thread. */
336 if (pthread_create (&thid, &attr, handle_fildes_io, newp) == 0)
338 /* We managed to enqueue the request. All errors which can
339 happen now can be recognized by calls to `aio_return' and
340 `aio_error'. */
341 running = allocated;
342 ++nthreads;
344 else if (nthreads == 0)
345 /* We cannot create a thread in the moment and there is
346 also no thread running. This is a problem. `errno' is
347 set to EAGAIN if this is only a temporary problem. */
348 result = -1;
352 /* Enqueue the request in the run queue if it is not yet running. */
353 if (running < yes && result == 0)
355 if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
357 newp->next_run = runlist;
358 runlist = newp;
360 else
362 runp = runlist;
364 while (runp->next_run != NULL
365 && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
366 runp = runp->next_run;
368 newp->next_run = runp->next_run;
369 runp->next_run = newp;
373 if (result == 0)
374 newp->running = running;
375 else
377 /* Something went wrong. */
378 __aio_free_request (newp);
379 newp = NULL;
382 /* Release the mutex. */
383 pthread_mutex_unlock (&__aio_requests_mutex);
385 return newp;
389 static void *
390 handle_fildes_io (void *arg)
392 pthread_t self = pthread_self ();
393 struct sched_param param;
394 struct requestlist *runp = (struct requestlist *) arg;
395 aiocb_union *aiocbp;
396 int policy;
397 int fildes;
399 pthread_getschedparam (self, &policy, &param);
403 /* Update our variables. */
404 aiocbp = runp->aiocbp;
405 fildes = aiocbp->aiocb.aio_fildes;
407 /* Change the priority to the requested value (if necessary). */
408 if (aiocbp->aiocb.__abs_prio != param.sched_priority
409 || aiocbp->aiocb.__policy != policy)
411 param.sched_priority = aiocbp->aiocb.__abs_prio;
412 policy = aiocbp->aiocb.__policy;
413 pthread_setschedparam (self, policy, &param);
416 /* Process request pointed to by RUNP. We must not be disturbed
417 by signals. */
418 if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
420 if (aiocbp->aiocb.aio_lio_opcode & 128)
421 aiocbp->aiocb.__return_value =
422 TEMP_FAILURE_RETRY (__pread64 (fildes,
423 (void *) aiocbp->aiocb64.aio_buf,
424 aiocbp->aiocb64.aio_nbytes,
425 aiocbp->aiocb64.aio_offset));
426 else
427 aiocbp->aiocb.__return_value =
428 TEMP_FAILURE_RETRY (pread (fildes,
429 (void *) aiocbp->aiocb.aio_buf,
430 aiocbp->aiocb.aio_nbytes,
431 aiocbp->aiocb.aio_offset));
433 else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
435 if (aiocbp->aiocb.aio_lio_opcode & 128)
436 aiocbp->aiocb.__return_value =
437 TEMP_FAILURE_RETRY (__pwrite64 (fildes,
438 (const void *) aiocbp->aiocb64.aio_buf,
439 aiocbp->aiocb64.aio_nbytes,
440 aiocbp->aiocb64.aio_offset));
441 else
442 aiocbp->aiocb.__return_value =
443 TEMP_FAILURE_RETRY (pwrite (fildes,
444 (const void *) aiocbp->aiocb.aio_buf,
445 aiocbp->aiocb.aio_nbytes,
446 aiocbp->aiocb.aio_offset));
448 else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
449 aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
450 else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
451 aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
452 else
454 /* This is an invalid opcode. */
455 aiocbp->aiocb.__return_value = -1;
456 __set_errno (EINVAL);
459 /* Get the mutex. */
460 pthread_mutex_lock (&__aio_requests_mutex);
462 if (aiocbp->aiocb.__return_value == -1)
463 aiocbp->aiocb.__error_code = errno;
464 else
465 aiocbp->aiocb.__error_code = 0;
467 /* Send the signal to notify about finished processing of the
468 request. */
469 __aio_notify (runp);
471 /* Now dequeue the current request. */
472 if (runp->next_prio == NULL)
474 /* No outstanding request for this descriptor. Remove this
475 descriptor from the list. */
476 if (runp->next_fd != NULL)
477 runp->next_fd->last_fd = runp->last_fd;
478 if (runp->last_fd != NULL)
479 runp->last_fd->next_fd = runp->next_fd;
480 else
481 requests = runp->next_fd;
483 else
485 runp->next_prio->last_fd = runp->last_fd;
486 runp->next_prio->next_fd = runp->next_fd;
487 runp->next_prio->running = yes;
488 if (runp->next_fd != NULL)
489 runp->next_fd->last_fd = runp->next_prio;
490 if (runp->last_fd != NULL)
491 runp->last_fd->next_fd = runp->next_prio;
492 else
493 requests = runp->next_prio;
496 /* Free the old element. */
497 __aio_free_request (runp);
499 runp = runlist;
500 if (runp != NULL)
502 /* We must not run requests which are not marked `running'. */
503 if (runp->running == yes)
504 runlist = runp->next_run;
505 else
507 struct requestlist *old;
511 old = runp;
512 runp = runp->next_run;
514 while (runp != NULL && runp->running != yes);
516 if (runp != NULL)
517 old->next_run = runp->next_run;
521 /* If no request to work on we will stop the thread. */
522 if (runp == NULL)
523 --nthreads;
524 else
525 runp->running = allocated;
527 /* Release the mutex. */
528 pthread_mutex_unlock (&__aio_requests_mutex);
530 while (runp != NULL);
532 pthread_exit (NULL);
536 /* Free allocated resources. */
537 static void
538 __attribute__ ((unused))
539 free_res (void)
541 size_t row;
543 /* The first block of rows as specified in OPTIM is allocated in
544 one chunk. */
545 free (pool[0]);
547 for (row = optim.aio_num / ENTRIES_PER_ROW; row < pool_tab_size; ++row)
548 free (pool[row]);
550 free (pool);
553 text_set_element (__libc_subfreeres, free_res);