
[Discuss-gnuradio] Re: Msg Queues and Watcher Threads


From: Steven Clark
Subject: [Discuss-gnuradio] Re: Msg Queues and Watcher Threads
Date: Wed, 5 Mar 2008 18:07:06 -0500

On Tue, Mar 4, 2008 at 2:27 PM, Steven Clark <address@hidden> wrote:
> I like the use of the watcher thread in pkt.demod_pkts, and was
>  wondering if I could use a similar technique in the following
>  situation:
>
>  Two disconnected subgraphs:
>  msg_source_1  ->  transform_blk_1  ->  msg_sink_1
>
>  msg_source_2  ->  modulator
>
>  The user places 'm'-byte messages in msg_source_1.
>  transform_blk_1 consumes 'm' bytes and produces 'n' different bytes.
>  A queue watcher thread takes the 'n'-byte messages from msg_sink_1,
>  applies a second transform, and places the result in msg_source_2.
>
>  I want to use this for encoding packets with FEC (transform_blk_1 is a
>  viterbi_encoder, and the queue watcher thread calls
>  packet_utils.make_packet(msg) whenever msg = self.msg_queue.delete_head()
>  returns).
>  Does this seem reasonable?
>
>  Is it guaranteed that x messages into msg_source_1 produce x messages
>  into msg_source_2? (In other words, is there any danger of messages
>  being combined?)
>
>  Also, if transform_blk_1 has some saved state that I'd like to reset
>  in between packets, is it possible/safe to do so from the Queue
>  watcher thread? I'm worried about race conditions...
>
>  -Steven
>

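(For reference, the watcher-thread pattern described above is roughly
the following minimal sketch, modeled on the _queue_watcher_thread in
GNU Radio's blks2impl/pkt.py. The transform callback and the queue
names are placeholders for this scenario, not code from the attached
file.)

from gnuradio import gr
import threading

class _queue_watcher_thread(threading.Thread):
    """Block on an input msg_queue, transform each message, and
    forward the result to an output msg_queue."""
    def __init__(self, msg_queue_in, msg_queue_out, transform):
        threading.Thread.__init__(self)
        self.setDaemon(1)
        self.msg_queue_in = msg_queue_in
        self.msg_queue_out = msg_queue_out
        self.transform = transform    # e.g. packet_utils.make_packet
        self.keep_running = True
        self.start()

    def run(self):
        while self.keep_running:
            msg = self.msg_queue_in.delete_head()  # blocks until a message arrives
            payload = self.transform(msg.to_string())
            self.msg_queue_out.insert_tail(gr.message_from_string(payload))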

I wrote some code and am seeing some weirdness related to message
queues and message sinks. Snippets:

from gnuradio import gr, trellis

class my_top_block(gr.top_block):
    def __init__(self):
        gr.top_block.__init__(self)

        # fsm_pn and msg_queue_limit are defined in the attached test1.py.
        f = trellis.fsm(fsm_pn)

        # Messages pushed into input_queue are streamed out as bytes.
        pkt_input = gr.message_source(gr.sizeof_char, msg_queue_limit)
        self.input_queue = pkt_input.msgq()

        enc = trellis.encoder_bb(f, 0)  # initial state = 0
        # Pack the encoder's 2-bit output symbols into bytes, 4 symbols per byte.
        joiner = gr.unpacked_to_packed_bb(2, gr.GR_MSB_FIRST)
        #splitter = gr.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)

        # Collect the packed bytes into messages on output_queue.
        self.output_queue = gr.msg_queue(msg_queue_limit)
        pkt_output = gr.message_sink(gr.sizeof_char, self.output_queue, True)  # True: don't block

        self.connect(pkt_input, enc, joiner, pkt_output)
        #self.connect(pkt_input, enc, joiner, splitter, pkt_output)


def main():
    f = open('../pn15.dat')  # 2^15 - 1 bytes, each 0x00 or 0x01
    d = f.read()
    f.close()

    tb = my_top_block()
    iq = tb.input_queue
    oq = tb.output_queue

    tb.start()

    # gen_packets, num_uncoded_bits_per_packet and num_packets are
    # defined in the attached test1.py.
    for p in gen_packets(d, num_uncoded_bits_per_packet, num_packets):
        iq.insert_tail(gr.message_from_string(p))
        msg = oq.delete_head()  # block until the flowgraph emits a message
        data = msg.to_string()
        print len(data)
        #print_pkt(data)

    # A type-1 message makes gr.message_source signal EOF downstream,
    # so tb.wait() can return.
    iq.insert_tail(gr.message(1))

    tb.wait()

    print 'Exiting.'
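(A caveat about the loop in main(): each blocking delete_head() call
assumes a strict one-message-in / one-message-out correspondence,
which is exactly what is in question here. While debugging, a
non-blocking drain of the output queue avoids a potential hang; this
sketch assumes gr.msg_queue's empty_p() and delete_head_nowait()
wrappers behave like their C++ counterparts.)

# Drain whatever messages are currently available without blocking
# (oq is the output queue from my_top_block above).
while not oq.empty_p():
    msg = oq.delete_head_nowait()
    if msg is None:  # queue emptied between the check and the pop
        break
    print len(msg.to_string())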

Because the "joiner" block packs four 2-bit symbols into each output
byte, I expect a 4:1 rate reduction in bytes: when I put 10 messages
of length 1024 into the input queue, I expect the output queue to
yield 10 messages of length 256 (1024 symbols x 2 bits = 2048 bits =
256 bytes). Instead I see these output message lengths:
255
1
511
1
511
1
511
1
511
1
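
(For what it's worth, those ten lengths sum to 2304 bytes, i.e. nine
packets' worth at 256 bytes each, so one packet's worth of output
appears to still be buffered in the flowgraph when the loop exits. A
quick way to rule out the joiner itself would be to drive it from a
finite vector source in isolation; this is a sketch with a made-up
0..3 symbol pattern, not code from the attached file.)

from gnuradio import gr

def test_joiner():
    # 1024 two-bit symbols in, so exactly 256 packed bytes should come out.
    tb = gr.top_block()
    src = gr.vector_source_b([0, 1, 2, 3] * 256, False)  # finite, no repeat
    joiner = gr.unpacked_to_packed_bb(2, gr.GR_MSB_FIRST)
    dst = gr.vector_sink_b()
    tb.connect(src, joiner, dst)
    tb.run()
    print len(dst.data())  # expect 256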

Any idea what is going on? Full code is attached.

Attachment: test1.py
Description: Text Data

