commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] r4911 - in gnuradio/branches/developers/eb/ibu: mblock


From: eb
Subject: [Commit-gnuradio] r4911 - in gnuradio/branches/developers/eb/ibu: mblock/src/lib omnithread
Date: Fri, 6 Apr 2007 23:49:56 -0600 (MDT)

Author: eb
Date: 2007-04-06 23:49:56 -0600 (Fri, 06 Apr 2007)
New Revision: 4911

Modified:
   gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.cc
   gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.h
   
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
   
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
   gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
   gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.h
   gnuradio/branches/developers/eb/ibu/omnithread/omnithread.h
Log:
work-in-progress on mblocks.  Currently broken.  Need to
rethink/rework calls of initial_transitions.


Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.cc
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.cc  
2007-04-07 03:06:38 UTC (rev 4910)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.cc  
2007-04-07 05:49:56 UTC (rev 4911)
@@ -91,6 +91,12 @@
 {
 }
 
+mbe_mblock_failed::mbe_mblock_failed(mb_mblock *mb,
+                                    const std::string &msg)
+  : mbe_base(mb, "Message block failed: " + msg)
+{
+}
+
 mbe_terminate::mbe_terminate()
 {
 }

Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.h
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.h   
2007-04-07 03:06:38 UTC (rev 4910)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_exception.h   
2007-04-07 05:49:56 UTC (rev 4911)
@@ -93,6 +93,14 @@
                        const std::string &port_name);
 };
 
+class mbe_mblock_failed : public mbe_base
+{
+public:
+  mbe_mblock_failed(mb_mblock *, const std::string &msg);
+};
+
+
+
 // not derived from mbe_base to simplify try/catch
 class mbe_terminate
 {

Modified: 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
===================================================================
--- 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
   2007-04-07 03:06:38 UTC (rev 4910)
+++ 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
   2007-04-07 05:49:56 UTC (rev 4911)
@@ -27,11 +27,10 @@
 #include <mb_mblock_impl.h>
 #include <mb_class_registry.h>
 #include <mb_exception.h>
+#include <mb_worker.h>
 #include <omnithread.h>
 
 
-
-
 mb_runtime_thread_per_block::mb_runtime_thread_per_block()
   : d_runtime_cond(&d_mutex)
 {
@@ -40,45 +39,89 @@
 
 mb_runtime_thread_per_block::~mb_runtime_thread_per_block()
 {
-  // nop for now
+  // FIXME iterate over workers and ensure that they are dead.
 }
 
-
 bool
 mb_runtime_thread_per_block::run(const std::string &instance_name,
-                                 const std::string &class_name,
-                                 pmt_t user_arg)
+                                const std::string &class_name,
+                                pmt_t user_arg)
 {
-  class initial_visitor : public mb_visitor
+  // Create the top-level component, and recursively all of its
+  // subcomponents.
+  mb_mblock_sptr top = create_component(instance_name, class_name, user_arg);
+
+  // FIXME!  Rethink when to run initial_transitions.
+
+  // Now tell them all to run their initial_transitions
   {
-  public:
-    bool operator()(mb_mblock *mblock)
-    {
-      mblock->initial_transition();
-      return true;
-    }
-  };
+    omni_mutex_lock    l1(d_mutex);      // lock runtime first...
 
-  initial_visitor      visitor;
+    for (worker_iter_t w = d_workers.begin(); w != d_workers.end(); ++w){
+      omni_mutex_lock  l2((*w)->d_mutex);        // ...then worker.
+      switch ((*w)->d_state){
 
-  mb_mblock_sptr top = create_component(instance_name, class_name, user_arg);
+      case mb_worker::TS_CONSTRUCTED:          // expected case
+       (*w)->d_state = mb_worker::TS_RUN_INITIAL;
+       (*w)->d_state_cond.broadcast();
+       break;
 
-  // FIXME wait for barrier, then ask each mblock to run its initial_transition
+      default:
+       // FIXME...
+       break;
+      }
+    }
+  }
 
   return true;
 }
 
 //
-// FIXME create the thread, then create the component in the thread
+// Create the thread, then create the component in the thread.
+// Return a pointer to the created mblock.
 //
 mb_mblock_sptr
 mb_runtime_thread_per_block::create_component(const std::string &instance_name,
-                                              const std::string &class_name,
-                                              pmt_t user_arg)
+                                             const std::string &class_name,
+                                             pmt_t user_arg)
 {
   mb_mblock_maker_t maker;
   if (!mb_class_registry::lookup_maker(class_name, &maker))
     throw mbe_no_such_class(0, class_name + " (in " + instance_name + ")");
 
-  return maker(this, instance_name, user_arg);
+  // FIXME here's where we'd lookup NUMA placement requests & mblock
+  // priorities and communicate them to the worker we're creating...
+
+  // Create the worker thread; it invokes maker itself, in its own thread.
+  mb_worker_sptr w =
+    mb_worker_sptr(new mb_worker(this, maker, instance_name, user_arg));
+
+  w->start_undetached();  // start it
+
+  // Block until the worker reports TS_CONSTRUCTED or TS_DEAD (re-check after each wakeup)
+  {
+    omni_mutex_lock l(w->d_mutex);
+    while (w->d_state != mb_worker::TS_CONSTRUCTED
+          && w->d_state != mb_worker::TS_DEAD)
+      w->d_state_cond.wait();
+  }
+
+  if (w->d_state == mb_worker::TS_DEAD){  // failed to init
+    void *ignore;
+    w->join(&ignore);          // reap it now
+
+    // FIXME with some work we ought to be able to propagate the
+    // exception from the worker.
+    throw mbe_mblock_failed(0, instance_name);
+  }
+
+  // The worker has successfully constructed the mblock and
+  // is blocked waiting to be told to run its initial_transition.
+  // Add w to the vector of workers, and return the mblock.
+  {
+    omni_mutex_lock l(d_mutex);
+    d_workers.push_back(w);
+  }
+
+  return w->d_mblock;
 }

Modified: 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
===================================================================
--- 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
    2007-04-07 03:06:38 UTC (rev 4910)
+++ 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
    2007-04-07 05:49:56 UTC (rev 4911)
@@ -18,23 +18,27 @@
  * with this program; if not, write to the Free Software Foundation, Inc.,
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
-#ifndef INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H
-#define INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H
+#ifndef INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H
+#define INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H
 
 #include <mb_runtime.h>
+#include <mb_worker.h>
 
 /*!
  * \brief Concrete runtime that uses a thread per mblock
  * \implementation
  *
- * This is all implementation details.
+ * These are all implementation details.
  */
 class mb_runtime_thread_per_block : public mb_runtime
 {
 public:
-  omni_mutex           d_mutex;
-  omni_condition       d_runtime_cond;   // runtime waits here when it's got nothing to do
+  omni_mutex                 d_mutex;
+  omni_condition             d_runtime_cond;  // runtime waits here
+  std::vector<mb_worker_sptr> d_workers;
 
+  typedef std::vector<mb_worker_sptr>::iterator  worker_iter_t;
+
   mb_runtime_thread_per_block();
   ~mb_runtime_thread_per_block();
 
@@ -49,4 +53,4 @@
                   pmt_t user_arg);
 };
 
-#endif /* INCLUDED_MB_RUNTIME_THREAD_PER_MBLOCK_H */
+#endif /* INCLUDED_MB_RUNTIME_THREAD_PER_BLOCK_H */

Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc     
2007-04-07 03:06:38 UTC (rev 4910)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc     
2007-04-07 05:49:56 UTC (rev 4911)
@@ -25,9 +25,25 @@
 #include <mb_worker.h>
 #include <mb_runtime_thread_per_block.h>
 #include <mb_exception.h>
+#include <mb_mblock.h>
+#include <iostream>
 
+#include <sys/syscall.h>       // Move this somewhere else and autoconf
+#include <unistd.h>
 
+#ifdef SYS_gettid
+int mb_gettid()
+{
+  return syscall(SYS_gettid);
+}
+#else
+int mb_gettid()
+{
+  return -1;
+}
+#endif
 
+
 mb_worker::mb_worker(mb_runtime_thread_per_block *runtime,
                     mb_mblock_maker_t maker,
                     const std::string &instance_name,
@@ -35,7 +51,7 @@
   : omni_thread((void *) 0, PRIORITY_NORMAL),
     d_runtime(runtime), d_maker(maker),
     d_instance_name(instance_name), d_user_arg(user_arg),
-    d_state_cond(&d_mutex), d_state(TS_UNITIALIZED),
+    d_state_cond(&d_mutex), d_state(TS_UNINITIALIZED),
     d_why_dead(RIP_NOT_DEAD_YET)
 {
 }
@@ -44,11 +60,26 @@
 {
 }
 
+void
+mb_worker::set_state(worker_state_t state)
+{
+  omni_mutex_lock  l1(d_runtime->d_mutex);     // lock runtime first, then worker
+  omni_mutex_lock  l2(d_mutex);
+
+  d_state = state;                       // update our state
+  d_state_cond.broadcast();              // Notify everybody who cares...
+  d_runtime->d_runtime_cond.broadcast();
+}
+
+
 void *
 mb_worker::run_undetached(void *ignored)
 {
+  // FIXME add pthread_sigmask stuff
+
   try {
     worker_thread_top_level();
+    d_why_dead = RIP_EXIT;
   }
   catch (mbe_terminate){
     d_why_dead = RIP_TERMINATE;
@@ -61,17 +92,61 @@
       d_why_dead = RIP_UNHANDLED_EXCEPTION;
   }
 
-  omni_mutex_lock      l1(d_runtime->d_mutex); // lock runtime first, then worker
-  omni_mutex_lock      l2(d_mutex);
-
-  d_state = TS_DEAD;                     // update our state
-  d_state_cond.broadcast();              // Notify anybody who cares...
-  d_runtime->d_runtime_cond.broadcast();
-
+  set_state(TS_DEAD);
   return 0;
 }
 
 void
 mb_worker::worker_thread_top_level()
 {
+  std::cerr << "worker_thread_top_level (enter):" << std::endl
+           << "  instance_name: " << d_instance_name << std::endl
+           << "  omnithread id: " << id() << std::endl
+           << "  gettid:        " << mb_gettid() << std::endl
+           << "  getpid:        " << getpid() << std::endl;
+
+  try {
+    d_mblock = d_maker(d_runtime, d_instance_name, d_user_arg);
+  }
+  catch (...){
+    d_why_dead = RIP_CTOR_EXCEPTION;
+    throw;
+  }
+
+  // We've got an mblock.  Let runtime know we're good so far.
+  set_state(TS_CONSTRUCTED);
+
+
+  std::cerr << "worker_thread_top_level (post-construction):" << std::endl
+           << "  instance_name: " << d_instance_name << std::endl;
+
+  // Wait for runtime to change our state to TS_RUN_INITIAL.
+  {
+    omni_mutex_lock l(d_mutex);
+    while (d_state != TS_RUN_INITIAL)
+      d_state_cond.wait();
+  }
+
+  std::cerr << "worker_thread_top_level (got RUN_INITIAL):" << std::endl
+           << "  instance_name: " << d_instance_name << std::endl;
+
+  try {
+    d_mblock->initial_transition();
+  }
+  catch (...){
+    d_why_dead = RIP_INIT_EXCEPTION;
+    throw;
+  }
+
+  // initial_transition was OK, set state to TS_RUNNING
+  set_state(TS_RUNNING);
+
+  std::cerr << "worker_thread_top_level (post-initial-transition):" << std::endl
+           << "  instance_name: " << d_instance_name << std::endl;
+
+  // FIXME run the main_loop here
+  sleep(5);
+
+  std::cerr << "worker_thread_top_level (exit):" << std::endl
+           << "  instance_name: " << d_instance_name << std::endl;
 }

Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.h
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.h      
2007-04-07 03:06:38 UTC (rev 4910)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.h      
2007-04-07 05:49:56 UTC (rev 4911)
@@ -27,6 +27,9 @@
 #include <mb_class_registry.h>
 
 
+class mb_worker;
+typedef boost::shared_ptr<mb_worker> mb_worker_sptr;
+
 class mb_runtime_thread_per_block;
 
 /*!
@@ -38,7 +41,7 @@
 public:
   //! worker thread states
   enum worker_state_t {
-    TS_UNITIALIZED,    // new, uninitialized
+    TS_UNINITIALIZED,  // new, uninitialized
     TS_CONSTRUCTED,    // mblock was successfully constructed by thread
     TS_RUN_INITIAL,    // thread should run initial_transition
     TS_RUNNING,                // normal steady-state condition.
@@ -58,7 +61,7 @@
   /*
    * Args used by new thread to create mb_mblock
    */
-  mb_runtime_thread_per_block *d_runtime;
+  mb_runtime_thread_per_block  *d_runtime;
   mb_mblock_maker_t            d_maker;
   std::string                  d_instance_name;
   pmt_t                                d_user_arg;
@@ -93,6 +96,11 @@
    * \brief Invokes the top-level of the new thread (name kind of sucks)
    */
   void *run_undetached(void *arg);
+
+private:
+  // Neither d_mutex nor runtime->d_mutex may be held while calling this.
+  // It locks and unlocks them itself.
+  void set_state(worker_state_t state);
 };
 
 

Modified: gnuradio/branches/developers/eb/ibu/omnithread/omnithread.h
===================================================================
--- gnuradio/branches/developers/eb/ibu/omnithread/omnithread.h 2007-04-07 
03:06:38 UTC (rev 4910)
+++ gnuradio/branches/developers/eb/ibu/omnithread/omnithread.h 2007-04-07 
05:49:56 UTC (rev 4911)
@@ -391,11 +391,15 @@
        // execute the run() or run_undetached() member functions depending on
        // whether start() or start_undetached() is called respectively.
 
+public:
+
     void start_undetached(void);
        // can be used with the above constructor in a derived class to cause
        // the thread to be undetached.  In this case the thread executes the
        // run_undetached member function.
 
+protected:
+
     virtual ~omni_thread(void);
        // destructor cannot be called by user (except via a derived class).
        // Use exit() or cancel() instead. This also means a thread object must





reply via email to

[Prev in Thread] Current Thread [Next in Thread]