[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] r8800 - in gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src: lib/runtime tests
From: eb
Subject: [Commit-gnuradio] r8800 - in gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src: lib/runtime tests
Date: Sat, 5 Jul 2008 14:17:37 -0600 (MDT)
Author: eb
Date: 2008-07-05 14:17:36 -0600 (Sat, 05 Jul 2008)
New Revision: 8800
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.cc
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.h
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/tests/Makefile.am
Log:
work-in-progress on thread-per-block scheduler
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
2008-07-05 19:14:04 UTC (rev 8799)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
2008-07-05 20:17:36 UTC (rev 8800)
@@ -35,7 +35,7 @@
#include <stdio.h>
// must be defined to either 0 or 1
-#define ENABLE_LOGGING 0
+#define ENABLE_LOGGING 1
#if (ENABLE_LOGGING)
#define LOG(x) do { x; } while(0)
@@ -68,6 +68,7 @@
int min_space = std::numeric_limits<int>::max();
for (int i = 0; i < d->noutputs (); i++){
+ gr_buffer::scoped_lock guard(*d->output(i)->mutex());
int n = round_down (d->output(i)->space_available (), output_multiple);
if (n == 0){ // We're blocked on output.
if (d->output(i)->done()){ // Downstream is done, therefore we're done.
@@ -89,6 +90,7 @@
char name[100];
snprintf(name, sizeof(name), "sst-%d.log", which_scheduler++);
d_log = new std::ofstream(name);
+ std::unitbuf(*d_log); // make it unbuffered...
*d_log << "gr_block_executor: "
<< d_block << std::endl;
}
@@ -116,13 +118,16 @@
LOG(*d_log << std::endl << m);
- if (d->done())
+ if (d->done()){
+ assert(0);
return DONE;
+ }
if (d->source_p ()){
d_ninput_items_required.resize (0);
d_ninput_items.resize (0);
d_input_items.resize (0);
+ d_input_done.resize(0);
d_output_items.resize (d->noutputs ());
// determine the minimum available output space
@@ -143,14 +148,25 @@
d_ninput_items_required.resize (d->ninputs ());
d_ninput_items.resize (d->ninputs ());
d_input_items.resize (d->ninputs ());
+ d_input_done.resize(d->ninputs());
d_output_items.resize (0);
LOG(*d_log << " sink\n");
max_items_avail = 0;
for (int i = 0; i < d->ninputs (); i++){
- d_ninput_items[i] = d->input(i)->items_available();
- //if (d_ninput_items[i] == 0 && d->input(i)->done())
- if (d_ninput_items[i] < m->output_multiple() && d->input(i)->done())
+ {
+ /*
+ * Acquire the mutex and grab local copies of items_available and done.
+ */
+ gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+ d_ninput_items[i] = d->input(i)->items_available();
+ d_input_done[i] = d->input(i)->done();
+ }
+
+ LOG(*d_log << " d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl);
+ LOG(*d_log << " d_input_done[" << i << "] = " << d_input_done[i] << std::endl);
+
+ if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
goto were_done;
max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
@@ -175,11 +191,19 @@
d_ninput_items_required.resize (d->ninputs ());
d_ninput_items.resize (d->ninputs ());
d_input_items.resize (d->ninputs ());
+ d_input_done.resize(d->ninputs());
d_output_items.resize (d->noutputs ());
max_items_avail = 0;
for (int i = 0; i < d->ninputs (); i++){
- d_ninput_items[i] = d->input(i)->items_available ();
+ {
+ /*
+ * Acquire the mutex and grab local copies of items_available and done.
+ */
+ gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+ d_ninput_items[i] = d->input(i)->items_available ();
+ d_input_done[i] = d->input(i)->done();
+ }
max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
}
@@ -232,7 +256,7 @@
// We're blocked on input
LOG(*d_log << " BLKD_IN\n");
- if (d->input(i)->done()) // If the upstream block is done, we're done
+ if (d_input_done[i]) // If the upstream block is done, we're done
goto were_done;
// Is it possible to ever fulfill this request?
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
2008-07-05 19:14:04 UTC (rev 8799)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
2008-07-05 20:17:36 UTC (rev 8800)
@@ -45,6 +45,7 @@
gr_vector_int d_ninput_items_required;
gr_vector_int d_ninput_items;
gr_vector_const_void_star d_input_items;
+ std::vector<bool> d_input_done;
gr_vector_void_star d_output_items;
public:
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.cc
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.cc
2008-07-05 19:14:04 UTC (rev 8799)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.cc
2008-07-05 20:17:36 UTC (rev 8800)
@@ -79,8 +79,8 @@
gr_buffer::gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link)
: d_base (0), d_bufsize (0), d_vmcircbuf (0),
- d_sizeof_item (sizeof_item), d_write_index (0),
- d_done (false), d_link(link)
+ d_sizeof_item (sizeof_item), d_link(link),
+ d_write_index (0), d_done (false)
{
if (!allocate_buffer (nitems, sizeof_item))
throw std::bad_alloc ();
@@ -146,7 +146,7 @@
int
-gr_buffer::space_available () const
+gr_buffer::space_available ()
{
if (d_readers.empty ())
return d_bufsize - 1; // See comment below
@@ -175,9 +175,17 @@
void
gr_buffer::update_write_pointer (int nitems)
{
+ scoped_lock guard(*mutex());
d_write_index = index_add (d_write_index, nitems);
}
+void
+gr_buffer::set_done (bool done)
+{
+ scoped_lock guard(*mutex());
+ d_done = done;
+}
+
gr_buffer_reader_sptr
gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link)
{
@@ -243,6 +251,7 @@
void
gr_buffer_reader::update_read_pointer (int nitems)
{
+ scoped_lock guard(*mutex());
d_read_index = d_buffer->index_add (d_read_index, nitems);
}
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.h
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.h
2008-07-05 19:14:04 UTC (rev 8799)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.h
2008-07-05 20:17:36 UTC (rev 8800)
@@ -25,6 +25,7 @@
#include <gr_runtime_types.h>
#include <boost/weak_ptr.hpp>
+#include <boost/thread.hpp>
class gr_vmcircbuf;
@@ -48,12 +49,15 @@
*/
class gr_buffer {
public:
+
+ typedef boost::unique_lock<boost::mutex> scoped_lock;
+
virtual ~gr_buffer ();
/*!
* \brief return number of items worth of space available for writing
*/
- int space_available () const;
+ int space_available ();
/*!
* \brief return pointer to write buffer.
@@ -68,8 +72,7 @@
*/
void update_write_pointer (int nitems);
-
- void set_done (bool done) { d_done = done; }
+ void set_done (bool done);
bool done () const { return d_done; }
/*!
@@ -77,6 +80,11 @@
*/
gr_block_sptr link() { return gr_block_sptr(d_link); }
+ size_t nreaders() const { return d_readers.size(); }
+ gr_buffer_reader* reader(size_t index) { return d_readers[index]; }
+
+ boost::mutex *mutex() { return &d_mutex; }
+
// -------------------------------------------------------------------------
private:
@@ -91,10 +99,15 @@
private:
gr_vmcircbuf *d_vmcircbuf;
size_t d_sizeof_item; // in bytes
+ std::vector<gr_buffer_reader *> d_readers;
+ boost::weak_ptr<gr_block> d_link; // block that writes this buffer
+
+ //
+ // The mutex protects d_write_index, d_done and the d_read_index's in the buffer readers.
+ //
+ boost::mutex d_mutex;
unsigned int d_write_index; // in items [0,d_bufsize)
- std::vector<gr_buffer_reader *> d_readers;
bool d_done;
- boost::weak_ptr<gr_block> d_link; // block that writes this buffer
unsigned
index_add (unsigned a, unsigned b)
@@ -164,8 +177,10 @@
*/
class gr_buffer_reader {
+ public:
- public:
+ typedef gr_buffer::scoped_lock scoped_lock;
+
~gr_buffer_reader ();
/*!
@@ -200,6 +215,9 @@
void set_done (bool done) { d_buffer->set_done (done); }
bool done () const { return d_buffer->done (); }
+ boost::mutex *mutex() { return d_buffer->mutex(); }
+
+
/*!
* \brief Return the block that reads via this reader.
*/
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
2008-07-05 19:14:04 UTC (rev 8799)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
2008-07-05 20:17:36 UTC (rev 8800)
@@ -38,8 +38,8 @@
#define GR_TOP_BLOCK_IMPL_DEBUG 0
-#define USE_STS 1
-#define USE_TPB 0
+#define USE_STS 0
+#define USE_TPB 1
static gr_scheduler_sptr
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
2008-07-05 19:14:04 UTC (rev 8799)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
2008-07-05 20:17:36 UTC (rev 8800)
@@ -23,7 +23,9 @@
#include <config.h>
#endif
#include <gr_tpb_detail.h>
+#include <gr_block.h>
#include <gr_block_detail.h>
+#include <gr_buffer.h>
/*
* We assume that no worker threads are ever running when the
@@ -39,11 +41,27 @@
// available.
for (size_t i = 0; i < d->d_input.size(); i++){
+ // Can you say, "pointer chasing?"
+ d->d_input[i]->buffer()->link()->detail()->d_tpb.set_output_changed();
}
}
void
gr_tpb_detail::notify_downstream(gr_block_detail *d)
{
+ // For each of our outputs, tell the guys downstream that they have
+ // new input available.
+
+ for (size_t i = 0; i < d->d_output.size(); i++){
+ gr_buffer_sptr buf = d->d_output[i];
+ for (size_t j = 0, k = buf->nreaders(); j < k; j++)
+ buf->reader(j)->link()->detail()->d_tpb.set_input_changed();
+ }
}
+void
+gr_tpb_detail::notify_neighbors(gr_block_detail *d)
+{
+ notify_downstream(d);
+ notify_upstream(d);
+}
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
2008-07-05 19:14:04 UTC (rev 8799)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
2008-07-05 20:17:36 UTC (rev 8800)
@@ -47,6 +47,9 @@
//! Called by us to tell all our downstream blocks that their input may have changed.
void notify_downstream(gr_block_detail *d);
+ //! Called by us to notify both upstream and downstream
+ void notify_neighbors(gr_block_detail *d);
+
//! Called by us
void clear_changed()
{
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
2008-07-05 19:14:04 UTC (rev 8799)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
2008-07-05 20:17:36 UTC (rev 8800)
@@ -38,11 +38,11 @@
switch(s){
case gr_block_executor::READY: // Tell neighbors we made progress.
- d->d_tpb.notify_downstream(d);
- d->d_tpb.notify_upstream(d);
+ d->d_tpb.notify_neighbors(d);
break;
case gr_block_executor::DONE: // Game over.
+ d->d_tpb.notify_neighbors(d);
return;
case gr_block_executor::BLKD_IN: // Wait for input.
Modified:
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/tests/Makefile.am
===================================================================
---
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/tests/Makefile.am
2008-07-05 19:14:04 UTC (rev 8799)
+++
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/tests/Makefile.am
2008-07-05 20:17:36 UTC (rev 8800)
@@ -47,11 +47,13 @@
benchmark_vco \
test_runtime \
test_general \
- test_all \
test_filter \
test_vmcircbuf
+bin_PROGRAMS = \
+ test_all
+
noinst_SCRIPTS = \
benchmark_dotprod
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Commit-gnuradio] r8800 - in gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src: lib/runtime tests,
eb <=