core: Add extended stream API to support compressed formats
[pulseaudio-mirror.git] / src / pulse / stream.c
blobf5bf42c95d1feb4008f308bc19a4495ae15d1e1d
1 /***
2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
44 #include "internal.h"
45 #include "stream.h"
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
83 static pa_stream *pa_stream_new_with_proplist_internal(
84 pa_context *c,
85 const char *name,
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
89 pa_proplist *p) {
91 pa_stream *s;
92 int i;
94 pa_assert(c);
95 pa_assert(PA_REFCNT_VALUE(c) >= 1);
96 pa_assert((ss == NULL && map == NULL) || formats == NULL);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
101 s = pa_xnew(pa_stream, 1);
102 PA_REFCNT_INIT(s);
103 s->context = c;
104 s->mainloop = c->mainloop;
106 s->direction = PA_STREAM_NODIRECTION;
107 s->state = PA_STREAM_UNCONNECTED;
108 s->flags = 0;
110 if (ss)
111 s->sample_spec = *ss;
112 else
113 s->sample_spec.format = PA_SAMPLE_INVALID;
115 if (map)
116 s->channel_map = *map;
117 else
118 pa_channel_map_init(&s->channel_map);
120 s->n_formats = 0;
121 if (formats) {
122 for (i = 0; formats[i] && i < PA_MAX_FORMATS; i++) {
123 s->n_formats++;
124 s->req_formats[i] = pa_format_info_copy(formats[i]);
126 /* Make sure the input array was NULL-terminated */
127 pa_assert(formats[i] == NULL);
130 /* We'll get the final negotiated format after connecting */
131 s->format = NULL;
133 s->direct_on_input = PA_INVALID_INDEX;
135 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
136 if (name)
137 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
139 s->channel = 0;
140 s->channel_valid = FALSE;
141 s->syncid = c->csyncid++;
142 s->stream_index = PA_INVALID_INDEX;
144 s->requested_bytes = 0;
145 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
147 /* We initialize der target length here, so that if the user
148 * passes no explicit buffering metrics the default is similar to
149 * what older PA versions provided. */
151 s->buffer_attr.maxlength = (uint32_t) -1;
152 if (ss)
153 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
154 else
155 /* XXX: How do we apply worst case conversion here? */
156 s->buffer_attr.minreq = (uint32_t) -1;
157 s->buffer_attr.prebuf = (uint32_t) -1;
158 s->buffer_attr.fragsize = (uint32_t) -1;
160 s->device_index = PA_INVALID_INDEX;
161 s->device_name = NULL;
162 s->suspended = FALSE;
163 s->corked = FALSE;
165 s->write_memblock = NULL;
166 s->write_data = NULL;
168 pa_memchunk_reset(&s->peek_memchunk);
169 s->peek_data = NULL;
170 s->record_memblockq = NULL;
172 memset(&s->timing_info, 0, sizeof(s->timing_info));
173 s->timing_info_valid = FALSE;
175 s->previous_time = 0;
177 s->read_index_not_before = 0;
178 s->write_index_not_before = 0;
179 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
180 s->write_index_corrections[i].valid = 0;
181 s->current_write_index_correction = 0;
183 s->auto_timing_update_event = NULL;
184 s->auto_timing_update_requested = FALSE;
185 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
187 reset_callbacks(s);
189 s->smoother = NULL;
191 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
192 PA_LLIST_PREPEND(pa_stream, c->streams, s);
193 pa_stream_ref(s);
195 return s;
198 pa_stream *pa_stream_new_with_proplist(
199 pa_context *c,
200 const char *name,
201 const pa_sample_spec *ss,
202 const pa_channel_map *map,
203 pa_proplist *p) {
205 pa_channel_map tmap;
207 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
208 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
209 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
210 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
211 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
213 if (!map)
214 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
216 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, p);
219 pa_stream *pa_stream_new_extended(
220 pa_context *c,
221 const char *name,
222 pa_format_info * const *formats,
223 pa_proplist *p) {
225 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
227 /* XXX: For the single-format PCM case, pass ss/map instead of formats */
229 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, p);
232 static void stream_unlink(pa_stream *s) {
233 pa_operation *o, *n;
234 pa_assert(s);
236 if (!s->context)
237 return;
239 /* Detach from context */
241 /* Unref all operatio object that point to us */
242 for (o = s->context->operations; o; o = n) {
243 n = o->next;
245 if (o->stream == s)
246 pa_operation_cancel(o);
249 /* Drop all outstanding replies for this stream */
250 if (s->context->pdispatch)
251 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
253 if (s->channel_valid) {
254 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
255 s->channel = 0;
256 s->channel_valid = FALSE;
259 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
260 pa_stream_unref(s);
262 s->context = NULL;
264 if (s->auto_timing_update_event) {
265 pa_assert(s->mainloop);
266 s->mainloop->time_free(s->auto_timing_update_event);
269 reset_callbacks(s);
272 static void stream_free(pa_stream *s) {
273 unsigned int i;
275 pa_assert(s);
277 stream_unlink(s);
279 if (s->write_memblock) {
280 pa_memblock_release(s->write_memblock);
281 pa_memblock_unref(s->write_data);
284 if (s->peek_memchunk.memblock) {
285 if (s->peek_data)
286 pa_memblock_release(s->peek_memchunk.memblock);
287 pa_memblock_unref(s->peek_memchunk.memblock);
290 if (s->record_memblockq)
291 pa_memblockq_free(s->record_memblockq);
293 if (s->proplist)
294 pa_proplist_free(s->proplist);
296 if (s->smoother)
297 pa_smoother_free(s->smoother);
299 for (i = 0; i < s->n_formats; i++)
300 pa_xfree(s->req_formats[i]);
302 pa_xfree(s->device_name);
303 pa_xfree(s);
306 void pa_stream_unref(pa_stream *s) {
307 pa_assert(s);
308 pa_assert(PA_REFCNT_VALUE(s) >= 1);
310 if (PA_REFCNT_DEC(s) <= 0)
311 stream_free(s);
314 pa_stream* pa_stream_ref(pa_stream *s) {
315 pa_assert(s);
316 pa_assert(PA_REFCNT_VALUE(s) >= 1);
318 PA_REFCNT_INC(s);
319 return s;
322 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
323 pa_assert(s);
324 pa_assert(PA_REFCNT_VALUE(s) >= 1);
326 return s->state;
329 pa_context* pa_stream_get_context(pa_stream *s) {
330 pa_assert(s);
331 pa_assert(PA_REFCNT_VALUE(s) >= 1);
333 return s->context;
336 uint32_t pa_stream_get_index(pa_stream *s) {
337 pa_assert(s);
338 pa_assert(PA_REFCNT_VALUE(s) >= 1);
340 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
341 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
343 return s->stream_index;
346 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
347 pa_assert(s);
348 pa_assert(PA_REFCNT_VALUE(s) >= 1);
350 if (s->state == st)
351 return;
353 pa_stream_ref(s);
355 s->state = st;
357 if (s->state_callback)
358 s->state_callback(s, s->state_userdata);
360 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
361 stream_unlink(s);
363 pa_stream_unref(s);
366 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
367 pa_assert(s);
368 pa_assert(PA_REFCNT_VALUE(s) >= 1);
370 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
371 return;
373 if (s->state == PA_STREAM_READY &&
374 (force || !s->auto_timing_update_requested)) {
375 pa_operation *o;
377 /* pa_log("Automatically requesting new timing data"); */
379 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
380 pa_operation_unref(o);
381 s->auto_timing_update_requested = TRUE;
385 if (s->auto_timing_update_event) {
386 if (force)
387 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
389 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
391 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
395 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
396 pa_context *c = userdata;
397 pa_stream *s;
398 uint32_t channel;
400 pa_assert(pd);
401 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
402 pa_assert(t);
403 pa_assert(c);
404 pa_assert(PA_REFCNT_VALUE(c) >= 1);
406 pa_context_ref(c);
408 if (pa_tagstruct_getu32(t, &channel) < 0 ||
409 !pa_tagstruct_eof(t)) {
410 pa_context_fail(c, PA_ERR_PROTOCOL);
411 goto finish;
414 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
415 goto finish;
417 if (s->state != PA_STREAM_READY)
418 goto finish;
420 pa_context_set_error(c, PA_ERR_KILLED);
421 pa_stream_set_state(s, PA_STREAM_FAILED);
423 finish:
424 pa_context_unref(c);
427 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
428 pa_usec_t x;
430 pa_assert(s);
431 pa_assert(!force_start || !force_stop);
433 if (!s->smoother)
434 return;
436 x = pa_rtclock_now();
438 if (s->timing_info_valid) {
439 if (aposteriori)
440 x -= s->timing_info.transport_usec;
441 else
442 x += s->timing_info.transport_usec;
445 if (s->suspended || s->corked || force_stop)
446 pa_smoother_pause(s->smoother, x);
447 else if (force_start || s->buffer_attr.prebuf == 0) {
449 if (!s->timing_info_valid &&
450 !aposteriori &&
451 !force_start &&
452 !force_stop &&
453 s->context->version >= 13) {
455 /* If the server supports STARTED events we take them as
456 * indications when audio really starts/stops playing, if
457 * we don't have any timing info yet -- instead of trying
458 * to be smart and guessing the server time. Otherwise the
459 * unknown transport delay add too much noise to our time
460 * calculations. */
462 return;
465 pa_smoother_resume(s->smoother, x, TRUE);
468 /* Please note that we have no idea if playback actually started
469 * if prebuf is non-zero! */
472 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
473 pa_context *c = userdata;
474 pa_stream *s;
475 uint32_t channel;
476 const char *dn;
477 pa_bool_t suspended;
478 uint32_t di;
479 pa_usec_t usec = 0;
480 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
482 pa_assert(pd);
483 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
484 pa_assert(t);
485 pa_assert(c);
486 pa_assert(PA_REFCNT_VALUE(c) >= 1);
488 pa_context_ref(c);
490 if (c->version < 12) {
491 pa_context_fail(c, PA_ERR_PROTOCOL);
492 goto finish;
495 if (pa_tagstruct_getu32(t, &channel) < 0 ||
496 pa_tagstruct_getu32(t, &di) < 0 ||
497 pa_tagstruct_gets(t, &dn) < 0 ||
498 pa_tagstruct_get_boolean(t, &suspended) < 0) {
499 pa_context_fail(c, PA_ERR_PROTOCOL);
500 goto finish;
503 if (c->version >= 13) {
505 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
506 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
507 pa_tagstruct_getu32(t, &fragsize) < 0 ||
508 pa_tagstruct_get_usec(t, &usec) < 0) {
509 pa_context_fail(c, PA_ERR_PROTOCOL);
510 goto finish;
512 } else {
513 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
514 pa_tagstruct_getu32(t, &tlength) < 0 ||
515 pa_tagstruct_getu32(t, &prebuf) < 0 ||
516 pa_tagstruct_getu32(t, &minreq) < 0 ||
517 pa_tagstruct_get_usec(t, &usec) < 0) {
518 pa_context_fail(c, PA_ERR_PROTOCOL);
519 goto finish;
524 if (!pa_tagstruct_eof(t)) {
525 pa_context_fail(c, PA_ERR_PROTOCOL);
526 goto finish;
529 if (!dn || di == PA_INVALID_INDEX) {
530 pa_context_fail(c, PA_ERR_PROTOCOL);
531 goto finish;
534 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
535 goto finish;
537 if (s->state != PA_STREAM_READY)
538 goto finish;
540 if (c->version >= 13) {
541 if (s->direction == PA_STREAM_RECORD)
542 s->timing_info.configured_source_usec = usec;
543 else
544 s->timing_info.configured_sink_usec = usec;
546 s->buffer_attr.maxlength = maxlength;
547 s->buffer_attr.fragsize = fragsize;
548 s->buffer_attr.tlength = tlength;
549 s->buffer_attr.prebuf = prebuf;
550 s->buffer_attr.minreq = minreq;
553 pa_xfree(s->device_name);
554 s->device_name = pa_xstrdup(dn);
555 s->device_index = di;
557 s->suspended = suspended;
559 check_smoother_status(s, TRUE, FALSE, FALSE);
560 request_auto_timing_update(s, TRUE);
562 if (s->moved_callback)
563 s->moved_callback(s, s->moved_userdata);
565 finish:
566 pa_context_unref(c);
569 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
570 pa_context *c = userdata;
571 pa_stream *s;
572 uint32_t channel;
573 pa_usec_t usec = 0;
574 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
576 pa_assert(pd);
577 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
578 pa_assert(t);
579 pa_assert(c);
580 pa_assert(PA_REFCNT_VALUE(c) >= 1);
582 pa_context_ref(c);
584 if (c->version < 15) {
585 pa_context_fail(c, PA_ERR_PROTOCOL);
586 goto finish;
589 if (pa_tagstruct_getu32(t, &channel) < 0) {
590 pa_context_fail(c, PA_ERR_PROTOCOL);
591 goto finish;
594 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
595 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
596 pa_tagstruct_getu32(t, &fragsize) < 0 ||
597 pa_tagstruct_get_usec(t, &usec) < 0) {
598 pa_context_fail(c, PA_ERR_PROTOCOL);
599 goto finish;
601 } else {
602 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
603 pa_tagstruct_getu32(t, &tlength) < 0 ||
604 pa_tagstruct_getu32(t, &prebuf) < 0 ||
605 pa_tagstruct_getu32(t, &minreq) < 0 ||
606 pa_tagstruct_get_usec(t, &usec) < 0) {
607 pa_context_fail(c, PA_ERR_PROTOCOL);
608 goto finish;
612 if (!pa_tagstruct_eof(t)) {
613 pa_context_fail(c, PA_ERR_PROTOCOL);
614 goto finish;
617 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
618 goto finish;
620 if (s->state != PA_STREAM_READY)
621 goto finish;
623 if (s->direction == PA_STREAM_RECORD)
624 s->timing_info.configured_source_usec = usec;
625 else
626 s->timing_info.configured_sink_usec = usec;
628 s->buffer_attr.maxlength = maxlength;
629 s->buffer_attr.fragsize = fragsize;
630 s->buffer_attr.tlength = tlength;
631 s->buffer_attr.prebuf = prebuf;
632 s->buffer_attr.minreq = minreq;
634 request_auto_timing_update(s, TRUE);
636 if (s->buffer_attr_callback)
637 s->buffer_attr_callback(s, s->buffer_attr_userdata);
639 finish:
640 pa_context_unref(c);
643 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
644 pa_context *c = userdata;
645 pa_stream *s;
646 uint32_t channel;
647 pa_bool_t suspended;
649 pa_assert(pd);
650 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
651 pa_assert(t);
652 pa_assert(c);
653 pa_assert(PA_REFCNT_VALUE(c) >= 1);
655 pa_context_ref(c);
657 if (c->version < 12) {
658 pa_context_fail(c, PA_ERR_PROTOCOL);
659 goto finish;
662 if (pa_tagstruct_getu32(t, &channel) < 0 ||
663 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
664 !pa_tagstruct_eof(t)) {
665 pa_context_fail(c, PA_ERR_PROTOCOL);
666 goto finish;
669 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
670 goto finish;
672 if (s->state != PA_STREAM_READY)
673 goto finish;
675 s->suspended = suspended;
677 check_smoother_status(s, TRUE, FALSE, FALSE);
678 request_auto_timing_update(s, TRUE);
680 if (s->suspended_callback)
681 s->suspended_callback(s, s->suspended_userdata);
683 finish:
684 pa_context_unref(c);
687 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
688 pa_context *c = userdata;
689 pa_stream *s;
690 uint32_t channel;
692 pa_assert(pd);
693 pa_assert(command == PA_COMMAND_STARTED);
694 pa_assert(t);
695 pa_assert(c);
696 pa_assert(PA_REFCNT_VALUE(c) >= 1);
698 pa_context_ref(c);
700 if (c->version < 13) {
701 pa_context_fail(c, PA_ERR_PROTOCOL);
702 goto finish;
705 if (pa_tagstruct_getu32(t, &channel) < 0 ||
706 !pa_tagstruct_eof(t)) {
707 pa_context_fail(c, PA_ERR_PROTOCOL);
708 goto finish;
711 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
712 goto finish;
714 if (s->state != PA_STREAM_READY)
715 goto finish;
717 check_smoother_status(s, TRUE, TRUE, FALSE);
718 request_auto_timing_update(s, TRUE);
720 if (s->started_callback)
721 s->started_callback(s, s->started_userdata);
723 finish:
724 pa_context_unref(c);
727 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
728 pa_context *c = userdata;
729 pa_stream *s;
730 uint32_t channel;
731 pa_proplist *pl = NULL;
732 const char *event;
734 pa_assert(pd);
735 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
736 pa_assert(t);
737 pa_assert(c);
738 pa_assert(PA_REFCNT_VALUE(c) >= 1);
740 pa_context_ref(c);
742 if (c->version < 15) {
743 pa_context_fail(c, PA_ERR_PROTOCOL);
744 goto finish;
747 pl = pa_proplist_new();
749 if (pa_tagstruct_getu32(t, &channel) < 0 ||
750 pa_tagstruct_gets(t, &event) < 0 ||
751 pa_tagstruct_get_proplist(t, pl) < 0 ||
752 !pa_tagstruct_eof(t) || !event) {
753 pa_context_fail(c, PA_ERR_PROTOCOL);
754 goto finish;
757 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
758 goto finish;
760 if (s->state != PA_STREAM_READY)
761 goto finish;
763 if (s->event_callback)
764 s->event_callback(s, event, pl, s->event_userdata);
766 finish:
767 pa_context_unref(c);
769 if (pl)
770 pa_proplist_free(pl);
773 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
774 pa_stream *s;
775 pa_context *c = userdata;
776 uint32_t bytes, channel;
778 pa_assert(pd);
779 pa_assert(command == PA_COMMAND_REQUEST);
780 pa_assert(t);
781 pa_assert(c);
782 pa_assert(PA_REFCNT_VALUE(c) >= 1);
784 pa_context_ref(c);
786 if (pa_tagstruct_getu32(t, &channel) < 0 ||
787 pa_tagstruct_getu32(t, &bytes) < 0 ||
788 !pa_tagstruct_eof(t)) {
789 pa_context_fail(c, PA_ERR_PROTOCOL);
790 goto finish;
793 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
794 goto finish;
796 if (s->state != PA_STREAM_READY)
797 goto finish;
799 s->requested_bytes += bytes;
801 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
803 if (s->requested_bytes > 0 && s->write_callback)
804 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
806 finish:
807 pa_context_unref(c);
810 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
811 pa_stream *s;
812 pa_context *c = userdata;
813 uint32_t channel;
815 pa_assert(pd);
816 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
817 pa_assert(t);
818 pa_assert(c);
819 pa_assert(PA_REFCNT_VALUE(c) >= 1);
821 pa_context_ref(c);
823 if (pa_tagstruct_getu32(t, &channel) < 0 ||
824 !pa_tagstruct_eof(t)) {
825 pa_context_fail(c, PA_ERR_PROTOCOL);
826 goto finish;
829 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
830 goto finish;
832 if (s->state != PA_STREAM_READY)
833 goto finish;
835 if (s->buffer_attr.prebuf > 0)
836 check_smoother_status(s, TRUE, FALSE, TRUE);
838 request_auto_timing_update(s, TRUE);
840 if (command == PA_COMMAND_OVERFLOW) {
841 if (s->overflow_callback)
842 s->overflow_callback(s, s->overflow_userdata);
843 } else if (command == PA_COMMAND_UNDERFLOW) {
844 if (s->underflow_callback)
845 s->underflow_callback(s, s->underflow_userdata);
848 finish:
849 pa_context_unref(c);
852 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
853 pa_assert(s);
854 pa_assert(PA_REFCNT_VALUE(s) >= 1);
856 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
858 if (s->state != PA_STREAM_READY)
859 return;
861 if (w) {
862 s->write_index_not_before = s->context->ctag;
864 if (s->timing_info_valid)
865 s->timing_info.write_index_corrupt = TRUE;
867 /* pa_log("write_index invalidated"); */
870 if (r) {
871 s->read_index_not_before = s->context->ctag;
873 if (s->timing_info_valid)
874 s->timing_info.read_index_corrupt = TRUE;
876 /* pa_log("read_index invalidated"); */
879 request_auto_timing_update(s, TRUE);
882 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
883 pa_stream *s = userdata;
885 pa_assert(s);
886 pa_assert(PA_REFCNT_VALUE(s) >= 1);
888 pa_stream_ref(s);
889 request_auto_timing_update(s, FALSE);
890 pa_stream_unref(s);
893 static void create_stream_complete(pa_stream *s) {
894 pa_assert(s);
895 pa_assert(PA_REFCNT_VALUE(s) >= 1);
896 pa_assert(s->state == PA_STREAM_CREATING);
898 pa_stream_set_state(s, PA_STREAM_READY);
900 if (s->requested_bytes > 0 && s->write_callback)
901 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
903 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
904 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
905 pa_assert(!s->auto_timing_update_event);
906 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
908 request_auto_timing_update(s, TRUE);
911 check_smoother_status(s, TRUE, FALSE, FALSE);
914 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
915 const char *e;
917 pa_assert(s);
918 pa_assert(attr);
920 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
921 uint32_t ms;
923 if (pa_atou(e, &ms) < 0 || ms <= 0)
924 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
925 else {
926 attr->maxlength = (uint32_t) -1;
927 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
928 attr->minreq = (uint32_t) -1;
929 attr->prebuf = (uint32_t) -1;
930 attr->fragsize = attr->tlength;
933 if (flags)
934 *flags |= PA_STREAM_ADJUST_LATENCY;
937 if (s->context->version >= 13)
938 return;
940 /* Version older than 0.9.10 didn't do server side buffer_attr
941 * selection, hence we have to fake it on the client side. */
943 /* We choose fairly conservative values here, to not confuse
944 * old clients with extremely large playback buffers */
946 if (attr->maxlength == (uint32_t) -1)
947 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
949 if (attr->tlength == (uint32_t) -1)
950 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
952 if (attr->minreq == (uint32_t) -1)
953 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
955 if (attr->prebuf == (uint32_t) -1)
956 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
958 if (attr->fragsize == (uint32_t) -1)
959 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
962 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
963 pa_stream *s = userdata;
964 uint32_t requested_bytes = 0;
966 pa_assert(pd);
967 pa_assert(s);
968 pa_assert(PA_REFCNT_VALUE(s) >= 1);
969 pa_assert(s->state == PA_STREAM_CREATING);
971 pa_stream_ref(s);
973 if (command != PA_COMMAND_REPLY) {
974 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
975 goto finish;
977 pa_stream_set_state(s, PA_STREAM_FAILED);
978 goto finish;
981 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
982 s->channel == PA_INVALID_INDEX ||
983 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
984 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
985 pa_context_fail(s->context, PA_ERR_PROTOCOL);
986 goto finish;
989 s->requested_bytes = (int64_t) requested_bytes;
991 if (s->context->version >= 9) {
992 if (s->direction == PA_STREAM_PLAYBACK) {
993 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
994 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
995 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
996 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
997 pa_context_fail(s->context, PA_ERR_PROTOCOL);
998 goto finish;
1000 } else if (s->direction == PA_STREAM_RECORD) {
1001 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1002 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1003 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1004 goto finish;
1009 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1010 pa_sample_spec ss;
1011 pa_channel_map cm;
1012 const char *dn = NULL;
1013 pa_bool_t suspended;
1015 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1016 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1017 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1018 pa_tagstruct_gets(t, &dn) < 0 ||
1019 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1020 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1021 goto finish;
1024 if (!dn || s->device_index == PA_INVALID_INDEX ||
1025 ss.channels != cm.channels ||
1026 !pa_channel_map_valid(&cm) ||
1027 !pa_sample_spec_valid(&ss) ||
1028 (s->n_formats == 0 && (
1029 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1030 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1031 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1032 /* XXX: checks for the n_formats > 0 case? */
1033 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1034 goto finish;
1037 pa_xfree(s->device_name);
1038 s->device_name = pa_xstrdup(dn);
1039 s->suspended = suspended;
1041 s->channel_map = cm;
1042 s->sample_spec = ss;
1045 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1046 pa_usec_t usec;
1048 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1049 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1050 goto finish;
1053 if (s->direction == PA_STREAM_RECORD)
1054 s->timing_info.configured_source_usec = usec;
1055 else
1056 s->timing_info.configured_sink_usec = usec;
1059 if (s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK) {
1060 pa_format_info *f = pa_format_info_new();
1061 pa_tagstruct_get_format_info(t, f);
1063 if (pa_format_info_valid(f))
1064 s->format = f;
1065 else
1066 pa_format_info_free(f);
1069 if (!pa_tagstruct_eof(t)) {
1070 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1071 goto finish;
1074 if (s->direction == PA_STREAM_RECORD) {
1075 pa_assert(!s->record_memblockq);
1077 s->record_memblockq = pa_memblockq_new(
1079 s->buffer_attr.maxlength,
1081 pa_frame_size(&s->sample_spec),
1085 NULL);
1088 s->channel_valid = TRUE;
1089 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1091 create_stream_complete(s);
1093 finish:
1094 pa_stream_unref(s);
1097 static int create_stream(
1098 pa_stream_direction_t direction,
1099 pa_stream *s,
1100 const char *dev,
1101 const pa_buffer_attr *attr,
1102 pa_stream_flags_t flags,
1103 const pa_cvolume *volume,
1104 pa_stream *sync_stream) {
1106 pa_tagstruct *t;
1107 uint32_t tag;
1108 pa_bool_t volume_set = FALSE;
1109 uint32_t i;
1111 pa_assert(s);
1112 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1113 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1115 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1116 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1117 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1118 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1119 PA_STREAM_INTERPOLATE_TIMING|
1120 PA_STREAM_NOT_MONOTONIC|
1121 PA_STREAM_AUTO_TIMING_UPDATE|
1122 PA_STREAM_NO_REMAP_CHANNELS|
1123 PA_STREAM_NO_REMIX_CHANNELS|
1124 PA_STREAM_FIX_FORMAT|
1125 PA_STREAM_FIX_RATE|
1126 PA_STREAM_FIX_CHANNELS|
1127 PA_STREAM_DONT_MOVE|
1128 PA_STREAM_VARIABLE_RATE|
1129 PA_STREAM_PEAK_DETECT|
1130 PA_STREAM_START_MUTED|
1131 PA_STREAM_ADJUST_LATENCY|
1132 PA_STREAM_EARLY_REQUESTS|
1133 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1134 PA_STREAM_START_UNMUTED|
1135 PA_STREAM_FAIL_ON_SUSPEND|
1136 PA_STREAM_RELATIVE_VOLUME|
1137 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1140 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1141 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1142 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1143 /* Althought some of the other flags are not supported on older
1144 * version, we don't check for them here, because it doesn't hurt
1145 * when they are passed but actually not supported. This makes
1146 * client development easier */
1148 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1149 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1150 PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1151 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1152 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1154 pa_stream_ref(s);
1156 s->direction = direction;
1158 if (sync_stream)
1159 s->syncid = sync_stream->syncid;
1161 if (attr)
1162 s->buffer_attr = *attr;
1163 patch_buffer_attr(s, &s->buffer_attr, &flags);
1165 s->flags = flags;
1166 s->corked = !!(flags & PA_STREAM_START_CORKED);
1168 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1169 pa_usec_t x;
1171 x = pa_rtclock_now();
1173 pa_assert(!s->smoother);
1174 s->smoother = pa_smoother_new(
1175 SMOOTHER_ADJUST_TIME,
1176 SMOOTHER_HISTORY_TIME,
1177 !(flags & PA_STREAM_NOT_MONOTONIC),
1178 TRUE,
1179 SMOOTHER_MIN_HISTORY,
1181 TRUE);
1184 if (!dev)
1185 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1187 t = pa_tagstruct_command(
1188 s->context,
1189 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1190 &tag);
1192 if (s->context->version < 13)
1193 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1195 pa_tagstruct_put(
1197 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1198 PA_TAG_CHANNEL_MAP, &s->channel_map,
1199 PA_TAG_U32, PA_INVALID_INDEX,
1200 PA_TAG_STRING, dev,
1201 PA_TAG_U32, s->buffer_attr.maxlength,
1202 PA_TAG_BOOLEAN, s->corked,
1203 PA_TAG_INVALID);
1205 if (s->direction == PA_STREAM_PLAYBACK) {
1206 pa_cvolume cv;
1208 pa_tagstruct_put(
1210 PA_TAG_U32, s->buffer_attr.tlength,
1211 PA_TAG_U32, s->buffer_attr.prebuf,
1212 PA_TAG_U32, s->buffer_attr.minreq,
1213 PA_TAG_U32, s->syncid,
1214 PA_TAG_INVALID);
1216 volume_set = !!volume;
1218 if (!volume) {
1219 if (pa_sample_spec_valid(&s->sample_spec))
1220 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1221 else {
1222 /* This is not really relevant, since no volume was set, and
1223 * the real number of channels is embedded in the format_info
1224 * structure */
1225 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1229 pa_tagstruct_put_cvolume(t, volume);
1230 } else
1231 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1233 if (s->context->version >= 12) {
1234 pa_tagstruct_put(
1236 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1237 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1238 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1239 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1240 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1241 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1242 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1243 PA_TAG_INVALID);
1246 if (s->context->version >= 13) {
1248 if (s->direction == PA_STREAM_PLAYBACK)
1249 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1250 else
1251 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1253 pa_tagstruct_put(
1255 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1256 PA_TAG_PROPLIST, s->proplist,
1257 PA_TAG_INVALID);
1259 if (s->direction == PA_STREAM_RECORD)
1260 pa_tagstruct_putu32(t, s->direct_on_input);
1263 if (s->context->version >= 14) {
1265 if (s->direction == PA_STREAM_PLAYBACK)
1266 pa_tagstruct_put_boolean(t, volume_set);
1268 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1271 if (s->context->version >= 15) {
1273 if (s->direction == PA_STREAM_PLAYBACK)
1274 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1276 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1277 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1280 if (s->context->version >= 17) {
1282 if (s->direction == PA_STREAM_PLAYBACK)
1283 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1287 if (s->context->version >= 18) {
1289 if (s->direction == PA_STREAM_PLAYBACK)
1290 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1293 if (s->context->version >= 21) {
1295 if (s->direction == PA_STREAM_PLAYBACK) {
1296 pa_tagstruct_putu8(t, s->n_formats);
1297 for (i = 0; i < s->n_formats; i++)
1298 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1302 pa_pstream_send_tagstruct(s->context->pstream, t);
1303 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1305 pa_stream_set_state(s, PA_STREAM_CREATING);
1307 pa_stream_unref(s);
1308 return 0;
1311 int pa_stream_connect_playback(
1312 pa_stream *s,
1313 const char *dev,
1314 const pa_buffer_attr *attr,
1315 pa_stream_flags_t flags,
1316 const pa_cvolume *volume,
1317 pa_stream *sync_stream) {
1319 pa_assert(s);
1320 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1322 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1325 int pa_stream_connect_record(
1326 pa_stream *s,
1327 const char *dev,
1328 const pa_buffer_attr *attr,
1329 pa_stream_flags_t flags) {
1331 pa_assert(s);
1332 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1334 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1337 int pa_stream_begin_write(
1338 pa_stream *s,
1339 void **data,
1340 size_t *nbytes) {
1342 pa_assert(s);
1343 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1345 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1346 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1347 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1348 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1349 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1351 if (*nbytes != (size_t) -1) {
1352 size_t m, fs;
1354 m = pa_mempool_block_size_max(s->context->mempool);
1355 fs = pa_frame_size(&s->sample_spec);
1357 m = (m / fs) * fs;
1358 if (*nbytes > m)
1359 *nbytes = m;
1362 if (!s->write_memblock) {
1363 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1364 s->write_data = pa_memblock_acquire(s->write_memblock);
1367 *data = s->write_data;
1368 *nbytes = pa_memblock_get_length(s->write_memblock);
1370 return 0;
1373 int pa_stream_cancel_write(
1374 pa_stream *s) {
1376 pa_assert(s);
1377 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1379 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1380 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1381 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1382 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1384 pa_assert(s->write_data);
1386 pa_memblock_release(s->write_memblock);
1387 pa_memblock_unref(s->write_memblock);
1388 s->write_memblock = NULL;
1389 s->write_data = NULL;
1391 return 0;
1394 int pa_stream_write(
1395 pa_stream *s,
1396 const void *data,
1397 size_t length,
1398 pa_free_cb_t free_cb,
1399 int64_t offset,
1400 pa_seek_mode_t seek) {
1402 pa_assert(s);
1403 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1404 pa_assert(data);
1406 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1407 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1408 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1409 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1410 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1411 PA_CHECK_VALIDITY(s->context,
1412 !s->write_memblock ||
1413 ((data >= s->write_data) &&
1414 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1415 PA_ERR_INVALID);
1416 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1418 if (s->write_memblock) {
1419 pa_memchunk chunk;
1421 /* pa_stream_write_begin() was called before */
1423 pa_memblock_release(s->write_memblock);
1425 chunk.memblock = s->write_memblock;
1426 chunk.index = (const char *) data - (const char *) s->write_data;
1427 chunk.length = length;
1429 s->write_memblock = NULL;
1430 s->write_data = NULL;
1432 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1433 pa_memblock_unref(chunk.memblock);
1435 } else {
1436 pa_seek_mode_t t_seek = seek;
1437 int64_t t_offset = offset;
1438 size_t t_length = length;
1439 const void *t_data = data;
1441 /* pa_stream_write_begin() was not called before */
1443 while (t_length > 0) {
1444 pa_memchunk chunk;
1446 chunk.index = 0;
1448 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1449 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1450 chunk.length = t_length;
1451 } else {
1452 void *d;
1454 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1455 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1457 d = pa_memblock_acquire(chunk.memblock);
1458 memcpy(d, t_data, chunk.length);
1459 pa_memblock_release(chunk.memblock);
1462 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1464 t_offset = 0;
1465 t_seek = PA_SEEK_RELATIVE;
1467 t_data = (const uint8_t*) t_data + chunk.length;
1468 t_length -= chunk.length;
1470 pa_memblock_unref(chunk.memblock);
1473 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1474 free_cb((void*) data);
1477 /* This is obviously wrong since we ignore the seeking index . But
1478 * that's OK, the server side applies the same error */
1479 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1481 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1483 if (s->direction == PA_STREAM_PLAYBACK) {
1485 /* Update latency request correction */
1486 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1488 if (seek == PA_SEEK_ABSOLUTE) {
1489 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1490 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1491 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1492 } else if (seek == PA_SEEK_RELATIVE) {
1493 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1494 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1495 } else
1496 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1499 /* Update the write index in the already available latency data */
1500 if (s->timing_info_valid) {
1502 if (seek == PA_SEEK_ABSOLUTE) {
1503 s->timing_info.write_index_corrupt = FALSE;
1504 s->timing_info.write_index = offset + (int64_t) length;
1505 } else if (seek == PA_SEEK_RELATIVE) {
1506 if (!s->timing_info.write_index_corrupt)
1507 s->timing_info.write_index += offset + (int64_t) length;
1508 } else
1509 s->timing_info.write_index_corrupt = TRUE;
1512 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1513 request_auto_timing_update(s, TRUE);
1516 return 0;
1519 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1520 pa_assert(s);
1521 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1522 pa_assert(data);
1523 pa_assert(length);
1525 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1526 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1527 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1529 if (!s->peek_memchunk.memblock) {
1531 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1532 *data = NULL;
1533 *length = 0;
1534 return 0;
1537 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1540 pa_assert(s->peek_data);
1541 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1542 *length = s->peek_memchunk.length;
1543 return 0;
1546 int pa_stream_drop(pa_stream *s) {
1547 pa_assert(s);
1548 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1550 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1551 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1552 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1553 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1555 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1557 /* Fix the simulated local read index */
1558 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1559 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1561 pa_assert(s->peek_data);
1562 pa_memblock_release(s->peek_memchunk.memblock);
1563 pa_memblock_unref(s->peek_memchunk.memblock);
1564 pa_memchunk_reset(&s->peek_memchunk);
1566 return 0;
1569 size_t pa_stream_writable_size(pa_stream *s) {
1570 pa_assert(s);
1571 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1573 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1574 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1575 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1577 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1580 size_t pa_stream_readable_size(pa_stream *s) {
1581 pa_assert(s);
1582 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1584 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1585 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1586 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1588 return pa_memblockq_get_length(s->record_memblockq);
1591 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1592 pa_operation *o;
1593 pa_tagstruct *t;
1594 uint32_t tag;
1596 pa_assert(s);
1597 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1599 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1600 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1601 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1603 /* Ask for a timing update before we cork/uncork to get the best
1604 * accuracy for the transport latency suitable for the
1605 * check_smoother_status() call in the started callback */
1606 request_auto_timing_update(s, TRUE);
1608 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1610 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1611 pa_tagstruct_putu32(t, s->channel);
1612 pa_pstream_send_tagstruct(s->context->pstream, t);
1613 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1615 /* This might cause the read index to conitnue again, hence
1616 * let's request a timing update */
1617 request_auto_timing_update(s, TRUE);
1619 return o;
1622 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1623 pa_usec_t usec;
1625 pa_assert(s);
1626 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1627 pa_assert(s->state == PA_STREAM_READY);
1628 pa_assert(s->direction != PA_STREAM_UPLOAD);
1629 pa_assert(s->timing_info_valid);
1630 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1631 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1633 if (s->direction == PA_STREAM_PLAYBACK) {
1634 /* The last byte that was written into the output device
1635 * had this time value associated */
1636 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1638 if (!s->corked && !s->suspended) {
1640 if (!ignore_transport)
1641 /* Because the latency info took a little time to come
1642 * to us, we assume that the real output time is actually
1643 * a little ahead */
1644 usec += s->timing_info.transport_usec;
1646 /* However, the output device usually maintains a buffer
1647 too, hence the real sample currently played is a little
1648 back */
1649 if (s->timing_info.sink_usec >= usec)
1650 usec = 0;
1651 else
1652 usec -= s->timing_info.sink_usec;
1655 } else {
1656 pa_assert(s->direction == PA_STREAM_RECORD);
1658 /* The last byte written into the server side queue had
1659 * this time value associated */
1660 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1662 if (!s->corked && !s->suspended) {
1664 if (!ignore_transport)
1665 /* Add transport latency */
1666 usec += s->timing_info.transport_usec;
1668 /* Add latency of data in device buffer */
1669 usec += s->timing_info.source_usec;
1671 /* If this is a monitor source, we need to correct the
1672 * time by the playback device buffer */
1673 if (s->timing_info.sink_usec >= usec)
1674 usec = 0;
1675 else
1676 usec -= s->timing_info.sink_usec;
1680 return usec;
1683 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1684 pa_operation *o = userdata;
1685 struct timeval local, remote, now;
1686 pa_timing_info *i;
1687 pa_bool_t playing = FALSE;
1688 uint64_t underrun_for = 0, playing_for = 0;
1690 pa_assert(pd);
1691 pa_assert(o);
1692 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1694 if (!o->context || !o->stream)
1695 goto finish;
1697 i = &o->stream->timing_info;
1699 o->stream->timing_info_valid = FALSE;
1700 i->write_index_corrupt = TRUE;
1701 i->read_index_corrupt = TRUE;
1703 if (command != PA_COMMAND_REPLY) {
1704 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1705 goto finish;
1707 } else {
1709 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1710 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1711 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1712 pa_tagstruct_get_timeval(t, &local) < 0 ||
1713 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1714 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1715 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1717 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1718 goto finish;
1721 if (o->context->version >= 13 &&
1722 o->stream->direction == PA_STREAM_PLAYBACK)
1723 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1724 pa_tagstruct_getu64(t, &playing_for) < 0) {
1726 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1727 goto finish;
1731 if (!pa_tagstruct_eof(t)) {
1732 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1733 goto finish;
1735 o->stream->timing_info_valid = TRUE;
1736 i->write_index_corrupt = FALSE;
1737 i->read_index_corrupt = FALSE;
1739 i->playing = (int) playing;
1740 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1742 pa_gettimeofday(&now);
1744 /* Calculcate timestamps */
1745 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1746 /* local and remote seem to have synchronized clocks */
1748 if (o->stream->direction == PA_STREAM_PLAYBACK)
1749 i->transport_usec = pa_timeval_diff(&remote, &local);
1750 else
1751 i->transport_usec = pa_timeval_diff(&now, &remote);
1753 i->synchronized_clocks = TRUE;
1754 i->timestamp = remote;
1755 } else {
1756 /* clocks are not synchronized, let's estimate latency then */
1757 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1758 i->synchronized_clocks = FALSE;
1759 i->timestamp = local;
1760 pa_timeval_add(&i->timestamp, i->transport_usec);
1763 /* Invalidate read and write indexes if necessary */
1764 if (tag < o->stream->read_index_not_before)
1765 i->read_index_corrupt = TRUE;
1767 if (tag < o->stream->write_index_not_before)
1768 i->write_index_corrupt = TRUE;
1770 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1771 /* Write index correction */
1773 int n, j;
1774 uint32_t ctag = tag;
1776 /* Go through the saved correction values and add up the
1777 * total correction.*/
1778 for (n = 0, j = o->stream->current_write_index_correction+1;
1779 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1780 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1782 /* Step over invalid data or out-of-date data */
1783 if (!o->stream->write_index_corrections[j].valid ||
1784 o->stream->write_index_corrections[j].tag < ctag)
1785 continue;
1787 /* Make sure that everything is in order */
1788 ctag = o->stream->write_index_corrections[j].tag+1;
1790 /* Now fix the write index */
1791 if (o->stream->write_index_corrections[j].corrupt) {
1792 /* A corrupting seek was made */
1793 i->write_index_corrupt = TRUE;
1794 } else if (o->stream->write_index_corrections[j].absolute) {
1795 /* An absolute seek was made */
1796 i->write_index = o->stream->write_index_corrections[j].value;
1797 i->write_index_corrupt = FALSE;
1798 } else if (!i->write_index_corrupt) {
1799 /* A relative seek was made */
1800 i->write_index += o->stream->write_index_corrections[j].value;
1804 /* Clear old correction entries */
1805 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1806 if (!o->stream->write_index_corrections[n].valid)
1807 continue;
1809 if (o->stream->write_index_corrections[n].tag <= tag)
1810 o->stream->write_index_corrections[n].valid = FALSE;
1814 if (o->stream->direction == PA_STREAM_RECORD) {
1815 /* Read index correction */
1817 if (!i->read_index_corrupt)
1818 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1821 /* Update smoother if we're not corked */
1822 if (o->stream->smoother && !o->stream->corked) {
1823 pa_usec_t u, x;
1825 u = x = pa_rtclock_now() - i->transport_usec;
1827 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1828 pa_usec_t su;
1830 /* If we weren't playing then it will take some time
1831 * until the audio will actually come out through the
1832 * speakers. Since we follow that timing here, we need
1833 * to try to fix this up */
1835 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1837 if (su < i->sink_usec)
1838 x += i->sink_usec - su;
1841 if (!i->playing)
1842 pa_smoother_pause(o->stream->smoother, x);
1844 /* Update the smoother */
1845 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1846 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1847 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1849 if (i->playing)
1850 pa_smoother_resume(o->stream->smoother, x, TRUE);
1854 o->stream->auto_timing_update_requested = FALSE;
1856 if (o->stream->latency_update_callback)
1857 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1859 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1860 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1861 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1864 finish:
1866 pa_operation_done(o);
1867 pa_operation_unref(o);
1870 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1871 uint32_t tag;
1872 pa_operation *o;
1873 pa_tagstruct *t;
1874 struct timeval now;
1875 int cidx = 0;
1877 pa_assert(s);
1878 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1880 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1881 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1882 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1884 if (s->direction == PA_STREAM_PLAYBACK) {
1885 /* Find a place to store the write_index correction data for this entry */
1886 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1888 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1889 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1891 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1893 t = pa_tagstruct_command(
1894 s->context,
1895 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1896 &tag);
1897 pa_tagstruct_putu32(t, s->channel);
1898 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1900 pa_pstream_send_tagstruct(s->context->pstream, t);
1901 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1903 if (s->direction == PA_STREAM_PLAYBACK) {
1904 /* Fill in initial correction data */
1906 s->current_write_index_correction = cidx;
1908 s->write_index_corrections[cidx].valid = TRUE;
1909 s->write_index_corrections[cidx].absolute = FALSE;
1910 s->write_index_corrections[cidx].corrupt = FALSE;
1911 s->write_index_corrections[cidx].tag = tag;
1912 s->write_index_corrections[cidx].value = 0;
1915 return o;
1918 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1919 pa_stream *s = userdata;
1921 pa_assert(pd);
1922 pa_assert(s);
1923 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1925 pa_stream_ref(s);
1927 if (command != PA_COMMAND_REPLY) {
1928 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1929 goto finish;
1931 pa_stream_set_state(s, PA_STREAM_FAILED);
1932 goto finish;
1933 } else if (!pa_tagstruct_eof(t)) {
1934 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1935 goto finish;
1938 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1940 finish:
1941 pa_stream_unref(s);
1944 int pa_stream_disconnect(pa_stream *s) {
1945 pa_tagstruct *t;
1946 uint32_t tag;
1948 pa_assert(s);
1949 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1951 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1952 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1953 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1955 pa_stream_ref(s);
1957 t = pa_tagstruct_command(
1958 s->context,
1959 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1960 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1961 &tag);
1962 pa_tagstruct_putu32(t, s->channel);
1963 pa_pstream_send_tagstruct(s->context->pstream, t);
1964 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1966 pa_stream_unref(s);
1967 return 0;
1970 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1971 pa_assert(s);
1972 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1974 if (pa_detect_fork())
1975 return;
1977 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1978 return;
1980 s->read_callback = cb;
1981 s->read_userdata = userdata;
1984 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1985 pa_assert(s);
1986 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1988 if (pa_detect_fork())
1989 return;
1991 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1992 return;
1994 s->write_callback = cb;
1995 s->write_userdata = userdata;
1998 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1999 pa_assert(s);
2000 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2002 if (pa_detect_fork())
2003 return;
2005 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2006 return;
2008 s->state_callback = cb;
2009 s->state_userdata = userdata;
2012 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2013 pa_assert(s);
2014 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2016 if (pa_detect_fork())
2017 return;
2019 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2020 return;
2022 s->overflow_callback = cb;
2023 s->overflow_userdata = userdata;
2026 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2027 pa_assert(s);
2028 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2030 if (pa_detect_fork())
2031 return;
2033 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2034 return;
2036 s->underflow_callback = cb;
2037 s->underflow_userdata = userdata;
2040 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2041 pa_assert(s);
2042 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2044 if (pa_detect_fork())
2045 return;
2047 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2048 return;
2050 s->latency_update_callback = cb;
2051 s->latency_update_userdata = userdata;
2054 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2055 pa_assert(s);
2056 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2058 if (pa_detect_fork())
2059 return;
2061 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2062 return;
2064 s->moved_callback = cb;
2065 s->moved_userdata = userdata;
2068 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2069 pa_assert(s);
2070 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2072 if (pa_detect_fork())
2073 return;
2075 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2076 return;
2078 s->suspended_callback = cb;
2079 s->suspended_userdata = userdata;
2082 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2083 pa_assert(s);
2084 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2086 if (pa_detect_fork())
2087 return;
2089 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2090 return;
2092 s->started_callback = cb;
2093 s->started_userdata = userdata;
2096 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2097 pa_assert(s);
2098 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2100 if (pa_detect_fork())
2101 return;
2103 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2104 return;
2106 s->event_callback = cb;
2107 s->event_userdata = userdata;
2110 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2111 pa_assert(s);
2112 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2114 if (pa_detect_fork())
2115 return;
2117 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2118 return;
2120 s->buffer_attr_callback = cb;
2121 s->buffer_attr_userdata = userdata;
2124 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2125 pa_operation *o = userdata;
2126 int success = 1;
2128 pa_assert(pd);
2129 pa_assert(o);
2130 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2132 if (!o->context)
2133 goto finish;
2135 if (command != PA_COMMAND_REPLY) {
2136 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2137 goto finish;
2139 success = 0;
2140 } else if (!pa_tagstruct_eof(t)) {
2141 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2142 goto finish;
2145 if (o->callback) {
2146 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2147 cb(o->stream, success, o->userdata);
2150 finish:
2151 pa_operation_done(o);
2152 pa_operation_unref(o);
2155 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2156 pa_operation *o;
2157 pa_tagstruct *t;
2158 uint32_t tag;
2160 pa_assert(s);
2161 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2163 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2164 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2165 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2167 /* Ask for a timing update before we cork/uncork to get the best
2168 * accuracy for the transport latency suitable for the
2169 * check_smoother_status() call in the started callback */
2170 request_auto_timing_update(s, TRUE);
2172 s->corked = b;
2174 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2176 t = pa_tagstruct_command(
2177 s->context,
2178 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2179 &tag);
2180 pa_tagstruct_putu32(t, s->channel);
2181 pa_tagstruct_put_boolean(t, !!b);
2182 pa_pstream_send_tagstruct(s->context->pstream, t);
2183 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2185 check_smoother_status(s, FALSE, FALSE, FALSE);
2187 /* This might cause the indexes to hang/start again, hence let's
2188 * request a timing update, after the cork/uncork, too */
2189 request_auto_timing_update(s, TRUE);
2191 return o;
2194 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2195 pa_tagstruct *t;
2196 pa_operation *o;
2197 uint32_t tag;
2199 pa_assert(s);
2200 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2202 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2203 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2205 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2207 t = pa_tagstruct_command(s->context, command, &tag);
2208 pa_tagstruct_putu32(t, s->channel);
2209 pa_pstream_send_tagstruct(s->context->pstream, t);
2210 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2212 return o;
2215 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2216 pa_operation *o;
2218 pa_assert(s);
2219 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2221 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2222 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2223 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2225 /* Ask for a timing update *before* the flush, so that the
2226 * transport usec is as up to date as possible when we get the
2227 * underflow message and update the smoother status*/
2228 request_auto_timing_update(s, TRUE);
2230 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2231 return NULL;
2233 if (s->direction == PA_STREAM_PLAYBACK) {
2235 if (s->write_index_corrections[s->current_write_index_correction].valid)
2236 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2238 if (s->buffer_attr.prebuf > 0)
2239 check_smoother_status(s, FALSE, FALSE, TRUE);
2241 /* This will change the write index, but leave the
2242 * read index untouched. */
2243 invalidate_indexes(s, FALSE, TRUE);
2245 } else
2246 /* For record streams this has no influence on the write
2247 * index, but the read index might jump. */
2248 invalidate_indexes(s, TRUE, FALSE);
2250 /* Note that we do not update requested_bytes here. This is
2251 * because we cannot really know how data actually was dropped
2252 * from the write index due to this. This 'error' will be applied
2253 * by both client and server and hence we should be fine. */
2255 return o;
2258 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2259 pa_operation *o;
2261 pa_assert(s);
2262 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2264 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2265 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2266 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2267 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2269 /* Ask for a timing update before we cork/uncork to get the best
2270 * accuracy for the transport latency suitable for the
2271 * check_smoother_status() call in the started callback */
2272 request_auto_timing_update(s, TRUE);
2274 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2275 return NULL;
2277 /* This might cause the read index to hang again, hence
2278 * let's request a timing update */
2279 request_auto_timing_update(s, TRUE);
2281 return o;
2284 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2285 pa_operation *o;
2287 pa_assert(s);
2288 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2290 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2291 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2292 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2293 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2295 /* Ask for a timing update before we cork/uncork to get the best
2296 * accuracy for the transport latency suitable for the
2297 * check_smoother_status() call in the started callback */
2298 request_auto_timing_update(s, TRUE);
2300 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2301 return NULL;
2303 /* This might cause the read index to start moving again, hence
2304 * let's request a timing update */
2305 request_auto_timing_update(s, TRUE);
2307 return o;
2310 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2311 pa_operation *o;
2313 pa_assert(s);
2314 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2315 pa_assert(name);
2317 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2318 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2319 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2321 if (s->context->version >= 13) {
2322 pa_proplist *p = pa_proplist_new();
2324 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2325 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2326 pa_proplist_free(p);
2327 } else {
2328 pa_tagstruct *t;
2329 uint32_t tag;
2331 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2332 t = pa_tagstruct_command(
2333 s->context,
2334 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2335 &tag);
2336 pa_tagstruct_putu32(t, s->channel);
2337 pa_tagstruct_puts(t, name);
2338 pa_pstream_send_tagstruct(s->context->pstream, t);
2339 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2342 return o;
2345 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2346 pa_usec_t usec;
2348 pa_assert(s);
2349 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2351 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2352 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2353 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2354 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2355 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2356 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2358 if (s->smoother)
2359 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2360 else
2361 usec = calc_time(s, FALSE);
2363 /* Make sure the time runs monotonically */
2364 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2365 if (usec < s->previous_time)
2366 usec = s->previous_time;
2367 else
2368 s->previous_time = usec;
2371 if (r_usec)
2372 *r_usec = usec;
2374 return 0;
2377 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2378 pa_assert(s);
2379 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2381 if (negative)
2382 *negative = 0;
2384 if (a >= b)
2385 return a-b;
2386 else {
2387 if (negative && s->direction == PA_STREAM_RECORD) {
2388 *negative = 1;
2389 return b-a;
2390 } else
2391 return 0;
2395 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2396 pa_usec_t t, c;
2397 int r;
2398 int64_t cindex;
2400 pa_assert(s);
2401 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2402 pa_assert(r_usec);
2404 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2405 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2406 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2407 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2408 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2409 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2411 if ((r = pa_stream_get_time(s, &t)) < 0)
2412 return r;
2414 if (s->direction == PA_STREAM_PLAYBACK)
2415 cindex = s->timing_info.write_index;
2416 else
2417 cindex = s->timing_info.read_index;
2419 if (cindex < 0)
2420 cindex = 0;
2422 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2424 if (s->direction == PA_STREAM_PLAYBACK)
2425 *r_usec = time_counter_diff(s, c, t, negative);
2426 else
2427 *r_usec = time_counter_diff(s, t, c, negative);
2429 return 0;
2432 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2433 pa_assert(s);
2434 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2436 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2437 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2438 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2439 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2441 return &s->timing_info;
2444 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2445 pa_assert(s);
2446 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2448 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2450 return &s->sample_spec;
2453 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2454 pa_assert(s);
2455 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2457 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2459 return &s->channel_map;
2462 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2463 pa_assert(s);
2464 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2466 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2467 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2468 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2470 return &s->buffer_attr;
2473 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2474 pa_operation *o = userdata;
2475 int success = 1;
2477 pa_assert(pd);
2478 pa_assert(o);
2479 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2481 if (!o->context)
2482 goto finish;
2484 if (command != PA_COMMAND_REPLY) {
2485 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2486 goto finish;
2488 success = 0;
2489 } else {
2490 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2491 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2492 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2493 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2494 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2495 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2496 goto finish;
2498 } else if (o->stream->direction == PA_STREAM_RECORD) {
2499 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2500 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2501 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2502 goto finish;
2506 if (o->stream->context->version >= 13) {
2507 pa_usec_t usec;
2509 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2510 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2511 goto finish;
2514 if (o->stream->direction == PA_STREAM_RECORD)
2515 o->stream->timing_info.configured_source_usec = usec;
2516 else
2517 o->stream->timing_info.configured_sink_usec = usec;
2520 if (!pa_tagstruct_eof(t)) {
2521 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2522 goto finish;
2526 if (o->callback) {
2527 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2528 cb(o->stream, success, o->userdata);
2531 finish:
2532 pa_operation_done(o);
2533 pa_operation_unref(o);
2537 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2538 pa_operation *o;
2539 pa_tagstruct *t;
2540 uint32_t tag;
2541 pa_buffer_attr copy;
2543 pa_assert(s);
2544 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2545 pa_assert(attr);
2547 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2548 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2549 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2550 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2552 /* Ask for a timing update before we cork/uncork to get the best
2553 * accuracy for the transport latency suitable for the
2554 * check_smoother_status() call in the started callback */
2555 request_auto_timing_update(s, TRUE);
2557 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2559 t = pa_tagstruct_command(
2560 s->context,
2561 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2562 &tag);
2563 pa_tagstruct_putu32(t, s->channel);
2565 copy = *attr;
2566 patch_buffer_attr(s, &copy, NULL);
2567 attr = &copy;
2569 pa_tagstruct_putu32(t, attr->maxlength);
2571 if (s->direction == PA_STREAM_PLAYBACK)
2572 pa_tagstruct_put(
2574 PA_TAG_U32, attr->tlength,
2575 PA_TAG_U32, attr->prebuf,
2576 PA_TAG_U32, attr->minreq,
2577 PA_TAG_INVALID);
2578 else
2579 pa_tagstruct_putu32(t, attr->fragsize);
2581 if (s->context->version >= 13)
2582 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2584 if (s->context->version >= 14)
2585 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2587 pa_pstream_send_tagstruct(s->context->pstream, t);
2588 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2590 /* This might cause changes in the read/write indexex, hence let's
2591 * request a timing update */
2592 request_auto_timing_update(s, TRUE);
2594 return o;
2597 uint32_t pa_stream_get_device_index(pa_stream *s) {
2598 pa_assert(s);
2599 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2601 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2602 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2603 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2604 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2605 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2607 return s->device_index;
2610 const char *pa_stream_get_device_name(pa_stream *s) {
2611 pa_assert(s);
2612 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2614 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2615 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2616 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2617 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2618 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2620 return s->device_name;
2623 int pa_stream_is_suspended(pa_stream *s) {
2624 pa_assert(s);
2625 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2627 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2628 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2629 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2630 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2632 return s->suspended;
2635 int pa_stream_is_corked(pa_stream *s) {
2636 pa_assert(s);
2637 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2639 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2640 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2641 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2643 return s->corked;
2646 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2647 pa_operation *o = userdata;
2648 int success = 1;
2650 pa_assert(pd);
2651 pa_assert(o);
2652 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2654 if (!o->context)
2655 goto finish;
2657 if (command != PA_COMMAND_REPLY) {
2658 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2659 goto finish;
2661 success = 0;
2662 } else {
2664 if (!pa_tagstruct_eof(t)) {
2665 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2666 goto finish;
2670 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2671 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2673 if (o->callback) {
2674 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2675 cb(o->stream, success, o->userdata);
2678 finish:
2679 pa_operation_done(o);
2680 pa_operation_unref(o);
2684 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2685 pa_operation *o;
2686 pa_tagstruct *t;
2687 uint32_t tag;
2689 pa_assert(s);
2690 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2692 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2693 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2694 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2695 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2696 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2697 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2699 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2700 o->private = PA_UINT_TO_PTR(rate);
2702 t = pa_tagstruct_command(
2703 s->context,
2704 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2705 &tag);
2706 pa_tagstruct_putu32(t, s->channel);
2707 pa_tagstruct_putu32(t, rate);
2709 pa_pstream_send_tagstruct(s->context->pstream, t);
2710 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2712 return o;
2715 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2716 pa_operation *o;
2717 pa_tagstruct *t;
2718 uint32_t tag;
2720 pa_assert(s);
2721 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2723 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2724 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2725 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2726 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2727 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2729 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2731 t = pa_tagstruct_command(
2732 s->context,
2733 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2734 &tag);
2735 pa_tagstruct_putu32(t, s->channel);
2736 pa_tagstruct_putu32(t, (uint32_t) mode);
2737 pa_tagstruct_put_proplist(t, p);
2739 pa_pstream_send_tagstruct(s->context->pstream, t);
2740 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2742 /* Please note that we don't update s->proplist here, because we
2743 * don't export that field */
2745 return o;
2748 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2749 pa_operation *o;
2750 pa_tagstruct *t;
2751 uint32_t tag;
2752 const char * const*k;
2754 pa_assert(s);
2755 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2757 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2758 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2759 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2760 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2761 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2763 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2765 t = pa_tagstruct_command(
2766 s->context,
2767 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2768 &tag);
2769 pa_tagstruct_putu32(t, s->channel);
2771 for (k = keys; *k; k++)
2772 pa_tagstruct_puts(t, *k);
2774 pa_tagstruct_puts(t, NULL);
2776 pa_pstream_send_tagstruct(s->context->pstream, t);
2777 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2779 /* Please note that we don't update s->proplist here, because we
2780 * don't export that field */
2782 return o;
2785 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2786 pa_assert(s);
2787 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2789 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2790 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2791 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2792 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2794 s->direct_on_input = sink_input_idx;
2796 return 0;
2799 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2800 pa_assert(s);
2801 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2803 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2804 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2805 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2807 return s->direct_on_input;