module RTP_Emulation {

/* Functionalities that we want this module to implement:
 *  * act as a RTP source that generates a RTP Stream
 *  * act as a RTP sink that consumes a RTP Stream
 *  * for all of the above, we want to be able to
 *    * specify the payload type
 *    * specify the interval / sample rate
 *    * create drop-outs in the stream
 *    * detect reordered or lost frames
 *    * validate if the size of the frames matches expectations
 *    * play back real audio (at least some tones?)
 *  * enable/disable generation/verification of RTCP
 */

/* (C) 2017-2018 Harald Welte
 * (C) 2018-2019 sysmocom - s.f.m.c. GmbH
 * All rights reserved.
 *
 * Released under the terms of GNU General Public License, Version 2 or
 * (at your option) any later version.
 *
 * SPDX-License-Identifier: GPL-2.0-or-later
 */

/* Ideas:

* each component consists of transmitter and receiver
* transmitters and receivers can be operated as tuple?
* high-level operation
** set-up config at transmitter + receiver
** transmit sequence of payloads
** verify reception of those payloads
* can operate full-duplex/bi-directional as needed

* transmitter
** trigger transmission of n number of packets
** transmit them at normal ptime interval
** payload size configurable
** payload contents PRBS or the like

* receiver
** count number of related packets at receiver
** check received payload type
** check received timestamp increments
** check received seq_nr increments
** (optionally) check for SSRC
** (optionally) check for payload size
** (optionally) check for payload contents

* later
** how to test transcoding?
** how to test pure play-out endpoints (rx only)?
** how to test "Rx from wrong IP/port" scenarios?
** how to test RTCP?
** maybe keep ports un-connected to show wrong src -lrt

*/

import from General_Types all;
import from Osmocom_Types all;
import from IPL4asp_Types all;
import from RTP_Types all;
import from RTP_CodecPort all;
import from RTP_CodecPort_CtrlFunct all;

import from IuUP_Types all;
import from IuUP_Emulation all;

/* Component implementing one RTP/RTCP endpoint. It is driven via the CTRL
 * procedure port and runs f_main() as its behaviour. */
type component RTP_Emulation_CT {
	/* down-facing ports for RTP and RTCP codec ports on top of IPL4asp */
	port RTP_CODEC_PT RTP;
	var integer g_rtp_conn_id := -1;
	port RTP_CODEC_PT RTCP;
	var integer g_rtcp_conn_id := -1;

	/* user-facing port for controlling the binding */
	port RTPEM_CTRL_PT CTRL;
	/* user-facing port for sniffing RTP frames */
	port RTPEM_DATA_PT DATA;

	/* configurable by user, should be fixed */
	var RtpemConfig g_cfg := c_RtpemDefaultCfg;

	/* statistics */
	var RtpemStats g_stats_rtp := c_RtpemStatsReset;
	var RtpemStats g_stats_rtcp := c_RtpemStatsReset;

	var HostName g_remote_host;
	var PortNumber g_remote_port;
	var HostName g_local_host;
	var PortNumber g_local_port;

	/* state variables, change over time */
	var boolean g_loopback := false;
	var boolean g_rx_enabled := false;
	var boolean g_tx_connected := false; /* Set to true after connect() */
	var LIN2_BO_LAST g_tx_next_seq := 0;
	var uint32_t g_tx_next_ts := 0;

	var INT7b g_rx_payload_type := 0;
	var LIN2_BO_LAST g_rx_last_seq;
	var uint32_t g_rx_last_ts;

	var IuUP_Entity g_iuup_ent; // := valueof(t_IuUP_Entity(1));

	var boolean g_conn_refuse_expect := false;
	var boolean g_conn_refuse_received := false;
}

/* Operating modes selectable via RTPEM_mode */
type enumerated RtpemMode {
	RTPEM_MODE_NONE,
	RTPEM_MODE_TXONLY,
	RTPEM_MODE_RXONLY,
	RTPEM_MODE_BIDIR,
	RTPEM_MODE_LOOPBACK
};

type record RtpemStats {
	/* number of packets transmitted */
	integer num_pkts_tx,
	/* number of RTP payload bytes transmitted */
	integer bytes_payload_tx,

	/* number of packets received */
	integer num_pkts_rx,
	/* number of RTP payload bytes received */
	integer bytes_payload_rx,
	/* number of packets received out-of-sequence */
	integer num_pkts_rx_err_seq,
	/* number of packets received wrong timestamp */
	integer num_pkts_rx_err_ts,
	/* number of packets received wrong payload type */
	integer num_pkts_rx_err_pt,
	/* number of packets received during Rx disable */
	integer num_pkts_rx_err_disabled,
	/* number of packets received with mismatching payload */
	integer num_pkts_rx_err_payload
}

/* All counters at zero; initial value of the statistics in the component */
const RtpemStats c_RtpemStatsReset := {
	num_pkts_tx := 0,
	bytes_payload_tx := 0,
	num_pkts_rx := 0,
	bytes_payload_rx := 0,
	num_pkts_rx_err_seq := 0,
	num_pkts_rx_err_ts := 0,
	num_pkts_rx_err_pt := 0,
	num_pkts_rx_err_disabled := 0,
	num_pkts_rx_err_payload := 0
}

/* One payload definition: payload type number plus optional fixed contents */
type record RtpemConfigPayload {
	INT7b payload_type,
	octetstring fixed_payload optional
};

type record RtpemConfig {
	/* sample rate used to compute the RTP timestamp increment */
	integer tx_samplerate_hz,
	/* duration of one frame; used for the timestamp increment and the
	 * transmit interval */
	integer tx_duration_ms,
	BIT32_BO_LAST tx_ssrc,
	/* payloads to transmit; selected round-robin by sequence number */
	record of RtpemConfigPayload tx_payloads,
	/* payloads accepted on receive (see f_check_fixed_rx_payloads) */
	record of RtpemConfigPayload rx_payloads,
	boolean iuup_mode,
	IuUP_Config iuup_cfg
};

const RtpemConfig c_RtpemDefaultCfg := {
	tx_samplerate_hz := 8000,
	tx_duration_ms := 20,
	tx_ssrc := '11011110101011011011111011101111'B,
	tx_payloads := {{0, '01020304'O}},
	rx_payloads := {{0, '01020304'O}},
	iuup_mode := false,
	iuup_cfg := c_IuUP_Config_def
}

signature RTPEM_bind(in HostName local_host, inout PortNumber local_port);
signature RTPEM_connect(in HostName remote_host, in PortNumber remote_port);
signature RTPEM_mode(in RtpemMode mode);
signature RTPEM_configure(in RtpemConfig cfg);
signature RTPEM_stats_get(out RtpemStats stats, in boolean rtcp);
signature RTPEM_conn_refuse_expect(in boolean expect);
signature RTPEM_conn_refuse_received(out boolean received);

type port RTPEM_CTRL_PT procedure {
	inout RTPEM_bind, RTPEM_connect, RTPEM_mode, RTPEM_configure, RTPEM_stats_get, RTPEM_conn_refuse_expect,
	      RTPEM_conn_refuse_received;
} with { extension "internal" };

type port RTPEM_DATA_PT message {
	inout PDU_RTP, PDU_RTCP;
} with { extension "internal" };

/* Ask the emulation behind 'pt' to bind its RTP socket to local_host:local_port
 * (RTCP is bound to local_port+1 by f_main). Blocks until the reply arrives;
 * local_port is updated from the reply. */
function f_rtpem_bind(RTPEM_CTRL_PT pt, in HostName local_host, inout PortNumber local_port) {
	pt.call(RTPEM_bind:{local_host, local_port}) {
		[] pt.getreply(RTPEM_bind:{local_host, ?}) -> param (local_port) {};
	}
}

/* Ask the emulation behind 'pt' to connect to remote_host:remote_port
 * (RTCP to remote_port+1). Blocks until the reply arrives. */
function f_rtpem_connect(RTPEM_CTRL_PT pt, in HostName remote_host, in PortNumber remote_port) {
	pt.call(RTPEM_connect:{remote_host, remote_port}) {
		[] pt.getreply(RTPEM_connect:{remote_host, remote_port}) {};
	}
}

/* Switch the emulation behind 'pt' to the given operating mode */
function f_rtpem_mode(RTPEM_CTRL_PT pt, in RtpemMode mode) {
	pt.call(RTPEM_mode:{mode}) {
		[] pt.getreply(RTPEM_mode:{mode}) {};
	}
}

/* Replace the configuration of the emulation behind 'pt' */
function f_rtpem_configure(RTPEM_CTRL_PT pt, in RtpemConfig cfg) {
	pt.call(RTPEM_configure:{cfg}) {
		[] pt.getreply(RTPEM_configure:{cfg}) {};
	}
}

/* Fetch the current statistics from the emulation behind 'pt'.
 * rtcp == true selects the RTCP counters, otherwise the RTP counters. */
function f_rtpem_stats_get(RTPEM_CTRL_PT pt, boolean rtcp := false) return RtpemStats {
	var RtpemStats stats;
	pt.call(RTPEM_stats_get:{-, rtcp}) {
		[] pt.getreply(RTPEM_stats_get:{?, rtcp}) -> param(stats) {};
	}
	return stats;
}

/* Return true if a and b differ by at most 'tolerance' (absolute difference) */
function f_rtpem_stats_compare_value(integer a, integer b, integer tolerance := 0) return boolean {
	var integer temp;

	temp := (a - b);
	if (temp < 0) {
		temp := -temp;
	}

	if (temp > tolerance) {
		return false;
	}

	return true;
}

/* Cross-compare two rtpem-statistics. The transmission statistics on the a side
 * must match the reception statistics on the other side and vice versa. The
 * user may also supply a tolerance value (number of packets) when deviations
 * are acceptable */
function f_rtpem_stats_compare(RtpemStats a, RtpemStats b, integer tolerance := 0) return boolean {
	var integer plen;

	log("stats A: ", a);
	log("stats B: ", b);
	log("tolerance: ", tolerance, " packets");

	if (f_rtpem_stats_compare_value(a.num_pkts_tx, b.num_pkts_rx, tolerance) == false) {
		return false;
	}

	if (f_rtpem_stats_compare_value(a.num_pkts_rx, b.num_pkts_tx, tolerance) == false) {
		return false;
	}

	/* Average payload length on the a side; used to convert the packet
	 * tolerance into a byte tolerance for the payload byte counters */
	if (a.num_pkts_tx > 0) {
		plen := a.bytes_payload_tx / a.num_pkts_tx;
	} else {
		plen := 0;
	}

	if (f_rtpem_stats_compare_value(a.bytes_payload_tx, b.bytes_payload_rx, tolerance * plen) == false) {
		return false;
	}

	if (f_rtpem_stats_compare_value(a.bytes_payload_rx, b.bytes_payload_tx, tolerance * plen) == false) {
		return false;
	}

	return true;
}

/* Check the statistics for general signs of errors. This is a basic general
 * check that will fit most situations and is intended to be executed by
 * the testcases as needed. */
function f_rtpem_stats_err_check(RtpemStats s) {
	var boolean do_stop := false;
	log("stats: ", s);

	/* Check if there was some activity at either on the RX or on the
	 * TX side, but complete silence would indicate some problem */
	if (s.num_pkts_tx < 1 and s.num_pkts_rx < 1) {
		setverdict(fail, "no RTP packet activity detected (packets)");
		do_stop := true;
	}
	if (s.bytes_payload_tx < 1 and s.bytes_payload_rx < 1) {
		setverdict(fail, "no RTP packet activity detected (bytes)");
		do_stop := true;
	}

	/* Check error counters */
	if (s.num_pkts_rx_err_seq != 0) {
		setverdict(fail, log2str(s.num_pkts_rx_err_seq, " RTP packet sequence number errors occurred"));
		do_stop := true;
	}
	if (s.num_pkts_rx_err_ts != 0) {
		setverdict(fail, log2str(s.num_pkts_rx_err_ts, " RTP packet timestamp errors occurred"));
		do_stop := true;
	}
	if (s.num_pkts_rx_err_pt != 0) {
		setverdict(fail, log2str(s.num_pkts_rx_err_pt, " RTP packet payload type errors occurred"));
		do_stop := true;
	}
	if (s.num_pkts_rx_err_disabled != 0) {
		setverdict(fail, log2str(s.num_pkts_rx_err_disabled, " RTP packets received while RX was disabled"));
		do_stop := true;
	}
	if (s.num_pkts_rx_err_payload != 0) {
		setverdict(fail, log2str(s.num_pkts_rx_err_payload, " RTP packets with mismatching payload received"));
		do_stop := true;
	}

	if (do_stop) {
		if (self == mtc) {
			/* Properly stop all ports before disconnecting them. This avoids
			 * running into the dynamic testcase error due to messages arriving on
			 * unconnected ports. */
			all component.stop;
		}
		mtc.stop;
	}
}

/* Tell the emulation behind 'pt' that a "connection refused" socket error
 * is expected and must not be treated as a failure */
function f_rtpem_conn_refuse_expect(RTPEM_CTRL_PT pt) {
	pt.call(RTPEM_conn_refuse_expect:{true}) {
		[] pt.getreply(RTPEM_conn_refuse_expect:{true}) {};
	}
}

/* Set verdict 'fail' unless the emulation behind 'pt' has seen the expected
 * "connection refused" socket error */
function f_rtpem_conn_refuse_verify(RTPEM_CTRL_PT pt) {
	pt.call(RTPEM_conn_refuse_received:{?}) {
		[] pt.getreply(RTPEM_conn_refuse_received:{true}) {};
		[] pt.getreply(RTPEM_conn_refuse_received:{false}) {
			setverdict(fail, "Expected to receive connection refused");
		};
	}
}

/* Send template for a version 2 RTP PDU without padding, header extension
 * or CSRCs */
template PDU_RTP ts_RTP(BIT32_BO_LAST ssrc, INT7b pt, LIN2_BO_LAST seq, uint32_t ts,
			octetstring payload, BIT1 marker := '0'B) := {
	version := 2,
	padding_ind := '0'B,
	extension_ind := '0'B,
	CSRC_count := 0,
	marker_bit := marker,
	payload_type := pt,
	sequence_number := seq,
	time_stamp := int2bit(ts, 32),
	SSRC_id := ssrc,
	CSRCs := omit,
	ext_header := omit,
	data := payload
}

/* Transmit interval in seconds as given by the current configuration */
private function f_tx_interval() runs on RTP_Emulation_CT return float {
	return int2float(g_cfg.tx_duration_ms) / 1000.0;
}

/* Flush the RTP/RTCP receive queues and enable the receiver, unless it is
 * enabled already */
private function f_rx_enable() runs on RTP_Emulation_CT {
	if (g_rx_enabled == false) {
		/* flush queues */
		RTP.clear;
		RTCP.clear;
		g_rx_enabled := true;
	}
}

/* Transmit one RTP packet with the given payload and payload type, using the
 * next sequence number / timestamp, then advance both for the next packet.
 * In IuUP mode the payload is encapsulated first; nothing is sent (and the
 * counters are not advanced) if the encapsulation yields an empty payload. */
private function f_tx_rtp(octetstring payload, INT7b rtp_payload_type, BIT1 marker := '0'B)
runs on RTP_Emulation_CT {
	if (g_cfg.iuup_mode) {
		payload := f_IuUP_Em_tx_encap(g_iuup_ent, payload);
		if (lengthof(payload) == 0) {
			/* Nothing to transmit, waiting for INIT-ACK */
			return;
		}
	}
	var PDU_RTP rtp := valueof(ts_RTP(g_cfg.tx_ssrc, rtp_payload_type, g_tx_next_seq,
					  g_tx_next_ts, payload, marker));
	RTP.send(t_RTP_Send(g_rtp_conn_id, RTP_messages_union:{rtp:=rtp}));
	/* increment sequence + timestamp for next transmit. Both are fixed-width
	 * header fields (16 / 32 bit), so they have to wrap around instead of
	 * growing out of range. */
	g_tx_next_seq := (g_tx_next_seq + 1) mod 65536;
	/* samples per frame = samplerate * duration; multiply before dividing so
	 * that durations which do not divide 1000 (e.g. 30 ms) are not truncated
	 * and durations above 1000 ms do not divide by zero */
	g_tx_next_ts := (g_tx_next_ts + (g_cfg.tx_samplerate_hz * g_cfg.tx_duration_ms) / 1000)
			mod 4294967296;
}

/* Match a received payload type / payload against the configured rx_payloads
 * and increment the RTP error counters on mismatch */
private function f_check_fixed_rx_payloads(INT7b rtp_payload_type, octetstring rtp_data)
runs on RTP_Emulation_CT {
	var boolean payload_type_match := false;

	/* The API user has the option to define zero or multiple sets of rx_payloads. Each rx_payload set contains
	   the payload type number of the expected payload and an optional fixed_payload, which resembles the actual
	   payload.

	   In case zero rx_payloads are defined nothing is verified and no errors are counted. This is a corner case
	   and should be avoided since it would not yield any good test coverage.

	   During verification the payload type has the highest priority. It must match before the optional fixed
	   payload is checked. Since the fixed_payload is optional multiple error situations may apply:

	   | payload_type | fixed_payload | result
	   | match        | match         | full match => no error counter is incremented
	   | match        | not present   | counts as full match => no error counter is incremented
	   | match        | mismatch      | payload type match => only num_pkts_rx_err_payload is incremented
	   |              |               | unless something of the above is detected later.
	   | mismatch     | (not checked) | no match => num_pkts_rx_err_payload and num_pkts_rx_err_pt
	   |              |               | are incremented unless something of the above is
	   |              |               | detected later.
	*/

	/* In case no rx payloads are defined any payload is accepted and no errors are counted. */
	if (lengthof(g_cfg.rx_payloads) == 0) {
		return;
	}

	/* Evaluate rtp_data and rtp_payload_type */
	for (var integer i := 0; i < lengthof(g_cfg.rx_payloads); i := i + 1) {
		if (rtp_payload_type == g_cfg.rx_payloads[i].payload_type) {
			if (not ispresent(g_cfg.rx_payloads[i].fixed_payload)) {
				/* no fixed payload configured: counts as full match */
				return;
			}
			if (g_cfg.rx_payloads[i].fixed_payload == rtp_data) {
				/* full match */
				return;
			}

			/* At least the payload type number did match
			 * (but we still may see a full match later) */
			payload_type_match := true;
		}
	}

	g_stats_rtp.num_pkts_rx_err_payload := g_stats_rtp.num_pkts_rx_err_payload + 1;
	if (not payload_type_match) {
		g_stats_rtp.num_pkts_rx_err_pt := g_stats_rtp.num_pkts_rx_err_pt + 1;
	}
}

/* Main behaviour of RTP_Emulation_CT: serves the CTRL procedures, receives
 * RTP/RTCP from the codec ports and transmits RTP periodically while a
 * transmitting mode is active. Never returns. */
function f_main() runs on RTP_Emulation_CT
{
	var Result res;
	var boolean is_rtcp;

	/* Always started with an explicit duration taken from the current
	 * configuration, so that RTPEM_configure affects the transmit interval */
	timer T_transmit;
	var RTP_RecvFrom rx_rtp;
	var RtpemConfig cfg;
	var template RTP_RecvFrom tr := {
		connId := ?,
		remName := ?,
		remPort := ?,
		locName := ?,
		locPort := ?,
		msg := ?
	};
	var template RTP_RecvFrom tr_rtp := tr;
	var template RTP_RecvFrom tr_rtcp := tr;
	tr_rtp.msg := { rtp := ? };
	tr_rtcp.msg := { rtcp := ? };

	var template ASP_Event tr_conn_refuse := {result := { errorCode := ERROR_SOCKET,
							       connId := ?,
							       os_error_code := 111,
							       os_error_text := ? /* "Connection refused" */}};

	g_iuup_ent := valueof(t_IuUP_Entity(g_cfg.iuup_cfg));

	while (true) {
	alt {
		/* control procedures (calls) from the user */
		[] CTRL.getcall(RTPEM_bind:{?,?}) -> param(g_local_host, g_local_port) {
			if (g_local_port rem 2 == 1) {
				//CTRL.raise(RTPEM_bind, "Local Port is not an even port number!");
				/* NOTE(review): no reply is sent on this path, so a caller
				 * blocked in f_rtpem_bind() will not return */
				log("Local Port is not an even port number!");
				continue;
			}

			g_tx_connected := false; /* will set it back to true upon next connect() call */

			/* close a previously bound RTP socket before listening again */
			if (g_rtp_conn_id != -1) {
				res := RTP_CodecPort_CtrlFunct.f_IPL4_close(RTP, g_rtp_conn_id, {udp := {}});
				g_rtp_conn_id := -1;
			}
			res := RTP_CodecPort_CtrlFunct.f_IPL4_listen(RTP, g_local_host,
								     g_local_port, {udp:={}});
			if (not ispresent(res.connId)) {
				setverdict(fail, "Could not listen on RTP socket, check your configuration");
				mtc.stop;
			}
			g_rtp_conn_id := res.connId;
			tr_rtp.connId := g_rtp_conn_id;

			/* same for RTCP, which uses the next (odd) port */
			if (g_rtcp_conn_id != -1) {
				res := RTP_CodecPort_CtrlFunct.f_IPL4_close(RTCP, g_rtcp_conn_id, {udp := {}});
				g_rtcp_conn_id := -1;
			}
			res := RTP_CodecPort_CtrlFunct.f_IPL4_listen(RTCP, g_local_host,
								     g_local_port+1, {udp:={}});
			if (not ispresent(res.connId)) {
				setverdict(fail, "Could not listen on RTCP socket, check your configuration");
				mtc.stop;
			}
			g_rtcp_conn_id := res.connId;
			tr_rtcp.connId := g_rtcp_conn_id;
			CTRL.reply(RTPEM_bind:{g_local_host, g_local_port});
		}
		[] CTRL.getcall(RTPEM_connect:{?,?}) -> param (g_remote_host, g_remote_port) {
			if (g_remote_port rem 2 == 1) {
				//CTRL.raise(RTPEM_connect, "Remote Port is not an even number!");
				/* NOTE(review): no reply is sent on this path, so a caller
				 * blocked in f_rtpem_connect() will not return */
				log("Remote Port is not an even number!");
				continue;
			}
			res := RTP_CodecPort_CtrlFunct.f_IPL4_connect(RTP, g_remote_host,
								      g_remote_port,
								      g_local_host, g_local_port,
								      g_rtp_conn_id, {udp:={}});
			if (not ispresent(res.connId)) {
				setverdict(fail, "Could not connect to RTP socket, check your configuration");
				mtc.stop;
			}
			res := RTP_CodecPort_CtrlFunct.f_IPL4_connect(RTCP, g_remote_host,
								      g_remote_port+1,
								      g_local_host, g_local_port+1,
								      g_rtcp_conn_id, {udp:={}});
			if (not ispresent(res.connId)) {
				setverdict(fail, "Could not connect to RTCP socket, check your configuration");
				mtc.stop;
			}
			g_tx_connected := true;
			/* Send any pending IuUP CTRL message which was delayed due to not being connected: */
			if (isvalue(g_iuup_ent.pending_tx_pdu)) {
				f_tx_rtp(''O, g_cfg.tx_payloads[0].payload_type);
			}
			CTRL.reply(RTPEM_connect:{g_remote_host, g_remote_port});
		}
		[] CTRL.getcall(RTPEM_mode:{RTPEM_MODE_NONE}) {
			T_transmit.stop;
			g_rx_enabled := false;
			g_loopback := false;
			CTRL.reply(RTPEM_mode:{RTPEM_MODE_NONE});
		}
		[] CTRL.getcall(RTPEM_mode:{RTPEM_MODE_TXONLY}) {
			/* start transmit timer */
			T_transmit.start(f_tx_interval());
			g_rx_enabled := false;
			g_loopback := false;
			CTRL.reply(RTPEM_mode:{RTPEM_MODE_TXONLY});
		}
		[] CTRL.getcall(RTPEM_mode:{RTPEM_MODE_RXONLY}) {
			T_transmit.stop;
			f_rx_enable();
			g_loopback := false;
			CTRL.reply(RTPEM_mode:{RTPEM_MODE_RXONLY});
		}
		[] CTRL.getcall(RTPEM_mode:{RTPEM_MODE_BIDIR}) {
			T_transmit.start(f_tx_interval());
			f_rx_enable();
			g_loopback := false;
			CTRL.reply(RTPEM_mode:{RTPEM_MODE_BIDIR});
		}
		[] CTRL.getcall(RTPEM_mode:{RTPEM_MODE_LOOPBACK}) {
			T_transmit.stop;
			f_rx_enable();
			g_loopback := true;
			CTRL.reply(RTPEM_mode:{RTPEM_MODE_LOOPBACK});
		}
		[] CTRL.getcall(RTPEM_configure:{?}) -> param (cfg) {
			g_cfg := cfg;
			g_iuup_ent.cfg := g_cfg.iuup_cfg;
			CTRL.reply(RTPEM_configure:{cfg});
		}
		[] CTRL.getcall(RTPEM_stats_get:{?, ?}) -> param (is_rtcp) {
			if (is_rtcp) {
				CTRL.reply(RTPEM_stats_get:{g_stats_rtcp, is_rtcp});
			} else {
				CTRL.reply(RTPEM_stats_get:{g_stats_rtp, is_rtcp});
			}
		}
		[] CTRL.getcall(RTPEM_conn_refuse_expect:{?}) -> param(g_conn_refuse_expect) {
			CTRL.reply(RTPEM_conn_refuse_expect:{g_conn_refuse_expect});
		}
		[] CTRL.getcall(RTPEM_conn_refuse_received:{?}) {
			CTRL.reply(RTPEM_conn_refuse_received:{g_conn_refuse_received});
		}

		/* simply ignore any RTCP/RTP if receiver not enabled */
		[not g_rx_enabled] RTP.receive(tr_rtp) -> value rx_rtp {
			if (g_cfg.iuup_mode) {
				/* In IuUP we need to decode possible Control packets, such as INIT-ACK: */
				rx_rtp.msg.rtp.data := f_IuUP_Em_rx_decaps(g_iuup_ent, rx_rtp.msg.rtp.data);
				if (lengthof(rx_rtp.msg.rtp.data) != 0) {
					/* Unexpected RTP payload (user data) arrived: */
					g_stats_rtp.num_pkts_rx_err_disabled := g_stats_rtp.num_pkts_rx_err_disabled+1;
				} else if (g_tx_connected and isvalue(g_iuup_ent.pending_tx_pdu)) {
					/* IuUP Control packet was received and requires sending back something: */
					f_tx_rtp(''O, g_cfg.tx_payloads[0].payload_type);
				}
			} else {
				g_stats_rtp.num_pkts_rx_err_disabled := g_stats_rtp.num_pkts_rx_err_disabled+1;
			}
		}
		[not g_rx_enabled] RTCP.receive(tr_rtcp) {
			g_stats_rtcp.num_pkts_rx_err_disabled := g_stats_rtcp.num_pkts_rx_err_disabled+1;
		}

		/* process received RTCP/RTP if receiver enabled */
		[g_rx_enabled] RTP.receive(tr_rtp) -> value rx_rtp {
			/* increment counters (payload bytes are counted before any IuUP decapsulation) */
			g_stats_rtp.num_pkts_rx := g_stats_rtp.num_pkts_rx+1;
			g_stats_rtp.bytes_payload_rx := g_stats_rtp.bytes_payload_rx +
								lengthof(rx_rtp.msg.rtp.data);
			if (g_cfg.iuup_mode) {
				rx_rtp.msg.rtp.data := f_IuUP_Em_rx_decaps(g_iuup_ent, rx_rtp.msg.rtp.data);
				/* IuUP Control packet was received which may require sending back something: */
				if (lengthof(rx_rtp.msg.rtp.data) == 0) {
					if (g_tx_connected and isvalue(g_iuup_ent.pending_tx_pdu)) {
						f_tx_rtp(''O, g_cfg.tx_payloads[0].payload_type);
					}
					repeat;
				}
			}
			if (g_loopback) {
				f_tx_rtp(rx_rtp.msg.rtp.data, rx_rtp.msg.rtp.payload_type);
				/* update counters */
				g_stats_rtp.num_pkts_tx := g_stats_rtp.num_pkts_tx + 1;
				g_stats_rtp.bytes_payload_tx := g_stats_rtp.bytes_payload_tx +
									lengthof(rx_rtp.msg.rtp.data);
			} else {
				/* Match the received payload against any of the predefined fixed rx payloads */
				f_check_fixed_rx_payloads(rx_rtp.msg.rtp.payload_type, rx_rtp.msg.rtp.data);
			}
			/* forward to the user only if somebody is connected to DATA */
			if (DATA.checkstate("Connected")) {
				DATA.send(rx_rtp.msg.rtp);
			}
		}
		[g_rx_enabled] RTCP.receive(tr_rtcp) -> value rx_rtp {
			//log("RX RTCP: ", rx_rtp);
			g_stats_rtcp.num_pkts_rx := g_stats_rtcp.num_pkts_rx+1;
			if (DATA.checkstate("Connected")) {
				DATA.send(rx_rtp.msg.rtcp);
			}
		}

		/* transmit if timer has expired */
		[g_tx_connected] T_transmit.timeout {
			/* cycle through the configured tx payloads by sequence number */
			var integer idx := g_tx_next_seq mod lengthof(g_cfg.tx_payloads);
			var octetstring payload := g_cfg.tx_payloads[idx].fixed_payload;
			var INT7b rtp_payload_type := g_cfg.tx_payloads[idx].payload_type;
			/* send one RTP frame, re-start timer */
			f_tx_rtp(payload, rtp_payload_type);
			T_transmit.start(f_tx_interval());
			/* update counters */
			g_stats_rtp.num_pkts_tx := g_stats_rtp.num_pkts_tx+1;
			g_stats_rtp.bytes_payload_tx := g_stats_rtp.bytes_payload_tx + lengthof(payload);
		}

		/* connection refused */
		[g_conn_refuse_expect] RTP.receive(tr_conn_refuse) {
			log("Connection refused (expected)");
			g_conn_refuse_received := true;
		}
		[not g_conn_refuse_expect] RTP.receive(tr_conn_refuse) {
			setverdict(fail, "Connection refused (unexpected)");
			mtc.stop;
		}

		/* fail on any unexpected messages */
		[] RTP.receive {
			setverdict(fail, "Received unexpected type from RTP");
			mtc.stop;
		}
		[] RTCP.receive {
			setverdict(fail, "Received unexpected type from RTCP");
			mtc.stop;
		}
	}
	}
}

}