/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <sys/types.h>
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <errno.h>

#ifdef HAVE_POLL_H
#include <poll.h>
#else
#include <pulsecore/poll.h>
#endif

#include <pulse/xmalloc.h>
#include <pulse/timeval.h>

#include <pulsecore/core-error.h>
#include <pulsecore/core-rtclock.h>
#include <pulsecore/macro.h>
#include <pulsecore/llist.h>
#include <pulsecore/flist.h>
#include <pulsecore/core-util.h>
#include <pulsecore/winsock.h>
#include <pulsecore/ratelimit.h>

#include "rtpoll.h"

/* #define DEBUG_TIMING */

struct pa_rtpoll {
    struct pollfd *pollfd, *pollfd2;
    unsigned n_pollfd_alloc, n_pollfd_used;

    struct timeval next_elapse;
    pa_bool_t timer_enabled:1;

    pa_bool_t scan_for_dead:1;
    pa_bool_t running:1;
    pa_bool_t rebuild_needed:1;
    pa_bool_t quit:1;
    pa_bool_t timer_elapsed:1;

#ifdef DEBUG_TIMING
    pa_usec_t timestamp;
    pa_usec_t slept, awake;
#endif

    PA_LLIST_HEAD(pa_rtpoll_item, items);
};

struct pa_rtpoll_item {
    pa_rtpoll *rtpoll;
    pa_bool_t dead;

    pa_rtpoll_priority_t priority;

    struct pollfd *pollfd;
    unsigned n_pollfd;

    int (*work_cb)(pa_rtpoll_item *i);
    int (*before_cb)(pa_rtpoll_item *i);
    void (*after_cb)(pa_rtpoll_item *i);
    void *userdata;

    PA_LLIST_FIELDS(pa_rtpoll_item);
};

PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);

pa_rtpoll *pa_rtpoll_new(void) {
    pa_rtpoll *p;

    p = pa_xnew0(pa_rtpoll, 1);

    p->n_pollfd_alloc = 32;
    p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
    p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);

#ifdef DEBUG_TIMING
    p->timestamp = pa_rtclock_now();
#endif

    return p;
}

static void rtpoll_rebuild(pa_rtpoll *p) {

    struct pollfd *e, *t;
    pa_rtpoll_item *i;
    int ra = 0;

    pa_assert(p);

    p->rebuild_needed = FALSE;

    if (p->n_pollfd_used > p->n_pollfd_alloc) {
        /* Hmm, we have to allocate some more space */
        p->n_pollfd_alloc = p->n_pollfd_used * 2;
        p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
        ra = 1;
    }

    e = p->pollfd2;

    for (i = p->items; i; i = i->next) {

        if (i->n_pollfd > 0) {
            size_t l = i->n_pollfd * sizeof(struct pollfd);

            if (i->pollfd)
                memcpy(e, i->pollfd, l);
            else
                memset(e, 0, l);

            i->pollfd = e;
        } else
            i->pollfd = NULL;

        e += i->n_pollfd;
    }

    pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);

    /* The new array was built in pollfd2; swap it in and keep the old
     * array around as the spare buffer for the next rebuild. */
    t = p->pollfd;
    p->pollfd = p->pollfd2;
    p->pollfd2 = t;

    if (ra)
        p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
}

static void rtpoll_item_destroy(pa_rtpoll_item *i) {
    pa_rtpoll *p;

    pa_assert(i);

    p = i->rtpoll;

    PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);

    p->n_pollfd_used -= i->n_pollfd;

    if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
        pa_xfree(i);

    p->rebuild_needed = TRUE;
}

void pa_rtpoll_free(pa_rtpoll *p) {
    pa_assert(p);

    while (p->items)
        rtpoll_item_destroy(p->items);

    pa_xfree(p->pollfd);
    pa_xfree(p->pollfd2);

    pa_xfree(p);
}

static void reset_revents(pa_rtpoll_item *i) {
    struct pollfd *f;
    unsigned n;

    pa_assert(i);

    if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
        return;

    for (; n > 0; n--)
        f[n-1].revents = 0;
}

static void reset_all_revents(pa_rtpoll *p) {
    pa_rtpoll_item *i;

    pa_assert(p);

    for (i = p->items; i; i = i->next) {

        if (i->dead)
            continue;

        reset_revents(i);
    }
}

int pa_rtpoll_run(pa_rtpoll *p, pa_bool_t wait_op) {
    pa_rtpoll_item *i;
    int r = 0;
    struct timeval timeout;

    pa_assert(p);
    pa_assert(!p->running);

    p->running = TRUE;
    p->timer_elapsed = FALSE;

    /* First, let's do some work */
    for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
        int k;

        if (i->dead)
            continue;

        if (!i->work_cb)
            continue;

        if (p->quit)
            goto finish;

        if ((k = i->work_cb(i)) != 0) {
            if (k < 0)
                r = k;

            goto finish;
        }
    }

    /* Now let's prepare for entering the sleep */
    for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
        int k = 0;

        if (i->dead)
            continue;

        if (!i->before_cb)
            continue;

        if (p->quit || (k = i->before_cb(i)) != 0) {

            /* Hmm, this one doesn't let us enter the poll, so rewind everything */

            for (i = i->prev; i; i = i->prev) {

                if (i->dead)
                    continue;

                if (!i->after_cb)
                    continue;

                i->after_cb(i);
            }

            if (k < 0)
                r = k;

            goto finish;
        }
    }

    if (p->rebuild_needed)
        rtpoll_rebuild(p);

    pa_zero(timeout);

    /* Calculate timeout */
    if (wait_op && !p->quit && p->timer_enabled) {
        struct timeval now;
        pa_rtclock_get(&now);

        if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
            pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
    }

#ifdef DEBUG_TIMING
    {
        pa_usec_t now = pa_rtclock_now();
        p->awake = now - p->timestamp;
        p->timestamp = now;
    }
#endif

    /* OK, now let's sleep */
#ifdef HAVE_PPOLL
    {
        struct timespec ts;
        ts.tv_sec = timeout.tv_sec;
        ts.tv_nsec = timeout.tv_usec * 1000;
        r = ppoll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? &ts : NULL, NULL);
    }
#else
    r = poll(p->pollfd, p->n_pollfd_used, (!wait_op || p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
#endif

    p->timer_elapsed = r == 0;

#ifdef DEBUG_TIMING
    {
        pa_usec_t now = pa_rtclock_now();
        p->slept = now - p->timestamp;
        p->timestamp = now;

        pa_log("Process time %llu ms; sleep time %llu ms",
               (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
               (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
    }
#endif

    if (r < 0) {
        if (errno == EAGAIN || errno == EINTR)
            r = 0;
        else
            pa_log_error("poll(): %s", pa_cstrerror(errno));

        reset_all_revents(p);
    }

    /* Let's tell everyone that we left the sleep */
    for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {

        if (i->dead)
            continue;

        if (!i->after_cb)
            continue;

        i->after_cb(i);
    }

finish:

    p->running = FALSE;

    if (p->scan_for_dead) {
        pa_rtpoll_item *n;

        p->scan_for_dead = FALSE;

        for (i = p->items; i; i = n) {
            n = i->next;

            if (i->dead)
                rtpoll_item_destroy(i);
        }
    }

    return r < 0 ? r : !p->quit;
}

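/*
 * Minimal usage sketch (illustrative, not part of this file): roughly how an
 * I/O thread drives pa_rtpoll_run(). The loop shape follows the return value
 * convention above: negative on error, 0 once pa_rtpoll_quit() has been
 * requested, positive otherwise. The thread function name is hypothetical.
 *
 *     static void thread_func(void *userdata) {
 *         pa_rtpoll *rtpoll = pa_rtpoll_new();
 *
 *         for (;;) {
 *             int ret;
 *
 *             if ((ret = pa_rtpoll_run(rtpoll, TRUE)) < 0)
 *                 break;      // error
 *
 *             if (ret == 0)
 *                 break;      // quit was requested
 *         }
 *
 *         pa_rtpoll_free(rtpoll);
 *     }
 */
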
void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
    pa_assert(p);

    pa_timeval_store(&p->next_elapse, usec);
    p->timer_enabled = TRUE;
}

void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
    pa_assert(p);

    /* Scheduling a timeout for more than an hour is very very suspicious */
    pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);

    pa_rtclock_get(&p->next_elapse);
    pa_timeval_add(&p->next_elapse, usec);
    p->timer_enabled = TRUE;
}

void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
    pa_assert(p);

    memset(&p->next_elapse, 0, sizeof(p->next_elapse));
    p->timer_enabled = FALSE;
}

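/*
 * Timer usage sketch (illustrative, not part of this file): a caller that
 * wants to wake up again in 10 ms arms the timer before entering
 * pa_rtpoll_run() and may check pa_rtpoll_timer_elapsed() afterwards to see
 * whether the wakeup came from the timeout rather than an fd event. The
 * rtpoll variable is assumed to exist in the caller.
 *
 *     pa_rtpoll_set_timer_relative(rtpoll, 10 * PA_USEC_PER_MSEC);
 *     ...
 *     if (pa_rtpoll_timer_elapsed(rtpoll)) {
 *         // the deadline passed without fd activity
 *     }
 *
 * pa_rtpoll_set_timer_absolute() takes a timestamp on the same clock as
 * pa_rtclock_get() (see the comparison in pa_rtpoll_run() above), and
 * pa_rtpoll_set_timer_disabled() cancels the timeout so the poll blocks
 * until an fd becomes ready.
 */
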
pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
    pa_rtpoll_item *i, *j, *l = NULL;

    pa_assert(p);

    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        i = pa_xnew(pa_rtpoll_item, 1);

    i->rtpoll = p;
    i->dead = FALSE;
    i->n_pollfd = n_fds;
    i->pollfd = NULL;
    i->priority = prio;

    i->userdata = NULL;
    i->before_cb = NULL;
    i->after_cb = NULL;
    i->work_cb = NULL;

    /* The item list is kept sorted by priority; find the insertion point */
    for (j = p->items; j; j = j->next) {
        if (prio <= j->priority)
            break;

        l = j;
    }

    PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);

    if (n_fds > 0) {
        p->rebuild_needed = 1;
        p->n_pollfd_used += n_fds;
    }

    return i;
}

void pa_rtpoll_item_free(pa_rtpoll_item *i) {
    pa_assert(i);

    if (i->rtpoll->running) {
        i->dead = TRUE;
        i->rtpoll->scan_for_dead = TRUE;
        return;
    }

    rtpoll_item_destroy(i);
}

struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
    pa_assert(i);

    if (i->n_pollfd > 0)
        if (i->rtpoll->rebuild_needed)
            rtpoll_rebuild(i->rtpoll);

    if (n_fds)
        *n_fds = i->n_pollfd;

    return i->pollfd;
}

void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)) {
    pa_assert(i);
    pa_assert(i->priority < PA_RTPOLL_NEVER);

    i->before_cb = before_cb;
}

void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)) {
    pa_assert(i);
    pa_assert(i->priority < PA_RTPOLL_NEVER);

    i->after_cb = after_cb;
}

void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) {
    pa_assert(i);
    pa_assert(i->priority < PA_RTPOLL_NEVER);

    i->work_cb = work_cb;
}

void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) {
    pa_assert(i);

    i->userdata = userdata;
}

void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i) {
    pa_assert(i);

    return i->userdata;
}

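/*
 * Generic item sketch (illustrative, not part of this file): the calls above
 * are typically combined to watch an arbitrary fd. The fd, callbacks, state
 * pointer and priority value below are hypothetical caller-side names.
 *
 *     pa_rtpoll_item *item;
 *     struct pollfd *pfd;
 *
 *     item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_NORMAL, 1);
 *
 *     pfd = pa_rtpoll_item_get_pollfd(item, NULL);
 *     pfd->fd = my_fd;
 *     pfd->events = POLLIN;
 *
 *     pa_rtpoll_item_set_userdata(item, my_state);
 *     pa_rtpoll_item_set_before_callback(item, my_before_cb);  // non-zero return skips the poll
 *     pa_rtpoll_item_set_after_callback(item, my_after_cb);    // inspect pfd->revents here
 *     ...
 *     pa_rtpoll_item_free(item);
 */
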
static int fdsem_before(pa_rtpoll_item *i) {

    if (pa_fdsem_before_poll(i->userdata) < 0)
        return 1; /* 1 means immediate restart of the loop */

    return 0;
}

static void fdsem_after(pa_rtpoll_item *i) {
    pa_assert(i);

    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
    pa_fdsem_after_poll(i->userdata);
}

pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
    pa_rtpoll_item *i;
    struct pollfd *pollfd;

    pa_assert(p);
    pa_assert(f);

    i = pa_rtpoll_item_new(p, prio, 1);

    pollfd = pa_rtpoll_item_get_pollfd(i, NULL);

    pollfd->fd = pa_fdsem_get(f);
    pollfd->events = POLLIN;

    i->before_cb = fdsem_before;
    i->after_cb = fdsem_after;
    i->userdata = f;

    return i;
}

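/*
 * fdsem sketch (illustrative, not part of this file): a pa_fdsem is usually
 * used as a cheap cross-thread wakeup primitive; attaching it here makes
 * pa_rtpoll_run() wake up whenever another thread posts the semaphore. The
 * fdsem and rtpoll variables are assumed to exist in the caller.
 *
 *     pa_rtpoll_item *item;
 *
 *     item = pa_rtpoll_item_new_fdsem(rtpoll, PA_RTPOLL_NORMAL, fdsem);
 *     ...
 *     pa_rtpoll_item_free(item);
 */
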
static int asyncmsgq_read_before(pa_rtpoll_item *i) {
    pa_assert(i);

    if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
        return 1; /* 1 means immediate restart of the loop */

    return 0;
}

static void asyncmsgq_read_after(pa_rtpoll_item *i) {
    pa_assert(i);

    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
    pa_asyncmsgq_read_after_poll(i->userdata);
}

static int asyncmsgq_read_work(pa_rtpoll_item *i) {
    pa_msgobject *object;
    int code;
    void *data;
    pa_memchunk chunk;
    int64_t offset;

    pa_assert(i);

    if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
        int ret;

        if (!object && code == PA_MESSAGE_SHUTDOWN) {
            pa_asyncmsgq_done(i->userdata, 0);
            pa_rtpoll_quit(i->rtpoll);
            return 1;
        }

        ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
        pa_asyncmsgq_done(i->userdata, ret);
        return 1;
    }

    return 0;
}

pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
    pa_rtpoll_item *i;
    struct pollfd *pollfd;

    pa_assert(p);
    pa_assert(q);

    i = pa_rtpoll_item_new(p, prio, 1);

    pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
    pollfd->fd = pa_asyncmsgq_read_fd(q);
    pollfd->events = POLLIN;

    i->before_cb = asyncmsgq_read_before;
    i->after_cb = asyncmsgq_read_after;
    i->work_cb = asyncmsgq_read_work;
    i->userdata = q;

    return i;
}

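/*
 * asyncmsgq sketch (illustrative, not part of this file): an I/O thread
 * typically attaches the read side of its incoming message queue so that
 * messages posted from other threads are dispatched by asyncmsgq_read_work()
 * above, and PA_MESSAGE_SHUTDOWN ends the loop via pa_rtpoll_quit(). The
 * queue variable name is hypothetical.
 *
 *     pa_rtpoll_item *item;
 *
 *     item = pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, inq);
 *
 *     // ... pa_rtpoll_run() loop; queued messages are dispatched by the
 *     // work callback at the start of each iteration ...
 *
 *     pa_rtpoll_item_free(item);
 */
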
static int asyncmsgq_write_before(pa_rtpoll_item *i) {
    pa_assert(i);

    pa_asyncmsgq_write_before_poll(i->userdata);
    return 0;
}

static void asyncmsgq_write_after(pa_rtpoll_item *i) {
    pa_assert(i);

    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
    pa_asyncmsgq_write_after_poll(i->userdata);
}

pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
    pa_rtpoll_item *i;
    struct pollfd *pollfd;

    pa_assert(p);
    pa_assert(q);

    i = pa_rtpoll_item_new(p, prio, 1);

    pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
    pollfd->fd = pa_asyncmsgq_write_fd(q);
    pollfd->events = POLLIN;

    i->before_cb = asyncmsgq_write_before;
    i->after_cb = asyncmsgq_write_after;
    i->work_cb = NULL;
    i->userdata = q;

    return i;
}

void pa_rtpoll_quit(pa_rtpoll *p) {
    pa_assert(p);

    p->quit = TRUE;
}

pa_bool_t pa_rtpoll_timer_elapsed(pa_rtpoll *p) {
    pa_assert(p);

    return p->timer_elapsed;
}