pstream: Fix spelling of 'receive'.
[pulseaudio-raopUDP/pulseaudio-raop-alac.git] / src / modules / module-tunnel.c
bloba667e21249ed51be6673a70bff27b9d93d8f2645
1 /***
2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/time-smoother.h>
50 #include <pulsecore/thread.h>
51 #include <pulsecore/thread-mq.h>
52 #include <pulsecore/core-rtclock.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/proplist-util.h>
55 #include <pulsecore/auth-cookie.h>
56 #include <pulsecore/mcalign.h>
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
69 "server=<address> "
70 "sink=<remote sink name> "
71 "cookie=<filename> "
72 "format=<sample format> "
73 "channels=<number of channels> "
74 "rate=<sample rate> "
75 "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
81 "server=<address> "
82 "source=<remote source name> "
83 "cookie=<filename> "
84 "format=<sample format> "
85 "channels=<number of channels> "
86 "rate=<sample rate> "
87 "channel_map=<channel map>");
88 #endif
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION);
92 PA_MODULE_LOAD_ONCE(FALSE);
94 static const char* const valid_modargs[] = {
95 "server",
96 "cookie",
97 "format",
98 "channels",
99 "rate",
100 #ifdef TUNNEL_SINK
101 "sink_name",
102 "sink_properties",
103 "sink",
104 #else
105 "source_name",
106 "source_properties",
107 "source",
108 #endif
109 "channel_map",
110 NULL,
113 #define DEFAULT_TIMEOUT 5
115 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
119 #ifdef TUNNEL_SINK
121 enum {
122 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
123 SINK_MESSAGE_REMOTE_SUSPEND,
124 SINK_MESSAGE_UPDATE_LATENCY,
125 SINK_MESSAGE_POST
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
131 #else
133 enum {
134 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
135 SOURCE_MESSAGE_REMOTE_SUSPEND,
136 SOURCE_MESSAGE_UPDATE_LATENCY
139 #define DEFAULT_FRAGSIZE_MSEC 25
141 #endif
143 #ifdef TUNNEL_SINK
144 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 #endif
147 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
156 #ifdef TUNNEL_SINK
157 [PA_COMMAND_REQUEST] = command_request,
158 [PA_COMMAND_STARTED] = command_started,
159 #endif
160 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
161 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
162 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
164 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
168 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
170 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
171 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
176 struct userdata {
177 pa_core *core;
178 pa_module *module;
180 pa_thread_mq thread_mq;
181 pa_rtpoll *rtpoll;
182 pa_thread *thread;
184 pa_socket_client *client;
185 pa_pstream *pstream;
186 pa_pdispatch *pdispatch;
188 char *server_name;
189 #ifdef TUNNEL_SINK
190 char *sink_name;
191 pa_sink *sink;
192 size_t requested_bytes;
193 #else
194 char *source_name;
195 pa_source *source;
196 pa_mcalign *mcalign;
197 #endif
199 pa_auth_cookie *auth_cookie;
201 uint32_t version;
202 uint32_t ctag;
203 uint32_t device_index;
204 uint32_t channel;
206 int64_t counter, counter_delta;
208 pa_bool_t remote_corked:1;
209 pa_bool_t remote_suspended:1;
211 pa_usec_t transport_usec; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
214 uint32_t ignore_latency_before;
216 pa_time_event *time_event;
218 pa_smoother *smoother;
220 char *device_description;
221 char *server_fqdn;
222 char *user_name;
224 uint32_t maxlength;
225 #ifdef TUNNEL_SINK
226 uint32_t tlength;
227 uint32_t minreq;
228 uint32_t prebuf;
229 #else
230 uint32_t fragsize;
231 #endif
234 static void request_latency(struct userdata *u);
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238 pa_log_debug("Got stream or client event.");
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
243 struct userdata *u = userdata;
245 pa_assert(pd);
246 pa_assert(t);
247 pa_assert(u);
248 pa_assert(u->pdispatch == pd);
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u->module, TRUE);
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
256 struct userdata *u = userdata;
258 pa_assert(pd);
259 pa_assert(t);
260 pa_assert(u);
261 pa_assert(u->pdispatch == pd);
263 pa_log_info("Server signalled buffer overrun/underrun.");
264 request_latency(u);
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
269 struct userdata *u = userdata;
270 uint32_t channel;
271 pa_bool_t suspended;
273 pa_assert(pd);
274 pa_assert(t);
275 pa_assert(u);
276 pa_assert(u->pdispatch == pd);
278 if (pa_tagstruct_getu32(t, &channel) < 0 ||
279 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
280 !pa_tagstruct_eof(t)) {
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u->module, TRUE);
284 return;
287 pa_log_debug("Server reports device suspend.");
289 #ifdef TUNNEL_SINK
290 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
291 #else
292 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #endif
295 request_latency(u);
298 /* Called from main context */
299 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
300 struct userdata *u = userdata;
301 uint32_t channel, di;
302 const char *dn;
303 pa_bool_t suspended;
305 pa_assert(pd);
306 pa_assert(t);
307 pa_assert(u);
308 pa_assert(u->pdispatch == pd);
310 if (pa_tagstruct_getu32(t, &channel) < 0 ||
311 pa_tagstruct_getu32(t, &di) < 0 ||
312 pa_tagstruct_gets(t, &dn) < 0 ||
313 pa_tagstruct_get_boolean(t, &suspended) < 0) {
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u->module, TRUE);
317 return;
320 pa_log_debug("Server reports a stream move.");
322 #ifdef TUNNEL_SINK
323 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
324 #else
325 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #endif
328 request_latency(u);
331 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
332 struct userdata *u = userdata;
333 uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
334 pa_usec_t usec;
336 pa_assert(pd);
337 pa_assert(t);
338 pa_assert(u);
339 pa_assert(u->pdispatch == pd);
341 if (pa_tagstruct_getu32(t, &channel) < 0 ||
342 pa_tagstruct_getu32(t, &maxlength) < 0) {
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u->module, TRUE);
346 return;
349 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
350 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
351 pa_tagstruct_get_usec(t, &usec) < 0) {
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u->module, TRUE);
355 return;
357 } else {
358 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
359 pa_tagstruct_getu32(t, &prebuf) < 0 ||
360 pa_tagstruct_getu32(t, &minreq) < 0 ||
361 pa_tagstruct_get_usec(t, &usec) < 0) {
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u->module, TRUE);
365 return;
369 #ifdef TUNNEL_SINK
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
371 #endif
373 request_latency(u);
376 #ifdef TUNNEL_SINK
378 /* Called from main context */
379 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
380 struct userdata *u = userdata;
382 pa_assert(pd);
383 pa_assert(t);
384 pa_assert(u);
385 pa_assert(u->pdispatch == pd);
387 pa_log_debug("Server reports playback started.");
388 request_latency(u);
391 #endif
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
395 pa_usec_t x;
397 pa_assert(u);
399 x = pa_rtclock_now();
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
405 if (past)
406 x -= u->thread_transport_usec;
407 else
408 x += u->thread_transport_usec;
410 if (u->remote_suspended || u->remote_corked)
411 pa_smoother_pause(u->smoother, x);
412 else
413 pa_smoother_resume(u->smoother, x, TRUE);
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
418 pa_assert(u);
420 if (u->remote_corked == cork)
421 return;
423 u->remote_corked = cork;
424 check_smoother_status(u, FALSE);
427 /* Called from main context */
428 static void stream_cork(struct userdata *u, pa_bool_t cork) {
429 pa_tagstruct *t;
430 pa_assert(u);
432 if (!u->pstream)
433 return;
435 t = pa_tagstruct_new(NULL, 0);
436 #ifdef TUNNEL_SINK
437 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
438 #else
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
440 #endif
441 pa_tagstruct_putu32(t, u->ctag++);
442 pa_tagstruct_putu32(t, u->channel);
443 pa_tagstruct_put_boolean(t, !!cork);
444 pa_pstream_send_tagstruct(u->pstream, t);
446 request_latency(u);
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
451 pa_assert(u);
453 if (u->remote_suspended == suspend)
454 return;
456 u->remote_suspended = suspend;
457 check_smoother_status(u, TRUE);
460 #ifdef TUNNEL_SINK
462 /* Called from IO thread context */
463 static void send_data(struct userdata *u) {
464 pa_assert(u);
466 while (u->requested_bytes > 0) {
467 pa_memchunk memchunk;
469 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
470 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
471 pa_memblock_unref(memchunk.memblock);
473 u->requested_bytes -= memchunk.length;
475 u->counter += (int64_t) memchunk.length;
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
481 struct userdata *u = PA_SINK(o)->userdata;
483 switch (code) {
485 case PA_SINK_MESSAGE_SET_STATE: {
486 int r;
488 /* First, change the state, because otherwise pa_sink_render() would fail */
489 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
491 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
493 if (PA_SINK_IS_OPENED(u->sink->state))
494 send_data(u);
497 return r;
500 case PA_SINK_MESSAGE_GET_LATENCY: {
501 pa_usec_t yl, yr, *usec = data;
503 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
504 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
506 *usec = yl > yr ? yl - yr : 0;
507 return 0;
510 case SINK_MESSAGE_REQUEST:
512 pa_assert(offset > 0);
513 u->requested_bytes += (size_t) offset;
515 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516 send_data(u);
518 return 0;
521 case SINK_MESSAGE_REMOTE_SUSPEND:
523 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
524 return 0;
527 case SINK_MESSAGE_UPDATE_LATENCY: {
528 pa_usec_t y;
530 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
532 if (y > (pa_usec_t) offset)
533 y -= (pa_usec_t) offset;
534 else
535 y = 0;
537 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
539 /* We can access this freely here, since the main thread is waiting for us */
540 u->thread_transport_usec = u->transport_usec;
542 return 0;
545 case SINK_MESSAGE_POST:
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
552 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
554 u->counter_delta += (int64_t) chunk->length;
556 return 0;
559 return pa_sink_process_msg(o, code, data, offset, chunk);
562 /* Called from main context */
563 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
564 struct userdata *u;
565 pa_sink_assert_ref(s);
566 u = s->userdata;
568 switch ((pa_sink_state_t) state) {
570 case PA_SINK_SUSPENDED:
571 pa_assert(PA_SINK_IS_OPENED(s->state));
572 stream_cork(u, TRUE);
573 break;
575 case PA_SINK_IDLE:
576 case PA_SINK_RUNNING:
577 if (s->state == PA_SINK_SUSPENDED)
578 stream_cork(u, FALSE);
579 break;
581 case PA_SINK_UNLINKED:
582 case PA_SINK_INIT:
583 case PA_SINK_INVALID_STATE:
587 return 0;
590 #else
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
594 struct userdata *u = PA_SOURCE(o)->userdata;
596 switch (code) {
598 case PA_SOURCE_MESSAGE_SET_STATE: {
599 int r;
601 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
602 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
604 return r;
607 case PA_SOURCE_MESSAGE_GET_LATENCY: {
608 pa_usec_t yr, yl, *usec = data;
610 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
611 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
613 *usec = yr > yl ? yr - yl : 0;
614 return 0;
617 case SOURCE_MESSAGE_POST: {
618 pa_memchunk c;
620 pa_mcalign_push(u->mcalign, chunk);
622 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
624 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
625 pa_source_post(u->source, &c);
627 pa_memblock_unref(c.memblock);
629 u->counter += (int64_t) c.length;
632 return 0;
635 case SOURCE_MESSAGE_REMOTE_SUSPEND:
637 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
638 return 0;
640 case SOURCE_MESSAGE_UPDATE_LATENCY: {
641 pa_usec_t y;
643 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
644 y += (pa_usec_t) offset;
646 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
648 /* We can access this freely here, since the main thread is waiting for us */
649 u->thread_transport_usec = u->transport_usec;
651 return 0;
655 return pa_source_process_msg(o, code, data, offset, chunk);
658 /* Called from main context */
659 static int source_set_state(pa_source *s, pa_source_state_t state) {
660 struct userdata *u;
661 pa_source_assert_ref(s);
662 u = s->userdata;
664 switch ((pa_source_state_t) state) {
666 case PA_SOURCE_SUSPENDED:
667 pa_assert(PA_SOURCE_IS_OPENED(s->state));
668 stream_cork(u, TRUE);
669 break;
671 case PA_SOURCE_IDLE:
672 case PA_SOURCE_RUNNING:
673 if (s->state == PA_SOURCE_SUSPENDED)
674 stream_cork(u, FALSE);
675 break;
677 case PA_SOURCE_UNLINKED:
678 case PA_SOURCE_INIT:
679 case PA_SINK_INVALID_STATE:
683 return 0;
686 #endif
688 static void thread_func(void *userdata) {
689 struct userdata *u = userdata;
691 pa_assert(u);
693 pa_log_debug("Thread starting up");
695 pa_thread_mq_install(&u->thread_mq);
697 for (;;) {
698 int ret;
700 #ifdef TUNNEL_SINK
701 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
702 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
703 pa_sink_process_rewind(u->sink, 0);
704 #endif
706 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
707 goto fail;
709 if (ret == 0)
710 goto finish;
713 fail:
714 /* If this was no regular exit from the loop we have to continue
715 * processing messages until we received PA_MESSAGE_SHUTDOWN */
716 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
717 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
719 finish:
720 pa_log_debug("Thread shutting down");
723 #ifdef TUNNEL_SINK
724 /* Called from main context */
725 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
726 struct userdata *u = userdata;
727 uint32_t bytes, channel;
729 pa_assert(pd);
730 pa_assert(command == PA_COMMAND_REQUEST);
731 pa_assert(t);
732 pa_assert(u);
733 pa_assert(u->pdispatch == pd);
735 if (pa_tagstruct_getu32(t, &channel) < 0 ||
736 pa_tagstruct_getu32(t, &bytes) < 0) {
737 pa_log("Invalid protocol reply");
738 goto fail;
741 if (channel != u->channel) {
742 pa_log("Received data for invalid channel");
743 goto fail;
746 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
747 return;
749 fail:
750 pa_module_unload_request(u->module, TRUE);
753 #endif
755 /* Called from main context */
756 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757 struct userdata *u = userdata;
758 pa_usec_t sink_usec, source_usec;
759 pa_bool_t playing;
760 int64_t write_index, read_index;
761 struct timeval local, remote, now;
762 pa_sample_spec *ss;
763 int64_t delay;
765 pa_assert(pd);
766 pa_assert(u);
768 if (command != PA_COMMAND_REPLY) {
769 if (command == PA_COMMAND_ERROR)
770 pa_log("Failed to get latency.");
771 else
772 pa_log("Protocol error.");
773 goto fail;
776 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
777 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
778 pa_tagstruct_get_boolean(t, &playing) < 0 ||
779 pa_tagstruct_get_timeval(t, &local) < 0 ||
780 pa_tagstruct_get_timeval(t, &remote) < 0 ||
781 pa_tagstruct_gets64(t, &write_index) < 0 ||
782 pa_tagstruct_gets64(t, &read_index) < 0) {
783 pa_log("Invalid reply.");
784 goto fail;
787 #ifdef TUNNEL_SINK
788 if (u->version >= 13) {
789 uint64_t underrun_for = 0, playing_for = 0;
791 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
792 pa_tagstruct_getu64(t, &playing_for) < 0) {
793 pa_log("Invalid reply.");
794 goto fail;
797 #endif
799 if (!pa_tagstruct_eof(t)) {
800 pa_log("Invalid reply.");
801 goto fail;
804 if (tag < u->ignore_latency_before) {
805 return;
808 pa_gettimeofday(&now);
810 /* Calculate transport usec */
811 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
812 /* local and remote seem to have synchronized clocks */
813 #ifdef TUNNEL_SINK
814 u->transport_usec = pa_timeval_diff(&remote, &local);
815 #else
816 u->transport_usec = pa_timeval_diff(&now, &remote);
817 #endif
818 } else
819 u->transport_usec = pa_timeval_diff(&now, &local)/2;
821 /* First, take the device's delay */
822 #ifdef TUNNEL_SINK
823 delay = (int64_t) sink_usec;
824 ss = &u->sink->sample_spec;
825 #else
826 delay = (int64_t) source_usec;
827 ss = &u->source->sample_spec;
828 #endif
830 /* Add the length of our server-side buffer */
831 if (write_index >= read_index)
832 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
833 else
834 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
836 /* Our measurements are already out of date, hence correct by the *
837 * transport latency */
838 #ifdef TUNNEL_SINK
839 delay -= (int64_t) u->transport_usec;
840 #else
841 delay += (int64_t) u->transport_usec;
842 #endif
844 /* Now correct by what we have have read/written since we requested the update */
845 #ifdef TUNNEL_SINK
846 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
847 #else
848 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
849 #endif
851 #ifdef TUNNEL_SINK
852 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
853 #else
854 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
855 #endif
857 return;
859 fail:
861 pa_module_unload_request(u->module, TRUE);
864 /* Called from main context */
865 static void request_latency(struct userdata *u) {
866 pa_tagstruct *t;
867 struct timeval now;
868 uint32_t tag;
869 pa_assert(u);
871 t = pa_tagstruct_new(NULL, 0);
872 #ifdef TUNNEL_SINK
873 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
874 #else
875 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
876 #endif
877 pa_tagstruct_putu32(t, tag = u->ctag++);
878 pa_tagstruct_putu32(t, u->channel);
880 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
882 pa_pstream_send_tagstruct(u->pstream, t);
883 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
885 u->ignore_latency_before = tag;
886 u->counter_delta = 0;
889 /* Called from main context */
890 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
891 struct userdata *u = userdata;
893 pa_assert(m);
894 pa_assert(e);
895 pa_assert(u);
897 request_latency(u);
899 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
902 /* Called from main context */
903 static void update_description(struct userdata *u) {
904 char *d;
905 char un[128], hn[128];
906 pa_tagstruct *t;
908 pa_assert(u);
910 if (!u->server_fqdn || !u->user_name || !u->device_description)
911 return;
913 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
915 #ifdef TUNNEL_SINK
916 pa_sink_set_description(u->sink, d);
917 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
918 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
919 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
920 #else
921 pa_source_set_description(u->source, d);
922 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
923 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
924 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
925 #endif
927 pa_xfree(d);
929 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
930 pa_get_user_name(un, sizeof(un)),
931 pa_get_host_name(hn, sizeof(hn)));
933 t = pa_tagstruct_new(NULL, 0);
934 #ifdef TUNNEL_SINK
935 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
936 #else
937 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
938 #endif
939 pa_tagstruct_putu32(t, u->ctag++);
940 pa_tagstruct_putu32(t, u->channel);
941 pa_tagstruct_puts(t, d);
942 pa_pstream_send_tagstruct(u->pstream, t);
944 pa_xfree(d);
947 /* Called from main context */
948 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
949 struct userdata *u = userdata;
950 pa_sample_spec ss;
951 pa_channel_map cm;
952 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
953 uint32_t cookie;
955 pa_assert(pd);
956 pa_assert(u);
958 if (command != PA_COMMAND_REPLY) {
959 if (command == PA_COMMAND_ERROR)
960 pa_log("Failed to get info.");
961 else
962 pa_log("Protocol error.");
963 goto fail;
966 if (pa_tagstruct_gets(t, &server_name) < 0 ||
967 pa_tagstruct_gets(t, &server_version) < 0 ||
968 pa_tagstruct_gets(t, &user_name) < 0 ||
969 pa_tagstruct_gets(t, &host_name) < 0 ||
970 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
971 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
972 pa_tagstruct_gets(t, &default_source_name) < 0 ||
973 pa_tagstruct_getu32(t, &cookie) < 0 ||
974 (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
976 pa_log("Parse failure");
977 goto fail;
980 if (!pa_tagstruct_eof(t)) {
981 pa_log("Packet too long");
982 goto fail;
985 pa_xfree(u->server_fqdn);
986 u->server_fqdn = pa_xstrdup(host_name);
988 pa_xfree(u->user_name);
989 u->user_name = pa_xstrdup(user_name);
991 update_description(u);
993 return;
995 fail:
996 pa_module_unload_request(u->module, TRUE);
999 static int read_ports(struct userdata *u, pa_tagstruct *t)
1001 if (u->version >= 16) {
1002 uint32_t n_ports;
1003 const char *s;
1005 if (pa_tagstruct_getu32(t, &n_ports)) {
1006 pa_log("Parse failure");
1007 return -PA_ERR_PROTOCOL;
1010 for (uint32_t j = 0; j < n_ports; j++) {
1011 uint32_t priority;
1013 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1014 pa_tagstruct_gets(t, &s) < 0 || /* description */
1015 pa_tagstruct_getu32(t, &priority) < 0) {
1017 pa_log("Parse failure");
1018 return -PA_ERR_PROTOCOL;
1020 if (u->version >= 24 && pa_tagstruct_getu32(t, &priority) < 0) { /* available */
1021 pa_log("Parse failure");
1022 return -PA_ERR_PROTOCOL;
1026 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1027 pa_log("Parse failure");
1028 return -PA_ERR_PROTOCOL;
1031 return 0;
1034 #ifdef TUNNEL_SINK
1036 /* Called from main context */
1037 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1038 struct userdata *u = userdata;
1039 uint32_t idx, owner_module, monitor_source, flags;
1040 const char *name, *description, *monitor_source_name, *driver;
1041 pa_sample_spec ss;
1042 pa_channel_map cm;
1043 pa_cvolume volume;
1044 pa_bool_t mute;
1045 pa_usec_t latency;
1046 pa_proplist *pl;
1048 pa_assert(pd);
1049 pa_assert(u);
1051 pl = pa_proplist_new();
1053 if (command != PA_COMMAND_REPLY) {
1054 if (command == PA_COMMAND_ERROR)
1055 pa_log("Failed to get info.");
1056 else
1057 pa_log("Protocol error.");
1058 goto fail;
1061 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1062 pa_tagstruct_gets(t, &name) < 0 ||
1063 pa_tagstruct_gets(t, &description) < 0 ||
1064 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1065 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1066 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1067 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1068 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1069 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1070 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1071 pa_tagstruct_get_usec(t, &latency) < 0 ||
1072 pa_tagstruct_gets(t, &driver) < 0 ||
1073 pa_tagstruct_getu32(t, &flags) < 0) {
1075 pa_log("Parse failure");
1076 goto fail;
1079 if (u->version >= 13) {
1080 pa_usec_t configured_latency;
1082 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1083 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1085 pa_log("Parse failure");
1086 goto fail;
1090 if (u->version >= 15) {
1091 pa_volume_t base_volume;
1092 uint32_t state, n_volume_steps, card;
1094 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1095 pa_tagstruct_getu32(t, &state) < 0 ||
1096 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1097 pa_tagstruct_getu32(t, &card) < 0) {
1099 pa_log("Parse failure");
1100 goto fail;
1104 if (read_ports(u, t) < 0)
1105 goto fail;
1107 if (u->version >= 21) {
1108 uint8_t n_formats;
1109 pa_format_info *format;
1111 if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1112 pa_log("Parse failure");
1113 goto fail;
1116 for (uint8_t j = 0; j < n_formats; j++) {
1117 format = pa_format_info_new();
1118 if (pa_tagstruct_get_format_info(t, format)) { /* format info */
1119 pa_format_info_free(format);
1120 pa_log("Parse failure");
1121 goto fail;
1123 pa_format_info_free(format);
1127 if (!pa_tagstruct_eof(t)) {
1128 pa_log("Packet too long");
1129 goto fail;
1132 pa_proplist_free(pl);
1134 if (!u->sink_name || strcmp(name, u->sink_name))
1135 return;
1137 pa_xfree(u->device_description);
1138 u->device_description = pa_xstrdup(description);
1140 update_description(u);
1142 return;
1144 fail:
1145 pa_module_unload_request(u->module, TRUE);
1146 pa_proplist_free(pl);
1149 /* Called from main context */
1150 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1151 struct userdata *u = userdata;
1152 uint32_t idx, owner_module, client, sink;
1153 pa_usec_t buffer_usec, sink_usec;
1154 const char *name, *driver, *resample_method;
1155 pa_bool_t mute = FALSE;
1156 pa_sample_spec sample_spec;
1157 pa_channel_map channel_map;
1158 pa_cvolume volume;
1159 pa_proplist *pl;
1160 pa_bool_t b;
1162 pa_assert(pd);
1163 pa_assert(u);
1165 pl = pa_proplist_new();
1167 if (command != PA_COMMAND_REPLY) {
1168 if (command == PA_COMMAND_ERROR)
1169 pa_log("Failed to get info.");
1170 else
1171 pa_log("Protocol error.");
1172 goto fail;
1175 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1176 pa_tagstruct_gets(t, &name) < 0 ||
1177 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1178 pa_tagstruct_getu32(t, &client) < 0 ||
1179 pa_tagstruct_getu32(t, &sink) < 0 ||
1180 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1181 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1182 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1183 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1184 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1185 pa_tagstruct_gets(t, &resample_method) < 0 ||
1186 pa_tagstruct_gets(t, &driver) < 0) {
1188 pa_log("Parse failure");
1189 goto fail;
1192 if (u->version >= 11) {
1193 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1195 pa_log("Parse failure");
1196 goto fail;
1200 if (u->version >= 13) {
1201 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1203 pa_log("Parse failure");
1204 goto fail;
1208 if (u->version >= 19) {
1209 if (pa_tagstruct_get_boolean(t, &b) < 0) {
1211 pa_log("Parse failure");
1212 goto fail;
1216 if (u->version >= 20) {
1217 if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1218 pa_tagstruct_get_boolean(t, &b) < 0) {
1220 pa_log("Parse failure");
1221 goto fail;
1225 if (u->version >= 21) {
1226 pa_format_info *format = pa_format_info_new();
1228 if (pa_tagstruct_get_format_info(t, format) < 0) {
1229 pa_format_info_free(format);
1230 pa_log("Parse failure");
1231 goto fail;
1233 pa_format_info_free(format);
1236 if (!pa_tagstruct_eof(t)) {
1237 pa_log("Packet too long");
1238 goto fail;
1241 pa_proplist_free(pl);
1243 if (idx != u->device_index)
1244 return;
1246 pa_assert(u->sink);
1248 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1249 pa_cvolume_equal(&volume, &u->sink->real_volume))
1250 return;
1252 pa_sink_volume_changed(u->sink, &volume);
1254 if (u->version >= 11)
1255 pa_sink_mute_changed(u->sink, mute);
1257 return;
1259 fail:
1260 pa_module_unload_request(u->module, TRUE);
1261 pa_proplist_free(pl);
1264 #else
1266 /* Called from main context */
1267 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1268 struct userdata *u = userdata;
1269 uint32_t idx, owner_module, monitor_of_sink, flags;
1270 const char *name, *description, *monitor_of_sink_name, *driver;
1271 pa_sample_spec ss;
1272 pa_channel_map cm;
1273 pa_cvolume volume;
1274 pa_bool_t mute;
1275 pa_usec_t latency, configured_latency;
1276 pa_proplist *pl;
1278 pa_assert(pd);
1279 pa_assert(u);
1281 pl = pa_proplist_new();
1283 if (command != PA_COMMAND_REPLY) {
1284 if (command == PA_COMMAND_ERROR)
1285 pa_log("Failed to get info.");
1286 else
1287 pa_log("Protocol error.");
1288 goto fail;
1291 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1292 pa_tagstruct_gets(t, &name) < 0 ||
1293 pa_tagstruct_gets(t, &description) < 0 ||
1294 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1295 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1296 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1297 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1298 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1299 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1300 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1301 pa_tagstruct_get_usec(t, &latency) < 0 ||
1302 pa_tagstruct_gets(t, &driver) < 0 ||
1303 pa_tagstruct_getu32(t, &flags) < 0) {
1305 pa_log("Parse failure");
1306 goto fail;
1309 if (u->version >= 13) {
1310 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1311 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1313 pa_log("Parse failure");
1314 goto fail;
1318 if (u->version >= 15) {
1319 pa_volume_t base_volume;
1320 uint32_t state, n_volume_steps, card;
1322 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1323 pa_tagstruct_getu32(t, &state) < 0 ||
1324 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1325 pa_tagstruct_getu32(t, &card) < 0) {
1327 pa_log("Parse failure");
1328 goto fail;
1332 if (read_ports(u, t) < 0)
1333 goto fail;
1335 if (!pa_tagstruct_eof(t)) {
1336 pa_log("Packet too long");
1337 goto fail;
1340 pa_proplist_free(pl);
1342 if (!u->source_name || strcmp(name, u->source_name))
1343 return;
1345 pa_xfree(u->device_description);
1346 u->device_description = pa_xstrdup(description);
1348 update_description(u);
1350 return;
1352 fail:
1353 pa_module_unload_request(u->module, TRUE);
1354 pa_proplist_free(pl);
1357 #endif
1359 /* Called from main context */
1360 static void request_info(struct userdata *u) {
1361 pa_tagstruct *t;
1362 uint32_t tag;
1363 pa_assert(u);
1365 t = pa_tagstruct_new(NULL, 0);
1366 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1367 pa_tagstruct_putu32(t, tag = u->ctag++);
1368 pa_pstream_send_tagstruct(u->pstream, t);
1369 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1371 #ifdef TUNNEL_SINK
1372 t = pa_tagstruct_new(NULL, 0);
1373 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1374 pa_tagstruct_putu32(t, tag = u->ctag++);
1375 pa_tagstruct_putu32(t, u->device_index);
1376 pa_pstream_send_tagstruct(u->pstream, t);
1377 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1379 if (u->sink_name) {
1380 t = pa_tagstruct_new(NULL, 0);
1381 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1382 pa_tagstruct_putu32(t, tag = u->ctag++);
1383 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1384 pa_tagstruct_puts(t, u->sink_name);
1385 pa_pstream_send_tagstruct(u->pstream, t);
1386 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1388 #else
1389 if (u->source_name) {
1390 t = pa_tagstruct_new(NULL, 0);
1391 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1392 pa_tagstruct_putu32(t, tag = u->ctag++);
1393 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1394 pa_tagstruct_puts(t, u->source_name);
1395 pa_pstream_send_tagstruct(u->pstream, t);
1396 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1398 #endif
1401 /* Called from main context */
1402 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1403 struct userdata *u = userdata;
1404 pa_subscription_event_type_t e;
1405 uint32_t idx;
1407 pa_assert(pd);
1408 pa_assert(t);
1409 pa_assert(u);
1410 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1412 if (pa_tagstruct_getu32(t, &e) < 0 ||
1413 pa_tagstruct_getu32(t, &idx) < 0) {
1414 pa_log("Invalid protocol reply");
1415 pa_module_unload_request(u->module, TRUE);
1416 return;
1419 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1420 #ifdef TUNNEL_SINK
1421 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1422 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1423 #else
1424 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1425 #endif
1427 return;
1429 request_info(u);
1432 /* Called from main context */
1433 static void start_subscribe(struct userdata *u) {
1434 pa_tagstruct *t;
1435 pa_assert(u);
1437 t = pa_tagstruct_new(NULL, 0);
1438 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1439 pa_tagstruct_putu32(t, u->ctag++);
1440 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1441 #ifdef TUNNEL_SINK
1442 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1443 #else
1444 PA_SUBSCRIPTION_MASK_SOURCE
1445 #endif
1448 pa_pstream_send_tagstruct(u->pstream, t);
1451 /* Called from main context */
1452 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1453 struct userdata *u = userdata;
1454 #ifdef TUNNEL_SINK
1455 uint32_t bytes;
1456 #endif
1458 pa_assert(pd);
1459 pa_assert(u);
1460 pa_assert(u->pdispatch == pd);
1462 if (command != PA_COMMAND_REPLY) {
1463 if (command == PA_COMMAND_ERROR)
1464 pa_log("Failed to create stream.");
1465 else
1466 pa_log("Protocol error.");
1467 goto fail;
1470 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1471 pa_tagstruct_getu32(t, &u->device_index) < 0
1472 #ifdef TUNNEL_SINK
1473 || pa_tagstruct_getu32(t, &bytes) < 0
1474 #endif
1476 goto parse_error;
1478 if (u->version >= 9) {
1479 #ifdef TUNNEL_SINK
1480 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1481 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1482 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1483 pa_tagstruct_getu32(t, &u->minreq) < 0)
1484 goto parse_error;
1485 #else
1486 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1487 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1488 goto parse_error;
1489 #endif
1492 if (u->version >= 12) {
1493 pa_sample_spec ss;
1494 pa_channel_map cm;
1495 uint32_t device_index;
1496 const char *dn;
1497 pa_bool_t suspended;
1499 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1500 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1501 pa_tagstruct_getu32(t, &device_index) < 0 ||
1502 pa_tagstruct_gets(t, &dn) < 0 ||
1503 pa_tagstruct_get_boolean(t, &suspended) < 0)
1504 goto parse_error;
1506 #ifdef TUNNEL_SINK
1507 pa_xfree(u->sink_name);
1508 u->sink_name = pa_xstrdup(dn);
1509 #else
1510 pa_xfree(u->source_name);
1511 u->source_name = pa_xstrdup(dn);
1512 #endif
1515 if (u->version >= 13) {
1516 pa_usec_t usec;
1518 if (pa_tagstruct_get_usec(t, &usec) < 0)
1519 goto parse_error;
1521 /* #ifdef TUNNEL_SINK */
1522 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1523 /* #else */
1524 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1525 /* #endif */
1528 if (u->version >= 21) {
1529 pa_format_info *format = pa_format_info_new();
1531 if (pa_tagstruct_get_format_info(t, format) < 0) {
1532 pa_format_info_free(format);
1533 goto parse_error;
1536 pa_format_info_free(format);
1539 if (!pa_tagstruct_eof(t))
1540 goto parse_error;
1542 start_subscribe(u);
1543 request_info(u);
1545 pa_assert(!u->time_event);
1546 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1548 request_latency(u);
1550 pa_log_debug("Stream created.");
1552 #ifdef TUNNEL_SINK
1553 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1554 #endif
1556 return;
1558 parse_error:
1559 pa_log("Invalid reply. (Create stream)");
1561 fail:
1562 pa_module_unload_request(u->module, TRUE);
1566 /* Called from main context */
1567 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1568 struct userdata *u = userdata;
1569 pa_tagstruct *reply;
1570 char name[256], un[128], hn[128];
1571 #ifdef TUNNEL_SINK
1572 pa_cvolume volume;
1573 #endif
1575 pa_assert(pd);
1576 pa_assert(u);
1577 pa_assert(u->pdispatch == pd);
1579 if (command != PA_COMMAND_REPLY ||
1580 pa_tagstruct_getu32(t, &u->version) < 0 ||
1581 !pa_tagstruct_eof(t)) {
1583 if (command == PA_COMMAND_ERROR)
1584 pa_log("Failed to authenticate");
1585 else
1586 pa_log("Protocol error.");
1588 goto fail;
1591 /* Minimum supported protocol version */
1592 if (u->version < 8) {
1593 pa_log("Incompatible protocol version");
1594 goto fail;
1597 /* Starting with protocol version 13 the MSB of the version tag
1598 reflects if shm is enabled for this connection or not. We don't
1599 support SHM here at all, so we just ignore this. */
1601 if (u->version >= 13)
1602 u->version &= 0x7FFFFFFFU;
1604 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1606 #ifdef TUNNEL_SINK
1607 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1608 pa_sink_update_proplist(u->sink, 0, NULL);
1610 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1611 u->sink_name,
1612 pa_get_user_name(un, sizeof(un)),
1613 pa_get_host_name(hn, sizeof(hn)));
1614 #else
1615 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1616 pa_source_update_proplist(u->source, 0, NULL);
1618 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1619 u->source_name,
1620 pa_get_user_name(un, sizeof(un)),
1621 pa_get_host_name(hn, sizeof(hn)));
1622 #endif
1624 reply = pa_tagstruct_new(NULL, 0);
1625 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1626 pa_tagstruct_putu32(reply, u->ctag++);
1628 if (u->version >= 13) {
1629 pa_proplist *pl;
1630 pl = pa_proplist_new();
1631 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1632 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1633 pa_init_proplist(pl);
1634 pa_tagstruct_put_proplist(reply, pl);
1635 pa_proplist_free(pl);
1636 } else
1637 pa_tagstruct_puts(reply, "PulseAudio");
1639 pa_pstream_send_tagstruct(u->pstream, reply);
1640 /* We ignore the server's reply here */
1642 reply = pa_tagstruct_new(NULL, 0);
1644 if (u->version < 13)
1645 /* Only for older PA versions we need to fill in the maxlength */
1646 u->maxlength = 4*1024*1024;
1648 #ifdef TUNNEL_SINK
1649 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1650 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1651 u->prebuf = u->tlength;
1652 #else
1653 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1654 #endif
1656 #ifdef TUNNEL_SINK
1657 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1658 pa_tagstruct_putu32(reply, tag = u->ctag++);
1660 if (u->version < 13)
1661 pa_tagstruct_puts(reply, name);
1663 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1664 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1665 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1666 pa_tagstruct_puts(reply, u->sink_name);
1667 pa_tagstruct_putu32(reply, u->maxlength);
1668 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1669 pa_tagstruct_putu32(reply, u->tlength);
1670 pa_tagstruct_putu32(reply, u->prebuf);
1671 pa_tagstruct_putu32(reply, u->minreq);
1672 pa_tagstruct_putu32(reply, 0);
1673 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1674 pa_tagstruct_put_cvolume(reply, &volume);
1675 #else
1676 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1677 pa_tagstruct_putu32(reply, tag = u->ctag++);
1679 if (u->version < 13)
1680 pa_tagstruct_puts(reply, name);
1682 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1683 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1684 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1685 pa_tagstruct_puts(reply, u->source_name);
1686 pa_tagstruct_putu32(reply, u->maxlength);
1687 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1688 pa_tagstruct_putu32(reply, u->fragsize);
1689 #endif
1691 if (u->version >= 12) {
1692 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1693 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1694 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1695 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1696 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1697 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1698 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1701 if (u->version >= 13) {
1702 pa_proplist *pl;
1704 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1705 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1707 pl = pa_proplist_new();
1708 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1709 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1710 pa_tagstruct_put_proplist(reply, pl);
1711 pa_proplist_free(pl);
1713 #ifndef TUNNEL_SINK
1714 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1715 #endif
1718 if (u->version >= 14) {
1719 #ifdef TUNNEL_SINK
1720 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1721 #endif
1722 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1725 if (u->version >= 15) {
1726 #ifdef TUNNEL_SINK
1727 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1728 #endif
1729 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1730 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1733 #ifdef TUNNEL_SINK
1734 if (u->version >= 17)
1735 pa_tagstruct_put_boolean(reply, FALSE); /* relative volume */
1737 if (u->version >= 18)
1738 pa_tagstruct_put_boolean(reply, FALSE); /* passthrough stream */
1739 #endif
1741 #ifdef TUNNEL_SINK
1742 if (u->version >= 21) {
1743 /* We're not using the extended API, so n_formats = 0 and that's that */
1744 pa_tagstruct_putu8(reply, 0);
1746 #endif
1748 pa_pstream_send_tagstruct(u->pstream, reply);
1749 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1751 pa_log_debug("Connection authenticated, creating stream ...");
1753 return;
1755 fail:
1756 pa_module_unload_request(u->module, TRUE);
1759 /* Called from main context */
1760 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1761 struct userdata *u = userdata;
1763 pa_assert(p);
1764 pa_assert(u);
1766 pa_log_warn("Stream died.");
1767 pa_module_unload_request(u->module, TRUE);
1770 /* Called from main context */
1771 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1772 struct userdata *u = userdata;
1774 pa_assert(p);
1775 pa_assert(packet);
1776 pa_assert(u);
1778 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1779 pa_log("Invalid packet");
1780 pa_module_unload_request(u->module, TRUE);
1781 return;
1785 #ifndef TUNNEL_SINK
1786 /* Called from main context */
1787 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1788 struct userdata *u = userdata;
1790 pa_assert(p);
1791 pa_assert(chunk);
1792 pa_assert(u);
1794 if (channel != u->channel) {
1795 pa_log("Received memory block on bad channel.");
1796 pa_module_unload_request(u->module, TRUE);
1797 return;
1800 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1802 u->counter_delta += (int64_t) chunk->length;
1804 #endif
1806 /* Called from main context */
1807 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1808 struct userdata *u = userdata;
1809 pa_tagstruct *t;
1810 uint32_t tag;
1812 pa_assert(sc);
1813 pa_assert(u);
1814 pa_assert(u->client == sc);
1816 pa_socket_client_unref(u->client);
1817 u->client = NULL;
1819 if (!io) {
1820 pa_log("Connection failed: %s", pa_cstrerror(errno));
1821 pa_module_unload_request(u->module, TRUE);
1822 return;
1825 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1826 u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1828 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1829 pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
1830 #ifndef TUNNEL_SINK
1831 pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
1832 #endif
1834 t = pa_tagstruct_new(NULL, 0);
1835 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1836 pa_tagstruct_putu32(t, tag = u->ctag++);
1837 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1839 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1841 #ifdef HAVE_CREDS
1843 pa_creds ucred;
1845 if (pa_iochannel_creds_supported(io))
1846 pa_iochannel_creds_enable(io);
1848 ucred.uid = getuid();
1849 ucred.gid = getgid();
1851 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1853 #else
1854 pa_pstream_send_tagstruct(u->pstream, t);
1855 #endif
1857 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1859 pa_log_debug("Connection established, authenticating ...");
1862 #ifdef TUNNEL_SINK
1864 /* Called from main context */
1865 static void sink_set_volume(pa_sink *sink) {
1866 struct userdata *u;
1867 pa_tagstruct *t;
1869 pa_assert(sink);
1870 u = sink->userdata;
1871 pa_assert(u);
1873 t = pa_tagstruct_new(NULL, 0);
1874 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1875 pa_tagstruct_putu32(t, u->ctag++);
1876 pa_tagstruct_putu32(t, u->device_index);
1877 pa_tagstruct_put_cvolume(t, &sink->real_volume);
1878 pa_pstream_send_tagstruct(u->pstream, t);
1881 /* Called from main context */
1882 static void sink_set_mute(pa_sink *sink) {
1883 struct userdata *u;
1884 pa_tagstruct *t;
1886 pa_assert(sink);
1887 u = sink->userdata;
1888 pa_assert(u);
1890 if (u->version < 11)
1891 return;
1893 t = pa_tagstruct_new(NULL, 0);
1894 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1895 pa_tagstruct_putu32(t, u->ctag++);
1896 pa_tagstruct_putu32(t, u->device_index);
1897 pa_tagstruct_put_boolean(t, !!sink->muted);
1898 pa_pstream_send_tagstruct(u->pstream, t);
1901 #endif
1903 int pa__init(pa_module*m) {
1904 pa_modargs *ma = NULL;
1905 struct userdata *u = NULL;
1906 pa_sample_spec ss;
1907 pa_channel_map map;
1908 char *dn = NULL;
1909 #ifdef TUNNEL_SINK
1910 pa_sink_new_data data;
1911 #else
1912 pa_source_new_data data;
1913 #endif
1915 pa_assert(m);
1917 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1918 pa_log("Failed to parse module arguments");
1919 goto fail;
1922 m->userdata = u = pa_xnew0(struct userdata, 1);
1923 u->core = m->core;
1924 u->module = m;
1925 u->client = NULL;
1926 u->pdispatch = NULL;
1927 u->pstream = NULL;
1928 u->server_name = NULL;
1929 #ifdef TUNNEL_SINK
1930 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1931 u->sink = NULL;
1932 u->requested_bytes = 0;
1933 #else
1934 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1935 u->source = NULL;
1936 #endif
1937 u->smoother = pa_smoother_new(
1938 PA_USEC_PER_SEC,
1939 PA_USEC_PER_SEC*2,
1940 TRUE,
1941 TRUE,
1943 pa_rtclock_now(),
1944 FALSE);
1945 u->ctag = 1;
1946 u->device_index = u->channel = PA_INVALID_INDEX;
1947 u->time_event = NULL;
1948 u->ignore_latency_before = 0;
1949 u->transport_usec = u->thread_transport_usec = 0;
1950 u->remote_suspended = u->remote_corked = FALSE;
1951 u->counter = u->counter_delta = 0;
1953 u->rtpoll = pa_rtpoll_new();
1954 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1956 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1957 goto fail;
1959 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1960 pa_log("No server specified.");
1961 goto fail;
1964 ss = m->core->default_sample_spec;
1965 map = m->core->default_channel_map;
1966 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1967 pa_log("Invalid sample format specification");
1968 goto fail;
1971 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1972 pa_log("Failed to connect to server '%s'", u->server_name);
1973 goto fail;
1976 pa_socket_client_set_callback(u->client, on_connection, u);
1978 #ifdef TUNNEL_SINK
1980 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1981 dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
1983 pa_sink_new_data_init(&data);
1984 data.driver = __FILE__;
1985 data.module = m;
1986 data.namereg_fail = TRUE;
1987 pa_sink_new_data_set_name(&data, dn);
1988 pa_sink_new_data_set_sample_spec(&data, &ss);
1989 pa_sink_new_data_set_channel_map(&data, &map);
1990 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1991 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1992 if (u->sink_name)
1993 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1995 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1996 pa_log("Invalid properties");
1997 pa_sink_new_data_done(&data);
1998 goto fail;
2001 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
2002 pa_sink_new_data_done(&data);
2004 if (!u->sink) {
2005 pa_log("Failed to create sink.");
2006 goto fail;
2009 u->sink->parent.process_msg = sink_process_msg;
2010 u->sink->userdata = u;
2011 u->sink->set_state = sink_set_state;
2012 pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2013 pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2015 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
2017 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2019 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2020 pa_sink_set_rtpoll(u->sink, u->rtpoll);
2022 #else
2024 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
2025 dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2027 pa_source_new_data_init(&data);
2028 data.driver = __FILE__;
2029 data.module = m;
2030 data.namereg_fail = TRUE;
2031 pa_source_new_data_set_name(&data, dn);
2032 pa_source_new_data_set_sample_spec(&data, &ss);
2033 pa_source_new_data_set_channel_map(&data, &map);
2034 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2035 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2036 if (u->source_name)
2037 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2039 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2040 pa_log("Invalid properties");
2041 pa_source_new_data_done(&data);
2042 goto fail;
2045 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2046 pa_source_new_data_done(&data);
2048 if (!u->source) {
2049 pa_log("Failed to create source.");
2050 goto fail;
2053 u->source->parent.process_msg = source_process_msg;
2054 u->source->set_state = source_set_state;
2055 u->source->userdata = u;
2057 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2059 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2060 pa_source_set_rtpoll(u->source, u->rtpoll);
2062 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2063 #endif
2065 pa_xfree(dn);
2067 u->time_event = NULL;
2069 u->maxlength = (uint32_t) -1;
2070 #ifdef TUNNEL_SINK
2071 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2072 #else
2073 u->fragsize = (uint32_t) -1;
2074 #endif
2076 if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2077 pa_log("Failed to create thread.");
2078 goto fail;
2081 #ifdef TUNNEL_SINK
2082 pa_sink_put(u->sink);
2083 #else
2084 pa_source_put(u->source);
2085 #endif
2087 pa_modargs_free(ma);
2089 return 0;
2091 fail:
2092 pa__done(m);
2094 if (ma)
2095 pa_modargs_free(ma);
2097 pa_xfree(dn);
2099 return -1;
2102 void pa__done(pa_module*m) {
2103 struct userdata* u;
2105 pa_assert(m);
2107 if (!(u = m->userdata))
2108 return;
2110 #ifdef TUNNEL_SINK
2111 if (u->sink)
2112 pa_sink_unlink(u->sink);
2113 #else
2114 if (u->source)
2115 pa_source_unlink(u->source);
2116 #endif
2118 if (u->thread) {
2119 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2120 pa_thread_free(u->thread);
2123 pa_thread_mq_done(&u->thread_mq);
2125 #ifdef TUNNEL_SINK
2126 if (u->sink)
2127 pa_sink_unref(u->sink);
2128 #else
2129 if (u->source)
2130 pa_source_unref(u->source);
2131 #endif
2133 if (u->rtpoll)
2134 pa_rtpoll_free(u->rtpoll);
2136 if (u->pstream) {
2137 pa_pstream_unlink(u->pstream);
2138 pa_pstream_unref(u->pstream);
2141 if (u->pdispatch)
2142 pa_pdispatch_unref(u->pdispatch);
2144 if (u->client)
2145 pa_socket_client_unref(u->client);
2147 if (u->auth_cookie)
2148 pa_auth_cookie_unref(u->auth_cookie);
2150 if (u->smoother)
2151 pa_smoother_free(u->smoother);
2153 if (u->time_event)
2154 u->core->mainloop->time_free(u->time_event);
2156 #ifndef TUNNEL_SINK
2157 if (u->mcalign)
2158 pa_mcalign_free(u->mcalign);
2159 #endif
2161 #ifdef TUNNEL_SINK
2162 pa_xfree(u->sink_name);
2163 #else
2164 pa_xfree(u->source_name);
2165 #endif
2166 pa_xfree(u->server_name);
2168 pa_xfree(u->device_description);
2169 pa_xfree(u->server_fqdn);
2170 pa_xfree(u->user_name);
2172 pa_xfree(u);