%% Copyright (C) 2025 by sysmocom - s.f.m.c. GmbH %% Author: Vadim Yanitskiy %% %% All Rights Reserved %% %% SPDX-License-Identifier: AGPL-3.0-or-later %% %% This program is free software; you can redistribute it and/or modify %% it under the terms of the GNU Affero General Public License as %% published by the Free Software Foundation; either version 3 of the %% License, or (at your option) any later version. %% %% This program is distributed in the hope that it will be useful, %% but WITHOUT ANY WARRANTY; without even the implied warranty of %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the %% GNU General Public License for more details. %% %% You should have received a copy of the GNU Affero General Public License %% along with this program. If not, see . %% %% Additional Permission under GNU AGPL version 3 section 7: %% %% If you modify this Program, or any covered work, by linking or %% combining it with runtime libraries of Erlang/OTP as released by %% Ericsson on https://www.erlang.org (or a modified version of these %% libraries), containing parts covered by the terms of the Erlang Public %% License (https://www.erlang.org/EPLICENSE), the licensors of this %% Program grant you additional permission to convey the resulting work %% without the need to license the runtime libraries of Erlang/OTP under %% the GNU Affero General Public License. Corresponding Source for a %% non-source form of such a combination shall include the source code %% for the parts of the runtime libraries of Erlang/OTP used as well as %% that of the covered work. -module(enb_registry). -behaviour(gen_server). -export([init/1, handle_info/2, handle_call/3, handle_cast/2, terminate/2]). -export([start_link/0, enb_register/0, enb_unregister/1, enb_event/2, fetch_enb_info/1, fetch_enb_list/0, fetch_enb_list/1, shutdown/0]). -include_lib("kernel/include/logger.hrl"). -include("s1gw_metrics.hrl"). %% Heartbeat interval -define(HEARTBEAT_INTERVAL, 5_000). -type enb_handle() :: non_neg_integer(). -type enb_state() :: connecting %% S1GW -> MME connection in progress | connected %% S1GW -> MME connection established | s1setup. %% S1 SETUP procedure completed -type enb_event() :: {connecting, sctp_server:conn_info()} | {connected, sctp_proxy:conn_info()} | {s1setup, s1ap_proxy:enb_info()}. -type enb_filter() :: {genb_id_str, string()} | {enb_sctp_aid, gen_sctp:assoc_id()} | {mme_sctp_aid, gen_sctp:assoc_id()} | {enb_addr_port, sctp_server:addr_port()} | {mme_addr_port, sctp_server:addr_port()}. -type enb_info() :: #{handle := enb_handle(), %% unique identifier of this eNB pid := pid(), %% pid() of the registrant mon_ref := reference(), %% monitor() reference state := enb_state(), %% connection state reg_time := integer(), %% registration time (monotonic) uptime := non_neg_integer(), %% seconds since reg_time genb_id_str => string(), %% Global-eNB-ID enb_id => s1ap_proxy:enb_id(), %% eNB-ID plmn_id => s1ap_proxy:plmn_id(), %% PLMN-ID enb_conn_info => sctp_server:conn_info(), %% eNB -> S1GW connection info mme_conn_info => sctp_proxy:conn_info() %% S1GW -> MME connection info }. -record(state, {enbs :: #{enb_handle() => enb_info()}, pids :: #{pid() => enb_handle()}, next_handle :: enb_handle() }). -export_type([enb_handle/0, enb_state/0, enb_info/0]). %% ------------------------------------------------------------------ %% public API %% ------------------------------------------------------------------ -spec start_link() -> {ok, pid()} | term(). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -spec enb_register() -> {ok, enb_handle()} | {error, term()}. enb_register() -> gen_server:call(?MODULE, ?FUNCTION_NAME). -spec enb_unregister(enb_handle()) -> ok | {error, term()}. enb_unregister(Handle) -> gen_server:call(?MODULE, {?FUNCTION_NAME, Handle}). -spec enb_event(enb_handle(), enb_event()) -> ok. enb_event(Handle, Event) -> gen_server:cast(?MODULE, {?FUNCTION_NAME, Handle, Event}). -spec fetch_enb_info(enb_handle() | pid()) -> {ok, enb_info()} | error. fetch_enb_info(Handle) when is_integer(Handle) -> gen_server:call(?MODULE, {?FUNCTION_NAME, {handle, Handle}}); fetch_enb_info(Pid) when is_pid(Pid) -> gen_server:call(?MODULE, {?FUNCTION_NAME, {pid, Pid}}). -spec fetch_enb_list() -> [enb_info()]. fetch_enb_list() -> gen_server:call(?MODULE, ?FUNCTION_NAME). -spec fetch_enb_list(enb_filter()) -> [enb_info()]. fetch_enb_list(Filter) -> gen_server:call(?MODULE, {?FUNCTION_NAME, Filter}). -spec shutdown() -> ok. shutdown() -> gen_server:stop(?MODULE). %% ------------------------------------------------------------------ %% gen_server API %% ------------------------------------------------------------------ init([]) -> spawn_link(fun() -> heartbeat(?HEARTBEAT_INTERVAL) end), {ok, #state{enbs = maps:new(), pids = maps:new(), next_handle = 0}}. handle_call(enb_register, {Pid, _Tag}, #state{enbs = ENBs, pids = PIDs, next_handle = Handle} = S) -> case maps:find(Pid, PIDs) of {ok, Handle} -> ?LOG_ERROR("eNB (handle=~p, pid=~p) is *already* registered", [Handle, Pid]), {reply, {error, already_registered}, S}; error -> %% keep an eye on the process being registered MonRef = erlang:monitor(process, Pid), %% create and store an initial eNB state EnbInfo = #{handle => Handle, pid => Pid, mon_ref => MonRef, state => connecting, reg_time => erlang:monotonic_time(), uptime => 0}, ?LOG_INFO("eNB (handle=~p, pid ~p) registered", [Handle, Pid]), {reply, {ok, Handle}, S#state{enbs = ENBs#{Handle => EnbInfo}, pids = PIDs#{Pid => Handle}, next_handle = Handle + 1}} end; handle_call({enb_unregister, Handle}, _From, #state{enbs = ENBs, pids = PIDs} = S) -> case maps:find(Handle, ENBs) of {ok, #{pid := Pid, mon_ref := MonRef}} -> erlang:demonitor(MonRef, [flush]), enb_metrics_reset(maps:get(Handle, ENBs)), ?LOG_INFO("eNB (handle=~p) unregistered", [Handle]), {reply, ok, S#state{enbs = maps:remove(Handle, ENBs), pids = maps:remove(Pid, PIDs)}}; error -> ?LOG_ERROR("eNB (handle=~p) is *not* registered", [Handle]), {reply, {error, not_registered}, S} end; handle_call({fetch_enb_info, {handle, Handle}}, _From, #state{enbs = ENBs} = S) -> Reply = maps:find(Handle, ENBs), {reply, Reply, S}; handle_call({fetch_enb_info, {pid, Pid}}, _From, #state{enbs = ENBs, pids = PIDs} = S) -> case maps:find(Pid, PIDs) of {ok, Handle} -> {reply, maps:find(Handle, ENBs), S}; error -> {reply, error, S} end; handle_call(fetch_enb_list, _From, #state{enbs = ENBs} = S) -> EnbList = lists:sort(fun enb_list_sort/2, maps:values(ENBs)), {reply, EnbList, S}; handle_call({fetch_enb_list, Filter}, _From, #state{enbs = ENBs} = S) -> Filtered = maps:filter(enb_filter(Filter), ENBs), EnbList = lists:sort(fun enb_list_sort/2, maps:values(Filtered)), {reply, EnbList, S}; handle_call(Info, From, S) -> ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Info]), {reply, {error, not_implemented}, S}. handle_cast({enb_event, Handle, Event}, #state{enbs = ENBs} = S) -> case maps:find(Handle, ENBs) of {ok, EnbInfo0} -> ?LOG_INFO("eNB (handle=~p) event: ~p", [Handle, Event]), EnbInfo1 = enb_handle_event(EnbInfo0, Event), enb_metrics_register(EnbInfo1), {noreply, S#state{enbs = maps:update(Handle, EnbInfo1, ENBs)}}; error -> ?LOG_ERROR("eNB (handle=~p) is *not* registered", [Handle]), {noreply, S} end; handle_cast(heartbeat, #state{enbs = ENBs} = S) -> T = erlang:monotonic_time(), Fun = fun(Handle, EnbInfo) -> enb_report_uptime(Handle, EnbInfo, T) end, {noreply, S#state{enbs = maps:map(Fun, ENBs)}}; handle_cast(Info, S) -> ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]), {noreply, S}. handle_info({'DOWN', _MonRef, process, Pid, Reason}, #state{enbs = ENBs, pids = PIDs} = S) -> ?LOG_INFO("eNB process (pid=~p) terminated with reason ~p", [Pid, Reason]), case maps:find(Pid, PIDs) of {ok, Pid} -> Handle = maps:get(Pid, PIDs), enb_metrics_reset(maps:get(Handle, ENBs)), ?LOG_INFO("eNB (handle=~p, pid=~p) has been unregistered", [Handle, Pid]), {noreply, S#state{enbs = maps:remove(Handle, ENBs), pids = maps:remove(Pid, PIDs)}}; error -> ?LOG_ERROR("eNB (pid=~p) is *not* registered", [Pid]), {noreply, S} end; handle_info(Info, S) -> ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]), {noreply, S}. terminate(Reason, _S) -> ?LOG_NOTICE("Terminating, reason ~p", [Reason]), ok. %% ------------------------------------------------------------------ %% private API %% ------------------------------------------------------------------ -spec enb_metrics_register(enb_info()) -> term(). enb_metrics_register(#{genb_id_str := GlobalENBId}) -> catch exometer:new(?S1GW_CTR_ENB_UPTIME(GlobalENBId), counter); enb_metrics_register(_) -> ok. -spec enb_metrics_reset(enb_info()) -> term(). enb_metrics_reset(#{genb_id_str := GlobalENBId}) -> s1gw_metrics:ctr_reset(?S1GW_CTR_ENB_UPTIME(GlobalENBId)); enb_metrics_reset(_) -> ok. -spec enb_handle_event(enb_info(), enb_event()) -> enb_info(). enb_handle_event(EnbInfo, {connecting, ConnInfo}) -> EnbInfo#{state => connecting, enb_conn_info => ConnInfo}; enb_handle_event(EnbInfo, {connected, ConnInfo}) -> EnbInfo#{state => connected, mme_conn_info => ConnInfo}; enb_handle_event(EnbInfo, {s1setup, Info}) -> maps:merge(EnbInfo#{state => s1setup}, Info); enb_handle_event(EnbInfo, Event) -> ?LOG_ERROR("Unhandled event: ~p", [Event]), EnbInfo. -spec enb_list_sort(enb_info(), enb_info()) -> boolean(). enb_list_sort(#{handle := H0}, #{handle := H1}) -> H0 =< H1. -spec enb_filter(enb_filter()) -> fun((enb_handle(), enb_info()) -> boolean()). enb_filter({genb_id_str, GlobalENBId}) -> fun(_, Item) -> enb_filter_by_field({genb_id_str, GlobalENBId}, Item) end; enb_filter({enb_sctp_aid, Aid}) -> fun(_, Item) -> enb_filter_by_sub_field({enb_conn_info, aid, Aid}, Item) end; enb_filter({mme_sctp_aid, Aid}) -> fun(_, Item) -> enb_filter_by_sub_field({mme_conn_info, mme_aid, Aid}, Item) end; enb_filter({enb_addr_port, {Addr, Port}}) -> fun(_, Item) -> enb_filter_by_sub_field({enb_conn_info, addr, Addr}, Item), enb_filter_by_sub_field({enb_conn_info, port, Port}, Item) end; enb_filter({mme_addr_port, {Addr, Port}}) -> fun(_, Item) -> enb_filter_by_sub_field({mme_conn_info, mme_addr, Addr}, Item), enb_filter_by_sub_field({mme_conn_info, mme_port, Port}, Item) end; enb_filter(Filter) -> ?LOG_ERROR("Unknown eNB filter: ~p", [Filter]), fun(_, _) -> false end. enb_filter_by_field({Field, Value}, EnbInfo) -> maps:get(Field, EnbInfo, undefined) =:= Value. enb_filter_by_sub_field({Map, Field, Value}, EnbInfo) -> M = maps:get(Map, EnbInfo, #{}), maps:get(Field, M, undefined) =:= Value. -spec enb_report_uptime(Handle, EnbInfo, T1) -> enb_info() when Handle :: enb_handle(), EnbInfo :: enb_info(), T1 :: integer(). enb_report_uptime(Handle, #{reg_time := T0} = EnbInfo, T1) -> Uptime = erlang:convert_time_unit(T1 - T0, native, second), ?LOG_DEBUG("eNB (handle=~p) uptime ~p s", [Handle, Uptime]), %% update ?S1GW_CTR_ENB_UPTIME case EnbInfo of #{genb_id_str := GlobalENBId} -> Current = s1gw_metrics:get_current_value(?S1GW_CTR_ENB_UPTIME(GlobalENBId)), s1gw_metrics:ctr_inc(?S1GW_CTR_ENB_UPTIME(GlobalENBId), Uptime - Current); _ -> nop end, EnbInfo#{uptime => Uptime}. -spec heartbeat(timeout()) -> no_return(). heartbeat(Tval) -> timer:sleep(Tval), gen_server:cast(?MODULE, ?FUNCTION_NAME), heartbeat(Tval). %% keep going %% vim:set ts=4 sw=4 et: