[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] r5050 - in gnuradio/branches/developers/eb/ibu: mblock
From: |
eb |
Subject: |
[Commit-gnuradio] r5050 - in gnuradio/branches/developers/eb/ibu: mblock/src/lib pmt/src/lib |
Date: |
Thu, 19 Apr 2007 13:50:06 -0600 (MDT) |
Author: eb
Date: 2007-04-19 13:50:06 -0600 (Thu, 19 Apr 2007)
New Revision: 5050
Added:
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.cc
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.h
Modified:
gnuradio/branches/developers/eb/ibu/mblock/src/lib/Makefile.am
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_base.h
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
gnuradio/branches/developers/eb/ibu/pmt/src/lib/pmt.cc
Log:
Restructured mb_runtime_thread_per_block::run to use an event loop.
This sets the stage for timeout handling.
Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/Makefile.am
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/Makefile.am
2007-04-19 08:01:39 UTC (rev 5049)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/Makefile.am
2007-04-19 19:50:06 UTC (rev 5050)
@@ -48,6 +48,7 @@
mb_mblock_impl.cc \
mb_message.cc \
mb_msg_accepter.cc \
+ mb_msg_accepter_msgq.cc \
mb_msg_accepter_smp.cc \
mb_msg_queue.cc \
mb_port.cc \
@@ -78,6 +79,7 @@
mb_mblock.h \
mb_message.h \
mb_msg_accepter.h \
+ mb_msg_accepter_msgq.h \
mb_msg_queue.h \
mb_port.h \
mb_port_simple.h \
Added:
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.cc
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.cc
(rev 0)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.cc
2007-04-19 19:50:06 UTC (rev 5050)
@@ -0,0 +1,46 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <mb_msg_accepter_msgq.h>
+#include <mb_message.h>
+
+pmt_t s_sys_port = pmt_intern("%sys-port");
+
+mb_msg_accepter_msgq::mb_msg_accepter_msgq(mb_msg_queue *msgq)
+ : d_msgq(msgq)
+{
+}
+
+mb_msg_accepter_msgq::~mb_msg_accepter_msgq()
+{
+}
+
+void
+mb_msg_accepter_msgq::operator()(pmt_t signal, pmt_t data,
+ pmt_t metadata, mb_pri_t priority)
+{
+ mb_message_sptr msg = mb_make_message(signal, data, metadata, priority);
+ msg->set_port_id(s_sys_port);
+ d_msgq->insert(msg);
+}
Added: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.h
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.h
(rev 0)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_msg_accepter_msgq.h
2007-04-19 19:50:06 UTC (rev 5050)
@@ -0,0 +1,39 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_MB_MSG_ACCEPTER_MSGQ_H
+#define INCLUDED_MB_MSG_ACCEPTER_MSGQ_H
+
+#include <mb_msg_accepter.h>
+#include <mb_msg_queue.h>
+
+/*!
+ * \brief Concrete class that accepts messages and inserts them into a message
queue.
+ */
+class mb_msg_accepter_msgq : public mb_msg_accepter {
+ mb_msg_queue *d_msgq;
+
+public:
+ mb_msg_accepter_msgq(mb_msg_queue *msgq);
+ ~mb_msg_accepter_msgq();
+ void operator()(pmt_t signal, pmt_t data, pmt_t metadata, mb_pri_t priority);
+};
+
+#endif /* INCLUDED_MB_MSG_ACCEPTER_MSGQ_H */
Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_base.h
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_base.h
2007-04-19 08:01:39 UTC (rev 5049)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_base.h
2007-04-19 19:50:06 UTC (rev 5050)
@@ -33,6 +33,9 @@
{
omni_mutex d_brl; // big runtime lock (avoid using this if
possible...)
+protected:
+ mb_msg_accepter_sptr d_accepter;
+
public:
/*!
@@ -68,6 +71,9 @@
mb_msg_accepter_sptr accepter);
virtual void
cancel_timeout(pmt_t handle);
+
+ mb_msg_accepter_sptr
+ accepter() { return d_accepter; }
};
Modified:
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
===================================================================
---
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
2007-04-19 08:01:39 UTC (rev 5049)
+++
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.cc
2007-04-19 19:50:06 UTC (rev 5050)
@@ -30,12 +30,17 @@
#include <mb_worker.h>
#include <omnithread.h>
#include <iostream>
+#include <mb_msg_accepter_msgq.h>
+
+static pmt_t s_halt = pmt_intern("%halt");
static pmt_t s_sys_port = pmt_intern("%sys-port");
static pmt_t s_shutdown = pmt_intern("%shutdown");
-static pmt_t s_halt = pmt_intern("%halt");
+static pmt_t s_shutdown_request = pmt_intern("%shutdown-request");
+static pmt_t s_worker_state_changed = pmt_intern("%worker-state-changed");
+static pmt_t s_timeout = pmt_intern("%timeout");
+static pmt_t s_timeout_request = pmt_intern("%timeout-request");
-
static void
send_sys_msg(mb_msg_queue &msgq, pmt_t signal,
pmt_t data = PMT_F, pmt_t metadata = PMT_F,
@@ -48,10 +53,10 @@
mb_runtime_thread_per_block::mb_runtime_thread_per_block()
- : d_runtime_cond(&d_mutex),
- d_shutdown_requested(false), d_shutdown_in_progress(false),
+ : d_shutdown_in_progress(false),
d_shutdown_result(PMT_T)
{
+ d_accepter = mb_msg_accepter_sptr(new mb_msg_accepter_msgq(&d_msgq));
}
mb_runtime_thread_per_block::~mb_runtime_thread_per_block()
@@ -66,13 +71,7 @@
void
mb_runtime_thread_per_block::request_shutdown(pmt_t result)
{
- omni_mutex_lock l1(d_mutex);
-
- if (!d_shutdown_requested){
- d_shutdown_result = result;
- d_shutdown_requested = true;
- d_runtime_cond.broadcast();
- }
+ (*accepter())(s_shutdown_request, result, PMT_F, MB_PRI_BEST);
}
bool
@@ -84,7 +83,6 @@
*result = PMT_F;
// reset the shutdown state
- d_shutdown_requested = false;
d_shutdown_in_progress = false;
d_shutdown_result = PMT_T;
@@ -114,31 +112,40 @@
void
mb_runtime_thread_per_block::run_loop()
{
- /*
- * FIXME probably ought to recode this to use a message queue
- * and state machine like the rest of the world ;)
- */
+ while (1){
- omni_mutex_lock l1(d_mutex);
+ // FIXME change this to a timed wait
+ mb_message_sptr msg = d_msgq.get_highest_pri_msg();
- while (1){
+ if (!msg){
+ // FIXME we timed out, must be time to send somebody a %timeout
+ continue;
+ }
- reap_dead_workers();
+ pmt_t signal = msg->signal();
- if (d_workers.empty()) // no work left to do...
- return;
+ if (pmt_eq(signal, s_worker_state_changed)){ // %worker-state-changed
+ omni_mutex_lock l1(d_mutex);
+ reap_dead_workers();
+ if (d_workers.empty()) // no work left to do...
+ return;
+ }
+ else if (pmt_eq(signal, s_shutdown_request)){ // %shutdown-request
+ if (!d_shutdown_in_progress){
+ d_shutdown_in_progress = true;
+ d_shutdown_result = msg->data();
- if (d_shutdown_requested && !d_shutdown_in_progress){
- d_shutdown_in_progress = true;
+ // FIXME state machine, delay before sending %halt
+ send_all_sys_msg(s_shutdown);
+ send_all_sys_msg(s_halt);
- // FIXME state machine, delay before sending %halt
- send_all_sys_msg(s_shutdown);
- send_all_sys_msg(s_halt);
-
- continue;
+ }
}
-
- d_runtime_cond.wait(); // wait for something to do.
+ //else if (pmt_eq(signal, s_timeout_request)){ // %timeout-request
+ //}
+ else {
+ std::cerr << "mb_runtime_thread_per_block: unhandled msg: " << msg <<
std::endl;
+ }
}
}
Modified:
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
===================================================================
---
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
2007-04-19 08:01:39 UTC (rev 5049)
+++
gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_runtime_thread_per_block.h
2007-04-19 19:50:06 UTC (rev 5050)
@@ -23,6 +23,7 @@
#include <mb_runtime_base.h>
#include <mb_worker.h>
+#include <mb_msg_queue.h>
/*!
* \brief Concrete runtime that uses a thread per mblock
@@ -34,14 +35,11 @@
{
public:
omni_mutex d_mutex;
- omni_condition d_runtime_cond; // runtime waits here
- //std::vector<mb_worker_sptr> d_workers;
std::vector<mb_worker*> d_workers;
- bool d_shutdown_requested;
bool d_shutdown_in_progress;
pmt_t d_shutdown_result;
+ mb_msg_queue d_msgq;
- //typedef std::vector<mb_worker_sptr>::iterator worker_iter_t;
typedef std::vector<mb_worker*>::iterator worker_iter_t;
mb_runtime_thread_per_block();
Modified: gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
===================================================================
--- gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
2007-04-19 08:01:39 UTC (rev 5049)
+++ gnuradio/branches/developers/eb/ibu/mblock/src/lib/mb_worker.cc
2007-04-19 19:50:06 UTC (rev 5050)
@@ -27,6 +27,7 @@
#include <mb_exception.h>
#include <mb_mblock.h>
#include <mb_gettid.h>
+#include <mb_msg_accepter.h>
#include <iostream>
#ifdef HAVE_SCHED_H
#include <sched.h>
@@ -34,6 +35,10 @@
#define VERBOSE 0 // define to 0 or 1
+
+static pmt_t s_worker_state_changed = pmt_intern("%worker-state-changed");
+
+
mb_worker::mb_worker(mb_runtime_thread_per_block *runtime,
mb_mblock_maker_t maker,
const std::string &instance_name,
@@ -81,12 +86,15 @@
void
mb_worker::set_state(worker_state_t state)
{
- omni_mutex_lock l1(d_runtime->d_mutex); // lock runtime first, then
worker
- omni_mutex_lock l2(d_mutex);
+ {
+ omni_mutex_lock l2(d_mutex);
- d_state = state; // update our state
- d_state_cond.broadcast(); // Notify everybody who cares...
- d_runtime->d_runtime_cond.broadcast();
+ d_state = state; // update our state
+ d_state_cond.broadcast(); // Notify everybody who cares...
+ }
+
+ // send msg to runtime, telling it something changed.
+ (*d_runtime->accepter())(s_worker_state_changed, PMT_F, PMT_F, MB_PRI_BEST);
}
void *
Modified: gnuradio/branches/developers/eb/ibu/pmt/src/lib/pmt.cc
===================================================================
--- gnuradio/branches/developers/eb/ibu/pmt/src/lib/pmt.cc 2007-04-19
08:01:39 UTC (rev 5049)
+++ gnuradio/branches/developers/eb/ibu/pmt/src/lib/pmt.cc 2007-04-19
19:50:06 UTC (rev 5050)
@@ -665,7 +665,7 @@
pmt_any_set(pmt_t obj, const boost::any &any)
{
if (!obj->is_any())
- throw pmt_wrong_type("pmt_any_ref", obj);
+ throw pmt_wrong_type("pmt_any_set", obj);
_any(obj)->set(any);
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Commit-gnuradio] r5050 - in gnuradio/branches/developers/eb/ibu: mblock/src/lib pmt/src/lib,
eb <=