1 // cygnal.cpp: GNU streaming Flash media server, for Gnash.
3 // Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
4 // Free Software Foundation, Inc.
6 // This program is free software; you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation; either version 3 of the License, or
9 // (at your option) any later version.
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
16 // You should have received a copy of the GNU General Public License
17 // along with this program; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include "gnashconfig.h"
34 #include <sys/types.h>
38 #include "GnashSleep.h"
44 # include "GnashSystemIOHeaders.h"
49 extern int optind
, getopt(int, char *const *, const char *);
54 #include <boost/shared_ptr.hpp>
56 // classes internal to Gnash
66 #include "statistics.h"
69 #include "diskstream.h"
70 #include "arg_parser.h"
71 #include "GnashException.h"
72 #include "GnashSleep.h" // for usleep comptibility.
74 #include "rtmp_client.h"
76 // classes internal to Cygnal
77 #include "rtmp_server.h"
78 #include "http_server.h"
88 #include <boost/date_time/gregorian/gregorian.hpp>
89 //#include <boost/date_time/local_time/local_time.hpp>
90 #include <boost/date_time/time_zone_base.hpp>
91 #include <boost/date_time/posix_time/posix_time.hpp>
92 #include <boost/thread/thread.hpp>
93 #include <boost/bind.hpp>
94 #include <boost/thread/mutex.hpp>
95 #include <boost/thread/condition.hpp>
96 #include <boost/thread/tss.hpp>
102 //using gnash::log_network;
104 using namespace gnash
;
105 using namespace cygnal
;
108 static void version_and_copyright();
109 static void cntrlc_handler(int sig
);
110 static void hup_handler(int sig
);
112 void connection_handler(Network::thread_params_t
*args
);
113 void event_handler(Network::thread_params_t
*args
);
114 void admin_handler(Network::thread_params_t
*args
);
116 // Toggles very verbose debugging info from the network Network class
117 static bool netdebug
= false;
119 struct sigaction act1
, act2
;
121 // The next few global variables have to be global because Boost
122 // threads don't take arguments. Since these are set in main() before
123 // any of the threads are started, and their values should never change,
124 // it's safe to use these without a mutex, as all threads share the
125 // same read-only value.
127 // This is the default path to look in for files to be streamed.
128 static string docroot
;
130 // This is the number of times a thread loop continues, for debugging only
131 int thread_retries
= 10;
133 // This is added to the default ports for testing so it doesn't
134 // conflict with apache on the same machine.
135 static int port_offset
= 0;
137 // Toggle the admin thread
138 static bool admin
= false;
140 // Admin commands are small
141 const int ADMINPKTSIZE
= 80;
143 // If set to a non zero value, this limits Cygnal to only one protocol
144 // at a time. This is for debugging only.
145 static int only_port
= 0;
147 // These keep track of the number of active threads.
150 map
<int, Network
*> networks
;
152 // This is the global object for Cygnal
153 // The debug log used by all the gnash libraries.
154 static Cygnal
& cyg
= Cygnal::getDefaultInstance();
156 // The debug log used by all the gnash libraries.
157 static LogFile
& dbglogfile
= LogFile::getDefaultInstance();
159 // The user config for Cygnal is loaded and parsed here:
160 static CRcInitFile
& crcfile
= CRcInitFile::getDefaultInstance();
162 // Cache support for responses and files.
163 static Cache
& cache
= Cache::getDefaultInstance();
165 // The list of active cgis being executed.
166 //static std::map<std::string, Proc> procs; // = proc::getDefaultInstance();
168 // This mutex is used to signify when all the threads are done.
169 static boost::condition alldone
;
170 static boost::mutex alldone_mutex
;
172 static boost::condition noclients
;
173 static boost::mutex noclients_mutex
;
175 const char *proto_str
[] = {
190 cout
<< _("cygnal -- a streaming media server.") << endl
192 << _("Usage: cygnal [options...]") << endl
193 << _(" -h, --help Print this help and exit") << endl
194 << _(" -V, --version Print version information and exit") << endl
195 << _(" -v, --verbose Output verbose debug info") << endl
196 << _(" -s, --singlethread Disable Multi Threading") << endl
197 << _(" -n, --netdebug Turn on net debugging messages") << endl
198 << _(" -o --only-port Only use port for debugging") << endl
199 << _(" -p --port-offset Port offset for debugging") << endl
200 << _(" -t, --testing Turn on special Gnash testing support") << endl
201 << _(" -a, --admin Enable the administration thread") << endl
202 << _(" -r, --root Document root for all files") << endl
203 << _(" -m, --machine Hostname for this machine") << endl
209 Cygnal::getDefaultInstance()
211 // GNASH_REPORT_FUNCTION;
219 // GNASH_REPORT_FUNCTION;
223 Cygnal::loadPeersFile()
225 // GNASH_REPORT_FUNCTION;
227 loadPeersFile("./peers.conf");
229 loadPeersFile("/etc/peers.conf");
231 // Check the users home directory
233 char *home
= std::getenv("HOME");
235 //on AmigaOS we have a GNASH: assign that point to program dir
236 char *home
= "/gnash";
239 string homefile
= home
;
240 homefile
+= "/peers.conf";
242 return loadPeersFile(homefile
);
246 Cygnal::loadPeersFile(const std::string
&filespec
)
248 // GNASH_REPORT_FUNCTION;
256 vector
<string
> supported
;
258 // Make sure the file exists
259 if (stat(filespec
.c_str(), &stats
) != 0) {
263 in
.open(filespec
.c_str());
266 log_error(_(": couldn't open file: "), filespec
);
270 // Read in each line and parse it
272 while (std::getline(in
, line
)) {
276 // Ignore comment and empty lines
277 if (line
.empty() || line
[0] == '#') {
281 std::istringstream
ss(line
);
283 // Get the first token
284 if (! (ss
>> host
)) {
289 // 'host' should never be empty, or (ss >> host)
290 // above would have failed
292 if (host
[0] == '#') {
293 continue; // discard comments
297 if (!(ss
>> portstr
)) {
298 // Do we need to warn here as well?
303 supported
.push_back(cgi
);
307 // Create a new peer item
308 boost::shared_ptr
<peer_t
> peer(new Cygnal::peer_t
);
309 peer
->hostname
= host
;
310 peer
->port
= strtol(portstr
.c_str(), NULL
, 0) & 0xffff;
312 _peers
.push_back(peer
);
321 // GNASH_REPORT_FUNCTION;
327 Cygnal::probePeers(peer_t
&peer
)
329 // GNASH_REPORT_FUNCTION;
333 uri
<< peer
.hostname
;
335 vector
<string
>::iterator it
;
336 for (it
= peer
.supported
.begin(); it
<= peer
.supported
.end(); ++it
) {
337 string tmp
= uri
.str();
339 // log_network("Constructed: %s/%s", uri.str(), *it);
341 gnash::URL
url(uri
.str());
342 if (!(peer
.fd
= net
.connectToServer(uri
.str()))) {
343 log_network(_("Couldn't connect to %s"), uri
.str());
344 peer
.connected
= false;
346 peer
.connected
= true;
347 // peer.fd = net.getFileFd();
353 Cygnal::probePeers(std::vector
<boost::shared_ptr
<peer_t
> > &peers
)
355 // GNASH_REPORT_FUNCTION;
358 std::vector
<boost::shared_ptr
<Cygnal::peer_t
> >::iterator it
;
359 for (it
= peers
.begin(); it
!= peers
.end(); ++it
) {
360 boost::shared_ptr
<Cygnal::peer_t
> peer
= *it
;
362 if (peer
->connected
) {
363 log_network(_("%s is active on fd #%d."), peer
->hostname
,
365 _active_peers
.push_back(*it
);
371 Cygnal::removeHandler(const std::string
&path
)
373 // GNASH_REPORT_FUNCTION;
374 map
<std::string
, boost::shared_ptr
<Handler
> >::iterator it
;
375 it
= _handlers
.find(path
);
376 if (it
!= _handlers
.end()) {
377 boost::mutex::scoped_lock
lock(_mutex
);
382 boost::shared_ptr
<Handler
>
383 Cygnal::findHandler(const std::string
&path
)
385 // GNASH_REPORT_FUNCTION;
386 map
<std::string
, boost::shared_ptr
<Handler
> >::iterator it
;
387 boost::shared_ptr
<Handler
> hand
;
388 it
= _handlers
.find(path
);
389 if (it
!= _handlers
.end()) {
399 std::vector
<boost::shared_ptr
<Cygnal::peer_t
> >::iterator it
;
400 for (it
= _peers
.begin(); it
!= _peers
.end(); ++it
) {
401 cerr
<< "Remote Peer: " << (*it
)->hostname
402 << ":" << (*it
)->port
<< endl
;
407 main(int argc
, char *argv
[])
409 // Initialize national language support
411 setlocale (LC_ALL
, "");
412 bindtextdomain (PACKAGE
, LOCALEDIR
);
413 textdomain (PACKAGE
);
416 // This becomes the default hostname, which becomes
417 // 127.0.0.1 or ::1 for the localhost. The --machine
418 // option can change this.
419 std::string hostname
= "localhost.localdomain";
421 const Arg_parser::Option opts
[] =
423 { 'h', "help", Arg_parser::no
},
424 { 'V', "version", Arg_parser::no
},
425 { 'p', "port-offset", Arg_parser::yes
},
426 { 'v', "verbose", Arg_parser::no
},
427 { 'd', "dump", Arg_parser::no
},
428 { 'n', "netdebug", Arg_parser::no
},
429 { 't', "testing", Arg_parser::no
},
430 { 'a', "admin", Arg_parser::no
},
431 { 'r', "root", Arg_parser::yes
},
432 { 'o', "only-port", Arg_parser::yes
},
433 { 's', "singlethreaded", Arg_parser::no
},
434 { 'm', "machine", Arg_parser::yes
}
437 Arg_parser
parser(argc
, argv
, opts
);
438 if( ! parser
.error().empty() ) {
439 cout
<< parser
.error() << endl
;
443 // crcfile.loadFiles();
445 // Set the log file name before trying to write to
446 // it, or we might get two.
447 dbglogfile
.setLogFilename(crcfile
.getDebugLog());
449 if (crcfile
.verbosityLevel() > 0) {
450 dbglogfile
.setVerbosity(crcfile
.verbosityLevel());
453 if (crcfile
.getDocumentRoot().size() > 0) {
454 docroot
= crcfile
.getDocumentRoot();
456 docroot
= "/var/www/html/software/tests/";
457 crcfile
.setDocumentRoot(docroot
);
459 if (crcfile
.getPortOffset()) {
460 port_offset
= crcfile
.getPortOffset();
463 // Handle command line arguments
464 for( int i
= 0; i
< parser
.arguments(); ++i
) {
465 const int code
= parser
.code(i
);
468 version_and_copyright();
472 version_and_copyright();
475 crcfile
.setTestingFlag(true);
481 dbglogfile
.setVerbosity();
482 LOG_ONCE(log_network(_("Verbose output turned on")))
485 port_offset
= parser
.argument
<int>(i
);
486 crcfile
.setPortOffset(port_offset
);
489 docroot
= parser
.argument(i
);
492 crcfile
.setThreadingFlag(false);
496 dbglogfile
.setNetwork(true);
499 only_port
= parser
.argument
<int>(i
);
506 hostname
= parser
.argument(i
);
509 log_error(_("Extraneous argument: %s"), parser
.argument(i
).c_str());
513 log_network(_("Document Root for media files is: %s"), docroot
);
514 crcfile
.setDocumentRoot(docroot
);
516 // load the file of peers. A peer is another instance of Cygnal we
517 // can use for distributed processing.
523 // Trap ^C (SIGINT) so we can kill all the threads
524 act1
.sa_handler
= cntrlc_handler
;
525 sigaction (SIGINT
, &act1
, NULL
);
526 act2
.sa_handler
= hup_handler
;
527 sigaction (SIGHUP
, &act2
, NULL
);
528 // sigaction (SIGPIPE, &act, NULL);
530 // Lock a mutex the main() waits in before exiting. This is
531 // because all the actual processing is done by other threads.
532 boost::mutex::scoped_lock
lk(alldone_mutex
);
534 // Start the Admin handler. This allows one to connect to Cygnal
535 // at port 1111 and dump statistics to the terminal for tuning
538 Network::thread_params_t admin_data
;
539 admin_data
.port
= gnash::ADMIN_PORT
;
540 boost::thread
admin_thread(boost::bind(&admin_handler
, &admin_data
));
544 // cvm.loadMovie("/tmp/out.swf");
546 // If a only-port is specified, we only want to run single
547 // threaded. As all the rest of the code checks the config value
548 // setting, this overrides that in the memory, but doesn't change
549 // the file itself. This feature is really only for debugging,
550 // where it's easier to work with one protocol at a time.
552 crcfile
.setThreadingFlag(false);
555 // Incoming connection handler for port 80, HTTP and
556 // RTMPT. As port 80 requires root access, cygnal supports a
557 // "port offset" for debugging and development of the
558 // server. Since this port offset changes the constant to test
559 // for which protocol, we pass the info to the start thread so
560 // it knows which handler to invoke.
561 Network::thread_params_t
*http_data
= new Network::thread_params_t
;
562 if ((only_port
== 0) || (only_port
== gnash::HTTP_PORT
)) {
564 http_data
->netfd
= 0;
565 http_data
->filespec
= docroot
;
566 http_data
->protocol
= Network::HTTP
;
567 http_data
->port
= port_offset
+ gnash::HTTP_PORT
;
568 http_data
->hostname
= hostname
;
569 if (crcfile
.getThreadingFlag()) {
570 boost::thread
http_thread(boost::bind(&connection_handler
, http_data
));
572 connection_handler(http_data
);
576 // Incoming connection handler for port 1935, RTMPT and
577 // RTMPTE. This supports the same port offset as the HTTP handler,
578 // just to keep things consistent.
579 Network::thread_params_t
*rtmp_data
= new Network::thread_params_t
;
580 if ((only_port
== 0) || (only_port
== gnash::RTMP_PORT
)) {
582 rtmp_data
->netfd
= 0;
583 rtmp_data
->filespec
= docroot
;
584 rtmp_data
->protocol
= Network::RTMP
;
585 rtmp_data
->port
= port_offset
+ gnash::RTMP_PORT
;
586 rtmp_data
->hostname
= hostname
;
587 if (crcfile
.getThreadingFlag()) {
588 boost::thread
rtmp_thread(boost::bind(&connection_handler
, rtmp_data
));
590 connection_handler(rtmp_data
);
594 // Wait for all the threads to die.
597 log_network(_("Cygnal done..."));
599 // Delete the data we allocated to pass to each connection_handler.
606 // Trap Control-C (SIGINT) so we can cleanly exit
608 cntrlc_handler (int sig
)
610 log_network(_("Got a %d interrupt"), sig
);
611 // sigaction (SIGINT, &act, NULL);
615 // Trap SIGHUP so we can
617 hup_handler (int /* sig */)
619 if (crcfile
.getTestingFlag()) {
620 cerr
<< "Testing, Testing, Testing..." << endl
;
626 version_and_copyright()
628 cout
<< "Cygnal: " << BRANCH_NICK
<< "_" << BRANCH_REVNO
<< endl
630 << _("Copyright (C) 2007, 2008, 2009, 2010 Free Software Foundation, Inc.\n"
631 "Cygnal comes with NO WARRANTY, to the extent permitted by law.\n"
632 "You may redistribute copies of Cygnal under the terms of the GNU General\n"
633 "Public License V3 or later. For more information, see the file named COPYING.\n")
637 // FIXME: this function could be tweaked for better performance
639 admin_handler(Network::thread_params_t
*args
)
641 GNASH_REPORT_FUNCTION
;
645 map
<int, Handler
*>::iterator hit
;
646 stringstream response
;
649 Handler::admin_cmd_e cmd
= Handler::POLL
;
650 net
.createServer(args
->hostname
, args
->port
);
651 while (retries
> 0) {
652 log_network(_("Starting Admin Handler for port %d"), args
->port
);
654 if (net
.newConnection(true) <= 0) {
658 log_network(_("Got an incoming Admin request"));
661 Network::byte_t data
[ADMINPKTSIZE
+1];
662 memset(data
, 0, ADMINPKTSIZE
+1);
663 const char *ptr
= reinterpret_cast<const char *>(data
);
664 ret
= net
.readNet(data
, ADMINPKTSIZE
, 100);
666 log_network(_("no more admin data, exiting...\n"));
667 if ((ret
== 0) && cmd
!= Handler::POLL
) {
671 // Force the case to make comparisons easier. Only compare enough
672 // characters to make each command unique.
673 std::transform(ptr
, ptr
+ ret
, data
, (int(*)(int)) toupper
);
674 if (strncmp(ptr
, "QUIT", 4) == 0) {
676 } else if (strncmp(ptr
, "STATUS", 5) == 0) {
677 cmd
= Handler::STATUS
;
678 } else if (strncmp(ptr
, "HELP", 2) == 0) {
680 net
.writeNet("commands: help, status, poll, interval, statistics, quit.\n");
681 } else if (strncmp(ptr
, "POLL", 2) == 0) {
683 } else if (strncmp(ptr
, "INTERVAL", 2) == 0) {
684 cmd
= Handler::INTERVAL
;
688 // close this connection
692 case Handler::STATUS
:
694 #ifdef USE_STATS_CACHE
696 string results
= cache
.stats(false);
697 if (results
.size()) {
698 net
.writeNet(results
);
703 response
<< handlers
.size() << " handlers are currently active.";
704 for (hit
= handlers
.begin(); hit
!= handlers
.end(); hit
++) {
706 Handler
*hand
= hit
->second
;
707 response
<< fd
<< ","
709 << "," << hand
->outsize()
711 net
.writeNet(response
);
717 #ifdef USE_STATS_QUEUE
718 response
<< handlers
.size() << " handlers are currently active." << "\r\n";
719 for (hit
= handlers
.begin(); hit
!= handlers
.end(); ++hit
) {
721 Handler
*hand
= hit
->second
;
723 clock_gettime (CLOCK_REALTIME
, &now
);
724 // Incoming que stats
725 CQue::que_stats_t
*stats
= hand
->statsin();
726 float diff
= static_cast<float>(((now
.tv_sec
-
727 stats
->start
.tv_sec
) + ((now
.tv_nsec
-
728 stats
->start
.tv_nsec
)/1e9
)));
730 << "," << stats
->totalbytes
732 << "," << stats
->totalin
733 << "," << stats
->totalout
;
734 // Outgoing que stats
735 stats
= hand
->statsout();
736 response
<< "," <<stats
->totalbytes
737 << "," << stats
->totalin
738 << "," << stats
->totalout
740 net
.writeNet(response
.str());
744 case Handler::INTERVAL
:
745 net
.writeNet("set interval\n");
751 log_network(_("admin_handler: Done...!\n"));
752 net
.closeNet(); // this shuts down this socket connection
754 net
.closeConnection(); // this shuts down the server on this connection
756 // All threads should exit now.
757 alldone
.notify_all();
760 // A connection handler is started for each port the server needs to
761 // wait on for incoming connections. When it gets an incoming
762 // connection, it reads the first packet to get the resource name, and
763 // then starts the event handler thread if it's a newly requested
764 // resource, otherwise it loads a copy of the cached resource.
766 connection_handler(Network::thread_params_t
*args
)
768 // GNASH_REPORT_FUNCTION;
775 net
.toggleDebug(true);
777 // Start a server on this tcp/ip port.
778 fd
= net
.createServer(args
->hostname
, args
->port
);
780 log_error(_("Can't start %s Connection Handler for fd #%d, port %hd"),
781 proto_str
[args
->protocol
], fd
, args
->port
);
784 log_network(_("Starting %s Connection Handler for fd #%d, port %hd"),
785 proto_str
[args
->protocol
], fd
, args
->port
);
788 // Get the number of cpus in this system. For multicore
789 // systems we'll get better load balancing if we keep all the
790 // cpus busy. So a pool of threads is started for each cpu,
791 // the default being just one. Each thread is responsible for
792 // handling part of the total active file descriptors.
794 long ncpus
= sysconf(_SC_NPROCESSORS_ONLN
);
795 LOG_ONCE(log_network(_("This system has %d cpus."), ncpus
));
797 size_t nfds
= crcfile
.getFDThread();
799 // log_network("This system is configured for %d file descriptors to be watched by each thread.", nfds);
801 // Get the next thread ID to hand off handling this file
802 // descriptor to. If the limit for threads per cpu hasn't been
803 // set or is set to 0, assume one thread per processor by
804 // default. There won't even be threads for each cpu if
805 // threading has been disabled in the cygnal config file.
810 spawn_limit
= ncpus
* nfds
;
813 // FIXME: this may run forever, we probably want a cleaner way to
814 // test for the end of time.
816 net
.setPort(args
->port
);
818 net
.toggleDebug(true);
821 // Rotate in a range of 0 to the limit.
822 tid
= (tid
+ 1) % (spawn_limit
+ 1);
823 // log_network("%s handler: thread ID #%d, fd #%d", proto_str[args->protocol], tid, fd);
825 // Wait for a connection to this tcp/ip from a client. If set
826 // to true, this will block until a request comes in. If set
827 // to single threaded mode, this will only allow one client to
828 // connect at a time. This is to make it easier to debug
829 // things when you have a heavily threaded application.
830 args
->netfd
= net
.newConnection(true, fd
);
831 if (args
->netfd
<= 0) {
832 log_network(_("No new %s network connections"),
833 proto_str
[args
->protocol
]);
836 log_network(_("*** New %s network connection for thread ID #%d, fd #%d ***"),
837 proto_str
[args
->protocol
], tid
, args
->netfd
);
841 // Setup HTTP handler
843 if (args
->protocol
== Network::HTTP
) {
844 Network::thread_params_t
*hargs
= new Network::thread_params_t
;
845 // std::copy(args, args+sizeof(Network::thread_params_t), &hargs);
846 hargs
->protocol
= args
->protocol
;
847 hargs
->netfd
= args
->netfd
;
849 boost::shared_ptr
<Handler
> hand
= cyg
.findHandler(path
);
850 HTTPServer
*http
= new HTTPServer
;
852 http
->setDocRoot(crcfile
.getDocumentRoot());
853 boost::shared_ptr
<cygnal::Buffer
> buf(http
->peekChunk());
854 http
->processHeaderFields(*buf
);
855 string hostname
, path
;
856 string::size_type pos
= http
->getField("host").find(":", 0);
857 if (pos
!= string::npos
) {
858 hostname
+= http
->getField("host").substr(0, pos
);
860 hostname
+= "localhost.localdomain";
862 path
= http
->getFilespec();
863 string key
= hostname
+ path
;
869 hand
->addClient(args
->netfd
, Network::HTTP
);
871 cygnal::Buffer
*buf
= 0;
873 buf
= hand
->parseFirstRequest(args
->netfd
, Network::HTTP
);
881 string
&key
= hand
->getKey(args
->netfd
);
882 log_network(_("Creating new %s Handler for %s using fd #%d"),
883 proto_str
[hargs
->protocol
], key
, hargs
->netfd
);
884 hargs
->handler
= hand
;
886 hargs
->filespec
= key
;
887 // cyg.addHandler(key, hand);
889 // If in multi-threaded mode (the default), start a thread
890 // with a connection_handler for each port we're interested
891 // in. Each port could have a different protocol.
892 boost::bind(event_handler
, hargs
);
893 if (crcfile
.getThreadingFlag() == true) {
894 boost::thread
event_thread(boost::bind(&event_handler
, hargs
));
896 event_handler(hargs
);
897 // We're done, close this network connection
900 log_network(_("Reusing %s Handler for %s using fd #%d"),
901 proto_str
[hargs
->protocol
], key
, hargs
->netfd
);
902 hand
->addClient(args
->netfd
, Network::HTTP
);
908 // Setup RTMP handler
910 if (args
->protocol
== Network::RTMP
) {
911 Network::thread_params_t
*rargs
= new Network::thread_params_t
;
912 rargs
->protocol
= args
->protocol
;
913 rargs
->netfd
= args
->netfd
;
914 RTMPServer
*rtmp
= new RTMPServer
;
915 boost::shared_ptr
<cygnal::Element
> tcurl
=
916 rtmp
->processClientHandShake(args
->netfd
);
918 // log_error("Couldn't read the tcUrl variable!");
919 rtmp
->closeNet(args
->netfd
);
922 URL
url(tcurl
->to_string());
923 string key
= url
.hostname() + url
.path();
924 boost::shared_ptr
<Handler
> hand
= cyg
.findHandler(url
.path());
926 log_network(_("Creating new %s Handler for: %s for fd %#d"),
927 proto_str
[args
->protocol
], key
, args
->netfd
);
928 hand
.reset(new Handler
);
929 cyg
.addHandler(key
, hand
);
931 hand
->setNetConnection(rtmp
->getNetConnection());
932 std::vector
<boost::shared_ptr
<Cygnal::peer_t
> >::iterator it
;
933 std::vector
<boost::shared_ptr
<Cygnal::peer_t
> > active
= cyg
.getActive();
934 for (it
= active
.begin(); it
< active
.end(); ++it
) {
935 Cygnal::peer_t
*peer
= (*it
).get();
936 hand
->addRemote(peer
->fd
);
938 hand
->addClient(args
->netfd
, Network::RTMP
);
939 rargs
->handler
= reinterpret_cast<void *>(hand
.get());
940 args
->filespec
= key
;
944 char *env
= std::getenv("CYGNAL_PLUGINS");
948 if (crcfile
.getCgiRoot().size() > 0) {
949 cgiroot
+= ":" + crcfile
.getCgiRoot();
950 log_network(_("Cygnal Plugin paths are: %s"), cgiroot
);
952 cgiroot
= PLUGINSDIR
;
954 hand
->scanDir(cgiroot
);
955 boost::shared_ptr
<Handler::cygnal_init_t
> init
=
956 hand
->initModule(url
.path());
958 // this is where the real work gets done.
960 // If in multi-threaded mode (the default), start a thread
961 // with a connection_handler for each port we're interested
962 // in. Each port of course has a different protocol.
963 if (crcfile
.getThreadingFlag() == true) {
964 boost::thread
event_thread(boost::bind(&event_handler
, args
));
967 // We're done, close this network connection
968 net
.closeNet(args
->netfd
);
971 log_error(_("Couldn't load plugin for %s"), key
);
974 // // We're done, close this network connection
975 // if (crcfile.getThreadingFlag() == true) {
976 // net.closeNet(args->netfd);
982 log_network(_("Number of active Threads is %d"), tids
.num_of_tids());
984 // net.closeNet(args->netfd); // this shuts down this socket connection
985 log_network(_("Restarting loop for next connection for port %d..."),
989 // All threads should wake up now.
990 alldone
.notify_all();
992 } // end of connection_handler
995 event_handler(Network::thread_params_t
*args
)
997 GNASH_REPORT_FUNCTION
;
999 Network::thread_params_t largs
;
1000 // std::copy(args, args+sizeof(Network::thread_params_t), &largs);
1001 Handler
*hand
= reinterpret_cast<Handler
*>(args
->handler
);
1003 largs
.protocol
= args
->protocol
;
1004 largs
.netfd
= args
->netfd
;
1005 largs
.port
= args
->port
;
1006 largs
.buffer
= args
->buffer
;
1007 largs
.entry
= args
->entry
;
1008 largs
.filespec
= args
->filespec
;
1017 FD_SET(args
->netfd
, &hits
);
1021 // We need to calculate the highest numbered file descriptor
1022 // for select. We may want to do this elsewhere, as it could
1023 // be a performance hit as the number of file descriptors gets
1025 log_debug("Handler has %d clients attached, %d threads",
1026 hand
->getClients().size(), tids
.num_of_tids());
1029 for (size_t i
= 0; i
<hand
->getClients().size(); i
++) {
1030 log_debug("Handler client[%d] is: %d", i
, hand
->getClient(i
));
1031 if (hand
->getClient(i
) >= max
) {
1032 max
= hand
->getClient(i
);
1039 // If we have active disk streams, send those packets first.
1040 // 0 is a reserved stream, so we start with 1, as the reserved
1041 // stream isn't one we care about here.
1042 if (hand
->getActiveDiskStreams()) {
1043 log_network(_("%d active disk streams"),
1044 hand
->getActiveDiskStreams());
1048 boost::shared_ptr
<DiskStream
> filestream(cache
.findFile(args
->filespec
));
1056 boost::shared_ptr
<DiskStream
> ds
;
1057 for (int i
=1; i
<= hand
->getActiveDiskStreams(); i
++) {
1058 ds
= hand
->getDiskStream(i
);
1061 // Only play the next chunk of the file.
1062 //log_network("Sending following chunk of %s", ds->getFilespec());
1063 if (ds
->play(i
, false)) {
1064 if (ds
->getState() == DiskStream::CLOSED
) {
1065 net
.closeNet(args
->netfd
);
1066 hand
->removeClient(args
->netfd
);
1070 // something went wrong, the stream failed
1071 net
.closeNet(args
->netfd
);
1072 hand
->removeClient(args
->netfd
);
1078 // See if we have any data waiting behind any of the file
1080 for (int i
=0; i
<= max
+ 1; i
++) {
1081 if (FD_ISSET(i
, &hits
)) {
1083 log_network(_("Got a hit for fd #%d, protocol %s"), i
,
1084 proto_str
[hand
->getProtocol(i
)]);
1085 switch (hand
->getProtocol(i
)) {
1087 log_error(_("No protocol specified!"));
1092 // largs.filespec = fullpath;
1093 boost::shared_ptr
<HTTPServer
> &http
= hand
->getHTTPHandler(i
);
1094 if (!http
->http_handler(hand
, args
->netfd
, args
->buffer
)) {
1095 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i
, args
->filespec
);
1096 net
.closeNet(args
->netfd
);
1097 hand
->removeClient(args
->netfd
);
1100 log_network(_("Not Done with HTTP connection for fd #%d, it's a persistent connection."), i
);
1107 // args->filespec = path;
1108 if (!rtmp_handler(args
)) {
1109 log_network(_("Done with RTMP connection for fd #%d, CGI "), i
, args
->filespec
);
1113 case Network::RTMPT
:
1115 net
.setTimeout(timeout
);
1117 boost::shared_ptr
<HTTPServer
> &http
= hand
->getHTTPHandler(i
);
1118 // args->filespec = path;
1119 if (!http
->http_handler(hand
, args
->netfd
, args
->buffer
)) {
1120 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i
, largs
.filespec
);
1125 case Network::RTMPTS
:
1128 // args->filespec = path;
1129 boost::shared_ptr
<HTTPServer
> &http
= hand
->getHTTPHandler(i
);
1130 if (!http
->http_handler(hand
, args
->netfd
, args
->buffer
)) {
1131 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i
, args
->filespec
);
1136 case Network::RTMPE
:
1138 case Network::RTMPS
:
1143 log_error(_("Unsupported network protocol for fd #%d, %d"),
1144 largs
.netfd
, hand
->getProtocol(i
));
1148 // delete args->buffer;
1152 // // Clear the current message so next time we read new data
1153 // args->buffer->clear();
1154 // largs.buffer->clear();
1156 // Wait for something from one of the file descriptors. This timeout
1157 // is the time between sending packets to the client when there is
1158 // no client input, which affects the streaming speed of big files.
1160 hits
= net
.waitForNetData(hand
->getClients());
1161 if (FD_ISSET(0, &hits
)) {
1163 log_network(_("Got no hits, %d retries"), retries
);
1164 // net.closeNet(args->netfd);
1165 // hand->removeClient(args->netfd);
1170 if (retries
>= 10) {
1171 net
.closeNet(args
->netfd
);
1172 hand
->removeClient(args
->netfd
);
1180 } // end of event_handler
1184 // indent-tabs-mode: nil