gnash-commit
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnash-commit] /srv/bzr/gnash/rtmp r9916: refactor heavily to work with the new network engine and threading scheme.


From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9916: refactor heavily to work with the new network engine and threading scheme.
Date: Sat, 27 Dec 2008 18:48:41 -0700
User-agent: Bazaar (1.5)

------------------------------------------------------------
revno: 9916
committer: address@hidden
branch nick: rtmp
timestamp: Sat 2008-12-27 18:48:41 -0700
message:
  refactor heavily to work with the new network engine and threading scheme.
modified:
  libnet/rtmp_server.cpp
  libnet/rtmp_server.h
=== modified file 'libnet/rtmp_server.cpp'
--- a/libnet/rtmp_server.cpp    2008-12-20 17:11:55 +0000
+++ b/libnet/rtmp_server.cpp    2008-12-28 01:48:41 +0000
@@ -52,13 +52,14 @@
 extern map<int, Handler *> handlers;
 
 RTMPServer::RTMPServer() 
+    : _filesize(0)
 {
 //    GNASH_REPORT_FUNCTION;
 //     _inbytes = 0;
 //     _outbytes = 0;
     
-//    _body = new unsigned char(RTMP_BODY_SIZE+1);
-//    memset(_body, 0, RTMP_BODY_SIZE+1);
+//    _body = new unsigned char(RTMP_HANDSHAKE_SIZE+1);
+//    memset(_body, 0, RTMP_HANDSHAKE_SIZE+1);
 }
 
 RTMPServer::~RTMPServer()
@@ -68,36 +69,35 @@
 //    delete _body;
 }
 
+#if 0
 // The handshake is a byte with the value of 0x3, followed by 1536
 // bytes of gibberish which we need to store for later.
 bool
-RTMPServer::handShakeWait()
+RTMPServer::processClientHandShake(int fd, amf::Buffer &buf)
 {
     GNASH_REPORT_FUNCTION;
 
-//     char buffer[RTMP_BODY_SIZE+16];
-//     memset(buffer, 0, RTMP_BODY_SIZE+16);
-    boost::shared_ptr<amf::Buffer> buf = _handler->pop();
-
-    if (buf == 0) {
-       log_debug("Que empty, net connection dropped for fd #%d", getFileFd());
+    if (buf.reference() == 0) {
+       log_debug("no data in buffer, net connection dropped for fd #%d", fd);
        return false;
-    }    
+    }
 
-    if (*buf->reference() == RTMP_HANDSHAKE) {
+    cerr << buf.hexify(false) << endl;
+    
+    if (*buf.reference() == RTMP_HANDSHAKE) {
         log_debug (_("Handshake request is correct"));
     } else {
         log_error (_("Handshake request isn't correct"));
         return false;
     }
 
-//     if (buf->size() >= RTMP_BODY_SIZE) {
+//     if (buf->size() >= RTMP_HANDSHAKE_SIZE) {
 //     secret = _handler->merge(buf->reference());
 //     }
 
-    if (buf->size() >= static_cast<size_t>(RTMP_BODY_SIZE)) {
-       _handshake = new amf::Buffer(RTMP_BODY_SIZE);
-       _handshake->copy(buf->reference() + 1, RTMP_BODY_SIZE);
+    if (buf.size() >= static_cast<size_t>(RTMP_HANDSHAKE_SIZE)) {
+       _handshake = new amf::Buffer(RTMP_HANDSHAKE_SIZE);
+       _handshake->copy(buf.reference() + 1, RTMP_HANDSHAKE_SIZE);
        log_debug (_("Handshake Data matched"));
 //     return true;
     } else {
@@ -107,93 +107,90 @@
     
     return true;
 }
+#endif
 
 // The response is the gibberish sent back twice, preceeded by a byte
 // with the value of 0x3.
 bool
-RTMPServer::handShakeResponse()
+RTMPServer::handShakeResponse(int fd, amf::Buffer &handshake)
 {
     GNASH_REPORT_FUNCTION;
 
-    boost::shared_ptr<amf::Buffer> buf1(new amf::Buffer(RTMP_BODY_SIZE + 1));
-    *buf1 = RTMP_HANDSHAKE;
-    *buf1 += _handshake;
-//  _handler->pushout(buf1); FIXME:
-
-    boost::shared_ptr<amf::Buffer> buf2(new amf::Buffer(RTMP_BODY_SIZE));
-    buf2->copy(_handshake->begin(), RTMP_BODY_SIZE);
-//    _handler->pushout(buf2); FIXME:
-    
-//     std::copy(_handshake->begin(), _handshake->end(), (buf1->begin() + 1)); 
   
-//     boost::shared_ptr<amf::Buffer> buf = new amf::Buffer(RTMP_BODY_SIZE + 1);
-//     std::copy(_handshake->begin(), _handshake->end(), buf->begin() + 1 + RTMP_BODY_SIZE);
-//    _handler->notifyout();
-
-    log_debug("Sent RTMP Handshake response");
+    boost::uint8_t byte;
+    byte = RTMP_HANDSHAKE;
+    
+    int ret1 = writeNet(fd, &byte, 1);
+    int ret2 = writeNet(fd, handshake);
+    int ret3 = writeNet(fd, handshake);
+    
+    if ((ret2 == handshake.allocated()) && (ret3 == handshake.allocated())) {
+       log_debug("Sent RTMP Handshake response");
+    } else {
+       log_error("Couldn't sent RTMP Handshake response!");
+    }
 
     return true;    
 }
 
-bool
-RTMPServer::serverFinish()
+boost::shared_ptr<amf::Buffer>
+RTMPServer::serverFinish(int fd, amf::Buffer &handshake1, amf::Buffer &handshake2)
 {
     GNASH_REPORT_FUNCTION;
-
-    boost::shared_ptr<amf::Buffer> buf = _handler->pop();
-    boost::shared_ptr<amf::Buffer> obj = buf;
-    
-    if (buf == 0) {
-       log_debug("Que empty, net connection dropped for fd #%d", getFileFd());
-       return false;
-    }
-    
-    // The first data packet is often buried in with the end of the handshake.
-    // So after the handshake block, we strip that part off, and just pass on
-    // the remainder for processing.
-    if (buf->size() >= static_cast<size_t>(RTMP_BODY_SIZE)) {
-       size_t size = buf->size() - RTMP_BODY_SIZE;  
-       obj.reset(new amf::Buffer[size]);
-       obj->copy(buf->begin()+RTMP_BODY_SIZE, size);
-    } else {
-       _handler->wait();
-       obj = _handler->pop();
-    }
-    
-    int diff = std::memcmp(buf->begin(), _handshake->begin(), RTMP_BODY_SIZE);
+    boost::shared_ptr<amf::Buffer> buf;
+
+    if ((handshake1.reference() == 0) || (handshake2.reference() == 0)) {
+       log_debug("Que empty, net connection dropped for fd #%d", fd);
+       return buf;
+    }
+
+    int diff = std::memcmp(handshake1.begin(), handshake2.begin(), RTMP_HANDSHAKE_SIZE);
     if (diff == 0) {
        log_debug (_("Handshake Finish Data matched"));
     } else {
        log_error (_("Handshake Finish Data didn't match by %d bytes"), diff);
-//        return false;
+    }
+
+    // Copy the extra data from the end of the handshake to the new buffer. Normally we
+    // try to avoiud copying anything around, but as this is only used once for each connection,
+    // there isn't a real performance hit from it.
+    if (handshake2.allocated() >= static_cast<size_t>(RTMP_HANDSHAKE_SIZE)) {
+       log_debug("Got extra data in handshake, %d bytes for fd #%d",
+                 handshake2.allocated() - RTMP_HANDSHAKE_SIZE, fd);
+       buf.reset(new Buffer(handshake2.allocated() - RTMP_HANDSHAKE_SIZE));
+       buf->copy(handshake2.reference() + RTMP_HANDSHAKE_SIZE, handshake2.allocated() - RTMP_HANDSHAKE_SIZE);
     }
     
-    packetRead(obj);
-        
-    return true;
+//    packetRead(*buf);
+    return buf;
 }
 
 bool
-RTMPServer::packetSend(boost::shared_ptr<amf::Buffer>  /* buf */)
+RTMPServer::packetSend(amf::Buffer &/* buf */)
 {
     GNASH_REPORT_FUNCTION;
     return false;
 }
 
+// This overrides using same method from the base RTMP class.
 bool
-RTMPServer::packetRead(boost::shared_ptr<amf::Buffer> buf)
+RTMPServer::packetRead(amf::Buffer &buf)
 {
     GNASH_REPORT_FUNCTION;
 
-    unsigned int amf_index, headersize;
-    boost::uint8_t *ptr = buf->reference();
+    boost::uint8_t amf_index, headersize;
+    boost::uint8_t *ptr = buf.reference();
     AMF amf;
     
-    if (buf->reference() == 0) {
+    if (ptr == 0) {
        return false;
     }
-    
-    amf_index = *buf->reference() & RTMP_INDEX_MASK;
-    headersize = headerSize(*buf->reference());
+
+    cerr << "FIXME3: " << buf.hexify(true) << endl;
+    
+//    ptr += 1;                        // skip past the header byte
+    
+    amf_index = *ptr & RTMP_INDEX_MASK;
+    headersize = headerSize(*ptr);
     log_debug (_("The Header size is: %d"), headersize);
     log_debug (_("The AMF index is: 0x%x"), amf_index);
 
@@ -262,7 +259,7 @@
 //    buf->dump();
     if (buf->size() < actual_size) {
        log_debug("FIXME: MERGING");
-       buf = _handler->merge(buf);
+       buf = _que->merge(buf);
     }
     while ((ptr - buf->begin()) < static_cast<int>(actual_size)) {
        boost::shared_ptr<amf::Element> el(new amf::Element);
@@ -286,7 +283,7 @@
          break;
       case PING:
       {
-         rtmp_ping_t *ping = decodePing(ptr);
+         boost::shared_ptr<rtmp_ping_t> ping = decodePing(ptr);
          switch (ping->type) {
            case PING_CLEAR:
                break;
@@ -331,11 +328,12 @@
           log_error (_("ERROR: Unidentified RTMP message content type 0x%x"), _header.type);
           break;
     };
-    
+
+#if 0
     boost::shared_ptr<amf::Element> url = getProperty("tcUrl");
     boost::shared_ptr<amf::Element> file = getProperty("swfUrl");
     boost::shared_ptr<amf::Element> app = getProperty("app");
-
+    
     if (file) {
        log_debug("SWF file %s", file->to_string());
     }
@@ -345,6 +343,7 @@
     if (app) {
        log_debug("is file name is %s", app->to_string());
     }
+#endif
     
     return true;
 }
@@ -502,73 +501,53 @@
 }
 
 // This is the thread for all incoming RTMP connections
-void
+bool
 rtmp_handler(Network::thread_params_t *args)
 {
     GNASH_REPORT_FUNCTION;
-    Handler *hand = reinterpret_cast<Handler *>(args->handler);
-    RTMPServer rtmp;
-
-    rtmp.setHandler(hand);
+//    Handler *hand = reinterpret_cast<Handler *>(args->handler);
+    RTMPServer *rtmp = new RTMPServer;
     string docroot = args->filespec;
-
+    string url, filespec;
+    url = docroot;
+    bool done = false;
+    
     log_debug(_("Starting RTMP Handler for fd #%d, tid %ld"),
              args->netfd, get_thread_id());
     
-    while (!hand->timetodie()) {       
-       log_debug(_("Waiting for RTMP request on fd #%d..."), args->netfd);
-       hand->wait();
-       // This thread is the last to wake up when the browser
-       // closes the network connection. When browsers do this
-       // varies, elinks and lynx are very forgiving to a more
-       // flexible HTTP protocol, which Firefox/Mozilla & Opera
-       // are much pickier, and will hang or fail to load if
-       // you aren't careful.
-       if (hand->timetodie()) {
-           log_debug("Not waiting no more, no more for RTMP data for fd #%d...", args->netfd);
-           map<int, Handler *>::iterator hit = handlers.find(args->netfd);
-           if ((*hit).second) {
-               log_debug("Removing handle %x for RTMP on fd #%d", (void *)hand, args->netfd);
-               handlers.erase(args->netfd);
-           }
-
-           return;
-       }
 #ifdef USE_STATISTICS
-       struct timespec start;
-       clock_gettime (CLOCK_REALTIME, &start);
+    struct timespec start;
+    clock_gettime (CLOCK_REALTIME, &start);
 #endif
-       if (!rtmp.handShakeWait()) {
- //        hand->clearout();   // remove all data from the outgoing que
-           hand->die();        // tell all the threads for this connection to die
-           hand->notifyin();
-           log_debug("Net RTMP done for fd #%d...", args->netfd);
-//         hand->closeNet(args->netfd);
-           return;
-       }
-       string url, filespec;
-       url = docroot;
-       
-       rtmp.handShakeResponse();
-
-       hand->wait();
-       // This thread is the last to wake up when the browser
-       // closes the network connection. When browsers do this
-       // varies, elinks and lynx are very forgiving to a more
-       // flexible HTTP protocol, which Firefox/Mozilla & Opera
-       // are much pickier, and will hang or fail to load if
-       // you aren't careful.
-       if (hand->timetodie()) {
-           log_debug("Not waiting no more, no more for RTMP data for fd #%d...", args->netfd);
-           map<int, Handler *>::iterator hit = handlers.find(args->netfd);
-           if ((*hit).second) {
-               log_debug("Removing handle %x for RTMP on fd #%d", (void *)hand, args->netfd);
-               handlers.erase(args->netfd);
-           }
-
-           return;
-       }
-       rtmp.serverFinish();
+
+    // Adjust the timeout
+    rtmp->setTimeout(5);
+    
+    // Read the handshake bytes sent by the client when requesting
+    // a connection.
+    boost::shared_ptr<amf::Buffer> handshake1 = rtmp->recvMsg(args->netfd);
+    // See if we have data in the handshake, we should have 1537 bytes
+    if (handshake1->allocated() == 0) {
+       log_error("failed to read the handshake from the client.");
+       return false;
+    }
+
+    // Send our response to the handshake, which primarily is the bytes
+    // we just recieved.
+    rtmp->handShakeResponse(args->netfd, *handshake1);
+    
+    boost::shared_ptr<amf::Buffer> handshake2 = rtmp->recvMsg(args->netfd);
+    // See if we have data in the handshake, we should have 1536 bytes
+    if (handshake2->allocated() == 0) {
+       log_error("failed to read the handshake from the client.");
+       return false;
+    }
+    boost::shared_ptr<amf::Buffer> start = rtmp->serverFinish(args->netfd, *handshake1, *handshake2);
+
+    start->dump();
+    
+    boost::shared_ptr<RTMP::rtmp_head_t> head = rtmp->decodeHeader(*start);
+    rtmp->decodeMsgBody(*start);
     
     // Keep track of the network statistics
 //    Statistics st;
@@ -580,9 +559,25 @@
 //     st.addStats();
 //     proto.resetBytesIn();
 //     proto.resetBytesOut();  
-
+    
 //     st.dump(); 
-    }
+    do {
+       // See if we have any messages waiting
+       boost::shared_ptr<amf::Buffer> buf = rtmp->recvMsg(args->netfd);
+       if (buf->allocated()) {
+           boost::uint8_t *ptr = buf->reference();
+           if (ptr == 0) {
+               log_debug("Que empty, net connection dropped for fd #%d", args->netfd);
+               return false;
+           }
+           boost::shared_ptr<RTMP::rtmp_head_t> rthead = rtmp->decodeHeader(ptr);
+           rtmp->decodeMsgBody(*buf);
+       } else {
+           done = true;
+       }
+    } while (!done);
+
+       return false;
 }
 
 // A Ping packet has two parameters that ae always specified, and 2 that are optional.

=== modified file 'libnet/rtmp_server.h'
--- a/libnet/rtmp_server.h      2008-12-15 03:46:09 +0000
+++ b/libnet/rtmp_server.h      2008-12-28 01:48:41 +0000
@@ -28,6 +28,7 @@
 #include "handler.h"
 #include "network.h"
 #include "buffer.h"
+#include "diskstream.h"
 
 namespace gnash
 {
@@ -37,11 +38,11 @@
 public:
     RTMPServer();
     ~RTMPServer();
-    bool handShakeWait();
-    bool handShakeResponse();
-    bool serverFinish();
-    bool packetSend(boost::shared_ptr<amf::Buffer> buf);
-    bool packetRead(boost::shared_ptr<amf::Buffer> buf);
+//    bool processClientHandShake(int fd, amf::Buffer &buf);
+    bool handShakeResponse(int fd, amf::Buffer &buf);
+    boost::shared_ptr<amf::Buffer> serverFinish(int fd, amf::Buffer &handshake1, amf::Buffer &handshake2);
+    bool packetSend(amf::Buffer &buf);
+    bool packetRead(amf::Buffer &buf);
     
     // These are handlers for the various types
     boost::shared_ptr<amf::Buffer> encodeResult(RTMPMsg::rtmp_status_e status);
@@ -50,10 +51,15 @@
     
     void dump();
   private:
+    typedef boost::char_separator<char> Sep;
+    typedef boost::tokenizer<Sep> Tok;
+    DiskStream::filetype_e  _filetype;
+    std::string                _filespec;
+    boost::uint32_t     _filesize;
 };
 
 // This is the thread for all incoming RTMP connections
-void rtmp_handler(Network::thread_params_t *args);
+bool rtmp_handler(Network::thread_params_t *args);
 
 } // end of gnash namespace
 // end of _RTMP_SERVER_H_


reply via email to

[Prev in Thread] Current Thread [Next in Thread]