gnash-commit
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnash-commit] /srv/bzr/gnash/rtmp r9843: don't use the dispatch_handler


From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9843: don't use the dispatch_handler when in single threaded mode.
Date: Wed, 17 Dec 2008 10:35:48 -0700
User-agent: Bazaar (1.5)

------------------------------------------------------------
revno: 9843
committer: address@hidden
branch nick: rtmp
timestamp: Wed 2008-12-17 10:35:48 -0700
message:
  don't use the dispatch_handler when in single threaded mode.
  change who owns the network handle for multi-threaded mode.
modified:
  cygnal/cygnal.cpp
=== modified file 'cygnal/cygnal.cpp'
--- a/cygnal/cygnal.cpp 2008-12-15 03:48:31 +0000
+++ b/cygnal/cygnal.cpp 2008-12-17 17:35:48 +0000
@@ -69,6 +69,8 @@
 #include "buffer.h"
 #include "handler.h"
 #include "cache.h"
+#include "gettext.h"
+#include "cygnal.h"
 
 #ifdef ENABLE_NLS
 #include <locale.h>
@@ -82,6 +84,7 @@
 #include <boost/bind.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition.hpp>
+#include <boost/thread/tss.hpp>
 
 //using gnash::log_debug;
 using namespace std;
@@ -125,6 +128,9 @@
 // Admin commands are small
 const int ADMINPKTSIZE = 80;
 
+// These keep track of the number of active threads.
+ThreadCounter tids;
+
 // end of globals
 
 static LogFile& dbglogfile = LogFile::getDefaultInstance();
@@ -138,6 +144,9 @@
 static boost::condition        alldone;
 static boost::mutex    alldone_mutex;
 
+static boost::condition        noclients;
+static boost::mutex    noclients_mutex;
+
 static void
 usage()
 {
@@ -264,7 +273,7 @@
 //     struct thread_params ssl_data;
 //     rtmp_data.port = port_offset + 1935;
 //     boost::thread rtmp_port(boost::bind(&rtmp_thread, &rtmp_data));
-    // Admin handler
+    // Admin handler    
     if (admin) {
        Network::thread_params_t admin_data;
        admin_data.port = gnash::ADMIN_PORT;
@@ -479,6 +488,8 @@
     Network net;
     bool done = false;
     static int tid = 0;
+    map<int, Network *> networks;
+    
     if (netdebug) {
        net.toggleDebug(true);
     }
@@ -499,10 +510,6 @@
     
     log_debug("This system is configured for %d file descriptors to be watched by each thread.", nfds);
     
-    // cap the number of threads
-    int cpu = 0;
-    cpu = (cpu % ncpus);
-    
     // Get the next thread ID to hand off handling this file
     // descriptor to. If the limit for threads per cpu hasn't been
     // set or is set to 0, assume one thread per processor by
@@ -514,33 +521,29 @@
     } else {
        spawn_limit = ncpus * nfds;
     }
+    log_debug("Spawn limit is: %d", spawn_limit);
 
-    // Rotate in a range of 0 to the limit.
-    tid = (tid + 1) % (spawn_limit + 1);
-    log_debug("thread ID %d for fd #%d", tid, fd);
-       
 //    Handler *hand = new Handler;
 
     args->handler = &net;
-    if (crcfile.getThreadingFlag() == true) {
-       boost::bind(dispatch_handler, args);
-        log_debug("Multi-threaded mode for server on fd #%d", fd);
-//      log_debug("Starting handler: %x for fd #%d", (void *)hand, args->netfd);
-      boost::thread handler(boost::bind(&dispatch_handler, args));
-    }
     
-    // FIXME: this runs forever, we probably want a cleaner way to
+    // FIXME: this may run forever, we probably want a cleaner way to
     // test for the end of time.
     do {
        net.setPort(args->port);
        if (netdebug) {
            net.toggleDebug(true);
        }
+
+       // Rotate in a range of 0 to the limit.
+       tid = (tid + 1) % (spawn_limit + 1);
+       log_debug("thread ID %d for fd #%d", tid, fd);
+       
        // Wait for a connection to this tcp/ip from a client. If set
        // to true, this will block until a request comes in. If set
        // to single threaded mode, this will only allow one client to
        // connect at a time. This is to make it easier to debug
-       // things when you have a heavily threadd application.
+       // things when you have a heavily threaded application.
        args->netfd = net.newConnection(true, fd);
        if (args->netfd <= 0) {
            log_debug("No new network connections");
@@ -552,36 +555,52 @@
        struct pollfd fds;
        fds.fd = args->netfd;
        fds.events = POLLIN | POLLRDHUP;
-       if (args->port == (port_offset + RTMPT_PORT)) {
-//         Handler::thread_params_t *targs = new Handler::thread_params_t;
-//         targs->netfd = args->netfd;
-//         targs->handler = args->handler;
-           HTTP *http = new HTTP;
-           http->setFileFd(args->netfd);
-           args->handler = http;
-           boost::bind(http_handler, args);
-           http->addPollFD(fds, http_handler);
-//         hand->notify();
-       }
-//     if (args->port == RTMP_PORT) {
+       if (crcfile.getThreadingFlag() == true) {
+           // Each dispatch thread gets its own argument data and
+           // network connection data.
+           log_debug("Multi-threaded mode for server on fd #%d", fd);
+           Network::thread_params_t *targs = new Network::thread_params_t;
+           Network *tnet = 0;
+           targs->netfd = args->netfd;
+           // If we haven't spawned up to our max allowed, start a
+           // new dispatch thread to handle data.
+           if (networks[tid] == 0) {
+               log_debug("Starting new dispatch thread for tid #%d", tid);
+               tids.increment();
+               tnet = new Network;
+               tnet->setFileFd(args->netfd);
+               targs->handler = tnet;
+           } else {
+               log_debug("Not starting new HTTP thread, spawned already for tid #%d", tid);
+               tnet = networks[tid];
+           }
+           if (args->port == (port_offset + RTMPT_PORT)) {
+               boost::bind(http_handler, args);
+               tnet->addPollFD(fds, http_handler);
+           } else if (args->port == RTMP_PORT) {
+//             tnet->addPollFD(fds, rtmp_handler);
+               log_unimpl("Not ready for RTMP data yet.");
+           }
+           if (networks[tid] == 0) {
+               networks[tid] = tnet;
+               boost::thread handler(boost::bind(&dispatch_handler, targs));
+           }
+       } else {
+           // When in single threaded mode, just call the protocol
+           // handler directly. As this is primarily only used when
+           // debugging Cygnal itself, we don't want the extra
+           // overhead of the dispatch_handler.
+           log_debug("Single threaded mode for fd #%d", args->netfd);
+           if (args->port == (port_offset + RTMPT_PORT)) {
+               http_handler(args);
+           } else if (args->port == RTMP_PORT) {
 //         hand->addPollFD(fds, rtmp_handler);
-//     }
-       if (crcfile.getThreadingFlag() == false) {     // single threaded
-         log_debug("Single threaded mode for fd #%d", args->netfd);
-         dispatch_handler(args);
-#if 0
-         if (args->port == (port_offset + RTMPT_PORT)) {
-           boost::thread handler(boost::bind(&http_handler, args));
-         }
-         if (args->port == (port_offset + RTMP_PORT)) {
-           boost::thread handler(boost::bind(&rtmp_handler, args));
-         }
-#endif
-       } else {
-           //              hand->wait();
-           //      handler->join();
-           log_debug("Debug mode, waiting for thread to complete");
+               log_unimpl("Not ready for RTMP data yet.");
+           }
        }
+       
+       log_debug("Number of active Threads is %d", tids.num_of_tids());
+       
 //     net.closeNet(args->netfd);              // this shuts down this socket connection
        log_debug("Restarting loop for next connection for port %d...", args->port);
     } while(!done);
@@ -600,8 +619,9 @@
     Network *net = reinterpret_cast<Network *>(args->handler);
 //    Network net;
     int timeout = 5000;
+    bool done = false;
 
-//    while(!hand->timetodie()) {
+    do {
        int limit = net->getPollFDSize();
        net->setTimeout(timeout);
        cerr << "LIMIT is: " << limit << endl;
@@ -615,25 +635,33 @@
                cerr << "Hits: " << hits->size() << endl;
                cerr << "Pollfds: " << net->getPollFDSize() << endl;
                for (it = hits->begin(); it != hits->end(); it++) {
+                   // We got an error, which isn't always a crisis, as some are normal
+                   // if the client disconnects while we're talking to it.
                    if ((it->revents & POLLRDHUP) || (it->revents & POLLNVAL))  {
                        log_debug("Revents has a POLLRDHUP or POLLNVAL set to %d for fd #%d",
                                  it->revents, it->fd);
-//                     hand->erasePollFD(it->fd);
-//                     net.closeNet(it->fd);
+                       net->erasePollFD(it->fd);
+                       net->closeNet(it->fd);
 //                     continue;
                        break;
                    } else {
-                     log_debug("Got something on fd #%d, 0x%x", it->fd, it->revents);
-                     // Call the protocol handler for this network connection
-                     net->getEntry(it->fd)(args);
+                       // We got some data, so process it
+                       log_debug("Got something on fd #%d, 0x%x", it->fd, it->revents);
+                       // Call the protocol handler for this network connection
+                       bool ret = net->getEntry(it->fd)(args);
+                       log_debug("Handler returned %s", (ret) ? "true" : "false");
+                       if (ret) {
+                           net->closeNet(it->fd);
+                           net->erasePollFD(it->fd);
+                       }
                    }
 //                 if (!crcfile.getThreadingFlag()) {
 //                     hand->die();
 //                 }
-                   if (it->fd <= net->getPollFDSize()) {
-                     net->closeNet(it->fd);
-                     net->erasePollFD(it->fd);
-                   }
+//                 if (it->fd <= net->getPollFDSize()) {
+//                   net->closeNet(it->fd);
+//                   net->erasePollFD(it->fd);
+//                 }
                }
            } catch (std::exception& e) {
                log_error("Network connection was dropped:  %s", e.what());
@@ -649,15 +677,15 @@
         } else {
            log_debug("nothing to wait for...");
            if (crcfile.getThreadingFlag()) {
-//             hand->wait();
-               log_debug("Got new network file descriptor to watch");
-           } else {
-               return;
+               done = true;
            }
-//        }
-    }
+        }
+    } while (!done);
+    
+    tids.decrement();
+    
 } // end of dispatch_handler
-       
+
 
 // local Variables:
 // mode: C++


reply via email to

[Prev in Thread] Current Thread [Next in Thread]