[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r9965: better handling of multiple RTMP messages that aren't the echo test.
From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r9965: better handling of multiple RTMP messages that aren't the echo test.
Date: Wed, 04 Feb 2009 14:12:35 -0700
User-agent: Bazaar (1.5)
------------------------------------------------------------
revno: 9965
committer: address@hidden
branch nick: rtmp
timestamp: Wed 2009-02-04 14:12:35 -0700
message:
better handling of multiple RTMP messages that aren't the echo test.
modified:
cygnal/rtmp_server.cpp
=== modified file 'cygnal/rtmp_server.cpp'
--- a/cygnal/rtmp_server.cpp 2009-01-05 05:32:18 +0000
+++ b/cygnal/rtmp_server.cpp 2009-02-04 21:12:35 +0000
@@ -451,7 +451,20 @@
break;
case RTMPMsg::NS_CLEAR_FAILED:
case RTMPMsg::NS_CLEAR_SUCCESS:
+ // After a successful NetConnection, we get a
+ // NetStream::createStream.
case RTMPMsg::NS_DATA_START:
+ {
+ boost::shared_ptr<amf::Element> id1(new Element);
+ id1->makeNumber(2);
+ top.addProperty(id1);
+
+ boost::shared_ptr<amf::Element> id2(new Element);
+ id2->makeNumber(1);
+ top.addProperty(id2);
+
+ break;
+ }
case RTMPMsg::NS_FAILED:
case RTMPMsg::NS_INVALID_ARGUMENT:
case RTMPMsg::NS_PAUSE_NOTIFY:
@@ -532,20 +545,19 @@
RTMPServer::encodePing(rtmp_ping_e type, boost::uint32_t milliseconds)
{
// GNASH_REPORT_FUNCTION;
-
+
+ // An encoded ping message
boost::shared_ptr<amf::Buffer> buf(new Buffer(sizeof(boost::uint16_t) *
3));
- boost::uint8_t *ptr = buf->reference();
- buf->clear(); // default everything to zeros, real data gets
optionally added.
- // Manually adjust the seek pointer since we add the data by
- // walking ou own temporary pointer, so none of the regular ways
- // of setting the seek pointer are appropriate.
- buf->setSeekPointer(buf->reference() + buf->size());
+// boost::uint8_t *ptr = buf->reference();
+ // Set the type of this ping message
boost::uint16_t typefield = htons(type);
- ptr += sizeof(boost::uint16_t); // go past the first short
+ *buf = typefield;
+
+// // go past the first short, which is the type field
+// ptr += sizeof(boost::uint16_t);
boost::uint32_t swapped = 0;
- *buf = typefield;
switch (type) {
// These two don't appear to have any paramaters
case PING_CLEAR:
@@ -554,7 +566,7 @@
// the third parameter is the buffer time in milliseconds
case PING_TIME:
{
- ptr += sizeof(boost::uint16_t); // go past the second short
+// ptr += sizeof(boost::uint16_t); // go past the second short
swapped = milliseconds;
swapBytes(&swapped, sizeof(boost::uint32_t));
*buf += swapped;
@@ -562,7 +574,12 @@
}
// reset doesn't have any parameters but zeros
case PING_RESET:
+ {
+ boost::uint16_t zero = 0;
+ *buf += zero;
+ *buf += zero;
break;
+ }
// For Ping and Pong, the second parameter is always the milliseconds
case PING_CLIENT:
case PONG_CLIENT:
@@ -580,7 +597,7 @@
// Manually adjust the seek pointer since we added the data by
// walking ou own temporary pointer, so none of the regular ways
// of setting the seek pointer are appropriate.
- buf->setSeekPointer(buf->reference() + buf->size());
+// buf->setSeekPointer(buf->reference() + buf->size());
return buf;
}
@@ -742,16 +759,36 @@
// NetConnection::connect().
boost::shared_ptr<RTMP::rtmp_head_t> head = rtmp->decodeHeader(*pkt);
boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
- cerr << "FIXME Connect Que size is: " << que->size() << endl;
- que->at(0)->dump();
// RTMP::queues_t *que = rtmp->split(start->reference() + head->head_size,
start->size());
if (que->size() > 0) {
- boost::shared_ptr<amf::Buffer> bufptr = que->at(0)->pop();
- body = rtmp->decodeMsgBody(bufptr->reference() + head->head_size,
head->bodysize);
+ for (size_t i=0; i<que->size(); i++) {
+ boost::shared_ptr<amf::Buffer> bufptr = que->at(i)->pop();
+// que->at(i)->dump();
+ if (bufptr) {
+// bufptr->dump();
+ boost::shared_ptr<RTMP::rtmp_head_t> qhead =
rtmp->decodeHeader(bufptr->reference());
+ log_debug("Message for channel #%d", qhead->channel);
+ if (qhead->channel == RTMP_SYSTEM_CHANNEL) {
+ boost::shared_ptr<RTMP::rtmp_ping_t> ping =
rtmp->decodePing(bufptr->reference());
+ log_debug("Processed Ping message from client, type
%d", ping->type);
+
+ } else {
+ // skip past the header bytes to the start of the data
+ log_debug("Processing non system channel message!");
+ boost::uint8_t *tmpptr = bufptr->reference() +
qhead->head_size;
+ body = rtmp->decodeMsgBody(tmpptr, qhead->bodysize);
+ body->setChannel(qhead->channel);
+// body->dump();
+ break;
+ }
+ } else {
+ log_error("Message contains no data!");
+ }
+ }
}
- // Send a ping to clear the new stream
- boost::shared_ptr<amf::Buffer> ping_reset =
rtmp->encodePing(RTMP::PING_CLEAR, 0);
+ // Send a ping to reset the new stream
+ boost::shared_ptr<amf::Buffer> ping_reset =
rtmp->encodePing(RTMP::PING_RESET, 0);
if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL, RTMP::HEADER_12,
ping_reset->size(), RTMP::PING, RTMPMsg::FROM_SERVER,
*ping_reset)) {
log_debug("Sent Ping to client");
@@ -760,21 +797,34 @@
}
// send a response to the NetConnection::connect() request
- boost::shared_ptr<amf::Buffer> response =
rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
- if (rtmp->sendMsg(args->netfd, head->channel, RTMP::HEADER_12,
response->allocated(),
+// boost::shared_ptr<amf::Buffer> response =
rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+ boost::shared_ptr<amf::Buffer> response;
+ if (body == 0) {
+ response = rtmp->encodeResult(RTMPMsg::NC_CONNECT_FAILED);
+ log_error("No body found in message!");
+ return false;
+ } else {
+ response = rtmp->encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
+ // The initial packet from the client is always a NetConnection
object
+ // invoking the 'connect' method.
+ if (body->getMethodName() == "connect") {
+ tcurl = body->findProperty("tcUrl");
+ if (tcurl) {
+ log_debug("Client request for remote file is: %s",
tcurl->to_string());
+ }
+ swfurl = body->findProperty("swfUrl");
+ if (swfurl) {
+ log_debug("SWF filename making request is: %s",
swfurl->to_string());
+ }
+ }
+ }
+ if (rtmp->sendMsg(args->netfd, body->getChannel(), RTMP::HEADER_12,
response->allocated(),
RTMP::INVOKE, RTMPMsg::FROM_SERVER, *response)) {
log_error("Sent NetConnection::connect() response to client.");
} else {
log_error("Couldn't send NetConnection::connect() response to
client!");
}
- tcurl = body->findProperty("tcUrl");
- if (tcurl) {
- log_debug("Client request for remote file is: %s",
tcurl->to_string());
- }
- swfurl = body->findProperty("swfUrl");
- if (swfurl) {
- log_debug("SWF filename making request is: %s",
swfurl->to_string());
- }
+
} else {
// Read the handshake bytes sent by the client when requesting
// a connection.
@@ -788,12 +838,14 @@
// See if this is a Red5 style echo test.
string::size_type pos;
- filespec = tcurl->to_string();
- pos = filespec.rfind("/");
- if (pos != string::npos) {
- if (filespec.substr(pos, filespec.size()-pos) == "/echo") {
- log_debug("Red5 echo test request!");
- echo = true;
+ if (tcurl) {
+ filespec = tcurl->to_string();
+ pos = filespec.rfind("/");
+ if (pos != string::npos) {
+ if (filespec.substr(pos, filespec.size()-pos) == "/echo") {
+ log_debug("Red5 echo test request!");
+ echo = true;
+ }
}
}
@@ -810,12 +862,15 @@
// st.dump();
- // See if we have any messages waiting
+ // See if we have any messages waiting. After the initial connect, this is
+ // the main loop for processing messages.
+
+ // Adjust the timeout
+ rtmp->setTimeout(30);
do {
boost::shared_ptr<amf::Buffer> buf = rtmp->recvMsg(args->netfd);
if (buf) {
if (buf->allocated()) {
- buf->dump();
boost::uint8_t *ptr = buf->reference();
if (ptr == 0) {
log_debug("Que empty, net connection dropped for fd #%d",
args->netfd);
@@ -823,11 +878,15 @@
}
boost::shared_ptr<RTMP::rtmp_head_t> rthead =
rtmp->decodeHeader(ptr);
ptr += rthead->head_size; // skip past the header
+ // This is support for the Red5 'echo_test', which exercises
encoding and
+ // decoding of complex and nested AMF data types. FIXME: this
should be
+ // moved to a CGI type of thing that executes this as a
separate process,
+ // using a socket to pass output back to the client.
if (echo) {
boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*buf);
boost::shared_ptr<amf::Buffer> bufptr;
if (que->size() > 0) {
-// cerr << "FIXME echo Que size is: " << que->size() <<
endl;
+ cerr << "FIXME2 echo Que size is: " << que->size() <<
endl;
bufptr = que->at(0)->pop();
}
// process the echo test request
@@ -855,10 +914,54 @@
done = true;
}
} else {
- body = rtmp->decodeMsgBody(*buf);
+// buf->dump();
+ // This is a non-Red5 message, which should be the normal
mode of operating.
+ boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*buf);
+ if (que->size() > 0) {
+ boost::shared_ptr<amf::Buffer> bufptr;
+ if (que->size() > 0) {
+ cerr << "FIXME3 Que size is: " << que->size() <<
endl;
+ bufptr = que->at(0)->pop();
+ }
+ if (bufptr) {
+ boost::shared_ptr<RTMP::rtmp_head_t> qhead =
rtmp->decodeHeader(bufptr->reference());
+
+ for (size_t i=0; i<que->size(); i++) {
+ boost::uint8_t *tmpptr = bufptr->reference() +
qhead->head_size;
+ body = rtmp->decodeMsgBody(tmpptr,
qhead->bodysize);
+ boost::shared_ptr<amf::Buffer> response;
+ if (body->getMethodName() == "createStream") {
+ response =
rtmp->encodeResult(RTMPMsg::NS_DATA_START);
+ } else {
+ response =
rtmp->encodeResult(RTMPMsg::NS_FAILED);
+ }
+
+ if (rtmp->sendMsg(args->netfd, qhead->channel,
RTMP::HEADER_8, response->allocated(),
+ RTMP::INVOKE,
RTMPMsg::FROM_SERVER, *response)) {
+ log_error("Sent response to client.");
+ } else {
+ log_error("Couldn't send response to
client!");
+ }
+
+
+ }
+ } else {
+ log_error("%s:%d Message contains no data!",
__FUNCTION__, __LINE__);
+ }
+ }
}
} else {
log_error("Never read any data from fd #%d", args->netfd);
+#if 0
+ // Send a ping to reset the new stream
+ boost::shared_ptr<amf::Buffer> ping_reset =
rtmp->encodePing(RTMP::PING_CLEAR, 0);
+ if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
RTMP::HEADER_12,
+ ping_reset->size(), RTMP::PING,
RTMPMsg::FROM_SERVER, *ping_reset)) {
+ log_debug("Sent Ping to client");
+ } else {
+ log_error("Couldn't send Ping to client!");
+ }
+#endif
initialize = true;
return false;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r9965: better handling of multiple RTMP messages that aren't the echo test.,
rob <=