commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] r8779 - gnuradio/branches/developers/eb/sched-wip/gnur


From: eb
Subject: [Commit-gnuradio] r8779 - gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime
Date: Thu, 3 Jul 2008 20:12:21 -0600 (MDT)

Author: eb
Date: 2008-07-03 20:12:18 -0600 (Thu, 03 Jul 2008)
New Revision: 8779

Added:
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
Modified:
   
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/Makefile.am
Log:
work-in-progress: gr_block_executor is the generic thread and/or task
per block manager.  We should be able to use it with either a
thread-per-block or a TBB-style task implementation.


Modified: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/Makefile.am
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/Makefile.am
 2008-07-04 01:26:58 UTC (rev 8778)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/Makefile.am
 2008-07-04 02:12:18 UTC (rev 8779)
@@ -35,6 +35,7 @@
        gr_flat_flowgraph.cc                    \
        gr_block.cc                             \
        gr_block_detail.cc                      \
+       gr_block_executor.cc                    \
        gr_hier_block2.cc                       \
        gr_hier_block2_detail.cc                \
        gr_buffer.cc                            \
@@ -81,6 +82,7 @@
        gr_flat_flowgraph.h                     \
        gr_block.h                              \
        gr_block_detail.h                       \
+       gr_block_executor.h                     \
        gr_hier_block2.h                        \
        gr_hier_block2_detail.h                 \
        gr_buffer.h                             \

Copied: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
 (from rev 8762, 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc)
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
                                (rev 0)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.cc
        2008-07-04 02:12:18 UTC (rev 8779)
@@ -0,0 +1,314 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_block_executor.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_buffer.h>
+#include <boost/thread.hpp>
+#include <iostream>
+#include <limits>
+#include <assert.h>
+#include <stdio.h>
+
+// must be defined to either 0 or 1
+#define ENABLE_LOGGING 0
+
+#if (ENABLE_LOGGING)
+#define LOG(x) do { x; } while(0)
+#else
+#define LOG(x) do {;} while(0)
+#endif
+
+static int which_scheduler  = 0;
+
+
+std::ostream&
+operator << (std::ostream& os, const gr_block *m)
+{
+  os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>";
+  return os;
+}
+
+
+inline static unsigned int
+round_up (unsigned int n, unsigned int multiple)
+{
+  return ((n + multiple - 1) / multiple) * multiple;
+}
+
+inline static unsigned int
+round_down (unsigned int n, unsigned int multiple)
+{
+  return (n / multiple) * multiple;
+}
+
+//
+// Return minimum available write space in all our downstream buffers
+// or -1 if we're output blocked and the output we're blocked
+// on is done.
+//
+static int
+min_available_space (gr_block_detail *d, int output_multiple)
+{
+  int  min_space = std::numeric_limits<int>::max();
+
+  for (int i = 0; i < d->noutputs (); i++){
+    int n = round_down (d->output(i)->space_available (), output_multiple);
+    if (n == 0){                       // We're blocked on output.
+      if (d->output(i)->done()){       // Downstream is done, therefore we're 
done.
+       return -1;
+      }
+      return 0;
+    }
+    min_space = std::min (min_space, n);
+  }
+  return min_space;
+}
+
+
+
+gr_block_executor::gr_block_executor (gr_block_sptr block)
+  : d_block(block), d_log(0)
+{
+  if (ENABLE_LOGGING){
+    char name[100];
+    snprintf(name, sizeof(name), "sst-%d.log", which_scheduler++);
+    d_log = new std::ofstream(name);
+    *d_log << "gr_block_executor: "
+          << d_block << std::endl;
+  }
+
+  d_block->detail()->set_done(false);  // reset done flag
+  d_block->start();                    // enable any drivers, etc.
+}
+
+gr_block_executor::~gr_block_executor ()
+{
+  if (ENABLE_LOGGING)
+    delete d_log;
+
+  d_block->stop();                     // stop any drivers, etc.
+}
+
+
+bool
+gr_block_executor::run_one_iteration()
+{
+  int                          noutput_items;
+  int                          max_items_avail;
+  bool                         making_progress;
+
+  making_progress = false;
+
+  gr_block             *m = d_block.get();
+  gr_block_detail      *d = m->detail().get();
+
+  LOG(*d_log << std::endl << m);
+
+  if (d->done())
+    goto next_block;
+
+  if (d->source_p ()){
+    d_ninput_items_required.resize (0);
+    d_ninput_items.resize (0);
+    d_input_items.resize (0);
+    d_output_items.resize (d->noutputs ());
+
+    // determine the minimum available output space
+    noutput_items = min_available_space (d, m->output_multiple ());
+    LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
+    if (noutput_items == -1)           // we're done
+      goto were_done;
+
+    if (noutput_items == 0){           // we're output blocked
+      LOG(*d_log << "  BLKD_OUT\n");
+      goto next_block;
+    }
+
+    goto setup_call_to_work;           // jump to common code
+  }
+
+  else if (d->sink_p ()){
+    d_ninput_items_required.resize (d->ninputs ());
+    d_ninput_items.resize (d->ninputs ());
+    d_input_items.resize (d->ninputs ());
+    d_output_items.resize (0);
+    LOG(*d_log << " sink\n");
+
+    max_items_avail = 0;
+    for (int i = 0; i < d->ninputs (); i++){
+      d_ninput_items[i] = d->input(i)->items_available();
+      //if (d_ninput_items[i] == 0 && d->input(i)->done())
+      if (d_ninput_items[i] < m->output_multiple() && d->input(i)->done())
+       goto were_done;
+       
+      max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+    }
+
+    // take a swag at how much output we can sink
+    noutput_items = (int) (max_items_avail * m->relative_rate ());
+    noutput_items = round_down (noutput_items, m->output_multiple ());
+    LOG(*d_log << "  max_items_avail = " << max_items_avail << std::endl);
+    LOG(*d_log << "  noutput_items = " << noutput_items << std::endl);
+
+    if (noutput_items == 0){   // we're blocked on input
+      LOG(*d_log << "  BLKD_IN\n");
+      goto next_block;
+    }
+
+    goto try_again;            // Jump to code shared with regular case.
+  }
+
+  else {
+    // do the regular thing
+    d_ninput_items_required.resize (d->ninputs ());
+    d_ninput_items.resize (d->ninputs ());
+    d_input_items.resize (d->ninputs ());
+    d_output_items.resize (d->noutputs ());
+
+    max_items_avail = 0;
+    for (int i = 0; i < d->ninputs (); i++){
+      d_ninput_items[i] = d->input(i)->items_available ();
+      max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+    }
+
+    // determine the minimum available output space
+    noutput_items = min_available_space (d, m->output_multiple ());
+    if (ENABLE_LOGGING){
+      *d_log << " regular ";
+      if (m->relative_rate() >= 1.0)
+       *d_log << "1:" << m->relative_rate() << std::endl;
+      else
+       *d_log << 1.0/m->relative_rate() << ":1\n";
+      *d_log << "  max_items_avail = " << max_items_avail << std::endl;
+      *d_log << "  noutput_items = " << noutput_items << std::endl;
+    }
+    if (noutput_items == -1)           // we're done
+      goto were_done;
+
+    if (noutput_items == 0){           // we're output blocked
+      LOG(*d_log << "  BLKD_OUT\n");
+      goto next_block;
+    }
+
+#if 0
+    // Compute best estimate of noutput_items that we can really use.
+    noutput_items =
+      std::min ((unsigned) noutput_items,
+               std::max ((unsigned) m->output_multiple(),
+                         round_up ((unsigned) (max_items_avail * 
m->relative_rate()),
+                                   m->output_multiple ())));
+
+    LOG(*d_log << "  revised noutput_items = " << noutput_items << std::endl);
+#endif
+
+  try_again:
+    if (m->fixed_rate()){
+      // try to work it forward starting with max_items_avail.
+      // We want to try to consume all the input we've got.
+      int reqd_noutput_items = 
m->fixed_rate_ninput_to_noutput(max_items_avail);
+      reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
+      if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
+       noutput_items = reqd_noutput_items;
+    }
+
+    // ask the block how much input they need to produce noutput_items
+    m->forecast (noutput_items, d_ninput_items_required);
+
+    // See if we've got sufficient input available
+
+    int i;
+    for (i = 0; i < d->ninputs (); i++)
+      if (d_ninput_items_required[i] > d_ninput_items[i])      // not enough
+       break;
+
+    if (i < d->ninputs ()){                    // not enough input on input[i]
+      // if we can, try reducing the size of our output request
+      if (noutput_items > m->output_multiple ()){
+       noutput_items /= 2;
+       noutput_items = round_up (noutput_items, m->output_multiple ());
+       goto try_again;
+      }
+
+      // We're blocked on input
+      LOG(*d_log << "  BLKD_IN\n");
+      if (d->input(i)->done())    // If the upstream block is done, we're done
+       goto were_done;
+
+      // Is it possible to ever fulfill this request?
+      if (d_ninput_items_required[i] > 
d->input(i)->max_possible_items_available ()){
+       // Nope, never going to happen...
+       std::cerr << "\nsched: <gr_block " << m->name()
+                 << " (" << m->unique_id() << ")>"
+                 << " is requesting more input data\n"
+                 << "  than we can provide.\n"
+                 << "  ninput_items_required = "
+                 << d_ninput_items_required[i] << "\n"
+                 << "  max_possible_items_available = "
+                 << d->input(i)->max_possible_items_available() << "\n"
+                 << "  If this is a filter, consider reducing the number of 
taps.\n";
+       goto were_done;
+      }
+
+      goto next_block;
+    }
+
+    // We've got enough data on each input to produce noutput_items.
+    // Finish setting up the call to work.
+
+    for (int i = 0; i < d->ninputs (); i++)
+      d_input_items[i] = d->input(i)->read_pointer();
+
+  setup_call_to_work:
+
+    for (int i = 0; i < d->noutputs (); i++)
+      d_output_items[i] = d->output(i)->write_pointer();
+
+    // Do the actual work of the block
+    int n = m->general_work (noutput_items, d_ninput_items,
+                            d_input_items, d_output_items);
+    LOG(*d_log << "  general_work: noutput_items = " << noutput_items
+       << " result = " << n << std::endl);
+
+    if (n == -1)               // block is done
+      goto were_done;
+
+    d->produce_each (n);       // advance write pointers
+    if (n > 0)
+      making_progress = true;
+
+    goto next_block;
+  }
+  assert (0);
+    
+ were_done:
+  LOG(*d_log << "  were_done\n");
+  d->set_done (true);
+
+ next_block:
+  return making_progress;
+}

Copied: 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
 (from rev 8747, 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.h)
===================================================================
--- 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
                         (rev 0)
+++ 
gnuradio/branches/developers/eb/sched-wip/gnuradio-core/src/lib/runtime/gr_block_executor.h
 2008-07-04 02:12:18 UTC (rev 8779)
@@ -0,0 +1,61 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_BLOCK_EXECUTOR_H
+#define INCLUDED_GR_BLOCK_EXECUTOR_H
+
+#include <gr_runtime_types.h>
+#include <fstream>
+
+//class gr_block_executor;
+//typedef boost::shared_ptr<gr_block_executor> gr_block_executor_sptr;
+
+
+/*!
+ * \brief Manage the execution of a single block.
+ * \ingroup internal
+ */
+
+class gr_block_executor {
+protected:
+  gr_block_sptr                        d_block;        // The block we're 
trying to run
+  std::ofstream                       *d_log;
+
+  // These are allocated here so we don't have to on each iteration
+
+  gr_vector_int                        d_ninput_items_required;
+  gr_vector_int                        d_ninput_items;
+  gr_vector_const_void_star    d_input_items;
+  gr_vector_void_star          d_output_items;
+
+ public:
+  gr_block_executor(gr_block_sptr block);
+  ~gr_block_executor ();
+
+  /*
+   * \brief Run one iteration.
+   * \returns true if progress was made, else false.
+   */
+  bool run_one_iteration();
+};
+
+#endif /* INCLUDED_GR_BLOCK_EXECUTOR_H */





reply via email to

[Prev in Thread] Current Thread [Next in Thread]