#include <aio.h>
#include <pthread.h>
#include <semaphore.h>
#include <limits.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/auxv.h>
#include "syscall.h"
#include "atomic.h"
#include "pthread_impl.h"
#include "aio_impl.h"
#define malloc __libc_malloc
#define calloc __libc_calloc
#define realloc __libc_realloc
#define free __libc_free

/* The following is a threads-based implementation of AIO with minimal
 * dependence on implementation details. Most synchronization is
 * performed with pthread primitives, but atomics and futex operations
 * are used for notification in a couple places where the pthread
 * primitives would be inefficient or impractical.
 *
 * For each fd with outstanding aio operations, an aio_queue structure
 * is maintained. These are reference-counted and destroyed by the last
 * aio worker thread to exit. Accessing any member of the aio_queue
 * structure requires a lock on the aio_queue. Adding and removing aio
 * queues themselves requires a write lock on the global map object,
 * a 4-level table mapping file descriptor numbers to aio queues. A
 * read lock on the map is used to obtain locks on existing queues by
 * excluding destruction of the queue by a different thread while it is
 * being locked.
 *
 * Each aio queue has a list of active threads/operations. Presently there
 * is a one to one relationship between threads and operations. The only
 * members of the aio_thread structure which are accessed by other threads
 * are the linked list pointers, op (which is immutable), running (which
 * is updated atomically), and err (which is synchronized via running),
 * so no locking is necessary. Most of the other members are used for
 * sharing data between the main flow of execution and the cancellation
 * cleanup handler.
 *
 * Taking any aio locks requires having all signals blocked. This is
 * necessary because aio_cancel is needed by close, and close is required
 * to be async-signal safe. All aio worker threads run with all signals
 * blocked permanently. */

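/* For illustration: the map is indexed by the bytes of the fd, most
 * significant first, so with 32-bit int, fd 0x01020304 resolves to
 * map[1][2][3][4] and fd 3 to map[0][0][0][3]. The top level has
 * (-1U/2+1)>>24 == 128 entries, enough for any non-negative fd, and
 * each lower level has 256, so unused regions of the fd space cost
 * only null pointers. */
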
struct aio_thread {
	pthread_t td;
	struct aiocb *cb;
	struct aio_thread *next, *prev;
	struct aio_queue *q;
	volatile int running;
	int err, op;
	ssize_t ret;
};

struct aio_queue {
	int fd, seekable, append, ref, init;
	pthread_mutex_t lock;
	pthread_cond_t cond;
	struct aio_thread *head;
};

struct aio_args {
	struct aiocb *cb;
	struct aio_queue *q;
	int op;
	sem_t sem;
};

static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER;
static struct aio_queue *****map;
static volatile int aio_fd_cnt;
volatile int __aio_fut;

static size_t io_thread_stack_size;

#define MAX(a,b) ((a)>(b) ? (a) : (b))

static struct aio_queue *__aio_get_queue(int fd, int need)
{
	sigset_t allmask, origmask;
	int masked = 0;
	if (fd < 0) {
		errno = EBADF;
		return 0;
	}
	int a=fd>>24;
	unsigned char b=fd>>16, c=fd>>8, d=fd;
	struct aio_queue *q = 0;
	pthread_rwlock_rdlock(&maplock);
	if ((!map || !map[a] || !map[a][b] || !map[a][b][c] || !(q=map[a][b][c][d])) && need) {
		pthread_rwlock_unlock(&maplock);
		/* Reject invalid fds before allocating any map levels. */
		if (fcntl(fd, F_GETFD) < 0) return 0;
		sigfillset(&allmask);
		masked = 1;
		pthread_sigmask(SIG_BLOCK, &allmask, &origmask);
		pthread_rwlock_wrlock(&maplock);
		if (!io_thread_stack_size) {
			unsigned long val = __getauxval(AT_MINSIGSTKSZ);
			io_thread_stack_size = MAX(MINSIGSTKSZ+2048, val+512);
		}
		if (!map) map = calloc(sizeof *map, (-1U/2+1)>>24);
		if (!map) goto out;
		if (!map[a]) map[a] = calloc(sizeof **map, 256);
		if (!map[a]) goto out;
		if (!map[a][b]) map[a][b] = calloc(sizeof ***map, 256);
		if (!map[a][b]) goto out;
		if (!map[a][b][c]) map[a][b][c] = calloc(sizeof ****map, 256);
		if (!map[a][b][c]) goto out;
		if (!(q = map[a][b][c][d])) {
			map[a][b][c][d] = q = calloc(sizeof *****map, 1);
			if (q) {
				q->fd = fd;
				pthread_mutex_init(&q->lock, 0);
				pthread_cond_init(&q->cond, 0);
				a_inc(&aio_fd_cnt);
			}
		}
	}
	if (q) pthread_mutex_lock(&q->lock);
out:
	pthread_rwlock_unlock(&maplock);
	if (masked) pthread_sigmask(SIG_SETMASK, &origmask, 0);
	return q;
}

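/* On success, __aio_get_queue returns the queue with its lock already
 * held; the caller must release it, either directly or by dropping
 * its reference via __aio_unref_queue below. */
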
static void __aio_unref_queue(struct aio_queue *q)
{
	if (q->ref > 1) {
		q->ref--;
		pthread_mutex_unlock(&q->lock);
		return;
	}

	/* This is potentially the last reference, but a new reference
	 * may arrive since we cannot free the queue object without first
	 * taking the maplock, which requires releasing the queue lock. */
	pthread_mutex_unlock(&q->lock);
	pthread_rwlock_wrlock(&maplock);
	pthread_mutex_lock(&q->lock);
	if (q->ref == 1) {
		/* Recheck held: we are still the last reference, so the
		 * queue can be removed from the map and freed. */
		int fd=q->fd;
		int a=fd>>24;
		unsigned char b=fd>>16, c=fd>>8, d=fd;
		map[a][b][c][d] = 0;
		a_dec(&aio_fd_cnt);
		pthread_rwlock_unlock(&maplock);
		pthread_mutex_unlock(&q->lock);
		free(q);
	} else {
		q->ref--;
		pthread_rwlock_unlock(&maplock);
		pthread_mutex_unlock(&q->lock);
	}
}

static void cleanup(void *ctx)
{
	struct aio_thread *at = ctx;
	struct aio_queue *q = at->q;
	struct aiocb *cb = at->cb;
	struct sigevent sev = cb->aio_sigevent;

	/* There are four potential types of waiters we could need to wake:
	 *   1. Callers of aio_cancel/close.
	 *   2. Callers of aio_suspend with a single aiocb.
	 *   3. Callers of aio_suspend with a list.
	 *   4. AIO worker threads waiting for sequenced operations.
	 * Types 1-3 are notified via atomics/futexes, mainly for AS-safety
	 * considerations. Type 4 is notified later via a cond var. */
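	/* Each wake below uses a_swap so that publishing the final value
	 * and sampling the old one happen in a single atomic step; the
	 * old value (a negative running count, a modified __err, a
	 * nonzero __aio_fut) reveals whether any waiter actually
	 * registered, so the futex wake is only issued when someone may
	 * be waiting on it. */
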
	cb->__ret = at->ret;
	if (a_swap(&at->running, 0) < 0)
		__wake(&at->running, -1, 1);
	if (a_swap(&cb->__err, at->err) != EINPROGRESS)
		__wake(&cb->__err, -1, 1);
	if (a_swap(&__aio_fut, 0))
		__wake(&__aio_fut, -1, 1);

	pthread_mutex_lock(&q->lock);

	if (at->next) at->next->prev = at->prev;
	if (at->prev) at->prev->next = at->next;
	else q->head = at->next;

	/* Signal aio worker threads waiting for sequenced operations. */
	pthread_cond_broadcast(&q->cond);

	__aio_unref_queue(q);

	if (sev.sigev_notify == SIGEV_SIGNAL) {
		siginfo_t si = {
			.si_signo = sev.sigev_signo,
			.si_value = sev.sigev_value,
			.si_code = SI_ASYNCIO,
			.si_pid = getpid(),
			.si_uid = getuid()
		};
		__syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si);
	}
	if (sev.sigev_notify == SIGEV_THREAD) {
		a_store(&__pthread_self()->cancel, 0);
		sev.sigev_notify_function(sev.sigev_value);
	}
}

static void *io_thread_func(void *ctx)
{
	struct aio_thread at, *p;

	struct aio_args *args = ctx;
	struct aiocb *cb = args->cb;
	int fd = cb->aio_fildes;
	int op = args->op;
	void *buf = (void *)cb->aio_buf;
	size_t len = cb->aio_nbytes;
	off_t off = cb->aio_offset;

	struct aio_queue *q = args->q;
	ssize_t ret;

	pthread_mutex_lock(&q->lock);
	sem_post(&args->sem);

	at.op = op;
	at.running = 1;
	at.ret = -1;
	at.err = ECANCELED;
	at.q = q;
	at.td = __pthread_self();
	at.cb = cb;
	at.prev = 0;
	if ((at.next = q->head)) at.next->prev = &at;
	q->head = &at;

	if (!q->init) {
		int seekable = lseek(fd, 0, SEEK_CUR) >= 0;
		q->seekable = seekable;
		q->append = !seekable || (fcntl(fd, F_GETFL) & O_APPEND);
		q->init = 1;
	}

	pthread_cleanup_push(cleanup, &at);

	/* Wait for sequenced operations. Reads and non-append writes
	 * proceed immediately; append-mode writes and fsync operations
	 * wait until all previously-queued writes have finished. */
	if (op!=LIO_READ && (op!=LIO_WRITE || q->append)) {
		for (;;) {
			for (p=at.next; p && p->op!=LIO_WRITE; p=p->next);
			if (!p) break;
			pthread_cond_wait(&q->cond, &q->lock);
		}
	}

	pthread_mutex_unlock(&q->lock);

	switch (op) {
	case LIO_WRITE:
		ret = q->append ? write(fd, buf, len) : pwrite(fd, buf, len, off);
		break;
	case LIO_READ:
		ret = !q->seekable ? read(fd, buf, len) : pread(fd, buf, len, off);
		break;
	case O_SYNC:
		ret = fsync(fd);
		break;
	case O_DSYNC:
		ret = fdatasync(fd);
		break;
	}
	at.ret = ret;
	at.err = ret<0 ? errno : 0;

	/* Popping with execute=1 runs cleanup() on the normal completion
	 * path as well as on cancellation. */
	pthread_cleanup_pop(1);

	return 0;
}

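/* submit() hands its arguments to the worker thread via a stack-
 * allocated aio_args structure. The embedded semaphore is posted by
 * the worker only after it has copied the arguments and acquired the
 * queue lock, so by the time sem_wait returns, args may safely go out
 * of scope and the new operation is already ordered with respect to
 * any concurrent aio_cancel. */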
static int submit(struct aiocb *cb, int op)
{
	int ret = 0;
	pthread_attr_t a;
	sigset_t allmask, origmask;
	pthread_t td;
	struct aio_queue *q = __aio_get_queue(cb->aio_fildes, 1);
	struct aio_args args = { .cb = cb, .op = op, .q = q };
	sem_init(&args.sem, 0, 0);

	if (!q) {
		if (errno != EBADF) errno = EAGAIN;
		cb->__ret = -1;
		cb->__err = errno;
		return -1;
	}
	q->ref++;
	pthread_mutex_unlock(&q->lock);

	if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		if (cb->aio_sigevent.sigev_notify_attributes)
			a = *cb->aio_sigevent.sigev_notify_attributes;
		else
			pthread_attr_init(&a);
	} else {
		pthread_attr_init(&a);
		pthread_attr_setstacksize(&a, io_thread_stack_size);
		pthread_attr_setguardsize(&a, 0);
	}
	pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);
	sigfillset(&allmask);
	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);
	cb->__err = EINPROGRESS;
	if (pthread_create(&td, &a, io_thread_func, &args)) {
		pthread_mutex_lock(&q->lock);
		__aio_unref_queue(q);
		cb->__err = errno = EAGAIN;
		cb->__ret = ret = -1;
	}
	pthread_sigmask(SIG_SETMASK, &origmask, 0);

	if (!ret) {
		while (sem_wait(&args.sem));
	}

	return ret;
}

int aio_read(struct aiocb *cb)
{
	return submit(cb, LIO_READ);
}

int aio_write(struct aiocb *cb)
{
	return submit(cb, LIO_WRITE);
}

int aio_fsync(int op, struct aiocb *cb)
{
	if (op != O_SYNC && op != O_DSYNC) {
		errno = EINVAL;
		return -1;
	}
	return submit(cb, op);
}

ssize_t aio_return(struct aiocb *cb)
{
	return cb->__ret;
}

int aio_error(const struct aiocb *cb)
{
	a_barrier();
	/* The sign bit of __err doubles as a waiters flag (see the wake
	 * logic in cleanup above); mask it off before reporting. */
	return cb->__err & 0x7fffffff;
}

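/* A typical caller drives this API roughly as follows (illustrative
 * sketch only; error handling and the choice of polling versus
 * sigevent notification depend on the application):
 *
 *	struct aiocb cb = {
 *		.aio_fildes = fd,
 *		.aio_buf = buf,
 *		.aio_nbytes = sizeof buf,
 *		.aio_offset = 0,
 *	};
 *	if (aio_read(&cb) < 0) return -1;     // submission failed
 *	while (aio_error(&cb) == EINPROGRESS) // or use aio_suspend()
 *		;
 *	ssize_t n = aio_return(&cb);          // result of the read
 */
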
int aio_cancel(int fd, struct aiocb *cb)
{
	sigset_t allmask, origmask;
	int ret = AIO_ALLDONE;
	struct aio_thread *p;
	struct aio_queue *q;

	/* Unspecified behavior case. Report an error. */
	if (cb && fd != cb->aio_fildes) {
		errno = EINVAL;
		return -1;
	}

	sigfillset(&allmask);
	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);

	errno = ENOENT;
	if (!(q = __aio_get_queue(fd, 0))) {
		if (errno == EBADF) ret = -1;
		goto done;
	}

	for (p = q->head; p; p = p->next) {
		if (cb && cb != p->cb) continue;
		/* Transition target from running to running-with-waiters */
		if (a_cas(&p->running, 1, -1)) {
			pthread_cancel(p->td);
			__wait(&p->running, 0, -1, 1);
			if (p->err == ECANCELED) ret = AIO_CANCELED;
		}
	}

	pthread_mutex_unlock(&q->lock);
done:
	pthread_sigmask(SIG_SETMASK, &origmask, 0);
	return ret;
}

int __aio_close(int fd)
{
	a_barrier();
	/* Only pay for cancellation if some fd currently has a queue. */
	if (aio_fd_cnt) aio_cancel(fd, 0);
	return fd;
}

/* who < 0: parent, before fork; who == 0: parent, after fork;
 * who > 0: child, after fork. */
void __aio_atfork(int who)
{
	if (who<0) {
		pthread_rwlock_rdlock(&maplock);
		return;
	} else if (!who) {
		pthread_rwlock_unlock(&maplock);
		return;
	}
	aio_fd_cnt = 0;
	if (pthread_rwlock_tryrdlock(&maplock)) {
		/* Obtaining the lock may fail if _Fork was called directly
		 * rather than via fork. In this case, no further aio is
		 * possible from the child and we can just null out map so
		 * __aio_close does not attempt to do anything. */
		map = 0;
		return;
	}
	if (map) for (int a=0; a<(-1U/2+1)>>24; a++)
		if (map[a]) for (int b=0; b<256; b++)
			if (map[a][b]) for (int c=0; c<256; c++)
				if (map[a][b][c]) for (int d=0; d<256; d++)
					map[a][b][c][d] = 0;
	/* Re-initialize the rwlock rather than unlocking since there
	 * may have been more than one reference on it in the parent.
	 * We are not a lock holder anyway; the thread in the parent was. */
	pthread_rwlock_init(&maplock, 0);
}