%% Copyright (C) 2024-2025 by sysmocom - s.f.m.c. GmbH %% Author: Vadim Yanitskiy %% %% All Rights Reserved %% %% SPDX-License-Identifier: AGPL-3.0-or-later %% %% This program is free software; you can redistribute it and/or modify %% it under the terms of the GNU Affero General Public License as %% published by the Free Software Foundation; either version 3 of the %% License, or (at your option) any later version. %% %% This program is distributed in the hope that it will be useful, %% but WITHOUT ANY WARRANTY; without even the implied warranty of %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the %% GNU General Public License for more details. %% %% You should have received a copy of the GNU Affero General Public License %% along with this program. If not, see . %% %% Additional Permission under GNU AGPL version 3 section 7: %% %% If you modify this Program, or any covered work, by linking or %% combining it with runtime libraries of Erlang/OTP as released by %% Ericsson on https://www.erlang.org (or a modified version of these %% libraries), containing parts covered by the terms of the Erlang Public %% License (https://www.erlang.org/EPLICENSE), the licensors of this %% Program grant you additional permission to convey the resulting work %% without the need to license the runtime libraries of Erlang/OTP under %% the GNU Affero General Public License. Corresponding Source for a %% non-source form of such a combination shall include the source code %% for the parts of the runtime libraries of Erlang/OTP used as well as %% that of the covered work. -module(enb_proxy). -behaviour(gen_statem). -behaviour(sctp_client). -export([init/1, callback_mode/0, wait_s1setup_req/3, wait_s1setup_rsp/3, connecting/3, connected/3, code_change/4, terminate/3]). -export([start_link/2, send_data/2, fetch_info/1, shutdown/1]). -include_lib("kernel/include/logger.hrl"). -include_lib("kernel/include/inet.hrl"). -include_lib("kernel/include/inet_sctp.hrl"). -include("s1gw_metrics.hrl"). -include("S1AP-Constants.hrl"). -type conn_info() :: #{state := atom(), handler := pid(), mme_addr := inet:ip_address(), mme_port := inet:port_number(), enb_aid := gen_sctp:assoc_id(), mme_aid => gen_sctp:assoc_id() }. -record(state, {enb_aid :: gen_sctp:assoc_id(), mme_aid :: undefined | gen_sctp:assoc_id(), enb_conn_info :: sctp_server:conn_info(), mme_conn_cfg :: sctp_client:cfg(), s1setup_req :: undefined | binary(), sock :: undefined | gen_sctp:sctp_socket(), enb_handle :: enb_registry:enb_handle(), genb_id_str :: undefined | string(), handler :: pid() %% s1ap_proxy process' pid }). -type state() :: #state{}. -export_type([conn_info/0]). %% ------------------------------------------------------------------ %% public API %% ------------------------------------------------------------------ -spec start_link(ConnInfo, Priv) -> Result when ConnInfo :: sctp_server:conn_info(), Priv :: sctp_client:cfg(), Result :: gen_statem:start_ret(). start_link(ConnInfo, Priv) -> gen_statem:start_link(?MODULE, [ConnInfo, Priv], []). -spec send_data(pid(), binary()) -> ok. send_data(Pid, Data) -> gen_statem:cast(Pid, {send_data, Data}). -spec fetch_info(pid()) -> conn_info(). fetch_info(Pid) -> gen_statem:call(Pid, ?FUNCTION_NAME). -spec shutdown(pid()) -> ok. shutdown(Pid) -> gen_statem:stop(Pid). %% ------------------------------------------------------------------ %% gen_statem API %% ------------------------------------------------------------------ init([EnbConnInfo, MmeConnCfg]) -> {ok, EnbHandle} = enb_registry:enb_register(), enb_registry:enb_event(EnbHandle, {connecting, EnbConnInfo}), {ok, Pid} = s1ap_proxy:start_link(self()), {ok, wait_s1setup_req, #state{enb_aid = maps:get(aid, EnbConnInfo), enb_conn_info = EnbConnInfo, mme_conn_cfg = MmeConnCfg, enb_handle = EnbHandle, handler = Pid}}. callback_mode() -> [state_functions, state_enter]. %% WAIT_S1SETUP_REQ state wait_s1setup_req(enter, _OldState, S) -> ?LOG_INFO("State enter: ~p", [?FUNCTION_NAME]), {next_state, ?FUNCTION_NAME, S, [{state_timeout, 5_000, s1setup_req_timeout}]}; %% Handle S1 SETUP REQUEST timeout wait_s1setup_req(state_timeout, s1setup_req_timeout, _S) -> ?LOG_ERROR("Timeout waiting for S1 SETUP REQUEST from eNB"), {stop, {shutdown, s1setup_req_timeout}}; %% Handle PDUs coming from the eNB wait_s1setup_req(cast, {send_data, Data}, S) -> ?LOG_DEBUG("Rx S1AP PDU from eNB: ~p", [Data]), case s1ap_utils:parse_pdu(Data) of {{?'id-S1Setup', initiatingMessage}, IEs} -> %% fetch the Global-eNB-ID IE, convert it to a string GENBId = proplists:get_value(?'id-Global-ENB-ID', IEs), GlobalENBId = s1ap_utils:genb_id_str(GENBId), %% use it as the logging prefix osmo_s1gw:set_log_prefix("eNB " ++ GlobalENBId), ?LOG_INFO("Rx S1 SETUP REQUEST from eNB"), %% signal the Global-eNB-ID to other modules s1ap_proxy:set_genb_id(S#state.handler, GlobalENBId), enb_registry:enb_event(S#state.enb_handle, {s1setup, GENBId}), gtpu_kpi_enb_register(S#state{genb_id_str = GlobalENBId}), {next_state, connecting, S#state{s1setup_req = Data, genb_id_str = GlobalENBId}}; {{Proc, Type}, IEs} -> ?LOG_ERROR("Rx unexpected S1AP PDU from eNB: ~p/~p, ~p", [Proc, Type, IEs]), {stop, {shutdown, s1setup_error}}; {error, _Error} -> {stop, {shutdown, s1setup_error}} end; wait_s1setup_req(Event, EventData, S) -> handle_event(?FUNCTION_NAME, Event, EventData, S). %% CONNECTING state connecting(enter, OldState, #state{mme_conn_cfg = MmeConnCfg} = S) -> ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]), %% Initiate connection establishment with the MME {ok, Sock} = sctp_client:connect(MmeConnCfg), %% loop transition to enable state_timeout {next_state, ?FUNCTION_NAME, S#state{sock = Sock}, [{state_timeout, 2_000, conn_est_timeout}]}; %% Handle connection establishment timeout connecting(state_timeout, conn_est_timeout, _S) -> {stop, {shutdown, conn_est_timeout}}; %% Handle PDUs coming from the eNB connecting(cast, {send_data, Data}, _S) -> ?LOG_ERROR("Rx unexpected S1AP PDU from eNB: ~p", [Data]), keep_state_and_data; %% Handle an #sctp_assoc_change event (connection state) connecting(info, {sctp, _Socket, MmeAddr, MmePort, {[], #sctp_assoc_change{state = ConnState, assoc_id = Aid}}}, S) -> case ConnState of comm_up -> ?LOG_NOTICE("MME connection (id=~p, ~p:~p) established", [Aid, MmeAddr, MmePort]), %% send the S1 SETUP REQUEST PDU to the MME sctp_send_from_enb(S#state.s1setup_req, S#state{mme_aid = Aid}), {next_state, wait_s1setup_rsp, S#state{mme_aid = Aid}}; _ -> ?LOG_NOTICE("MME connection establishment failed: ~p", [ConnState]), {stop, {shutdown, conn_est_fail}} end; connecting(Event, EventData, S) -> handle_event(?FUNCTION_NAME, Event, EventData, S). %% WAIT_S1SETUP_RSP state wait_s1setup_rsp(enter, OldState, S) -> ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]), {next_state, ?FUNCTION_NAME, S, [{state_timeout, 5_000, s1setup_rsp_timeout}]}; %% Handle S1 SETUP RESPONSE timeout wait_s1setup_rsp(state_timeout, s1setup_rsp_timeout, _S) -> ?LOG_ERROR("Timeout waiting for S1 SETUP RESPONSE from MME"), {stop, {shutdown, s1setup_rsp_timeout}}; %% Handle PDUs coming from the eNB wait_s1setup_rsp(cast, {send_data, Data}, _S) -> ?LOG_ERROR("Rx unexpected S1AP PDU from eNB: ~p", [Data]), keep_state_and_data; %% Handle PDUs coming from the MME wait_s1setup_rsp(info, {sctp, _Socket, MmeAddr, MmePort, {[#sctp_sndrcvinfo{assoc_id = MmeAid, stream = SID, ssn = SSN, tsn = TSN}], Data}}, #state{mme_aid = MmeAid} = S) -> ?LOG_DEBUG("MME connection (id=~p, ~p:~p) -> eNB: ~p", [MmeAid, MmeAddr, MmePort, #{tsn => TSN, sid => SID, ssn => SSN, len => byte_size(Data), data => Data}]), case s1ap_utils:parse_pdu(Data) of {{?'id-S1Setup', successfulOutcome}, _IEs} -> ?LOG_INFO("Rx S1 SETUP RESPONSE from MME"), sctp_send_from_mme(Data, S), {next_state, connected, S}; {{?'id-S1Setup', unsuccessfulOutcome}, _IEs} -> ?LOG_NOTICE("Rx S1 SETUP FAILURE from MME"), sctp_send_from_mme(Data, S), {stop, {shutdown, s1setup_error}}; {{Proc, Type}, IEs} -> ?LOG_ERROR("Rx unexpected S1AP PDU from MME: ~p/~p, ~p", [Proc, Type, IEs]), {stop, {shutdown, s1setup_error}}; {error, _Error} -> {stop, {shutdown, s1setup_error}} end; %% Handle an #sctp_assoc_change event (MME connection state) %% We may loose connection while waiting for the S1 SETUP RESPONSE wait_s1setup_rsp(info, {sctp, _Socket, MmeAddr, MmePort, {[], #sctp_assoc_change{state = ConnState, assoc_id = Aid}}}, S) -> case ConnState of comm_up -> ?LOG_NOTICE("MME connection (id=~p, ~p:~p) is already established?!?", [Aid, MmeAddr, MmePort]), {keep_state, S}; _ -> ?LOG_NOTICE("MME connection state: ~p", [ConnState]), {stop, {shutdown, conn_fail}} end; wait_s1setup_rsp(Event, EventData, S) -> handle_event(?FUNCTION_NAME, Event, EventData, S). %% CONNECTED state connected(enter, OldState, #state{enb_handle = Handle} = S) -> ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]), MmeConnInfo = conn_info(?FUNCTION_NAME, S), ok = enb_registry:enb_event(Handle, {?FUNCTION_NAME, MmeConnInfo}), {keep_state, S}; %% Handle an eNB -> MME data forwarding request (forward) connected(cast, {send_data, Data}, S0) -> sctp_send_from_enb(Data, S0), {keep_state, S0}; %% Handle an #sctp_assoc_change event (MME connection state) connected(info, {sctp, _Socket, MmeAddr, MmePort, {[], #sctp_assoc_change{state = ConnState, assoc_id = Aid}}}, S) -> case ConnState of comm_up -> ?LOG_NOTICE("MME connection (id=~p, ~p:~p) is already established?!?", [Aid, MmeAddr, MmePort]), {keep_state, S}; _ -> ?LOG_NOTICE("MME connection state: ~p", [ConnState]), {stop, {shutdown, conn_fail}} end; %% Handle an #sctp_sndrcvinfo event (MME -> eNB data) connected(info, {sctp, _Socket, MmeAddr, MmePort, {[#sctp_sndrcvinfo{assoc_id = MmeAid, stream = SID, ssn = SSN, tsn = TSN}], Data}}, #state{mme_aid = MmeAid} = S) -> ?LOG_DEBUG("MME connection (id=~p, ~p:~p) -> eNB: ~p", [MmeAid, MmeAddr, MmePort, #{tsn => TSN, sid => SID, ssn => SSN, len => byte_size(Data), data => Data}]), sctp_send_from_mme(Data, S), {keep_state, S}; %% eNB's GTP-U address indication from s1ap_proxy connected(info, {s1ap_proxy, {enb_addr, Addr}}, S) -> gtpu_kpi_enb_set_addr({s1ap, Addr}), {keep_state, S}; connected(Event, EventData, S) -> handle_event(?FUNCTION_NAME, Event, EventData, S). %% Event handler for all states handle_event(State, {call, From}, fetch_info, S) -> Info = conn_info(State, S), {keep_state_and_data, {reply, From, Info}}; handle_event(State, {call, From}, EventData, _S) -> ?LOG_ERROR("Unexpected call in state ~p: ~p", [State, EventData]), {keep_state_and_data, {reply, From, {error, {unexpected_call, State}}}}; %% Catch-all for unhandled SCTP events handle_event(State, info, {sctp, _Socket, MmeAddr, MmePort, {AncData, Data}}, _S) -> ?LOG_DEBUG("Unhandled SCTP event in state ~p (~p:~p): ~p, ~p", [State, MmeAddr, MmePort, AncData, Data]), keep_state_and_data; handle_event(State, info, {sctp_error, _Socket, MmeAddr, MmePort, {_, Error}}, _S) -> ?LOG_ERROR("Rx SCTP error in state ~p (~p:~p): ~p", [State, MmeAddr, MmePort, Error]), sctp_common:report_error(Error), keep_state_and_data; handle_event(State, Event, EventData, _S) -> ?LOG_ERROR("Unexpected event ~p in state ~p: ~p", [Event, State, EventData]), keep_state_and_data. code_change(_Vsn, State, S, _Extra) -> {ok, State, S}. terminate(Reason, State, #state{handler = Pid, enb_handle = Handle, sock = Sock, mme_aid = MmeAid}) -> ?LOG_NOTICE("Terminating in state ~p, reason ~p", [State, Reason]), enb_registry:enb_unregister(Handle), s1ap_proxy:shutdown(Pid), case Sock of undefined -> ok; _ -> case MmeAid of undefined -> ok; _ -> sctp_common:shutdown({Sock, MmeAid}) end, gen_sctp:close(Sock) end. %% ------------------------------------------------------------------ %% private API %% ------------------------------------------------------------------ %% Send a single message: eNB -> MME -spec sctp_send_from_enb(binary(), state()) -> ok | {error, term()}. sctp_send_from_enb(Data, #state{sock = Sock, enb_aid = EnbAid, mme_aid = MmeAid, handler = Pid}) -> case s1ap_proxy:process_pdu(Pid, Data) of {forward, FwdData} -> sctp_common:send_data({Sock, MmeAid}, FwdData); {reply, ReData} -> sctp_server:send_data(EnbAid, ReData); {drop, Data} -> ok %% no-op end. %% Send a single message: eNB <- MME -spec sctp_send_from_mme(binary(), state()) -> ok | {error, term()}. sctp_send_from_mme(Data, #state{sock = Sock, enb_aid = EnbAid, mme_aid = MmeAid, handler = Pid}) -> case s1ap_proxy:process_pdu(Pid, Data) of {forward, FwdData} -> sctp_server:send_data(EnbAid, FwdData); {reply, ReData} -> sctp_common:send_data({Sock, MmeAid}, ReData); {drop, Data} -> ok %% no-op end. -spec conn_info(atom(), state()) -> conn_info(). conn_info(State, #state{mme_conn_cfg = MmeConnCfg} = S) -> Info = #{state => State, handler => S#state.handler, mme_addr => maps:get(raddr, MmeConnCfg), mme_port => maps:get(rport, MmeConnCfg), enb_aid => S#state.enb_aid, mme_aid => S#state.mme_aid}, maps:filter(fun(_K, V) -> V =/= undefined end, Info). -spec gtpu_kpi_enb_register(state()) -> ok. gtpu_kpi_enb_register(#state{genb_id_str = GlobalENBId, enb_conn_info = EnbConnInfo}) -> %% register eNB using its Global-eNB-ID ok = gtpu_kpi:enb_register(GlobalENBId), %% indicate UL/DL addresses EnbAddr = inet:ntoa(maps:get(addr, EnbConnInfo)), gtpu_kpi_enb_set_addr({sctp, EnbAddr}). -spec gtpu_kpi_enb_set_addr({Source, EnbAddr}) -> ok when Source :: s1ap | sctp, EnbAddr :: string(). gtpu_kpi_enb_set_addr({Source, EnbAddr}) -> gtpu_kpi_enb_set_addr(gtpu_kpi_ul_addr, {Source, ul, EnbAddr}), gtpu_kpi_enb_set_addr(gtpu_kpi_dl_addr, {Source, dl, EnbAddr}), ok. -spec gtpu_kpi_enb_set_addr(EnvParam, {Source, ULDL, EnbAddr}) -> ok when EnvParam :: atom(), Source :: s1ap | sctp, ULDL :: ul | dl, EnbAddr :: string(). gtpu_kpi_enb_set_addr(EnvParam, {Source, ULDL, EnbAddr}) -> case osmo_s1gw:get_env(EnvParam, s1ap) of Source -> ?LOG_DEBUG("GTP-U KPI ~p address ~p learned from ~p", [ULDL, EnbAddr, Source]), gtpu_kpi:enb_set_addr({ULDL, EnbAddr}); Mode -> ?LOG_DEBUG("GTP-U KPI ~p address mode ~p != ~p", [ULDL, Mode, Source]) end. %% vim:set ts=4 sw=4 et: