#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # SPDX-License-Identifier: GPL-3.0 # # GNU Radio Python Flow Graph # Title: SQ5BPF Tetra live receiver 1ch simple UDP demo with fixed offset (gnuradio 3.10 version) xmlrpc # Author: Jacek Lipkowski SQ5BPF # Description: This is a receiver flowgraph, uses a rtl-sdr dongle, and outputs a downsampled stream to 127.0.0.1 42001/udp. This can be used with telive, or just osmo-tetra # GNU Radio version: 3.10.5.1 from packaging.version import Version as StrictVersion if __name__ == '__main__': import ctypes import sys if sys.platform.startswith('linux'): try: x11 = ctypes.cdll.LoadLibrary('libX11.so') x11.XInitThreads() except: print("Warning: failed to XInitThreads()") from PyQt5 import Qt from gnuradio import eng_notation from gnuradio import qtgui from gnuradio.filter import firdes import sip from gnuradio import analog from gnuradio import filter from gnuradio import gr from gnuradio.fft import window import sys import signal from argparse import ArgumentParser from gnuradio.eng_arg import eng_float, intx from gnuradio import network from gnuradio.qtgui import Range, RangeWidget from PyQt5 import QtCore import osmosdr import time from gnuradio import qtgui class telive_1ch_simple_gr310_udp(gr.top_block, Qt.QWidget): def __init__(self): gr.top_block.__init__(self, "SQ5BPF Tetra live receiver 1ch simple UDP demo with fixed offset (gnuradio 3.10 version) xmlrpc", catch_exceptions=True) Qt.QWidget.__init__(self) self.setWindowTitle("SQ5BPF Tetra live receiver 1ch simple UDP demo with fixed offset (gnuradio 3.10 version) xmlrpc") qtgui.util.check_set_qss() try: self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc')) except: pass self.top_scroll_layout = Qt.QVBoxLayout() self.setLayout(self.top_scroll_layout) self.top_scroll = Qt.QScrollArea() self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame) self.top_scroll_layout.addWidget(self.top_scroll) self.top_scroll.setWidgetResizable(True) self.top_widget = Qt.QWidget() self.top_scroll.setWidget(self.top_widget) self.top_layout = Qt.QVBoxLayout(self.top_widget) self.top_grid_layout = Qt.QGridLayout() self.top_layout.addLayout(self.top_grid_layout) self.settings = Qt.QSettings("GNU Radio", "telive_1ch_simple_gr310_udp") try: if StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): self.restoreGeometry(self.settings.value("geometry").toByteArray()) else: self.restoreGeometry(self.settings.value("geometry")) except: pass ################################################## # Variables ################################################## self.xlate_offset_fine1 = xlate_offset_fine1 = 0 self.samp_rate = samp_rate = 2000000 self.freq = freq = 438.0125e6 self.first_decim = first_decim = 32 self.xlate_offset1 = xlate_offset1 = 500000 self.variable_qtgui_label_0_0 = variable_qtgui_label_0_0 = (freq+xlate_offset_fine1) self.udp_packet_size = udp_packet_size = 1472 self.udp_dest_addr = udp_dest_addr = "127.0.0.1" self.telive_receiver_name = telive_receiver_name = 'SQ5BPF 1-channel rx for telive' self.telive_receiver_channels = telive_receiver_channels = 1 self.sdr_ifgain = sdr_ifgain = 20 self.sdr_gain = sdr_gain = 30 self.ppm_corr = ppm_corr = 0 self.out_sample_rate = out_sample_rate = 36000 self.options_low_pass = options_low_pass = 12500 self.if_samp_rate = if_samp_rate = samp_rate/first_decim self.first_port = first_port = 42000 ################################################## # Blocks ################################################## self._xlate_offset_fine1_range = Range(-5e3, +5e3, 1, 0, 200) self._xlate_offset_fine1_win = RangeWidget(self._xlate_offset_fine1_range, self.set_xlate_offset_fine1, "Fine tune1", "counter_slider", float, QtCore.Qt.Horizontal) self.top_grid_layout.addWidget(self._xlate_offset_fine1_win, 0, 2, 1, 3) for r in range(0, 1): self.top_grid_layout.setRowStretch(r, 1) for c in range(2, 5): self.top_grid_layout.setColumnStretch(c, 1) self._sdr_gain_range = Range(0, 50, 1, 30, 200) self._sdr_gain_win = RangeWidget(self._sdr_gain_range, self.set_sdr_gain, "gain", "counter_slider", int, QtCore.Qt.Horizontal) self.top_grid_layout.addWidget(self._sdr_gain_win, 0, 8, 1, 2) for r in range(0, 1): self.top_grid_layout.setRowStretch(r, 1) for c in range(8, 10): self.top_grid_layout.setColumnStretch(c, 1) self._ppm_corr_range = Range(-100, 100, 0.5, 0, 200) self._ppm_corr_win = RangeWidget(self._ppm_corr_range, self.set_ppm_corr, "ppm", "counter_slider", float, QtCore.Qt.Horizontal) self.top_grid_layout.addWidget(self._ppm_corr_win, 0, 5, 1, 3) for r in range(0, 1): self.top_grid_layout.setRowStretch(r, 1) for c in range(5, 8): self.top_grid_layout.setColumnStretch(c, 1) self._freq_tool_bar = Qt.QToolBar(self) self._freq_tool_bar.addWidget(Qt.QLabel("Frequency" + ": ")) self._freq_line_edit = Qt.QLineEdit(str(self.freq)) self._freq_tool_bar.addWidget(self._freq_line_edit) self._freq_line_edit.returnPressed.connect( lambda: self.set_freq(eng_notation.str_to_num(str(self._freq_line_edit.text())))) self.top_grid_layout.addWidget(self._freq_tool_bar, 0, 0, 1, 2) for r in range(0, 1): self.top_grid_layout.setRowStretch(r, 1) for c in range(0, 2): self.top_grid_layout.setColumnStretch(c, 1) self._variable_qtgui_label_0_0_tool_bar = Qt.QToolBar(self) if lambda x: f'{x/1000000:.4f} MHz': self._variable_qtgui_label_0_0_formatter = lambda x: f'{x/1000000:.4f} MHz' else: self._variable_qtgui_label_0_0_formatter = lambda x: eng_notation.num_to_str(x) self._variable_qtgui_label_0_0_tool_bar.addWidget(Qt.QLabel("Receive frequency: ")) self._variable_qtgui_label_0_0_label = Qt.QLabel(str(self._variable_qtgui_label_0_0_formatter(self.variable_qtgui_label_0_0))) self._variable_qtgui_label_0_0_tool_bar.addWidget(self._variable_qtgui_label_0_0_label) self.top_layout.addWidget(self._variable_qtgui_label_0_0_tool_bar) self.qtgui_freq_sink_x_0_0 = qtgui.freq_sink_c( 256, #size window.WIN_BLACKMAN_hARRIS, #wintype 0, #fc if_samp_rate, #bw "IF", #name 1, None # parent ) self.qtgui_freq_sink_x_0_0.set_update_time(0.01) self.qtgui_freq_sink_x_0_0.set_y_axis((-180), (-30)) self.qtgui_freq_sink_x_0_0.set_y_label('Relative Gain', 'dB') self.qtgui_freq_sink_x_0_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, 0.0, 0, "") self.qtgui_freq_sink_x_0_0.enable_autoscale(True) self.qtgui_freq_sink_x_0_0.enable_grid(True) self.qtgui_freq_sink_x_0_0.set_fft_average(1.0) self.qtgui_freq_sink_x_0_0.enable_axis_labels(True) self.qtgui_freq_sink_x_0_0.enable_control_panel(True) self.qtgui_freq_sink_x_0_0.set_fft_window_normalized(False) labels = ['', '', '', '', '', '', '', '', '', ''] widths = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] colors = ["blue", "red", "green", "black", "cyan", "magenta", "yellow", "dark red", "dark green", "dark blue"] alphas = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0] for i in range(1): if len(labels[i]) == 0: self.qtgui_freq_sink_x_0_0.set_line_label(i, "Data {0}".format(i)) else: self.qtgui_freq_sink_x_0_0.set_line_label(i, labels[i]) self.qtgui_freq_sink_x_0_0.set_line_width(i, widths[i]) self.qtgui_freq_sink_x_0_0.set_line_color(i, colors[i]) self.qtgui_freq_sink_x_0_0.set_line_alpha(i, alphas[i]) self._qtgui_freq_sink_x_0_0_win = sip.wrapinstance(self.qtgui_freq_sink_x_0_0.qwidget(), Qt.QWidget) self.top_grid_layout.addWidget(self._qtgui_freq_sink_x_0_0_win, 1, 6, 1, 6) for r in range(1, 2): self.top_grid_layout.setRowStretch(r, 1) for c in range(6, 12): self.top_grid_layout.setColumnStretch(c, 1) self.qtgui_freq_sink_x_0 = qtgui.freq_sink_c( 1024, #size window.WIN_BLACKMAN_hARRIS, #wintype (freq-xlate_offset1), #fc samp_rate, #bw "", #name 1, None # parent ) self.qtgui_freq_sink_x_0.set_update_time(0.10) self.qtgui_freq_sink_x_0.set_y_axis((-140), 10) self.qtgui_freq_sink_x_0.set_y_label('Relative Gain', 'dB') self.qtgui_freq_sink_x_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, 0.0, 0, "") self.qtgui_freq_sink_x_0.enable_autoscale(False) self.qtgui_freq_sink_x_0.enable_grid(False) self.qtgui_freq_sink_x_0.set_fft_average(0.2) self.qtgui_freq_sink_x_0.enable_axis_labels(True) self.qtgui_freq_sink_x_0.enable_control_panel(True) self.qtgui_freq_sink_x_0.set_fft_window_normalized(False) labels = ['', '', '', '', '', '', '', '', '', ''] widths = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] colors = ["blue", "red", "green", "black", "cyan", "magenta", "yellow", "dark red", "dark green", "dark blue"] alphas = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0] for i in range(1): if len(labels[i]) == 0: self.qtgui_freq_sink_x_0.set_line_label(i, "Data {0}".format(i)) else: self.qtgui_freq_sink_x_0.set_line_label(i, labels[i]) self.qtgui_freq_sink_x_0.set_line_width(i, widths[i]) self.qtgui_freq_sink_x_0.set_line_color(i, colors[i]) self.qtgui_freq_sink_x_0.set_line_alpha(i, alphas[i]) self._qtgui_freq_sink_x_0_win = sip.wrapinstance(self.qtgui_freq_sink_x_0.qwidget(), Qt.QWidget) self.top_grid_layout.addWidget(self._qtgui_freq_sink_x_0_win, 1, 0, 1, 6) for r in range(1, 2): self.top_grid_layout.setRowStretch(r, 1) for c in range(0, 6): self.top_grid_layout.setColumnStretch(c, 1) self.osmosdr_source_0 = osmosdr.source( args="numchan=" + str(1) + " " + '' ) self.osmosdr_source_0.set_time_unknown_pps(osmosdr.time_spec_t()) self.osmosdr_source_0.set_sample_rate(samp_rate) self.osmosdr_source_0.set_center_freq((freq-xlate_offset1), 0) self.osmosdr_source_0.set_freq_corr(ppm_corr, 0) self.osmosdr_source_0.set_dc_offset_mode(0, 0) self.osmosdr_source_0.set_iq_balance_mode(0, 0) self.osmosdr_source_0.set_gain_mode(False, 0) self.osmosdr_source_0.set_gain(sdr_gain, 0) self.osmosdr_source_0.set_if_gain(sdr_ifgain, 0) self.osmosdr_source_0.set_bb_gain(20, 0) self.osmosdr_source_0.set_antenna('', 0) self.osmosdr_source_0.set_bandwidth(0, 0) self.network_udp_sink_0 = network.udp_sink(gr.sizeof_gr_complex, 1, udp_dest_addr, (first_port+1), 0, udp_packet_size, False) self.mmse_resampler_xx_0 = filter.mmse_resampler_cc(0, (float(float(if_samp_rate)/float(out_sample_rate)))) self.freq_xlating_fir_filter_xxx_0 = filter.freq_xlating_fir_filter_ccc(first_decim, firdes.low_pass(1, samp_rate, options_low_pass, options_low_pass*0.2), (xlate_offset1+xlate_offset_fine1), samp_rate) self.analog_agc3_xx_0 = analog.agc3_cc((1e-3), (1e-4), 1.0, 1.0, 1) self.analog_agc3_xx_0.set_max_gain(65536) ################################################## # Connections ################################################## self.connect((self.analog_agc3_xx_0, 0), (self.mmse_resampler_xx_0, 0)) self.connect((self.freq_xlating_fir_filter_xxx_0, 0), (self.analog_agc3_xx_0, 0)) self.connect((self.freq_xlating_fir_filter_xxx_0, 0), (self.qtgui_freq_sink_x_0_0, 0)) self.connect((self.mmse_resampler_xx_0, 0), (self.network_udp_sink_0, 0)) self.connect((self.osmosdr_source_0, 0), (self.freq_xlating_fir_filter_xxx_0, 0)) self.connect((self.osmosdr_source_0, 0), (self.qtgui_freq_sink_x_0, 0)) def closeEvent(self, event): self.settings = Qt.QSettings("GNU Radio", "telive_1ch_simple_gr310_udp") self.settings.setValue("geometry", self.saveGeometry()) self.stop() self.wait() event.accept() def get_xlate_offset_fine1(self): return self.xlate_offset_fine1 def set_xlate_offset_fine1(self, xlate_offset_fine1): self.xlate_offset_fine1 = xlate_offset_fine1 self.set_variable_qtgui_label_0_0((self.freq+self.xlate_offset_fine1)) self.freq_xlating_fir_filter_xxx_0.set_center_freq((self.xlate_offset1+self.xlate_offset_fine1)) def get_samp_rate(self): return self.samp_rate def set_samp_rate(self, samp_rate): self.samp_rate = samp_rate self.set_if_samp_rate(self.samp_rate/self.first_decim) self.freq_xlating_fir_filter_xxx_0.set_taps(firdes.low_pass(1, self.samp_rate, self.options_low_pass, self.options_low_pass*0.2)) self.osmosdr_source_0.set_sample_rate(self.samp_rate) self.qtgui_freq_sink_x_0.set_frequency_range((self.freq-self.xlate_offset1), self.samp_rate) def get_freq(self): return self.freq def set_freq(self, freq): self.freq = freq Qt.QMetaObject.invokeMethod(self._freq_line_edit, "setText", Qt.Q_ARG("QString", eng_notation.num_to_str(self.freq))) self.set_variable_qtgui_label_0_0((self.freq+self.xlate_offset_fine1)) self.osmosdr_source_0.set_center_freq((self.freq-self.xlate_offset1), 0) self.qtgui_freq_sink_x_0.set_frequency_range((self.freq-self.xlate_offset1), self.samp_rate) def get_first_decim(self): return self.first_decim def set_first_decim(self, first_decim): self.first_decim = first_decim self.set_if_samp_rate(self.samp_rate/self.first_decim) def get_xlate_offset1(self): return self.xlate_offset1 def set_xlate_offset1(self, xlate_offset1): self.xlate_offset1 = xlate_offset1 self.freq_xlating_fir_filter_xxx_0.set_center_freq((self.xlate_offset1+self.xlate_offset_fine1)) self.osmosdr_source_0.set_center_freq((self.freq-self.xlate_offset1), 0) self.qtgui_freq_sink_x_0.set_frequency_range((self.freq-self.xlate_offset1), self.samp_rate) def get_variable_qtgui_label_0_0(self): return self.variable_qtgui_label_0_0 def set_variable_qtgui_label_0_0(self, variable_qtgui_label_0_0): self.variable_qtgui_label_0_0 = variable_qtgui_label_0_0 Qt.QMetaObject.invokeMethod(self._variable_qtgui_label_0_0_label, "setText", Qt.Q_ARG("QString", str(self._variable_qtgui_label_0_0_formatter(self.variable_qtgui_label_0_0)))) def get_udp_packet_size(self): return self.udp_packet_size def set_udp_packet_size(self, udp_packet_size): self.udp_packet_size = udp_packet_size def get_udp_dest_addr(self): return self.udp_dest_addr def set_udp_dest_addr(self, udp_dest_addr): self.udp_dest_addr = udp_dest_addr def get_telive_receiver_name(self): return self.telive_receiver_name def set_telive_receiver_name(self, telive_receiver_name): self.telive_receiver_name = telive_receiver_name def get_telive_receiver_channels(self): return self.telive_receiver_channels def set_telive_receiver_channels(self, telive_receiver_channels): self.telive_receiver_channels = telive_receiver_channels def get_sdr_ifgain(self): return self.sdr_ifgain def set_sdr_ifgain(self, sdr_ifgain): self.sdr_ifgain = sdr_ifgain self.osmosdr_source_0.set_if_gain(self.sdr_ifgain, 0) def get_sdr_gain(self): return self.sdr_gain def set_sdr_gain(self, sdr_gain): self.sdr_gain = sdr_gain self.osmosdr_source_0.set_gain(self.sdr_gain, 0) def get_ppm_corr(self): return self.ppm_corr def set_ppm_corr(self, ppm_corr): self.ppm_corr = ppm_corr self.osmosdr_source_0.set_freq_corr(self.ppm_corr, 0) def get_out_sample_rate(self): return self.out_sample_rate def set_out_sample_rate(self, out_sample_rate): self.out_sample_rate = out_sample_rate self.mmse_resampler_xx_0.set_resamp_ratio((float(float(self.if_samp_rate)/float(self.out_sample_rate)))) def get_options_low_pass(self): return self.options_low_pass def set_options_low_pass(self, options_low_pass): self.options_low_pass = options_low_pass self.freq_xlating_fir_filter_xxx_0.set_taps(firdes.low_pass(1, self.samp_rate, self.options_low_pass, self.options_low_pass*0.2)) def get_if_samp_rate(self): return self.if_samp_rate def set_if_samp_rate(self, if_samp_rate): self.if_samp_rate = if_samp_rate self.mmse_resampler_xx_0.set_resamp_ratio((float(float(self.if_samp_rate)/float(self.out_sample_rate)))) self.qtgui_freq_sink_x_0_0.set_frequency_range(0, self.if_samp_rate) def get_first_port(self): return self.first_port def set_first_port(self, first_port): self.first_port = first_port def main(top_block_cls=telive_1ch_simple_gr310_udp, options=None): if StrictVersion("4.5.0") <= StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): style = gr.prefs().get_string('qtgui', 'style', 'raster') Qt.QApplication.setGraphicsSystem(style) qapp = Qt.QApplication(sys.argv) tb = top_block_cls() tb.start() tb.show() def sig_handler(sig=None, frame=None): tb.stop() tb.wait() Qt.QApplication.quit() signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) timer = Qt.QTimer() timer.start(500) timer.timeout.connect(lambda: None) qapp.exec_() if __name__ == '__main__': main()