esd,simple: use pa_memblockq_pop_missing()
[pulseaudio-mirror.git] / src / modules / module-tunnel.c
blobc97de3a11b2d23923bb72193cdf539a18858e924
1 /***
2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58 #include <pulsecore/mcalign.h>
60 #ifdef TUNNEL_SINK
61 #include "module-tunnel-sink-symdef.h"
62 #else
63 #include "module-tunnel-source-symdef.h"
64 #endif
66 #ifdef TUNNEL_SINK
67 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 PA_MODULE_USAGE(
69 "sink_name=<name for the local sink> "
70 "sink_properties=<properties for the local sink> "
71 "server=<address> "
72 "sink=<remote sink name> "
73 "cookie=<filename> "
74 "format=<sample format> "
75 "channels=<number of channels> "
76 "rate=<sample rate> "
77 "channel_map=<channel map>");
78 #else
79 PA_MODULE_DESCRIPTION("Tunnel module for sources");
80 PA_MODULE_USAGE(
81 "source_name=<name for the local source> "
82 "source_properties=<properties for the local source> "
83 "server=<address> "
84 "source=<remote source name> "
85 "cookie=<filename> "
86 "format=<sample format> "
87 "channels=<number of channels> "
88 "rate=<sample rate> "
89 "channel_map=<channel map>");
90 #endif
92 PA_MODULE_AUTHOR("Lennart Poettering");
93 PA_MODULE_VERSION(PACKAGE_VERSION);
94 PA_MODULE_LOAD_ONCE(FALSE);
96 static const char* const valid_modargs[] = {
97 "server",
98 "cookie",
99 "format",
100 "channels",
101 "rate",
102 #ifdef TUNNEL_SINK
103 "sink_name",
104 "sink_properties",
105 "sink",
106 #else
107 "source_name",
108 "source_properties",
109 "source",
110 #endif
111 "channel_map",
112 NULL,
115 #define DEFAULT_TIMEOUT 5
117 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
119 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
121 #ifdef TUNNEL_SINK
123 enum {
124 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
125 SINK_MESSAGE_REMOTE_SUSPEND,
126 SINK_MESSAGE_UPDATE_LATENCY,
127 SINK_MESSAGE_POST
130 #define DEFAULT_TLENGTH_MSEC 150
131 #define DEFAULT_MINREQ_MSEC 25
133 #else
135 enum {
136 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
137 SOURCE_MESSAGE_REMOTE_SUSPEND,
138 SOURCE_MESSAGE_UPDATE_LATENCY
141 #define DEFAULT_FRAGSIZE_MSEC 25
143 #endif
145 #ifdef TUNNEL_SINK
146 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 #endif
149 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
157 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
158 #ifdef TUNNEL_SINK
159 [PA_COMMAND_REQUEST] = command_request,
160 [PA_COMMAND_STARTED] = command_started,
161 #endif
162 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
163 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
164 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
165 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
166 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
167 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
168 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
169 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
170 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
171 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
172 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
173 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
174 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
175 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
178 struct userdata {
179 pa_core *core;
180 pa_module *module;
182 pa_thread_mq thread_mq;
183 pa_rtpoll *rtpoll;
184 pa_thread *thread;
186 pa_socket_client *client;
187 pa_pstream *pstream;
188 pa_pdispatch *pdispatch;
190 char *server_name;
191 #ifdef TUNNEL_SINK
192 char *sink_name;
193 pa_sink *sink;
194 size_t requested_bytes;
195 #else
196 char *source_name;
197 pa_source *source;
198 pa_mcalign *mcalign;
199 #endif
201 pa_auth_cookie *auth_cookie;
203 uint32_t version;
204 uint32_t ctag;
205 uint32_t device_index;
206 uint32_t channel;
208 int64_t counter, counter_delta;
210 pa_bool_t remote_corked:1;
211 pa_bool_t remote_suspended:1;
213 pa_usec_t transport_usec; /* maintained in the main thread */
214 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
216 uint32_t ignore_latency_before;
218 pa_time_event *time_event;
220 pa_smoother *smoother;
222 char *device_description;
223 char *server_fqdn;
224 char *user_name;
226 uint32_t maxlength;
227 #ifdef TUNNEL_SINK
228 uint32_t tlength;
229 uint32_t minreq;
230 uint32_t prebuf;
231 #else
232 uint32_t fragsize;
233 #endif
236 static void request_latency(struct userdata *u);
238 /* Called from main context */
239 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
240 pa_log_debug("Got stream or client event.");
243 /* Called from main context */
244 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
245 struct userdata *u = userdata;
247 pa_assert(pd);
248 pa_assert(t);
249 pa_assert(u);
250 pa_assert(u->pdispatch == pd);
252 pa_log_warn("Stream killed");
253 pa_module_unload_request(u->module, TRUE);
256 /* Called from main context */
257 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
258 struct userdata *u = userdata;
260 pa_assert(pd);
261 pa_assert(t);
262 pa_assert(u);
263 pa_assert(u->pdispatch == pd);
265 pa_log_info("Server signalled buffer overrun/underrun.");
266 request_latency(u);
269 /* Called from main context */
270 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
271 struct userdata *u = userdata;
272 uint32_t channel;
273 pa_bool_t suspended;
275 pa_assert(pd);
276 pa_assert(t);
277 pa_assert(u);
278 pa_assert(u->pdispatch == pd);
280 if (pa_tagstruct_getu32(t, &channel) < 0 ||
281 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
282 !pa_tagstruct_eof(t)) {
284 pa_log("Invalid packet.");
285 pa_module_unload_request(u->module, TRUE);
286 return;
289 pa_log_debug("Server reports device suspend.");
291 #ifdef TUNNEL_SINK
292 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #else
294 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
295 #endif
297 request_latency(u);
300 /* Called from main context */
301 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
302 struct userdata *u = userdata;
303 uint32_t channel, di;
304 const char *dn;
305 pa_bool_t suspended;
307 pa_assert(pd);
308 pa_assert(t);
309 pa_assert(u);
310 pa_assert(u->pdispatch == pd);
312 if (pa_tagstruct_getu32(t, &channel) < 0 ||
313 pa_tagstruct_getu32(t, &di) < 0 ||
314 pa_tagstruct_gets(t, &dn) < 0 ||
315 pa_tagstruct_get_boolean(t, &suspended) < 0) {
317 pa_log_error("Invalid packet.");
318 pa_module_unload_request(u->module, TRUE);
319 return;
322 pa_log_debug("Server reports a stream move.");
324 #ifdef TUNNEL_SINK
325 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #else
327 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
328 #endif
330 request_latency(u);
333 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
334 struct userdata *u = userdata;
335 uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
336 pa_usec_t usec;
338 pa_assert(pd);
339 pa_assert(t);
340 pa_assert(u);
341 pa_assert(u->pdispatch == pd);
343 if (pa_tagstruct_getu32(t, &channel) < 0 ||
344 pa_tagstruct_getu32(t, &maxlength) < 0) {
346 pa_log_error("Invalid packet.");
347 pa_module_unload_request(u->module, TRUE);
348 return;
351 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
352 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
353 pa_tagstruct_get_usec(t, &usec) < 0) {
355 pa_log_error("Invalid packet.");
356 pa_module_unload_request(u->module, TRUE);
357 return;
359 } else {
360 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
361 pa_tagstruct_getu32(t, &prebuf) < 0 ||
362 pa_tagstruct_getu32(t, &minreq) < 0 ||
363 pa_tagstruct_get_usec(t, &usec) < 0) {
365 pa_log_error("Invalid packet.");
366 pa_module_unload_request(u->module, TRUE);
367 return;
371 #ifdef TUNNEL_SINK
372 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
373 #endif
375 request_latency(u);
378 #ifdef TUNNEL_SINK
380 /* Called from main context */
381 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
382 struct userdata *u = userdata;
384 pa_assert(pd);
385 pa_assert(t);
386 pa_assert(u);
387 pa_assert(u->pdispatch == pd);
389 pa_log_debug("Server reports playback started.");
390 request_latency(u);
393 #endif
395 /* Called from IO thread context */
396 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
397 pa_usec_t x;
399 pa_assert(u);
401 x = pa_rtclock_now();
403 /* Correct by the time the requested issued needs to travel to the
404 * other side. This is a valid thread-safe access, because the
405 * main thread is waiting for us */
407 if (past)
408 x -= u->thread_transport_usec;
409 else
410 x += u->thread_transport_usec;
412 if (u->remote_suspended || u->remote_corked)
413 pa_smoother_pause(u->smoother, x);
414 else
415 pa_smoother_resume(u->smoother, x, TRUE);
418 /* Called from IO thread context */
419 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
420 pa_assert(u);
422 if (u->remote_corked == cork)
423 return;
425 u->remote_corked = cork;
426 check_smoother_status(u, FALSE);
429 /* Called from main context */
430 static void stream_cork(struct userdata *u, pa_bool_t cork) {
431 pa_tagstruct *t;
432 pa_assert(u);
434 if (!u->pstream)
435 return;
437 t = pa_tagstruct_new(NULL, 0);
438 #ifdef TUNNEL_SINK
439 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
440 #else
441 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
442 #endif
443 pa_tagstruct_putu32(t, u->ctag++);
444 pa_tagstruct_putu32(t, u->channel);
445 pa_tagstruct_put_boolean(t, !!cork);
446 pa_pstream_send_tagstruct(u->pstream, t);
448 request_latency(u);
451 /* Called from IO thread context */
452 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
453 pa_assert(u);
455 if (u->remote_suspended == suspend)
456 return;
458 u->remote_suspended = suspend;
459 check_smoother_status(u, TRUE);
462 #ifdef TUNNEL_SINK
464 /* Called from IO thread context */
465 static void send_data(struct userdata *u) {
466 pa_assert(u);
468 while (u->requested_bytes > 0) {
469 pa_memchunk memchunk;
471 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
472 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
473 pa_memblock_unref(memchunk.memblock);
475 u->requested_bytes -= memchunk.length;
477 u->counter += (int64_t) memchunk.length;
481 /* This function is called from IO context -- except when it is not. */
482 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
483 struct userdata *u = PA_SINK(o)->userdata;
485 switch (code) {
487 case PA_SINK_MESSAGE_SET_STATE: {
488 int r;
490 /* First, change the state, because otherwide pa_sink_render() would fail */
491 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
493 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
495 if (PA_SINK_IS_OPENED(u->sink->state))
496 send_data(u);
499 return r;
502 case PA_SINK_MESSAGE_GET_LATENCY: {
503 pa_usec_t yl, yr, *usec = data;
505 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
506 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
508 *usec = yl > yr ? yl - yr : 0;
509 return 0;
512 case SINK_MESSAGE_REQUEST:
514 pa_assert(offset > 0);
515 u->requested_bytes += (size_t) offset;
517 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
518 send_data(u);
520 return 0;
523 case SINK_MESSAGE_REMOTE_SUSPEND:
525 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
526 return 0;
529 case SINK_MESSAGE_UPDATE_LATENCY: {
530 pa_usec_t y;
532 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
534 if (y > (pa_usec_t) offset)
535 y -= (pa_usec_t) offset;
536 else
537 y = 0;
539 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
541 /* We can access this freely here, since the main thread is waiting for us */
542 u->thread_transport_usec = u->transport_usec;
544 return 0;
547 case SINK_MESSAGE_POST:
549 /* OK, This might be a bit confusing. This message is
550 * delivered to us from the main context -- NOT from the
551 * IO thread context where the rest of the messages are
552 * dispatched. Yeah, ugly, but I am a lazy bastard. */
554 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
556 u->counter_delta += (int64_t) chunk->length;
558 return 0;
561 return pa_sink_process_msg(o, code, data, offset, chunk);
564 /* Called from main context */
565 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
566 struct userdata *u;
567 pa_sink_assert_ref(s);
568 u = s->userdata;
570 switch ((pa_sink_state_t) state) {
572 case PA_SINK_SUSPENDED:
573 pa_assert(PA_SINK_IS_OPENED(s->state));
574 stream_cork(u, TRUE);
575 break;
577 case PA_SINK_IDLE:
578 case PA_SINK_RUNNING:
579 if (s->state == PA_SINK_SUSPENDED)
580 stream_cork(u, FALSE);
581 break;
583 case PA_SINK_UNLINKED:
584 case PA_SINK_INIT:
585 case PA_SINK_INVALID_STATE:
589 return 0;
592 #else
594 /* This function is called from IO context -- except when it is not. */
595 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
596 struct userdata *u = PA_SOURCE(o)->userdata;
598 switch (code) {
600 case PA_SOURCE_MESSAGE_SET_STATE: {
601 int r;
603 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
604 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
606 return r;
609 case PA_SOURCE_MESSAGE_GET_LATENCY: {
610 pa_usec_t yr, yl, *usec = data;
612 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
613 yr = pa_smoother_get(u->smoother, pa_rtclock_now());
615 *usec = yr > yl ? yr - yl : 0;
616 return 0;
619 case SOURCE_MESSAGE_POST: {
620 pa_memchunk c;
622 pa_mcalign_push(u->mcalign, chunk);
624 while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
626 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
627 pa_source_post(u->source, &c);
629 pa_memblock_unref(c.memblock);
631 u->counter += (int64_t) c.length;
634 return 0;
637 case SOURCE_MESSAGE_REMOTE_SUSPEND:
639 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
640 return 0;
642 case SOURCE_MESSAGE_UPDATE_LATENCY: {
643 pa_usec_t y;
645 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
646 y += (pa_usec_t) offset;
648 pa_smoother_put(u->smoother, pa_rtclock_now(), y);
650 /* We can access this freely here, since the main thread is waiting for us */
651 u->thread_transport_usec = u->transport_usec;
653 return 0;
657 return pa_source_process_msg(o, code, data, offset, chunk);
660 /* Called from main context */
661 static int source_set_state(pa_source *s, pa_source_state_t state) {
662 struct userdata *u;
663 pa_source_assert_ref(s);
664 u = s->userdata;
666 switch ((pa_source_state_t) state) {
668 case PA_SOURCE_SUSPENDED:
669 pa_assert(PA_SOURCE_IS_OPENED(s->state));
670 stream_cork(u, TRUE);
671 break;
673 case PA_SOURCE_IDLE:
674 case PA_SOURCE_RUNNING:
675 if (s->state == PA_SOURCE_SUSPENDED)
676 stream_cork(u, FALSE);
677 break;
679 case PA_SOURCE_UNLINKED:
680 case PA_SOURCE_INIT:
681 case PA_SINK_INVALID_STATE:
685 return 0;
688 #endif
690 static void thread_func(void *userdata) {
691 struct userdata *u = userdata;
693 pa_assert(u);
695 pa_log_debug("Thread starting up");
697 pa_thread_mq_install(&u->thread_mq);
699 for (;;) {
700 int ret;
702 #ifdef TUNNEL_SINK
703 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
704 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
705 pa_sink_process_rewind(u->sink, 0);
706 #endif
708 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
709 goto fail;
711 if (ret == 0)
712 goto finish;
715 fail:
716 /* If this was no regular exit from the loop we have to continue
717 * processing messages until we received PA_MESSAGE_SHUTDOWN */
718 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
719 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
721 finish:
722 pa_log_debug("Thread shutting down");
725 #ifdef TUNNEL_SINK
726 /* Called from main context */
727 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
728 struct userdata *u = userdata;
729 uint32_t bytes, channel;
731 pa_assert(pd);
732 pa_assert(command == PA_COMMAND_REQUEST);
733 pa_assert(t);
734 pa_assert(u);
735 pa_assert(u->pdispatch == pd);
737 if (pa_tagstruct_getu32(t, &channel) < 0 ||
738 pa_tagstruct_getu32(t, &bytes) < 0) {
739 pa_log("Invalid protocol reply");
740 goto fail;
743 if (channel != u->channel) {
744 pa_log("Received data for invalid channel");
745 goto fail;
748 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
749 return;
751 fail:
752 pa_module_unload_request(u->module, TRUE);
755 #endif
757 /* Called from main context */
758 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
759 struct userdata *u = userdata;
760 pa_usec_t sink_usec, source_usec;
761 pa_bool_t playing;
762 int64_t write_index, read_index;
763 struct timeval local, remote, now;
764 pa_sample_spec *ss;
765 int64_t delay;
767 pa_assert(pd);
768 pa_assert(u);
770 if (command != PA_COMMAND_REPLY) {
771 if (command == PA_COMMAND_ERROR)
772 pa_log("Failed to get latency.");
773 else
774 pa_log("Protocol error.");
775 goto fail;
778 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
779 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
780 pa_tagstruct_get_boolean(t, &playing) < 0 ||
781 pa_tagstruct_get_timeval(t, &local) < 0 ||
782 pa_tagstruct_get_timeval(t, &remote) < 0 ||
783 pa_tagstruct_gets64(t, &write_index) < 0 ||
784 pa_tagstruct_gets64(t, &read_index) < 0) {
785 pa_log("Invalid reply.");
786 goto fail;
789 #ifdef TUNNEL_SINK
790 if (u->version >= 13) {
791 uint64_t underrun_for = 0, playing_for = 0;
793 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
794 pa_tagstruct_getu64(t, &playing_for) < 0) {
795 pa_log("Invalid reply.");
796 goto fail;
799 #endif
801 if (!pa_tagstruct_eof(t)) {
802 pa_log("Invalid reply.");
803 goto fail;
806 if (tag < u->ignore_latency_before) {
807 return;
810 pa_gettimeofday(&now);
812 /* Calculate transport usec */
813 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
814 /* local and remote seem to have synchronized clocks */
815 #ifdef TUNNEL_SINK
816 u->transport_usec = pa_timeval_diff(&remote, &local);
817 #else
818 u->transport_usec = pa_timeval_diff(&now, &remote);
819 #endif
820 } else
821 u->transport_usec = pa_timeval_diff(&now, &local)/2;
823 /* First, take the device's delay */
824 #ifdef TUNNEL_SINK
825 delay = (int64_t) sink_usec;
826 ss = &u->sink->sample_spec;
827 #else
828 delay = (int64_t) source_usec;
829 ss = &u->source->sample_spec;
830 #endif
832 /* Add the length of our server-side buffer */
833 if (write_index >= read_index)
834 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
835 else
836 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
838 /* Our measurements are already out of date, hence correct by the *
839 * transport latency */
840 #ifdef TUNNEL_SINK
841 delay -= (int64_t) u->transport_usec;
842 #else
843 delay += (int64_t) u->transport_usec;
844 #endif
846 /* Now correct by what we have have read/written since we requested the update */
847 #ifdef TUNNEL_SINK
848 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
849 #else
850 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
851 #endif
853 #ifdef TUNNEL_SINK
854 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
855 #else
856 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
857 #endif
859 return;
861 fail:
863 pa_module_unload_request(u->module, TRUE);
866 /* Called from main context */
867 static void request_latency(struct userdata *u) {
868 pa_tagstruct *t;
869 struct timeval now;
870 uint32_t tag;
871 pa_assert(u);
873 t = pa_tagstruct_new(NULL, 0);
874 #ifdef TUNNEL_SINK
875 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
876 #else
877 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
878 #endif
879 pa_tagstruct_putu32(t, tag = u->ctag++);
880 pa_tagstruct_putu32(t, u->channel);
882 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
884 pa_pstream_send_tagstruct(u->pstream, t);
885 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
887 u->ignore_latency_before = tag;
888 u->counter_delta = 0;
891 /* Called from main context */
892 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
893 struct userdata *u = userdata;
895 pa_assert(m);
896 pa_assert(e);
897 pa_assert(u);
899 request_latency(u);
901 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
904 /* Called from main context */
905 static void update_description(struct userdata *u) {
906 char *d;
907 char un[128], hn[128];
908 pa_tagstruct *t;
910 pa_assert(u);
912 if (!u->server_fqdn || !u->user_name || !u->device_description)
913 return;
915 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
917 #ifdef TUNNEL_SINK
918 pa_sink_set_description(u->sink, d);
919 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
920 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
921 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
922 #else
923 pa_source_set_description(u->source, d);
924 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
925 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
926 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
927 #endif
929 pa_xfree(d);
931 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
932 pa_get_user_name(un, sizeof(un)),
933 pa_get_host_name(hn, sizeof(hn)));
935 t = pa_tagstruct_new(NULL, 0);
936 #ifdef TUNNEL_SINK
937 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
938 #else
939 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
940 #endif
941 pa_tagstruct_putu32(t, u->ctag++);
942 pa_tagstruct_putu32(t, u->channel);
943 pa_tagstruct_puts(t, d);
944 pa_pstream_send_tagstruct(u->pstream, t);
946 pa_xfree(d);
949 /* Called from main context */
950 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
951 struct userdata *u = userdata;
952 pa_sample_spec ss;
953 pa_channel_map cm;
954 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
955 uint32_t cookie;
957 pa_assert(pd);
958 pa_assert(u);
960 if (command != PA_COMMAND_REPLY) {
961 if (command == PA_COMMAND_ERROR)
962 pa_log("Failed to get info.");
963 else
964 pa_log("Protocol error.");
965 goto fail;
968 if (pa_tagstruct_gets(t, &server_name) < 0 ||
969 pa_tagstruct_gets(t, &server_version) < 0 ||
970 pa_tagstruct_gets(t, &user_name) < 0 ||
971 pa_tagstruct_gets(t, &host_name) < 0 ||
972 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
973 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
974 pa_tagstruct_gets(t, &default_source_name) < 0 ||
975 pa_tagstruct_getu32(t, &cookie) < 0 ||
976 (u->version >= 15 &&
977 pa_tagstruct_get_channel_map(t, &cm) < 0)) {
979 pa_log("Parse failure");
980 goto fail;
983 if (!pa_tagstruct_eof(t)) {
984 pa_log("Packet too long");
985 goto fail;
988 pa_xfree(u->server_fqdn);
989 u->server_fqdn = pa_xstrdup(host_name);
991 pa_xfree(u->user_name);
992 u->user_name = pa_xstrdup(user_name);
994 update_description(u);
996 return;
998 fail:
999 pa_module_unload_request(u->module, TRUE);
1002 #ifdef TUNNEL_SINK
1004 /* Called from main context */
1005 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1006 struct userdata *u = userdata;
1007 uint32_t idx, owner_module, monitor_source, flags;
1008 const char *name, *description, *monitor_source_name, *driver;
1009 pa_sample_spec ss;
1010 pa_channel_map cm;
1011 pa_cvolume volume;
1012 pa_bool_t mute;
1013 pa_usec_t latency;
1014 pa_proplist *pl;
1016 pa_assert(pd);
1017 pa_assert(u);
1019 pl = pa_proplist_new();
1021 if (command != PA_COMMAND_REPLY) {
1022 if (command == PA_COMMAND_ERROR)
1023 pa_log("Failed to get info.");
1024 else
1025 pa_log("Protocol error.");
1026 goto fail;
1029 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1030 pa_tagstruct_gets(t, &name) < 0 ||
1031 pa_tagstruct_gets(t, &description) < 0 ||
1032 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1033 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1034 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1035 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1036 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1037 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1038 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1039 pa_tagstruct_get_usec(t, &latency) < 0 ||
1040 pa_tagstruct_gets(t, &driver) < 0 ||
1041 pa_tagstruct_getu32(t, &flags) < 0) {
1043 pa_log("Parse failure");
1044 goto fail;
1047 if (u->version >= 13) {
1048 pa_usec_t configured_latency;
1050 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1051 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1053 pa_log("Parse failure");
1054 goto fail;
1058 if (u->version >= 15) {
1059 pa_volume_t base_volume;
1060 uint32_t state, n_volume_steps, card;
1062 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1063 pa_tagstruct_getu32(t, &state) < 0 ||
1064 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1065 pa_tagstruct_getu32(t, &card) < 0) {
1067 pa_log("Parse failure");
1068 goto fail;
1072 if (u->version >= 16) {
1073 uint32_t n_ports;
1074 const char *s;
1076 if (pa_tagstruct_getu32(t, &n_ports)) {
1077 pa_log("Parse failure");
1078 goto fail;
1081 for (uint32_t j = 0; j < n_ports; j++) {
1082 uint32_t priority;
1084 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1085 pa_tagstruct_gets(t, &s) < 0 || /* description */
1086 pa_tagstruct_getu32(t, &priority) < 0) {
1088 pa_log("Parse failure");
1089 goto fail;
1093 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1094 pa_log("Parse failure");
1095 goto fail;
1099 if (!pa_tagstruct_eof(t)) {
1100 pa_log("Packet too long");
1101 goto fail;
1104 pa_proplist_free(pl);
1106 if (!u->sink_name || strcmp(name, u->sink_name))
1107 return;
1109 pa_xfree(u->device_description);
1110 u->device_description = pa_xstrdup(description);
1112 update_description(u);
1114 return;
1116 fail:
1117 pa_module_unload_request(u->module, TRUE);
1118 pa_proplist_free(pl);
1121 /* Called from main context */
1122 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1123 struct userdata *u = userdata;
1124 uint32_t idx, owner_module, client, sink;
1125 pa_usec_t buffer_usec, sink_usec;
1126 const char *name, *driver, *resample_method;
1127 pa_bool_t mute = FALSE;
1128 pa_sample_spec sample_spec;
1129 pa_channel_map channel_map;
1130 pa_cvolume volume;
1131 pa_proplist *pl;
1133 pa_assert(pd);
1134 pa_assert(u);
1136 pl = pa_proplist_new();
1138 if (command != PA_COMMAND_REPLY) {
1139 if (command == PA_COMMAND_ERROR)
1140 pa_log("Failed to get info.");
1141 else
1142 pa_log("Protocol error.");
1143 goto fail;
1146 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1147 pa_tagstruct_gets(t, &name) < 0 ||
1148 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1149 pa_tagstruct_getu32(t, &client) < 0 ||
1150 pa_tagstruct_getu32(t, &sink) < 0 ||
1151 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1152 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1153 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1154 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1155 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1156 pa_tagstruct_gets(t, &resample_method) < 0 ||
1157 pa_tagstruct_gets(t, &driver) < 0) {
1159 pa_log("Parse failure");
1160 goto fail;
1163 if (u->version >= 11) {
1164 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1166 pa_log("Parse failure");
1167 goto fail;
1171 if (u->version >= 13) {
1172 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1174 pa_log("Parse failure");
1175 goto fail;
1179 if (!pa_tagstruct_eof(t)) {
1180 pa_log("Packet too long");
1181 goto fail;
1184 pa_proplist_free(pl);
1186 if (idx != u->device_index)
1187 return;
1189 pa_assert(u->sink);
1191 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1192 pa_cvolume_equal(&volume, &u->sink->real_volume))
1193 return;
1195 pa_sink_volume_changed(u->sink, &volume);
1197 if (u->version >= 11)
1198 pa_sink_mute_changed(u->sink, mute);
1200 return;
1202 fail:
1203 pa_module_unload_request(u->module, TRUE);
1204 pa_proplist_free(pl);
1207 #else
1209 /* Called from main context */
1210 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1211 struct userdata *u = userdata;
1212 uint32_t idx, owner_module, monitor_of_sink, flags;
1213 const char *name, *description, *monitor_of_sink_name, *driver;
1214 pa_sample_spec ss;
1215 pa_channel_map cm;
1216 pa_cvolume volume;
1217 pa_bool_t mute;
1218 pa_usec_t latency, configured_latency;
1219 pa_proplist *pl;
1221 pa_assert(pd);
1222 pa_assert(u);
1224 pl = pa_proplist_new();
1226 if (command != PA_COMMAND_REPLY) {
1227 if (command == PA_COMMAND_ERROR)
1228 pa_log("Failed to get info.");
1229 else
1230 pa_log("Protocol error.");
1231 goto fail;
1234 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1235 pa_tagstruct_gets(t, &name) < 0 ||
1236 pa_tagstruct_gets(t, &description) < 0 ||
1237 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1238 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1239 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1240 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1241 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1242 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1243 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1244 pa_tagstruct_get_usec(t, &latency) < 0 ||
1245 pa_tagstruct_gets(t, &driver) < 0 ||
1246 pa_tagstruct_getu32(t, &flags) < 0) {
1248 pa_log("Parse failure");
1249 goto fail;
1252 if (u->version >= 13) {
1253 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1254 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1256 pa_log("Parse failure");
1257 goto fail;
1261 if (u->version >= 15) {
1262 pa_volume_t base_volume;
1263 uint32_t state, n_volume_steps, card;
1265 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1266 pa_tagstruct_getu32(t, &state) < 0 ||
1267 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1268 pa_tagstruct_getu32(t, &card) < 0) {
1270 pa_log("Parse failure");
1271 goto fail;
1275 if (u->version >= 16) {
1276 uint32_t n_ports;
1277 const char *s;
1279 if (pa_tagstruct_getu32(t, &n_ports)) {
1280 pa_log("Parse failure");
1281 goto fail;
1284 for (uint32_t j = 0; j < n_ports; j++) {
1285 uint32_t priority;
1287 if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1288 pa_tagstruct_gets(t, &s) < 0 || /* description */
1289 pa_tagstruct_getu32(t, &priority) < 0) {
1291 pa_log("Parse failure");
1292 goto fail;
1296 if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1297 pa_log("Parse failure");
1298 goto fail;
1302 if (!pa_tagstruct_eof(t)) {
1303 pa_log("Packet too long");
1304 goto fail;
1307 pa_proplist_free(pl);
1309 if (!u->source_name || strcmp(name, u->source_name))
1310 return;
1312 pa_xfree(u->device_description);
1313 u->device_description = pa_xstrdup(description);
1315 update_description(u);
1317 return;
1319 fail:
1320 pa_module_unload_request(u->module, TRUE);
1321 pa_proplist_free(pl);
1324 #endif
1326 /* Called from main context */
1327 static void request_info(struct userdata *u) {
1328 pa_tagstruct *t;
1329 uint32_t tag;
1330 pa_assert(u);
1332 t = pa_tagstruct_new(NULL, 0);
1333 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1334 pa_tagstruct_putu32(t, tag = u->ctag++);
1335 pa_pstream_send_tagstruct(u->pstream, t);
1336 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1338 #ifdef TUNNEL_SINK
1339 t = pa_tagstruct_new(NULL, 0);
1340 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1341 pa_tagstruct_putu32(t, tag = u->ctag++);
1342 pa_tagstruct_putu32(t, u->device_index);
1343 pa_pstream_send_tagstruct(u->pstream, t);
1344 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1346 if (u->sink_name) {
1347 t = pa_tagstruct_new(NULL, 0);
1348 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1349 pa_tagstruct_putu32(t, tag = u->ctag++);
1350 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1351 pa_tagstruct_puts(t, u->sink_name);
1352 pa_pstream_send_tagstruct(u->pstream, t);
1353 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1355 #else
1356 if (u->source_name) {
1357 t = pa_tagstruct_new(NULL, 0);
1358 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1359 pa_tagstruct_putu32(t, tag = u->ctag++);
1360 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1361 pa_tagstruct_puts(t, u->source_name);
1362 pa_pstream_send_tagstruct(u->pstream, t);
1363 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1365 #endif
1368 /* Called from main context */
1369 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1370 struct userdata *u = userdata;
1371 pa_subscription_event_type_t e;
1372 uint32_t idx;
1374 pa_assert(pd);
1375 pa_assert(t);
1376 pa_assert(u);
1377 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1379 if (pa_tagstruct_getu32(t, &e) < 0 ||
1380 pa_tagstruct_getu32(t, &idx) < 0) {
1381 pa_log("Invalid protocol reply");
1382 pa_module_unload_request(u->module, TRUE);
1383 return;
1386 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1387 #ifdef TUNNEL_SINK
1388 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1389 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1390 #else
1391 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1392 #endif
1394 return;
1396 request_info(u);
1399 /* Called from main context */
1400 static void start_subscribe(struct userdata *u) {
1401 pa_tagstruct *t;
1402 pa_assert(u);
1404 t = pa_tagstruct_new(NULL, 0);
1405 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1406 pa_tagstruct_putu32(t, u->ctag++);
1407 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1408 #ifdef TUNNEL_SINK
1409 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1410 #else
1411 PA_SUBSCRIPTION_MASK_SOURCE
1412 #endif
1415 pa_pstream_send_tagstruct(u->pstream, t);
1418 /* Called from main context */
1419 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1420 struct userdata *u = userdata;
1421 #ifdef TUNNEL_SINK
1422 uint32_t bytes;
1423 #endif
1425 pa_assert(pd);
1426 pa_assert(u);
1427 pa_assert(u->pdispatch == pd);
1429 if (command != PA_COMMAND_REPLY) {
1430 if (command == PA_COMMAND_ERROR)
1431 pa_log("Failed to create stream.");
1432 else
1433 pa_log("Protocol error.");
1434 goto fail;
1437 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1438 pa_tagstruct_getu32(t, &u->device_index) < 0
1439 #ifdef TUNNEL_SINK
1440 || pa_tagstruct_getu32(t, &bytes) < 0
1441 #endif
1443 goto parse_error;
1445 if (u->version >= 9) {
1446 #ifdef TUNNEL_SINK
1447 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1448 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1449 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1450 pa_tagstruct_getu32(t, &u->minreq) < 0)
1451 goto parse_error;
1452 #else
1453 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1454 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1455 goto parse_error;
1456 #endif
1459 if (u->version >= 12) {
1460 pa_sample_spec ss;
1461 pa_channel_map cm;
1462 uint32_t device_index;
1463 const char *dn;
1464 pa_bool_t suspended;
1466 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1467 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1468 pa_tagstruct_getu32(t, &device_index) < 0 ||
1469 pa_tagstruct_gets(t, &dn) < 0 ||
1470 pa_tagstruct_get_boolean(t, &suspended) < 0)
1471 goto parse_error;
1473 #ifdef TUNNEL_SINK
1474 pa_xfree(u->sink_name);
1475 u->sink_name = pa_xstrdup(dn);
1476 #else
1477 pa_xfree(u->source_name);
1478 u->source_name = pa_xstrdup(dn);
1479 #endif
1482 if (u->version >= 13) {
1483 pa_usec_t usec;
1485 if (pa_tagstruct_get_usec(t, &usec) < 0)
1486 goto parse_error;
1488 /* #ifdef TUNNEL_SINK */
1489 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1490 /* #else */
1491 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1492 /* #endif */
1495 if (!pa_tagstruct_eof(t))
1496 goto parse_error;
1498 start_subscribe(u);
1499 request_info(u);
1501 pa_assert(!u->time_event);
1502 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1504 request_latency(u);
1506 pa_log_debug("Stream created.");
1508 #ifdef TUNNEL_SINK
1509 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1510 #endif
1512 return;
1514 parse_error:
1515 pa_log("Invalid reply. (Create stream)");
1517 fail:
1518 pa_module_unload_request(u->module, TRUE);
1522 /* Called from main context */
1523 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1524 struct userdata *u = userdata;
1525 pa_tagstruct *reply;
1526 char name[256], un[128], hn[128];
1527 #ifdef TUNNEL_SINK
1528 pa_cvolume volume;
1529 #endif
1531 pa_assert(pd);
1532 pa_assert(u);
1533 pa_assert(u->pdispatch == pd);
1535 if (command != PA_COMMAND_REPLY ||
1536 pa_tagstruct_getu32(t, &u->version) < 0 ||
1537 !pa_tagstruct_eof(t)) {
1539 if (command == PA_COMMAND_ERROR)
1540 pa_log("Failed to authenticate");
1541 else
1542 pa_log("Protocol error.");
1544 goto fail;
1547 /* Minimum supported protocol version */
1548 if (u->version < 8) {
1549 pa_log("Incompatible protocol version");
1550 goto fail;
1553 /* Starting with protocol version 13 the MSB of the version tag
1554 reflects if shm is enabled for this connection or not. We don't
1555 support SHM here at all, so we just ignore this. */
1557 if (u->version >= 13)
1558 u->version &= 0x7FFFFFFFU;
1560 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1562 #ifdef TUNNEL_SINK
1563 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1564 pa_sink_update_proplist(u->sink, 0, NULL);
1566 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1567 u->sink_name,
1568 pa_get_user_name(un, sizeof(un)),
1569 pa_get_host_name(hn, sizeof(hn)));
1570 #else
1571 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1572 pa_source_update_proplist(u->source, 0, NULL);
1574 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1575 u->source_name,
1576 pa_get_user_name(un, sizeof(un)),
1577 pa_get_host_name(hn, sizeof(hn)));
1578 #endif
1580 reply = pa_tagstruct_new(NULL, 0);
1581 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1582 pa_tagstruct_putu32(reply, u->ctag++);
1584 if (u->version >= 13) {
1585 pa_proplist *pl;
1586 pl = pa_proplist_new();
1587 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1588 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1589 pa_init_proplist(pl);
1590 pa_tagstruct_put_proplist(reply, pl);
1591 pa_proplist_free(pl);
1592 } else
1593 pa_tagstruct_puts(reply, "PulseAudio");
1595 pa_pstream_send_tagstruct(u->pstream, reply);
1596 /* We ignore the server's reply here */
1598 reply = pa_tagstruct_new(NULL, 0);
1600 if (u->version < 13)
1601 /* Only for older PA versions we need to fill in the maxlength */
1602 u->maxlength = 4*1024*1024;
1604 #ifdef TUNNEL_SINK
1605 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1606 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1607 u->prebuf = u->tlength;
1608 #else
1609 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1610 #endif
1612 #ifdef TUNNEL_SINK
1613 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1614 pa_tagstruct_putu32(reply, tag = u->ctag++);
1616 if (u->version < 13)
1617 pa_tagstruct_puts(reply, name);
1619 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1620 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1621 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1622 pa_tagstruct_puts(reply, u->sink_name);
1623 pa_tagstruct_putu32(reply, u->maxlength);
1624 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1625 pa_tagstruct_putu32(reply, u->tlength);
1626 pa_tagstruct_putu32(reply, u->prebuf);
1627 pa_tagstruct_putu32(reply, u->minreq);
1628 pa_tagstruct_putu32(reply, 0);
1629 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1630 pa_tagstruct_put_cvolume(reply, &volume);
1631 #else
1632 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1633 pa_tagstruct_putu32(reply, tag = u->ctag++);
1635 if (u->version < 13)
1636 pa_tagstruct_puts(reply, name);
1638 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1639 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1640 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1641 pa_tagstruct_puts(reply, u->source_name);
1642 pa_tagstruct_putu32(reply, u->maxlength);
1643 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1644 pa_tagstruct_putu32(reply, u->fragsize);
1645 #endif
1647 if (u->version >= 12) {
1648 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1649 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1650 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1651 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1652 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1653 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1654 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1657 if (u->version >= 13) {
1658 pa_proplist *pl;
1660 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1661 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1663 pl = pa_proplist_new();
1664 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1665 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1666 pa_tagstruct_put_proplist(reply, pl);
1667 pa_proplist_free(pl);
1669 #ifndef TUNNEL_SINK
1670 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1671 #endif
1674 if (u->version >= 14) {
1675 #ifdef TUNNEL_SINK
1676 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1677 #endif
1678 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1681 if (u->version >= 15) {
1682 #ifdef TUNNEL_SINK
1683 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1684 #endif
1685 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1686 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1689 pa_pstream_send_tagstruct(u->pstream, reply);
1690 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1692 pa_log_debug("Connection authenticated, creating stream ...");
1694 return;
1696 fail:
1697 pa_module_unload_request(u->module, TRUE);
1700 /* Called from main context */
1701 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1702 struct userdata *u = userdata;
1704 pa_assert(p);
1705 pa_assert(u);
1707 pa_log_warn("Stream died.");
1708 pa_module_unload_request(u->module, TRUE);
1711 /* Called from main context */
1712 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1713 struct userdata *u = userdata;
1715 pa_assert(p);
1716 pa_assert(packet);
1717 pa_assert(u);
1719 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1720 pa_log("Invalid packet");
1721 pa_module_unload_request(u->module, TRUE);
1722 return;
1726 #ifndef TUNNEL_SINK
1727 /* Called from main context */
1728 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1729 struct userdata *u = userdata;
1731 pa_assert(p);
1732 pa_assert(chunk);
1733 pa_assert(u);
1735 if (channel != u->channel) {
1736 pa_log("Received memory block on bad channel.");
1737 pa_module_unload_request(u->module, TRUE);
1738 return;
1741 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1743 u->counter_delta += (int64_t) chunk->length;
1745 #endif
1747 /* Called from main context */
1748 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1749 struct userdata *u = userdata;
1750 pa_tagstruct *t;
1751 uint32_t tag;
1753 pa_assert(sc);
1754 pa_assert(u);
1755 pa_assert(u->client == sc);
1757 pa_socket_client_unref(u->client);
1758 u->client = NULL;
1760 if (!io) {
1761 pa_log("Connection failed: %s", pa_cstrerror(errno));
1762 pa_module_unload_request(u->module, TRUE);
1763 return;
1766 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1767 u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1769 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1770 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1771 #ifndef TUNNEL_SINK
1772 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1773 #endif
1775 t = pa_tagstruct_new(NULL, 0);
1776 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1777 pa_tagstruct_putu32(t, tag = u->ctag++);
1778 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1780 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1782 #ifdef HAVE_CREDS
1784 pa_creds ucred;
1786 if (pa_iochannel_creds_supported(io))
1787 pa_iochannel_creds_enable(io);
1789 ucred.uid = getuid();
1790 ucred.gid = getgid();
1792 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1794 #else
1795 pa_pstream_send_tagstruct(u->pstream, t);
1796 #endif
1798 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1800 pa_log_debug("Connection established, authenticating ...");
1803 #ifdef TUNNEL_SINK
1805 /* Called from main context */
1806 static void sink_set_volume(pa_sink *sink) {
1807 struct userdata *u;
1808 pa_tagstruct *t;
1810 pa_assert(sink);
1811 u = sink->userdata;
1812 pa_assert(u);
1814 t = pa_tagstruct_new(NULL, 0);
1815 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1816 pa_tagstruct_putu32(t, u->ctag++);
1817 pa_tagstruct_putu32(t, u->device_index);
1818 pa_tagstruct_put_cvolume(t, &sink->real_volume);
1819 pa_pstream_send_tagstruct(u->pstream, t);
1822 /* Called from main context */
1823 static void sink_set_mute(pa_sink *sink) {
1824 struct userdata *u;
1825 pa_tagstruct *t;
1827 pa_assert(sink);
1828 u = sink->userdata;
1829 pa_assert(u);
1831 if (u->version < 11)
1832 return;
1834 t = pa_tagstruct_new(NULL, 0);
1835 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1836 pa_tagstruct_putu32(t, u->ctag++);
1837 pa_tagstruct_putu32(t, u->device_index);
1838 pa_tagstruct_put_boolean(t, !!sink->muted);
1839 pa_pstream_send_tagstruct(u->pstream, t);
1842 #endif
1844 int pa__init(pa_module*m) {
1845 pa_modargs *ma = NULL;
1846 struct userdata *u = NULL;
1847 pa_sample_spec ss;
1848 pa_channel_map map;
1849 char *dn = NULL;
1850 #ifdef TUNNEL_SINK
1851 pa_sink_new_data data;
1852 #else
1853 pa_source_new_data data;
1854 #endif
1856 pa_assert(m);
1858 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1859 pa_log("Failed to parse module arguments");
1860 goto fail;
1863 m->userdata = u = pa_xnew0(struct userdata, 1);
1864 u->core = m->core;
1865 u->module = m;
1866 u->client = NULL;
1867 u->pdispatch = NULL;
1868 u->pstream = NULL;
1869 u->server_name = NULL;
1870 #ifdef TUNNEL_SINK
1871 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1872 u->sink = NULL;
1873 u->requested_bytes = 0;
1874 #else
1875 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1876 u->source = NULL;
1877 #endif
1878 u->smoother = pa_smoother_new(
1879 PA_USEC_PER_SEC,
1880 PA_USEC_PER_SEC*2,
1881 TRUE,
1882 TRUE,
1884 pa_rtclock_now(),
1885 FALSE);
1886 u->ctag = 1;
1887 u->device_index = u->channel = PA_INVALID_INDEX;
1888 u->time_event = NULL;
1889 u->ignore_latency_before = 0;
1890 u->transport_usec = u->thread_transport_usec = 0;
1891 u->remote_suspended = u->remote_corked = FALSE;
1892 u->counter = u->counter_delta = 0;
1894 u->rtpoll = pa_rtpoll_new();
1895 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1897 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1898 goto fail;
1900 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1901 pa_log("No server specified.");
1902 goto fail;
1905 ss = m->core->default_sample_spec;
1906 map = m->core->default_channel_map;
1907 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1908 pa_log("Invalid sample format specification");
1909 goto fail;
1912 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1913 pa_log("Failed to connect to server '%s'", u->server_name);
1914 goto fail;
1917 pa_socket_client_set_callback(u->client, on_connection, u);
1919 #ifdef TUNNEL_SINK
1921 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1922 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1924 pa_sink_new_data_init(&data);
1925 data.driver = __FILE__;
1926 data.module = m;
1927 data.namereg_fail = TRUE;
1928 pa_sink_new_data_set_name(&data, dn);
1929 pa_sink_new_data_set_sample_spec(&data, &ss);
1930 pa_sink_new_data_set_channel_map(&data, &map);
1931 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1932 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1933 if (u->sink_name)
1934 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1936 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1937 pa_log("Invalid properties");
1938 pa_sink_new_data_done(&data);
1939 goto fail;
1942 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1943 pa_sink_new_data_done(&data);
1945 if (!u->sink) {
1946 pa_log("Failed to create sink.");
1947 goto fail;
1950 u->sink->parent.process_msg = sink_process_msg;
1951 u->sink->userdata = u;
1952 u->sink->set_state = sink_set_state;
1953 u->sink->set_volume = sink_set_volume;
1954 u->sink->set_mute = sink_set_mute;
1956 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1958 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1960 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1961 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1963 #else
1965 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1966 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1968 pa_source_new_data_init(&data);
1969 data.driver = __FILE__;
1970 data.module = m;
1971 data.namereg_fail = TRUE;
1972 pa_source_new_data_set_name(&data, dn);
1973 pa_source_new_data_set_sample_spec(&data, &ss);
1974 pa_source_new_data_set_channel_map(&data, &map);
1975 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1976 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1977 if (u->source_name)
1978 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1980 if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1981 pa_log("Invalid properties");
1982 pa_source_new_data_done(&data);
1983 goto fail;
1986 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1987 pa_source_new_data_done(&data);
1989 if (!u->source) {
1990 pa_log("Failed to create source.");
1991 goto fail;
1994 u->source->parent.process_msg = source_process_msg;
1995 u->source->set_state = source_set_state;
1996 u->source->userdata = u;
1998 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2000 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2001 pa_source_set_rtpoll(u->source, u->rtpoll);
2003 u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2004 #endif
2006 pa_xfree(dn);
2008 u->time_event = NULL;
2010 u->maxlength = (uint32_t) -1;
2011 #ifdef TUNNEL_SINK
2012 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2013 #else
2014 u->fragsize = (uint32_t) -1;
2015 #endif
2017 if (!(u->thread = pa_thread_new(thread_func, u))) {
2018 pa_log("Failed to create thread.");
2019 goto fail;
2022 #ifdef TUNNEL_SINK
2023 pa_sink_put(u->sink);
2024 #else
2025 pa_source_put(u->source);
2026 #endif
2028 pa_modargs_free(ma);
2030 return 0;
2032 fail:
2033 pa__done(m);
2035 if (ma)
2036 pa_modargs_free(ma);
2038 pa_xfree(dn);
2040 return -1;
2043 void pa__done(pa_module*m) {
2044 struct userdata* u;
2046 pa_assert(m);
2048 if (!(u = m->userdata))
2049 return;
2051 #ifdef TUNNEL_SINK
2052 if (u->sink)
2053 pa_sink_unlink(u->sink);
2054 #else
2055 if (u->source)
2056 pa_source_unlink(u->source);
2057 #endif
2059 if (u->thread) {
2060 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2061 pa_thread_free(u->thread);
2064 pa_thread_mq_done(&u->thread_mq);
2066 #ifdef TUNNEL_SINK
2067 if (u->sink)
2068 pa_sink_unref(u->sink);
2069 #else
2070 if (u->source)
2071 pa_source_unref(u->source);
2072 #endif
2074 if (u->rtpoll)
2075 pa_rtpoll_free(u->rtpoll);
2077 if (u->pstream) {
2078 pa_pstream_unlink(u->pstream);
2079 pa_pstream_unref(u->pstream);
2082 if (u->pdispatch)
2083 pa_pdispatch_unref(u->pdispatch);
2085 if (u->client)
2086 pa_socket_client_unref(u->client);
2088 if (u->auth_cookie)
2089 pa_auth_cookie_unref(u->auth_cookie);
2091 if (u->smoother)
2092 pa_smoother_free(u->smoother);
2094 if (u->time_event)
2095 u->core->mainloop->time_free(u->time_event);
2097 #ifndef TUNNEL_SINK
2098 if (u->mcalign)
2099 pa_mcalign_free(u->mcalign);
2100 #endif
2102 #ifdef TUNNEL_SINK
2103 pa_xfree(u->sink_name);
2104 #else
2105 pa_xfree(u->source_name);
2106 #endif
2107 pa_xfree(u->server_name);
2109 pa_xfree(u->device_description);
2110 pa_xfree(u->server_fqdn);
2111 pa_xfree(u->user_name);
2113 pa_xfree(u);