[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r9974: improved main message processing loop.
From: |
rob |
Subject: |
[Gnash-commit] /srv/bzr/gnash/rtmp r9974: improved main message processing loop. |
Date: |
Wed, 04 Feb 2009 17:43:21 -0700 |
User-agent: |
Bazaar (1.5) |
------------------------------------------------------------
revno: 9974
committer: address@hidden
branch nick: rtmp
timestamp: Wed 2009-02-04 17:43:21 -0700
message:
improved main message processing loop.
modified:
cygnal/rtmp_server.cpp
=== modified file 'cygnal/rtmp_server.cpp'
--- a/cygnal/rtmp_server.cpp 2009-02-04 21:12:35 +0000
+++ b/cygnal/rtmp_server.cpp 2009-02-05 00:43:21 +0000
@@ -366,7 +366,7 @@
boost::shared_ptr<Buffer>
RTMPServer::encodeResult(RTMPMsg::rtmp_status_e status)
{
- GNASH_REPORT_FUNCTION;
+// GNASH_REPORT_FUNCTION;
// Buffer *buf = new Buffer;
// boost::uint8_t *ptr = buf->reference();
@@ -715,9 +715,12 @@
// Adjust the timeout
rtmp->setTimeout(10);
- boost::shared_ptr<amf::Buffer> pkt;
+ boost::shared_ptr<amf::Buffer> pkt;
boost::shared_ptr<amf::Element> tcurl;
boost::shared_ptr<amf::Element> swfurl;
+ boost::shared_ptr<amf::Buffer> response;
+
+ RTMP::rtmp_headersize_e response_head_size = RTMP::HEADER_12;
// This handler is called everytime there is RTMP data on a socket to process the
// messsage. Unlike HTTP, RTMP always uses persistant network connections, so we
@@ -755,6 +758,7 @@
initialize = false;
}
+#if 0
// The very first message after the handshake is the Invoke call of
// NetConnection::connect().
boost::shared_ptr<RTMP::rtmp_head_t> head = rtmp->decodeHeader(*pkt);
@@ -786,6 +790,7 @@
}
}
}
+#endif
// Send a ping to reset the new stream
boost::shared_ptr<amf::Buffer> ping_reset =
rtmp->encodePing(RTMP::PING_RESET, 0);
@@ -795,7 +800,8 @@
} else {
log_error("Couldn't send Ping to client!");
}
-
+
+#if 0
// send a response to the NetConnection::connect() request
// boost::shared_ptr<amf::Buffer> response =
rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
boost::shared_ptr<amf::Buffer> response;
@@ -804,7 +810,11 @@
log_error("No body found in message!");
return false;
} else {
- response = rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+ if (0) {
+ response = rtmp->encodeResult(RTMPMsg::NC_CONNECT_FAILED);
+ } else {
+ response = rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+ }
// The initial packet from the client is always a NetConnection object
// invoking the 'connect' method.
if (body->getMethodName() == "connect") {
@@ -824,16 +834,18 @@
} else {
log_error("Couldn't send NetConnection::connect() response to
client!");
}
-
+#endif
} else {
// Read the handshake bytes sent by the client when requesting
// a connection.
+#if 0
pkt = rtmp->recvMsg(args->netfd);
// See if we have data in the handshake, we should have 1537 bytes
if (pkt->allocated() == 0) {
log_error("failed to read RTMP data from the client.");
return false;
}
+#endif
}
// See if this is a Red5 style echo test.
@@ -848,7 +860,6 @@
}
}
}
-
// Keep track of the network statistics
// Statistics st;
// st.setFileType(NetStats::RTMP);
@@ -867,23 +878,95 @@
// Adjust the timeout
rtmp->setTimeout(30);
+// boost::shared_ptr<amf::Buffer> buf;
do {
- boost::shared_ptr<amf::Buffer> buf = rtmp->recvMsg(args->netfd);
- if (buf) {
- if (buf->allocated()) {
- boost::uint8_t *ptr = buf->reference();
- if (ptr == 0) {
- log_debug("Que empty, net connection dropped for fd #%d",
args->netfd);
- return false;
- }
- boost::shared_ptr<RTMP::rtmp_head_t> rthead =
rtmp->decodeHeader(ptr);
- ptr += rthead->head_size; // skip past the header
+ // If there is no data left from the previous chunk, process that before
+ // reading more data.
+ if (pkt != 0) {
+ log_debug("data left from previous packet");
+ } else {
+ pkt = rtmp->recvMsg(args->netfd);
+ }
+
+ if (pkt != 0) {
+ boost::uint8_t *tmpptr = 0;
+ if (pkt->allocated()) {
+ boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
+ boost::shared_ptr<RTMP::rtmp_head_t> qhead;
+ cerr << "FIXME1 Que size is: " << que->size() << endl;
+ for (size_t i=0; i<que->size(); i++) {
+ boost::shared_ptr<amf::Buffer> bufptr = que->at(i)->pop();
+// que->at(i)->dump();
+ if (bufptr) {
+ bufptr->dump();
+ qhead = rtmp->decodeHeader(bufptr->reference());
+ log_debug("Message for channel #%d", qhead->channel);
+// tmpptr = bufptr->reference();
+ tmpptr = bufptr->reference() + qhead->head_size;
+ if (qhead->channel == RTMP_SYSTEM_CHANNEL) {
+ boost::shared_ptr<RTMP::rtmp_ping_t> ping =
rtmp->decodePing(tmpptr);
+ log_debug("Processed Ping message from client, type
%d", ping->type);
+ } else {
+ body = rtmp->decodeMsgBody(tmpptr, qhead->bodysize);
+ if (body) {
+ body->setChannel(qhead->channel);
+ // Invoke the NetConnection::connect() method
+ if (body->getMethodName() == "connect") {
+ response_head_size = RTMP::HEADER_12;
+ tcurl = body->findProperty("tcUrl");
+ if (tcurl) {
+ log_debug("Client request for remote
file is: %s", tcurl->to_string());
+ }
+ swfurl = body->findProperty("swfUrl");
+ if (swfurl) {
+ log_debug("SWF filename making request
is: %s", swfurl->to_string());
+ }
+ response =
rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+
+ // Send a ping to reset the new stream
+ boost::shared_ptr<amf::Buffer> ping_reset =
rtmp->encodePing(RTMP::PING_RESET, 0);
+ if (rtmp->sendMsg(args->netfd,
RTMP_SYSTEM_CHANNEL, RTMP::HEADER_12,
+ ping_reset->size(),
RTMP::PING, RTMPMsg::FROM_SERVER, *ping_reset)) {
+ log_debug("Sent Ping to client");
+ } else {
+ log_error("Couldn't send Ping to
client!");
+ }
+
+ }
+
+ // Invoke the NetStream::createStream() method
+ if (body->getMethodName() == "createStream") {
+ double streamid = body->getStreamID();
+ log_debug("The streamID from
NetStream::createStream() is: %d", streamid);
+ response_head_size = RTMP::HEADER_8;
+ response =
rtmp->encodeResult(RTMPMsg::NS_DATA_START);
+ body->dump();
+ }
+ if (rtmp->sendMsg(args->netfd,
body->getChannel(), response_head_size, response->allocated(),
+ RTMP::INVOKE,
RTMPMsg::FROM_SERVER, *response)) {
+ log_error("Sent response to client.");
+ } else {
+ log_error("Couldn't send response to
client!");
+ }
+ }
+ }
+ } else {
+ log_error("Message contains no data!");
+ }
+ } // end of processing all the messages in the que
+
+ // we're done processing these packets, so get rid of them
+ pkt.reset();
+
+
+
+#if 0
// This is support for the Red5 'echo_test', which exercises encoding and
// decoding of complex and nested AMF data types. FIXME: this should be
// moved to a CGI type of thing that executes this as a separate process,
// using a socket to pass output back to the client.
if (echo) {
- boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*buf);
+ boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
boost::shared_ptr<amf::Buffer> bufptr;
if (que->size() > 0) {
cerr << "FIXME2 echo Que size is: " << que->size() <<
endl;
@@ -913,14 +996,13 @@
log_error("Couldn't send echo test response to
client!");
done = true;
}
- } else {
+ } else { // end of Red5 echo test support
// buf->dump();
// This is a non-Red5 message, which should be the normal
mode of operating.
- boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*buf);
+// boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
if (que->size() > 0) {
boost::shared_ptr<amf::Buffer> bufptr;
if (que->size() > 0) {
- cerr << "FIXME3 Que size is: " << que->size() <<
endl;
bufptr = que->at(0)->pop();
}
if (bufptr) {
@@ -930,8 +1012,14 @@
boost::uint8_t *tmpptr = bufptr->reference() +
qhead->head_size;
body = rtmp->decodeMsgBody(tmpptr,
qhead->bodysize);
boost::shared_ptr<amf::Buffer> response;
- if (body->getMethodName() == "createStream") {
- response =
rtmp->encodeResult(RTMPMsg::NS_DATA_START);
+ if (body) {
+ if (body->getMethodName() == "connect") {
+ response =
rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+ } else if (body->getMethodName() ==
"createStream") {
+ response =
rtmp->encodeResult(RTMPMsg::NS_DATA_START);
+ } else {
+ response =
rtmp->encodeResult(RTMPMsg::NS_FAILED);
+ }
} else {
response =
rtmp->encodeResult(RTMPMsg::NS_FAILED);
}
@@ -950,6 +1038,9 @@
}
}
}
+#endif
+
+
} else {
log_error("Never read any data from fd #%d", args->netfd);
#if 0
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r9974: improved main message processing loop.,
rob <=