client: limit block size for zero-copy operations to mempool block size
[pulseaudio-raopUDP/pulseaudio-raop-alac.git] / src / pulse / stream.c
blob72d49e11e91514e4bdc1b13117cdda8b67543c8f
1 /***
2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
42 #include "fork-detect.h"
43 #include "internal.h"
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 static void reset_callbacks(pa_stream *s) {
57 s->read_callback = NULL;
58 s->read_userdata = NULL;
59 s->write_callback = NULL;
60 s->write_userdata = NULL;
61 s->state_callback = NULL;
62 s->state_userdata = NULL;
63 s->overflow_callback = NULL;
64 s->overflow_userdata = NULL;
65 s->underflow_callback = NULL;
66 s->underflow_userdata = NULL;
67 s->latency_update_callback = NULL;
68 s->latency_update_userdata = NULL;
69 s->moved_callback = NULL;
70 s->moved_userdata = NULL;
71 s->suspended_callback = NULL;
72 s->suspended_userdata = NULL;
73 s->started_callback = NULL;
74 s->started_userdata = NULL;
75 s->event_callback = NULL;
76 s->event_userdata = NULL;
77 s->buffer_attr_callback = NULL;
78 s->buffer_attr_userdata = NULL;
81 pa_stream *pa_stream_new_with_proplist(
82 pa_context *c,
83 const char *name,
84 const pa_sample_spec *ss,
85 const pa_channel_map *map,
86 pa_proplist *p) {
88 pa_stream *s;
89 int i;
90 pa_channel_map tmap;
92 pa_assert(c);
93 pa_assert(PA_REFCNT_VALUE(c) >= 1);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103 if (!map)
104 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
106 s = pa_xnew(pa_stream, 1);
107 PA_REFCNT_INIT(s);
108 s->context = c;
109 s->mainloop = c->mainloop;
111 s->direction = PA_STREAM_NODIRECTION;
112 s->state = PA_STREAM_UNCONNECTED;
113 s->flags = 0;
115 s->sample_spec = *ss;
116 s->channel_map = *map;
118 s->direct_on_input = PA_INVALID_INDEX;
120 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121 if (name)
122 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
124 s->channel = 0;
125 s->channel_valid = FALSE;
126 s->syncid = c->csyncid++;
127 s->stream_index = PA_INVALID_INDEX;
129 s->requested_bytes = 0;
130 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
132 /* We initialize der target length here, so that if the user
133 * passes no explicit buffering metrics the default is similar to
134 * what older PA versions provided. */
136 s->buffer_attr.maxlength = (uint32_t) -1;
137 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138 s->buffer_attr.minreq = (uint32_t) -1;
139 s->buffer_attr.prebuf = (uint32_t) -1;
140 s->buffer_attr.fragsize = (uint32_t) -1;
142 s->device_index = PA_INVALID_INDEX;
143 s->device_name = NULL;
144 s->suspended = FALSE;
145 s->corked = FALSE;
147 s->write_memblock = NULL;
148 s->write_data = NULL;
150 pa_memchunk_reset(&s->peek_memchunk);
151 s->peek_data = NULL;
152 s->record_memblockq = NULL;
154 memset(&s->timing_info, 0, sizeof(s->timing_info));
155 s->timing_info_valid = FALSE;
157 s->previous_time = 0;
159 s->read_index_not_before = 0;
160 s->write_index_not_before = 0;
161 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162 s->write_index_corrections[i].valid = 0;
163 s->current_write_index_correction = 0;
165 s->auto_timing_update_event = NULL;
166 s->auto_timing_update_requested = FALSE;
167 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
169 reset_callbacks(s);
171 s->smoother = NULL;
173 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174 PA_LLIST_PREPEND(pa_stream, c->streams, s);
175 pa_stream_ref(s);
177 return s;
180 static void stream_unlink(pa_stream *s) {
181 pa_operation *o, *n;
182 pa_assert(s);
184 if (!s->context)
185 return;
187 /* Detach from context */
189 /* Unref all operatio object that point to us */
190 for (o = s->context->operations; o; o = n) {
191 n = o->next;
193 if (o->stream == s)
194 pa_operation_cancel(o);
197 /* Drop all outstanding replies for this stream */
198 if (s->context->pdispatch)
199 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
201 if (s->channel_valid) {
202 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203 s->channel = 0;
204 s->channel_valid = FALSE;
207 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208 pa_stream_unref(s);
210 s->context = NULL;
212 if (s->auto_timing_update_event) {
213 pa_assert(s->mainloop);
214 s->mainloop->time_free(s->auto_timing_update_event);
217 reset_callbacks(s);
220 static void stream_free(pa_stream *s) {
221 pa_assert(s);
223 stream_unlink(s);
225 if (s->write_memblock) {
226 pa_memblock_release(s->write_memblock);
227 pa_memblock_unref(s->write_data);
230 if (s->peek_memchunk.memblock) {
231 if (s->peek_data)
232 pa_memblock_release(s->peek_memchunk.memblock);
233 pa_memblock_unref(s->peek_memchunk.memblock);
236 if (s->record_memblockq)
237 pa_memblockq_free(s->record_memblockq);
239 if (s->proplist)
240 pa_proplist_free(s->proplist);
242 if (s->smoother)
243 pa_smoother_free(s->smoother);
245 pa_xfree(s->device_name);
246 pa_xfree(s);
249 void pa_stream_unref(pa_stream *s) {
250 pa_assert(s);
251 pa_assert(PA_REFCNT_VALUE(s) >= 1);
253 if (PA_REFCNT_DEC(s) <= 0)
254 stream_free(s);
257 pa_stream* pa_stream_ref(pa_stream *s) {
258 pa_assert(s);
259 pa_assert(PA_REFCNT_VALUE(s) >= 1);
261 PA_REFCNT_INC(s);
262 return s;
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
266 pa_assert(s);
267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
269 return s->state;
272 pa_context* pa_stream_get_context(pa_stream *s) {
273 pa_assert(s);
274 pa_assert(PA_REFCNT_VALUE(s) >= 1);
276 return s->context;
279 uint32_t pa_stream_get_index(pa_stream *s) {
280 pa_assert(s);
281 pa_assert(PA_REFCNT_VALUE(s) >= 1);
283 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
286 return s->stream_index;
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
290 pa_assert(s);
291 pa_assert(PA_REFCNT_VALUE(s) >= 1);
293 if (s->state == st)
294 return;
296 pa_stream_ref(s);
298 s->state = st;
300 if (s->state_callback)
301 s->state_callback(s, s->state_userdata);
303 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
304 stream_unlink(s);
306 pa_stream_unref(s);
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
310 pa_assert(s);
311 pa_assert(PA_REFCNT_VALUE(s) >= 1);
313 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
314 return;
316 if (s->state == PA_STREAM_READY &&
317 (force || !s->auto_timing_update_requested)) {
318 pa_operation *o;
320 /* pa_log("Automatically requesting new timing data"); */
322 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323 pa_operation_unref(o);
324 s->auto_timing_update_requested = TRUE;
328 if (s->auto_timing_update_event) {
329 if (force)
330 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
332 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
334 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339 pa_context *c = userdata;
340 pa_stream *s;
341 uint32_t channel;
343 pa_assert(pd);
344 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345 pa_assert(t);
346 pa_assert(c);
347 pa_assert(PA_REFCNT_VALUE(c) >= 1);
349 pa_context_ref(c);
351 if (pa_tagstruct_getu32(t, &channel) < 0 ||
352 !pa_tagstruct_eof(t)) {
353 pa_context_fail(c, PA_ERR_PROTOCOL);
354 goto finish;
357 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
358 goto finish;
360 if (s->state != PA_STREAM_READY)
361 goto finish;
363 pa_context_set_error(c, PA_ERR_KILLED);
364 pa_stream_set_state(s, PA_STREAM_FAILED);
366 finish:
367 pa_context_unref(c);
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371 pa_usec_t x;
373 pa_assert(s);
374 pa_assert(!force_start || !force_stop);
376 if (!s->smoother)
377 return;
379 x = pa_rtclock_now();
381 if (s->timing_info_valid) {
382 if (aposteriori)
383 x -= s->timing_info.transport_usec;
384 else
385 x += s->timing_info.transport_usec;
388 if (s->suspended || s->corked || force_stop)
389 pa_smoother_pause(s->smoother, x);
390 else if (force_start || s->buffer_attr.prebuf == 0)
391 pa_smoother_resume(s->smoother, x, TRUE);
394 /* Please note that we have no idea if playback actually started
395 * if prebuf is non-zero! */
398 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
399 pa_context *c = userdata;
400 pa_stream *s;
401 uint32_t channel;
402 const char *dn;
403 pa_bool_t suspended;
404 uint32_t di;
405 pa_usec_t usec = 0;
406 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
408 pa_assert(pd);
409 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
410 pa_assert(t);
411 pa_assert(c);
412 pa_assert(PA_REFCNT_VALUE(c) >= 1);
414 pa_context_ref(c);
416 if (c->version < 12) {
417 pa_context_fail(c, PA_ERR_PROTOCOL);
418 goto finish;
421 if (pa_tagstruct_getu32(t, &channel) < 0 ||
422 pa_tagstruct_getu32(t, &di) < 0 ||
423 pa_tagstruct_gets(t, &dn) < 0 ||
424 pa_tagstruct_get_boolean(t, &suspended) < 0) {
425 pa_context_fail(c, PA_ERR_PROTOCOL);
426 goto finish;
429 if (c->version >= 13) {
431 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
432 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
433 pa_tagstruct_getu32(t, &fragsize) < 0 ||
434 pa_tagstruct_get_usec(t, &usec) < 0) {
435 pa_context_fail(c, PA_ERR_PROTOCOL);
436 goto finish;
438 } else {
439 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
440 pa_tagstruct_getu32(t, &tlength) < 0 ||
441 pa_tagstruct_getu32(t, &prebuf) < 0 ||
442 pa_tagstruct_getu32(t, &minreq) < 0 ||
443 pa_tagstruct_get_usec(t, &usec) < 0) {
444 pa_context_fail(c, PA_ERR_PROTOCOL);
445 goto finish;
450 if (!pa_tagstruct_eof(t)) {
451 pa_context_fail(c, PA_ERR_PROTOCOL);
452 goto finish;
455 if (!dn || di == PA_INVALID_INDEX) {
456 pa_context_fail(c, PA_ERR_PROTOCOL);
457 goto finish;
460 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
461 goto finish;
463 if (s->state != PA_STREAM_READY)
464 goto finish;
466 if (c->version >= 13) {
467 if (s->direction == PA_STREAM_RECORD)
468 s->timing_info.configured_source_usec = usec;
469 else
470 s->timing_info.configured_sink_usec = usec;
472 s->buffer_attr.maxlength = maxlength;
473 s->buffer_attr.fragsize = fragsize;
474 s->buffer_attr.tlength = tlength;
475 s->buffer_attr.prebuf = prebuf;
476 s->buffer_attr.minreq = minreq;
479 pa_xfree(s->device_name);
480 s->device_name = pa_xstrdup(dn);
481 s->device_index = di;
483 s->suspended = suspended;
485 check_smoother_status(s, TRUE, FALSE, FALSE);
486 request_auto_timing_update(s, TRUE);
488 if (s->moved_callback)
489 s->moved_callback(s, s->moved_userdata);
491 finish:
492 pa_context_unref(c);
495 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
496 pa_context *c = userdata;
497 pa_stream *s;
498 uint32_t channel;
499 pa_usec_t usec = 0;
500 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
502 pa_assert(pd);
503 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
504 pa_assert(t);
505 pa_assert(c);
506 pa_assert(PA_REFCNT_VALUE(c) >= 1);
508 pa_context_ref(c);
510 if (c->version < 15) {
511 pa_context_fail(c, PA_ERR_PROTOCOL);
512 goto finish;
515 if (pa_tagstruct_getu32(t, &channel) < 0) {
516 pa_context_fail(c, PA_ERR_PROTOCOL);
517 goto finish;
520 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
521 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
522 pa_tagstruct_getu32(t, &fragsize) < 0 ||
523 pa_tagstruct_get_usec(t, &usec) < 0) {
524 pa_context_fail(c, PA_ERR_PROTOCOL);
525 goto finish;
527 } else {
528 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
529 pa_tagstruct_getu32(t, &tlength) < 0 ||
530 pa_tagstruct_getu32(t, &prebuf) < 0 ||
531 pa_tagstruct_getu32(t, &minreq) < 0 ||
532 pa_tagstruct_get_usec(t, &usec) < 0) {
533 pa_context_fail(c, PA_ERR_PROTOCOL);
534 goto finish;
538 if (!pa_tagstruct_eof(t)) {
539 pa_context_fail(c, PA_ERR_PROTOCOL);
540 goto finish;
543 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
544 goto finish;
546 if (s->state != PA_STREAM_READY)
547 goto finish;
549 if (s->direction == PA_STREAM_RECORD)
550 s->timing_info.configured_source_usec = usec;
551 else
552 s->timing_info.configured_sink_usec = usec;
554 s->buffer_attr.maxlength = maxlength;
555 s->buffer_attr.fragsize = fragsize;
556 s->buffer_attr.tlength = tlength;
557 s->buffer_attr.prebuf = prebuf;
558 s->buffer_attr.minreq = minreq;
560 request_auto_timing_update(s, TRUE);
562 if (s->buffer_attr_callback)
563 s->buffer_attr_callback(s, s->buffer_attr_userdata);
565 finish:
566 pa_context_unref(c);
569 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
570 pa_context *c = userdata;
571 pa_stream *s;
572 uint32_t channel;
573 pa_bool_t suspended;
575 pa_assert(pd);
576 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
577 pa_assert(t);
578 pa_assert(c);
579 pa_assert(PA_REFCNT_VALUE(c) >= 1);
581 pa_context_ref(c);
583 if (c->version < 12) {
584 pa_context_fail(c, PA_ERR_PROTOCOL);
585 goto finish;
588 if (pa_tagstruct_getu32(t, &channel) < 0 ||
589 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
590 !pa_tagstruct_eof(t)) {
591 pa_context_fail(c, PA_ERR_PROTOCOL);
592 goto finish;
595 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
596 goto finish;
598 if (s->state != PA_STREAM_READY)
599 goto finish;
601 s->suspended = suspended;
603 check_smoother_status(s, TRUE, FALSE, FALSE);
604 request_auto_timing_update(s, TRUE);
606 if (s->suspended_callback)
607 s->suspended_callback(s, s->suspended_userdata);
609 finish:
610 pa_context_unref(c);
613 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
614 pa_context *c = userdata;
615 pa_stream *s;
616 uint32_t channel;
618 pa_assert(pd);
619 pa_assert(command == PA_COMMAND_STARTED);
620 pa_assert(t);
621 pa_assert(c);
622 pa_assert(PA_REFCNT_VALUE(c) >= 1);
624 pa_context_ref(c);
626 if (c->version < 13) {
627 pa_context_fail(c, PA_ERR_PROTOCOL);
628 goto finish;
631 if (pa_tagstruct_getu32(t, &channel) < 0 ||
632 !pa_tagstruct_eof(t)) {
633 pa_context_fail(c, PA_ERR_PROTOCOL);
634 goto finish;
637 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
638 goto finish;
640 if (s->state != PA_STREAM_READY)
641 goto finish;
643 check_smoother_status(s, TRUE, TRUE, FALSE);
644 request_auto_timing_update(s, TRUE);
646 if (s->started_callback)
647 s->started_callback(s, s->started_userdata);
649 finish:
650 pa_context_unref(c);
653 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
654 pa_context *c = userdata;
655 pa_stream *s;
656 uint32_t channel;
657 pa_proplist *pl = NULL;
658 const char *event;
660 pa_assert(pd);
661 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
662 pa_assert(t);
663 pa_assert(c);
664 pa_assert(PA_REFCNT_VALUE(c) >= 1);
666 pa_context_ref(c);
668 if (c->version < 15) {
669 pa_context_fail(c, PA_ERR_PROTOCOL);
670 goto finish;
673 pl = pa_proplist_new();
675 if (pa_tagstruct_getu32(t, &channel) < 0 ||
676 pa_tagstruct_gets(t, &event) < 0 ||
677 pa_tagstruct_get_proplist(t, pl) < 0 ||
678 !pa_tagstruct_eof(t) || !event) {
679 pa_context_fail(c, PA_ERR_PROTOCOL);
680 goto finish;
683 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
684 goto finish;
686 if (s->state != PA_STREAM_READY)
687 goto finish;
689 if (s->event_callback)
690 s->event_callback(s, event, pl, s->event_userdata);
692 finish:
693 pa_context_unref(c);
695 if (pl)
696 pa_proplist_free(pl);
699 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
700 pa_stream *s;
701 pa_context *c = userdata;
702 uint32_t bytes, channel;
704 pa_assert(pd);
705 pa_assert(command == PA_COMMAND_REQUEST);
706 pa_assert(t);
707 pa_assert(c);
708 pa_assert(PA_REFCNT_VALUE(c) >= 1);
710 pa_context_ref(c);
712 if (pa_tagstruct_getu32(t, &channel) < 0 ||
713 pa_tagstruct_getu32(t, &bytes) < 0 ||
714 !pa_tagstruct_eof(t)) {
715 pa_context_fail(c, PA_ERR_PROTOCOL);
716 goto finish;
719 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
720 goto finish;
722 if (s->state != PA_STREAM_READY)
723 goto finish;
725 s->requested_bytes += bytes;
727 if (s->requested_bytes > 0 && s->write_callback)
728 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
730 finish:
731 pa_context_unref(c);
734 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
735 pa_stream *s;
736 pa_context *c = userdata;
737 uint32_t channel;
739 pa_assert(pd);
740 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
741 pa_assert(t);
742 pa_assert(c);
743 pa_assert(PA_REFCNT_VALUE(c) >= 1);
745 pa_context_ref(c);
747 if (pa_tagstruct_getu32(t, &channel) < 0 ||
748 !pa_tagstruct_eof(t)) {
749 pa_context_fail(c, PA_ERR_PROTOCOL);
750 goto finish;
753 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
754 goto finish;
756 if (s->state != PA_STREAM_READY)
757 goto finish;
759 if (s->buffer_attr.prebuf > 0)
760 check_smoother_status(s, TRUE, FALSE, TRUE);
762 request_auto_timing_update(s, TRUE);
764 if (command == PA_COMMAND_OVERFLOW) {
765 if (s->overflow_callback)
766 s->overflow_callback(s, s->overflow_userdata);
767 } else if (command == PA_COMMAND_UNDERFLOW) {
768 if (s->underflow_callback)
769 s->underflow_callback(s, s->underflow_userdata);
772 finish:
773 pa_context_unref(c);
776 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
777 pa_assert(s);
778 pa_assert(PA_REFCNT_VALUE(s) >= 1);
780 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
782 if (s->state != PA_STREAM_READY)
783 return;
785 if (w) {
786 s->write_index_not_before = s->context->ctag;
788 if (s->timing_info_valid)
789 s->timing_info.write_index_corrupt = TRUE;
791 /* pa_log("write_index invalidated"); */
794 if (r) {
795 s->read_index_not_before = s->context->ctag;
797 if (s->timing_info_valid)
798 s->timing_info.read_index_corrupt = TRUE;
800 /* pa_log("read_index invalidated"); */
803 request_auto_timing_update(s, TRUE);
806 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
807 pa_stream *s = userdata;
809 pa_assert(s);
810 pa_assert(PA_REFCNT_VALUE(s) >= 1);
812 pa_stream_ref(s);
813 request_auto_timing_update(s, FALSE);
814 pa_stream_unref(s);
817 static void create_stream_complete(pa_stream *s) {
818 pa_assert(s);
819 pa_assert(PA_REFCNT_VALUE(s) >= 1);
820 pa_assert(s->state == PA_STREAM_CREATING);
822 pa_stream_set_state(s, PA_STREAM_READY);
824 if (s->requested_bytes > 0 && s->write_callback)
825 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
827 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
828 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
829 pa_assert(!s->auto_timing_update_event);
830 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
832 request_auto_timing_update(s, TRUE);
835 check_smoother_status(s, TRUE, FALSE, FALSE);
838 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
839 pa_assert(s);
840 pa_assert(attr);
841 pa_assert(ss);
843 if (s->context->version >= 13)
844 return;
846 /* Version older than 0.9.10 didn't do server side buffer_attr
847 * selection, hence we have to fake it on the client side. */
849 /* We choose fairly conservative values here, to not confuse
850 * old clients with extremely large playback buffers */
852 if (attr->maxlength == (uint32_t) -1)
853 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
855 if (attr->tlength == (uint32_t) -1)
856 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
858 if (attr->minreq == (uint32_t) -1)
859 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
861 if (attr->prebuf == (uint32_t) -1)
862 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
864 if (attr->fragsize == (uint32_t) -1)
865 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
868 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
869 pa_stream *s = userdata;
870 uint32_t requested_bytes;
872 pa_assert(pd);
873 pa_assert(s);
874 pa_assert(PA_REFCNT_VALUE(s) >= 1);
875 pa_assert(s->state == PA_STREAM_CREATING);
877 pa_stream_ref(s);
879 if (command != PA_COMMAND_REPLY) {
880 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
881 goto finish;
883 pa_stream_set_state(s, PA_STREAM_FAILED);
884 goto finish;
887 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
888 s->channel == PA_INVALID_INDEX ||
889 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
890 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
891 pa_context_fail(s->context, PA_ERR_PROTOCOL);
892 goto finish;
895 s->requested_bytes = (int64_t) requested_bytes;
897 if (s->context->version >= 9) {
898 if (s->direction == PA_STREAM_PLAYBACK) {
899 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
900 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
901 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
902 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
903 pa_context_fail(s->context, PA_ERR_PROTOCOL);
904 goto finish;
906 } else if (s->direction == PA_STREAM_RECORD) {
907 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
908 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
909 pa_context_fail(s->context, PA_ERR_PROTOCOL);
910 goto finish;
915 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
916 pa_sample_spec ss;
917 pa_channel_map cm;
918 const char *dn = NULL;
919 pa_bool_t suspended;
921 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
922 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
923 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
924 pa_tagstruct_gets(t, &dn) < 0 ||
925 pa_tagstruct_get_boolean(t, &suspended) < 0) {
926 pa_context_fail(s->context, PA_ERR_PROTOCOL);
927 goto finish;
930 if (!dn || s->device_index == PA_INVALID_INDEX ||
931 ss.channels != cm.channels ||
932 !pa_channel_map_valid(&cm) ||
933 !pa_sample_spec_valid(&ss) ||
934 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
935 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
936 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
937 pa_context_fail(s->context, PA_ERR_PROTOCOL);
938 goto finish;
941 pa_xfree(s->device_name);
942 s->device_name = pa_xstrdup(dn);
943 s->suspended = suspended;
945 s->channel_map = cm;
946 s->sample_spec = ss;
949 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
950 pa_usec_t usec;
952 if (pa_tagstruct_get_usec(t, &usec) < 0) {
953 pa_context_fail(s->context, PA_ERR_PROTOCOL);
954 goto finish;
957 if (s->direction == PA_STREAM_RECORD)
958 s->timing_info.configured_source_usec = usec;
959 else
960 s->timing_info.configured_sink_usec = usec;
963 if (!pa_tagstruct_eof(t)) {
964 pa_context_fail(s->context, PA_ERR_PROTOCOL);
965 goto finish;
968 if (s->direction == PA_STREAM_RECORD) {
969 pa_assert(!s->record_memblockq);
971 s->record_memblockq = pa_memblockq_new(
973 s->buffer_attr.maxlength,
975 pa_frame_size(&s->sample_spec),
979 NULL);
982 s->channel_valid = TRUE;
983 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
985 create_stream_complete(s);
987 finish:
988 pa_stream_unref(s);
991 static int create_stream(
992 pa_stream_direction_t direction,
993 pa_stream *s,
994 const char *dev,
995 const pa_buffer_attr *attr,
996 pa_stream_flags_t flags,
997 const pa_cvolume *volume,
998 pa_stream *sync_stream) {
1000 pa_tagstruct *t;
1001 uint32_t tag;
1002 pa_bool_t volume_set = FALSE;
1004 pa_assert(s);
1005 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1008 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1009 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1010 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1011 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1012 PA_STREAM_INTERPOLATE_TIMING|
1013 PA_STREAM_NOT_MONOTONIC|
1014 PA_STREAM_AUTO_TIMING_UPDATE|
1015 PA_STREAM_NO_REMAP_CHANNELS|
1016 PA_STREAM_NO_REMIX_CHANNELS|
1017 PA_STREAM_FIX_FORMAT|
1018 PA_STREAM_FIX_RATE|
1019 PA_STREAM_FIX_CHANNELS|
1020 PA_STREAM_DONT_MOVE|
1021 PA_STREAM_VARIABLE_RATE|
1022 PA_STREAM_PEAK_DETECT|
1023 PA_STREAM_START_MUTED|
1024 PA_STREAM_ADJUST_LATENCY|
1025 PA_STREAM_EARLY_REQUESTS|
1026 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1027 PA_STREAM_START_UNMUTED|
1028 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1030 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1031 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1032 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1033 /* Althought some of the other flags are not supported on older
1034 * version, we don't check for them here, because it doesn't hurt
1035 * when they are passed but actually not supported. This makes
1036 * client development easier */
1038 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1039 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1040 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1041 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1042 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1044 pa_stream_ref(s);
1046 s->direction = direction;
1047 s->flags = flags;
1048 s->corked = !!(flags & PA_STREAM_START_CORKED);
1050 if (sync_stream)
1051 s->syncid = sync_stream->syncid;
1053 if (attr)
1054 s->buffer_attr = *attr;
1055 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1057 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1058 pa_usec_t x;
1060 x = pa_rtclock_now();
1062 pa_assert(!s->smoother);
1063 s->smoother = pa_smoother_new(
1064 SMOOTHER_ADJUST_TIME,
1065 SMOOTHER_HISTORY_TIME,
1066 !(flags & PA_STREAM_NOT_MONOTONIC),
1067 TRUE,
1068 SMOOTHER_MIN_HISTORY,
1070 TRUE);
1073 if (!dev)
1074 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1076 t = pa_tagstruct_command(
1077 s->context,
1078 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1079 &tag);
1081 if (s->context->version < 13)
1082 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1084 pa_tagstruct_put(
1086 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1087 PA_TAG_CHANNEL_MAP, &s->channel_map,
1088 PA_TAG_U32, PA_INVALID_INDEX,
1089 PA_TAG_STRING, dev,
1090 PA_TAG_U32, s->buffer_attr.maxlength,
1091 PA_TAG_BOOLEAN, s->corked,
1092 PA_TAG_INVALID);
1094 if (s->direction == PA_STREAM_PLAYBACK) {
1095 pa_cvolume cv;
1097 pa_tagstruct_put(
1099 PA_TAG_U32, s->buffer_attr.tlength,
1100 PA_TAG_U32, s->buffer_attr.prebuf,
1101 PA_TAG_U32, s->buffer_attr.minreq,
1102 PA_TAG_U32, s->syncid,
1103 PA_TAG_INVALID);
1105 volume_set = !!volume;
1107 if (!volume)
1108 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1110 pa_tagstruct_put_cvolume(t, volume);
1111 } else
1112 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1114 if (s->context->version >= 12) {
1115 pa_tagstruct_put(
1117 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1118 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1119 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1120 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1121 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1122 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1123 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1124 PA_TAG_INVALID);
1127 if (s->context->version >= 13) {
1129 if (s->direction == PA_STREAM_PLAYBACK)
1130 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1131 else
1132 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1134 pa_tagstruct_put(
1136 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1137 PA_TAG_PROPLIST, s->proplist,
1138 PA_TAG_INVALID);
1140 if (s->direction == PA_STREAM_RECORD)
1141 pa_tagstruct_putu32(t, s->direct_on_input);
1144 if (s->context->version >= 14) {
1146 if (s->direction == PA_STREAM_PLAYBACK)
1147 pa_tagstruct_put_boolean(t, volume_set);
1149 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1152 if (s->context->version >= 15) {
1154 if (s->direction == PA_STREAM_PLAYBACK)
1155 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1157 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1158 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1161 pa_pstream_send_tagstruct(s->context->pstream, t);
1162 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1164 pa_stream_set_state(s, PA_STREAM_CREATING);
1166 pa_stream_unref(s);
1167 return 0;
1170 int pa_stream_connect_playback(
1171 pa_stream *s,
1172 const char *dev,
1173 const pa_buffer_attr *attr,
1174 pa_stream_flags_t flags,
1175 const pa_cvolume *volume,
1176 pa_stream *sync_stream) {
1178 pa_assert(s);
1179 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1181 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1184 int pa_stream_connect_record(
1185 pa_stream *s,
1186 const char *dev,
1187 const pa_buffer_attr *attr,
1188 pa_stream_flags_t flags) {
1190 pa_assert(s);
1191 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1193 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1196 int pa_stream_begin_write(
1197 pa_stream *s,
1198 void **data,
1199 size_t *nbytes) {
1201 pa_assert(s);
1202 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1204 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1205 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1206 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1207 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1208 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1210 if (*nbytes != (size_t) -1) {
1211 size_t m, fs;
1213 m = pa_mempool_block_size_max(s->context->mempool);
1214 fs = pa_frame_size(&s->sample_spec);
1216 m = (m / fs) * fs;
1217 if (*nbytes > m)
1218 *nbytes = m;
1221 if (!s->write_memblock) {
1222 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1223 s->write_data = pa_memblock_acquire(s->write_memblock);
1226 *data = s->write_data;
1227 *nbytes = pa_memblock_get_length(s->write_memblock);
1229 return 0;
1232 int pa_stream_cancel_write(
1233 pa_stream *s) {
1235 pa_assert(s);
1236 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1238 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1239 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1240 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1241 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1243 pa_assert(s->write_data);
1245 pa_memblock_release(s->write_memblock);
1246 pa_memblock_unref(s->write_memblock);
1247 s->write_memblock = NULL;
1248 s->write_data = NULL;
1250 return 0;
1253 int pa_stream_write(
1254 pa_stream *s,
1255 const void *data,
1256 size_t length,
1257 pa_free_cb_t free_cb,
1258 int64_t offset,
1259 pa_seek_mode_t seek) {
1261 pa_assert(s);
1262 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1263 pa_assert(data);
1265 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1266 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1267 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1268 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1269 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1270 PA_CHECK_VALIDITY(s->context,
1271 !s->write_memblock ||
1272 ((data >= s->write_data) &&
1273 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1274 PA_ERR_INVALID);
1275 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1277 if (s->write_memblock) {
1278 pa_memchunk chunk;
1280 /* pa_stream_write_begin() was called before */
1282 pa_memblock_release(s->write_memblock);
1284 chunk.memblock = s->write_memblock;
1285 chunk.index = (const char *) data - (const char *) s->write_data;
1286 chunk.length = length;
1288 s->write_memblock = NULL;
1289 s->write_data = NULL;
1291 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1292 pa_memblock_unref(chunk.memblock);
1294 } else {
1295 pa_seek_mode_t t_seek = seek;
1296 int64_t t_offset = offset;
1297 size_t t_length = length;
1298 const void *t_data = data;
1300 /* pa_stream_write_begin() was not called before */
1302 while (t_length > 0) {
1303 pa_memchunk chunk;
1305 chunk.index = 0;
1307 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1308 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1309 chunk.length = t_length;
1310 } else {
1311 void *d;
1313 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1314 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1316 d = pa_memblock_acquire(chunk.memblock);
1317 memcpy(d, t_data, chunk.length);
1318 pa_memblock_release(chunk.memblock);
1321 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1323 t_offset = 0;
1324 t_seek = PA_SEEK_RELATIVE;
1326 t_data = (const uint8_t*) t_data + chunk.length;
1327 t_length -= chunk.length;
1329 pa_memblock_unref(chunk.memblock);
1332 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1333 free_cb((void*) data);
1336 /* This is obviously wrong since we ignore the seeking index . But
1337 * that's OK, the server side applies the same error */
1338 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1340 if (s->direction == PA_STREAM_PLAYBACK) {
1342 /* Update latency request correction */
1343 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1345 if (seek == PA_SEEK_ABSOLUTE) {
1346 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1347 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1348 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1349 } else if (seek == PA_SEEK_RELATIVE) {
1350 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1351 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1352 } else
1353 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1356 /* Update the write index in the already available latency data */
1357 if (s->timing_info_valid) {
1359 if (seek == PA_SEEK_ABSOLUTE) {
1360 s->timing_info.write_index_corrupt = FALSE;
1361 s->timing_info.write_index = offset + (int64_t) length;
1362 } else if (seek == PA_SEEK_RELATIVE) {
1363 if (!s->timing_info.write_index_corrupt)
1364 s->timing_info.write_index += offset + (int64_t) length;
1365 } else
1366 s->timing_info.write_index_corrupt = TRUE;
1369 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1370 request_auto_timing_update(s, TRUE);
1373 return 0;
1376 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1377 pa_assert(s);
1378 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1379 pa_assert(data);
1380 pa_assert(length);
1382 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1383 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1384 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1386 if (!s->peek_memchunk.memblock) {
1388 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1389 *data = NULL;
1390 *length = 0;
1391 return 0;
1394 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1397 pa_assert(s->peek_data);
1398 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1399 *length = s->peek_memchunk.length;
1400 return 0;
1403 int pa_stream_drop(pa_stream *s) {
1404 pa_assert(s);
1405 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1407 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1408 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1409 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1410 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1412 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1414 /* Fix the simulated local read index */
1415 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1416 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1418 pa_assert(s->peek_data);
1419 pa_memblock_release(s->peek_memchunk.memblock);
1420 pa_memblock_unref(s->peek_memchunk.memblock);
1421 pa_memchunk_reset(&s->peek_memchunk);
1423 return 0;
1426 size_t pa_stream_writable_size(pa_stream *s) {
1427 pa_assert(s);
1428 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1430 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1431 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1432 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1434 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1437 size_t pa_stream_readable_size(pa_stream *s) {
1438 pa_assert(s);
1439 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1441 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1442 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1443 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1445 return pa_memblockq_get_length(s->record_memblockq);
1448 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1449 pa_operation *o;
1450 pa_tagstruct *t;
1451 uint32_t tag;
1453 pa_assert(s);
1454 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1456 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1457 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1458 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1460 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1462 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1463 pa_tagstruct_putu32(t, s->channel);
1464 pa_pstream_send_tagstruct(s->context->pstream, t);
1465 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1467 return o;
1470 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1471 pa_usec_t usec;
1473 pa_assert(s);
1474 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1475 pa_assert(s->state == PA_STREAM_READY);
1476 pa_assert(s->direction != PA_STREAM_UPLOAD);
1477 pa_assert(s->timing_info_valid);
1478 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1479 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1481 if (s->direction == PA_STREAM_PLAYBACK) {
1482 /* The last byte that was written into the output device
1483 * had this time value associated */
1484 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1486 if (!s->corked && !s->suspended) {
1488 if (!ignore_transport)
1489 /* Because the latency info took a little time to come
1490 * to us, we assume that the real output time is actually
1491 * a little ahead */
1492 usec += s->timing_info.transport_usec;
1494 /* However, the output device usually maintains a buffer
1495 too, hence the real sample currently played is a little
1496 back */
1497 if (s->timing_info.sink_usec >= usec)
1498 usec = 0;
1499 else
1500 usec -= s->timing_info.sink_usec;
1503 } else {
1504 pa_assert(s->direction == PA_STREAM_RECORD);
1506 /* The last byte written into the server side queue had
1507 * this time value associated */
1508 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1510 if (!s->corked && !s->suspended) {
1512 if (!ignore_transport)
1513 /* Add transport latency */
1514 usec += s->timing_info.transport_usec;
1516 /* Add latency of data in device buffer */
1517 usec += s->timing_info.source_usec;
1519 /* If this is a monitor source, we need to correct the
1520 * time by the playback device buffer */
1521 if (s->timing_info.sink_usec >= usec)
1522 usec = 0;
1523 else
1524 usec -= s->timing_info.sink_usec;
1528 return usec;
1531 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1532 pa_operation *o = userdata;
1533 struct timeval local, remote, now;
1534 pa_timing_info *i;
1535 pa_bool_t playing = FALSE;
1536 uint64_t underrun_for = 0, playing_for = 0;
1538 pa_assert(pd);
1539 pa_assert(o);
1540 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1542 if (!o->context || !o->stream)
1543 goto finish;
1545 i = &o->stream->timing_info;
1547 o->stream->timing_info_valid = FALSE;
1548 i->write_index_corrupt = TRUE;
1549 i->read_index_corrupt = TRUE;
1551 if (command != PA_COMMAND_REPLY) {
1552 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1553 goto finish;
1555 } else {
1557 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1558 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1559 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1560 pa_tagstruct_get_timeval(t, &local) < 0 ||
1561 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1562 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1563 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1565 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1566 goto finish;
1569 if (o->context->version >= 13 &&
1570 o->stream->direction == PA_STREAM_PLAYBACK)
1571 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1572 pa_tagstruct_getu64(t, &playing_for) < 0) {
1574 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1575 goto finish;
1579 if (!pa_tagstruct_eof(t)) {
1580 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1581 goto finish;
1583 o->stream->timing_info_valid = TRUE;
1584 i->write_index_corrupt = FALSE;
1585 i->read_index_corrupt = FALSE;
1587 i->playing = (int) playing;
1588 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1590 pa_gettimeofday(&now);
1592 /* Calculcate timestamps */
1593 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1594 /* local and remote seem to have synchronized clocks */
1596 if (o->stream->direction == PA_STREAM_PLAYBACK)
1597 i->transport_usec = pa_timeval_diff(&remote, &local);
1598 else
1599 i->transport_usec = pa_timeval_diff(&now, &remote);
1601 i->synchronized_clocks = TRUE;
1602 i->timestamp = remote;
1603 } else {
1604 /* clocks are not synchronized, let's estimate latency then */
1605 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1606 i->synchronized_clocks = FALSE;
1607 i->timestamp = local;
1608 pa_timeval_add(&i->timestamp, i->transport_usec);
1611 /* Invalidate read and write indexes if necessary */
1612 if (tag < o->stream->read_index_not_before)
1613 i->read_index_corrupt = TRUE;
1615 if (tag < o->stream->write_index_not_before)
1616 i->write_index_corrupt = TRUE;
1618 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1619 /* Write index correction */
1621 int n, j;
1622 uint32_t ctag = tag;
1624 /* Go through the saved correction values and add up the
1625 * total correction.*/
1626 for (n = 0, j = o->stream->current_write_index_correction+1;
1627 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1628 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1630 /* Step over invalid data or out-of-date data */
1631 if (!o->stream->write_index_corrections[j].valid ||
1632 o->stream->write_index_corrections[j].tag < ctag)
1633 continue;
1635 /* Make sure that everything is in order */
1636 ctag = o->stream->write_index_corrections[j].tag+1;
1638 /* Now fix the write index */
1639 if (o->stream->write_index_corrections[j].corrupt) {
1640 /* A corrupting seek was made */
1641 i->write_index_corrupt = TRUE;
1642 } else if (o->stream->write_index_corrections[j].absolute) {
1643 /* An absolute seek was made */
1644 i->write_index = o->stream->write_index_corrections[j].value;
1645 i->write_index_corrupt = FALSE;
1646 } else if (!i->write_index_corrupt) {
1647 /* A relative seek was made */
1648 i->write_index += o->stream->write_index_corrections[j].value;
1652 /* Clear old correction entries */
1653 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1654 if (!o->stream->write_index_corrections[n].valid)
1655 continue;
1657 if (o->stream->write_index_corrections[n].tag <= tag)
1658 o->stream->write_index_corrections[n].valid = FALSE;
1662 if (o->stream->direction == PA_STREAM_RECORD) {
1663 /* Read index correction */
1665 if (!i->read_index_corrupt)
1666 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1669 /* Update smoother */
1670 if (o->stream->smoother) {
1671 pa_usec_t u, x;
1673 u = x = pa_rtclock_now() - i->transport_usec;
1675 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1676 pa_usec_t su;
1678 /* If we weren't playing then it will take some time
1679 * until the audio will actually come out through the
1680 * speakers. Since we follow that timing here, we need
1681 * to try to fix this up */
1683 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1685 if (su < i->sink_usec)
1686 x += i->sink_usec - su;
1689 if (!i->playing)
1690 pa_smoother_pause(o->stream->smoother, x);
1692 /* Update the smoother */
1693 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1694 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1695 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1697 if (i->playing)
1698 pa_smoother_resume(o->stream->smoother, x, TRUE);
1702 o->stream->auto_timing_update_requested = FALSE;
1704 if (o->stream->latency_update_callback)
1705 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1707 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1708 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1709 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1712 finish:
1714 pa_operation_done(o);
1715 pa_operation_unref(o);
1718 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1719 uint32_t tag;
1720 pa_operation *o;
1721 pa_tagstruct *t;
1722 struct timeval now;
1723 int cidx = 0;
1725 pa_assert(s);
1726 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1728 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1729 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1730 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1732 if (s->direction == PA_STREAM_PLAYBACK) {
1733 /* Find a place to store the write_index correction data for this entry */
1734 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1736 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1737 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1739 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1741 t = pa_tagstruct_command(
1742 s->context,
1743 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1744 &tag);
1745 pa_tagstruct_putu32(t, s->channel);
1746 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1748 pa_pstream_send_tagstruct(s->context->pstream, t);
1749 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1751 if (s->direction == PA_STREAM_PLAYBACK) {
1752 /* Fill in initial correction data */
1754 s->current_write_index_correction = cidx;
1756 s->write_index_corrections[cidx].valid = TRUE;
1757 s->write_index_corrections[cidx].absolute = FALSE;
1758 s->write_index_corrections[cidx].corrupt = FALSE;
1759 s->write_index_corrections[cidx].tag = tag;
1760 s->write_index_corrections[cidx].value = 0;
1763 return o;
1766 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1767 pa_stream *s = userdata;
1769 pa_assert(pd);
1770 pa_assert(s);
1771 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1773 pa_stream_ref(s);
1775 if (command != PA_COMMAND_REPLY) {
1776 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1777 goto finish;
1779 pa_stream_set_state(s, PA_STREAM_FAILED);
1780 goto finish;
1781 } else if (!pa_tagstruct_eof(t)) {
1782 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1783 goto finish;
1786 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1788 finish:
1789 pa_stream_unref(s);
1792 int pa_stream_disconnect(pa_stream *s) {
1793 pa_tagstruct *t;
1794 uint32_t tag;
1796 pa_assert(s);
1797 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1799 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1800 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1801 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1803 pa_stream_ref(s);
1805 t = pa_tagstruct_command(
1806 s->context,
1807 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1808 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1809 &tag);
1810 pa_tagstruct_putu32(t, s->channel);
1811 pa_pstream_send_tagstruct(s->context->pstream, t);
1812 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1814 pa_stream_unref(s);
1815 return 0;
1818 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1819 pa_assert(s);
1820 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1822 if (pa_detect_fork())
1823 return;
1825 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1826 return;
1828 s->read_callback = cb;
1829 s->read_userdata = userdata;
1832 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1833 pa_assert(s);
1834 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1836 if (pa_detect_fork())
1837 return;
1839 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1840 return;
1842 s->write_callback = cb;
1843 s->write_userdata = userdata;
1846 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1847 pa_assert(s);
1848 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1850 if (pa_detect_fork())
1851 return;
1853 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1854 return;
1856 s->state_callback = cb;
1857 s->state_userdata = userdata;
1860 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1861 pa_assert(s);
1862 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1864 if (pa_detect_fork())
1865 return;
1867 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1868 return;
1870 s->overflow_callback = cb;
1871 s->overflow_userdata = userdata;
1874 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1875 pa_assert(s);
1876 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1878 if (pa_detect_fork())
1879 return;
1881 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1882 return;
1884 s->underflow_callback = cb;
1885 s->underflow_userdata = userdata;
1888 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1889 pa_assert(s);
1890 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1892 if (pa_detect_fork())
1893 return;
1895 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1896 return;
1898 s->latency_update_callback = cb;
1899 s->latency_update_userdata = userdata;
1902 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1903 pa_assert(s);
1904 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1906 if (pa_detect_fork())
1907 return;
1909 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1910 return;
1912 s->moved_callback = cb;
1913 s->moved_userdata = userdata;
1916 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1917 pa_assert(s);
1918 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1920 if (pa_detect_fork())
1921 return;
1923 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1924 return;
1926 s->suspended_callback = cb;
1927 s->suspended_userdata = userdata;
1930 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1931 pa_assert(s);
1932 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1934 if (pa_detect_fork())
1935 return;
1937 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1938 return;
1940 s->started_callback = cb;
1941 s->started_userdata = userdata;
1944 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1945 pa_assert(s);
1946 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1948 if (pa_detect_fork())
1949 return;
1951 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1952 return;
1954 s->event_callback = cb;
1955 s->event_userdata = userdata;
1958 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1959 pa_assert(s);
1960 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1962 if (pa_detect_fork())
1963 return;
1965 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1966 return;
1968 s->buffer_attr_callback = cb;
1969 s->buffer_attr_userdata = userdata;
1972 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1973 pa_operation *o = userdata;
1974 int success = 1;
1976 pa_assert(pd);
1977 pa_assert(o);
1978 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1980 if (!o->context)
1981 goto finish;
1983 if (command != PA_COMMAND_REPLY) {
1984 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1985 goto finish;
1987 success = 0;
1988 } else if (!pa_tagstruct_eof(t)) {
1989 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1990 goto finish;
1993 if (o->callback) {
1994 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1995 cb(o->stream, success, o->userdata);
1998 finish:
1999 pa_operation_done(o);
2000 pa_operation_unref(o);
2003 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2004 pa_operation *o;
2005 pa_tagstruct *t;
2006 uint32_t tag;
2008 pa_assert(s);
2009 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2011 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2012 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2013 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2015 s->corked = b;
2017 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2019 t = pa_tagstruct_command(
2020 s->context,
2021 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2022 &tag);
2023 pa_tagstruct_putu32(t, s->channel);
2024 pa_tagstruct_put_boolean(t, !!b);
2025 pa_pstream_send_tagstruct(s->context->pstream, t);
2026 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2028 check_smoother_status(s, FALSE, FALSE, FALSE);
2030 /* This might cause the indexes to hang/start again, hence
2031 * let's request a timing update */
2032 request_auto_timing_update(s, TRUE);
2034 return o;
2037 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2038 pa_tagstruct *t;
2039 pa_operation *o;
2040 uint32_t tag;
2042 pa_assert(s);
2043 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2045 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2046 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2048 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2050 t = pa_tagstruct_command(s->context, command, &tag);
2051 pa_tagstruct_putu32(t, s->channel);
2052 pa_pstream_send_tagstruct(s->context->pstream, t);
2053 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2055 return o;
2058 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2059 pa_operation *o;
2061 pa_assert(s);
2062 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2064 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2065 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2066 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2068 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2069 return NULL;
2071 if (s->direction == PA_STREAM_PLAYBACK) {
2073 if (s->write_index_corrections[s->current_write_index_correction].valid)
2074 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2076 if (s->buffer_attr.prebuf > 0)
2077 check_smoother_status(s, FALSE, FALSE, TRUE);
2079 /* This will change the write index, but leave the
2080 * read index untouched. */
2081 invalidate_indexes(s, FALSE, TRUE);
2083 } else
2084 /* For record streams this has no influence on the write
2085 * index, but the read index might jump. */
2086 invalidate_indexes(s, TRUE, FALSE);
2088 return o;
2091 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2092 pa_operation *o;
2094 pa_assert(s);
2095 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2097 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2098 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2099 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2100 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2102 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2103 return NULL;
2105 /* This might cause the read index to hang again, hence
2106 * let's request a timing update */
2107 request_auto_timing_update(s, TRUE);
2109 return o;
2112 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2113 pa_operation *o;
2115 pa_assert(s);
2116 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2118 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2119 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2120 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2121 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2123 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2124 return NULL;
2126 /* This might cause the read index to start moving again, hence
2127 * let's request a timing update */
2128 request_auto_timing_update(s, TRUE);
2130 return o;
2133 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2134 pa_operation *o;
2136 pa_assert(s);
2137 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2138 pa_assert(name);
2140 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2141 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2142 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2144 if (s->context->version >= 13) {
2145 pa_proplist *p = pa_proplist_new();
2147 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2148 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2149 pa_proplist_free(p);
2150 } else {
2151 pa_tagstruct *t;
2152 uint32_t tag;
2154 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2155 t = pa_tagstruct_command(
2156 s->context,
2157 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2158 &tag);
2159 pa_tagstruct_putu32(t, s->channel);
2160 pa_tagstruct_puts(t, name);
2161 pa_pstream_send_tagstruct(s->context->pstream, t);
2162 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2165 return o;
2168 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2169 pa_usec_t usec;
2171 pa_assert(s);
2172 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2174 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2175 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2176 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2177 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2178 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2179 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2181 if (s->smoother)
2182 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2183 else
2184 usec = calc_time(s, FALSE);
2186 /* Make sure the time runs monotonically */
2187 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2188 if (usec < s->previous_time)
2189 usec = s->previous_time;
2190 else
2191 s->previous_time = usec;
2194 if (r_usec)
2195 *r_usec = usec;
2197 return 0;
2200 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2201 pa_assert(s);
2202 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2204 if (negative)
2205 *negative = 0;
2207 if (a >= b)
2208 return a-b;
2209 else {
2210 if (negative && s->direction == PA_STREAM_RECORD) {
2211 *negative = 1;
2212 return b-a;
2213 } else
2214 return 0;
2218 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2219 pa_usec_t t, c;
2220 int r;
2221 int64_t cindex;
2223 pa_assert(s);
2224 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2225 pa_assert(r_usec);
2227 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2228 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2229 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2230 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2231 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2232 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2234 if ((r = pa_stream_get_time(s, &t)) < 0)
2235 return r;
2237 if (s->direction == PA_STREAM_PLAYBACK)
2238 cindex = s->timing_info.write_index;
2239 else
2240 cindex = s->timing_info.read_index;
2242 if (cindex < 0)
2243 cindex = 0;
2245 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2247 if (s->direction == PA_STREAM_PLAYBACK)
2248 *r_usec = time_counter_diff(s, c, t, negative);
2249 else
2250 *r_usec = time_counter_diff(s, t, c, negative);
2252 return 0;
2255 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2256 pa_assert(s);
2257 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2259 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2260 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2261 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2262 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2264 return &s->timing_info;
2267 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2268 pa_assert(s);
2269 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2271 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2273 return &s->sample_spec;
2276 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2277 pa_assert(s);
2278 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2280 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2282 return &s->channel_map;
2285 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2286 pa_assert(s);
2287 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2289 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2290 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2291 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2293 return &s->buffer_attr;
2296 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2297 pa_operation *o = userdata;
2298 int success = 1;
2300 pa_assert(pd);
2301 pa_assert(o);
2302 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2304 if (!o->context)
2305 goto finish;
2307 if (command != PA_COMMAND_REPLY) {
2308 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2309 goto finish;
2311 success = 0;
2312 } else {
2313 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2314 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2315 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2316 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2317 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2318 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2319 goto finish;
2321 } else if (o->stream->direction == PA_STREAM_RECORD) {
2322 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2323 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2324 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2325 goto finish;
2329 if (o->stream->context->version >= 13) {
2330 pa_usec_t usec;
2332 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2333 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2334 goto finish;
2337 if (o->stream->direction == PA_STREAM_RECORD)
2338 o->stream->timing_info.configured_source_usec = usec;
2339 else
2340 o->stream->timing_info.configured_sink_usec = usec;
2343 if (!pa_tagstruct_eof(t)) {
2344 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2345 goto finish;
2349 if (o->callback) {
2350 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2351 cb(o->stream, success, o->userdata);
2354 finish:
2355 pa_operation_done(o);
2356 pa_operation_unref(o);
2360 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2361 pa_operation *o;
2362 pa_tagstruct *t;
2363 uint32_t tag;
2365 pa_assert(s);
2366 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2367 pa_assert(attr);
2369 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2370 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2371 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2372 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2374 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2376 t = pa_tagstruct_command(
2377 s->context,
2378 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2379 &tag);
2380 pa_tagstruct_putu32(t, s->channel);
2382 pa_tagstruct_putu32(t, attr->maxlength);
2384 if (s->direction == PA_STREAM_PLAYBACK)
2385 pa_tagstruct_put(
2387 PA_TAG_U32, attr->tlength,
2388 PA_TAG_U32, attr->prebuf,
2389 PA_TAG_U32, attr->minreq,
2390 PA_TAG_INVALID);
2391 else
2392 pa_tagstruct_putu32(t, attr->fragsize);
2394 if (s->context->version >= 13)
2395 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2397 if (s->context->version >= 14)
2398 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2400 pa_pstream_send_tagstruct(s->context->pstream, t);
2401 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2403 /* This might cause changes in the read/write indexex, hence let's
2404 * request a timing update */
2405 request_auto_timing_update(s, TRUE);
2407 return o;
2410 uint32_t pa_stream_get_device_index(pa_stream *s) {
2411 pa_assert(s);
2412 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2414 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2415 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2416 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2417 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2418 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2420 return s->device_index;
2423 const char *pa_stream_get_device_name(pa_stream *s) {
2424 pa_assert(s);
2425 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2427 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2428 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2429 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2430 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2431 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2433 return s->device_name;
2436 int pa_stream_is_suspended(pa_stream *s) {
2437 pa_assert(s);
2438 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2440 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2441 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2442 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2443 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2445 return s->suspended;
2448 int pa_stream_is_corked(pa_stream *s) {
2449 pa_assert(s);
2450 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2452 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2453 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2454 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2456 return s->corked;
2459 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2460 pa_operation *o = userdata;
2461 int success = 1;
2463 pa_assert(pd);
2464 pa_assert(o);
2465 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2467 if (!o->context)
2468 goto finish;
2470 if (command != PA_COMMAND_REPLY) {
2471 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2472 goto finish;
2474 success = 0;
2475 } else {
2477 if (!pa_tagstruct_eof(t)) {
2478 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2479 goto finish;
2483 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2484 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2486 if (o->callback) {
2487 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2488 cb(o->stream, success, o->userdata);
2491 finish:
2492 pa_operation_done(o);
2493 pa_operation_unref(o);
2497 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2498 pa_operation *o;
2499 pa_tagstruct *t;
2500 uint32_t tag;
2502 pa_assert(s);
2503 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2505 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2506 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2507 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2508 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2509 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2510 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2512 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2513 o->private = PA_UINT_TO_PTR(rate);
2515 t = pa_tagstruct_command(
2516 s->context,
2517 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2518 &tag);
2519 pa_tagstruct_putu32(t, s->channel);
2520 pa_tagstruct_putu32(t, rate);
2522 pa_pstream_send_tagstruct(s->context->pstream, t);
2523 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2525 return o;
2528 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2529 pa_operation *o;
2530 pa_tagstruct *t;
2531 uint32_t tag;
2533 pa_assert(s);
2534 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2536 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2537 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2538 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2539 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2540 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2542 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2544 t = pa_tagstruct_command(
2545 s->context,
2546 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2547 &tag);
2548 pa_tagstruct_putu32(t, s->channel);
2549 pa_tagstruct_putu32(t, (uint32_t) mode);
2550 pa_tagstruct_put_proplist(t, p);
2552 pa_pstream_send_tagstruct(s->context->pstream, t);
2553 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2555 /* Please note that we don't update s->proplist here, because we
2556 * don't export that field */
2558 return o;
2561 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2562 pa_operation *o;
2563 pa_tagstruct *t;
2564 uint32_t tag;
2565 const char * const*k;
2567 pa_assert(s);
2568 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2570 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2571 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2572 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2573 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2574 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2576 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2578 t = pa_tagstruct_command(
2579 s->context,
2580 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2581 &tag);
2582 pa_tagstruct_putu32(t, s->channel);
2584 for (k = keys; *k; k++)
2585 pa_tagstruct_puts(t, *k);
2587 pa_tagstruct_puts(t, NULL);
2589 pa_pstream_send_tagstruct(s->context->pstream, t);
2590 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2592 /* Please note that we don't update s->proplist here, because we
2593 * don't export that field */
2595 return o;
2598 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2599 pa_assert(s);
2600 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2602 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2603 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2604 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2605 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2607 s->direct_on_input = sink_input_idx;
2609 return 0;
2612 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2613 pa_assert(s);
2614 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2616 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2617 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2618 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2620 return s->direct_on_input;