Limit rate adjustments to small, inaudible jumps
[pulseaudio-mirror.git] / src / modules / rtp / module-rtp-recv.c
blob3bbeb1fcbd82dbb72fb9b9375930b9ccb90ad13a
2 /***
3 This file is part of PulseAudio.
5 Copyright 2006 Lennart Poettering
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/time-smoother.h>
56 #include <pulsecore/socket-util.h>
57 #include <pulsecore/once.h>
59 #include "module-rtp-recv-symdef.h"
61 #include "rtp.h"
62 #include "sdp.h"
63 #include "sap.h"
/* Module metadata declared through the PA_MODULE_* macros. The usage
 * string lists the two arguments this module accepts; they match the
 * entries of valid_modargs[]. */
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
67 PA_MODULE_VERSION(PACKAGE_VERSION);
68 PA_MODULE_LOAD_ONCE(FALSE);
69 PA_MODULE_USAGE(
70 "sink=<name of the sink> "
71 "sap_address=<multicast address to listen on> "
/* NOTE(review): the closing ");" of PA_MODULE_USAGE is not visible in
 * this view of the file -- confirm against the real source. */
/* UDP port used for the SAP listening socket (see pa__init()). */
74 #define SAP_PORT 9875
/* Address listened on when no "sap_address" module argument is given. */
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
/* Size limit in bytes (40 MiB) handed to pa_memblockq_new() in session_new(). */
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
/* session_new() refuses to create a session once this many exist. */
77 #define MAX_SESSIONS 16
/* Seconds: a session whose timestamp is older than this is freed by
 * check_death_event_cb(); the same value is used as the timer period. */
78 #define DEATH_TIMEOUT 20
/* Minimum time between two resampler rate adjustments in rtpoll_work_cb(). */
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
/* Initial value of session.intended_latency (500 ms). */
80 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
/* NULL-terminated list of accepted module argument names; passed to
 * pa_modargs_new() in pa__init(). */
82 static const char* const valid_modargs[] = {
83 "sink",
84 "sap_address",
85 NULL
/* NOTE(review): the closing "};" is not visible in this view of the file. */
/* State of one received RTP stream. Created by session_new() when a SAP
 * announcement with an unknown origin arrives, destroyed by session_free(). */
88 struct session {
/* Back pointer to the owning module state */
89 struct userdata *userdata;
/* Linkage for the userdata.sessions list */
90 PA_LLIST_FIELDS(struct session);
/* Sink input the stream is played through */
92 pa_sink_input *sink_input;
/* Queue filled by rtpoll_work_cb() and drained by sink_input_pop_cb() */
93 pa_memblockq *memblockq;
/* TRUE once the first RTP packet was seen; reset to FALSE when the sink
 * input is resumed (sink_input_suspend_within_thread()) */
95 pa_bool_t first_packet;
/* SSRC taken from the first packet; packets with another SSRC are dropped */
96 uint32_t ssrc;
/* RTP timestamp expected for the next packet */
97 uint32_t offset;
/* Copy of the SDP description this session was created from */
99 struct pa_sdp_info sdp_info;
/* Receive context wrapping the RTP socket */
101 pa_rtp_context rtp_context;
/* Poll item for the RTP socket; non-NULL only between attach and detach */
103 pa_rtpoll_item *rtpoll_item;
/* tv_sec of the last RTP packet or SAP announcement; written in
 * rtpoll_work_cb() and sap_event_cb(), read in check_death_event_cb() */
105 pa_atomic_t timestamp;
/* Fed with (arrival time, write index in usec) for every packet */
107 pa_smoother *smoother;
/* Latency the rate adjustment in rtpoll_work_cb() steers towards */
108 pa_usec_t intended_latency;
/* Value returned by pa_sink_input_set_requested_latency() in session_new() */
109 pa_usec_t sink_latency;
/* Time of the last rate adjustment */
111 pa_usec_t last_rate_update;
/* NOTE(review): the closing "};" is not visible in this view of the file. */
/* Per-module-instance state, allocated in pa__init() and released in
 * pa__done(). */
114 struct userdata {
115 pa_module *module;
116 pa_core *core;
/* SAP receive context wrapping the announcement socket */
118 pa_sap_context sap_context;
/* Main loop IO event that invokes sap_event_cb() on socket input */
119 pa_io_event* sap_event;
/* Periodic timer that invokes check_death_event_cb() */
121 pa_time_event *check_death_event;
/* Copy of the "sink" module argument (presumably NULL when the argument
 * is not given -- confirm pa_xstrdup(NULL) semantics) */
123 char *sink_name;
/* All live sessions, as a list and indexed by sdp_info.origin */
125 PA_LLIST_HEAD(struct session, sessions);
126 pa_hashmap *by_origin;
/* Number of live sessions, bounded by MAX_SESSIONS */
127 int n_sessions;
/* NOTE(review): the closing "};" is not visible in this view of the file. */
/* Forward declaration: sink_input_kill() calls session_free() before its
 * definition further down in this file. */
130 static void session_free(struct session *s);
132 /* Called from I/O thread context */
133 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
134 struct session *s = PA_SINK_INPUT(o)->userdata;
136 switch (code) {
137 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
138 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140 /* Fall through, the default handler will add in the extra
141 * latency added by the resampler */
142 break;
145 return pa_sink_input_process_msg(o, code, data, offset, chunk);
148 /* Called from I/O thread context */
149 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
150 struct session *s;
151 pa_sink_input_assert_ref(i);
152 pa_assert_se(s = i->userdata);
154 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
155 return -1;
157 pa_memblockq_drop(s->memblockq, chunk->length);
159 return 0;
162 /* Called from I/O thread context */
163 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
164 struct session *s;
166 pa_sink_input_assert_ref(i);
167 pa_assert_se(s = i->userdata);
169 pa_memblockq_rewind(s->memblockq, nbytes);
172 /* Called from I/O thread context */
173 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
174 struct session *s;
176 pa_sink_input_assert_ref(i);
177 pa_assert_se(s = i->userdata);
179 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
182 /* Called from main context */
183 static void sink_input_kill(pa_sink_input* i) {
184 struct session *s;
185 pa_sink_input_assert_ref(i);
186 pa_assert_se(s = i->userdata);
188 session_free(s);
191 /* Called from IO context */
192 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
193 struct session *s;
194 pa_sink_input_assert_ref(i);
195 pa_assert_se(s = i->userdata);
197 if (b) {
198 pa_smoother_pause(s->smoother, pa_rtclock_now());
199 pa_memblockq_flush_read(s->memblockq);
200 } else
201 s->first_packet = FALSE;
204 /* Called from I/O thread context */
205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206 pa_memchunk chunk;
207 int64_t k, j, delta;
208 struct timeval now = { 0, 0 };
209 struct session *s;
210 struct pollfd *p;
212 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
214 p = pa_rtpoll_item_get_pollfd(i, NULL);
216 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
217 pa_log("poll() signalled bad revents.");
218 return -1;
221 if ((p->revents & POLLIN) == 0)
222 return 0;
224 p->revents = 0;
226 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
227 return 0;
229 if (s->sdp_info.payload != s->rtp_context.payload ||
230 !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231 pa_memblock_unref(chunk.memblock);
232 return 0;
235 if (!s->first_packet) {
236 s->first_packet = TRUE;
238 s->ssrc = s->rtp_context.ssrc;
239 s->offset = s->rtp_context.timestamp;
241 if (s->ssrc == s->userdata->module->core->cookie)
242 pa_log_warn("Detected RTP packet loop!");
243 } else {
244 if (s->ssrc != s->rtp_context.ssrc) {
245 pa_memblock_unref(chunk.memblock);
246 return 0;
250 /* Check whether there was a timestamp overflow */
251 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
252 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
254 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
255 delta = k;
256 else
257 delta = j;
259 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
261 if (now.tv_sec == 0) {
262 PA_ONCE_BEGIN {
263 pa_log_warn("Using artificial time instead of timestamp");
264 } PA_ONCE_END;
265 pa_rtclock_get(&now);
266 } else
267 pa_rtclock_from_wallclock(&now);
269 pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
271 /* Tell the smoother that we are rolling now, in case it is still paused */
272 pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE);
274 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
275 pa_log_warn("Queue overrun");
276 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
279 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
281 pa_memblock_unref(chunk.memblock);
283 /* The next timestamp we expect */
284 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
286 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
288 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
289 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
290 unsigned fix_samples;
291 uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
292 uint32_t current_rate = s->sink_input->sample_spec.rate;
293 uint32_t new_rate;
295 pa_log_debug("Updating sample rate");
297 wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
298 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
300 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
302 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
303 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
305 if (ri > render_delay+sink_delay)
306 ri -= render_delay+sink_delay;
307 else
308 ri = 0;
310 if (wi < ri)
311 latency = 0;
312 else
313 latency = wi - ri;
315 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
317 /* Calculate deviation */
318 if (latency < s->intended_latency)
319 fix = s->intended_latency - latency;
320 else
321 fix = latency - s->intended_latency;
323 /* How many samples is this per second? */
324 fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
326 if (latency < s->intended_latency)
327 new_rate = current_rate - fix_samples;
328 else
329 new_rate = current_rate + fix_samples;
331 if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
332 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
333 new_rate = base_rate;
334 } else {
335 if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
336 new_rate = base_rate;
337 /* Do the adjustment in small steps; 2‰ can be considered inaudible */
338 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
339 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
340 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
343 s->sink_input->sample_spec.rate = new_rate;
345 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
347 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
349 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
351 s->last_rate_update = pa_timeval_load(&now);
354 if (pa_memblockq_is_readable(s->memblockq) &&
355 s->sink_input->thread_info.underrun_for > 0) {
356 pa_log_debug("Requesting rewind due to end of underrun");
357 pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
360 return 1;
363 /* Called from I/O thread context */
364 static void sink_input_attach(pa_sink_input *i) {
365 struct session *s;
366 struct pollfd *p;
368 pa_sink_input_assert_ref(i);
369 pa_assert_se(s = i->userdata);
371 pa_assert(!s->rtpoll_item);
372 s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
374 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
375 p->fd = s->rtp_context.fd;
376 p->events = POLLIN;
377 p->revents = 0;
379 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
380 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
383 /* Called from I/O thread context */
384 static void sink_input_detach(pa_sink_input *i) {
385 struct session *s;
386 pa_sink_input_assert_ref(i);
387 pa_assert_se(s = i->userdata);
389 pa_assert(s->rtpoll_item);
390 pa_rtpoll_item_free(s->rtpoll_item);
391 s->rtpoll_item = NULL;
394 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
395 int af, fd = -1, r, one;
397 pa_assert(sa);
398 pa_assert(salen > 0);
400 af = sa->sa_family;
401 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
402 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
403 goto fail;
406 pa_make_udp_socket_low_delay(fd);
408 one = 1;
409 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
410 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
411 goto fail;
414 one = 1;
415 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
416 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
417 goto fail;
420 if (af == AF_INET) {
421 struct ip_mreq mr4;
422 memset(&mr4, 0, sizeof(mr4));
423 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
424 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
425 #ifdef HAVE_IPV6
426 } else {
427 struct ipv6_mreq mr6;
428 memset(&mr6, 0, sizeof(mr6));
429 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
430 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
431 #endif
434 if (r < 0) {
435 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
436 goto fail;
439 if (bind(fd, sa, salen) < 0) {
440 pa_log("bind() failed: %s", pa_cstrerror(errno));
441 goto fail;
444 return fd;
446 fail:
447 if (fd >= 0)
448 close(fd);
450 return -1;
/* Create a session for the stream described by sdp_info.
 *
 * Looks up the configured sink, opens and joins the stream's multicast
 * socket, creates a variable-rate sink input on that sink, sets up the
 * session queue and registers the session in u->by_origin and u->sessions.
 *
 * Returns the new session, or NULL on failure. On success the session holds
 * a struct copy of *sdp_info which session_free() later destroys; on failure
 * the caller remains responsible for sdp_info (sap_event_cb() destroys it).
 *
 * NOTE(review): this view of the file appears to have lost some very short
 * lines (closing braces and, judging by the gaps in the embedded line
 * numbers, arguments of pa_smoother_new() and pa_memblockq_new()) --
 * confirm against the real source before editing the code. */
453 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
454 struct session *s = NULL;
455 pa_sink *sink;
456 int fd = -1;
457 pa_memchunk silence;
458 pa_sink_input_new_data data;
459 struct timeval now;
461 pa_assert(u);
462 pa_assert(sdp_info);
464 if (u->n_sessions >= MAX_SESSIONS) {
465 pa_log("Session limit reached.");
466 goto fail;
469 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
470 pa_log("Sink does not exist.");
471 goto fail;
474 pa_rtclock_get(&now);
476 s = pa_xnew0(struct session, 1);
477 s->userdata = u;
478 s->first_packet = FALSE;
479 s->sdp_info = *sdp_info;
480 s->rtpoll_item = NULL;
481 s->intended_latency = LATENCY_USEC;
482 s->smoother = pa_smoother_new(
483 PA_USEC_PER_SEC*5,
484 PA_USEC_PER_SEC*2,
485 TRUE,
486 TRUE,
488 pa_timeval_load(&now),
489 TRUE);
490 s->last_rate_update = pa_timeval_load(&now);
/* Start the death timeout from the moment of creation */
491 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
/* NOTE(review): from here on a "goto fail" only pa_xfree()s s; the smoother
 * allocated above is not released on that path -- looks like a leak when
 * mcast_socket() or pa_sink_input_new() fails; confirm and free it. */
493 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
494 goto fail;
496 pa_sink_input_new_data_init(&data);
497 data.sink = sink;
498 data.driver = __FILE__;
499 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
/* Media name is "RTP Stream", followed by " (<session name>)" if one is known */
500 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
501 "RTP Stream%s%s%s",
502 sdp_info->session_name ? " (" : "",
503 sdp_info->session_name ? sdp_info->session_name : "",
504 sdp_info->session_name ? ")" : "");
506 if (sdp_info->session_name)
507 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
508 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
509 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
510 data.module = u->module;
511 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
/* The rate is adjusted at runtime by rtpoll_work_cb() */
512 data.flags = PA_SINK_INPUT_VARIABLE_RATE;
514 pa_sink_input_new(&s->sink_input, u->module->core, &data);
515 pa_sink_input_new_data_done(&data);
517 if (!s->sink_input) {
518 pa_log("Failed to create sink input.");
519 goto fail;
522 s->sink_input->userdata = s;
524 s->sink_input->parent.process_msg = sink_input_process_msg;
525 s->sink_input->pop = sink_input_pop_cb;
526 s->sink_input->process_rewind = sink_input_process_rewind_cb;
527 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
528 s->sink_input->kill = sink_input_kill;
529 s->sink_input->attach = sink_input_attach;
530 s->sink_input->detach = sink_input_detach;
531 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
533 pa_sink_input_get_silence(s->sink_input, &silence);
/* Request half of the intended latency from the sink; if what we get back
 * is more than that, raise the intended latency to twice the returned value */
535 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
537 if (s->intended_latency < s->sink_latency*2)
538 s->intended_latency = s->sink_latency*2;
540 s->memblockq = pa_memblockq_new(
542 MEMBLOCKQ_MAXLENGTH,
543 MEMBLOCKQ_MAXLENGTH,
544 pa_frame_size(&s->sink_input->sample_spec),
545 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
548 &silence);
550 pa_memblock_unref(silence.memblock);
/* The RTP context takes over fd from here */
552 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
554 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
555 u->n_sessions++;
556 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
558 pa_sink_input_put(s->sink_input);
/* NOTE(review): session_name may be NULL (see the checks above) but is
 * passed to "%s" here -- confirm and guard. */
560 pa_log_info("New session '%s'", s->sdp_info.session_name);
562 return s;
564 fail:
565 pa_xfree(s);
567 if (fd >= 0)
568 pa_close(fd);
570 return NULL;
573 static void session_free(struct session *s) {
574 pa_assert(s);
576 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
578 pa_sink_input_unlink(s->sink_input);
579 pa_sink_input_unref(s->sink_input);
581 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
582 pa_assert(s->userdata->n_sessions >= 1);
583 s->userdata->n_sessions--;
584 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
586 pa_memblockq_free(s->memblockq);
587 pa_sdp_info_destroy(&s->sdp_info);
588 pa_rtp_context_destroy(&s->rtp_context);
590 pa_smoother_free(s->smoother);
592 pa_xfree(s);
595 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
596 struct userdata *u = userdata;
597 pa_bool_t goodbye = FALSE;
598 pa_sdp_info info;
599 struct session *s;
601 pa_assert(m);
602 pa_assert(e);
603 pa_assert(u);
604 pa_assert(fd == u->sap_context.fd);
605 pa_assert(flags == PA_IO_EVENT_INPUT);
607 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
608 return;
610 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
611 return;
613 if (goodbye) {
615 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
616 session_free(s);
618 pa_sdp_info_destroy(&info);
619 } else {
621 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
622 if (!session_new(u, &info))
623 pa_sdp_info_destroy(&info);
625 } else {
626 struct timeval now;
627 pa_rtclock_get(&now);
628 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
630 pa_sdp_info_destroy(&info);
635 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
636 struct session *s, *n;
637 struct userdata *u = userdata;
638 struct timeval now;
640 pa_assert(m);
641 pa_assert(t);
642 pa_assert(u);
644 pa_rtclock_get(&now);
646 pa_log_debug("Checking for dead streams ...");
648 for (s = u->sessions; s; s = n) {
649 int k;
650 n = s->next;
652 k = pa_atomic_load(&s->timestamp);
654 if (k + DEATH_TIMEOUT < now.tv_sec)
655 session_free(s);
658 /* Restart timer */
659 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
662 int pa__init(pa_module*m) {
663 struct userdata *u;
664 pa_modargs *ma = NULL;
665 struct sockaddr_in sa4;
666 #ifdef HAVE_IPV6
667 struct sockaddr_in6 sa6;
668 #endif
669 struct sockaddr *sa;
670 socklen_t salen;
671 const char *sap_address;
672 int fd = -1;
674 pa_assert(m);
676 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
677 pa_log("failed to parse module arguments");
678 goto fail;
681 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
683 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
684 sa4.sin_family = AF_INET;
685 sa4.sin_port = htons(SAP_PORT);
686 sa = (struct sockaddr*) &sa4;
687 salen = sizeof(sa4);
688 #ifdef HAVE_IPV6
689 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
690 sa6.sin6_family = AF_INET6;
691 sa6.sin6_port = htons(SAP_PORT);
692 sa = (struct sockaddr*) &sa6;
693 salen = sizeof(sa6);
694 #endif
695 } else {
696 pa_log("Invalid SAP address '%s'", sap_address);
697 goto fail;
700 if ((fd = mcast_socket(sa, salen)) < 0)
701 goto fail;
703 m->userdata = u = pa_xnew(struct userdata, 1);
704 u->module = m;
705 u->core = m->core;
706 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
708 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
709 pa_sap_context_init_recv(&u->sap_context, fd);
711 PA_LLIST_HEAD_INIT(struct session, u->sessions);
712 u->n_sessions = 0;
713 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
715 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
717 pa_modargs_free(ma);
719 return 0;
721 fail:
722 if (ma)
723 pa_modargs_free(ma);
725 if (fd >= 0)
726 pa_close(fd);
728 return -1;
731 void pa__done(pa_module*m) {
732 struct userdata *u;
733 struct session *s;
735 pa_assert(m);
737 if (!(u = m->userdata))
738 return;
740 if (u->sap_event)
741 m->core->mainloop->io_free(u->sap_event);
743 if (u->check_death_event)
744 m->core->mainloop->time_free(u->check_death_event);
746 pa_sap_context_destroy(&u->sap_context);
748 if (u->by_origin) {
749 while ((s = pa_hashmap_first(u->by_origin)))
750 session_free(s);
752 pa_hashmap_free(u->by_origin, NULL, NULL);
755 pa_xfree(u->sink_name);
756 pa_xfree(u);