tweak copyright message
[gnash.git] / cygnal / cygnal.cpp
blob67f265ca4b4e7311f770492dced0ab701d993bb3
1 // cygnal.cpp: GNU streaming Flash media server, for Gnash.
2 //
3 // Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
4 // Free Software Foundation, Inc.
5 //
6 // This program is free software; you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation; either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
15 //
16 // You should have received a copy of the GNU General Public License
17 // along with this program; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 #ifdef HAVE_CONFIG_H
22 #include "gnashconfig.h"
23 #endif
25 #include <sys/stat.h>
26 #include <list>
27 #include <map>
28 #include <iostream>
29 #include <sstream>
30 #include <csignal>
31 #include <vector>
32 #include <sys/mman.h>
33 #include <cerrno>
34 #include <sys/types.h>
35 #include <sys/stat.h>
36 #include <fcntl.h>
38 #include "GnashSleep.h"
39 #include "revno.h"
41 //#include "cvm.h"
43 extern "C"{
44 # include "GnashSystemIOHeaders.h"
45 #ifdef HAVE_GETOPT_H
46 # include <getopt.h>
47 #endif
48 #ifndef __GNUC__
49 extern int optind, getopt(int, char *const *, const char *);
50 extern char *optarg;
51 #endif
54 #include <boost/shared_ptr.hpp>
56 // classes internal to Gnash
57 #include "network.h"
58 #include "log.h"
59 #include "crc.h"
60 #include "proc.h"
61 #include "rtmp.h"
62 #include "buffer.h"
63 #include "utility.h"
64 #include "limits.h"
65 #include "netstats.h"
66 #include "statistics.h"
67 //#include "stream.h"
68 #include "gmemory.h"
69 #include "diskstream.h"
70 #include "arg_parser.h"
71 #include "GnashException.h"
72 #include "GnashSleep.h" // for usleep comptibility.
73 #include "URL.h"
74 #include "rtmp_client.h"
76 // classes internal to Cygnal
77 #include "rtmp_server.h"
78 #include "http_server.h"
80 #include "handler.h"
81 #include "cache.h"
82 #include "cygnal.h"
84 #ifdef ENABLE_NLS
85 # include <locale>
86 #endif
88 #include <boost/date_time/gregorian/gregorian.hpp>
89 //#include <boost/date_time/local_time/local_time.hpp>
90 #include <boost/date_time/time_zone_base.hpp>
91 #include <boost/date_time/posix_time/posix_time.hpp>
92 #include <boost/thread/thread.hpp>
93 #include <boost/bind.hpp>
94 #include <boost/thread/mutex.hpp>
95 #include <boost/thread/condition.hpp>
96 #include <boost/thread/tss.hpp>
98 #ifndef POLLRDHUP
99 #define POLLRDHUP 0
100 #endif
102 //using gnash::log_network;
103 using namespace std;
104 using namespace gnash;
105 using namespace cygnal;
107 static void usage();
108 static void version_and_copyright();
109 static void cntrlc_handler(int sig);
110 static void hup_handler(int sig);
112 void connection_handler(Network::thread_params_t *args);
113 void event_handler(Network::thread_params_t *args);
114 void admin_handler(Network::thread_params_t *args);
116 // Toggles very verbose debugging info from the network Network class
117 static bool netdebug = false;
119 struct sigaction act1, act2;
121 // The next few global variables have to be global because Boost
122 // threads don't take arguments. Since these are set in main() before
123 // any of the threads are started, and it's value should never change,
124 // it's safe to use these without a mutex, as all threads share the
125 // same read-only value.
127 // This is the default path to look in for files to be streamed.
128 static string docroot;
130 // This is the number of times a thread loop continues, for debugging only
131 int thread_retries = 10;
133 // This is added to the default ports for testing so it doesn't
134 // conflict with apache on the same machine.
135 static int port_offset = 0;
137 // Toggle the admin thread
138 static bool admin = false;
140 // Admin commands are small
141 const int ADMINPKTSIZE = 80;
143 // If set to a non zero value, this limits Cygnal to only one protocol
144 // at a time. This is for debugging only.
145 static int only_port = 0;
147 // These keep track of the number of active threads.
148 ThreadCounter tids;
150 map<int, Network *> networks;
152 // This is the global object for Cygnl
153 // The debug log used by all the gnash libraries.
154 static Cygnal& cyg = Cygnal::getDefaultInstance();
156 // The debug log used by all the gnash libraries.
157 static LogFile& dbglogfile = LogFile::getDefaultInstance();
159 // The user config for Cygnal is loaded and parsed here:
160 static CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
162 // Cache support for responses and files.
163 static Cache& cache = Cache::getDefaultInstance();
165 // The list of active cgis being executed.
166 //static std::map<std::string, Proc> procs; // = proc::getDefaultInstance();
168 // This mutex is used to signify when all the threads are done.
169 static boost::condition alldone;
170 static boost::mutex alldone_mutex;
172 static boost::condition noclients;
173 static boost::mutex noclients_mutex;
175 const char *proto_str[] = {
176 "NONE",
177 "HTTP",
178 "HTTPS",
179 "RTMP",
180 "RTMPT",
181 "RTMPTS",
182 "RTMPE",
183 "RTMPS",
184 "DTN"
187 static void
188 usage()
190 cout << _("cygnal -- a streaming media server.") << endl
191 << endl
192 << _("Usage: cygnal [options...]") << endl
193 << _(" -h, --help Print this help and exit") << endl
194 << _(" -V, --version Print version information and exit") << endl
195 << _(" -v, --verbose Output verbose debug info") << endl
196 << _(" -s, --singlethread Disable Multi Threading") << endl
197 << _(" -n, --netdebug Turn on net debugging messages") << endl
198 << _(" -o --only-port Only use port for debugging") << endl
199 << _(" -p --port-offset Port offset for debugging") << endl
200 << _(" -t, --testing Turn on special Gnash testing support") << endl
201 << _(" -a, --admin Enable the administration thread") << endl
202 << _(" -r, --root Document root for all files") << endl
203 << endl;
207 Cygnal&
208 Cygnal::getDefaultInstance()
210 // GNASH_REPORT_FUNCTION;
211 static Cygnal o;
212 return o;
216 Cygnal::~Cygnal()
218 // GNASH_REPORT_FUNCTION;
221 bool
222 Cygnal::loadPeersFile()
224 // GNASH_REPORT_FUNCTION;
226 loadPeersFile("./peers.conf");
228 loadPeersFile("/etc/peers.conf");
230 // Check the users home directory
231 #ifndef __amigaos4__
232 char *home = std::getenv("HOME");
233 #else
234 //on AmigaOS we have a GNASH: assign that point to program dir
235 char *home = "/gnash";
236 #endif
238 string homefile = home;
239 homefile += "/peers.conf";
241 return loadPeersFile(homefile);
244 bool
245 Cygnal::loadPeersFile(const std::string &filespec)
247 // GNASH_REPORT_FUNCTION;
249 struct stat stats;
250 std::ifstream in;
251 std::string line;
252 string host;
253 string portstr;
254 string cgi;
255 vector<string> supported;
257 // Make sufre the file exists
258 if (stat(filespec.c_str(), &stats) != 0) {
259 return false;
262 in.open(filespec.c_str());
264 if (!in) {
265 log_error(_(": couldn't open file: "), filespec);
266 return false;
269 // Read in each line and parse it
270 size_t lineno = 0;
271 while (std::getline(in, line)) {
273 ++lineno;
275 // Ignore comment and empty lines
276 if (line.empty() || line[0] == '#') {
277 continue;
280 std::istringstream ss(line);
282 // Get the first token
283 if (! (ss >> host)) {
284 // Empty line
285 continue;
288 // 'action' should never be empty, or (ss >> action)
289 // above would have failed
291 if (host[0] == '#') {
292 continue; // discard comments
295 // Get second token
296 if (!(ss >> portstr)) {
297 // Do we need to warn here as well?
298 continue;
301 while (ss >> cgi) {
302 supported.push_back(cgi);
303 continue;
306 // Create a new peer item
307 boost::shared_ptr<peer_t> peer(new Cygnal::peer_t);
308 peer->hostname = host;
309 peer->port = strtol(portstr.c_str(), NULL, 0) & 0xffff;
311 _peers.push_back(peer);
314 return true;
317 void
318 Cygnal::probePeers()
320 // GNASH_REPORT_FUNCTION;
322 probePeers(_peers);
325 void
326 Cygnal::probePeers(peer_t &peer)
328 // GNASH_REPORT_FUNCTION;
329 RTMPClient net;
330 stringstream uri;
332 uri << peer.hostname;
334 vector<string>::iterator it;
335 for (it = peer.supported.begin(); it <= peer.supported.end(); ++it) {
336 string tmp = uri.str();
337 // tmp += (*it);
338 // log_network("Constructed: %s/%s", uri.str(), *it);
340 gnash::URL url(uri.str());
341 if (!(peer.fd = net.connectToServer(uri.str()))) {
342 log_network(_("Couldn't connect to %s"), uri.str());
343 peer.connected = false;
344 } else {
345 peer.connected = true;
346 // peer.fd = net.getFileFd();
351 void
352 Cygnal::probePeers(std::vector<boost::shared_ptr<peer_t> > &peers)
354 // GNASH_REPORT_FUNCTION;
356 // createClient();
357 std::vector<boost::shared_ptr<Cygnal::peer_t> >::iterator it;
358 for (it = peers.begin(); it != peers.end(); ++it) {
359 boost::shared_ptr<Cygnal::peer_t> peer = *it;
360 probePeers(*peer);
361 if (peer->connected) {
362 log_network(_("%s is active on fd #%d."), peer->hostname,
363 peer->fd);
364 _active_peers.push_back(*it);
369 void
370 Cygnal::removeHandler(const std::string &path)
372 // GNASH_REPORT_FUNCTION;
373 map<std::string, boost::shared_ptr<Handler> >::iterator it;
374 it = _handlers.find(path);
375 if (it != _handlers.end()) {
376 boost::mutex::scoped_lock lock(_mutex);
377 _handlers.erase(it);
381 boost::shared_ptr<Handler>
382 Cygnal::findHandler(const std::string &path)
384 // GNASH_REPORT_FUNCTION;
385 map<std::string, boost::shared_ptr<Handler> >::iterator it;
386 boost::shared_ptr<Handler> hand;
387 it = _handlers.find(path);
388 if (it != _handlers.end()) {
389 hand = (*it).second;
392 return hand;
395 void
396 Cygnal::dump()
398 std::vector<boost::shared_ptr<Cygnal::peer_t> >::iterator it;
399 for (it = _peers.begin(); it != _peers.end(); ++it) {
400 cerr << "Remote Peer: " << (*it)->hostname
401 << ":" << (*it)->port << endl;
406 main(int argc, char *argv[])
408 // Initialize national language support
409 #ifdef ENABLE_NLS
410 setlocale (LC_ALL, "");
411 bindtextdomain (PACKAGE, LOCALEDIR);
412 textdomain (PACKAGE);
413 #endif
415 const Arg_parser::Option opts[] =
417 { 'h', "help", Arg_parser::no },
418 { 'V', "version", Arg_parser::no },
419 { 'p', "port-offset", Arg_parser::yes },
420 { 'v', "verbose", Arg_parser::no },
421 { 'd', "dump", Arg_parser::no },
422 { 'n', "netdebug", Arg_parser::no },
423 { 't', "testing", Arg_parser::no },
424 { 'a', "admin", Arg_parser::no },
425 { 'r', "root", Arg_parser::yes },
426 { 'o', "only-port", Arg_parser::yes },
427 { 's', "singlethreaded", Arg_parser::no }
430 Arg_parser parser(argc, argv, opts);
431 if( ! parser.error().empty() )
433 cout << parser.error() << endl;
434 exit(EXIT_FAILURE);
437 // crcfile.loadFiles();
439 // Set the log file name before trying to write to
440 // it, or we might get two.
441 dbglogfile.setLogFilename(crcfile.getDebugLog());
443 if (crcfile.verbosityLevel() > 0) {
444 dbglogfile.setVerbosity(crcfile.verbosityLevel());
447 if (crcfile.getDocumentRoot().size() > 0) {
448 docroot = crcfile.getDocumentRoot();
449 } else {
450 docroot = "/var/www/html/software/tests/";
451 crcfile.setDocumentRoot(docroot);
453 if (crcfile.getPortOffset()) {
454 port_offset = crcfile.getPortOffset();
457 // Handle command line arguments
458 for( int i = 0; i < parser.arguments(); ++i ) {
459 const int code = parser.code(i);
460 switch( code ) {
461 case 'h':
462 version_and_copyright();
463 usage();
464 exit(EXIT_SUCCESS);
465 case 'V':
466 version_and_copyright();
467 exit(EXIT_SUCCESS);
468 case 't':
469 crcfile.setTestingFlag(true);
470 break;
471 case 'a':
472 admin = true;
473 break;
474 case 'v':
475 dbglogfile.setVerbosity();
476 LOG_ONCE(log_network(_("Verbose output turned on")))
477 break;
478 case 'p':
479 port_offset = parser.argument<int>(i);
480 crcfile.setPortOffset(port_offset);
481 break;
482 case 'r':
483 docroot = parser.argument(i);
484 break;
485 case 's':
486 crcfile.setThreadingFlag(false);
487 break;
488 case 'n':
489 netdebug = true;
490 dbglogfile.setNetwork(true);
491 break;
492 case 'o':
493 only_port = parser.argument<int>(i);
494 break;
495 case 'd':
496 crcfile.dump();
497 exit(EXIT_SUCCESS);
498 break;
499 default:
500 log_error(_("Extraneous argument: %s"), parser.argument(i).c_str());
504 log_network(_("Document Root for media files is: %s"), docroot);
505 crcfile.setDocumentRoot(docroot);
507 // load the file of peers. A peer is another instance of Cygnal we
508 // can use for distributed processing.
509 cyg.loadPeersFile();
510 cyg.probePeers();
512 // cyg.dump();
514 // Trap ^C (SIGINT) so we can kill all the threads
515 act1.sa_handler = cntrlc_handler;
516 sigaction (SIGINT, &act1, NULL);
517 act2.sa_handler = hup_handler;
518 sigaction (SIGHUP, &act2, NULL);
519 // sigaction (SIGPIPE, &act, NULL);
521 // Lock a mutex the main() waits in before exiting. This is
522 // because all the actually processing is done by other threads.
523 boost::mutex::scoped_lock lk(alldone_mutex);
525 // Start the Admin handler. This allows one to connect to Cygnal
526 // at port 1111 and dump statistics to the terminal for tuning
527 // purposes.
528 if (admin) {
529 Network::thread_params_t admin_data;
530 admin_data.port = gnash::ADMIN_PORT;
531 boost::thread admin_thread(boost::bind(&admin_handler, &admin_data));
534 // Cvm cvm;
535 // cvm.loadMovie("/tmp/out.swf");
537 // If a only-port is specified, we only want to run single
538 // threaded. As all the rest of the code checks the config value
539 // setting, this overrides that in the memory, but doesn't change
540 // the file itself. This feature is really only for debugging,
541 // where it's easier to work with one protocol at a time.
542 if (only_port) {
543 crcfile.setThreadingFlag(false);
546 // Incomming connection handler for port 80, HTTP and
547 // RTMPT. As port 80 requires root access, cygnal supports a
548 // "port offset" for debugging and development of the
549 // server. Since this port offset changes the constant to test
550 // for which protocol, we pass the info to the start thread so
551 // it knows which handler to invoke.
552 Network::thread_params_t *http_data = new Network::thread_params_t;
553 if ((only_port == 0) || (only_port == gnash::HTTP_PORT)) {
554 http_data->tid = 0;
555 http_data->netfd = 0;
556 http_data->filespec = docroot;
557 http_data->protocol = Network::HTTP;
558 http_data->port = port_offset + gnash::HTTP_PORT;
559 if (crcfile.getThreadingFlag()) {
560 boost::thread http_thread(boost::bind(&connection_handler, http_data));
561 } else {
562 connection_handler(http_data);
566 // Incomming connection handler for port 1935, RTMPT and
567 // RTMPTE. This supports the same port offset as the HTTP handler,
568 // just to keep things consistent.
569 Network::thread_params_t *rtmp_data = new Network::thread_params_t;
570 if ((only_port == 0) || (only_port == gnash::RTMP_PORT)) {
571 rtmp_data->tid = 0;
572 rtmp_data->netfd = 0;
573 rtmp_data->filespec = docroot;
574 rtmp_data->protocol = Network::RTMP;
575 rtmp_data->port = port_offset + gnash::RTMP_PORT;
576 if (crcfile.getThreadingFlag()) {
577 boost::thread rtmp_thread(boost::bind(&connection_handler, rtmp_data));
578 } else {
579 connection_handler(rtmp_data);
583 // Wait for all the threads to die.
584 alldone.wait(lk);
586 log_network(_("Cygnal done..."));
588 // Delete the data we allowcated to pass to each connection_handler.
589 delete rtmp_data;
590 delete http_data;
592 return(0);
595 // Trap Control-C (SIGINT) so we can cleanly exit
596 static void
597 cntrlc_handler (int sig)
599 log_network(_("Got a %d interrupt"), sig);
600 // sigaction (SIGINT, &act, NULL);
601 exit(EXIT_FAILURE);
604 // Trap SIGHUP so we can
605 static void
606 hup_handler (int /* sig */)
608 if (crcfile.getTestingFlag()) {
609 cerr << "Testing, Testing, Testing..." << endl;
614 static void
615 version_and_copyright()
617 cout << "Cygnal: " << BRANCH_NICK << "_" << BRANCH_REVNO << endl
618 << endl
619 << _("Copyright (C) 2007, 2008, 2009, 2010 Free Software Foundation, Inc.\n"
620 "Cygnal comes with NO WARRANTY, to the extent permitted by law.\n"
621 "You may redistribute copies of Cygnal under the terms of the GNU General\n"
622 "Public License V3 or later. For more information, see the file named COPYING.\n")
623 << endl;
626 // FIXME: this function could be tweaked for better performance
627 void
628 admin_handler(Network::thread_params_t *args)
630 GNASH_REPORT_FUNCTION;
631 int retries = 100;
632 int ret;
634 map<int, Handler *>::iterator hit;
635 stringstream response;
637 Network net;
638 Handler::admin_cmd_e cmd = Handler::POLL;
639 net.createServer(args->port);
640 while (retries > 0) {
641 log_network(_("Starting Admin Handler for port %d"), args->port);
642 net.newConnection(true);
643 log_network(_("Got an incoming Admin request"));
644 sleep(1);
645 do {
646 Network::byte_t data[ADMINPKTSIZE+1];
647 memset(data, 0, ADMINPKTSIZE+1);
648 const char *ptr = reinterpret_cast<const char *>(data);
649 ret = net.readNet(data, ADMINPKTSIZE, 100);
650 if (ret < 0) {
651 log_network(_("no more admin data, exiting...\n"));
652 if ((ret == 0) && cmd != Handler::POLL) {
653 break;
655 } else {
656 // force the case to make comparisons easier. Only compare enough characters to
657 // till each command is unique.
658 std::transform(ptr, ptr + ret, data, (int(*)(int)) toupper);
659 if (strncmp(ptr, "QUIT", 4) == 0) {
660 cmd = Handler::QUIT;
661 } else if (strncmp(ptr, "STATUS", 5) == 0) {
662 cmd = Handler::STATUS;
663 } else if (strncmp(ptr, "HELP", 2) == 0) {
664 cmd = Handler::HELP;
665 net.writeNet("commands: help, status, poll, interval, statistics, quit.\n");
666 } else if (strncmp(ptr, "POLL", 2) == 0) {
667 cmd = Handler::POLL;
668 } else if (strncmp(ptr, "INTERVAL", 2) == 0) {
669 cmd = Handler::INTERVAL;
672 switch (cmd) {
673 // close this connection
674 case Handler::QUIT:
675 ret = -1;
676 break;
677 case Handler::STATUS:
679 #ifdef USE_STATS_CACHE
680 // cache.dump();
681 string results = cache.stats(false);
682 if (results.size()) {
683 net.writeNet(results);
684 results.clear();
686 #endif
687 #if 0
688 response << handlers.size() << " handlers are currently active.";
689 for (hit = handlers.begin(); hit != handlers.end(); hit++) {
690 int fd = hit->first;
691 Handler *hand = hit->second;
692 response << fd << ","
693 << hand->insize()
694 << "," << hand->outsize()
695 << "\r\n";
696 net.writeNet(response);
698 #endif
700 break;
701 case Handler::POLL:
702 #ifdef USE_STATS_QUEUE
703 response << handlers.size() << " handlers are currently active." << "\r\n";
704 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
705 int fd = hit->first;
706 Handler *hand = hit->second;
707 struct timespec now;
708 clock_gettime (CLOCK_REALTIME, &now);
709 // Incoming que stats
710 CQue::que_stats_t *stats = hand->statsin();
711 float diff = static_cast<float>(((now.tv_sec -
712 stats->start.tv_sec) + ((now.tv_nsec -
713 stats->start.tv_nsec)/1e9)));
714 response << fd
715 << "," << stats->totalbytes
716 << "," << diff
717 << "," << stats->totalin
718 << "," << stats->totalout;
719 // Outgoing que stats
720 stats = hand->statsout();
721 response << "," <<stats->totalbytes
722 << "," << stats->totalin
723 << "," << stats->totalout
724 << "\r\n";
725 net.writeNet(response.str());
727 #endif
728 break;
729 case Handler::INTERVAL:
730 net.writeNet("set interval\n");
731 break;
732 default:
733 break;
735 } while (ret > 0);
736 log_network(_("admin_handler: Done...!\n"));
737 net.closeNet(); // this shuts down this socket connection
739 net.closeConnection(); // this shuts down the server on this connection
741 // All threads should exit now.
742 alldone.notify_all();
745 // A connection handler is started for each port the server needs to
746 // wait on for incoming connections. When it gets an incoming
747 // connection, it reads the first packet to get the resource name, and
748 // then starts the event handler thread if it's a newly requested
749 // resource, otherwise it loads a copy of the cached resource.
750 void
751 connection_handler(Network::thread_params_t *args)
753 // GNASH_REPORT_FUNCTION;
754 int fd = 0;
755 Network net;
756 bool done = false;
757 static int tid = 0;
759 if (netdebug) {
760 net.toggleDebug(true);
762 // Start a server on this tcp/ip port.
763 fd = net.createServer(args->port);
764 if (fd <= 0) {
765 log_error(_("Can't start %s Connection Handler for fd #%d, port %hd"),
766 proto_str[args->protocol], fd, args->port);
767 return;
768 } else {
769 log_network(_("Starting %s Connection Handler for fd #%d, port %hd"),
770 proto_str[args->protocol], fd, args->port);
773 // Get the number of cpus in this system. For multicore
774 // systems we'll get better load balancing if we keep all the
775 // cpus busy. So a pool of threads is started for each cpu,
776 // the default being just one. Each thread is reponsible for
777 // handling part of the total active file descriptors.
778 #ifdef HAVE_SYSCONF
779 long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
780 LOG_ONCE(log_network(_("This system has %d cpus."), ncpus));
781 #endif
782 size_t nfds = crcfile.getFDThread();
784 // log_network("This system is configured for %d file descriptors to be watched by each thread.", nfds);
786 // Get the next thread ID to hand off handling this file
787 // descriptor to. If the limit for threads per cpu hasn't been
788 // set or is set to 0, assume one thread per processor by
789 // default. There won't even be threads for each cpu if
790 // threading has been disabled in the cygnal config file.
791 int spawn_limit = 0;
792 if (nfds == 0) {
793 spawn_limit = ncpus;
794 } else {
795 spawn_limit = ncpus * nfds;
798 // FIXME: this may run forever, we probably want a cleaner way to
799 // test for the end of time.
800 do {
801 net.setPort(args->port);
802 if (netdebug) {
803 net.toggleDebug(true);
806 // Rotate in a range of 0 to the limit.
807 tid = (tid + 1) % (spawn_limit + 1);
808 // log_network("%s handler: thread ID #%d, fd #%d", proto_str[args->protocol], tid, fd);
810 // Wait for a connection to this tcp/ip from a client. If set
811 // to true, this will block until a request comes in. If set
812 // to single threaded mode, this will only allow one client to
813 // connect at a time. This is to make it easier to debug
814 // things when you have a heavily threaded application.
815 args->netfd = net.newConnection(true, fd);
816 if (args->netfd <= 0) {
817 log_network(_("No new %s network connections"),
818 proto_str[args->protocol]);
819 continue;
820 } else {
821 log_network(_("*** New %s network connection for thread ID #%d, fd #%d ***"),
822 proto_str[args->protocol], tid, args->netfd);
826 // Setup HTTP handler
828 if (args->protocol == Network::HTTP) {
829 Network::thread_params_t *hargs = new Network::thread_params_t;
830 // std::copy(args, args+sizeof(Network::thread_params_t), &hargs);
831 hargs->protocol = args->protocol;
832 hargs->netfd = args->netfd;
833 #if 0
834 boost::shared_ptr<Handler> hand = cyg.findHandler(path);
835 HTTPServer *http = new HTTPServer;
836 hargs.entry = http;
837 http->setDocRoot(crcfile.getDocumentRoot());
838 boost::shared_ptr<cygnal::Buffer> buf(http->peekChunk());
839 http->processHeaderFields(*buf);
840 string hostname, path;
841 string::size_type pos = http->getField("host").find(":", 0);
842 if (pos != string::npos) {
843 hostname += http->getField("host").substr(0, pos);
844 } else {
845 hostname += "localhost";
847 path = http->getFilespec();
848 string key = hostname + path;
849 #endif
850 string key;
851 Handler *hand = 0;
852 if (!hand) {
853 hand = new Handler;
854 hand->addClient(args->netfd, Network::HTTP);
855 int retries = 10;
856 cygnal::Buffer *buf = 0;
857 do {
858 buf = hand->parseFirstRequest(args->netfd, Network::HTTP);
859 if (!buf) {
860 retries--;
861 continue;
862 } else {
863 break;
865 } while (retries);
866 string &key = hand->getKey(args->netfd);
867 log_network(_("Creating new %s Handler for %s using fd #%d"),
868 proto_str[hargs->protocol], key, hargs->netfd);
869 hargs->handler = hand;
870 hargs->buffer = buf;
871 hargs->filespec = key;
872 // cyg.addHandler(key, hand);
874 // If in multi-threaded mode (the default), start a thread
875 // with a connection_handler for each port we're interested
876 // in. Each port of could have a different protocol.
877 boost::bind(event_handler, hargs);
878 if (crcfile.getThreadingFlag() == true) {
879 boost::thread event_thread(boost::bind(&event_handler, hargs));
880 } else {
881 event_handler(hargs);
882 // We're done, close this network connection
884 } else {
885 log_network(_("Reusing %s Handler for %s using fd #%d"),
886 proto_str[hargs->protocol], key, hargs->netfd);
887 hand->addClient(args->netfd, Network::HTTP);
889 // delete http;
890 } // end of if HTTP
893 // Setup RTMP handler
895 if (args->protocol == Network::RTMP) {
896 Network::thread_params_t *rargs = new Network::thread_params_t;
897 rargs->protocol = args->protocol;
898 rargs->netfd = args->netfd;
899 RTMPServer *rtmp = new RTMPServer;
900 boost::shared_ptr<cygnal::Element> tcurl =
901 rtmp->processClientHandShake(args->netfd);
902 if (!tcurl) {
903 // log_error("Couldn't read the tcUrl variable!");
904 rtmp->closeNet(args->netfd);
905 return;
907 URL url(tcurl->to_string());
908 string key = url.hostname() + url.path();
909 boost::shared_ptr<Handler> hand = cyg.findHandler(url.path());
910 if (!hand) {
911 log_network(_("Creating new %s Handler for: %s for fd %#d"),
912 proto_str[args->protocol], key, args->netfd);
913 hand.reset(new Handler);
914 cyg.addHandler(key, hand);
915 rargs->entry = rtmp;
916 hand->setNetConnection(rtmp->getNetConnection());
917 std::vector<boost::shared_ptr<Cygnal::peer_t> >::iterator it;
918 std::vector<boost::shared_ptr<Cygnal::peer_t> > active = cyg.getActive();
919 for (it = active.begin(); it < active.end(); ++it) {
920 Cygnal::peer_t *peer = (*it).get();
921 hand->addRemote(peer->fd);
923 hand->addClient(args->netfd, Network::RTMP);
924 rargs->handler = reinterpret_cast<void *>(hand.get());
925 args->filespec = key;
926 args->entry = rtmp;
928 string cgiroot;
929 char *env = std::getenv("CYGNAL_PLUGINS");
930 if (env != 0) {
931 cgiroot = env;
933 if (crcfile.getCgiRoot().size() > 0) {
934 cgiroot += ":" + crcfile.getCgiRoot();
935 log_network(_("Cygnal Plugin paths are: %s"), cgiroot);
936 } else {
937 cgiroot = PLUGINSDIR;
939 hand->scanDir(cgiroot);
940 boost::shared_ptr<Handler::cygnal_init_t> init =
941 hand->initModule(url.path());
943 // this is where the real work gets done.
944 if (init) {
945 // If in multi-threaded mode (the default), start a thread
946 // with a connection_handler for each port we're interested
947 // in. Each port of course has a different protocol.
948 if (crcfile.getThreadingFlag() == true) {
949 boost::thread event_thread(boost::bind(&event_handler, args));
950 } else {
951 event_handler(args);
952 // We're done, close this network connection
953 net.closeNet(args->netfd);
955 } else {
956 log_error(_("Couldn't load plugin for %s"), key);
959 // // We're done, close this network connection
960 // if (crcfile.getThreadingFlag() == true) {
961 // net.closeNet(args->netfd);
962 // }
964 // delete rtmp;
965 } // end of if RTMP
967 log_network(_("Number of active Threads is %d"), tids.num_of_tids());
969 // net.closeNet(args->netfd); // this shuts down this socket connection
970 log_network(_("Restarting loop for next connection for port %d..."),
971 args->port);
972 } while(!done);
974 // All threads should wake up now.
975 alldone.notify_all();
977 } // end of connection_handler
979 void
980 event_handler(Network::thread_params_t *args)
982 GNASH_REPORT_FUNCTION;
984 Network::thread_params_t largs;
985 // std::copy(args, args+sizeof(Network::thread_params_t), &largs);
986 Handler *hand = reinterpret_cast<Handler *>(args->handler);
988 largs.protocol = args->protocol;
989 largs.netfd = args->netfd;
990 largs.port = args->port;
991 largs.buffer = args->buffer;
992 largs.entry = args->entry;
993 largs.filespec = args->filespec;
995 Network net;
996 int timeout = 30;
997 int retries = 0;
998 bool done = false;
1000 fd_set hits;
1001 FD_ZERO(&hits);
1002 FD_SET(args->netfd, &hits);
1004 tids.increment();
1006 // We need to calculate the highest numbered file descriptor
1007 // for select. We may want to do this elsewhere, as it could
1008 // be a performance hit as the number of file descriptors gets
1009 // larger.
1010 log_debug("Handler has %d clients attached, %d threads",
1011 hand->getClients().size(), tids.num_of_tids());
1013 int max = 0;
1014 for (size_t i = 0; i<hand->getClients().size(); i++) {
1015 log_debug("Handler client[%d] is: %d", i, hand->getClient(i));
1016 if (hand->getClient(i) >= max) {
1017 max = hand->getClient(i);
1018 // hand->dump();
1022 do {
1024 // If we have active disk streams, send those packets first.
1025 // 0 is a reserved stream, so we start with 1, as the reserved
1026 // stream isn't one we care about here.
1027 if (hand->getActiveDiskStreams()) {
1028 log_network(_("%d active disk streams"),
1029 hand->getActiveDiskStreams());
1030 // hand->dump();
1032 #if 0
1033 boost::shared_ptr<DiskStream> filestream(cache.findFile(args->filespec));
1034 if (filestream) {
1035 filestream->dump();
1037 // #else
1038 // cache.dump();
1039 #endif
1040 //hand->dump();
1041 boost::shared_ptr<DiskStream> ds;
1042 for (int i=1; i <= hand->getActiveDiskStreams(); i++) {
1043 ds = hand->getDiskStream(i);
1044 if (ds) {
1045 //ds->dump();
1046 // Only play the next chunk of the file.
1047 //log_network("Sending following chunk of %s", ds->getFilespec());
1048 if (ds->play(i, false)) {
1049 if (ds->getState() == DiskStream::CLOSED) {
1050 net.closeNet(args->netfd);
1051 hand->removeClient(args->netfd);
1052 done = true;
1054 } else {
1055 // something went wrong, the stream failed
1056 net.closeNet(args->netfd);
1057 hand->removeClient(args->netfd);
1058 done = true;
1063 // See if we have any data waiting behind any of the file
1064 // descriptors.
1065 for (int i=0; i <= max + 1; i++) {
1066 if (FD_ISSET(i, &hits)) {
1067 FD_CLR(i, &hits);
1068 log_network(_("Got a hit for fd #%d, protocol %s"), i,
1069 proto_str[hand->getProtocol(i)]);
1070 switch (hand->getProtocol(i)) {
1071 case Network::NONE:
1072 log_error(_("No protocol specified!"));
1073 break;
1074 case Network::HTTP:
1076 largs.netfd = i;
1077 // largs.filespec = fullpath;
1078 boost::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1079 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1080 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, args->filespec);
1081 net.closeNet(args->netfd);
1082 hand->removeClient(args->netfd);
1083 done = true;
1084 } else {
1085 log_network(_("Not Done with HTTP connection for fd #%d, it's a persistent connection."), i);
1088 continue;
1090 case Network::RTMP:
1091 args->netfd = i;
1092 // args->filespec = path;
1093 if (!rtmp_handler(args)) {
1094 log_network(_("Done with RTMP connection for fd #%d, CGI "), i, args->filespec);
1095 done = true;
1097 break;
1098 case Network::RTMPT:
1100 net.setTimeout(timeout);
1101 args->netfd = i;
1102 boost::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1103 // args->filespec = path;
1104 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1105 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, largs.filespec);
1106 return;
1108 break;
1110 case Network::RTMPTS:
1112 args->netfd = i;
1113 // args->filespec = path;
1114 boost::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1115 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1116 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, args->filespec);
1117 return;
1119 break;
1121 case Network::RTMPE:
1122 break;
1123 case Network::RTMPS:
1124 break;
1125 case Network::DTN:
1126 break;
1127 default:
1128 log_error(_("Unsupported network protocol for fd #%d, %d"),
1129 largs.netfd, hand->getProtocol(i));
1130 done = true;
1131 break;
1133 // delete args->buffer;
1137 // // Clear the current message so next time we read new data
1138 // args->buffer->clear();
1139 // largs.buffer->clear();
1141 // Wait for something from one of the file descriptors. This timeout
1142 // is the time between sending packets to the client when there is
1143 // no client input, which effects the streaming speed of big files.
1144 net.setTimeout(5);
1145 hits = net.waitForNetData(hand->getClients());
1146 if (FD_ISSET(0, &hits)) {
1147 FD_CLR(0, &hits);
1148 log_network(_("Got no hits, %d retries"), retries);
1149 // net.closeNet(args->netfd);
1150 // hand->removeClient(args->netfd);
1151 // done = true;
1153 retries++;
1154 #if 0
1155 if (retries >= 10) {
1156 net.closeNet(args->netfd);
1157 hand->removeClient(args->netfd);
1158 done = true;
1160 #endif
1161 } while (!done);
1163 tids.decrement();
1165 } // end of event_handler
1167 // local Variables:
1168 // mode: C++
1169 // indent-tabs-mode: nil
1170 // End: