From: git
Subject: [Commit-gnuradio] [gnuradio] 57/148: Removed the ring buffer from the usrp2 impl.
Date: Mon, 15 Aug 2016 00:47:24 +0000 (UTC)
This is an automated email from the git hooks/post-receive script.
nwest pushed a commit to annotated tag old_usrp_devel_udp
in repository gnuradio.
commit 00ec6635b61a2ab03ceb21e12509affdf704ce2a
Author: Josh Blum <address@hidden>
Date: Fri Dec 11 10:54:52 2009 -0800
Removed the ring buffer from the usrp2 impl.
The ring can be found encapsulated within the ethernet data transport.
The future udp data transport will either use a ring, or kernel buffering
or both pending some testing for performance and actual udp development.
The transports' recv methods now take a data handler callable object,
making it unnecessary to manage memory outside of the transport.
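
As an illustration of the handler-based recv described above, here is a minimal sketch, not the code from this commit. Only the data_handler name and its RELEASE/DONE results come from the diff below; queue_transport, byte_counter, and the std::deque used for storage are hypothetical stand-ins chosen to keep the example self-contained. The transport owns the packet memory, hands each packet to the handler, frees it afterwards, and stops as soon as the handler reports DONE.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-in for usrp2::data_handler. Only the RELEASE and DONE
// result values are taken from the diff; the rest is assumed.
class data_handler {
public:
    enum result { RELEASE, DONE };
    virtual ~data_handler() {}
    virtual result operator()(const void *base, std::size_t len) = 0;
};

// Hypothetical transport that owns its packet storage (a deque stands in
// for the ethernet ring or a kernel buffer).
class queue_transport {
    std::deque<std::vector<std::uint8_t> > d_packets;
public:
    void push(const std::vector<std::uint8_t> &pkt) { d_packets.push_back(pkt); }
    std::size_t pending() const { return d_packets.size(); }

    // Pass each queued packet to the handler, then free it.
    // Return early once the handler says it is done.
    void recv(data_handler *handler) {
        assert(handler != nullptr);
        while (!d_packets.empty()) {
            const std::vector<std::uint8_t> &pkt = d_packets.front();
            data_handler::result r = (*handler)(pkt.data(), pkt.size());
            d_packets.pop_front(); // memory is released inside the transport
            if (r == data_handler::DONE) break;
        }
    }
};

// Hypothetical handler: counts payload bytes and stops after `limit` packets.
class byte_counter : public data_handler {
    std::size_t d_limit, d_seen, d_bytes;
public:
    explicit byte_counter(std::size_t limit)
        : d_limit(limit), d_seen(0), d_bytes(0) {}
    result operator()(const void *, std::size_t len) {
        d_bytes += len;
        return (++d_seen >= d_limit) ? DONE : RELEASE;
    }
    std::size_t bytes() const { return d_bytes; }
};
```

A caller would construct a handler, pass its address to recv, and read the result back from the handler afterwards; it never sees or frees a buffer, which is the point made in the commit message.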
---
usrp2/host/lib/Makefile.am | 2 -
usrp2/host/lib/eth_ctrl_transport.cc | 34 +----
usrp2/host/lib/eth_ctrl_transport.h | 10 +-
usrp2/host/lib/eth_data_transport.cc | 127 ++++++++++------
usrp2/host/lib/eth_data_transport.h | 13 +-
usrp2/host/lib/find.cc | 51 ++++---
usrp2/host/lib/ring.cc | 15 +-
usrp2/host/lib/ring.h | 11 +-
usrp2/host/lib/sbuff.h | 91 -----------
usrp2/host/lib/transport.cc | 69 ---------
usrp2/host/lib/transport.h | 64 +++-----
usrp2/host/lib/usrp2_impl.cc | 285 +++++++++++++----------------------
usrp2/host/lib/usrp2_impl.h | 27 +---
13 files changed, 270 insertions(+), 529 deletions(-)
diff --git a/usrp2/host/lib/Makefile.am b/usrp2/host/lib/Makefile.am
index d154ab5..2a14256 100644
--- a/usrp2/host/lib/Makefile.am
+++ b/usrp2/host/lib/Makefile.am
@@ -44,7 +44,6 @@ libusrp2_la_SOURCES = \
open_usrp2_socket.cc \
pktfilter.cc \
ring.cc \
- transport.cc \
rx_nop_handler.cc \
rx_sample_handler.cc \
strtod_si.c \
@@ -68,7 +67,6 @@ noinst_HEADERS = \
open_usrp2_socket.h \
pktfilter.h \
ring.h \
- sbuff.h \
transport.h \
usrp2_bytesex.h \
usrp2_impl.h
diff --git a/usrp2/host/lib/eth_ctrl_transport.cc
b/usrp2/host/lib/eth_ctrl_transport.cc
index 2a622cc..e7e7386 100644
--- a/usrp2/host/lib/eth_ctrl_transport.cc
+++ b/usrp2/host/lib/eth_ctrl_transport.cc
@@ -19,7 +19,7 @@
#include "eth_ctrl_transport.h"
usrp2::eth_ctrl_transport::eth_ctrl_transport(const std::string &ifc,
u2_mac_addr_t mac, bool target)
- : transport("ethernet control"), d_mac(mac), d_buff(NULL){
+ : transport("ethernet control"), d_mac(mac){
//create raw ethernet device
d_eth_ctrl = new ethernet();
@@ -39,7 +39,6 @@ usrp2::eth_ctrl_transport::~eth_ctrl_transport(){
delete d_pf_ctrl;
d_eth_ctrl->close();
delete d_eth_ctrl;
- delete[] d_buff;
}
bool usrp2::eth_ctrl_transport::sendv(const iovec *iov, size_t iovlen){
@@ -72,30 +71,11 @@ bool usrp2::eth_ctrl_transport::sendv(const iovec *iov,
size_t iovlen){
return d_eth_ctrl->write_packetv(all_iov, all_iov_len) > 0;
}
-//helper function that deletes an array allocated by new
-//FIXME replace with the boost::lambda::delete_array
-static void delete_array(uint8_t *array){delete[] array;}
-
-usrp2::transport::sbuff_vec_t usrp2::eth_ctrl_transport::recv(){
- sbuff_vec_t sbs;
- for (size_t i = 0; i < max_buffs(); i++){
- //conditionally allocate a new buffer
- if (d_buff == NULL) d_buff = new uint8_t[ethernet::MAX_PKTLEN];
- // This method must return immediately after getting a packet.
- // Therefore, only the first call to read_packet (when size==0)
- // may have a timeout and further calls must return immediately.
- // This way, we return once all available packets have been read.
- int recv_len = sbs.size()?
- d_eth_ctrl->read_packet_dont_block(d_buff, ethernet::MAX_PKTLEN):
- d_eth_ctrl->read_packet_timeout(d_buff, ethernet::MAX_PKTLEN,
100); // FIXME magic timeout
- //strip the ethernet headers from the buffer
- if (recv_len > (signed)sizeof(u2_eth_packet_t)){
- sbs.push_back(sbuff::make(
- d_buff + sizeof(u2_eth_packet_t),
- recv_len - sizeof(u2_eth_packet_t),
- boost::bind(delete_array, d_buff)));
- d_buff = NULL; //set to null to flag for a new allocation
- } else break;
+void usrp2::eth_ctrl_transport::recv(data_handler *handler){
+ int recv_len = d_eth_ctrl->read_packet_timeout(d_buff,
ethernet::MAX_PKTLEN, 100); // FIXME magic timeout
+ //strip the ethernet headers from the buffer
+ if (recv_len > (signed)sizeof(u2_eth_packet_t)){
+ data_handler::result result = (*handler)(d_buff +
sizeof(u2_eth_packet_t), recv_len - sizeof(u2_eth_packet_t));
+ if (result == data_handler::DONE) return; //get out of here
}
- return sbs;
}
diff --git a/usrp2/host/lib/eth_ctrl_transport.h
b/usrp2/host/lib/eth_ctrl_transport.h
index 80fb10e..596b5954 100644
--- a/usrp2/host/lib/eth_ctrl_transport.h
+++ b/usrp2/host/lib/eth_ctrl_transport.h
@@ -31,7 +31,7 @@ namespace usrp2{
ethernet *d_eth_ctrl; // unbuffered control frames
pktfilter *d_pf_ctrl;
u2_mac_addr_t d_mac;
- uint8_t *d_buff;
+ uint8_t d_buff[ethernet::MAX_PKTLEN];
double_t d_timeout;
uint8_t d_padding[ethernet::MIN_PKTLEN];
@@ -47,13 +47,7 @@ namespace usrp2{
eth_ctrl_transport(const std::string &ifc, u2_mac_addr_t mac, bool
target = true);
~eth_ctrl_transport();
bool sendv(const iovec *iov, size_t iovlen);
- sbuff_vec_t recv();
- /*!
- * \brief Controls the maximum size returned by recv
- * Any integer larger than 0 would work here.
- * \return the max size of sbuffs recv vector
- */
- size_t max_buffs(){return 7;}
+ void recv(data_handler *handler);
};
diff --git a/usrp2/host/lib/eth_data_transport.cc
b/usrp2/host/lib/eth_data_transport.cc
index 7f556e6..e8928d9 100644
--- a/usrp2/host/lib/eth_data_transport.cc
+++ b/usrp2/host/lib/eth_data_transport.cc
@@ -36,6 +36,12 @@ usrp2::eth_data_transport::eth_data_transport(const
std::string &ifc, u2_mac_add
if (!d_pf_data || !d_eth_data->attach_pktfilter(d_pf_data))
throw std::runtime_error("Unable to attach packet filter for data
packets.");
+ //setup the ring
+ d_ring = ring::sptr(new ring(d_eth_data->max_frames()));
+
+ //start the thread
+ d_thread = new boost::thread(boost::bind(&eth_data_transport::recv_loop,
this));
+
memset(d_padding, 0, sizeof(d_padding));
}
@@ -43,49 +49,13 @@ usrp2::eth_data_transport::~eth_data_transport(){
delete d_pf_data;
d_eth_data->close();
delete d_eth_data;
+ //stop the recv thread
+ d_recv_on = false;
+ d_thread->interrupt();
+ d_thread->join();
}
-void usrp2::eth_data_transport::init(){
- if (gruel::enable_realtime_scheduling(gruel::sys_pri::usrp2_backend()) !=
gruel::RT_OK)
- std::cerr << "usrp2: failed to enable realtime scheduling" <<
std::endl;
-}
-
-bool usrp2::eth_data_transport::sendv(const iovec *iov, size_t iovlen){
- //create a new iov array with a space for ethernet header
- // and move the current iovs to the center of the new array
- size_t all_iov_len = iovlen + 2;
- iovec all_iov[all_iov_len];
- for (size_t i = 0; i < iovlen; i++){
- all_iov[i+1] = iov[i];
- }
- //setup a new ethernet header
- u2_eth_packet_t hdr;
- hdr.ehdr.ethertype = htons(U2_DATA_ETHERTYPE);
- memcpy(&hdr.ehdr.dst, d_mac.addr, 6);
- memcpy(&hdr.ehdr.src, d_eth_data->mac(), 6);
- hdr.thdr.flags = 0; // FIXME transport header values?
- hdr.thdr.seqno = d_tx_seqno++;
- hdr.thdr.ack = 0;
- //feed the first iov the header
- all_iov[0].iov_base = &hdr;
- all_iov[0].iov_len = sizeof(hdr);
- //get number of bytes in current iovs
- int num_bytes = 0;
- for (size_t i = 0; i < all_iov_len-1; i++){
- num_bytes += all_iov[i].iov_len;
- }
- //handle padding, must be at least minimum length
- all_iov[all_iov_len-1].iov_base = d_padding;
- all_iov[all_iov_len-1].iov_len =
std::max(int(eth_buffer::MIN_PKTLEN)-num_bytes, 0);
- return (d_eth_data->tx_framev(all_iov, all_iov_len) == eth_buffer::EB_OK)?
true : false;
-}
-
-usrp2::transport::sbuff_vec_t usrp2::eth_data_transport::recv(){
- sbuff_vec_t sbs;
- DEBUG_LOG(":");
- // Receive available frames from ethernet buffer. Handler will
- // process control frames, enqueue data packets in channel
- // rings, and signal blocked API threads
+void usrp2::eth_data_transport::recv_bg(void){
std::vector<iovec> iovs = d_eth_data->rx_framev(100); // FIXME magic
timeout
for (size_t i = 0; i < iovs.size(); i++){
void *base = iovs[i].iov_base;
@@ -120,11 +90,74 @@ usrp2::transport::sbuff_vec_t
usrp2::eth_data_transport::recv(){
/* --- end of fake transport layer handler --- */
- //drop the ethernet and transport headers
- sbs.push_back(sbuff::make(
- (uint8_t*)base + sizeof(u2_eth_packet_t),
- len - sizeof(u2_eth_packet_t),
- boost::bind(&eth_buffer::release_frame, d_eth_data, base)));
+ ring_data rd;
+ rd.base = base;
+ rd.len = len;
+
+ //enqueue the ring data (release the data on failure)
+ DEBUG_LOG("+");
+ if (not d_ring->enqueue(rd)){
+ DEBUG_LOG("!");
+ d_eth_data->release_frame(rd.base);
+ }
+ }
+ d_ring->wait_for_empty();
+}
+
+void usrp2::eth_data_transport::recv_loop(){
+ if (gruel::enable_realtime_scheduling(gruel::sys_pri::usrp2_backend()) !=
gruel::RT_OK)
+ std::cerr << "usrp2: failed to enable realtime scheduling" <<
std::endl;
+ d_recv_on = true;
+ while (d_recv_on){
+ recv_bg();
+ boost::this_thread::interruption_point();
+ }
+}
+
+bool usrp2::eth_data_transport::sendv(const iovec *iov, size_t iovlen){
+ //create a new iov array with a space for ethernet header
+ // and move the current iovs to the center of the new array
+ size_t all_iov_len = iovlen + 2;
+ iovec all_iov[all_iov_len];
+ for (size_t i = 0; i < iovlen; i++){
+ all_iov[i+1] = iov[i];
}
- return sbs;
+ //setup a new ethernet header
+ u2_eth_packet_t hdr;
+ hdr.ehdr.ethertype = htons(U2_DATA_ETHERTYPE);
+ memcpy(&hdr.ehdr.dst, d_mac.addr, 6);
+ memcpy(&hdr.ehdr.src, d_eth_data->mac(), 6);
+ hdr.thdr.flags = 0; // FIXME transport header values?
+ hdr.thdr.seqno = d_tx_seqno++;
+ hdr.thdr.ack = 0;
+ //feed the first iov the header
+ all_iov[0].iov_base = &hdr;
+ all_iov[0].iov_len = sizeof(hdr);
+ //get number of bytes in current iovs
+ int num_bytes = 0;
+ for (size_t i = 0; i < all_iov_len-1; i++){
+ num_bytes += all_iov[i].iov_len;
+ }
+ //handle padding, must be at least minimum length
+ all_iov[all_iov_len-1].iov_base = d_padding;
+ all_iov[all_iov_len-1].iov_len =
std::max(int(eth_buffer::MIN_PKTLEN)-num_bytes, 0);
+ return (d_eth_data->tx_framev(all_iov, all_iov_len) == eth_buffer::EB_OK)?
true : false;
+}
+
+void usrp2::eth_data_transport::recv(data_handler *handler){
+ d_ring->wait_for_not_empty();
+ while (true){
+ ring_data rd;
+ DEBUG_LOG("-");
+ if (not d_ring->dequeue(rd)) break; //empty ring, get out of here
+ data_handler::result result = (*handler)((uint8_t*)rd.base +
sizeof(u2_eth_packet_t), rd.len - sizeof(u2_eth_packet_t));
+ d_eth_data->release_frame(rd.base);
+ if (result == data_handler::DONE) break; //handler is done, get out of
here
+ }
+}
+
+void usrp2::eth_data_transport::flush(void){
+ //dequeue everything in the ring
+ ring_data rd;
+ while (d_ring->dequeue(rd)){}
}
diff --git a/usrp2/host/lib/eth_data_transport.h
b/usrp2/host/lib/eth_data_transport.h
index 4955327..d32159e 100644
--- a/usrp2/host/lib/eth_data_transport.h
+++ b/usrp2/host/lib/eth_data_transport.h
@@ -24,6 +24,7 @@
#include "eth_buffer.h"
#include "pktfilter.h"
#include "usrp2_impl.h"
+#include "ring.h"
namespace usrp2{
@@ -40,14 +41,20 @@ namespace usrp2{
unsigned int d_num_rx_overruns;
unsigned int d_num_rx_bytes;
uint8_t d_padding[eth_buffer::MIN_PKTLEN];
+ ring::sptr d_ring;
+
+ //for the recv thread
+ bool d_recv_on;
+ boost::thread *d_thread;
+ void recv_bg(void);
+ void recv_loop(void);
public:
eth_data_transport(const std::string &ifc, u2_mac_addr_t mac, size_t
rx_bufsize);
~eth_data_transport();
bool sendv(const iovec *iov, size_t iovlen);
- sbuff_vec_t recv();
- void init();
- size_t max_buffs(){return d_eth_data->max_frames();}
+ void recv(data_handler *handler);
+ void flush(void);
};
diff --git a/usrp2/host/lib/find.cc b/usrp2/host/lib/find.cc
index f8661d1..6e08dd4 100644
--- a/usrp2/host/lib/find.cc
+++ b/usrp2/host/lib/find.cc
@@ -36,7 +36,7 @@ static const u2_mac_addr_t broadcast_mac_addr =
{{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff }};
namespace usrp2{
- class find_helper{
+ class find_helper : private data_handler{
private:
const std::string d_target_addr;
transport::sptr d_ctrl_transport;
@@ -47,7 +47,6 @@ namespace usrp2{
find_helper(const std::string &ifc, const std::string &addr):
d_target_addr(addr){
d_ctrl_transport = transport::sptr(new eth_ctrl_transport(ifc,
broadcast_mac_addr, false));
-
d_ctrl_transport->set_callback(boost::bind(&find_helper::handle_control_packet,
this, _1));
}
~find_helper(void){/*NOP*/}
@@ -63,12 +62,37 @@ namespace usrp2{
iov.iov_len = sizeof(op_generic_t);
d_ctrl_transport->sendv(&iov, 1);
//allow responses to gather
- d_ctrl_transport->start();
+ boost::thread *ctrl_thread = new
boost::thread(boost::bind(&find_helper::ctrl_thread_loop, this));
boost::this_thread::sleep(gruel::get_new_timeout(0.05)); //50ms
- d_ctrl_transport->stop();
+ ctrl_thread->interrupt();
+ ctrl_thread->join();
return d_result;
}
+ data_handler::result operator()(const void *base, size_t len){
+ //copy the packet into an reply structure
+ op_id_reply_t op_id_reply;
+ memset(&op_id_reply, 0, sizeof(op_id_reply_t));
+ memcpy(&op_id_reply, base, std::min(sizeof(op_id_reply_t), len));
+
+ //inspect the reply packet and store into result
+ if (op_id_reply.opcode != OP_ID_REPLY) // ignore
+ return data_handler::DONE;
+ props p = reply_to_props(&op_id_reply);
+ if (FIND_DEBUG)
+ std::cerr << "usrp2::find: response from " << p.addr <<
std::endl;
+ if ((d_target_addr == "") || (d_target_addr == p.addr))
+ d_result.push_back(p);
+ return data_handler::DONE;
+ }
+
+ void ctrl_thread_loop(void){
+ while(true){
+ d_ctrl_transport->recv(this);
+ boost::this_thread::interruption_point();
+ }
+ }
+
private:
static props
reply_to_props(const op_id_reply_t *r)
@@ -85,25 +109,6 @@ namespace usrp2{
memcpy(p.sw_md5sum, r->sw_md5sum, sizeof(p.sw_md5sum));
return p;
}
-
- void handle_control_packet(const transport::sbuff_vec_t &sbs){
- for (size_t i = 0; i < sbs.size(); i++){
-
- //copy the packet into an reply structure
- op_id_reply_t op_id_reply;
- memset(&op_id_reply, 0, sizeof(op_id_reply_t));
- memcpy(&op_id_reply, sbs[i]->buff(),
std::min(sizeof(op_id_reply_t), sbs[i]->len()));
-
- //inspect the reply packet and store into result
- if (op_id_reply.opcode != OP_ID_REPLY) // ignore
- continue;
- props p = reply_to_props(&op_id_reply);
- if (FIND_DEBUG)
- std::cerr << "usrp2::find: response from " << p.addr <<
std::endl;
- if ((d_target_addr == "") || (d_target_addr == p.addr))
- d_result.push_back(p);
- }
- }
};
props_vector_t
diff --git a/usrp2/host/lib/ring.cc b/usrp2/host/lib/ring.cc
index 2578e6a..9765497 100644
--- a/usrp2/host/lib/ring.cc
+++ b/usrp2/host/lib/ring.cc
@@ -30,15 +30,23 @@ namespace usrp2 {
ring::ring(unsigned int entries)
: d_max(entries), d_read_ind(0), d_write_ind(0),
d_ring(entries),
- d_mutex(), d_not_empty()
+ d_mutex(), d_empty_cond()
{/*NOP*/}
void
+ ring::wait_for_empty()
+ {
+ gruel::scoped_lock l(d_mutex);
+ while (not empty())
+ d_empty_cond.wait(l);
+ }
+
+ void
ring::wait_for_not_empty()
{
gruel::scoped_lock l(d_mutex);
while (empty())
- d_not_empty.wait(l);
+ d_not_empty_cond.wait(l);
}
bool
@@ -51,7 +59,7 @@ namespace usrp2 {
d_ring[d_write_ind] = rd;
inc_write_ind();
- d_not_empty.notify_one();
+ d_not_empty_cond.notify_one();
return true;
}
@@ -65,6 +73,7 @@ namespace usrp2 {
rd = d_ring[d_read_ind];
inc_read_ind();
+ d_empty_cond.notify_one();
return true;
}
diff --git a/usrp2/host/lib/ring.h b/usrp2/host/lib/ring.h
index 0be835b..2547a7e 100644
--- a/usrp2/host/lib/ring.h
+++ b/usrp2/host/lib/ring.h
@@ -26,7 +26,6 @@
#include <boost/shared_ptr.hpp>
#include <gruel/thread.h>
#include <vrt/expanded_header.h>
-#include "sbuff.h"
namespace usrp2 {
@@ -34,10 +33,8 @@ namespace usrp2 {
class ring_data
{
public:
- sbuff::sptr sb;
- vrt::expanded_header hdr;
- const uint32_t *payload;
- size_t n32_bit_words_payload;
+ void *base;
+ size_t len;
};
class ring
@@ -54,7 +51,8 @@ namespace usrp2 {
std::vector<ring_data> d_ring;
gruel::mutex d_mutex;
- gruel::condition_variable d_not_empty;
+ gruel::condition_variable d_empty_cond;
+ gruel::condition_variable d_not_empty_cond;
void inc_read_ind()
{
@@ -79,6 +77,7 @@ namespace usrp2 {
ring(unsigned int entries);
+ void wait_for_empty();
void wait_for_not_empty();
bool enqueue(const ring_data &rd);
diff --git a/usrp2/host/lib/sbuff.h b/usrp2/host/lib/sbuff.h
deleted file mode 100644
index caef07f..0000000
--- a/usrp2/host/lib/sbuff.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2009 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-
-#ifndef INCLUDED_USRP2_SBUFF_H
-#define INCLUDED_USRP2_SBUFF_H
-
-#include <boost/bind.hpp>
-#include <boost/shared_ptr.hpp>
-#include <cstdio>
-
-#define USRP2_IMPL_DEBUG 0
-#if USRP2_IMPL_DEBUG
-#define DEBUG_LOG(x) ::write(2, x, 1)
-#else
-#define DEBUG_LOG(x)
-#endif
-
-namespace usrp2 {
-
- /*******************************************************************
- * This smart buffer class holds a buffer and its length in bytes
- * A special callback can be passed into the sbuff as well.
- * The callback (if set) will be called on deconstruction.
- *
- * Typically, the callback will free the memory held by the buffer.
- * But this is all up to the creator of the sbuff.
- *******************************************************************/
- class sbuff{
- public:
- //typedef for void no argument function
- typedef boost::function<void()> cb_t;
- typedef boost::shared_ptr<sbuff> sptr;
- private:
- void *d_buff;
- size_t d_len;
- cb_t d_cb;
- public:
- static sptr make(void *buff, size_t len, cb_t cb){
- return sptr(new sbuff(buff, len, cb));
- }
- static sptr make(void *buff, size_t len){
- return sptr(new sbuff(buff, len, NULL));
- }
- static sptr make(){
- return sptr(new sbuff(NULL, 0, NULL));
- }
- sbuff(void *buff, size_t len, cb_t cb)
- : d_buff(buff), d_len(len), d_cb(cb){}
- ~sbuff(){done();}
- //access methods
- void *buff(){return d_buff;}
- size_t len(){return d_len;}
- /*!
- * \brief mark this sbuff as done
- * This method allows one to explicitly tell the sbuff that its no
longer needed.
- * Doing so will make the callback (if set) and zero out the other
data.
- *
- * Although this method will be called automatically when the sptr
calls delete,
- * it is useful for the fast-path to have the ability to call done
explicitly.
- */
- void done(){
- if (d_cb) d_cb();
- d_buff = NULL;
- d_len = 0;
- d_cb = NULL;
- }
-
- };
-
-} // namespace usrp2
-
-
-#endif /* INCLUDED_USRP2_SBUFF_H */
diff --git a/usrp2/host/lib/transport.cc b/usrp2/host/lib/transport.cc
deleted file mode 100644
index 028f892..0000000
--- a/usrp2/host/lib/transport.cc
+++ /dev/null
@@ -1,69 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2009 Free Software Foundation, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "transport.h"
-#include <stdexcept>
-#include <cstdio>
-
-usrp2::transport::transport(const std::string &type_str){
- d_cb = NULL;
- d_type_str = type_str;
- d_running = false;
-}
-
-usrp2::transport::~transport(){
- if (d_running) stop();
-}
-
-void usrp2::transport::start(){
- if (not d_cb){
- throw std::runtime_error("usrp2::transport for " + d_type_str + " has
no callback\n");
- }
- if (d_running){
- throw std::runtime_error("usrp2::transport for " + d_type_str + "
already started\n");
- }
- d_running = true;
- d_thread = new boost::thread(boost::bind(&usrp2::transport::run, this));
-}
-
-void usrp2::transport::stop(){
- if (not d_running){
- throw std::runtime_error("usrp2::transport for " + d_type_str + "
already stopped\n");
- }
- d_running = false;
- d_thread->interrupt();
- d_thread->join();
-}
-
-void usrp2::transport::run(){
- init();
- while (d_running){
- try{
- // call recv to get a new sbuffer
- // pass the buffer into the callback
- sbuff_vec_t sbs = recv();
- if (sbs.size()) d_cb(sbs);
- //catch thread interrupts, possibly from stop
- //the running condition will be re-checked
- }catch(boost::thread_interrupted const &){}
- }
-}
diff --git a/usrp2/host/lib/transport.h b/usrp2/host/lib/transport.h
index 30a7bfb..902fe50 100644
--- a/usrp2/host/lib/transport.h
+++ b/usrp2/host/lib/transport.h
@@ -19,71 +19,51 @@
#ifndef INCLUDED_TRANSPORT_H
#define INCLUDED_TRANSPORT_H
-#include <boost/thread.hpp>
-#include <cstring>
+#include <string>
#include <sys/uio.h>
-#include <vector>
-#include "sbuff.h"
+#include <usrp2/data_handler.h>
+#include <boost/shared_ptr.hpp>
+
+#define USRP2_IMPL_DEBUG 0
+#if USRP2_IMPL_DEBUG
+#define DEBUG_LOG(x) ::write(2, x, 1)
+#else
+#define DEBUG_LOG(x)
+#endif
namespace usrp2 {
class transport {
public:
- typedef std::vector<sbuff::sptr> sbuff_vec_t;
- typedef boost::function<void(const sbuff_vec_t &)> callback_t;
typedef boost::shared_ptr<transport> sptr;
+
private:
std::string d_type_str;
- volatile bool d_running;
- boost::thread *d_thread;
- callback_t d_cb;
- void run();
public:
/*!
* \brief create a new transport
* The callback takes a void * pointer and a length in bytes.
* \param type_str a descriptive string
*/
- transport(const std::string &type_str);
- virtual ~transport();
- /*!
- * \brief Set the callback
- * \param cb the callback created by boost::bind
- */
- void set_callback(callback_t cb){d_cb=cb;}
- /*!
- * \brief create a new thread for receiving
- */
- void start();
- /*!
- * \brief stop and join the current thread
- */
- void stop();
- /*!
- * \brief get the maximum number of buffs (override in a subclass)
- * This number is the maximum number of buffers that recv can return at
once.
- * This number should be based upon the limitations of the internals of a
subclass.
- * Ex: for an ethernet packet ring, max buffs will be the max ring size.
- * \return the number of buffs or 0 for undefined
- */
- virtual size_t max_buffs(){return 0;}
- /*!
- * \brief called from thread on init (override in a subclass)
- * Purpose: to have a thread initialization hook.
- */
- virtual void init(){/*NOP*/}
+ transport(const std::string &type_str){d_type_str=type_str;}
/*!
* \brief send the contents of the buffer (override in a subclass)
* \param iovec a list of iovecs
* \param iovlen the number of iovecs
* \return true for completion, false for error
*/
- virtual bool sendv(const iovec *iov, size_t iovlen){return false;}
+ virtual bool sendv(const iovec *iov, size_t iovlen) = 0;
+ /*!
+ * \brief receive data and pass it to the handler
+ * \param handler the data handler callable object
+ * The handler will be called on the recieved data.
+ * If the handler returns done, recv must exit.
+ */
+ virtual void recv(data_handler *handler) = 0;
/*!
- * \brief receive data, possibly multiple buffers (override in a subclass)
- * \return a new vector of sbuffs, an empty vector is no data
+ * \brief flush any samples in the rx buffers
*/
- virtual sbuff_vec_t recv(){return sbuff_vec_t();}
+ virtual void flush(void){};
};
} // namespace usrp2
diff --git a/usrp2/host/lib/usrp2_impl.cc b/usrp2/host/lib/usrp2_impl.cc
index 23f0841..155804a 100644
--- a/usrp2/host/lib/usrp2_impl.cc
+++ b/usrp2/host/lib/usrp2_impl.cc
@@ -27,12 +27,9 @@
#include <gruel/realtime.h>
#include <gruel/sys_pri.h>
#include <usrp2_types.h>
+#include <usrp2/rx_sample_handler.h>
#include "usrp2_impl.h"
-#include "eth_buffer.h"
-#include "ethernet.h"
-#include "pktfilter.h"
#include "control.h"
-#include "ring.h"
#include <stdexcept>
#include <iostream>
#include <stdio.h>
@@ -82,24 +79,94 @@ namespace usrp2 {
}
}
+ /*********************************************************************
+ * control packet handler for control packets
+ * creates a data handler with access to the pending replies
+ * the operator() is called by the transport for each packet
+ ********************************************************************/
+ class ctrl_packet_handler : public data_handler{
+ private:
+ pending_reply **d_pending_replies;
+
+ public:
+ ctrl_packet_handler(pending_reply **pending_replies):
d_pending_replies(pending_replies){}
+
+ data_handler::result operator()(const void *base, size_t len){
+ // point to beginning of payload (subpackets)
+ unsigned char *p = (unsigned char *)base;
+
+ // FIXME iterate over payload, handling more than a single subpacket.
+
+ int opcode = p[0];
+ unsigned int oplen = p[1];
+ unsigned int rid = p[2];
+
+ pending_reply *rp = d_pending_replies[rid];
+ if (rp) {
+ unsigned int buflen = rp->len();
+ if (oplen != buflen) {
+ std::cerr << "usrp2: mismatched command reply length (expected: "
+ << buflen << " got: " << oplen << "). "
+ << "op = " << opcode_to_string(opcode) << std::endl;
+ }
+
+ // Copy reply into caller's buffer
+ memcpy(rp->buffer(), p, std::min(oplen, buflen));
+ rp->notify_completion();
+ d_pending_replies[rid] = NULL;
+ return data_handler::DONE;
+ }
+
+ // TODO: handle unsolicited, USRP2 initiated, or late replies
+ DEBUG_LOG("l");
+ return data_handler::DONE;
+ }
+ };
+
+ /*********************************************************************
+ * data packet handler for incoming samples
+ * creates a data handler with rx sample handler
+ * the operator() is called by the transport for each packet
+ ********************************************************************/
+ class data_packet_handler : public data_handler{
+ private:
+ rx_sample_handler *d_handler;
+
+ public:
+ data_packet_handler(rx_sample_handler *handler): d_handler(handler){}
+
+ data_handler::result operator()(const void *base, size_t len){
+ vrt::expanded_header hdr;
+ const uint32_t *payload;
+ size_t n32_bit_words_payload;
+ //parse the vrt header and store into the ring data structure
+ if (not vrt::expanded_header::parse(
+ (const uint32_t*)base,len/sizeof(uint32_t), //in
+ &hdr, &payload, &n32_bit_words_payload) //out
+ or not hdr.stream_id_p()
+ ){
+ printf("Bad vrt header 0x%.8x, Packet len %d\n", hdr.header,
(int)len);
+ DEBUG_LOG("!");
+ return data_handler::RELEASE;
+ }
+ rx_metadata md;
+ md.timestamp = hdr.fractional_secs; //FIXME temporary until we figure
out new md for vrt
+ bool want_more = (*d_handler)(payload, n32_bit_words_payload, &md);
+ DEBUG_LOG("-");
+
+ return want_more? data_handler::RELEASE : data_handler::DONE;
+ }
+ };
+
usrp2::impl::impl(transport::sptr data_transport, transport::sptr
ctrl_transport) :
d_next_rid(0),
- d_num_enqueued(0),
- d_enqueued_mutex(),
- d_data_pending_cond(),
- d_channel_rings(NCHANS),
d_tx_interp(0),
d_rx_decim(0),
- d_dont_enqueue(true),
d_ctrl_transport(ctrl_transport),
d_data_transport(data_transport)
{
-
d_ctrl_transport->set_callback(boost::bind(&usrp2::impl::handle_control_packet,
this, _1));
- d_ctrl_transport->start();
-
-
d_data_transport->set_callback(boost::bind(&usrp2::impl::handle_data_packet,
this, _1));
- d_data_transport->start();
-
+ d_ctrl_thread_running = true;
+ d_ctrl_thread = new
boost::thread(boost::bind(&usrp2::impl::ctrl_thread_loop, this));
memset(d_pending_replies, 0, sizeof(d_pending_replies));
// In case the USRP2 was left streaming RX
@@ -154,8 +221,11 @@ namespace usrp2 {
usrp2::impl::~impl()
{
- d_ctrl_transport->stop();
- d_data_transport->stop();
+ //stop the control thread
+ d_ctrl_thread_running = false;
+ d_ctrl_thread->interrupt();
+ d_ctrl_thread->join();
+
}
void
@@ -205,90 +275,16 @@ namespace usrp2 {
return res == 1;
}
- void
- usrp2::impl::handle_control_packet(const transport::sbuff_vec_t &sbs)
- {
- for (size_t i = 0; i < sbs.size(); i++) {
- sbuff::sptr sb = sbs[i];
-
- // point to beginning of payload (subpackets)
- unsigned char *p = (unsigned char *)sb->buff();
-
- // FIXME (p % 4) == 2. Not good. Must watch for unaligned loads.
-
- // FIXME iterate over payload, handling more than a single subpacket.
-
- int opcode = p[0];
- unsigned int oplen = p[1];
- unsigned int rid = p[2];
-
- pending_reply *rp = d_pending_replies[rid];
- if (rp) {
- unsigned int buflen = rp->len();
- if (oplen != buflen) {
- std::cerr << "usrp2: mismatched command reply length (expected: "
- << buflen << " got: " << oplen << "). "
- << "op = " << opcode_to_string(opcode) << std::endl;
- }
-
- // Copy reply into caller's buffer
- memcpy(rp->buffer(), p, std::min(oplen, buflen));
- rp->notify_completion();
- d_pending_replies[rid] = 0;
- return;
- }
-
- // TODO: handle unsolicited, USRP2 initiated, or late replies
- DEBUG_LOG("l");
- }
- }
-
- void
- usrp2::impl::handle_data_packet(const transport::sbuff_vec_t &sbs)
+ void
+ usrp2::impl::ctrl_thread_loop(void)
{
- if (d_dont_enqueue) return; //FIXME call done, or let the sptrs do it?
-
- // Try to parse each packet and enqueue the data into the channel ring.
- // Bad packets will be ignored and their data freed by done().
- for (size_t i = 0; i < sbs.size(); i++) {
- ring_data rd; rd.sb = sbs[i];
-
- //parse the vrt header and store into the ring data structure
- if (not vrt::expanded_header::parse(
- (const uint32_t*)rd.sb->buff(), rd.sb->len()/sizeof(uint32_t), //in
- &rd.hdr, &rd.payload, &rd.n32_bit_words_payload) //out
- or not rd.hdr.stream_id_p()
- ){
- printf("Bad vrt header 0x%.8x, Packet len %d\n", rd.hdr.header,
(int)rd.sb->len());
- DEBUG_LOG("!");
- rd.sb->done(); //mark done, this sbuff is no longer needed
- continue;
- }
-
- //try to enqueue the data into the ring
- gruel::scoped_lock l(d_channel_rings_mutex);
- unsigned int chan = rd.hdr.stream_id;
- if (d_channel_rings[chan] and d_channel_rings[chan]->enqueue(rd)) {
- inc_enqueued();
- DEBUG_LOG("+");
- } else {
- DEBUG_LOG("!");
- rd.sb->done(); //mark done, this sbuff is no longer needed
- continue;
- }
- }
-
- // Wait for user API thread(s) to process all enqueued packets.
- // The channel ring thread that decrements d_num_enqueued to zero
- // will signal this thread to continue.
- if (d_num_enqueued > 0){
- gruel::scoped_lock l(d_enqueued_mutex);
- while(d_num_enqueued > 0)
- d_data_pending_cond.wait(l);
+ data_handler *handler = new ctrl_packet_handler(d_pending_replies);
+ while (d_ctrl_thread_running){
+ d_ctrl_transport->recv(handler);
}
+ delete handler;
}
-
// ----------------------------------------------------------------
// Receive
// ----------------------------------------------------------------
@@ -429,14 +425,9 @@ namespace usrp2 {
return false;
}
- {
- gruel::scoped_lock l(d_channel_rings_mutex);
- if (d_channel_rings[channel]) {
- std::cerr << "usrp2: channel " << channel
- << " already streaming" << std::endl;
- return false;
- }
-
+ //flush any old samples in the data transport
+ d_data_transport->flush();
+
if (items_per_frame == 0)
items_per_frame = U2_MAX_SAMPLES; // minimize overhead
@@ -451,20 +442,14 @@ namespace usrp2 {
cmd.eop.opcode = OP_EOP;
cmd.eop.len = sizeof(cmd.eop);
- d_dont_enqueue = false;
bool success = false;
pending_reply p(cmd.op.rid, &reply, sizeof(reply));
success = transmit_cmd_and_wait(&cmd, sizeof(cmd), &p, DEF_CMD_TIMEOUT);
success = success && (ntohx(reply.ok) == 1);
-
- if (success)
- d_channel_rings[channel] = ring::sptr(new ring(d_data_transport->max_buffs()));
- else
- d_dont_enqueue = true;
//fprintf(stderr, "usrp2::start_rx_streaming: success = %d\n", success);
return success;
- }
+
}
bool
@@ -482,14 +467,10 @@ namespace usrp2 {
return false;
}
- d_dont_enqueue = true; // no new samples
- flush_rx_samples(channel); // dump any we may already have
-
op_stop_rx_cmd cmd;
op_generic_t reply;
{
- gruel::scoped_lock l(d_channel_rings_mutex);
memset(&cmd, 0, sizeof(cmd));
cmd.op.opcode = OP_STOP_RX;
@@ -502,7 +483,6 @@ namespace usrp2 {
pending_reply p(cmd.op.rid, &reply, sizeof(reply));
success = transmit_cmd_and_wait(&cmd, sizeof(cmd), &p, DEF_CMD_TIMEOUT);
success = success && (ntohx(reply.ok) == 1);
- d_channel_rings[channel].reset();
//fprintf(stderr, "usrp2::stop_rx_streaming: success = %d\n", success);
return success;
}
@@ -511,72 +491,9 @@ namespace usrp2 {
bool
usrp2::impl::rx_samples(unsigned int channel, rx_sample_handler *handler)
{
- if (channel > MAX_CHAN) {
- std::cerr << "usrp2: invalid channel (" << channel
- << " )" << std::endl;
- return false;
- }
-
- if (channel > 0) {
- std::cerr << "usrp2: channel " << channel
- << " not implemented" << std::endl;
- return false;
- }
-
- ring::sptr rp = d_channel_rings[channel];
- if (!rp){
- std::cerr << "usrp2: channel " << channel
- << " not receiving" << std::endl;
- return false;
- }
-
- // Wait for frames available in channel ring
- DEBUG_LOG("W");
- rp->wait_for_not_empty();
- DEBUG_LOG("s");
-
- // Iterate through frames and present to user
- ring_data rd;
- while (rp->dequeue(rd)) {
- rx_metadata md;
- md.timestamp = rd.hdr.fractional_secs; //FIXME temporary until we figure out new md for vrt
- bool want_more = (*handler)(rd.payload, rd.n32_bit_words_payload, &md);
- DEBUG_LOG("-");
- rd.sb->done(); //mark done, this sbuff is no longer needed
- dec_enqueued();
-
- if (!want_more)
- break;
- }
- return true;
- }
-
- bool
- usrp2::impl::flush_rx_samples(unsigned int channel)
- {
- if (channel > MAX_CHAN) {
- std::cerr << "usrp2: invalid channel (" << channel
- << " )" << std::endl;
- return false;
- }
-
- if (channel > 0) {
- std::cerr << "usrp2: channel " << channel
- << " not implemented" << std::endl;
- return false;
- }
-
- ring::sptr rp = d_channel_rings[channel];
- if (!rp){
- return false;
- }
-
- // Iterate through frames and drop them
- ring_data rd;
- while (rp->dequeue(rd)) {
- rd.sb->done(); //mark done, this sbuff is no longer needed
- dec_enqueued();
- }
+ data_handler *pkt_handler = new data_packet_handler(handler);
+ d_data_transport->recv(pkt_handler);
+ delete pkt_handler;
return true;
}
@@ -813,7 +730,7 @@ namespace usrp2 {
size_t i = std::min((size_t) U2_MAX_SAMPLES, nitems - n);
- eth_iovec iov[2];
+ iovec iov[2];
iov[0].iov_base = &fixed_hdr;
iov[0].iov_len = sizeof(fixed_hdr);
iov[1].iov_base = const_cast<uint32_t *>(&items[n]);
diff --git a/usrp2/host/lib/usrp2_impl.h b/usrp2/host/lib/usrp2_impl.h
index 4638a5a..4958298 100644
--- a/usrp2/host/lib/usrp2_impl.h
+++ b/usrp2/host/lib/usrp2_impl.h
@@ -53,48 +53,28 @@ namespace usrp2 {
class usrp2::impl
{
static const size_t NRIDS = 256;
- static const size_t NCHANS = 32;
int d_next_rid;
- unsigned int d_num_enqueued;
- gruel::mutex d_enqueued_mutex;
- gruel::condition_variable d_data_pending_cond;
-
// all pending_replies are stack allocated, thus no possibility of leaking these
pending_reply *d_pending_replies[NRIDS]; // indexed by 8-bit reply id
- std::vector<ring::sptr> d_channel_rings; // indexed by 5-bit channel number
- gruel::mutex d_channel_rings_mutex;
-
db_info d_tx_db_info;
db_info d_rx_db_info;
int d_tx_interp; // shadow tx interp
int d_rx_decim; // shadow rx decim
- bool d_dont_enqueue;
-
- void inc_enqueued() {
- gruel::scoped_lock l(d_enqueued_mutex);
- d_num_enqueued++;
- }
-
- void dec_enqueued() {
- gruel::scoped_lock l(d_enqueued_mutex);
- if (--d_num_enqueued == 0)
- d_data_pending_cond.notify_one();
- }
-
void init_config_rx_v2_cmd(op_config_rx_v2_cmd *cmd);
void init_config_tx_v2_cmd(op_config_tx_v2_cmd *cmd);
bool transmit_cmd_and_wait(void *cmd, size_t len, pending_reply *p, double secs=0.0);
bool transmit_cmd(void *cmd, size_t len);
- void handle_control_packet(const transport::sbuff_vec_t &sbs);
- void handle_data_packet(const transport::sbuff_vec_t &sbs);
bool dboard_info();
bool reset_db();
+ void ctrl_thread_loop(void);
+ boost::thread *d_ctrl_thread;
+ bool d_ctrl_thread_running;
transport::sptr d_ctrl_transport;
transport::sptr d_data_transport;
@@ -122,7 +102,6 @@ namespace usrp2 {
bool read_gpio(int bank, uint16_t *value);
bool start_rx_streaming(unsigned int channel, unsigned int items_per_frame);
bool rx_samples(unsigned int channel, rx_sample_handler *handler);
- bool flush_rx_samples(unsigned int channel);
bool stop_rx_streaming(unsigned int channel);
// Tx
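
The pattern this patch moves to, where recv() takes a handler object and the
transport keeps ownership of its buffers, can be sketched roughly as below.
This is only an illustration under stated assumptions: toy_transport,
byte_counter and demo_total_bytes are hypothetical names, and the
data_handler signature (returning true to ask for more packets) is assumed
rather than copied from transport.h. The handler is stack allocated here,
which avoids the new/delete pair, though the patch itself allocates on the
heap.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical callable interface (assumed signature, not the real
// transport.h): return true to keep receiving, false to stop.
struct data_handler {
  virtual ~data_handler() {}
  virtual bool operator()(const void *base, size_t len) = 0;
};

// Hypothetical in-memory transport. It owns every packet buffer, so the
// caller never manages packet memory outside of the transport.
class toy_transport {
  std::deque<std::vector<uint8_t> > d_pending;
public:
  void push(const std::vector<uint8_t> &pkt) { d_pending.push_back(pkt); }

  // Drop any stale packets, in the spirit of d_data_transport->flush().
  void flush() { d_pending.clear(); }

  // Hand each pending packet to the handler, then release the buffer.
  void recv(data_handler *handler) {
    while (!d_pending.empty()) {
      const std::vector<uint8_t> &pkt = d_pending.front();
      bool want_more = (*handler)(pkt.data(), pkt.size());
      d_pending.pop_front();  // buffer is freed inside the transport
      if (!want_more)
        break;
    }
  }
};

// Example handler: counts packets and bytes, always asks for more.
struct byte_counter : data_handler {
  size_t packets, bytes;
  byte_counter() : packets(0), bytes(0) {}
  bool operator()(const void *, size_t len) {
    packets++;
    bytes += len;
    return true;
  }
};

// Usage: queue two packets, receive them through a stack-allocated handler.
size_t demo_total_bytes() {
  toy_transport t;
  t.push(std::vector<uint8_t>(4, 0));
  t.push(std::vector<uint8_t>(8, 0));
  byte_counter counter;
  t.recv(&counter);
  assert(counter.packets == 2);
  return counter.bytes;
}
```

Because the buffer is released inside recv(), the caller needs no
done()-style bookkeeping and no shared enqueue counter of the kind the
removed ring code used.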