Merge branch 'master' of git.sv.gnu.org:/srv/git/gnash
[gnash.git] / cygnal / rtmp_server.cpp
blobf92c632077bb1cb4961d53fb588bc90cb6ef1139
1 // rtmp.cpp: Adobe/Macromedia Real Time Message Protocol handler, for Gnash.
2 //
3 // Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Free Software
4 // Foundation, Inc
5 //
6 // This program is free software; you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation; either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
16 // You should have received a copy of the GNU General Public License
17 // along with this program; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 #ifdef HAVE_CONFIG_H
22 #include "gnashconfig.h"
23 #endif
25 #include <iostream>
26 #include <string>
27 #include <map>
28 #include <cstdlib>
29 #include <cstdio>
31 #include <boost/cstdint.hpp>
32 #include <boost/shared_ptr.hpp>
33 #include <boost/detail/endian.hpp>
34 #include <boost/random/uniform_real.hpp>
35 #include <boost/random/uniform_int.hpp>
36 #include <boost/random/mersenne_twister.hpp>
37 #include <boost/lexical_cast.hpp>
39 #if ! (defined(_WIN32) || defined(WIN32))
40 # include <netinet/in.h>
41 #endif
43 #include "log.h"
44 #include "URL.h"
45 #include "amf.h"
46 #include "rtmp.h"
47 #include "rtmp_server.h"
48 #include "network.h"
49 #include "element.h"
50 #include "handler.h"
51 #include "utility.h"
52 #include "buffer.h"
53 #include "GnashSleep.h"
54 #include "crc.h"
55 #include "cache.h"
56 #include "diskstream.h"
57 #ifdef HAVE_SYS_TIME_H
58 # include <sys/time.h>
59 #endif
60 using namespace gnash;
61 using namespace std;
63 namespace cygnal
66 // Get access to the global config data for Cygnal
67 static CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();
69 // Get access to the global Cygnal cache
70 static Cache& cache = Cache::getDefaultInstance();
72 extern map<int, Handler *> handlers;
74 RTMPServer::RTMPServer()
75 : _filesize(0),
76 _streamid(1)
78 // GNASH_REPORT_FUNCTION;
79 // _inbytes = 0;
80 // _outbytes = 0;
82 // _body = new unsigned char(RTMP_HANDSHAKE_SIZE+1);
83 // memset(_body, 0, RTMP_HANDSHAKE_SIZE+1);
86 RTMPServer::~RTMPServer()
88 // GNASH_REPORT_FUNCTION;
89 _properties.clear();
90 // delete _body;
94 boost::shared_ptr<cygnal::Element>
95 RTMPServer::processClientHandShake(int fd)
97 GNASH_REPORT_FUNCTION;
99 log_network(_("Processing RTMP Handshake for fd #%d"), fd);
101 #ifdef USE_STATISTICS
102 struct timespec start;
103 clock_gettime (CLOCK_REALTIME, &start);
104 #endif
106 // Adjust the timeout for reading from the network
107 RTMP::setTimeout(10);
109 // These store the information we need from the initial
110 /// NetConnection object.
111 boost::scoped_ptr<cygnal::Element> nc;
112 boost::shared_ptr<cygnal::Buffer> pkt;
113 boost::shared_ptr<cygnal::Element> tcurl;
114 boost::shared_ptr<cygnal::Element> swfurl;
115 boost::shared_ptr<cygnal::Element> encoding;
117 // RTMP::rtmp_headersize_e response_head_size = RTMP::HEADER_12;
119 // Read the handshake bytes sent by the client when requesting
120 // a connection.
121 boost::shared_ptr<cygnal::Buffer> handshake1 = RTMP::recvMsg(fd);
122 // See if we have data in the handshake, we should have 1537 bytes
123 if (!handshake1) {
124 log_error("Failed to read the handshake from the client.");
125 return tcurl; // nc is empty
126 } else {
127 log_network("Read first handshake from the client.");
130 // Send our response to the handshake, which primarily is the bytes
131 // we just received.
132 handShakeResponse(fd, *handshake1);
134 // Read the response from the client from the handshale reponse we
135 // just sent.
136 boost::shared_ptr<cygnal::Buffer> handshake2 = RTMP::recvMsg(fd);
137 // See if we have data in the handshake, we should have 1536 bytes
138 if (handshake2 == 0) {
139 log_error("failed to read the handshake from the client.");
140 return tcurl; // nc is empty
141 } else {
142 log_network("Read second handshake from the client.");
145 // Don't assume the data we just read is a handshake.
146 pkt = serverFinish(fd, *handshake1, *handshake2);
147 // Wmake sure we got data before trying to process it
148 if (!pkt) {
149 log_error("Didn't receive any data in handshake!");
150 tcurl.reset(new cygnal::Element);
151 return tcurl; // nc is empty
154 // the packet is a raw RTMP message. Since the header can be a
155 // variety of sizes, and this effects the data size, we need to
156 // decode that first.
157 boost::shared_ptr<RTMP::rtmp_head_t> qhead = RTMP::decodeHeader(pkt->reference());
159 if (!qhead) {
160 log_error("RTMP header had parsing error!");
161 return tcurl; // nc is empty
164 // We know the first packet is always a NetConnection INVOKE of
165 // the connect() method. These are usually around 300-400 bytes in
166 // testing, so anything larger than that is suspicios.
167 if (qhead->bodysize > 1024) {
168 log_error("NetConnection unusually large! %d", qhead->bodysize);
171 // Get the actual start of the data
172 boost::uint8_t *ptr = pkt->reference() + qhead->head_size;
174 // See if we have enough data to go past the chunksize, which is
175 // probable. If so, all chunks are the default size of 128, the
176 // same size as used for video packets. This means every chunksize
177 // boundary is an RTMP header byte that must be removed, or the
178 // data in the NetConnection::connect() packet will be
179 // corrupted. There is probably a better way to do this, but for
180 // now build a copy of the data but skip over the RTMP header
181 // bytes every chunk size biundary. All RTMP headers at this stage
182 // are 1 byte ones.
183 boost::scoped_ptr<cygnal::Buffer> newptr(new cygnal::Buffer(qhead->bodysize));
184 if (qhead->bodysize > RTMP_VIDEO_PACKET_SIZE) {
185 log_network("De chunkifying the NetConnection packet.");
186 int nbytes = 0;
187 while (nbytes < qhead->bodysize) {
188 size_t chunk = RTMP_VIDEO_PACKET_SIZE;
189 if ((qhead->bodysize - nbytes) < RTMP_VIDEO_PACKET_SIZE) {
190 chunk = qhead->bodysize - nbytes;
192 newptr->append(ptr + nbytes, chunk);
193 nbytes += chunk + 1;
195 } else {
196 newptr->copy(ptr, qhead->bodysize);
199 // extract the body of the message from the packet
200 _netconnect = RTMP::decodeMsgBody(newptr->begin(), qhead->bodysize);
201 if (!_netconnect) {
202 log_error("failed to read the body of the handshake data from the client.");
203 return tcurl; // nc is empty
204 } else {
205 log_network("Read handshake data body from the client.");
208 // make sure this is actually a NetConnection packet.
209 if (_netconnect->getMethodName() != "connect") {
210 log_error("Didn't receive NetConnection object in handshake!");
211 return tcurl; // nc is empty
212 } else {
213 log_network("Got NetConnection ::connect() INVOKE.");
214 _netconnect->dump(); // FIXME: debug crap
217 // Get the data for the fields we want.
218 tcurl = _netconnect->findProperty("tcUrl");
219 swfurl = _netconnect->findProperty("swfUrl");
220 encoding = _netconnect->findProperty("objectEncoding");
222 // based on the Red5 tests, I see two behaviours with this next
223 // packet. If only gets sent when the "objectEncoding" field of
224 // the NetConnection object is in the initial packet. When this is
225 // supplied, it's more remoting than streaming, so sending this
226 // causes Async I/O errors in the client.
227 if (!encoding) {
228 // Send a onBWDone to the client to start the new NetConnection,
229 boost::shared_ptr<cygnal::Buffer> bwdone = encodeBWDone(2.0);
230 if (RTMP::sendMsg(fd, qhead->channel, RTMP::HEADER_8,
231 bwdone->size(), RTMP::INVOKE, RTMPMsg::FROM_SERVER, *bwdone)) {
232 log_network("Sent onBWDone to client");
233 } else {
234 log_error("Couldn't send onBWDone to client!");
235 tcurl.reset();
236 return tcurl; // nc is empty
240 // Send a Set Client Window Size to the client
241 boost::shared_ptr<cygnal::Buffer> winsize(new cygnal::Buffer(sizeof(boost::uint32_t)));
242 boost::uint32_t swapped = 0x20000;
243 swapBytes(&swapped, sizeof(boost::uint32_t));
244 *winsize += swapped;
245 if (RTMP::sendMsg(fd, RTMP_SYSTEM_CHANNEL, RTMP::HEADER_12,
246 winsize->size(), RTMP::WINDOW_SIZE, RTMPMsg::FROM_CLIENT, *winsize)) {
247 log_network("Sent set Client Window Size to client");
248 } else {
249 log_error("Couldn't send set Client Window Size to client!");
250 tcurl.reset();
251 return tcurl; // nc is empty
254 // Send a ping to the client to reset the new NetConnection,
255 boost::shared_ptr<cygnal::Buffer> ping_reset =
256 encodePing(RTMP::PING_RESET, 0);
257 if (RTMP::sendMsg(fd, RTMP_SYSTEM_CHANNEL, RTMP::HEADER_8,
258 ping_reset->size(), RTMP::USER, RTMPMsg::FROM_SERVER, *ping_reset)) {
259 log_network("Sent Ping to client");
260 } else {
261 log_error("Couldn't send Ping to client!");
262 tcurl.reset();
263 return tcurl; // nc is empty
266 // Send the packet to notify the client that the
267 // NetConnection::connect() was sucessful. After the client
268 // receives this, the handhsake is completed.
269 boost::shared_ptr<cygnal::Buffer> response =
270 encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
271 if (RTMP::sendMsg(fd, 3, RTMP::HEADER_8, response->allocated(),
272 RTMP::INVOKE, RTMPMsg::FROM_SERVER, *response)) {
273 log_network("Sent response to client.");
274 } else {
275 log_error("Couldn't send response to client!");
276 tcurl.reset();
277 return tcurl; // nc is empty
280 return tcurl;
283 // The response is the gibberish sent back twice, preceeded by a byte
284 // with the value of 0x3. We have to very carefully send the handshake
285 // in one big packet as doing otherwise seems to cause subtle timing
286 // problems with the Adobe player. This way it connects every time.
287 bool
288 RTMPServer::handShakeResponse(int fd, cygnal::Buffer &handshake)
290 GNASH_REPORT_FUNCTION;
292 boost::uint8_t byte;
293 byte = RTMP_VERSION;
295 // the response handshake is twice the size of the one we just
296 // received for a total of 3072 bytes, plus room for the version.
297 boost::scoped_ptr<cygnal::Buffer> zeros(new cygnal::Buffer(RTMP_HANDSHAKE_SIZE*2
298 + RTMP_HANDSHAKE_VERSION_SIZE));
299 zeros->clear(); // set entire buffer to zeros
301 boost::uint8_t *ptr = zeros->reference();
303 // the first byte of the handshake response is the RTMP version
304 // number.
305 *ptr = RTMP_VERSION;
307 // the first half we make all zeros, as it doesn't appear to be
308 // used for anything. More data is the second half of the
309 // response.
310 zeros->setSeekPointer(ptr + RTMP_HANDSHAKE_VERSION_SIZE +
311 RTMP_HANDSHAKE_SIZE);
313 // the handhshake has a two field header, which appears to be
314 // timestamp, followed by another field that appears to be another
315 // timestamp or version number, which is probably ignored.
316 // the first field of the header is the timestamp
317 boost::uint32_t timestamp;
318 // Get the timestamp of when this message was read
319 timestamp = RTMP::getTime();
320 *zeros += timestamp;
322 // the second field is always zero
323 boost::uint32_t pad = 0;
324 *zeros += pad;
326 // the data starts after the vesion and header bytes
327 size_t offset = RTMP_HANDSHAKE_VERSION_SIZE + RTMP_HANDSHAKE_HEADER_SIZE;
329 // add the handshake data, which is 1528 byte of random stuff.
330 zeros->append(handshake.reference() + offset, RTMP_RANDOM_SIZE);
332 // send the handshake to the client
333 size_t ret = writeNet(fd, *zeros);
335 if (ret == zeros->allocated()) {
336 log_network("Sent RTMP Handshake response at %d", timestamp);
337 } else {
338 log_error("Couldn't sent RTMP Handshake response at %d!", timestamp);
341 return true;
344 boost::shared_ptr<cygnal::Buffer>
345 RTMPServer::serverFinish(int fd, cygnal::Buffer &handshake1, cygnal::Buffer &handshake2)
347 GNASH_REPORT_FUNCTION;
348 boost::shared_ptr<cygnal::Buffer> buf;
350 // sanity check our input data. We do this seperately as an empty
351 // buffer means data wasn't read correctly from the network. We
352 // should never get this far with bad data, but when it comes to
353 // network programming, a little caution is always good.
354 if (handshake1.empty()) {
355 log_error("No data in original handshake buffer.");
356 return buf; // return empty buffer
358 if (handshake2.empty()) {
359 log_error("No data in response handshake buffer.");
360 return buf; // return empty buffer
363 // the first field of the header is the timestamp of the original
364 // packet sent by this server.
365 boost::uint32_t timestamp1 = *reinterpret_cast<boost::uint32_t *>
366 (handshake1.reference() + RTMP_HANDSHAKE_VERSION_SIZE);
368 // the second field of the header is the timestamp of the previous
369 // packet sent by this server.
370 boost::uint32_t timestamp2 = *reinterpret_cast<boost::uint32_t *>
371 (handshake1.reference() + RTMP_HANDSHAKE_VERSION_SIZE + sizeof(boost::uint32_t));
373 log_network("The timestamp delta is %d", timestamp2 - timestamp1);
375 // This is the location in the second handshake to the random data
376 // block used in the handshake.
377 size_t pkt_size = RTMP_HANDSHAKE_VERSION_SIZE + RTMP_HANDSHAKE_SIZE;
378 // the handshakes are supposed to match.
379 int diff = std::memcmp(handshake1.begin()
380 + RTMP_HANDSHAKE_VERSION_SIZE + RTMP_HANDSHAKE_HEADER_SIZE,
381 handshake2.begin()
382 + pkt_size + RTMP_HANDSHAKE_HEADER_SIZE,
383 RTMP_RANDOM_SIZE);
384 if (diff <= 1) {
385 log_network (_("Handshake Finish Data matched"));
386 } else {
387 log_error (_("Handshake Finish Data didn't match by %d bytes"), diff);
388 // return buf; // return empty buffer
391 // Copy the extra data from the end of the handshake to the new
392 // buffer. Normally we try to avoid copying anything around, but
393 // as this is only used once for each connection, there isn't a
394 // real performance hit from it.
395 size_t amf_size = handshake2.allocated() - pkt_size;
396 if (handshake2.allocated() >= pkt_size) {
397 log_network("Got AMF data in handshake, %d bytes for fd #%d",
398 amf_size, fd);
399 buf.reset(new Buffer(amf_size));
400 // populate the buffer with the AMF data
401 boost::uint8_t *ptr = handshake2.reference() + RTMP_HANDSHAKE_SIZE;
402 buf->copy(ptr, amf_size);
405 return buf;
408 bool
409 RTMPServer::packetSend(cygnal::Buffer &/* buf */)
411 GNASH_REPORT_FUNCTION;
412 return false;
415 // This overrides using same method from the base RTMP class.
416 bool
417 RTMPServer::packetRead(cygnal::Buffer &buf)
419 GNASH_REPORT_FUNCTION;
421 boost::uint8_t amf_index, headersize;
422 boost::uint8_t *ptr = buf.reference();
423 AMF amf;
425 if (ptr == 0) {
426 return false;
429 // cerr << "FIXME3: " << buf.hexify(true) << endl;
431 // ptr += 1; // skip past the header byte
433 amf_index = *ptr & RTMP_INDEX_MASK;
434 headersize = headerSize(*ptr);
435 log_network (_("The Header size is: %d"), headersize);
436 log_network (_("The AMF index is: 0x%x"), amf_index);
438 // if (headersize > 1) {
439 // packetsize = parseHeader(ptr);
440 // if (packetsize) {
441 // log_network (_("Read first RTMP packet header of size %d"), packetsize);
442 // } else {
443 // log_error (_("Couldn't read first RTMP packet header"));
444 // return false;
445 // }
446 // }
448 // #if 1
449 // boost::uint8_t *end = buf->remove(0xc3);
450 // #else
451 // boost::uint8_t *end = buf->find(0xc3);
452 // log_network("END is %x", (void *)end);
453 // *end = '*';
454 // #endif
455 decodeHeader(ptr);
456 ptr += headersize;
458 boost::uint8_t* tooFar = ptr+300+sizeof(int); // FIXME:
460 AMF amf_obj;
461 boost::shared_ptr<cygnal::Element> el1 = amf_obj.extractAMF(ptr, tooFar);
462 ptr += amf_obj.totalsize();
463 boost::shared_ptr<cygnal::Element> el2 = amf_obj.extractAMF(ptr, tooFar);
465 int size = 0;
466 boost::shared_ptr<cygnal::Element> el;
467 while ( size < static_cast<boost::uint16_t>(_header.bodysize) - 24 ) {
468 if (ptr) {
469 el = amf_obj.extractProperty(ptr, tooFar);
470 if (el != 0) {
471 size += amf_obj.totalsize();
472 ptr += amf_obj.totalsize();
473 // _properties[el->getName()] = el;
474 } else {
475 break;
477 // log_network("Bodysize is: %d size is: %d for %s", _total_size, size, el->getName());
478 } else {
479 break;
483 # if 0
484 Element el;
485 ptr = amf.extractElement(&el, ptr);
486 el.dump();
487 ptr = amf.extractElement(&el, ptr) + 1;
488 el.dump();
489 log_network (_("Reading AMF packets till we're done..."));
490 // buf->dump();
491 while (ptr < end) {
492 boost::shared_ptr<cygnal::Element> el(new cygnal::Element);
493 ptr = amf.extractProperty(el, ptr);
494 addProperty(el);
495 // el->dump();
497 ptr += 1;
498 size_t actual_size = _total_size - RTMP_HEADER_SIZE;
499 log_network("Total size in header is %d, buffer size is: %d", _total_size, buf->size());
500 // buf->dump();
501 if (buf->size() < actual_size) {
502 log_network("FIXME: MERGING");
503 buf = _que->merge(buf);
505 while ((ptr - buf->begin()) < static_cast<int>(actual_size)) {
506 boost::shared_ptr<cygnal::Element> el(new cygnal::Element);
507 if (ptr) {
508 ptr = amf.extractProperty(el, ptr);
509 addProperty(el);
510 } else {
511 return true;
513 el->dump(); // FIXME: dump the AMF objects as they are read in
516 RTMPproto::dump();
517 #endif
518 switch(_header.type) {
519 case CHUNK_SIZE:
520 decodeChunkSize();
521 break;
522 case BYTES_READ:
523 decodeBytesRead();
524 break;
525 case USER:
527 boost::shared_ptr<rtmp_ping_t> ping = decodePing(ptr);
528 switch (ping->type) {
529 case PING_CLEAR:
530 break;
531 case PING_PLAY:
532 break;
533 case PING_TIME:
534 break;
535 case PING_RESET:
536 break;
537 case PING_CLIENT:
538 break;
539 case PONG_CLIENT:
540 break;
541 default:
542 return 0;
543 break;
545 break;
547 case WINDOW_SIZE:
548 decodeServer();
549 break;
550 case SET_BANDWITH:
551 decodeClient();
552 break;
553 case ROUTE:
554 log_unimpl("Route");
555 break;
556 case AUDIO_DATA:
557 decodeAudioData();
558 break;
559 case VIDEO_DATA:
560 decodeVideoData();
561 break;
562 case SHARED_OBJ:
563 decodeSharedObj();
564 break;
565 case AMF3_NOTIFY:
566 log_unimpl("AMF3 Notify");
567 break;
568 case AMF3_SHARED_OBJ:
569 log_unimpl("AMF3 Shared Object");
570 break;
571 case AMF3_INVOKE:
572 log_unimpl("AMF3 Invoke");
573 break;
574 case NOTIFY:
575 decodeNotify();
576 break;
577 case INVOKE:
578 decodeInvoke();
579 break;
580 case FLV_DATA:
581 log_unimpl("FLV Dat");
582 break;
583 default:
584 log_error (_("ERROR: Unidentified RTMP message content type 0x%x"), _header.type);
585 break;
588 return true;
591 // A result packet looks like this:
593 // 03 00 00 00 00 00 81 14 00 00 00 00 02 00 07 5f ..............._
594 // 72 65 73 75 6c 74 00 3f f0 00 00 00 00 00 00 05 result.?........
595 // 03 00 0b 61 70 70 6c 69 63 61 74 69 6f 6e 05 00 ...application..
596 // 05 6c 65 76 65 6c 02 00 06 73 74 61 74 75 73 00 .level...status.
597 // 0b 64 65 73 63 72 69 70 74 69 6f 6e 02 00 15 43 .description...C
598 // 6f 6e 6e 65 63 74 69 6f 6e 20 73 75 63 63 65 65 onnection succee
599 // 64 65 64 2e 00 04 63 6f 64 65 02 00 1d 4e 65 74 ded...code...Net
600 // 43 6f 6e 6e 65 63 74 69 6f 6e 2e 43 6f 6e 6e 65 Connection.Conne
601 // 63 74 2e 53 75 63 63 65 73 73 00 00 c3 09 ct.Success....
603 // _result(double ClientStream, NULL, double ServerStream)
604 // These are handlers for the various types
605 boost::shared_ptr<Buffer>
606 RTMPServer::encodeResult(RTMPMsg::rtmp_status_e status)
608 // GNASH_REPORT_FUNCTION;
609 return encodeResult(status, _filespec, _streamid);
612 boost::shared_ptr<cygnal::Buffer>
613 RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, const std::string &filename)
615 // GNASH_REPORT_FUNCTION;
616 double clientid = 0.0;
617 return encodeResult(status, filename, _streamid, clientid);
620 boost::shared_ptr<cygnal::Buffer>
621 RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, const std::string &filename, double &clientid)
623 // GNASH_REPORT_FUNCTION;
624 return encodeResult(status, filename, _streamid, clientid);
627 boost::shared_ptr<cygnal::Buffer>
628 RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, double &transid)
630 // GNASH_REPORT_FUNCTION;
631 double clientid = 0.0;
632 return encodeResult(status, "", transid, clientid);
635 boost::shared_ptr<cygnal::Buffer>
636 RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, const std::string &filename, double &transid, double &clientid)
638 // GNASH_REPORT_FUNCTION;
639 // Buffer *buf = new Buffer;
640 // boost::uint8_t *ptr = buf->reference();
641 // buf->clear(); // default everything to zeros, real data gets optionally added.
642 // ptr += sizeof(boost::uint16_t); // go past the first short
643 // const char *capabilities = 0;
644 // const char *description = 0;
645 // const char *code = 0;
646 // const char *status = 0;
647 bool notobject = false;
649 Element *str = new Element;
650 str->makeString("_result");
652 Element *number = new Element;
653 // add the transaction ID
654 number->makeNumber(transid);
656 Element top;
657 // top.makeObject("application");
658 top.makeObject();
660 switch (status) {
661 case RTMPMsg::APP_GC:
662 case RTMPMsg::APP_RESOURCE_LOWMEMORY:
663 case RTMPMsg::APP_SCRIPT_ERROR:
664 case RTMPMsg::APP_SCRIPT_WARNING:
665 case RTMPMsg::APP_SHUTDOWN:
666 case RTMPMsg::NC_CALL_BADVERSION:
667 case RTMPMsg::NC_CALL_FAILED:
668 // status = 0;
669 // code = "NetConnection.Call.Failed";
670 case RTMPMsg::NC_CONNECT_APPSHUTDOWN:
671 case RTMPMsg::NC_CONNECT_CLOSED:
672 case RTMPMsg::NC_CONNECT_FAILED:
674 // errstr = new Element;
675 // errstr->makeString("error");
676 boost::shared_ptr<cygnal::Element> level(new Element);
677 level->makeString("level", "error");
678 top.addProperty(level);
680 boost::shared_ptr<cygnal::Element> description(new Element);
681 description->makeString("description", "Connection Failed.");
682 top.addProperty(description);
684 boost::shared_ptr<cygnal::Element> code(new Element);
685 code->makeString("code", "Connection.Connect.Failed");
686 top.addProperty(code);
688 case RTMPMsg::NC_CONNECT_INVALID_APPLICATION:
689 case RTMPMsg::NC_CONNECT_REJECTED:
691 // delete str;
692 // str = new Element;
693 // str->makeString("error");
694 boost::shared_ptr<cygnal::Element> level(new Element);
695 level->makeString("level", "error");
696 top.addProperty(level);
698 boost::shared_ptr<cygnal::Element> description(new Element);
699 description->makeString("description", "Connection Rejected.");
700 top.addProperty(description);
702 boost::shared_ptr<cygnal::Element> code(new Element);
703 code->makeString("code", "NetConnection.Connect.Rejected");
704 top.addProperty(code);
706 case RTMPMsg::NC_CONNECT_SUCCESS:
708 boost::shared_ptr<cygnal::Element> level(new Element);
709 level->makeString("level", "status");
710 top.addProperty(level);
712 boost::shared_ptr<cygnal::Element> code(new Element);
713 code->makeString("code", "NetConnection.Connect.Success");
714 top.addProperty(code);
716 boost::shared_ptr<cygnal::Element> description(new Element);
717 description->makeString("description", "Connection succeeded.");
718 top.addProperty(description);
720 break;
721 case RTMPMsg::NS_CLEAR_FAILED:
722 case RTMPMsg::NS_CLEAR_SUCCESS:
723 // After a successful NetConnection, we get a
724 // NetStream::createStream.
725 case RTMPMsg::NS_DATA_START:
726 case RTMPMsg::NS_FAILED:
727 case RTMPMsg::NS_INVALID_ARGUMENT:
728 // The response to a successful pauseStream command is this
729 // message.
730 case RTMPMsg::NS_PAUSE_NOTIFY:
732 str->makeString("onStatus");
734 boost::shared_ptr<cygnal::Element> level(new Element);
735 level->makeString("level", "status");
736 top.addProperty(level);
738 boost::shared_ptr<cygnal::Element> code(new Element);
739 code->makeString("code", "NetStream.Pause.Notify");
740 top.addProperty(code);
742 boost::shared_ptr<cygnal::Element> description(new Element);
743 string field = "Pausing ";
744 if (!filename.empty()) {
745 field += filename;
747 description->makeString("description", field);
748 top.addProperty(description);
750 boost::shared_ptr<cygnal::Element> details(new Element);
751 details->makeString("details", filename);
752 top.addProperty(details);
754 boost::shared_ptr<cygnal::Element> cid(new Element);
755 cid->makeNumber("clientid", clientid);
756 top.addProperty(cid);
758 break;
760 case RTMPMsg::NS_PLAY_COMPLETE:
761 case RTMPMsg::NS_PLAY_FAILED:
762 case RTMPMsg::NS_PLAY_FILE_STRUCTURE_INVALID:
763 case RTMPMsg::NS_PLAY_INSUFFICIENT_BW:
764 case RTMPMsg::NS_PLAY_NO_SUPPORTED_TRACK_FOUND:
765 case RTMPMsg::NS_PLAY_PUBLISHNOTIFY:
766 break;
767 // Reset the stream. We also do this after receiving a
768 // NetStream::createStream() packet
769 case RTMPMsg::NS_PLAY_RESET:
771 str->makeString("onStatus");
772 // "clientid"
773 boost::shared_ptr<cygnal::Element> level(new Element);
774 level->makeString("level", "status");
775 top.addProperty(level);
777 boost::shared_ptr<cygnal::Element> code(new Element);
778 code->makeString("code", "NetStream.Play.Reset");
779 top.addProperty(code);
781 boost::shared_ptr<cygnal::Element> description(new Element);
782 string field = "Playing and resetting ";
783 if (!filename.empty()) {
784 field += filename;
786 description->makeString("description", field);
787 top.addProperty(description);
789 boost::shared_ptr<cygnal::Element> details(new Element);
790 details->makeString("details", filename);
791 top.addProperty(details);
793 boost::shared_ptr<cygnal::Element> cid(new Element);
794 #ifdef CLIENT_ID_NUMERIC
795 double clientid = createClientID();
796 cid->makeNumber("clientid", clientid);
797 #else
798 string clientid;
799 if (!_clientids[transid].empty()) {
800 clientid =_clientids[transid].c_str();
801 } else {
802 clientid = createClientID();
803 _clientids[transid] = clientid;
805 cid->makeString("clientid", _clientids[transid]);
806 #endif
807 top.addProperty(cid);
809 break;
811 case RTMPMsg::NS_PLAY_START:
813 str->makeString("onStatus");
815 boost::shared_ptr<cygnal::Element> level(new Element);
816 level->makeString("level", "status");
817 top.addProperty(level);
819 boost::shared_ptr<cygnal::Element> code(new Element);
820 code->makeString("code", "NetStream.Play.Start");
821 top.addProperty(code);
823 boost::shared_ptr<cygnal::Element> description(new Element);
824 string field = "Started playing ";
825 if (!filename.empty()) {
826 field += filename;
828 description->makeString("description", field);
829 top.addProperty(description);
831 boost::shared_ptr<cygnal::Element> details(new Element);
832 details->makeString("details", filename);
833 top.addProperty(details);
835 boost::shared_ptr<cygnal::Element> cid(new Element);
836 #ifdef CLIENT_ID_NUMERIC
837 double clientid = createClientID();
838 cid->makeNumber("clientid", clientid);
839 #else
840 string clientid;
841 if (!_clientids[transid].empty()) {
842 clientid =_clientids[transid].c_str();
843 } else {
844 clientid = createClientID();
845 _clientids[transid] = clientid;
847 cid->makeString("clientid", _clientids[transid]);
848 #endif
849 top.addProperty(cid);
851 break;
853 case RTMPMsg::NS_PLAY_STOP:
854 case RTMPMsg::NS_PLAY_STREAMNOTFOUND:
856 boost::shared_ptr<cygnal::Element> level(new Element);
857 level->makeString("level", "error");
858 top.addProperty(level);
860 boost::shared_ptr<cygnal::Element> description(new Element);
861 description->makeString("description", "NetStream.Play.StreamNotFound.");
862 top.addProperty(description);
864 boost::shared_ptr<cygnal::Element> code(new Element);
865 code->makeString("code", "NetStream.Play.StreamNotFound");
866 top.addProperty(code);
867 break;
869 case RTMPMsg::NS_PLAY_SWITCH:
870 case RTMPMsg::NS_PLAY_UNPUBLISHNOTIFY:
871 case RTMPMsg::NS_PUBLISH_BADNAME:
872 case RTMPMsg::NS_PUBLISH_START:
873 case RTMPMsg::NS_RECORD_FAILED:
874 case RTMPMsg::NS_RECORD_NOACCESS:
875 case RTMPMsg::NS_RECORD_START:
876 case RTMPMsg::NS_RECORD_STOP:
877 // The reponse to a failed seekStream is this message.
878 case RTMPMsg::NS_SEEK_FAILED:
879 // The reponse to a successful seekStream is this message.
880 case RTMPMsg::NS_SEEK_NOTIFY:
881 break;
882 // The response to a successful pauseStream command is this
883 // message when the stream is started again.
884 case RTMPMsg::NS_UNPAUSE_NOTIFY:
885 case RTMPMsg::NS_UNPUBLISHED_SUCCESS:
886 case RTMPMsg::SO_CREATION_FAILED:
887 case RTMPMsg::SO_NO_READ_ACCESS:
888 case RTMPMsg::SO_NO_WRITE_ACCESS:
889 case RTMPMsg::SO_PERSISTENCE_MISMATCH:
890 break;
891 // The response for a createStream message is the
892 // transaction ID, followed by the command object (usually a
893 // NULL object), and the Stream ID. The Stream ID is just a
894 // simple incrementing counter of streams.
895 case RTMPMsg::NS_CREATE_STREAM:
897 // Don't encode as an object, just the properties
898 notobject = true;
900 boost::shared_ptr<cygnal::Element> id2(new Element);
902 double sid = createStreamID();
903 id2->makeNumber(sid);
904 top.addProperty(id2);
906 break;
908 // There is no response to a deleteStream request.
909 case RTMPMsg::NS_DELETE_STREAM:
910 default:
911 break;
914 boost::shared_ptr<cygnal::Buffer> strbuf = str->encode();
915 boost::shared_ptr<cygnal::Buffer> numbuf = number->encode();
916 boost::shared_ptr<cygnal::Buffer> topbuf = top.encode(notobject);
918 boost::shared_ptr<cygnal::Buffer> buf(new Buffer(strbuf->size() + numbuf->size() + topbuf->size()));
919 *buf += strbuf;
920 *buf += numbuf;
921 boost::uint8_t byte = static_cast<boost::uint8_t>(RTMP::WINDOW_SIZE & 0x000000ff);
922 *buf += byte;
923 *buf += topbuf;
925 delete str;
926 delete number;
928 return buf;
930 // A Ping packet has two parameters that ae always specified, and 2 that are optional.
931 // The first two bytes are the ping type, as in rtmp_ping_e, the second is the ping
932 // target, which is always zero as far as we can tell.
934 // More notes from: http://jira.red5.org/confluence/display/docs/Ping
935 // type 0: Clear the stream. No third and fourth parameters. The second parameter could be 0.
936 // After the connection is established, a Ping 0,0 will be sent from server to client. The
937 // message will also be sent to client on the start of Play and in response of a Seek or
938 // Pause/Resume request. This Ping tells client to re-calibrate the clock with the timestamp
939 // of the next packet server sends.
940 // type 1: Tell the stream to clear the playing buffer.
941 // type 3: Buffer time of the client. The third parameter is the buffer time in millisecond.
942 // type 4: Reset a stream. Used together with type 0 in the case of VOD. Often sent before type 0.
943 // type 6: Ping the client from server. The second parameter is the current time.
944 // type 7: Pong reply from client. The second parameter is the time the server sent with his
945 // ping request.
947 // A RTMP Ping packet looks like this: "02 00 00 00 00 00 06 04 00 00 00 00 00 00 00 00 00 0",
948 // which is the Ping type byte, followed by two shorts that are the parameters. Only the first
949 // two paramters are required.
950 boost::shared_ptr<Buffer>
951 RTMPServer::encodePing(rtmp_ping_e type)
953 // GNASH_REPORT_FUNCTION;
954 return encodePing(type, 0);
957 boost::shared_ptr<Buffer>
958 RTMPServer::encodePing(rtmp_ping_e type, boost::uint32_t milliseconds)
960 // GNASH_REPORT_FUNCTION;
962 // An encoded ping message
963 boost::shared_ptr<cygnal::Buffer> buf(new Buffer(sizeof(boost::uint16_t) * 3));
964 // boost::uint8_t *ptr = buf->reference();
966 // Set the type of this ping message
967 boost::uint16_t typefield = htons(type);
968 *buf = typefield;
970 // // go past the first short, which is the type field
971 // ptr += sizeof(boost::uint16_t);
973 boost::uint32_t swapped = 0;
974 switch (type) {
975 // These two don't appear to have any paramaters
976 case PING_CLEAR:
977 case PING_PLAY:
978 break;
979 // the third parameter is the buffer time in milliseconds
980 case PING_TIME:
982 // ptr += sizeof(boost::uint16_t); // go past the second short
983 swapped = milliseconds;
984 swapBytes(&swapped, sizeof(boost::uint32_t));
985 *buf += swapped;
986 break;
988 // reset doesn't have any parameters but zeros
989 case PING_RESET:
991 boost::uint16_t zero = 0;
992 *buf += zero;
993 *buf += zero;
994 break;
996 // For Ping and Pong, the second parameter is always the milliseconds
997 case PING_CLIENT:
998 case PONG_CLIENT:
1000 // swapped = htonl(milliseconds);
1001 swapped = milliseconds;
1002 swapBytes(&swapped, sizeof(boost::uint32_t));
1003 *buf += swapped;
1004 break;
1006 default:
1007 break;
1010 // Manually adjust the seek pointer since we added the data by
1011 // walking ou own temporary pointer, so none of the regular ways
1012 // of setting the seek pointer are appropriate.
1013 // buf->setSeekPointer(buf->reference() + buf->size());
1015 return buf;
1018 // Encode a onBWDone message for the client. These are of a fixed size.
1019 boost::shared_ptr<cygnal::Buffer>
1020 RTMPServer::encodeBWDone(double id)
1022 // GNASH_REPORT_FUNCTION;
1023 string command = "onBWDone";
1025 Element cmd;
1026 cmd.makeString(command);
1028 Element num;
1029 num.makeNumber(id);
1031 Element null;
1032 null.makeNull();
1034 boost::shared_ptr<cygnal::Buffer> enccmd = cmd.encode();
1035 boost::shared_ptr<cygnal::Buffer> encnum = num.encode();
1036 boost::shared_ptr<cygnal::Buffer> encnull = null.encode();
1038 boost::shared_ptr<cygnal::Buffer> buf(new cygnal::Buffer(enccmd->size()
1039 + encnum->size()
1040 + encnull->size()));
1042 *buf += enccmd;
1043 *buf += encnum;
1044 *buf += encnull;
1046 return buf;
1049 boost::shared_ptr<cygnal::Buffer>
1050 RTMPServer::encodeAudio(boost::uint8_t *data, size_t size)
1052 GNASH_REPORT_FUNCTION;
1054 boost::shared_ptr<cygnal::Buffer> buf;
1056 if (size) {
1057 if (data) {
1058 buf.reset(new cygnal::Buffer(size));
1059 buf->copy(data, size);
1063 return buf;
1066 boost::shared_ptr<cygnal::Buffer>
1067 RTMPServer::encodeVideo(boost::uint8_t *data, size_t size)
1069 GNASH_REPORT_FUNCTION;
1072 #if 0
1073 // Parse an Echo Request message coming from the Red5 echo_test. This
1074 // method should only be used for testing purposes.
1075 vector<boost::shared_ptr<cygnal::Element > >
1076 RTMPServer::parseEchoRequest(boost::uint8_t *ptr, size_t size)
1078 // GNASH_REPORT_FUNCTION;
1079 AMF amf;
1080 vector<boost::shared_ptr<cygnal::Element > > headers;
1082 // The first element is the name of the test, 'echo'
1083 boost::shared_ptr<cygnal::Element> el1 = amf.extractAMF(ptr, ptr+size);
1084 ptr += amf.totalsize();
1085 headers.push_back(el1);
1087 // The second element is the number of the test,
1088 boost::shared_ptr<cygnal::Element> el2 = amf.extractAMF(ptr, ptr+size);
1089 ptr += amf.totalsize();
1090 headers.push_back(el2);
1092 // This one has always been a NULL object from my tests
1093 boost::shared_ptr<cygnal::Element> el3 = amf.extractAMF(ptr, ptr+size);
1094 ptr += amf.totalsize();
1095 headers.push_back(el3);
1097 // This one has always been an NULL or Undefined object from my tests
1098 boost::shared_ptr<cygnal::Element> el4 = amf.extractAMF(ptr, ptr+size);
1099 if (!el4) {
1100 log_error("Couldn't reliably extract the echo data!");
1102 ptr += amf.totalsize();
1103 headers.push_back(el4);
1105 return headers;
1108 // format a response to the 'echo' test used for testing Gnash. This
1109 // is only used for testing by developers. The format appears to be
1110 // a string '_result', followed by the number of the test, and then two
1111 // NULL objects.
1112 boost::shared_ptr<cygnal::Buffer>
1113 RTMPServer::formatEchoResponse(double num, cygnal::Element &el)
1115 // GNASH_REPORT_FUNCTION;
1116 boost::shared_ptr<cygnal::Buffer> data = amf::AMF::encodeElement(el);
1117 return formatEchoResponse(num, data->reference(), data->allocated());
1120 boost::shared_ptr<cygnal::Buffer>
1121 RTMPServer::formatEchoResponse(double num, cygnal::Buffer &data)
1123 // GNASH_REPORT_FUNCTION;
1124 return formatEchoResponse(num, data.reference(), data.allocated());
1127 boost::shared_ptr<cygnal::Buffer>
1128 RTMPServer::formatEchoResponse(double num, boost::uint8_t *data, size_t size)
1130 // GNASH_REPORT_FUNCTION;
1132 string result = "_result";
1133 Element echo;
1134 echo.makeString(result);
1136 Element index;
1137 index.makeNumber(num);
1139 Element null;
1140 null.makeNull();
1142 boost::shared_ptr<cygnal::Buffer> encecho = echo.encode();
1143 boost::shared_ptr<cygnal::Buffer> encidx = index.encode();
1144 boost::shared_ptr<cygnal::Buffer> encnull = null.encode();
1146 boost::shared_ptr<cygnal::Buffer> buf(new cygnal::Buffer(encecho->size()
1147 + encidx->size()
1148 + encnull->size() + size));
1150 *buf = encecho;
1151 *buf += encidx;
1152 *buf += encnull;
1153 buf->append(data, size);
1155 return buf;
1157 #endif
1159 // Create a new client ID, which appears to be a random double,
1160 // although I also see a temporary 8 character string used often as
1161 // well.
1162 #ifdef CLIENT_ID_NUMERIC
1163 double
1164 RTMPServer::createClientID()
1166 // GNASH_REPORT_FUNCTION;
1168 boost::mt19937 seed;
1169 // Pick the number of errors to create based on the Buffer's data size
1170 boost::uniform_real<> numbers(1, 65535);
1172 double id = numbers(seed);
1173 _clientids.push_back(id);
1175 return id;
1177 #else
1178 std::string
1179 RTMPServer::createClientID()
1181 // GNASH_REPORT_FUNCTION;
1182 string id;
1184 // FIXME: This turns out to be a crappy random number generator,
1185 // and should be replaced with something less repititous.
1186 #if 0
1187 boost::mt19937 seed;
1188 for (size_t i=0; i < 8; i++) {
1189 boost::uniform_int<> numbers(0x30, 0x7a);
1190 id += numbers(seed);
1192 #else
1193 char letters[] =
1194 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1195 boost::uint64_t random_time_bits = 0;
1196 boost::uint64_t value = 0;
1197 # ifdef HAVE_GETTIMEOFDAY
1198 timeval tv;
1199 gettimeofday(&tv, NULL);
1200 random_time_bits = ((uint64_t)tv.tv_usec << 16) ^ tv.tv_sec;
1201 # else
1202 random_time_bits = time(NULL);
1203 # endif
1204 value += random_time_bits ^ getpid();
1205 boost::uint64_t v = value;
1206 id = letters[v % 62];
1207 v /= 62;
1208 id += letters[v % 62];
1209 v /= 62;
1210 id += letters[v % 62];
1211 v /= 62;
1212 id += letters[v % 62];
1213 v /= 62;
1214 id += letters[v % 62];
1215 v /= 62;
1216 id += letters[v % 62];
1217 v /= 62;
1218 id += letters[v % 62];
1219 v /= 62;
1220 #endif
1222 return id;
1224 #endif
1226 // Get the next streamID
1227 double
1228 RTMPServer::createStreamID()
1230 // GNASH_REPORT_FUNCTION;
1231 return _streamid++;
1234 bool
1235 RTMPServer::sendFile(int fd, const std::string &filespec)
1237 GNASH_REPORT_FUNCTION;
1238 // See if the file is in the cache and already opened.
1239 boost::shared_ptr<DiskStream> filestream(cache.findFile(filespec));
1240 if (filestream) {
1241 cerr << "FIXME: found file in cache!" << endl;
1242 } else {
1243 filestream.reset(new DiskStream);
1244 // cerr << "New Filestream at 0x" << hex << filestream.get() << endl;
1246 // cache.addFile(url, filestream); FIXME: always reload from disk for now.
1248 // Open the file and read the first chunk into memory
1249 if (!filestream->open(filespec)) {
1250 return false;
1251 } else {
1252 // Get the file size for the HTTP header
1253 if (filestream->getFileType() == DiskStream::FILETYPE_NONE) {
1254 return false;
1255 } else {
1256 cache.addPath(filespec, filestream->getFilespec());
1261 size_t filesize = filestream->getFileSize();
1262 size_t bytes_read = 0;
1263 int ret = 0;
1264 size_t page = 0;
1265 if (filesize) {
1266 #ifdef USE_STATS_CACHE
1267 struct timespec start;
1268 clock_gettime (CLOCK_REALTIME, &start);
1269 #endif
1270 size_t getbytes = 0;
1271 if (filesize <= filestream->getPagesize()) {
1272 getbytes = filesize;
1273 } else {
1274 getbytes = filestream->getPagesize();
1276 if (filesize >= CACHE_LIMIT) {
1277 if (sendMsg(fd, getChannel(), RTMP::HEADER_12, filesize,
1278 RTMP::NOTIFY, RTMPMsg::FROM_SERVER, filestream->get(),
1279 filesize)) {
1281 do {
1282 filestream->loadToMem(page);
1283 // ret = writeNet(fd, filestream->get(), getbytes);
1284 // if (ret <= 0) {
1285 // break;
1286 // }
1287 if (sendMsg(fd, getChannel(), RTMP::HEADER_4, filesize,
1288 RTMP::NOTIFY, RTMPMsg::FROM_SERVER, filestream->get(),
1289 getbytes)) {
1291 bytes_read += ret;
1292 page += filestream->getPagesize();
1293 } while (bytes_read <= filesize);
1294 } else {
1295 filestream->loadToMem(filesize, 0);
1296 // ret = writeNet(fd, filestream->get(), filesize);
1297 if (sendMsg(fd, getChannel(), RTMP::HEADER_12, filesize,
1298 RTMP::NOTIFY, RTMPMsg::FROM_SERVER, filestream->get()+24,
1299 filesize-24)) {
1303 filestream->close();
1304 #ifdef USE_STATS_CACHE
1305 struct timespec end;
1306 clock_gettime (CLOCK_REALTIME, &end);
1307 double time = (end.tv_sec - start.tv_sec) + ((end.tv_nsec - start.tv_nsec)/1e9);
1308 cerr << "File " << _filespec
1309 << " transferred " << filesize << " bytes in: " << fixed
1310 << time << " seconds for net fd #" << fd << endl;
1311 #endif
1314 return true;
1317 size_t
1318 RTMPServer::sendToClient(std::vector<int> &fds, cygnal::Buffer &data)
1320 // GNASH_REPORT_FUNCTION;
1321 return sendToClient(fds, data.reference(), data.allocated());
1324 size_t
1325 RTMPServer::sendToClient(std::vector<int> &fds, boost::uint8_t *data,
1326 size_t size)
1328 // GNASH_REPORT_FUNCTION;
1329 size_t ret = 0;
1331 std::vector<int>::iterator it;
1332 for (it=fds.begin(); it< fds.end(); it++) {
1333 ret = writeNet(data, size);
1336 return ret;
1339 // This is the thread for all incoming RTMP connections
1340 bool
1341 rtmp_handler(Network::thread_params_t *args)
1343 GNASH_REPORT_FUNCTION;
1345 Handler *hand = reinterpret_cast<Handler *>(args->handler);
1346 RTMPServer *rtmp = reinterpret_cast<RTMPServer *>(args->entry);
1347 // RTMPServer *rtmp = new RTMPServer;
1349 string docroot = args->filespec;
1350 string url, filespec;
1351 url = docroot;
1352 bool done = false;
1353 boost::shared_ptr<RTMPMsg> body;
1354 static bool initialize = true;
1355 // bool sendfile = false;
1356 log_network(_("Starting RTMP Handler for fd #%d, cgi-bin is \"%s\""),
1357 args->netfd, args->filespec);
1359 #ifdef USE_STATISTICS
1360 struct timespec start;
1361 clock_gettime (CLOCK_REALTIME, &start);
1362 #endif
1364 // Adjust the timeout
1365 rtmp->setTimeout(10);
1367 boost::shared_ptr<cygnal::Buffer> pkt;
1368 boost::shared_ptr<cygnal::Element> tcurl;
1369 boost::shared_ptr<cygnal::Element> swfurl;
1370 boost::shared_ptr<cygnal::Buffer> response;
1372 // Keep track of the network statistics
1373 // See if we have any messages waiting. After the initial connect, this is
1374 // the main loop for processing messages.
1376 // Adjust the timeout
1377 rtmp->setTimeout(30);
1378 // boost::shared_ptr<cygnal::Buffer> buf;
1380 // If we have active disk streams, send those packets first.
1381 // 0 is a reserved stream, so we start with 1, as the reserved
1382 // stream isn't one we care about here.
1383 log_network("%d active disk streams", hand->getActiveDiskStreams());
1384 for (int i=1; i <= hand->getActiveDiskStreams(); i++) {
1385 hand->getDiskStream(i)->dump();
1386 if (hand->getDiskStream(i)->getState() == DiskStream::PLAY) {
1387 boost::uint8_t *ptr = hand->getDiskStream(i)->get();
1388 if (ptr) {
1389 if (rtmp->sendMsg(hand->getClient(i), 8,
1390 RTMP::HEADER_8, 4096,
1391 RTMP::NOTIFY, RTMPMsg::FROM_SERVER,
1392 ptr, 4096)) {
1394 } else {
1395 log_network("ERROR: No stream for client %d", i);
1400 // This is the main message processing loop for rtmp. Most
1401 // messages received require a response.
1402 do {
1403 // If there is no data left from the previous chunk, process
1404 // that before reading more data.
1405 if (pkt != 0) {
1406 log_network("data left from previous packet");
1407 } else {
1408 pkt = rtmp->recvMsg(args->netfd);
1411 if (pkt != 0) {
1412 boost::uint8_t *tmpptr = 0;
1413 if (pkt->allocated()) {
1414 boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
1415 if (!que) {
1416 // FIXME: send _error result
1417 return false;
1419 boost::shared_ptr<RTMP::rtmp_head_t> qhead;
1420 for (size_t i=0; i<que->size(); i++) {
1421 boost::shared_ptr<cygnal::Buffer> bufptr = que->at(i)->pop();
1422 // que->at(i)->dump();
1423 if (bufptr) {
1424 // bufptr->dump();
1425 qhead = rtmp->decodeHeader(bufptr->reference());
1426 if (!qhead) {
1427 return false;
1429 // log_network("Message for channel #%d", qhead->channel);
1430 tmpptr = bufptr->reference() + qhead->head_size;
1431 if (qhead->channel == RTMP_SYSTEM_CHANNEL) {
1432 if (qhead->type == RTMP::USER) {
1433 boost::shared_ptr<RTMP::user_event_t> user
1434 = rtmp->decodeUserControl(tmpptr);
1435 switch (user->type) {
1436 case RTMP::STREAM_START:
1437 log_unimpl("Stream Start");
1438 break;
1439 case RTMP::STREAM_EOF:
1440 log_unimpl("Stream EOF");
1441 break;
1442 case RTMP::STREAM_NODATA:
1443 log_unimpl("Stream No Data");
1444 break;
1445 case RTMP::STREAM_BUFFER:
1446 log_unimpl("Stream Set Buffer: %d", user->param2);
1447 break;
1448 case RTMP::STREAM_LIVE:
1449 log_unimpl("Stream Live");
1450 break;
1451 case RTMP::STREAM_PING:
1453 boost::shared_ptr<RTMP::rtmp_ping_t> ping
1454 = rtmp->decodePing(tmpptr);
1455 log_network("Processed Ping message from client, type %d",
1456 ping->type);
1457 break;
1459 case RTMP::STREAM_PONG:
1460 log_unimpl("Stream Pong");
1461 break;
1462 default:
1463 break;
1465 } else if (qhead->type == RTMP::AUDIO_DATA) {
1466 log_network("Got the 1st Audio packet!");
1467 } else if (qhead->type == RTMP::VIDEO_DATA) {
1468 log_network("Got the 1st Video packet!");
1469 } else if (qhead->type == RTMP::WINDOW_SIZE) {
1470 log_network("Got the Window Set Size packet!");
1471 } else {
1472 log_network("Got unknown system message!");
1473 bufptr->dump();
1477 switch (qhead->type) {
1478 case RTMP::CHUNK_SIZE:
1479 log_unimpl("Set Chunk Size");
1480 break;
1481 case RTMP::BYTES_READ:
1482 log_unimpl("Bytes Read");
1483 break;
1484 case RTMP::ABORT:
1485 case RTMP::USER:
1486 // already handled as this is a system channel message
1487 return true;
1488 break;
1489 case RTMP::WINDOW_SIZE:
1490 log_unimpl("Set Window Size");
1491 break;
1492 case RTMP::SET_BANDWITH:
1493 log_unimpl("Set Bandwidth");
1494 break;
1495 case RTMP::ROUTE:
1496 case RTMP::AUDIO_DATA:
1497 case RTMP::VIDEO_DATA:
1498 case RTMP::SHARED_OBJ:
1499 body = rtmp->decodeMsgBody(tmpptr, qhead->bodysize);
1500 log_network("SharedObject name is \"%s\"", body->getMethodName());
1501 break;
1502 case RTMP::AMF3_NOTIFY:
1503 log_unimpl("RTMP type %d", qhead->type);
1504 break;
1505 case RTMP::AMF3_SHARED_OBJ:
1506 log_unimpl("RTMP type %d", qhead->type);
1507 break;
1508 case RTMP::AMF3_INVOKE:
1509 log_unimpl("RTMP type %d", qhead->type);
1510 break;
1511 case RTMP::NOTIFY:
1512 log_unimpl("RTMP type %d", qhead->type);
1513 break;
1514 case RTMP::INVOKE:
1516 body = rtmp->decodeMsgBody(tmpptr, qhead->bodysize);
1517 if (!body) {
1518 log_error("Error INVOKING method \"%s\"!", body->getMethodName());
1519 continue;
1521 log_network("INVOKEing method \"%s\"", body->getMethodName());
1522 // log_network("%s", hexify(tmpptr, qhead->bodysize, true));
1524 // These next Invoke methods are for the
1525 // NetStream class, which like NetConnection,
1526 // is a speacial one handled directly by the
1527 // server instead of any cgi-bin plugins.
1528 double transid = body->getTransactionID();
1529 log_network("The Transaction ID from the client is: %g", transid);
1530 if (body->getMethodName() == "createStream") {
1531 hand->createStream(transid);
1532 response = rtmp->encodeResult(RTMPMsg::NS_CREATE_STREAM, transid);
1533 if (rtmp->sendMsg(args->netfd, qhead->channel,
1534 RTMP::HEADER_8, response->allocated(),
1535 RTMP::INVOKE, RTMPMsg::FROM_SERVER,
1536 *response)) {
1538 } else if (body->getMethodName() == "play") {
1539 string filespec;
1540 boost::shared_ptr<gnash::RTMPMsg> nc = rtmp->getNetConnection();
1541 boost::shared_ptr<cygnal::Element> tcurl = nc->findProperty("tcUrl");
1542 URL url(tcurl->to_string());
1543 filespec += url.hostname() + url.path();
1544 filespec += '/';
1545 filespec += body->at(1)->to_string();
1547 if (hand->playStream(filespec)) {
1548 // Send the Set Chunk Size response
1549 #if 1
1550 response = rtmp->encodeChunkSize(4096);
1551 if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
1552 RTMP::HEADER_12, response->allocated(),
1553 RTMP::CHUNK_SIZE, RTMPMsg::FROM_SERVER,
1554 *response)) {
1556 #endif
1557 // Send the Play.Resetting response
1558 response = rtmp->encodeResult(RTMPMsg::NS_PLAY_RESET, body->at(1)->to_string(), transid);
1559 if (rtmp->sendMsg(args->netfd, qhead->channel,
1560 RTMP::HEADER_8, response->allocated(),
1561 RTMP::INVOKE, RTMPMsg::FROM_SERVER,
1562 *response)) {
1564 // Send the Play.Start response
1565 response = rtmp->encodeResult(RTMPMsg::NS_PLAY_START, body->at(1)->to_string(), transid);
1566 if (rtmp->sendMsg(args->netfd, qhead->channel,
1567 RTMP::HEADER_8, response->allocated(),
1568 RTMP::INVOKE, RTMPMsg::FROM_SERVER,
1569 *response)) {
1571 } else {
1572 response = rtmp->encodeResult(RTMPMsg::NS_PLAY_STREAMNOTFOUND, body->at(1)->to_string(), transid);
1573 if (rtmp->sendMsg(args->netfd, qhead->channel,
1574 RTMP::HEADER_8, response->allocated(),
1575 RTMP::INVOKE, RTMPMsg::FROM_SERVER,
1576 *response)) {
1579 sleep(1); // FIXME: debugging crap
1580 // Send the User Control - Stream Live
1581 response = rtmp->encodeUserControl(RTMP::STREAM_LIVE, 1);
1582 if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
1583 RTMP::HEADER_12, response->allocated(),
1584 RTMP::USER, RTMPMsg::FROM_SERVER,
1585 *response)) {
1587 sleep(1); // FIXME: debugging crap
1588 // Send an empty Audio packet to get
1589 // things started.
1590 if (rtmp->sendMsg(args->netfd, 6,
1591 RTMP::HEADER_12, 0,
1592 RTMP::AUDIO_DATA, RTMPMsg::FROM_SERVER,
1593 0, 0)) {
1595 // Send an empty Video packet to get
1596 // things started.
1597 if (rtmp->sendMsg(args->netfd, 5,
1598 RTMP::HEADER_12, 0,
1599 RTMP::VIDEO_DATA, RTMPMsg::FROM_SERVER,
1600 0, 0)) {
1602 sleep(1); // FIXME: debugging crap
1603 // Send the User Control - Stream Start
1604 response = rtmp->encodeUserControl(RTMP::STREAM_START, 1);
1605 if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
1606 RTMP::HEADER_12, response->allocated(),
1607 RTMP::USER, RTMPMsg::FROM_SERVER,
1608 *response)) {
1610 int active_stream = hand->getActiveDiskStreams();
1611 boost::uint8_t *ptr = hand->getDiskStream(active_stream)->get();
1612 if (ptr) {
1613 log_network("Sending %s to client",
1614 hand->getDiskStream(active_stream)->getFilespec());
1615 if (rtmp->sendMsg(args->netfd, 5,
1616 RTMP::HEADER_12, 400,
1617 RTMP::NOTIFY, RTMPMsg::FROM_SERVER,
1618 ptr, 400)) {
1619 log_network("Sent first page to client");
1622 } else if (body->getMethodName() == "seek") {
1623 hand->seekStream();
1624 } else if (body->getMethodName() == "pause") {
1625 hand->pauseStream(transid);
1626 } else if (body->getMethodName() == "close") {
1627 hand->closeStream(transid);
1628 } else if (body->getMethodName() == "resume") {
1629 hand->resumeStream(transid);
1630 } else if (body->getMethodName() == "delete") {
1631 hand->deleteStream(transid);
1632 } else if (body->getMethodName() == "publish") {
1633 hand->publishStream();
1634 } else if (body->getMethodName() == "togglePause") {
1635 hand->togglePause(transid);
1636 // This is a server installation specific method.
1637 } else if (body->getMethodName() == "FCSubscribe") {
1638 hand->setFCSubscribe(body->at(0)->to_string());
1639 } else if (body->getMethodName() == "_error") {
1640 log_error("Received an _error message from the client!");
1641 } else {
1642 /* size_t ret = */ hand->writeToPlugin(tmpptr, qhead->bodysize);
1643 boost::shared_ptr<cygnal::Buffer> result = hand->readFromPlugin();
1644 if (result) {
1645 if (rtmp->sendMsg(args->netfd, qhead->channel,
1646 RTMP::HEADER_8, result->allocated(),
1647 RTMP::INVOKE, RTMPMsg::FROM_SERVER,
1648 *result)) {
1649 log_network("Sent response to client.");
1652 done = true;
1654 break;
1656 case RTMP::FLV_DATA:
1657 log_unimpl("RTMP type %d", qhead->type);
1658 break;
1659 default:
1660 log_error (_("ERROR: Unidentified AMF header data type 0x%x"), qhead->type);
1661 break;
1664 // body->dump();
1666 // size_t ret = hand->writeToPlugin(tmpptr, qhead->bodysize);
1667 #if 0
1668 boost::shared_ptr<cygnal::Buffer> result = hand->readFromPlugin();
1669 if (result) { // FIXME: this needs a real channel number
1670 if (rtmp->sendMsg(args->netfd, 0x3, RTMP::HEADER_8, ret,
1671 RTMP::INVOKE, RTMPMsg::FROM_SERVER, *result)) {
1672 log_network("Sent response to client.");
1675 #endif
1676 // log_network("RET is: %d", ret);
1677 } // end of processing all the messages in the que
1679 // we're done processing these packets, so get rid of them
1680 pkt.reset();
1683 } else {
1684 log_network("Never read any data from fd #%d", args->netfd);
1685 #if 0
1686 // Send a ping to reset the new stream
1687 boost::shared_ptr<cygnal::Buffer> ping_reset =
1688 rtmp->encodePing(RTMP::PING_CLEAR, 0);
1689 if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
1690 RTMP::HEADER_12, ping_reset->size(),
1691 RTMP::PING, RTMPMsg::FROM_SERVER, *ping_reset)) {
1692 log_network("Sent Ping to client");
1693 } else {
1694 log_error("Couldn't send Ping to client!");
1696 #endif
1697 initialize = true;
1698 return true;
1700 } else {
1701 // log_error("Communication error with client using fd #%d", args->netfd);
1702 rtmp->closeNet(args->netfd);
1703 initialize = true;
1704 return false;
1706 } while (!done);
1708 return true;
1711 } // end of gnash namespace
1713 // local Variables:
1714 // mode: C++
1715 // indent-tabs-mode: t
1716 // End: