%% Copyright (C) 2025 by sysmocom - s.f.m.c. GmbH
%% Author: Vadim Yanitskiy
%%
%% All Rights Reserved
%%
%% SPDX-License-Identifier: AGPL-3.0-or-later
%%
%% This program is free software; you can redistribute it and/or modify
%% it under the terms of the GNU Affero General Public License as
%% published by the Free Software Foundation; either version 3 of the
%% License, or (at your option) any later version.
%%
%% This program is distributed in the hope that it will be useful,
%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
%% GNU General Public License for more details.
%%
%% You should have received a copy of the GNU Affero General Public License
%% along with this program. If not, see <https://www.gnu.org/licenses/>.
%%
%% Additional Permission under GNU AGPL version 3 section 7:
%%
%% If you modify this Program, or any covered work, by linking or
%% combining it with runtime libraries of Erlang/OTP as released by
%% Ericsson on https://www.erlang.org (or a modified version of these
%% libraries), containing parts covered by the terms of the Erlang Public
%% License (https://www.erlang.org/EPLICENSE), the licensors of this
%% Program grant you additional permission to convey the resulting work
%% without the need to license the runtime libraries of Erlang/OTP under
%% the GNU Affero General Public License. Corresponding Source for a
%% non-source form of such a combination shall include the source code
%% for the parts of the runtime libraries of Erlang/OTP used as well as
%% that of the covered work.

-module(enb_registry).
-behaviour(gen_server).

-export([init/1,
         handle_info/2,
         handle_call/3,
         handle_cast/2,
         terminate/2]).
-export([start_link/0,
         enb_register/0,
         enb_unregister/1,
         notify_enb_connected/2,
         notify_genb_id/2,
         notify_mme_connecting/1,
         notify_mme_wait_rsp/1,
         notify_mme_connected/2,
         fetch_enb_info/1,
         fetch_enb_list/0,
         fetch_enb_list/1,
         shutdown/0]).

-include_lib("kernel/include/logger.hrl").
-include("s1gw_metrics.hrl").


%% Interval (in milliseconds) between two consecutive heartbeat casts
-define(HEARTBEAT_INTERVAL, 5_000).


%% Opaque-looking numeric identifier handed out to every registrant
-type enb_handle() :: non_neg_integer().

%% A single property accepted by the internal update cast
-type enb_prop() :: {state, atom()}
                  | {enb_conn_info, sctp_server:conn_info()}
                  | {mme_conn_info, undefined | enb_proxy:conn_info()}
                  | {genb_id, s1ap_utils:genb_id()}.

-type enb_proplist() :: [enb_prop()].

%% Criteria understood by fetch_enb_list/1
-type enb_filter() :: {genb_id_str, string()}
                    | {enb_sctp_aid, gen_sctp:assoc_id()}
                    | {mme_sctp_aid, gen_sctp:assoc_id()}
                    | {enb_addr_port, sctp_server:addr_port()}
                    | {mme_addr_port, sctp_server:addr_port()}.

%% Everything the registry knows about one eNB
-type enb_info() :: #{
    %% mandatory keys (present from the moment of registration)
    handle := enb_handle(),                     %% unique identifier of this eNB
    pid := pid(),                               %% pid() of the registrant
    mon_ref := reference(),                     %% monitor() reference
    state := atom(),                            %% enb_proxy FSM state
    reg_time := integer(),                      %% registration time (monotonic)
    uptime := non_neg_integer(),                %% seconds since reg_time
    %% optional keys (filled in by the notify_*() calls)
    genb_id => s1ap_utils:genb_id(),            %% Global-eNB-ID
    genb_id_str => string(),                    %% Global-eNB-ID string
    enb_conn_info => sctp_server:conn_info(),   %% eNB -> S1GW connection info
    mme_conn_info => enb_proxy:conn_info()      %% S1GW -> MME connection info
}.

%% Server state: eNB records indexed by handle, a reverse pid -> handle
%% index, and the handle to be assigned to the next registrant.
-record(state, {
    enbs :: #{enb_handle() => enb_info()},
    pids :: #{pid() => enb_handle()},
    next_handle :: enb_handle()
}).

-export_type([enb_handle/0,
              enb_info/0]).


%% ------------------------------------------------------------------
%% public API
%% ------------------------------------------------------------------

%% Start the registry as a locally registered gen_server
-spec start_link() -> {ok, pid()} | term().
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).


%% Register the calling process; the returned handle identifies it
%% in all subsequent calls.
-spec enb_register() -> {ok, enb_handle()} | {error, term()}.
enb_register() ->
    gen_server:call(?MODULE, enb_register).


%% Remove a previously registered eNB by its handle
-spec enb_unregister(enb_handle()) -> ok | {error, term()}.
enb_unregister(Handle) ->
    Request = {enb_unregister, Handle},
    gen_server:call(?MODULE, Request).


%% eNB SCTP connection established; waiting for S1 SETUP REQUEST
-spec notify_enb_connected(enb_handle(), sctp_server:conn_info()) -> ok.
notify_enb_connected(Handle, ConnInfo) ->
    Props = [{state, wait_s1setup_req},
             {enb_conn_info, ConnInfo}],
    enb_update(Handle, Props).


%% Global-eNB-ID is now known (from S1 SETUP REQUEST)
-spec notify_genb_id(enb_handle(), s1ap_utils:genb_id()) -> ok.
notify_genb_id(Handle, GENBId) ->
    Props = [{genb_id, GENBId}],
    enb_update(Handle, Props).


%% MME connection attempt in progress; clears any stale mme_conn_info
-spec notify_mme_connecting(enb_handle()) -> ok.
notify_mme_connecting(Handle) ->
    Props = [{state, connecting},
             {mme_conn_info, undefined}],
    enb_update(Handle, Props).


%% MME SCTP link established; S1 SETUP REQUEST forwarded, awaiting response
-spec notify_mme_wait_rsp(enb_handle()) -> ok.
notify_mme_wait_rsp(Handle) ->
    Props = [{state, wait_s1setup_rsp}],
    enb_update(Handle, Props).


%% S1 SETUP complete; eNB is fully operational
-spec notify_mme_connected(enb_handle(), enb_proxy:conn_info()) -> ok.
notify_mme_connected(Handle, ConnInfo) ->
    Props = [{state, connected},
             {mme_conn_info, ConnInfo}],
    enb_update(Handle, Props).


%% Look up a single eNB, either by its handle or by the registrant's pid
-spec fetch_enb_info(enb_handle() | pid()) -> {ok, enb_info()} | error.
fetch_enb_info(Pid) when is_pid(Pid) ->
    Request = {fetch_enb_info, {pid, Pid}},
    gen_server:call(?MODULE, Request);

fetch_enb_info(Handle) when is_integer(Handle) ->
    Request = {fetch_enb_info, {handle, Handle}},
    gen_server:call(?MODULE, Request).


%% Fetch all registered eNBs
-spec fetch_enb_list() -> [enb_info()].
fetch_enb_list() ->
    gen_server:call(?MODULE, fetch_enb_list).


%% Fetch only those registered eNBs that satisfy the given filter
-spec fetch_enb_list(enb_filter()) -> [enb_info()].
fetch_enb_list(Filter) ->
    Request = {fetch_enb_list, Filter},
    gen_server:call(?MODULE, Request).


%% Stop the registry process
-spec shutdown() -> ok.
shutdown() ->
    gen_server:stop(?MODULE).


%% ------------------------------------------------------------------
%% gen_server API
%% ------------------------------------------------------------------

%% Start the (linked) heartbeat process and begin with an empty registry
init([]) ->
    _HeartbeatPid = spawn_link(fun() -> heartbeat(?HEARTBEAT_INTERVAL) end),
    InitialState = #state{enbs = #{},
                          pids = #{},
                          next_handle = 0},
    {ok, InitialState}.
%% Register the calling process (at most once per pid)
handle_call(enb_register, {Pid, _Tag}, #state{pids = PIDs} = S) ->
    case PIDs of
        #{Pid := Known} ->
            ?LOG_ERROR("eNB (handle=~p, pid=~p) is *already* registered", [Known, Pid]),
            {reply, {error, already_registered}, S};
        #{} ->
            Handle = S#state.next_handle,
            %% monitor the registrant, so that we get notified when it dies
            MonRef = erlang:monitor(process, Pid),
            %% the initial record; optional keys are added by later updates
            EnbInfo = #{handle => Handle,
                        pid => Pid,
                        mon_ref => MonRef,
                        state => unknown,
                        reg_time => erlang:monotonic_time(),
                        uptime => 0},
            ?LOG_INFO("eNB (handle=~p, pid ~p) registered", [Handle, Pid]),
            NewS = S#state{enbs = maps:put(Handle, EnbInfo, S#state.enbs),
                           pids = maps:put(Pid, Handle, PIDs),
                           next_handle = Handle + 1},
            {reply, {ok, Handle}, NewS}
    end;

%% Explicit removal: stop monitoring, reset metrics, drop both index entries
handle_call({enb_unregister, Handle}, _From,
            #state{enbs = ENBs, pids = PIDs} = S) ->
    case maps:find(Handle, ENBs) of
        error ->
            ?LOG_ERROR("eNB (handle=~p) is *not* registered", [Handle]),
            {reply, {error, not_registered}, S};
        {ok, #{pid := Pid, mon_ref := MonRef} = EnbInfo} ->
            %% [flush] also discards a 'DOWN' message that may already be queued
            erlang:demonitor(MonRef, [flush]),
            enb_metrics_reset(EnbInfo),
            ?LOG_INFO("eNB (handle=~p) unregistered", [Handle]),
            NewS = S#state{enbs = maps:remove(Handle, ENBs),
                           pids = maps:remove(Pid, PIDs)},
            {reply, ok, NewS}
    end;

%% Lookup by handle
handle_call({fetch_enb_info, {handle, Handle}}, _From,
            #state{enbs = ENBs} = S) ->
    {reply, maps:find(Handle, ENBs), S};

%% Lookup by pid: resolve the handle first, then the record
handle_call({fetch_enb_info, {pid, Pid}}, _From,
            #state{enbs = ENBs, pids = PIDs} = S) ->
    Reply = case maps:find(Pid, PIDs) of
                {ok, Handle} -> maps:find(Handle, ENBs);
                error -> error
            end,
    {reply, Reply, S};

%% All eNBs, ordered by handle
handle_call(fetch_enb_list, _From, #state{enbs = ENBs} = S) ->
    Sorted = lists:sort(fun enb_list_sort/2, maps:values(ENBs)),
    {reply, Sorted, S};

%% Matching eNBs only, ordered by handle
handle_call({fetch_enb_list, Filter}, _From, #state{enbs = ENBs} = S) ->
    Pred = enb_filter(Filter),
    Matching = maps:values(maps:filter(Pred, ENBs)),
    {reply, lists:sort(fun enb_list_sort/2, Matching), S};

handle_call(Info, From, S) ->
    ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Info]),
    {reply, {error, not_implemented}, S}.


%% Apply a list of property updates to a registered eNB
handle_cast({enb_update, Handle, PropList}, #state{enbs = ENBs} = S) ->
    case ENBs of
        #{Handle := Old} ->
            ?LOG_DEBUG("eNB (handle=~p) update: ~p", [Handle, PropList]),
            New = enb_update_proplist(Old, PropList),
            {noreply, S#state{enbs = ENBs#{Handle := New}}};
        #{} ->
            ?LOG_ERROR("eNB (handle=~p) is *not* registered", [Handle]),
            {noreply, S}
    end;

%% Periodic tick: refresh the uptime of every eNB against one timestamp
handle_cast(heartbeat, #state{enbs = ENBs} = S) ->
    Now = erlang:monotonic_time(),
    Refreshed = maps:map(fun(Handle, EnbInfo) ->
                             enb_report_uptime(Handle, EnbInfo, Now)
                         end, ENBs),
    {noreply, S#state{enbs = Refreshed}};

handle_cast(Info, S) ->
    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
    {noreply, S}.


%% A monitored registrant has terminated: unregister it implicitly
handle_info({'DOWN', _MonRef, process, Pid, Reason},
            #state{enbs = ENBs, pids = PIDs} = S) ->
    ?LOG_INFO("eNB process (pid=~p) terminated with reason ~p", [Pid, Reason]),
    case maps:find(Pid, PIDs) of
        error ->
            ?LOG_ERROR("eNB (pid=~p) is *not* registered", [Pid]),
            {noreply, S};
        {ok, Handle} ->
            enb_metrics_reset(maps:get(Handle, ENBs)),
            ?LOG_INFO("eNB (handle=~p, pid=~p) has been unregistered", [Handle, Pid]),
            NewS = S#state{enbs = maps:remove(Handle, ENBs),
                           pids = maps:remove(Pid, PIDs)},
            {noreply, NewS}
    end;

handle_info(Info, S) ->
    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
    {noreply, S}.


terminate(Reason, _S) ->
    ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
    ok.


%% ------------------------------------------------------------------
%% private API
%% ------------------------------------------------------------------

%% Asynchronously submit property updates for the given eNB
-spec enb_update(enb_handle(), enb_proplist()) -> ok.
enb_update(Handle, PropList) ->
    Msg = {enb_update, Handle, PropList},
    gen_server:cast(?MODULE, Msg).


%% Create the per-eNB uptime counter; any failure of exometer:new/2
%% is caught and returned as a value rather than raised.
-spec enb_metrics_register(string()) -> term().
enb_metrics_register(GlobalENBId) ->
    catch exometer:new(?S1GW_CTR_ENB_UPTIME(GlobalENBId), counter).


-spec enb_metrics_reset(enb_info()) -> term().
%% Reset the uptime counter of an eNB.  The counter is keyed by the
%% Global-eNB-ID string, so there is nothing to reset until that is known.
enb_metrics_reset(#{genb_id_str := GlobalENBId}) ->
    s1gw_metrics:ctr_reset(?S1GW_CTR_ENB_UPTIME(GlobalENBId));

enb_metrics_reset(_) ->
    ok.


%% Apply every property of PropList (left to right) to EnbInfo
-spec enb_update_proplist(enb_info(), enb_proplist()) -> enb_info().
enb_update_proplist(EnbInfo, PropList) ->
    lists:foldl(fun enb_update_prop/2, EnbInfo, PropList).


%% Apply a single property to EnbInfo.
%%
%% This is the lists:foldl/3 callback, hence the argument order is
%% (Element, Accumulator), i.e. (Prop, EnbInfo).
%%
%%   {state, State}            - store the new FSM state
%%   {enb_conn_info, Info}     - store the eNB -> S1GW connection info
%%   {mme_conn_info, undefined}- drop the S1GW -> MME connection info
%%   {mme_conn_info, Info}     - store the S1GW -> MME connection info
%%   {genb_id, GENBId}         - store the Global-eNB-ID together with its
%%                               string form, and register the uptime counter
%%
%% Anything else is logged and leaves EnbInfo unchanged.
-spec enb_update_prop(enb_prop(), enb_info()) -> enb_info().
enb_update_prop({state, State}, EnbInfo) ->
    EnbInfo#{state => State};

enb_update_prop({enb_conn_info, ConnInfo}, EnbInfo) ->
    EnbInfo#{enb_conn_info => ConnInfo};

enb_update_prop({mme_conn_info, undefined}, EnbInfo) ->
    %% 'undefined' is never stored: the key is removed instead
    maps:remove(mme_conn_info, EnbInfo);

enb_update_prop({mme_conn_info, ConnInfo}, EnbInfo) ->
    EnbInfo#{mme_conn_info => ConnInfo};

enb_update_prop({genb_id, GENBId}, EnbInfo) ->
    GlobalENBId = s1ap_utils:genb_id_str(GENBId),
    enb_metrics_register(GlobalENBId),
    EnbInfo#{genb_id => GENBId,
             genb_id_str => GlobalENBId};

enb_update_prop(Prop, EnbInfo) ->
    ?LOG_ERROR("eNB (handle=~p) update: unknown property ~p",
               [maps:get(handle, EnbInfo), Prop]),
    EnbInfo.


%% Ordering function for lists:sort/2: ascending by handle
-spec enb_list_sort(enb_info(), enb_info()) -> boolean().
enb_list_sort(#{handle := H0}, #{handle := H1}) ->
    H0 =< H1.


-spec enb_filter(enb_filter()) -> fun((enb_handle(), enb_info()) -> boolean()).
%% Build a maps:filter/2 predicate for the given filter.
%% An unknown filter is logged and yields a predicate matching nothing.
enb_filter({genb_id_str, GlobalENBId}) ->
    fun(_, Item) -> enb_filter_by_field({genb_id_str, GlobalENBId}, Item) end;

enb_filter({enb_sctp_aid, Aid}) ->
    fun(_, Item) -> enb_filter_by_sub_field({enb_conn_info, aid, Aid}, Item) end;

enb_filter({mme_sctp_aid, Aid}) ->
    fun(_, Item) -> enb_filter_by_sub_field({mme_conn_info, mme_aid, Aid}, Item) end;

enb_filter({enb_addr_port, {Addr, Port}}) ->
    fun(_, Item) ->
        enb_filter_by_sub_field({enb_conn_info, addr, Addr}, Item) andalso
        enb_filter_by_sub_field({enb_conn_info, port, Port}, Item)
    end;

enb_filter({mme_addr_port, {Addr, Port}}) ->
    fun(_, Item) ->
        enb_filter_by_sub_field({mme_conn_info, mme_addr, Addr}, Item) andalso
        enb_filter_by_sub_field({mme_conn_info, mme_port, Port}, Item)
    end;

enb_filter(Filter) ->
    ?LOG_ERROR("Unknown eNB filter: ~p", [Filter]),
    fun(_, _) -> false end.


%% Does the top-level key Field of EnbInfo hold exactly Value?
%% A missing key is compared as 'undefined'.
-spec enb_filter_by_field({atom(), term()}, enb_info()) -> boolean().
enb_filter_by_field({Field, Value}, EnbInfo) ->
    maps:get(Field, EnbInfo, undefined) =:= Value.


%% Does key Field of the nested map stored under key Map hold exactly Value?
%% A missing nested map is treated as empty, a missing key as 'undefined'.
-spec enb_filter_by_sub_field({atom(), atom(), term()}, enb_info()) -> boolean().
enb_filter_by_sub_field({Map, Field, Value}, EnbInfo) ->
    M = maps:get(Map, EnbInfo, #{}),
    maps:get(Field, M, undefined) =:= Value.


%% Recompute the uptime (whole seconds between reg_time and T1) of one eNB,
%% store it in the returned record and, if the Global-eNB-ID is known,
%% advance the uptime counter by the difference to its current reading.
-spec enb_report_uptime(Handle, EnbInfo, T1) -> enb_info()
    when Handle :: enb_handle(),
         EnbInfo :: enb_info(),
         T1 :: integer().
enb_report_uptime(Handle, #{reg_time := T0} = EnbInfo, T1) ->
    Uptime = erlang:convert_time_unit(T1 - T0, native, second),
    ?LOG_DEBUG("eNB (handle=~p) uptime ~p s", [Handle, Uptime]),
    %% update ?S1GW_CTR_ENB_UPTIME
    case EnbInfo of
        #{genb_id_str := GlobalENBId} ->
            %% increment by the delta only
            %% NOTE(review): presumably this brings the counter up to Uptime;
            %% confirm against s1gw_metrics
            Current = s1gw_metrics:get_current_value(?S1GW_CTR_ENB_UPTIME(GlobalENBId)),
            s1gw_metrics:ctr_inc(?S1GW_CTR_ENB_UPTIME(GlobalENBId), Uptime - Current);
        _ ->
            nop
    end,
    EnbInfo#{uptime => Uptime}.


%% Body of the heartbeat process: cast 'heartbeat' to the registry every
%% Tval milliseconds.
%%
%% Must be run in a process of its own that is linked to the registry (as
%% done by init/1 via spawn_link/1).  The process traps exits, so it is
%% told when the registry goes away and then returns.  This is needed
%% because an exit signal with reason 'normal' (which is what
%% gen_server:stop/1 produces) is ignored by a linked process that does not
%% trap exits; without it the loop would outlive the registry and keep
%% casting forever.
-spec heartbeat(timeout()) -> ok.
heartbeat(Tval) ->
    process_flag(trap_exit, true),
    heartbeat_loop(Tval).


%% Wait for either the interval to elapse (-> send a heartbeat and go on)
%% or for a linked process to exit (-> stop).
-spec heartbeat_loop(timeout()) -> ok.
heartbeat_loop(Tval) ->
    receive
        {'EXIT', _From, _Reason} ->
            ok
    after Tval ->
        gen_server:cast(?MODULE, heartbeat),
        heartbeat_loop(Tval) %% keep going
    end.

%% vim:set ts=4 sw=4 et: