[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 17/28: core: squashed in-place block work
From: |
git |
Subject: |
[Commit-gnuradio] [gnuradio] 17/28: core: squashed in-place block work |
Date: |
Mon, 15 Aug 2016 00:47:06 +0000 (UTC) |
This is an automated email from the git hooks/post-receive script.
nwest pushed a commit to annotated tag gr_basic_work
in repository gnuradio.
commit 4e4cd2ce55d95ba08da91d86652ddba34694170e
Author: Josh Blum <address@hidden>
Date: Sat Nov 12 12:45:14 2011 -0800
core: squashed in-place block work
---
gnuradio-core/src/lib/general/gr_throttle.cc | 5 +-
gnuradio-core/src/lib/runtime/gr_block.cc | 1 +
gnuradio-core/src/lib/runtime/gr_block.h | 9 +++
gnuradio-core/src/lib/runtime/gr_buffer.cc | 16 ++++++
gnuradio-core/src/lib/runtime/gr_buffer.h | 18 ++++++
gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc | 67 ++++++++++++++++++++++
gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h | 2 +
gnuradio-core/src/lib/runtime/gr_tpb_detail.cc | 5 +-
8 files changed, 120 insertions(+), 3 deletions(-)
diff --git a/gnuradio-core/src/lib/general/gr_throttle.cc
b/gnuradio-core/src/lib/general/gr_throttle.cc
index a24b1da..f9406c2 100644
--- a/gnuradio-core/src/lib/general/gr_throttle.cc
+++ b/gnuradio-core/src/lib/general/gr_throttle.cc
@@ -37,7 +37,7 @@ public:
gr_make_io_signature(1, 1, itemsize)),
d_itemsize(itemsize)
{
- /* NOP */
+ this->set_inplace(true);
}
void set_sample_rate(double rate){
@@ -68,7 +68,8 @@ public:
//copy all samples output[i] <= input[i]
const char *in = (const char *) input_items[0];
char *out = (char *) output_items[0];
- std::memcpy(out, in, noutput_items * d_itemsize);
+ //only memcpy when the scheduler doesnt simplify to in-place
+ if (in != out) std::memcpy(out, in, noutput_items * d_itemsize);
d_total_samples += noutput_items;
return noutput_items;
}
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc
b/gnuradio-core/src/lib/runtime/gr_block.cc
index 9463869..daefb55 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -35,6 +35,7 @@ gr_block::gr_block (const std::string &name,
: gr_basic_block(name, input_signature, output_signature),
d_output_multiple (1),
d_relative_rate (1.0),
+ d_inplace(false),
d_history(1),
d_fixed_rate(false),
d_tag_propagation_policy(TPP_ALL_TO_ALL)
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h
b/gnuradio-core/src/lib/runtime/gr_block.h
index 86e0583..91a0c62 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -74,6 +74,14 @@ class GR_CORE_API gr_block : public gr_basic_block {
virtual ~gr_block ();
/*!
+ * \brief Query or set the in-place processing flag of this block.
+ *
+ * Enable/disable inplace processing on this block.
+ * This does not guarantee that input memory is output memory.
+ * That is an important decision for the scheduler to make.
+ *
+ * inplace() returns the stored flag and set_inplace() overwrites it;
+ * neither call touches any buffer.
+ */
+ bool inplace () const { return d_inplace; }
+ void set_inplace (bool inplace) { d_inplace = inplace; }
+
+ /*!
* Assume block computes y_i = f(x_i, x_i-1, x_i-2, x_i-3...)
* History is the number of x_i's that are examined to produce one y_i.
* This comes in handy for FIR filters, where we use history to
@@ -233,6 +241,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
int d_output_multiple;
double d_relative_rate; // approx output_rate /
input_rate
gr_block_detail_sptr d_detail; // implementation details
+ bool d_inplace;
unsigned d_history;
bool d_fixed_rate;
tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags
downstream
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.cc
b/gnuradio-core/src/lib/runtime/gr_buffer.cc
index 8ccc9db..6c37de9 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.cc
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.cc
@@ -88,6 +88,7 @@ gr_buffer::gr_buffer (int nitems, size_t sizeof_item,
gr_block_sptr link)
throw std::bad_alloc ();
s_buffer_count++;
+ d_writers.push_back(d_link);
}
gr_buffer_sptr
@@ -146,6 +147,14 @@ gr_buffer::allocate_buffer (int nitems, size_t sizeof_item)
return true;
}
+/*
+ * Turn this buffer into an alias of master's memory.
+ * The readers attached to this buffer are registered with master as
+ * in-place readers, this buffer adopts master's d_base and d_bufsize,
+ * and the block that writes master is appended to this buffer's writers.
+ */
+void gr_buffer::replace(gr_buffer_sptr master){
+  //hand every one of our readers to the master's in-place reader list
+  master->d_inplace_readers.insert(master->d_inplace_readers.end(),
+                                   d_readers.begin(), d_readers.end());
+
+  //take over the master's memory and size
+  d_base = master->d_base;
+  d_bufsize = master->d_bufsize;
+
+  //the master's writer now also writes into this buffer's memory
+  d_writers.push_back(master->link());
+}
int
gr_buffer::space_available ()
@@ -164,6 +173,13 @@ gr_buffer::space_available ()
min_items_read = std::min(min_items_read, d_readers[i]->nitems_read());
}
+ //and same process but compare this buffer's write index with downstream
in-place readers index
+ for (size_t i = 0; i < d_inplace_readers.size(); i++){
+ gruel::scoped_lock guard(*d_inplace_readers[i]->mutex());
+ const int items_available = this->index_sub (d_write_index,
d_inplace_readers[i]->d_read_index);
+ most_data = std::max(most_data, items_available);
+ }
+
if(min_items_read != d_last_min_items_read) {
prune_tags(d_last_min_items_read);
d_last_min_items_read = min_items_read;
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.h
b/gnuradio-core/src/lib/runtime/gr_buffer.h
index e8e3937..16b9180 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.h
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.h
@@ -56,6 +56,12 @@ class GR_CORE_API gr_buffer {
virtual ~gr_buffer ();
/*!
+ * \brief Replace the memory and size of this buffer with those of \p master.
+ *
+ * Used for in-place operator blocks. This buffer takes over master's
+ * d_base and d_bufsize, the readers of this buffer are appended to
+ * master's list of in-place readers, and master's link() block is
+ * appended to the list returned by writers().
+ *
+ * \param master the buffer whose memory this buffer will share
+ */
+ void replace(gr_buffer_sptr master);
+
+ /*!
* \brief return number of items worth of space available for writing
*/
int space_available ();
@@ -86,6 +92,16 @@ class GR_CORE_API gr_buffer {
*/
gr_block_sptr link() { return gr_block_sptr(d_link); }
+ typedef std::vector<boost::weak_ptr<gr_block> > writers_type;
+
+ /*!
+ * \brief Return a list of blocks that write to this resource
+ *
+ * This routine was added by the in-place work so that tpb can
+ * notify all upstream parties after a read has occurred.
+ * Basically this will return this->link() + the in-place master.
+ *
+ * The entries are weak pointers (see writers_type) and have to be
+ * lock()ed before use.
+ * NOTE(review): an entry may have expired by then; confirm that
+ * callers check the result of lock().
+ */
+ const writers_type &writers(void) const { return d_writers; }
+
size_t nreaders() const { return d_readers.size(); }
gr_buffer_reader* reader(size_t index) { return d_readers[index]; }
@@ -126,7 +142,9 @@ class GR_CORE_API gr_buffer {
gr_vmcircbuf *d_vmcircbuf;
size_t d_sizeof_item; // in bytes
std::vector<gr_buffer_reader *> d_readers;
+ std::vector<gr_buffer_reader *> d_inplace_readers; //readers that share
this buffer resource
boost::weak_ptr<gr_block> d_link; // block that writes to
this buffer
+ writers_type d_writers; // blocks that write to this
buffer
//
// The mutex protects d_write_index, d_abs_write_offset, d_done, d_item_tags
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
index 5d1057e..377b3b1 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
@@ -28,10 +28,12 @@
#include <gr_block_detail.h>
#include <gr_io_signature.h>
#include <gr_buffer.h>
+#include <boost/format.hpp>
#include <iostream>
#include <map>
#define GR_FLAT_FLOWGRAPH_DEBUG 0
+#define GR_FLAT_FLOWGRAPH_INPLACE_DEBUG 1
// 32Kbyte buffer size between blocks
#define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
@@ -64,6 +66,10 @@ gr_flat_flowgraph::setup_connections()
// Connect inputs to outputs for each block
for(gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
connect_block_inputs(*p);
+
+ // Buffer re-use for in-place blocks
+ for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
+ simplify_inplace(*p);
}
gr_block_detail_sptr
@@ -151,6 +157,67 @@
gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
}
}
+//A block takes part in buffer re-use only when it has asked for
+//in-place operation and it also reports a fixed rate.
+static bool is_inplace(gr_basic_block_sptr block){
+  if (!cast_to_block_sptr(block)->inplace()) return false;
+  return cast_to_block_sptr(block)->fixed_rate();
+}
+
+/*
+ * Buffer re-use pass for a single block.
+ *
+ * Does nothing unless is_inplace(block) holds. Otherwise:
+ *  1. for every used input port, ask get_inplace_master() for the
+ *     upstream endpoint whose buffer could be shared, and remember the
+ *     non-null answers keyed by the input's item size;
+ *  2. for every used output port, pick a remembered master with the same
+ *     item size, remove it from the map so that it is handed out at most
+ *     once, and replace the guts of the output buffer with the master's.
+ *
+ * Output ports without a size-matching master keep their own buffer.
+ */
+void gr_flat_flowgraph::simplify_inplace(gr_basic_block_sptr block){
+
+  if (!is_inplace(block)) return;
+  typedef std::multimap<int, gr_endpoint> mastermap_type;
+  mastermap_type upstream_masters;
+
+  //nothing in this pass adds or removes edges, so the number of used
+  //ports is computed once instead of in every loop condition
+  const size_t num_inputs = calc_used_ports(block, true).size();
+  const size_t num_outputs = calc_used_ports(block, false).size();
+
+  //get a map of io sizes to upstream masters
+  for (size_t i = 0; i < num_inputs; i++){
+    const int in_size = block->input_signature()->sizeof_stream_item(i);
+    const gr_endpoint master_ep =
+      get_inplace_master(calc_upstream_edge(block, i).src());
+    //a null block means there is no usable master behind this input
+    if (master_ep.block().get() != NULL)
+      upstream_masters.insert(std::make_pair(in_size, master_ep));
+  }
+
+  //replace downstream buffers with upstream buffers
+  for (size_t i = 0; i < num_outputs; i++){
+
+    //search for a itemsize match in the map
+    const int out_size = block->output_signature()->sizeof_stream_item(i);
+    mastermap_type::iterator it = upstream_masters.find(out_size);
+    if (it == upstream_masters.end()) continue;
+
+    //extract the endpoint and erase it from map
+    //(copy first: erase invalidates the iterator)
+    const gr_endpoint master_ep = (*it).second;
+    upstream_masters.erase(it);
+
+    //in-place the buffer by replacing its guts
+    gr_buffer_sptr master =
+      cast_to_block_sptr(master_ep.block())->detail()->output(master_ep.port());
+    cast_to_block_sptr(block)->detail()->output(i)->replace(master);
+    if (GR_FLAT_FLOWGRAPH_INPLACE_DEBUG) std::cout <<
+      boost::format("replaced %s out[%d] with %s out[%d]")
+      % block % i % master_ep.block() % master_ep.port()
+      << std::endl;
+  }
+}
+
+/*
+ * Walk upstream from src looking for the endpoint whose buffer an
+ * in-place chain can share. Returns a default-constructed (null)
+ * endpoint when src cannot be used at all.
+ */
+gr_endpoint gr_flat_flowgraph::get_inplace_master(gr_endpoint src){
+
+  //an output that does not feed exactly one destination cannot be
+  //shared; signal that with a null endpoint
+  const size_t fanout = calc_downstream_blocks(src.block(), src.port()).size();
+  if (fanout != 1) return gr_endpoint();
+
+  //a block that is not in-place ends the upstream walk right here
+  if (!is_inplace(src.block())) return src;
+
+  const size_t wanted_size =
+    src.block()->output_signature()->sizeof_stream_item(src.port());
+  size_t port = 0;
+  while (port < calc_used_ports(src.block(), true).size()){
+    const size_t in_size =
+      size_t(src.block()->input_signature()->sizeof_stream_item(port));
+    if (in_size == wanted_size){
+      //item sizes agree, so keep searching upstream through this input
+      gr_endpoint candidate =
+        get_inplace_master(calc_upstream_edge(src.block(), port).src());
+      if (candidate.block().get() != NULL) return candidate;
+    }
+    port++;
+  }
+
+  //nothing usable further upstream, so this endpoint is the master
+  return src;
+}
+
void
gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
{
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
index 2cc8836..011e279 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
@@ -63,6 +63,8 @@ private:
gr_block_detail_sptr allocate_block_detail(gr_basic_block_sptr block);
gr_buffer_sptr allocate_buffer(gr_basic_block_sptr block, int port);
void connect_block_inputs(gr_basic_block_sptr block);
+ void simplify_inplace(gr_basic_block_sptr block);
+ gr_endpoint get_inplace_master(gr_endpoint);
};
#endif /* INCLUDED_GR_FLAT_FLOWGRAPH_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
index c6311cc..ac18cc2 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
@@ -44,7 +44,10 @@ gr_tpb_detail::notify_upstream(gr_block_detail *d)
for (size_t i = 0; i < d->d_input.size(); i++){
// Can you say, "pointer chasing?"
- d->d_input[i]->buffer()->link()->detail()->d_tpb.set_output_changed();
+ const gr_buffer::writers_type &writers =
d->d_input[i]->buffer()->writers();
+ for (size_t j = 0; j < writers.size(); j++){
+ writers[j].lock()->detail()->d_tpb.set_output_changed();
+ }
}
}
- [Commit-gnuradio] [gnuradio] 06/28: basic: call volk for float32 multiplier types, (continued)
- [Commit-gnuradio] [gnuradio] 06/28: basic: call volk for float32 multiplier types, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 09/28: Volk: added 32fc x scalar multiply, implemented in Orc & generic. Orc/SSE tested 10x faster than generic., git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 25/28: core: added set_output_alignment (policy 2 version), git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 11/28: basic: use volk multiply scalar function for multiply_const fc32, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 24/28: basic: set inplace on gr-basic math blocks, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 16/28: basic: performance tweak for sig source index mod, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 04/28: basic: added other basic operators, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 01/28: basic: attempt at new component, partial adder implementation, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 27/28: basic: added dynamic delay block, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 13/28: Volk: whoops, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 17/28: core: squashed in-place block work,
git <=
- [Commit-gnuradio] [gnuradio] 07/28: basic add/mult const, needs test, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 15/28: basic: added super fast signal source, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 26/28: basic: use set_output_alignment in the basic math blocks, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 19/28: core: make in-place buffering enabled per port, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 22/28: filter: added 32f decimating fir + tweaks, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 18/28: basic: added stream selector block, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 05/28: basic: whoops, wrong operator, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 12/28: Volk: 32f_s32f_multiply_32f, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 03/28: basic: added int16 data types and filled in float32, git, 2016/08/14
- [Commit-gnuradio] [gnuradio] 08/28: basic: working add/mult const blocks, block magic2 for static make, git, 2016/08/14