failure case, keep the source count in sync with the number of sources
[xiph/unicode.git] / icecast / src / source.c
blob56e856c47c48980230cb451015eed77bdc97d74d
1 /* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
2 #ifdef HAVE_CONFIG_H
3 #include <config.h>
4 #endif
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <sys/types.h>
10 #include <ogg/ogg.h>
11 #include <errno.h>
13 #ifndef _WIN32
14 #include <unistd.h>
15 #include <sys/time.h>
16 #include <sys/socket.h>
17 #else
18 #include <winsock2.h>
19 #include <windows.h>
20 #endif
22 #include "thread/thread.h"
23 #include "avl/avl.h"
24 #include "httpp/httpp.h"
25 #include "net/sock.h"
27 #include "connection.h"
28 #include "global.h"
29 #include "refbuf.h"
30 #include "client.h"
31 #include "stats.h"
32 #include "logging.h"
33 #include "cfgfile.h"
34 #include "util.h"
35 #ifdef USE_YP
36 #include "geturl.h"
37 #endif
38 #include "source.h"
39 #include "format.h"
41 #undef CATMODULE
42 #define CATMODULE "source"
44 /* avl tree helper */
45 static int _compare_clients(void *compare_arg, void *a, void *b);
46 static int _free_client(void *key);
47 static int _parse_audio_info(source_t *source, char *s);
49 source_t *source_create(client_t *client, connection_t *con,
50 http_parser_t *parser, const char *mount, format_type_t type,
51 mount_proxy *mountinfo)
53 source_t *src;
55 src = (source_t *)malloc(sizeof(source_t));
56 src->client = client;
57 src->mount = (char *)strdup(mount);
58 src->fallback_mount = NULL;
59 src->format = format_get_plugin(type, src->mount, parser);
60 src->con = con;
61 src->parser = parser;
62 src->client_tree = avl_tree_new(_compare_clients, NULL);
63 src->pending_tree = avl_tree_new(_compare_clients, NULL);
64 src->running = 1;
65 src->num_yp_directories = 0;
66 src->listeners = 0;
67 src->max_listeners = -1;
68 src->send_return = 0;
69 src->dumpfilename = NULL;
70 src->dumpfile = NULL;
71 src->audio_info = util_dict_new();
72 src->yp_public = 0;
74 if(mountinfo != NULL) {
75 if (mountinfo->fallback_mount != NULL)
76 src->fallback_mount = strdup (mountinfo->fallback_mount);
77 src->max_listeners = mountinfo->max_listeners;
78 if (mountinfo->dumpfile != NULL)
79 src->dumpfilename = strdup (mountinfo->dumpfile);
82 if(src->dumpfilename != NULL) {
83 src->dumpfile = fopen(src->dumpfilename, "ab");
84 if(src->dumpfile == NULL) {
85 WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
86 src->dumpfilename, strerror(errno));
90 return src;
/* avl free callback that leaves the key alone: used when a source is
 * taken off the source tree without being destroyed.  Always returns 1.
 */
static int source_remove_source(void *key)
{
    (void)key;  /* intentionally untouched */

    return 1;
}
98 /* you must already have a read lock on the global source tree
99 ** to call this function
101 source_t *source_find_mount(const char *mount)
103 source_t *source;
104 avl_node *node;
105 int cmp;
107 if (!mount) {
108 return NULL;
110 /* get the root node */
111 node = global.source_tree->root->right;
113 while (node) {
114 source = (source_t *)node->key;
115 cmp = strcmp(mount, source->mount);
116 if (cmp < 0)
117 node = node->left;
118 else if (cmp > 0)
119 node = node->right;
120 else
121 return source;
124 /* didn't find it */
125 return NULL;
128 int source_compare_sources(void *arg, void *a, void *b)
130 source_t *srca = (source_t *)a;
131 source_t *srcb = (source_t *)b;
133 return strcmp(srca->mount, srcb->mount);
136 int source_free_source(void *key)
138 source_t *source = key;
139 #ifdef USE_YP
140 int i;
141 #endif
143 free(source->mount);
144 free(source->fallback_mount);
145 free(source->dumpfilename);
146 client_destroy(source->client);
147 avl_tree_free(source->pending_tree, _free_client);
148 avl_tree_free(source->client_tree, _free_client);
149 source->format->free_plugin(source->format);
150 #ifdef USE_YP
151 for (i=0; i<source->num_yp_directories; i++) {
152 yp_destroy_ypdata(source->ypdata[i]);
154 #endif
155 util_dict_free(source->audio_info);
156 free(source);
158 return 1;
161 client_t *source_find_client(source_t *source, int id)
163 client_t fakeclient;
164 void *result;
165 connection_t fakecon;
167 fakeclient.con = &fakecon;
168 fakeclient.con->id = id;
170 avl_tree_rlock(source->client_tree);
171 if(avl_get_by_key(source->client_tree, &fakeclient, &result) == 0)
173 avl_tree_unlock(source->client_tree);
174 return result;
177 avl_tree_unlock(source->client_tree);
178 return NULL;
182 void *source_main(void *arg)
184 source_t *source = (source_t *)arg;
185 source_t *fallback_source;
186 char buffer[4096];
187 long bytes, sbytes;
188 int ret, timeout;
189 client_t *client;
190 avl_node *client_node;
192 refbuf_t *refbuf, *abuf;
193 int data_done;
195 int listeners = 0;
196 #ifdef USE_YP
197 char *s;
198 long current_time;
199 int i;
200 char *ai;
201 int listen_url_size;
202 #endif
204 long queue_limit;
205 ice_config_t *config;
206 char *hostname;
207 int port;
209 config = config_get_config();
211 queue_limit = config->queue_size_limit;
212 timeout = config->source_timeout;
213 hostname = config->hostname;
214 port = config->port;
216 #ifdef USE_YP
217 for (i=0;i<config->num_yp_directories;i++) {
218 if (config->yp_url[i]) {
219 source->ypdata[source->num_yp_directories] = yp_create_ypdata();
220 source->ypdata[source->num_yp_directories]->yp_url =
221 config->yp_url[i];
222 source->ypdata[source->num_yp_directories]->yp_url_timeout =
223 config->yp_url_timeout[i];
224 source->ypdata[source->num_yp_directories]->yp_touch_interval = 0;
225 source->num_yp_directories++;
228 #endif
230 config_release_config();
232 /* grab a read lock, to make sure we get a chance to cleanup */
233 thread_rwlock_rlock(source->shutdown_rwlock);
235 avl_tree_wlock(global.source_tree);
236 /* Now, we must do a final check with write lock taken out that the
237 * mountpoint is available..
239 if (source_find_mount(source->mount) != NULL) {
240 avl_tree_unlock(global.source_tree);
241 if(source->send_return) {
242 client_send_404(source->client, "Mountpoint in use");
244 global_lock();
245 global.sources--;
246 global_unlock();
247 thread_rwlock_unlock(source->shutdown_rwlock);
248 thread_exit(0);
249 return NULL;
251 /* insert source onto source tree */
252 avl_insert(global.source_tree, (void *)source);
253 /* release write lock on global source tree */
254 avl_tree_unlock(global.source_tree);
256 /* If we connected successfully, we can send the message (if requested)
257 * back
259 if(source->send_return) {
260 source->client->respcode = 200;
261 bytes = sock_write(source->client->con->sock,
262 "HTTP/1.0 200 OK\r\n\r\n");
263 if(bytes > 0) source->client->con->sent_bytes = bytes;
266 /* start off the statistics */
267 source->listeners = 0;
268 stats_event(source->mount, "listeners", "0");
269 stats_event(source->mount, "type", source->format->format_description);
270 #ifdef USE_YP
271 /* ice-* is icecast, icy-* is shoutcast */
272 if ((s = httpp_getvar(source->parser, "ice-url"))) {
273 add_yp_info(source, "server_url", s, YP_SERVER_URL);
275 if ((s = httpp_getvar(source->parser, "ice-name"))) {
276 add_yp_info(source, "server_name", s, YP_SERVER_NAME);
278 if ((s = httpp_getvar(source->parser, "icy-name"))) {
279 add_yp_info(source, "server_name", s, YP_SERVER_NAME);
281 if ((s = httpp_getvar(source->parser, "ice-url"))) {
282 add_yp_info(source, "server_url", s, YP_SERVER_URL);
284 if ((s = httpp_getvar(source->parser, "icy-url"))) {
285 add_yp_info(source, "server_url", s, YP_SERVER_URL);
287 if ((s = httpp_getvar(source->parser, "ice-genre"))) {
288 add_yp_info(source, "genre", s, YP_SERVER_GENRE);
290 if ((s = httpp_getvar(source->parser, "icy-genre"))) {
291 add_yp_info(source, "genre", s, YP_SERVER_GENRE);
293 if ((s = httpp_getvar(source->parser, "ice-bitrate"))) {
294 add_yp_info(source, "bitrate", s, YP_BITRATE);
296 if ((s = httpp_getvar(source->parser, "icy-br"))) {
297 add_yp_info(source, "bitrate", s, YP_BITRATE);
299 if ((s = httpp_getvar(source->parser, "ice-description"))) {
300 add_yp_info(source, "server_description", s, YP_SERVER_DESC);
302 if ((s = httpp_getvar(source->parser, "ice-public"))) {
303 stats_event(source->mount, "public", s);
304 source->yp_public = atoi(s);
306 if ((s = httpp_getvar(source->parser, "icy-pub"))) {
307 stats_event(source->mount, "public", s);
308 source->yp_public = atoi(s);
310 if ((s = httpp_getvar(source->parser, "ice-audio-info"))) {
311 stats_event(source->mount, "audio_info", s);
312 if (_parse_audio_info(source, s)) {
313 ai = util_dict_urlencode(source->audio_info, '&');
314 add_yp_info(source, "audio_info",
316 YP_AUDIO_INFO);
317 if (ai) {
318 free(ai);
322 for (i=0;i<source->num_yp_directories;i++) {
323 add_yp_info(source, "server_type",
324 source->format->format_description,
325 YP_SERVER_TYPE);
326 if (source->ypdata[i]->listen_url) {
327 free(source->ypdata[i]->listen_url);
329 /* 6 for max size of port */
330 listen_url_size = strlen("http://") +
331 strlen(hostname) +
332 strlen(":") + 6 + strlen(source->mount) + 1;
333 source->ypdata[i]->listen_url = malloc(listen_url_size);
334 sprintf(source->ypdata[i]->listen_url, "http://%s:%d%s",
335 hostname, port, source->mount);
338 if(source->yp_public) {
340 current_time = time(NULL);
342 for (i=0;i<source->num_yp_directories;i++) {
343 /* Give the source 5 seconds to update the metadata
344 before we do our first touch */
345 /* Don't permit touch intervals of less than 30 seconds */
346 if (source->ypdata[i]->yp_touch_interval <= 30) {
347 source->ypdata[i]->yp_touch_interval = 30;
349 source->ypdata[i]->yp_last_touch = 0;
352 #endif
354 DEBUG0("Source creation complete");
356 while (global.running == ICE_RUNNING && source->running) {
357 ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
358 if(ret < 0) {
359 WARN0("Bad data from source");
360 break;
362 bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
363 while (refbuf == NULL) {
364 bytes = 0;
365 while (bytes <= 0) {
366 ret = util_timed_wait_for_fd(source->con->sock, timeout*1000);
368 if (ret < 0 && sock_recoverable (sock_error()))
369 continue;
370 if (ret <= 0) { /* timeout expired */
371 WARN1("Disconnecting source: socket timeout (%d s) expired",
372 timeout);
373 bytes = 0;
374 break;
377 bytes = sock_read_bytes(source->con->sock, buffer, 4096);
378 if (bytes == 0 || (bytes < 0 && !sock_recoverable(sock_error()))) {
379 DEBUG1("Disconnecting source due to socket read error: %s",
380 strerror(sock_error()));
381 break;
384 if (bytes <= 0) break;
385 source->client->con->sent_bytes += bytes;
386 ret = source->format->get_buffer(source->format, buffer, bytes, &refbuf);
387 if(ret < 0) {
388 WARN0("Bad data from source");
389 goto done;
393 if (bytes <= 0) {
394 INFO0("Removing source following disconnection");
395 break;
398 /* we have a refbuf buffer, which a data block to be sent to
399 ** all clients. if a client is not able to send the buffer
400 ** immediately, it should store it on its queue for the next
401 ** go around.
403 ** instead of sending the current block, a client should send
404 ** all data in the queue, plus the current block, until either
405 ** it runs out of data, or it hits a recoverable error like
406 ** EAGAIN. this will allow a client that got slightly lagged
407 ** to catch back up if it can
410 /* First, stream dumping, if enabled */
411 if(source->dumpfile) {
412 if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
413 refbuf->len)
415 WARN1("Write to dump file failed, disabling: %s",
416 strerror(errno));
417 fclose(source->dumpfile);
418 source->dumpfile = NULL;
422 /* acquire read lock on client_tree */
423 avl_tree_rlock(source->client_tree);
425 client_node = avl_get_first(source->client_tree);
426 while (client_node) {
427 /* acquire read lock on node */
428 avl_node_wlock(client_node);
430 client = (client_t *)client_node->key;
432 data_done = 0;
434 /* do we have any old buffers? */
435 abuf = refbuf_queue_remove(&client->queue);
436 while (abuf) {
437 if (client->pos > 0)
438 bytes = abuf->len - client->pos;
439 else
440 bytes = abuf->len;
442 sbytes = source->format->write_buf_to_client(source->format,
443 client, &abuf->data[client->pos], bytes);
444 if (sbytes >= 0) {
445 if(sbytes != bytes) {
446 /* We didn't send the entire buffer. Leave it for
447 * the moment, handle it in the next iteration.
449 client->pos += sbytes;
450 refbuf_queue_insert(&client->queue, abuf);
451 data_done = 1;
452 break;
455 else {
456 DEBUG0("Client has unrecoverable error catching up. Client has probably disconnected");
457 client->con->error = 1;
458 data_done = 1;
459 refbuf_release(abuf);
460 break;
463 /* we're done with that refbuf, release it and reset the pos */
464 refbuf_release(abuf);
465 client->pos = 0;
467 abuf = refbuf_queue_remove(&client->queue);
470 /* now send or queue the new data */
471 if (data_done) {
472 refbuf_addref(refbuf);
473 refbuf_queue_add(&client->queue, refbuf);
474 } else {
475 sbytes = source->format->write_buf_to_client(source->format,
476 client, refbuf->data, refbuf->len);
477 if (sbytes >= 0) {
478 if(sbytes != refbuf->len) {
479 /* Didn't send the entire buffer, queue it */
480 client->pos = sbytes;
481 refbuf_addref(refbuf);
482 refbuf_queue_insert(&client->queue, refbuf);
485 else {
486 DEBUG0("Client had unrecoverable error with new data, probably due to client disconnection");
487 client->con->error = 1;
491 /* if the client is too slow, its queue will slowly build up.
492 ** we need to make sure the client is keeping up with the
493 ** data, so we'll kick any client who's queue gets to large.
495 if (refbuf_queue_length(&client->queue) > queue_limit) {
496 DEBUG0("Client has fallen too far behind, removing");
497 client->con->error = 1;
500 /* release read lock on node */
501 avl_node_unlock(client_node);
503 /* get the next node */
504 client_node = avl_get_next(client_node);
506 /* release read lock on client_tree */
507 avl_tree_unlock(source->client_tree);
509 refbuf_release(refbuf);
511 /* acquire write lock on client_tree */
512 avl_tree_wlock(source->client_tree);
514 /** delete bad clients **/
515 client_node = avl_get_first(source->client_tree);
516 while (client_node) {
517 client = (client_t *)client_node->key;
518 if (client->con->error) {
519 client_node = avl_get_next(client_node);
520 avl_delete(source->client_tree, (void *)client, _free_client);
521 listeners--;
522 stats_event_args(source->mount, "listeners", "%d", listeners);
523 source->listeners = listeners;
524 DEBUG0("Client removed");
525 continue;
527 client_node = avl_get_next(client_node);
530 /* acquire write lock on pending_tree */
531 avl_tree_wlock(source->pending_tree);
533 /** add pending clients **/
534 client_node = avl_get_first(source->pending_tree);
535 while (client_node) {
536 avl_insert(source->client_tree, client_node->key);
537 /* listener count may have changed */
538 listeners = source->listeners;
539 listeners++;
540 DEBUG0("Client added");
541 stats_event_inc(NULL, "clients");
542 stats_event_inc(source->mount, "connections");
543 stats_event_args(source->mount, "listeners", "%d", listeners);
544 source->listeners = listeners;
546 /* we have to send cached headers for some data formats
547 ** this is where we queue up the buffers to send
549 if (source->format->has_predata) {
550 client = (client_t *)client_node->key;
551 client->queue = source->format->get_predata(source->format);
554 client_node = avl_get_next(client_node);
557 /** clear pending tree **/
558 while (avl_get_first(source->pending_tree)) {
559 avl_delete(source->pending_tree, avl_get_first(source->pending_tree)->key, source_remove_client);
562 /* release write lock on pending_tree */
563 avl_tree_unlock(source->pending_tree);
565 /* release write lock on client_tree */
566 avl_tree_unlock(source->client_tree);
569 done:
571 INFO1("Source \"%s\" exiting", source->mount);
573 #ifdef USE_YP
574 if(source->yp_public) {
575 yp_remove(source);
577 #endif
579 avl_tree_rlock(global.source_tree);
580 fallback_source = source_find_mount(source->fallback_mount);
581 avl_tree_unlock(global.source_tree);
583 /* Now, we must remove this source from the source tree before
584 * removing the clients, otherwise new clients can sneak into the pending
585 * tree after we've cleared it
587 avl_tree_wlock(global.source_tree);
588 avl_delete(global.source_tree, source, source_remove_source);
589 avl_tree_unlock(global.source_tree);
591 /* we need to empty the client and pending trees */
592 avl_tree_wlock(source->pending_tree);
593 while (avl_get_first(source->pending_tree)) {
594 client_t *client = (client_t *)avl_get_first(
595 source->pending_tree)->key;
596 if(fallback_source) {
597 avl_delete(source->pending_tree, client, source_remove_client);
599 /* TODO: reset client local format data? */
600 avl_tree_wlock(fallback_source->pending_tree);
601 avl_insert(fallback_source->pending_tree, (void *)client);
602 avl_tree_unlock(fallback_source->pending_tree);
604 else {
605 avl_delete(source->pending_tree, client, _free_client);
608 avl_tree_unlock(source->pending_tree);
610 avl_tree_wlock(source->client_tree);
611 while (avl_get_first(source->client_tree)) {
612 client_t *client = (client_t *)avl_get_first(source->client_tree)->key;
614 if(fallback_source) {
615 avl_delete(source->client_tree, client, source_remove_client);
617 /* TODO: reset client local format data? */
618 avl_tree_wlock(fallback_source->pending_tree);
619 avl_insert(fallback_source->pending_tree, (void *)client);
620 avl_tree_unlock(fallback_source->pending_tree);
622 else {
623 avl_delete(source->client_tree, client, _free_client);
626 avl_tree_unlock(source->client_tree);
628 /* delete this sources stats */
629 stats_event_dec(NULL, "sources");
630 stats_event(source->mount, "listeners", NULL);
632 global_lock();
633 global.sources--;
634 global_unlock();
636 if(source->dumpfile)
637 fclose(source->dumpfile);
639 /* release our hold on the lock so the main thread can continue cleaning up */
640 thread_rwlock_unlock(source->shutdown_rwlock);
642 source_free_source(source);
644 thread_exit(0);
646 return NULL;
649 static int _compare_clients(void *compare_arg, void *a, void *b)
651 client_t *clienta = (client_t *)a;
652 client_t *clientb = (client_t *)b;
654 connection_t *cona = clienta->con;
655 connection_t *conb = clientb->con;
657 if (cona->id < conb->id) return -1;
658 if (cona->id > conb->id) return 1;
660 return 0;
/* avl free callback that leaves the client alone: used when a client is
 * taken off one tree in order to live on another.  Always returns 1.
 */
int source_remove_client(void *key)
{
    (void)key;  /* intentionally untouched */

    return 1;
}
668 static int _free_client(void *key)
670 client_t *client = (client_t *)key;
672 global_lock();
673 global.clients--;
674 global_unlock();
675 stats_event_dec(NULL, "clients");
677 client_destroy(client);
679 return 1;
682 static int _parse_audio_info(source_t *source, char *s)
684 char *token = NULL;
685 char *pvar = NULL;
686 char *variable = NULL;
687 char *value = NULL;
689 while ((token = strtok(s,";")) != NULL) {
690 pvar = strchr(token, '=');
691 if (pvar) {
692 variable = (char *)malloc(pvar-token+1);
693 strncpy(variable, token, pvar-token);
694 variable[pvar-token] = 0;
695 pvar++;
696 if (strlen(pvar)) {
697 value = util_url_unescape(pvar);
698 util_dict_set(source->audio_info, variable, value);
699 stats_event(source->mount, variable, value);
700 if (value) {
701 free(value);
704 if (variable) {
705 free(variable);
708 s = NULL;
710 return 1;