gnash-commit
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnash-commit] /srv/bzr/gnash/rtmp r9965: better handling of multiple RTMP messages that aren't the echo test.


From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9965: better handling of multiple RTMP messages that aren't the echo test.
Date: Wed, 04 Feb 2009 14:12:35 -0700
User-agent: Bazaar (1.5)

------------------------------------------------------------
revno: 9965
committer: address@hidden
branch nick: rtmp
timestamp: Wed 2009-02-04 14:12:35 -0700
message:
  better handling of multiple RTMP messages that aren't the echo test.
modified:
  cygnal/rtmp_server.cpp
=== modified file 'cygnal/rtmp_server.cpp'
--- a/cygnal/rtmp_server.cpp    2009-01-05 05:32:18 +0000
+++ b/cygnal/rtmp_server.cpp    2009-02-04 21:12:35 +0000
@@ -451,7 +451,20 @@
       break;
       case RTMPMsg::NS_CLEAR_FAILED:
       case RTMPMsg::NS_CLEAR_SUCCESS:
+         // After a successful NetConnection, we get a
+         // NetStream::createStream.
       case RTMPMsg::NS_DATA_START:
+      {
+         boost::shared_ptr<amf::Element> id1(new Element);
+         id1->makeNumber(2);
+         top.addProperty(id1);
+
+         boost::shared_ptr<amf::Element> id2(new Element);
+         id2->makeNumber(1);
+         top.addProperty(id2);
+         
+         break;
+      }
       case RTMPMsg::NS_FAILED:
       case RTMPMsg::NS_INVALID_ARGUMENT:
       case RTMPMsg::NS_PAUSE_NOTIFY:
@@ -532,20 +545,19 @@
 RTMPServer::encodePing(rtmp_ping_e type, boost::uint32_t milliseconds)
 {
 //    GNASH_REPORT_FUNCTION;
-    
+
+    // An encoded ping message 
     boost::shared_ptr<amf::Buffer> buf(new Buffer(sizeof(boost::uint16_t) * 
3));
-    boost::uint8_t *ptr = buf->reference();
-    buf->clear();      // default everything to zeros, real data gets 
optionally added.
-    // Manually adjust the seek pointer since we add the data by
-    // walking ou own temporary pointer, so none of the regular ways
-    // of setting the seek pointer are appropriate.
-    buf->setSeekPointer(buf->reference() + buf->size());
+//    boost::uint8_t *ptr = buf->reference();
 
+    // Set the type of this ping message
     boost::uint16_t typefield = htons(type);
-    ptr += sizeof(boost::uint16_t); // go past the first short
+    *buf = typefield;
+    
+//     // go past the first short, which is the type field
+//    ptr += sizeof(boost::uint16_t);
 
     boost::uint32_t swapped = 0;
-    *buf = typefield;
     switch (type) {
         // These two don't appear to have any paramaters
       case PING_CLEAR:
@@ -554,7 +566,7 @@
          // the third parameter is the buffer time in milliseconds
       case PING_TIME:
       {
-         ptr += sizeof(boost::uint16_t); // go past the second short
+//       ptr += sizeof(boost::uint16_t); // go past the second short
          swapped = milliseconds;
          swapBytes(&swapped, sizeof(boost::uint32_t));
          *buf += swapped;
@@ -562,7 +574,12 @@
       }
       // reset doesn't have any parameters but zeros
       case PING_RESET:
+      {
+         boost::uint16_t zero = 0;
+         *buf += zero;
+         *buf += zero;
          break;
+      }
       // For Ping and Pong, the second parameter is always the milliseconds
       case PING_CLIENT:
       case PONG_CLIENT:
@@ -580,7 +597,7 @@
     // Manually adjust the seek pointer since we added the data by
     // walking ou own temporary pointer, so none of the regular ways
     // of setting the seek pointer are appropriate.
-    buf->setSeekPointer(buf->reference() + buf->size());
+//    buf->setSeekPointer(buf->reference() + buf->size());
     
     return buf;
 }
@@ -742,16 +759,36 @@
        // NetConnection::connect().
        boost::shared_ptr<RTMP::rtmp_head_t> head = rtmp->decodeHeader(*pkt);
        boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
-       cerr << "FIXME Connect Que size is: " << que->size() << endl;
-       que->at(0)->dump();
 //    RTMP::queues_t *que = rtmp->split(start->reference() + head->head_size, 
start->size());
        if (que->size() > 0) {
-           boost::shared_ptr<amf::Buffer> bufptr = que->at(0)->pop();
-           body = rtmp->decodeMsgBody(bufptr->reference() + head->head_size, 
head->bodysize);
+           for (size_t i=0; i<que->size(); i++) {
+               boost::shared_ptr<amf::Buffer> bufptr = que->at(i)->pop();
+//             que->at(i)->dump();
+               if (bufptr) {
+//                 bufptr->dump();
+                   boost::shared_ptr<RTMP::rtmp_head_t> qhead = 
rtmp->decodeHeader(bufptr->reference());
+                   log_debug("Message for channel #%d", qhead->channel);
+                   if (qhead->channel == RTMP_SYSTEM_CHANNEL) {
+                       boost::shared_ptr<RTMP::rtmp_ping_t> ping = 
rtmp->decodePing(bufptr->reference());
+                       log_debug("Processed Ping message from client, type 
%d", ping->type);
+                       
+                   } else {
+                       // skip past the header bytes to the start of the data
+                       log_debug("Processing non system channel message!");
+                       boost::uint8_t *tmpptr = bufptr->reference() + 
qhead->head_size;
+                       body = rtmp->decodeMsgBody(tmpptr, qhead->bodysize);
+                       body->setChannel(qhead->channel);
+//                     body->dump();
+                       break;
+                   }
+               } else {
+                   log_error("Message contains no data!");
+               }
+           }
        }
 
-       // Send a ping to clear the new stream
-       boost::shared_ptr<amf::Buffer> ping_reset = 
rtmp->encodePing(RTMP::PING_CLEAR, 0);
+       // Send a ping to reset the new stream
+       boost::shared_ptr<amf::Buffer> ping_reset = 
rtmp->encodePing(RTMP::PING_RESET, 0);
        if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL, RTMP::HEADER_12,
                          ping_reset->size(), RTMP::PING, RTMPMsg::FROM_SERVER, 
*ping_reset)) {
            log_debug("Sent Ping to client");
@@ -760,21 +797,34 @@
        }
        
        // send a response to the NetConnection::connect() request
-       boost::shared_ptr<amf::Buffer> response = 
rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
-       if (rtmp->sendMsg(args->netfd, head->channel, RTMP::HEADER_12, 
response->allocated(),
+//     boost::shared_ptr<amf::Buffer> response = 
rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+       boost::shared_ptr<amf::Buffer> response;
+       if (body == 0) {
+           response = rtmp->encodeResult(RTMPMsg::NC_CONNECT_FAILED);
+           log_error("No body found in message!");
+           return false;
+       } else {
+           response = rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+           // The initial packet from the client is always a NetConnection 
object
+           // invoking the 'connect' method.
+           if (body->getMethodName() == "connect") {
+               tcurl  = body->findProperty("tcUrl");
+               if (tcurl) {
+                   log_debug("Client request for remote file is: %s", 
tcurl->to_string());
+               }
+               swfurl = body->findProperty("swfUrl");
+               if (swfurl) {
+                   log_debug("SWF filename making request is: %s", 
swfurl->to_string());
+               }
+           }
+       }
+       if (rtmp->sendMsg(args->netfd, body->getChannel(), RTMP::HEADER_12, 
response->allocated(),
                          RTMP::INVOKE, RTMPMsg::FROM_SERVER, *response)) {
            log_error("Sent NetConnection::connect() response to client.");
        } else {
            log_error("Couldn't send NetConnection::connect() response to 
client!");
        }
-       tcurl  = body->findProperty("tcUrl");
-       if (tcurl) {
-           log_debug("Client request for remote file is: %s", 
tcurl->to_string());
-       }
-       swfurl = body->findProperty("swfUrl");
-       if (swfurl) {
-           log_debug("SWF filename making request is: %s", 
swfurl->to_string());
-       }
+       
     } else {
        // Read the handshake bytes sent by the client when requesting
        // a connection.
@@ -788,12 +838,14 @@
 
     // See if this is a Red5 style echo test.
     string::size_type pos;
-    filespec = tcurl->to_string();
-    pos = filespec.rfind("/");
-    if (pos != string::npos) {
-       if (filespec.substr(pos, filespec.size()-pos) == "/echo") {
-           log_debug("Red5 echo test request!");
-           echo = true;
+    if (tcurl) {
+       filespec = tcurl->to_string();
+       pos = filespec.rfind("/");
+       if (pos != string::npos) {
+           if (filespec.substr(pos, filespec.size()-pos) == "/echo") {
+               log_debug("Red5 echo test request!");
+               echo = true;
+           }
        }
     }
   
@@ -810,12 +862,15 @@
     
 //     st.dump();
 
-    // See if we have any messages waiting
+    // See if we have any messages waiting. After the initial connect, this is
+    // the main loop for processing messages.
+    
+    // Adjust the timeout
+    rtmp->setTimeout(30);
     do {
        boost::shared_ptr<amf::Buffer> buf = rtmp->recvMsg(args->netfd);
        if (buf) {
            if (buf->allocated()) {
-               buf->dump();
                boost::uint8_t *ptr = buf->reference();
                if (ptr == 0) {
                    log_debug("Que empty, net connection dropped for fd #%d", 
args->netfd);
@@ -823,11 +878,15 @@
                }
                boost::shared_ptr<RTMP::rtmp_head_t> rthead = 
rtmp->decodeHeader(ptr);
                ptr += rthead->head_size; // skip past the header
+               // This is support for the Red5 'echo_test', which exercises 
encoding and
+               // decoding of complex and nested AMF data types. FIXME: this 
should be
+               // moved to a CGI type of thing that executes this as a 
separate process,
+               // using a socket to pass output back to the client.
                if (echo) {
                    boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*buf);
                    boost::shared_ptr<amf::Buffer> bufptr;
                    if (que->size() > 0) {
-//                     cerr << "FIXME echo Que size is: " << que->size() << 
endl;
+                       cerr << "FIXME2 echo Que size is: " << que->size() << 
endl;
                        bufptr = que->at(0)->pop();
                    }
                    // process the echo test request
@@ -855,10 +914,54 @@
                        done = true;
                    }
                } else {
-                   body = rtmp->decodeMsgBody(*buf);
+//                 buf->dump();
+                   // This is a non-Red5 message, which should be the normal 
mode of operating.
+                   boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*buf);
+                   if (que->size() > 0) {
+                       boost::shared_ptr<amf::Buffer> bufptr;
+                       if (que->size() > 0) {
+                           cerr << "FIXME3 Que size is: " << que->size() << 
endl;
+                           bufptr = que->at(0)->pop();
+                       }
+                       if (bufptr) {
+                           boost::shared_ptr<RTMP::rtmp_head_t> qhead = 
rtmp->decodeHeader(bufptr->reference());
+                           
+                           for (size_t i=0; i<que->size(); i++) {
+                               boost::uint8_t *tmpptr = bufptr->reference() + 
qhead->head_size;
+                               body = rtmp->decodeMsgBody(tmpptr, 
qhead->bodysize);
+                               boost::shared_ptr<amf::Buffer> response;
+                               if (body->getMethodName() == "createStream") {
+                                   response = 
rtmp->encodeResult(RTMPMsg::NS_DATA_START);
+                               } else {
+                                   response = 
rtmp->encodeResult(RTMPMsg::NS_FAILED);
+                               }
+                               
+                               if (rtmp->sendMsg(args->netfd, qhead->channel, 
RTMP::HEADER_8, response->allocated(),
+                                                 RTMP::INVOKE, 
RTMPMsg::FROM_SERVER, *response)) {
+                                   log_error("Sent response to client.");
+                               } else {
+                                   log_error("Couldn't send response to 
client!");
+                               }
+       
+                               
+                           }
+                       } else {
+                           log_error("%s:%d Message contains no data!", 
__FUNCTION__, __LINE__);
+                       }
+                   }
                }
            } else {
                log_error("Never read any data from fd #%d", args->netfd);
+#if 0
+               // Send a ping to reset the new stream
+               boost::shared_ptr<amf::Buffer> ping_reset = 
rtmp->encodePing(RTMP::PING_CLEAR, 0);
+               if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL, 
RTMP::HEADER_12,
+                                 ping_reset->size(), RTMP::PING, 
RTMPMsg::FROM_SERVER, *ping_reset)) {
+                   log_debug("Sent Ping to client");
+               } else {
+                   log_error("Couldn't send Ping to client!");
+               }
+#endif
                initialize = true;
                return false;
            }


reply via email to

[Prev in Thread] Current Thread [Next in Thread]