commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] r8800 - in gnuradio/branches/developers/eb/sched-wip/g


From: eb
Subject: [Commit-gnuradio] r8800 - in gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src: lib/runtime tests
Date: Sat, 5 Jul 2008 14:17:37 -0600 (MDT)

Author: eb
Date: 2008-07-05 14:17:36 -0600 (Sat, 05 Jul 2008)
New Revision: 8800

Modified:
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.cc
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.h
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
   gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/tests/Makefile.am
Log:
work-in-progress on thread-per-block scheduler

Modified: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
        2008-07-05 19:14:04 UTC (rev 8799)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
        2008-07-05 20:17:36 UTC (rev 8800)
@@ -35,7 +35,7 @@
 #include <stdio.h>
 
 // must be defined to either 0 or 1
-#define ENABLE_LOGGING 0
+#define ENABLE_LOGGING 1
 
 #if (ENABLE_LOGGING)
 #define LOG(x) do { x; } while(0)
@@ -68,6 +68,7 @@
   int  min_space = std::numeric_limits<int>::max();
 
   for (int i = 0; i < d->noutputs (); i++){
+    gr_buffer::scoped_lock guard(*d->output(i)->mutex());
     int n = round_down (d->output(i)->space_available (), output_multiple);
     if (n == 0){                       // We're blocked on output.
       if (d->output(i)->done()){       // Downstream is done, therefore we're 
done.
@@ -89,6 +90,7 @@
     char name[100];
     snprintf(name, sizeof(name), "sst-%d.log", which_scheduler++);
     d_log = new std::ofstream(name);
+    std::unitbuf(*d_log);              // make it unbuffered...
     *d_log << "gr_block_executor: "
           << d_block << std::endl;
   }
@@ -116,13 +118,16 @@
 
   LOG(*d_log << std::endl << m);
 
-  if (d->done())
+  if (d->done()){
+    assert(0);
     return DONE;
+  }
 
   if (d->source_p ()){
     d_ninput_items_required.resize (0);
     d_ninput_items.resize (0);
     d_input_items.resize (0);
+    d_input_done.resize(0);
     d_output_items.resize (d->noutputs ());
 
     // determine the minimum available output space
@@ -143,14 +148,25 @@
     d_ninput_items_required.resize (d->ninputs ());
     d_ninput_items.resize (d->ninputs ());
     d_input_items.resize (d->ninputs ());
+    d_input_done.resize(d->ninputs());
     d_output_items.resize (0);
     LOG(*d_log << " sink\n");
 
     max_items_avail = 0;
     for (int i = 0; i < d->ninputs (); i++){
-      d_ninput_items[i] = d->input(i)->items_available();
-      //if (d_ninput_items[i] == 0 && d->input(i)->done())
-      if (d_ninput_items[i] < m->output_multiple() && d->input(i)->done())
+      {
+       /*
+        * Acquire the mutex and grab local copies of items_available and done.
+        */
+       gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+       d_ninput_items[i] = d->input(i)->items_available();
+       d_input_done[i] = d->input(i)->done();
+      }
+
+      LOG(*d_log << "  d_ninput_items[" << i << "] = " << d_ninput_items[i] << 
std::endl);
+      LOG(*d_log << "  d_input_done[" << i << "] = " << d_input_done[i] << 
std::endl);
+      
+      if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
        goto were_done;
        
       max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
@@ -175,11 +191,19 @@
     d_ninput_items_required.resize (d->ninputs ());
     d_ninput_items.resize (d->ninputs ());
     d_input_items.resize (d->ninputs ());
+    d_input_done.resize(d->ninputs());
     d_output_items.resize (d->noutputs ());
 
     max_items_avail = 0;
     for (int i = 0; i < d->ninputs (); i++){
-      d_ninput_items[i] = d->input(i)->items_available ();
+      {
+       /*
+        * Acquire the mutex and grab local copies of items_available and done.
+        */
+       gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+       d_ninput_items[i] = d->input(i)->items_available ();
+       d_input_done[i] = d->input(i)->done();
+      }
       max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
     }
 
@@ -232,7 +256,7 @@
 
       // We're blocked on input
       LOG(*d_log << "  BLKD_IN\n");
-      if (d->input(i)->done())    // If the upstream block is done, we're done
+      if (d_input_done[i])     // If the upstream block is done, we're done
        goto were_done;
 
       // Is it possible to ever fulfill this request?

Modified: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
 2008-07-05 19:14:04 UTC (rev 8799)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
 2008-07-05 20:17:36 UTC (rev 8800)
@@ -45,6 +45,7 @@
   gr_vector_int                        d_ninput_items_required;
   gr_vector_int                        d_ninput_items;
   gr_vector_const_void_star    d_input_items;
+  std::vector<bool>            d_input_done;
   gr_vector_void_star          d_output_items;
 
  public:

Modified: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.cc
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.cc
        2008-07-05 19:14:04 UTC (rev 8799)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.cc
        2008-07-05 20:17:36 UTC (rev 8800)
@@ -79,8 +79,8 @@
 
 gr_buffer::gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link)
   : d_base (0), d_bufsize (0), d_vmcircbuf (0),
-    d_sizeof_item (sizeof_item), d_write_index (0),
-    d_done (false), d_link(link)
+    d_sizeof_item (sizeof_item), d_link(link),
+    d_write_index (0), d_done (false)
 {
   if (!allocate_buffer (nitems, sizeof_item))
     throw std::bad_alloc ();
@@ -146,7 +146,7 @@
 
 
 int
-gr_buffer::space_available () const
+gr_buffer::space_available ()
 {
   if (d_readers.empty ())
     return d_bufsize - 1;      // See comment below
@@ -175,9 +175,17 @@
 void
 gr_buffer::update_write_pointer (int nitems)
 {
+  scoped_lock  guard(*mutex());
   d_write_index = index_add (d_write_index, nitems);
 }
 
+void
+gr_buffer::set_done (bool done)
+{
+  scoped_lock  guard(*mutex());
+  d_done = done;
+}
+
 gr_buffer_reader_sptr
 gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr 
link)
 {
@@ -243,6 +251,7 @@
 void
 gr_buffer_reader::update_read_pointer (int nitems)
 {
+  scoped_lock  guard(*mutex());
   d_read_index = d_buffer->index_add (d_read_index, nitems);
 }
 

Modified: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.h
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.h
 2008-07-05 19:14:04 UTC (rev 8799)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_buffer.h
 2008-07-05 20:17:36 UTC (rev 8800)
@@ -25,6 +25,7 @@
 
 #include <gr_runtime_types.h>
 #include <boost/weak_ptr.hpp>
+#include <boost/thread.hpp>
 
 class gr_vmcircbuf;
 
@@ -48,12 +49,15 @@
  */
 class gr_buffer {
  public:
+
+  typedef boost::unique_lock<boost::mutex>  scoped_lock;
+
   virtual ~gr_buffer ();
 
   /*!
    * \brief return number of items worth of space available for writing
    */
-  int space_available () const;
+  int space_available ();
 
   /*!
    * \brief return pointer to write buffer.
@@ -68,8 +72,7 @@
    */
   void update_write_pointer (int nitems);
 
-
-  void set_done (bool done)   { d_done = done; }
+  void set_done (bool done);
   bool done () const { return d_done; }
 
   /*!
@@ -77,6 +80,11 @@
    */
   gr_block_sptr link() { return gr_block_sptr(d_link); }
 
+  size_t nreaders() const { return d_readers.size(); }
+  gr_buffer_reader* reader(size_t index) { return d_readers[index]; }
+
+  boost::mutex *mutex() { return &d_mutex; }
+
   // -------------------------------------------------------------------------
 
  private:
@@ -91,10 +99,15 @@
  private:
   gr_vmcircbuf                        *d_vmcircbuf;
   size_t                               d_sizeof_item;  // in bytes
+  std::vector<gr_buffer_reader *>      d_readers;
+  boost::weak_ptr<gr_block>            d_link;         // block that writes 
this buffer
+
+  //
+  // The mutex protects d_write_index, d_done and the d_read_index's in the 
buffer readers.
+  //
+  boost::mutex                         d_mutex;
   unsigned int                         d_write_index;  // in items 
[0,d_bufsize)
-  std::vector<gr_buffer_reader *>      d_readers;
   bool                                 d_done;
-  boost::weak_ptr<gr_block>            d_link;         // block that writes 
this buffer
   
   unsigned
   index_add (unsigned a, unsigned b)
@@ -164,8 +177,10 @@
  */
 
 class gr_buffer_reader {
+ public:
 
- public:
+  typedef gr_buffer::scoped_lock scoped_lock;
+
   ~gr_buffer_reader ();
 
   /*!
@@ -200,6 +215,9 @@
   void set_done (bool done)   { d_buffer->set_done (done); }
   bool done () const { return d_buffer->done (); }
 
+  boost::mutex *mutex() { return d_buffer->mutex(); }
+
+
   /*!
    * \brief Return the block that reads via this reader.
    */

Modified: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
        2008-07-05 19:14:04 UTC (rev 8799)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
        2008-07-05 20:17:36 UTC (rev 8800)
@@ -38,8 +38,8 @@
 #define GR_TOP_BLOCK_IMPL_DEBUG 0
 
 
-#define USE_STS        1
-#define        USE_TPB 0
+#define USE_STS        0
+#define        USE_TPB 1
 
 
 static gr_scheduler_sptr

Modified: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
    2008-07-05 19:14:04 UTC (rev 8799)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
    2008-07-05 20:17:36 UTC (rev 8800)
@@ -23,7 +23,9 @@
 #include <config.h>
 #endif
 #include <gr_tpb_detail.h>
+#include <gr_block.h>
 #include <gr_block_detail.h>
+#include <gr_buffer.h>
 
 /*
  * We assume that no worker threads are ever running when the
@@ -39,11 +41,27 @@
   // available.
 
   for (size_t i = 0; i < d->d_input.size(); i++){
+    // Can you say, "pointer chasing?"
+    d->d_input[i]->buffer()->link()->detail()->d_tpb.set_output_changed();
   }
 }
 
 void
 gr_tpb_detail::notify_downstream(gr_block_detail *d)
 {
+  // For each of our outputs, tell the guys downstream that they have
+  // new input available.
+
+  for (size_t i = 0; i < d->d_output.size(); i++){
+    gr_buffer_sptr buf = d->d_output[i];
+    for (size_t j = 0, k = buf->nreaders(); j < k; j++)
+      buf->reader(j)->link()->detail()->d_tpb.set_input_changed();
+  }
 }
 
+void
+gr_tpb_detail::notify_neighbors(gr_block_detail *d)
+{
+  notify_downstream(d);
+  notify_upstream(d);
+}

Modified: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
     2008-07-05 19:14:04 UTC (rev 8799)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
     2008-07-05 20:17:36 UTC (rev 8800)
@@ -47,6 +47,9 @@
   //! Called by us to tell all our downstream blocks that their input may have 
changed.
   void notify_downstream(gr_block_detail *d);
 
+  //! Called by us to notify both upstream and downstream
+  void notify_neighbors(gr_block_detail *d);
+
   //! Called by us
   void clear_changed()
   {

Modified: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
       2008-07-05 19:14:04 UTC (rev 8799)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
       2008-07-05 20:17:36 UTC (rev 8800)
@@ -38,11 +38,11 @@
 
     switch(s){
     case gr_block_executor::READY:             // Tell neighbors we made 
progress.
-      d->d_tpb.notify_downstream(d);
-      d->d_tpb.notify_upstream(d);
+      d->d_tpb.notify_neighbors(d);
       break;
 
     case gr_block_executor::DONE:              // Game over.
+      d->d_tpb.notify_neighbors(d);
       return;
 
     case gr_block_executor::BLKD_IN:           // Wait for input.

Modified: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/tests/Makefile.am
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/tests/Makefile.am   
    2008-07-05 19:14:04 UTC (rev 8799)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/tests/Makefile.am   
    2008-07-05 20:17:36 UTC (rev 8800)
@@ -47,11 +47,13 @@
        benchmark_vco           \
        test_runtime            \
        test_general            \
-       test_all                \
        test_filter             \
        test_vmcircbuf
 
+bin_PROGRAMS = \
+       test_all
 
+
 noinst_SCRIPTS = \
        benchmark_dotprod
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]