commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 17/28: core: squashed in-place block work


From: git
Subject: [Commit-gnuradio] [gnuradio] 17/28: core: squashed in-place block work
Date: Mon, 15 Aug 2016 00:47:06 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

nwest pushed a commit to annotated tag gr_basic_work
in repository gnuradio.

commit 4e4cd2ce55d95ba08da91d86652ddba34694170e
Author: Josh Blum <address@hidden>
Date:   Sat Nov 12 12:45:14 2011 -0800

    core: squashed in-place block work
---
 gnuradio-core/src/lib/general/gr_throttle.cc       |  5 +-
 gnuradio-core/src/lib/runtime/gr_block.cc          |  1 +
 gnuradio-core/src/lib/runtime/gr_block.h           |  9 +++
 gnuradio-core/src/lib/runtime/gr_buffer.cc         | 16 ++++++
 gnuradio-core/src/lib/runtime/gr_buffer.h          | 18 ++++++
 gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc | 67 ++++++++++++++++++++++
 gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h  |  2 +
 gnuradio-core/src/lib/runtime/gr_tpb_detail.cc     |  5 +-
 8 files changed, 120 insertions(+), 3 deletions(-)

diff --git a/gnuradio-core/src/lib/general/gr_throttle.cc 
b/gnuradio-core/src/lib/general/gr_throttle.cc
index a24b1da..f9406c2 100644
--- a/gnuradio-core/src/lib/general/gr_throttle.cc
+++ b/gnuradio-core/src/lib/general/gr_throttle.cc
@@ -37,7 +37,7 @@ public:
             gr_make_io_signature(1, 1, itemsize)),
         d_itemsize(itemsize)
     {
-        /* NOP */
+        this->set_inplace(true);
     }
 
     void set_sample_rate(double rate){
@@ -68,7 +68,8 @@ public:
         //copy all samples output[i] <= input[i]
         const char *in = (const char *) input_items[0];
         char *out = (char *) output_items[0];
-        std::memcpy(out, in, noutput_items * d_itemsize);
+        //only memcpy when the scheduler doesnt simplify to in-place
+        if (in != out) std::memcpy(out, in, noutput_items * d_itemsize);
         d_total_samples += noutput_items;
         return noutput_items;
     }
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc 
b/gnuradio-core/src/lib/runtime/gr_block.cc
index 9463869..daefb55 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -35,6 +35,7 @@ gr_block::gr_block (const std::string &name,
   : gr_basic_block(name, input_signature, output_signature),
     d_output_multiple (1),
     d_relative_rate (1.0),
+    d_inplace(false),
     d_history(1),
     d_fixed_rate(false),
     d_tag_propagation_policy(TPP_ALL_TO_ALL)
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h 
b/gnuradio-core/src/lib/runtime/gr_block.h
index 86e0583..91a0c62 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -74,6 +74,14 @@ class GR_CORE_API gr_block : public gr_basic_block {
   virtual ~gr_block ();
 
   /*!
+   * Enable/disable inplace processing on this block.
+   * This does not guarantee that input memory is output memory.
+   * That is an important decision for the scheduler to make.
+   */
+  bool inplace () const { return d_inplace; }
+  void  set_inplace (bool inplace) { d_inplace = inplace; }
+
+  /*!
    * Assume block computes y_i = f(x_i, x_i-1, x_i-2, x_i-3...)
    * History is the number of x_i's that are examined to produce one y_i.
    * This comes in handy for FIR filters, where we use history to
@@ -233,6 +241,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
   int                   d_output_multiple;
   double                d_relative_rate;       // approx output_rate / 
input_rate
   gr_block_detail_sptr d_detail;               // implementation details
+  bool                  d_inplace;
   unsigned              d_history;
   bool                  d_fixed_rate;
   tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags 
downstream
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.cc 
b/gnuradio-core/src/lib/runtime/gr_buffer.cc
index 8ccc9db..6c37de9 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.cc
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.cc
@@ -88,6 +88,7 @@ gr_buffer::gr_buffer (int nitems, size_t sizeof_item, 
gr_block_sptr link)
     throw std::bad_alloc ();
 
   s_buffer_count++;
+  d_writers.push_back(d_link);
 }
 
 gr_buffer_sptr 
@@ -146,6 +147,14 @@ gr_buffer::allocate_buffer (int nitems, size_t sizeof_item)
   return true;
 }
 
+void gr_buffer::replace(gr_buffer_sptr master){
+    d_bufsize = master->d_bufsize;
+    d_base = master->d_base;
+    for (size_t i = 0; i < d_readers.size(); i++){
+        master->d_inplace_readers.push_back(d_readers[i]);
+    }
+    d_writers.push_back(master->link());
+}
 
 int
 gr_buffer::space_available ()
@@ -164,6 +173,13 @@ gr_buffer::space_available ()
       min_items_read = std::min(min_items_read, d_readers[i]->nitems_read());
     }
 
+    //and same process but compare this buffer's write index with downstream 
in-place readers index
+    for (size_t i = 0; i < d_inplace_readers.size(); i++){
+        gruel::scoped_lock guard(*d_inplace_readers[i]->mutex());
+        const int items_available = this->index_sub (d_write_index, 
d_inplace_readers[i]->d_read_index);
+        most_data = std::max(most_data, items_available);
+    }
+
     if(min_items_read != d_last_min_items_read) {
       prune_tags(d_last_min_items_read);
       d_last_min_items_read = min_items_read;
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.h 
b/gnuradio-core/src/lib/runtime/gr_buffer.h
index e8e3937..16b9180 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.h
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.h
@@ -56,6 +56,12 @@ class GR_CORE_API gr_buffer {
   virtual ~gr_buffer ();
 
   /*!
+   * Replace the memory and size in this block.
+   * Used for in-place operator blocks.
+   */
+  void replace(gr_buffer_sptr master);
+
+  /*!
    * \brief return number of items worth of space available for writing
    */
   int space_available ();
@@ -86,6 +92,16 @@ class GR_CORE_API gr_buffer {
    */
   gr_block_sptr link() { return gr_block_sptr(d_link); }
 
+  typedef std::vector<boost::weak_ptr<gr_block> > writers_type;
+
+  /*!
+   * \brief Return a list of blocks that write to this resource
+   * This routine was added by the inplace work to tbp could
+   * notify all upstream parties after a read has occurred.
+   * Basically this will return this->link() + the in-place master.
+   */
+  const writers_type &writers(void) const { return d_writers; }
+
   size_t nreaders() const { return d_readers.size(); }
   gr_buffer_reader* reader(size_t index) { return d_readers[index]; }
 
@@ -126,7 +142,9 @@ class GR_CORE_API gr_buffer {
   gr_vmcircbuf                        *d_vmcircbuf;
   size_t                               d_sizeof_item;  // in bytes
   std::vector<gr_buffer_reader *>      d_readers;
+  std::vector<gr_buffer_reader *>      d_inplace_readers; //readers that share 
this buffer resource
   boost::weak_ptr<gr_block>            d_link;         // block that writes to 
this buffer
+  writers_type                         d_writers; // blocks that write to this 
buffer
 
   //
   // The mutex protects d_write_index, d_abs_write_offset, d_done, d_item_tags 
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc 
b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
index 5d1057e..377b3b1 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
@@ -28,10 +28,12 @@
 #include <gr_block_detail.h>
 #include <gr_io_signature.h>
 #include <gr_buffer.h>
+#include <boost/format.hpp>
 #include <iostream>
 #include <map>
 
 #define GR_FLAT_FLOWGRAPH_DEBUG 0
+#define GR_FLAT_FLOWGRAPH_INPLACE_DEBUG 1
 
 // 32Kbyte buffer size between blocks
 #define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
@@ -64,6 +66,10 @@ gr_flat_flowgraph::setup_connections()
   // Connect inputs to outputs for each block
   for(gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
     connect_block_inputs(*p);
+
+  // Buffer re-use for in-place blocks
+  for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
+    simplify_inplace(*p);
 }
 
 gr_block_detail_sptr
@@ -151,6 +157,67 @@ 
gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
   }
 }
 
+static bool is_inplace(gr_basic_block_sptr block){
+    return cast_to_block_sptr(block)->inplace() and 
cast_to_block_sptr(block)->fixed_rate();
+}
+
+void gr_flat_flowgraph::simplify_inplace(gr_basic_block_sptr block){
+
+    if (not is_inplace(block)) return;
+    typedef std::multimap<int, gr_endpoint> mastermap_type;
+    mastermap_type upstream_masters;
+
+    //get a map of io sizes to upstream masters
+    for (size_t i = 0; i < calc_used_ports(block, true).size(); i++){
+        const int in_size = block->input_signature()->sizeof_stream_item(i);
+        const gr_endpoint master_ep = 
get_inplace_master(calc_upstream_edge(block, i).src());
+        if (master_ep.block().get() != NULL) 
upstream_masters.insert(std::make_pair(in_size, master_ep));
+    }
+
+    //replace downstream buffers with upstream buffers
+    for (size_t i = 0; i < calc_used_ports(block, false).size(); i++){
+
+        //search for a itemsize match in the map
+        const int out_size = block->output_signature()->sizeof_stream_item(i);
+        mastermap_type::iterator it = upstream_masters.find(out_size);
+        if (it == upstream_masters.end()) continue;
+
+        //extract the endpoint and erase it from map
+        const gr_endpoint master_ep = (*it).second;
+        upstream_masters.erase(it);
+
+        //in-place the buffer by replacing its guts
+        gr_buffer_sptr master = 
cast_to_block_sptr(master_ep.block())->detail()->output(master_ep.port());
+        cast_to_block_sptr(block)->detail()->output(i)->replace(master);
+        if (GR_FLAT_FLOWGRAPH_INPLACE_DEBUG) std::cout <<
+            boost::format("replaced %s out[%d] with %s out[%d]")
+            % block % i % master_ep.block() % master_ep.port()
+        << std::endl;
+    }
+}
+
+gr_endpoint gr_flat_flowgraph::get_inplace_master(gr_endpoint src){
+
+    //this endpoint must have only one destination, return null
+    if (calc_downstream_blocks(src.block(), src.port()).size() != 1) return 
gr_endpoint();
+
+    //if this block is not in-place, end here, otherwise search upstream
+    if (!is_inplace(src.block())) return src;
+
+    const size_t out_size = 
src.block()->output_signature()->sizeof_stream_item(src.port());
+    for (size_t i = 0; i < calc_used_ports(src.block(), true).size(); i++){
+        if (out_size != 
size_t(src.block()->input_signature()->sizeof_stream_item(i))) continue;
+
+        //when io size matches, look upstream for an inplace master
+        gr_endpoint master_ep = 
get_inplace_master(calc_upstream_edge(src.block(), i).src());
+        if (master_ep.block().get() == NULL) continue;
+        return master_ep;
+    }
+
+    //didn't find anything upstream, use this endpoint
+    return src;
+}
+
 void
 gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
 {
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h 
b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
index 2cc8836..011e279 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
@@ -63,6 +63,8 @@ private:
   gr_block_detail_sptr allocate_block_detail(gr_basic_block_sptr block);
   gr_buffer_sptr allocate_buffer(gr_basic_block_sptr block, int port);
   void connect_block_inputs(gr_basic_block_sptr block);
+  void simplify_inplace(gr_basic_block_sptr block);
+  gr_endpoint get_inplace_master(gr_endpoint);
 };
 
 #endif /* INCLUDED_GR_FLAT_FLOWGRAPH_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc 
b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
index c6311cc..ac18cc2 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
@@ -44,7 +44,10 @@ gr_tpb_detail::notify_upstream(gr_block_detail *d)
 
   for (size_t i = 0; i < d->d_input.size(); i++){
     // Can you say, "pointer chasing?"
-    d->d_input[i]->buffer()->link()->detail()->d_tpb.set_output_changed();
+    const gr_buffer::writers_type &writers = 
d->d_input[i]->buffer()->writers();
+    for (size_t j = 0; j < writers.size(); j++){
+        writers[j].lock()->detail()->d_tpb.set_output_changed();
+    }
   }
 }
 



reply via email to

[Prev in Thread] Current Thread [Next in Thread]