module OSMUX_Emulation {

/* Functionalities that we want this module to implement:
 *  * act as a Osmux source that generates a Osmux Stream
 *  * act as a Osmux sink that consumes a Osmux Stream
 *  * for all of the above, we want to be able to
 *    * specify the payload type
 *    * specify the interval / sample rate
 *    * create drop-outs in the stream
 *    * detect reordered or lost frames
 *    * validate if the size of the frames matches expectations
 *    * play back real audio (at least some tones?)
 *  * enable/disable generation/verification of RTCP
 */

/* Ideas:
 * each component consists of transmitter and receiver
 * transmitters and receivers can be operated as tuple?
 * high-level operation
 ** set-up config at transmitter + receiver
 ** transmit sequence of payloads
 ** verify reception of those payloads
 * can operate full-duplex/bi-directional as needed
 * transmitter
 ** trigger transmission of n number of packets
 ** transmit them at normal ptime interval
 ** payload size configurable
 ** payload contents PRBS or the like
 * receiver
 ** count number of related packets at receiver
 ** check received payload type
 ** check received timestamp increments
 ** check received seq_nr increments
 ** (optionally) check for SSRC
 ** (optionally) check for payload size
 ** (optionally) check for payload contents
 * later
 ** how to test transcoding?
 ** how to test pure play-out endpoints (rx only)?
 ** how to test "Rx from wrong IP/port" scenarios?
 ** how to test RTCP?
 ** maybe keep ports un-connected to show wrong src -lrt
 */

import from General_Types all;
import from Osmocom_Types all;
import from IPL4asp_Types all;
import from Misc_Helpers all;
import from AMR_Types all;
import from OSMUX_Types all;
import from OSMUX_CodecPort all;
import from OSMUX_CodecPort_CtrlFunct all;

/* Emulation component: owns the Osmux socket, the per-CID Rx/Tx handle tables,
 * the user configuration and the running statistics. Driven by f_main(). */
type component OSMUX_Emulation_CT {
	/* down-facing ports for Osmux on top of IPL4asp */
	port OSMUX_CODEC_PT OSMUX;
	var integer g_osmux_conn_id := -1;

	/* user-facing port for controlling the binding */
	port OsmuxEM_CTRL_PT CTRL;
	/* user-facing port for sniffing Osmux frames */
	port OsmuxEM_DATA_PT DATA;

	/* configurable by user, should be fixed */
	var OsmuxemConfig g_cfg := c_OsmuxemDefaultCfg;
	/* statistics */
	var OsmuxemStats g_stat := c_OsmuxemStatsReset;

	var HostName g_remote_host;
	var PortNumber g_remote_port;
	var HostName g_local_host;
	var PortNumber g_local_port;

	/* state variables, change over time */
	var boolean g_rx_enabled := false;
	var boolean g_tx_connected := false; /* Set to true after connect() */

	var INT7b g_rx_payload_type := 0;
	var LIN2_BO_LAST g_rx_last_seq;
	var uint32_t g_rx_last_ts;

	/* Registered receive handles, keyed by CID; a slot is free while its cid is unbound */
	var RxHandleTableRec RxHandleTable[16];
	/* Registered transmit handles; a slot is free while it is unbound */
	var OsmuxTxHandle TxHandleList[16];
}

/* One slot of the receive handle table: lookup key plus per-CID receive state */
type record RxHandleTableRec {
	OsmuxCID cid,
	OsmuxRxHandle vc_conn
};

/* Per-CID receive state used for sequence number verification */
type record OsmuxRxHandle {
	OsmuxCID cid,
	boolean first_seq_seen,
	INT1 last_seq_ack
};

const OsmuxRxHandle c_OsmuxemDefaultRxHandle := {
	cid := 0,
	first_seq_seen := false,
	last_seq_ack := 0
}

/* Per-CID transmit state: header field values used for every generated Osmux AMR frame */
type record OsmuxTxHandle {
	INT2b ft,
	BIT1 amr_f,
	BIT1 amr_q,
	INT1 seq,
	OsmuxCID cid,
	INT4b amr_ft,
	INT4b amr_cmr
};

template OsmuxTxHandle t_TxHandleAMR590(OsmuxCID cid) := {
	ft := 1,//enum2int(OsmuxFT:OSMUX_FT_AMR)
	amr_f := '0'B, /* this frame is the last frame in this payload */
	amr_q := '1'B, /* frame not damaged */
	seq := 12,
	cid := cid,
	amr_ft := 2, /* AMR 5.90 */
	amr_cmr := 0
};

type enumerated OsmuxemMode {
	OSMUXEM_MODE_NONE,
	OSMUXEM_MODE_TXONLY,
	OSMUXEM_MODE_RXONLY,
	OSMUXEM_MODE_BIDIR
};

type record OsmuxemStats {
	/* number of packets transmitted */
	integer num_pkts_tx,
	/* number of Osmux payload bytes transmitted */
	integer bytes_payload_tx,

	/* number of packets received */
	integer num_pkts_rx,
	/* number of Osmux payload bytes received */
	integer bytes_payload_rx,
	/* number of packets received out-of-sequence */
	integer num_pkts_rx_err_seq,
	/* number of packets received during Rx disable */
	integer num_pkts_rx_err_disabled,
	/* number of packets received with mismatching payload */
	integer num_pkts_rx_err_payload
}

const OsmuxemStats c_OsmuxemStatsReset := {
	num_pkts_tx := 0,
	bytes_payload_tx := 0,
	num_pkts_rx := 0,
	bytes_payload_rx := 0,
	num_pkts_rx_err_seq := 0,
	num_pkts_rx_err_disabled := 0,
	num_pkts_rx_err_payload := 0
}

type record OsmuxemConfig {
	/* number of AMR frames batched into one Osmux frame */
	INT3b batch_size,
	/* interval between two transmissions, in milliseconds */
	integer tx_duration_ms,
	/* fill pattern for generated payloads */
	octetstring tx_fixed_payload optional,
	/* fill pattern expected in received payloads; omit to disable payload verification */
	octetstring rx_fixed_payload optional
};

const OsmuxemConfig c_OsmuxemDefaultCfg := {
	batch_size := 4,
	tx_duration_ms := 20 * 4, /* 4 is batch_size */
	tx_fixed_payload := '010203040102030401020304010203040102030401020304'O,
	rx_fixed_payload := '010203040102030401020304010203040102030401020304'O
}

signature OsmuxEM_bind(in HostName local_host, inout PortNumber local_port);
signature OsmuxEM_connect(in HostName remote_host, in PortNumber remote_port);
signature OsmuxEM_mode(in OsmuxemMode mode);
signature OsmuxEM_configure(in OsmuxemConfig cfg);
signature OsmuxEM_stats_get(out OsmuxemStats stats);
signature OsmuxEM_register_rxhandle(in OsmuxRxHandle hdl);
signature OsmuxEM_register_txhandle(in OsmuxTxHandle hdl);

type port OsmuxEM_CTRL_PT procedure {
	inout OsmuxEM_bind, OsmuxEM_connect, OsmuxEM_mode, OsmuxEM_configure,
	      OsmuxEM_stats_get, OsmuxEM_register_rxhandle, OsmuxEM_register_txhandle;
} with { extension "internal" };

type port OsmuxEM_DATA_PT message {
	inout OSMUX_PDU;
} with { extension "internal" };

/* Ask the emulation behind 'pt' to bind its Osmux socket; blocks until the reply.
 * 'local_port' is updated with the value carried in the reply. */
function f_osmuxem_bind(OsmuxEM_CTRL_PT pt, in HostName local_host, inout PortNumber local_port) {
	pt.call(OsmuxEM_bind:{local_host, local_port}) {
		[] pt.getreply(OsmuxEM_bind:{local_host, ?}) -> param (local_port) {};
	}
}

/* Ask the emulation behind 'pt' to connect to the remote peer; blocks until the reply. */
function f_osmuxem_connect(OsmuxEM_CTRL_PT pt, in HostName remote_host, in PortNumber remote_port) {
	pt.call(OsmuxEM_connect:{remote_host, remote_port}) {
		[] pt.getreply(OsmuxEM_connect:{remote_host, remote_port}) {};
	}
}

/* Switch the emulation behind 'pt' to the given Rx/Tx mode; blocks until the reply. */
function f_osmuxem_mode(OsmuxEM_CTRL_PT pt, in OsmuxemMode mode) {
	pt.call(OsmuxEM_mode:{mode}) {
		[] pt.getreply(OsmuxEM_mode:{mode}) {};
	}
}

/* Replace the configuration of the emulation behind 'pt'; blocks until the reply. */
function f_osmuxem_configure(OsmuxEM_CTRL_PT pt, in OsmuxemConfig cfg) {
	pt.call(OsmuxEM_configure:{cfg}) {
		[] pt.getreply(OsmuxEM_configure:{cfg}) {};
	}
}

/* Fetch the current statistics of the emulation behind 'pt'. */
function f_osmuxem_stats_get(OsmuxEM_CTRL_PT pt) return OsmuxemStats {
	var OsmuxemStats stats;
	pt.call(OsmuxEM_stats_get:{-}) {
		[] pt.getreply(OsmuxEM_stats_get:{?}) -> param(stats) {};
	}
	return stats;
}

/* Register a receive handle (one per expected CID) with the emulation behind 'pt'. */
function f_osmuxem_register_rxhandle(OsmuxEM_CTRL_PT pt, OsmuxRxHandle hdl) {
	pt.call(OsmuxEM_register_rxhandle:{hdl}) {
		[] pt.getreply(OsmuxEM_register_rxhandle:{hdl}) {};
	}
}

/* Register a transmit handle (one per CID to generate) with the emulation behind 'pt'. */
function f_osmuxem_register_txhandle(OsmuxEM_CTRL_PT pt, OsmuxTxHandle hdl) {
	pt.call(OsmuxEM_register_txhandle:{hdl}) {
		[] pt.getreply(OsmuxEM_register_txhandle:{hdl}) {};
	}
}

/* Return true if |a - b| does not exceed 'tolerance'. */
function f_osmuxem_stats_compare_value(integer a, integer b, integer tolerance := 0) return boolean {
	var integer temp;

	temp := (a - b);
	if (temp < 0) {
		temp := -temp;
	}

	if (temp > tolerance) {
		return false;
	}

	return true;
}

/* Cross-compare two osmuxem-statistics. The transmission statistics on the a side
 * must match the reception statistics on the other side and vice versa. The
 * user may also supply a tolerance value (number of packets) when deviations
 * are acceptable */
function f_osmuxem_stats_compare(OsmuxemStats a, OsmuxemStats b, integer tolerance := 0) return boolean {
	var integer plen;

	log("stats A: ", a);
	log("stats B: ", b);
	log("tolerance: ", tolerance, " packets");

	if (f_osmuxem_stats_compare_value(a.num_pkts_tx, b.num_pkts_rx, tolerance) == false) {
		return false;
	}

	if (f_osmuxem_stats_compare_value(a.num_pkts_rx, b.num_pkts_tx, tolerance) == false) {
		return false;
	}

	/* Average payload length on the a side; converts the packet tolerance into a byte tolerance */
	if(a.num_pkts_tx > 0) {
		plen := a.bytes_payload_tx / a.num_pkts_tx;
	} else {
		plen := 0;
	}

	if (f_osmuxem_stats_compare_value(a.bytes_payload_tx, b.bytes_payload_rx, tolerance * plen) == false) {
		return false;
	}

	if (f_osmuxem_stats_compare_value(a.bytes_payload_rx, b.bytes_payload_tx, tolerance * plen) == false) {
		return false;
	}

	return true;
}

/* Check the statistics for general signs of errors. This is a basic general
 * check that will fit most situations and is intended to be executed by
 * the testcases as needed. */
function f_osmuxem_stats_err_check(OsmuxemStats s) {
	log("stats: ", s);

	/* Check if there was some activity at either on the RX or on the
	 * TX side, but complete silence would indicate some problem */
	if (s.num_pkts_tx < 1 and s.num_pkts_rx < 1) {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"no Osmux packet activity detected (packets)");
	}
	if (s.bytes_payload_tx < 1 and s.bytes_payload_rx < 1) {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"no Osmux packet activity detected (bytes)");
	}

	/* Check error counters */
	if (s.num_pkts_rx_err_seq != 0) {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"Osmux packet sequence number errors occurred");
	}
	if (s.num_pkts_rx_err_disabled != 0) {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"Osmux packets received while RX was disabled");
	}
	if (s.num_pkts_rx_err_payload != 0) {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"Osmux packets with mismatching payload received");
	}
}

/* Send template for one Osmux AMR frame; the frame type field is fixed to 1 */
template PDU_Osmux_AMR ts_OsmuxAMR(BIT1 marker, INT3b ctr, BIT1 amr_f, BIT1 amr_q, INT1 seq,
				   OsmuxCID cid, INT4b amr_ft, INT4b amr_cmr,
				   octetstring payload) := {
	header := {
		marker := marker,
		ft := 1,
		ctr := ctr,
		amr_f := amr_f,
		amr_q := amr_q,
		seq := seq,
		cid := cid,
		amr_ft := amr_ft,
		amr_cmr := amr_cmr
	},
	data := payload
}

/* Look up the receive handle registered for 'cid'. Returns a copy of the stored
 * record; shuts the test down with verdict fail when the CID is unknown. */
private function f_rxhandle_get_by_cid(OsmuxCID cid) runs on OSMUX_Emulation_CT return OsmuxRxHandle {
	var integer i;
	for (i := 0; i < sizeof(RxHandleTable); i := i+1) {
		if (isbound(RxHandleTable[i].cid) and RxHandleTable[i].cid == cid) {
			return RxHandleTable[i].vc_conn;
		}
	}
	Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
				log2str("No Component for CID ", cid));
	return RxHandleTable[0].vc_conn; /* make compiler happy, not reached */
}

/* Store 'hdl' back into the table slot registered for 'cid'. Needed because
 * f_rxhandle_get_by_cid() hands out a copy, so state changes made on that copy
 * would otherwise be lost. Does nothing if the CID is not registered. */
private function f_rxhandle_update(OsmuxCID cid, OsmuxRxHandle hdl) runs on OSMUX_Emulation_CT {
	var integer i;
	for (i := 0; i < sizeof(RxHandleTable); i := i+1) {
		if (isbound(RxHandleTable[i].cid) and RxHandleTable[i].cid == cid) {
			RxHandleTable[i].vc_conn := hdl;
			return;
		}
	}
}

/* Register a receive handle in the first free table slot; shuts down when the table is full. */
private function f_rxhandle_cid_add(OsmuxRxHandle hdl) runs on OSMUX_Emulation_CT {
	var integer i;
	for (i := 0; i < sizeof(RxHandleTable); i := i+1) {
		if (not isbound(RxHandleTable[i].cid)) {
			RxHandleTable[i].cid := hdl.cid;
			RxHandleTable[i].vc_conn := hdl;
			return;
		}
	}
	Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
				log2str("No Space in RxHandleTable for ", hdl.cid));
}

/* Register a transmit handle in the first free list slot; shuts down when the list is full. */
private function f_txhandle_cid_add(OsmuxTxHandle hdl) runs on OSMUX_Emulation_CT {
	var integer i;
	for (i := 0; i < sizeof(TxHandleList); i := i+1) {
		if (not isbound(TxHandleList[i])) {
			TxHandleList[i] := hdl;
			return;
		}
	}
	Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
				log2str("No Space in TxHandleList for ", hdl.cid));
}

/* Generate correctly-sized AMR payload based on amr_ft, using tx_fixed_payload as a fill pattern.
 * If the pattern is shorter than the AMR frame, it is repeated until it is long enough;
 * a pattern that is already long enough is simply truncated. */
function f_osmux_gen_expected_rx_rtp_payload(INT4b amr_ft, octetstring tx_fixed_payload) return octetstring {
	var integer payload_len_bits;
	var integer payload_len;
	var bitstring fill_bits;
	var bitstring payload_truncated_bits;
	var octetstring payload_truncated;

	/* Generate the AMR payload as a bitstring, since some formats don't end up in octet boundary: */
	payload_len_bits := f_amrft_payload_bits_len(amr_ft);

	/* substr() must not be asked for more bits than the source holds, so extend a too
	 * short pattern by repetition. An empty pattern cannot be extended and is left
	 * as-is (substr() will then report the problem, as it did before). */
	fill_bits := oct2bit(tx_fixed_payload);
	if (lengthof(fill_bits) > 0) {
		while (lengthof(fill_bits) < payload_len_bits) {
			fill_bits := fill_bits & oct2bit(tx_fixed_payload);
		}
	}
	payload_truncated_bits := substr(fill_bits, 0, payload_len_bits);

	/* Now convert it as an octet string filling final padding with zeroes: */
	payload_len := f_amrft_payload_len(amr_ft);
	payload_truncated := bit2oct(f_pad_bit(payload_truncated_bits, payload_len*8, '0'B));
	return payload_truncated;
}

/* Build the data part of one Osmux frame: (ctr + 1) AMR payloads of type 'amr_ft',
 * each generated from the fill pattern 'fill'. */
private function f_osmux_gen_batch_payload(INT3b ctr, INT4b amr_ft, octetstring fill) return octetstring {
	var octetstring batch := ''O;
	var integer i;

	for (i := 0; i < ctr + 1; i := i+1) {
		batch := batch & f_osmux_gen_expected_rx_rtp_payload(amr_ft, fill);
	}
	return batch;
}

/* Build the data part of one Osmux frame to transmit, using the configured Tx fill pattern. */
private function f_osmux_gen_payload(INT3b ctr, INT4b amr_ft) runs on OSMUX_Emulation_CT return octetstring {
	return f_osmux_gen_batch_payload(ctr, amr_ft, g_cfg.tx_fixed_payload);
}

/* Current transmit interval in seconds, derived from the active configuration. */
private function f_tx_interval() runs on OSMUX_Emulation_CT return float {
	return int2float(g_cfg.tx_duration_ms)/1000.0;
}

/* Send one Osmux AMR frame for the Tx handle in slot 'i' and update sequence number and counters. */
private function f_tx_osmux(integer i, INT3b ctr, octetstring payload, BIT1 marker := '0'B) runs on OSMUX_Emulation_CT {
	var OsmuxTxHandle hdl := TxHandleList[i];
	var PDU_Osmux_AMR osmux_amr := valueof(ts_OsmuxAMR(marker, ctr, hdl.amr_f,
				hdl.amr_q, hdl.seq, hdl.cid, hdl.amr_ft,
				hdl.amr_cmr, payload));
	OSMUX.send(t_Osmux_Send(g_osmux_conn_id, OSMUX_PDU:{osmux_amr:=osmux_amr}));
	/* increment sequence for next transmit; seq is a one-octet value (INT1), so wrap after 255 */
	TxHandleList[i].seq := (TxHandleList[i].seq + 1) mod 256;

	/* update counters */
	g_stat.num_pkts_tx := g_stat.num_pkts_tx+1;
	g_stat.bytes_payload_tx := g_stat.bytes_payload_tx + lengthof(payload);
}

/* Send one Osmux frame for every registered Tx handle. */
private function f_tx_osmux_all_cid(BIT1 marker := '0'B) runs on OSMUX_Emulation_CT {
	/* TODO: append all in one UDP packet and send together */
	var integer i;
	var octetstring payload_truncated;
	/* the header counter field carries the number of batched AMR frames minus one */
	var INT3b ctr := g_cfg.batch_size - 1;

	for (i := 0; i < sizeof(TxHandleList); i := i+1) {
		if (isbound(TxHandleList[i])) {
			payload_truncated := f_osmux_gen_payload(ctr, TxHandleList[i].amr_ft);
			f_tx_osmux(i, ctr, payload_truncated, marker);
		}
	}
}

/* Main loop of the emulation component: serves control procedure calls on CTRL,
 * processes Osmux frames received on OSMUX and periodically transmits frames for
 * all registered Tx handles. Never returns. */
function f_main() runs on OSMUX_Emulation_CT
{
	var Result res;
	var OsmuxRxHandle rx_hdl;
	var OsmuxTxHandle tx_hdl;
	var octetstring payload_truncated;
	var PortEvent port_event;

	/* Default duration only; every start below passes the current interval explicitly
	 * so that a later OsmuxEM_configure takes effect. */
	timer T_transmit := int2float(g_cfg.tx_duration_ms)/1000.0;
	var Osmux_RecvFrom rx_osmux;
	var OsmuxemConfig cfg;
	var template Osmux_RecvFrom tr_osmux_amr := {
		connId := ?,
		remName := ?,
		remPort := ?,
		locName := ?,
		locPort := ?,
		msg := ?
	};
	tr_osmux_amr.msg := { osmux_amr := ? };
	var template Osmux_RecvFrom tr_osmux_dummy := {
		connId := ?,
		remName := ?,
		remPort := ?,
		locName := ?,
		locPort := ?,
		msg := ?
	};
	tr_osmux_dummy.msg := { osmux_dummy := ? };

	while (true) {
	alt {
		/* control procedures (calls) from the user */
		[] CTRL.getcall(OsmuxEM_bind:{?,?}) -> param(g_local_host, g_local_port) {

			g_tx_connected := false; /* will set it back to true upon next connect() call */

			/* re-binding: drop the previous socket first */
			if (g_osmux_conn_id != -1) {
				res := OSMUX_CodecPort_CtrlFunct.f_IPL4_close(OSMUX, g_osmux_conn_id, {udp := {}});
				g_osmux_conn_id := -1;
			}
			res := OSMUX_CodecPort_CtrlFunct.f_IPL4_listen(OSMUX, g_local_host,
								g_local_port, {udp:={}});
			if (not ispresent(res.connId)) {
				Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
							"Could not listen on Osmux socket, check your configuration");
			}
			g_osmux_conn_id := res.connId;
			/* only accept frames arriving on our own socket from now on */
			tr_osmux_amr.connId := g_osmux_conn_id;
			tr_osmux_dummy.connId := g_osmux_conn_id;

			CTRL.reply(OsmuxEM_bind:{g_local_host, g_local_port});
		}
		[] CTRL.getcall(OsmuxEM_connect:{?,?}) -> param (g_remote_host, g_remote_port) {
			res := OSMUX_CodecPort_CtrlFunct.f_IPL4_connect(OSMUX, g_remote_host,
								g_remote_port,
								g_local_host, g_local_port,
								g_osmux_conn_id, {udp:={}});
			if (not ispresent(res.connId)) {
				Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
							"Could not connect to Osmux socket, check your configuration");
			}
			g_tx_connected := true;
			CTRL.reply(OsmuxEM_connect:{g_remote_host, g_remote_port});
		}
		[] CTRL.getcall(OsmuxEM_mode:{OSMUXEM_MODE_NONE}) {
			T_transmit.stop;
			g_rx_enabled := false;
			CTRL.reply(OsmuxEM_mode:{OSMUXEM_MODE_NONE});
		}
		[] CTRL.getcall(OsmuxEM_mode:{OSMUXEM_MODE_TXONLY}) {
			/* start transmit timer */
			T_transmit.start(f_tx_interval());
			g_rx_enabled := false;
			CTRL.reply(OsmuxEM_mode:{OSMUXEM_MODE_TXONLY});
		}
		[] CTRL.getcall(OsmuxEM_mode:{OSMUXEM_MODE_RXONLY}) {

			T_transmit.stop;
			if (g_rx_enabled == false) {
				/* flush queues */
				OSMUX.clear;
				g_rx_enabled := true;
			}
			CTRL.reply(OsmuxEM_mode:{OSMUXEM_MODE_RXONLY});
		}
		[] CTRL.getcall(OsmuxEM_mode:{OSMUXEM_MODE_BIDIR}) {
			T_transmit.start(f_tx_interval());
			if (g_rx_enabled == false) {
				/* flush queues */
				OSMUX.clear;
				g_rx_enabled := true;
			}
			CTRL.reply(OsmuxEM_mode:{OSMUXEM_MODE_BIDIR});
		}
		[] CTRL.getcall(OsmuxEM_configure:{?}) -> param (cfg) {
			g_cfg := cfg;
			CTRL.reply(OsmuxEM_configure:{cfg});
		}
		[] CTRL.getcall(OsmuxEM_register_txhandle:{?}) -> param (tx_hdl) {
			f_txhandle_cid_add(tx_hdl);
			CTRL.reply(OsmuxEM_register_txhandle:{tx_hdl});
		}
		[] CTRL.getcall(OsmuxEM_register_rxhandle:{?}) -> param (rx_hdl) {
			f_rxhandle_cid_add(rx_hdl);
			CTRL.reply(OsmuxEM_register_rxhandle:{rx_hdl});
		}
		[] CTRL.getcall(OsmuxEM_stats_get:{?}) {
			CTRL.reply(OsmuxEM_stats_get:{g_stat});
		}

		/* simply ignore any Osmux AMR if receiver not enabled */
		[g_rx_enabled==false] OSMUX.receive(tr_osmux_amr) {
			g_stat.num_pkts_rx_err_disabled := g_stat.num_pkts_rx_err_disabled+1;
		}
		/* simply ignore any Osmux Dummy if receiver not enabled */
		[g_rx_enabled==false] OSMUX.receive(tr_osmux_dummy) -> value rx_osmux {
			log("Osmux Dummy received on CID ", rx_osmux.msg.osmux_dummy.header.cid, " (rx_disabled)");
		}

		/* process received Osmux AMR if receiver enabled */
		[g_rx_enabled] OSMUX.receive(tr_osmux_amr) -> value rx_osmux {
			/* increment counters */
			g_stat.num_pkts_rx := g_stat.num_pkts_rx+1;
			g_stat.bytes_payload_rx := g_stat.bytes_payload_rx +
								lengthof(rx_osmux.msg.osmux_amr.data);
			rx_hdl := f_rxhandle_get_by_cid(rx_osmux.msg.osmux_amr.header.cid);
			/* the sequence number is a one-octet counter, so the successor of 255 is 0 */
			if (rx_hdl.first_seq_seen and
			    rx_osmux.msg.osmux_amr.header.seq != (rx_hdl.last_seq_ack + 1) mod 256) {
				g_stat.num_pkts_rx_err_seq := g_stat.num_pkts_rx_err_seq + 1;
			}
			rx_hdl.first_seq_seen := true;
			rx_hdl.last_seq_ack := rx_osmux.msg.osmux_amr.header.seq;
			/* rx_hdl is a copy: persist the new state for the next frame on this CID */
			f_rxhandle_update(rx_osmux.msg.osmux_amr.header.cid, rx_hdl);

			/* verify payload contents against the expected Rx pattern, if one is configured */
			if (ispresent(g_cfg.rx_fixed_payload)) {
				payload_truncated := f_osmux_gen_batch_payload(rx_osmux.msg.osmux_amr.header.ctr,
									rx_osmux.msg.osmux_amr.header.amr_ft,
									g_cfg.rx_fixed_payload);
				if (rx_osmux.msg.osmux_amr.data != payload_truncated) {
					g_stat.num_pkts_rx_err_payload := g_stat.num_pkts_rx_err_payload + 1;
				}
			}
			/* forward to a sniffing user, if any is attached */
			if (DATA.checkstate("Connected")) {
				DATA.send(rx_osmux.msg);
			}
		}
		/* process received Osmux Dummy if receiver enabled */
		[g_rx_enabled] OSMUX.receive(tr_osmux_dummy) -> value rx_osmux {
			log("Osmux Dummy received on CID", rx_osmux.msg.osmux_dummy.header.cid);
			/* lookup only for its side effect: fails the test if the CID is not registered */
			rx_hdl := f_rxhandle_get_by_cid(rx_osmux.msg.osmux_dummy.header.cid);
		}

		/* transmit if timer has expired */
		[g_tx_connected] T_transmit.timeout {
			/* send one Osmux frame, re-start timer */
			f_tx_osmux_all_cid();
			T_transmit.start(f_tx_interval());
		}

		[] OSMUX.receive(PortEvent:?) -> value port_event {
			Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
						log2str("Received unexpected port event from Osmux:", port_event));
		}

		/* fail on any unexpected messages */
		[] OSMUX.receive {
			Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
						"Received unexpected msg type from Osmux");
		}
	}
	}
}

}