commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 01/04: runtime: fix propagation of DONE state to message blocks


From: git
Subject: [Commit-gnuradio] [gnuradio] 01/04: runtime: fix propagation of DONE state to message blocks
Date: Tue, 29 Apr 2014 17:42:11 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 93494c1c17d6751e94fa8a1a5479d2c701750baf
Author: Tim O'Shea <address@hidden>
Date:   Wed Apr 23 21:33:15 2014 -0400

    runtime: fix propagation of DONE state to message blocks
---
 gnuradio-runtime/include/gnuradio/block.h | 19 +++++++++++
 gnuradio-runtime/lib/block.cc             | 57 +++++++++++++++++++++++++++++++
 gnuradio-runtime/lib/tpb_thread_body.cc   | 17 +++++++--
 3 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/gnuradio-runtime/include/gnuradio/block.h b/gnuradio-runtime/include/gnuradio/block.h
index 7390e93..1a978f9 100644
--- a/gnuradio-runtime/include/gnuradio/block.h
+++ b/gnuradio-runtime/include/gnuradio/block.h
@@ -565,6 +565,15 @@ namespace gr {
 
     // ----------------------------------------------------------------------------
 
+       /*!
+        * \breif the system message handler
+     */
+    void system_handler(pmt::pmt_t msg);
+
+       /*!
+     * \brief returns true when execution has completed due to a message connection
+    bool finished();
+
   private:
     int                   d_output_multiple;
     bool                  d_output_multiple_set;
@@ -583,6 +592,7 @@ namespace gr {
     int                   d_priority;              // thread priority level
     bool                  d_pc_rpc_set;
     bool                  d_update_rate;           // should sched update rel rate?
+    bool d_finished;    // true if msg ports think we are finished
 
   protected:
     block(void) {} // allows pure virtual interface sub-classes
@@ -766,6 +776,15 @@ namespace gr {
   public:
     block_detail_sptr detail() const { return d_detail; }
     void set_detail(block_detail_sptr detail) { d_detail = detail; }
+
+   /*! \brief Tell msg neighbors we are finished
+       */
+   void notify_msg_neighbors();
+
+   /*! \brief Make sure we dont think we are finished
+       */
+   void clear_finished(){ d_finished = false; }
+
   };
 
   typedef std::vector<block_sptr> block_vector_t;
diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc
index bdf484e..46cbc98 100644
--- a/gnuradio-runtime/lib/block.cc
+++ b/gnuradio-runtime/lib/block.cc
@@ -59,6 +59,8 @@ namespace gr {
       d_min_output_buffer(std::max(output_signature->max_streams(),1), -1)
   {
     global_block_registry.register_primitive(alias(), this);
+    message_port_register_in(pmt::mp("system"));
+    set_msg_handler(pmt::mp("system"), boost::bind(&block::system_handler, this, _1));
 
 #ifdef ENABLE_GR_LOG
 #ifdef HAVE_LOG4CPP
@@ -734,6 +736,61 @@ namespace gr {
     }
   }
 
+
+  void
+  block::system_handler(pmt::pmt_t msg)
+  {
+    //std::cout << "system_handler " << msg << "\n";
+    pmt::pmt_t op = pmt::car(msg);
+    if(pmt::eqv(op, pmt::mp("done"))){
+        d_finished = pmt::to_long(pmt::cdr(msg));
+        global_block_registry.notify_blk(alias());
+    } else {
+        std::cout << "WARNING: bad message op on system port!\n";
+        pmt::print(msg);
+    }
+  }
+
+  void
+  block::notify_msg_neighbors()
+  {
+    size_t len = pmt::length(d_message_subscribers);
+    pmt::pmt_t port_names = pmt::make_vector(len, pmt::PMT_NIL);
+    pmt::pmt_t keys = pmt::dict_keys(d_message_subscribers);
+    for(size_t i = 0; i < len; i++) {
+      // for each output port
+      pmt::pmt_t oport = pmt::nth(i,keys);
+
+      // for each subscriber on this port
+      pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, oport, pmt::PMT_NIL);
+
+      // iterate through subscribers on port
+      while(pmt::is_pair(currlist)) {
+        pmt::pmt_t target = pmt::car(currlist);
+
+        pmt::pmt_t block = pmt::car(target);
+        pmt::pmt_t port = pmt::mp("system");
+
+        currlist = pmt::cdr(currlist);
+        basic_block_sptr blk = global_block_registry.block_lookup(block);
+        blk->post(port, pmt::cons(pmt::mp("done"), pmt::mp(true)));
+
+        //std::cout << "notify finished --> ";
+        //pmt::print(pmt::cons(block,port));
+        //std::cout << "\n";
+
+        }
+    }
+  }
+
+  bool
+  block::finished()
+  {
+    return d_finished;
+  }
+
+
+
   void
   block::setup_pc_rpc()
   {
diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc
index 79abd0e..eb47a43 100644
--- a/gnuradio-runtime/lib/tpb_thread_body.cc
+++ b/gnuradio-runtime/lib/tpb_thread_body.cc
@@ -83,8 +83,12 @@ namespace gr {
     if(block->thread_priority() > 0) {
       gr::thread::set_thread_priority(d->thread, block->thread_priority());
     }
+    
+    // make sure our block isnt finished
+    block->clear_finished();
 
     while(1) {
+      tpb_loop_top:
       boost::this_thread::interruption_point();
 
       // handle any queued up messages
@@ -116,6 +120,10 @@ namespace gr {
         s = block_executor::BLKD_IN;
       }
 
+      // if msg ports think we are done, we are done
+      if(block->finished())
+        s = block_executor::DONE;
+
       switch(s){
       case block_executor::READY:              // Tell neighbors we made progress.
         d->d_tpb.notify_neighbors(d);
@@ -126,6 +134,7 @@ namespace gr {
         break;
 
       case block_executor::DONE:               // Game over.
+        block->notify_msg_neighbors();
         d->d_tpb.notify_neighbors(d);
         return;
 
@@ -135,8 +144,12 @@ namespace gr {
         while(!d->d_tpb.input_changed) {
 
           // wait for input or message
-          while(!d->d_tpb.input_changed && block->empty_handled_p())
-            d->d_tpb.input_cond.wait(guard);
+          while(!d->d_tpb.input_changed && block->empty_handled_p()){
+            boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250);
+            if(!d->d_tpb.input_cond.timed_wait(guard, timeout)){
+                goto tpb_loop_top; // timeout occured (perform sanity checks up top)
+                }
+            }
 
           // handle all pending messages
           BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {



reply via email to

[Prev in Thread] Current Thread [Next in Thread]