1 /* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
16 #include <sys/socket.h>
22 #include "thread/thread.h"
24 #include "httpp/httpp.h"
27 #include "connection.h"
42 #define CATMODULE "source"
45 static int _compare_clients(void *compare_arg
, void *a
, void *b
);
46 static int _free_client(void *key
);
47 static int _parse_audio_info(source_t
*source
, char *s
);
/* source_create: allocate and initialise a source_t for the given mount.
 *
 * Visible steps: malloc the structure, strdup the mount name, obtain the
 * format plugin via format_get_plugin(type, mount, parser), create the
 * client and pending AVL trees (both ordered by _compare_clients), and set
 * defaults (no fallback mount, num_yp_directories 0, max_listeners -1, no
 * dump file name, a fresh audio_info dictionary).
 * When mountinfo is non-NULL its fallback_mount, max_listeners and dumpfile
 * settings are copied in (strings are strdup'ed).
 * When a dump file name is set, the file is opened in "ab" mode; a failure to
 * open is logged with WARN2.
 *
 * NOTE(review): this listing omits lines of the original function (braces,
 * some field assignments, the return statement). No check of the malloc()
 * or strdup() results is visible here -- confirm against the full file. */
49 source_t
*source_create(client_t
*client
, connection_t
*con
,
50 http_parser_t
*parser
, const char *mount
, format_type_t type
,
51 mount_proxy
*mountinfo
)
55 src
= (source_t
*)malloc(sizeof(source_t
));
57 src
->mount
= (char *)strdup(mount
);
58 src
->fallback_mount
= NULL
;
59 src
->format
= format_get_plugin(type
, src
->mount
, parser
);
62 src
->client_tree
= avl_tree_new(_compare_clients
, NULL
);
63 src
->pending_tree
= avl_tree_new(_compare_clients
, NULL
);
65 src
->num_yp_directories
= 0;
67 src
->max_listeners
= -1;
69 src
->dumpfilename
= NULL
;
71 src
->audio_info
= util_dict_new();
74 if(mountinfo
!= NULL
) {
75 if (mountinfo
->fallback_mount
!= NULL
)
76 src
->fallback_mount
= strdup (mountinfo
->fallback_mount
);
77 src
->max_listeners
= mountinfo
->max_listeners
;
78 if (mountinfo
->dumpfile
!= NULL
)
79 src
->dumpfilename
= strdup (mountinfo
->dumpfile
);
82 if(src
->dumpfilename
!= NULL
) {
83 src
->dumpfile
= fopen(src
->dumpfilename
, "ab");
84 if(src
->dumpfile
== NULL
) {
85 WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
86 src
->dumpfilename
, strerror(errno
));
/* source_remove_source: callback that source_main hands to avl_delete() when
 * it takes a source out of global.source_tree.
 * NOTE(review): the body is not present in this listing, so what it does with
 * key cannot be confirmed from here. */
93 static int source_remove_source(void *key
)
/* source_find_mount: look up the source registered under `mount` in
 * global.source_tree.
 *
 * The visible statements start at global.source_tree->root->right, take the
 * node's key as a source_t and compare `mount` with that source's mount via
 * strcmp(). Callers in this file test the result against NULL.
 *
 * NOTE(review): the listing omits lines of this function (local
 * declarations, the traversal loop, the return statements, and the end of
 * the comment just below); confirm details against the full file. */
98 /* you must already have a read lock on the global source tree
99 ** to call this function
101 source_t
*source_find_mount(const char *mount
)
110 /* get the root node */
111 node
= global
.source_tree
->root
->right
;
114 source
= (source_t
*)node
->key
;
115 cmp
= strcmp(mount
, source
->mount
);
128 int source_compare_sources(void *arg
, void *a
, void *b
)
130 source_t
*srca
= (source_t
*)a
;
131 source_t
*srcb
= (source_t
*)b
;
133 return strcmp(srca
->mount
, srcb
->mount
);
/* source_free_source: release the resources held by a source (key is the
 * source_t).
 *
 * Visible steps, in order: free fallback_mount and dumpfilename, destroy the
 * source's own client, free the pending and client trees using _free_client
 * as the per-entry callback, call the format plugin's free_plugin, destroy
 * each of the num_yp_directories ypdata entries, and free the audio_info
 * dictionary.
 *
 * NOTE(review): lines are missing from this listing; e.g. any release of
 * source->mount or of the source structure itself, and the return value, are
 * not visible here. */
136 int source_free_source(void *key
)
138 source_t
*source
= key
;
144 free(source
->fallback_mount
);
145 free(source
->dumpfilename
);
146 client_destroy(source
->client
);
147 avl_tree_free(source
->pending_tree
, _free_client
);
148 avl_tree_free(source
->client_tree
, _free_client
);
149 source
->format
->free_plugin(source
->format
);
151 for (i
=0; i
<source
->num_yp_directories
; i
++) {
152 yp_destroy_ypdata(source
->ypdata
[i
]);
155 util_dict_free(source
->audio_info
);
/* source_find_client: search source->client_tree for the client whose
 * connection id equals `id`.
 *
 * A stack-allocated placeholder client/connection pair carrying only the id
 * is used as the lookup key for avl_get_by_key(). The tree is read-locked
 * for the lookup and unlocked at two visible points (presumably the found
 * and not-found paths).
 *
 * NOTE(review): the return statements and some declarations are not present
 * in this listing. */
161 client_t
*source_find_client(source_t
*source
, int id
)
165 connection_t fakecon
;
167 fakeclient
.con
= &fakecon
;
168 fakeclient
.con
->id
= id
;
170 avl_tree_rlock(source
->client_tree
);
171 if(avl_get_by_key(source
->client_tree
, &fakeclient
, &result
) == 0)
173 avl_tree_unlock(source
->client_tree
);
177 avl_tree_unlock(source
->client_tree
);
/* source_main: thread entry point for one source; arg is the source_t.
 *
 * Visible phases:
 *  1. Setup: read queue_size_limit, source_timeout and hostname from the
 *     config, create one ypdata entry per configured yp_url, release the
 *     config.
 *  2. Registration: take a read lock on source->shutdown_rwlock, write-lock
 *     global.source_tree and re-check the mount with source_find_mount().
 *     If the mount is taken, the tree is unlocked, a 404 "Mountpoint in use"
 *     is sent when send_return is set, and the shutdown lock is released.
 *     Otherwise the source is inserted into the tree and, when send_return
 *     is set, "HTTP/1.0 200 OK" is written to the source client.
 *  3. Metadata: publish stats and copy ice-* / icy-* request headers into
 *     the YP info; build each ypdata listen_url.
 *  4. Main loop (while global.running == ICE_RUNNING and source->running):
 *     obtain a refbuf from the format plugin, reading from the socket with a
 *     timeout when none is ready; optionally write it to the dump file;
 *     send queued and new data to every client; flag clients with errors or
 *     over-long queues; delete flagged clients; move pending clients into
 *     the client tree.
 *  5. Shutdown: look up the fallback source, remove this source from
 *     global.source_tree, move remaining pending/active clients to the
 *     fallback's pending tree (or free them), update stats, close the dump
 *     file, release shutdown_rwlock and call source_free_source().
 *
 * NOTE(review): this listing omits many lines of the original function
 * (declarations, braces, else-branches, returns, comment terminators), so
 * branch structure is only partly visible.
 * NOTE(review): "ice-url" is looked up twice below (original lines 272 and
 * 281). */
182 void *source_main(void *arg
)
184 source_t
*source
= (source_t
*)arg
;
185 source_t
*fallback_source
;
190 avl_node
*client_node
;
192 refbuf_t
*refbuf
, *abuf
;
205 ice_config_t
*config
;
209 config
= config_get_config();
211 queue_limit
= config
->queue_size_limit
;
212 timeout
= config
->source_timeout
;
213 hostname
= config
->hostname
;
217 for (i
=0;i
<config
->num_yp_directories
;i
++) {
218 if (config
->yp_url
[i
]) {
219 source
->ypdata
[source
->num_yp_directories
] = yp_create_ypdata();
220 source
->ypdata
[source
->num_yp_directories
]->yp_url
=
222 source
->ypdata
[source
->num_yp_directories
]->yp_url_timeout
=
223 config
->yp_url_timeout
[i
];
224 source
->ypdata
[source
->num_yp_directories
]->yp_touch_interval
= 0;
225 source
->num_yp_directories
++;
230 config_release_config();
232 /* grab a read lock, to make sure we get a chance to cleanup */
233 thread_rwlock_rlock(source
->shutdown_rwlock
);
235 avl_tree_wlock(global
.source_tree
);
236 /* Now, we must do a final check with write lock taken out that the
237 * mountpoint is available..
239 if (source_find_mount(source
->mount
) != NULL
) {
240 avl_tree_unlock(global
.source_tree
);
241 if(source
->send_return
) {
242 client_send_404(source
->client
, "Mountpoint in use");
247 thread_rwlock_unlock(source
->shutdown_rwlock
);
251 /* insert source onto source tree */
252 avl_insert(global
.source_tree
, (void *)source
);
253 /* release write lock on global source tree */
254 avl_tree_unlock(global
.source_tree
);
256 /* If we connected successfully, we can send the message (if requested)
259 if(source
->send_return
) {
260 source
->client
->respcode
= 200;
261 bytes
= sock_write(source
->client
->con
->sock
,
262 "HTTP/1.0 200 OK\r\n\r\n");
263 if(bytes
> 0) source
->client
->con
->sent_bytes
= bytes
;
266 /* start off the statistics */
267 source
->listeners
= 0;
268 stats_event(source
->mount
, "listeners", "0");
269 stats_event(source
->mount
, "type", source
->format
->format_description
);
271 /* ice-* is icecast, icy-* is shoutcast */
272 if ((s
= httpp_getvar(source
->parser
, "ice-url"))) {
273 add_yp_info(source
, "server_url", s
, YP_SERVER_URL
);
275 if ((s
= httpp_getvar(source
->parser
, "ice-name"))) {
276 add_yp_info(source
, "server_name", s
, YP_SERVER_NAME
);
278 if ((s
= httpp_getvar(source
->parser
, "icy-name"))) {
279 add_yp_info(source
, "server_name", s
, YP_SERVER_NAME
);
281 if ((s
= httpp_getvar(source
->parser
, "ice-url"))) {
282 add_yp_info(source
, "server_url", s
, YP_SERVER_URL
);
284 if ((s
= httpp_getvar(source
->parser
, "icy-url"))) {
285 add_yp_info(source
, "server_url", s
, YP_SERVER_URL
);
287 if ((s
= httpp_getvar(source
->parser
, "ice-genre"))) {
288 add_yp_info(source
, "genre", s
, YP_SERVER_GENRE
);
290 if ((s
= httpp_getvar(source
->parser
, "icy-genre"))) {
291 add_yp_info(source
, "genre", s
, YP_SERVER_GENRE
);
293 if ((s
= httpp_getvar(source
->parser
, "ice-bitrate"))) {
294 add_yp_info(source
, "bitrate", s
, YP_BITRATE
);
296 if ((s
= httpp_getvar(source
->parser
, "icy-br"))) {
297 add_yp_info(source
, "bitrate", s
, YP_BITRATE
);
299 if ((s
= httpp_getvar(source
->parser
, "ice-description"))) {
300 add_yp_info(source
, "server_description", s
, YP_SERVER_DESC
);
302 if ((s
= httpp_getvar(source
->parser
, "ice-public"))) {
303 stats_event(source
->mount
, "public", s
);
304 source
->yp_public
= atoi(s
);
306 if ((s
= httpp_getvar(source
->parser
, "icy-pub"))) {
307 stats_event(source
->mount
, "public", s
);
308 source
->yp_public
= atoi(s
);
310 if ((s
= httpp_getvar(source
->parser
, "ice-audio-info"))) {
311 stats_event(source
->mount
, "audio_info", s
);
312 if (_parse_audio_info(source
, s
)) {
313 ai
= util_dict_urlencode(source
->audio_info
, '&');
314 add_yp_info(source
, "audio_info",
322 for (i
=0;i
<source
->num_yp_directories
;i
++) {
323 add_yp_info(source
, "server_type",
324 source
->format
->format_description
,
326 if (source
->ypdata
[i
]->listen_url
) {
327 free(source
->ypdata
[i
]->listen_url
);
329 /* 6 for max size of port */
330 listen_url_size
= strlen("http://") +
332 strlen(":") + 6 + strlen(source
->mount
) + 1;
333 source
->ypdata
[i
]->listen_url
= malloc(listen_url_size
);
334 sprintf(source
->ypdata
[i
]->listen_url
, "http://%s:%d%s",
335 hostname
, port
, source
->mount
);
338 if(source
->yp_public
) {
340 current_time
= time(NULL
);
342 for (i
=0;i
<source
->num_yp_directories
;i
++) {
343 /* Give the source 5 seconds to update the metadata
344 before we do our first touch */
345 /* Don't permit touch intervals of less than 30 seconds */
346 if (source
->ypdata
[i
]->yp_touch_interval
<= 30) {
347 source
->ypdata
[i
]->yp_touch_interval
= 30;
349 source
->ypdata
[i
]->yp_last_touch
= 0;
354 DEBUG0("Source creation complete");
356 while (global
.running
== ICE_RUNNING
&& source
->running
) {
357 ret
= source
->format
->get_buffer(source
->format
, NULL
, 0, &refbuf
);
359 WARN0("Bad data from source");
362 bytes
= 1; /* Set to > 0 so that the post-loop check won't be tripped */
363 while (refbuf
== NULL
) {
366 ret
= util_timed_wait_for_fd(source
->con
->sock
, timeout
*1000);
368 if (ret
< 0 && sock_recoverable (sock_error()))
370 if (ret
<= 0) { /* timeout expired */
371 WARN1("Disconnecting source: socket timeout (%d s) expired",
377 bytes
= sock_read_bytes(source
->con
->sock
, buffer
, 4096);
378 if (bytes
== 0 || (bytes
< 0 && !sock_recoverable(sock_error()))) {
379 DEBUG1("Disconnecting source due to socket read error: %s",
380 strerror(sock_error()));
384 if (bytes
<= 0) break;
385 source
->client
->con
->sent_bytes
+= bytes
;
386 ret
= source
->format
->get_buffer(source
->format
, buffer
, bytes
, &refbuf
);
388 WARN0("Bad data from source");
394 INFO0("Removing source following disconnection");
398 /* we have a refbuf buffer, which is a data block to be sent to
399 ** all clients. if a client is not able to send the buffer
400 ** immediately, it should store it on its queue for the next
403 ** instead of sending the current block, a client should send
404 ** all data in the queue, plus the current block, until either
405 ** it runs out of data, or it hits a recoverable error like
406 ** EAGAIN. this will allow a client that got slightly lagged
407 ** to catch back up if it can
410 /* First, stream dumping, if enabled */
411 if(source
->dumpfile
) {
412 if(fwrite(refbuf
->data
, 1, refbuf
->len
, source
->dumpfile
) !=
415 WARN1("Write to dump file failed, disabling: %s",
417 fclose(source
->dumpfile
);
418 source
->dumpfile
= NULL
;
422 /* acquire read lock on client_tree */
423 avl_tree_rlock(source
->client_tree
);
425 client_node
= avl_get_first(source
->client_tree
);
426 while (client_node
) {
427 /* acquire write lock on node (avl_node_wlock) */
428 avl_node_wlock(client_node
);
430 client
= (client_t
*)client_node
->key
;
434 /* do we have any old buffers? */
435 abuf
= refbuf_queue_remove(&client
->queue
);
438 bytes
= abuf
->len
- client
->pos
;
442 sbytes
= source
->format
->write_buf_to_client(source
->format
,
443 client
, &abuf
->data
[client
->pos
], bytes
);
445 if(sbytes
!= bytes
) {
446 /* We didn't send the entire buffer. Leave it for
447 * the moment, handle it in the next iteration.
449 client
->pos
+= sbytes
;
450 refbuf_queue_insert(&client
->queue
, abuf
);
456 DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected");
457 client
->con
->error
= 1;
459 refbuf_release(abuf
);
463 /* we're done with that refbuf, release it and reset the pos */
464 refbuf_release(abuf
);
467 abuf
= refbuf_queue_remove(&client
->queue
);
470 /* now send or queue the new data */
472 refbuf_addref(refbuf
);
473 refbuf_queue_add(&client
->queue
, refbuf
);
475 sbytes
= source
->format
->write_buf_to_client(source
->format
,
476 client
, refbuf
->data
, refbuf
->len
);
478 if(sbytes
!= refbuf
->len
) {
479 /* Didn't send the entire buffer, queue it */
480 client
->pos
= sbytes
;
481 refbuf_addref(refbuf
);
482 refbuf_queue_insert(&client
->queue
, refbuf
);
486 DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection");
487 client
->con
->error
= 1;
491 /* if the client is too slow, its queue will slowly build up.
492 ** we need to make sure the client is keeping up with the
493 ** data, so we'll kick any client whose queue gets too large.
495 if (refbuf_queue_length(&client
->queue
) > queue_limit
) {
496 DEBUG0("Client has fallen too far behind, removing");
497 client
->con
->error
= 1;
500 /* release lock on node */
501 avl_node_unlock(client_node
);
503 /* get the next node */
504 client_node
= avl_get_next(client_node
);
506 /* release read lock on client_tree */
507 avl_tree_unlock(source
->client_tree
);
509 refbuf_release(refbuf
);
511 /* acquire write lock on client_tree */
512 avl_tree_wlock(source
->client_tree
);
514 /** delete bad clients **/
515 client_node
= avl_get_first(source
->client_tree
);
516 while (client_node
) {
517 client
= (client_t
*)client_node
->key
;
518 if (client
->con
->error
) {
519 client_node
= avl_get_next(client_node
);
520 avl_delete(source
->client_tree
, (void *)client
, _free_client
);
522 stats_event_args(source
->mount
, "listeners", "%d", listeners
);
523 source
->listeners
= listeners
;
524 DEBUG0("Client removed");
527 client_node
= avl_get_next(client_node
);
530 /* acquire write lock on pending_tree */
531 avl_tree_wlock(source
->pending_tree
);
533 /** add pending clients **/
534 client_node
= avl_get_first(source
->pending_tree
);
535 while (client_node
) {
536 avl_insert(source
->client_tree
, client_node
->key
);
537 /* listener count may have changed */
538 listeners
= source
->listeners
;
540 DEBUG0("Client added");
541 stats_event_inc(NULL
, "clients");
542 stats_event_inc(source
->mount
, "connections");
543 stats_event_args(source
->mount
, "listeners", "%d", listeners
);
544 source
->listeners
= listeners
;
546 /* we have to send cached headers for some data formats
547 ** this is where we queue up the buffers to send
549 if (source
->format
->has_predata
) {
550 client
= (client_t
*)client_node
->key
;
551 client
->queue
= source
->format
->get_predata(source
->format
);
554 client_node
= avl_get_next(client_node
);
557 /** clear pending tree **/
558 while (avl_get_first(source
->pending_tree
)) {
559 avl_delete(source
->pending_tree
, avl_get_first(source
->pending_tree
)->key
, source_remove_client
);
562 /* release write lock on pending_tree */
563 avl_tree_unlock(source
->pending_tree
);
565 /* release write lock on client_tree */
566 avl_tree_unlock(source
->client_tree
);
571 INFO1("Source \"%s\" exiting", source
->mount
);
574 if(source
->yp_public
) {
579 avl_tree_rlock(global
.source_tree
);
580 fallback_source
= source_find_mount(source
->fallback_mount
);
581 avl_tree_unlock(global
.source_tree
);
583 /* Now, we must remove this source from the source tree before
584 * removing the clients, otherwise new clients can sneak into the pending
585 * tree after we've cleared it
587 avl_tree_wlock(global
.source_tree
);
588 avl_delete(global
.source_tree
, source
, source_remove_source
);
589 avl_tree_unlock(global
.source_tree
);
591 /* we need to empty the client and pending trees */
592 avl_tree_wlock(source
->pending_tree
);
593 while (avl_get_first(source
->pending_tree
)) {
594 client_t
*client
= (client_t
*)avl_get_first(
595 source
->pending_tree
)->key
;
596 if(fallback_source
) {
597 avl_delete(source
->pending_tree
, client
, source_remove_client
);
599 /* TODO: reset client local format data? */
600 avl_tree_wlock(fallback_source
->pending_tree
);
601 avl_insert(fallback_source
->pending_tree
, (void *)client
);
602 avl_tree_unlock(fallback_source
->pending_tree
);
605 avl_delete(source
->pending_tree
, client
, _free_client
);
608 avl_tree_unlock(source
->pending_tree
);
610 avl_tree_wlock(source
->client_tree
);
611 while (avl_get_first(source
->client_tree
)) {
612 client_t
*client
= (client_t
*)avl_get_first(source
->client_tree
)->key
;
614 if(fallback_source
) {
615 avl_delete(source
->client_tree
, client
, source_remove_client
);
617 /* TODO: reset client local format data? */
618 avl_tree_wlock(fallback_source
->pending_tree
);
619 avl_insert(fallback_source
->pending_tree
, (void *)client
);
620 avl_tree_unlock(fallback_source
->pending_tree
);
623 avl_delete(source
->client_tree
, client
, _free_client
);
626 avl_tree_unlock(source
->client_tree
);
628 /* delete this sources stats */
629 stats_event_dec(NULL
, "sources");
630 stats_event(source
->mount
, "listeners", NULL
);
637 fclose(source
->dumpfile
);
639 /* release our hold on the lock so the main thread can continue cleaning up */
640 thread_rwlock_unlock(source
->shutdown_rwlock
);
642 source_free_source(source
);
/* _compare_clients: ordering callback for the client trees (it is passed to
 * avl_tree_new() in source_create). a and b are client_t pointers; they are
 * ordered by the id of their connection.
 * Returns -1 when a's id is smaller than b's and 1 when it is larger.
 * compare_arg is not used by the visible statements.
 * NOTE(review): the return for equal ids is not visible in this listing. */
649 static int _compare_clients(void *compare_arg
, void *a
, void *b
)
651 client_t
*clienta
= (client_t
*)a
;
652 client_t
*clientb
= (client_t
*)b
;
654 connection_t
*cona
= clienta
->con
;
655 connection_t
*conb
= clientb
->con
;
657 if (cona
->id
< conb
->id
) return -1;
658 if (cona
->id
> conb
->id
) return 1;
/* source_remove_client: delete callback that source_main passes to
 * avl_delete() when a client is taken out of one tree and kept in, or then
 * inserted into, another (pending -> client tree, or -> fallback source);
 * presumably it therefore leaves the client itself alive.
 * NOTE(review): the body is not present in this listing -- confirm. */
663 int source_remove_client(void *key
)
/* _free_client: per-entry free callback for the client trees; key is a
 * client_t pointer.
 * Visible steps: decrement the global "clients" statistic, then destroy the
 * client with client_destroy().
 * NOTE(review): lines are missing from this listing, including the return
 * statement. */
668 static int _free_client(void *key
)
670 client_t
*client
= (client_t
*)key
;
675 stats_event_dec(NULL
, "clients");
677 client_destroy(client
);
682 static int _parse_audio_info(source_t
*source
, char *s
)
686 char *variable
= NULL
;
689 while ((token
= strtok(s
,";")) != NULL
) {
690 pvar
= strchr(token
, '=');
692 variable
= (char *)malloc(pvar
-token
+1);
693 strncpy(variable
, token
, pvar
-token
);
694 variable
[pvar
-token
] = 0;
697 value
= util_url_unescape(pvar
);
698 util_dict_set(source
->audio_info
, variable
, value
);
699 stats_event(source
->mount
, variable
, value
);