[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Gnash-commit] /srv/bzr/gnash/rtmp r10067: Read multiple RTMP messages.
From: rob
Subject: [Gnash-commit] /srv/bzr/gnash/rtmp r10067: Read multiple RTMP messages. Move the processing to net_handler() from call().
Date: Fri, 27 Mar 2009 11:56:15 -0600
User-agent: Bazaar (1.5)
------------------------------------------------------------
revno: 10067
committer: address@hidden
branch nick: rtmp
timestamp: Fri 2009-03-27 11:56:15 -0600
message:
Read multiple RTMP messages. Move the processing to net_handler() from call().
modified:
libcore/asobj/NetConnection_as.cpp
=== modified file 'libcore/asobj/NetConnection_as.cpp'
--- a/libcore/asobj/NetConnection_as.cpp 2009-03-27 15:23:24 +0000
+++ b/libcore/asobj/NetConnection_as.cpp 2009-03-27 17:56:15 +0000
@@ -472,6 +472,7 @@
VM& vm = asCallback->getVM();
tdata->st = &vm.getStringTable();
+ tdata->nas = this;
// tdata->vm = vm;
// Send the request via HTTP
@@ -486,15 +487,20 @@
request += buf;
_http_client->writeNet(request);
tdata->network = reinterpret_cast<Network *>(_http_client.get());
+ tdata->network->setProtocol(url.protocol());
}
// Send the request via RTMP
if (url.protocol() == "rtmp") {
tdata->network = reinterpret_cast<Network *>(_rtmp_client.get());
+ tdata->network->setProtocol(url.protocol());
boost::shared_ptr<amf::Element> el = args[2].to_element();
// el->dump();
boost::shared_ptr<amf::Buffer> request =
_rtmp_client->encodeEchoRequest(app, 2.0, *el);
_rtmp_client->sendMsg(0x3, RTMP::HEADER_12, request->allocated(),
RTMP::INVOKE, RTMPMsg::FROM_CLIENT, *request);
+
+
+#if 0
boost::shared_ptr<amf::Buffer> response = _rtmp_client->recvMsg();
response->dump();
boost::shared_ptr<RTMP::rtmp_head_t> rthead;
@@ -523,19 +529,17 @@
}
}
}
+#endif
+
+
}
- // this->test();
-
// Start a thread to wait for the response
#if 0
boost::thread process_thread(boost::bind(&net_handler, tdata.get()));
#else
net_handler(tdata.get());
#endif
-
-// _currentConnection.reset(new HTTPRemotingHandler(*this, url));
-
}
std::auto_ptr<IOChannel>
@@ -759,125 +763,165 @@
// boost::mutex::scoped_lock lock(call_mutex);
args->network->setTimeout(50);
-
- // Suck all the data waiting for us in the network
- boost::shared_ptr<amf::Buffer> buf(new amf::Buffer);
- do {
- size_t ret = args->network->readNet(buf->reference() +
buf->allocated(),
- buf->size(), 60);
- // The timeout expired
- if (ret == 0) {
- log_debug("no data yet for fd #%d, continuing...",
- args->network->getFileFd());
- result = false;
- done = true;
- }
- // Something happened to the network connection
- if ((ret == static_cast<size_t>(string::npos)) || (ret ==
static_cast<size_t>(-1))) {
- log_debug("socket for fd #%d was closed...",
- args->network->getFileFd());
- return false;
- }
- // We got data.
- if (ret > 0) {
- // If we got less data than we tried to read, then we got the
- // whole packet most likely.
- if (ret < buf->size()) {
- done = true;
- result = true;
- }
- if (ret == buf->size()) {
- // become larger by another default block size.
- buf->resize(buf->size() + amf::NETBUFSIZE);
- log_debug("Got a full packet, making the buffer larger to %d",
- buf->size());
- result = true;
- }
- // manually set the seek pointer in the buffer, as we read
- // the data into the raw memory allocated to the buffer. We
- // only want to do this if we got data of course.
- buf->setSeekPointer(buf->end() + ret);
- } else {
- log_debug("no more data for fd #%d, exiting...",
- args->network->getFileFd());
- done = true;
- }
- } while(done != true);
-
- // Now process the data
- if (result) {
- HTTP *http = reinterpret_cast<HTTP *>(args->network);;
- amf::AMF amf;
- boost::uint8_t *data = http->processHeaderFields(*buf);
+ if (args->network->getProtocol() == "rtmp") {
+#if 1
+ do {
+ RTMPClient *client = reinterpret_cast<RTMPClient *>(args->network);
+ boost::shared_ptr<amf::Buffer> response = client->recvMsg();
+ response->dump();
+ boost::shared_ptr<RTMP::rtmp_head_t> rthead;
+ boost::shared_ptr<RTMP::queues_t> que = client->split(*response);
+
+ log_debug("%s: There are %d messages in the RTMP input queue",
__PRETTY_FUNCTION__, que->size());
+ while (que->size()) {
+ boost::shared_ptr<amf::Buffer> ptr = que->front()->pop();
+ log_debug("%s: There are %d messages in the RTMP input queue",
__PRETTY_FUNCTION__, que->size());
+ if (ptr) { // If there is legit data
+ rthead = client->decodeHeader(ptr->reference());
+ RTMPMsg *msg = client->decodeMsgBody(ptr->reference() +
rthead->head_size, rthead->bodysize);
+ msg->dump();
+ if (msg->getMethodName() == "_error") {
+ log_error("Got an error: %s", msg->getMethodName());
+ msg->at(0)->dump();
+ args->nas->notifyStatus(NetConnection_as::CALL_FAILED);
+ }
+ if (msg->getMethodName() == "_result") {
+ log_debug("Got a result: %s", msg->getMethodName());
+ if (msg->getElements().size() > 0) {
+ msg->at(0)->dump();
+ as_value tmp(*msg->at(0));
+// string_table::key methodKey =
tdata->st->find(methodName);
+ string_table::key methodKey =
args->st->find("onResult");
+ args->callback->callMethod(methodKey, tmp);
+ }
+ }
+ ptr.reset();
+ done = true;
+ break;
+ }
+ }
+ } while (!done);
+#endif
+ } else if (args->network->getProtocol() == "http") {
+ // Suck all the data waiting for us in the network
+ boost::shared_ptr<amf::Buffer> buf(new amf::Buffer);
+ do {
+ size_t ret = args->network->readNet(buf->reference() +
buf->allocated(),
+ buf->size(), 60);
+ // The timeout expired
+ if (ret == 0) {
+ log_debug("no data yet for fd #%d, continuing...",
+ args->network->getFileFd());
+ result = false;
+ done = true;
+ }
+ // Something happened to the network connection
+ if ((ret == static_cast<size_t>(string::npos)) || (ret ==
static_cast<size_t>(-1))) {
+ log_debug("socket for fd #%d was closed...",
+ args->network->getFileFd());
+ return false;
+ }
+ // We got data.
+ if (ret > 0) {
+ // If we got less data than we tried to read, then we got the
+ // whole packet most likely.
+ if (ret < buf->size()) {
+ done = true;
+ result = true;
+ }
+ if (ret == buf->size()) {
+ // become larger by another default block size.
+ buf->resize(buf->size() + amf::NETBUFSIZE);
+ log_debug("Got a full packet, making the buffer larger to
%d",
+ buf->size());
+ result = true;
+ }
+ // manually set the seek pointer in the buffer, as we read
+ // the data into the raw memory allocated to the buffer. We
+ // only want to do this if we got data of course.
+ buf->setSeekPointer(buf->end() + ret);
+ } else {
+ log_debug("no more data for fd #%d, exiting...",
+ args->network->getFileFd());
+ done = true;
+ }
+ } while(done != true);
+
+ // Now process the data
+ if (result) {
+ HTTP *http = reinterpret_cast<HTTP *>(args->network);;
+ amf::AMF amf;
+ boost::uint8_t *data = http->processHeaderFields(*buf);
// http->dump();
- size_t length = http->getContentLength();
- if (http->getField("transfer-encoding") == "chunked") {
- chunked = true;
- }
- // Make sure we have a sane length. If Chunked, then we don't have
- // a length field, so we use the size of the data that
- boost::shared_ptr<amf::Buffer> chunk;
- if (length == 0) {
- if (chunked) {
- size_t count = http->recvChunked(data, (buf->end() - data));
- log_debug("Got %d chunked data messages", count);
- } else {
- done = true;
- result = false;
- }
- }
-
+ size_t length = http->getContentLength();
+ if (http->getField("transfer-encoding") == "chunked") {
+ chunked = true;
+ }
+ // Make sure we have a sane length. If Chunked, then we don't have
+ // a length field, so we use the size of the data that
+ boost::shared_ptr<amf::Buffer> chunk;
+ if (length == 0) {
+ if (chunked) {
+ size_t count = http->recvChunked(data, (buf->end() - data));
+ log_debug("Got %d chunked data messages", count);
+ } else {
+ done = true;
+ result = false;
+ }
+ }
+
// for (size_t i=0; i<http->sizeChunks(); i++) {
- log_debug("Cookie is: \"%s\"", http->getField("cookie"));
- log_debug("Content type is: \"%s\"", http->getField("content-type"));
- if (http->getField("content-type").find("application/x-amf") !=
string::npos) {
- if (chunked) {
- chunk = http->mergeChunks();
- } else {
- chunk.reset(new amf::Buffer(buf->end() - data));
- chunk->copy(data,(buf->end() - data));
- }
-
+ log_debug("Cookie is: \"%s\"", http->getField("cookie"));
+ log_debug("Content type is: \"%s\"",
http->getField("content-type"));
+ if (http->getField("content-type").find("application/x-amf") !=
string::npos) {
+ if (chunked) {
+ chunk = http->mergeChunks();
+ } else {
+ chunk.reset(new amf::Buffer(buf->end() - data));
+ chunk->copy(data,(buf->end() - data));
+ }
+
// chunk = http->popChunk();
// chunk->dump();
- amf::AMF_msg amsg;
- boost::shared_ptr<amf::AMF_msg::context_header_t> head =
- amsg.parseAMFPacket(chunk->reference(), chunk->allocated());
+ amf::AMF_msg amsg;
+ boost::shared_ptr<amf::AMF_msg::context_header_t> head =
+ amsg.parseAMFPacket(chunk->reference(), chunk->allocated());
// amsg.dump();
- log_debug("%d messages in AMF packet", amsg.messageCount());
- for (size_t i=0; i<amsg.messageCount(); i++) {
+ log_debug("%d messages in AMF packet", amsg.messageCount());
+ for (size_t i=0; i<amsg.messageCount(); i++) {
// amsg.getMessage(i)->data->dump();
- boost::shared_ptr<amf::Element> el = amsg.getMessage(i)->data;
- as_value tmp(*el);
+ boost::shared_ptr<amf::Element> el =
amsg.getMessage(i)->data;
+ as_value tmp(*el);
// NetConnection_as *obj = (NetConnection_as *)args->network;
- log_debug("Calling NetConnection %s(%s)",
- amsg.getMessage(i)->header.target, tmp);
- // The method name looks something like this: /17/onResult
- // the first field is a sequence number so each response can
- // be matched to the request that made it. We only want the
- // name part, so we can call the method.
- string::size_type pos =
amsg.getMessage(i)->header.target.find('/', 1);
- string methodName;
- if (pos != string::npos) {
- methodName =
amsg.getMessage(i)->header.target.substr(pos+1,
amsg.getMessage(i)->header.target.size());
- }
+ log_debug("Calling NetConnection %s(%s)",
+ amsg.getMessage(i)->header.target, tmp);
+ // The method name looks something like this: /17/onResult
+ // the first field is a sequence number so each response can
+ // be matched to the request that made it. We only want the
+ // name part, so we can call the method.
+ string::size_type pos =
amsg.getMessage(i)->header.target.find('/', 1);
+ string methodName;
+ if (pos != string::npos) {
+ methodName =
amsg.getMessage(i)->header.target.substr(pos+1,
amsg.getMessage(i)->header.target.size());
+ }
// VM& vm = args->callback->getVM();
// string_table& st = vm.getStringTable();
- string_table::key methodKey;
+ string_table::key methodKey;
// boost::mutex::scoped_lock lock(_nc_mutex);
- methodKey = args->st->find(methodName);
- args->callback->callMethod(methodKey, tmp);
- }
- } else { // not AMF data
- if ((http->getField("content-type").find("application/xml") !=
string::npos)
- || (http->getField("content-type").find("text/html") !=
string::npos)) {
- log_debug("Textual Data is: %s", reinterpret_cast<char
*>(data));
- } else {
- log_debug("Binary Data is: %s", hexify(data, length, true));
+ methodKey = args->st->find(methodName);
+ args->callback->callMethod(methodKey, tmp);
+ }
+ } else { // not AMF data
+ if ((http->getField("content-type").find("application/xml") !=
string::npos)
+ || (http->getField("content-type").find("text/html") !=
string::npos)) {
+ log_debug("Textual Data is: %s", reinterpret_cast<char
*>(data));
+ } else {
+ log_debug("Binary Data is: %s", hexify(data, length, true));
+ }
}
}
}
-
+
log_debug("net_handler all done...");
return result;
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Gnash-commit] /srv/bzr/gnash/rtmp r10067: Read multiple RTMP messages. Move the processing to net_handler() from call().,
rob <=