Merge branch 'master' of ssh://repo.or.cz/srv/git/qemu
[qemu/mmix.git] / posix-aio-compat.c
blobc7fdac70b40b6cb50a0d4b2e7a9b4e62ab792d21
1 /*
2 * QEMU posix-aio emulation
4 * Copyright IBM, Corp. 2008
6 * Authors:
7 * Anthony Liguori <aliguori@us.ibm.com>
9 * This work is licensed under the terms of the GNU GPL, version 2. See
10 * the COPYING file in the top-level directory.
14 #include <sys/ioctl.h>
15 #include <pthread.h>
16 #include <unistd.h>
17 #include <errno.h>
18 #include <time.h>
19 #include <string.h>
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include "osdep.h"
23 #include "qemu-common.h"
25 #include "posix-aio-compat.h"
27 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
28 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
29 static pthread_t thread_id;
30 static pthread_attr_t attr;
31 static int max_threads = 64;
32 static int cur_threads = 0;
33 static int idle_threads = 0;
34 static TAILQ_HEAD(, qemu_paiocb) request_list;
36 #ifdef HAVE_PREADV
37 static int preadv_present = 1;
38 #else
39 static int preadv_present = 0;
40 #endif
42 static void die2(int err, const char *what)
44 fprintf(stderr, "%s failed: %s\n", what, strerror(err));
45 abort();
48 static void die(const char *what)
50 die2(errno, what);
53 static void mutex_lock(pthread_mutex_t *mutex)
55 int ret = pthread_mutex_lock(mutex);
56 if (ret) die2(ret, "pthread_mutex_lock");
59 static void mutex_unlock(pthread_mutex_t *mutex)
61 int ret = pthread_mutex_unlock(mutex);
62 if (ret) die2(ret, "pthread_mutex_unlock");
65 static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
66 struct timespec *ts)
68 int ret = pthread_cond_timedwait(cond, mutex, ts);
69 if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
70 return ret;
73 static void cond_signal(pthread_cond_t *cond)
75 int ret = pthread_cond_signal(cond);
76 if (ret) die2(ret, "pthread_cond_signal");
79 static void thread_create(pthread_t *thread, pthread_attr_t *attr,
80 void *(*start_routine)(void*), void *arg)
82 int ret = pthread_create(thread, attr, start_routine, arg);
83 if (ret) die2(ret, "pthread_create");
86 static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
88 int ret;
90 ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
91 if (ret == -1)
92 return -errno;
93 return ret;
96 #ifdef HAVE_PREADV
98 static ssize_t
99 qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
101 return preadv(fd, iov, nr_iov, offset);
104 static ssize_t
105 qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
107 return pwritev(fd, iov, nr_iov, offset);
110 #else
112 static ssize_t
113 qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
115 return -ENOSYS;
118 static ssize_t
119 qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
121 return -ENOSYS;
124 #endif
127 * Check if we need to copy the data in the aiocb into a new
128 * properly aligned buffer.
130 static int aiocb_needs_copy(struct qemu_paiocb *aiocb)
132 if (aiocb->aio_flags & QEMU_AIO_SECTOR_ALIGNED) {
133 int i;
135 for (i = 0; i < aiocb->aio_niov; i++)
136 if ((uintptr_t) aiocb->aio_iov[i].iov_base % 512)
137 return 1;
140 return 0;
143 static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
145 size_t offset = 0;
146 ssize_t len;
148 do {
149 if (aiocb->aio_type == QEMU_PAIO_WRITE)
150 len = qemu_pwritev(aiocb->aio_fildes,
151 aiocb->aio_iov,
152 aiocb->aio_niov,
153 aiocb->aio_offset + offset);
154 else
155 len = qemu_preadv(aiocb->aio_fildes,
156 aiocb->aio_iov,
157 aiocb->aio_niov,
158 aiocb->aio_offset + offset);
159 } while (len == -1 && errno == EINTR);
161 if (len == -1)
162 return -errno;
163 return len;
166 static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
168 size_t offset = 0;
169 size_t len;
171 while (offset < aiocb->aio_nbytes) {
172 if (aiocb->aio_type == QEMU_PAIO_WRITE)
173 len = pwrite(aiocb->aio_fildes,
174 (const char *)buf + offset,
175 aiocb->aio_nbytes - offset,
176 aiocb->aio_offset + offset);
177 else
178 len = pread(aiocb->aio_fildes,
179 buf + offset,
180 aiocb->aio_nbytes - offset,
181 aiocb->aio_offset + offset);
183 if (len == -1 && errno == EINTR)
184 continue;
185 else if (len == -1) {
186 offset = -errno;
187 break;
188 } else if (len == 0)
189 break;
191 offset += len;
194 return offset;
197 static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
199 size_t nbytes;
200 char *buf;
202 if (!aiocb_needs_copy(aiocb)) {
204 * If there is just a single buffer, and it is properly aligned
205 * we can just use plain pread/pwrite without any problems.
207 if (aiocb->aio_niov == 1)
208 return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
211 * We have more than one iovec, and all are properly aligned.
213 * Try preadv/pwritev first and fall back to linearizing the
214 * buffer if it's not supported.
216 if (preadv_present) {
217 nbytes = handle_aiocb_rw_vector(aiocb);
218 if (nbytes == aiocb->aio_nbytes)
219 return nbytes;
220 if (nbytes < 0 && nbytes != -ENOSYS)
221 return nbytes;
222 preadv_present = 0;
226 * XXX(hch): short read/write. no easy way to handle the reminder
227 * using these interfaces. For now retry using plain
228 * pread/pwrite?
233 * Ok, we have to do it the hard way, copy all segments into
234 * a single aligned buffer.
236 buf = qemu_memalign(512, aiocb->aio_nbytes);
237 if (aiocb->aio_type == QEMU_PAIO_WRITE) {
238 char *p = buf;
239 int i;
241 for (i = 0; i < aiocb->aio_niov; ++i) {
242 memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
243 p += aiocb->aio_iov[i].iov_len;
247 nbytes = handle_aiocb_rw_linear(aiocb, buf);
248 if (aiocb->aio_type != QEMU_PAIO_WRITE) {
249 char *p = buf;
250 size_t count = aiocb->aio_nbytes, copy;
251 int i;
253 for (i = 0; i < aiocb->aio_niov && count; ++i) {
254 copy = count;
255 if (copy > aiocb->aio_iov[i].iov_len)
256 copy = aiocb->aio_iov[i].iov_len;
257 memcpy(aiocb->aio_iov[i].iov_base, p, copy);
258 p += copy;
259 count -= copy;
262 qemu_vfree(buf);
264 return nbytes;
267 static void *aio_thread(void *unused)
269 pid_t pid;
270 sigset_t set;
272 pid = getpid();
274 /* block all signals */
275 if (sigfillset(&set)) die("sigfillset");
276 if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
278 while (1) {
279 struct qemu_paiocb *aiocb;
280 size_t ret = 0;
281 qemu_timeval tv;
282 struct timespec ts;
284 qemu_gettimeofday(&tv);
285 ts.tv_sec = tv.tv_sec + 10;
286 ts.tv_nsec = 0;
288 mutex_lock(&lock);
290 while (TAILQ_EMPTY(&request_list) &&
291 !(ret == ETIMEDOUT)) {
292 ret = cond_timedwait(&cond, &lock, &ts);
295 if (TAILQ_EMPTY(&request_list))
296 break;
298 aiocb = TAILQ_FIRST(&request_list);
299 TAILQ_REMOVE(&request_list, aiocb, node);
300 aiocb->active = 1;
301 idle_threads--;
302 mutex_unlock(&lock);
304 switch (aiocb->aio_type) {
305 case QEMU_PAIO_READ:
306 case QEMU_PAIO_WRITE:
307 ret = handle_aiocb_rw(aiocb);
308 break;
309 case QEMU_PAIO_IOCTL:
310 ret = handle_aiocb_ioctl(aiocb);
311 break;
312 default:
313 fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
314 ret = -EINVAL;
315 break;
318 mutex_lock(&lock);
319 aiocb->ret = ret;
320 idle_threads++;
321 mutex_unlock(&lock);
323 if (kill(pid, aiocb->ev_signo)) die("kill failed");
326 idle_threads--;
327 cur_threads--;
328 mutex_unlock(&lock);
330 return NULL;
333 static void spawn_thread(void)
335 cur_threads++;
336 idle_threads++;
337 thread_create(&thread_id, &attr, aio_thread, NULL);
340 int qemu_paio_init(struct qemu_paioinit *aioinit)
342 int ret;
344 ret = pthread_attr_init(&attr);
345 if (ret) die2(ret, "pthread_attr_init");
347 ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
348 if (ret) die2(ret, "pthread_attr_setdetachstate");
350 TAILQ_INIT(&request_list);
352 return 0;
355 static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
357 aiocb->aio_type = type;
358 aiocb->ret = -EINPROGRESS;
359 aiocb->active = 0;
360 mutex_lock(&lock);
361 if (idle_threads == 0 && cur_threads < max_threads)
362 spawn_thread();
363 TAILQ_INSERT_TAIL(&request_list, aiocb, node);
364 mutex_unlock(&lock);
365 cond_signal(&cond);
367 return 0;
370 int qemu_paio_read(struct qemu_paiocb *aiocb)
372 return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
375 int qemu_paio_write(struct qemu_paiocb *aiocb)
377 return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
380 int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
382 return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
385 ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
387 ssize_t ret;
389 mutex_lock(&lock);
390 ret = aiocb->ret;
391 mutex_unlock(&lock);
393 return ret;
396 int qemu_paio_error(struct qemu_paiocb *aiocb)
398 ssize_t ret = qemu_paio_return(aiocb);
400 if (ret < 0)
401 ret = -ret;
402 else
403 ret = 0;
405 return ret;
408 int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
410 int ret;
412 mutex_lock(&lock);
413 if (!aiocb->active) {
414 TAILQ_REMOVE(&request_list, aiocb, node);
415 aiocb->ret = -ECANCELED;
416 ret = QEMU_PAIO_CANCELED;
417 } else if (aiocb->ret == -EINPROGRESS)
418 ret = QEMU_PAIO_NOTCANCELED;
419 else
420 ret = QEMU_PAIO_ALLDONE;
421 mutex_unlock(&lock);
423 return ret;