[Discuss-gnuradio] Re: Msg Queues and Watcher Threads
From: Steven Clark
Subject: [Discuss-gnuradio] Re: Msg Queues and Watcher Threads
Date: Wed, 5 Mar 2008 18:07:06 -0500
On Tue, Mar 4, 2008 at 2:27 PM, Steven Clark <address@hidden> wrote:
> I like the use of the watcher thread in pkt.demod_pkts, and was
> wondering if I could use a similar technique in the following
> situation:
>
> Two disconnected subgraphs:
> msg_source_1 -> transform_blk_1 -> msg_sink_1
>
> msg_source_2 -> modulator
>
> The user places 'm'-byte messages in msg_source_1.
> transform_blk consumes 'm' bytes and produces 'n' different bytes.
> A queue watcher thread takes the 'n'-byte messages from msg_sink_1,
> applies a second transform, and places the result in msg_source_2.
>
> I want to use this for encoding packets with FEC (transform_blk_1 is a
> viterbi_encoder, and the queue watcher thread calls
> packet_utils.make_packet(msg) when msg = self.msg_queue.delete_head()
> unblocks).
> Does this seem reasonable?
>
> Is it guaranteed that putting x messages into msg_source_1 produces
> x messages in msg_source_2? (In other words, is there any danger of
> messages being combined?)
>
> Also, if transform_blk_1 has some saved state that I'd like to reset
> in between packets, is it possible/safe to do so from the Queue
> watcher thread? I'm worried about race conditions...
>
> -Steven
>
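For readers unfamiliar with the pattern described above, here is a minimal sketch of a queue watcher thread using only the Python standard library. It is not GNU Radio code: `queue.Queue` stands in for `gr.msg_queue`, the `None` end-of-stream marker is a hypothetical convention, and `make_packet` is a placeholder for `packet_utils.make_packet` that just prepends a 2-byte length. The one-message-in, one-message-out behaviour shown here holds because `queue.Queue` preserves item boundaries; it says nothing about what `gr.message_sink` does.

```python
import queue
import threading


class QueueWatcher(threading.Thread):
    """Block on src_q, apply transform to each message, put the result on dst_q."""

    def __init__(self, src_q, dst_q, transform):
        super().__init__(daemon=True)
        self.src_q = src_q
        self.dst_q = dst_q
        self.transform = transform

    def run(self):
        while True:
            msg = self.src_q.get()   # blocks, in the role of msg_queue.delete_head()
            if msg is None:          # hypothetical end-of-stream marker
                self.dst_q.put(None)
                break
            self.dst_q.put(self.transform(msg))


def make_packet(payload):
    # Placeholder for packet_utils.make_packet: 2-byte big-endian length header.
    return len(payload).to_bytes(2, "big") + payload


def run_demo(messages):
    sink_q, source_q = queue.Queue(), queue.Queue()
    watcher = QueueWatcher(sink_q, source_q, make_packet)
    watcher.start()
    for m in messages:
        sink_q.put(m)
    sink_q.put(None)                 # signal end of input
    out = []
    while True:
        pkt = source_q.get()
        if pkt is None:
            break
        out.append(pkt)
    watcher.join()
    return out
```

Calling `run_demo([b"abc", b"de"])` returns one framed packet per input message, in order.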
I wrote some code and am seeing some odd behavior related to the
message queue and message sink.
Snippets:
from gnuradio import gr, trellis

# fsm_pn, msg_queue_limit, gen_packets, num_uncoded_bits_per_packet and
# num_packets are not shown in these snippets (see the attached test1.py).

class my_top_block(gr.top_block):
    def __init__(self):
        gr.top_block.__init__(self)
        f = trellis.fsm(fsm_pn)
        # Messages inserted into input_queue are streamed out as bytes.
        pkt_input = gr.message_source(gr.sizeof_char, msg_queue_limit)
        self.input_queue = pkt_input.msgq()
        enc = trellis.encoder_bb(f, 0)  # initial state = 0
        # Pack four 2-bit encoder outputs into each byte, MSB first.
        joiner = gr.unpacked_to_packed_bb(2, gr.GR_MSB_FIRST)
        #splitter = gr.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
        self.output_queue = gr.msg_queue(msg_queue_limit)
        pkt_output = gr.message_sink(gr.sizeof_char, self.output_queue, True)
        self.connect(pkt_input, enc, joiner, pkt_output)
        #self.connect(pkt_input, enc, joiner, splitter, pkt_output)

def main():
    f = open('../pn15.dat')  # (2^15-1 bytes of 0x00 or 0x01)
    d = f.read()
    f.close()
    tb = my_top_block()
    iq = tb.input_queue
    oq = tb.output_queue
    tb.start()
    for p in gen_packets(d, num_uncoded_bits_per_packet, num_packets):
        iq.insert_tail(gr.message_from_string(p))
        msg = oq.delete_head()  # blocks until an output message arrives
        data = msg.to_string()
        print len(data)
        #print_pkt(data)
    iq.insert_tail(gr.message(1))  # message of type 1 signals end of input
    tb.wait()
    print 'Exiting.'
As a result of the "joiner" block, I expect to see a 4:1 rate
reduction in bytes. So when I put 10 messages of length 1024 into the
input queue, I expect the output queue to get 10 messages of length
256. Instead I see output message lengths:
255
1
511
1
511
1
511
1
511
1
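As a sanity check on the expected sizes, here is a small pure-Python model of the packing step. It is only a sketch and does not call GNU Radio: `unpacked_to_packed` is a hypothetical stand-in for `gr.unpacked_to_packed_bb(2, gr.GR_MSB_FIRST)`, and it assumes the encoder emits one 2-bit symbol per input byte, which is what the 4:1 expectation implies. The symbol values are arbitrary illustration data.

```python
def unpacked_to_packed(chunks, bits_per_chunk):
    """Pack k-bit chunks into bytes, MSB first (illustrative model only)."""
    per_byte = 8 // bits_per_chunk
    mask = (1 << bits_per_chunk) - 1
    out = bytearray()
    # Only complete groups are packed; a trailing partial group is dropped.
    for i in range(0, len(chunks) - per_byte + 1, per_byte):
        byte = 0
        for c in chunks[i:i + per_byte]:
            byte = (byte << bits_per_chunk) | (c & mask)
        out.append(byte)
    return bytes(out)


# 1024 arbitrary 2-bit symbols, one per input byte of a 1024-byte message.
encoded = [3, 0, 1, 2] * 256
packed = unpacked_to_packed(encoded, 2)
expected_len = len(packed)                 # bytes per output message

# Message lengths reported above.
observed = [255, 1, 511, 1, 511, 1, 511, 1, 511, 1]
total_observed = sum(observed)
```

In this model `expected_len` is 256, matching the expectation. The observed lengths sum to 2304 = 9 x 256 rather than 10 x 256 = 2560, which suggests the discrepancy lies in how the bytes are grouped into messages, and how many have arrived, rather than in the 4:1 ratio itself.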
Any idea what is going on? Full code is attached.
Attachment: test1.py (Description: Text Data)