#include <aio.h>
#include <pthread.h>
#include <semaphore.h>
#include <limits.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include "pthread_impl.h"
/* The following is a threads-based implementation of AIO with minimal
 * dependence on implementation details. Most synchronization is
 * performed with pthread primitives, but atomics and futex operations
 * are used for notification in a couple places where the pthread
 * primitives would be inefficient or impractical.
 *
 * For each fd with outstanding aio operations, an aio_queue structure
 * is maintained. These are reference-counted and destroyed by the last
 * aio worker thread to exit. Accessing any member of the aio_queue
 * structure requires a lock on the aio_queue. Adding and removing aio
 * queues themselves requires a write lock on the global map object,
 * a 4-level table mapping file descriptor numbers to aio queues. A
 * read lock on the map is used to obtain locks on existing queues by
 * excluding destruction of the queue by a different thread while it is
 * being locked.
 *
 * Each aio queue has a list of active threads/operations. Presently there
 * is a one to one relationship between threads and operations. The only
 * members of the aio_thread structure which are accessed by other threads
 * are the linked list pointers, op (which is immutable), running (which
 * is updated atomically), and err (which is synchronized via running),
 * so no locking is necessary. Most of the other members are used for
 * sharing data between the main flow of execution and the cancellation
 * cleanup handler.
 *
 * Taking any aio locks requires having all signals blocked. This is
 * necessary because aio_cancel is needed by close, and close is required
 * to be async-signal safe. All aio worker threads run with all signals
 * blocked permanently. */
/* Per-operation state. One aio_thread lives on its worker thread's
 * stack for the duration of the operation and is linked into the
 * owning queue's list. Per the file header, only next/prev, op,
 * running, and err are accessed by other threads. */
struct aio_thread {
	pthread_t td;           /* worker thread id, target of pthread_cancel */
	struct aiocb *cb;       /* caller's control block */
	struct aio_thread *next, *prev; /* doubly linked queue membership */
	struct aio_queue *q;    /* owning per-fd queue */
	volatile int running;   /* 1 running, -1 running with waiters, 0 done */
	int err;                /* final error status, synchronized via running */
	int op;                 /* LIO_READ/LIO_WRITE/O_SYNC/O_DSYNC; immutable */
	ssize_t ret;            /* operation result, reported via aio_return */
};

/* Per-fd queue of outstanding operations. Reference-counted; freed by
 * the last reference holder via __aio_unref_queue. */
struct aio_queue {
	int fd, seekable, append, ref, init;
	pthread_mutex_t lock;   /* guards all members and the op list */
	pthread_cond_t cond;    /* wakes workers waiting on sequenced ops */
	struct aio_thread *head;
};

/* Handoff from submit() to the new worker; sem signals that the worker
 * has copied what it needs and stored its status in err.
 * NOTE(review): this struct was lost to garbling and is reconstructed
 * from its uses in submit()/io_thread_func() — verify member set. */
struct aio_args {
	struct aiocb *cb;
	struct aio_queue *q;
	volatile int err;
	int op;
	sem_t sem;
};
68 static pthread_rwlock_t maplock
= PTHREAD_RWLOCK_INITIALIZER
;
69 static struct aio_queue
*****map
;
70 static volatile int aio_fd_cnt
;
71 volatile int __aio_fut
;
73 static struct aio_queue
*__aio_get_queue(int fd
, int need
)
77 unsigned char b
=fd
>>16, c
=fd
>>8, d
=fd
;
78 struct aio_queue
*q
= 0;
79 pthread_rwlock_rdlock(&maplock
);
80 if ((!map
|| !map
[a
] || !map
[a
][b
] || !map
[a
][b
][c
] || !(q
=map
[a
][b
][c
][d
])) && need
) {
81 pthread_rwlock_unlock(&maplock
);
82 pthread_rwlock_wrlock(&maplock
);
83 if (!map
) map
= calloc(sizeof *map
, (-1U/2+1)>>24);
85 if (!map
[a
]) map
[a
] = calloc(sizeof **map
, 256);
86 if (!map
[a
]) goto out
;
87 if (!map
[a
][b
]) map
[a
][b
] = calloc(sizeof ***map
, 256);
88 if (!map
[a
][b
]) goto out
;
89 if (!map
[a
][b
][c
]) map
[a
][b
][c
] = calloc(sizeof ****map
, 256);
90 if (!map
[a
][b
][c
]) goto out
;
91 if (!(q
= map
[a
][b
][c
][d
])) {
92 map
[a
][b
][c
][d
] = q
= calloc(sizeof *****map
, 1);
95 pthread_mutex_init(&q
->lock
, 0);
96 pthread_cond_init(&q
->cond
, 0);
101 if (q
) pthread_mutex_lock(&q
->lock
);
103 pthread_rwlock_unlock(&maplock
);
107 static void __aio_unref_queue(struct aio_queue
*q
)
111 pthread_mutex_unlock(&q
->lock
);
115 /* This is potentially the last reference, but a new reference
116 * may arrive since we cannot free the queue object without first
117 * taking the maplock, which requires releasing the queue lock. */
118 pthread_mutex_unlock(&q
->lock
);
119 pthread_rwlock_wrlock(&maplock
);
120 pthread_mutex_lock(&q
->lock
);
124 unsigned char b
=fd
>>16, c
=fd
>>8, d
=fd
;
127 pthread_rwlock_unlock(&maplock
);
128 pthread_mutex_unlock(&q
->lock
);
132 pthread_rwlock_unlock(&maplock
);
133 pthread_mutex_unlock(&q
->lock
);
137 static void cleanup(void *ctx
)
139 struct aio_thread
*at
= ctx
;
140 struct aio_queue
*q
= at
->q
;
141 struct aiocb
*cb
= at
->cb
;
142 struct sigevent sev
= cb
->aio_sigevent
;
144 /* There are four potential types of waiters we could need to wake:
145 * 1. Callers of aio_cancel/close.
146 * 2. Callers of aio_suspend with a single aiocb.
147 * 3. Callers of aio_suspend with a list.
148 * 4. AIO worker threads waiting for sequenced operations.
149 * Types 1-3 are notified via atomics/futexes, mainly for AS-safety
150 * considerations. Type 4 is notified later via a cond var. */
153 if (a_swap(&at
->running
, 0) < 0)
154 __wake(&at
->running
, -1, 1);
155 if (a_swap(&cb
->__err
, at
->err
) != EINPROGRESS
)
156 __wake(&cb
->__err
, -1, 1);
157 if (a_swap(&__aio_fut
, 0))
158 __wake(&__aio_fut
, -1, 1);
160 pthread_mutex_lock(&q
->lock
);
162 if (at
->next
) at
->next
->prev
= at
->prev
;
163 if (at
->prev
) at
->prev
->next
= at
->next
;
164 else q
->head
= at
->next
;
166 /* Signal aio worker threads waiting for sequenced operations. */
167 pthread_cond_broadcast(&q
->cond
);
169 __aio_unref_queue(q
);
171 if (sev
.sigev_notify
== SIGEV_SIGNAL
) {
173 .si_signo
= sev
.sigev_signo
,
174 .si_value
= sev
.sigev_value
,
175 .si_code
= SI_ASYNCIO
,
179 __syscall(SYS_rt_sigqueueinfo
, si
.si_pid
, si
.si_signo
, &si
);
181 if (sev
.sigev_notify
== SIGEV_THREAD
) {
182 a_store(&__pthread_self()->cancel
, 0);
183 sev
.sigev_notify_function(sev
.sigev_value
);
187 static void *io_thread_func(void *ctx
)
189 struct aio_thread at
, *p
;
191 struct aio_args
*args
= ctx
;
192 struct aiocb
*cb
= args
->cb
;
193 int fd
= cb
->aio_fildes
;
195 void *buf
= (void *)cb
->aio_buf
;
196 size_t len
= cb
->aio_nbytes
;
197 off_t off
= cb
->aio_offset
;
199 struct aio_queue
*q
= __aio_get_queue(fd
, 1);
202 args
->err
= q
? 0 : EAGAIN
;
203 sem_post(&args
->sem
);
211 at
.td
= __pthread_self();
214 if ((at
.next
= q
->head
)) at
.next
->prev
= &at
;
219 int seekable
= lseek(fd
, 0, SEEK_CUR
) >= 0;
220 q
->seekable
= seekable
;
221 q
->append
= !seekable
|| (fcntl(fd
, F_GETFL
) & O_APPEND
);
225 pthread_cleanup_push(cleanup
, &at
);
227 /* Wait for sequenced operations. */
228 if (op
!=LIO_READ
&& (op
!=LIO_WRITE
|| q
->append
)) {
230 for (p
=at
.next
; p
&& p
->op
!=LIO_WRITE
; p
=p
->next
);
232 pthread_cond_wait(&q
->cond
, &q
->lock
);
236 pthread_mutex_unlock(&q
->lock
);
240 ret
= q
->append
? write(fd
, buf
, len
) : pwrite(fd
, buf
, len
, off
);
243 ret
= !q
->seekable
? read(fd
, buf
, len
) : pread(fd
, buf
, len
, off
);
253 at
.err
= ret
<0 ? errno
: 0;
255 pthread_cleanup_pop(1);
260 static int submit(struct aiocb
*cb
, int op
)
264 sigset_t allmask
, origmask
;
266 struct aio_args args
= { .cb
= cb
, .op
= op
};
267 sem_init(&args
.sem
, 0, 0);
269 if (cb
->aio_sigevent
.sigev_notify
== SIGEV_THREAD
) {
270 if (cb
->aio_sigevent
.sigev_notify_attributes
)
271 a
= *cb
->aio_sigevent
.sigev_notify_attributes
;
273 pthread_attr_init(&a
);
275 pthread_attr_init(&a
);
276 pthread_attr_setstacksize(&a
, PTHREAD_STACK_MIN
);
277 pthread_attr_setguardsize(&a
, 0);
279 pthread_attr_setdetachstate(&a
, PTHREAD_CREATE_DETACHED
);
280 sigfillset(&allmask
);
281 pthread_sigmask(SIG_BLOCK
, &allmask
, &origmask
);
282 cb
->__err
= EINPROGRESS
;
283 if (pthread_create(&td
, &a
, io_thread_func
, &args
)) {
287 pthread_sigmask(SIG_SETMASK
, &origmask
, 0);
290 while (sem_wait(&args
.sem
));
/* POSIX aio_read: enqueue an asynchronous read described by cb. */
int aio_read(struct aiocb *cb)
{
	return submit(cb, LIO_READ);
}
/* POSIX aio_write: enqueue an asynchronous write described by cb. */
int aio_write(struct aiocb *cb)
{
	return submit(cb, LIO_WRITE);
}
310 int aio_fsync(int op
, struct aiocb
*cb
)
312 if (op
!= O_SYNC
&& op
!= O_DSYNC
) {
316 return submit(cb
, op
);
319 ssize_t
aio_return(struct aiocb
*cb
)
324 int aio_error(const struct aiocb
*cb
)
327 return cb
->__err
& 0x7fffffff;
330 int aio_cancel(int fd
, struct aiocb
*cb
)
332 sigset_t allmask
, origmask
;
333 int ret
= AIO_ALLDONE
;
334 struct aio_thread
*p
;
337 /* Unspecified behavior case. Report an error. */
338 if (cb
&& fd
!= cb
->aio_fildes
) {
343 sigfillset(&allmask
);
344 pthread_sigmask(SIG_BLOCK
, &allmask
, &origmask
);
346 if (!(q
= __aio_get_queue(fd
, 0))) {
347 if (fcntl(fd
, F_GETFD
) < 0) ret
= -1;
351 for (p
= q
->head
; p
; p
= p
->next
) {
352 if (cb
&& cb
!= p
->cb
) continue;
353 /* Transition target from running to running-with-waiters */
354 if (a_cas(&p
->running
, 1, -1)) {
355 pthread_cancel(p
->td
);
356 __wait(&p
->running
, 0, -1, 1);
357 if (p
->err
== ECANCELED
) ret
= AIO_CANCELED
;
361 pthread_mutex_unlock(&q
->lock
);
363 pthread_sigmask(SIG_SETMASK
, &origmask
, 0);
367 int __aio_close(int fd
)
370 if (aio_fd_cnt
) aio_cancel(fd
, 0);