1 // cygnal.cpp: GNU streaming Flash media server, for Gnash.
3 // Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
4 // Free Software Foundation, Inc.
6 // This program is free software; you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation; either version 3 of the License, or
9 // (at your option) any later version.
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
16 // You should have received a copy of the GNU General Public License
17 // along with this program; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include "gnashconfig.h"
34 #include <sys/types.h>
38 #include "GnashSleep.h"
44 # include "GnashSystemIOHeaders.h"
49 extern int optind
, getopt(int, char *const *, const char *);
54 #include <boost/shared_ptr.hpp>
56 // classes internal to Gnash
66 #include "statistics.h"
69 #include "diskstream.h"
70 #include "arg_parser.h"
71 #include "GnashException.h"
72 #include "GnashSleep.h" // for usleep comptibility.
74 #include "rtmp_client.h"
76 // classes internal to Cygnal
77 #include "rtmp_server.h"
78 #include "http_server.h"
88 #include <boost/date_time/gregorian/gregorian.hpp>
89 //#include <boost/date_time/local_time/local_time.hpp>
90 #include <boost/date_time/time_zone_base.hpp>
91 #include <boost/date_time/posix_time/posix_time.hpp>
92 #include <boost/thread/thread.hpp>
93 #include <boost/bind.hpp>
94 #include <boost/thread/mutex.hpp>
95 #include <boost/thread/condition.hpp>
96 #include <boost/thread/tss.hpp>
102 //using gnash::log_network;
104 using namespace gnash
;
105 using namespace cygnal
;
108 static void version_and_copyright();
109 static void cntrlc_handler(int sig
);
110 static void hup_handler(int sig
);
112 void connection_handler(Network::thread_params_t
*args
);
113 void event_handler(Network::thread_params_t
*args
);
114 void admin_handler(Network::thread_params_t
*args
);
116 // Toggles very verbose debugging info from the network Network class
117 static bool netdebug
= false;
119 struct sigaction act1
, act2
;
121 // The next few global variables have to be global because Boost
122 // threads don't take arguments. Since these are set in main() before
123 // any of the threads are started, and it's value should never change,
124 // it's safe to use these without a mutex, as all threads share the
125 // same read-only value.
127 // This is the default path to look in for files to be streamed.
128 static string docroot
;
130 // This is the number of times a thread loop continues, for debugging only
131 int thread_retries
= 10;
133 // This is added to the default ports for testing so it doesn't
134 // conflict with apache on the same machine.
135 static int port_offset
= 0;
137 // Toggle the admin thread
138 static bool admin
= false;
140 // Admin commands are small
141 const int ADMINPKTSIZE
= 80;
143 // If set to a non zero value, this limits Cygnal to only one protocol
144 // at a time. This is for debugging only.
145 static int only_port
= 0;
147 // These keep track of the number of active threads.
150 map
<int, Network
*> networks
;
152 // This is the global object for Cygnl
153 // The debug log used by all the gnash libraries.
154 static Cygnal
& cyg
= Cygnal::getDefaultInstance();
156 // The debug log used by all the gnash libraries.
157 static LogFile
& dbglogfile
= LogFile::getDefaultInstance();
159 // The user config for Cygnal is loaded and parsed here:
160 static CRcInitFile
& crcfile
= CRcInitFile::getDefaultInstance();
162 // Cache support for responses and files.
163 static Cache
& cache
= Cache::getDefaultInstance();
165 // The list of active cgis being executed.
166 //static std::map<std::string, Proc> procs; // = proc::getDefaultInstance();
168 // This mutex is used to signify when all the threads are done.
169 static boost::condition alldone
;
170 static boost::mutex alldone_mutex
;
172 static boost::condition noclients
;
173 static boost::mutex noclients_mutex
;
175 const char *proto_str
[] = {
190 cout
<< _("cygnal -- a streaming media server.") << endl
192 << _("Usage: cygnal [options...]") << endl
193 << _(" -h, --help Print this help and exit") << endl
194 << _(" -V, --version Print version information and exit") << endl
195 << _(" -v, --verbose Output verbose debug info") << endl
196 << _(" -s, --singlethread Disable Multi Threading") << endl
197 << _(" -n, --netdebug Turn on net debugging messages") << endl
198 << _(" -o --only-port Only use port for debugging") << endl
199 << _(" -p --port-offset Port offset for debugging") << endl
200 << _(" -t, --testing Turn on special Gnash testing support") << endl
201 << _(" -a, --admin Enable the administration thread") << endl
202 << _(" -r, --root Document root for all files") << endl
208 Cygnal::getDefaultInstance()
210 // GNASH_REPORT_FUNCTION;
218 // GNASH_REPORT_FUNCTION;
222 Cygnal::loadPeersFile()
224 // GNASH_REPORT_FUNCTION;
226 loadPeersFile("./peers.conf");
228 loadPeersFile("/etc/peers.conf");
230 // Check the users home directory
232 char *home
= std::getenv("HOME");
234 //on AmigaOS we have a GNASH: assign that point to program dir
235 char *home
= "/gnash";
238 string homefile
= home
;
239 homefile
+= "/peers.conf";
241 return loadPeersFile(homefile
);
245 Cygnal::loadPeersFile(const std::string
&filespec
)
247 // GNASH_REPORT_FUNCTION;
255 vector
<string
> supported
;
257 // Make sufre the file exists
258 if (stat(filespec
.c_str(), &stats
) != 0) {
262 in
.open(filespec
.c_str());
265 log_error(_(": couldn't open file: "), filespec
);
269 // Read in each line and parse it
271 while (std::getline(in
, line
)) {
275 // Ignore comment and empty lines
276 if (line
.empty() || line
[0] == '#') {
280 std::istringstream
ss(line
);
282 // Get the first token
283 if (! (ss
>> host
)) {
288 // 'action' should never be empty, or (ss >> action)
289 // above would have failed
291 if (host
[0] == '#') {
292 continue; // discard comments
296 if (!(ss
>> portstr
)) {
297 // Do we need to warn here as well?
302 supported
.push_back(cgi
);
306 // Create a new peer item
307 boost::shared_ptr
<peer_t
> peer(new Cygnal::peer_t
);
308 peer
->hostname
= host
;
309 peer
->port
= strtol(portstr
.c_str(), NULL
, 0) & 0xffff;
311 _peers
.push_back(peer
);
320 // GNASH_REPORT_FUNCTION;
326 Cygnal::probePeers(peer_t
&peer
)
328 // GNASH_REPORT_FUNCTION;
332 uri
<< peer
.hostname
;
334 vector
<string
>::iterator it
;
335 for (it
= peer
.supported
.begin(); it
<= peer
.supported
.end(); ++it
) {
336 string tmp
= uri
.str();
338 // log_network("Constructed: %s/%s", uri.str(), *it);
340 gnash::URL
url(uri
.str());
341 if (!(peer
.fd
= net
.connectToServer(uri
.str()))) {
342 log_network(_("Couldn't connect to %s"), uri
.str());
343 peer
.connected
= false;
345 peer
.connected
= true;
346 // peer.fd = net.getFileFd();
352 Cygnal::probePeers(std::vector
<boost::shared_ptr
<peer_t
> > &peers
)
354 // GNASH_REPORT_FUNCTION;
357 std::vector
<boost::shared_ptr
<Cygnal::peer_t
> >::iterator it
;
358 for (it
= peers
.begin(); it
!= peers
.end(); ++it
) {
359 boost::shared_ptr
<Cygnal::peer_t
> peer
= *it
;
361 if (peer
->connected
) {
362 log_network(_("%s is active on fd #%d."), peer
->hostname
,
364 _active_peers
.push_back(*it
);
370 Cygnal::removeHandler(const std::string
&path
)
372 // GNASH_REPORT_FUNCTION;
373 map
<std::string
, boost::shared_ptr
<Handler
> >::iterator it
;
374 it
= _handlers
.find(path
);
375 if (it
!= _handlers
.end()) {
376 boost::mutex::scoped_lock
lock(_mutex
);
381 boost::shared_ptr
<Handler
>
382 Cygnal::findHandler(const std::string
&path
)
384 // GNASH_REPORT_FUNCTION;
385 map
<std::string
, boost::shared_ptr
<Handler
> >::iterator it
;
386 boost::shared_ptr
<Handler
> hand
;
387 it
= _handlers
.find(path
);
388 if (it
!= _handlers
.end()) {
398 std::vector
<boost::shared_ptr
<Cygnal::peer_t
> >::iterator it
;
399 for (it
= _peers
.begin(); it
!= _peers
.end(); ++it
) {
400 cerr
<< "Remote Peer: " << (*it
)->hostname
401 << ":" << (*it
)->port
<< endl
;
406 main(int argc
, char *argv
[])
408 // Initialize national language support
410 setlocale (LC_ALL
, "");
411 bindtextdomain (PACKAGE
, LOCALEDIR
);
412 textdomain (PACKAGE
);
415 const Arg_parser::Option opts
[] =
417 { 'h', "help", Arg_parser::no
},
418 { 'V', "version", Arg_parser::no
},
419 { 'p', "port-offset", Arg_parser::yes
},
420 { 'v', "verbose", Arg_parser::no
},
421 { 'd', "dump", Arg_parser::no
},
422 { 'n', "netdebug", Arg_parser::no
},
423 { 't', "testing", Arg_parser::no
},
424 { 'a', "admin", Arg_parser::no
},
425 { 'r', "root", Arg_parser::yes
},
426 { 'o', "only-port", Arg_parser::yes
},
427 { 's', "singlethreaded", Arg_parser::no
}
430 Arg_parser
parser(argc
, argv
, opts
);
431 if( ! parser
.error().empty() )
433 cout
<< parser
.error() << endl
;
437 // crcfile.loadFiles();
439 // Set the log file name before trying to write to
440 // it, or we might get two.
441 dbglogfile
.setLogFilename(crcfile
.getDebugLog());
443 if (crcfile
.verbosityLevel() > 0) {
444 dbglogfile
.setVerbosity(crcfile
.verbosityLevel());
447 if (crcfile
.getDocumentRoot().size() > 0) {
448 docroot
= crcfile
.getDocumentRoot();
450 docroot
= "/var/www/html/software/tests/";
451 crcfile
.setDocumentRoot(docroot
);
453 if (crcfile
.getPortOffset()) {
454 port_offset
= crcfile
.getPortOffset();
457 // Handle command line arguments
458 for( int i
= 0; i
< parser
.arguments(); ++i
) {
459 const int code
= parser
.code(i
);
462 version_and_copyright();
466 version_and_copyright();
469 crcfile
.setTestingFlag(true);
475 dbglogfile
.setVerbosity();
476 LOG_ONCE(log_network(_("Verbose output turned on")))
479 port_offset
= parser
.argument
<int>(i
);
480 crcfile
.setPortOffset(port_offset
);
483 docroot
= parser
.argument(i
);
486 crcfile
.setThreadingFlag(false);
490 dbglogfile
.setNetwork(true);
493 only_port
= parser
.argument
<int>(i
);
500 log_error(_("Extraneous argument: %s"), parser
.argument(i
).c_str());
504 log_network(_("Document Root for media files is: %s"), docroot
);
505 crcfile
.setDocumentRoot(docroot
);
507 // load the file of peers. A peer is another instance of Cygnal we
508 // can use for distributed processing.
514 // Trap ^C (SIGINT) so we can kill all the threads
515 act1
.sa_handler
= cntrlc_handler
;
516 sigaction (SIGINT
, &act1
, NULL
);
517 act2
.sa_handler
= hup_handler
;
518 sigaction (SIGHUP
, &act2
, NULL
);
519 // sigaction (SIGPIPE, &act, NULL);
521 // Lock a mutex the main() waits in before exiting. This is
522 // because all the actually processing is done by other threads.
523 boost::mutex::scoped_lock
lk(alldone_mutex
);
525 // Start the Admin handler. This allows one to connect to Cygnal
526 // at port 1111 and dump statistics to the terminal for tuning
529 Network::thread_params_t admin_data
;
530 admin_data
.port
= gnash::ADMIN_PORT
;
531 boost::thread
admin_thread(boost::bind(&admin_handler
, &admin_data
));
535 // cvm.loadMovie("/tmp/out.swf");
537 // If a only-port is specified, we only want to run single
538 // threaded. As all the rest of the code checks the config value
539 // setting, this overrides that in the memory, but doesn't change
540 // the file itself. This feature is really only for debugging,
541 // where it's easier to work with one protocol at a time.
543 crcfile
.setThreadingFlag(false);
546 // Incomming connection handler for port 80, HTTP and
547 // RTMPT. As port 80 requires root access, cygnal supports a
548 // "port offset" for debugging and development of the
549 // server. Since this port offset changes the constant to test
550 // for which protocol, we pass the info to the start thread so
551 // it knows which handler to invoke.
552 Network::thread_params_t
*http_data
= new Network::thread_params_t
;
553 if ((only_port
== 0) || (only_port
== gnash::HTTP_PORT
)) {
555 http_data
->netfd
= 0;
556 http_data
->filespec
= docroot
;
557 http_data
->protocol
= Network::HTTP
;
558 http_data
->port
= port_offset
+ gnash::HTTP_PORT
;
559 if (crcfile
.getThreadingFlag()) {
560 boost::thread
http_thread(boost::bind(&connection_handler
, http_data
));
562 connection_handler(http_data
);
566 // Incomming connection handler for port 1935, RTMPT and
567 // RTMPTE. This supports the same port offset as the HTTP handler,
568 // just to keep things consistent.
569 Network::thread_params_t
*rtmp_data
= new Network::thread_params_t
;
570 if ((only_port
== 0) || (only_port
== gnash::RTMP_PORT
)) {
572 rtmp_data
->netfd
= 0;
573 rtmp_data
->filespec
= docroot
;
574 rtmp_data
->protocol
= Network::RTMP
;
575 rtmp_data
->port
= port_offset
+ gnash::RTMP_PORT
;
576 if (crcfile
.getThreadingFlag()) {
577 boost::thread
rtmp_thread(boost::bind(&connection_handler
, rtmp_data
));
579 connection_handler(rtmp_data
);
583 // Wait for all the threads to die.
586 log_network(_("Cygnal done..."));
588 // Delete the data we allowcated to pass to each connection_handler.
595 // Trap Control-C (SIGINT) so we can cleanly exit
597 cntrlc_handler (int sig
)
599 log_network(_("Got a %d interrupt"), sig
);
600 // sigaction (SIGINT, &act, NULL);
604 // Trap SIGHUP so we can
606 hup_handler (int /* sig */)
608 if (crcfile
.getTestingFlag()) {
609 cerr
<< "Testing, Testing, Testing..." << endl
;
615 version_and_copyright()
617 cout
<< "Cygnal: " << BRANCH_NICK
<< "_" << BRANCH_REVNO
<< endl
619 << _("Copyright (C) 2007, 2008, 2009, 2010 Free Software Foundation, Inc.\n"
620 "Cygnal comes with NO WARRANTY, to the extent permitted by law.\n"
621 "You may redistribute copies of Cygnal under the terms of the GNU General\n"
622 "Public License V3 or later. For more information, see the file named COPYING.\n")
626 // FIXME: this function could be tweaked for better performance
628 admin_handler(Network::thread_params_t
*args
)
630 GNASH_REPORT_FUNCTION
;
634 map
<int, Handler
*>::iterator hit
;
635 stringstream response
;
638 Handler::admin_cmd_e cmd
= Handler::POLL
;
639 net
.createServer(args
->port
);
640 while (retries
> 0) {
641 log_network(_("Starting Admin Handler for port %d"), args
->port
);
642 net
.newConnection(true);
643 log_network(_("Got an incoming Admin request"));
646 Network::byte_t data
[ADMINPKTSIZE
+1];
647 memset(data
, 0, ADMINPKTSIZE
+1);
648 const char *ptr
= reinterpret_cast<const char *>(data
);
649 ret
= net
.readNet(data
, ADMINPKTSIZE
, 100);
651 log_network(_("no more admin data, exiting...\n"));
652 if ((ret
== 0) && cmd
!= Handler::POLL
) {
656 // force the case to make comparisons easier. Only compare enough characters to
657 // till each command is unique.
658 std::transform(ptr
, ptr
+ ret
, data
, (int(*)(int)) toupper
);
659 if (strncmp(ptr
, "QUIT", 4) == 0) {
661 } else if (strncmp(ptr
, "STATUS", 5) == 0) {
662 cmd
= Handler::STATUS
;
663 } else if (strncmp(ptr
, "HELP", 2) == 0) {
665 net
.writeNet("commands: help, status, poll, interval, statistics, quit.\n");
666 } else if (strncmp(ptr
, "POLL", 2) == 0) {
668 } else if (strncmp(ptr
, "INTERVAL", 2) == 0) {
669 cmd
= Handler::INTERVAL
;
673 // close this connection
677 case Handler::STATUS
:
679 #ifdef USE_STATS_CACHE
681 string results
= cache
.stats(false);
682 if (results
.size()) {
683 net
.writeNet(results
);
688 response
<< handlers
.size() << " handlers are currently active.";
689 for (hit
= handlers
.begin(); hit
!= handlers
.end(); hit
++) {
691 Handler
*hand
= hit
->second
;
692 response
<< fd
<< ","
694 << "," << hand
->outsize()
696 net
.writeNet(response
);
702 #ifdef USE_STATS_QUEUE
703 response
<< handlers
.size() << " handlers are currently active." << "\r\n";
704 for (hit
= handlers
.begin(); hit
!= handlers
.end(); ++hit
) {
706 Handler
*hand
= hit
->second
;
708 clock_gettime (CLOCK_REALTIME
, &now
);
709 // Incoming que stats
710 CQue::que_stats_t
*stats
= hand
->statsin();
711 float diff
= static_cast<float>(((now
.tv_sec
-
712 stats
->start
.tv_sec
) + ((now
.tv_nsec
-
713 stats
->start
.tv_nsec
)/1e9
)));
715 << "," << stats
->totalbytes
717 << "," << stats
->totalin
718 << "," << stats
->totalout
;
719 // Outgoing que stats
720 stats
= hand
->statsout();
721 response
<< "," <<stats
->totalbytes
722 << "," << stats
->totalin
723 << "," << stats
->totalout
725 net
.writeNet(response
.str());
729 case Handler::INTERVAL
:
730 net
.writeNet("set interval\n");
736 log_network(_("admin_handler: Done...!\n"));
737 net
.closeNet(); // this shuts down this socket connection
739 net
.closeConnection(); // this shuts down the server on this connection
741 // All threads should exit now.
742 alldone
.notify_all();
745 // A connection handler is started for each port the server needs to
746 // wait on for incoming connections. When it gets an incoming
747 // connection, it reads the first packet to get the resource name, and
748 // then starts the event handler thread if it's a newly requested
749 // resource, otherwise it loads a copy of the cached resource.
751 connection_handler(Network::thread_params_t
*args
)
753 // GNASH_REPORT_FUNCTION;
760 net
.toggleDebug(true);
762 // Start a server on this tcp/ip port.
763 fd
= net
.createServer(args
->port
);
765 log_error(_("Can't start %s Connection Handler for fd #%d, port %hd"),
766 proto_str
[args
->protocol
], fd
, args
->port
);
769 log_network(_("Starting %s Connection Handler for fd #%d, port %hd"),
770 proto_str
[args
->protocol
], fd
, args
->port
);
773 // Get the number of cpus in this system. For multicore
774 // systems we'll get better load balancing if we keep all the
775 // cpus busy. So a pool of threads is started for each cpu,
776 // the default being just one. Each thread is reponsible for
777 // handling part of the total active file descriptors.
779 long ncpus
= sysconf(_SC_NPROCESSORS_ONLN
);
780 LOG_ONCE(log_network(_("This system has %d cpus."), ncpus
));
782 size_t nfds
= crcfile
.getFDThread();
784 // log_network("This system is configured for %d file descriptors to be watched by each thread.", nfds);
786 // Get the next thread ID to hand off handling this file
787 // descriptor to. If the limit for threads per cpu hasn't been
788 // set or is set to 0, assume one thread per processor by
789 // default. There won't even be threads for each cpu if
790 // threading has been disabled in the cygnal config file.
795 spawn_limit
= ncpus
* nfds
;
798 // FIXME: this may run forever, we probably want a cleaner way to
799 // test for the end of time.
801 net
.setPort(args
->port
);
803 net
.toggleDebug(true);
806 // Rotate in a range of 0 to the limit.
807 tid
= (tid
+ 1) % (spawn_limit
+ 1);
808 // log_network("%s handler: thread ID #%d, fd #%d", proto_str[args->protocol], tid, fd);
810 // Wait for a connection to this tcp/ip from a client. If set
811 // to true, this will block until a request comes in. If set
812 // to single threaded mode, this will only allow one client to
813 // connect at a time. This is to make it easier to debug
814 // things when you have a heavily threaded application.
815 args
->netfd
= net
.newConnection(true, fd
);
816 if (args
->netfd
<= 0) {
817 log_network(_("No new %s network connections"),
818 proto_str
[args
->protocol
]);
821 log_network(_("*** New %s network connection for thread ID #%d, fd #%d ***"),
822 proto_str
[args
->protocol
], tid
, args
->netfd
);
826 // Setup HTTP handler
828 if (args
->protocol
== Network::HTTP
) {
829 Network::thread_params_t
*hargs
= new Network::thread_params_t
;
830 // std::copy(args, args+sizeof(Network::thread_params_t), &hargs);
831 hargs
->protocol
= args
->protocol
;
832 hargs
->netfd
= args
->netfd
;
834 boost::shared_ptr
<Handler
> hand
= cyg
.findHandler(path
);
835 HTTPServer
*http
= new HTTPServer
;
837 http
->setDocRoot(crcfile
.getDocumentRoot());
838 boost::shared_ptr
<cygnal::Buffer
> buf(http
->peekChunk());
839 http
->processHeaderFields(*buf
);
840 string hostname
, path
;
841 string::size_type pos
= http
->getField("host").find(":", 0);
842 if (pos
!= string::npos
) {
843 hostname
+= http
->getField("host").substr(0, pos
);
845 hostname
+= "localhost";
847 path
= http
->getFilespec();
848 string key
= hostname
+ path
;
854 hand
->addClient(args
->netfd
, Network::HTTP
);
856 cygnal::Buffer
*buf
= 0;
858 buf
= hand
->parseFirstRequest(args
->netfd
, Network::HTTP
);
866 string
&key
= hand
->getKey(args
->netfd
);
867 log_network(_("Creating new %s Handler for %s using fd #%d"),
868 proto_str
[hargs
->protocol
], key
, hargs
->netfd
);
869 hargs
->handler
= hand
;
871 hargs
->filespec
= key
;
872 // cyg.addHandler(key, hand);
874 // If in multi-threaded mode (the default), start a thread
875 // with a connection_handler for each port we're interested
876 // in. Each port of could have a different protocol.
877 boost::bind(event_handler
, hargs
);
878 if (crcfile
.getThreadingFlag() == true) {
879 boost::thread
event_thread(boost::bind(&event_handler
, hargs
));
881 event_handler(hargs
);
882 // We're done, close this network connection
885 log_network(_("Reusing %s Handler for %s using fd #%d"),
886 proto_str
[hargs
->protocol
], key
, hargs
->netfd
);
887 hand
->addClient(args
->netfd
, Network::HTTP
);
893 // Setup RTMP handler
895 if (args
->protocol
== Network::RTMP
) {
896 Network::thread_params_t
*rargs
= new Network::thread_params_t
;
897 rargs
->protocol
= args
->protocol
;
898 rargs
->netfd
= args
->netfd
;
899 RTMPServer
*rtmp
= new RTMPServer
;
900 boost::shared_ptr
<cygnal::Element
> tcurl
=
901 rtmp
->processClientHandShake(args
->netfd
);
903 // log_error("Couldn't read the tcUrl variable!");
904 rtmp
->closeNet(args
->netfd
);
907 URL
url(tcurl
->to_string());
908 string key
= url
.hostname() + url
.path();
909 boost::shared_ptr
<Handler
> hand
= cyg
.findHandler(url
.path());
911 log_network(_("Creating new %s Handler for: %s for fd %#d"),
912 proto_str
[args
->protocol
], key
, args
->netfd
);
913 hand
.reset(new Handler
);
914 cyg
.addHandler(key
, hand
);
916 hand
->setNetConnection(rtmp
->getNetConnection());
917 std::vector
<boost::shared_ptr
<Cygnal::peer_t
> >::iterator it
;
918 std::vector
<boost::shared_ptr
<Cygnal::peer_t
> > active
= cyg
.getActive();
919 for (it
= active
.begin(); it
< active
.end(); ++it
) {
920 Cygnal::peer_t
*peer
= (*it
).get();
921 hand
->addRemote(peer
->fd
);
923 hand
->addClient(args
->netfd
, Network::RTMP
);
924 rargs
->handler
= reinterpret_cast<void *>(hand
.get());
925 args
->filespec
= key
;
929 char *env
= std::getenv("CYGNAL_PLUGINS");
933 if (crcfile
.getCgiRoot().size() > 0) {
934 cgiroot
+= ":" + crcfile
.getCgiRoot();
935 log_network(_("Cygnal Plugin paths are: %s"), cgiroot
);
937 cgiroot
= PLUGINSDIR
;
939 hand
->scanDir(cgiroot
);
940 boost::shared_ptr
<Handler::cygnal_init_t
> init
=
941 hand
->initModule(url
.path());
943 // this is where the real work gets done.
945 // If in multi-threaded mode (the default), start a thread
946 // with a connection_handler for each port we're interested
947 // in. Each port of course has a different protocol.
948 if (crcfile
.getThreadingFlag() == true) {
949 boost::thread
event_thread(boost::bind(&event_handler
, args
));
952 // We're done, close this network connection
953 net
.closeNet(args
->netfd
);
956 log_error(_("Couldn't load plugin for %s"), key
);
959 // // We're done, close this network connection
960 // if (crcfile.getThreadingFlag() == true) {
961 // net.closeNet(args->netfd);
967 log_network(_("Number of active Threads is %d"), tids
.num_of_tids());
969 // net.closeNet(args->netfd); // this shuts down this socket connection
970 log_network(_("Restarting loop for next connection for port %d..."),
974 // All threads should wake up now.
975 alldone
.notify_all();
977 } // end of connection_handler
980 event_handler(Network::thread_params_t
*args
)
982 GNASH_REPORT_FUNCTION
;
984 Network::thread_params_t largs
;
985 // std::copy(args, args+sizeof(Network::thread_params_t), &largs);
986 Handler
*hand
= reinterpret_cast<Handler
*>(args
->handler
);
988 largs
.protocol
= args
->protocol
;
989 largs
.netfd
= args
->netfd
;
990 largs
.port
= args
->port
;
991 largs
.buffer
= args
->buffer
;
992 largs
.entry
= args
->entry
;
993 largs
.filespec
= args
->filespec
;
1002 FD_SET(args
->netfd
, &hits
);
1006 // We need to calculate the highest numbered file descriptor
1007 // for select. We may want to do this elsewhere, as it could
1008 // be a performance hit as the number of file descriptors gets
1010 log_debug("Handler has %d clients attached, %d threads",
1011 hand
->getClients().size(), tids
.num_of_tids());
1014 for (size_t i
= 0; i
<hand
->getClients().size(); i
++) {
1015 log_debug("Handler client[%d] is: %d", i
, hand
->getClient(i
));
1016 if (hand
->getClient(i
) >= max
) {
1017 max
= hand
->getClient(i
);
1024 // If we have active disk streams, send those packets first.
1025 // 0 is a reserved stream, so we start with 1, as the reserved
1026 // stream isn't one we care about here.
1027 if (hand
->getActiveDiskStreams()) {
1028 log_network(_("%d active disk streams"),
1029 hand
->getActiveDiskStreams());
1033 boost::shared_ptr
<DiskStream
> filestream(cache
.findFile(args
->filespec
));
1041 boost::shared_ptr
<DiskStream
> ds
;
1042 for (int i
=1; i
<= hand
->getActiveDiskStreams(); i
++) {
1043 ds
= hand
->getDiskStream(i
);
1046 // Only play the next chunk of the file.
1047 //log_network("Sending following chunk of %s", ds->getFilespec());
1048 if (ds
->play(i
, false)) {
1049 if (ds
->getState() == DiskStream::CLOSED
) {
1050 net
.closeNet(args
->netfd
);
1051 hand
->removeClient(args
->netfd
);
1055 // something went wrong, the stream failed
1056 net
.closeNet(args
->netfd
);
1057 hand
->removeClient(args
->netfd
);
1063 // See if we have any data waiting behind any of the file
1065 for (int i
=0; i
<= max
+ 1; i
++) {
1066 if (FD_ISSET(i
, &hits
)) {
1068 log_network(_("Got a hit for fd #%d, protocol %s"), i
,
1069 proto_str
[hand
->getProtocol(i
)]);
1070 switch (hand
->getProtocol(i
)) {
1072 log_error(_("No protocol specified!"));
1077 // largs.filespec = fullpath;
1078 boost::shared_ptr
<HTTPServer
> &http
= hand
->getHTTPHandler(i
);
1079 if (!http
->http_handler(hand
, args
->netfd
, args
->buffer
)) {
1080 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i
, args
->filespec
);
1081 net
.closeNet(args
->netfd
);
1082 hand
->removeClient(args
->netfd
);
1085 log_network(_("Not Done with HTTP connection for fd #%d, it's a persistent connection."), i
);
1092 // args->filespec = path;
1093 if (!rtmp_handler(args
)) {
1094 log_network(_("Done with RTMP connection for fd #%d, CGI "), i
, args
->filespec
);
1098 case Network::RTMPT
:
1100 net
.setTimeout(timeout
);
1102 boost::shared_ptr
<HTTPServer
> &http
= hand
->getHTTPHandler(i
);
1103 // args->filespec = path;
1104 if (!http
->http_handler(hand
, args
->netfd
, args
->buffer
)) {
1105 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i
, largs
.filespec
);
1110 case Network::RTMPTS
:
1113 // args->filespec = path;
1114 boost::shared_ptr
<HTTPServer
> &http
= hand
->getHTTPHandler(i
);
1115 if (!http
->http_handler(hand
, args
->netfd
, args
->buffer
)) {
1116 log_network(_("Done with HTTP connection for fd #%d, CGI %s"), i
, args
->filespec
);
1121 case Network::RTMPE
:
1123 case Network::RTMPS
:
1128 log_error(_("Unsupported network protocol for fd #%d, %d"),
1129 largs
.netfd
, hand
->getProtocol(i
));
1133 // delete args->buffer;
1137 // // Clear the current message so next time we read new data
1138 // args->buffer->clear();
1139 // largs.buffer->clear();
1141 // Wait for something from one of the file descriptors. This timeout
1142 // is the time between sending packets to the client when there is
1143 // no client input, which effects the streaming speed of big files.
1145 hits
= net
.waitForNetData(hand
->getClients());
1146 if (FD_ISSET(0, &hits
)) {
1148 log_network(_("Got no hits, %d retries"), retries
);
1149 // net.closeNet(args->netfd);
1150 // hand->removeClient(args->netfd);
1155 if (retries
>= 10) {
1156 net
.closeNet(args
->netfd
);
1157 hand
->removeClient(args
->netfd
);
1165 } // end of event_handler
1169 // indent-tabs-mode: nil