update copyright date
[gnash.git] / cygnal / cygnal.cpp
blobd9ab4007995df8fb0ac357a23d6e65c7442af540
1 // cygnal.cpp: GNU streaming Flash media server, for Gnash.
2 //
3 // Copyright (C) 2007, 2008, 2009, 2010, 2011 Free Software Foundation, Inc.
4 //
5 // This program is free software; you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation; either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with this program; if not, write to the Free Software
17 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 #ifdef HAVE_CONFIG_H
21 #include "gnashconfig.h"
22 #endif
24 #include <sys/stat.h>
25 #include <list>
26 #include <map>
27 #include <iostream>
28 #include <sstream>
29 #include <csignal>
30 #include <vector>
31 #include <sys/mman.h>
32 #include <cerrno>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <fcntl.h>
37 #include "GnashSleep.h"
38 #include "revno.h"
40 //#include "cvm.h"
42 extern "C"{
43 # include "GnashSystemIOHeaders.h"
44 #ifdef HAVE_GETOPT_H
45 # include <getopt.h>
46 #endif
47 #ifndef __GNUC__
48 extern int optind, getopt(int, char *const *, const char *);
49 extern char *optarg;
50 #endif
53 #include <boost/shared_ptr.hpp>
55 // classes internal to Gnash
56 #include "network.h"
57 #include "log.h"
58 #include "crc.h"
59 #include "proc.h"
60 #include "rtmp.h"
61 #include "buffer.h"
62 #include "utility.h"
63 #include "limits.h"
64 #include "netstats.h"
65 #include "statistics.h"
66 //#include "stream.h"
67 #include "gmemory.h"
68 #include "diskstream.h"
69 #include "arg_parser.h"
70 #include "GnashException.h"
71 #include "GnashSleep.h" // for usleep comptibility.
72 #include "URL.h"
73 #include "rtmp_client.h"
75 // classes internal to Cygnal
76 #include "rtmp_server.h"
77 #include "http_server.h"
79 #include "handler.h"
80 #include "cache.h"
81 #include "cygnal.h"
83 #ifdef ENABLE_NLS
84 # include <locale>
85 #endif
87 #include <boost/date_time/gregorian/gregorian.hpp>
88 //#include <boost/date_time/local_time/local_time.hpp>
89 #include <boost/date_time/time_zone_base.hpp>
90 #include <boost/date_time/posix_time/posix_time.hpp>
91 #include <boost/thread/thread.hpp>
92 #include <boost/bind.hpp>
93 #include <boost/thread/mutex.hpp>
94 #include <boost/thread/condition.hpp>
95 #include <boost/thread/tss.hpp>
97 #ifndef POLLRDHUP
98 #define POLLRDHUP 0
99 #endif
101 //using gnash::log_network;
102 using namespace std;
103 using namespace gnash;
104 using namespace cygnal;
106 static void usage();
107 static void version_and_copyright();
108 static void cntrlc_handler(int sig);
109 static void hup_handler(int sig);
111 void connection_handler(Network::thread_params_t *args);
112 void event_handler(Network::thread_params_t *args);
113 void admin_handler(Network::thread_params_t *args);
115 // Toggles very verbose debugging info from the network Network class
116 static bool netdebug = false;
118 struct sigaction act1, act2;
120 // The next few global variables have to be global because Boost
121 // threads don't take arguments. Since these are set in main() before
122 // any of the threads are started, and their values should never change
123 // afterwards, it's safe to use them without a mutex, as all threads
124 // share the same read-only values.
126 // This is the default path to look in for files to be streamed.
127 static string docroot;
129 // This is the number of times a thread loop continues, for debugging only
130 int thread_retries = 10;
132 // This is added to the default ports for testing so it doesn't
133 // conflict with apache on the same machine.
134 static int port_offset = 0;
136 // Toggle the admin thread
137 static bool admin = false;
139 // Admin commands are small
140 const int ADMINPKTSIZE = 80;
142 // If set to a non zero value, this limits Cygnal to only one protocol
143 // at a time. This is for debugging only.
144 static int only_port = 0;
146 // These keep track of the number of active threads.
147 ThreadCounter tids;
149 map<int, Network *> networks;
151 // This is the global object for Cygnal.
152 // It is the singleton returned by Cygnal::getDefaultInstance().
153 static Cygnal& cyg = Cygnal::getDefaultInstance();
155 // The debug log used by all the gnash libraries.
156 static LogFile& dbglogfile = LogFile::getDefaultInstance();
158 // The user config for Cygnal is loaded and parsed here:
159 static CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
161 // Cache support for responses and files.
162 static Cache& cache = Cache::getDefaultInstance();
164 // The list of active cgis being executed.
165 //static std::map<std::string, Proc> procs; // = proc::getDefaultInstance();
167 // This mutex is used to signify when all the threads are done.
168 static boost::condition alldone;
169 static boost::mutex alldone_mutex;
171 static boost::condition noclients;
172 static boost::mutex noclients_mutex;
174 const char *proto_str[] = {
175 "NONE",
176 "HTTP",
177 "HTTPS",
178 "RTMP",
179 "RTMPT",
180 "RTMPTS",
181 "RTMPE",
182 "RTMPS",
183 "DTN"
186 static void
187 usage()
189 cout << _("cygnal -- a streaming media server.") << endl
190 << endl
191 << _("Usage: cygnal [options...]") << endl
192 << _(" -h, --help Print this help and exit") << endl
193 << _(" -V, --version Print version information and exit") << endl
194 << _(" -v, --verbose Output verbose debug info") << endl
195 << _(" -s, --singlethread Disable Multi Threading") << endl
196 << _(" -n, --netdebug Turn on net debugging messages") << endl
197 << _(" -o --only-port Only use port for debugging") << endl
198 << _(" -p --port-offset Port offset for debugging") << endl
199 << _(" -t, --testing Turn on special Gnash testing support") << endl
200 << _(" -a, --admin Enable the administration thread") << endl
201 << _(" -r, --root Document root for all files") << endl
202 << endl;
206 Cygnal&
207 Cygnal::getDefaultInstance()
209 // GNASH_REPORT_FUNCTION;
210 static Cygnal o;
211 return o;
215 Cygnal::~Cygnal()
217 // GNASH_REPORT_FUNCTION;
220 bool
221 Cygnal::loadPeersFile()
223 // GNASH_REPORT_FUNCTION;
225 loadPeersFile("./peers.conf");
227 loadPeersFile("/etc/peers.conf");
229 // Check the users home directory
230 #ifndef __amigaos4__
231 char *home = std::getenv("HOME");
232 #else
233 //on AmigaOS we have a GNASH: assign that point to program dir
234 char *home = "/gnash";
235 #endif
237 string homefile = home;
238 homefile += "/peers.conf";
240 return loadPeersFile(homefile);
243 bool
244 Cygnal::loadPeersFile(const std::string &filespec)
246 // GNASH_REPORT_FUNCTION;
248 struct stat stats;
249 std::ifstream in;
250 std::string line;
251 string host;
252 string portstr;
253 string cgi;
254 vector<string> supported;
256 // Make sufre the file exists
257 if (stat(filespec.c_str(), &stats) != 0) {
258 return false;
261 in.open(filespec.c_str());
263 if (!in) {
264 log_error(": couldn't open file: ", filespec);
265 return false;
268 // Read in each line and parse it
269 size_t lineno = 0;
270 while (std::getline(in, line)) {
272 ++lineno;
274 // Ignore comment and empty lines
275 if (line.empty() || line[0] == '#') {
276 continue;
279 std::istringstream ss(line);
281 // Get the first token
282 if (! (ss >> host)) {
283 // Empty line
284 continue;
287 // 'action' should never be empty, or (ss >> action)
288 // above would have failed
290 if (host[0] == '#') {
291 continue; // discard comments
294 // Get second token
295 if (!(ss >> portstr)) {
296 // Do we need to warn here as well?
297 continue;
300 while (ss >> cgi) {
301 supported.push_back(cgi);
302 continue;
305 // Create a new peer item
306 boost::shared_ptr<peer_t> peer(new Cygnal::peer_t);
307 peer->hostname = host;
308 peer->port = strtol(portstr.c_str(), NULL, 0) & 0xffff;
310 _peers.push_back(peer);
313 return true;
316 void
317 Cygnal::probePeers()
319 // GNASH_REPORT_FUNCTION;
321 probePeers(_peers);
324 void
325 Cygnal::probePeers(peer_t &peer)
327 // GNASH_REPORT_FUNCTION;
328 RTMPClient net;
329 stringstream uri;
331 uri << peer.hostname;
333 vector<string>::iterator it;
334 for (it = peer.supported.begin(); it <= peer.supported.end(); ++it) {
335 string tmp = uri.str();
336 // tmp += (*it);
337 // log_network("Constructed: %s/%s", uri.str(), *it);
339 gnash::URL url(uri.str());
340 if (!(peer.fd = net.connectToServer(uri.str()))) {
341 log_network("Couldn't connect to %s", uri.str());
342 peer.connected = false;
343 } else {
344 peer.connected = true;
345 // peer.fd = net.getFileFd();
350 void
351 Cygnal::probePeers(std::vector<boost::shared_ptr<peer_t> > &peers)
353 // GNASH_REPORT_FUNCTION;
355 // createClient();
356 std::vector<boost::shared_ptr<Cygnal::peer_t> >::iterator it;
357 for (it = peers.begin(); it != peers.end(); it++) {
358 boost::shared_ptr<Cygnal::peer_t> peer = *it;
359 probePeers(*peer);
360 if (peer->connected) {
361 log_network("%s is active on fd #%d.", peer->hostname,
362 peer->fd);
363 _active_peers.push_back(*it);
368 void
369 Cygnal::removeHandler(const std::string &path)
371 // GNASH_REPORT_FUNCTION;
372 map<std::string, boost::shared_ptr<Handler> >::iterator it;
373 it = _handlers.find(path);
374 if (it != _handlers.end()) {
375 boost::mutex::scoped_lock lock(_mutex);
376 _handlers.erase(it);
380 boost::shared_ptr<Handler>
381 Cygnal::findHandler(const std::string &path)
383 // GNASH_REPORT_FUNCTION;
384 map<std::string, boost::shared_ptr<Handler> >::iterator it;
385 boost::shared_ptr<Handler> hand;
386 it = _handlers.find(path);
387 if (it != _handlers.end()) {
388 hand = (*it).second;
391 return hand;
394 void
395 Cygnal::dump()
397 std::vector<boost::shared_ptr<Cygnal::peer_t> >::iterator it;
398 for (it = _peers.begin(); it != _peers.end(); it++) {
399 cerr << "Remote Peer: " << (*it)->hostname
400 << ":" << (*it)->port << endl;
405 main(int argc, char *argv[])
407 // Initialize national language support
408 #ifdef ENABLE_NLS
409 setlocale (LC_ALL, "");
410 bindtextdomain (PACKAGE, LOCALEDIR);
411 textdomain (PACKAGE);
412 #endif
414 const Arg_parser::Option opts[] =
416 { 'h', "help", Arg_parser::no },
417 { 'V', "version", Arg_parser::no },
418 { 'p', "port-offset", Arg_parser::yes },
419 { 'v', "verbose", Arg_parser::no },
420 { 'd', "dump", Arg_parser::no },
421 { 'n', "netdebug", Arg_parser::no },
422 { 't', "testing", Arg_parser::no },
423 { 'a', "admin", Arg_parser::no },
424 { 'r', "root", Arg_parser::yes },
425 { 'o', "only-port", Arg_parser::yes },
426 { 's', "singlethreaded", Arg_parser::no }
429 Arg_parser parser(argc, argv, opts);
430 if( ! parser.error().empty() )
432 cout << parser.error() << endl;
433 exit(EXIT_FAILURE);
436 // crcfile.loadFiles();
438 // Set the log file name before trying to write to
439 // it, or we might get two.
440 dbglogfile.setLogFilename(crcfile.getDebugLog());
442 if (crcfile.verbosityLevel() > 0) {
443 dbglogfile.setVerbosity(crcfile.verbosityLevel());
446 if (crcfile.getDocumentRoot().size() > 0) {
447 docroot = crcfile.getDocumentRoot();
448 } else {
449 docroot = "/var/www/html/software/tests/";
450 crcfile.setDocumentRoot(docroot);
452 if (crcfile.getPortOffset()) {
453 port_offset = crcfile.getPortOffset();
456 // Handle command line arguments
457 for( int i = 0; i < parser.arguments(); ++i ) {
458 const int code = parser.code(i);
459 switch( code ) {
460 case 'h':
461 version_and_copyright();
462 usage();
463 exit(EXIT_SUCCESS);
464 case 'V':
465 version_and_copyright();
466 exit(EXIT_SUCCESS);
467 case 't':
468 crcfile.setTestingFlag(true);
469 break;
470 case 'a':
471 admin = true;
472 break;
473 case 'v':
474 dbglogfile.setVerbosity();
475 LOG_ONCE(log_network (_("Verbose output turned on")))
476 break;
477 case 'p':
478 port_offset = parser.argument<int>(i);
479 crcfile.setPortOffset(port_offset);
480 break;
481 case 'r':
482 docroot = parser.argument(i);
483 break;
484 case 's':
485 crcfile.setThreadingFlag(false);
486 break;
487 case 'n':
488 netdebug = true;
489 dbglogfile.setNetwork(true);
490 break;
491 case 'o':
492 only_port = parser.argument<int>(i);
493 break;
494 case 'd':
495 crcfile.dump();
496 exit(EXIT_SUCCESS);
497 break;
498 default:
499 log_error (_("Extraneous argument: %s"), parser.argument(i).c_str());
503 log_network (_("Document Root for media files is: %s"), docroot);
504 crcfile.setDocumentRoot(docroot);
506 // load the file of peers. A peer is another instance of Cygnal we
507 // can use for distributed processing.
508 cyg.loadPeersFile();
509 cyg.probePeers();
511 // cyg.dump();
513 // Trap ^C (SIGINT) so we can kill all the threads
514 act1.sa_handler = cntrlc_handler;
515 sigaction (SIGINT, &act1, NULL);
516 act2.sa_handler = hup_handler;
517 sigaction (SIGHUP, &act2, NULL);
518 // sigaction (SIGPIPE, &act, NULL);
520 // Lock a mutex the main() waits in before exiting. This is
521 // because all the actually processing is done by other threads.
522 boost::mutex::scoped_lock lk(alldone_mutex);
524 // Start the Admin handler. This allows one to connect to Cygnal
525 // at port 1111 and dump statistics to the terminal for tuning
526 // purposes.
527 if (admin) {
528 Network::thread_params_t admin_data;
529 admin_data.port = gnash::ADMIN_PORT;
530 boost::thread admin_thread(boost::bind(&admin_handler, &admin_data));
533 // Cvm cvm;
534 // cvm.loadMovie("/tmp/out.swf");
536 // If a only-port is specified, we only want to run single
537 // threaded. As all the rest of the code checks the config value
538 // setting, this overrides that in the memory, but doesn't change
539 // the file itself. This feature is really only for debugging,
540 // where it's easier to work with one protocol at a time.
541 if (only_port) {
542 crcfile.setThreadingFlag(false);
545 // Incomming connection handler for port 80, HTTP and
546 // RTMPT. As port 80 requires root access, cygnal supports a
547 // "port offset" for debugging and development of the
548 // server. Since this port offset changes the constant to test
549 // for which protocol, we pass the info to the start thread so
550 // it knows which handler to invoke.
551 Network::thread_params_t *http_data = new Network::thread_params_t;
552 if ((only_port == 0) || (only_port == gnash::HTTP_PORT)) {
553 http_data->tid = 0;
554 http_data->netfd = 0;
555 http_data->filespec = docroot;
556 http_data->protocol = Network::HTTP;
557 http_data->port = port_offset + gnash::HTTP_PORT;
558 if (crcfile.getThreadingFlag()) {
559 boost::thread http_thread(boost::bind(&connection_handler, http_data));
560 } else {
561 connection_handler(http_data);
565 // Incomming connection handler for port 1935, RTMPT and
566 // RTMPTE. This supports the same port offset as the HTTP handler,
567 // just to keep things consistent.
568 Network::thread_params_t *rtmp_data = new Network::thread_params_t;
569 if ((only_port == 0) || (only_port == gnash::RTMP_PORT)) {
570 rtmp_data->tid = 0;
571 rtmp_data->netfd = 0;
572 rtmp_data->filespec = docroot;
573 rtmp_data->protocol = Network::RTMP;
574 rtmp_data->port = port_offset + gnash::RTMP_PORT;
575 if (crcfile.getThreadingFlag()) {
576 boost::thread rtmp_thread(boost::bind(&connection_handler, rtmp_data));
577 } else {
578 connection_handler(rtmp_data);
582 // Wait for all the threads to die.
583 alldone.wait(lk);
585 log_network (_("Cygnal done..."));
587 // Delete the data we allowcated to pass to each connection_handler.
588 delete rtmp_data;
589 delete http_data;
591 return(0);
594 // Trap Control-C (SIGINT) so we can cleanly exit
595 static void
596 cntrlc_handler (int sig)
598 log_network(_("Got a %d interrupt"), sig);
599 // sigaction (SIGINT, &act, NULL);
600 exit(EXIT_FAILURE);
603 // Trap SIGHUP so we can
604 static void
605 hup_handler (int /* sig */)
607 if (crcfile.getTestingFlag()) {
608 cerr << "Testing, Testing, Testing..." << endl;
613 static void
614 version_and_copyright()
616 cout << "Cygnal: " << BRANCH_NICK << "_" << BRANCH_REVNO << endl
617 << endl
618 << _("Copyright (C) 2007, 2008, 2009, 2010 Free Software Foundation, Inc.\n"
619 "Cygnal comes with NO WARRANTY, to the extent permitted by law.\n"
620 "You may redistribute copies of Cygnal under the terms of the GNU General\n"
621 "Public License V3. For more information, see the file named COPYING.\n")
622 << endl;
625 // FIXME: this function could be tweaked for better performance
626 void
627 admin_handler(Network::thread_params_t *args)
629 GNASH_REPORT_FUNCTION;
630 int retries = 100;
631 int ret;
633 map<int, Handler *>::iterator hit;
634 stringstream response;
635 int index = 0;
637 Network net;
638 Handler::admin_cmd_e cmd = Handler::POLL;
639 net.createServer(args->port);
640 while (retries > 0) {
641 log_network(_("Starting Admin Handler for port %d"), args->port);
642 net.newConnection(true);
643 log_network(_("Got an incoming Admin request"));
644 sleep(1);
645 do {
646 Network::byte_t data[ADMINPKTSIZE+1];
647 memset(data, 0, ADMINPKTSIZE+1);
648 const char *ptr = reinterpret_cast<const char *>(data);
649 ret = net.readNet(data, ADMINPKTSIZE, 100);
650 if (ret < 0) {
651 log_network("no more admin data, exiting...\n");
652 if ((ret == 0) && cmd != Handler::POLL) {
653 break;
655 } else {
656 // force the case to make comparisons easier. Only compare enough characters to
657 // till each command is unique.
658 std::transform(ptr, ptr + ret, data, (int(*)(int)) toupper);
659 if (strncmp(ptr, "QUIT", 4) == 0) {
660 cmd = Handler::QUIT;
661 } else if (strncmp(ptr, "STATUS", 5) == 0) {
662 cmd = Handler::STATUS;
663 } else if (strncmp(ptr, "HELP", 2) == 0) {
664 cmd = Handler::HELP;
665 net.writeNet("commands: help, status, poll, interval, statistics, quit.\n");
666 } else if (strncmp(ptr, "POLL", 2) == 0) {
667 cmd = Handler::POLL;
668 } else if (strncmp(ptr, "INTERVAL", 2) == 0) {
669 cmd = Handler::INTERVAL;
672 switch (cmd) {
673 // close this connection
674 case Handler::QUIT:
675 ret = -1;
676 break;
677 case Handler::STATUS:
679 #ifdef USE_STATS_CACHE
680 // cache.dump();
681 string results = cache.stats(false);
682 if (results.size()) {
683 net.writeNet(results);
684 results.clear();
686 #endif
687 #if 0
688 response << handlers.size() << " handlers are currently active.";
689 for (hit = handlers.begin(); hit != handlers.end(); hit++) {
690 int fd = hit->first;
691 Handler *hand = hit->second;
692 response << fd << ","
693 << hand->insize()
694 << "," << hand->outsize()
695 << "\r\n";
696 net.writeNet(response);
697 index++;
699 #endif
701 index = 0;
702 break;
703 case Handler::POLL:
704 #ifdef USE_STATS_QUEUE
705 index = 0;
706 response << handlers.size() << " handlers are currently active." << "\r\n";
707 for (hit = handlers.begin(); hit != handlers.end(); hit++) {
708 int fd = hit->first;
709 Handler *hand = hit->second;
710 struct timespec now;
711 clock_gettime (CLOCK_REALTIME, &now);
712 // Incoming que stats
713 CQue::que_stats_t *stats = hand->statsin();
714 float diff = static_cast<float>(((now.tv_sec -
715 stats->start.tv_sec) + ((now.tv_nsec -
716 stats->start.tv_nsec)/1e9)));
717 response << fd
718 << "," << stats->totalbytes
719 << "," << diff
720 << "," << stats->totalin
721 << "," << stats->totalout;
722 // Outgoing que stats
723 stats = hand->statsout();
724 response << "," <<stats->totalbytes
725 << "," << stats->totalin
726 << "," << stats->totalout
727 << "\r\n";
728 net.writeNet(response.str());
729 index++;
731 index = 0;
732 #endif
733 break;
734 case Handler::INTERVAL:
735 net.writeNet("set interval\n");
736 break;
737 default:
738 break;
740 } while (ret > 0);
741 log_network("admin_handler: Done...!\n");
742 net.closeNet(); // this shuts down this socket connection
744 net.closeConnection(); // this shuts down the server on this connection
746 // All threads should exit now.
747 alldone.notify_all();
750 // A connection handler is started for each port the server needs to
751 // wait on for incoming connections. When it gets an incoming
752 // connection, it reads the first packet to get the resource name, and
753 // then starts the event handler thread if it's a newly requested
754 // resource, otherwise it loads a copy of the cached resource.
//
// args supplies the port to listen on (args->port) and the protocol to
// speak (args->protocol). The same args object is reused for every
// accepted connection: args->netfd is overwritten each time and, on the
// RTMP path, so are args->filespec and args->entry.
//
// NOTE(review): this listing has lost its brace-only lines, so the exact
// nesting of some blocks below cannot be read from the text alone.
755 void
756 connection_handler(Network::thread_params_t *args)
758 // GNASH_REPORT_FUNCTION;
759 int fd = 0;
760 Network net;
// NOTE(review): nothing in this function assigns to done after this
// point, so the accept loop below only ends through a return statement.
761 bool done = false;
// Function-local static: one counter shared by every call of this function.
762 static int tid = 0;
764 if (netdebug) {
765 net.toggleDebug(true);
767 // Start a server on this tcp/ip port.
768 fd = net.createServer(args->port);
769 if (fd <= 0) {
770 log_error("Can't start %s Connection Handler for fd #%d, port %hd",
771 proto_str[args->protocol], fd, args->port);
772 return;
773 } else {
774 log_network("Starting %s Connection Handler for fd #%d, port %hd",
775 proto_str[args->protocol], fd, args->port);
778 // Get the number of cpus in this system. For multicore
779 // systems we'll get better load balancing if we keep all the
780 // cpus busy. So a pool of threads is started for each cpu,
781 // the default being just one. Each thread is responsible for
782 // handling part of the total active file descriptors.
// NOTE(review): ncpus is declared only when HAVE_SYSCONF is defined, but
// it is used unconditionally when spawn_limit is computed below.
783 #ifdef HAVE_SYSCONF
784 long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
785 LOG_ONCE(log_network("This system has %d cpus.", ncpus));
786 #endif
787 size_t nfds = crcfile.getFDThread();
789 // log_network("This system is configured for %d file descriptors to be watched by each thread.", nfds);
791 // Get the next thread ID to hand off handling this file
792 // descriptor to. If the limit for threads per cpu hasn't been
793 // set or is set to 0, assume one thread per processor by
794 // default. There won't even be threads for each cpu if
795 // threading has been disabled in the cygnal config file.
796 int spawn_limit = 0;
797 if (nfds == 0) {
798 spawn_limit = ncpus;
799 } else {
800 spawn_limit = ncpus * nfds;
803 // FIXME: this may run forever, we probably want a cleaner way to
804 // test for the end of time.
805 do {
806 net.setPort(args->port);
807 if (netdebug) {
808 net.toggleDebug(true);
811 // Rotate in a range of 0 to the limit.
// The value is only used in the log message further down.
812 tid = (tid + 1) % (spawn_limit + 1);
813 // log_network("%s handler: thread ID #%d, fd #%d", proto_str[args->protocol], tid, fd);
815 // Wait for a connection to this tcp/ip from a client. If set
816 // to true, this will block until a request comes in. If set
817 // to single threaded mode, this will only allow one client to
818 // connect at a time. This is to make it easier to debug
819 // things when you have a heavily threaded application.
820 args->netfd = net.newConnection(true, fd);
821 if (args->netfd <= 0) {
822 log_network("No new %s network connections", proto_str[args->protocol]);
823 continue;
824 } else {
825 log_network("*** New %s network connection for thread ID #%d, fd #%d ***",
826 proto_str[args->protocol], tid, args->netfd);
830 // Setup HTTP handler
832 if (args->protocol == Network::HTTP) {
// A fresh parameter block is allocated for each HTTP connection and
// handed to event_handler. It is not deleted anywhere in this function.
833 Network::thread_params_t *hargs = new Network::thread_params_t;
834 // std::copy(args, args+sizeof(Network::thread_params_t), &hargs);
835 hargs->protocol = args->protocol;
836 hargs->netfd = args->netfd;
837 #if 0
838 boost::shared_ptr<Handler> hand = cyg.findHandler(path);
839 HTTPServer *http = new HTTPServer;
840 hargs.entry = http;
841 http->setDocRoot(crcfile.getDocumentRoot());
842 boost::shared_ptr<cygnal::Buffer> buf(http->peekChunk());
843 http->processHeaderFields(*buf);
844 string hostname, path;
845 string::size_type pos = http->getField("host").find(":", 0);
846 if (pos != string::npos) {
847 hostname += http->getField("host").substr(0, pos);
848 } else {
849 hostname += "localhost";
851 path = http->getFilespec();
852 string key = hostname + path;
853 #endif
854 string key;
// hand is always null at this point, so the "create a new Handler"
// branch is always the one taken.
855 Handler *hand = 0;
856 if (!hand) {
857 hand = new Handler;
858 hand->addClient(args->netfd, Network::HTTP);
// Try up to 10 times to parse the first request from the client.
859 int retries = 10;
860 cygnal::Buffer *buf = 0;
861 do {
862 buf = hand->parseFirstRequest(args->netfd, Network::HTTP);
863 if (!buf) {
864 retries--;
865 continue;
866 } else {
867 break;
869 } while (retries);
// This reference shadows the outer, empty "key" string declared above.
870 string &key = hand->getKey(args->netfd);
871 log_network("Creating new %s Handler for %s using fd #%d",
872 proto_str[hargs->protocol], key, hargs->netfd);
873 hargs->handler = hand;
874 hargs->buffer = buf;
875 hargs->filespec = key;
876 // cyg.addHandler(key, hand);
878 // If in multi-threaded mode (the default), start a thread
879 // with a connection_handler for each port we're interested
880 // in. Each port of could have a different protocol.
// NOTE(review): the result of this bind expression is discarded, so the
// statement appears to have no effect.
881 boost::bind(event_handler, hargs);
882 if (crcfile.getThreadingFlag() == true) {
883 boost::thread event_thread(boost::bind(&event_handler, hargs));
884 } else {
885 event_handler(hargs);
886 // We're done, close this network connection
888 } else {
889 log_network("Resuing %s Handler for %s using fd #%d",
890 proto_str[hargs->protocol], key, hargs->netfd);
891 hand->addClient(args->netfd, Network::HTTP);
893 // delete http;
894 } // end of if HTTP
897 // Setup RTMP handler
899 if (args->protocol == Network::RTMP) {
900 Network::thread_params_t *rargs = new Network::thread_params_t;
901 rargs->protocol = args->protocol;
902 rargs->netfd = args->netfd;
903 RTMPServer *rtmp = new RTMPServer;
904 boost::shared_ptr<cygnal::Element> tcurl =
905 rtmp->processClientHandShake(args->netfd);
906 if (!tcurl) {
907 // log_error("Couldn't read the tcUrl variable!");
908 rtmp->closeNet(args->netfd);
// NOTE(review): this returns from the whole connection handler, not just
// from this client's setup, and neither rargs nor rtmp is freed on this
// path -- confirm that a failed handshake should stop the listener.
909 return;
911 URL url(tcurl->to_string());
// The handler is registered under hostname + path (see addHandler below)
// but looked up with the path alone.
912 string key = url.hostname() + url.path();
913 boost::shared_ptr<Handler> hand = cyg.findHandler(url.path());
914 if (!hand) {
915 log_network("Creating new %s Handler for: %s for fd %#d",
916 proto_str[args->protocol], key, args->netfd);
917 hand.reset(new Handler);
918 cyg.addHandler(key, hand);
919 rargs->entry = rtmp;
920 hand->setNetConnection(rtmp->getNetConnection());
// Attach the file descriptor of every active peer to the new handler.
921 std::vector<boost::shared_ptr<Cygnal::peer_t> >::iterator it;
922 std::vector<boost::shared_ptr<Cygnal::peer_t> > active = cyg.getActive();
923 for (it = active.begin(); it < active.end(); ++it) {
924 Cygnal::peer_t *peer = (*it).get();
925 hand->addRemote(peer->fd);
927 hand->addClient(args->netfd, Network::RTMP);
// NOTE(review): the Handler pointer is stored in rargs->handler, while
// event_handler is started with args below -- confirm which parameter
// block is meant to carry the handler.
928 rargs->handler = reinterpret_cast<void *>(hand.get());
929 args->filespec = key;
930 args->entry = rtmp;
// Build the plugin search path: the CYGNAL_PLUGINS environment variable
// first, then the configured CGI root appended after a ':'. When no CGI
// root is configured the path is replaced by PLUGINSDIR.
932 string cgiroot;
933 char *env = std::getenv("CYGNAL_PLUGINS");
934 if (env != 0) {
935 cgiroot = env;
937 if (crcfile.getCgiRoot().size() > 0) {
938 cgiroot += ":" + crcfile.getCgiRoot();
939 log_network (_("Cygnal Plugin paths are: %s"), cgiroot);
940 } else {
941 cgiroot = PLUGINSDIR;
943 hand->scanDir(cgiroot);
944 boost::shared_ptr<Handler::cygnal_init_t> init =
945 hand->initModule(url.path());
947 // this is where the real work gets done.
948 if (init) {
949 // If in multi-threaded mode (the default), start a thread
950 // with a connection_handler for each port we're interested
951 // in. Each port of course has a different protocol.
952 if (crcfile.getThreadingFlag() == true) {
953 boost::thread event_thread(boost::bind(&event_handler, args));
954 } else {
955 event_handler(args);
956 // We're done, close this network connection
957 net.closeNet(args->netfd);
959 } else {
960 log_error("Couldn't load plugin for %s", key);
963 // // We're done, close this network connection
964 // if (crcfile.getThreadingFlag() == true) {
965 // net.closeNet(args->netfd);
966 // }
968 // delete rtmp;
969 } // end of if RTMP
971 log_network("Number of active Threads is %d", tids.num_of_tids());
973 // net.closeNet(args->netfd); // this shuts down this socket connection
974 log_network("Restarting loop for next connection for port %d...", args->port);
975 } while(!done);
977 // All threads should wake up now.
978 alldone.notify_all();
980 } // end of connection_handler
982 void
983 event_handler(Network::thread_params_t *args)
985 GNASH_REPORT_FUNCTION;
987 Network::thread_params_t largs;
988 // std::copy(args, args+sizeof(Network::thread_params_t), &largs);
989 Handler *hand = reinterpret_cast<Handler *>(args->handler);
991 largs.protocol = args->protocol;
992 largs.netfd = args->netfd;
993 largs.port = args->port;
994 largs.buffer = args->buffer;
995 largs.entry = args->entry;
996 largs.filespec = args->filespec;
998 Network net;
999 int timeout = 30;
1000 int retries = 0;
1001 bool done = false;
1003 fd_set hits;
1004 FD_ZERO(&hits);
1005 FD_SET(args->netfd, &hits);
1007 tids.increment();
1009 // We need to calculate the highest numbered file descriptor
1010 // for select. We may want to do this elsewhere, as it could
1011 // be a performance hit as the number of file descriptors gets
1012 // larger.
1013 log_debug("Handler has %d clients attached, %d threads",
1014 hand->getClients().size(), tids.num_of_tids());
1016 int max = 0;
1017 for (size_t i = 0; i<hand->getClients().size(); i++) {
1018 log_debug("Handler client[%d] is: %d", i, hand->getClient(i));
1019 if (hand->getClient(i) >= max) {
1020 max = hand->getClient(i);
1021 // hand->dump();
1025 do {
1027 // If we have active disk streams, send those packets first.
1028 // 0 is a reserved stream, so we start with 1, as the reserved
1029 // stream isn't one we care about here.
1030 if (hand->getActiveDiskStreams()) {
1031 log_network("%d active disk streams",
1032 hand->getActiveDiskStreams());
1033 // hand->dump();
1035 #if 0
1036 boost::shared_ptr<DiskStream> filestream(cache.findFile(args->filespec));
1037 if (filestream) {
1038 filestream->dump();
1040 // #else
1041 // cache.dump();
1042 #endif
1043 //hand->dump();
1044 boost::shared_ptr<DiskStream> ds;
1045 for (int i=1; i <= hand->getActiveDiskStreams(); i++) {
1046 ds = hand->getDiskStream(i);
1047 if (ds) {
1048 //ds->dump();
1049 // Only play the next chunk of the file.
1050 //log_network("Sending following chunk of %s", ds->getFilespec());
1051 if (ds->play(i, false)) {
1052 if (ds->getState() == DiskStream::CLOSED) {
1053 net.closeNet(args->netfd);
1054 hand->removeClient(args->netfd);
1055 done = true;
1057 } else {
1058 // something went wrong, the stream failed
1059 net.closeNet(args->netfd);
1060 hand->removeClient(args->netfd);
1061 done = true;
1066 // See if we have any data waiting behind any of the file
1067 // descriptors.
1068 for (int i=0; i <= max + 1; i++) {
1069 if (FD_ISSET(i, &hits)) {
1070 FD_CLR(i, &hits);
1071 log_network("Got a hit for fd #%d, protocol %s", i,
1072 proto_str[hand->getProtocol(i)]);
1073 switch (hand->getProtocol(i)) {
1074 case Network::NONE:
1075 log_error("No protocol specified!");
1076 break;
1077 case Network::HTTP:
1079 largs.netfd = i;
1080 // largs.filespec = fullpath;
1081 boost::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1082 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1083 log_network("Done with HTTP connection for fd #%d, CGI %s", i, args->filespec);
1084 net.closeNet(args->netfd);
1085 hand->removeClient(args->netfd);
1086 done = true;
1087 } else {
1088 log_network("Not Done with HTTP connection for fd #%d, it's a persistent connection.", i);
1091 continue;
1093 case Network::RTMP:
1094 args->netfd = i;
1095 // args->filespec = path;
1096 if (!rtmp_handler(args)) {
1097 log_network("Done with RTMP connection for fd #%d, CGI ", i, args->filespec);
1098 done = true;
1100 break;
1101 case Network::RTMPT:
1103 net.setTimeout(timeout);
1104 args->netfd = i;
1105 boost::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1106 // args->filespec = path;
1107 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1108 log_network("Done with HTTP connection for fd #%d, CGI %s", i, largs.filespec);
1109 return;
1111 break;
1113 case Network::RTMPTS:
1115 args->netfd = i;
1116 // args->filespec = path;
1117 boost::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1118 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1119 log_network("Done with HTTP connection for fd #%d, CGI %s", i, args->filespec);
1120 return;
1122 break;
1124 case Network::RTMPE:
1125 break;
1126 case Network::RTMPS:
1127 break;
1128 case Network::DTN:
1129 break;
1130 default:
1131 log_error("Unsupported network protocol for fd #%d, %d",
1132 largs.netfd, hand->getProtocol(i));
1133 done = true;
1134 break;
1136 // delete args->buffer;
1140 // // Clear the current message so next time we read new data
1141 // args->buffer->clear();
1142 // largs.buffer->clear();
1144 // Wait for something from one of the file descriptors. This timeout
1145 // is the time between sending packets to the client when there is
1146 // no client input, which effects the streaming speed of big files.
1147 net.setTimeout(5);
1148 hits = net.waitForNetData(hand->getClients());
1149 if (FD_ISSET(0, &hits)) {
1150 FD_CLR(0, &hits);
1151 log_network("Got no hits, %d retries", retries);
1152 // net.closeNet(args->netfd);
1153 // hand->removeClient(args->netfd);
1154 // done = true;
1156 retries++;
1157 #if 0
1158 if (retries >= 10) {
1159 net.closeNet(args->netfd);
1160 hand->removeClient(args->netfd);
1161 done = true;
1163 #endif
1164 } while (!done);
1166 tids.decrement();
1168 } // end of event_handler
1170 // local Variables:
1171 // mode: C++
1172 // indent-tabs-mode: t
1173 // End: