[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] r5960 - in grc/trunk/src: . SignalBlockDefs
From: |
jblum |
Subject: |
[Commit-gnuradio] r5960 - in grc/trunk/src: . SignalBlockDefs |
Date: |
Sat, 14 Jul 2007 00:05:01 -0600 (MDT) |
Author: jblum
Date: 2007-07-14 00:05:00 -0600 (Sat, 14 Jul 2007)
New Revision: 5960
Modified:
grc/trunk/src/ExecFlowGraph.py
grc/trunk/src/MathExprParser.py
grc/trunk/src/SignalBlockDefs/Audio.py
grc/trunk/src/SignalBlockDefs/Filters.py
grc/trunk/src/SignalBlockDefs/Misc.py
grc/trunk/src/SignalBlockDefs/Packet.py
Log:
split up callbacks into ones needed locks and ones that did not
Modified: grc/trunk/src/ExecFlowGraph.py
===================================================================
--- grc/trunk/src/ExecFlowGraph.py 2007-07-14 05:50:20 UTC (rev 5959)
+++ grc/trunk/src/ExecFlowGraph.py 2007-07-14 06:05:00 UTC (rev 5960)
@@ -25,7 +25,7 @@
import Variables
from Elements import SignalBlock
from gnuradio import gr
-import os
+import os,time
from Constants import FLOW_GRAPH_FILE_EXTENSION,MUTEX
from optparse import OptionParser
@@ -40,6 +40,7 @@
gr.top_block.__init__(self, file_path)
#internal data structures
self.callbacks = list()
+ self.callbacks_locked = list()
self.var_keys = list()
self.runtime = gr.runtime(self)
self.started = False
@@ -68,26 +69,49 @@
Exiting!"""
exit(-1)
- def add_callback(self, function, data_type_params):
- """Register a callback function with its associated data."""
+ def add_callback(self, function, *data_type_params):
+ """
+ Register a callback function with its associated data.
+ @param function the callback function
+ @param data_type_params a list of data types
+ """
self.callbacks.append((function, data_type_params))
+ def add_callback_locked(self, function, *data_type_params):
+ """
+ Register a callback function with its associated data.
+ These callbacks will be called inside of a lock/unlock sequence.
+ @param function the callback function
+ @param data_type_params a list of data types
+ """
+ self.callbacks_locked.append((function, data_type_params))
+
+ def _parse_callback(self, function, *data_type_params):
+ """
+ Parse a single callback. Call function on the data types.
+ @param function the callback function
+ @param data_type_params a list of data types
+ """
+ print "***\nBegin A callback\n%s\n\n***"%function
+ try: function(*map(lambda param: param.parse(),
data_type_params))
+ except Exception, e: print "***\n\nerror parsing a callback ->
ignoring\n%s...\n\n***"%e
+ print "***\nEnd A callback\n***"
+
def parse_callbacks(self):
"""For each call back, parse all of the data and
call the registered callback function on that data."""
MUTEX.lock()
print "***\n\nCallback Time BEGIN\n\n***"
- started = self.started
- if started: self._hb.lock()
- for function, data_type_params in self.callbacks:
- print "***\nBegin A callback\n%s\n\n***"%function
- try:
- if type(data_type_params) in (type(list()),
type(tuple())):
- function(*map(lambda param:
param.parse(), data_type_params))
- else: function(data_type_params.parse())
- except Exception, e: print "***\n\nerror parsing a
callback -> ignoring\n%s...\n\n***"%e
- print "***\nEnd A callback\n***"
- if started: self._hb.unlock()
+ if self.started:
+ if self.callbacks: #parse regular callbacks
+ for function, data_type_params in
self.callbacks:
+ self._parse_callback(function,
*data_type_params)
+ if self.callbacks_locked: #parse locked callbacks
+ self._hb.lock()
+ for function, data_type_params in
self.callbacks_locked:
+ self._parse_callback(function,
*data_type_params)
+ self._hb.unlock()
+ time.sleep(.005) #sleep to lower chances
of possible thread-lockup
print "***\n\nCallback Time END\n\n***"
MUTEX.unlock()
Modified: grc/trunk/src/MathExprParser.py
===================================================================
--- grc/trunk/src/MathExprParser.py 2007-07-14 05:50:20 UTC (rev 5959)
+++ grc/trunk/src/MathExprParser.py 2007-07-14 06:05:00 UTC (rev 5960)
@@ -240,7 +240,7 @@
# Boolean tests for special characters and symbols
#########################################################
-def _is_list(symbol): return type(symbol) == type(list())
+def _is_list(symbol): return type(symbol) is list
def _is_function(symbol): return symbol in _FUNCTIONS.keys()
Modified: grc/trunk/src/SignalBlockDefs/Audio.py
===================================================================
--- grc/trunk/src/SignalBlockDefs/Audio.py 2007-07-14 05:50:20 UTC (rev
5959)
+++ grc/trunk/src/SignalBlockDefs/Audio.py 2007-07-14 06:05:00 UTC (rev
5960)
@@ -102,7 +102,7 @@
self._hb.unlock()
#######################################################################################
-## Selector Defs for Audio Source and Audio Sink
+## Defs for Audio Source and Audio Sink
#######################################################################################
def AudioSink(sb):
@@ -115,7 +115,7 @@
If only one audio stream is connected, both channels will receive the
stream.''')
def make(fg, samp_rate):
block = AudioHelper(samp_rate.parse(), SINK)
- fg.add_callback(block.reconstruct, samp_rate)
+ fg.add_callback_locked(block.reconstruct, samp_rate)
return block
return sb, make
@@ -127,7 +127,7 @@
sb.set_docs('''The left output must be connected, The right output is
optional.''')
def make(fg, samp_rate):
block = AudioHelper(samp_rate.parse(), SOURCE)
- fg.add_callback(block.reconstruct, samp_rate)
+ fg.add_callback_locked(block.reconstruct, samp_rate)
return block
return sb, make
Modified: grc/trunk/src/SignalBlockDefs/Filters.py
===================================================================
--- grc/trunk/src/SignalBlockDefs/Filters.py 2007-07-14 05:50:20 UTC (rev
5959)
+++ grc/trunk/src/SignalBlockDefs/Filters.py 2007-07-14 06:05:00 UTC (rev
5960)
@@ -162,7 +162,7 @@
decimation = decimation.parse()
taps = taps_maker(*map(lambda data:data.parse(), taps_args))
block = filter(decimation, taps)
- fg.add_callback(lambda *args: block.set_taps(taps_maker(*args)),
taps_args)
+ fg.add_callback(lambda *args: block.set_taps(taps_maker(*args)),
*taps_args)
return block
window_choices = [
Modified: grc/trunk/src/SignalBlockDefs/Misc.py
===================================================================
--- grc/trunk/src/SignalBlockDefs/Misc.py 2007-07-14 05:50:20 UTC (rev
5959)
+++ grc/trunk/src/SignalBlockDefs/Misc.py 2007-07-14 06:05:00 UTC (rev
5960)
@@ -199,8 +199,8 @@
input_index.parse(),
output_index.parse(),
)
- fg.add_callback(block.set_input_index, input_index)
- fg.add_callback(block.set_output_index, output_index)
+ fg.add_callback_locked(block.set_input_index, input_index)
+ fg.add_callback_locked(block.set_output_index, output_index)
return block
return sb, make
@@ -215,8 +215,8 @@
sb.set_docs('''When open is 0, the valve will forward data.''')
def make(fg, type, open, vlen):
item_size = type.parse().get_num_bytes()*vlen.parse()
- block = SelectorHelper(fg, item_size, 1, 1, 0, open.parse())
- fg.add_callback(block.set_output_index, open)
+ block = SelectorHelper(item_size, 1, 1, 0, open.parse())
+ fg.add_callback_locked(block.set_output_index, open)
return block
return sb, make
Modified: grc/trunk/src/SignalBlockDefs/Packet.py
===================================================================
--- grc/trunk/src/SignalBlockDefs/Packet.py 2007-07-14 05:50:20 UTC (rev
5959)
+++ grc/trunk/src/SignalBlockDefs/Packet.py 2007-07-14 06:05:00 UTC (rev
5960)
@@ -89,7 +89,7 @@
modulator.samples_per_symbol = lambda: samples_per_symbol
modulator.bits_per_symbol = lambda: bits_per_symbol
#create the packet modulator (handles the output data stream)
- packet_mod = blks.mod_pkts(
+ self.packet_mod = blks.mod_pkts(
fg=self,
modulator=modulator,
access_code=access_code,
@@ -98,13 +98,21 @@
use_whitener_offset=use_whitener_offset,
)
#the message sink (handles the input data stream)
- msgq = gr.msg_queue(DEFAULT_QUEUE_LIMIT)
- msg_sink = gr.message_sink(item_size, msgq, False)
+ self.msgq = gr.msg_queue(DEFAULT_QUEUE_LIMIT)
+ msg_sink = gr.message_sink(item_size, self.msgq, False)
#connections
- self.connect(packet_mod.tail, self)
+ self.connect(self.packet_mod.tail, self)
self.connect(self, msg_sink)
#create/start the thread
- PacketModThread(msgq, packet_mod.send_pkt, packet_length)
+ PacketModThread(self.msgq, self.packet_mod.send_pkt,
packet_length)
+
+ def flush(self):
+ """
+ Flush the message queues.
+ Special locked callback to avoid thread lockup.
+ """
+ self.msgq.flush()
+ self.packet_mod._pkt_input.msgq().flush()
class PacketDemodHelper(gr.hier_block2):
"""Forward data from demod packet to the gr data stream."""
@@ -185,7 +193,7 @@
packet_length = packet_length.parse()
if packet_length%item_size != 0: #verify that packet
length is a multiple of the stream size
raise ValueError('The packet length: "%d" is not a
mutiple of the stream size: "%d".'%(packet_length, item_size))
- return PacketModHelper(
+ block = PacketModHelper(
item_size=item_size,
packet_length=packet_length,
samples_per_symbol=samples_per_symbol.parse(),
@@ -194,6 +202,8 @@
pad_for_usrp=pad_for_usrp.parse(),
use_whitener_offset=use_whitener_offset.parse(),
) #build packet modulator
+ fg.add_callback_locked(block.flush)
+ return block
return sb, make
def PacketDemod(sb):
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Commit-gnuradio] r5960 - in grc/trunk/src: . SignalBlockDefs,
jblum <=