bluetooth: accept temporarily unavailable error
[pulseaudio-mirror.git] / src / pulse / stream.c
blob2455fe7b0a8af291c9dd42b8ab0a1a0725f5edd0
1 /***
2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
41 #include "internal.h"
43 #define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
45 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
46 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
47 #define SMOOTHER_MIN_HISTORY (4)
49 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
50 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
53 static void reset_callbacks(pa_stream *s) {
54 s->read_callback = NULL;
55 s->read_userdata = NULL;
56 s->write_callback = NULL;
57 s->write_userdata = NULL;
58 s->state_callback = NULL;
59 s->state_userdata = NULL;
60 s->overflow_callback = NULL;
61 s->overflow_userdata = NULL;
62 s->underflow_callback = NULL;
63 s->underflow_userdata = NULL;
64 s->latency_update_callback = NULL;
65 s->latency_update_userdata = NULL;
66 s->moved_callback = NULL;
67 s->moved_userdata = NULL;
68 s->suspended_callback = NULL;
69 s->suspended_userdata = NULL;
70 s->started_callback = NULL;
71 s->started_userdata = NULL;
72 s->event_callback = NULL;
73 s->event_userdata = NULL;
76 pa_stream *pa_stream_new_with_proplist(
77 pa_context *c,
78 const char *name,
79 const pa_sample_spec *ss,
80 const pa_channel_map *map,
81 pa_proplist *p) {
83 pa_stream *s;
84 int i;
85 pa_channel_map tmap;
87 pa_assert(c);
88 pa_assert(PA_REFCNT_VALUE(c) >= 1);
90 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
91 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
92 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
93 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
94 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
97 if (!map)
98 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
100 s = pa_xnew(pa_stream, 1);
101 PA_REFCNT_INIT(s);
102 s->context = c;
103 s->mainloop = c->mainloop;
105 s->direction = PA_STREAM_NODIRECTION;
106 s->state = PA_STREAM_UNCONNECTED;
107 s->flags = 0;
109 s->sample_spec = *ss;
110 s->channel_map = *map;
112 s->direct_on_input = PA_INVALID_INDEX;
114 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
115 if (name)
116 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
118 s->channel = 0;
119 s->channel_valid = FALSE;
120 s->syncid = c->csyncid++;
121 s->stream_index = PA_INVALID_INDEX;
123 s->requested_bytes = 0;
124 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
126 /* We initialize der target length here, so that if the user
127 * passes no explicit buffering metrics the default is similar to
128 * what older PA versions provided. */
130 s->buffer_attr.maxlength = (uint32_t) -1;
131 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
132 s->buffer_attr.minreq = (uint32_t) -1;
133 s->buffer_attr.prebuf = (uint32_t) -1;
134 s->buffer_attr.fragsize = (uint32_t) -1;
136 s->device_index = PA_INVALID_INDEX;
137 s->device_name = NULL;
138 s->suspended = FALSE;
140 pa_memchunk_reset(&s->peek_memchunk);
141 s->peek_data = NULL;
143 s->record_memblockq = NULL;
145 s->corked = FALSE;
147 memset(&s->timing_info, 0, sizeof(s->timing_info));
148 s->timing_info_valid = FALSE;
150 s->previous_time = 0;
152 s->read_index_not_before = 0;
153 s->write_index_not_before = 0;
154 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
155 s->write_index_corrections[i].valid = 0;
156 s->current_write_index_correction = 0;
158 s->auto_timing_update_event = NULL;
159 s->auto_timing_update_requested = FALSE;
161 reset_callbacks(s);
163 s->smoother = NULL;
165 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
166 PA_LLIST_PREPEND(pa_stream, c->streams, s);
167 pa_stream_ref(s);
169 return s;
172 static void stream_unlink(pa_stream *s) {
173 pa_operation *o, *n;
174 pa_assert(s);
176 if (!s->context)
177 return;
179 /* Detach from context */
181 /* Unref all operatio object that point to us */
182 for (o = s->context->operations; o; o = n) {
183 n = o->next;
185 if (o->stream == s)
186 pa_operation_cancel(o);
189 /* Drop all outstanding replies for this stream */
190 if (s->context->pdispatch)
191 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
193 if (s->channel_valid) {
194 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
195 s->channel = 0;
196 s->channel_valid = FALSE;
199 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
200 pa_stream_unref(s);
202 s->context = NULL;
204 if (s->auto_timing_update_event) {
205 pa_assert(s->mainloop);
206 s->mainloop->time_free(s->auto_timing_update_event);
209 reset_callbacks(s);
212 static void stream_free(pa_stream *s) {
213 pa_assert(s);
215 stream_unlink(s);
217 if (s->peek_memchunk.memblock) {
218 if (s->peek_data)
219 pa_memblock_release(s->peek_memchunk.memblock);
220 pa_memblock_unref(s->peek_memchunk.memblock);
223 if (s->record_memblockq)
224 pa_memblockq_free(s->record_memblockq);
226 if (s->proplist)
227 pa_proplist_free(s->proplist);
229 if (s->smoother)
230 pa_smoother_free(s->smoother);
232 pa_xfree(s->device_name);
233 pa_xfree(s);
236 void pa_stream_unref(pa_stream *s) {
237 pa_assert(s);
238 pa_assert(PA_REFCNT_VALUE(s) >= 1);
240 if (PA_REFCNT_DEC(s) <= 0)
241 stream_free(s);
244 pa_stream* pa_stream_ref(pa_stream *s) {
245 pa_assert(s);
246 pa_assert(PA_REFCNT_VALUE(s) >= 1);
248 PA_REFCNT_INC(s);
249 return s;
252 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
253 pa_assert(s);
254 pa_assert(PA_REFCNT_VALUE(s) >= 1);
256 return s->state;
259 pa_context* pa_stream_get_context(pa_stream *s) {
260 pa_assert(s);
261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
263 return s->context;
266 uint32_t pa_stream_get_index(pa_stream *s) {
267 pa_assert(s);
268 pa_assert(PA_REFCNT_VALUE(s) >= 1);
270 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
272 return s->stream_index;
275 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
276 pa_assert(s);
277 pa_assert(PA_REFCNT_VALUE(s) >= 1);
279 if (s->state == st)
280 return;
282 pa_stream_ref(s);
284 s->state = st;
286 if (s->state_callback)
287 s->state_callback(s, s->state_userdata);
289 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
290 stream_unlink(s);
292 pa_stream_unref(s);
295 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
296 pa_assert(s);
297 pa_assert(PA_REFCNT_VALUE(s) >= 1);
299 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
300 return;
302 if (s->state == PA_STREAM_READY &&
303 (force || !s->auto_timing_update_requested)) {
304 pa_operation *o;
306 /* pa_log("automatically requesting new timing data"); */
308 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
309 pa_operation_unref(o);
310 s->auto_timing_update_requested = TRUE;
314 if (s->auto_timing_update_event) {
315 struct timeval next;
316 pa_gettimeofday(&next);
317 pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
318 s->mainloop->time_restart(s->auto_timing_update_event, &next);
322 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
323 pa_context *c = userdata;
324 pa_stream *s;
325 uint32_t channel;
327 pa_assert(pd);
328 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
329 pa_assert(t);
330 pa_assert(c);
331 pa_assert(PA_REFCNT_VALUE(c) >= 1);
333 pa_context_ref(c);
335 if (pa_tagstruct_getu32(t, &channel) < 0 ||
336 !pa_tagstruct_eof(t)) {
337 pa_context_fail(c, PA_ERR_PROTOCOL);
338 goto finish;
341 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
342 goto finish;
344 if (s->state != PA_STREAM_READY)
345 goto finish;
347 pa_context_set_error(c, PA_ERR_KILLED);
348 pa_stream_set_state(s, PA_STREAM_FAILED);
350 finish:
351 pa_context_unref(c);
354 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
355 pa_usec_t x;
357 pa_assert(s);
358 pa_assert(!force_start || !force_stop);
360 if (!s->smoother)
361 return;
363 x = pa_rtclock_usec();
365 if (s->timing_info_valid) {
366 if (aposteriori)
367 x -= s->timing_info.transport_usec;
368 else
369 x += s->timing_info.transport_usec;
371 if (s->direction == PA_STREAM_PLAYBACK)
372 /* it takes a while until the pause/resume is actually
373 * audible */
374 x += s->timing_info.sink_usec;
375 else
376 /* Data froma while back will be dropped */
377 x -= s->timing_info.source_usec;
380 if (s->suspended || s->corked || force_stop)
381 pa_smoother_pause(s->smoother, x);
382 else if (force_start || s->buffer_attr.prebuf == 0)
383 pa_smoother_resume(s->smoother, x);
385 /* Please note that we have no idea if playback actually started
386 * if prebuf is non-zero! */
389 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
390 pa_context *c = userdata;
391 pa_stream *s;
392 uint32_t channel;
393 const char *dn;
394 pa_bool_t suspended;
395 uint32_t di;
396 pa_usec_t usec;
397 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
399 pa_assert(pd);
400 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
401 pa_assert(t);
402 pa_assert(c);
403 pa_assert(PA_REFCNT_VALUE(c) >= 1);
405 pa_context_ref(c);
407 if (c->version < 12) {
408 pa_context_fail(c, PA_ERR_PROTOCOL);
409 goto finish;
412 if (pa_tagstruct_getu32(t, &channel) < 0 ||
413 pa_tagstruct_getu32(t, &di) < 0 ||
414 pa_tagstruct_gets(t, &dn) < 0 ||
415 pa_tagstruct_get_boolean(t, &suspended) < 0) {
416 pa_context_fail(c, PA_ERR_PROTOCOL);
417 goto finish;
420 if (c->version >= 13) {
422 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
423 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
424 pa_tagstruct_getu32(t, &fragsize) < 0 ||
425 pa_tagstruct_get_usec(t, &usec) < 0) {
426 pa_context_fail(c, PA_ERR_PROTOCOL);
427 goto finish;
429 } else {
430 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
431 pa_tagstruct_getu32(t, &tlength) < 0 ||
432 pa_tagstruct_getu32(t, &prebuf) < 0 ||
433 pa_tagstruct_getu32(t, &minreq) < 0 ||
434 pa_tagstruct_get_usec(t, &usec) < 0) {
435 pa_context_fail(c, PA_ERR_PROTOCOL);
436 goto finish;
441 if (!pa_tagstruct_eof(t)) {
442 pa_context_fail(c, PA_ERR_PROTOCOL);
443 goto finish;
446 if (!dn || di == PA_INVALID_INDEX) {
447 pa_context_fail(c, PA_ERR_PROTOCOL);
448 goto finish;
451 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
452 goto finish;
454 if (s->state != PA_STREAM_READY)
455 goto finish;
457 if (c->version >= 13) {
458 if (s->direction == PA_STREAM_RECORD)
459 s->timing_info.configured_source_usec = usec;
460 else
461 s->timing_info.configured_sink_usec = usec;
463 s->buffer_attr.maxlength = maxlength;
464 s->buffer_attr.fragsize = fragsize;
465 s->buffer_attr.tlength = tlength;
466 s->buffer_attr.prebuf = prebuf;
467 s->buffer_attr.minreq = minreq;
470 pa_xfree(s->device_name);
471 s->device_name = pa_xstrdup(dn);
472 s->device_index = di;
474 s->suspended = suspended;
476 check_smoother_status(s, TRUE, FALSE, FALSE);
477 request_auto_timing_update(s, TRUE);
479 if (s->moved_callback)
480 s->moved_callback(s, s->moved_userdata);
482 finish:
483 pa_context_unref(c);
486 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
487 pa_context *c = userdata;
488 pa_stream *s;
489 uint32_t channel;
490 pa_bool_t suspended;
492 pa_assert(pd);
493 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
494 pa_assert(t);
495 pa_assert(c);
496 pa_assert(PA_REFCNT_VALUE(c) >= 1);
498 pa_context_ref(c);
500 if (c->version < 12) {
501 pa_context_fail(c, PA_ERR_PROTOCOL);
502 goto finish;
505 if (pa_tagstruct_getu32(t, &channel) < 0 ||
506 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
507 !pa_tagstruct_eof(t)) {
508 pa_context_fail(c, PA_ERR_PROTOCOL);
509 goto finish;
512 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
513 goto finish;
515 if (s->state != PA_STREAM_READY)
516 goto finish;
518 s->suspended = suspended;
520 check_smoother_status(s, TRUE, FALSE, FALSE);
521 request_auto_timing_update(s, TRUE);
523 if (s->suspended_callback)
524 s->suspended_callback(s, s->suspended_userdata);
526 finish:
527 pa_context_unref(c);
530 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
531 pa_context *c = userdata;
532 pa_stream *s;
533 uint32_t channel;
535 pa_assert(pd);
536 pa_assert(command == PA_COMMAND_STARTED);
537 pa_assert(t);
538 pa_assert(c);
539 pa_assert(PA_REFCNT_VALUE(c) >= 1);
541 pa_context_ref(c);
543 if (c->version < 13) {
544 pa_context_fail(c, PA_ERR_PROTOCOL);
545 goto finish;
548 if (pa_tagstruct_getu32(t, &channel) < 0 ||
549 !pa_tagstruct_eof(t)) {
550 pa_context_fail(c, PA_ERR_PROTOCOL);
551 goto finish;
554 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
555 goto finish;
557 if (s->state != PA_STREAM_READY)
558 goto finish;
560 check_smoother_status(s, TRUE, TRUE, FALSE);
561 request_auto_timing_update(s, TRUE);
563 if (s->started_callback)
564 s->started_callback(s, s->started_userdata);
566 finish:
567 pa_context_unref(c);
570 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
571 pa_context *c = userdata;
572 pa_stream *s;
573 uint32_t channel;
574 pa_proplist *pl = NULL;
575 const char *event;
577 pa_assert(pd);
578 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
579 pa_assert(t);
580 pa_assert(c);
581 pa_assert(PA_REFCNT_VALUE(c) >= 1);
583 pa_context_ref(c);
585 if (c->version < 15) {
586 pa_context_fail(c, PA_ERR_PROTOCOL);
587 goto finish;
590 pl = pa_proplist_new();
592 if (pa_tagstruct_getu32(t, &channel) < 0 ||
593 pa_tagstruct_gets(t, &event) < 0 ||
594 pa_tagstruct_get_proplist(t, pl) < 0 ||
595 !pa_tagstruct_eof(t) || !event) {
596 pa_context_fail(c, PA_ERR_PROTOCOL);
597 goto finish;
600 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
601 goto finish;
603 if (s->state != PA_STREAM_READY)
604 goto finish;
606 if (s->event_callback)
607 s->event_callback(s, event, pl, s->event_userdata);
609 finish:
610 pa_context_unref(c);
612 if (pl)
613 pa_proplist_free(pl);
616 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
617 pa_stream *s;
618 pa_context *c = userdata;
619 uint32_t bytes, channel;
621 pa_assert(pd);
622 pa_assert(command == PA_COMMAND_REQUEST);
623 pa_assert(t);
624 pa_assert(c);
625 pa_assert(PA_REFCNT_VALUE(c) >= 1);
627 pa_context_ref(c);
629 if (pa_tagstruct_getu32(t, &channel) < 0 ||
630 pa_tagstruct_getu32(t, &bytes) < 0 ||
631 !pa_tagstruct_eof(t)) {
632 pa_context_fail(c, PA_ERR_PROTOCOL);
633 goto finish;
636 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
637 goto finish;
639 if (s->state != PA_STREAM_READY)
640 goto finish;
642 s->requested_bytes += bytes;
644 if (s->requested_bytes > 0 && s->write_callback)
645 s->write_callback(s, s->requested_bytes, s->write_userdata);
647 finish:
648 pa_context_unref(c);
651 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
652 pa_stream *s;
653 pa_context *c = userdata;
654 uint32_t channel;
656 pa_assert(pd);
657 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
658 pa_assert(t);
659 pa_assert(c);
660 pa_assert(PA_REFCNT_VALUE(c) >= 1);
662 pa_context_ref(c);
664 if (pa_tagstruct_getu32(t, &channel) < 0 ||
665 !pa_tagstruct_eof(t)) {
666 pa_context_fail(c, PA_ERR_PROTOCOL);
667 goto finish;
670 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
671 goto finish;
673 if (s->state != PA_STREAM_READY)
674 goto finish;
676 if (s->buffer_attr.prebuf > 0)
677 check_smoother_status(s, TRUE, FALSE, TRUE);
679 request_auto_timing_update(s, TRUE);
681 if (command == PA_COMMAND_OVERFLOW) {
682 if (s->overflow_callback)
683 s->overflow_callback(s, s->overflow_userdata);
684 } else if (command == PA_COMMAND_UNDERFLOW) {
685 if (s->underflow_callback)
686 s->underflow_callback(s, s->underflow_userdata);
689 finish:
690 pa_context_unref(c);
693 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
694 pa_assert(s);
695 pa_assert(PA_REFCNT_VALUE(s) >= 1);
697 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
699 if (s->state != PA_STREAM_READY)
700 return;
702 if (w) {
703 s->write_index_not_before = s->context->ctag;
705 if (s->timing_info_valid)
706 s->timing_info.write_index_corrupt = TRUE;
708 /* pa_log("write_index invalidated"); */
711 if (r) {
712 s->read_index_not_before = s->context->ctag;
714 if (s->timing_info_valid)
715 s->timing_info.read_index_corrupt = TRUE;
717 /* pa_log("read_index invalidated"); */
720 request_auto_timing_update(s, TRUE);
723 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
724 pa_stream *s = userdata;
726 pa_assert(s);
727 pa_assert(PA_REFCNT_VALUE(s) >= 1);
729 pa_stream_ref(s);
730 request_auto_timing_update(s, FALSE);
731 pa_stream_unref(s);
734 static void create_stream_complete(pa_stream *s) {
735 pa_assert(s);
736 pa_assert(PA_REFCNT_VALUE(s) >= 1);
737 pa_assert(s->state == PA_STREAM_CREATING);
739 pa_stream_set_state(s, PA_STREAM_READY);
741 if (s->requested_bytes > 0 && s->write_callback)
742 s->write_callback(s, s->requested_bytes, s->write_userdata);
744 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
745 struct timeval tv;
746 pa_gettimeofday(&tv);
747 tv.tv_usec += (suseconds_t) LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
748 pa_assert(!s->auto_timing_update_event);
749 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
751 request_auto_timing_update(s, TRUE);
754 check_smoother_status(s, TRUE, FALSE, FALSE);
757 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
758 pa_assert(s);
759 pa_assert(attr);
760 pa_assert(ss);
762 if (s->context->version >= 13)
763 return;
765 /* Version older than 0.9.10 didn't do server side buffer_attr
766 * selection, hence we have to fake it on the client side. */
768 /* We choose fairly conservative values here, to not confuse
769 * old clients with extremely large playback buffers */
771 if (attr->maxlength == (uint32_t) -1)
772 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
774 if (attr->tlength == (uint32_t) -1)
775 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
777 if (attr->minreq == (uint32_t) -1)
778 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
780 if (attr->prebuf == (uint32_t) -1)
781 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
783 if (attr->fragsize == (uint32_t) -1)
784 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
787 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
788 pa_stream *s = userdata;
790 pa_assert(pd);
791 pa_assert(s);
792 pa_assert(PA_REFCNT_VALUE(s) >= 1);
793 pa_assert(s->state == PA_STREAM_CREATING);
795 pa_stream_ref(s);
797 if (command != PA_COMMAND_REPLY) {
798 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
799 goto finish;
801 pa_stream_set_state(s, PA_STREAM_FAILED);
802 goto finish;
805 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
806 s->channel == PA_INVALID_INDEX ||
807 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
808 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
809 pa_context_fail(s->context, PA_ERR_PROTOCOL);
810 goto finish;
813 if (s->context->version >= 9) {
814 if (s->direction == PA_STREAM_PLAYBACK) {
815 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
816 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
817 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
818 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
819 pa_context_fail(s->context, PA_ERR_PROTOCOL);
820 goto finish;
822 } else if (s->direction == PA_STREAM_RECORD) {
823 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
824 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
825 pa_context_fail(s->context, PA_ERR_PROTOCOL);
826 goto finish;
831 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
832 pa_sample_spec ss;
833 pa_channel_map cm;
834 const char *dn = NULL;
835 pa_bool_t suspended;
837 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
838 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
839 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
840 pa_tagstruct_gets(t, &dn) < 0 ||
841 pa_tagstruct_get_boolean(t, &suspended) < 0) {
842 pa_context_fail(s->context, PA_ERR_PROTOCOL);
843 goto finish;
846 if (!dn || s->device_index == PA_INVALID_INDEX ||
847 ss.channels != cm.channels ||
848 !pa_channel_map_valid(&cm) ||
849 !pa_sample_spec_valid(&ss) ||
850 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
851 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
852 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
853 pa_context_fail(s->context, PA_ERR_PROTOCOL);
854 goto finish;
857 pa_xfree(s->device_name);
858 s->device_name = pa_xstrdup(dn);
859 s->suspended = suspended;
861 s->channel_map = cm;
862 s->sample_spec = ss;
865 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
866 pa_usec_t usec;
868 if (pa_tagstruct_get_usec(t, &usec) < 0) {
869 pa_context_fail(s->context, PA_ERR_PROTOCOL);
870 goto finish;
873 if (s->direction == PA_STREAM_RECORD)
874 s->timing_info.configured_source_usec = usec;
875 else
876 s->timing_info.configured_sink_usec = usec;
879 if (!pa_tagstruct_eof(t)) {
880 pa_context_fail(s->context, PA_ERR_PROTOCOL);
881 goto finish;
884 if (s->direction == PA_STREAM_RECORD) {
885 pa_assert(!s->record_memblockq);
887 s->record_memblockq = pa_memblockq_new(
889 s->buffer_attr.maxlength,
891 pa_frame_size(&s->sample_spec),
895 NULL);
898 s->channel_valid = TRUE;
899 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
901 create_stream_complete(s);
903 finish:
904 pa_stream_unref(s);
907 static int create_stream(
908 pa_stream_direction_t direction,
909 pa_stream *s,
910 const char *dev,
911 const pa_buffer_attr *attr,
912 pa_stream_flags_t flags,
913 const pa_cvolume *volume,
914 pa_stream *sync_stream) {
916 pa_tagstruct *t;
917 uint32_t tag;
918 pa_bool_t volume_set = FALSE;
920 pa_assert(s);
921 pa_assert(PA_REFCNT_VALUE(s) >= 1);
922 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
924 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
925 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
926 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
927 PA_STREAM_INTERPOLATE_TIMING|
928 PA_STREAM_NOT_MONOTONIC|
929 PA_STREAM_AUTO_TIMING_UPDATE|
930 PA_STREAM_NO_REMAP_CHANNELS|
931 PA_STREAM_NO_REMIX_CHANNELS|
932 PA_STREAM_FIX_FORMAT|
933 PA_STREAM_FIX_RATE|
934 PA_STREAM_FIX_CHANNELS|
935 PA_STREAM_DONT_MOVE|
936 PA_STREAM_VARIABLE_RATE|
937 PA_STREAM_PEAK_DETECT|
938 PA_STREAM_START_MUTED|
939 PA_STREAM_ADJUST_LATENCY|
940 PA_STREAM_EARLY_REQUESTS|
941 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
942 PA_STREAM_START_UNMUTED|
943 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
945 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
946 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
947 /* Althought some of the other flags are not supported on older
948 * version, we don't check for them here, because it doesn't hurt
949 * when they are passed but actually not supported. This makes
950 * client development easier */
952 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
953 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
954 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
955 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
956 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
958 pa_stream_ref(s);
960 s->direction = direction;
961 s->flags = flags;
962 s->corked = !!(flags & PA_STREAM_START_CORKED);
964 if (sync_stream)
965 s->syncid = sync_stream->syncid;
967 if (attr)
968 s->buffer_attr = *attr;
969 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
971 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
972 pa_usec_t x;
974 if (s->smoother)
975 pa_smoother_free(s->smoother);
977 s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
979 x = pa_rtclock_usec();
980 pa_smoother_set_time_offset(s->smoother, x);
981 pa_smoother_pause(s->smoother, x);
984 if (!dev)
985 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
987 t = pa_tagstruct_command(
988 s->context,
989 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
990 &tag);
992 if (s->context->version < 13)
993 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
995 pa_tagstruct_put(
997 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
998 PA_TAG_CHANNEL_MAP, &s->channel_map,
999 PA_TAG_U32, PA_INVALID_INDEX,
1000 PA_TAG_STRING, dev,
1001 PA_TAG_U32, s->buffer_attr.maxlength,
1002 PA_TAG_BOOLEAN, s->corked,
1003 PA_TAG_INVALID);
1005 if (s->direction == PA_STREAM_PLAYBACK) {
1006 pa_cvolume cv;
1008 pa_tagstruct_put(
1010 PA_TAG_U32, s->buffer_attr.tlength,
1011 PA_TAG_U32, s->buffer_attr.prebuf,
1012 PA_TAG_U32, s->buffer_attr.minreq,
1013 PA_TAG_U32, s->syncid,
1014 PA_TAG_INVALID);
1016 volume_set = !!volume;
1018 if (!volume)
1019 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1021 pa_tagstruct_put_cvolume(t, volume);
1022 } else
1023 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1025 if (s->context->version >= 12) {
1026 pa_tagstruct_put(
1028 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1029 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1030 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1031 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1032 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1033 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1034 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1035 PA_TAG_INVALID);
1038 if (s->context->version >= 13) {
1040 if (s->direction == PA_STREAM_PLAYBACK)
1041 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1042 else
1043 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1045 pa_tagstruct_put(
1047 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1048 PA_TAG_PROPLIST, s->proplist,
1049 PA_TAG_INVALID);
1051 if (s->direction == PA_STREAM_RECORD)
1052 pa_tagstruct_putu32(t, s->direct_on_input);
1055 if (s->context->version >= 14) {
1057 if (s->direction == PA_STREAM_PLAYBACK)
1058 pa_tagstruct_put_boolean(t, volume_set);
1060 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1063 if (s->context->version >= 15) {
1065 if (s->direction == PA_STREAM_PLAYBACK)
1066 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1068 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1069 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1072 pa_pstream_send_tagstruct(s->context->pstream, t);
1073 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1075 pa_stream_set_state(s, PA_STREAM_CREATING);
1077 pa_stream_unref(s);
1078 return 0;
1081 int pa_stream_connect_playback(
1082 pa_stream *s,
1083 const char *dev,
1084 const pa_buffer_attr *attr,
1085 pa_stream_flags_t flags,
1086 pa_cvolume *volume,
1087 pa_stream *sync_stream) {
1089 pa_assert(s);
1090 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1092 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1095 int pa_stream_connect_record(
1096 pa_stream *s,
1097 const char *dev,
1098 const pa_buffer_attr *attr,
1099 pa_stream_flags_t flags) {
1101 pa_assert(s);
1102 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1104 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1107 int pa_stream_write(
1108 pa_stream *s,
1109 const void *data,
1110 size_t length,
1111 void (*free_cb)(void *p),
1112 int64_t offset,
1113 pa_seek_mode_t seek) {
1115 pa_memchunk chunk;
1116 pa_seek_mode_t t_seek;
1117 int64_t t_offset;
1118 size_t t_length;
1119 const void *t_data;
1121 pa_assert(s);
1122 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1123 pa_assert(data);
1125 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1126 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1127 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1128 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1130 if (length <= 0)
1131 return 0;
1133 t_seek = seek;
1134 t_offset = offset;
1135 t_length = length;
1136 t_data = data;
1138 while (t_length > 0) {
1140 chunk.index = 0;
1142 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1143 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1144 chunk.length = t_length;
1145 } else {
1146 void *d;
1148 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1149 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1151 d = pa_memblock_acquire(chunk.memblock);
1152 memcpy(d, t_data, chunk.length);
1153 pa_memblock_release(chunk.memblock);
1156 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1158 t_offset = 0;
1159 t_seek = PA_SEEK_RELATIVE;
1161 t_data = (const uint8_t*) t_data + chunk.length;
1162 t_length -= chunk.length;
1164 pa_memblock_unref(chunk.memblock);
1167 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1168 free_cb((void*) data);
1170 if (length < s->requested_bytes)
1171 s->requested_bytes -= (uint32_t) length;
1172 else
1173 s->requested_bytes = 0;
1175 /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
1177 if (s->direction == PA_STREAM_PLAYBACK) {
1179 /* Update latency request correction */
1180 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1182 if (seek == PA_SEEK_ABSOLUTE) {
1183 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1184 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1185 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1186 } else if (seek == PA_SEEK_RELATIVE) {
1187 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1188 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1189 } else
1190 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1193 /* Update the write index in the already available latency data */
1194 if (s->timing_info_valid) {
1196 if (seek == PA_SEEK_ABSOLUTE) {
1197 s->timing_info.write_index_corrupt = FALSE;
1198 s->timing_info.write_index = offset + (int64_t) length;
1199 } else if (seek == PA_SEEK_RELATIVE) {
1200 if (!s->timing_info.write_index_corrupt)
1201 s->timing_info.write_index += offset + (int64_t) length;
1202 } else
1203 s->timing_info.write_index_corrupt = TRUE;
1206 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1207 request_auto_timing_update(s, TRUE);
1210 return 0;
1213 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1214 pa_assert(s);
1215 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1216 pa_assert(data);
1217 pa_assert(length);
1219 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1220 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1222 if (!s->peek_memchunk.memblock) {
1224 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1225 *data = NULL;
1226 *length = 0;
1227 return 0;
1230 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1233 pa_assert(s->peek_data);
1234 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1235 *length = s->peek_memchunk.length;
1236 return 0;
1239 int pa_stream_drop(pa_stream *s) {
1240 pa_assert(s);
1241 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1243 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1244 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1245 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1247 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1249 /* Fix the simulated local read index */
1250 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1251 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1253 pa_assert(s->peek_data);
1254 pa_memblock_release(s->peek_memchunk.memblock);
1255 pa_memblock_unref(s->peek_memchunk.memblock);
1256 pa_memchunk_reset(&s->peek_memchunk);
1258 return 0;
1261 size_t pa_stream_writable_size(pa_stream *s) {
1262 pa_assert(s);
1263 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1265 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1266 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1268 return s->requested_bytes;
1271 size_t pa_stream_readable_size(pa_stream *s) {
1272 pa_assert(s);
1273 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1275 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1276 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1278 return pa_memblockq_get_length(s->record_memblockq);
1281 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1282 pa_operation *o;
1283 pa_tagstruct *t;
1284 uint32_t tag;
1286 pa_assert(s);
1287 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1289 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1290 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1292 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1294 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1295 pa_tagstruct_putu32(t, s->channel);
1296 pa_pstream_send_tagstruct(s->context->pstream, t);
1297 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1299 return o;
1302 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1303 pa_usec_t usec;
1305 pa_assert(s);
1306 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1307 pa_assert(s->state == PA_STREAM_READY);
1308 pa_assert(s->direction != PA_STREAM_UPLOAD);
1309 pa_assert(s->timing_info_valid);
1310 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1311 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1313 if (s->direction == PA_STREAM_PLAYBACK) {
1314 /* The last byte that was written into the output device
1315 * had this time value associated */
1316 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1318 if (!s->corked && !s->suspended) {
1320 if (!ignore_transport)
1321 /* Because the latency info took a little time to come
1322 * to us, we assume that the real output time is actually
1323 * a little ahead */
1324 usec += s->timing_info.transport_usec;
1326 /* However, the output device usually maintains a buffer
1327 too, hence the real sample currently played is a little
1328 back */
1329 if (s->timing_info.sink_usec >= usec)
1330 usec = 0;
1331 else
1332 usec -= s->timing_info.sink_usec;
1335 } else {
1336 pa_assert(s->direction == PA_STREAM_RECORD);
1338 /* The last byte written into the server side queue had
1339 * this time value associated */
1340 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1342 if (!s->corked && !s->suspended) {
1344 if (!ignore_transport)
1345 /* Add transport latency */
1346 usec += s->timing_info.transport_usec;
1348 /* Add latency of data in device buffer */
1349 usec += s->timing_info.source_usec;
1351 /* If this is a monitor source, we need to correct the
1352 * time by the playback device buffer */
1353 if (s->timing_info.sink_usec >= usec)
1354 usec = 0;
1355 else
1356 usec -= s->timing_info.sink_usec;
1360 return usec;
1363 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1364 pa_operation *o = userdata;
1365 struct timeval local, remote, now;
1366 pa_timing_info *i;
1367 pa_bool_t playing = FALSE;
1368 uint64_t underrun_for = 0, playing_for = 0;
1370 pa_assert(pd);
1371 pa_assert(o);
1372 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1374 if (!o->context || !o->stream)
1375 goto finish;
1377 i = &o->stream->timing_info;
1379 o->stream->timing_info_valid = FALSE;
1380 i->write_index_corrupt = TRUE;
1381 i->read_index_corrupt = TRUE;
1383 if (command != PA_COMMAND_REPLY) {
1384 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1385 goto finish;
1387 } else {
1389 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1390 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1391 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1392 pa_tagstruct_get_timeval(t, &local) < 0 ||
1393 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1394 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1395 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1397 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1398 goto finish;
1401 if (o->context->version >= 13 &&
1402 o->stream->direction == PA_STREAM_PLAYBACK)
1403 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1404 pa_tagstruct_getu64(t, &playing_for) < 0) {
1406 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1407 goto finish;
1411 if (!pa_tagstruct_eof(t)) {
1412 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1413 goto finish;
1415 o->stream->timing_info_valid = TRUE;
1416 i->write_index_corrupt = FALSE;
1417 i->read_index_corrupt = FALSE;
1419 i->playing = (int) playing;
1420 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1422 pa_gettimeofday(&now);
1424 /* Calculcate timestamps */
1425 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1426 /* local and remote seem to have synchronized clocks */
1428 if (o->stream->direction == PA_STREAM_PLAYBACK)
1429 i->transport_usec = pa_timeval_diff(&remote, &local);
1430 else
1431 i->transport_usec = pa_timeval_diff(&now, &remote);
1433 i->synchronized_clocks = TRUE;
1434 i->timestamp = remote;
1435 } else {
1436 /* clocks are not synchronized, let's estimate latency then */
1437 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1438 i->synchronized_clocks = FALSE;
1439 i->timestamp = local;
1440 pa_timeval_add(&i->timestamp, i->transport_usec);
1443 /* Invalidate read and write indexes if necessary */
1444 if (tag < o->stream->read_index_not_before)
1445 i->read_index_corrupt = TRUE;
1447 if (tag < o->stream->write_index_not_before)
1448 i->write_index_corrupt = TRUE;
1450 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1451 /* Write index correction */
1453 int n, j;
1454 uint32_t ctag = tag;
1456 /* Go through the saved correction values and add up the
1457 * total correction.*/
1458 for (n = 0, j = o->stream->current_write_index_correction+1;
1459 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1460 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1462 /* Step over invalid data or out-of-date data */
1463 if (!o->stream->write_index_corrections[j].valid ||
1464 o->stream->write_index_corrections[j].tag < ctag)
1465 continue;
1467 /* Make sure that everything is in order */
1468 ctag = o->stream->write_index_corrections[j].tag+1;
1470 /* Now fix the write index */
1471 if (o->stream->write_index_corrections[j].corrupt) {
1472 /* A corrupting seek was made */
1473 i->write_index_corrupt = TRUE;
1474 } else if (o->stream->write_index_corrections[j].absolute) {
1475 /* An absolute seek was made */
1476 i->write_index = o->stream->write_index_corrections[j].value;
1477 i->write_index_corrupt = FALSE;
1478 } else if (!i->write_index_corrupt) {
1479 /* A relative seek was made */
1480 i->write_index += o->stream->write_index_corrections[j].value;
1484 /* Clear old correction entries */
1485 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1486 if (!o->stream->write_index_corrections[n].valid)
1487 continue;
1489 if (o->stream->write_index_corrections[n].tag <= tag)
1490 o->stream->write_index_corrections[n].valid = FALSE;
1494 if (o->stream->direction == PA_STREAM_RECORD) {
1495 /* Read index correction */
1497 if (!i->read_index_corrupt)
1498 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1501 /* Update smoother */
1502 if (o->stream->smoother) {
1503 pa_usec_t u, x;
1505 u = x = pa_rtclock_usec() - i->transport_usec;
1507 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1508 pa_usec_t su;
1510 /* If we weren't playing then it will take some time
1511 * until the audio will actually come out through the
1512 * speakers. Since we follow that timing here, we need
1513 * to try to fix this up */
1515 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1517 if (su < i->sink_usec)
1518 x += i->sink_usec - su;
1521 if (!i->playing)
1522 pa_smoother_pause(o->stream->smoother, x);
1524 /* Update the smoother */
1525 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1526 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1527 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1529 if (i->playing)
1530 pa_smoother_resume(o->stream->smoother, x);
1534 o->stream->auto_timing_update_requested = FALSE;
1536 if (o->stream->latency_update_callback)
1537 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1539 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1540 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1541 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1544 finish:
1546 pa_operation_done(o);
1547 pa_operation_unref(o);
1550 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1551 uint32_t tag;
1552 pa_operation *o;
1553 pa_tagstruct *t;
1554 struct timeval now;
1555 int cidx = 0;
1557 pa_assert(s);
1558 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1560 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1561 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1563 if (s->direction == PA_STREAM_PLAYBACK) {
1564 /* Find a place to store the write_index correction data for this entry */
1565 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1567 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1568 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1570 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1572 t = pa_tagstruct_command(
1573 s->context,
1574 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1575 &tag);
1576 pa_tagstruct_putu32(t, s->channel);
1577 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1579 pa_pstream_send_tagstruct(s->context->pstream, t);
1580 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1582 if (s->direction == PA_STREAM_PLAYBACK) {
1583 /* Fill in initial correction data */
1585 s->current_write_index_correction = cidx;
1587 s->write_index_corrections[cidx].valid = TRUE;
1588 s->write_index_corrections[cidx].absolute = FALSE;
1589 s->write_index_corrections[cidx].corrupt = FALSE;
1590 s->write_index_corrections[cidx].tag = tag;
1591 s->write_index_corrections[cidx].value = 0;
1594 return o;
1597 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1598 pa_stream *s = userdata;
1600 pa_assert(pd);
1601 pa_assert(s);
1602 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1604 pa_stream_ref(s);
1606 if (command != PA_COMMAND_REPLY) {
1607 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1608 goto finish;
1610 pa_stream_set_state(s, PA_STREAM_FAILED);
1611 goto finish;
1612 } else if (!pa_tagstruct_eof(t)) {
1613 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1614 goto finish;
1617 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1619 finish:
1620 pa_stream_unref(s);
1623 int pa_stream_disconnect(pa_stream *s) {
1624 pa_tagstruct *t;
1625 uint32_t tag;
1627 pa_assert(s);
1628 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1630 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1631 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1633 pa_stream_ref(s);
1635 t = pa_tagstruct_command(
1636 s->context,
1637 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1638 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1639 &tag);
1640 pa_tagstruct_putu32(t, s->channel);
1641 pa_pstream_send_tagstruct(s->context->pstream, t);
1642 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1644 pa_stream_unref(s);
1645 return 0;
1648 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1649 pa_assert(s);
1650 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1652 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1653 return;
1655 s->read_callback = cb;
1656 s->read_userdata = userdata;
1659 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1660 pa_assert(s);
1661 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1663 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1664 return;
1666 s->write_callback = cb;
1667 s->write_userdata = userdata;
1670 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1671 pa_assert(s);
1672 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1674 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1675 return;
1677 s->state_callback = cb;
1678 s->state_userdata = userdata;
1681 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1682 pa_assert(s);
1683 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1685 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1686 return;
1688 s->overflow_callback = cb;
1689 s->overflow_userdata = userdata;
1692 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1693 pa_assert(s);
1694 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1696 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1697 return;
1699 s->underflow_callback = cb;
1700 s->underflow_userdata = userdata;
1703 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1704 pa_assert(s);
1705 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1707 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1708 return;
1710 s->latency_update_callback = cb;
1711 s->latency_update_userdata = userdata;
1714 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1715 pa_assert(s);
1716 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1718 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1719 return;
1721 s->moved_callback = cb;
1722 s->moved_userdata = userdata;
1725 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1726 pa_assert(s);
1727 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1729 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1730 return;
1732 s->suspended_callback = cb;
1733 s->suspended_userdata = userdata;
1736 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1737 pa_assert(s);
1738 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1740 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1741 return;
1743 s->started_callback = cb;
1744 s->started_userdata = userdata;
1747 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1748 pa_assert(s);
1749 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1751 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1752 return;
1754 s->event_callback = cb;
1755 s->event_userdata = userdata;
1758 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1759 pa_operation *o = userdata;
1760 int success = 1;
1762 pa_assert(pd);
1763 pa_assert(o);
1764 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1766 if (!o->context)
1767 goto finish;
1769 if (command != PA_COMMAND_REPLY) {
1770 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1771 goto finish;
1773 success = 0;
1774 } else if (!pa_tagstruct_eof(t)) {
1775 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1776 goto finish;
1779 if (o->callback) {
1780 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1781 cb(o->stream, success, o->userdata);
1784 finish:
1785 pa_operation_done(o);
1786 pa_operation_unref(o);
1789 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1790 pa_operation *o;
1791 pa_tagstruct *t;
1792 uint32_t tag;
1794 pa_assert(s);
1795 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1797 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1798 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1800 s->corked = b;
1802 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1804 t = pa_tagstruct_command(
1805 s->context,
1806 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1807 &tag);
1808 pa_tagstruct_putu32(t, s->channel);
1809 pa_tagstruct_put_boolean(t, !!b);
1810 pa_pstream_send_tagstruct(s->context->pstream, t);
1811 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1813 check_smoother_status(s, FALSE, FALSE, FALSE);
1815 /* This might cause the indexes to hang/start again, hence
1816 * let's request a timing update */
1817 request_auto_timing_update(s, TRUE);
1819 return o;
1822 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1823 pa_tagstruct *t;
1824 pa_operation *o;
1825 uint32_t tag;
1827 pa_assert(s);
1828 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1830 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1832 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1834 t = pa_tagstruct_command(s->context, command, &tag);
1835 pa_tagstruct_putu32(t, s->channel);
1836 pa_pstream_send_tagstruct(s->context->pstream, t);
1837 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1839 return o;
1842 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1843 pa_operation *o;
1845 pa_assert(s);
1846 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1848 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1849 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1851 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1852 return NULL;
1854 if (s->direction == PA_STREAM_PLAYBACK) {
1856 if (s->write_index_corrections[s->current_write_index_correction].valid)
1857 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1859 if (s->buffer_attr.prebuf > 0)
1860 check_smoother_status(s, FALSE, FALSE, TRUE);
1862 /* This will change the write index, but leave the
1863 * read index untouched. */
1864 invalidate_indexes(s, FALSE, TRUE);
1866 } else
1867 /* For record streams this has no influence on the write
1868 * index, but the read index might jump. */
1869 invalidate_indexes(s, TRUE, FALSE);
1871 return o;
1874 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1875 pa_operation *o;
1877 pa_assert(s);
1878 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1880 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1881 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1882 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1884 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
1885 return NULL;
1887 /* This might cause the read index to hang again, hence
1888 * let's request a timing update */
1889 request_auto_timing_update(s, TRUE);
1891 return o;
1894 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1895 pa_operation *o;
1897 pa_assert(s);
1898 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1900 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1901 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1902 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1904 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
1905 return NULL;
1907 /* This might cause the read index to start moving again, hence
1908 * let's request a timing update */
1909 request_auto_timing_update(s, TRUE);
1911 return o;
1914 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
1915 pa_operation *o;
1917 pa_assert(s);
1918 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1919 pa_assert(name);
1921 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1922 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1924 if (s->context->version >= 13) {
1925 pa_proplist *p = pa_proplist_new();
1927 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1928 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
1929 pa_proplist_free(p);
1930 } else {
1931 pa_tagstruct *t;
1932 uint32_t tag;
1934 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1935 t = pa_tagstruct_command(
1936 s->context,
1937 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
1938 &tag);
1939 pa_tagstruct_putu32(t, s->channel);
1940 pa_tagstruct_puts(t, name);
1941 pa_pstream_send_tagstruct(s->context->pstream, t);
1942 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1945 return o;
1948 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
1949 pa_usec_t usec;
1951 pa_assert(s);
1952 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1954 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1955 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1956 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
1957 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
1958 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
1960 if (s->smoother)
1961 usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
1962 else
1963 usec = calc_time(s, FALSE);
1965 /* Make sure the time runs monotonically */
1966 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
1967 if (usec < s->previous_time)
1968 usec = s->previous_time;
1969 else
1970 s->previous_time = usec;
1973 if (r_usec)
1974 *r_usec = usec;
1976 return 0;
1979 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
1980 pa_assert(s);
1981 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1983 if (negative)
1984 *negative = 0;
1986 if (a >= b)
1987 return a-b;
1988 else {
1989 if (negative && s->direction == PA_STREAM_RECORD) {
1990 *negative = 1;
1991 return b-a;
1992 } else
1993 return 0;
1997 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
1998 pa_usec_t t, c;
1999 int r;
2000 int64_t cindex;
2002 pa_assert(s);
2003 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2004 pa_assert(r_usec);
2006 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2007 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2008 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2009 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2010 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2012 if ((r = pa_stream_get_time(s, &t)) < 0)
2013 return r;
2015 if (s->direction == PA_STREAM_PLAYBACK)
2016 cindex = s->timing_info.write_index;
2017 else
2018 cindex = s->timing_info.read_index;
2020 if (cindex < 0)
2021 cindex = 0;
2023 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2025 if (s->direction == PA_STREAM_PLAYBACK)
2026 *r_usec = time_counter_diff(s, c, t, negative);
2027 else
2028 *r_usec = time_counter_diff(s, t, c, negative);
2030 return 0;
2033 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2034 pa_assert(s);
2035 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2037 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2038 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2039 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2041 return &s->timing_info;
2044 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2045 pa_assert(s);
2046 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2048 return &s->sample_spec;
2051 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2052 pa_assert(s);
2053 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2055 return &s->channel_map;
2058 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2059 pa_assert(s);
2060 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2062 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2063 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2064 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2066 return &s->buffer_attr;
2069 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2070 pa_operation *o = userdata;
2071 int success = 1;
2073 pa_assert(pd);
2074 pa_assert(o);
2075 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2077 if (!o->context)
2078 goto finish;
2080 if (command != PA_COMMAND_REPLY) {
2081 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2082 goto finish;
2084 success = 0;
2085 } else {
2086 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2087 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2088 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2089 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2090 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2091 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2092 goto finish;
2094 } else if (o->stream->direction == PA_STREAM_RECORD) {
2095 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2096 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2097 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2098 goto finish;
2102 if (o->stream->context->version >= 13) {
2103 pa_usec_t usec;
2105 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2106 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2107 goto finish;
2110 if (o->stream->direction == PA_STREAM_RECORD)
2111 o->stream->timing_info.configured_source_usec = usec;
2112 else
2113 o->stream->timing_info.configured_sink_usec = usec;
2116 if (!pa_tagstruct_eof(t)) {
2117 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2118 goto finish;
2122 if (o->callback) {
2123 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2124 cb(o->stream, success, o->userdata);
2127 finish:
2128 pa_operation_done(o);
2129 pa_operation_unref(o);
2133 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2134 pa_operation *o;
2135 pa_tagstruct *t;
2136 uint32_t tag;
2138 pa_assert(s);
2139 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2140 pa_assert(attr);
2142 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2143 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2144 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2146 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2148 t = pa_tagstruct_command(
2149 s->context,
2150 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2151 &tag);
2152 pa_tagstruct_putu32(t, s->channel);
2154 pa_tagstruct_putu32(t, attr->maxlength);
2156 if (s->direction == PA_STREAM_PLAYBACK)
2157 pa_tagstruct_put(
2159 PA_TAG_U32, attr->tlength,
2160 PA_TAG_U32, attr->prebuf,
2161 PA_TAG_U32, attr->minreq,
2162 PA_TAG_INVALID);
2163 else
2164 pa_tagstruct_putu32(t, attr->fragsize);
2166 if (s->context->version >= 13)
2167 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2169 if (s->context->version >= 14)
2170 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2172 pa_pstream_send_tagstruct(s->context->pstream, t);
2173 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2175 /* This might cause changes in the read/write indexex, hence let's
2176 * request a timing update */
2177 request_auto_timing_update(s, TRUE);
2179 return o;
2182 uint32_t pa_stream_get_device_index(pa_stream *s) {
2183 pa_assert(s);
2184 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2186 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2187 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2188 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2189 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2191 return s->device_index;
2194 const char *pa_stream_get_device_name(pa_stream *s) {
2195 pa_assert(s);
2196 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2198 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2200 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2201 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2203 return s->device_name;
2206 int pa_stream_is_suspended(pa_stream *s) {
2207 pa_assert(s);
2208 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2210 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2211 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2212 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2214 return s->suspended;
2217 int pa_stream_is_corked(pa_stream *s) {
2218 pa_assert(s);
2219 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2221 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2222 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2224 return s->corked;
2227 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2228 pa_operation *o = userdata;
2229 int success = 1;
2231 pa_assert(pd);
2232 pa_assert(o);
2233 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2235 if (!o->context)
2236 goto finish;
2238 if (command != PA_COMMAND_REPLY) {
2239 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2240 goto finish;
2242 success = 0;
2243 } else {
2245 if (!pa_tagstruct_eof(t)) {
2246 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2247 goto finish;
2251 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2252 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2254 if (o->callback) {
2255 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2256 cb(o->stream, success, o->userdata);
2259 finish:
2260 pa_operation_done(o);
2261 pa_operation_unref(o);
2265 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2266 pa_operation *o;
2267 pa_tagstruct *t;
2268 uint32_t tag;
2270 pa_assert(s);
2271 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2273 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2274 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2275 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2276 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2277 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2279 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2280 o->private = PA_UINT_TO_PTR(rate);
2282 t = pa_tagstruct_command(
2283 s->context,
2284 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2285 &tag);
2286 pa_tagstruct_putu32(t, s->channel);
2287 pa_tagstruct_putu32(t, rate);
2289 pa_pstream_send_tagstruct(s->context->pstream, t);
2290 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2292 return o;
2295 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2296 pa_operation *o;
2297 pa_tagstruct *t;
2298 uint32_t tag;
2300 pa_assert(s);
2301 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2303 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2304 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2305 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2306 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2308 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2310 t = pa_tagstruct_command(
2311 s->context,
2312 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2313 &tag);
2314 pa_tagstruct_putu32(t, s->channel);
2315 pa_tagstruct_putu32(t, (uint32_t) mode);
2316 pa_tagstruct_put_proplist(t, p);
2318 pa_pstream_send_tagstruct(s->context->pstream, t);
2319 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2321 /* Please note that we don't update s->proplist here, because we
2322 * don't export that field */
2324 return o;
2327 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2328 pa_operation *o;
2329 pa_tagstruct *t;
2330 uint32_t tag;
2331 const char * const*k;
2333 pa_assert(s);
2334 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2336 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2337 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2338 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2339 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2341 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2343 t = pa_tagstruct_command(
2344 s->context,
2345 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2346 &tag);
2347 pa_tagstruct_putu32(t, s->channel);
2349 for (k = keys; *k; k++)
2350 pa_tagstruct_puts(t, *k);
2352 pa_tagstruct_puts(t, NULL);
2354 pa_pstream_send_tagstruct(s->context->pstream, t);
2355 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2357 /* Please note that we don't update s->proplist here, because we
2358 * don't export that field */
2360 return o;
2363 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2364 pa_assert(s);
2365 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2367 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2368 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2369 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2371 s->direct_on_input = sink_input_idx;
2373 return 0;
2376 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2377 pa_assert(s);
2378 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2380 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2381 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2383 return s->direct_on_input;