Update with current status
[gnash.git] / cygnal / cygnal.cpp
blobc9e9399900833d089ed5f5c1ff0ba8bb3dc3727f
1 // cygnal.cpp: GNU streaming Flash media server, for Gnash.
2 //
3 // Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
4 // Free Software Foundation, Inc.
5 //
6 // This program is free software; you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation; either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
15 //
16 // You should have received a copy of the GNU General Public License
17 // along with this program; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 #ifdef HAVE_CONFIG_H
22 #include "gnashconfig.h"
23 #endif
25 #include <sys/stat.h>
26 #include <list>
27 #include <map>
28 #include <iostream>
29 #include <sstream>
30 #include <csignal>
31 #include <vector>
32 #include <sys/mman.h>
33 #include <cerrno>
34 #include <sys/types.h>
35 #include <sys/stat.h>
36 #include <fcntl.h>
37 #include <functional>
38 #include <mutex>
39 #include <condition_variable>
41 #include "GnashSleep.h"
42 #include "revno.h"
44 //#include "cvm.h"
46 extern "C"{
47 # include "GnashSystemIOHeaders.h"
48 #ifdef HAVE_GETOPT_H
49 # include <getopt.h>
50 #endif
51 #ifndef __GNUC__
52 extern int optind, getopt(int, char *const *, const char *);
53 extern char *optarg;
54 #endif
58 // classes internal to Gnash
59 #include "network.h"
60 #include "log.h"
61 #include "crc.h"
62 #include "proc.h"
63 #include "rtmp.h"
64 #include "buffer.h"
65 #include "utility.h"
66 #include "limits.h"
67 #include "netstats.h"
68 #include "statistics.h"
69 //#include "stream.h"
70 #include "gmemory.h"
71 #include "diskstream.h"
72 #include "arg_parser.h"
73 #include "GnashException.h"
74 #include "GnashSleep.h" // for usleep comptibility.
75 #include "URL.h"
76 #include "rtmp_client.h"
78 // classes internal to Cygnal
79 #include "rtmp_server.h"
80 #include "http_server.h"
82 #include "handler.h"
83 #include "cache.h"
84 #include "cygnal.h"
86 #ifdef ENABLE_NLS
87 # include <locale>
88 #endif
90 #include <boost/date_time/gregorian/gregorian.hpp>
91 //#include <boost/date_time/local_time/local_time.hpp>
92 #include <boost/date_time/time_zone_base.hpp>
93 #include <boost/date_time/posix_time/posix_time.hpp>
95 #ifndef POLLRDHUP
96 #define POLLRDHUP 0
97 #endif
99 //using gnash::log_network;
100 using namespace std;
101 using namespace gnash;
102 using namespace cygnal;
104 static void usage();
105 static void version_and_copyright();
106 static void cntrlc_handler(int sig);
107 static void hup_handler(int sig);
109 void connection_handler(Network::thread_params_t *args);
110 void event_handler(Network::thread_params_t *args);
111 void admin_handler(Network::thread_params_t *args);
113 // Toggles very verbose debugging info from the network Network class
114 static bool netdebug = false;
116 struct sigaction act1, act2;
118 // The next few global variables have to be global because Boost
119 // threads don't take arguments. Since these are set in main() before
120 // any of the threads are started, and it's value should never change,
121 // it's safe to use these without a mutex, as all threads share the
122 // same read-only value.
124 // This is the default path to look in for files to be streamed.
125 static string docroot;
127 // This is the number of times a thread loop continues, for debugging only
128 int thread_retries = 10;
130 // This is added to the default ports for testing so it doesn't
131 // conflict with apache on the same machine.
132 static int port_offset = 0;
134 // Toggle the admin thread
135 static bool admin = false;
137 // Admin commands are small
138 const int ADMINPKTSIZE = 80;
140 // If set to a non zero value, this limits Cygnal to only one protocol
141 // at a time. This is for debugging only.
142 static int only_port = 0;
144 // These keep track of the number of active threads.
145 ThreadCounter tids;
147 map<int, Network *> networks;
149 // This is the global object for Cygnl
150 // The debug log used by all the gnash libraries.
151 static Cygnal& cyg = Cygnal::getDefaultInstance();
153 // The debug log used by all the gnash libraries.
154 static LogFile& dbglogfile = LogFile::getDefaultInstance();
156 // The user config for Cygnal is loaded and parsed here:
157 static CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
159 // Cache support for responses and files.
160 static Cache& cache = Cache::getDefaultInstance();
162 // The list of active cgis being executed.
163 //static std::map<std::string, Proc> procs; // = proc::getDefaultInstance();
165 // This mutex is used to signify when all the threads are done.
166 static std::condition_variable alldone;
167 static std::mutex alldone_mutex;
169 static std::condition_variable noclients;
170 static std::mutex noclients_mutex;
172 const char *proto_str[] = {
173 "NONE",
174 "HTTP",
175 "HTTPS",
176 "RTMP",
177 "RTMPT",
178 "RTMPTS",
179 "RTMPE",
180 "RTMPS",
181 "DTN"
184 static void
185 usage()
187 cout << _("cygnal -- a streaming media server.") << endl
188 << endl
189 << _("Usage: cygnal [options...]") << endl
190 << _(" -h, --help Print this help and exit") << endl
191 << _(" -V, --version Print version information and exit") << endl
192 << _(" -v, --verbose Output verbose debug info") << endl
193 << _(" -s, --singlethread Disable Multi Threading") << endl
194 << _(" -n, --netdebug Turn on net debugging messages") << endl
195 << _(" -o --only-port Only use port for debugging") << endl
196 << _(" -p --port-offset Port offset for debugging") << endl
197 << _(" -t, --testing Turn on special Gnash testing support") << endl
198 << _(" -a, --admin Enable the administration thread") << endl
199 << _(" -r, --root Document root for all files") << endl
200 << _(" -m, --machine Hostname for this machine") << endl
201 << endl;
205 Cygnal&
206 Cygnal::getDefaultInstance()
208 // GNASH_REPORT_FUNCTION;
209 static Cygnal o;
210 return o;
214 Cygnal::~Cygnal()
216 // GNASH_REPORT_FUNCTION;
219 bool
220 Cygnal::loadPeersFile()
222 // GNASH_REPORT_FUNCTION;
224 loadPeersFile("./peers.conf");
226 loadPeersFile("/etc/peers.conf");
228 // Check the users home directory
229 #ifndef __amigaos4__
230 char *home = std::getenv("HOME");
231 #else
232 //on AmigaOS we have a GNASH: assign that point to program dir
233 char *home = "/gnash";
234 #endif
236 string homefile = home;
237 homefile += "/peers.conf";
239 return loadPeersFile(homefile);
242 bool
243 Cygnal::loadPeersFile(const std::string &filespec)
245 // GNASH_REPORT_FUNCTION;
247 struct stat stats;
248 std::ifstream in;
249 std::string line;
250 string host;
251 string portstr;
252 string cgi;
253 vector<string> supported;
255 // Make sufre the file exists
256 if (stat(filespec.c_str(), &stats) != 0) {
257 return false;
260 in.open(filespec.c_str());
262 if (!in) {
263 log_error(_(": couldn't open file: "), filespec);
264 return false;
267 // Read in each line and parse it
268 size_t lineno = 0;
269 while (std::getline(in, line)) {
271 ++lineno;
273 // Ignore comment and empty lines
274 if (line.empty() || line[0] == '#') {
275 continue;
278 std::istringstream ss(line);
280 // Get the first token
281 if (! (ss >> host)) {
282 // Empty line
283 continue;
286 // 'action' should never be empty, or (ss >> action)
287 // above would have failed
289 if (host[0] == '#') {
290 continue; // discard comments
293 // Get second token
294 if (!(ss >> portstr)) {
295 // Do we need to warn here as well?
296 continue;
299 while (ss >> cgi) {
300 supported.push_back(cgi);
301 continue;
304 // Create a new peer item
305 std::shared_ptr<peer_t> peer(new Cygnal::peer_t);
306 peer->hostname = host;
307 peer->port = strtol(portstr.c_str(), NULL, 0) & 0xffff;
309 _peers.push_back(peer);
312 return true;
315 void
316 Cygnal::probePeers()
318 // GNASH_REPORT_FUNCTION;
320 probePeers(_peers);
323 void
324 Cygnal::probePeers(peer_t &peer)
326 // GNASH_REPORT_FUNCTION;
327 RTMPClient net;
328 stringstream uri;
330 uri << peer.hostname;
332 vector<string>::iterator it;
333 for (it = peer.supported.begin(); it <= peer.supported.end(); ++it) {
334 string tmp = uri.str();
335 // tmp += (*it);
336 // log_network("Constructed: %s/%s", uri.str(), *it);
338 gnash::URL url(uri.str());
339 if (!(peer.fd = net.connectToServer(uri.str()))) {
340 log_network(_("Couldn't connect to %s"), uri.str());
341 peer.connected = false;
342 } else {
343 peer.connected = true;
344 // peer.fd = net.getFileFd();
349 void
350 Cygnal::probePeers(std::vector<std::shared_ptr<peer_t> > &peers)
352 // GNASH_REPORT_FUNCTION;
354 // createClient();
355 std::vector<std::shared_ptr<Cygnal::peer_t> >::iterator it;
356 for (it = peers.begin(); it != peers.end(); ++it) {
357 std::shared_ptr<Cygnal::peer_t> peer = *it;
358 probePeers(*peer);
359 if (peer->connected) {
360 log_network(_("%s is active on fd #%d."), peer->hostname,
361 peer->fd);
362 _active_peers.push_back(*it);
367 void
368 Cygnal::removeHandler(const std::string &path)
370 // GNASH_REPORT_FUNCTION;
371 map<std::string, std::shared_ptr<Handler> >::iterator it;
372 it = _handlers.find(path);
373 if (it != _handlers.end()) {
374 std::lock_guard<std::mutex> lock(_mutex);
375 _handlers.erase(it);
379 std::shared_ptr<Handler>
380 Cygnal::findHandler(const std::string &path)
382 // GNASH_REPORT_FUNCTION;
383 map<std::string, std::shared_ptr<Handler> >::iterator it;
384 std::shared_ptr<Handler> hand;
385 it = _handlers.find(path);
386 if (it != _handlers.end()) {
387 hand = (*it).second;
390 return hand;
393 void
394 Cygnal::dump()
396 std::vector<std::shared_ptr<Cygnal::peer_t> >::iterator it;
397 for (it = _peers.begin(); it != _peers.end(); ++it) {
398 cerr << "Remote Peer: " << (*it)->hostname
399 << ":" << (*it)->port << endl;
404 main(int argc, char *argv[])
406 // Initialize national language support
407 #ifdef ENABLE_NLS
408 setlocale (LC_ALL, "");
409 bindtextdomain (PACKAGE, LOCALEDIR);
410 textdomain (PACKAGE);
411 #endif
413 // This becomes the default hostname, which becomes
414 // 127.0.0.1 or ::1 for the localhost. The --machine
415 // otion can change this.
416 std::string hostname = "localhost.localdomain";
418 const Arg_parser::Option opts[] =
420 { 'h', "help", Arg_parser::no },
421 { 'V', "version", Arg_parser::no },
422 { 'p', "port-offset", Arg_parser::yes },
423 { 'v', "verbose", Arg_parser::no },
424 { 'd', "dump", Arg_parser::no },
425 { 'n', "netdebug", Arg_parser::no },
426 { 't', "testing", Arg_parser::no },
427 { 'a', "admin", Arg_parser::no },
428 { 'r', "root", Arg_parser::yes },
429 { 'o', "only-port", Arg_parser::yes },
430 { 's', "singlethreaded", Arg_parser::no },
431 { 'm', "machine", Arg_parser::yes }
434 Arg_parser parser(argc, argv, opts);
435 if( ! parser.error().empty() ) {
436 cout << parser.error() << endl;
437 exit(EXIT_FAILURE);
440 // crcfile.loadFiles();
442 // Set the log file name before trying to write to
443 // it, or we might get two.
444 dbglogfile.setLogFilename(crcfile.getDebugLog());
446 if (crcfile.verbosityLevel() > 0) {
447 dbglogfile.setVerbosity(crcfile.verbosityLevel());
450 if (crcfile.getDocumentRoot().size() > 0) {
451 docroot = crcfile.getDocumentRoot();
452 } else {
453 docroot = "/var/www/html/software/tests/";
454 crcfile.setDocumentRoot(docroot);
456 if (crcfile.getPortOffset()) {
457 port_offset = crcfile.getPortOffset();
460 // Handle command line arguments
461 for( int i = 0; i < parser.arguments(); ++i ) {
462 const int code = parser.code(i);
463 switch( code ) {
464 case 'h':
465 version_and_copyright();
466 usage();
467 exit(EXIT_SUCCESS);
468 case 'V':
469 version_and_copyright();
470 exit(EXIT_SUCCESS);
471 case 't':
472 crcfile.setTestingFlag(true);
473 break;
474 case 'a':
475 admin = true;
476 break;
477 case 'v':
478 dbglogfile.setVerbosity();
479 LOG_ONCE(log_network(_("Verbose output turned on")))
480 break;
481 case 'p':
482 port_offset = parser.argument<int>(i);
483 crcfile.setPortOffset(port_offset);
484 break;
485 case 'r':
486 docroot = parser.argument(i);
487 break;
488 case 's':
489 crcfile.setThreadingFlag(false);
490 break;
491 case 'n':
492 netdebug = true;
493 dbglogfile.setNetwork(true);
494 break;
495 case 'o':
496 only_port = parser.argument<int>(i);
497 break;
498 case 'd':
499 crcfile.dump();
500 exit(EXIT_SUCCESS);
501 break;
502 case 'm':
503 hostname = parser.argument(i);
504 break;
505 default:
506 log_error(_("Extraneous argument: %s"), parser.argument(i).c_str());
510 log_network(_("Document Root for media files is: %s"), docroot);
511 crcfile.setDocumentRoot(docroot);
513 // load the file of peers. A peer is another instance of Cygnal we
514 // can use for distributed processing.
515 cyg.loadPeersFile();
516 cyg.probePeers();
518 // cyg.dump();
520 // Trap ^C (SIGINT) so we can kill all the threads
521 act1.sa_handler = cntrlc_handler;
522 sigaction (SIGINT, &act1, NULL);
523 act2.sa_handler = hup_handler;
524 sigaction (SIGHUP, &act2, NULL);
525 // sigaction (SIGPIPE, &act, NULL);
527 // Lock a mutex the main() waits in before exiting. This is
528 // because all the actually processing is done by other threads.
529 std::unique_lock<std::mutex> lk(alldone_mutex);
531 // Start the Admin handler. This allows one to connect to Cygnal
532 // at port 1111 and dump statistics to the terminal for tuning
533 // purposes.
534 if (admin) {
535 Network::thread_params_t admin_data;
536 admin_data.port = gnash::ADMIN_PORT;
537 std::thread admin_thread(std::bind(&admin_handler, &admin_data));
540 // Cvm cvm;
541 // cvm.loadMovie("/tmp/out.swf");
543 // If a only-port is specified, we only want to run single
544 // threaded. As all the rest of the code checks the config value
545 // setting, this overrides that in the memory, but doesn't change
546 // the file itself. This feature is really only for debugging,
547 // where it's easier to work with one protocol at a time.
548 if (only_port) {
549 crcfile.setThreadingFlag(false);
552 // Incomming connection handler for port 80, HTTP and
553 // RTMPT. As port 80 requires root access, cygnal supports a
554 // "port offset" for debugging and development of the
555 // server. Since this port offset changes the constant to test
556 // for which protocol, we pass the info to the start thread so
557 // it knows which handler to invoke.
558 Network::thread_params_t *http_data = new Network::thread_params_t;
559 if ((only_port == 0) || (only_port == gnash::HTTP_PORT)) {
560 http_data->tid = 0;
561 http_data->netfd = 0;
562 http_data->filespec = docroot;
563 http_data->protocol = Network::HTTP;
564 http_data->port = port_offset + gnash::HTTP_PORT;
565 http_data->hostname = hostname;
566 if (crcfile.getThreadingFlag()) {
567 std::thread http_thread(std::bind(&connection_handler, http_data));
568 } else {
569 connection_handler(http_data);
573 // Incomming connection handler for port 1935, RTMPT and
574 // RTMPTE. This supports the same port offset as the HTTP handler,
575 // just to keep things consistent.
576 Network::thread_params_t *rtmp_data = new Network::thread_params_t;
577 if ((only_port == 0) || (only_port == gnash::RTMP_PORT)) {
578 rtmp_data->tid = 0;
579 rtmp_data->netfd = 0;
580 rtmp_data->filespec = docroot;
581 rtmp_data->protocol = Network::RTMP;
582 rtmp_data->port = port_offset + gnash::RTMP_PORT;
583 rtmp_data->hostname = hostname;
584 if (crcfile.getThreadingFlag()) {
585 std::thread rtmp_thread(std::bind(&connection_handler, rtmp_data));
586 } else {
587 connection_handler(rtmp_data);
591 // Wait for all the threads to die.
592 alldone.wait(lk);
594 log_network(_("Cygnal done..."));
596 // Delete the data we allowcated to pass to each connection_handler.
597 delete rtmp_data;
598 delete http_data;
600 return(0);
603 // Trap Control-C (SIGINT) so we can cleanly exit
604 static void
605 cntrlc_handler (int sig)
607 log_network(_("Got a %d interrupt"), sig);
608 // sigaction (SIGINT, &act, NULL);
609 exit(EXIT_FAILURE);
612 // Trap SIGHUP so we can
613 static void
614 hup_handler (int /* sig */)
616 if (crcfile.getTestingFlag()) {
617 cerr << "Testing, Testing, Testing..." << endl;
622 static void
623 version_and_copyright()
625 cout << "Cygnal: " << BRANCH_NICK << "_" << BRANCH_REVNO << endl
626 << endl
627 << _("Copyright (C) 2007, 2008, 2009, 2010 Free Software Foundation, Inc.\n"
628 "Cygnal comes with NO WARRANTY, to the extent permitted by law.\n"
629 "You may redistribute copies of Cygnal under the terms of the GNU General\n"
630 "Public License V3 or later. For more information, see the file named COPYING.\n")
631 << endl;
634 // FIXME: this function could be tweaked for better performance
635 void
636 admin_handler(Network::thread_params_t *args)
638 GNASH_REPORT_FUNCTION;
639 int retries = 100;
640 int ret;
642 map<int, Handler *>::iterator hit;
643 stringstream response;
645 Network net;
646 Handler::admin_cmd_e cmd = Handler::POLL;
647 net.createServer(args->hostname, args->port);
648 while (retries > 0) {
649 log_network(_("Starting Admin Handler for port %d"), args->port);
651 if (net.newConnection(true) <= 0) {
652 return;
655 log_network(_("Got an incoming Admin request"));
656 sleep(1);
657 do {
658 Network::byte_t data[ADMINPKTSIZE+1];
659 memset(data, 0, ADMINPKTSIZE+1);
660 const char *ptr = reinterpret_cast<const char *>(data);
661 ret = net.readNet(data, ADMINPKTSIZE, 100);
662 if (ret < 0) {
663 log_network(_("no more admin data, exiting...\n"));
664 if ((ret == 0) && cmd != Handler::POLL) {
665 break;
667 } else {
668 // force the case to make comparisons easier. Only compare enough characters to
669 // till each command is unique.
670 std::transform(ptr, ptr + ret, data, (int(*)(int)) toupper);
671 if (strncmp(ptr, "QUIT", 4) == 0) {
672 cmd = Handler::QUIT;
673 } else if (strncmp(ptr, "STATUS", 5) == 0) {
674 cmd = Handler::STATUS;
675 } else if (strncmp(ptr, "HELP", 2) == 0) {
676 cmd = Handler::HELP;
677 net.writeNet("commands: help, status, poll, interval, statistics, quit.\n");
678 } else if (strncmp(ptr, "POLL", 2) == 0) {
679 cmd = Handler::POLL;
680 } else if (strncmp(ptr, "INTERVAL", 2) == 0) {
681 cmd = Handler::INTERVAL;
684 switch (cmd) {
685 // close this connection
686 case Handler::QUIT:
687 ret = -1;
688 break;
689 case Handler::STATUS:
691 #ifdef USE_STATS_CACHE
692 // cache.dump();
693 string results = cache.stats(false);
694 if (results.size()) {
695 net.writeNet(results);
696 results.clear();
698 #endif
699 #if 0
700 response << handlers.size() << " handlers are currently active.";
701 for (hit = handlers.begin(); hit != handlers.end(); hit++) {
702 int fd = hit->first;
703 Handler *hand = hit->second;
704 response << fd << ","
705 << hand->insize()
706 << "," << hand->outsize()
707 << "\r\n";
708 net.writeNet(response);
710 #endif
712 break;
713 case Handler::POLL:
714 #ifdef USE_STATS_QUEUE
715 response << handlers.size() << " handlers are currently active." << "\r\n";
716 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
717 int fd = hit->first;
718 Handler *hand = hit->second;
719 struct timespec now;
720 clock_gettime (CLOCK_REALTIME, &now);
721 // Incoming que stats
722 CQue::que_stats_t *stats = hand->statsin();
723 float diff = static_cast<float>(((now.tv_sec -
724 stats->start.tv_sec) + ((now.tv_nsec -
725 stats->start.tv_nsec)/1e9)));
726 response << fd
727 << "," << stats->totalbytes
728 << "," << diff
729 << "," << stats->totalin
730 << "," << stats->totalout;
731 // Outgoing que stats
732 stats = hand->statsout();
733 response << "," <<stats->totalbytes
734 << "," << stats->totalin
735 << "," << stats->totalout
736 << "\r\n";
737 net.writeNet(response.str());
739 #endif
740 break;
741 case Handler::INTERVAL:
742 net.writeNet("set interval\n");
743 break;
744 default:
745 break;
747 } while (ret > 0);
748 log_network(_("admin_handler: Done...!\n"));
749 net.closeNet(); // this shuts down this socket connection
751 net.closeConnection(); // this shuts down the server on this connection
753 // All threads should exit now.
754 alldone.notify_all();
757 // A connection handler is started for each port the server needs to
758 // wait on for incoming connections. When it gets an incoming
759 // connection, it reads the first packet to get the resource name, and
760 // then starts the event handler thread if it's a newly requested
761 // resource, otherwise it loads a copy of the cached resource.
762 void
763 connection_handler(Network::thread_params_t *args)
765 // GNASH_REPORT_FUNCTION;
766 int fd = 0;
767 Network net;
768 bool done = false;
769 static int tid = 0;
771 if (netdebug) {
772 net.toggleDebug(true);
774 // Start a server on this tcp/ip port.
775 fd = net.createServer(args->hostname, args->port);
776 if (fd <= 0) {
777 log_error(_("Can't start %s Connection Handler for fd #%d, port %hd"),
778 proto_str[args->protocol], fd, args->port);
779 return;
780 } else {
781 log_network(_("Starting %s Connection Handler for fd #%d, port %hd"),
782 proto_str[args->protocol], fd, args->port);
785 // Get the number of cpus in this system. For multicore
786 // systems we'll get better load balancing if we keep all the
787 // cpus busy. So a pool of threads is started for each cpu,
788 // the default being just one. Each thread is reponsible for
789 // handling part of the total active file descriptors.
790 #ifdef HAVE_SYSCONF
791 long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
792 LOG_ONCE(log_network(_("This system has %d cpus."), ncpus));
793 #endif
794 size_t nfds = crcfile.getFDThread();
796 // log_network("This system is configured for %d file descriptors to be watched by each thread.", nfds);
798 // Get the next thread ID to hand off handling this file
799 // descriptor to. If the limit for threads per cpu hasn't been
800 // set or is set to 0, assume one thread per processor by
801 // default. There won't even be threads for each cpu if
802 // threading has been disabled in the cygnal config file.
803 int spawn_limit = 0;
804 if (nfds == 0) {
805 spawn_limit = ncpus;
806 } else {
807 spawn_limit = ncpus * nfds;
810 // FIXME: this may run forever, we probably want a cleaner way to
811 // test for the end of time.
812 do {
813 net.setPort(args->port);
814 if (netdebug) {
815 net.toggleDebug(true);
818 // Rotate in a range of 0 to the limit.
819 tid = (tid + 1) % (spawn_limit + 1);
820 // log_network("%s handler: thread ID #%d, fd #%d", proto_str[args->protocol], tid, fd);
822 // Wait for a connection to this tcp/ip from a client. If set
823 // to true, this will block until a request comes in. If set
824 // to single threaded mode, this will only allow one client to
825 // connect at a time. This is to make it easier to debug
826 // things when you have a heavily threaded application.
827 args->netfd = net.newConnection(true, fd);
828 if (args->netfd <= 0) {
829 log_network(_("No new %s network connections"),
830 proto_str[args->protocol]);
831 return;
832 } else {
833 log_network(_("*** New %s network connection for thread ID #%d, fd #%d ***"),
834 proto_str[args->protocol], tid, args->netfd);
838 // Setup HTTP handler
840 if (args->protocol == Network::HTTP) {
841 Network::thread_params_t *hargs = new Network::thread_params_t;
842 // std::copy(args, args+sizeof(Network::thread_params_t), &hargs);
843 hargs->protocol = args->protocol;
844 hargs->netfd = args->netfd;
845 #if 0
846 std::shared_ptr<Handler> hand = cyg.findHandler(path);
847 HTTPServer *http = new HTTPServer;
848 hargs.entry = http;
849 http->setDocRoot(crcfile.getDocumentRoot());
850 std::shared_ptr<cygnal::Buffer> buf(http->peekChunk());
851 http->processHeaderFields(*buf);
852 string hostname, path;
853 string::size_type pos = http->getField("host").find(":", 0);
854 if (pos != string::npos) {
855 hostname += http->getField("host").substr(0, pos);
856 } else {
857 hostname += "localhost.localdomain";
859 path = http->getFilespec();
860 string key = hostname + path;
861 #endif
862 string key;
863 Handler *hand = 0;
864 if (!hand) {
865 hand = new Handler;
866 hand->addClient(args->netfd, Network::HTTP);
867 int retries = 10;
868 cygnal::Buffer *buf = 0;
869 do {
870 buf = hand->parseFirstRequest(args->netfd, Network::HTTP);
871 if (!buf) {
872 retries--;
873 continue;
874 } else {
875 break;
877 } while (retries);
878 string &key = hand->getKey(args->netfd);
879 log_network(_("Creating new %s Handler for %s using fd #%d"),
880 proto_str[hargs->protocol], key, hargs->netfd);
881 hargs->handler = hand;
882 hargs->buffer = buf;
883 hargs->filespec = key;
884 // cyg.addHandler(key, hand);
886 // If in multi-threaded mode (the default), start a thread
887 // with a connection_handler for each port we're interested
888 // in. Each port of could have a different protocol.
889 std::bind(event_handler, hargs);
890 if (crcfile.getThreadingFlag() == true) {
891 std::thread event_thread(std::bind(&event_handler, hargs));
892 } else {
893 event_handler(hargs);
894 // We're done, close this network connection
896 } else {
897 log_network(_("Reusing %s Handler for %s using fd #%d"),
898 proto_str[hargs->protocol], key, hargs->netfd);
899 hand->addClient(args->netfd, Network::HTTP);
901 // delete http;
902 } // end of if HTTP
905 // Setup RTMP handler
907 if (args->protocol == Network::RTMP) {
908 Network::thread_params_t *rargs = new Network::thread_params_t;
909 rargs->protocol = args->protocol;
910 rargs->netfd = args->netfd;
911 RTMPServer *rtmp = new RTMPServer;
912 std::shared_ptr<cygnal::Element> tcurl =
913 rtmp->processClientHandShake(args->netfd);
914 if (!tcurl) {
915 // log_error("Couldn't read the tcUrl variable!");
916 rtmp->closeNet(args->netfd);
917 return;
919 URL url(tcurl->to_string());
920 string key = url.hostname() + url.path();
921 std::shared_ptr<Handler> hand = cyg.findHandler(url.path());
922 if (!hand) {
923 log_network(_("Creating new %s Handler for: %s for fd %#d"),
924 proto_str[args->protocol], key, args->netfd);
925 hand.reset(new Handler);
926 cyg.addHandler(key, hand);
927 rargs->entry = rtmp;
928 hand->setNetConnection(rtmp->getNetConnection());
929 std::vector<std::shared_ptr<Cygnal::peer_t> >::iterator it;
930 std::vector<std::shared_ptr<Cygnal::peer_t> > active = cyg.getActive();
931 for (it = active.begin(); it < active.end(); ++it) {
932 Cygnal::peer_t *peer = (*it).get();
933 hand->addRemote(peer->fd);
935 hand->addClient(args->netfd, Network::RTMP);
936 rargs->handler = reinterpret_cast<void *>(hand.get());
937 args->filespec = key;
938 args->entry = rtmp;
940 string cgiroot;
941 char *env = std::getenv("CYGNAL_PLUGINS");
942 if (env != 0) {
943 cgiroot = env;
945 if (crcfile.getCgiRoot().size() > 0) {
946 cgiroot += ":" + crcfile.getCgiRoot();
947 log_network(_("Cygnal Plugin paths are: %s"), cgiroot);
948 } else {
949 cgiroot = PLUGINSDIR;
951 hand->scanDir(cgiroot);
952 std::shared_ptr<Handler::cygnal_init_t> init =
953 hand->initModule(url.path());
955 // this is where the real work gets done.
956 if (init) {
957 // If in multi-threaded mode (the default), start a thread
958 // with a connection_handler for each port we're interested
959 // in. Each port of course has a different protocol.
960 if (crcfile.getThreadingFlag() == true) {
961 std::thread event_thread(std::bind(&event_handler, args));
962 } else {
963 event_handler(args);
964 // We're done, close this network connection
965 net.closeNet(args->netfd);
967 } else {
968 log_error(_("Couldn't load plugin for %s"), key);
971 // // We're done, close this network connection
972 // if (crcfile.getThreadingFlag() == true) {
973 // net.closeNet(args->netfd);
974 // }
976 // delete rtmp;
977 } // end of if RTMP
979 log_network(_("Number of active Threads is %d"), tids.num_of_tids());
981 // net.closeNet(args->netfd); // this shuts down this socket connection
982 log_network(_("Restarting loop for next connection for port %d..."),
983 args->port);
984 } while(!done);
986 // All threads should wake up now.
987 alldone.notify_all();
989 } // end of connection_handler
991 void
992 event_handler(Network::thread_params_t *args)
994 GNASH_REPORT_FUNCTION;
996 Network::thread_params_t largs;
997 // std::copy(args, args+sizeof(Network::thread_params_t), &largs);
998 Handler *hand = reinterpret_cast<Handler *>(args->handler);
1000 largs.protocol = args->protocol;
1001 largs.netfd = args->netfd;
1002 largs.port = args->port;
1003 largs.buffer = args->buffer;
1004 largs.entry = args->entry;
1005 largs.filespec = args->filespec;
1007 Network net;
1008 int timeout = 30;
1009 int retries = 0;
1010 bool done = false;
1012 fd_set hits;
1013 FD_ZERO(&hits);
1014 FD_SET(args->netfd, &hits);
1016 tids.increment();
1018 // We need to calculate the highest numbered file descriptor
1019 // for select. We may want to do this elsewhere, as it could
1020 // be a performance hit as the number of file descriptors gets
1021 // larger.
1022 log_debug("Handler has %d clients attached, %d threads",
1023 hand->getClients().size(), tids.num_of_tids());
1025 int max = 0;
1026 for (size_t i = 0; i<hand->getClients().size(); i++) {
1027 log_debug("Handler client[%d] is: %d", i, hand->getClient(i));
1028 if (hand->getClient(i) >= max) {
1029 max = hand->getClient(i);
1030 // hand->dump();
1034 do {
1036 // If we have active disk streams, send those packets first.
1037 // 0 is a reserved stream, so we start with 1, as the reserved
1038 // stream isn't one we care about here.
1039 if (hand->getActiveDiskStreams()) {
1040 log_network(_("%d active disk streams"),
1041 hand->getActiveDiskStreams());
1042 // hand->dump();
1044 #if 0
1045 std::shared_ptr<DiskStream> filestream(cache.findFile(args->filespec));
1046 if (filestream) {
1047 filestream->dump();
1049 // #else
1050 // cache.dump();
1051 #endif
1052 //hand->dump();
1053 std::shared_ptr<DiskStream> ds;
1054 for (int i=1; i <= hand->getActiveDiskStreams(); i++) {
1055 ds = hand->getDiskStream(i);
1056 if (ds) {
1057 //ds->dump();
1058 // Only play the next chunk of the file.
1059 //log_network("Sending following chunk of %s", ds->getFilespec());
1060 if (ds->play(i, false)) {
1061 if (ds->getState() == DiskStream::CLOSED) {
1062 net.closeNet(args->netfd);
1063 hand->removeClient(args->netfd);
1064 done = true;
1066 } else {
1067 // something went wrong, the stream failed
1068 net.closeNet(args->netfd);
1069 hand->removeClient(args->netfd);
1070 done = true;
1075 // See if we have any data waiting behind any of the file
1076 // descriptors.
1077 for (int i=0; i <= max + 1; i++) {
1078 if (FD_ISSET(i, &hits)) {
1079 FD_CLR(i, &hits);
1080 log_network(_("Got a hit for fd #%d, protocol %s"), i,
1081 proto_str[hand->getProtocol(i)]);
1082 switch (hand->getProtocol(i)) {
1083 case Network::NONE:
1084 log_error(_("No protocol specified!"));
1085 break;
1086 case Network::HTTP:
1088 largs.netfd = i;
1089 // largs.filespec = fullpath;
1090 std::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1091 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1092 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, args->filespec);
1093 net.closeNet(args->netfd);
1094 hand->removeClient(args->netfd);
1095 done = true;
1096 } else {
1097 log_network(_("Not Done with HTTP connection for fd #%d, it's a persistent connection."), i);
1100 continue;
1102 case Network::RTMP:
1103 args->netfd = i;
1104 // args->filespec = path;
1105 if (!rtmp_handler(args)) {
1106 log_network(_("Done with RTMP connection for fd #%d, CGI "), i, args->filespec);
1107 done = true;
1109 break;
1110 case Network::RTMPT:
1112 net.setTimeout(timeout);
1113 args->netfd = i;
1114 std::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1115 // args->filespec = path;
1116 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1117 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, largs.filespec);
1118 return;
1120 break;
1122 case Network::RTMPTS:
1124 args->netfd = i;
1125 // args->filespec = path;
1126 std::shared_ptr<HTTPServer> &http = hand->getHTTPHandler(i);
1127 if (!http->http_handler(hand, args->netfd, args->buffer)) {
1128 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i, args->filespec);
1129 return;
1131 break;
1133 case Network::RTMPE:
1134 break;
1135 case Network::RTMPS:
1136 break;
1137 case Network::DTN:
1138 break;
1139 default:
1140 log_error(_("Unsupported network protocol for fd #%d, %d"),
1141 largs.netfd, hand->getProtocol(i));
1142 done = true;
1143 break;
1145 // delete args->buffer;
1149 // // Clear the current message so next time we read new data
1150 // args->buffer->clear();
1151 // largs.buffer->clear();
1153 // Wait for something from one of the file descriptors. This timeout
1154 // is the time between sending packets to the client when there is
1155 // no client input, which effects the streaming speed of big files.
1156 net.setTimeout(5);
1157 hits = net.waitForNetData(hand->getClients());
1158 if (FD_ISSET(0, &hits)) {
1159 FD_CLR(0, &hits);
1160 log_network(_("Got no hits, %d retries"), retries);
1161 // net.closeNet(args->netfd);
1162 // hand->removeClient(args->netfd);
1163 // done = true;
1165 retries++;
1166 #if 0
1167 if (retries >= 10) {
1168 net.closeNet(args->netfd);
1169 hand->removeClient(args->netfd);
1170 done = true;
1172 #endif
1173 } while (!done);
1175 tids.decrement();
1177 } // end of event_handler
1179 // local Variables:
1180 // mode: C++
1181 // indent-tabs-mode: nil
1182 // End: