/* Handle general operations.
   Copyright (C) 1997-2021 Free Software Foundation, Inc.
   This file is part of the GNU C Library.
   Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.

   The GNU C Library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 2.1 of the License, or (at your option) any later version.

   The GNU C Library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with the GNU C Library; if not, see
   <https://www.gnu.org/licenses/>.  */
#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthreadP.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <aio_misc.h>
#if !PTHREAD_IN_LIBC
/* The available function names differ outside of libc.  (In libc, we
   need to use hidden aliases to avoid the PLT.)  */
# define __pread __libc_pread
# define __pthread_attr_destroy pthread_attr_destroy
# define __pthread_attr_init pthread_attr_init
# define __pthread_attr_setdetachstate pthread_attr_setdetachstate
# define __pthread_cond_signal pthread_cond_signal
# define __pthread_cond_timedwait pthread_cond_timedwait
# define __pthread_getschedparam pthread_getschedparam
# define __pthread_setschedparam pthread_setschedparam
# define __pwrite __libc_pwrite
#endif
#ifndef aio_create_helper_thread
# define aio_create_helper_thread __aio_create_helper_thread

extern inline int
__aio_create_helper_thread (pthread_t *threadp, void *(*tf) (void *), void *arg)
{
  pthread_attr_t attr;

  /* Make sure the thread is created detached.  */
  __pthread_attr_init (&attr);
  __pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);

  int ret = __pthread_create (threadp, &attr, tf, arg);

  __pthread_attr_destroy (&attr);
  return ret;
}
#endif
static void add_request_to_runlist (struct requestlist *newrequest);

/* Pool of request list entries.  */
static struct requestlist **pool;

/* Number of total and allocated pool entries.  */
static size_t pool_max_size;
static size_t pool_size;

/* We implement a two-dimensional array but allocate each row separately.
   The macro below determines how many entries should be used per row.
   It should be a power of two.  */
#define ENTRIES_PER_ROW 32

/* How many rows we allocate at once.  */
#define ROWS_STEP 8
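
/* Illustration (a sketch derived from get_elem below, not part of the
   original source): the pool is a table of row pointers.  The first row
   is sized for optim.aio_num entries (64 by default); every later row
   holds ENTRIES_PER_ROW entries, and the table itself grows by
   ROWS_STEP pointers per reallocation:

     pool -> [row 0] -> 64 entries (optim.aio_num)
             [row 1] -> 32 entries (ENTRIES_PER_ROW)
             [row 2] -> 32 entries
             ...                     (capacity: pool_max_size rows)  */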
/* List of available entries.  */
static struct requestlist *freelist;

/* List of requests waiting to be processed.  */
static struct requestlist *runlist;

/* List of all requests currently being processed.  */
static struct requestlist *requests;

/* Number of threads currently running.  */
static int nthreads;

/* Number of threads waiting for work to arrive.  */
static int idle_thread_count;
/* These are the values used to optimize the use of AIO.  The user can
   override them by using the `aio_init' function.  */
static struct aioinit optim =
{
  20,	/* int aio_threads;	Maximal number of threads.  */
  64,	/* int aio_num;		Number of expected simultaneous requests.  */
  0,	/* int aio_locks;	Unused.  */
  0,	/* int aio_usedba;	Unused.  */
  0,	/* int aio_debug;	Unused.  */
  0,	/* int aio_numusers;	Unused.  */
  1,	/* int aio_idle_time;	Seconds an idle thread waits before exiting.  */
  0	/* int aio_reserved;	Unused.  */
};
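
/* Example (user code; a sketch with arbitrary values): these defaults
   can be overridden before the first request is submitted via aio_init,
   the GNU extension declared in <aio.h>:

     struct aioinit init = { 0 };
     init.aio_threads = 32;    // allow up to 32 helper threads
     init.aio_num = 256;       // expect up to 256 parallel requests
     init.aio_idle_time = 5;   // idle threads linger for 5 seconds
     aio_init (&init);  */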
/* Since the list is global we need a mutex protecting it.  */
pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

/* When you add a request to the list and there are idle threads present,
   you signal this condition variable.  When a thread finishes work, it
   waits on this condition variable for a time before it actually exits.  */
pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
/* Functions to handle request list pool.  */
static struct requestlist *
get_elem (void)
{
  struct requestlist *result;

  if (freelist == NULL)
    {
      struct requestlist *new_row;
      int cnt;

      assert (sizeof (struct aiocb) == sizeof (struct aiocb64));

      if (pool_size + 1 >= pool_max_size)
	{
	  size_t new_max_size = pool_max_size + ROWS_STEP;
	  struct requestlist **new_tab;

	  new_tab = (struct requestlist **)
	    realloc (pool, new_max_size * sizeof (struct requestlist *));

	  if (new_tab == NULL)
	    return NULL;

	  pool_max_size = new_max_size;
	  pool = new_tab;
	}

      /* Allocate the new row.  */
      cnt = pool_size == 0 ? optim.aio_num : ENTRIES_PER_ROW;
      new_row = (struct requestlist *) calloc (cnt,
					       sizeof (struct requestlist));
      if (new_row == NULL)
	return NULL;

      pool[pool_size++] = new_row;

      /* Put all the new entries in the freelist.  */
      do
	{
	  new_row->next_prio = freelist;
	  freelist = new_row++;
	}
      while (--cnt > 0);
    }

  result = freelist;
  freelist = freelist->next_prio;

  return result;
}
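
/* Illustration (a sketch, not part of the original source): after a row
   of N entries has been threaded onto the freelist by the loop above,
   the entries sit in reverse order, with the previous freelist head at
   the tail:

     freelist -> row[N-1] -> row[N-2] -> ... -> row[0] -> old head  */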
void
__aio_free_request (struct requestlist *elem)
{
  elem->running = no;
  elem->next_prio = freelist;
  freelist = elem;
}
struct requestlist *
__aio_find_req (aiocb_union *elem)
{
  struct requestlist *runp = requests;
  int fildes = elem->aiocb.aio_fildes;

  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
    runp = runp->next_fd;

  if (runp != NULL)
    {
      if (runp->aiocbp->aiocb.aio_fildes != fildes)
	runp = NULL;
      else
	while (runp != NULL && runp->aiocbp != elem)
	  runp = runp->next_prio;
    }

  return runp;
}
struct requestlist *
__aio_find_req_fd (int fildes)
{
  struct requestlist *runp = requests;

  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
    runp = runp->next_fd;

  return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
	  ? runp : NULL);
}
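
/* Illustration (a sketch, not part of the original source): the
   `requests' list is sorted by ascending file descriptor and doubly
   linked through next_fd/last_fd; further requests for the same
   descriptor hang off the head entry through next_prio, ordered by
   descending __abs_prio:

     requests -> [fd 3] <-> [fd 7] <-> [fd 9]    (next_fd/last_fd)
                    |          |
                 [fd 3]     [fd 7]               (next_prio chains)
                    |
                 [fd 3]                                              */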
void
__aio_remove_request (struct requestlist *last, struct requestlist *req,
		      int all)
{
  assert (req->running == yes || req->running == queued
	  || req->running == done);

  if (last != NULL)
    last->next_prio = all ? NULL : req->next_prio;
  else
    {
      if (all || req->next_prio == NULL)
	{
	  if (req->last_fd != NULL)
	    req->last_fd->next_fd = req->next_fd;
	  else
	    requests = req->next_fd;
	  if (req->next_fd != NULL)
	    req->next_fd->last_fd = req->last_fd;
	}
      else
	{
	  if (req->last_fd != NULL)
	    req->last_fd->next_fd = req->next_prio;
	  else
	    requests = req->next_prio;

	  if (req->next_fd != NULL)
	    req->next_fd->last_fd = req->next_prio;

	  req->next_prio->last_fd = req->last_fd;
	  req->next_prio->next_fd = req->next_fd;

	  /* Mark this entry as runnable.  */
	  req->next_prio->running = yes;
	}

      if (req->running == yes)
	{
	  struct requestlist *runp = runlist;

	  last = NULL;
	  while (runp != NULL)
	    {
	      if (runp == req)
		{
		  if (last == NULL)
		    runlist = runp->next_run;
		  else
		    last->next_run = runp->next_run;
		  break;
		}
	      last = runp;
	      runp = runp->next_run;
	    }
	}
    }
}
/* The thread handler.  */
static void *handle_fildes_io (void *arg);


/* User optimization.  */
void
__aio_init (const struct aioinit *init)
{
  /* Get the mutex.  */
  __pthread_mutex_lock (&__aio_requests_mutex);

  /* Only allow writing new values if the table is not yet allocated.  */
  if (pool == NULL)
    {
      optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
      assert (powerof2 (ENTRIES_PER_ROW));
      optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
		       ? ENTRIES_PER_ROW
		       : init->aio_num & ~(ENTRIES_PER_ROW - 1));
    }

  if (init->aio_idle_time != 0)
    optim.aio_idle_time = init->aio_idle_time;

  /* Release the mutex.  */
  __pthread_mutex_unlock (&__aio_requests_mutex);
}
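
/* Worked example (for illustration): with ENTRIES_PER_ROW == 32, an
   init->aio_num of 100 is rounded down to 96 (100 & ~31), any value
   below 32 is raised to 32, and an aio_threads below 1 is raised
   to 1.  */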
/* The main function of the async I/O handling.  It enqueues requests
   and if necessary starts and handles threads.  */
struct requestlist *
__aio_enqueue_request (aiocb_union *aiocbp, int operation)
{
  int result = 0;
  int policy, prio;
  struct sched_param param;
  struct requestlist *last, *runp, *newp;
  int running = no;

  if (operation == LIO_SYNC || operation == LIO_DSYNC)
    aiocbp->aiocb.aio_reqprio = 0;
  else if (aiocbp->aiocb.aio_reqprio < 0
#ifdef AIO_PRIO_DELTA_MAX
	   || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX
#endif
	   )
    {
      /* Invalid priority value.  */
      __set_errno (EINVAL);
      aiocbp->aiocb.__error_code = EINVAL;
      aiocbp->aiocb.__return_value = -1;
      return NULL;
    }
  /* Compute priority for this request.  */
  __pthread_getschedparam (__pthread_self (), &policy, &param);
  prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;

  /* Get the mutex.  */
  __pthread_mutex_lock (&__aio_requests_mutex);

  last = NULL;
  runp = requests;
  /* First look whether the current file descriptor is currently
     worked with.  */
  while (runp != NULL
	 && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
    {
      last = runp;
      runp = runp->next_fd;
    }

  /* Get a new element for the waiting list.  */
  newp = get_elem ();
  if (newp == NULL)
    {
      __pthread_mutex_unlock (&__aio_requests_mutex);
      __set_errno (EAGAIN);
      return NULL;
    }
  newp->aiocbp = aiocbp;
  newp->waiting = NULL;

  aiocbp->aiocb.__abs_prio = prio;
  aiocbp->aiocb.__policy = policy;
  aiocbp->aiocb.aio_lio_opcode = operation;
  aiocbp->aiocb.__error_code = EINPROGRESS;
  aiocbp->aiocb.__return_value = 0;
  if (runp != NULL
      && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
    {
      /* The current file descriptor is worked on.  It makes no sense
	 to start another thread since this new thread would fight
	 with the running thread for the resources.  But we also cannot
	 say that the thread processing this descriptor shall immediately
	 after finishing the current job process this request if there
	 are other threads in the running queue which have a higher
	 priority.  */

      /* Simply enqueue it after the running one according to the
	 priority.  */
      last = NULL;
      while (runp->next_prio != NULL
	     && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
	{
	  last = runp;
	  runp = runp->next_prio;
	}

      newp->next_prio = runp->next_prio;
      runp->next_prio = newp;

      running = queued;
    }
  else
    {
      running = yes;
      /* Enqueue this request for a new descriptor.  */
      if (last == NULL)
	{
	  newp->last_fd = NULL;
	  newp->next_fd = requests;
	  if (requests != NULL)
	    requests->last_fd = newp;
	  requests = newp;
	}
      else
	{
	  newp->next_fd = last->next_fd;
	  newp->last_fd = last;
	  last->next_fd = newp;
	  if (newp->next_fd != NULL)
	    newp->next_fd->last_fd = newp;
	}

      newp->next_prio = NULL;
      last = NULL;
    }
  if (running == yes)
    {
      /* We try to create a new thread for this file descriptor.  The
	 function which gets called will handle all available requests
	 for this descriptor and when all are processed it will
	 terminate.

	 If no new thread can be created or if the specified limit of
	 threads for AIO is reached we queue the request.  */

      /* See if we need to and are able to create a thread.  */
      if (nthreads < optim.aio_threads && idle_thread_count == 0)
	{
	  pthread_t thid;

	  running = newp->running = allocated;

	  /* Now try to start a thread.  */
	  result = aio_create_helper_thread (&thid, handle_fildes_io, newp);
	  if (result == 0)
	    /* We managed to enqueue the request.  All errors which can
	       happen now can be recognized by calls to `aio_return' and
	       `aio_error'.  */
	    ++nthreads;
	  else
	    {
	      /* Reset the running flag.  The new request is not running.  */
	      running = newp->running = yes;

	      if (nthreads == 0)
		{
		  /* We cannot create a thread at the moment and there is
		     also no thread running.  This is a problem.  `errno' is
		     set to EAGAIN if this is only a temporary problem.  */
		  __aio_remove_request (last, newp, 0);
		}
	      else
		result = 0;
	    }
	}
    }
  /* Enqueue the request in the run queue if it is not yet running.  */
  if (running == yes && result == 0)
    {
      add_request_to_runlist (newp);

      /* If there is a thread waiting for work, then let it know that we
	 have just given it something to do.  */
      if (idle_thread_count > 0)
	__pthread_cond_signal (&__aio_new_request_notification);
    }

  if (result == 0)
    newp->running = running;
  else
    {
      /* Something went wrong.  */
      __aio_free_request (newp);
      aiocbp->aiocb.__error_code = result;
      __set_errno (result);
      newp = NULL;
    }

  /* Release the mutex.  */
  __pthread_mutex_unlock (&__aio_requests_mutex);

  return newp;
}
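
/* Example (user code; a sketch where `fd' is an open descriptor and
   `buf' a caller-owned buffer): requests reach this queue through the
   POSIX wrappers such as aio_read, which fill in the opcode and call
   __aio_enqueue_request internally:

     struct aiocb cb = { 0 };
     cb.aio_fildes = fd;
     cb.aio_buf = buf;
     cb.aio_nbytes = sizeof buf;
     cb.aio_offset = 0;
     if (aio_read (&cb) == 0)
       {
	 while (aio_error (&cb) == EINPROGRESS)
	   ;  // poll, or block in aio_suspend instead
	 ssize_t n = aio_return (&cb);
       }                                                              */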
static void *
handle_fildes_io (void *arg)
{
  pthread_t self = __pthread_self ();
  struct sched_param param;
  struct requestlist *runp = (struct requestlist *) arg;
  aiocb_union *aiocbp;
  int policy;
  int fildes;

  __pthread_getschedparam (self, &policy, &param);

  do
    {
      /* If runp is NULL, then we were created to service the work queue
	 in general, not to handle any particular request.  In that case we
	 skip the "do work" stuff on the first pass, and go directly to the
	 "get work off the work queue" part of this loop, which is near the
	 end.  */
      if (runp == NULL)
	__pthread_mutex_lock (&__aio_requests_mutex);
      else
	{
	  /* Hopefully this request is marked as running.  */
	  assert (runp->running == allocated);

	  /* Update our variables.  */
	  aiocbp = runp->aiocbp;
	  fildes = aiocbp->aiocb.aio_fildes;

	  /* Change the priority to the requested value (if necessary).  */
	  if (aiocbp->aiocb.__abs_prio != param.sched_priority
	      || aiocbp->aiocb.__policy != policy)
	    {
	      param.sched_priority = aiocbp->aiocb.__abs_prio;
	      policy = aiocbp->aiocb.__policy;
	      __pthread_setschedparam (self, policy, &param);
	    }
	  /* Process request pointed to by RUNP.  We must not be disturbed
	     by signals.  */
	  if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
	    {
	      /* Bit 7 of the opcode marks a 64-bit (aiocb64) request.  */
	      if (sizeof (off_t) != sizeof (off64_t)
		  && aiocbp->aiocb.aio_lio_opcode & 128)
		aiocbp->aiocb.__return_value =
		  TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
						 aiocbp->aiocb64.aio_buf,
						 aiocbp->aiocb64.aio_nbytes,
						 aiocbp->aiocb64.aio_offset));
	      else
		aiocbp->aiocb.__return_value =
		  TEMP_FAILURE_RETRY (__pread (fildes,
					       (void *)
					       aiocbp->aiocb.aio_buf,
					       aiocbp->aiocb.aio_nbytes,
					       aiocbp->aiocb.aio_offset));

	      if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
		/* The Linux kernel is different from others.  It returns
		   ESPIPE if using pread on a socket.  Other platforms
		   simply ignore the offset parameter and behave like
		   read.  */
		aiocbp->aiocb.__return_value =
		  TEMP_FAILURE_RETRY (read (fildes,
					    (void *) aiocbp->aiocb64.aio_buf,
					    aiocbp->aiocb64.aio_nbytes));
	    }
	  else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
	    {
	      if (sizeof (off_t) != sizeof (off64_t)
		  && aiocbp->aiocb.aio_lio_opcode & 128)
		aiocbp->aiocb.__return_value =
		  TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
						  aiocbp->aiocb64.aio_buf,
						  aiocbp->aiocb64.aio_nbytes,
						  aiocbp->aiocb64.aio_offset));
	      else
		aiocbp->aiocb.__return_value =
		  TEMP_FAILURE_RETRY (__pwrite (fildes, (const void *)
						aiocbp->aiocb.aio_buf,
						aiocbp->aiocb.aio_nbytes,
						aiocbp->aiocb.aio_offset));

	      if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
		/* The Linux kernel is different from others.  It returns
		   ESPIPE if using pwrite on a socket.  Other platforms
		   simply ignore the offset parameter and behave like
		   write.  */
		aiocbp->aiocb.__return_value =
		  TEMP_FAILURE_RETRY (write (fildes,
					     (void *) aiocbp->aiocb64.aio_buf,
					     aiocbp->aiocb64.aio_nbytes));
	    }
	  else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
	    aiocbp->aiocb.__return_value =
	      TEMP_FAILURE_RETRY (fdatasync (fildes));
	  else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
	    aiocbp->aiocb.__return_value =
	      TEMP_FAILURE_RETRY (fsync (fildes));
	  else
	    {
	      /* This is an invalid opcode.  */
	      aiocbp->aiocb.__return_value = -1;
	      __set_errno (EINVAL);
	    }

	  /* Get the mutex.  */
	  __pthread_mutex_lock (&__aio_requests_mutex);

	  if (aiocbp->aiocb.__return_value == -1)
	    aiocbp->aiocb.__error_code = errno;
	  else
	    aiocbp->aiocb.__error_code = 0;

	  /* Send the signal to notify about finished processing of the
	     request.  */
	  __aio_notify (runp);

	  /* For debugging purposes we reset the running flag of the
	     finished request.  */
	  assert (runp->running == allocated);
	  runp->running = done;

	  /* Now dequeue the current request.  */
	  __aio_remove_request (NULL, runp, 0);
	  if (runp->next_prio != NULL)
	    add_request_to_runlist (runp->next_prio);

	  /* Free the old element.  */
	  __aio_free_request (runp);
	}
      runp = runlist;

      /* If the runlist is empty, then we sleep for a while, waiting for
	 something to arrive in it.  */
      if (runp == NULL && optim.aio_idle_time >= 0)
	{
	  struct timespec now;
	  struct timespec wakeup_time;

	  ++idle_thread_count;
	  __clock_gettime (CLOCK_REALTIME, &now);
	  wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
	  wakeup_time.tv_nsec = now.tv_nsec;
	  if (wakeup_time.tv_nsec >= 1000000000)
	    {
	      wakeup_time.tv_nsec -= 1000000000;
	      ++wakeup_time.tv_sec;
	    }
	  __pthread_cond_timedwait (&__aio_new_request_notification,
				    &__aio_requests_mutex,
				    &wakeup_time);
	  --idle_thread_count;
	  runp = runlist;
	}
      if (runp == NULL)
	--nthreads;
      else
	{
	  assert (runp->running == yes);
	  runp->running = allocated;
	  runlist = runp->next_run;

	  /* If we have a request to process, and there's still another in
	     the run list, then we need to either wake up or create a new
	     thread to service the request that is still in the run list.  */
	  if (runlist != NULL)
	    {
	      /* There are at least two items in the work queue to work on.
		 If there are other idle threads, then we should wake them
		 up for these other work elements; otherwise, we should try
		 to create a new thread.  */
	      if (idle_thread_count > 0)
		__pthread_cond_signal (&__aio_new_request_notification);
	      else if (nthreads < optim.aio_threads)
		{
		  pthread_t thid;
		  pthread_attr_t attr;

		  /* Make sure the thread is created detached.  */
		  __pthread_attr_init (&attr);
		  __pthread_attr_setdetachstate (&attr,
						 PTHREAD_CREATE_DETACHED);

		  /* Now try to start a thread.  If we fail, no big deal,
		     because we know that there is at least one thread (us)
		     that is working on AIO operations.  */
		  if (__pthread_create (&thid, &attr, handle_fildes_io, NULL)
		      == 0)
		    ++nthreads;
		}
	    }
	}

      /* Release the mutex.  */
      __pthread_mutex_unlock (&__aio_requests_mutex);
    }
  while (runp != NULL);

  return NULL;
}
/* Free allocated resources.  */
libc_freeres_fn (free_res)
{
  size_t row;

  /* Only the first pool_size rows have been allocated; the slots
     beyond them are uninitialized after realloc and must not be
     passed to free.  */
  for (row = 0; row < pool_size; ++row)
    free (pool[row]);

  free (pool);
}
/* Add newrequest to the runlist.  The __abs_prio flag of newrequest must
   be correctly set to do this.  Also, you had better set newrequest's
   "running" flag to "yes" before you release your lock or you'll throw an
   assertion.  */
static void
add_request_to_runlist (struct requestlist *newrequest)
{
  int prio = newrequest->aiocbp->aiocb.__abs_prio;
  struct requestlist *runp;

  if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
    {
      newrequest->next_run = runlist;
      runlist = newrequest;
    }
  else
    {
      runp = runlist;

      while (runp->next_run != NULL
	     && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
	runp = runp->next_run;

      newrequest->next_run = runp->next_run;
      runp->next_run = newrequest;
    }
}
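
/* Illustration (a sketch, not part of the original source): the runlist
   stays sorted by descending __abs_prio, and a new request is placed
   after existing entries of equal priority, so equal priorities are
   served FIFO:

     runlist -> prio 10 -> prio 7 (old) -> prio 7 (new) -> prio 3  */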
#if PTHREAD_IN_LIBC
versioned_symbol (libc, __aio_init, aio_init, GLIBC_2_34);
# if OTHER_SHLIB_COMPAT (librt, GLIBC_2_1, GLIBC_2_34)
compat_symbol (librt, __aio_init, aio_init, GLIBC_2_1);
# endif
#else /* !PTHREAD_IN_LIBC */
weak_alias (__aio_init, aio_init)
#endif /* !PTHREAD_IN_LIBC */