commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 23/148: Changed the transport to return a vector of sbuffs


From: git
Subject: [Commit-gnuradio] [gnuradio] 23/148: Changed the transport to return a vector of sbuffs. This way the ethernet can pass up as many buffs as ready.
Date: Mon, 15 Aug 2016 00:47:21 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

nwest pushed a commit to annotated tag old_usrp_devel_udp
in repository gnuradio.

commit a9c6502446c3249d3783415d789e31e7c9b2d981
Author: Josh Blum <address@hidden>
Date:   Thu Nov 5 15:39:59 2009 -0800

    Changed the transport to return a vector of sbuffs.
    This way the ethernet can pass up as many buffs as ready.
---
 usrp2/host/lib/eth_buffer.cc         |  22 +++---
 usrp2/host/lib/eth_buffer.h          |   2 +-
 usrp2/host/lib/eth_ctrl_transport.cc |  13 ++--
 usrp2/host/lib/eth_ctrl_transport.h  |   2 +-
 usrp2/host/lib/eth_data_transport.cc |  71 ++++++++++----------
 usrp2/host/lib/eth_data_transport.h  |   2 +-
 usrp2/host/lib/sbuff.h               |   3 +-
 usrp2/host/lib/transport.cc          |   8 +--
 usrp2/host/lib/transport.h           |   5 +-
 usrp2/host/lib/usrp2_impl.cc         | 126 +++++++++++++++++++----------------
 usrp2/host/lib/usrp2_impl.h          |   4 +-
 11 files changed, 141 insertions(+), 117 deletions(-)

diff --git a/usrp2/host/lib/eth_buffer.cc b/usrp2/host/lib/eth_buffer.cc
index 2fc4796..cba60bd 100644
--- a/usrp2/host/lib/eth_buffer.cc
+++ b/usrp2/host/lib/eth_buffer.cc
@@ -234,15 +234,16 @@ namespace usrp2 {
     return EB_OK;
   }
 
-int
-  eth_buffer::rx_frame(void **buff, int timeout_in_ms)
+std::vector<iovec>
+  eth_buffer::rx_framev(int timeout_in_ms)
   {
+    std::vector<iovec> iovs;
     DEBUG_LOG("\n");
       
     while (!frame_available()) {
       if (timeout_in_ms == 0) {
         DEBUG_LOG("w");
-        return -1;
+        return iovs;
       }
       
       struct pollfd pfd;
@@ -255,17 +256,17 @@ int
       int pres = poll(&pfd, 1, timeout_in_ms);
       if (pres == -1) {
         perror("poll");
-        return -1;
+        return iovs;
       }
 
       if (pres == 0) {
         DEBUG_LOG("t");
-        return -1;
+        return iovs;
       }
     }
 
     // Iterate through available packets
-    if (frame_available()) {
+    while (frame_available()) {
         // Get start of ethernet frame and length
         tpacket_hdr *hdr = (tpacket_hdr *)d_ring[d_head];
         void *base = (uint8_t *)hdr+hdr->tp_mac;
@@ -275,11 +276,14 @@ int
         // code.  This means that our uint32_t samples are not 4-byte
         // aligned.  We'll have to deal with it downstream.
 
-        *buff = base;
+        iovec iov;
+        iov.iov_base = base;
+        iov.iov_len = len;
+        iovs.push_back(iov);
+
         inc_head();
-        return len;
     }
-    return -1;
+    return iovs;
   }
 
   eth_buffer::result
diff --git a/usrp2/host/lib/eth_buffer.h b/usrp2/host/lib/eth_buffer.h
index 8c9f1d1..23365c2 100644
--- a/usrp2/host/lib/eth_buffer.h
+++ b/usrp2/host/lib/eth_buffer.h
@@ -147,7 +147,7 @@ namespace usrp2 {
      * \returns EB_ERROR if there was an unrecoverable error.
      */
     result rx_frames(data_handler *f, int timeout=-1);
-    int rx_frame(void **buff, int timeout_in_ms);
+    std::vector<iovec> rx_framev(int timeout_in_ms);
     /*
      * \brief Release frame from buffer
      *
diff --git a/usrp2/host/lib/eth_ctrl_transport.cc b/usrp2/host/lib/eth_ctrl_transport.cc
index 4efe840..cc0f89d 100644
--- a/usrp2/host/lib/eth_ctrl_transport.cc
+++ b/usrp2/host/lib/eth_ctrl_transport.cc
@@ -76,14 +76,17 @@ int usrp2::eth_ctrl_transport::sendv(const iovec *iov, size_t iovlen){
     return d_eth_ctrl->write_packetv(all_iov, all_iov_len);
 }
 
-usrp2::sbuff::sptr usrp2::eth_ctrl_transport::recv(){
+std::vector<usrp2::sbuff::sptr> usrp2::eth_ctrl_transport::recv(){
+    //TODO perform multiple non blocking recvs and pack into sbs
     int recv_len = d_eth_ctrl->read_packet_dont_block(d_buff, sizeof(d_buff));
     //strip the ethernet headers from the buffer
-    if (recv_len > sizeof(u2_eth_packet_only_t)){
-        return sbuff::make(
+    if (recv_len > (signed)sizeof(u2_eth_packet_only_t)){
+        std::vector<sbuff::sptr> sbs;
+        sbs.push_back(sbuff::make(
             d_buff + sizeof(u2_eth_packet_only_t),
-            recv_len - sizeof(u2_eth_packet_only_t));
+            recv_len - sizeof(u2_eth_packet_only_t)));
+        return sbs;
     }
     boost::this_thread::sleep(gruel::get_new_timeout(0.05)); //50ms timeout
-    return sbuff::make(); //nothing yet
+    return std::vector<sbuff::sptr>(); //nothing yet
 }
diff --git a/usrp2/host/lib/eth_ctrl_transport.h b/usrp2/host/lib/eth_ctrl_transport.h
index 4e67046..427e364 100644
--- a/usrp2/host/lib/eth_ctrl_transport.h
+++ b/usrp2/host/lib/eth_ctrl_transport.h
@@ -37,7 +37,7 @@ namespace usrp2{
         eth_ctrl_transport(const std::string &ifc, u2_mac_addr_t mac);
         ~eth_ctrl_transport();
         int sendv(const iovec *iov, size_t iovlen);
-        sbuff::sptr recv();
+        std::vector<sbuff::sptr> recv();
 };
 
 
diff --git a/usrp2/host/lib/eth_data_transport.cc b/usrp2/host/lib/eth_data_transport.cc
index 2f1013f..81a41c3 100644
--- a/usrp2/host/lib/eth_data_transport.cc
+++ b/usrp2/host/lib/eth_data_transport.cc
@@ -78,44 +78,47 @@ int usrp2::eth_data_transport::sendv(const iovec *iov, size_t iovlen){
     return d_eth_data->tx_framev(all_iov, all_iov_len);
 }
 
-usrp2::sbuff::sptr usrp2::eth_data_transport::recv(){
+std::vector<usrp2::sbuff::sptr> usrp2::eth_data_transport::recv(){
     void *base;
-
+    std::vector<sbuff::sptr> sbs;
     DEBUG_LOG(":");
     // Receive available frames from ethernet buffer.  Handler will
     // process control frames, enqueue data packets in channel
     // rings, and signal blocked API threads
-    int len = d_eth_data->rx_frame(&base, 100); // FIXME magic timeout
-
-    if (len <= 0) return sbuff::make();
-
-    u2_eth_packet_only_t *hdr = (u2_eth_packet_only_t *)base;
-    d_num_rx_frames++;
-    d_num_rx_bytes += len;
-    
-    /* --- FIXME start of fake transport layer handler --- */
-
-    if (d_rx_seqno != -1) {
-      int expected_seqno = (d_rx_seqno + 1) & 0xFF;
-      int seqno = hdr->thdr.seqno; 
-      
-      if (seqno != expected_seqno) {
-        DEBUG_LOG("S"); // missing sequence number
-        int missing = seqno - expected_seqno;
-        if (missing < 0)
-            missing += 256;
-        d_num_rx_overruns++;
-        d_num_rx_missing += missing;
-      }
+    std::vector<iovec> iovs = d_eth_data->rx_framev(100); // FIXME magic timeout
+    for (size_t i = 0; i < iovs.size(); i++){
+        void *base = iovs[i].iov_base;
+        size_t len = iovs[i].iov_len;
+
+        u2_eth_packet_only_t *hdr = (u2_eth_packet_only_t *)base;
+        d_num_rx_frames++;
+        d_num_rx_bytes += len;
+        
+        /* --- FIXME start of fake transport layer handler --- */
+
+        if (d_rx_seqno != -1) {
+          int expected_seqno = (d_rx_seqno + 1) & 0xFF;
+          int seqno = hdr->thdr.seqno; 
+          
+          if (seqno != expected_seqno) {
+            DEBUG_LOG("S"); // missing sequence number
+            int missing = seqno - expected_seqno;
+            if (missing < 0)
+                missing += 256;
+            d_num_rx_overruns++;
+            d_num_rx_missing += missing;
+          }
+        }
+
+        d_rx_seqno = hdr->thdr.seqno;
+
+        /* --- end of fake transport layer handler --- */
+
+        //drop the ethernet and transport headers
+        sbs.push_back(sbuff::make(
+            (uint8_t*)base + sizeof(u2_eth_packet_only_t),
+            len - sizeof(u2_eth_packet_only_t),
+            boost::bind(&eth_buffer::release_frame, d_eth_data, base)));
     }
-
-    d_rx_seqno = hdr->thdr.seqno;
-
-    /* --- end of fake transport layer handler --- */
-
-    //drop the ethernet and transport headers
-    return sbuff::make(
-        (uint8_t*)base + sizeof(u2_eth_packet_only_t),
-        len - sizeof(u2_eth_packet_only_t),
-        boost::bind(&eth_buffer::release_frame, d_eth_data, base));
+    return sbs;
 }
diff --git a/usrp2/host/lib/eth_data_transport.h b/usrp2/host/lib/eth_data_transport.h
index e524deb..452da9a 100644
--- a/usrp2/host/lib/eth_data_transport.h
+++ b/usrp2/host/lib/eth_data_transport.h
@@ -44,7 +44,7 @@ namespace usrp2{
         eth_data_transport(const std::string &ifc, u2_mac_addr_t mac, size_t rx_bufsize);
         ~eth_data_transport();
         int sendv(const iovec *iov, size_t iovlen);
-        sbuff::sptr recv();
+        std::vector<sbuff::sptr> recv();
         void init();
         size_t max_frames(){return d_eth_data->max_frames();} //FIXME hate to have this here
 };
diff --git a/usrp2/host/lib/sbuff.h b/usrp2/host/lib/sbuff.h
index dc5912a..bc6cb42 100644
--- a/usrp2/host/lib/sbuff.h
+++ b/usrp2/host/lib/sbuff.h
@@ -64,10 +64,11 @@ namespace usrp2 {
         }
         sbuff(void *buff, size_t len, cb_t cb)
          : d_buff(buff), d_len(len), d_cb(cb){}
-        ~sbuff(){if (d_cb) d_cb();}
+        ~sbuff(){done();}
         //access methods
         void *buff(){return d_buff;}
         size_t len(){return d_len;}
+        void done(){if (d_cb) d_cb();}
 
     };
 
diff --git a/usrp2/host/lib/transport.cc b/usrp2/host/lib/transport.cc
index 9b7c0b2..51beb38 100644
--- a/usrp2/host/lib/transport.cc
+++ b/usrp2/host/lib/transport.cc
@@ -63,8 +63,8 @@ void usrp2::transport::run(){
         try{
             // call recv to get a new sbuffer
             // pass the buffer into the callback
-            usrp2::sbuff::sptr sb = recv();
-            if (sb->len()) d_cb(sb);
+            std::vector<sbuff::sptr> sbs = recv();
+            if (sbs.size()) d_cb(sbs);
         //catch thread interrupts from join, sleep, etc
         //the running condition will be re-checked
         }catch(boost::thread_interrupted const &){}
@@ -75,6 +75,6 @@ int usrp2::transport::sendv(const iovec *iov, size_t iovlen){
     return -1; //NOP
 }
 
-usrp2::sbuff::sptr usrp2::transport::recv(){
-    return usrp2::sbuff::make(); //NOP
+std::vector<usrp2::sbuff::sptr> usrp2::transport::recv(){
+    return std::vector<sbuff::sptr>(); //NOP
 }
diff --git a/usrp2/host/lib/transport.h b/usrp2/host/lib/transport.h
index bddd9ff..a9c239a 100644
--- a/usrp2/host/lib/transport.h
+++ b/usrp2/host/lib/transport.h
@@ -22,13 +22,14 @@
 #include <boost/thread.hpp>
 #include <cstring>
 #include <sys/uio.h>
+#include <vector>
 #include "sbuff.h"
 
 namespace usrp2 {
 
   class transport {
   public:
-    typedef boost::function<void(sbuff::sptr)> callback_t;
+    typedef boost::function<void(std::vector<sbuff::sptr>)> callback_t;
     typedef boost::shared_ptr<transport> sptr;
   private:
     std::string              d_type_str;
@@ -73,7 +74,7 @@ namespace usrp2 {
      * \brief receive data into the sbuffer (override in a subclass)
      * \return a new sbuff, for now, an empty sbuff means nothing was recvd
      */
-    virtual sbuff::sptr recv();
+    virtual std::vector<sbuff::sptr> recv();
   };
   
 } // namespace usrp2
diff --git a/usrp2/host/lib/usrp2_impl.cc b/usrp2/host/lib/usrp2_impl.cc
index 4e8a63b..c16ad89 100644
--- a/usrp2/host/lib/usrp2_impl.cc
+++ b/usrp2/host/lib/usrp2_impl.cc
@@ -391,68 +391,80 @@ namespace usrp2 {
   }*/
 
   void
-  usrp2::impl::handle_control_packet(sbuff::sptr sb)
+  usrp2::impl::handle_control_packet(std::vector<sbuff::sptr> sbs)
   {    
-    // point to beginning of payload (subpackets)
-    unsigned char *p = (unsigned char *)sb->buff() + sizeof(u2_fixed_hdr_t);
-    
-    // FIXME (p % 4) == 2.  Not good.  Must watch for unaligned loads.
-
-    // FIXME iterate over payload, handling more than a single subpacket.
-    
-    int opcode = p[0];
-    unsigned int oplen = p[1];
-    unsigned int rid = p[2];
-
-    pending_reply *rp = d_pending_replies[rid];
-    if (rp) {
-      unsigned int buflen = rp->len();
-      if (oplen != buflen) {
-       std::cerr << "usrp2: mismatched command reply length (expected: "
-                 << buflen << " got: " << oplen << "). "
-                 << "op = " << opcode_to_string(opcode) << std::endl;
-      }     
-    
-      // Copy reply into caller's buffer
-      memcpy(rp->buffer(), p, std::min(oplen, buflen));
-      rp->notify_completion();
-      d_pending_replies[rid] = 0;
-      return;
+    for (size_t i = 0; i < sbs.size(); i++) {
+        sbuff::sptr sb = sbs[i];
+
+        // point to beginning of payload (subpackets)
+        unsigned char *p = (unsigned char *)sb->buff() + sizeof(u2_fixed_hdr_t);
+        
+        // FIXME (p % 4) == 2.  Not good.  Must watch for unaligned loads.
+
+        // FIXME iterate over payload, handling more than a single subpacket.
+        
+        int opcode = p[0];
+        unsigned int oplen = p[1];
+        unsigned int rid = p[2];
+
+        pending_reply *rp = d_pending_replies[rid];
+        if (rp) {
+          unsigned int buflen = rp->len();
+          if (oplen != buflen) {
+        std::cerr << "usrp2: mismatched command reply length (expected: "
+              << buflen << " got: " << oplen << "). "
+              << "op = " << opcode_to_string(opcode) << std::endl;
+          }     
+        
+          // Copy reply into caller's buffer
+          memcpy(rp->buffer(), p, std::min(oplen, buflen));
+          rp->notify_completion();
+          d_pending_replies[rid] = 0;
+          return;
+        }
+
+        // TODO: handle unsolicited, USRP2 initiated, or late replies
+        DEBUG_LOG("l");
     }
-
-    // TODO: handle unsolicited, USRP2 initiated, or late replies
-    DEBUG_LOG("l");
   }
   
   void
-  usrp2::impl::handle_data_packet(sbuff::sptr sb)
+  usrp2::impl::handle_data_packet(std::vector<sbuff::sptr> sbs)
   {
-    //d_num_rx_frames++;
-    //d_num_rx_bytes += sb->len();
-    u2_fixed_hdr_t *fixed_hdr = (u2_fixed_hdr_t*)sb->buff();
-
-    // FIXME unaligned load!
-    unsigned int chan = u2p_chan(fixed_hdr);
-
-    //printf("Recv over eth data %d (%d)\n", sb->len(), chan);
-
-    {
-      gruel::scoped_lock l(d_channel_rings_mutex);
+    if (d_dont_enqueue){
+        for (size_t i = 0; i < sbs.size(); i++) {
+            sbs[i]->done();
+        }
+        return;
+    }
 
-      if (!d_channel_rings[chan] or d_dont_enqueue) {
-        DEBUG_LOG("!");
-        return;        // discard packet, no channel handler
-      }
-      
-      
-      if (d_channel_rings[chan]->enqueue(sb)) {
-        inc_enqueued();
-        DEBUG_LOG("+");
-      }
-      else {
-        DEBUG_LOG("!");
-        return;     //discard packet, enqueue failed
-      }
+    for (size_t i = 0; i < sbs.size(); i++) {
+        sbuff::sptr sb = sbs[i];
+
+        u2_fixed_hdr_t *fixed_hdr = (u2_fixed_hdr_t*)sb->buff();
+        // FIXME unaligned load!
+        unsigned int chan = u2p_chan(fixed_hdr);
+
+        // process all data packets handed to us
+        // enqueue data packets in channel rings
+        {
+            gruel::scoped_lock l(d_channel_rings_mutex);
+
+            if (!d_channel_rings[chan]) {
+                DEBUG_LOG("!");
+                sb->done();
+                continue;      // discard packet, no channel handler
+            }
+
+            if (d_channel_rings[chan]->enqueue(sb)) {
+                inc_enqueued();
+                DEBUG_LOG("+");
+            } else {
+                DEBUG_LOG("!");
+                sb->done();
+                continue;     //discard packet, enqueue failed
+            }
+        }
     }
 
     // Wait for user API thread(s) to process all enqueued packets.
@@ -726,7 +738,7 @@ namespace usrp2 {
 
       bool want_more = (*handler)(items, nitems_in_uint32s, &md);
       DEBUG_LOG("-");
-      sb.reset(); //reset shared ptr so sbuff decontructs
+      sb->done(); //make done to call cleanup callback
       dec_enqueued();
 
       if (!want_more)
@@ -758,7 +770,7 @@ namespace usrp2 {
     // Iterate through frames and drop them
     sbuff::sptr sb;
     while (rp->dequeue(&sb)) {
-      sb.reset(); //reset shared ptr so sbuff decontructs
+      sb->done(); //make done to call cleanup callback
       dec_enqueued();
     }
     return true;
diff --git a/usrp2/host/lib/usrp2_impl.h b/usrp2/host/lib/usrp2_impl.h
index ad3f418..1ec288b 100644
--- a/usrp2/host/lib/usrp2_impl.h
+++ b/usrp2/host/lib/usrp2_impl.h
@@ -113,8 +113,8 @@ namespace usrp2 {
     bool transmit_cmd_and_wait(void *cmd, size_t len, pending_reply *p, double secs=0.0);
     bool transmit_cmd(void *cmd, size_t len);
     //virtual data_handler::result operator()(const void *base, size_t len);
-    void handle_control_packet(sbuff::sptr sb);
-    void handle_data_packet(sbuff::sptr sb);
+    void handle_control_packet(std::vector<sbuff::sptr> sbs);
+    void handle_data_packet(std::vector<sbuff::sptr> sbs);
     bool dboard_info();
     bool reset_db();
 



reply via email to

[Prev in Thread] Current Thread [Next in Thread]