pulse: ask for timing updates both *before* and *after* triggering a stream state...
[pulseaudio-mirror.git] / src / pulse / stream.c
blob4dea56707227e75205b19e98d66be3c3820b6ff8
1 /***
2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
42 #include "fork-detect.h"
43 #include "internal.h"
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 static void reset_callbacks(pa_stream *s) {
57 s->read_callback = NULL;
58 s->read_userdata = NULL;
59 s->write_callback = NULL;
60 s->write_userdata = NULL;
61 s->state_callback = NULL;
62 s->state_userdata = NULL;
63 s->overflow_callback = NULL;
64 s->overflow_userdata = NULL;
65 s->underflow_callback = NULL;
66 s->underflow_userdata = NULL;
67 s->latency_update_callback = NULL;
68 s->latency_update_userdata = NULL;
69 s->moved_callback = NULL;
70 s->moved_userdata = NULL;
71 s->suspended_callback = NULL;
72 s->suspended_userdata = NULL;
73 s->started_callback = NULL;
74 s->started_userdata = NULL;
75 s->event_callback = NULL;
76 s->event_userdata = NULL;
77 s->buffer_attr_callback = NULL;
78 s->buffer_attr_userdata = NULL;
81 pa_stream *pa_stream_new_with_proplist(
82 pa_context *c,
83 const char *name,
84 const pa_sample_spec *ss,
85 const pa_channel_map *map,
86 pa_proplist *p) {
88 pa_stream *s;
89 int i;
90 pa_channel_map tmap;
92 pa_assert(c);
93 pa_assert(PA_REFCNT_VALUE(c) >= 1);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103 if (!map)
104 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
106 s = pa_xnew(pa_stream, 1);
107 PA_REFCNT_INIT(s);
108 s->context = c;
109 s->mainloop = c->mainloop;
111 s->direction = PA_STREAM_NODIRECTION;
112 s->state = PA_STREAM_UNCONNECTED;
113 s->flags = 0;
115 s->sample_spec = *ss;
116 s->channel_map = *map;
118 s->direct_on_input = PA_INVALID_INDEX;
120 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121 if (name)
122 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
124 s->channel = 0;
125 s->channel_valid = FALSE;
126 s->syncid = c->csyncid++;
127 s->stream_index = PA_INVALID_INDEX;
129 s->requested_bytes = 0;
130 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
132 /* We initialize der target length here, so that if the user
133 * passes no explicit buffering metrics the default is similar to
134 * what older PA versions provided. */
136 s->buffer_attr.maxlength = (uint32_t) -1;
137 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138 s->buffer_attr.minreq = (uint32_t) -1;
139 s->buffer_attr.prebuf = (uint32_t) -1;
140 s->buffer_attr.fragsize = (uint32_t) -1;
142 s->device_index = PA_INVALID_INDEX;
143 s->device_name = NULL;
144 s->suspended = FALSE;
145 s->corked = FALSE;
147 s->write_memblock = NULL;
148 s->write_data = NULL;
150 pa_memchunk_reset(&s->peek_memchunk);
151 s->peek_data = NULL;
152 s->record_memblockq = NULL;
154 memset(&s->timing_info, 0, sizeof(s->timing_info));
155 s->timing_info_valid = FALSE;
157 s->previous_time = 0;
159 s->read_index_not_before = 0;
160 s->write_index_not_before = 0;
161 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162 s->write_index_corrections[i].valid = 0;
163 s->current_write_index_correction = 0;
165 s->auto_timing_update_event = NULL;
166 s->auto_timing_update_requested = FALSE;
167 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
169 reset_callbacks(s);
171 s->smoother = NULL;
173 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174 PA_LLIST_PREPEND(pa_stream, c->streams, s);
175 pa_stream_ref(s);
177 return s;
180 static void stream_unlink(pa_stream *s) {
181 pa_operation *o, *n;
182 pa_assert(s);
184 if (!s->context)
185 return;
187 /* Detach from context */
189 /* Unref all operatio object that point to us */
190 for (o = s->context->operations; o; o = n) {
191 n = o->next;
193 if (o->stream == s)
194 pa_operation_cancel(o);
197 /* Drop all outstanding replies for this stream */
198 if (s->context->pdispatch)
199 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
201 if (s->channel_valid) {
202 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203 s->channel = 0;
204 s->channel_valid = FALSE;
207 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208 pa_stream_unref(s);
210 s->context = NULL;
212 if (s->auto_timing_update_event) {
213 pa_assert(s->mainloop);
214 s->mainloop->time_free(s->auto_timing_update_event);
217 reset_callbacks(s);
220 static void stream_free(pa_stream *s) {
221 pa_assert(s);
223 stream_unlink(s);
225 if (s->write_memblock) {
226 pa_memblock_release(s->write_memblock);
227 pa_memblock_unref(s->write_data);
230 if (s->peek_memchunk.memblock) {
231 if (s->peek_data)
232 pa_memblock_release(s->peek_memchunk.memblock);
233 pa_memblock_unref(s->peek_memchunk.memblock);
236 if (s->record_memblockq)
237 pa_memblockq_free(s->record_memblockq);
239 if (s->proplist)
240 pa_proplist_free(s->proplist);
242 if (s->smoother)
243 pa_smoother_free(s->smoother);
245 pa_xfree(s->device_name);
246 pa_xfree(s);
249 void pa_stream_unref(pa_stream *s) {
250 pa_assert(s);
251 pa_assert(PA_REFCNT_VALUE(s) >= 1);
253 if (PA_REFCNT_DEC(s) <= 0)
254 stream_free(s);
257 pa_stream* pa_stream_ref(pa_stream *s) {
258 pa_assert(s);
259 pa_assert(PA_REFCNT_VALUE(s) >= 1);
261 PA_REFCNT_INC(s);
262 return s;
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
266 pa_assert(s);
267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
269 return s->state;
272 pa_context* pa_stream_get_context(pa_stream *s) {
273 pa_assert(s);
274 pa_assert(PA_REFCNT_VALUE(s) >= 1);
276 return s->context;
279 uint32_t pa_stream_get_index(pa_stream *s) {
280 pa_assert(s);
281 pa_assert(PA_REFCNT_VALUE(s) >= 1);
283 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
286 return s->stream_index;
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
290 pa_assert(s);
291 pa_assert(PA_REFCNT_VALUE(s) >= 1);
293 if (s->state == st)
294 return;
296 pa_stream_ref(s);
298 s->state = st;
300 if (s->state_callback)
301 s->state_callback(s, s->state_userdata);
303 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
304 stream_unlink(s);
306 pa_stream_unref(s);
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
310 pa_assert(s);
311 pa_assert(PA_REFCNT_VALUE(s) >= 1);
313 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
314 return;
316 if (s->state == PA_STREAM_READY &&
317 (force || !s->auto_timing_update_requested)) {
318 pa_operation *o;
320 /* pa_log("Automatically requesting new timing data"); */
322 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323 pa_operation_unref(o);
324 s->auto_timing_update_requested = TRUE;
328 if (s->auto_timing_update_event) {
329 if (force)
330 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
332 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
334 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339 pa_context *c = userdata;
340 pa_stream *s;
341 uint32_t channel;
343 pa_assert(pd);
344 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345 pa_assert(t);
346 pa_assert(c);
347 pa_assert(PA_REFCNT_VALUE(c) >= 1);
349 pa_context_ref(c);
351 if (pa_tagstruct_getu32(t, &channel) < 0 ||
352 !pa_tagstruct_eof(t)) {
353 pa_context_fail(c, PA_ERR_PROTOCOL);
354 goto finish;
357 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
358 goto finish;
360 if (s->state != PA_STREAM_READY)
361 goto finish;
363 pa_context_set_error(c, PA_ERR_KILLED);
364 pa_stream_set_state(s, PA_STREAM_FAILED);
366 finish:
367 pa_context_unref(c);
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371 pa_usec_t x;
373 pa_assert(s);
374 pa_assert(!force_start || !force_stop);
376 if (!s->smoother)
377 return;
379 x = pa_rtclock_now();
381 if (s->timing_info_valid) {
382 if (aposteriori)
383 x -= s->timing_info.transport_usec;
384 else
385 x += s->timing_info.transport_usec;
388 if (s->suspended || s->corked || force_stop)
389 pa_smoother_pause(s->smoother, x);
390 else if (force_start || s->buffer_attr.prebuf == 0) {
392 if (!s->timing_info_valid &&
393 !aposteriori &&
394 !force_start &&
395 !force_stop &&
396 s->context->version >= 13) {
398 /* If the server supports STARTED events we take them as
399 * indications when audio really starts/stops playing, if
400 * we don't have any timing info yet -- instead of trying
401 * to be smart and guessing the server time. Otherwise the
402 * unknown transport delay add too much noise to our time
403 * calculations. */
405 return;
408 pa_smoother_resume(s->smoother, x, TRUE);
411 /* Please note that we have no idea if playback actually started
412 * if prebuf is non-zero! */
415 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
416 pa_context *c = userdata;
417 pa_stream *s;
418 uint32_t channel;
419 const char *dn;
420 pa_bool_t suspended;
421 uint32_t di;
422 pa_usec_t usec = 0;
423 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
425 pa_assert(pd);
426 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
427 pa_assert(t);
428 pa_assert(c);
429 pa_assert(PA_REFCNT_VALUE(c) >= 1);
431 pa_context_ref(c);
433 if (c->version < 12) {
434 pa_context_fail(c, PA_ERR_PROTOCOL);
435 goto finish;
438 if (pa_tagstruct_getu32(t, &channel) < 0 ||
439 pa_tagstruct_getu32(t, &di) < 0 ||
440 pa_tagstruct_gets(t, &dn) < 0 ||
441 pa_tagstruct_get_boolean(t, &suspended) < 0) {
442 pa_context_fail(c, PA_ERR_PROTOCOL);
443 goto finish;
446 if (c->version >= 13) {
448 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
449 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
450 pa_tagstruct_getu32(t, &fragsize) < 0 ||
451 pa_tagstruct_get_usec(t, &usec) < 0) {
452 pa_context_fail(c, PA_ERR_PROTOCOL);
453 goto finish;
455 } else {
456 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
457 pa_tagstruct_getu32(t, &tlength) < 0 ||
458 pa_tagstruct_getu32(t, &prebuf) < 0 ||
459 pa_tagstruct_getu32(t, &minreq) < 0 ||
460 pa_tagstruct_get_usec(t, &usec) < 0) {
461 pa_context_fail(c, PA_ERR_PROTOCOL);
462 goto finish;
467 if (!pa_tagstruct_eof(t)) {
468 pa_context_fail(c, PA_ERR_PROTOCOL);
469 goto finish;
472 if (!dn || di == PA_INVALID_INDEX) {
473 pa_context_fail(c, PA_ERR_PROTOCOL);
474 goto finish;
477 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
478 goto finish;
480 if (s->state != PA_STREAM_READY)
481 goto finish;
483 if (c->version >= 13) {
484 if (s->direction == PA_STREAM_RECORD)
485 s->timing_info.configured_source_usec = usec;
486 else
487 s->timing_info.configured_sink_usec = usec;
489 s->buffer_attr.maxlength = maxlength;
490 s->buffer_attr.fragsize = fragsize;
491 s->buffer_attr.tlength = tlength;
492 s->buffer_attr.prebuf = prebuf;
493 s->buffer_attr.minreq = minreq;
496 pa_xfree(s->device_name);
497 s->device_name = pa_xstrdup(dn);
498 s->device_index = di;
500 s->suspended = suspended;
502 check_smoother_status(s, TRUE, FALSE, FALSE);
503 request_auto_timing_update(s, TRUE);
505 if (s->moved_callback)
506 s->moved_callback(s, s->moved_userdata);
508 finish:
509 pa_context_unref(c);
512 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
513 pa_context *c = userdata;
514 pa_stream *s;
515 uint32_t channel;
516 pa_usec_t usec = 0;
517 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
519 pa_assert(pd);
520 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
521 pa_assert(t);
522 pa_assert(c);
523 pa_assert(PA_REFCNT_VALUE(c) >= 1);
525 pa_context_ref(c);
527 if (c->version < 15) {
528 pa_context_fail(c, PA_ERR_PROTOCOL);
529 goto finish;
532 if (pa_tagstruct_getu32(t, &channel) < 0) {
533 pa_context_fail(c, PA_ERR_PROTOCOL);
534 goto finish;
537 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
538 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
539 pa_tagstruct_getu32(t, &fragsize) < 0 ||
540 pa_tagstruct_get_usec(t, &usec) < 0) {
541 pa_context_fail(c, PA_ERR_PROTOCOL);
542 goto finish;
544 } else {
545 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
546 pa_tagstruct_getu32(t, &tlength) < 0 ||
547 pa_tagstruct_getu32(t, &prebuf) < 0 ||
548 pa_tagstruct_getu32(t, &minreq) < 0 ||
549 pa_tagstruct_get_usec(t, &usec) < 0) {
550 pa_context_fail(c, PA_ERR_PROTOCOL);
551 goto finish;
555 if (!pa_tagstruct_eof(t)) {
556 pa_context_fail(c, PA_ERR_PROTOCOL);
557 goto finish;
560 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
561 goto finish;
563 if (s->state != PA_STREAM_READY)
564 goto finish;
566 if (s->direction == PA_STREAM_RECORD)
567 s->timing_info.configured_source_usec = usec;
568 else
569 s->timing_info.configured_sink_usec = usec;
571 s->buffer_attr.maxlength = maxlength;
572 s->buffer_attr.fragsize = fragsize;
573 s->buffer_attr.tlength = tlength;
574 s->buffer_attr.prebuf = prebuf;
575 s->buffer_attr.minreq = minreq;
577 request_auto_timing_update(s, TRUE);
579 if (s->buffer_attr_callback)
580 s->buffer_attr_callback(s, s->buffer_attr_userdata);
582 finish:
583 pa_context_unref(c);
586 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
587 pa_context *c = userdata;
588 pa_stream *s;
589 uint32_t channel;
590 pa_bool_t suspended;
592 pa_assert(pd);
593 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
594 pa_assert(t);
595 pa_assert(c);
596 pa_assert(PA_REFCNT_VALUE(c) >= 1);
598 pa_context_ref(c);
600 if (c->version < 12) {
601 pa_context_fail(c, PA_ERR_PROTOCOL);
602 goto finish;
605 if (pa_tagstruct_getu32(t, &channel) < 0 ||
606 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
607 !pa_tagstruct_eof(t)) {
608 pa_context_fail(c, PA_ERR_PROTOCOL);
609 goto finish;
612 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
613 goto finish;
615 if (s->state != PA_STREAM_READY)
616 goto finish;
618 s->suspended = suspended;
620 check_smoother_status(s, TRUE, FALSE, FALSE);
621 request_auto_timing_update(s, TRUE);
623 if (s->suspended_callback)
624 s->suspended_callback(s, s->suspended_userdata);
626 finish:
627 pa_context_unref(c);
630 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
631 pa_context *c = userdata;
632 pa_stream *s;
633 uint32_t channel;
635 pa_assert(pd);
636 pa_assert(command == PA_COMMAND_STARTED);
637 pa_assert(t);
638 pa_assert(c);
639 pa_assert(PA_REFCNT_VALUE(c) >= 1);
641 pa_context_ref(c);
643 if (c->version < 13) {
644 pa_context_fail(c, PA_ERR_PROTOCOL);
645 goto finish;
648 if (pa_tagstruct_getu32(t, &channel) < 0 ||
649 !pa_tagstruct_eof(t)) {
650 pa_context_fail(c, PA_ERR_PROTOCOL);
651 goto finish;
654 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
655 goto finish;
657 if (s->state != PA_STREAM_READY)
658 goto finish;
660 check_smoother_status(s, TRUE, TRUE, FALSE);
661 request_auto_timing_update(s, TRUE);
663 if (s->started_callback)
664 s->started_callback(s, s->started_userdata);
666 finish:
667 pa_context_unref(c);
670 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
671 pa_context *c = userdata;
672 pa_stream *s;
673 uint32_t channel;
674 pa_proplist *pl = NULL;
675 const char *event;
677 pa_assert(pd);
678 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
679 pa_assert(t);
680 pa_assert(c);
681 pa_assert(PA_REFCNT_VALUE(c) >= 1);
683 pa_context_ref(c);
685 if (c->version < 15) {
686 pa_context_fail(c, PA_ERR_PROTOCOL);
687 goto finish;
690 pl = pa_proplist_new();
692 if (pa_tagstruct_getu32(t, &channel) < 0 ||
693 pa_tagstruct_gets(t, &event) < 0 ||
694 pa_tagstruct_get_proplist(t, pl) < 0 ||
695 !pa_tagstruct_eof(t) || !event) {
696 pa_context_fail(c, PA_ERR_PROTOCOL);
697 goto finish;
700 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
701 goto finish;
703 if (s->state != PA_STREAM_READY)
704 goto finish;
706 if (s->event_callback)
707 s->event_callback(s, event, pl, s->event_userdata);
709 finish:
710 pa_context_unref(c);
712 if (pl)
713 pa_proplist_free(pl);
716 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717 pa_stream *s;
718 pa_context *c = userdata;
719 uint32_t bytes, channel;
721 pa_assert(pd);
722 pa_assert(command == PA_COMMAND_REQUEST);
723 pa_assert(t);
724 pa_assert(c);
725 pa_assert(PA_REFCNT_VALUE(c) >= 1);
727 pa_context_ref(c);
729 if (pa_tagstruct_getu32(t, &channel) < 0 ||
730 pa_tagstruct_getu32(t, &bytes) < 0 ||
731 !pa_tagstruct_eof(t)) {
732 pa_context_fail(c, PA_ERR_PROTOCOL);
733 goto finish;
736 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
737 goto finish;
739 if (s->state != PA_STREAM_READY)
740 goto finish;
742 s->requested_bytes += bytes;
744 if (s->requested_bytes > 0 && s->write_callback)
745 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
747 finish:
748 pa_context_unref(c);
751 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
752 pa_stream *s;
753 pa_context *c = userdata;
754 uint32_t channel;
756 pa_assert(pd);
757 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
758 pa_assert(t);
759 pa_assert(c);
760 pa_assert(PA_REFCNT_VALUE(c) >= 1);
762 pa_context_ref(c);
764 if (pa_tagstruct_getu32(t, &channel) < 0 ||
765 !pa_tagstruct_eof(t)) {
766 pa_context_fail(c, PA_ERR_PROTOCOL);
767 goto finish;
770 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
771 goto finish;
773 if (s->state != PA_STREAM_READY)
774 goto finish;
776 if (s->buffer_attr.prebuf > 0)
777 check_smoother_status(s, TRUE, FALSE, TRUE);
779 request_auto_timing_update(s, TRUE);
781 if (command == PA_COMMAND_OVERFLOW) {
782 if (s->overflow_callback)
783 s->overflow_callback(s, s->overflow_userdata);
784 } else if (command == PA_COMMAND_UNDERFLOW) {
785 if (s->underflow_callback)
786 s->underflow_callback(s, s->underflow_userdata);
789 finish:
790 pa_context_unref(c);
793 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
794 pa_assert(s);
795 pa_assert(PA_REFCNT_VALUE(s) >= 1);
797 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
799 if (s->state != PA_STREAM_READY)
800 return;
802 if (w) {
803 s->write_index_not_before = s->context->ctag;
805 if (s->timing_info_valid)
806 s->timing_info.write_index_corrupt = TRUE;
808 /* pa_log("write_index invalidated"); */
811 if (r) {
812 s->read_index_not_before = s->context->ctag;
814 if (s->timing_info_valid)
815 s->timing_info.read_index_corrupt = TRUE;
817 /* pa_log("read_index invalidated"); */
820 request_auto_timing_update(s, TRUE);
823 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
824 pa_stream *s = userdata;
826 pa_assert(s);
827 pa_assert(PA_REFCNT_VALUE(s) >= 1);
829 pa_stream_ref(s);
830 request_auto_timing_update(s, FALSE);
831 pa_stream_unref(s);
834 static void create_stream_complete(pa_stream *s) {
835 pa_assert(s);
836 pa_assert(PA_REFCNT_VALUE(s) >= 1);
837 pa_assert(s->state == PA_STREAM_CREATING);
839 pa_stream_set_state(s, PA_STREAM_READY);
841 if (s->requested_bytes > 0 && s->write_callback)
842 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
844 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
845 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
846 pa_assert(!s->auto_timing_update_event);
847 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
849 request_auto_timing_update(s, TRUE);
852 check_smoother_status(s, TRUE, FALSE, FALSE);
855 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
856 pa_assert(s);
857 pa_assert(attr);
858 pa_assert(ss);
860 if (s->context->version >= 13)
861 return;
863 /* Version older than 0.9.10 didn't do server side buffer_attr
864 * selection, hence we have to fake it on the client side. */
866 /* We choose fairly conservative values here, to not confuse
867 * old clients with extremely large playback buffers */
869 if (attr->maxlength == (uint32_t) -1)
870 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
872 if (attr->tlength == (uint32_t) -1)
873 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
875 if (attr->minreq == (uint32_t) -1)
876 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
878 if (attr->prebuf == (uint32_t) -1)
879 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
881 if (attr->fragsize == (uint32_t) -1)
882 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
885 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
886 pa_stream *s = userdata;
887 uint32_t requested_bytes = 0;
889 pa_assert(pd);
890 pa_assert(s);
891 pa_assert(PA_REFCNT_VALUE(s) >= 1);
892 pa_assert(s->state == PA_STREAM_CREATING);
894 pa_stream_ref(s);
896 if (command != PA_COMMAND_REPLY) {
897 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
898 goto finish;
900 pa_stream_set_state(s, PA_STREAM_FAILED);
901 goto finish;
904 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
905 s->channel == PA_INVALID_INDEX ||
906 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
907 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
908 pa_context_fail(s->context, PA_ERR_PROTOCOL);
909 goto finish;
912 s->requested_bytes = (int64_t) requested_bytes;
914 if (s->context->version >= 9) {
915 if (s->direction == PA_STREAM_PLAYBACK) {
916 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
917 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
918 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
919 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
920 pa_context_fail(s->context, PA_ERR_PROTOCOL);
921 goto finish;
923 } else if (s->direction == PA_STREAM_RECORD) {
924 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
925 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
926 pa_context_fail(s->context, PA_ERR_PROTOCOL);
927 goto finish;
932 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
933 pa_sample_spec ss;
934 pa_channel_map cm;
935 const char *dn = NULL;
936 pa_bool_t suspended;
938 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
939 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
940 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
941 pa_tagstruct_gets(t, &dn) < 0 ||
942 pa_tagstruct_get_boolean(t, &suspended) < 0) {
943 pa_context_fail(s->context, PA_ERR_PROTOCOL);
944 goto finish;
947 if (!dn || s->device_index == PA_INVALID_INDEX ||
948 ss.channels != cm.channels ||
949 !pa_channel_map_valid(&cm) ||
950 !pa_sample_spec_valid(&ss) ||
951 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
952 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
953 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
954 pa_context_fail(s->context, PA_ERR_PROTOCOL);
955 goto finish;
958 pa_xfree(s->device_name);
959 s->device_name = pa_xstrdup(dn);
960 s->suspended = suspended;
962 s->channel_map = cm;
963 s->sample_spec = ss;
966 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
967 pa_usec_t usec;
969 if (pa_tagstruct_get_usec(t, &usec) < 0) {
970 pa_context_fail(s->context, PA_ERR_PROTOCOL);
971 goto finish;
974 if (s->direction == PA_STREAM_RECORD)
975 s->timing_info.configured_source_usec = usec;
976 else
977 s->timing_info.configured_sink_usec = usec;
980 if (!pa_tagstruct_eof(t)) {
981 pa_context_fail(s->context, PA_ERR_PROTOCOL);
982 goto finish;
985 if (s->direction == PA_STREAM_RECORD) {
986 pa_assert(!s->record_memblockq);
988 s->record_memblockq = pa_memblockq_new(
990 s->buffer_attr.maxlength,
992 pa_frame_size(&s->sample_spec),
996 NULL);
999 s->channel_valid = TRUE;
1000 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
1002 create_stream_complete(s);
1004 finish:
1005 pa_stream_unref(s);
1008 static int create_stream(
1009 pa_stream_direction_t direction,
1010 pa_stream *s,
1011 const char *dev,
1012 const pa_buffer_attr *attr,
1013 pa_stream_flags_t flags,
1014 const pa_cvolume *volume,
1015 pa_stream *sync_stream) {
1017 pa_tagstruct *t;
1018 uint32_t tag;
1019 pa_bool_t volume_set = FALSE;
1021 pa_assert(s);
1022 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1023 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1025 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1026 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1027 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1028 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1029 PA_STREAM_INTERPOLATE_TIMING|
1030 PA_STREAM_NOT_MONOTONIC|
1031 PA_STREAM_AUTO_TIMING_UPDATE|
1032 PA_STREAM_NO_REMAP_CHANNELS|
1033 PA_STREAM_NO_REMIX_CHANNELS|
1034 PA_STREAM_FIX_FORMAT|
1035 PA_STREAM_FIX_RATE|
1036 PA_STREAM_FIX_CHANNELS|
1037 PA_STREAM_DONT_MOVE|
1038 PA_STREAM_VARIABLE_RATE|
1039 PA_STREAM_PEAK_DETECT|
1040 PA_STREAM_START_MUTED|
1041 PA_STREAM_ADJUST_LATENCY|
1042 PA_STREAM_EARLY_REQUESTS|
1043 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1044 PA_STREAM_START_UNMUTED|
1045 PA_STREAM_FAIL_ON_SUSPEND|
1046 PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1048 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1049 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1050 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1051 /* Althought some of the other flags are not supported on older
1052 * version, we don't check for them here, because it doesn't hurt
1053 * when they are passed but actually not supported. This makes
1054 * client development easier */
1056 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1057 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1058 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1059 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1060 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1062 pa_stream_ref(s);
1064 s->direction = direction;
1065 s->flags = flags;
1066 s->corked = !!(flags & PA_STREAM_START_CORKED);
1068 if (sync_stream)
1069 s->syncid = sync_stream->syncid;
1071 if (attr)
1072 s->buffer_attr = *attr;
1073 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1075 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1076 pa_usec_t x;
1078 x = pa_rtclock_now();
1080 pa_assert(!s->smoother);
1081 s->smoother = pa_smoother_new(
1082 SMOOTHER_ADJUST_TIME,
1083 SMOOTHER_HISTORY_TIME,
1084 !(flags & PA_STREAM_NOT_MONOTONIC),
1085 TRUE,
1086 SMOOTHER_MIN_HISTORY,
1088 TRUE);
1091 if (!dev)
1092 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1094 t = pa_tagstruct_command(
1095 s->context,
1096 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1097 &tag);
1099 if (s->context->version < 13)
1100 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1102 pa_tagstruct_put(
1104 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1105 PA_TAG_CHANNEL_MAP, &s->channel_map,
1106 PA_TAG_U32, PA_INVALID_INDEX,
1107 PA_TAG_STRING, dev,
1108 PA_TAG_U32, s->buffer_attr.maxlength,
1109 PA_TAG_BOOLEAN, s->corked,
1110 PA_TAG_INVALID);
1112 if (s->direction == PA_STREAM_PLAYBACK) {
1113 pa_cvolume cv;
1115 pa_tagstruct_put(
1117 PA_TAG_U32, s->buffer_attr.tlength,
1118 PA_TAG_U32, s->buffer_attr.prebuf,
1119 PA_TAG_U32, s->buffer_attr.minreq,
1120 PA_TAG_U32, s->syncid,
1121 PA_TAG_INVALID);
1123 volume_set = !!volume;
1125 if (!volume)
1126 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1128 pa_tagstruct_put_cvolume(t, volume);
1129 } else
1130 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1132 if (s->context->version >= 12) {
1133 pa_tagstruct_put(
1135 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1136 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1137 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1138 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1139 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1140 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1141 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1142 PA_TAG_INVALID);
1145 if (s->context->version >= 13) {
1147 if (s->direction == PA_STREAM_PLAYBACK)
1148 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1149 else
1150 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1152 pa_tagstruct_put(
1154 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1155 PA_TAG_PROPLIST, s->proplist,
1156 PA_TAG_INVALID);
1158 if (s->direction == PA_STREAM_RECORD)
1159 pa_tagstruct_putu32(t, s->direct_on_input);
1162 if (s->context->version >= 14) {
1164 if (s->direction == PA_STREAM_PLAYBACK)
1165 pa_tagstruct_put_boolean(t, volume_set);
1167 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1170 if (s->context->version >= 15) {
1172 if (s->direction == PA_STREAM_PLAYBACK)
1173 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1175 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1176 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1179 if (s->context->version >= 17) {
1181 if (s->direction == PA_STREAM_PLAYBACK)
1182 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1186 pa_pstream_send_tagstruct(s->context->pstream, t);
1187 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1189 pa_stream_set_state(s, PA_STREAM_CREATING);
1191 pa_stream_unref(s);
1192 return 0;
1195 int pa_stream_connect_playback(
1196 pa_stream *s,
1197 const char *dev,
1198 const pa_buffer_attr *attr,
1199 pa_stream_flags_t flags,
1200 const pa_cvolume *volume,
1201 pa_stream *sync_stream) {
1203 pa_assert(s);
1204 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1206 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1209 int pa_stream_connect_record(
1210 pa_stream *s,
1211 const char *dev,
1212 const pa_buffer_attr *attr,
1213 pa_stream_flags_t flags) {
1215 pa_assert(s);
1216 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1218 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1221 int pa_stream_begin_write(
1222 pa_stream *s,
1223 void **data,
1224 size_t *nbytes) {
1226 pa_assert(s);
1227 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1229 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1230 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1231 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1232 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1233 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1235 if (*nbytes != (size_t) -1) {
1236 size_t m, fs;
1238 m = pa_mempool_block_size_max(s->context->mempool);
1239 fs = pa_frame_size(&s->sample_spec);
1241 m = (m / fs) * fs;
1242 if (*nbytes > m)
1243 *nbytes = m;
1246 if (!s->write_memblock) {
1247 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1248 s->write_data = pa_memblock_acquire(s->write_memblock);
1251 *data = s->write_data;
1252 *nbytes = pa_memblock_get_length(s->write_memblock);
1254 return 0;
1257 int pa_stream_cancel_write(
1258 pa_stream *s) {
1260 pa_assert(s);
1261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1263 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1264 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1265 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1266 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1268 pa_assert(s->write_data);
1270 pa_memblock_release(s->write_memblock);
1271 pa_memblock_unref(s->write_memblock);
1272 s->write_memblock = NULL;
1273 s->write_data = NULL;
1275 return 0;
1278 int pa_stream_write(
1279 pa_stream *s,
1280 const void *data,
1281 size_t length,
1282 pa_free_cb_t free_cb,
1283 int64_t offset,
1284 pa_seek_mode_t seek) {
1286 pa_assert(s);
1287 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1288 pa_assert(data);
1290 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1291 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1292 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1293 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1294 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1295 PA_CHECK_VALIDITY(s->context,
1296 !s->write_memblock ||
1297 ((data >= s->write_data) &&
1298 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1299 PA_ERR_INVALID);
1300 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1302 if (s->write_memblock) {
1303 pa_memchunk chunk;
1305 /* pa_stream_write_begin() was called before */
1307 pa_memblock_release(s->write_memblock);
1309 chunk.memblock = s->write_memblock;
1310 chunk.index = (const char *) data - (const char *) s->write_data;
1311 chunk.length = length;
1313 s->write_memblock = NULL;
1314 s->write_data = NULL;
1316 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1317 pa_memblock_unref(chunk.memblock);
1319 } else {
1320 pa_seek_mode_t t_seek = seek;
1321 int64_t t_offset = offset;
1322 size_t t_length = length;
1323 const void *t_data = data;
1325 /* pa_stream_write_begin() was not called before */
1327 while (t_length > 0) {
1328 pa_memchunk chunk;
1330 chunk.index = 0;
1332 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1333 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1334 chunk.length = t_length;
1335 } else {
1336 void *d;
1338 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1339 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1341 d = pa_memblock_acquire(chunk.memblock);
1342 memcpy(d, t_data, chunk.length);
1343 pa_memblock_release(chunk.memblock);
1346 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1348 t_offset = 0;
1349 t_seek = PA_SEEK_RELATIVE;
1351 t_data = (const uint8_t*) t_data + chunk.length;
1352 t_length -= chunk.length;
1354 pa_memblock_unref(chunk.memblock);
1357 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1358 free_cb((void*) data);
1361 /* This is obviously wrong since we ignore the seeking index . But
1362 * that's OK, the server side applies the same error */
1363 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1365 if (s->direction == PA_STREAM_PLAYBACK) {
1367 /* Update latency request correction */
1368 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1370 if (seek == PA_SEEK_ABSOLUTE) {
1371 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1372 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1373 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1374 } else if (seek == PA_SEEK_RELATIVE) {
1375 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1376 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1377 } else
1378 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1381 /* Update the write index in the already available latency data */
1382 if (s->timing_info_valid) {
1384 if (seek == PA_SEEK_ABSOLUTE) {
1385 s->timing_info.write_index_corrupt = FALSE;
1386 s->timing_info.write_index = offset + (int64_t) length;
1387 } else if (seek == PA_SEEK_RELATIVE) {
1388 if (!s->timing_info.write_index_corrupt)
1389 s->timing_info.write_index += offset + (int64_t) length;
1390 } else
1391 s->timing_info.write_index_corrupt = TRUE;
1394 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1395 request_auto_timing_update(s, TRUE);
1398 return 0;
1401 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1402 pa_assert(s);
1403 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1404 pa_assert(data);
1405 pa_assert(length);
1407 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1408 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1409 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1411 if (!s->peek_memchunk.memblock) {
1413 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1414 *data = NULL;
1415 *length = 0;
1416 return 0;
1419 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1422 pa_assert(s->peek_data);
1423 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1424 *length = s->peek_memchunk.length;
1425 return 0;
1428 int pa_stream_drop(pa_stream *s) {
1429 pa_assert(s);
1430 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1432 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1433 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1434 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1435 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1437 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1439 /* Fix the simulated local read index */
1440 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1441 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1443 pa_assert(s->peek_data);
1444 pa_memblock_release(s->peek_memchunk.memblock);
1445 pa_memblock_unref(s->peek_memchunk.memblock);
1446 pa_memchunk_reset(&s->peek_memchunk);
1448 return 0;
1451 size_t pa_stream_writable_size(pa_stream *s) {
1452 pa_assert(s);
1453 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1455 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1456 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1457 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1459 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1462 size_t pa_stream_readable_size(pa_stream *s) {
1463 pa_assert(s);
1464 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1466 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1467 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1468 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1470 return pa_memblockq_get_length(s->record_memblockq);
1473 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1474 pa_operation *o;
1475 pa_tagstruct *t;
1476 uint32_t tag;
1478 pa_assert(s);
1479 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1481 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1482 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1483 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1485 /* Ask for a timing update before we cork/uncork to get the best
1486 * accuracy for the transport latency suitable for the
1487 * check_smoother_status() call in the started callback */
1488 request_auto_timing_update(s, TRUE);
1490 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1492 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1493 pa_tagstruct_putu32(t, s->channel);
1494 pa_pstream_send_tagstruct(s->context->pstream, t);
1495 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1497 /* This might cause the read index to conitnue again, hence
1498 * let's request a timing update */
1499 request_auto_timing_update(s, TRUE);
1501 return o;
1504 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1505 pa_usec_t usec;
1507 pa_assert(s);
1508 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1509 pa_assert(s->state == PA_STREAM_READY);
1510 pa_assert(s->direction != PA_STREAM_UPLOAD);
1511 pa_assert(s->timing_info_valid);
1512 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1513 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1515 if (s->direction == PA_STREAM_PLAYBACK) {
1516 /* The last byte that was written into the output device
1517 * had this time value associated */
1518 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1520 if (!s->corked && !s->suspended) {
1522 if (!ignore_transport)
1523 /* Because the latency info took a little time to come
1524 * to us, we assume that the real output time is actually
1525 * a little ahead */
1526 usec += s->timing_info.transport_usec;
1528 /* However, the output device usually maintains a buffer
1529 too, hence the real sample currently played is a little
1530 back */
1531 if (s->timing_info.sink_usec >= usec)
1532 usec = 0;
1533 else
1534 usec -= s->timing_info.sink_usec;
1537 } else {
1538 pa_assert(s->direction == PA_STREAM_RECORD);
1540 /* The last byte written into the server side queue had
1541 * this time value associated */
1542 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1544 if (!s->corked && !s->suspended) {
1546 if (!ignore_transport)
1547 /* Add transport latency */
1548 usec += s->timing_info.transport_usec;
1550 /* Add latency of data in device buffer */
1551 usec += s->timing_info.source_usec;
1553 /* If this is a monitor source, we need to correct the
1554 * time by the playback device buffer */
1555 if (s->timing_info.sink_usec >= usec)
1556 usec = 0;
1557 else
1558 usec -= s->timing_info.sink_usec;
1562 return usec;
1565 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1566 pa_operation *o = userdata;
1567 struct timeval local, remote, now;
1568 pa_timing_info *i;
1569 pa_bool_t playing = FALSE;
1570 uint64_t underrun_for = 0, playing_for = 0;
1572 pa_assert(pd);
1573 pa_assert(o);
1574 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1576 if (!o->context || !o->stream)
1577 goto finish;
1579 i = &o->stream->timing_info;
1581 o->stream->timing_info_valid = FALSE;
1582 i->write_index_corrupt = TRUE;
1583 i->read_index_corrupt = TRUE;
1585 if (command != PA_COMMAND_REPLY) {
1586 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1587 goto finish;
1589 } else {
1591 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1592 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1593 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1594 pa_tagstruct_get_timeval(t, &local) < 0 ||
1595 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1596 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1597 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1599 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1600 goto finish;
1603 if (o->context->version >= 13 &&
1604 o->stream->direction == PA_STREAM_PLAYBACK)
1605 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1606 pa_tagstruct_getu64(t, &playing_for) < 0) {
1608 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1609 goto finish;
1613 if (!pa_tagstruct_eof(t)) {
1614 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1615 goto finish;
1617 o->stream->timing_info_valid = TRUE;
1618 i->write_index_corrupt = FALSE;
1619 i->read_index_corrupt = FALSE;
1621 i->playing = (int) playing;
1622 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1624 pa_gettimeofday(&now);
1626 /* Calculcate timestamps */
1627 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1628 /* local and remote seem to have synchronized clocks */
1630 if (o->stream->direction == PA_STREAM_PLAYBACK)
1631 i->transport_usec = pa_timeval_diff(&remote, &local);
1632 else
1633 i->transport_usec = pa_timeval_diff(&now, &remote);
1635 i->synchronized_clocks = TRUE;
1636 i->timestamp = remote;
1637 } else {
1638 /* clocks are not synchronized, let's estimate latency then */
1639 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1640 i->synchronized_clocks = FALSE;
1641 i->timestamp = local;
1642 pa_timeval_add(&i->timestamp, i->transport_usec);
1645 /* Invalidate read and write indexes if necessary */
1646 if (tag < o->stream->read_index_not_before)
1647 i->read_index_corrupt = TRUE;
1649 if (tag < o->stream->write_index_not_before)
1650 i->write_index_corrupt = TRUE;
1652 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1653 /* Write index correction */
1655 int n, j;
1656 uint32_t ctag = tag;
1658 /* Go through the saved correction values and add up the
1659 * total correction.*/
1660 for (n = 0, j = o->stream->current_write_index_correction+1;
1661 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1662 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1664 /* Step over invalid data or out-of-date data */
1665 if (!o->stream->write_index_corrections[j].valid ||
1666 o->stream->write_index_corrections[j].tag < ctag)
1667 continue;
1669 /* Make sure that everything is in order */
1670 ctag = o->stream->write_index_corrections[j].tag+1;
1672 /* Now fix the write index */
1673 if (o->stream->write_index_corrections[j].corrupt) {
1674 /* A corrupting seek was made */
1675 i->write_index_corrupt = TRUE;
1676 } else if (o->stream->write_index_corrections[j].absolute) {
1677 /* An absolute seek was made */
1678 i->write_index = o->stream->write_index_corrections[j].value;
1679 i->write_index_corrupt = FALSE;
1680 } else if (!i->write_index_corrupt) {
1681 /* A relative seek was made */
1682 i->write_index += o->stream->write_index_corrections[j].value;
1686 /* Clear old correction entries */
1687 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1688 if (!o->stream->write_index_corrections[n].valid)
1689 continue;
1691 if (o->stream->write_index_corrections[n].tag <= tag)
1692 o->stream->write_index_corrections[n].valid = FALSE;
1696 if (o->stream->direction == PA_STREAM_RECORD) {
1697 /* Read index correction */
1699 if (!i->read_index_corrupt)
1700 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1703 /* Update smoother */
1704 if (o->stream->smoother) {
1705 pa_usec_t u, x;
1707 u = x = pa_rtclock_now() - i->transport_usec;
1709 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1710 pa_usec_t su;
1712 /* If we weren't playing then it will take some time
1713 * until the audio will actually come out through the
1714 * speakers. Since we follow that timing here, we need
1715 * to try to fix this up */
1717 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1719 if (su < i->sink_usec)
1720 x += i->sink_usec - su;
1723 if (!i->playing)
1724 pa_smoother_pause(o->stream->smoother, x);
1726 /* Update the smoother */
1727 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1728 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1729 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1731 if (i->playing)
1732 pa_smoother_resume(o->stream->smoother, x, TRUE);
1736 o->stream->auto_timing_update_requested = FALSE;
1738 if (o->stream->latency_update_callback)
1739 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1741 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1742 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1743 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1746 finish:
1748 pa_operation_done(o);
1749 pa_operation_unref(o);
1752 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1753 uint32_t tag;
1754 pa_operation *o;
1755 pa_tagstruct *t;
1756 struct timeval now;
1757 int cidx = 0;
1759 pa_assert(s);
1760 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1762 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1763 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1764 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1766 if (s->direction == PA_STREAM_PLAYBACK) {
1767 /* Find a place to store the write_index correction data for this entry */
1768 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1770 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1771 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1773 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1775 t = pa_tagstruct_command(
1776 s->context,
1777 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1778 &tag);
1779 pa_tagstruct_putu32(t, s->channel);
1780 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1782 pa_pstream_send_tagstruct(s->context->pstream, t);
1783 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1785 if (s->direction == PA_STREAM_PLAYBACK) {
1786 /* Fill in initial correction data */
1788 s->current_write_index_correction = cidx;
1790 s->write_index_corrections[cidx].valid = TRUE;
1791 s->write_index_corrections[cidx].absolute = FALSE;
1792 s->write_index_corrections[cidx].corrupt = FALSE;
1793 s->write_index_corrections[cidx].tag = tag;
1794 s->write_index_corrections[cidx].value = 0;
1797 return o;
1800 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1801 pa_stream *s = userdata;
1803 pa_assert(pd);
1804 pa_assert(s);
1805 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1807 pa_stream_ref(s);
1809 if (command != PA_COMMAND_REPLY) {
1810 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1811 goto finish;
1813 pa_stream_set_state(s, PA_STREAM_FAILED);
1814 goto finish;
1815 } else if (!pa_tagstruct_eof(t)) {
1816 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1817 goto finish;
1820 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1822 finish:
1823 pa_stream_unref(s);
1826 int pa_stream_disconnect(pa_stream *s) {
1827 pa_tagstruct *t;
1828 uint32_t tag;
1830 pa_assert(s);
1831 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1833 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1834 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1835 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1837 pa_stream_ref(s);
1839 t = pa_tagstruct_command(
1840 s->context,
1841 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1842 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1843 &tag);
1844 pa_tagstruct_putu32(t, s->channel);
1845 pa_pstream_send_tagstruct(s->context->pstream, t);
1846 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1848 pa_stream_unref(s);
1849 return 0;
1852 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1853 pa_assert(s);
1854 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1856 if (pa_detect_fork())
1857 return;
1859 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1860 return;
1862 s->read_callback = cb;
1863 s->read_userdata = userdata;
1866 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1867 pa_assert(s);
1868 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1870 if (pa_detect_fork())
1871 return;
1873 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1874 return;
1876 s->write_callback = cb;
1877 s->write_userdata = userdata;
1880 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1881 pa_assert(s);
1882 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1884 if (pa_detect_fork())
1885 return;
1887 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1888 return;
1890 s->state_callback = cb;
1891 s->state_userdata = userdata;
1894 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1895 pa_assert(s);
1896 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1898 if (pa_detect_fork())
1899 return;
1901 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1902 return;
1904 s->overflow_callback = cb;
1905 s->overflow_userdata = userdata;
1908 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1909 pa_assert(s);
1910 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1912 if (pa_detect_fork())
1913 return;
1915 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1916 return;
1918 s->underflow_callback = cb;
1919 s->underflow_userdata = userdata;
1922 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1923 pa_assert(s);
1924 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1926 if (pa_detect_fork())
1927 return;
1929 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1930 return;
1932 s->latency_update_callback = cb;
1933 s->latency_update_userdata = userdata;
1936 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1937 pa_assert(s);
1938 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1940 if (pa_detect_fork())
1941 return;
1943 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1944 return;
1946 s->moved_callback = cb;
1947 s->moved_userdata = userdata;
1950 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1951 pa_assert(s);
1952 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1954 if (pa_detect_fork())
1955 return;
1957 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1958 return;
1960 s->suspended_callback = cb;
1961 s->suspended_userdata = userdata;
1964 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1965 pa_assert(s);
1966 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1968 if (pa_detect_fork())
1969 return;
1971 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1972 return;
1974 s->started_callback = cb;
1975 s->started_userdata = userdata;
1978 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1979 pa_assert(s);
1980 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1982 if (pa_detect_fork())
1983 return;
1985 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1986 return;
1988 s->event_callback = cb;
1989 s->event_userdata = userdata;
1992 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1993 pa_assert(s);
1994 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1996 if (pa_detect_fork())
1997 return;
1999 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2000 return;
2002 s->buffer_attr_callback = cb;
2003 s->buffer_attr_userdata = userdata;
2006 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2007 pa_operation *o = userdata;
2008 int success = 1;
2010 pa_assert(pd);
2011 pa_assert(o);
2012 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2014 if (!o->context)
2015 goto finish;
2017 if (command != PA_COMMAND_REPLY) {
2018 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2019 goto finish;
2021 success = 0;
2022 } else if (!pa_tagstruct_eof(t)) {
2023 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2024 goto finish;
2027 if (o->callback) {
2028 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2029 cb(o->stream, success, o->userdata);
2032 finish:
2033 pa_operation_done(o);
2034 pa_operation_unref(o);
2037 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2038 pa_operation *o;
2039 pa_tagstruct *t;
2040 uint32_t tag;
2042 pa_assert(s);
2043 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2045 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2046 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2047 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2049 /* Ask for a timing update before we cork/uncork to get the best
2050 * accuracy for the transport latency suitable for the
2051 * check_smoother_status() call in the started callback */
2052 request_auto_timing_update(s, TRUE);
2054 s->corked = b;
2056 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2058 t = pa_tagstruct_command(
2059 s->context,
2060 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2061 &tag);
2062 pa_tagstruct_putu32(t, s->channel);
2063 pa_tagstruct_put_boolean(t, !!b);
2064 pa_pstream_send_tagstruct(s->context->pstream, t);
2065 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2067 check_smoother_status(s, FALSE, FALSE, FALSE);
2069 /* This might cause the indexes to hang/start again, hence let's
2070 * request a timing update, after the cork/uncork, too */
2071 request_auto_timing_update(s, TRUE);
2073 return o;
2076 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2077 pa_tagstruct *t;
2078 pa_operation *o;
2079 uint32_t tag;
2081 pa_assert(s);
2082 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2084 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2085 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2087 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2089 t = pa_tagstruct_command(s->context, command, &tag);
2090 pa_tagstruct_putu32(t, s->channel);
2091 pa_pstream_send_tagstruct(s->context->pstream, t);
2092 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2094 return o;
2097 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2098 pa_operation *o;
2100 pa_assert(s);
2101 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2103 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2104 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2105 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2107 /* Ask for a timing update *before* the flush, so that the
2108 * transport usec is as up to date as possible when we get the
2109 * underflow message and update the smoother status*/
2110 request_auto_timing_update(s, TRUE);
2112 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2113 return NULL;
2115 if (s->direction == PA_STREAM_PLAYBACK) {
2117 if (s->write_index_corrections[s->current_write_index_correction].valid)
2118 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2120 if (s->buffer_attr.prebuf > 0)
2121 check_smoother_status(s, FALSE, FALSE, TRUE);
2123 /* This will change the write index, but leave the
2124 * read index untouched. */
2125 invalidate_indexes(s, FALSE, TRUE);
2127 } else
2128 /* For record streams this has no influence on the write
2129 * index, but the read index might jump. */
2130 invalidate_indexes(s, TRUE, FALSE);
2132 return o;
2135 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2136 pa_operation *o;
2138 pa_assert(s);
2139 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2141 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2142 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2143 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2144 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2146 /* Ask for a timing update before we cork/uncork to get the best
2147 * accuracy for the transport latency suitable for the
2148 * check_smoother_status() call in the started callback */
2149 request_auto_timing_update(s, TRUE);
2151 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2152 return NULL;
2154 /* This might cause the read index to hang again, hence
2155 * let's request a timing update */
2156 request_auto_timing_update(s, TRUE);
2158 return o;
2161 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2162 pa_operation *o;
2164 pa_assert(s);
2165 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2167 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2168 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2169 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2170 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2172 /* Ask for a timing update before we cork/uncork to get the best
2173 * accuracy for the transport latency suitable for the
2174 * check_smoother_status() call in the started callback */
2175 request_auto_timing_update(s, TRUE);
2177 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2178 return NULL;
2180 /* This might cause the read index to start moving again, hence
2181 * let's request a timing update */
2182 request_auto_timing_update(s, TRUE);
2184 return o;
2187 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2188 pa_operation *o;
2190 pa_assert(s);
2191 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2192 pa_assert(name);
2194 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2195 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2196 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2198 if (s->context->version >= 13) {
2199 pa_proplist *p = pa_proplist_new();
2201 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2202 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2203 pa_proplist_free(p);
2204 } else {
2205 pa_tagstruct *t;
2206 uint32_t tag;
2208 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2209 t = pa_tagstruct_command(
2210 s->context,
2211 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2212 &tag);
2213 pa_tagstruct_putu32(t, s->channel);
2214 pa_tagstruct_puts(t, name);
2215 pa_pstream_send_tagstruct(s->context->pstream, t);
2216 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2219 return o;
2222 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2223 pa_usec_t usec;
2225 pa_assert(s);
2226 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2228 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2229 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2230 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2231 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2232 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2233 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2235 if (s->smoother)
2236 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2237 else
2238 usec = calc_time(s, FALSE);
2240 /* Make sure the time runs monotonically */
2241 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2242 if (usec < s->previous_time)
2243 usec = s->previous_time;
2244 else
2245 s->previous_time = usec;
2248 if (r_usec)
2249 *r_usec = usec;
2251 return 0;
2254 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2255 pa_assert(s);
2256 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2258 if (negative)
2259 *negative = 0;
2261 if (a >= b)
2262 return a-b;
2263 else {
2264 if (negative && s->direction == PA_STREAM_RECORD) {
2265 *negative = 1;
2266 return b-a;
2267 } else
2268 return 0;
2272 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2273 pa_usec_t t, c;
2274 int r;
2275 int64_t cindex;
2277 pa_assert(s);
2278 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2279 pa_assert(r_usec);
2281 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2282 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2283 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2284 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2285 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2286 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2288 if ((r = pa_stream_get_time(s, &t)) < 0)
2289 return r;
2291 if (s->direction == PA_STREAM_PLAYBACK)
2292 cindex = s->timing_info.write_index;
2293 else
2294 cindex = s->timing_info.read_index;
2296 if (cindex < 0)
2297 cindex = 0;
2299 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2301 if (s->direction == PA_STREAM_PLAYBACK)
2302 *r_usec = time_counter_diff(s, c, t, negative);
2303 else
2304 *r_usec = time_counter_diff(s, t, c, negative);
2306 return 0;
2309 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2310 pa_assert(s);
2311 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2313 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2314 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2315 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2316 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2318 return &s->timing_info;
2321 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2322 pa_assert(s);
2323 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2325 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2327 return &s->sample_spec;
2330 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2331 pa_assert(s);
2332 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2334 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2336 return &s->channel_map;
2339 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2340 pa_assert(s);
2341 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2343 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2344 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2345 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2347 return &s->buffer_attr;
2350 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2351 pa_operation *o = userdata;
2352 int success = 1;
2354 pa_assert(pd);
2355 pa_assert(o);
2356 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2358 if (!o->context)
2359 goto finish;
2361 if (command != PA_COMMAND_REPLY) {
2362 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2363 goto finish;
2365 success = 0;
2366 } else {
2367 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2368 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2369 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2370 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2371 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2372 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2373 goto finish;
2375 } else if (o->stream->direction == PA_STREAM_RECORD) {
2376 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2377 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2378 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2379 goto finish;
2383 if (o->stream->context->version >= 13) {
2384 pa_usec_t usec;
2386 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2387 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2388 goto finish;
2391 if (o->stream->direction == PA_STREAM_RECORD)
2392 o->stream->timing_info.configured_source_usec = usec;
2393 else
2394 o->stream->timing_info.configured_sink_usec = usec;
2397 if (!pa_tagstruct_eof(t)) {
2398 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2399 goto finish;
2403 if (o->callback) {
2404 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2405 cb(o->stream, success, o->userdata);
2408 finish:
2409 pa_operation_done(o);
2410 pa_operation_unref(o);
2414 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2415 pa_operation *o;
2416 pa_tagstruct *t;
2417 uint32_t tag;
2419 pa_assert(s);
2420 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2421 pa_assert(attr);
2423 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2424 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2425 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2426 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2428 /* Ask for a timing update before we cork/uncork to get the best
2429 * accuracy for the transport latency suitable for the
2430 * check_smoother_status() call in the started callback */
2431 request_auto_timing_update(s, TRUE);
2433 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2435 t = pa_tagstruct_command(
2436 s->context,
2437 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2438 &tag);
2439 pa_tagstruct_putu32(t, s->channel);
2441 pa_tagstruct_putu32(t, attr->maxlength);
2443 if (s->direction == PA_STREAM_PLAYBACK)
2444 pa_tagstruct_put(
2446 PA_TAG_U32, attr->tlength,
2447 PA_TAG_U32, attr->prebuf,
2448 PA_TAG_U32, attr->minreq,
2449 PA_TAG_INVALID);
2450 else
2451 pa_tagstruct_putu32(t, attr->fragsize);
2453 if (s->context->version >= 13)
2454 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2456 if (s->context->version >= 14)
2457 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2459 pa_pstream_send_tagstruct(s->context->pstream, t);
2460 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2462 /* This might cause changes in the read/write indexex, hence let's
2463 * request a timing update */
2464 request_auto_timing_update(s, TRUE);
2466 return o;
2469 uint32_t pa_stream_get_device_index(pa_stream *s) {
2470 pa_assert(s);
2471 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2473 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2474 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2475 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2476 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2477 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2479 return s->device_index;
2482 const char *pa_stream_get_device_name(pa_stream *s) {
2483 pa_assert(s);
2484 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2486 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2487 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2488 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2489 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2490 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2492 return s->device_name;
2495 int pa_stream_is_suspended(pa_stream *s) {
2496 pa_assert(s);
2497 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2499 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2500 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2501 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2502 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2504 return s->suspended;
2507 int pa_stream_is_corked(pa_stream *s) {
2508 pa_assert(s);
2509 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2511 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2512 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2513 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2515 return s->corked;
2518 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2519 pa_operation *o = userdata;
2520 int success = 1;
2522 pa_assert(pd);
2523 pa_assert(o);
2524 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2526 if (!o->context)
2527 goto finish;
2529 if (command != PA_COMMAND_REPLY) {
2530 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2531 goto finish;
2533 success = 0;
2534 } else {
2536 if (!pa_tagstruct_eof(t)) {
2537 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2538 goto finish;
2542 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2543 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2545 if (o->callback) {
2546 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2547 cb(o->stream, success, o->userdata);
2550 finish:
2551 pa_operation_done(o);
2552 pa_operation_unref(o);
2556 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2557 pa_operation *o;
2558 pa_tagstruct *t;
2559 uint32_t tag;
2561 pa_assert(s);
2562 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2564 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2565 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2566 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2567 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2568 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2569 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2571 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2572 o->private = PA_UINT_TO_PTR(rate);
2574 t = pa_tagstruct_command(
2575 s->context,
2576 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2577 &tag);
2578 pa_tagstruct_putu32(t, s->channel);
2579 pa_tagstruct_putu32(t, rate);
2581 pa_pstream_send_tagstruct(s->context->pstream, t);
2582 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2584 return o;
2587 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2588 pa_operation *o;
2589 pa_tagstruct *t;
2590 uint32_t tag;
2592 pa_assert(s);
2593 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2595 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2596 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2597 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2598 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2599 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2601 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2603 t = pa_tagstruct_command(
2604 s->context,
2605 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2606 &tag);
2607 pa_tagstruct_putu32(t, s->channel);
2608 pa_tagstruct_putu32(t, (uint32_t) mode);
2609 pa_tagstruct_put_proplist(t, p);
2611 pa_pstream_send_tagstruct(s->context->pstream, t);
2612 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2614 /* Please note that we don't update s->proplist here, because we
2615 * don't export that field */
2617 return o;
2620 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2621 pa_operation *o;
2622 pa_tagstruct *t;
2623 uint32_t tag;
2624 const char * const*k;
2626 pa_assert(s);
2627 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2629 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2630 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2631 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2632 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2633 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2635 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2637 t = pa_tagstruct_command(
2638 s->context,
2639 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2640 &tag);
2641 pa_tagstruct_putu32(t, s->channel);
2643 for (k = keys; *k; k++)
2644 pa_tagstruct_puts(t, *k);
2646 pa_tagstruct_puts(t, NULL);
2648 pa_pstream_send_tagstruct(s->context->pstream, t);
2649 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2651 /* Please note that we don't update s->proplist here, because we
2652 * don't export that field */
2654 return o;
2657 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2658 pa_assert(s);
2659 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2661 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2662 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2663 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2664 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2666 s->direct_on_input = sink_input_idx;
2668 return 0;
2671 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2672 pa_assert(s);
2673 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2675 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2676 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2677 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2679 return s->direct_on_input;