/* (C) 2019 by Harald Welte
 * All Rights Reserved
 *
 * The idea is that these tests are executed against sccp_demo_user from
 * libosmo-sigtran.git in server mode.
 *
 * Released under the terms of GNU General Public License, Version 2 or
 * (at your option) any later version.
 *
 * SPDX-License-Identifier: GPL-2.0-or-later
 */

module SCCP_Tests_RAW {

import from General_Types all;
import from Osmocom_Types all;

import from M3UA_Emulation all;

import from SCCP_Types all;
import from SCCPasp_Types all;
import from SCCP_Templates all;
import from SCCP_Emulation all;
import from SCCP_CodecPort all;

import from TELNETasp_PortType all;
import from Osmocom_VTY_Functions all;

import from SCCP_Tests all;

/* Test component speaking raw SCCP PDUs on top of the M3UA emulation. */
type component SCCP_Test_RAW_CT {
	/* VTY to sccp_demo_user: used to configure the SCCP inactivity timers and
	 * to issue connect-req / disconnect-req commands. */
	port TELNETasp_PT SCCP_DEMO_USER_VTY;

	/* SCCP raw port runs on top of M3UA Emulation.
	 * "System Under Test" is libosmo-sigtran's sccp_demo_user example program. */
	var M3UA_CT vc_M3UA;
	port SCCP_CODEC_PT MTP3;

	/* MTP3 routing label parameters, filled in by f_init_raw() */
	var MSC_SCCP_MTP3_parameters g_param;

	/* Local reference we use for every connection we originate / confirm */
	var OCT3 g_own_lref := '000001'O;

	/* Configure T(tias) over VTY, seconds */
	var integer g_demo_sccp_timer_ias := 7 * 60;
	/* Configure T(tiar) over VTY, seconds */
	var integer g_demo_sccp_timer_iar := 15 * 60;
}

type record of charstring Commands;

/* Enter the "cs7 instance 0" config node on the given VTY, run every command
 * in 'cmds' in order, then leave config mode again with "end". */
private function f_cs7_inst_0_cfg(TELNETasp_PT pt, Commands cmds := {})
{
	f_vty_enter_cfg_cs7_inst(pt, 0);
	for (var integer i := 0; i < sizeof(cmds); i := i+1) {
		f_vty_transceive(pt, cmds[i]);
	}
	f_vty_transceive(pt, "end");
}

/* Map the VTY port and push the currently selected T(ias) / T(iar) values to
 * the SUT. Test cases must set g_demo_sccp_timer_* before calling this (it is
 * called from f_init_raw()). */
function f_init_vty() runs on SCCP_Test_RAW_CT {
	if (SCCP_DEMO_USER_VTY.checkstate("Mapped")) {
		/* skip initialization if already executed once */
		return;
	}
	map(self:SCCP_DEMO_USER_VTY, system:SCCP_DEMO_USER_VTY);
	f_vty_set_prompts(SCCP_DEMO_USER_VTY);
	f_vty_transceive(SCCP_DEMO_USER_VTY, "enable");
	f_cs7_inst_0_cfg(SCCP_DEMO_USER_VTY, {"sccp-timer ias " & int2str(g_demo_sccp_timer_ias),
					      "sccp-timer iar " & int2str(g_demo_sccp_timer_iar)});
}

/* Issue "connect-req <conn_id>" in the "sccp-user" VTY node of the SUT. */
function f_vty_cmd_connect_req(integer conn_id) runs on SCCP_Test_RAW_CT {
	f_vty_transceive(SCCP_DEMO_USER_VTY, "sccp-user");
	f_vty_transceive(SCCP_DEMO_USER_VTY, "connect-req " & int2str(conn_id));
	f_vty_transceive(SCCP_DEMO_USER_VTY, "end");
}

/* Issue "disconnect-req <conn_id>" in the "sccp-user" VTY node of the SUT. */
function f_vty_cmd_disconnect_req(integer conn_id) runs on SCCP_Test_RAW_CT {
	f_vty_transceive(SCCP_DEMO_USER_VTY, "sccp-user");
	f_vty_transceive(SCCP_DEMO_USER_VTY, "disconnect-req " & int2str(conn_id));
	f_vty_transceive(SCCP_DEMO_USER_VTY, "end");
}

/* Common test case initialization: derive the MTP3 parameters from 'cfg',
 * configure the SUT over VTY and bring up the M3UA emulation underneath our
 * raw MTP3 port. */
private function f_init_raw(SCCP_Configuration cfg) runs on SCCP_Test_RAW_CT {
	g_param := {
		sio := {
			/* the SIO octet is split into its bit fields: 2 + 2 + 4 bits */
			ni := substr(oct2bit(cfg.sio),0,2),
			prio := substr(oct2bit(cfg.sio),2,2),
			si := substr(oct2bit(cfg.sio),4,4)
		},
		opc := cfg.own_pc,
		dpc := cfg.peer_pc,
		sls := 0,
		sccp_serviceType := cfg.sccp_service_type,
		ssn := cfg.own_ssn
	};

	f_init_vty();

	/* Create and connect test components */
	vc_M3UA := M3UA_CT.create;
	connect(self:MTP3, vc_M3UA:MTP3_SP_PORT);
	map(vc_M3UA:SCTP_PORT, system:sctp);

	vc_M3UA.start(f_M3UA_Emulation(cfg.sctp_addr));
}

/* Stop all components, tear down the port connections set up by f_init_raw()
 * and stop ourselves. */
private function f_cleanup() runs on SCCP_Test_RAW_CT {
	all component.stop;
	unmap(vc_M3UA:SCTP_PORT, system:sctp);
	disconnect(vc_M3UA:MTP3_SP_PORT, self:MTP3);
	self.stop
}

/* Send the given SCCP PDU to the SUT, wrapped in an MTP3 TRANSFER.req that
 * uses the routing label stored in g_param. */
private function f_send_sccp(template PDU_SCCP sccp) runs on SCCP_Test_RAW_CT {
	var SCCP_MTP3_TRANSFERreq tx := {
		sio := g_param.sio,
		opc := g_param.opc,
		dpc := g_param.dpc,
		sls := g_param.sls,
		data := valueof(sccp)
	};
	MTP3.send(tx);
}

/* Build a receive template for an MTP3 TRANSFER.ind carrying 'sccp'.
 * OPC and DPC are swapped with respect to g_param, as the message travels in
 * the opposite direction of what f_send_sccp() transmits. */
private function tr_SCCP_MTP3_TRANSFERind(template PDU_SCCP sccp)
runs on SCCP_Test_RAW_CT return template SCCP_MTP3_TRANSFERind {

	var template SCCP_MTP3_TRANSFERind exp := {
		sio := g_param.sio,
		opc := g_param.dpc,
		dpc := g_param.opc,
		sls := g_param.sls,
		data := sccp
	};
	return exp;
}

/* Wait up to 10 seconds for an SCCP PDU matching 'sccp' and return the
 * received TRANSFER.ind. Any other message on the MTP3 port, or the timeout,
 * sets the verdict to fail and stops the component. */
private function f_exp_sccp(template PDU_SCCP sccp)
runs on SCCP_Test_RAW_CT return SCCP_MTP3_TRANSFERind {

	var template SCCP_MTP3_TRANSFERind exp := tr_SCCP_MTP3_TRANSFERind(sccp);
	var SCCP_MTP3_TRANSFERind rx;
	timer T := 10.0;
	T.start;
	alt {
	[] MTP3.receive(exp) -> value rx {
		return rx;
		}
	[] MTP3.receive {
		setverdict(fail, "Unexpected MTP/SCCP received");
		self.stop
		}
	[] T.timeout {
		setverdict(fail, "Timeout waiting for ", exp);
		self.stop
		}
	}
	return rx;
}

/* Send a CR using g_own_lref, wait for the matching CC and return the source
 * local reference taken from that CC. */
private function f_establish_conn(SCCP_PAR_Address calling, SCCP_PAR_Address called)
runs on SCCP_Test_RAW_CT return OCT3 {
	var SCCP_MTP3_TRANSFERind mtp3_rx;

	f_send_sccp(ts_SCCP_CR(g_own_lref, calling, called));
	mtp3_rx := f_exp_sccp(tr_SCCP_CC(?, g_own_lref));

	return mtp3_rx.data.connconfirm.sourceLocRef;
}

/* Send a UDT and expect a UDT with the same payload and with calling/called
 * addresses swapped. */
private function f_tx_udt_exp(SCCP_PAR_Address calling, SCCP_PAR_Address called, octetstring data)
runs on SCCP_Test_RAW_CT {

	f_send_sccp(ts_SCCP_UDT(calling, called, data));
	f_exp_sccp(tr_SCCP_UDT(called, calling, data));
}

/* Verify sccp_demo_user answers a CR with a CC for PC and SSN set up to echo back */
testcase TC_cr_cc() runs on SCCP_Test_RAW_CT {
	var SCCP_PAR_Address calling, called;

	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	called := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].peer_pc, mp_sccp_cfg[0].peer_ssn,
					     mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	calling := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].own_pc, mp_sccp_cfg[0].own_ssn,
					      mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	f_establish_conn(calling, called);
	setverdict(pass);
}

/* Verify sccp_demo_user inactivity timers are not armed upon dealing with
 * connectionless data-unit messages, since no connection exists. */
testcase TC_udt_without_cr_cc() runs on SCCP_Test_RAW_CT {
	var SCCP_PAR_Address calling, called;
	var octetstring data := f_rnd_octstring_rnd_len(100);

	/* Keep recommended ratio of T(iar) >= T(ias)*2, but anyway no IT should be received in this case. */
	g_demo_sccp_timer_ias := 1;
	g_demo_sccp_timer_iar := 3;
	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	called := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].peer_pc, mp_sccp_cfg[0].peer_ssn,
					     mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	calling := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].own_pc, mp_sccp_cfg[0].own_ssn,
					      mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));

	f_tx_udt_exp(calling, called, data);

	/* Make sure no SCCP message is received at all, since no connection is active. */
	timer T := int2float(g_demo_sccp_timer_iar + 1);
	T.start;
	alt {
	[] MTP3.receive {
		setverdict(fail, "Unexpected MTP/SCCP received");
		self.stop;
		}
	[] T.timeout {}
	}
	setverdict(pass);
}

/* Verify T(iar) triggers and releases the channel */
testcase TC_tiar_timeout() runs on SCCP_Test_RAW_CT {
	var SCCP_PAR_Address calling, called;
	var OCT3 remote_lref;
	var octetstring data := f_rnd_octstring_rnd_len(100);

	/* Set T(iar) in sccp_demo_user low enough that it will trigger before other side
	   has time to keep alive with a T(ias). Keep recommended ratio of
	   T(iar) >= T(ias)*2 */
	g_demo_sccp_timer_ias := 2;
	g_demo_sccp_timer_iar := 5;
	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	called := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].peer_pc, mp_sccp_cfg[0].peer_ssn,
					     mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	calling := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].own_pc, mp_sccp_cfg[0].own_ssn,
					      mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	remote_lref := f_establish_conn(calling, called);
	f_tx_udt_exp(calling, called, data);

	log("Waiting for first IT");
	f_exp_sccp(tr_SCCP_IT(remote_lref, g_own_lref));
	log("Waiting for second IT");
	f_exp_sccp(tr_SCCP_IT(remote_lref, g_own_lref));

	log("Waiting for RLSD");
	f_exp_sccp(tr_SCCP_RLSD(remote_lref, g_own_lref, hex2int('0D'H))); /* Cause: Expiration of Rx Inactivity Timer */
	f_send_sccp(ts_SCCP_RLC(g_own_lref, remote_lref));
	/* Let some time for peer to receive the RLC */
	f_sleep(1.0);
	setverdict(pass);
}

/* Verify that periodically sending IT keeps T(iar) of the peer from expiring,
 * and that T(iar) does trigger an RLSD once we stop sending IT. */
testcase TC_it_avoids_tiar() runs on SCCP_Test_RAW_CT {
	var SCCP_PAR_Address calling, called;
	var OCT3 remote_lref;
	var boolean it_received := false;
	var SCCP_MTP3_TRANSFERind rx;

	g_demo_sccp_timer_ias := 1;
	g_demo_sccp_timer_iar := 3;
	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	called := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].peer_pc, mp_sccp_cfg[0].peer_ssn,
					     mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	calling := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].own_pc, mp_sccp_cfg[0].own_ssn,
					      mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	remote_lref := f_establish_conn(calling, called);

	timer T_total := 7.0; /* Higher than g_demo_sccp_timer_iar */
	timer T_tias := 1.0; /* Lower than g_demo_sccp_timer_iar */
	T_total.start;
	T_tias.start;
	alt {
	[] MTP3.receive(tr_SCCP_MTP3_TRANSFERind(tr_SCCP_IT(remote_lref, g_own_lref))) {
		it_received := true;
		repeat;
		}
	[] MTP3.receive(tr_SCCP_MTP3_TRANSFERind(tr_SCCP_RLSD(remote_lref, g_own_lref, hex2int('0D'H)))) {
		setverdict(fail, "Unexpected SCCP RLSD received");
		self.stop;
		}
	[] MTP3.receive(tr_SCCP_MTP3_TRANSFERind(*)) -> value rx {
		setverdict(fail, "Unexpected MTP/SCCP TRANSFERind received: ", rx);
		self.stop;
		}
	[] MTP3.receive {
		setverdict(fail, "Unexpected MTP/SCCP received");
		self.stop;
		}
	[] T_tias.timeout {
		/* Our own keep-alive: send IT and re-arm for the next period */
		f_send_sccp(ts_SCCP_IT(g_own_lref, remote_lref));
		T_tias.start;
		repeat;
		}
	[] T_total.timeout {
		/* We kept the connection alive only with IT messages for a while, cool! */
		T_tias.stop;
		setverdict(pass);
		}
	}

	if (not it_received) {
		setverdict(fail, "Didn't receive any IT (Tias) from peer");
	}

	/* After we stop sending IT, we should be receiving an RLSD triggered from T(iar) */
	log("Waiting for RLSD");
	/* Guard timer, comfortably above T(iar): without it the test case would
	 * block forever if the peer never releases the connection. */
	timer T_guard := int2float(g_demo_sccp_timer_iar + 5);
	T_guard.start;
	alt {
	[] MTP3.receive(tr_SCCP_MTP3_TRANSFERind(tr_SCCP_IT(remote_lref, g_own_lref))) {
		repeat;
		}
	[] MTP3.receive(tr_SCCP_MTP3_TRANSFERind(tr_SCCP_RLSD(remote_lref, g_own_lref, hex2int('0D'H)))) {
		T_guard.stop;
		f_send_sccp(ts_SCCP_RLC(g_own_lref, remote_lref));
		setverdict(pass);
		}
	[] MTP3.receive {
		setverdict(fail, "Unexpected MTP/SCCP received");
		self.stop;
		}
	[] T_guard.timeout {
		setverdict(fail, "Timeout waiting for RLSD after stopping IT transmission");
		self.stop;
		}
	}
	/* Let some time for peer to receive the RLC */
	f_sleep(1.0);
}

/* Send an XUDT and expect the payload echoed back with calling/called swapped,
 * either as UDT or as XUDT. */
private function f_tx_xudt_exp(SCCP_PAR_Address calling, SCCP_PAR_Address called, octetstring data)
runs on SCCP_Test_RAW_CT {
	var template PDU_SCCP exp_rx;
	f_send_sccp(ts_SCCP_XUDT(calling, called, data));
	/* value list: either message type is an acceptable answer */
	exp_rx := (tr_SCCP_UDT(called, calling, data), tr_SCCP_XUDT(called, calling, data));
	f_exp_sccp(exp_rx);
}

/* Test if the IUT SCCP code processes an XUDT [treat it like UDT] and answers back. */
testcase TC_process_rx_xudt() runs on SCCP_Test_RAW_CT {
	var SCCP_PAR_Address calling, called;
	var octetstring data := f_rnd_octstring_rnd_len(100);

	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	called := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].peer_pc, mp_sccp_cfg[0].peer_ssn,
					     mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	calling := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].own_pc, mp_sccp_cfg[0].own_ssn,
					      mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));

	/* Make sure an XUDT is echoed back just like an UDT */
	f_tx_xudt_exp(calling, called, data);
	setverdict(pass);
}

/* Send an LUDT and expect an LUDT with the same payload and with
 * calling/called addresses swapped. */
private function f_tx_ludt_exp(SCCP_PAR_Address calling, SCCP_PAR_Address called, octetstring data)
runs on SCCP_Test_RAW_CT {
	var template PDU_SCCP exp_rx;
	f_send_sccp(ts_SCCP_LUDT(calling, called, data));
	exp_rx := tr_SCCP_LUDT(called, calling, data);
	f_exp_sccp(exp_rx);
}

/* Test if the IUT SCCP code processes a LUDT [treat it like UDT] and answers back. */
testcase TC_process_rx_ludt() runs on SCCP_Test_RAW_CT {
	var SCCP_PAR_Address calling, called;
	var octetstring data := f_rnd_octstring(1000);

	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	called := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].peer_pc, mp_sccp_cfg[0].peer_ssn,
					     mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	calling := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].own_pc, mp_sccp_cfg[0].own_ssn,
					      mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));

	/* Make sure an LUDT is echoed back just like an UDT */
	f_tx_ludt_exp(calling, called, data);
	setverdict(pass);
}

/* Send the SCMG message 'tx' inside a UDT and check the answer within 5 seconds.
 *
 * calling, called: SCCP addresses used for the transmitted UDT; the answer is
 *	expected with both swapped.
 * tx: SCMG message to transmit.
 * rx_exp: SCMG message expected in the answering UDT, or omit if the SUT is
 *	expected to stay silent (then the timeout means pass).
 * accept_other_called_resp: if true, also accept an answer whose called party
 *	address differs from our 'calling' address.
 */
function f_scmg_xceive(SCCP_PAR_Address calling, SCCP_PAR_Address called,
		       template (value) PDU_SCMG_message tx,
		       template (omit) PDU_SCMG_message rx_exp,
		       boolean accept_other_called_resp := false)
runs on SCCP_Test_RAW_CT {
	var boolean exp_something := true;
	var SCCP_MTP3_TRANSFERind rx;
	timer T := 5.0;

	if (istemplatekind(rx_exp, "omit")) {
		exp_something := false;
	}

	/* drop anything still queued from earlier exchanges */
	MTP3.clear;
	f_send_sccp(ts_SCCP_UDT(calling, called, enc_PDU_SCMG_message(valueof(tx))));
	T.start;
	alt {
	[exp_something] MTP3.receive(tr_SCCP_MTP3_TRANSFERind(tr_SCCP_UDT(called, calling, decmatch rx_exp))) {
		setverdict(pass);
		}
	[exp_something and accept_other_called_resp] MTP3.receive(tr_SCCP_MTP3_TRANSFERind(tr_SCCP_UDT(called, ?, decmatch rx_exp))) {
		setverdict(pass);
		}
	[] MTP3.receive(tr_SCCP_MTP3_TRANSFERind(tr_SCCP_UDT(called, calling, ?))) -> value rx {
		setverdict(fail, "Received unexpected SCCP/MTP3 TRANSFERind (specific): ", rx, " but waiting for (specific) ", rx_exp);
		}
	[] MTP3.receive(tr_SCCP_MTP3_TRANSFERind(*)) -> value rx {
		setverdict(fail, "Received unexpected SCCP/MTP3 TRANSFERind (*): ", rx, " but waiting for ", rx_exp);
		}
	[] MTP3.receive {
		setverdict(fail, "Received unexpected waiting for ", rx_exp);
		}
	[exp_something] T.timeout {
		setverdict(fail, "Timeout waiting for ", rx_exp);
		}
	[not exp_something] T.timeout {
		setverdict(pass);
		}
	}
}

/* Test if SST(SSN=1) returns SSA */
testcase TC_scmg_sst_ssn1() runs on SCCP_Test_RAW_CT {
	var SCCP_PAR_Address calling, called;
	var template (value) PDU_SCMG_message tx;
	var template (present) PDU_SCMG_message rx_exp;

	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	called := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].peer_pc, 1,
					     mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	calling := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].own_pc, 1,
					      mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));

	tx := ts_SCMG_SST(1, mp_sccp_cfg[0].peer_pc);
	rx_exp := ts_SCMG_SSA(1, mp_sccp_cfg[0].peer_pc);
	f_scmg_xceive(calling, called, tx, rx_exp);
}

/* Test if SST(SSN=valid) returns SSA */
testcase TC_scmg_sst_ssn_valid() runs on SCCP_Test_RAW_CT {
	var SCCP_PAR_Address calling, called;
	var template (value) PDU_SCMG_message tx;
	var template (present) PDU_SCMG_message rx_exp;

	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	called := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].peer_pc, 1,
					     mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	calling := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].own_pc, 1,
					      mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));

	tx := ts_SCMG_SST(mp_sccp_cfg[0].peer_ssn, mp_sccp_cfg[0].peer_pc);
	rx_exp := ts_SCMG_SSA(mp_sccp_cfg[0].peer_ssn, mp_sccp_cfg[0].peer_pc);
	f_scmg_xceive(calling, called, tx, rx_exp);
}

/* Test if SST(SSN=invalid) returns nothing */
testcase TC_scmg_sst_ssn_invalid() runs on SCCP_Test_RAW_CT {
	var SCCP_PAR_Address calling, called;
	var template (value) PDU_SCMG_message tx;
	var template (omit) PDU_SCMG_message rx_exp;

	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	called := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].peer_pc, 1,
					     mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));
	calling := valueof(ts_SccpAddr_PC_SSN(mp_sccp_cfg[0].own_pc, 1,
					      mp_sccp_cfg[0].sio, mp_sccp_cfg[0].sccp_service_type));

	tx := ts_SCMG_SST(123, mp_sccp_cfg[0].peer_pc);
	/* omit: f_scmg_xceive() then passes only if nothing is received */
	rx_exp := omit;
	f_scmg_xceive(calling, called, tx, rx_exp);
}

/* Test CallingParty(only SSN) solicits response (OS#5146) */
testcase TC_callingparty_ssn_only() runs on SCCP_Test_RAW_CT {
	var SCCP_PAR_Address calling, called;
	var template (value) PDU_SCMG_message tx;
	var template (present) PDU_SCMG_message rx_exp;

	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	called := valueof(ts_SccpAddr_SSN(1));
	calling := valueof(ts_SccpAddr_SSN(1));

	tx := ts_SCMG_SST(1, mp_sccp_cfg[0].peer_pc);
	rx_exp := ts_SCMG_SSA(1, mp_sccp_cfg[0].peer_pc);
	f_scmg_xceive(calling, called, tx, rx_exp, accept_other_called_resp:=true);
}

/* Test user initiating a conn (eg RUA), which triggers SCCP CR. While waiting
 * for CC, user decides it timed out (eg. RUA side timeout) which makes it move to
 * WAIT_CONN_CONF state. In that state, wait for CC and then submit an RLSD and
 * wait for ack.
 */
testcase TC_cr_timeout_cc_too_late() runs on SCCP_Test_RAW_CT {
	var SCCP_MTP3_TRANSFERind mtp3_rx;
	var integer conn_id := 1234;
	var OCT3 remote_lref;

	f_init_raw(mp_sccp_cfg[0]);
	f_sleep(1.0);

	f_vty_cmd_connect_req(conn_id);
	mtp3_rx := f_exp_sccp(tr_SCCP_CR(?, ?, ?));

	/* Emulate disconnection from the user: */
	f_vty_cmd_disconnect_req(conn_id);
	remote_lref := mtp3_rx.data.connrequest.sourceLocRef;

	/* Now send CC, user should send RLSD to us: */
	f_send_sccp(ts_SCCP_CC(g_own_lref, remote_lref));
	log("Waiting for RLSD");
	/* Cause set hardcoded by osmo_sccp_demo_user VTY code, nothing to test here: */
	var template (present) integer cause := ?;
	f_exp_sccp(tr_SCCP_RLSD(remote_lref, g_own_lref, cause));
	f_send_sccp(ts_SCCP_RLC(g_own_lref, remote_lref));

	/* Let some time for peer to receive the RLC */
	f_sleep(1.0);
	setverdict(pass);
}

control {
	execute( TC_cr_cc() );
	execute( TC_udt_without_cr_cc() );
	execute( TC_tiar_timeout() );
	execute( TC_it_avoids_tiar() );
	execute( TC_process_rx_xudt() );
	execute( TC_process_rx_ludt() );

	execute( TC_scmg_sst_ssn1() );
	execute( TC_scmg_sst_ssn_valid() );
	execute( TC_scmg_sst_ssn_invalid() );

	execute( TC_callingparty_ssn_only() );

	execute( TC_cr_timeout_cc_too_late() );
}

}