1 /* Handle general operations.
2 Copyright (C) 1997-2021 Free Software Foundation, Inc.
3 This file is part of the GNU C Library.
4 Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
6 The GNU C Library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2.1 of the License, or (at your option) any later version.
11 The GNU C Library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with the GNU C Library; if not, see
18 <https://www.gnu.org/licenses/>. */
27 #include <sys/param.h>
33 /* The available function names differ outside of libc. (In libc, we
34 need to use hidden aliases to avoid the PLT.) */
35 # define __pread __libc_pread
36 # define __pthread_attr_destroy pthread_attr_destroy
37 # define __pthread_attr_init pthread_attr_init
38 # define __pthread_attr_setdetachstate pthread_attr_setdetachstate
39 # define __pthread_cond_signal pthread_cond_signal
40 # define __pthread_cond_timedwait pthread_cond_timedwait
41 # define __pthread_getschedparam pthread_getschedparam
42 # define __pthread_setschedparam pthread_setschedparam
43 # define __pwrite __libc_pwrite
#ifndef aio_create_helper_thread
# define aio_create_helper_thread __aio_create_helper_thread

/* Create a detached helper thread running TF (ARG).  Returns 0 on
   success or the error code from __pthread_create.  The thread is
   created detached so nobody ever joins it.  */
static inline int
__aio_create_helper_thread (pthread_t *threadp, void *(*tf) (void *),
			    void *arg)
{
  pthread_attr_t attr;

  /* Make sure the thread is created detached.  */
  __pthread_attr_init (&attr);
  __pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);

  int ret = __pthread_create (threadp, &attr, tf, arg);

  __pthread_attr_destroy (&attr);
  return ret;
}
#endif
65 static void add_request_to_runlist (struct requestlist
*newrequest
);
67 /* Pool of request list entries. */
68 static struct requestlist
**pool
;
70 /* Number of total and allocated pool entries. */
71 static size_t pool_max_size
;
72 static size_t pool_size
;
74 /* We implement a two dimensional array but allocate each row separately.
75 The macro below determines how many entries should be used per row.
76 It should better be a power of two. */
77 #define ENTRIES_PER_ROW 32
79 /* How many rows we allocate at once. */
82 /* List of available entries. */
83 static struct requestlist
*freelist
;
85 /* List of request waiting to be processed. */
86 static struct requestlist
*runlist
;
88 /* Structure list of all currently processed requests. */
89 static struct requestlist
*requests
;
91 /* Number of threads currently running. */
94 /* Number of threads waiting for work to arrive. */
95 static int idle_thread_count
;
98 /* These are the values used to optimize the use of AIO. The user can
99 overwrite them by using the `aio_init' function. */
100 static struct aioinit optim
=
102 20, /* int aio_threads; Maximal number of threads. */
103 64, /* int aio_num; Number of expected simultaneous requests. */
113 /* Since the list is global we need a mutex protecting it. */
114 pthread_mutex_t __aio_requests_mutex
= PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
;
116 /* When you add a request to the list and there are idle threads present,
117 you signal this condition variable. When a thread finishes work, it waits
118 on this condition variable for a time before it actually exits. */
119 pthread_cond_t __aio_new_request_notification
= PTHREAD_COND_INITIALIZER
;
122 /* Functions to handle request list pool. */
123 static struct requestlist
*
126 struct requestlist
*result
;
128 if (freelist
== NULL
)
130 struct requestlist
*new_row
;
133 assert (sizeof (struct aiocb
) == sizeof (struct aiocb64
));
135 if (pool_size
+ 1 >= pool_max_size
)
137 size_t new_max_size
= pool_max_size
+ ROWS_STEP
;
138 struct requestlist
**new_tab
;
140 new_tab
= (struct requestlist
**)
141 realloc (pool
, new_max_size
* sizeof (struct requestlist
*));
146 pool_max_size
= new_max_size
;
150 /* Allocate the new row. */
151 cnt
= pool_size
== 0 ? optim
.aio_num
: ENTRIES_PER_ROW
;
152 new_row
= (struct requestlist
*) calloc (cnt
,
153 sizeof (struct requestlist
));
157 pool
[pool_size
++] = new_row
;
159 /* Put all the new entries in the freelist. */
162 new_row
->next_prio
= freelist
;
163 freelist
= new_row
++;
169 freelist
= freelist
->next_prio
;
176 __aio_free_request (struct requestlist
*elem
)
179 elem
->next_prio
= freelist
;
185 __aio_find_req (aiocb_union
*elem
)
187 struct requestlist
*runp
= requests
;
188 int fildes
= elem
->aiocb
.aio_fildes
;
190 while (runp
!= NULL
&& runp
->aiocbp
->aiocb
.aio_fildes
< fildes
)
191 runp
= runp
->next_fd
;
195 if (runp
->aiocbp
->aiocb
.aio_fildes
!= fildes
)
198 while (runp
!= NULL
&& runp
->aiocbp
!= elem
)
199 runp
= runp
->next_prio
;
207 __aio_find_req_fd (int fildes
)
209 struct requestlist
*runp
= requests
;
211 while (runp
!= NULL
&& runp
->aiocbp
->aiocb
.aio_fildes
< fildes
)
212 runp
= runp
->next_fd
;
214 return (runp
!= NULL
&& runp
->aiocbp
->aiocb
.aio_fildes
== fildes
220 __aio_remove_request (struct requestlist
*last
, struct requestlist
*req
,
223 assert (req
->running
== yes
|| req
->running
== queued
224 || req
->running
== done
);
227 last
->next_prio
= all
? NULL
: req
->next_prio
;
230 if (all
|| req
->next_prio
== NULL
)
232 if (req
->last_fd
!= NULL
)
233 req
->last_fd
->next_fd
= req
->next_fd
;
235 requests
= req
->next_fd
;
236 if (req
->next_fd
!= NULL
)
237 req
->next_fd
->last_fd
= req
->last_fd
;
241 if (req
->last_fd
!= NULL
)
242 req
->last_fd
->next_fd
= req
->next_prio
;
244 requests
= req
->next_prio
;
246 if (req
->next_fd
!= NULL
)
247 req
->next_fd
->last_fd
= req
->next_prio
;
249 req
->next_prio
->last_fd
= req
->last_fd
;
250 req
->next_prio
->next_fd
= req
->next_fd
;
252 /* Mark this entry as runnable. */
253 req
->next_prio
->running
= yes
;
256 if (req
->running
== yes
)
258 struct requestlist
*runp
= runlist
;
266 runlist
= runp
->next_run
;
268 last
->next_run
= runp
->next_run
;
272 runp
= runp
->next_run
;
279 /* The thread handler. */
280 static void *handle_fildes_io (void *arg
);
283 /* User optimization. */
285 __aio_init (const struct aioinit
*init
)
288 __pthread_mutex_lock (&__aio_requests_mutex
);
290 /* Only allow writing new values if the table is not yet allocated. */
293 optim
.aio_threads
= init
->aio_threads
< 1 ? 1 : init
->aio_threads
;
294 assert (powerof2 (ENTRIES_PER_ROW
));
295 optim
.aio_num
= (init
->aio_num
< ENTRIES_PER_ROW
297 : init
->aio_num
& ~(ENTRIES_PER_ROW
- 1));
300 if (init
->aio_idle_time
!= 0)
301 optim
.aio_idle_time
= init
->aio_idle_time
;
303 /* Release the mutex. */
304 __pthread_mutex_unlock (&__aio_requests_mutex
);
308 /* The main function of the async I/O handling. It enqueues requests
309 and if necessary starts and handles threads. */
311 __aio_enqueue_request (aiocb_union
*aiocbp
, int operation
)
315 struct sched_param param
;
316 struct requestlist
*last
, *runp
, *newp
;
319 if (operation
== LIO_SYNC
|| operation
== LIO_DSYNC
)
320 aiocbp
->aiocb
.aio_reqprio
= 0;
321 else if (aiocbp
->aiocb
.aio_reqprio
< 0
322 #ifdef AIO_PRIO_DELTA_MAX
323 || aiocbp
->aiocb
.aio_reqprio
> AIO_PRIO_DELTA_MAX
327 /* Invalid priority value. */
328 __set_errno (EINVAL
);
329 aiocbp
->aiocb
.__error_code
= EINVAL
;
330 aiocbp
->aiocb
.__return_value
= -1;
334 /* Compute priority for this request. */
335 __pthread_getschedparam (__pthread_self (), &policy
, ¶m
);
336 prio
= param
.sched_priority
- aiocbp
->aiocb
.aio_reqprio
;
339 __pthread_mutex_lock (&__aio_requests_mutex
);
343 /* First look whether the current file descriptor is currently
346 && runp
->aiocbp
->aiocb
.aio_fildes
< aiocbp
->aiocb
.aio_fildes
)
349 runp
= runp
->next_fd
;
352 /* Get a new element for the waiting list. */
356 __pthread_mutex_unlock (&__aio_requests_mutex
);
357 __set_errno (EAGAIN
);
360 newp
->aiocbp
= aiocbp
;
361 newp
->waiting
= NULL
;
363 aiocbp
->aiocb
.__abs_prio
= prio
;
364 aiocbp
->aiocb
.__policy
= policy
;
365 aiocbp
->aiocb
.aio_lio_opcode
= operation
;
366 aiocbp
->aiocb
.__error_code
= EINPROGRESS
;
367 aiocbp
->aiocb
.__return_value
= 0;
370 && runp
->aiocbp
->aiocb
.aio_fildes
== aiocbp
->aiocb
.aio_fildes
)
372 /* The current file descriptor is worked on. It makes no sense
373 to start another thread since this new thread would fight
374 with the running thread for the resources. But we also cannot
375 say that the thread processing this desriptor shall immediately
376 after finishing the current job process this request if there
377 are other threads in the running queue which have a higher
380 /* Simply enqueue it after the running one according to the
383 while (runp
->next_prio
!= NULL
384 && runp
->next_prio
->aiocbp
->aiocb
.__abs_prio
>= prio
)
387 runp
= runp
->next_prio
;
390 newp
->next_prio
= runp
->next_prio
;
391 runp
->next_prio
= newp
;
398 /* Enqueue this request for a new descriptor. */
401 newp
->last_fd
= NULL
;
402 newp
->next_fd
= requests
;
403 if (requests
!= NULL
)
404 requests
->last_fd
= newp
;
409 newp
->next_fd
= last
->next_fd
;
410 newp
->last_fd
= last
;
411 last
->next_fd
= newp
;
412 if (newp
->next_fd
!= NULL
)
413 newp
->next_fd
->last_fd
= newp
;
416 newp
->next_prio
= NULL
;
422 /* We try to create a new thread for this file descriptor. The
423 function which gets called will handle all available requests
424 for this descriptor and when all are processed it will
427 If no new thread can be created or if the specified limit of
428 threads for AIO is reached we queue the request. */
430 /* See if we need to and are able to create a thread. */
431 if (nthreads
< optim
.aio_threads
&& idle_thread_count
== 0)
435 running
= newp
->running
= allocated
;
437 /* Now try to start a thread. */
438 result
= aio_create_helper_thread (&thid
, handle_fildes_io
, newp
);
440 /* We managed to enqueue the request. All errors which can
441 happen now can be recognized by calls to `aio_return' and
446 /* Reset the running flag. The new request is not running. */
447 running
= newp
->running
= yes
;
451 /* We cannot create a thread in the moment and there is
452 also no thread running. This is a problem. `errno' is
453 set to EAGAIN if this is only a temporary problem. */
454 __aio_remove_request (last
, newp
, 0);
462 /* Enqueue the request in the run queue if it is not yet running. */
463 if (running
== yes
&& result
== 0)
465 add_request_to_runlist (newp
);
467 /* If there is a thread waiting for work, then let it know that we
468 have just given it something to do. */
469 if (idle_thread_count
> 0)
470 __pthread_cond_signal (&__aio_new_request_notification
);
474 newp
->running
= running
;
477 /* Something went wrong. */
478 __aio_free_request (newp
);
479 aiocbp
->aiocb
.__error_code
= result
;
480 __set_errno (result
);
484 /* Release the mutex. */
485 __pthread_mutex_unlock (&__aio_requests_mutex
);
492 handle_fildes_io (void *arg
)
494 pthread_t self
= __pthread_self ();
495 struct sched_param param
;
496 struct requestlist
*runp
= (struct requestlist
*) arg
;
501 __pthread_getschedparam (self
, &policy
, ¶m
);
505 /* If runp is NULL, then we were created to service the work queue
506 in general, not to handle any particular request. In that case we
507 skip the "do work" stuff on the first pass, and go directly to the
508 "get work off the work queue" part of this loop, which is near the
511 __pthread_mutex_lock (&__aio_requests_mutex
);
514 /* Hopefully this request is marked as running. */
515 assert (runp
->running
== allocated
);
517 /* Update our variables. */
518 aiocbp
= runp
->aiocbp
;
519 fildes
= aiocbp
->aiocb
.aio_fildes
;
521 /* Change the priority to the requested value (if necessary). */
522 if (aiocbp
->aiocb
.__abs_prio
!= param
.sched_priority
523 || aiocbp
->aiocb
.__policy
!= policy
)
525 param
.sched_priority
= aiocbp
->aiocb
.__abs_prio
;
526 policy
= aiocbp
->aiocb
.__policy
;
527 __pthread_setschedparam (self
, policy
, ¶m
);
530 /* Process request pointed to by RUNP. We must not be disturbed
532 if ((aiocbp
->aiocb
.aio_lio_opcode
& 127) == LIO_READ
)
534 if (sizeof (off_t
) != sizeof (off64_t
)
535 && aiocbp
->aiocb
.aio_lio_opcode
& 128)
536 aiocbp
->aiocb
.__return_value
=
537 TEMP_FAILURE_RETRY (__pread64 (fildes
, (void *)
538 aiocbp
->aiocb64
.aio_buf
,
539 aiocbp
->aiocb64
.aio_nbytes
,
540 aiocbp
->aiocb64
.aio_offset
));
542 aiocbp
->aiocb
.__return_value
=
543 TEMP_FAILURE_RETRY (__pread (fildes
,
545 aiocbp
->aiocb
.aio_buf
,
546 aiocbp
->aiocb
.aio_nbytes
,
547 aiocbp
->aiocb
.aio_offset
));
549 if (aiocbp
->aiocb
.__return_value
== -1 && errno
== ESPIPE
)
550 /* The Linux kernel is different from others. It returns
551 ESPIPE if using pread on a socket. Other platforms
552 simply ignore the offset parameter and behave like
554 aiocbp
->aiocb
.__return_value
=
555 TEMP_FAILURE_RETRY (read (fildes
,
556 (void *) aiocbp
->aiocb64
.aio_buf
,
557 aiocbp
->aiocb64
.aio_nbytes
));
559 else if ((aiocbp
->aiocb
.aio_lio_opcode
& 127) == LIO_WRITE
)
561 if (sizeof (off_t
) != sizeof (off64_t
)
562 && aiocbp
->aiocb
.aio_lio_opcode
& 128)
563 aiocbp
->aiocb
.__return_value
=
564 TEMP_FAILURE_RETRY (__pwrite64 (fildes
, (const void *)
565 aiocbp
->aiocb64
.aio_buf
,
566 aiocbp
->aiocb64
.aio_nbytes
,
567 aiocbp
->aiocb64
.aio_offset
));
569 aiocbp
->aiocb
.__return_value
=
570 TEMP_FAILURE_RETRY (__pwrite (fildes
, (const void *)
571 aiocbp
->aiocb
.aio_buf
,
572 aiocbp
->aiocb
.aio_nbytes
,
573 aiocbp
->aiocb
.aio_offset
));
575 if (aiocbp
->aiocb
.__return_value
== -1 && errno
== ESPIPE
)
576 /* The Linux kernel is different from others. It returns
577 ESPIPE if using pwrite on a socket. Other platforms
578 simply ignore the offset parameter and behave like
580 aiocbp
->aiocb
.__return_value
=
581 TEMP_FAILURE_RETRY (write (fildes
,
582 (void *) aiocbp
->aiocb64
.aio_buf
,
583 aiocbp
->aiocb64
.aio_nbytes
));
585 else if (aiocbp
->aiocb
.aio_lio_opcode
== LIO_DSYNC
)
586 aiocbp
->aiocb
.__return_value
=
587 TEMP_FAILURE_RETRY (fdatasync (fildes
));
588 else if (aiocbp
->aiocb
.aio_lio_opcode
== LIO_SYNC
)
589 aiocbp
->aiocb
.__return_value
=
590 TEMP_FAILURE_RETRY (fsync (fildes
));
593 /* This is an invalid opcode. */
594 aiocbp
->aiocb
.__return_value
= -1;
595 __set_errno (EINVAL
);
599 __pthread_mutex_lock (&__aio_requests_mutex
);
601 if (aiocbp
->aiocb
.__return_value
== -1)
602 aiocbp
->aiocb
.__error_code
= errno
;
604 aiocbp
->aiocb
.__error_code
= 0;
606 /* Send the signal to notify about finished processing of the
610 /* For debugging purposes we reset the running flag of the
612 assert (runp
->running
== allocated
);
613 runp
->running
= done
;
615 /* Now dequeue the current request. */
616 __aio_remove_request (NULL
, runp
, 0);
617 if (runp
->next_prio
!= NULL
)
618 add_request_to_runlist (runp
->next_prio
);
620 /* Free the old element. */
621 __aio_free_request (runp
);
626 /* If the runlist is empty, then we sleep for a while, waiting for
627 something to arrive in it. */
628 if (runp
== NULL
&& optim
.aio_idle_time
>= 0)
631 struct timespec wakeup_time
;
634 __clock_gettime (CLOCK_REALTIME
, &now
);
635 wakeup_time
.tv_sec
= now
.tv_sec
+ optim
.aio_idle_time
;
636 wakeup_time
.tv_nsec
= now
.tv_nsec
;
637 if (wakeup_time
.tv_nsec
>= 1000000000)
639 wakeup_time
.tv_nsec
-= 1000000000;
640 ++wakeup_time
.tv_sec
;
642 __pthread_cond_timedwait (&__aio_new_request_notification
,
643 &__aio_requests_mutex
,
653 assert (runp
->running
== yes
);
654 runp
->running
= allocated
;
655 runlist
= runp
->next_run
;
657 /* If we have a request to process, and there's still another in
658 the run list, then we need to either wake up or create a new
659 thread to service the request that is still in the run list. */
662 /* There are at least two items in the work queue to work on.
663 If there are other idle threads, then we should wake them
664 up for these other work elements; otherwise, we should try
665 to create a new thread. */
666 if (idle_thread_count
> 0)
667 __pthread_cond_signal (&__aio_new_request_notification
);
668 else if (nthreads
< optim
.aio_threads
)
673 /* Make sure the thread is created detached. */
674 __pthread_attr_init (&attr
);
675 __pthread_attr_setdetachstate (&attr
,
676 PTHREAD_CREATE_DETACHED
);
678 /* Now try to start a thread. If we fail, no big deal,
679 because we know that there is at least one thread (us)
680 that is working on AIO operations. */
681 if (__pthread_create (&thid
, &attr
, handle_fildes_io
, NULL
)
688 /* Release the mutex. */
689 __pthread_mutex_unlock (&__aio_requests_mutex
);
691 while (runp
!= NULL
);
697 /* Free allocated resources. */
698 libc_freeres_fn (free_res
)
702 for (row
= 0; row
< pool_max_size
; ++row
)
709 /* Add newrequest to the runlist. The __abs_prio flag of newrequest must
710 be correctly set to do this. Also, you had better set newrequest's
711 "running" flag to "yes" before you release your lock or you'll throw an
714 add_request_to_runlist (struct requestlist
*newrequest
)
716 int prio
= newrequest
->aiocbp
->aiocb
.__abs_prio
;
717 struct requestlist
*runp
;
719 if (runlist
== NULL
|| runlist
->aiocbp
->aiocb
.__abs_prio
< prio
)
721 newrequest
->next_run
= runlist
;
722 runlist
= newrequest
;
728 while (runp
->next_run
!= NULL
729 && runp
->next_run
->aiocbp
->aiocb
.__abs_prio
>= prio
)
730 runp
= runp
->next_run
;
732 newrequest
->next_run
= runp
->next_run
;
733 runp
->next_run
= newrequest
;
738 versioned_symbol (libc
, __aio_init
, aio_init
, GLIBC_2_34
);
739 # if OTHER_SHLIB_COMPAT (librt, GLIBC_2_1, GLIBC_2_34)
740 compat_symbol (librt
, __aio_init
, aio_init
, GLIBC_2_1
);
742 #else /* !PTHREAD_IN_LIBC */
743 weak_alias (__aio_init
, aio_init
)
744 #endif /* !PTHREAD_IN_LIBC */