bug-commoncpp


many-to-many thread/socket model


From: Ricardo Gameiro
Subject: many-to-many thread/socket model
Date: Wed, 01 Sep 2004 12:50:28 +0100
User-agent: Mozilla Thunderbird 0.6 (X11/20040502)

Hello,
I've been trying to implement a many-to-many thread/socket relation using Common C++, and I'm having a strange problem which shows up on Solaris, though it works on Linux. My goal is to have a list of "workable" sockets, each of which will be "worked on" by one arbitrary thread (executing a state machine) from a thread pool until the thread cannot do any additional work on the socket (because not all required input is available, etc.), at which point the thread becomes available again to work on any "workable" socket. To implement this, I'm using the SocketPort/SocketService classes. However, I'm not using the "pending" method to perform work on the socket; I'm using it to add "this" socket to the list of workable sockets (workToDo) and to post a Semaphore on which every thread in the worker thread pool is waiting.
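
The hand-off described above (a service thread queues work, and pool threads block until work is available) can be sketched in standard C++ alone. This is only an illustration under assumptions: `WorkQueue`, `post` and `wait` are hypothetical names, and a `std::condition_variable` stands in for the Common C++ `Mutex` and `Semaphore` pair used in the attached code.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for the workToDo list plus its semaphore.
// In the echo server, T would be a socket pointer.
template <typename T>
class WorkQueue {
public:
    // Service thread: queue one item and wake one waiting worker.
    void post(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(item);
        }
        available_.notify_one();
    }

    // Worker thread: block until an item is available, then take it.
    T wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        available_.wait(lock, [this] { return !items_.empty(); });
        assert(!items_.empty());
        T item = items_.front();  // copy the element before removing it
        items_.pop_front();
        return item;
    }

    // Number of items currently queued.
    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable available_;
    std::deque<T> items_;
};
```

A worker would loop on `wait()`, process the returned item until no more progress is possible, and then call `wait()` again. Items come out in the order they were posted.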

This is working... more or less :-(... because if the "service" thread polls the socket again before the worker thread does its job, it will queue the socket in the workable list multiple times. To solve this, I added a "setDetectPending( false )" to the "pending" method of my SocketPort subclass, and added a "setDetectPending( true )" after the worker thread finishes working on the socket. On Linux it worked, but on Solaris only the first chunk of data sent to the socket is noticed (it is polled by the service thread the first time); any subsequent data does not trigger the "pending" method on the SocketPort object.
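
A different way to avoid queueing the same socket twice, without touching the poll set, is a per-connection "already queued" flag. The sketch below is an assumption-laden illustration in standard C++: `QueuedFlag`, `tryMarkQueued` and `markIdle` are hypothetical names and are not part of Common C++.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical per-connection state; not part of Common C++.
class QueuedFlag {
public:
    // Service thread: returns true only for the call that moves the flag
    // from "idle" to "queued", so the connection is queued at most once.
    bool tryMarkQueued() {
        return !queued_.exchange(true);
    }

    // Worker thread: call once the work is done, so that the next
    // readiness notification may queue the connection again.
    void markIdle() {
        bool wasQueued = queued_.exchange(false);
        assert(wasQueued);  // markIdle() without a prior tryMarkQueued() is a logic error
        (void)wasQueued;
    }

private:
    std::atomic<bool> queued_{false};
};
```

With such a flag, a "pending" handler would queue the socket only when `tryMarkQueued()` returns true. It prevents duplicate entries, but a level-triggered poll would still wake the service thread repeatedly while unread data remains, which is presumably why the approach in this message disables detection instead.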

I'm not even sure whether this should be working on Linux, whether it is a Solaris-related problem, or whether it is a Common C++ problem. I suspect that, for some reason, the "service" thread is not aware of the latest "setDetectPending( true )", which was executed on another thread, though after a first quick glance at the Common C++ code, I think it should be (??).
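
One general reason a polling thread can miss such a change, independent of any library: a thread already blocked in poll() or select() keeps waiting on the descriptor set it was called with, so a change made by another thread only takes effect once the call returns and the set is rebuilt. Whether that is what happens here on Solaris is not established by this message. A common remedy is a wake-up pipe, sketched below with POSIX calls only; `PollWaker` and its methods are hypothetical names, not Common C++ API.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper: other threads write to a pipe to interrupt a
// poll() call that the service thread is blocked in.
class PollWaker {
public:
    PollWaker() {
        int rc = pipe(fds_);
        assert(rc == 0);
        (void)rc;
    }
    ~PollWaker() {
        close(fds_[0]);
        close(fds_[1]);
    }

    // Descriptor the polling thread adds to its own poll set.
    int readFd() const { return fds_[0]; }

    // Any thread: ask the polling thread to rebuild its descriptor set.
    void wake() {
        char byte = 1;
        ssize_t n = write(fds_[1], &byte, 1);
        (void)n;
    }

    // Polling thread: wait up to timeoutMs milliseconds. Returns true if
    // a wake-up byte arrived, and consumes that one byte.
    bool waitForWake(int timeoutMs) {
        pollfd pfd;
        pfd.fd = fds_[0];
        pfd.events = POLLIN;
        pfd.revents = 0;
        if (poll(&pfd, 1, timeoutMs) > 0 && (pfd.revents & POLLIN)) {
            char byte;
            ssize_t n = read(fds_[0], &byte, 1);
            return n == 1;
        }
        return false;
    }

private:
    int fds_[2];
};
```

In a real service loop the read end would sit in the same poll set as the sockets, and a worker would call `wake()` right after re-enabling detection on a socket.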

I'm attaching a simple, trimmed-down piece of code which will reproduce the problem (on Solaris). It's an echo server.

Did this make any sense? Any ideas, help, or suggestions would be appreciated.

TIA,
Ricardo Gameiro
#include <list>
#include <iostream>

#include <cc++/thread.h>
#include <cc++/socket.h>


using namespace std;
using namespace ost;


class Sock;


Mutex wtdM;
list<Sock *> workToDo;
Semaphore workAvail( 0 );



class Sock : public virtual SocketPort {

        friend class SockWorker;

public:

        Sock( SocketService * pService, TCPSocket &pServer ) :
                        SocketPort( pService, pServer )
        {
                tpport_t port;
                cerr << "got connection from " << getPeer( &port );
                cerr << ":" << port << endl;
                setCompletion( false );
                setTimer( 20000 ); // Hardcoded, must be changed
        }


        virtual void pending()
        {
                tpport_t port;
                cerr << "pending data from " << getPeer( &port );
                cerr << ":" << port << endl;

                setTimer( 20000 ); // Reset timer
                setDetectPending( false );

                wtdM.enter();
                workToDo.insert( workToDo.end(), this );
                wtdM.leave();
                workAvail.post();
        }


        virtual void expired()
        {
                tpport_t port;
                cerr << "connection from " << getPeer( &port );
                cerr << ":" << port << " expired" << endl;
                delete this;
        }


        virtual void disconnect()
        {
                tpport_t port;
                cerr << "connection from " << getPeer( &port );
                cerr << ":" << port << " disconnected" << endl;
                delete this;
        }

};



class SockWorker : public Thread {

public:

        void run()
        {
                while( true ) {

                        cerr << "child " << getId() << " is waiting for work" << endl;
                        workAvail.wait();
                        cerr << "child " << getId() << " was scheduled to work" << endl;

                        wtdM.enter();
                        Sock * mySock = workToDo.front(); // copy the pointer; pop_front() invalidates iterators to it
                        workToDo.pop_front();
                        wtdM.leave();

                        try {
                                int bytesRead;
                                unsigned int totalRead = 0;
                                char ioBuffer[ 4096 ];

                                while( ( bytesRead = mySock->receive( ioBuffer, sizeof( ioBuffer ) ) ) > 0 ) {
                                        cerr << "child " << getId() << " got " << bytesRead << " bytes" << endl;
                                        mySock->send( ioBuffer, bytesRead );
                                        totalRead += bytesRead;
                                }

                                if( totalRead == 0 ) {
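                                        cerr << "child " << getId() << " got broken sock" << endl;
                                        delete mySock; // free the dead connection, not the worker thread
                                        continue; // mySock no longer exists, so skip re-arming it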
                                }
                        }
                        catch ( ... )
                        {
                                cerr << "sock write threw an exception" << endl;
                        }
                        mySock->setDetectPending( true );

                }
        }

};



class SockScheduler : public virtual Thread, public virtual TCPSocket {

public:

        SockScheduler( IPV4Address & machine, int port ) :
                        TCPSocket( machine, port ), Thread()
        {
                pollSvc = new SocketService( 0 );
                start();
        }

        void run()
        {
                while ( 1 ) {
                        try {
                                new Sock( pollSvc, * ( TCPSocket * ) this );
                        }
                        catch ( ... )
                        {
                                cerr << "sock accept failed" << endl;
                                exit();
                        }
                }

        }

private:
        SocketService * pollSvc;

};



int main( int argc, char ** argv ) {

        try {

                IPV4Address listen_address( "0.0.0.0" );

                SockScheduler * X = new SockScheduler( listen_address, 4000 );

                for( int i = 0; i < 2; i++ ) {
                        SockWorker * Y = new SockWorker;
                        Y->start();
                }

                ( ( Thread * ) X )->join();

        }
        catch(...) {
                cerr << "caught something" << endl;
        }

}
