[Discuss-gnuradio] Merge flow-graph paths


From: Aaron Swan
Subject: [Discuss-gnuradio] Merge flow-graph paths
Date: Mon, 11 Dec 2017 15:47:30 -0700

Hi,

I appear to be running into a deadlock when I try to merge two flow-graph paths. I've created an FFT block that performs strided FFTs. The basic idea is to store samples in an internal buffer until I have a full matrix. Once the matrix is full, the block performs a strided FFT into a second internal buffer and writes that buffer to the output. The block consumes every input sample it is given, but it does not produce output on every call.
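
To illustrate what "strided FFT" means here, the following is a minimal NumPy sketch. It assumes a small 4x8 matrix in place of the real sizes, and the variable names are illustrative only and not taken from the block.

```python
import numpy as np

dim0, dim1 = 4, 8  # small stand-ins for sizes such as 128 x 512
n_matrix = dim0 * dim1

# A flat stream of complex samples, as it would arrive on the input port
stream = (np.arange(n_matrix) + 1j * np.arange(n_matrix)).astype(np.complex64)

# Fill the matrix buffer row by row (dim1 is the contiguous dimension)
buf = np.zeros((dim0, dim1), dtype=np.complex64)
buf.ravel()[:] = stream

# axis=0 transforms each column, i.e. samples spaced dim1 apart in the stream
out = np.fft.fft(buf, axis=0).astype(np.complex64)

# The same column computed directly from a strided slice of the stream
col0 = np.fft.fft(stream[0::dim1])
strided_ok = np.allclose(out[:, 0], col0, atol=1e-3)

# Flatten the result back into a stream for the output port
flat_out = out.ravel()
```

No output can be computed until all dim0*dim1 samples are in the buffer, which is why the block sometimes consumes input without producing anything.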

This block works great in a flow-graph until I try to merge two paths where one path contains the FFT block. So my simplified flow graph looks like

[Tags strobe] --> [Throttle] --> [Strided FFT] --> [Multiply] --> [Null sink],

where there is a second connection from [Throttle] --> [Multiply].

Some restrictions: I need to use tags, so not using vectors on input/output makes things easier. I'm dealing with matrix sizes on the order of 128x512. I need to avoid tagged stream blocks.

Some things I've tried:

- Inserting copy blocks in various places.
- Setting the min buffer size on the Advanced tab of all the blocks to 10*(matrix size).
- Inserting delay blocks on the second path.
- Modifying the forecast function to require noutput_items, 0, or noutput_items rounded up to an integer multiple of the matrix size.
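
As a sketch of the rounded-up forecast variant, the arithmetic can be checked on its own without GNU Radio. The helper name `round_up_to_matrix` is hypothetical and not part of the block; it uses integer division in place of `np.floor`.

```python
def round_up_to_matrix(noutput_items, dim0, dim1):
    """Round noutput_items up to the next whole multiple of dim0*dim1."""
    n_matrix = dim0 * dim1
    # Integer form of floor((n - 1) / n_matrix) * n_matrix + n_matrix
    return ((noutput_items - 1) // n_matrix) * n_matrix + n_matrix


n_matrix = 128 * 512  # 65536 items per matrix

print(round_up_to_matrix(1, 128, 512))             # 65536
print(round_up_to_matrix(n_matrix, 128, 512))      # 65536
print(round_up_to_matrix(n_matrix + 1, 128, 512))  # 131072
```

With this variant, a request for even a single output item asks the scheduler for a full matrix of 65536 input items.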

Inserting a Strided FFT block on the second path does seem to get things to move, but inserting a delay does not. I could create a block that uses the same buffering scheme as the FFT, but I'm uncertain if that is an actual solution.

Any thoughts as to why this deadlocks? See code below.

Thanks in advance,

Aaron


import numpy as np
from gnuradio import gr

class fft_py_cc(gr.basic_block):
    """
    Perform FFT/IFFT on a matrix of dim0 x dim1 where dim1 is contiguous.
    """
    def __init__(self, dim0, dim1, axis=0, forward=True):

        self.axis = axis
        self.dim0 = dim0
        self.dim1 = dim1
        self.forward = forward

        self.buffer = np.zeros((dim0,dim1), dtype=np.complex64)
        self.bufferOut = np.zeros((dim0,dim1), dtype=np.complex64)

        self.iBuffer = 0
        self.iBufferOut = dim0*dim1 # Set to the last sample to indicate not valid

        gr.basic_block.__init__(self,
            name="fft_py_cc",
            in_sig=[np.complex64],
            out_sig=[np.complex64])

    def forecast(self, noutput_items, ninput_items_required):
        nMatrix = self.dim0*self.dim1
        nRequired = np.floor( (noutput_items - 1)/nMatrix )*nMatrix + nMatrix
        for i in range(len(ninput_items_required)):
            ninput_items_required[i] = int(nRequired)
            #ninput_items_required[i] = noutput_items
            #ninput_items_required[i] = 0

    def general_work(self, input_items, output_items):

        nIn = input_items[0].shape[0]
        nOut = output_items[0].shape[0]
        iIn = 0
        iOut = 0

        nBuf = self.dim0*self.dim1
        nBo = self.dim0*self.dim1
        iBuf = self.iBuffer
        iBo = self.iBufferOut

        iteration = 0

        while True:

            # Copy input samples to input buffer
            nBufferRemain = nBuf - iBuf
            nInputRemain = nIn - iIn
            nCopy = min(nBufferRemain, nInputRemain)
            if nCopy > 0:
                self.buffer.ravel()[iBuf:iBuf + nCopy] = input_items[0][iIn:iIn + nCopy]
                iIn += nCopy
                iBuf += nCopy

            # FFT if ready (input buffer is full and output buffer is available)
            if iBuf == nBuf and iBo == nBo:
                if self.forward:
                    self.bufferOut[:] = np.fft.fft(self.buffer, axis=self.axis)
                else:
                    self.bufferOut[:] = np.fft.ifft(self.buffer, axis=self.axis)

                iBuf = 0
                iBo = 0

            # Copy to output
            nBufferOutRemain = nBo - iBo
            nOutputRemain = nOut - iOut
            nCopy = min(nBufferOutRemain, nOutputRemain)
            if nCopy > 0:
                output_items[0][iOut:iOut + nCopy] = self.bufferOut.ravel()[iBo:iBo + nCopy]
                iOut += nCopy
                iBo += nCopy

            # Check if we are done
            if iOut >= nOut or iIn >= nIn:
                break

            iteration += 1
            if iteration == 10000000:
                print("Error: FFT may be stuck.")

        # Update and report consumed and processed samples
        self.iBuffer = iBuf
        self.iBufferOut = iBo

        self.consume(0, iIn)

        return iOut
