myserver-commit
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[myserver-commit] [2812] Removed dependencies from the Server singleton


From: Giuseppe Scrivano
Subject: [myserver-commit] [2812] Removed dependencies from the Server singleton instance.
Date: Sun, 14 Sep 2008 15:55:17 +0000

Revision: 2812
          http://svn.sv.gnu.org/viewvc/?view=rev&root=myserver&revision=2812
Author:   gscrivano
Date:     2008-09-14 15:55:15 +0000 (Sun, 14 Sep 2008)

Log Message:
-----------
Removed dependencies from the Server singleton instance.

Modified Paths:
--------------
    trunk/myserver/include/connections_scheduler/connections_scheduler.h
    trunk/myserver/include/connections_scheduler/listen_threads.h
    trunk/myserver/src/connections_scheduler/connections_scheduler.cpp
    trunk/myserver/src/connections_scheduler/listen_threads.cpp
    trunk/myserver/src/server/server.cpp

Modified: trunk/myserver/include/connections_scheduler/connections_scheduler.h
===================================================================
--- trunk/myserver/include/connections_scheduler/connections_scheduler.h        
2008-09-13 21:16:59 UTC (rev 2811)
+++ trunk/myserver/include/connections_scheduler/connections_scheduler.h        
2008-09-14 15:55:15 UTC (rev 2812)
@@ -36,6 +36,9 @@
 
 #define PRIORITY_CLASSES 3
 
+
+class Server;
+
 class ConnectionsSchedulerVisitor
 {
 public:
@@ -53,10 +56,11 @@
     bool *terminate;
     Mutex* eventsMutex;
     ConnectionsScheduler *scheduler;
-    void reset(Socket* sock, u_short p){serverSocket = sock; port = p;}
-    ListenerArg(Socket* sock, u_short p){reset(sock, p);}
-    ListenerArg(){reset(0, 0);}
-    ListenerArg(ListenerArg* l){serverSocket = l->serverSocket; port = 
l->port;}
+    Server* server;
+    void reset(Socket* sock, u_short p, Server* ser){serverSocket = sock; port 
= p; server = ser;}
+    ListenerArg(Socket* sock, u_short p, Server* server){reset(sock, p, 
server);}
+    ListenerArg(){reset(NULL, NULL, NULL);}
+    ListenerArg(ListenerArg* l){serverSocket = l->serverSocket; port = 
l->port; server = l->server;}
   };
 
   struct DispatcherArg
@@ -66,9 +70,11 @@
     Mutex* mutex;
     int fd[2];
     event loopEvent;
+    Server* server;
+    ConnectionsScheduler* scheduler;
   };
 
-  ConnectionsScheduler();
+  ConnectionsScheduler(Server* server = NULL);
   ~ConnectionsScheduler();
 
   void addNewReadyConnection(ConnectionPtr);
@@ -83,7 +89,7 @@
   void initialize();
   void listener(struct ListenerArg* );
   void removeListener(struct ListenerArg*);
-  int getConnectionsNumber();
+  u_long getConnectionsNumber();
   void removeConnection(ConnectionPtr connection);
   void terminateConnections();
   void getConnections(list<ConnectionPtr> &out);
@@ -94,7 +100,10 @@
 
   u_long getNumTotalConnections();
 
+  void newData(short event, SocketHandle handle);
+
 private:
+  Server* server;
   void addWaitingConnectionImpl(ConnectionPtr, int lock);
   void addReadyConnectionImpl(ConnectionPtr);
   u_long nTotalConnections;

Modified: trunk/myserver/include/connections_scheduler/listen_threads.h
===================================================================
--- trunk/myserver/include/connections_scheduler/listen_threads.h       
2008-09-13 21:16:59 UTC (rev 2811)
+++ trunk/myserver/include/connections_scheduler/listen_threads.h       
2008-09-14 15:55:15 UTC (rev 2812)
@@ -16,6 +16,7 @@
 */
 #ifndef LISTEN_THREADS_H
 #define LISTEN_THREADS_H
+
 #include "stdafx.h"
 #include <include/base/xml/xml_parser.h>
 #include <include/base/sync/mutex.h>
@@ -35,7 +36,8 @@
        void commitFastReboot();
        void beginFastReboot();
        void rollbackFastReboot();
-       ListenThreads();
+       ListenThreads(ConnectionsScheduler*, Server*);
+
 private:
        struct SocketInformation
        {
@@ -56,6 +58,9 @@
        XmlParser *languageParser;
        int createServerAndListener(u_short port);
        void registerListener(SocketInformation*);
+  
+  Server* server;
+  ConnectionsScheduler* scheduler;
 };
 
 #endif

Modified: trunk/myserver/src/connections_scheduler/connections_scheduler.cpp
===================================================================
--- trunk/myserver/src/connections_scheduler/connections_scheduler.cpp  
2008-09-13 21:16:59 UTC (rev 2811)
+++ trunk/myserver/src/connections_scheduler/connections_scheduler.cpp  
2008-09-14 15:55:15 UTC (rev 2812)
@@ -114,6 +114,9 @@
 {
   ConnectionsScheduler::DispatcherArg *da = 
(ConnectionsScheduler::DispatcherArg*)p;
 
+  if(da == NULL)
+    return NULL;
+
   da->mutex->lock();
   if(!da->terminated)
   {
@@ -143,24 +146,35 @@
   da->terminated = true;
   da->mutex->unlock();
 
-  return 0;
+  return NULL;
 }
 
 static void newDataHandler(int fd, short event, void *arg)
 {
-  ConnectionPtr connection = static_cast<ConnectionPtr>(arg);
+  ConnectionsScheduler* scheduler = (ConnectionsScheduler*) arg;
 
+  if(scheduler)
+    scheduler->newData(event, (SocketHandle)fd);
+}
+
+
+void ConnectionsScheduler::newData(short event, SocketHandle handle)
+{
+  ConnectionPtr connection = connections.get(handle);
+
+  if(connection == NULL || server == NULL)
+    return;
+
   if(event == EV_TIMEOUT)
   {
-    Server::getInstance()->deleteConnection(connection, 0);
+    server->deleteConnection(connection, 0);
   }
   else if(event == EV_READ)
   {
-    
Server::getInstance()->getConnectionsScheduler()->addReadyConnection(connection);
+    server->getConnectionsScheduler()->addReadyConnection(connection);
   }
 }
 
-
 static void eventLoopHandler(int fd, short event, void *arg)
 {
   ConnectionsScheduler::DispatcherArg *da = 
(ConnectionsScheduler::DispatcherArg*)arg;
@@ -190,7 +204,7 @@
         sock.recv((char*)&c, sizeof(ConnectionPtr), 0);
         sock.recv((char*)&tv, sizeof(timeval), 0);
 
-        event_once(handle, EV_READ | EV_TIMEOUT, newDataHandler, c, &tv);
+        event_once(handle, EV_READ | EV_TIMEOUT, newDataHandler, 
da->scheduler, &tv);
       }
       /* Handle other cmd without do anything else.  */
     }
@@ -216,10 +230,12 @@
 
     asockInLen = sizeof(sockaddr_in);
     asock = s->serverSocket->accept(&asockIn, &asockInLen);
-    if(asock.getHandle() != 0 &&
+
+    if(s->server &&
+       asock.getHandle() != 0 &&
        asock.getHandle() != (SocketHandle)INVALID_SOCKET)
     {
-      Server::getInstance()->addConnection(asock, &asockIn);
+      s->server->addConnection(asock, &asockIn);
     }
 
     event_add (&(s->ev), &tv);
@@ -243,6 +259,7 @@
 
   arg->terminate = &dispatcherArg.terminate;
   arg->scheduler = this;
+  arg->server = server;
   arg->eventsMutex = &eventsMutex;
   la->serverSocket->setNonBlocking(1);
 
@@ -272,7 +289,7 @@
 /*!
  *C'tor.
  */
-ConnectionsScheduler::ConnectionsScheduler()
+ConnectionsScheduler::ConnectionsScheduler(Server* server)
 {
   readyMutex.init();
   eventsMutex.init();
@@ -283,6 +300,7 @@
   currentPriorityDone = 0;
   nTotalConnections = 0;
   ready = new queue<ConnectionPtr>[PRIORITY_CLASSES];
+  this->server = server;
 }
 
 /*!
@@ -334,6 +352,8 @@
 
   dispatcherArg.terminated = true;
   dispatcherArg.mutex = &eventsMutex;
+  dispatcherArg.server = server;
+  dispatcherArg.scheduler = this;
 
 #ifdef WIN32
 #define LOCAL_SOCKETPAIR_AF AF_INET
@@ -344,11 +364,14 @@
       dispatcherArg.fd);
   if (err == -1)
   {
-    Server::getInstance()->logLockAccess();
-    Server::getInstance()->logPreparePrintError();
-    Server::getInstance()->logWriteln("Error initializing socket pair.");
-    Server::getInstance()->logEndPrintError();
-    Server::getInstance()->logUnlockAccess();
+    if(server)
+    {
+      server->logLockAccess();
+      server->logPreparePrintError();
+      server->logWriteln("Error initializing socket pair.");
+      server->logEndPrintError();
+      server->logUnlockAccess();
+    }
     return;
   }
 
@@ -361,15 +384,18 @@
   event_add(&(dispatcherArg.loopEvent), NULL);
 
   if(Thread::create(&dispatchedThreadId, dispatcher, &dispatcherArg))
-   {
-     Server::getInstance()->logLockAccess();
-     Server::getInstance()->logPreparePrintError();
-     Server::getInstance()->logWriteln("Error initializing dispatcher 
thread.");
-     Server::getInstance()->logEndPrintError();
-     Server::getInstance()->logUnlockAccess();
-     dispatchedThreadId = 0;
-   }
-
+  {
+    if(server)
+    {
+      server->logLockAccess();
+      server->logPreparePrintError();
+      server->logWriteln("Error initializing dispatcher thread.");
+      server->logEndPrintError();
+      server->logUnlockAccess();
+    }
+    dispatchedThreadId = 0;
+  }
+  
   releasing = false;
 }
 
@@ -423,7 +449,8 @@
   ready[priority].push(c);
   readyMutex.unlock();
 
-  Server::getInstance()->checkThreadsNumber();
+  if(server)
+    server->checkThreadsNumber();
 
   readySemaphore->unlock();
 }
@@ -450,9 +477,13 @@
 void ConnectionsScheduler::addWaitingConnectionImpl(ConnectionPtr c, int lock)
 {
   static timeval tv = {10, 0};
-  SocketHandle handle = c->socket->getHandle();
+  SocketHandle handle = c->socket ? c->socket->getHandle() : NULL;
 
-  tv.tv_sec = Server::getInstance()->getTimeout() / 1000;
+  if(server)
+    tv.tv_sec = server->getTimeout() / 1000;
+  else
+    tv.tv_sec = 30000;
+
   c->setScheduled(0);
 
   connectionsMutex.lock();
@@ -479,7 +510,7 @@
   }
   else
   {
-    event_once(handle, EV_READ | EV_TIMEOUT, newDataHandler, c, &tv);
+    event_once(handle, EV_READ | EV_TIMEOUT, newDataHandler, this, &tv);
   }
 }
 
@@ -531,10 +562,14 @@
  */
 void ConnectionsScheduler::release()
 {
+  u_long max = 100;
   releasing = true;
   dispatcherArg.terminate = true;
 
-  for(u_long i = 0; i < Server::getInstance()->getNumThreads()*10; i++)
+  if(server)
+    max = server->getNumThreads() * 10;
+
+  for(u_long i = 0; i < max; i++)
   {
     readySemaphore->unlock();
   }
@@ -597,7 +632,7 @@
 /*!
  *Get the alive connections number.
  */
-int ConnectionsScheduler::getConnectionsNumber()
+u_long ConnectionsScheduler::getConnectionsNumber()
 {
   return connections.size();
 }
@@ -608,7 +643,8 @@
 void ConnectionsScheduler::removeConnection(ConnectionPtr connection)
 {
   connectionsMutex.lock();
-  connections.remove(connection->socket->getHandle());
+  if(connection->socket)
+    connections.remove(connection->socket->getHandle());
   connectionsMutex.unlock();
 }
 

Modified: trunk/myserver/src/connections_scheduler/listen_threads.cpp
===================================================================
--- trunk/myserver/src/connections_scheduler/listen_threads.cpp 2008-09-13 
21:16:59 UTC (rev 2811)
+++ trunk/myserver/src/connections_scheduler/listen_threads.cpp 2008-09-14 
15:55:15 UTC (rev 2812)
@@ -49,12 +49,14 @@
 using namespace std;
 
 /*!
- *Default c'tor.
+ *c'tor.
  */
-ListenThreads::ListenThreads()
+ListenThreads::ListenThreads(ConnectionsScheduler* scheduler, Server* server)
 {
   fastRebooting = false;
   committingFastReboot = false;
+  this->scheduler = scheduler;
+  this->server = server;
 }
 
 /*!
@@ -66,7 +68,6 @@
   int optvalReuseAddr = 1;
   ostringstream portBuff;
   string listenPortMsg;
-  Server* server = Server::getInstance();
 
   if(fastRebooting)
   {
@@ -302,14 +303,14 @@
 
     if(si->ipv4)
     {
-      si->laIpv4.reset(si->ipv4, si->port);
-      
Server::getInstance()->getConnectionsScheduler()->listener(&(si->laIpv4));
+      si->laIpv4.reset(si->ipv4, si->port, server);
+      scheduler->listener(&(si->laIpv4));
     }
 
     if(si->ipv6)
     {
-      si->laIpv6.reset(si->ipv6, si->port);
-      
Server::getInstance()->getConnectionsScheduler()->listener(&(si->laIpv6));
+      si->laIpv6.reset(si->ipv6, si->port, server);
+      scheduler->listener(&(si->laIpv6));
     }
 }
 

Modified: trunk/myserver/src/server/server.cpp
===================================================================
--- trunk/myserver/src/server/server.cpp        2008-09-13 21:16:59 UTC (rev 
2811)
+++ trunk/myserver/src/server/server.cpp        2008-09-14 15:55:15 UTC (rev 
2812)
@@ -75,7 +75,8 @@
  */
 Server* Server::instance = 0;
 
-Server::Server()
+Server::Server() : connectionsScheduler(this), 
+                   listenThreads(&connectionsScheduler, this)
 {
   toReboot = 0;
   autoRebootEnabled = 1;






reply via email to

[Prev in Thread] Current Thread [Next in Thread]