core: relex validity checks when destructing half-set up source outputs/sink inputs
[pulseaudio-mirror.git] / src / pulsecore / rtpoll.c
blob42708a8a22c7bcf3df98229603c8daa9ab355c07
1 /***
2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <sys/types.h>
28 #include <stdio.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <errno.h>
33 #ifdef HAVE_POLL_H
34 #include <poll.h>
35 #else
36 #include <pulsecore/poll.h>
37 #endif
39 #include <pulse/xmalloc.h>
40 #include <pulse/timeval.h>
42 #include <pulsecore/core-error.h>
43 #include <pulsecore/core-rtclock.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/llist.h>
46 #include <pulsecore/flist.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/winsock.h>
49 #include <pulsecore/ratelimit.h>
51 #include "rtpoll.h"
53 /* #define DEBUG_TIMING */
55 struct pa_rtpoll {
56 struct pollfd *pollfd, *pollfd2;
57 unsigned n_pollfd_alloc, n_pollfd_used;
59 struct timeval next_elapse;
60 pa_bool_t timer_enabled:1;
62 pa_bool_t scan_for_dead:1;
63 pa_bool_t running:1;
64 pa_bool_t rebuild_needed:1;
65 pa_bool_t quit:1;
67 #ifdef DEBUG_TIMING
68 pa_usec_t timestamp;
69 pa_usec_t slept, awake;
70 #endif
72 PA_LLIST_HEAD(pa_rtpoll_item, items);
75 struct pa_rtpoll_item {
76 pa_rtpoll *rtpoll;
77 pa_bool_t dead;
79 pa_rtpoll_priority_t priority;
81 struct pollfd *pollfd;
82 unsigned n_pollfd;
84 int (*work_cb)(pa_rtpoll_item *i);
85 int (*before_cb)(pa_rtpoll_item *i);
86 void (*after_cb)(pa_rtpoll_item *i);
87 void *userdata;
89 PA_LLIST_FIELDS(pa_rtpoll_item);
92 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
94 pa_rtpoll *pa_rtpoll_new(void) {
95 pa_rtpoll *p;
97 p = pa_xnew(pa_rtpoll, 1);
99 p->n_pollfd_alloc = 32;
100 p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
101 p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
102 p->n_pollfd_used = 0;
104 pa_zero(p->next_elapse);
105 p->timer_enabled = FALSE;
107 p->running = FALSE;
108 p->scan_for_dead = FALSE;
109 p->rebuild_needed = FALSE;
110 p->quit = FALSE;
112 PA_LLIST_HEAD_INIT(pa_rtpoll_item, p->items);
114 #ifdef DEBUG_TIMING
115 p->timestamp = pa_rtclock_now();
116 p->slept = p->awake = 0;
117 #endif
119 return p;
122 static void rtpoll_rebuild(pa_rtpoll *p) {
124 struct pollfd *e, *t;
125 pa_rtpoll_item *i;
126 int ra = 0;
128 pa_assert(p);
130 p->rebuild_needed = FALSE;
132 if (p->n_pollfd_used > p->n_pollfd_alloc) {
133 /* Hmm, we have to allocate some more space */
134 p->n_pollfd_alloc = p->n_pollfd_used * 2;
135 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
136 ra = 1;
139 e = p->pollfd2;
141 for (i = p->items; i; i = i->next) {
143 if (i->n_pollfd > 0) {
144 size_t l = i->n_pollfd * sizeof(struct pollfd);
146 if (i->pollfd)
147 memcpy(e, i->pollfd, l);
148 else
149 memset(e, 0, l);
151 i->pollfd = e;
152 } else
153 i->pollfd = NULL;
155 e += i->n_pollfd;
158 pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
159 t = p->pollfd;
160 p->pollfd = p->pollfd2;
161 p->pollfd2 = t;
163 if (ra)
164 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
167 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
168 pa_rtpoll *p;
170 pa_assert(i);
172 p = i->rtpoll;
174 PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
176 p->n_pollfd_used -= i->n_pollfd;
178 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
179 pa_xfree(i);
181 p->rebuild_needed = TRUE;
184 void pa_rtpoll_free(pa_rtpoll *p) {
185 pa_assert(p);
187 while (p->items)
188 rtpoll_item_destroy(p->items);
190 pa_xfree(p->pollfd);
191 pa_xfree(p->pollfd2);
193 pa_xfree(p);
196 static void reset_revents(pa_rtpoll_item *i) {
197 struct pollfd *f;
198 unsigned n;
200 pa_assert(i);
202 if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
203 return;
205 for (; n > 0; n--)
206 f[n-1].revents = 0;
209 static void reset_all_revents(pa_rtpoll *p) {
210 pa_rtpoll_item *i;
212 pa_assert(p);
214 for (i = p->items; i; i = i->next) {
216 if (i->dead)
217 continue;
219 reset_revents(i);
223 int pa_rtpoll_run(pa_rtpoll *p, pa_bool_t wait_op) {
224 pa_rtpoll_item *i;
225 int r = 0;
226 struct timeval timeout;
228 pa_assert(p);
229 pa_assert(!p->running);
231 p->running = TRUE;
233 /* First, let's do some work */
234 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
235 int k;
237 if (i->dead)
238 continue;
240 if (!i->work_cb)
241 continue;
243 if (p->quit)
244 goto finish;
246 if ((k = i->work_cb(i)) != 0) {
247 if (k < 0)
248 r = k;
250 goto finish;
254 /* Now let's prepare for entering the sleep */
255 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
256 int k = 0;
258 if (i->dead)
259 continue;
261 if (!i->before_cb)
262 continue;
264 if (p->quit || (k = i->before_cb(i)) != 0) {
266 /* Hmm, this one doesn't let us enter the poll, so rewind everything */
268 for (i = i->prev; i; i = i->prev) {
270 if (i->dead)
271 continue;
273 if (!i->after_cb)
274 continue;
276 i->after_cb(i);
279 if (k < 0)
280 r = k;
282 goto finish;
286 if (p->rebuild_needed)
287 rtpoll_rebuild(p);
289 memset(&timeout, 0, sizeof(timeout));
291 /* Calculate timeout */
292 if (wait_op && !p->quit && p->timer_enabled) {
293 struct timeval now;
294 pa_rtclock_get(&now);
296 if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
297 pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
300 #ifdef DEBUG_TIMING
302 pa_usec_t now = pa_rtclock_now();
303 p->awake = now - p->timestamp;
304 p->timestamp = now;
306 #endif
308 /* OK, now let's sleep */
309 #ifdef HAVE_PPOLL
311 struct timespec ts;
312 ts.tv_sec = timeout.tv_sec;
313 ts.tv_nsec = timeout.tv_usec * 1000;
314 r = ppoll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? &ts : NULL, NULL);
316 #else
317 r = poll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
318 #endif
320 #ifdef DEBUG_TIMING
322 pa_usec_t now = pa_rtclock_now();
323 p->slept = now - p->timestamp;
324 p->timestamp = now;
326 pa_log("Process time %llu ms; sleep time %llu ms",
327 (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
328 (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
330 #endif
332 if (r < 0) {
333 if (errno == EAGAIN || errno == EINTR)
334 r = 0;
335 else
336 pa_log_error("poll(): %s", pa_cstrerror(errno));
338 reset_all_revents(p);
341 /* Let's tell everyone that we left the sleep */
342 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
344 if (i->dead)
345 continue;
347 if (!i->after_cb)
348 continue;
350 i->after_cb(i);
353 finish:
355 p->running = FALSE;
357 if (p->scan_for_dead) {
358 pa_rtpoll_item *n;
360 p->scan_for_dead = FALSE;
362 for (i = p->items; i; i = n) {
363 n = i->next;
365 if (i->dead)
366 rtpoll_item_destroy(i);
370 return r < 0 ? r : !p->quit;
373 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
374 pa_assert(p);
376 pa_timeval_store(&p->next_elapse, usec);
377 p->timer_enabled = TRUE;
380 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
381 pa_assert(p);
383 /* Scheduling a timeout for more than an hour is very very suspicious */
384 pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
386 pa_rtclock_get(&p->next_elapse);
387 pa_timeval_add(&p->next_elapse, usec);
388 p->timer_enabled = TRUE;
391 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
392 pa_assert(p);
394 memset(&p->next_elapse, 0, sizeof(p->next_elapse));
395 p->timer_enabled = FALSE;
398 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
399 pa_rtpoll_item *i, *j, *l = NULL;
401 pa_assert(p);
403 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
404 i = pa_xnew(pa_rtpoll_item, 1);
406 i->rtpoll = p;
407 i->dead = FALSE;
408 i->n_pollfd = n_fds;
409 i->pollfd = NULL;
410 i->priority = prio;
412 i->userdata = NULL;
413 i->before_cb = NULL;
414 i->after_cb = NULL;
415 i->work_cb = NULL;
417 for (j = p->items; j; j = j->next) {
418 if (prio <= j->priority)
419 break;
421 l = j;
424 PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
426 if (n_fds > 0) {
427 p->rebuild_needed = 1;
428 p->n_pollfd_used += n_fds;
431 return i;
434 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
435 pa_assert(i);
437 if (i->rtpoll->running) {
438 i->dead = TRUE;
439 i->rtpoll->scan_for_dead = TRUE;
440 return;
443 rtpoll_item_destroy(i);
446 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
447 pa_assert(i);
449 if (i->n_pollfd > 0)
450 if (i->rtpoll->rebuild_needed)
451 rtpoll_rebuild(i->rtpoll);
453 if (n_fds)
454 *n_fds = i->n_pollfd;
456 return i->pollfd;
459 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
460 pa_assert(i);
461 pa_assert(i->priority < PA_RTPOLL_NEVER);
463 i->before_cb = before_cb;
466 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
467 pa_assert(i);
468 pa_assert(i->priority < PA_RTPOLL_NEVER);
470 i->after_cb = after_cb;
473 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
474 pa_assert(i);
475 pa_assert(i->priority < PA_RTPOLL_NEVER);
477 i->work_cb = work_cb;
480 void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
481 pa_assert(i);
483 i->userdata = userdata;
486 void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
487 pa_assert(i);
489 return i->userdata;
492 static int fdsem_before(pa_rtpoll_item *i) {
494 if (pa_fdsem_before_poll(i->userdata) < 0)
495 return 1; /* 1 means immediate restart of the loop */
497 return 0;
500 static void fdsem_after(pa_rtpoll_item *i) {
501 pa_assert(i);
503 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
504 pa_fdsem_after_poll(i->userdata);
507 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
508 pa_rtpoll_item *i;
509 struct pollfd *pollfd;
511 pa_assert(p);
512 pa_assert(f);
514 i = pa_rtpoll_item_new(p, prio, 1);
516 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
518 pollfd->fd = pa_fdsem_get(f);
519 pollfd->events = POLLIN;
521 i->before_cb = fdsem_before;
522 i->after_cb = fdsem_after;
523 i->userdata = f;
525 return i;
528 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
529 pa_assert(i);
531 if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
532 return 1; /* 1 means immediate restart of the loop */
534 return 0;
537 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
538 pa_assert(i);
540 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
541 pa_asyncmsgq_read_after_poll(i->userdata);
544 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
545 pa_msgobject *object;
546 int code;
547 void *data;
548 pa_memchunk chunk;
549 int64_t offset;
551 pa_assert(i);
553 if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
554 int ret;
556 if (!object && code == PA_MESSAGE_SHUTDOWN) {
557 pa_asyncmsgq_done(i->userdata, 0);
558 pa_rtpoll_quit(i->rtpoll);
559 return 1;
562 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
563 pa_asyncmsgq_done(i->userdata, ret);
564 return 1;
567 return 0;
570 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
571 pa_rtpoll_item *i;
572 struct pollfd *pollfd;
574 pa_assert(p);
575 pa_assert(q);
577 i = pa_rtpoll_item_new(p, prio, 1);
579 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
580 pollfd->fd = pa_asyncmsgq_read_fd(q);
581 pollfd->events = POLLIN;
583 i->before_cb = asyncmsgq_read_before;
584 i->after_cb = asyncmsgq_read_after;
585 i->work_cb = asyncmsgq_read_work;
586 i->userdata = q;
588 return i;
591 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
592 pa_assert(i);
594 pa_asyncmsgq_write_before_poll(i->userdata);
595 return 0;
598 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
599 pa_assert(i);
601 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
602 pa_asyncmsgq_write_after_poll(i->userdata);
605 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
606 pa_rtpoll_item *i;
607 struct pollfd *pollfd;
609 pa_assert(p);
610 pa_assert(q);
612 i = pa_rtpoll_item_new(p, prio, 1);
614 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
615 pollfd->fd = pa_asyncmsgq_write_fd(q);
616 pollfd->events = POLLIN;
618 i->before_cb = asyncmsgq_write_before;
619 i->after_cb = asyncmsgq_write_after;
620 i->work_cb = NULL;
621 i->userdata = q;
623 return i;
626 void pa_rtpoll_quit(pa_rtpoll *p) {
627 pa_assert(p);
629 p->quit = TRUE;