[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r9829: move all polling methods to Network class where they belong.
From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9829: move all polling methods to Network class where they belong.
Date: Sun, 14 Dec 2008 20:44:46 -0700
User-agent: Bazaar (1.5)
------------------------------------------------------------
revno: 9829
committer: address@hidden
branch nick: rtmp
timestamp: Sun 2008-12-14 20:44:46 -0700
message:
move all polling methods to Network class where they belong.
modified:
libnet/handler.cpp
libnet/handler.h
libnet/network.cpp
libnet/network.h
=== modified file 'libnet/handler.cpp'
--- a/libnet/handler.cpp 2008-11-22 05:11:00 +0000
+++ b/libnet/handler.cpp 2008-12-15 03:44:46 +0000
@@ -61,7 +61,7 @@
// GNASH_REPORT_FUNCTION;
// closeConnection();
_die = true;
- notifyout();
+// notifyout();
notifyin();
}
@@ -69,15 +69,17 @@
Handler::push(boost::shared_ptr<amf::Buffer> data, fifo_e direction)
{
// GNASH_REPORT_FUNCTION;
+#if 0
if (direction == Handler::OUTGOING) {
- _outgoing.push(data);
+ _outgoing.push(data);
return true;
}
+#endif
if (direction == Handler::INCOMING) {
- _incoming.push(data);
+ _incoming.push(data);
return true;
}
-
+
return false;
}
@@ -98,12 +100,14 @@
// GNASH_REPORT_FUNCTION;
boost::shared_ptr<amf::Buffer> buf;
+#if 0
if (direction == Handler::OUTGOING) {
if (_outgoing.size()) {
buf = _outgoing.pop();
return buf;
}
}
+#endif
if (direction == Handler::INCOMING) {
if (_incoming.size()) {
buf = _incoming.pop();
@@ -119,11 +123,13 @@
Handler::peek(fifo_e direction)
{
// GNASH_REPORT_FUNCTION;
+#if 0
if (direction == Handler::OUTGOING) {
if (_outgoing.size()) {
return _outgoing.peek();
}
}
+#endif
if (direction == Handler::INCOMING) {
if (_incoming.size()) {
return _incoming.peek();
@@ -137,9 +143,11 @@
Handler::size(fifo_e direction)
{
// GNASH_REPORT_FUNCTION;
+#if 0
if (direction == Handler::OUTGOING) {
return _outgoing.size();
}
+#endif
if (direction == Handler::INCOMING) {
return _incoming.size();
}
@@ -152,99 +160,23 @@
Handler::clear(fifo_e direction)
{
// GNASH_REPORT_FUNCTION;
+#if 0
if (direction == Handler::OUTGOING) {
_outgoing.clear();
}
+#endif
if (direction == Handler::INCOMING) {
_incoming.clear();
}
}
-void
-Handler::addPollFD(struct pollfd &fd, Handler::entry_t *func)
-{
-// GNASH_REPORT_FUNCTION;
- boost::mutex::scoped_lock lock(_poll_mutex);
- _handlers[fd.fd] = func;
- _pollfds.push_back(fd);
- notify();
-}
-
-void
-Handler::addPollFD(struct pollfd &fd)
-{
-// GNASH_REPORT_FUNCTION;
- boost::mutex::scoped_lock lock(_poll_mutex);
- _pollfds.push_back(fd);
- notify();
-}
-
-struct pollfd
-&Handler::getPollFD(int index)
-{
-// GNASH_REPORT_FUNCTION;
- boost::mutex::scoped_lock lock(_poll_mutex);
- return _pollfds[index];
-}
-
-struct pollfd *
-Handler::getPollFDPtr()
-{
-// GNASH_REPORT_FUNCTION;
- boost::mutex::scoped_lock lock(_poll_mutex);
- return &_pollfds[0];
-};
-
-void
-Handler::erasePollFD(int fd)
-{
-// GNASH_REPORT_FUNCTION;
- boost::mutex::scoped_lock lock(_poll_mutex);
- if (_pollfds.size() > 0) {
- vector<struct pollfd>::iterator it;
- for (it=_pollfds.begin(); it<_pollfds.end(); it++) {
- if ((*it).fd == fd) {
- _pollfds.erase(it);
- }
- }
- }
-}
-
-void
-Handler::erasePollFD(vector<struct pollfd>::iterator &itt)
-{
-// GNASH_REPORT_FUNCTION;
- boost::mutex::scoped_lock lock(_poll_mutex);
- if (_pollfds.size() == 1) {
- _pollfds.clear();
- } else {
- _pollfds.erase(itt);
- }
-}
-
-void
-Handler::addEntry(int fd, Handler::entry_t *func)
-{
-// GNASH_REPORT_FUNCTION;
- boost::mutex::scoped_lock lock(_poll_mutex);
- _handlers[fd] = func;
-}
-
-Handler::entry_t *
-Handler::getEntry(int fd)
-{
-// GNASH_REPORT_FUNCTION;
- boost::mutex::scoped_lock lock(_poll_mutex);
- return _handlers[fd];
-};
-
// Dump internal data.
void
Handler::dump()
{
// GNASH_REPORT_FUNCTION;
_incoming.dump();
- _outgoing.dump();
+// _outgoing.dump();
}
#if 0
@@ -288,13 +220,13 @@
// start the two thread handlers for the queues
bool
-Handler::start(thread_params_t *args)
+Handler::start(Network::thread_params_t *args)
{
GNASH_REPORT_FUNCTION;
// Handler *hand = reinterpret_cast<Handler *>(args->handle);
_incoming.setName("Incoming");
- _outgoing.setName("Outgoing");
+// _outgoing.setName("Outgoing");
log_debug(_("Starting Handlers for port %d, tid %ld"),
args->port, get_thread_id());
@@ -336,7 +268,7 @@
extern "C" {
void
-netin_handler(Handler::thread_params_t *args)
+netin_handler(Network::thread_params_t *args)
{
GNASH_REPORT_FUNCTION;
@@ -385,7 +317,7 @@
#if 0
void
-netout_handler(Handler::thread_params_t *args)
+netout_handler(Network::thread_params_t *args)
{
// GNASH_REPORT_FUNCTION;
int ret = 0;
=== modified file 'libnet/handler.h'
--- a/libnet/handler.h 2008-11-22 05:11:00 +0000
+++ b/libnet/handler.h 2008-12-15 03:44:46 +0000
@@ -65,16 +65,7 @@
INTERVAL,
QUIT,
} admin_cmd_e;
- // This is used to pass parameters to a thread using boost::bind
- typedef struct {
- int netfd;
- int port;
- void *handler;
- std::string filespec;
- } thread_params_t ;
- typedef void entry_t (thread_params_t *);
-
// Specify which queue should be used
typedef enum { INCOMING, OUTGOING } fifo_e;
@@ -89,63 +80,65 @@
{ return _incoming.push(data, nbytes); };
bool pushin(boost::shared_ptr<amf::Buffer> data)
{ return _incoming.push(data); };
-
+#if 0
// Push bytes on the incoming FIFO, which must be specified
bool pushout(gnash::Network::byte_t *data, int nbytes)
{ return _outgoing.push(data, nbytes); };
bool pushout(boost::shared_ptr<amf::Buffer> data)
{ return _outgoing.push(data); };
-
+#endif
+
// Pop the first date element off the incoming FIFO
boost::shared_ptr<amf::Buffer> pop() { return _incoming.pop(); };
boost::shared_ptr<amf::Buffer> pop(fifo_e direction);
boost::shared_ptr<amf::Buffer> popin()
{ return _incoming.pop(); };
+#if 0
// Pop the first date element off the outgoing FIFO
boost::shared_ptr<amf::Buffer> popout()
{ return _outgoing.pop(); };
-
+#endif
// Peek at the first data element without removing it
boost::shared_ptr<amf::Buffer> peek() { return _incoming.peek(); };
boost::shared_ptr<amf::Buffer> peek(fifo_e direction);
boost::shared_ptr<amf::Buffer> peekin()
{ return _incoming.peek(); };
// Pop the first date element off the outgoing FIFO
- boost::shared_ptr<amf::Buffer> peekout()
- { return _outgoing.peek(); };
+// boost::shared_ptr<amf::Buffer> peekout() { return _outgoing.peek(); };
// Removes all the buffers from the queues
boost::shared_ptr<amf::Buffer> merge(boost::shared_ptr<amf::Buffer> begin)
{ return _incoming.merge(begin); };
boost::shared_ptr<amf::Buffer> mergein(boost::shared_ptr<amf::Buffer>
begin) { return _incoming.merge(begin); };
- boost::shared_ptr<amf::Buffer> mergeout(boost::shared_ptr<amf::Buffer>
begin) { return _outgoing.merge(begin); };
+// boost::shared_ptr<amf::Buffer> mergeout(boost::shared_ptr<amf::Buffer>
begin) { return _outgoing.merge(begin); };
// Removes all the buffers from the queues
void clear() { _incoming.clear(); };
void clear(fifo_e direction);
void clearin() { _incoming.clear(); };
+#if 0
void clearout() { _outgoing.clear(); };
void clearall() { _outgoing.clear(); _incoming.clear(); };
-
+#endif
// Return the size of the queues, default to the incoming queue
size_t size(fifo_e direction);
size_t size() { return _incoming.size(); };
size_t insize() { return _incoming.size(); };
- size_t outsize() { return _outgoing.size(); };
+// size_t outsize() { return _outgoing.size(); };
// Notify the other thread a message is in the que
void notify() { _incoming.notify(); };
void notifyin() { _incoming.notify(); };
- void notifyout() { _outgoing.notify(); };
+ // void notifyout() { _outgoing.notify(); };
// Wait for a message from the other thread
void wait() { _incoming.wait(); };
void waitin() { _incoming.wait(); };
- void waitout() { _outgoing.wait(); };
+// void waitout() { _outgoing.wait(); };
// size_t readPacket(int fd);
// start the two thread handlers for the queues
- bool DSOEXPORT start(thread_params_t *args);
+ bool DSOEXPORT start(Network::thread_params_t *args);
#if 0
/// \brief Write a Buffer the network connection.
@@ -174,50 +167,24 @@
CQue::que_stats_t *statsin() { return _incoming.stats(); };
CQue::que_stats_t *statsout() { return _outgoing.stats(); };
#endif
- void die() { _die = true; _outgoing.notify(); };
+ void die() { _die = true; };
+// void die() { _die = true; _outgoing.notify(); };
void resetDie() { _die = false; };
bool timetodie() { return _die; };
- // The pollfd are an array of data structures used by the poll()
- // system call. We have to keep track of these as network
- // connections get added and disconnected.
- void addPollFD(struct pollfd &fd, entry_t *ptr);
- void addPollFD(struct pollfd &fd);
- void erasePollFD(int fd);
- void erasePollFD(std::vector<struct pollfd>::iterator &itt);
- struct pollfd &getPollFD(int fd);
- struct pollfd *getPollFDPtr();
- size_t getPollFDSize() { return _pollfds.size(); };
- void clearPollFD() { _pollfds.clear(); };
-
- // The entry point is an function pointer, which is the event
- // handler when there is data on a file descriptor.
- void addEntry(int fd, entry_t *func);
- entry_t *getEntry(int fd);
-
-// void executePollFD(int index) { _handler[index](); ];
-
private:
bool _die;
int _netfd;
CQue _incoming;
- CQue _outgoing;
- /// \var Handler::_handlers
- /// Keep a list of all active network connections
- std::map<int, entry_t *> _handlers;
-#ifdef HAVE_POLL
- // This is the mutex that controls access to the que.
- std::vector<struct pollfd> _pollfds;
- boost::mutex _poll_mutex;
-#endif
+ std::map<int, CQue> _outgoing;
};
// This is the thread for all incoming network connections, which
// has to be in C.
extern "C" {
- void netin_handler(Handler::thread_params_t *args);
- void netout_handler(Handler::thread_params_t *args);
- void start_handler(Handler::thread_params_t *args);
+ void netin_handler(Network::thread_params_t *args);
+ void netout_handler(Network::thread_params_t *args);
+ void start_handler(Network::thread_params_t *args);
}
} // end of gnash namespace
@@ -226,5 +193,6 @@
// local Variables:
// mode: C++
+// tab-width: 8
// indent-tabs-mode: t
// End:
=== modified file 'libnet/network.cpp'
--- a/libnet/network.cpp 2008-12-01 15:44:22 +0000
+++ b/libnet/network.cpp 2008-12-15 03:44:46 +0000
@@ -263,7 +263,7 @@
int
Network::newConnection(bool block, int fd)
{
- GNASH_REPORT_FUNCTION;
+// GNASH_REPORT_FUNCTION;
struct sockaddr newfsin;
socklen_t alen;
@@ -1105,6 +1105,89 @@
return ret;
}
+void
+Network::addPollFD(struct pollfd &fd, Network::entry_t *func)
+{
+ GNASH_REPORT_FUNCTION;
+
+ log_debug("%s: adding fd #%d to pollfds", __PRETTY_FUNCTION__, fd.fd);
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ _handlers[fd.fd] = func;
+ _pollfds.push_back(fd);
+// notify();
+}
+
+void
+Network::addPollFD(struct pollfd &fd)
+{
+ GNASH_REPORT_FUNCTION;
+ log_debug("%s: adding fd #%d to pollfds", __PRETTY_FUNCTION__, fd.fd);
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ _pollfds.push_back(fd);
+// notify();
+}
+
+struct pollfd
+&Network::getPollFD(int index)
+{
+// GNASH_REPORT_FUNCTION;
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ return _pollfds[index];
+}
+
+struct pollfd *
+Network::getPollFDPtr()
+{
+// GNASH_REPORT_FUNCTION;
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ return &_pollfds[0];
+};
+
+void
+Network::erasePollFD(int fd)
+{
+// GNASH_REPORT_FUNCTION;
+ log_debug("%s: erasing fd #%d from pollfds", __PRETTY_FUNCTION__, fd);
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ if (_pollfds.size() > 0) {
+ vector<struct pollfd>::iterator it;
+ for (it=_pollfds.begin(); it<_pollfds.end(); it++) {
+ if ((*it).fd == fd) {
+ _pollfds.erase(it);
+ break;
+ }
+ }
+ }
+}
+
+void
+Network::erasePollFD(vector<struct pollfd>::iterator &itt)
+{
+ GNASH_REPORT_FUNCTION;
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ if (_pollfds.size() == 1) {
+ _pollfds.clear();
+ } else {
+ _pollfds.erase(itt);
+ }
+}
+
+void
+Network::addEntry(int fd, Network::entry_t *func)
+{
+// GNASH_REPORT_FUNCTION;
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ _handlers[fd] = func;
+}
+
+Network::entry_t *
+Network::getEntry(int fd)
+{
+// GNASH_REPORT_FUNCTION;
+ boost::mutex::scoped_lock lock(_poll_mutex);
+ return _handlers[fd];
+}
+
boost::shared_ptr<std::vector<struct pollfd> >
Network::waitForNetData(int limit, struct pollfd *fds)
{
@@ -1144,49 +1227,49 @@
while (ret--) {
for (int i = 0; i<limit; i++) {
// If we get this event, the other end of the connection has been
shut down
-#if 0
+#if 1
if (fds[i].revents &POLLPRI ) {
- log_debug("%s: Revents has aPOLLPRI set 0x%x for fd #%d",
- __FUNCTION__, fds[i].revents, i);
+ log_debug("%s: Revents has a POLLPRI set 0x%x for fd #%d",
+ __FUNCTION__, fds[i].revents, fds[i].fd);
}
if (fds[i].revents & POLLRDNORM) {
log_debug("%s: Revents has a POLLRDNORM set 0x%x for fd #%d",
- __FUNCTION__, fds[i].revents, i);
+ __FUNCTION__, fds[i].revents, fds[i].fd);
}
if (fds[i].revents & POLLHUP) {
log_debug("%s: Revents has a POLLHUP set 0x%x for fd #%d",
- __FUNCTION__, fds[i].revents, i);
+ __FUNCTION__, fds[i].revents, fds[i].fd);
}
if (fds[i].revents & POLLERR) {
log_debug("%s: Revents has a POLLERR set 0x%x for fd #%d",
- __FUNCTION__, fds[i].revents, i);
+ __FUNCTION__, fds[i].revents, fds[i].fd);
}
if (fds[i].revents & POLLHUP) {
log_debug("%s: Revents has a POLLHUP set 0x%x for fd #%d",
- __FUNCTION__, fds[i].revents, i);
+ __FUNCTION__, fds[i].revents, fds[i].fd);
}
if (fds[i].revents & POLLNVAL) {
log_debug("%s: Revents has a POLLNVAL set 0x%x for fd #%d",
- __FUNCTION__, fds[i].revents, i);
+ __FUNCTION__, fds[i].revents, fds[i].fd);
// throw GnashException("Polling an invalid file descritor");
}
if (fds[i].revents & POLLIN) {
log_debug("%s: Revents has a POLLIN set 0x%x for fd #%d",
- __FUNCTION__, fds[i].revents, i);
+ __FUNCTION__, fds[i].revents, fds[i].fd);
}
if (fds[i].revents & POLLMSG) {
log_debug("%s: Revents has a POLLMSG set 0x%x for fd #%d",
- __FUNCTION__, fds[i].revents, i);
+ __FUNCTION__, fds[i].revents, fds[i].fd);
}
if (fds[i].revents & POLLREMOVE) {
log_debug("%s: Revents has a POLLREMOVE set 0x%x for fd #%d",
- __FUNCTION__, fds[i].revents, i);
+ __FUNCTION__, fds[i].revents, fds[i].fd);
}
if (fds[i].revents & POLLRDHUP) {
log_debug("%s: Revents has a POLLRDHUP set 0x%x for fd #%d",
- __FUNCTION__, fds[i].revents, i);
+ __FUNCTION__, fds[i].revents, fds[i].fd);
// throw GnashException("Connection dropped from client side.");
}
#endif
=== modified file 'libnet/network.h'
--- a/libnet/network.h 2008-12-01 01:17:41 +0000
+++ b/libnet/network.h 2008-12-15 03:44:46 +0000
@@ -22,6 +22,7 @@
#include "gnashconfig.h"
#endif
+#include <boost/thread/mutex.hpp>
#if !defined(HAVE_WINSOCK_H) || defined(__OS2__)
# include <sys/types.h>
# include <netinet/in.h>
@@ -87,7 +88,15 @@
/// side of a network connection.
class DSOEXPORT Network {
public:
+ // This is used to pass parameters to a thread using boost::bind
+ typedef struct {
+ int netfd;
+ int port;
+ void *handler;
+ std::string filespec;
+ } thread_params_t;
typedef boost::uint8_t byte_t;
+ typedef void entry_t (thread_params_t *);
Network();
~Network();
@@ -224,6 +233,25 @@
Network &operator = (Network &net);
+ // The pollfd are an array of data structures used by the poll()
+ // system call. We have to keep track of these as network
+ // connections get added and disconnected.
+ void addPollFD(struct pollfd &fd, entry_t *ptr);
+ void addPollFD(struct pollfd &fd);
+ void erasePollFD(int fd);
+ void erasePollFD(std::vector<struct pollfd>::iterator &itt);
+ struct pollfd &getPollFD(int fd);
+ struct pollfd *getPollFDPtr();
+ size_t getPollFDSize() { return _pollfds.size(); };
+ void clearPollFD() { _pollfds.clear(); };
+
+ // The entry point is an function pointer, which is the event
+ // handler when there is data on a file descriptor.
+ void addEntry(int fd, entry_t *func);
+ entry_t *getEntry(int fd);
+
+// void executePollFD(int index) { _handler[index](); ];
+
protected:
in_addr_t _ipaddr;
int _sockfd; // the file descriptor used for reading and
writing
@@ -237,6 +265,14 @@
bool _connected;
bool _debug;
int _timeout;
+ /// \var Handler::_handlers
+ /// Keep a list of all active network connections
+ std::map<int, entry_t *> _handlers;
+#ifdef HAVE_POLL
+ std::vector<struct pollfd> _pollfds;
+ // This is the mutex that controls access to the que.
+ boost::mutex _poll_mutex;
+#endif
};
} // end of gnash namespace
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r9829: move all polling methods to Network class where they belong.,
rob <=