#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # SPDX-License-Identifier: GPL-3.0 # # GNU Radio Python Flow Graph # Title: Testing Python Sinks # GNU Radio version: 3.8.1.0-as import os import signal import sys from gnuradio import blocks from gnuradio import gr import numpy class my_null_sink(gr.sync_block): def __init__(self, item=numpy.uint32, vlen=1): gr.sync_block.__init__( self, name="my_null_sink", in_sig=[(item, vlen)], out_sig=None) print('***** my_null_sink init ' + self.alias()) def work(self, input_items, output_items): return len(input_items[0]) class python_sink(gr.top_block): def __init__(self): gr.top_block.__init__(self, "Testing Python Sinks") file_source1 = blocks.file_source(gr.sizeof_gr_complex, '/dev/zero', False) # file_source2 = blocks.file_source(gr.sizeof_gr_complex, '/dev/zero', False) null_sink1 = blocks.null_sink(gr.sizeof_gr_complex) my_null_sink1 = my_null_sink(item=numpy.complex64) my_null_sink2 = my_null_sink(item=numpy.complex64) # self.connect(file_source1, null_sink1) self.connect(file_source1, my_null_sink1) self.connect(file_source1, my_null_sink2) # self.connect(file_source2, my_null_sink2) # print('Blocked waiting for GDB attach (pid = {})'.format(os.getpid())) # input('Press Enter to continue: ') def main(top_block_cls=python_sink, options=None): tb = top_block_cls() def sig_handler(sig=None, frame=None): tb.stop() tb.wait() sys.exit(0) signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) tb.start() tb.wait() if __name__ == '__main__': main()