commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 57/148: Removed the ring buffer from the us


From: git
Subject: [Commit-gnuradio] [gnuradio] 57/148: Removed the ring buffer from the usrp2 impl.
Date: Mon, 15 Aug 2016 00:47:24 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

nwest pushed a commit to annotated tag old_usrp_devel_udp
in repository gnuradio.

commit 00ec6635b61a2ab03ceb21e12509affdf704ce2a
Author: Josh Blum <address@hidden>
Date:   Fri Dec 11 10:54:52 2009 -0800

    Removed the ring buffer from the usrp2 impl.
    
    The ring can be found encapsulated within the ethernet data transport.
    The future udp data transport will either use a ring, or kernel buffering
    or both pending some testing for performance and actual udp development.
    
    The transports' recv methods now take a data handler callable object,
    making it unnecessary to manage memory outside of the transport.
---
 usrp2/host/lib/Makefile.am           |   2 -
 usrp2/host/lib/eth_ctrl_transport.cc |  34 +----
 usrp2/host/lib/eth_ctrl_transport.h  |  10 +-
 usrp2/host/lib/eth_data_transport.cc | 127 ++++++++++------
 usrp2/host/lib/eth_data_transport.h  |  13 +-
 usrp2/host/lib/find.cc               |  51 ++++---
 usrp2/host/lib/ring.cc               |  15 +-
 usrp2/host/lib/ring.h                |  11 +-
 usrp2/host/lib/sbuff.h               |  91 -----------
 usrp2/host/lib/transport.cc          |  69 ---------
 usrp2/host/lib/transport.h           |  64 +++-----
 usrp2/host/lib/usrp2_impl.cc         | 285 +++++++++++++----------------------
 usrp2/host/lib/usrp2_impl.h          |  27 +---
 13 files changed, 270 insertions(+), 529 deletions(-)

diff --git a/usrp2/host/lib/Makefile.am b/usrp2/host/lib/Makefile.am
index d154ab5..2a14256 100644
--- a/usrp2/host/lib/Makefile.am
+++ b/usrp2/host/lib/Makefile.am
@@ -44,7 +44,6 @@ libusrp2_la_SOURCES = \
        open_usrp2_socket.cc \
        pktfilter.cc \
        ring.cc \
-       transport.cc \
        rx_nop_handler.cc \
        rx_sample_handler.cc \
        strtod_si.c \
@@ -68,7 +67,6 @@ noinst_HEADERS = \
        open_usrp2_socket.h \
        pktfilter.h \
        ring.h \
-       sbuff.h \
        transport.h \
        usrp2_bytesex.h \
        usrp2_impl.h
diff --git a/usrp2/host/lib/eth_ctrl_transport.cc 
b/usrp2/host/lib/eth_ctrl_transport.cc
index 2a622cc..e7e7386 100644
--- a/usrp2/host/lib/eth_ctrl_transport.cc
+++ b/usrp2/host/lib/eth_ctrl_transport.cc
@@ -19,7 +19,7 @@
 #include "eth_ctrl_transport.h"
 
 usrp2::eth_ctrl_transport::eth_ctrl_transport(const std::string &ifc, 
u2_mac_addr_t mac, bool target)
- : transport("ethernet control"), d_mac(mac), d_buff(NULL){
+ : transport("ethernet control"), d_mac(mac){
 
     //create raw ethernet device
     d_eth_ctrl = new ethernet();
@@ -39,7 +39,6 @@ usrp2::eth_ctrl_transport::~eth_ctrl_transport(){
     delete d_pf_ctrl;
     d_eth_ctrl->close();
     delete d_eth_ctrl;
-    delete[] d_buff;
 }
 
 bool usrp2::eth_ctrl_transport::sendv(const iovec *iov, size_t iovlen){
@@ -72,30 +71,11 @@ bool usrp2::eth_ctrl_transport::sendv(const iovec *iov, 
size_t iovlen){
     return d_eth_ctrl->write_packetv(all_iov, all_iov_len) > 0;
 }
 
-//helper function that deletes an array allocated by new
-//FIXME replace with the boost::lambda::delete_array
-static void delete_array(uint8_t *array){delete[] array;}
-
-usrp2::transport::sbuff_vec_t usrp2::eth_ctrl_transport::recv(){
-    sbuff_vec_t sbs;
-    for (size_t i = 0; i < max_buffs(); i++){
-        //conditionally allocate a new buffer
-        if (d_buff == NULL) d_buff = new uint8_t[ethernet::MAX_PKTLEN];
-        // This method must return immediately after getting a packet.
-        // Therefore, only the first call to read_packet (when size==0)
-        // may have a timeout and further calls must return immediately.
-        // This way, we return once all available packets have been read.
-        int recv_len = sbs.size()?
-            d_eth_ctrl->read_packet_dont_block(d_buff, ethernet::MAX_PKTLEN):
-            d_eth_ctrl->read_packet_timeout(d_buff, ethernet::MAX_PKTLEN, 
100); // FIXME magic timeout
-        //strip the ethernet headers from the buffer
-        if (recv_len > (signed)sizeof(u2_eth_packet_t)){
-            sbs.push_back(sbuff::make(
-                d_buff + sizeof(u2_eth_packet_t),
-                recv_len - sizeof(u2_eth_packet_t),
-                boost::bind(delete_array, d_buff)));
-            d_buff = NULL; //set to null to flag for a new allocation
-        } else break;
+void usrp2::eth_ctrl_transport::recv(data_handler *handler){
+    int recv_len = d_eth_ctrl->read_packet_timeout(d_buff, 
ethernet::MAX_PKTLEN, 100); // FIXME magic timeout
+    //strip the ethernet headers from the buffer
+    if (recv_len > (signed)sizeof(u2_eth_packet_t)){
+        data_handler::result result = (*handler)(d_buff + 
sizeof(u2_eth_packet_t), recv_len - sizeof(u2_eth_packet_t));
+        if (result == data_handler::DONE) return; //get out of here
     }
-    return sbs;
 }
diff --git a/usrp2/host/lib/eth_ctrl_transport.h 
b/usrp2/host/lib/eth_ctrl_transport.h
index 80fb10e..596b5954 100644
--- a/usrp2/host/lib/eth_ctrl_transport.h
+++ b/usrp2/host/lib/eth_ctrl_transport.h
@@ -31,7 +31,7 @@ namespace usrp2{
         ethernet      *d_eth_ctrl;  // unbuffered control frames
         pktfilter     *d_pf_ctrl;
         u2_mac_addr_t d_mac;
-        uint8_t       *d_buff;
+        uint8_t       d_buff[ethernet::MAX_PKTLEN];
         double_t      d_timeout;
         uint8_t       d_padding[ethernet::MIN_PKTLEN];
 
@@ -47,13 +47,7 @@ namespace usrp2{
         eth_ctrl_transport(const std::string &ifc, u2_mac_addr_t mac, bool 
target = true);
         ~eth_ctrl_transport();
         bool sendv(const iovec *iov, size_t iovlen);
-        sbuff_vec_t recv();
-        /*!
-         * \brief Controls the maximum size returned by recv
-         * Any integer larger than 0 would work here.
-         * \return the max size of sbuffs recv vector
-         */
-        size_t max_buffs(){return 7;} 
+        void recv(data_handler *handler);
 };
 
 
diff --git a/usrp2/host/lib/eth_data_transport.cc 
b/usrp2/host/lib/eth_data_transport.cc
index 7f556e6..e8928d9 100644
--- a/usrp2/host/lib/eth_data_transport.cc
+++ b/usrp2/host/lib/eth_data_transport.cc
@@ -36,6 +36,12 @@ usrp2::eth_data_transport::eth_data_transport(const 
std::string &ifc, u2_mac_add
     if (!d_pf_data || !d_eth_data->attach_pktfilter(d_pf_data))
         throw std::runtime_error("Unable to attach packet filter for data 
packets.");
 
+    //setup the ring
+    d_ring = ring::sptr(new ring(d_eth_data->max_frames()));
+
+    //start the thread
+    d_thread = new boost::thread(boost::bind(&eth_data_transport::recv_loop, 
this));
+
     memset(d_padding, 0, sizeof(d_padding));
 }
 
@@ -43,49 +49,13 @@ usrp2::eth_data_transport::~eth_data_transport(){
     delete d_pf_data;
     d_eth_data->close();
     delete d_eth_data;
+    //stop the recv thread
+    d_recv_on = false;
+    d_thread->interrupt();
+    d_thread->join();
 }
 
-void usrp2::eth_data_transport::init(){
-    if (gruel::enable_realtime_scheduling(gruel::sys_pri::usrp2_backend()) != 
gruel::RT_OK)
-        std::cerr << "usrp2: failed to enable realtime scheduling" << 
std::endl;
-}
-
-bool usrp2::eth_data_transport::sendv(const iovec *iov, size_t iovlen){
-    //create a new iov array with a space for ethernet header
-    // and move the current iovs to the center of the new array
-    size_t all_iov_len = iovlen + 2;
-    iovec all_iov[all_iov_len];
-    for (size_t i = 0; i < iovlen; i++){
-        all_iov[i+1] = iov[i];
-    }
-    //setup a new ethernet header
-    u2_eth_packet_t hdr;
-    hdr.ehdr.ethertype = htons(U2_DATA_ETHERTYPE);
-    memcpy(&hdr.ehdr.dst, d_mac.addr, 6);
-    memcpy(&hdr.ehdr.src, d_eth_data->mac(), 6);
-    hdr.thdr.flags = 0; // FIXME transport header values?
-    hdr.thdr.seqno = d_tx_seqno++;
-    hdr.thdr.ack = 0;
-    //feed the first iov the header
-    all_iov[0].iov_base = &hdr;
-    all_iov[0].iov_len = sizeof(hdr);
-    //get number of bytes in current iovs
-    int num_bytes = 0;
-    for (size_t i = 0; i < all_iov_len-1; i++){
-        num_bytes += all_iov[i].iov_len;
-    }
-    //handle padding, must be at least minimum length
-    all_iov[all_iov_len-1].iov_base = d_padding;
-    all_iov[all_iov_len-1].iov_len = 
std::max(int(eth_buffer::MIN_PKTLEN)-num_bytes, 0);
-    return (d_eth_data->tx_framev(all_iov, all_iov_len) == eth_buffer::EB_OK)? 
true : false;
-}
-
-usrp2::transport::sbuff_vec_t usrp2::eth_data_transport::recv(){
-    sbuff_vec_t sbs;
-    DEBUG_LOG(":");
-    // Receive available frames from ethernet buffer.  Handler will
-    // process control frames, enqueue data packets in channel
-    // rings, and signal blocked API threads
+void usrp2::eth_data_transport::recv_bg(void){
     std::vector<iovec> iovs = d_eth_data->rx_framev(100); // FIXME magic 
timeout
     for (size_t i = 0; i < iovs.size(); i++){
         void *base = iovs[i].iov_base;
@@ -120,11 +90,74 @@ usrp2::transport::sbuff_vec_t 
usrp2::eth_data_transport::recv(){
 
         /* --- end of fake transport layer handler --- */
 
-        //drop the ethernet and transport headers
-        sbs.push_back(sbuff::make(
-            (uint8_t*)base + sizeof(u2_eth_packet_t),
-            len - sizeof(u2_eth_packet_t),
-            boost::bind(&eth_buffer::release_frame, d_eth_data, base)));
+        ring_data rd;
+        rd.base = base;
+        rd.len = len;
+
+        //enqueue the ring data (release the data on failure)
+        DEBUG_LOG("+");
+        if (not d_ring->enqueue(rd)){
+            DEBUG_LOG("!");
+            d_eth_data->release_frame(rd.base);
+        }
+    }
+    d_ring->wait_for_empty();
+}
+
+void usrp2::eth_data_transport::recv_loop(){
+    if (gruel::enable_realtime_scheduling(gruel::sys_pri::usrp2_backend()) != 
gruel::RT_OK)
+        std::cerr << "usrp2: failed to enable realtime scheduling" << 
std::endl;
+    d_recv_on = true;
+    while (d_recv_on){
+        recv_bg();
+        boost::this_thread::interruption_point();
+    }
+}
+
+bool usrp2::eth_data_transport::sendv(const iovec *iov, size_t iovlen){
+    //create a new iov array with a space for ethernet header
+    // and move the current iovs to the center of the new array
+    size_t all_iov_len = iovlen + 2;
+    iovec all_iov[all_iov_len];
+    for (size_t i = 0; i < iovlen; i++){
+        all_iov[i+1] = iov[i];
     }
-    return sbs;
+    //setup a new ethernet header
+    u2_eth_packet_t hdr;
+    hdr.ehdr.ethertype = htons(U2_DATA_ETHERTYPE);
+    memcpy(&hdr.ehdr.dst, d_mac.addr, 6);
+    memcpy(&hdr.ehdr.src, d_eth_data->mac(), 6);
+    hdr.thdr.flags = 0; // FIXME transport header values?
+    hdr.thdr.seqno = d_tx_seqno++;
+    hdr.thdr.ack = 0;
+    //feed the first iov the header
+    all_iov[0].iov_base = &hdr;
+    all_iov[0].iov_len = sizeof(hdr);
+    //get number of bytes in current iovs
+    int num_bytes = 0;
+    for (size_t i = 0; i < all_iov_len-1; i++){
+        num_bytes += all_iov[i].iov_len;
+    }
+    //handle padding, must be at least minimum length
+    all_iov[all_iov_len-1].iov_base = d_padding;
+    all_iov[all_iov_len-1].iov_len = 
std::max(int(eth_buffer::MIN_PKTLEN)-num_bytes, 0);
+    return (d_eth_data->tx_framev(all_iov, all_iov_len) == eth_buffer::EB_OK)? 
true : false;
+}
+
+void usrp2::eth_data_transport::recv(data_handler *handler){
+    d_ring->wait_for_not_empty();
+    while (true){
+        ring_data rd;
+        DEBUG_LOG("-");
+        if (not d_ring->dequeue(rd)) break; //empty ring, get out of here
+        data_handler::result result = (*handler)((uint8_t*)rd.base + 
sizeof(u2_eth_packet_t), rd.len - sizeof(u2_eth_packet_t));
+        d_eth_data->release_frame(rd.base);
+        if (result == data_handler::DONE) break; //handler is done, get out of 
here
+    }
+}
+
+void usrp2::eth_data_transport::flush(void){
+    //dequeue everything in the ring
+    ring_data rd;
+    while (d_ring->dequeue(rd)){}
 }
diff --git a/usrp2/host/lib/eth_data_transport.h 
b/usrp2/host/lib/eth_data_transport.h
index 4955327..d32159e 100644
--- a/usrp2/host/lib/eth_data_transport.h
+++ b/usrp2/host/lib/eth_data_transport.h
@@ -24,6 +24,7 @@
 #include "eth_buffer.h"
 #include "pktfilter.h"
 #include "usrp2_impl.h"
+#include "ring.h"
 
 namespace usrp2{
 
@@ -40,14 +41,20 @@ namespace usrp2{
         unsigned int   d_num_rx_overruns;
         unsigned int   d_num_rx_bytes;
         uint8_t        d_padding[eth_buffer::MIN_PKTLEN];
+        ring::sptr     d_ring;
+
+        //for the recv thread
+        bool           d_recv_on;
+        boost::thread  *d_thread;
+        void recv_bg(void);
+        void recv_loop(void);
 
     public:
         eth_data_transport(const std::string &ifc, u2_mac_addr_t mac, size_t 
rx_bufsize);
         ~eth_data_transport();
         bool sendv(const iovec *iov, size_t iovlen);
-        sbuff_vec_t recv();
-        void init();
-        size_t max_buffs(){return d_eth_data->max_frames();}
+        void recv(data_handler *handler);
+        void flush(void);
 };
 
 
diff --git a/usrp2/host/lib/find.cc b/usrp2/host/lib/find.cc
index f8661d1..6e08dd4 100644
--- a/usrp2/host/lib/find.cc
+++ b/usrp2/host/lib/find.cc
@@ -36,7 +36,7 @@ static const u2_mac_addr_t broadcast_mac_addr =
       {{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff }};
 
 namespace usrp2{
-    class find_helper{
+    class find_helper : private data_handler{
     private:
         const std::string d_target_addr;
         transport::sptr   d_ctrl_transport;
@@ -47,7 +47,6 @@ namespace usrp2{
 
         find_helper(const std::string &ifc, const std::string &addr): 
d_target_addr(addr){
             d_ctrl_transport = transport::sptr(new eth_ctrl_transport(ifc, 
broadcast_mac_addr, false));
-            
d_ctrl_transport->set_callback(boost::bind(&find_helper::handle_control_packet, 
this, _1));
         }
 
         ~find_helper(void){/*NOP*/}
@@ -63,12 +62,37 @@ namespace usrp2{
             iov.iov_len = sizeof(op_generic_t);
             d_ctrl_transport->sendv(&iov, 1);
             //allow responses to gather
-            d_ctrl_transport->start();
+            boost::thread *ctrl_thread = new 
boost::thread(boost::bind(&find_helper::ctrl_thread_loop, this));
             boost::this_thread::sleep(gruel::get_new_timeout(0.05)); //50ms
-            d_ctrl_transport->stop();
+            ctrl_thread->interrupt();
+            ctrl_thread->join();
             return d_result;
         }
 
+        data_handler::result operator()(const void *base, size_t len){
+            //copy the packet into an reply structure
+            op_id_reply_t op_id_reply;
+            memset(&op_id_reply, 0, sizeof(op_id_reply_t));
+            memcpy(&op_id_reply, base, std::min(sizeof(op_id_reply_t), len));
+
+            //inspect the reply packet and store into result
+            if (op_id_reply.opcode != OP_ID_REPLY) // ignore
+                return data_handler::DONE;
+            props p = reply_to_props(&op_id_reply);
+            if (FIND_DEBUG)
+                std::cerr << "usrp2::find: response from " << p.addr << 
std::endl;
+            if ((d_target_addr == "") || (d_target_addr == p.addr))
+                d_result.push_back(p);
+            return data_handler::DONE;
+        }
+
+        void ctrl_thread_loop(void){
+            while(true){
+                d_ctrl_transport->recv(this);
+                boost::this_thread::interruption_point();
+            }
+        }
+
     private:
         static props
         reply_to_props(const op_id_reply_t *r)
@@ -85,25 +109,6 @@ namespace usrp2{
             memcpy(p.sw_md5sum, r->sw_md5sum, sizeof(p.sw_md5sum));
             return p;
         }
-
-        void handle_control_packet(const transport::sbuff_vec_t &sbs){
-            for (size_t i = 0; i < sbs.size(); i++){
-
-                //copy the packet into an reply structure
-                op_id_reply_t op_id_reply;
-                memset(&op_id_reply, 0, sizeof(op_id_reply_t));
-                memcpy(&op_id_reply, sbs[i]->buff(), 
std::min(sizeof(op_id_reply_t), sbs[i]->len()));
-
-                //inspect the reply packet and store into result
-                if (op_id_reply.opcode != OP_ID_REPLY) // ignore
-                    continue;
-                props p = reply_to_props(&op_id_reply);
-                if (FIND_DEBUG)
-                    std::cerr << "usrp2::find: response from " << p.addr << 
std::endl;
-                if ((d_target_addr == "") || (d_target_addr == p.addr))
-                    d_result.push_back(p);
-            }
-        }
     };
 
     props_vector_t
diff --git a/usrp2/host/lib/ring.cc b/usrp2/host/lib/ring.cc
index 2578e6a..9765497 100644
--- a/usrp2/host/lib/ring.cc
+++ b/usrp2/host/lib/ring.cc
@@ -30,15 +30,23 @@ namespace usrp2 {
   ring::ring(unsigned int entries)
     : d_max(entries), d_read_ind(0), d_write_ind(0),
       d_ring(entries),
-      d_mutex(), d_not_empty()
+      d_mutex(), d_empty_cond()
   {/*NOP*/}
 
   void 
+  ring::wait_for_empty() 
+  { 
+    gruel::scoped_lock l(d_mutex);
+    while (not empty()) 
+      d_empty_cond.wait(l);
+  }
+
+  void 
   ring::wait_for_not_empty() 
   { 
     gruel::scoped_lock l(d_mutex);
     while (empty()) 
-      d_not_empty.wait(l);
+      d_not_empty_cond.wait(l);
   }
 
   bool
@@ -51,7 +59,7 @@ namespace usrp2 {
     d_ring[d_write_ind] = rd;
 
     inc_write_ind();
-    d_not_empty.notify_one();
+    d_not_empty_cond.notify_one();
     return true;
   }
 
@@ -65,6 +73,7 @@ namespace usrp2 {
     rd = d_ring[d_read_ind];
 
     inc_read_ind();
+    d_empty_cond.notify_one();
     return true;
   }
   
diff --git a/usrp2/host/lib/ring.h b/usrp2/host/lib/ring.h
index 0be835b..2547a7e 100644
--- a/usrp2/host/lib/ring.h
+++ b/usrp2/host/lib/ring.h
@@ -26,7 +26,6 @@
 #include <boost/shared_ptr.hpp>
 #include <gruel/thread.h>
 #include <vrt/expanded_header.h>
-#include "sbuff.h"
 
 namespace usrp2 {
 
@@ -34,10 +33,8 @@ namespace usrp2 {
   class ring_data
   {
     public:
-        sbuff::sptr sb;
-        vrt::expanded_header hdr;
-        const uint32_t *payload;
-        size_t n32_bit_words_payload;
+        void *base;
+        size_t len;
   };
 
   class ring
@@ -54,7 +51,8 @@ namespace usrp2 {
     std::vector<ring_data> d_ring;
 
     gruel::mutex d_mutex;
-    gruel::condition_variable d_not_empty;
+    gruel::condition_variable d_empty_cond;
+    gruel::condition_variable d_not_empty_cond;
 
     void inc_read_ind()
     {
@@ -79,6 +77,7 @@ namespace usrp2 {
 
     ring(unsigned int entries);
 
+    void wait_for_empty();
     void wait_for_not_empty();
 
     bool enqueue(const ring_data &rd);
diff --git a/usrp2/host/lib/sbuff.h b/usrp2/host/lib/sbuff.h
deleted file mode 100644
index caef07f..0000000
--- a/usrp2/host/lib/sbuff.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2009 Free Software Foundation, Inc.
- * 
- * This file is part of GNU Radio
- * 
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- * 
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-
-#ifndef INCLUDED_USRP2_SBUFF_H
-#define INCLUDED_USRP2_SBUFF_H
-
-#include <boost/bind.hpp>
-#include <boost/shared_ptr.hpp>
-#include <cstdio>
-
-#define USRP2_IMPL_DEBUG 0
-#if USRP2_IMPL_DEBUG
-#define DEBUG_LOG(x) ::write(2, x, 1)
-#else
-#define DEBUG_LOG(x)
-#endif
-
-namespace usrp2 {
-
-    /*******************************************************************
-     * This smart buffer class holds a buffer and its length in bytes
-     * A special callback can be passed into the sbuff as well.
-     * The callback (if set) will be called on deconstruction.
-     * 
-     * Typically, the callback will free the memory held by the buffer.
-     * But this is all up to the creator of the sbuff.
-    *******************************************************************/
-    class sbuff{
-    public:
-        //typedef for void no argument function
-        typedef boost::function<void()> cb_t;
-        typedef boost::shared_ptr<sbuff> sptr;
-    private:
-        void *d_buff;
-        size_t d_len;
-        cb_t d_cb;
-    public:
-        static sptr make(void *buff, size_t len, cb_t cb){
-            return sptr(new sbuff(buff, len, cb));
-        }
-        static sptr make(void *buff, size_t len){
-            return sptr(new sbuff(buff, len, NULL));
-        }
-        static sptr make(){
-            return sptr(new sbuff(NULL, 0, NULL));
-        }
-        sbuff(void *buff, size_t len, cb_t cb)
-         : d_buff(buff), d_len(len), d_cb(cb){}
-        ~sbuff(){done();}
-        //access methods
-        void *buff(){return d_buff;}
-        size_t len(){return d_len;}
-        /*!
-         * \brief mark this sbuff as done
-         * This method allows one to explicitly tell the sbuff that its no 
longer needed.
-         * Doing so will make the callback (if set) and zero out the other 
data.
-         *
-         * Although this method will be called automatically when the sptr 
calls delete,
-         * it is useful for the fast-path to have the ability to call done 
explicitly.
-         */
-        void done(){
-            if (d_cb) d_cb();
-            d_buff = NULL;
-            d_len = 0;
-            d_cb = NULL;
-        }
-
-    };
-
-}  // namespace usrp2
-
-
-#endif /* INCLUDED_USRP2_SBUFF_H */
diff --git a/usrp2/host/lib/transport.cc b/usrp2/host/lib/transport.cc
deleted file mode 100644
index 028f892..0000000
--- a/usrp2/host/lib/transport.cc
+++ /dev/null
@@ -1,69 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2009 Free Software Foundation, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "transport.h"
-#include <stdexcept>
-#include <cstdio>
-
-usrp2::transport::transport(const std::string &type_str){
-    d_cb = NULL;
-    d_type_str = type_str;
-    d_running = false;
-}
-
-usrp2::transport::~transport(){
-    if (d_running) stop();
-}
-
-void usrp2::transport::start(){
-    if (not d_cb){
-        throw std::runtime_error("usrp2::transport for " + d_type_str + " has 
no callback\n");
-    }
-    if (d_running){
-        throw std::runtime_error("usrp2::transport for " + d_type_str + " 
already started\n");
-    }
-    d_running = true;
-    d_thread = new boost::thread(boost::bind(&usrp2::transport::run, this));
-}
-
-void usrp2::transport::stop(){
-    if (not d_running){
-        throw std::runtime_error("usrp2::transport for " + d_type_str + " 
already stopped\n");
-    }
-    d_running = false;
-    d_thread->interrupt();
-    d_thread->join();
-}
-
-void usrp2::transport::run(){
-    init();
-    while (d_running){
-        try{
-            // call recv to get a new sbuffer
-            // pass the buffer into the callback
-            sbuff_vec_t sbs = recv();
-            if (sbs.size()) d_cb(sbs);
-        //catch thread interrupts, possibly from stop
-        //the running condition will be re-checked
-        }catch(boost::thread_interrupted const &){}
-    }
-}
diff --git a/usrp2/host/lib/transport.h b/usrp2/host/lib/transport.h
index 30a7bfb..902fe50 100644
--- a/usrp2/host/lib/transport.h
+++ b/usrp2/host/lib/transport.h
@@ -19,71 +19,51 @@
 #ifndef INCLUDED_TRANSPORT_H
 #define INCLUDED_TRANSPORT_H
 
-#include <boost/thread.hpp>
-#include <cstring>
+#include <string>
 #include <sys/uio.h>
-#include <vector>
-#include "sbuff.h"
+#include <usrp2/data_handler.h>
+#include <boost/shared_ptr.hpp>
+
+#define USRP2_IMPL_DEBUG 0
+#if USRP2_IMPL_DEBUG
+#define DEBUG_LOG(x) ::write(2, x, 1)
+#else
+#define DEBUG_LOG(x)
+#endif
 
 namespace usrp2 {
 
   class transport {
   public:
-    typedef std::vector<sbuff::sptr> sbuff_vec_t;
-    typedef boost::function<void(const sbuff_vec_t &)> callback_t;
     typedef boost::shared_ptr<transport> sptr;
+
   private:
     std::string              d_type_str;
-    volatile bool            d_running;
-    boost::thread            *d_thread;
-    callback_t               d_cb;
-    void run();
   public:
     /*!
      * \brief create a new transport
      * The callback takes a void * pointer and a length in bytes.
      * \param type_str a descriptive string
      */
-    transport(const std::string &type_str);
-    virtual ~transport();
-    /*!
-     * \brief Set the callback
-     * \param cb the callback created by boost::bind
-     */
-    void set_callback(callback_t cb){d_cb=cb;}
-    /*!
-     * \brief create a new thread for receiving
-     */
-    void start();
-    /*!
-     * \brief stop and join the current thread
-     */
-    void stop();
-    /*!
-     * \brief get the maximum number of buffs (override in a subclass)
-     * This number is the maximum number of buffers that recv can return at 
once.
-     * This number should be based upon the limitations of the internals of a 
subclass.
-     * Ex: for an ethernet packet ring, max buffs will be the max ring size.
-     * \return the number of buffs or 0 for undefined
-     */
-    virtual size_t max_buffs(){return 0;}
-    /*!
-     * \brief called from thread on init (override in a subclass)
-     * Purpose: to have a thread initialization hook.
-     */
-    virtual void init(){/*NOP*/}
+    transport(const std::string &type_str){d_type_str=type_str;}
     /*!
      * \brief send the contents of the buffer (override in a subclass)
      * \param iovec a list of iovecs
      * \param iovlen the number of iovecs
      * \return true for completion, false for error
      */
-    virtual bool sendv(const iovec *iov, size_t iovlen){return false;}
+    virtual bool sendv(const iovec *iov, size_t iovlen) = 0;
+    /*!
+     * \brief receive data and pass it to the handler
+     * \param handler the data handler callable object
+     * The handler will be called on the recieved data.
+     * If the handler returns done, recv must exit.
+     */
+    virtual void recv(data_handler *handler) = 0;
     /*!
-     * \brief receive data, possibly multiple buffers (override in a subclass)
-     * \return a new vector of sbuffs, an empty vector is no data
+     * \brief flush any samples in the rx buffers
      */
-    virtual sbuff_vec_t recv(){return sbuff_vec_t();}
+    virtual void flush(void){};
   };
   
 } // namespace usrp2
diff --git a/usrp2/host/lib/usrp2_impl.cc b/usrp2/host/lib/usrp2_impl.cc
index 23f0841..155804a 100644
--- a/usrp2/host/lib/usrp2_impl.cc
+++ b/usrp2/host/lib/usrp2_impl.cc
@@ -27,12 +27,9 @@
 #include <gruel/realtime.h>
 #include <gruel/sys_pri.h>
 #include <usrp2_types.h>
+#include <usrp2/rx_sample_handler.h>
 #include "usrp2_impl.h"
-#include "eth_buffer.h"
-#include "ethernet.h"
-#include "pktfilter.h"
 #include "control.h"
-#include "ring.h"
 #include <stdexcept>
 #include <iostream>
 #include <stdio.h>
@@ -82,24 +79,94 @@ namespace usrp2 {
     }
   }
 
+  /*********************************************************************
+   * control packet handler for control packets
+   *   creates a data handler with access to the pending replies
+   *   the operator() is called by the transport for each packet
+   ********************************************************************/
+  class ctrl_packet_handler : public data_handler{
+  private:
+    pending_reply **d_pending_replies;
+
+  public:
+    ctrl_packet_handler(pending_reply **pending_replies): 
d_pending_replies(pending_replies){}
+
+    data_handler::result operator()(const void *base, size_t len){
+        // point to beginning of payload (subpackets)
+        unsigned char *p = (unsigned char *)base;
+
+        // FIXME iterate over payload, handling more than a single subpacket.
+        
+        int opcode = p[0];
+        unsigned int oplen = p[1];
+        unsigned int rid = p[2];
+
+        pending_reply *rp = d_pending_replies[rid];
+        if (rp) {
+          unsigned int buflen = rp->len();
+          if (oplen != buflen) {
+        std::cerr << "usrp2: mismatched command reply length (expected: "
+              << buflen << " got: " << oplen << "). "
+              << "op = " << opcode_to_string(opcode) << std::endl;
+          }     
+        
+          // Copy reply into caller's buffer
+          memcpy(rp->buffer(), p, std::min(oplen, buflen));
+          rp->notify_completion();
+          d_pending_replies[rid] = NULL;
+          return data_handler::DONE;
+        }
+
+        // TODO: handle unsolicited, USRP2 initiated, or late replies
+        DEBUG_LOG("l");
+        return data_handler::DONE;
+    }
+  };
+
+  /*********************************************************************
+   * data packet handler for incoming samples
+   *   creates a data handler with rx sample handler
+   *   the operator() is called by the transport for each packet
+   ********************************************************************/
+  class data_packet_handler : public data_handler{
+  private:
+    rx_sample_handler *d_handler;
+
+  public:
+    data_packet_handler(rx_sample_handler *handler): d_handler(handler){}
+
+    data_handler::result operator()(const void *base, size_t len){
+        vrt::expanded_header hdr;
+        const uint32_t *payload;
+        size_t n32_bit_words_payload;
+        //parse the vrt header and store into the ring data structure
+        if (not vrt::expanded_header::parse(
+            (const uint32_t*)base,len/sizeof(uint32_t), //in
+            &hdr, &payload, &n32_bit_words_payload) //out
+            or not hdr.stream_id_p()
+        ){
+            printf("Bad vrt header 0x%.8x, Packet len %d\n", hdr.header, 
(int)len);
+            DEBUG_LOG("!");
+            return data_handler::RELEASE;
+        }
+        rx_metadata    md;
+        md.timestamp = hdr.fractional_secs; //FIXME temporary until we figure 
out new md for vrt
+        bool want_more = (*d_handler)(payload, n32_bit_words_payload, &md);
+        DEBUG_LOG("-"); 
+
+        return want_more? data_handler::RELEASE : data_handler::DONE;
+    }
+  };
+
   usrp2::impl::impl(transport::sptr data_transport, transport::sptr 
ctrl_transport) :
       d_next_rid(0),
-      d_num_enqueued(0),
-      d_enqueued_mutex(),
-      d_data_pending_cond(),
-      d_channel_rings(NCHANS),
       d_tx_interp(0),
       d_rx_decim(0),
-      d_dont_enqueue(true),
       d_ctrl_transport(ctrl_transport),
       d_data_transport(data_transport)
   {
-    
d_ctrl_transport->set_callback(boost::bind(&usrp2::impl::handle_control_packet, 
this, _1));
-    d_ctrl_transport->start();
-
-    
d_data_transport->set_callback(boost::bind(&usrp2::impl::handle_data_packet, 
this, _1));
-    d_data_transport->start();
-
+    d_ctrl_thread_running = true;
+    d_ctrl_thread = new 
boost::thread(boost::bind(&usrp2::impl::ctrl_thread_loop, this));
     memset(d_pending_replies, 0, sizeof(d_pending_replies));
 
     // In case the USRP2 was left streaming RX
@@ -154,8 +221,11 @@ namespace usrp2 {
   
   usrp2::impl::~impl()
   {
-    d_ctrl_transport->stop();
-    d_data_transport->stop();
+    //stop the control thread
+    d_ctrl_thread_running = false;
+    d_ctrl_thread->interrupt();
+    d_ctrl_thread->join();
+
   }
 
   void
@@ -205,90 +275,16 @@ namespace usrp2 {
     return res == 1;
   }
 
-  void
-  usrp2::impl::handle_control_packet(const transport::sbuff_vec_t &sbs)
-  {    
-    for (size_t i = 0; i < sbs.size(); i++) {
-        sbuff::sptr sb = sbs[i];
-
-        // point to beginning of payload (subpackets)
-        unsigned char *p = (unsigned char *)sb->buff();
-        
-        // FIXME (p % 4) == 2.  Not good.  Must watch for unaligned loads.
-
-        // FIXME iterate over payload, handling more than a single subpacket.
-        
-        int opcode = p[0];
-        unsigned int oplen = p[1];
-        unsigned int rid = p[2];
-
-        pending_reply *rp = d_pending_replies[rid];
-        if (rp) {
-          unsigned int buflen = rp->len();
-          if (oplen != buflen) {
-        std::cerr << "usrp2: mismatched command reply length (expected: "
-              << buflen << " got: " << oplen << "). "
-              << "op = " << opcode_to_string(opcode) << std::endl;
-          }     
-        
-          // Copy reply into caller's buffer
-          memcpy(rp->buffer(), p, std::min(oplen, buflen));
-          rp->notify_completion();
-          d_pending_replies[rid] = 0;
-          return;
-        }
-
-        // TODO: handle unsolicited, USRP2 initiated, or late replies
-        DEBUG_LOG("l");
-    }
-  }
-  
-  void
-  usrp2::impl::handle_data_packet(const transport::sbuff_vec_t &sbs)
+  void 
+  usrp2::impl::ctrl_thread_loop(void)
   {
-    if (d_dont_enqueue) return; //FIXME call done, or let the sptrs do it?
-
-    // Try to parse each packet and enqueue the data into the channel ring.
-    // Bad packets will be ignored and their data freed by done().
-    for (size_t i = 0; i < sbs.size(); i++) {
-        ring_data rd; rd.sb = sbs[i];
-
-        //parse the vrt header and store into the ring data structure
-        if (not vrt::expanded_header::parse(
-            (const uint32_t*)rd.sb->buff(), rd.sb->len()/sizeof(uint32_t), //in
-            &rd.hdr, &rd.payload, &rd.n32_bit_words_payload) //out
-            or not rd.hdr.stream_id_p()
-        ){
-            printf("Bad vrt header 0x%.8x, Packet len %d\n", rd.hdr.header, 
(int)rd.sb->len());
-            DEBUG_LOG("!");
-            rd.sb->done(); //mark done, this sbuff is no longer needed
-            continue;
-        }
-
-        //try to enqueue the data into the ring
-        gruel::scoped_lock l(d_channel_rings_mutex);
-        unsigned int chan = rd.hdr.stream_id;
-        if (d_channel_rings[chan] and d_channel_rings[chan]->enqueue(rd)) {
-            inc_enqueued();
-            DEBUG_LOG("+");
-        } else {
-            DEBUG_LOG("!");
-            rd.sb->done(); //mark done, this sbuff is no longer needed
-            continue;
-        }
-    }
-
-    // Wait for user API thread(s) to process all enqueued packets.
-    // The channel ring thread that decrements d_num_enqueued to zero 
-    // will signal this thread to continue.
-    if (d_num_enqueued > 0){
-        gruel::scoped_lock l(d_enqueued_mutex);
-        while(d_num_enqueued > 0)
-            d_data_pending_cond.wait(l);
+    data_handler *handler = new ctrl_packet_handler(d_pending_replies);
+    while (d_ctrl_thread_running){
+        d_ctrl_transport->recv(handler);
     }
+    delete handler;
   }
 
-
   // ----------------------------------------------------------------
   //                          Receive
   // ----------------------------------------------------------------
@@ -429,14 +425,9 @@ namespace usrp2 {
       return false;
     }
 
-    {
-      gruel::scoped_lock l(d_channel_rings_mutex);
-      if (d_channel_rings[channel]) {
-       std::cerr << "usrp2: channel " << channel
-                 << " already streaming" << std::endl;
-       return false;
-      }
-      
+    //flush any old samples in the data transport
+    d_data_transport->flush();
+
       if (items_per_frame == 0)
        items_per_frame = U2_MAX_SAMPLES;               // minimize overhead
       
@@ -451,20 +442,14 @@ namespace usrp2 {
       cmd.eop.opcode = OP_EOP;
       cmd.eop.len = sizeof(cmd.eop);
     
-      d_dont_enqueue = false;
       bool success = false;
       pending_reply p(cmd.op.rid, &reply, sizeof(reply));
       success = transmit_cmd_and_wait(&cmd, sizeof(cmd), &p, DEF_CMD_TIMEOUT);
       success = success && (ntohx(reply.ok) == 1);
-      
-      if (success)
-       d_channel_rings[channel] = ring::sptr(new 
ring(d_data_transport->max_buffs()));
-      else
-       d_dont_enqueue = true;
 
       //fprintf(stderr, "usrp2::start_rx_streaming: success = %d\n", success);
       return success;
-    }
+
   }
   
   bool
@@ -482,14 +467,10 @@ namespace usrp2 {
       return false;
     }
 
-    d_dont_enqueue = true;     // no new samples
-    flush_rx_samples(channel); // dump any we may already have
-
     op_stop_rx_cmd cmd;
     op_generic_t reply;
 
     {
-      gruel::scoped_lock l(d_channel_rings_mutex);
 
       memset(&cmd, 0, sizeof(cmd));
       cmd.op.opcode = OP_STOP_RX;
@@ -502,7 +483,6 @@ namespace usrp2 {
       pending_reply p(cmd.op.rid, &reply, sizeof(reply));
       success = transmit_cmd_and_wait(&cmd, sizeof(cmd), &p, DEF_CMD_TIMEOUT);
       success = success && (ntohx(reply.ok) == 1);
-      d_channel_rings[channel].reset();
       //fprintf(stderr, "usrp2::stop_rx_streaming:  success = %d\n", success);
       return success;
     }
@@ -511,72 +491,9 @@ namespace usrp2 {
   bool
   usrp2::impl::rx_samples(unsigned int channel, rx_sample_handler *handler)
   {
-    if (channel > MAX_CHAN) {
-      std::cerr << "usrp2: invalid channel (" << channel
-                << " )" << std::endl;
-      return false;
-    }
-    
-    if (channel > 0) {
-      std::cerr << "usrp2: channel " << channel
-                << " not implemented" << std::endl;
-      return false;
-    }
-    
-    ring::sptr rp = d_channel_rings[channel];
-    if (!rp){
-      std::cerr << "usrp2: channel " << channel
-                << " not receiving" << std::endl;
-      return false;
-    }
-    
-    // Wait for frames available in channel ring
-    DEBUG_LOG("W");
-    rp->wait_for_not_empty();
-    DEBUG_LOG("s");
-    
-    // Iterate through frames and present to user
-    ring_data rd;
-    while (rp->dequeue(rd)) {
-      rx_metadata      md;
-      md.timestamp = rd.hdr.fractional_secs; //FIXME temporary until we figure 
out new md for vrt
-      bool want_more = (*handler)(rd.payload, rd.n32_bit_words_payload, &md);
-      DEBUG_LOG("-");
-      rd.sb->done(); //mark done, this sbuff is no longer needed
-      dec_enqueued();
-
-      if (!want_more)
-        break;
-    }
-    return true;
-  }
-
-  bool
-  usrp2::impl::flush_rx_samples(unsigned int channel)
-  {
-    if (channel > MAX_CHAN) {
-      std::cerr << "usrp2: invalid channel (" << channel
-                << " )" << std::endl;
-      return false;
-    }
-
-    if (channel > 0) {
-      std::cerr << "usrp2: channel " << channel
-                << " not implemented" << std::endl;
-      return false;
-    }
-
-    ring::sptr rp = d_channel_rings[channel];
-    if (!rp){
-      return false;
-    }
-
-    // Iterate through frames and drop them
-    ring_data rd;
-    while (rp->dequeue(rd)) {
-      rd.sb->done(); //mark done, this sbuff is no longer needed
-      dec_enqueued();
-    }
+    data_handler *pkt_handler = new data_packet_handler(handler);
+    d_data_transport->recv(pkt_handler);
+    delete pkt_handler;
     return true;
   }
 
@@ -813,7 +730,7 @@ namespace usrp2 {
 
       size_t i = std::min((size_t) U2_MAX_SAMPLES, nitems - n);
 
-      eth_iovec iov[2];
+      iovec iov[2];
       iov[0].iov_base = &fixed_hdr;
       iov[0].iov_len = sizeof(fixed_hdr);
       iov[1].iov_base = const_cast<uint32_t *>(&items[n]);
diff --git a/usrp2/host/lib/usrp2_impl.h b/usrp2/host/lib/usrp2_impl.h
index 4638a5a..4958298 100644
--- a/usrp2/host/lib/usrp2_impl.h
+++ b/usrp2/host/lib/usrp2_impl.h
@@ -53,48 +53,28 @@ namespace usrp2 {
   class usrp2::impl
   {
     static const size_t NRIDS = 256;
-    static const size_t NCHANS = 32;
 
     int            d_next_rid;
 
-    unsigned int   d_num_enqueued;
-    gruel::mutex   d_enqueued_mutex;
-    gruel::condition_variable d_data_pending_cond;
-
     // all pending_replies are stack allocated, thus no possibility of leaking 
these
     pending_reply *d_pending_replies[NRIDS]; // indexed by 8-bit reply id
 
-    std::vector<ring::sptr>   d_channel_rings; // indexed by 5-bit channel 
number
-    gruel::mutex   d_channel_rings_mutex;
-
     db_info       d_tx_db_info;
     db_info       d_rx_db_info;
 
     int                   d_tx_interp;         // shadow tx interp 
     int                   d_rx_decim;          // shadow rx decim
 
-    bool          d_dont_enqueue;
-
-    void inc_enqueued() {
-      gruel::scoped_lock l(d_enqueued_mutex);
-      d_num_enqueued++;
-    }
-    
-    void dec_enqueued() {
-      gruel::scoped_lock l(d_enqueued_mutex);
-      if (--d_num_enqueued == 0)
-        d_data_pending_cond.notify_one();
-    }
-
     void init_config_rx_v2_cmd(op_config_rx_v2_cmd *cmd);
     void init_config_tx_v2_cmd(op_config_tx_v2_cmd *cmd);
     bool transmit_cmd_and_wait(void *cmd, size_t len, pending_reply *p, double 
secs=0.0);
     bool transmit_cmd(void *cmd, size_t len);
-    void handle_control_packet(const transport::sbuff_vec_t &sbs);
-    void handle_data_packet(const transport::sbuff_vec_t &sbs);
     bool dboard_info();
     bool reset_db();
 
+    void ctrl_thread_loop(void);
+    boost::thread *d_ctrl_thread;
+    bool d_ctrl_thread_running;
     transport::sptr d_ctrl_transport;
     transport::sptr d_data_transport;
 
@@ -122,7 +102,6 @@ namespace usrp2 {
     bool read_gpio(int bank, uint16_t *value);
     bool start_rx_streaming(unsigned int channel, unsigned int 
items_per_frame);
     bool rx_samples(unsigned int channel, rx_sample_handler *handler);
-    bool flush_rx_samples(unsigned int channel);
     bool stop_rx_streaming(unsigned int channel);
 
     // Tx



reply via email to

[Prev in Thread] Current Thread [Next in Thread]