#!/usr/bin/env python2
# -*- coding: utf-8 -*-
##################################################
# GNU Radio Python Flow Graph
# Title: segfault_test
# Author: Devin Kelly
# Generated: Mon Mar 7 14:21:06 2016
##################################################

if __name__ == '__main__':
    import ctypes
    import sys
    if sys.platform.startswith('linux'):
        # Best effort: call XInitThreads() before any Qt import when run as a
        # script on Linux.  A failure only produces a warning.
        try:
            x11 = ctypes.cdll.LoadLibrary('libX11.so')
            x11.XInitThreads()
        except Exception:
            # Single-argument call form prints identically on Python 2.
            print("Warning: failed to XInitThreads()")

from PyQt4 import Qt
from gnuradio import blocks
from gnuradio import digital
from gnuradio import eng_notation
from gnuradio import gr
from gnuradio import qtgui
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from gnuradio.qtgui import Range, RangeWidget
from optparse import OptionParser
import pmt
import sip
import sys


def _pack_bits(bits, width=8):
    """Pack a flat sequence of 0/1 values into integers, MSB first.

    The sequence is cut into consecutive groups of ``width`` items and each
    group is read as a base-2 number.  A trailing group shorter than
    ``width`` is converted as-is (it is not padded).

    The previous inline expression sliced ``bits[ii:ii + 7]`` and therefore
    dropped the last bit of every byte, so the packed preamble did not match
    the hard-coded ``preamble_packed_orig`` values.

    >>> _pack_bits([1, 0, 1, 0, 1, 1, 0, 0])
    [172]
    >>> _pack_bits([])
    []
    """
    return [
        int(''.join(map(str, bits[start:start + width])), 2)
        for start in range(0, len(bits), width)
    ]


class segfault_test(gr.top_block, Qt.QWidget):
    """Qt top block for the ``segfault_test`` flow graph.

    Data flow, as wired in ``__init__``:

    * random PDUs -> CRC32 -> tagged stream -> repack 8->1 bits, which feeds
      both the header generator (mux input 1) and mux input 2;
    * the preamble bit vector -> stream_to_tagged_stream -> mux input 0;
    * mux -> repack 1->8 bits -> generic_mod -> tagged_stream_multiply_length
      -> tag_gate -> corr_est_cc -> tag_debug and a Qt time sink.

    The window also hosts range widgets for ``rf_gain``, ``offset`` and
    ``fc``; their callbacks only store the new value on the instance.
    """

    def __init__(self):
        gr.top_block.__init__(self, "segfault_test")
        Qt.QWidget.__init__(self)
        self.setWindowTitle("segfault_test")
        try:
            self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc'))
        except Exception:
            # The icon is cosmetic; carry on without it.
            pass

        # Scrollable top-level layout that holds every GUI widget.
        self.top_scroll_layout = Qt.QVBoxLayout()
        self.setLayout(self.top_scroll_layout)
        self.top_scroll = Qt.QScrollArea()
        self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame)
        self.top_scroll_layout.addWidget(self.top_scroll)
        self.top_scroll.setWidgetResizable(True)
        self.top_widget = Qt.QWidget()
        self.top_scroll.setWidget(self.top_widget)
        self.top_layout = Qt.QVBoxLayout(self.top_widget)
        self.top_grid_layout = Qt.QGridLayout()
        self.top_layout.addLayout(self.top_grid_layout)

        # Restore the window geometry saved by closeEvent().
        self.settings = Qt.QSettings("GNU Radio", "segfault_test")
        self.restoreGeometry(self.settings.value("geometry").toByteArray())

        ##################################################
        # Variables
        ##################################################
        self.sps = sps = 4
        self.preamble = preamble = [
            1, 0, 1, 0, 1, 1, 0, 0,  # 0xAC
            1, 1, 0, 1, 1, 1, 0, 1,  # 0xDD
            1, 0, 1, 0, 0, 1, 0, 0,  # 0xA4
            1, 1, 1, 0, 0, 0, 1, 0,  # 0xE2
            1, 1, 1, 1, 0, 0, 1, 0,  # 0xF2
            1, 0, 0, 0, 1, 1, 0, 0,  # 0x8C
            0, 0, 1, 0, 0, 0, 0, 0,  # 0x20
            1, 1, 1, 1, 1, 1, 0, 0,  # 0xFC
        ]
        self.bpsk = bpsk = digital.constellation_calcdist(([-1, 1]), ([0, 1]), 2, 1).base()
        self.samp_rate = samp_rate = 800e3
        self.rxmod = rxmod = digital.generic_mod(bpsk, False, sps, True, 0.35, False, False)
        # Eight bits per packed value; this now equals preamble_packed_orig.
        self.preamble_packed = preamble_packed = _pack_bits(preamble)
        self.nfilts = nfilts = 32
        self.access_code = access_code = [
            1, 0, 1, 0, 1, 1, 0, 0,
            1, 1, 0, 1, 1, 1, 0, 1,
            1, 0, 1, 0, 0, 1, 0, 0,
            1, 1, 1, 0, 0, 0, 1, 0,
            1, 1, 1, 1, 0, 0, 1, 0,
            1, 0, 0, 0, 1, 1, 0, 0,
            0, 0, 1, 0, 0, 0, 0, 0,
            1, 1, 1, 1, 1, 1, 0, 0,
        ]
        self.rrc_taps = rrc_taps = firdes.root_raised_cosine(
            nfilts, nfilts * sps, 1, 0.35, 11 * sps * nfilts)
        self.rf_gain = rf_gain = 20
        self.preamble_packed_orig = preamble_packed_orig = [
            0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc]
        self.offset = offset = 1500
        # Reference sequence handed to corr_est_cc below: the packed preamble
        # run through the rxmod modulator.
        self.modulated_sync_word = modulated_sync_word = digital.modulate_vector_bc(
            rxmod.to_basic_block(), (preamble_packed), ([1]))
        self.firdes_taps_highpass = firdes_taps_highpass = firdes.high_pass(
            1, samp_rate, 1e3, 500, firdes.WIN_BLACKMAN, 6.76)
        self.fc = fc = 450e6
        self.access_code_str_0 = access_code_str_0 = ''.join(map(str, access_code))

        ##################################################
        # Blocks
        ##################################################
        self._rf_gain_range = Range(1, 80, 1, 20, 200)
        self._rf_gain_win = RangeWidget(
            self._rf_gain_range, self.set_rf_gain, "rf_gain", "counter_slider", float)
        self.top_layout.addWidget(self._rf_gain_win)

        self.qtgui_time_sink_x_0_0_0 = time_sink = qtgui.time_sink_c(
            5000,  # size
            samp_rate,  # samp_rate
            "",  # name
            1  # number of inputs
        )
        time_sink.set_update_time(0.10)
        time_sink.set_y_axis(-0.05, 0.05)
        time_sink.set_y_label("Amplitude", "")
        time_sink.enable_tags(-1, True)
        time_sink.set_trigger_mode(
            qtgui.TRIG_MODE_AUTO, qtgui.TRIG_SLOPE_POS, 0.003, 0, 0, "")
        time_sink.enable_autoscale(False)
        time_sink.enable_grid(False)
        time_sink.enable_control_panel(False)

        # Generated as a constant-false guard: disable_legend() is never
        # called in this flow graph.
        if not True:
            time_sink.disable_legend()

        labels = ["", "", "", "", "", "", "", "", "", ""]
        widths = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
        colors = ["blue", "red", "green", "black", "cyan",
                  "magenta", "yellow", "dark red", "dark green", "blue"]
        styles = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
        markers = [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1]
        alphas = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]

        # One complex input is drawn as two lines (even index: Re, odd: Im).
        for i in range(2 * 1):
            if len(labels[i]) == 0:
                # Explicit floor division keeps the label an integer.
                if i % 2 == 0:
                    time_sink.set_line_label(i, "Re{{Data {0}}}".format(i // 2))
                else:
                    time_sink.set_line_label(i, "Im{{Data {0}}}".format(i // 2))
            else:
                time_sink.set_line_label(i, labels[i])
            time_sink.set_line_width(i, widths[i])
            time_sink.set_line_color(i, colors[i])
            time_sink.set_line_style(i, styles[i])
            time_sink.set_line_marker(i, markers[i])
            time_sink.set_line_alpha(i, alphas[i])

        self._qtgui_time_sink_x_0_0_0_win = sip.wrapinstance(
            self.qtgui_time_sink_x_0_0_0.pyqwidget(), Qt.QWidget)
        self.top_layout.addWidget(self._qtgui_time_sink_x_0_0_0_win)

        self._offset_range = Range(-5000, 5000, 1, 1500, 200)
        self._offset_win = RangeWidget(
            self._offset_range, self.set_offset, "offset", "counter_slider", int)
        self.top_layout.addWidget(self._offset_win)

        self._fc_range = Range(400e6, 500e6, 1e3, 450e6, 200)
        self._fc_win = RangeWidget(
            self._fc_range, self.set_fc, "fc", "counter_slider", float)
        self.top_layout.addWidget(self._fc_win)

        self.digital_packet_headergenerator_bb_default_0 = digital.packet_headergenerator_bb(
            32, "packet_len")
        self.digital_crc32_async_bb_3 = digital.crc32_async_bb(False)
        self.digital_corr_est_cc_0 = digital.corr_est_cc((modulated_sync_word), sps, 1, 2)
        self.digital_constellation_modulator_0 = digital.generic_mod(
            constellation=bpsk,
            differential=False,
            samples_per_symbol=sps,
            pre_diff_code=True,
            excess_bw=0.35,
            verbose=False,
            log=False,
        )
        self.blocks_vector_source_x_0_0 = blocks.vector_source_b(preamble, True, 1, "")
        self.blocks_tagged_stream_mux_0 = blocks.tagged_stream_mux(
            gr.sizeof_char * 1, "packet_len", 2)
        self.blocks_tagged_stream_multiply_length_0 = blocks.tagged_stream_multiply_length(
            gr.sizeof_gr_complex * 1, "packet_len", sps)
        self.blocks_tag_gate_1 = blocks.tag_gate(gr.sizeof_gr_complex * 1, False)
        self.blocks_tag_debug_2 = blocks.tag_debug(gr.sizeof_gr_complex * 1, "", "")
        self.blocks_tag_debug_2.set_display(True)
        self.blocks_stream_to_tagged_stream_0_0 = blocks.stream_to_tagged_stream(
            gr.sizeof_char, 1, len(preamble), "packet_len")
        self.blocks_repack_bits_bb_1 = blocks.repack_bits_bb(
            1, 8, "", False, gr.GR_LSB_FIRST)
        self.blocks_repack_bits_bb_0 = blocks.repack_bits_bb(
            8, 1, "packet_len", False, gr.GR_LSB_FIRST)
        self.blocks_random_pdu_0 = blocks.random_pdu(10, 10, chr(0xFF), 2)
        self.blocks_pdu_to_tagged_stream_0 = blocks.pdu_to_tagged_stream(
            blocks.byte_t, "packet_len")

        ##################################################
        # Connections
        ##################################################
        self.msg_connect((self.blocks_random_pdu_0, 'pdus'),
                         (self.digital_crc32_async_bb_3, 'in'))
        self.msg_connect((self.digital_crc32_async_bb_3, 'out'),
                         (self.blocks_pdu_to_tagged_stream_0, 'pdus'))
        self.connect((self.blocks_pdu_to_tagged_stream_0, 0),
                     (self.blocks_repack_bits_bb_0, 0))
        self.connect((self.blocks_repack_bits_bb_0, 0),
                     (self.blocks_tagged_stream_mux_0, 2))
        self.connect((self.blocks_repack_bits_bb_0, 0),
                     (self.digital_packet_headergenerator_bb_default_0, 0))
        self.connect((self.blocks_repack_bits_bb_1, 0),
                     (self.digital_constellation_modulator_0, 0))
        self.connect((self.blocks_stream_to_tagged_stream_0_0, 0),
                     (self.blocks_tagged_stream_mux_0, 0))
        self.connect((self.blocks_tag_gate_1, 0),
                     (self.digital_corr_est_cc_0, 0))
        self.connect((self.blocks_tagged_stream_multiply_length_0, 0),
                     (self.blocks_tag_gate_1, 0))
        self.connect((self.blocks_tagged_stream_mux_0, 0),
                     (self.blocks_repack_bits_bb_1, 0))
        self.connect((self.blocks_vector_source_x_0_0, 0),
                     (self.blocks_stream_to_tagged_stream_0_0, 0))
        self.connect((self.digital_constellation_modulator_0, 0),
                     (self.blocks_tagged_stream_multiply_length_0, 0))
        self.connect((self.digital_corr_est_cc_0, 0),
                     (self.blocks_tag_debug_2, 0))
        self.connect((self.digital_corr_est_cc_0, 0),
                     (self.qtgui_time_sink_x_0_0_0, 0))
        self.connect((self.digital_packet_headergenerator_bb_default_0, 0),
                     (self.blocks_tagged_stream_mux_0, 1))

    def closeEvent(self, event):
        """Save the window geometry to QSettings and accept the close event."""
        self.settings = Qt.QSettings("GNU Radio", "segfault_test")
        self.settings.setValue("geometry", self.saveGeometry())
        event.accept()

    def get_sps(self):
        """Return the samples-per-symbol value."""
        return self.sps

    def set_sps(self, sps):
        """Store ``sps`` and refresh the values derived from it.

        Recomputes ``rrc_taps`` and ``rxmod`` and updates the scalar of the
        tagged-stream length multiplier.  NOTE(review): the modulator block
        already in the graph, ``modulated_sync_word`` and the corr_est_cc
        block are not touched here.
        """
        self.sps = sps
        self.set_rrc_taps(firdes.root_raised_cosine(
            self.nfilts, self.nfilts * self.sps, 1, 0.35, 11 * self.sps * self.nfilts))
        self.blocks_tagged_stream_multiply_length_0.set_scalar(self.sps)
        self.set_rxmod(digital.generic_mod(
            self.bpsk, False, self.sps, True, 0.35, False, False))

    def get_preamble(self):
        """Return the preamble as a list of bits."""
        return self.preamble

    def set_preamble(self, preamble):
        """Store ``preamble`` and propagate it to the dependent blocks.

        Re-packs the bits (eight per value), updates the packet length of
        the stream_to_tagged_stream block and reloads the vector source.
        """
        self.preamble = preamble
        self.set_preamble_packed(_pack_bits(self.preamble))
        self.blocks_stream_to_tagged_stream_0_0.set_packet_len(len(self.preamble))
        self.blocks_stream_to_tagged_stream_0_0.set_packet_len_pmt(len(self.preamble))
        self.blocks_vector_source_x_0_0.set_data(self.preamble, "")

    def get_bpsk(self):
        """Return the constellation object stored as ``bpsk``."""
        return self.bpsk

    def set_bpsk(self, bpsk):
        """Store ``bpsk`` and rebuild ``rxmod`` from it."""
        self.bpsk = bpsk
        self.set_rxmod(digital.generic_mod(
            self.bpsk, False, self.sps, True, 0.35, False, False))

    def get_samp_rate(self):
        """Return the sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store ``samp_rate``; refresh the high-pass taps and the time sink."""
        self.samp_rate = samp_rate
        self.set_firdes_taps_highpass(firdes.high_pass(
            1, self.samp_rate, 1e3, 500, firdes.WIN_BLACKMAN, 6.76))
        self.qtgui_time_sink_x_0_0_0.set_samp_rate(self.samp_rate)

    def get_rxmod(self):
        """Return the modulator used to build ``modulated_sync_word``."""
        return self.rxmod

    def set_rxmod(self, rxmod):
        """Store ``rxmod``; nothing else is updated."""
        self.rxmod = rxmod

    def get_preamble_packed(self):
        """Return the packed preamble values."""
        return self.preamble_packed

    def set_preamble_packed(self, preamble_packed):
        """Store ``preamble_packed``; nothing else is updated."""
        self.preamble_packed = preamble_packed

    def get_nfilts(self):
        """Return the ``nfilts`` value used for the RRC taps."""
        return self.nfilts

    def set_nfilts(self, nfilts):
        """Store ``nfilts`` and recompute ``rrc_taps``."""
        self.nfilts = nfilts
        self.set_rrc_taps(firdes.root_raised_cosine(
            self.nfilts, self.nfilts * self.sps, 1, 0.35, 11 * self.sps * self.nfilts))

    def get_access_code(self):
        """Return the access code as a list of bits."""
        return self.access_code

    def set_access_code(self, access_code):
        """Store ``access_code`` and refresh its string form."""
        self.access_code = access_code
        self.set_access_code_str_0(''.join(map(str, self.access_code)))

    def get_rrc_taps(self):
        """Return the root-raised-cosine taps."""
        return self.rrc_taps

    def set_rrc_taps(self, rrc_taps):
        """Store ``rrc_taps``; nothing else is updated."""
        self.rrc_taps = rrc_taps

    def get_rf_gain(self):
        """Return the ``rf_gain`` value."""
        return self.rf_gain

    def set_rf_gain(self, rf_gain):
        """Store ``rf_gain`` (callback of the rf_gain range widget)."""
        self.rf_gain = rf_gain

    def get_preamble_packed_orig(self):
        """Return the hard-coded packed preamble values."""
        return self.preamble_packed_orig

    def set_preamble_packed_orig(self, preamble_packed_orig):
        """Store ``preamble_packed_orig``; nothing else is updated."""
        self.preamble_packed_orig = preamble_packed_orig

    def get_offset(self):
        """Return the ``offset`` value."""
        return self.offset

    def set_offset(self, offset):
        """Store ``offset`` (callback of the offset range widget)."""
        self.offset = offset

    def get_modulated_sync_word(self):
        """Return the modulated sync word."""
        return self.modulated_sync_word

    def set_modulated_sync_word(self, modulated_sync_word):
        """Store ``modulated_sync_word``; the corr_est_cc block is not updated."""
        self.modulated_sync_word = modulated_sync_word

    def get_firdes_taps_highpass(self):
        """Return the high-pass filter taps."""
        return self.firdes_taps_highpass

    def set_firdes_taps_highpass(self, firdes_taps_highpass):
        """Store ``firdes_taps_highpass``; nothing else is updated."""
        self.firdes_taps_highpass = firdes_taps_highpass

    def get_fc(self):
        """Return the ``fc`` value."""
        return self.fc

    def set_fc(self, fc):
        """Store ``fc`` (callback of the fc range widget)."""
        self.fc = fc

    def get_access_code_str_0(self):
        """Return the access code as a string of '0'/'1' characters."""
        return self.access_code_str_0

    def set_access_code_str_0(self, access_code_str_0):
        """Store ``access_code_str_0``; nothing else is updated."""
        self.access_code_str_0 = access_code_str_0


def main(top_block_cls=segfault_test, options=None):
    """Create the Qt application, start the flow graph and run the event loop.

    ``top_block_cls`` is instantiated without arguments.  ``options`` is
    accepted but not used.  The flow graph is stopped and waited on when the
    application emits ``aboutToQuit()``.
    """
    from distutils.version import StrictVersion
    if StrictVersion(Qt.qVersion()) >= StrictVersion("4.5.0"):
        style = gr.prefs().get_string('qtgui', 'style', 'raster')
        Qt.QApplication.setGraphicsSystem(style)
    qapp = Qt.QApplication(sys.argv)

    tb = top_block_cls()
    tb.start()
    tb.show()

    def quitting():
        tb.stop()
        tb.wait()
    qapp.connect(qapp, Qt.SIGNAL("aboutToQuit()"), quitting)
    qapp.exec_()


if __name__ == '__main__':
    main()