#include <aio.h>
#include <pthread.h>
#include <semaphore.h>
#include <limits.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/auxv.h>
#include "syscall.h"
#include "atomic.h"
#include "pthread_impl.h"
#include "aio_impl.h"
/* The following is a threads-based implementation of AIO with minimal
 * dependence on implementation details. Most synchronization is
 * performed with pthread primitives, but atomics and futex operations
 * are used for notification in a couple places where the pthread
 * primitives would be inefficient or impractical.
 *
 * For each fd with outstanding aio operations, an aio_queue structure
 * is maintained. These are reference-counted and destroyed by the last
 * aio worker thread to exit. Accessing any member of the aio_queue
 * structure requires a lock on the aio_queue. Adding and removing aio
 * queues themselves requires a write lock on the global map object,
 * a 4-level table mapping file descriptor numbers to aio queues. A
 * read lock on the map is used to obtain locks on existing queues by
 * excluding destruction of the queue by a different thread while it
 * is being locked. (See the illustrative sketch following this
 * comment.)
 *
 * Each aio queue has a list of active threads/operations. Presently
 * there is a one-to-one relationship between threads and operations.
 * The only members of the aio_thread structure which are accessed by
 * other threads are the linked list pointers, op (which is immutable),
 * running (which is updated atomically), and err (which is
 * synchronized via running), so no locking is necessary. Most of the
 * other members are used for sharing data between the main flow of
 * execution and the cancellation cleanup handler.
 *
 * Taking any aio locks requires having all signals blocked. This is
 * necessary because aio_cancel is needed by close, and close is
 * required to be async-signal safe. All aio worker threads run with
 * all signals blocked permanently.
 */
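/* Illustrative sketch (an addition for exposition, not part of the
 * original logic): the 4-level table splits a non-negative descriptor
 * into four index bytes, so a hypothetical fd 0x01020304 lives at
 * map[1][2][3][4]. The read-side lookup pattern described above is:
 *
 *	int a = fd>>24;
 *	unsigned char b = fd>>16, c = fd>>8, d = fd;
 *	struct aio_queue *q = 0;
 *	pthread_rwlock_rdlock(&maplock);
 *	if (map && map[a] && map[a][b] && map[a][b][c])
 *		q = map[a][b][c][d];
 *	if (q) pthread_mutex_lock(&q->lock);
 *	pthread_rwlock_unlock(&maplock);
 *
 * Taking q->lock before dropping the read lock is what excludes a
 * concurrent thread from destroying the queue mid-acquisition. */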
struct aio_thread {
	pthread_t td;
	struct aiocb *cb;
	struct aio_thread *next, *prev;
	struct aio_queue *q;
	volatile int running;
	int err, op;
	ssize_t ret;
};
struct aio_queue {
	int fd, seekable, append, ref, init;
	pthread_mutex_t lock;
	pthread_cond_t cond;
	struct aio_thread *head;
};

struct aio_args {
	struct aiocb *cb;
	struct aio_queue *q;
	int op;
	sem_t sem;
};
static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER;
static struct aio_queue *****map;
static volatile int aio_fd_cnt;
volatile int __aio_fut;
static struct aio_queue *__aio_get_queue(int fd, int need)
{
	if (fd < 0) {
		errno = EBADF;
		return 0;
	}
	int a=fd>>24;
	unsigned char b=fd>>16, c=fd>>8, d=fd;
	struct aio_queue *q = 0;
	pthread_rwlock_rdlock(&maplock);
	if ((!map || !map[a] || !map[a][b] || !map[a][b][c] || !(q=map[a][b][c][d])) && need) {
		pthread_rwlock_unlock(&maplock);
		if (fcntl(fd, F_GETFD) < 0) return 0;
		pthread_rwlock_wrlock(&maplock);
		/* The top level is sized to cover all non-negative fds;
		 * see the worked example after this function. */
		if (!map) map = calloc(sizeof *map, (-1U/2+1)>>24);
		if (!map) goto out;
		if (!map[a]) map[a] = calloc(sizeof **map, 256);
		if (!map[a]) goto out;
		if (!map[a][b]) map[a][b] = calloc(sizeof ***map, 256);
		if (!map[a][b]) goto out;
		if (!map[a][b][c]) map[a][b][c] = calloc(sizeof ****map, 256);
		if (!map[a][b][c]) goto out;
		if (!(q = map[a][b][c][d])) {
			map[a][b][c][d] = q = calloc(sizeof *****map, 1);
			if (q) {
				q->fd = fd;
				pthread_mutex_init(&q->lock, 0);
				pthread_cond_init(&q->cond, 0);
				a_inc(&aio_fd_cnt);
			}
		}
	}
	if (q) pthread_mutex_lock(&q->lock);
out:
	pthread_rwlock_unlock(&maplock);
	return q;
}
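/* Worked example of the top-level allocation size above (exposition
 * only): -1U/2+1 evaluates to 0x80000000, i.e. INT_MAX+1, and shifting
 * right by 24 leaves 128 slots, one per possible value of fd>>24 for a
 * non-negative int:
 *
 *	(-1U/2 + 1) >> 24  ==  0x80000000 >> 24  ==  128
 *	128 * 256 * 256 * 256  ==  2^31 descriptors covered
 *
 * Each lower level fans out by 256, so every valid fd has a unique
 * map[a][b][c][d] slot, while subtables for unused fd ranges are never
 * allocated. */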
static void __aio_unref_queue(struct aio_queue *q)
{
	if (q->ref > 1) {
		q->ref--;
		pthread_mutex_unlock(&q->lock);
		return;
	}

	/* This is potentially the last reference, but a new reference
	 * may arrive since we cannot free the queue object without first
	 * taking the maplock, which requires releasing the queue lock. */
	pthread_mutex_unlock(&q->lock);
	pthread_rwlock_wrlock(&maplock);
	pthread_mutex_lock(&q->lock);
	if (q->ref == 1) {
		int fd=q->fd;
		int a=fd>>24;
		unsigned char b=fd>>16, c=fd>>8, d=fd;
		map[a][b][c][d] = 0;
		a_dec(&aio_fd_cnt);
		pthread_rwlock_unlock(&maplock);
		pthread_mutex_unlock(&q->lock);
		free(q);
	} else {
		q->ref--;
		pthread_rwlock_unlock(&maplock);
		pthread_mutex_unlock(&q->lock);
	}
}
static void cleanup(void *ctx)
{
	struct aio_thread *at = ctx;
	struct aio_queue *q = at->q;
	struct aiocb *cb = at->cb;
	struct sigevent sev = cb->aio_sigevent;

	/* There are four potential types of waiters we could need to wake:
	 *  1. Callers of aio_cancel/close.
	 *  2. Callers of aio_suspend with a single aiocb.
	 *  3. Callers of aio_suspend with a list.
	 *  4. AIO worker threads waiting for sequenced operations.
	 * Types 1-3 are notified via atomics/futexes, mainly for AS-safety
	 * considerations. Type 4 is notified later via a cond var. (The
	 * running-state handshake is sketched after this function.) */

	cb->__ret = at->ret;
	if (a_swap(&at->running, 0) < 0)
		__wake(&at->running, -1, 1);
	if (a_swap(&cb->__err, at->err) != EINPROGRESS)
		__wake(&cb->__err, -1, 1);
	if (a_swap(&__aio_fut, 0))
		__wake(&__aio_fut, -1, 1);

	pthread_mutex_lock(&q->lock);

	if (at->next) at->next->prev = at->prev;
	if (at->prev) at->prev->next = at->next;
	else q->head = at->next;

	/* Signal aio worker threads waiting for sequenced operations. */
	pthread_cond_broadcast(&q->cond);

	__aio_unref_queue(q);

	if (sev.sigev_notify == SIGEV_SIGNAL) {
		siginfo_t si = {
			.si_signo = sev.sigev_signo,
			.si_value = sev.sigev_value,
			.si_code = SI_ASYNCIO,
			.si_pid = getpid(),
			.si_uid = getuid()
		};
		__syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si);
	}
	if (sev.sigev_notify == SIGEV_THREAD) {
		a_store(&__pthread_self()->cancel, 0);
		sev.sigev_notify_function(sev.sigev_value);
	}
}
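/* The running field above and in aio_cancel below implement a small
 * async-signal-safe handshake; the state values here are inferred from
 * the two sides of the code (exposition only):
 *
 *	 1  operation in flight, no cancellers waiting
 *	-1  operation in flight, at least one waiter in aio_cancel
 *	 0  cleanup finished; err is now stable and safe to read
 *
 * A canceller moves 1 -> -1 with a_cas and sleeps on the futex until
 * the worker's a_swap(&at->running, 0) publishes 0; the worker issues
 * the futex wake only when the swapped-out value was negative, i.e.
 * only when someone is actually waiting. */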
static void *io_thread_func(void *ctx)
{
	struct aio_thread at, *p;

	struct aio_args *args = ctx;
	struct aiocb *cb = args->cb;
	int fd = cb->aio_fildes;
	int op = args->op;
	void *buf = (void *)cb->aio_buf;
	size_t len = cb->aio_nbytes;
	off_t off = cb->aio_offset;

	struct aio_queue *q = args->q;
	ssize_t ret;

	pthread_mutex_lock(&q->lock);
	sem_post(&args->sem);

	at.op = op;
	at.running = 1;
	at.ret = -1;
	at.err = ECANCELED;
	at.q = q;
	at.td = __pthread_self();
	at.cb = cb;
	at.prev = 0;
	if ((at.next = q->head)) at.next->prev = &at;
	q->head = &at;

	if (!q->init) {
		int seekable = lseek(fd, 0, SEEK_CUR) >= 0;
		q->seekable = seekable;
		q->append = !seekable || (fcntl(fd, F_GETFL) & O_APPEND);
		q->init = 1;
	}

	pthread_cleanup_push(cleanup, &at);

	/* Wait for sequenced operations; the predicate is worked through
	 * in the example after this function. */
	if (op!=LIO_READ && (op!=LIO_WRITE || q->append)) {
		for (;;) {
			for (p=at.next; p && p->op!=LIO_WRITE; p=p->next);
			if (!p) break;
			pthread_cond_wait(&q->cond, &q->lock);
		}
	}

	pthread_mutex_unlock(&q->lock);

	switch (op) {
	case LIO_WRITE:
		ret = q->append ? write(fd, buf, len) : pwrite(fd, buf, len, off);
		break;
	case LIO_READ:
		ret = !q->seekable ? read(fd, buf, len) : pread(fd, buf, len, off);
		break;
	case O_SYNC:
		ret = fsync(fd);
		break;
	case O_DSYNC:
		ret = fdatasync(fd);
		break;
	}
	at.ret = ret;
	at.err = ret<0 ? errno : 0;

	pthread_cleanup_pop(1);

	return 0;
}
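/* Worked example of the sequencing predicate above (exposition only):
 *
 *	op == LIO_READ                    never waits
 *	op == LIO_WRITE && !q->append     never waits (explicit offset)
 *	append LIO_WRITE, O_SYNC/O_DSYNC  wait for all queued LIO_WRITEs
 *
 * So on an O_APPEND descriptor, two aio_write submissions complete in
 * submission order: the second worker blocks on q->cond until the
 * first leaves the queue's thread list and cleanup() broadcasts. */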
static size_t io_thread_stack_size = MINSIGSTKSZ+2048;
static pthread_once_t init_stack_size_once;

static void init_stack_size()
{
	unsigned long val = __getauxval(AT_MINSIGSTKSZ);
	if (val > MINSIGSTKSZ) io_thread_stack_size = val + 512;
}
static int submit(struct aiocb *cb, int op)
{
	int ret = 0;
	pthread_attr_t a;
	sigset_t allmask, origmask;
	pthread_t td;
	struct aio_queue *q = __aio_get_queue(cb->aio_fildes, 1);
	struct aio_args args = { .cb = cb, .op = op, .q = q };
	sem_init(&args.sem, 0, 0);

	if (!q) {
		if (errno != EBADF) errno = EAGAIN;
		cb->__ret = -1;
		cb->__err = errno;
		return -1;
	}
	q->ref++;
	pthread_mutex_unlock(&q->lock);

	if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		if (cb->aio_sigevent.sigev_notify_attributes)
			a = *cb->aio_sigevent.sigev_notify_attributes;
		else
			pthread_attr_init(&a);
	} else {
		pthread_once(&init_stack_size_once, init_stack_size);
		pthread_attr_init(&a);
		pthread_attr_setstacksize(&a, io_thread_stack_size);
		pthread_attr_setguardsize(&a, 0);
	}
	pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);
	sigfillset(&allmask);
	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);
	cb->__err = EINPROGRESS;
	if (pthread_create(&td, &a, io_thread_func, &args)) {
		pthread_mutex_lock(&q->lock);
		__aio_unref_queue(q);
		cb->__err = errno = EAGAIN;
		cb->__ret = ret = -1;
	}
	pthread_sigmask(SIG_SETMASK, &origmask, 0);

	/* Wait until the worker no longer needs args; see the lifetime
	 * sketch after this function. */
	if (!ret) {
		while (sem_wait(&args.sem));
	}

	return ret;
}
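/* The semaphore handshake above is what makes the stack-allocated args
 * safe to hand to the worker: submit cannot return (destroying args)
 * until io_thread_func has copied the fields out of *args, taken
 * q->lock, and posted args->sem. Sketch of the lifetime (exposition
 * only; names match the code above):
 *
 *	struct aio_args args = { .cb = cb, .op = op, .q = q };
 *	sem_init(&args.sem, 0, 0);
 *	pthread_create(&td, &a, io_thread_func, &args);
 *	...
 *	while (sem_wait(&args.sem)); // returns only after the worker's
 *	                             // sem_post, i.e. once it is done
 *	                             // reading args
 *	return ret;                  // args may now go out of scope
 */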
int aio_read(struct aiocb *cb)
{
	return submit(cb, LIO_READ);
}
int aio_write(struct aiocb *cb)
{
	return submit(cb, LIO_WRITE);
}
int aio_fsync(int op, struct aiocb *cb)
{
	if (op != O_SYNC && op != O_DSYNC) {
		errno = EINVAL;
		return -1;
	}
	return submit(cb, op);
}
ssize_t aio_return(struct aiocb *cb)
{
	return cb->__ret;
}
int aio_error(const struct aiocb *cb)
{
	a_barrier();
	return cb->__err & 0x7fffffff;
}
int aio_cancel(int fd, struct aiocb *cb)
{
	sigset_t allmask, origmask;
	int ret = AIO_ALLDONE;
	struct aio_thread *p;
	struct aio_queue *q;

	/* Unspecified behavior case. Report an error. */
	if (cb && fd != cb->aio_fildes) {
		errno = EINVAL;
		return -1;
	}

	sigfillset(&allmask);
	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);

	errno = ENOENT;
	if (!(q = __aio_get_queue(fd, 0))) {
		if (errno == EBADF) ret = -1;
		goto done;
	}

	for (p = q->head; p; p = p->next) {
		if (cb && cb != p->cb) continue;
		/* Transition target from running to running-with-waiters */
		if (a_cas(&p->running, 1, -1)) {
			pthread_cancel(p->td);
			__wait(&p->running, 0, -1, 1);
			if (p->err == ECANCELED) ret = AIO_CANCELED;
		}
	}

	pthread_mutex_unlock(&q->lock);
done:
	pthread_sigmask(SIG_SETMASK, &origmask, 0);
	return ret;
}
int __aio_close(int fd)
{
	a_barrier();
	if (aio_fd_cnt) aio_cancel(fd, 0);
	return fd;
}
weak_alias(aio_cancel, aio_cancel64);
weak_alias(aio_error, aio_error64);
weak_alias(aio_fsync, aio_fsync64);
weak_alias(aio_read, aio_read64);
weak_alias(aio_write, aio_write64);
weak_alias(aio_return, aio_return64);
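/* Usage sketch (exposition only, not part of this file): a caller
 * submits a read, waits with aio_suspend, then collects the result.
 *
 *	#include <aio.h>
 *	#include <errno.h>
 *
 *	ssize_t read_async(int fd, void *buf, size_t len, off_t off)
 *	{
 *		struct aiocb cb = {
 *			.aio_fildes = fd,
 *			.aio_buf = buf,
 *			.aio_nbytes = len,
 *			.aio_offset = off,
 *			.aio_sigevent.sigev_notify = SIGEV_NONE,
 *		};
 *		const struct aiocb *list[1] = { &cb };
 *		if (aio_read(&cb)) return -1;
 *		while (aio_error(&cb) == EINPROGRESS)
 *			aio_suspend(list, 1, 0);
 *		if (aio_error(&cb)) {
 *			errno = aio_error(&cb);
 *			return -1;
 *		}
 *		return aio_return(&cb);
 *	}
 */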