commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] r5050 - in gnuradio/branches/developers/eb/ibu: mblock


From: eb
Subject: [Commit-gnuradio] r5050 - in gnuradio/branches/developers/eb/ibu: mblock/src/lib pmt/src/lib
Date: Thu, 19 Apr 2007 13:50:06 -0600 (MDT)

Author: eb
Date: 2007-04-19 13:50:06 -0600 (Thu, 19 Apr 2007)
New Revision: 5050

Added:
   gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.cc
   gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.h
Modified:
   gnuradio/branches/developers/eb/ibu/mblock/src/lib/Makefile.am
   gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_base.h
   
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
   
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
   gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
   gnuradio/branches/developers/eb/ibu/pmt/src/lib/pmt.cc
Log:
Restructured mb_runtime_thread_per_block::run to use an event loop.
This sets the stage for timeout handling.


Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/Makefile.am
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/Makefile.am      
2007-04-19 08:01:39 UTC (rev 5049)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/Makefile.am      
2007-04-19 19:50:06 UTC (rev 5050)
@@ -48,6 +48,7 @@
        mb_mblock_impl.cc               \
        mb_message.cc                   \
        mb_msg_accepter.cc              \
+       mb_msg_accepter_msgq.cc         \
        mb_msg_accepter_smp.cc          \
        mb_msg_queue.cc                 \
        mb_port.cc                      \
@@ -78,6 +79,7 @@
        mb_mblock.h                     \
        mb_message.h                    \
        mb_msg_accepter.h               \
+       mb_msg_accepter_msgq.h          \
        mb_msg_queue.h                  \
        mb_port.h                       \
        mb_port_simple.h                \

Added: 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.cc
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.cc  
                        (rev 0)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.cc  
2007-04-19 19:50:06 UTC (rev 5050)
@@ -0,0 +1,46 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <mb_msg_accepter_msgq.h>
+#include <mb_message.h>
+
+pmt_t s_sys_port = pmt_intern("%sys-port");
+
+mb_msg_accepter_msgq::mb_msg_accepter_msgq(mb_msg_queue *msgq)
+  : d_msgq(msgq)
+{
+}
+
+mb_msg_accepter_msgq::~mb_msg_accepter_msgq()
+{
+}
+
+void
+mb_msg_accepter_msgq::operator()(pmt_t signal, pmt_t data,
+                                pmt_t metadata, mb_pri_t priority)
+{
+  mb_message_sptr msg = mb_make_message(signal, data, metadata, priority);
+  msg->set_port_id(s_sys_port);
+  d_msgq->insert(msg);
+}

Added: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.h
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.h   
                        (rev 0)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.h   
2007-04-19 19:50:06 UTC (rev 5050)
@@ -0,0 +1,39 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_MB_MSG_ACCEPTER_MSGQ_H
+#define INCLUDED_MB_MSG_ACCEPTER_MSGQ_H
+
+#include <mb_msg_accepter.h>
+#include <mb_msg_queue.h>
+
+/*!
+ * \brief Concrete class that accepts messages and inserts them into a message 
queue.
+ */
+class mb_msg_accepter_msgq : public mb_msg_accepter {
+  mb_msg_queue *d_msgq;
+
+public:
+  mb_msg_accepter_msgq(mb_msg_queue *msgq);
+  ~mb_msg_accepter_msgq();
+  void operator()(pmt_t signal, pmt_t data, pmt_t metadata, mb_pri_t priority);
+};
+
+#endif /* INCLUDED_MB_MSG_ACCEPTER_MSGQ_H */

Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_base.h
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_base.h        
2007-04-19 08:01:39 UTC (rev 5049)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_base.h        
2007-04-19 19:50:06 UTC (rev 5050)
@@ -33,6 +33,9 @@
 {
   omni_mutex           d_brl;  // big runtime lock (avoid using this if 
possible...)
 
+protected:
+  mb_msg_accepter_sptr  d_accepter;
+
 public:
 
   /*!
@@ -68,6 +71,9 @@
                            mb_msg_accepter_sptr accepter);
   virtual void
   cancel_timeout(pmt_t handle);
+
+  mb_msg_accepter_sptr
+  accepter() { return d_accepter; }
   
 };
 

Modified: 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
===================================================================
--- 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
   2007-04-19 08:01:39 UTC (rev 5049)
+++ 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
   2007-04-19 19:50:06 UTC (rev 5050)
@@ -30,12 +30,17 @@
 #include <mb_worker.h>
 #include <omnithread.h>
 #include <iostream>
+#include <mb_msg_accepter_msgq.h>
 
+
+static pmt_t s_halt = pmt_intern("%halt");
 static pmt_t s_sys_port = pmt_intern("%sys-port");
 static pmt_t s_shutdown = pmt_intern("%shutdown");
-static pmt_t s_halt = pmt_intern("%halt");
+static pmt_t s_shutdown_request = pmt_intern("%shutdown-request");
+static pmt_t s_worker_state_changed = pmt_intern("%worker-state-changed");
+static pmt_t s_timeout = pmt_intern("%timeout");
+static pmt_t s_timeout_request = pmt_intern("%timeout-request");
 
-
 static void
 send_sys_msg(mb_msg_queue &msgq, pmt_t signal,
             pmt_t data = PMT_F, pmt_t metadata = PMT_F,
@@ -48,10 +53,10 @@
 
 
 mb_runtime_thread_per_block::mb_runtime_thread_per_block()
-  : d_runtime_cond(&d_mutex),
-    d_shutdown_requested(false), d_shutdown_in_progress(false),
+  : d_shutdown_in_progress(false),
     d_shutdown_result(PMT_T)
 {
+  d_accepter = mb_msg_accepter_sptr(new mb_msg_accepter_msgq(&d_msgq));
 }
 
 mb_runtime_thread_per_block::~mb_runtime_thread_per_block()
@@ -66,13 +71,7 @@
 void
 mb_runtime_thread_per_block::request_shutdown(pmt_t result)
 {
-  omni_mutex_lock l1(d_mutex);
-
-  if (!d_shutdown_requested){
-    d_shutdown_result = result;
-    d_shutdown_requested = true;
-    d_runtime_cond.broadcast();
-  }
+  (*accepter())(s_shutdown_request, result, PMT_F, MB_PRI_BEST);
 }
 
 bool
@@ -84,7 +83,6 @@
     *result = PMT_F;
   
   // reset the shutdown state
-  d_shutdown_requested = false;
   d_shutdown_in_progress = false;
   d_shutdown_result = PMT_T;
 
@@ -114,31 +112,40 @@
 void
 mb_runtime_thread_per_block::run_loop()
 {
-  /*
-   * FIXME probably ought to recode this to use a message queue
-   * and state machine like the rest of the world ;)
-   */
+  while (1){
 
-  omni_mutex_lock l1(d_mutex);
+    // FIXME change this to a timed wait
+    mb_message_sptr msg = d_msgq.get_highest_pri_msg();
 
-  while (1){
+    if (!msg){
+      // FIXME we timed out, must be time to send somebody a %timeout
+      continue;
+    }
 
-    reap_dead_workers();
+    pmt_t signal = msg->signal();
 
-    if (d_workers.empty())     // no work left to do...
-      return;          
+    if (pmt_eq(signal, s_worker_state_changed)){       // %worker-state-changed
+      omni_mutex_lock l1(d_mutex);
+      reap_dead_workers();
+      if (d_workers.empty())   // no work left to do...
+       return;
+    }
+    else if (pmt_eq(signal, s_shutdown_request)){      // %shutdown-request
+      if (!d_shutdown_in_progress){
+       d_shutdown_in_progress = true;
+       d_shutdown_result = msg->data();
 
-    if (d_shutdown_requested && !d_shutdown_in_progress){
-      d_shutdown_in_progress = true;
+       // FIXME state machine, delay before sending %halt
+       send_all_sys_msg(s_shutdown);
+       send_all_sys_msg(s_halt);
 
-      // FIXME state machine, delay before sending %halt
-      send_all_sys_msg(s_shutdown);
-      send_all_sys_msg(s_halt);
-
-      continue;
+      }
     }
-
-    d_runtime_cond.wait();     // wait for something to do.
+    //else if (pmt_eq(signal, s_timeout_request)){     // %timeout-request
+    //}
+    else {
+      std::cerr << "mb_runtime_thread_per_block: unhandled msg: " << msg << 
std::endl;
+    }
   }
 }
 

Modified: 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
===================================================================
--- 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
    2007-04-19 08:01:39 UTC (rev 5049)
+++ 
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
    2007-04-19 19:50:06 UTC (rev 5050)
@@ -23,6 +23,7 @@
 
 #include <mb_runtime_base.h>
 #include <mb_worker.h>
+#include <mb_msg_queue.h>
 
 /*!
  * \brief Concrete runtime that uses a thread per mblock
@@ -34,14 +35,11 @@
 {
 public:
   omni_mutex                 d_mutex;
-  omni_condition             d_runtime_cond;  // runtime waits here
-  //std::vector<mb_worker_sptr> d_workers;
   std::vector<mb_worker*>     d_workers;
-  bool                       d_shutdown_requested;
   bool                       d_shutdown_in_progress;
   pmt_t                              d_shutdown_result;
+  mb_msg_queue               d_msgq;
 
-  //typedef std::vector<mb_worker_sptr>::iterator  worker_iter_t;
   typedef std::vector<mb_worker*>::iterator  worker_iter_t;
 
   mb_runtime_thread_per_block();

Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc     
2007-04-19 08:01:39 UTC (rev 5049)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc     
2007-04-19 19:50:06 UTC (rev 5050)
@@ -27,6 +27,7 @@
 #include <mb_exception.h>
 #include <mb_mblock.h>
 #include <mb_gettid.h>
+#include <mb_msg_accepter.h>
 #include <iostream>
 #ifdef HAVE_SCHED_H
 #include <sched.h>
@@ -34,6 +35,10 @@
 
 #define VERBOSE 0              // define to 0 or 1
 
+
+static pmt_t s_worker_state_changed = pmt_intern("%worker-state-changed");
+
+
 mb_worker::mb_worker(mb_runtime_thread_per_block *runtime,
                     mb_mblock_maker_t maker,
                     const std::string &instance_name,
@@ -81,12 +86,15 @@
 void
 mb_worker::set_state(worker_state_t state)
 {
-  omni_mutex_lock  l1(d_runtime->d_mutex);     // lock runtime first, then 
worker
-  omni_mutex_lock  l2(d_mutex);
+  {
+    omni_mutex_lock  l2(d_mutex);
 
-  d_state = state;                       // update our state
-  d_state_cond.broadcast();              // Notify everybody who cares...
-  d_runtime->d_runtime_cond.broadcast();
+    d_state = state;                     // update our state
+    d_state_cond.broadcast();            // Notify everybody who cares...
+  }
+
+  // send msg to runtime, telling it something changed.
+  (*d_runtime->accepter())(s_worker_state_changed, PMT_F, PMT_F, MB_PRI_BEST);
 }
 
 void *

Modified: gnuradio/branches/developers/eb/ibu/pmt/src/lib/pmt.cc
===================================================================
--- gnuradio/branches/developers/eb/ibu/pmt/src/lib/pmt.cc      2007-04-19 
08:01:39 UTC (rev 5049)
+++ gnuradio/branches/developers/eb/ibu/pmt/src/lib/pmt.cc      2007-04-19 
19:50:06 UTC (rev 5050)
@@ -665,7 +665,7 @@
 pmt_any_set(pmt_t obj, const boost::any &any)
 {
   if (!obj->is_any())
-    throw pmt_wrong_type("pmt_any_ref", obj);
+    throw pmt_wrong_type("pmt_any_set", obj);
   _any(obj)->set(any);
 }
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]