Don't redefine MAXHOSTNAMELEN
[gnash.git] / cygnal / cygnal.cpp
blob320e278ecdeff952a52a2e2f2e6a89467ea9d4a1
1 // cygnal.cpp: GNU streaming Flash media server, for Gnash.
2 //
3 // Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
4 // Free Software Foundation, Inc.
5 //
6 // This program is free software; you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation; either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
15 //
16 // You should have received a copy of the GNU General Public License
17 // along with this program; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 #ifdef HAVE_CONFIG_H
22 #include "gnashconfig.h"
23 #endif
25 #include <sys/stat.h>
26 #include <list>
27 #include <map>
28 #include <iostream>
29 #include <sstream>
30 #include <csignal>
31 #include <vector>
32 #include <sys/mman.h>
33 #include <cerrno>
34 #include <sys/types.h>
35 #include <sys/stat.h>
36 #include <fcntl.h>
38 #include "GnashSleep.h"
39 #include "revno.h"
41 //#include "cvm.h"
43 extern "C"{
44 # include "GnashSystemIOHeaders.h"
45 #ifdef HAVE_GETOPT_H
46 # include <getopt.h>
47 #endif
48 #ifndef __GNUC__
49 extern int optind, getopt(int, char *const *, const char *);
50 extern char *optarg;
51 #endif
54 #include <boost/shared_ptr.hpp>
56 // classes internal to Gnash
57 #include "network.h"
58 #include "log.h"
59 #include "crc.h"
60 #include "proc.h"
61 #include "rtmp.h"
62 #include "buffer.h"
63 #include "utility.h"
64 #include "limits.h"
65 #include "netstats.h"
66 #include "statistics.h"
67 //#include "stream.h"
68 #include "gmemory.h"
69 #include "diskstream.h"
70 #include "arg_parser.h"
71 #include "GnashException.h"
72 #include "GnashSleep.h" // for usleep comptibility.
73 #include "URL.h"
74 #include "rtmp_client.h"
76 // classes internal to Cygnal
77 #include "rtmp_server.h"
78 #include "http_server.h"
80 #include "handler.h"
81 #include "cache.h"
82 #include "cygnal.h"
84 #ifdef ENABLE_NLS
85 # include <locale>
86 #endif
88 #include <boost/date_time/gregorian/gregorian.hpp>
89 //#include <boost/date_time/local_time/local_time.hpp>
90 #include <boost/date_time/time_zone_base.hpp>
91 #include <boost/date_time/posix_time/posix_time.hpp>
92 #include <boost/thread/thread.hpp>
93 #include <boost/bind.hpp>
94 #include <boost/thread/mutex.hpp>
95 #include <boost/thread/condition.hpp>
96 #include <boost/thread/tss.hpp>
98 #ifndef POLLRDHUP
99 #define POLLRDHUP 0
100 #endif
102 //using gnash::log_network;
103 using namespace std;
104 using namespace gnash;
105 using namespace cygnal;
107 static void usage();
108 static void version_and_copyright();
109 static void cntrlc_handler(int sig);
110 static void hup_handler(int sig);
112 void connection_handler(Network::thread_params_t *args);
113 void event_handler(Network::thread_params_t *args);
114 void admin_handler(Network::thread_params_t *args);
116 // Toggles very verbose debugging info from the network Network class
117 static bool netdebug = false;
119 struct sigaction act1, act2;
121 // The next few global variables have to be global because Boost
122 // threads don't take arguments. Since these are set in main() before
123 // any of the threads are started, and it's value should never change,
124 // it's safe to use these without a mutex, as all threads share the
125 // same read-only value.
127 // This is the default path to look in for files to be streamed.
128 static string docroot;
130 // This is the number of times a thread loop continues, for debugging only
131 int thread_retries = 10;
133 // This is added to the default ports for testing so it doesn't
134 // conflict with apache on the same machine.
135 static int port_offset = 0;
137 // Toggle the admin thread
138 static bool admin = false;
140 // Admin commands are small
141 const int ADMINPKTSIZE = 80;
143 // If set to a non zero value, this limits Cygnal to only one protocol
144 // at a time. This is for debugging only.
145 static int only_port = 0;
147 // These keep track of the number of active threads.
148 ThreadCounter tids;
150 map<int, Network *> networks;
152 // This is the global object for Cygnl
153 // The debug log used by all the gnash libraries.
154 static Cygnal& cyg = Cygnal::getDefaultInstance();
156 // The debug log used by all the gnash libraries.
157 static LogFile& dbglogfile = LogFile::getDefaultInstance();
159 // The user config for Cygnal is loaded and parsed here:
160 static CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
162 // Cache support for responses and files.
163 static Cache& cache = Cache::getDefaultInstance();
165 // The list of active cgis being executed.
166 //static std::map<std::string, Proc> procs; // = proc::getDefaultInstance();
168 // This mutex is used to signify when all the threads are done.
169 static boost::condition alldone;
170 static boost::mutex alldone_mutex;
172 static boost::condition noclients;
173 static boost::mutex noclients_mutex;
175 const char *proto_str[] = {
176 "NONE",
177 "HTTP",
178 "HTTPS",
179 "RTMP",
180 "RTMPT",
181 "RTMPTS",
182 "RTMPE",
183 "RTMPS",
184 "DTN"
187 static void
188 usage()
190 cout << _("cygnal -- a streaming media server.") << endl
191 << endl
192 << _("Usage: cygnal [options...]") << endl
193 << _(" -h, --help Print this help and exit") << endl
194 << _(" -V, --version Print version information and exit") << endl
195 << _(" -v, --verbose Output verbose debug info") << endl
196 << _(" -s, --singlethread Disable Multi Threading") << endl
197 << _(" -n, --netdebug Turn on net debugging messages") << endl
198 << _(" -o --only-port Only use port for debugging") << endl
199 << _(" -p --port-offset Port offset for debugging") << endl
200 << _(" -t, --testing Turn on special Gnash testing support") << endl
201 << _(" -a, --admin Enable the administration thread") << endl
202 << _(" -r, --root Document root for all files") << endl
203 << _(" -m, --machine Hostname for this machine") << endl
204 << endl;
208 Cygnal&
209 Cygnal::getDefaultInstance()
211 // GNASH_REPORT_FUNCTION;
212 static Cygnal o;
213 return o;
217 Cygnal::~Cygnal()
219 // GNASH_REPORT_FUNCTION;
222 bool
223 Cygnal::loadPeersFile()
225 // GNASH_REPORT_FUNCTION;
227 loadPeersFile("./peers.conf");
229 loadPeersFile("/etc/peers.conf");
231 // Check the users home directory
232 #ifndef __amigaos4__
233 char *home = std::getenv("HOME");
234 #else
235 //on AmigaOS we have a GNASH: assign that point to program dir
236 char *home = "/gnash";
237 #endif
239 string homefile = home;
240 homefile += "/peers.conf";
242 return loadPeersFile(homefile);
245 bool
246 Cygnal::loadPeersFile(const std::string &filespec)
248 // GNASH_REPORT_FUNCTION;
250 struct stat stats;
251 std::ifstream in;
252 std::string line;
253 string host;
254 string portstr;
255 string cgi;
256 vector<string> supported;
258 // Make sufre the file exists
259 if (stat(filespec.c_str(), &stats) != 0) {
260 return false;
263 in.open(filespec.c_str());
265 if (!in) {
266 log_error(_(": couldn't open file: "), filespec);
267 return false;
270 // Read in each line and parse it
271 size_t lineno = 0;
272 while (std::getline(in, line)) {
274 ++lineno;
276 // Ignore comment and empty lines
277 if (line.empty() || line[0] == '#') {
278 continue;
281 std::istringstream ss(line);
283 // Get the first token
284 if (! (ss >> host)) {
285 // Empty line
286 continue;
289 // 'action' should never be empty, or (ss >> action)
290 // above would have failed
292 if (host[0] == '#') {
293 continue; // discard comments
296 // Get second token
297 if (!(ss >> portstr)) {
298 // Do we need to warn here as well?
299 continue;
302 while (ss >> cgi) {
303 supported.push_back(cgi);
304 continue;
307 // Create a new peer item
308 boost::shared_ptr<peer_t> peer(new Cygnal::peer_t);
309 peer->hostname = host;
310 peer->port = strtol(portstr.c_str(), NULL, 0) & 0xffff;
312 _peers.push_back(peer);
315 return true;
318 void
319 Cygnal::probePeers()
321 // GNASH_REPORT_FUNCTION;
323 probePeers(_peers);
326 void
327 Cygnal::probePeers(peer_t &peer)
329 // GNASH_REPORT_FUNCTION;
330 RTMPClient net;
331 stringstream uri;
333 uri << peer.hostname;
335 vector<string>::iterator it;
336 for (it = peer.supported.begin(); it <= peer.supported.end(); ++it) {
337 string tmp = uri.str();
338 // tmp += (*it);
339 // log_network("Constructed: %s/%s", uri.str(), *it);
341 gnash::URL url(uri.str());
342 if (!(peer.fd = net.connectToServer(uri.str()))) {
343 log_network(_("Couldn't connect to %s"), uri.str());
344 peer.connected = false;
345 } else {
346 peer.connected = true;
347 // peer.fd = net.getFileFd();
352 void
353 Cygnal::probePeers(std::vector<boost::shared_ptr<peer_t> > &peers)
355 // GNASH_REPORT_FUNCTION;
357 // createClient();
358 std::vector<boost::shared_ptr<Cygnal::peer_t> >::iterator it;
359 for (it = peers.begin(); it != peers.end(); ++it) {
360 boost::shared_ptr<Cygnal::peer_t> peer = *it;
361 probePeers(*peer);
362 if (peer->connected) {
363 log_network(_("%s is active on fd #%d."), peer->hostname,
364 peer->fd);
365 _active_peers.push_back(*it);
370 void
371 Cygnal::removeHandler(const std::string &path)
373 // GNASH_REPORT_FUNCTION;
374 map<std::string, boost::shared_ptr<Handler> >::iterator it;
375 it = _handlers.find(path);
376 if (it != _handlers.end()) {
377 boost::mutex::scoped_lock lock(_mutex);
378 _handlers.erase(it);
382 boost::shared_ptr<Handler>
383 Cygnal::findHandler(const std::string &path)
385 // GNASH_REPORT_FUNCTION;
386 map<std::string, boost::shared_ptr<Handler> >::iterator it;
387 boost::shared_ptr<Handler> hand;
388 it = _handlers.find(path);
389 if (it != _handlers.end()) {
390 hand = (*it).second;
393 return hand;
396 void
397 Cygnal::dump()
399 std::vector<boost::shared_ptr<Cygnal::peer_t> >::iterator it;
400 for (it = _peers.begin(); it != _peers.end(); ++it) {
401 cerr << "Remote Peer: " << (*it)->hostname
402 << ":" << (*it)->port << endl;
407 main(int argc, char *argv[])
409 // Initialize national language support
410 #ifdef ENABLE_NLS
411 setlocale (LC_ALL, "");
412 bindtextdomain (PACKAGE, LOCALEDIR);
413 textdomain (PACKAGE);
414 #endif
416 // This becomes the default hostname, which becomes
417 // 127.0.0.1 or ::1 for the localhost. The --machine
418 // otion can change this.
419 std::string hostname = "localhost.localdomain";
421 const Arg_parser::Option opts[] =
423 { 'h', "help", Arg_parser::no },
424 { 'V', "version", Arg_parser::no },
425 { 'p', "port-offset", Arg_parser::yes },
426 { 'v', "verbose", Arg_parser::no },
427 { 'd', "dump", Arg_parser::no },
428 { 'n', "netdebug", Arg_parser::no },
429 { 't', "testing", Arg_parser::no },
430 { 'a', "admin", Arg_parser::no },
431 { 'r', "root", Arg_parser::yes },
432 { 'o', "only-port", Arg_parser::yes },
433 { 's', "singlethreaded", Arg_parser::no },
434 { 'm', "machine", Arg_parser::yes }
437 Arg_parser parser(argc, argv, opts);
438 if( ! parser.error().empty() ) {
439 cout << parser.error() << endl;
440 exit(EXIT_FAILURE);
443 // crcfile.loadFiles();
445 // Set the log file name before trying to write to
446 // it, or we might get two.
447 dbglogfile.setLogFilename(crcfile.getDebugLog());
449 if (crcfile.verbosityLevel() > 0) {
450 dbglogfile.setVerbosity(crcfile.verbosityLevel());
453 if (crcfile.getDocumentRoot().size() > 0) {
454 docroot = crcfile.getDocumentRoot();
455 } else {
456 docroot = "/var/www/html/software/tests/";
457 crcfile.setDocumentRoot(docroot);
459 if (crcfile.getPortOffset()) {
460 port_offset = crcfile.getPortOffset();
463 // Handle command line arguments
464 for( int i = 0; i < parser.arguments(); ++i ) {
465 const int code = parser.code(i);
466 switch( code ) {
467 case 'h':
468 version_and_copyright();
469 usage();
470 exit(EXIT_SUCCESS);
471 case 'V':
472 version_and_copyright();
473 exit(EXIT_SUCCESS);
474 case 't':
475 crcfile.setTestingFlag(true);
476 break;
477 case 'a':
478 admin = true;
479 break;
480 case 'v':
481 dbglogfile.setVerbosity();
482 LOG_ONCE(log_network(_("Verbose output turned on")))
483 break;
484 case 'p':
485 port_offset = parser.argument<int>(i);
486 crcfile.setPortOffset(port_offset);
487 break;
488 case 'r':
489 docroot = parser.argument(i);
490 break;
491 case 's':
492 crcfile.setThreadingFlag(false);
493 break;
494 case 'n':
495 netdebug = true;
496 dbglogfile.setNetwork(true);
497 break;
498 case 'o':
499 only_port = parser.argument<int>(i);
500 break;
501 case 'd':
502 crcfile.dump();
503 exit(EXIT_SUCCESS);
504 break;
505 case 'm':
506 hostname = parser.argument(i);
507 break;
508 default:
509 log_error(_("Extraneous argument: %s"), parser.argument(i).c_str());
513 log_network(_("Document Root for media files is: %s"), docroot);
514 crcfile.setDocumentRoot(docroot);
516 // load the file of peers. A peer is another instance of Cygnal we
517 // can use for distributed processing.
518 cyg.loadPeersFile();
519 cyg.probePeers();
521 // cyg.dump();
523 // Trap ^C (SIGINT) so we can kill all the threads
524 act1.sa_handler = cntrlc_handler;
525 sigaction (SIGINT, &act1, NULL);
526 act2.sa_handler = hup_handler;
527 sigaction (SIGHUP, &act2, NULL);
528 // sigaction (SIGPIPE, &act, NULL);
530 // Lock a mutex the main() waits in before exiting. This is
531 // because all the actually processing is done by other threads.
532 boost::mutex::scoped_lock lk(alldone_mutex);
534 // Start the Admin handler. This allows one to connect to Cygnal
535 // at port 1111 and dump statistics to the terminal for tuning
536 // purposes.
537 if (admin) {
538 Network::thread_params_t admin_data;
539 admin_data.port = gnash::ADMIN_PORT;
540 boost::thread admin_thread(boost::bind(&admin_handler, &admin_data));
543 // Cvm cvm;
544 // cvm.loadMovie("/tmp/out.swf");
546 // If a only-port is specified, we only want to run single
547 // threaded. As all the rest of the code checks the config value
548 // setting, this overrides that in the memory, but doesn't change
549 // the file itself. This feature is really only for debugging,
550 // where it's easier to work with one protocol at a time.
551 if (only_port) {
552 crcfile.setThreadingFlag(false);
555 // Incomming connection handler for port 80, HTTP and
556 // RTMPT. As port 80 requires root access, cygnal supports a
557 // "port offset" for debugging and development of the
558 // server. Since this port offset changes the constant to test
559 // for which protocol, we pass the info to the start thread so
560 // it knows which handler to invoke.
561 Network::thread_params_t *http_data = new Network::thread_params_t;
562 if ((only_port == 0) || (only_port == gnash::HTTP_PORT)) {
563 http_data->tid = 0;
564 http_data->netfd = 0;
565 http_data->filespec = docroot;
566 http_data->protocol = Network::HTTP;
567 http_data->port = port_offset + gnash::HTTP_PORT;
568 http_data->hostname = hostname;
569 if (crcfile.getThreadingFlag()) {
570 boost::thread http_thread(boost::bind(&connection_handler, http_data));
571 } else {
572 connection_handler(http_data);
576 // Incomming connection handler for port 1935, RTMPT and
577 // RTMPTE. This supports the same port offset as the HTTP handler,
578 // just to keep things consistent.
579 Network::thread_params_t *rtmp_data = new Network::thread_params_t;
580 if ((only_port == 0) || (only_port == gnash::RTMP_PORT)) {
581 rtmp_data->tid = 0;
582 rtmp_data->netfd = 0;
583 rtmp_data->filespec = docroot;
584 rtmp_data->protocol = Network::RTMP;
585 rtmp_data->port = port_offset + gnash::RTMP_PORT;
586 rtmp_data->hostname = hostname;
587 if (crcfile.getThreadingFlag()) {
588 boost::thread rtmp_thread(boost::bind(&connection_handler, rtmp_data));
589 } else {
590 connection_handler(rtmp_data);
594 // Wait for all the threads to die.
595 alldone.wait(lk);
597 log_network(_("Cygnal done..."));
599 // Delete the data we allowcated to pass to each connection_handler.
600 delete rtmp_data;
601 delete http_data;
603 return(0);
606 // Trap Control-C (SIGINT) so we can cleanly exit
607 static void
608 cntrlc_handler (int sig)
610 log_network(_("Got a %d interrupt"), sig);
611 // sigaction (SIGINT, &act, NULL);
612 exit(EXIT_FAILURE);
615 // Trap SIGHUP so we can
616 static void
617 hup_handler (int /* sig */)
619 if (crcfile.getTestingFlag()) {
620 cerr << "Testing, Testing, Testing..." << endl;
625 static void
626 version_and_copyright()
628 cout << "Cygnal: " << BRANCH_NICK << "_" << BRANCH_REVNO << endl
629 << endl
630 << _("Copyright (C) 2007, 2008, 2009, 2010 Free Software Foundation, Inc.\n"
631 "Cygnal comes with NO WARRANTY, to the extent permitted by law.\n"
632 "You may redistribute copies of Cygnal under the terms of the GNU General\n"
633 "Public License V3 or later. For more information, see the file named COPYING.\n")
634 << endl;
637 // FIXME: this function could be tweaked for better performance
638 void
639 admin_handler(Network::thread_params_t *args)
641 GNASH_REPORT_FUNCTION;
642 int retries = 100;
643 int ret;
645 map<int, Handler *>::iterator hit;
646 stringstream response;
648 Network net;
649 Handler::admin_cmd_e cmd = Handler::POLL;
650 net.createServer(args->hostname, args->port);
651 while (retries > 0) {
652 log_network(_("Starting Admin Handler for port %d"), args->port);
654 if (net.newConnection(true) <= 0) {
655 return;
658 log_network(_("Got an incoming Admin request"));
659 sleep(1);
660 do {
661 Network::byte_t data[ADMINPKTSIZE+1];
662 memset(data, 0, ADMINPKTSIZE+1);
663 const char *ptr = reinterpret_cast<const char *>(data);
664 ret = net.readNet(data, ADMINPKTSIZE, 100);
665 if (ret < 0) {
666 log_network(_("no more admin data, exiting...\n"));
667 if ((ret == 0) && cmd != Handler::POLL) {
668 break;
670 } else {
671 // force the case to make comparisons easier. Only compare enough characters to
672 // till each command is unique.
673 std::transform(ptr, ptr + ret, data, (int(*)(int)) toupper);
674 if (strncmp(ptr, "QUIT", 4) == 0) {
675 cmd = Handler::QUIT;
676 } else if (strncmp(ptr, "STATUS", 5) == 0) {
677 cmd = Handler::STATUS;
678 } else if (strncmp(ptr, "HELP", 2) == 0) {
679 cmd = Handler::HELP;
680 net.writeNet("commands: help, status, poll, interval, statistics, quit.\n");
681 } else if (strncmp(ptr, "POLL", 2) == 0) {
682 cmd = Handler::POLL;
683 } else if (strncmp(ptr, "INTERVAL", 2) == 0) {
684 cmd = Handler::INTERVAL;
687 switch (cmd) {
688 // close this connection
689 case Handler::QUIT:
690 ret = -1;
691 break;
692 case Handler::STATUS:
694 #ifdef USE_STATS_CACHE
695 // cache.dump();
696 string results = cache.stats(false);
697 if (results.size()) {
698 net.writeNet(results);
699 results.clear();
701 #endif
702 #if 0
703 response << handlers.size() << " handlers are currently active.";
704 for (hit = handlers.begin(); hit != handlers.end(); hit++) {
705 int fd = hit->first;
706 Handler *hand = hit->second;
707 response << fd << ","
708 << hand->insize()
709 << "," << hand->outsize()
710 << "\r\n";
711 net.writeNet(response);
713 #endif
715 break;
716 case Handler::POLL:
717 #ifdef USE_STATS_QUEUE
718 response << handlers.size() << " handlers are currently active." << "\r\n";
719 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
720 int fd = hit->first;
721 Handler *hand = hit->second;
722 struct timespec now;
723 clock_gettime (CLOCK_REALTIME, &now);
724 // Incoming que stats
725 CQue::que_stats_t *stats = hand->statsin();
726 float diff = static_cast<float>(((now.tv_sec -
727 stats->start.tv_sec) + ((now.tv_nsec -
728 stats->start.tv_nsec)/1e9)));
729 response << fd
730 << "," << stats->totalbytes
731 << "," << diff
732 << "," << stats->totalin
733 << "," << stats->totalout;
734 // Outgoing que stats
735 stats = hand->statsout();
736 response << "," <<stats->totalbytes
737 << "," << stats->totalin
738 << "," << stats->totalout
739 << "\r\n";
740 net.writeNet(response.str());
742 #endif
743 break;
744 case Handler::INTERVAL:
745 net.writeNet("set interval\n");
746 break;
747 default:
748 break;
750 } while (ret > 0);
751 log_network(_("admin_handler: Done...!\n"));
752 net.closeNet(); // this shuts down this socket connection
754 net.closeConnection(); // this shuts down the server on this connection
756 // All threads should exit now.
757 alldone.notify_all();
760 // A connection handler is started for each port the server needs to
761 // wait on for incoming connections. When it gets an incoming
762 // connection, it reads the first packet to get the resource name, and
763 // then starts the event handler thread if it's a newly requested
764 // resource, otherwise it loads a copy of the cached resource.
765 void
766 connection_handler(Network::thread_params_t *args)
768 // GNASH_REPORT_FUNCTION;
769 int fd = 0;
770 Network net;
771 bool done = false;
772 static int tid = 0;
774 if (netdebug) {
775 net.toggleDebug(true);
777 // Start a server on this tcp/ip port.
778 fd = net.createServer(args->hostname, args->port);
779 if (fd <= 0) {
780 log_error(_("Can't start %s Connection Handler for fd #%d, port %hd"),
781 proto_str[args->protocol], fd, args->port);
782 return;
783 } else {
784 log_network(_("Starting %s Connection Handler for fd #%d, port %hd"),
785 proto_str[args->protocol], fd, args->port);
788 // Get the number of cpus in this system. For multicore
789 // systems we'll get better load balancing if we keep all the
790 // cpus busy. So a pool of threads is started for each cpu,
791 // the default being just one. Each thread is reponsible for
792 // handling part of the total active file descriptors.
793 #ifdef HAVE_SYSCONF
794 long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
795 LOG_ONCE(log_network(_("This system has %d cpus."), ncpus));
796 #endif
797 size_t nfds = crcfile.getFDThread();
799 // log_network("This system is configured for %d file descriptors to be watched by each thread.", nfds);
801 // Get the next thread ID to hand off handling this file
802 // descriptor to. If the limit for threads per cpu hasn't been
803 // set or is set to 0, assume one thread per processor by
804 // default. There won't even be threads for each cpu if
805 // threading has been disabled in the cygnal config file.
806 int spawn_limit = 0;
807 if (nfds == 0) {
808 spawn_limit = ncpus;
809 } else {
810 spawn_limit = ncpus * nfds;
813 // FIXME: this may run forever, we probably want a cleaner way to
814 // test for the end of time.
815 do {
816 net.setPort(args->port);
817 if (netdebug) {
818 net.toggleDebug(true);
821 // Rotate in a range of 0 to the limit.
822 tid = (tid + 1) % (spawn_limit + 1);
823 // log_network("%s handler: thread ID #%d, fd #%d", proto_str[args->protocol], tid, fd);
825 // Wait for a connection to this tcp/ip from a client. If set
826 // to true, this will block until a request comes in. If set
827 // to single threaded mode, this will only allow one client to
828 // connect at a time. This is to make it easier to debug
829 // things when you have a heavily threaded application.
830 args->netfd = net.newConnection(true, fd);
831 if (args->netfd <= 0) {
832 log_network(_("No new %s network connections"),
833 proto_str[args->protocol]);
834 return;
835 } else {
836 log_network(_("*** New %s network connection for thread ID #%d, fd #%d ***"),
837 proto_str[args->protocol], tid, args->netfd);
841 // Setup HTTP handler
843 if (args->protocol == Network::HTTP) {
844 Network::thread_params_t *hargs = new Network::thread_params_t;
845 // std::copy(args, args+sizeof(Network::thread_params_t), &hargs);
846 hargs->protocol = args->protocol;
847 hargs->netfd = args->netfd;
848 #if 0
849 boost::shared_ptr<Handler> hand = cyg.findHandler(path);
850 HTTPServer *http = new HTTPServer;
851 hargs.entry = http;
852 http->setDocRoot(crcfile.getDocumentRoot());
853 boost::shared_ptr<cygnal::Buffer> buf(http->peekChunk());
854 http->processHeaderFields(*buf);
855 string hostname, path;
856 string::size_type pos = http->getField("host").find(":", 0);
857 if (pos != string::npos) {
858 hostname += http->getField("host").substr(0, pos);
859 } else {
860 hostname += "localhost.localdomain";
862 path = http->getFilespec();
863 string key = hostname + path;
864 #endif
865 string key;
866 Handler *hand = 0;
867 if (!hand) {
868 hand = new Handler;
869 hand->addClient(args->netfd, Network::HTTP);
870 int retries = 10;
871 cygnal::Buffer *buf = 0;
872 do {
873 buf = hand->parseFirstRequest(args->netfd, Network::HTTP);
874 if (!buf) {
875 retries--;
876 continue;
877 } else {
878 break;
880 } while (retries);
881 string &key = hand->getKey(args->netfd);
882 log_network(_("Creating new %s Handler for %s using fd #%d"),
883 proto_str[hargs->protocol], key, hargs->netfd);
884 hargs->handler = hand;
885 hargs->buffer = buf;
886 hargs->filespec = key;
887 // cyg.addHandler(key, hand);
889 // If in multi-threaded mode (the default), start a thread
890 // with a connection_handler for each port we're interested
891 // in. Each port of could have a different protocol.
892 boost::bind(event_handler, hargs);
893 if (crcfile.getThreadingFlag() == true) {
894 boost::thread event_thread(boost::bind(&event_handler, hargs));
895 } else {
896 event_handler(hargs);
897 // We're done, close this network connection
899 } else {
900 log_network(_("Reusing %s Handler for %s using fd #%d"),
901 proto_str[hargs->protocol], key, hargs->netfd);
902 hand->addClient(args->netfd, Network::HTTP);
904 // delete http;
905 } // end of if HTTP
908 // Setup RTMP handler
910 if (args->protocol == Network::RTMP) {
911 Network::thread_params_t *rargs = new Network::thread_params_t;
912 rargs->protocol = args->protocol;
913 rargs->netfd = args->netfd;
914 RTMPServer *rtmp = new RTMPServer;
915 boost::shared_ptr<cygnal::Element> tcurl =
916 rtmp->processClientHandShake(args->netfd);
917 if (!tcurl) {
918 // log_error("Couldn't read the tcUrl variable!");
919 rtmp->closeNet(args->netfd);
920 return;
922 URL url(tcurl->to_string());
923 string key = url.hostname() + url.path();
924 boost::shared_ptr<Handler> hand = cyg.findHandler(url.path());
925 if (!hand) {
926 log_network(_("Creating new %s Handler for: %s for fd %#d"),
927 proto_str[args->protocol], key, args->netfd);
928 hand.reset(new Handler);
929 cyg.addHandler(key, hand);
930 rargs->entry = rtmp;
931 hand->setNetConnection(rtmp->getNetConnection());
932 std::vector<boost::shared_ptr<Cygnal::peer_t> >::iterator it;
933 std::vector<boost::shared_ptr<Cygnal::peer_t> > active = cyg.getActive();
934 for (it = active.begin(); it < active.end(); ++it) {
935 Cygnal::peer_t *peer = (*it).get();
936 hand->addRemote(peer->fd);
938 hand->addClient(args->netfd, Network::RTMP);
939 rargs->handler = reinterpret_cast<void *>(hand.get());
940 args->filespec = key;
941 args->entry = rtmp;
943 string cgiroot;
944 char *env = std::getenv("CYGNAL_PLUGINS");
945 if (env != 0) {
946 cgiroot = env;
948 if (crcfile.getCgiRoot().size() > 0) {
949 cgiroot += ":" + crcfile.getCgiRoot();
950 log_network(_("Cygnal Plugin paths are: %s"), cgiroot);
951 } else {
952 cgiroot = PLUGINSDIR;
954 hand->scanDir(cgiroot);
955 boost::shared_ptr<Handler::cygnal_init_t> init =
956 hand->initModule(url.path());
958 // this is where the real work gets done.
959 if (init) {
960 // If in multi-threaded mode (the default), start a thread
961 // with a connection_handler for each port we're interested
962 // in. Each port of course has a different protocol.
963 if (crcfile.getThreadingFlag() == true) {
964 boost::thread event_thread(boost::bind(&event_handler, args));
965 } else {
966 event_handler(args);
967 // We're done, close this network connection
968 net.closeNet(args->netfd);
970 } else {
971 log_error(_("Couldn't load plugin for %s"), key);
974 // // We're done, close this network connection
975 // if (crcfile.getThreadingFlag() == true) {
976 // net.closeNet(args->netfd);
977 // }
979 // delete rtmp;
980 } // end of if RTMP
982 log_network(_("Number of active Threads is %d"), tids.num_of_tids());
984 // net.closeNet(args->netfd); // this shuts down this socket connection
985 log_network(_("Restarting loop for next connection for port %d..."),
986 args->port);
987 } while(!done);
989 // All threads should wake up now.
990 alldone.notify_all();
992 } // end of connection_handler
994 void
995 event_handler(Network::thread_params_t *args)
997 GNASH_REPORT_FUNCTION;
999 Network::thread_params_t largs;
1000 // std::copy(args, args+sizeof(Network::thread_params_t), &largs);
1001 Handler *hand = reinterpret_cast<Handler *>(args->handler);
1003 largs.protocol = args->protocol;
1004 largs.netfd = args->netfd;
1005 largs.port = args->port;
1006 largs.buffer = args->buffer;
1007 largs.entry = args->entry;
1008 largs.filespec = args->filespec;
1010 Network net;
1011 int timeout = 30;
1012 int retries = 0;
1013 bool done = false;
1015 fd_set hits;
1016 FD_ZERO(&hits);
1017 FD_SET(args->netfd, &hits);
1019 tids.increment();
1021 // We need to calculate the highest numbered file descriptor
1022 // for select. We may want to do this elsewhere, as it could
1023 // be a performance hit as the number of file descriptors gets
1024 // larger.
1025 log_debug("Handler has %d clients attached, %d threads",
1026 hand->getClients().size(), tids.num_of_tids());
1028 int max = 0;
1029 for (size_t i = 0; i<hand->getClients().size(); i++) {
1030 log_debug("Handler client[%d] is: %d", i, hand->getClient(i));
1031 if (hand->getClient(i) >= max) {
1032 max = hand->getClient(i);
1033 // hand->dump();
1037 do {
1039 // If we have active disk streams, send those packets first.
1040 // 0 is a reserved stream, so we start with 1, as the reserved
1041 // stream isn't one we care about here.
1042 if (hand->getActiveDiskStreams()) {
1043 log_network(_("%d active disk streams"),
1044 hand->getActiveDiskStreams());
1045 // hand->dump();
1047 #if 0
1048 boost::shared_ptr<DiskStream> filestream(cache.findFile(args->filespec));
1049 if (filestream) {
1050 filestream->dump();
1052 // #else
1053 // cache.dump();
1054 #endif
1055 //hand->dump();
1056 boost::shared_ptr<DiskStream> ds;
1057 for (int i=1; i <= hand->getActiveDiskStreams(); i++) {
1058 ds = hand->getDiskStream(i);
1059 if (ds) {
1060 //ds->dump();
1061 // Only play the next chunk of the file.
1062 //log_network("Sending following chunk of %s", ds->getFilespec());
1063 if (ds->play(i, false)) {
1064 if (ds->getState() == DiskStream::CLOSED) {
1065 net.closeNet(args->netfd);
1066 hand->removeClient(args->netfd);
1067 done = true;
1069 } else {
1070 // something went wrong, the stream failed
1071 net.closeNet(args->netfd);
1072 hand->removeClient(args->netfd);
1073 done = true;
1078 // See if we have any data waiting behind any of the file
1079 // descriptors.
1080 for (int i=0; i <= max + 1; i++) {
1081 if (FD_ISSET(i, &hits)) {
1082 FD_CLR(i, &hits);
1083 log_network(_("Got a hit for fd #%d, protocol %s"), i,
1084 proto_str[hand->getProtocol(i)]);
1085 switch (hand->getProtocol(i)) {
1086 case Network::NONE:
1087 log_error(_("No protocol specified!"));
1088 break;
1089 case Network::HTTP:
1091 largs.netfd = i;
1092 // largs.filespec = fullpath;
1093 boost::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1094 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1095 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, args->filespec);
1096 net.closeNet(args->netfd);
1097 hand->removeClient(args->netfd);
1098 done = true;
1099 } else {
1100 log_network(_("Not Done with HTTP connection for fd #%d, it's a persistent connection."), i);
1103 continue;
1105 case Network::RTMP:
1106 args->netfd = i;
1107 // args->filespec = path;
1108 if (!rtmp_handler(args)) {
1109 log_network(_("Done with RTMP connection for fd #%d, CGI "), i, args->filespec);
1110 done = true;
1112 break;
1113 case Network::RTMPT:
1115 net.setTimeout(timeout);
1116 args->netfd = i;
1117 boost::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1118 // args->filespec = path;
1119 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1120 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, largs.filespec);
1121 return;
1123 break;
1125 case Network::RTMPTS:
1127 args->netfd = i;
1128 // args->filespec = path;
1129 boost::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1130 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1131 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, args->filespec);
1132 return;
1134 break;
1136 case Network::RTMPE:
1137 break;
1138 case Network::RTMPS:
1139 break;
1140 case Network::DTN:
1141 break;
1142 default:
1143 log_error(_("Unsupported network protocol for fd #%d, %d"),
1144 largs.netfd, hand->getProtocol(i));
1145 done = true;
1146 break;
1148 // delete args->buffer;
1152 // // Clear the current message so next time we read new data
1153 // args->buffer->clear();
1154 // largs.buffer->clear();
1156 // Wait for something from one of the file descriptors. This timeout
1157 // is the time between sending packets to the client when there is
1158 // no client input, which effects the streaming speed of big files.
1159 net.setTimeout(5);
1160 hits = net.waitForNetData(hand->getClients());
1161 if (FD_ISSET(0, &hits)) {
1162 FD_CLR(0, &hits);
1163 log_network(_("Got no hits, %d retries"), retries);
1164 // net.closeNet(args->netfd);
1165 // hand->removeClient(args->netfd);
1166 // done = true;
1168 retries++;
1169 #if 0
1170 if (retries >= 10) {
1171 net.closeNet(args->netfd);
1172 hand->removeClient(args->netfd);
1173 done = true;
1175 #endif
1176 } while (!done);
1178 tids.decrement();
1180 } // end of event_handler
1182 // local Variables:
1183 // mode: C++
1184 // indent-tabs-mode: nil
1185 // End: