modargs: New function: pa_modargs_get_value_double().
[pulseaudio-raopUDP/pulseaudio-raop-alac.git] / src / pulse / stream.c
blob2b6d3062ce8efabdcc5d854e2178eb2a008b3e3d
1 /***
2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
44 #include "internal.h"
45 #include "stream.h"
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
83 static pa_stream *pa_stream_new_with_proplist_internal(
84 pa_context *c,
85 const char *name,
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
89 unsigned int n_formats,
90 pa_proplist *p) {
92 pa_stream *s;
93 unsigned int i;
95 pa_assert(c);
96 pa_assert(PA_REFCNT_VALUE(c) >= 1);
97 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98 pa_assert(n_formats < PA_MAX_FORMATS);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103 s = pa_xnew(pa_stream, 1);
104 PA_REFCNT_INIT(s);
105 s->context = c;
106 s->mainloop = c->mainloop;
108 s->direction = PA_STREAM_NODIRECTION;
109 s->state = PA_STREAM_UNCONNECTED;
110 s->flags = 0;
112 if (ss)
113 s->sample_spec = *ss;
114 else
115 pa_sample_spec_init(&s->sample_spec);
117 if (map)
118 s->channel_map = *map;
119 else
120 pa_channel_map_init(&s->channel_map);
122 s->n_formats = 0;
123 if (formats) {
124 s->n_formats = n_formats;
125 for (i = 0; i < n_formats; i++)
126 s->req_formats[i] = pa_format_info_copy(formats[i]);
129 /* We'll get the final negotiated format after connecting */
130 s->format = NULL;
132 s->direct_on_input = PA_INVALID_INDEX;
134 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135 if (name)
136 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
138 s->channel = 0;
139 s->channel_valid = FALSE;
140 s->syncid = c->csyncid++;
141 s->stream_index = PA_INVALID_INDEX;
143 s->requested_bytes = 0;
144 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
146 /* We initialize the target length here, so that if the user
147 * passes no explicit buffering metrics the default is similar to
148 * what older PA versions provided. */
150 s->buffer_attr.maxlength = (uint32_t) -1;
151 if (ss)
152 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153 else {
154 /* FIXME: We assume a worst-case compressed format corresponding to
155 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156 pa_sample_spec tmp_ss = {
157 .format = PA_SAMPLE_S16NE,
158 .rate = 48000,
159 .channels = 2,
161 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
163 s->buffer_attr.minreq = (uint32_t) -1;
164 s->buffer_attr.prebuf = (uint32_t) -1;
165 s->buffer_attr.fragsize = (uint32_t) -1;
167 s->device_index = PA_INVALID_INDEX;
168 s->device_name = NULL;
169 s->suspended = FALSE;
170 s->corked = FALSE;
172 s->write_memblock = NULL;
173 s->write_data = NULL;
175 pa_memchunk_reset(&s->peek_memchunk);
176 s->peek_data = NULL;
177 s->record_memblockq = NULL;
179 memset(&s->timing_info, 0, sizeof(s->timing_info));
180 s->timing_info_valid = FALSE;
182 s->previous_time = 0;
183 s->latest_underrun_at_index = -1;
185 s->read_index_not_before = 0;
186 s->write_index_not_before = 0;
187 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188 s->write_index_corrections[i].valid = 0;
189 s->current_write_index_correction = 0;
191 s->auto_timing_update_event = NULL;
192 s->auto_timing_update_requested = FALSE;
193 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
195 reset_callbacks(s);
197 s->smoother = NULL;
199 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200 PA_LLIST_PREPEND(pa_stream, c->streams, s);
201 pa_stream_ref(s);
203 return s;
206 pa_stream *pa_stream_new_with_proplist(
207 pa_context *c,
208 const char *name,
209 const pa_sample_spec *ss,
210 const pa_channel_map *map,
211 pa_proplist *p) {
213 pa_channel_map tmap;
215 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
221 if (!map)
222 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
224 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
227 pa_stream *pa_stream_new_extended(
228 pa_context *c,
229 const char *name,
230 pa_format_info * const *formats,
231 unsigned int n_formats,
232 pa_proplist *p) {
234 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
236 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
239 static void stream_unlink(pa_stream *s) {
240 pa_operation *o, *n;
241 pa_assert(s);
243 if (!s->context)
244 return;
246 /* Detach from context */
248 /* Unref all operation objects that point to us */
249 for (o = s->context->operations; o; o = n) {
250 n = o->next;
252 if (o->stream == s)
253 pa_operation_cancel(o);
256 /* Drop all outstanding replies for this stream */
257 if (s->context->pdispatch)
258 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
260 if (s->channel_valid) {
261 pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
262 s->channel = 0;
263 s->channel_valid = FALSE;
266 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
267 pa_stream_unref(s);
269 s->context = NULL;
271 if (s->auto_timing_update_event) {
272 pa_assert(s->mainloop);
273 s->mainloop->time_free(s->auto_timing_update_event);
276 reset_callbacks(s);
279 static void stream_free(pa_stream *s) {
280 unsigned int i;
282 pa_assert(s);
284 stream_unlink(s);
286 if (s->write_memblock) {
287 if (s->write_data)
288 pa_memblock_release(s->write_memblock);
289 pa_memblock_unref(s->write_memblock);
292 if (s->peek_memchunk.memblock) {
293 if (s->peek_data)
294 pa_memblock_release(s->peek_memchunk.memblock);
295 pa_memblock_unref(s->peek_memchunk.memblock);
298 if (s->record_memblockq)
299 pa_memblockq_free(s->record_memblockq);
301 if (s->proplist)
302 pa_proplist_free(s->proplist);
304 if (s->smoother)
305 pa_smoother_free(s->smoother);
307 for (i = 0; i < s->n_formats; i++)
308 pa_format_info_free(s->req_formats[i]);
310 if (s->format)
311 pa_format_info_free(s->format);
313 pa_xfree(s->device_name);
314 pa_xfree(s);
317 void pa_stream_unref(pa_stream *s) {
318 pa_assert(s);
319 pa_assert(PA_REFCNT_VALUE(s) >= 1);
321 if (PA_REFCNT_DEC(s) <= 0)
322 stream_free(s);
325 pa_stream* pa_stream_ref(pa_stream *s) {
326 pa_assert(s);
327 pa_assert(PA_REFCNT_VALUE(s) >= 1);
329 PA_REFCNT_INC(s);
330 return s;
333 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
334 pa_assert(s);
335 pa_assert(PA_REFCNT_VALUE(s) >= 1);
337 return s->state;
340 pa_context* pa_stream_get_context(pa_stream *s) {
341 pa_assert(s);
342 pa_assert(PA_REFCNT_VALUE(s) >= 1);
344 return s->context;
347 uint32_t pa_stream_get_index(pa_stream *s) {
348 pa_assert(s);
349 pa_assert(PA_REFCNT_VALUE(s) >= 1);
351 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
352 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
354 return s->stream_index;
357 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
358 pa_assert(s);
359 pa_assert(PA_REFCNT_VALUE(s) >= 1);
361 if (s->state == st)
362 return;
364 pa_stream_ref(s);
366 s->state = st;
368 if (s->state_callback)
369 s->state_callback(s, s->state_userdata);
371 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
372 stream_unlink(s);
374 pa_stream_unref(s);
377 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
378 pa_assert(s);
379 pa_assert(PA_REFCNT_VALUE(s) >= 1);
381 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
382 return;
384 if (s->state == PA_STREAM_READY &&
385 (force || !s->auto_timing_update_requested)) {
386 pa_operation *o;
388 /* pa_log("Automatically requesting new timing data"); */
390 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
391 pa_operation_unref(o);
392 s->auto_timing_update_requested = TRUE;
396 if (s->auto_timing_update_event) {
397 if (s->suspended && !force) {
398 pa_assert(s->mainloop);
399 s->mainloop->time_free(s->auto_timing_update_event);
400 s->auto_timing_update_event = NULL;
401 } else {
402 if (force)
403 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
405 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
407 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
412 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
413 pa_context *c = userdata;
414 pa_stream *s;
415 uint32_t channel;
417 pa_assert(pd);
418 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
419 pa_assert(t);
420 pa_assert(c);
421 pa_assert(PA_REFCNT_VALUE(c) >= 1);
423 pa_context_ref(c);
425 if (pa_tagstruct_getu32(t, &channel) < 0 ||
426 !pa_tagstruct_eof(t)) {
427 pa_context_fail(c, PA_ERR_PROTOCOL);
428 goto finish;
431 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
432 goto finish;
434 if (s->state != PA_STREAM_READY)
435 goto finish;
437 pa_context_set_error(c, PA_ERR_KILLED);
438 pa_stream_set_state(s, PA_STREAM_FAILED);
440 finish:
441 pa_context_unref(c);
444 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
445 pa_usec_t x;
447 pa_assert(s);
448 pa_assert(!force_start || !force_stop);
450 if (!s->smoother)
451 return;
453 x = pa_rtclock_now();
455 if (s->timing_info_valid) {
456 if (aposteriori)
457 x -= s->timing_info.transport_usec;
458 else
459 x += s->timing_info.transport_usec;
462 if (s->suspended || s->corked || force_stop)
463 pa_smoother_pause(s->smoother, x);
464 else if (force_start || s->buffer_attr.prebuf == 0) {
466 if (!s->timing_info_valid &&
467 !aposteriori &&
468 !force_start &&
469 !force_stop &&
470 s->context->version >= 13) {
472 /* If the server supports STARTED events we take them as
473 * indications when audio really starts/stops playing, if
474 * we don't have any timing info yet -- instead of trying
475 * to be smart and guessing the server time. Otherwise the
476 * unknown transport delay adds too much noise to our time
477 * calculations. */
479 return;
482 pa_smoother_resume(s->smoother, x, TRUE);
485 /* Please note that we have no idea if playback actually started
486 * if prebuf is non-zero! */
489 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
491 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
492 pa_context *c = userdata;
493 pa_stream *s;
494 uint32_t channel;
495 const char *dn;
496 pa_bool_t suspended;
497 uint32_t di;
498 pa_usec_t usec = 0;
499 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
501 pa_assert(pd);
502 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
503 pa_assert(t);
504 pa_assert(c);
505 pa_assert(PA_REFCNT_VALUE(c) >= 1);
507 pa_context_ref(c);
509 if (c->version < 12) {
510 pa_context_fail(c, PA_ERR_PROTOCOL);
511 goto finish;
514 if (pa_tagstruct_getu32(t, &channel) < 0 ||
515 pa_tagstruct_getu32(t, &di) < 0 ||
516 pa_tagstruct_gets(t, &dn) < 0 ||
517 pa_tagstruct_get_boolean(t, &suspended) < 0) {
518 pa_context_fail(c, PA_ERR_PROTOCOL);
519 goto finish;
522 if (c->version >= 13) {
524 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
525 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
526 pa_tagstruct_getu32(t, &fragsize) < 0 ||
527 pa_tagstruct_get_usec(t, &usec) < 0) {
528 pa_context_fail(c, PA_ERR_PROTOCOL);
529 goto finish;
531 } else {
532 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
533 pa_tagstruct_getu32(t, &tlength) < 0 ||
534 pa_tagstruct_getu32(t, &prebuf) < 0 ||
535 pa_tagstruct_getu32(t, &minreq) < 0 ||
536 pa_tagstruct_get_usec(t, &usec) < 0) {
537 pa_context_fail(c, PA_ERR_PROTOCOL);
538 goto finish;
543 if (!pa_tagstruct_eof(t)) {
544 pa_context_fail(c, PA_ERR_PROTOCOL);
545 goto finish;
548 if (!dn || di == PA_INVALID_INDEX) {
549 pa_context_fail(c, PA_ERR_PROTOCOL);
550 goto finish;
553 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
554 goto finish;
556 if (s->state != PA_STREAM_READY)
557 goto finish;
559 if (c->version >= 13) {
560 if (s->direction == PA_STREAM_RECORD)
561 s->timing_info.configured_source_usec = usec;
562 else
563 s->timing_info.configured_sink_usec = usec;
565 s->buffer_attr.maxlength = maxlength;
566 s->buffer_attr.fragsize = fragsize;
567 s->buffer_attr.tlength = tlength;
568 s->buffer_attr.prebuf = prebuf;
569 s->buffer_attr.minreq = minreq;
572 pa_xfree(s->device_name);
573 s->device_name = pa_xstrdup(dn);
574 s->device_index = di;
576 s->suspended = suspended;
578 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
579 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
580 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
581 request_auto_timing_update(s, TRUE);
584 check_smoother_status(s, TRUE, FALSE, FALSE);
585 request_auto_timing_update(s, TRUE);
587 if (s->moved_callback)
588 s->moved_callback(s, s->moved_userdata);
590 finish:
591 pa_context_unref(c);
594 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
595 pa_context *c = userdata;
596 pa_stream *s;
597 uint32_t channel;
598 pa_usec_t usec = 0;
599 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
601 pa_assert(pd);
602 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
603 pa_assert(t);
604 pa_assert(c);
605 pa_assert(PA_REFCNT_VALUE(c) >= 1);
607 pa_context_ref(c);
609 if (c->version < 15) {
610 pa_context_fail(c, PA_ERR_PROTOCOL);
611 goto finish;
614 if (pa_tagstruct_getu32(t, &channel) < 0) {
615 pa_context_fail(c, PA_ERR_PROTOCOL);
616 goto finish;
619 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
620 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
621 pa_tagstruct_getu32(t, &fragsize) < 0 ||
622 pa_tagstruct_get_usec(t, &usec) < 0) {
623 pa_context_fail(c, PA_ERR_PROTOCOL);
624 goto finish;
626 } else {
627 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
628 pa_tagstruct_getu32(t, &tlength) < 0 ||
629 pa_tagstruct_getu32(t, &prebuf) < 0 ||
630 pa_tagstruct_getu32(t, &minreq) < 0 ||
631 pa_tagstruct_get_usec(t, &usec) < 0) {
632 pa_context_fail(c, PA_ERR_PROTOCOL);
633 goto finish;
637 if (!pa_tagstruct_eof(t)) {
638 pa_context_fail(c, PA_ERR_PROTOCOL);
639 goto finish;
642 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
643 goto finish;
645 if (s->state != PA_STREAM_READY)
646 goto finish;
648 if (s->direction == PA_STREAM_RECORD)
649 s->timing_info.configured_source_usec = usec;
650 else
651 s->timing_info.configured_sink_usec = usec;
653 s->buffer_attr.maxlength = maxlength;
654 s->buffer_attr.fragsize = fragsize;
655 s->buffer_attr.tlength = tlength;
656 s->buffer_attr.prebuf = prebuf;
657 s->buffer_attr.minreq = minreq;
659 request_auto_timing_update(s, TRUE);
661 if (s->buffer_attr_callback)
662 s->buffer_attr_callback(s, s->buffer_attr_userdata);
664 finish:
665 pa_context_unref(c);
668 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
669 pa_context *c = userdata;
670 pa_stream *s;
671 uint32_t channel;
672 pa_bool_t suspended;
674 pa_assert(pd);
675 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
676 pa_assert(t);
677 pa_assert(c);
678 pa_assert(PA_REFCNT_VALUE(c) >= 1);
680 pa_context_ref(c);
682 if (c->version < 12) {
683 pa_context_fail(c, PA_ERR_PROTOCOL);
684 goto finish;
687 if (pa_tagstruct_getu32(t, &channel) < 0 ||
688 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
689 !pa_tagstruct_eof(t)) {
690 pa_context_fail(c, PA_ERR_PROTOCOL);
691 goto finish;
694 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
695 goto finish;
697 if (s->state != PA_STREAM_READY)
698 goto finish;
700 s->suspended = suspended;
702 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
703 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
704 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
705 request_auto_timing_update(s, TRUE);
708 check_smoother_status(s, TRUE, FALSE, FALSE);
709 request_auto_timing_update(s, TRUE);
711 if (s->suspended_callback)
712 s->suspended_callback(s, s->suspended_userdata);
714 finish:
715 pa_context_unref(c);
718 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
719 pa_context *c = userdata;
720 pa_stream *s;
721 uint32_t channel;
723 pa_assert(pd);
724 pa_assert(command == PA_COMMAND_STARTED);
725 pa_assert(t);
726 pa_assert(c);
727 pa_assert(PA_REFCNT_VALUE(c) >= 1);
729 pa_context_ref(c);
731 if (c->version < 13) {
732 pa_context_fail(c, PA_ERR_PROTOCOL);
733 goto finish;
736 if (pa_tagstruct_getu32(t, &channel) < 0 ||
737 !pa_tagstruct_eof(t)) {
738 pa_context_fail(c, PA_ERR_PROTOCOL);
739 goto finish;
742 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
743 goto finish;
745 if (s->state != PA_STREAM_READY)
746 goto finish;
748 check_smoother_status(s, TRUE, TRUE, FALSE);
749 request_auto_timing_update(s, TRUE);
751 if (s->started_callback)
752 s->started_callback(s, s->started_userdata);
754 finish:
755 pa_context_unref(c);
758 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
759 pa_context *c = userdata;
760 pa_stream *s;
761 uint32_t channel;
762 pa_proplist *pl = NULL;
763 const char *event;
765 pa_assert(pd);
766 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
767 pa_assert(t);
768 pa_assert(c);
769 pa_assert(PA_REFCNT_VALUE(c) >= 1);
771 pa_context_ref(c);
773 if (c->version < 15) {
774 pa_context_fail(c, PA_ERR_PROTOCOL);
775 goto finish;
778 pl = pa_proplist_new();
780 if (pa_tagstruct_getu32(t, &channel) < 0 ||
781 pa_tagstruct_gets(t, &event) < 0 ||
782 pa_tagstruct_get_proplist(t, pl) < 0 ||
783 !pa_tagstruct_eof(t) || !event) {
784 pa_context_fail(c, PA_ERR_PROTOCOL);
785 goto finish;
788 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
789 goto finish;
791 if (s->state != PA_STREAM_READY)
792 goto finish;
794 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
795 /* Let client know what the running time was when the stream had to be killed */
796 pa_usec_t stream_time;
797 if (pa_stream_get_time(s, &stream_time) == 0)
798 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
801 if (s->event_callback)
802 s->event_callback(s, event, pl, s->event_userdata);
804 finish:
805 pa_context_unref(c);
807 if (pl)
808 pa_proplist_free(pl);
811 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
812 pa_stream *s;
813 pa_context *c = userdata;
814 uint32_t bytes, channel;
816 pa_assert(pd);
817 pa_assert(command == PA_COMMAND_REQUEST);
818 pa_assert(t);
819 pa_assert(c);
820 pa_assert(PA_REFCNT_VALUE(c) >= 1);
822 pa_context_ref(c);
824 if (pa_tagstruct_getu32(t, &channel) < 0 ||
825 pa_tagstruct_getu32(t, &bytes) < 0 ||
826 !pa_tagstruct_eof(t)) {
827 pa_context_fail(c, PA_ERR_PROTOCOL);
828 goto finish;
831 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
832 goto finish;
834 if (s->state != PA_STREAM_READY)
835 goto finish;
837 s->requested_bytes += bytes;
839 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
841 if (s->requested_bytes > 0 && s->write_callback)
842 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
844 finish:
845 pa_context_unref(c);
848 int64_t pa_stream_get_underflow_index(pa_stream *p)
850 pa_assert(p);
851 return p->latest_underrun_at_index;
854 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
855 pa_stream *s;
856 pa_context *c = userdata;
857 uint32_t channel;
858 int64_t offset = -1;
860 pa_assert(pd);
861 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
862 pa_assert(t);
863 pa_assert(c);
864 pa_assert(PA_REFCNT_VALUE(c) >= 1);
866 pa_context_ref(c);
868 if (pa_tagstruct_getu32(t, &channel) < 0) {
869 pa_context_fail(c, PA_ERR_PROTOCOL);
870 goto finish;
873 if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
874 if (pa_tagstruct_gets64(t, &offset) < 0) {
875 pa_context_fail(c, PA_ERR_PROTOCOL);
876 goto finish;
880 if (!pa_tagstruct_eof(t)) {
881 pa_context_fail(c, PA_ERR_PROTOCOL);
882 goto finish;
885 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
886 goto finish;
888 if (s->state != PA_STREAM_READY)
889 goto finish;
891 if (offset != -1)
892 s->latest_underrun_at_index = offset;
894 if (s->buffer_attr.prebuf > 0)
895 check_smoother_status(s, TRUE, FALSE, TRUE);
897 request_auto_timing_update(s, TRUE);
899 if (command == PA_COMMAND_OVERFLOW) {
900 if (s->overflow_callback)
901 s->overflow_callback(s, s->overflow_userdata);
902 } else if (command == PA_COMMAND_UNDERFLOW) {
903 if (s->underflow_callback)
904 s->underflow_callback(s, s->underflow_userdata);
907 finish:
908 pa_context_unref(c);
911 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
912 pa_assert(s);
913 pa_assert(PA_REFCNT_VALUE(s) >= 1);
915 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
917 if (s->state != PA_STREAM_READY)
918 return;
920 if (w) {
921 s->write_index_not_before = s->context->ctag;
923 if (s->timing_info_valid)
924 s->timing_info.write_index_corrupt = TRUE;
926 /* pa_log("write_index invalidated"); */
929 if (r) {
930 s->read_index_not_before = s->context->ctag;
932 if (s->timing_info_valid)
933 s->timing_info.read_index_corrupt = TRUE;
935 /* pa_log("read_index invalidated"); */
938 request_auto_timing_update(s, TRUE);
941 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
942 pa_stream *s = userdata;
944 pa_assert(s);
945 pa_assert(PA_REFCNT_VALUE(s) >= 1);
947 pa_stream_ref(s);
948 request_auto_timing_update(s, FALSE);
949 pa_stream_unref(s);
952 static void create_stream_complete(pa_stream *s) {
953 pa_assert(s);
954 pa_assert(PA_REFCNT_VALUE(s) >= 1);
955 pa_assert(s->state == PA_STREAM_CREATING);
957 pa_stream_set_state(s, PA_STREAM_READY);
959 if (s->requested_bytes > 0 && s->write_callback)
960 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
962 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
963 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
964 pa_assert(!s->auto_timing_update_event);
965 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
967 request_auto_timing_update(s, TRUE);
970 check_smoother_status(s, TRUE, FALSE, FALSE);
973 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
974 const char *e;
976 pa_assert(s);
977 pa_assert(attr);
979 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
980 uint32_t ms;
982 if (pa_atou(e, &ms) < 0 || ms <= 0)
983 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
984 else {
985 attr->maxlength = (uint32_t) -1;
986 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
987 attr->minreq = (uint32_t) -1;
988 attr->prebuf = (uint32_t) -1;
989 attr->fragsize = attr->tlength;
992 if (flags)
993 *flags |= PA_STREAM_ADJUST_LATENCY;
996 if (s->context->version >= 13)
997 return;
999 /* Version older than 0.9.10 didn't do server side buffer_attr
1000 * selection, hence we have to fake it on the client side. */
1002 /* We choose fairly conservative values here, to not confuse
1003 * old clients with extremely large playback buffers */
1005 if (attr->maxlength == (uint32_t) -1)
1006 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1008 if (attr->tlength == (uint32_t) -1)
1009 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1011 if (attr->minreq == (uint32_t) -1)
1012 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1014 if (attr->prebuf == (uint32_t) -1)
1015 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1017 if (attr->fragsize == (uint32_t) -1)
1018 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1021 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1022 pa_stream *s = userdata;
1023 uint32_t requested_bytes = 0;
1025 pa_assert(pd);
1026 pa_assert(s);
1027 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1028 pa_assert(s->state == PA_STREAM_CREATING);
1030 pa_stream_ref(s);
1032 if (command != PA_COMMAND_REPLY) {
1033 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1034 goto finish;
1036 pa_stream_set_state(s, PA_STREAM_FAILED);
1037 goto finish;
1040 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1041 s->channel == PA_INVALID_INDEX ||
1042 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1043 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1044 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1045 goto finish;
1048 s->requested_bytes = (int64_t) requested_bytes;
1050 if (s->context->version >= 9) {
1051 if (s->direction == PA_STREAM_PLAYBACK) {
1052 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1053 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1054 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1055 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1056 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1057 goto finish;
1059 } else if (s->direction == PA_STREAM_RECORD) {
1060 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1061 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1062 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1063 goto finish;
1068 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1069 pa_sample_spec ss;
1070 pa_channel_map cm;
1071 const char *dn = NULL;
1072 pa_bool_t suspended;
1074 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1075 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1076 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1077 pa_tagstruct_gets(t, &dn) < 0 ||
1078 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1079 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1080 goto finish;
1083 if (!dn || s->device_index == PA_INVALID_INDEX ||
1084 ss.channels != cm.channels ||
1085 !pa_channel_map_valid(&cm) ||
1086 !pa_sample_spec_valid(&ss) ||
1087 (s->n_formats == 0 && (
1088 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1089 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1090 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1091 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1092 goto finish;
1095 pa_xfree(s->device_name);
1096 s->device_name = pa_xstrdup(dn);
1097 s->suspended = suspended;
1099 s->channel_map = cm;
1100 s->sample_spec = ss;
1103 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1104 pa_usec_t usec;
1106 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1107 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1108 goto finish;
1111 if (s->direction == PA_STREAM_RECORD)
1112 s->timing_info.configured_source_usec = usec;
1113 else
1114 s->timing_info.configured_sink_usec = usec;
1117 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1118 || s->context->version >= 22) {
1120 pa_format_info *f = pa_format_info_new();
1121 pa_tagstruct_get_format_info(t, f);
1123 if (pa_format_info_valid(f))
1124 s->format = f;
1125 else {
1126 pa_format_info_free(f);
1127 if (s->n_formats > 0) {
1128 /* We used the extended API, so we should have got back a proper format */
1129 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1130 goto finish;
1135 if (!pa_tagstruct_eof(t)) {
1136 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1137 goto finish;
1140 if (s->direction == PA_STREAM_RECORD) {
1141 pa_assert(!s->record_memblockq);
1143 s->record_memblockq = pa_memblockq_new(
1144 "client side record memblockq",
1146 s->buffer_attr.maxlength,
1148 &s->sample_spec,
1152 NULL);
1155 s->channel_valid = TRUE;
1156 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1158 create_stream_complete(s);
1160 finish:
1161 pa_stream_unref(s);
1164 static int create_stream(
1165 pa_stream_direction_t direction,
1166 pa_stream *s,
1167 const char *dev,
1168 const pa_buffer_attr *attr,
1169 pa_stream_flags_t flags,
1170 const pa_cvolume *volume,
1171 pa_stream *sync_stream) {
1173 pa_tagstruct *t;
1174 uint32_t tag;
1175 pa_bool_t volume_set = !!volume;
1176 pa_cvolume cv;
1177 uint32_t i;
1179 pa_assert(s);
1180 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1181 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1183 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1184 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1185 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1186 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1187 PA_STREAM_INTERPOLATE_TIMING|
1188 PA_STREAM_NOT_MONOTONIC|
1189 PA_STREAM_AUTO_TIMING_UPDATE|
1190 PA_STREAM_NO_REMAP_CHANNELS|
1191 PA_STREAM_NO_REMIX_CHANNELS|
1192 PA_STREAM_FIX_FORMAT|
1193 PA_STREAM_FIX_RATE|
1194 PA_STREAM_FIX_CHANNELS|
1195 PA_STREAM_DONT_MOVE|
1196 PA_STREAM_VARIABLE_RATE|
1197 PA_STREAM_PEAK_DETECT|
1198 PA_STREAM_START_MUTED|
1199 PA_STREAM_ADJUST_LATENCY|
1200 PA_STREAM_EARLY_REQUESTS|
1201 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1202 PA_STREAM_START_UNMUTED|
1203 PA_STREAM_FAIL_ON_SUSPEND|
1204 PA_STREAM_RELATIVE_VOLUME|
1205 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1208 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1209 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1210 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1211 /* Although some of the other flags are not supported on older
1212 * version, we don't check for them here, because it doesn't hurt
1213 * when they are passed but actually not supported. This makes
1214 * client development easier */
1216 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1217 PA_CHECK_VALIDITY(s->context, !volume || s->n_formats || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1218 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1219 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1221 pa_stream_ref(s);
1223 s->direction = direction;
1225 if (sync_stream)
1226 s->syncid = sync_stream->syncid;
1228 if (attr)
1229 s->buffer_attr = *attr;
1230 patch_buffer_attr(s, &s->buffer_attr, &flags);
1232 s->flags = flags;
1233 s->corked = !!(flags & PA_STREAM_START_CORKED);
1235 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1236 pa_usec_t x;
1238 x = pa_rtclock_now();
1240 pa_assert(!s->smoother);
1241 s->smoother = pa_smoother_new(
1242 SMOOTHER_ADJUST_TIME,
1243 SMOOTHER_HISTORY_TIME,
1244 !(flags & PA_STREAM_NOT_MONOTONIC),
1245 TRUE,
1246 SMOOTHER_MIN_HISTORY,
1248 TRUE);
1251 if (!dev)
1252 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1254 t = pa_tagstruct_command(
1255 s->context,
1256 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1257 &tag);
1259 if (s->context->version < 13)
1260 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1262 pa_tagstruct_put(
1264 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1265 PA_TAG_CHANNEL_MAP, &s->channel_map,
1266 PA_TAG_U32, PA_INVALID_INDEX,
1267 PA_TAG_STRING, dev,
1268 PA_TAG_U32, s->buffer_attr.maxlength,
1269 PA_TAG_BOOLEAN, s->corked,
1270 PA_TAG_INVALID);
1272 if (!volume) {
1273 if (pa_sample_spec_valid(&s->sample_spec))
1274 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1275 else {
1276 /* This is not really relevant, since no volume was set, and
1277 * the real number of channels is embedded in the format_info
1278 * structure */
1279 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1283 if (s->direction == PA_STREAM_PLAYBACK) {
1284 pa_tagstruct_put(
1286 PA_TAG_U32, s->buffer_attr.tlength,
1287 PA_TAG_U32, s->buffer_attr.prebuf,
1288 PA_TAG_U32, s->buffer_attr.minreq,
1289 PA_TAG_U32, s->syncid,
1290 PA_TAG_INVALID);
1292 pa_tagstruct_put_cvolume(t, volume);
1293 } else
1294 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1296 if (s->context->version >= 12) {
1297 pa_tagstruct_put(
1299 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1300 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1301 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1302 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1303 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1304 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1305 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1306 PA_TAG_INVALID);
1309 if (s->context->version >= 13) {
1311 if (s->direction == PA_STREAM_PLAYBACK)
1312 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1313 else
1314 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1316 pa_tagstruct_put(
1318 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1319 PA_TAG_PROPLIST, s->proplist,
1320 PA_TAG_INVALID);
1322 if (s->direction == PA_STREAM_RECORD)
1323 pa_tagstruct_putu32(t, s->direct_on_input);
1326 if (s->context->version >= 14) {
1328 if (s->direction == PA_STREAM_PLAYBACK)
1329 pa_tagstruct_put_boolean(t, volume_set);
1331 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1334 if (s->context->version >= 15) {
1336 if (s->direction == PA_STREAM_PLAYBACK)
1337 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1339 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1340 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1343 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1344 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1346 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1347 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1349 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1350 || s->context->version >= 22) {
1352 pa_tagstruct_putu8(t, s->n_formats);
1353 for (i = 0; i < s->n_formats; i++)
1354 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1357 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1358 pa_tagstruct_put_cvolume(t, volume);
1359 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1360 pa_tagstruct_put_boolean(t, volume_set);
1361 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1362 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1363 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1366 pa_pstream_send_tagstruct(s->context->pstream, t);
1367 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1369 pa_stream_set_state(s, PA_STREAM_CREATING);
1371 pa_stream_unref(s);
1372 return 0;
1375 int pa_stream_connect_playback(
1376 pa_stream *s,
1377 const char *dev,
1378 const pa_buffer_attr *attr,
1379 pa_stream_flags_t flags,
1380 const pa_cvolume *volume,
1381 pa_stream *sync_stream) {
1383 pa_assert(s);
1384 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1386 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1389 int pa_stream_connect_record(
1390 pa_stream *s,
1391 const char *dev,
1392 const pa_buffer_attr *attr,
1393 pa_stream_flags_t flags) {
1395 pa_assert(s);
1396 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1398 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1401 int pa_stream_begin_write(
1402 pa_stream *s,
1403 void **data,
1404 size_t *nbytes) {
1406 pa_assert(s);
1407 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1409 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1410 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1411 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1412 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1413 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1415 if (*nbytes != (size_t) -1) {
1416 size_t m, fs;
1418 m = pa_mempool_block_size_max(s->context->mempool);
1419 fs = pa_frame_size(&s->sample_spec);
1421 m = (m / fs) * fs;
1422 if (*nbytes > m)
1423 *nbytes = m;
1426 if (!s->write_memblock) {
1427 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1428 s->write_data = pa_memblock_acquire(s->write_memblock);
1431 *data = s->write_data;
1432 *nbytes = pa_memblock_get_length(s->write_memblock);
1434 return 0;
1437 int pa_stream_cancel_write(
1438 pa_stream *s) {
1440 pa_assert(s);
1441 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1443 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1444 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1445 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1446 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1448 pa_assert(s->write_data);
1450 pa_memblock_release(s->write_memblock);
1451 pa_memblock_unref(s->write_memblock);
1452 s->write_memblock = NULL;
1453 s->write_data = NULL;
1455 return 0;
1458 int pa_stream_write(
1459 pa_stream *s,
1460 const void *data,
1461 size_t length,
1462 pa_free_cb_t free_cb,
1463 int64_t offset,
1464 pa_seek_mode_t seek) {
1466 pa_assert(s);
1467 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1468 pa_assert(data);
1470 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1471 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1472 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1473 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1474 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1475 PA_CHECK_VALIDITY(s->context,
1476 !s->write_memblock ||
1477 ((data >= s->write_data) &&
1478 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1479 PA_ERR_INVALID);
1480 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1482 if (s->write_memblock) {
1483 pa_memchunk chunk;
1485 /* pa_stream_write_begin() was called before */
1487 pa_memblock_release(s->write_memblock);
1489 chunk.memblock = s->write_memblock;
1490 chunk.index = (const char *) data - (const char *) s->write_data;
1491 chunk.length = length;
1493 s->write_memblock = NULL;
1494 s->write_data = NULL;
1496 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1497 pa_memblock_unref(chunk.memblock);
1499 } else {
1500 pa_seek_mode_t t_seek = seek;
1501 int64_t t_offset = offset;
1502 size_t t_length = length;
1503 const void *t_data = data;
1505 /* pa_stream_write_begin() was not called before */
1507 while (t_length > 0) {
1508 pa_memchunk chunk;
1510 chunk.index = 0;
1512 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1513 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1514 chunk.length = t_length;
1515 } else {
1516 void *d;
1518 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1519 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1521 d = pa_memblock_acquire(chunk.memblock);
1522 memcpy(d, t_data, chunk.length);
1523 pa_memblock_release(chunk.memblock);
1526 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1528 t_offset = 0;
1529 t_seek = PA_SEEK_RELATIVE;
1531 t_data = (const uint8_t*) t_data + chunk.length;
1532 t_length -= chunk.length;
1534 pa_memblock_unref(chunk.memblock);
1537 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1538 free_cb((void*) data);
1541 /* This is obviously wrong since we ignore the seeking index . But
1542 * that's OK, the server side applies the same error */
1543 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1545 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1547 if (s->direction == PA_STREAM_PLAYBACK) {
1549 /* Update latency request correction */
1550 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1552 if (seek == PA_SEEK_ABSOLUTE) {
1553 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1554 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1555 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1556 } else if (seek == PA_SEEK_RELATIVE) {
1557 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1558 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1559 } else
1560 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1563 /* Update the write index in the already available latency data */
1564 if (s->timing_info_valid) {
1566 if (seek == PA_SEEK_ABSOLUTE) {
1567 s->timing_info.write_index_corrupt = FALSE;
1568 s->timing_info.write_index = offset + (int64_t) length;
1569 } else if (seek == PA_SEEK_RELATIVE) {
1570 if (!s->timing_info.write_index_corrupt)
1571 s->timing_info.write_index += offset + (int64_t) length;
1572 } else
1573 s->timing_info.write_index_corrupt = TRUE;
1576 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1577 request_auto_timing_update(s, TRUE);
1580 return 0;
1583 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1584 pa_assert(s);
1585 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1586 pa_assert(data);
1587 pa_assert(length);
1589 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1590 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1591 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1593 if (!s->peek_memchunk.memblock) {
1595 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1596 *data = NULL;
1597 *length = 0;
1598 return 0;
1601 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1604 pa_assert(s->peek_data);
1605 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1606 *length = s->peek_memchunk.length;
1607 return 0;
1610 int pa_stream_drop(pa_stream *s) {
1611 pa_assert(s);
1612 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1614 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1615 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1616 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1617 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1619 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1621 /* Fix the simulated local read index */
1622 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1623 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1625 pa_assert(s->peek_data);
1626 pa_memblock_release(s->peek_memchunk.memblock);
1627 pa_memblock_unref(s->peek_memchunk.memblock);
1628 pa_memchunk_reset(&s->peek_memchunk);
1630 return 0;
1633 size_t pa_stream_writable_size(pa_stream *s) {
1634 pa_assert(s);
1635 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1637 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1638 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1639 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1641 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1644 size_t pa_stream_readable_size(pa_stream *s) {
1645 pa_assert(s);
1646 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1648 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1649 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1650 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1652 return pa_memblockq_get_length(s->record_memblockq);
1655 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1656 pa_operation *o;
1657 pa_tagstruct *t;
1658 uint32_t tag;
1660 pa_assert(s);
1661 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1663 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1664 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1665 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1667 /* Ask for a timing update before we cork/uncork to get the best
1668 * accuracy for the transport latency suitable for the
1669 * check_smoother_status() call in the started callback */
1670 request_auto_timing_update(s, TRUE);
1672 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1674 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1675 pa_tagstruct_putu32(t, s->channel);
1676 pa_pstream_send_tagstruct(s->context->pstream, t);
1677 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1679 /* This might cause the read index to continue again, hence
1680 * let's request a timing update */
1681 request_auto_timing_update(s, TRUE);
1683 return o;
1686 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1687 pa_usec_t usec;
1689 pa_assert(s);
1690 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1691 pa_assert(s->state == PA_STREAM_READY);
1692 pa_assert(s->direction != PA_STREAM_UPLOAD);
1693 pa_assert(s->timing_info_valid);
1694 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1695 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1697 if (s->direction == PA_STREAM_PLAYBACK) {
1698 /* The last byte that was written into the output device
1699 * had this time value associated */
1700 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1702 if (!s->corked && !s->suspended) {
1704 if (!ignore_transport)
1705 /* Because the latency info took a little time to come
1706 * to us, we assume that the real output time is actually
1707 * a little ahead */
1708 usec += s->timing_info.transport_usec;
1710 /* However, the output device usually maintains a buffer
1711 too, hence the real sample currently played is a little
1712 back */
1713 if (s->timing_info.sink_usec >= usec)
1714 usec = 0;
1715 else
1716 usec -= s->timing_info.sink_usec;
1719 } else {
1720 pa_assert(s->direction == PA_STREAM_RECORD);
1722 /* The last byte written into the server side queue had
1723 * this time value associated */
1724 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1726 if (!s->corked && !s->suspended) {
1728 if (!ignore_transport)
1729 /* Add transport latency */
1730 usec += s->timing_info.transport_usec;
1732 /* Add latency of data in device buffer */
1733 usec += s->timing_info.source_usec;
1735 /* If this is a monitor source, we need to correct the
1736 * time by the playback device buffer */
1737 if (s->timing_info.sink_usec >= usec)
1738 usec = 0;
1739 else
1740 usec -= s->timing_info.sink_usec;
1744 return usec;
1747 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1748 pa_operation *o = userdata;
1749 struct timeval local, remote, now;
1750 pa_timing_info *i;
1751 pa_bool_t playing = FALSE;
1752 uint64_t underrun_for = 0, playing_for = 0;
1754 pa_assert(pd);
1755 pa_assert(o);
1756 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1758 if (!o->context || !o->stream)
1759 goto finish;
1761 i = &o->stream->timing_info;
1763 o->stream->timing_info_valid = FALSE;
1764 i->write_index_corrupt = TRUE;
1765 i->read_index_corrupt = TRUE;
1767 if (command != PA_COMMAND_REPLY) {
1768 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1769 goto finish;
1771 } else {
1773 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1774 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1775 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1776 pa_tagstruct_get_timeval(t, &local) < 0 ||
1777 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1778 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1779 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1781 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1782 goto finish;
1785 if (o->context->version >= 13 &&
1786 o->stream->direction == PA_STREAM_PLAYBACK)
1787 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1788 pa_tagstruct_getu64(t, &playing_for) < 0) {
1790 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1791 goto finish;
1795 if (!pa_tagstruct_eof(t)) {
1796 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1797 goto finish;
1799 o->stream->timing_info_valid = TRUE;
1800 i->write_index_corrupt = FALSE;
1801 i->read_index_corrupt = FALSE;
1803 i->playing = (int) playing;
1804 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1806 pa_gettimeofday(&now);
1808 /* Calculate timestamps */
1809 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1810 /* local and remote seem to have synchronized clocks */
1812 if (o->stream->direction == PA_STREAM_PLAYBACK)
1813 i->transport_usec = pa_timeval_diff(&remote, &local);
1814 else
1815 i->transport_usec = pa_timeval_diff(&now, &remote);
1817 i->synchronized_clocks = TRUE;
1818 i->timestamp = remote;
1819 } else {
1820 /* clocks are not synchronized, let's estimate latency then */
1821 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1822 i->synchronized_clocks = FALSE;
1823 i->timestamp = local;
1824 pa_timeval_add(&i->timestamp, i->transport_usec);
1827 /* Invalidate read and write indexes if necessary */
1828 if (tag < o->stream->read_index_not_before)
1829 i->read_index_corrupt = TRUE;
1831 if (tag < o->stream->write_index_not_before)
1832 i->write_index_corrupt = TRUE;
1834 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1835 /* Write index correction */
1837 int n, j;
1838 uint32_t ctag = tag;
1840 /* Go through the saved correction values and add up the
1841 * total correction.*/
1842 for (n = 0, j = o->stream->current_write_index_correction+1;
1843 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1844 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1846 /* Step over invalid data or out-of-date data */
1847 if (!o->stream->write_index_corrections[j].valid ||
1848 o->stream->write_index_corrections[j].tag < ctag)
1849 continue;
1851 /* Make sure that everything is in order */
1852 ctag = o->stream->write_index_corrections[j].tag+1;
1854 /* Now fix the write index */
1855 if (o->stream->write_index_corrections[j].corrupt) {
1856 /* A corrupting seek was made */
1857 i->write_index_corrupt = TRUE;
1858 } else if (o->stream->write_index_corrections[j].absolute) {
1859 /* An absolute seek was made */
1860 i->write_index = o->stream->write_index_corrections[j].value;
1861 i->write_index_corrupt = FALSE;
1862 } else if (!i->write_index_corrupt) {
1863 /* A relative seek was made */
1864 i->write_index += o->stream->write_index_corrections[j].value;
1868 /* Clear old correction entries */
1869 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1870 if (!o->stream->write_index_corrections[n].valid)
1871 continue;
1873 if (o->stream->write_index_corrections[n].tag <= tag)
1874 o->stream->write_index_corrections[n].valid = FALSE;
1878 if (o->stream->direction == PA_STREAM_RECORD) {
1879 /* Read index correction */
1881 if (!i->read_index_corrupt)
1882 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1885 /* Update smoother if we're not corked */
1886 if (o->stream->smoother && !o->stream->corked) {
1887 pa_usec_t u, x;
1889 u = x = pa_rtclock_now() - i->transport_usec;
1891 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1892 pa_usec_t su;
1894 /* If we weren't playing then it will take some time
1895 * until the audio will actually come out through the
1896 * speakers. Since we follow that timing here, we need
1897 * to try to fix this up */
1899 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1901 if (su < i->sink_usec)
1902 x += i->sink_usec - su;
1905 if (!i->playing)
1906 pa_smoother_pause(o->stream->smoother, x);
1908 /* Update the smoother */
1909 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1910 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1911 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1913 if (i->playing)
1914 pa_smoother_resume(o->stream->smoother, x, TRUE);
1918 o->stream->auto_timing_update_requested = FALSE;
1920 if (o->stream->latency_update_callback)
1921 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1923 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1924 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1925 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1928 finish:
1930 pa_operation_done(o);
1931 pa_operation_unref(o);
1934 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1935 uint32_t tag;
1936 pa_operation *o;
1937 pa_tagstruct *t;
1938 struct timeval now;
1939 int cidx = 0;
1941 pa_assert(s);
1942 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1944 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1945 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1946 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1948 if (s->direction == PA_STREAM_PLAYBACK) {
1949 /* Find a place to store the write_index correction data for this entry */
1950 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1952 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1953 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1955 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1957 t = pa_tagstruct_command(
1958 s->context,
1959 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1960 &tag);
1961 pa_tagstruct_putu32(t, s->channel);
1962 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1964 pa_pstream_send_tagstruct(s->context->pstream, t);
1965 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1967 if (s->direction == PA_STREAM_PLAYBACK) {
1968 /* Fill in initial correction data */
1970 s->current_write_index_correction = cidx;
1972 s->write_index_corrections[cidx].valid = TRUE;
1973 s->write_index_corrections[cidx].absolute = FALSE;
1974 s->write_index_corrections[cidx].corrupt = FALSE;
1975 s->write_index_corrections[cidx].tag = tag;
1976 s->write_index_corrections[cidx].value = 0;
1979 return o;
1982 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1983 pa_stream *s = userdata;
1985 pa_assert(pd);
1986 pa_assert(s);
1987 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1989 pa_stream_ref(s);
1991 if (command != PA_COMMAND_REPLY) {
1992 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1993 goto finish;
1995 pa_stream_set_state(s, PA_STREAM_FAILED);
1996 goto finish;
1997 } else if (!pa_tagstruct_eof(t)) {
1998 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1999 goto finish;
2002 pa_stream_set_state(s, PA_STREAM_TERMINATED);
2004 finish:
2005 pa_stream_unref(s);
2008 int pa_stream_disconnect(pa_stream *s) {
2009 pa_tagstruct *t;
2010 uint32_t tag;
2012 pa_assert(s);
2013 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2015 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2016 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2017 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2019 pa_stream_ref(s);
2021 t = pa_tagstruct_command(
2022 s->context,
2023 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2024 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2025 &tag);
2026 pa_tagstruct_putu32(t, s->channel);
2027 pa_pstream_send_tagstruct(s->context->pstream, t);
2028 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2030 pa_stream_unref(s);
2031 return 0;
2034 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2035 pa_assert(s);
2036 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2038 if (pa_detect_fork())
2039 return;
2041 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2042 return;
2044 s->read_callback = cb;
2045 s->read_userdata = userdata;
2048 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2049 pa_assert(s);
2050 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2052 if (pa_detect_fork())
2053 return;
2055 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2056 return;
2058 s->write_callback = cb;
2059 s->write_userdata = userdata;
2062 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2063 pa_assert(s);
2064 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2066 if (pa_detect_fork())
2067 return;
2069 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2070 return;
2072 s->state_callback = cb;
2073 s->state_userdata = userdata;
2076 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2077 pa_assert(s);
2078 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2080 if (pa_detect_fork())
2081 return;
2083 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2084 return;
2086 s->overflow_callback = cb;
2087 s->overflow_userdata = userdata;
2090 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2091 pa_assert(s);
2092 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2094 if (pa_detect_fork())
2095 return;
2097 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2098 return;
2100 s->underflow_callback = cb;
2101 s->underflow_userdata = userdata;
2104 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2105 pa_assert(s);
2106 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2108 if (pa_detect_fork())
2109 return;
2111 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2112 return;
2114 s->latency_update_callback = cb;
2115 s->latency_update_userdata = userdata;
2118 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2119 pa_assert(s);
2120 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2122 if (pa_detect_fork())
2123 return;
2125 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2126 return;
2128 s->moved_callback = cb;
2129 s->moved_userdata = userdata;
2132 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2133 pa_assert(s);
2134 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2136 if (pa_detect_fork())
2137 return;
2139 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2140 return;
2142 s->suspended_callback = cb;
2143 s->suspended_userdata = userdata;
2146 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2147 pa_assert(s);
2148 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2150 if (pa_detect_fork())
2151 return;
2153 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2154 return;
2156 s->started_callback = cb;
2157 s->started_userdata = userdata;
2160 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2161 pa_assert(s);
2162 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2164 if (pa_detect_fork())
2165 return;
2167 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2168 return;
2170 s->event_callback = cb;
2171 s->event_userdata = userdata;
2174 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2175 pa_assert(s);
2176 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2178 if (pa_detect_fork())
2179 return;
2181 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2182 return;
2184 s->buffer_attr_callback = cb;
2185 s->buffer_attr_userdata = userdata;
2188 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2189 pa_operation *o = userdata;
2190 int success = 1;
2192 pa_assert(pd);
2193 pa_assert(o);
2194 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2196 if (!o->context)
2197 goto finish;
2199 if (command != PA_COMMAND_REPLY) {
2200 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2201 goto finish;
2203 success = 0;
2204 } else if (!pa_tagstruct_eof(t)) {
2205 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2206 goto finish;
2209 if (o->callback) {
2210 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2211 cb(o->stream, success, o->userdata);
2214 finish:
2215 pa_operation_done(o);
2216 pa_operation_unref(o);
2219 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2220 pa_operation *o;
2221 pa_tagstruct *t;
2222 uint32_t tag;
2224 pa_assert(s);
2225 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2227 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2228 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2229 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2231 /* Ask for a timing update before we cork/uncork to get the best
2232 * accuracy for the transport latency suitable for the
2233 * check_smoother_status() call in the started callback */
2234 request_auto_timing_update(s, TRUE);
2236 s->corked = b;
2238 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2240 t = pa_tagstruct_command(
2241 s->context,
2242 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2243 &tag);
2244 pa_tagstruct_putu32(t, s->channel);
2245 pa_tagstruct_put_boolean(t, !!b);
2246 pa_pstream_send_tagstruct(s->context->pstream, t);
2247 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2249 check_smoother_status(s, FALSE, FALSE, FALSE);
2251 /* This might cause the indexes to hang/start again, hence let's
2252 * request a timing update, after the cork/uncork, too */
2253 request_auto_timing_update(s, TRUE);
2255 return o;
2258 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2259 pa_tagstruct *t;
2260 pa_operation *o;
2261 uint32_t tag;
2263 pa_assert(s);
2264 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2266 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2267 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2269 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2271 t = pa_tagstruct_command(s->context, command, &tag);
2272 pa_tagstruct_putu32(t, s->channel);
2273 pa_pstream_send_tagstruct(s->context->pstream, t);
2274 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2276 return o;
2279 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2280 pa_operation *o;
2282 pa_assert(s);
2283 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2285 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2286 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2287 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2289 /* Ask for a timing update *before* the flush, so that the
2290 * transport usec is as up to date as possible when we get the
2291 * underflow message and update the smoother status*/
2292 request_auto_timing_update(s, TRUE);
2294 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2295 return NULL;
2297 if (s->direction == PA_STREAM_PLAYBACK) {
2299 if (s->write_index_corrections[s->current_write_index_correction].valid)
2300 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2302 if (s->buffer_attr.prebuf > 0)
2303 check_smoother_status(s, FALSE, FALSE, TRUE);
2305 /* This will change the write index, but leave the
2306 * read index untouched. */
2307 invalidate_indexes(s, FALSE, TRUE);
2309 } else
2310 /* For record streams this has no influence on the write
2311 * index, but the read index might jump. */
2312 invalidate_indexes(s, TRUE, FALSE);
2314 /* Note that we do not update requested_bytes here. This is
2315 * because we cannot really know how data actually was dropped
2316 * from the write index due to this. This 'error' will be applied
2317 * by both client and server and hence we should be fine. */
2319 return o;
2322 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2323 pa_operation *o;
2325 pa_assert(s);
2326 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2328 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2329 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2330 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2331 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2333 /* Ask for a timing update before we cork/uncork to get the best
2334 * accuracy for the transport latency suitable for the
2335 * check_smoother_status() call in the started callback */
2336 request_auto_timing_update(s, TRUE);
2338 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2339 return NULL;
2341 /* This might cause the read index to hang again, hence
2342 * let's request a timing update */
2343 request_auto_timing_update(s, TRUE);
2345 return o;
2348 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2349 pa_operation *o;
2351 pa_assert(s);
2352 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2354 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2355 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2356 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2357 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2359 /* Ask for a timing update before we cork/uncork to get the best
2360 * accuracy for the transport latency suitable for the
2361 * check_smoother_status() call in the started callback */
2362 request_auto_timing_update(s, TRUE);
2364 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2365 return NULL;
2367 /* This might cause the read index to start moving again, hence
2368 * let's request a timing update */
2369 request_auto_timing_update(s, TRUE);
2371 return o;
2374 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2375 pa_operation *o;
2377 pa_assert(s);
2378 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2379 pa_assert(name);
2381 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2383 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2385 if (s->context->version >= 13) {
2386 pa_proplist *p = pa_proplist_new();
2388 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2389 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2390 pa_proplist_free(p);
2391 } else {
2392 pa_tagstruct *t;
2393 uint32_t tag;
2395 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2396 t = pa_tagstruct_command(
2397 s->context,
2398 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2399 &tag);
2400 pa_tagstruct_putu32(t, s->channel);
2401 pa_tagstruct_puts(t, name);
2402 pa_pstream_send_tagstruct(s->context->pstream, t);
2403 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2406 return o;
2409 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2410 pa_usec_t usec;
2412 pa_assert(s);
2413 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2415 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2416 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2417 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2418 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2419 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2420 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2422 if (s->smoother)
2423 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2424 else
2425 usec = calc_time(s, FALSE);
2427 /* Make sure the time runs monotonically */
2428 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2429 if (usec < s->previous_time)
2430 usec = s->previous_time;
2431 else
2432 s->previous_time = usec;
2435 if (r_usec)
2436 *r_usec = usec;
2438 return 0;
2441 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2442 pa_assert(s);
2443 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2445 if (negative)
2446 *negative = 0;
2448 if (a >= b)
2449 return a-b;
2450 else {
2451 if (negative && s->direction == PA_STREAM_RECORD) {
2452 *negative = 1;
2453 return b-a;
2454 } else
2455 return 0;
2459 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2460 pa_usec_t t, c;
2461 int r;
2462 int64_t cindex;
2464 pa_assert(s);
2465 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2466 pa_assert(r_usec);
2468 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2469 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2470 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2471 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2472 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2473 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2475 if ((r = pa_stream_get_time(s, &t)) < 0)
2476 return r;
2478 if (s->direction == PA_STREAM_PLAYBACK)
2479 cindex = s->timing_info.write_index;
2480 else
2481 cindex = s->timing_info.read_index;
2483 if (cindex < 0)
2484 cindex = 0;
2486 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2488 if (s->direction == PA_STREAM_PLAYBACK)
2489 *r_usec = time_counter_diff(s, c, t, negative);
2490 else
2491 *r_usec = time_counter_diff(s, t, c, negative);
2493 return 0;
2496 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2497 pa_assert(s);
2498 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2500 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2501 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2502 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2503 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2505 return &s->timing_info;
2508 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2509 pa_assert(s);
2510 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2512 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2514 return &s->sample_spec;
2517 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2518 pa_assert(s);
2519 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2521 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2523 return &s->channel_map;
2526 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2527 pa_assert(s);
2528 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2530 /* We don't have the format till routing is done */
2531 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2532 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2534 return s->format;
2536 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2537 pa_assert(s);
2538 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2540 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2541 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2542 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2544 return &s->buffer_attr;
2547 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2548 pa_operation *o = userdata;
2549 int success = 1;
2551 pa_assert(pd);
2552 pa_assert(o);
2553 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2555 if (!o->context)
2556 goto finish;
2558 if (command != PA_COMMAND_REPLY) {
2559 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2560 goto finish;
2562 success = 0;
2563 } else {
2564 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2565 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2566 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2567 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2568 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2569 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2570 goto finish;
2572 } else if (o->stream->direction == PA_STREAM_RECORD) {
2573 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2574 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2575 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2576 goto finish;
2580 if (o->stream->context->version >= 13) {
2581 pa_usec_t usec;
2583 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2584 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2585 goto finish;
2588 if (o->stream->direction == PA_STREAM_RECORD)
2589 o->stream->timing_info.configured_source_usec = usec;
2590 else
2591 o->stream->timing_info.configured_sink_usec = usec;
2594 if (!pa_tagstruct_eof(t)) {
2595 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2596 goto finish;
2600 if (o->callback) {
2601 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2602 cb(o->stream, success, o->userdata);
2605 finish:
2606 pa_operation_done(o);
2607 pa_operation_unref(o);
2611 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2612 pa_operation *o;
2613 pa_tagstruct *t;
2614 uint32_t tag;
2615 pa_buffer_attr copy;
2617 pa_assert(s);
2618 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2619 pa_assert(attr);
2621 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2622 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2623 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2624 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2626 /* Ask for a timing update before we cork/uncork to get the best
2627 * accuracy for the transport latency suitable for the
2628 * check_smoother_status() call in the started callback */
2629 request_auto_timing_update(s, TRUE);
2631 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2633 t = pa_tagstruct_command(
2634 s->context,
2635 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2636 &tag);
2637 pa_tagstruct_putu32(t, s->channel);
2639 copy = *attr;
2640 patch_buffer_attr(s, &copy, NULL);
2641 attr = &copy;
2643 pa_tagstruct_putu32(t, attr->maxlength);
2645 if (s->direction == PA_STREAM_PLAYBACK)
2646 pa_tagstruct_put(
2648 PA_TAG_U32, attr->tlength,
2649 PA_TAG_U32, attr->prebuf,
2650 PA_TAG_U32, attr->minreq,
2651 PA_TAG_INVALID);
2652 else
2653 pa_tagstruct_putu32(t, attr->fragsize);
2655 if (s->context->version >= 13)
2656 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2658 if (s->context->version >= 14)
2659 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2661 pa_pstream_send_tagstruct(s->context->pstream, t);
2662 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2664 /* This might cause changes in the read/write index, hence let's
2665 * request a timing update */
2666 request_auto_timing_update(s, TRUE);
2668 return o;
2671 uint32_t pa_stream_get_device_index(pa_stream *s) {
2672 pa_assert(s);
2673 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2675 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2676 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2677 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2678 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2679 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2681 return s->device_index;
2684 const char *pa_stream_get_device_name(pa_stream *s) {
2685 pa_assert(s);
2686 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2688 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2689 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2690 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2691 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2692 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2694 return s->device_name;
2697 int pa_stream_is_suspended(pa_stream *s) {
2698 pa_assert(s);
2699 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2701 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2702 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2703 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2704 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2706 return s->suspended;
2709 int pa_stream_is_corked(pa_stream *s) {
2710 pa_assert(s);
2711 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2713 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2714 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2715 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2717 return s->corked;
2720 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2721 pa_operation *o = userdata;
2722 int success = 1;
2724 pa_assert(pd);
2725 pa_assert(o);
2726 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2728 if (!o->context)
2729 goto finish;
2731 if (command != PA_COMMAND_REPLY) {
2732 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2733 goto finish;
2735 success = 0;
2736 } else {
2738 if (!pa_tagstruct_eof(t)) {
2739 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2740 goto finish;
2744 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2745 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2747 if (o->callback) {
2748 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2749 cb(o->stream, success, o->userdata);
2752 finish:
2753 pa_operation_done(o);
2754 pa_operation_unref(o);
2758 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2759 pa_operation *o;
2760 pa_tagstruct *t;
2761 uint32_t tag;
2763 pa_assert(s);
2764 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2766 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2767 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2768 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2769 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2770 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2771 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2773 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2774 o->private = PA_UINT_TO_PTR(rate);
2776 t = pa_tagstruct_command(
2777 s->context,
2778 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2779 &tag);
2780 pa_tagstruct_putu32(t, s->channel);
2781 pa_tagstruct_putu32(t, rate);
2783 pa_pstream_send_tagstruct(s->context->pstream, t);
2784 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2786 return o;
2789 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2790 pa_operation *o;
2791 pa_tagstruct *t;
2792 uint32_t tag;
2794 pa_assert(s);
2795 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2797 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2798 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2799 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2800 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2801 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2803 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2805 t = pa_tagstruct_command(
2806 s->context,
2807 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2808 &tag);
2809 pa_tagstruct_putu32(t, s->channel);
2810 pa_tagstruct_putu32(t, (uint32_t) mode);
2811 pa_tagstruct_put_proplist(t, p);
2813 pa_pstream_send_tagstruct(s->context->pstream, t);
2814 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2816 /* Please note that we don't update s->proplist here, because we
2817 * don't export that field */
2819 return o;
2822 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2823 pa_operation *o;
2824 pa_tagstruct *t;
2825 uint32_t tag;
2826 const char * const*k;
2828 pa_assert(s);
2829 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2831 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2832 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2833 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2834 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2835 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2837 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2839 t = pa_tagstruct_command(
2840 s->context,
2841 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2842 &tag);
2843 pa_tagstruct_putu32(t, s->channel);
2845 for (k = keys; *k; k++)
2846 pa_tagstruct_puts(t, *k);
2848 pa_tagstruct_puts(t, NULL);
2850 pa_pstream_send_tagstruct(s->context->pstream, t);
2851 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2853 /* Please note that we don't update s->proplist here, because we
2854 * don't export that field */
2856 return o;
2859 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2860 pa_assert(s);
2861 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2863 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2864 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2865 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2866 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2868 s->direct_on_input = sink_input_idx;
2870 return 0;
2873 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2874 pa_assert(s);
2875 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2877 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2878 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2879 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2881 return s->direct_on_input;