# -*- coding: utf-8 -*-

"""Osmocom GSMTAP python implementation.

GSMTAP is a packet format used for conveying a number of different
telecom-related protocol traces over UDP.
"""

#
# Copyright (C) 2022 Harald Welte
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#

import socket
from construct import Optional as COptional
from construct import Int8ub, Int8sb, Int32ub, BitStruct, Enum, GreedyBytes, Struct, Switch
from construct import this, PaddedString, Flag, BitsInteger, Bytes

from osmocom.construct import *

# The root definition of GSMTAP can be found at
# https://cgit.osmocom.org/cgit/libosmocore/tree/include/osmocom/core/gsmtap.h

# UDP port number for GSMTAP.
GSMTAP_UDP_PORT = 4729

# GSMTAP_TYPE_*
# One-octet payload type; selects which sub-type enum applies in the header.
gsmtap_type_construct = Enum(
    Int8ub,
    gsm_um=0x01,
    gsm_abis=0x02,
    gsm_um_burst=0x03,
    sim=0x04,
    tetra_i1=0x05,
    tetra_i1_burst=0x06,
    wimax_burst=0x07,
    gprs_gb_llc=0x08,
    gprs_gb_sndcp=0x09,
    gmr1_um=0x0a,
    umts_rlc_mac=0x0b,
    umts_rrc=0x0c,
    lte_rrc=0x0d,
    lte_mac=0x0e,
    lte_mac_framed=0x0f,
    osmocore_log=0x10,
    qc_diag=0x11,
    lte_nas=0x12,
    e1_t1=0x13,
)

# TYPE_UM_BURST
# Sub-type values used when the type is 'gsm_um_burst'.
gsmtap_subtype_burst_construct = Enum(
    Int8ub,
    unknown=0x00,
    fcch=0x01,
    partial_sch=0x02,
    sch=0x03,
    cts_sch=0x04,
    compact_sch=0x05,
    normal=0x06,
    dummy=0x07,
    access=0x08,
    none=0x09,
)

# Sub-type values used when the type is 'wimax_burst'.
gsmtap_subtype_wimax_burst_construct = Enum(
    Int8ub,
    cdma_code=0x10,
    fch=0x11,
    ffb=0x12,
    pdu=0x13,
    hack=0x14,
    phy_attributes=0x15,
)

# GSMTAP_CHANNEL_*
gsmtap_subtype_um_construct = Enum(
    Int8ub,
    unknown=0x00,
    bcch=0x01,
    ccch=0x02,
    rach=0x03,
    agch=0x04,
    pch=0x05,
    sdcch=0x06,
    sdcch4=0x07,
    sdcch8=0x08,
    facch_f=0x09,
    facch_h=0x0a,
    pacch=0x0b,
    cbch52=0x0c,
    pdtch=0x0d,
    ptcch=0x0e,
    cbch51=0x0f,
    voice_f=0x10,
    voice_h=0x11,
)

# GSMTAP_SIM_*
gsmtap_subtype_sim_construct = Enum(
    Int8ub,
    apdu=0x00,
    atr=0x01,
    pps_req=0x02,
    pps_rsp=0x03,
    tpdu_hdr=0x04,
    tpdu_cmd=0x05,
    tpdu_rsp=0x06,
    tpdu_sw=0x07,
)

# Sub-type values shared by the 'tetra_i1' and 'tetra_i1_burst' types.
gsmtap_subtype_tetra_construct = Enum(
    Int8ub,
    bsch=0x01,
    aach=0x02,
    sch_hu=0x03,
    sch_hd=0x04,
    sch_f=0x05,
    bnch=0x06,
    stch=0x07,
    tch_f=0x08,
    dmo_sch_s=0x09,
    dmo_sch_h=0x0a,
    dmo_sch_f=0x0b,
    dmo_stch=0x0c,
    dmo_tch=0x0d,
)

# Sub-type values used when the type is 'gmr1_um'.
gsmtap_subtype_gmr1_construct = Enum(
    Int8ub,
    unknown=0x00,
    bcch=0x01,
    ccch=0x02,
    pch=0x03,
    agch=0x04,
    bach=0x05,
    rach=0x06,
    cbch=0x07,
    sdcch=0x08,
    tachh=0x09,
    gbch=0x0a,
    tch3=0x10,
    tch6=0x14,
    tch9=0x18,
)

# Sub-type values used when the type is 'e1_t1'.
gsmtap_subtype_e1t1_construct = Enum(
    Int8ub,
    lapd=0x01,
    fr=0x02,
    raw=0x03,
    trau16=0x04,
    trau8=0x05,
)

# 16-bit ARFCN field: two flag bits ('pcs', 'uplink') followed by a 14-bit number.
gsmtap_arfcn_construct = BitStruct(
    'pcs'/Flag,
    'uplink'/Flag,
    'arfcn'/BitsInteger(14),
)

# GSMTAP header followed by the payload ('body').
# NOTE(review): 'body' greedily takes everything after the fixed fields and
# does not consult 'hdr_len' -- confirm this is intended for extended headers.
gsmtap_hdr_construct = Struct(
    'version'/Int8ub,
    'hdr_len'/Int8ub,
    'type'/gsmtap_type_construct,
    'timeslot'/Int8ub,
    'arfcn'/gsmtap_arfcn_construct,
    'signal_dbm'/Int8sb,
    'snr_db'/Int8sb,
    'frame_nr'/Int32ub,
    'sub_type'/Switch(
        this.type,
        {
            'gsm_um': gsmtap_subtype_um_construct,
            'gsm_um_burst': gsmtap_subtype_burst_construct,
            'sim': gsmtap_subtype_sim_construct,
            'tetra_i1': gsmtap_subtype_tetra_construct,
            'tetra_i1_burst': gsmtap_subtype_tetra_construct,
            'wimax_burst': gsmtap_subtype_wimax_burst_construct,
            'gmr1_um': gsmtap_subtype_gmr1_construct,
            'e1_t1': gsmtap_subtype_e1t1_construct,
        },
        # The sub_type octet is part of the header for every type.  Without an
        # explicit default, Switch falls back to Pass, which consumes no bytes;
        # for any type not listed above that would shift antenna_nr, sub_slot,
        # res and body by one byte.  Types without a dedicated enum therefore
        # get the raw integer value (and must supply an int when building).
        default=Int8ub,
    ),
    'antenna_nr'/Int8ub,
    'sub_slot'/Int8ub,
    'res'/Int8ub,
    'body'/GreedyBytes,
)

# Timestamp used in the osmocore log header: two unsigned 32-bit big-endian values.
osmocore_log_ts_construct = Struct(
    'sec'/Int32ub,
    'usec'/Int32ub,
)

# Log level values carried in the osmocore log header.
osmocore_log_level_construct = Enum(
    Int8ub,
    debug=1,
    info=3,
    notice=5,
    error=7,
    fatal=8,
)
# Header for GSMTAP payloads of type 'osmocore_log'.
gsmtap_osmocore_log_hdr_construct = Struct(
    'ts'/osmocore_log_ts_construct,
    'proc_name'/PaddedString(16, 'ascii'),
    'pid'/Int32ub,
    'level'/osmocore_log_level_construct,
    Bytes(3),  # unnamed 3-byte field between 'level' and 'subsys'
    'subsys'/PaddedString(16, 'ascii'),
    'src_file'/Struct(
        'name'/PaddedString(32, 'ascii'),
        'line_nr'/Int32ub,
    ),
)


class GsmtapMessage:
    """Class whose objects represent a single GSMTAP message.

    Can encode and decode messages.  The raw bytes are kept in ``encoded``
    and the result of the most recent ``decode()`` call in ``decoded``.
    """

    def __init__(self, encoded=None):
        """Create a message, optionally from already-encoded bytes."""
        self.encoded = encoded
        self.decoded = None

    def decode(self):
        """Parse ``self.encoded`` with the GSMTAP header construct.

        The parsed result is stored in ``self.decoded`` and returned.
        """
        self.decoded = parse_construct(gsmtap_hdr_construct, self.encoded)
        return self.decoded

    def encode(self, decoded):
        """Build the binary message from *decoded*.

        The resulting bytes are stored in ``self.encoded`` and returned.
        ``self.decoded`` is left untouched.
        """
        self.encoded = gsmtap_hdr_construct.build(decoded)
        return self.encoded


class GsmtapReceiver:
    """Simple receive-side socket implementation for GSMTAP messages.

    The instance owns a bound UDP socket (``self.sock``).  Release it with
    ``close()`` or by using the receiver as a context manager.
    """

    def __init__(self, bind_ip: str = '127.0.0.1', bind_port: int = 4729):
        """Create an IPv4 UDP socket and bind it to *bind_ip*:*bind_port*.

        Any exception raised by ``bind()`` is propagated after the socket
        has been closed.
        """
        self.bind_ip = bind_ip
        self.bind_port = bind_port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.sock.bind((self.bind_ip, self.bind_port))
        except Exception:
            # Do not leak the descriptor when the address cannot be bound.
            self.sock.close()
            raise

    def close(self) -> None:
        """Close the underlying UDP socket."""
        self.sock.close()

    def __enter__(self):
        """Return the receiver itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the socket when leaving the ``with`` block."""
        self.close()

    def read_packet(self) -> tuple:
        """Perform a blocking read on the GSMTAP socket and decode the datagram.

        Returns:
            A ``(decoded, addr)`` tuple: the decoded GSMTAP message and the
            sender address as reported by ``recvfrom()``.

        Raises:
            ValueError: if the decoded header's version is not 0x02.
        """
        data, addr = self.sock.recvfrom(65535)
        gsmtap_msg = GsmtapMessage(data)
        gsmtap_msg.decode()
        if gsmtap_msg.decoded['version'] != 0x02:
            raise ValueError('Unknown GSMTAP version 0x%02x' % gsmtap_msg.decoded['version'])
        return gsmtap_msg.decoded, addr