%% Copyright (C) 2024 by sysmocom - s.f.m.c. GmbH
%% Author: Vadim Yanitskiy
%%
%% All Rights Reserved
%%
%% SPDX-License-Identifier: AGPL-3.0-or-later
%%
%% This program is free software; you can redistribute it and/or modify
%% it under the terms of the GNU Affero General Public License as
%% published by the Free Software Foundation; either version 3 of the
%% License, or (at your option) any later version.
%%
%% This program is distributed in the hope that it will be useful,
%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
%% GNU General Public License for more details.
%%
%% You should have received a copy of the GNU Affero General Public License
%% along with this program. If not, see <https://www.gnu.org/licenses/>.
%%
%% Additional Permission under GNU AGPL version 3 section 7:
%%
%% If you modify this Program, or any covered work, by linking or
%% combining it with runtime libraries of Erlang/OTP as released by
%% Ericsson on https://www.erlang.org (or a modified version of these
%% libraries), containing parts covered by the terms of the Erlang Public
%% License (https://www.erlang.org/EPLICENSE), the licensors of this
%% Program grant you additional permission to convey the resulting work
%% without the need to license the runtime libraries of Erlang/OTP under
%% the GNU Affero General Public License. Corresponding Source for a
%% non-source form of such a combination shall include the source code
%% for the parts of the runtime libraries of Erlang/OTP used as well as
%% that of the covered work.

-module(pfcp_peer).
-behaviour(gen_statem).

-export([init/1,
         callback_mode/0,
         connecting/3,
         connected/3,
         code_change/4,
         terminate/3]).
-export([start_link/2,
         seid_alloc/0,
         heartbeat_req/0,
         heartbeat_req/1,
         session_establish_req/3,
         session_modify_req/3,
         session_delete_req/1,
         shutdown/0]).

-include_lib("kernel/include/logger.hrl").
-include_lib("pfcplib/include/pfcp_packet.hrl").

-include("s1gw_metrics.hrl").

%% 3GPP TS 29.244, section 4.2 "UDP Header and Port Numbers"
-define(PFCP_PORT, 8805).
%% Upper bound of the SEID range (see pfcp_seid() below)
-define(PFCP_SEID_MAX, 16#ffffffffffffffff).
%% How long (in milliseconds) the watchdog waits for a Heartbeat Response
-define(PFCP_HEARTBEAT_REQ_TIMEOUT, 2000).

-type pfcp_session_rsp() :: ok | {error, term()}.
-type pfcp_msg_type() :: atom().
-type pfcp_seq_nr() :: 0..16#ffffff.
-type pfcp_seid() :: 0..?PFCP_SEID_MAX.
-type pfcp_f_seid() :: #f_seid{}.
-type pfcp_ies() :: [term()] | map() | binary().
-type pfcp_msg() :: {pfcp_msg_type(), pfcp_ies()}.
-type pfcp_pdu() :: #pfcp{}.

-export_type([pfcp_session_rsp/0,
              pfcp_msg_type/0,
              pfcp_seq_nr/0,
              pfcp_seid/0,
              pfcp_ies/0,
              pfcp_msg/0,
              pfcp_pdu/0]).

%% Book-keeping for the (single) Heartbeat Request awaiting a Response
-record(heartbeat_state, {
    from :: undefined | gen_statem:from(),  %% caller to reply to (blocking mode only)
    seq_nr :: pfcp_seq_nr(),                %% SeqNr of the pending Heartbeat Request
    pid :: pid()                            %% watchdog process guarding the timeout
}).

%% State data of the PFCP peer state machine
-record(peer_state, {
    seid :: pfcp_seid(),                    %% next SEID to be handed out by seid_alloc/0
    sock :: gen_udp:socket(),               %% UDP socket used for both Rx and Tx
    loc_addr :: inet:ip_address(),          %% local bind address
    rem_addr :: inet:ip_address(),          %% address of the remote peer (UPF)
    loc_rts :: pos_integer(),               %% local Recovery Time Stamp
    rem_rts :: undefined | pos_integer(),   %% Recovery Time Stamp reported by the peer
    seq_nr :: pfcp_seq_nr(),                %% SeqNr for the next outgoing PDU
    registry :: dict:dict(),                %% SEID -> {Pid, MonitorRef}
    heartbeat :: undefined | #heartbeat_state{}
}).

-type peer_state() :: #peer_state{}.

%% ------------------------------------------------------------------
%% public API
%% ------------------------------------------------------------------

%% Start the locally registered PFCP peer process.
%% Both addresses are handed over to init/1 unmodified.
start_link(LocAddr, RemAddr) ->
    InitArgs = [LocAddr, RemAddr],
    gen_statem:start_link({local, ?MODULE}, ?MODULE, InitArgs, []).

%% Request to allocate a unique SEID
-spec seid_alloc() -> {ok, pfcp_seid()} | {error, term()}.
seid_alloc() ->
    gen_statem:call(?MODULE, seid_alloc).

%% Send a Heartbeat Request and wait for the outcome (blocking mode)
-spec heartbeat_req() -> ok | {error, term()}.
heartbeat_req() ->
    gen_statem:call(?MODULE, heartbeat_req).

%% Send a Heartbeat Request; `block' waits for the outcome,
%% `noblock' returns immediately.
-spec heartbeat_req(block | noblock) -> ok | {error, term()}.
heartbeat_req(block) ->
    gen_statem:call(?MODULE, heartbeat_req);
heartbeat_req(noblock) ->
    gen_statem:cast(?MODULE, heartbeat_req).

%% Ask the peer process to send a PFCP Session Establishment Request
-spec session_establish_req(pfcp_seid(), list(), list()) -> pfcp_session_rsp().
session_establish_req(SEID, PDRs, FARs) ->
    Request = {session_establish_req, SEID, PDRs, FARs},
    gen_statem:call(?MODULE, Request).

%% Ask the peer process to send a PFCP Session Modification Request
-spec session_modify_req(pfcp_seid(), list(), list()) -> pfcp_session_rsp().
session_modify_req(SEID, PDRs, FARs) ->
    Request = {session_modify_req, SEID, PDRs, FARs},
    gen_statem:call(?MODULE, Request).

%% Ask the peer process to send a PFCP Session Deletion Request
-spec session_delete_req(pfcp_seid()) -> pfcp_session_rsp().
session_delete_req(SEID) ->
    Request = {session_delete_req, SEID},
    gen_statem:call(?MODULE, Request).

%% Stop the peer process (terminate/3 does the cleanup)
shutdown() ->
    gen_statem:stop(?MODULE).

%% ------------------------------------------------------------------
%% gen_statem API
%% ------------------------------------------------------------------

%% Addresses given as strings are parsed first (local one, then remote one);
%% the last clause does the actual initialization with parsed addresses.
init([LocAddrStr, RemAddr]) when is_list(LocAddrStr) ->
    {ok, LocAddr} = inet:parse_address(LocAddrStr),
    init([LocAddr, RemAddr]);
init([LocAddr, RemAddrStr]) when is_list(RemAddrStr) ->
    {ok, RemAddr} = inet:parse_address(RemAddrStr),
    init([LocAddr, RemAddr]);
init([LocAddr, RemAddr]) ->
    process_flag(trap_exit, true),
    s1gw_metrics:gauge_set(?S1GW_GAUGE_PFCP_ASSOCIATED, 0),
    SockOpts = [binary,
                {ip, LocAddr},
                {reuseaddr, true},
                {active, true}],
    {ok, Sock} = gen_udp:open(?PFCP_PORT, SockOpts),
    ?LOG_INFO("PFCP peer @ ~p will talk to UPF @ ~p", [LocAddr, RemAddr]),
    LRTS = get_recovery_timestamp(),
    InitState = #peer_state{seid = 1, %% SEID=0 is special, see 7.2.2.4.2
                            sock = Sock,
                            loc_addr = LocAddr,
                            rem_addr = RemAddr,
                            seq_nr = 0,
                            loc_rts = LRTS,
                            registry = dict:new()},
    {ok, connecting, InitState}.

%% One callback function per state, with state enter calls enabled
callback_mode() ->
    [state_functions, state_enter].

%% CONNECTING state
%%
%% Entered on start-up and re-entered (via repeat_state) on every Association
%% Setup timeout.  On entry a PFCP Association Setup Request is sent and a
%% 2 second state timeout is armed.
connecting(enter, OldState, #peer_state{} = S0) ->
    ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]),
    s1gw_metrics:gauge_set(?S1GW_GAUGE_PFCP_ASSOCIATED, 0),
    %% Tx PFCP Association Setup.  A send failure must not crash the process:
    %% the state timeout below re-triggers the transmission anyway.
    S1 = case send_assoc_setup(S0) of
             {ok, NewS} ->
                 NewS;
             {{error, Reason}, NewS} ->
                 ?LOG_ERROR("Tx Association Setup Request failed: ~p", [Reason]),
                 NewS
         end,
    {keep_state, S1, [{state_timeout, 2_000, assoc_setup_timeout}]};

%% Handle Association Setup timeout
connecting(state_timeout, assoc_setup_timeout, S) ->
    %% Re-start sending PFCP Association Setup above:
    ?LOG_NOTICE("PFCP Association Setup timeout, UPF may be down, retrying..."),
    s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_REQ_TIMEOUT),
    {repeat_state, S};

%% Handle incoming PFCP PDU(s)
connecting(info, {udp, Sock, FromIp, FromPort, Data},
           #peer_state{sock = Sock} = S0) ->
    PDU = decode_pdu(Data, {FromIp, FromPort}, S0),
    case PDU of
        %% Association accepted: remember the peer's Recovery Time Stamp
        #pfcp{type = association_setup_response,
              ie = #{pfcp_cause := 'Request accepted',
                     recovery_time_stamp := #recovery_time_stamp{time = RRTS}}} ->
            ?LOG_INFO("Rx Association Setup Response (Request accepted)"),
            s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_RESP_RX),
            s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_RESP_RX_ACK),
            {next_state, connected, S0#peer_state{rem_rts = RRTS}};
        %% Association rejected: give up and stop the process
        #pfcp{type = association_setup_response,
              ie = #{pfcp_cause := Cause}} ->
            ?LOG_ERROR("Rx Association Setup Response (~p)", [Cause]),
            s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_RESP_RX),
            s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_RESP_RX_NACK),
            {stop, {shutdown, assoc_setup_nack}};
        %% 3GPP TS 29.244, 6.2.2.2 Heartbeat Request
        %% A CP function or UP function shall be prepared to receive a Heartbeat Request
        %% at any time (even from unknown peers) and it shall reply with a Heartbeat Response.
        #pfcp{type = heartbeat_request} ->
            {_, S1} = recv_heartbeat_request(PDU, {FromIp, FromPort}, S0),
            {keep_state, S1};
        #pfcp{type = heartbeat_response} ->
            {_, S1} = recv_heartbeat_response(PDU, {FromIp, FromPort}, S0),
            {keep_state, S1};
        _ ->
            ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]),
            s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_UNEXPECTED_PDU),
            {keep_state, S0}
    end;

%% Catch-all handler for this state
connecting(Event, EventData, S) ->
    handle_event(?FUNCTION_NAME, Event, EventData, S).


%% CONNECTED state
%%
%% The association is established: session related requests are sent to
%% the peer and incoming session PDUs are routed to the SEID owners.
connected(enter, OldState, S) ->
    ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]),
    s1gw_metrics:gauge_set(?S1GW_GAUGE_PFCP_ASSOCIATED, 1),
    {keep_state, S};

%% The caller gets the result of sending the request, not the peer's response
connected({call, From}, {session_establish_req, SEID, PDRs, FARs},
          #peer_state{} = S0) ->
    {Result, S1} = send_session_establish_req(SEID, PDRs, FARs, S0),
    {keep_state, S1, [{reply, From, Result}]};

connected({call, From}, {session_modify_req, SEID, PDRs, FARs},
          #peer_state{} = S0) ->
    {Result, S1} = send_session_modify_req(SEID, PDRs, FARs, S0),
    {keep_state, S1, [{reply, From, Result}]};

connected({call, From}, {session_delete_req, SEID},
          #peer_state{} = S0) ->
    {Result, S1} = send_session_delete_req(SEID, S0),
    {keep_state, S1, [{reply, From, Result}]};

%% Handle incoming PFCP PDU(s)
connected(info, {udp, Sock, FromIp, FromPort, Data},
          #peer_state{sock = Sock} = S0) ->
    PDU = decode_pdu(Data, {FromIp, FromPort}, S0),
    case PDU of
        #pfcp{type = heartbeat_request} ->
            {_, S1} = recv_heartbeat_request(PDU, {FromIp, FromPort}, S0),
            {keep_state, S1};
        #pfcp{type = heartbeat_response} ->
            {_, S1} = recv_heartbeat_response(PDU, {FromIp, FromPort}, S0),
            {keep_state, S1};
        %% Session related PDU: forward to the process owning the SEID
        #pfcp{seid = SEID} when is_integer(SEID) ->
            route_pdu(PDU, S0),
            {keep_state, S0};
        _ ->
            ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]),
            s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_UNEXPECTED_PDU),
            {keep_state, S0}
    end;

%% Catch-all handler for this state
connected(Event, EventData, S) ->
    handle_event(?FUNCTION_NAME, Event, EventData, S).

%% Event handler for all states
%%
%% Called by the per-state catch-all clauses with the current state name.

%% Allocate the next SEID and register the calling process as its owner
handle_event(_State, {call, From}, seid_alloc,
             #peer_state{seid = SEID} = S0) ->
    {Pid, _Ref} = From,
    S1 = registry_add(Pid, S0),
    {keep_state, S1, [{reply, From, {ok, SEID}}]};

%% Heartbeat Req (non-blocking)
handle_event(_State, cast, heartbeat_req,
             #peer_state{} = S0) ->
    {_, S1} = send_heartbeat_request(undefined, S0),
    {keep_state, S1};

%% Heartbeat Req (blocking)
handle_event(_State, {call, From}, heartbeat_req,
             #peer_state{} = S0) ->
    case send_heartbeat_request(From, S0) of
        {ok, S1} ->
            %% postpone reply until we get the Resp
            {keep_state, S1};
        {Error, S1} ->
            {keep_state, S1, [{reply, From, Error}]}
    end;

%% Heartbeat Req (timeout)
handle_event(_State, cast, heartbeat_request_watchdog,
             #peer_state{heartbeat = HB} = S) ->
    case HB of
        #heartbeat_state{from = From, seq_nr = SeqNr} ->
            ?LOG_NOTICE("Heartbeat Request (SeqNr=~p) timeout", [SeqNr]),
            s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_HEARTBEAT_REQ_TIMEOUT),
            %% only the blocking variant has a caller waiting for a reply
            case From of
                undefined -> ok;
                _ -> gen_statem:reply(From, {error, timeout})
            end,
            {keep_state, S#peer_state{heartbeat = undefined}};
        undefined ->
            %% no Heartbeat Request is pending, nothing to do
            {keep_state, S}
    end;

%% Session related requests reaching this handler were not handled by the
%% current state function, i.e. there is no association.  Every call must be
%% answered, otherwise the caller would block forever in gen_statem:call/2.
handle_event(_State, {call, From}, {session_establish_req, _SEID, _PDRs, _FARs},
             #peer_state{} = S) ->
    {keep_state, S, [{reply, From, {error, not_connected}}]};

handle_event(_State, {call, From}, {session_modify_req, _SEID, _PDRs, _FARs},
             #peer_state{} = S) ->
    {keep_state, S, [{reply, From, {error, not_connected}}]};

handle_event(_State, {call, From}, {session_delete_req, _SEID},
             #peer_state{} = S) ->
    {keep_state, S, [{reply, From, {error, not_connected}}]};

%% A process that was given a SEID has terminated: drop its registration
handle_event(_State, info, {'DOWN', Ref, process, Pid, _Reason},
             #peer_state{} = S0) ->
    S1 = registry_del({Pid, Ref}, S0),
    {keep_state, S1};

%% Unknown call: log it, but still reply so that the caller does not hang
handle_event(State, {call, From} = Event, EventData, S) ->
    ?LOG_ERROR("Unexpected event ~p in state ~p: ~p", [Event, State, EventData]),
    {keep_state, S, [{reply, From, {error, unexpected_call}}]};

handle_event(State, Event, EventData, S) ->
    ?LOG_ERROR("Unexpected event ~p in state ~p: ~p", [Event, State, EventData]),
    {keep_state, S}.


%% No state/data conversion is needed on code upgrade
code_change(_Vsn, State, S, _Extra) ->
    {ok, State, S}.


%% Release the association (if there is one) and close the socket
terminate(Reason, State, S) ->
    ?LOG_NOTICE("Terminating in state ~p, reason ~p", [State, Reason]),
    case State of
        connected ->
            %% best effort: the result of sending is not checked
            send_assoc_release(S),
            s1gw_metrics:gauge_set(?S1GW_GAUGE_PFCP_ASSOCIATED, 0);
        _ ->
            nop
    end,
    gen_udp:close(S#peer_state.sock),
    ok.

%% ------------------------------------------------------------------
%% private API
%% ------------------------------------------------------------------

%% Register Pid as the owner of the current SEID and advance the SEID
%% counter.  The process is monitored, so that its registration can be
%% removed once it terminates.
-spec registry_add(pid(), peer_state()) -> peer_state().
registry_add(Pid, #peer_state{registry = Reg, seid = SEID} = S) ->
    Ref = erlang:monitor(process, Pid),
    NReg = dict:store(SEID, {Pid, Ref}, Reg),
    ?LOG_DEBUG("Allocated SEID ~p to process ~p", [SEID, Pid]),
    %% wrap around to 1, because SEID=0 is never allocated
    NSEID = case SEID of
                ?PFCP_SEID_MAX -> 1;
                _ -> SEID + 1
            end,
    S#peer_state{registry = NReg, seid = NSEID}.

%% Remove the registry entry created for the given {Pid, MonitorRef} pair
-spec registry_del({pid(), reference()}, peer_state()) -> peer_state().
registry_del({Pid, Ref}, #peer_state{registry = Reg} = S) ->
    ?LOG_DEBUG("Unregister process ~p", [Pid]),
    Fun = fun(_Key, Val) -> Val =/= {Pid, Ref} end,
    NReg = dict:filter(Fun, Reg),
    S#peer_state{registry = NReg}.

%% Decode a PFCP PDU received from the network.
%%
%% The datagram comes from an untrusted source, so a failure to decode it
%% must not take the whole process (and the association) down.  In that case
%% {error, {decode_failed, Reason}} is returned instead of a #pfcp{} record;
%% callers treat anything that is not an expected #pfcp{} as unexpected PDU.
-spec decode_pdu(binary(), tuple(), peer_state()) -> pfcp_pdu() | {error, term()}.
decode_pdu(Data, {FromIp, FromPort}, _S) ->
    ?LOG_DEBUG("Rx encoded PFCP PDU from ~p:~p: ~p", [FromIp, FromPort, Data]),
    %% TODO: check PDU#pfcp.seq_no
    %% TODO: there can be batched PDUs
    try pfcp_packet:decode(Data) of
        PDU ->
            ?LOG_DEBUG("Rx PFCP PDU from ~p:~p: ~p", [FromIp, FromPort, PDU]),
            PDU
    catch
        Class:Reason ->
            ?LOG_ERROR("Failed to decode PFCP PDU from ~p:~p: ~p:~p",
                       [FromIp, FromPort, Class, Reason]),
            {error, {decode_failed, Reason}}
    end.

%% Forward a session related PDU to the process registered for its SEID
-spec route_pdu(pfcp_pdu(), peer_state()) -> term().
route_pdu(#pfcp{seid = SEID} = PDU, #peer_state{registry = Reg}) ->
    case dict:find(SEID, Reg) of
        {ok, {Pid, _Ref}} ->
            ?LOG_DEBUG("PFCP PDU routed to ~p", [Pid]),
            %% sending to a pid that is no longer alive is silently ignored
            Pid ! PDU;
        error ->
            ?LOG_NOTICE("PFCP PDU cannot be routed: ~p", [PDU])
    end.

%% Send an encoded PDU to the remote peer (always to the PFCP port)
-spec send_data(binary(), peer_state()) -> ok | {error, term()}.
send_data(Data, #peer_state{sock = Sock, rem_addr = RemAddr}) ->
    ?LOG_DEBUG("Tx encoded PFCP PDU to ~p:~p: ~p", [RemAddr, ?PFCP_PORT, Data]),
    gen_udp:send(Sock, RemAddr, ?PFCP_PORT, Data).

%% Send a PDU without SEID (node related message)
-spec send_pdu(pfcp_msg(), peer_state()) -> {ok, peer_state()} | {{error, term()}, peer_state()}.
send_pdu({MsgType, IEs}, S) ->
    send_pdu(undefined, {MsgType, IEs}, S).

%% Encode and send a PDU using the current sequence number.
%% The sequence number is advanced only if the PDU has been sent.
-spec send_pdu(pfcp_seid() | undefined, pfcp_msg(), peer_state()) -> {ok, peer_state()} | {{error, term()}, peer_state()}.
send_pdu(SEID, {MsgType, IEs}, #peer_state{seq_nr = SeqNr} = S) ->
    PDU = #pfcp{version = v1,
                type = MsgType,
                seid = SEID,
                seq_no = SeqNr,
                ie = IEs},
    ?LOG_DEBUG("Tx PFCP PDU: ~p", [PDU]),
    Data = pfcp_packet:encode(PDU),
    case send_data(Data, S) of
        ok ->
            %% The sequence number is a 24-bit value (see pfcp_seq_nr()),
            %% so it has to wrap around instead of growing beyond 16#ffffff.
            {ok, S#peer_state{seq_nr = (SeqNr + 1) band 16#ffffff}};
        {error, Error} ->
            {{error, Error}, S}
    end.

%% Node ID: the local address in binary form,
%% 4 bytes for an IPv4 and 16 bytes for an IPv6 address.
-spec get_node_id(peer_state()) -> binary().
get_node_id(#peer_state{loc_addr = {A, B, C, D}}) ->
    <<A, B, C, D>>;
get_node_id(#peer_state{loc_addr = {_, _, _, _, _, _, _, _} = Addr}) ->
    %% an IPv6 address tuple consists of eight 16-bit words,
    %% which cannot be converted by list_to_binary/1
    << <<Word:16>> || Word <- tuple_to_list(Addr) >>.

%% Recovery Time Stamp: current time in seconds relative to 1900
-spec get_recovery_timestamp() -> pos_integer().
get_recovery_timestamp() ->
    {NowMS, NowS, _NowUS} = erlang:timestamp(),
    %% erlang:timestamp() returns time relative to the UNIX epoch (1970),
    %% but for PFCP we need time relative to the NTP era 0 (1900).
    %% 2208988800 is the diff between 1900 and 1970.
    NowMS * 1_000_000 + NowS + 2208988800.

%% Build the F-SEID IE for the given SEID; the address field
%% (IPv4 or IPv6) is picked by the size of the Node ID.
-spec get_f_seid(pfcp_seid(), peer_state()) -> pfcp_f_seid().
get_f_seid(SEID, S) ->
    IE = #f_seid{seid = SEID,
                 ipv4 = undefined,
                 ipv6 = undefined},
    NodeId = get_node_id(S),
    case NodeId of
        << _:32 >> ->
            IE#f_seid{ipv4 = NodeId};
        << _:128 >> ->
            IE#f_seid{ipv6 = NodeId};
        _ ->
            IE %% shall not happen
    end.

%% 6.2.6 PFCP Association Setup Procedure
%% 7.4.4.1 PFCP Association Setup Request
send_assoc_setup(#peer_state{loc_rts = LRTS} = S) ->
    IEs = #{node_id => #node_id{id = get_node_id(S)},
            recovery_time_stamp => #recovery_time_stamp{time = LRTS}},
    %% the counter is incremented regardless of the sending result
    s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_REQ_TX),
    send_pdu({association_setup_request, IEs}, S).

%% 6.2.8 PFCP Association Release Procedure
%% 7.4.4.5 PFCP Association Release Request
send_assoc_release(S) ->
    IEs = #{node_id => #node_id{id = get_node_id(S)}},
    send_pdu({association_release_request, IEs}, S).

%% 6.3.2 PFCP Session Establishment Procedure
%% 7.5.2 PFCP Session Establishment Request
%% The PDU itself is sent with SEID=0 in the header; the allocated SEID
%% is carried in the F-SEID IE.
send_session_establish_req(F_SEID, PDRs, FARs, S) ->
    NodeId = #node_id{id = get_node_id(S)},
    FSeid = get_f_seid(F_SEID, S),
    IEs = #{node_id => NodeId,
            f_seid => FSeid,
            create_pdr => PDRs,
            create_far => FARs},
    send_pdu(0, {session_establishment_request, IEs}, S).

%% 6.3.3 PFCP Session Modification Procedure
%% 7.5.4 PFCP Session Modification Request
send_session_modify_req(SEID, PDRs, FARs, S) ->
    Msg = {session_modification_request,
           #{update_pdr => PDRs,
             update_far => FARs}},
    send_pdu(SEID, Msg, S).

%% 6.3.4 PFCP Session Deletion Procedure
%% 7.5.6 PFCP Session Deletion Request
send_session_delete_req(SEID, S) ->
    Msg = {session_deletion_request, []},
    send_pdu(SEID, Msg, S).

%% 6.2.2 Heartbeat Procedure
%% 7.4.2 Heartbeat Messages
%% Only one Heartbeat Request may be pending at a time.  Once the request
%% has been sent, a watchdog process is started and the request details
%% are stored in the state, so that the response (or timeout) can be matched.
send_heartbeat_request(_From, #peer_state{heartbeat = #heartbeat_state{seq_nr = PendingSeqNr}} = S) ->
    ?LOG_ERROR("Another Heartbeat Request is still pending (SeqNr=~p)", [PendingSeqNr]),
    {{error, heartbeat_req_pending}, S};
send_heartbeat_request(From, #peer_state{heartbeat = undefined} = S0) ->
    #peer_state{seq_nr = SeqNr, loc_rts = LRTS} = S0,
    ReqIEs = #{recovery_time_stamp => #recovery_time_stamp{time = LRTS}},
    ?LOG_INFO("Tx Heartbeat Request (SeqNr=~p): ~p", [SeqNr, ReqIEs]),
    case send_pdu({heartbeat_request, ReqIEs}, S0) of
        {ok, S1} ->
            s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_HEARTBEAT_REQ_TX),
            Watchdog = spawn(fun heartbeat_request_watchdog/0),
            Pending = #heartbeat_state{from = From,
                                       seq_nr = SeqNr,
                                       pid = Watchdog},
            {ok, S1#peer_state{heartbeat = Pending}};
        Failure ->
            Failure
    end.

%% Body of the watchdog process: terminate silently if notified about the
%% Heartbeat Response in time, otherwise report the timeout to the peer process.
heartbeat_request_watchdog() ->
    receive
        heartbeat_response ->
            ok
    after ?PFCP_HEARTBEAT_REQ_TIMEOUT ->
        gen_statem:cast(?MODULE, heartbeat_request_watchdog)
    end.

%% Handle a Heartbeat Response.
%% It is accepted only if a Heartbeat Request with the same SeqNr is pending;
%% in that case the waiting caller (if any) gets `ok' and the watchdog process
%% is told to terminate.
recv_heartbeat_response(#pfcp{type = heartbeat_response,
                              seq_no = SeqNr,
                              ie = RspIEs},
                        {_FromIp, _FromPort},
                        #peer_state{heartbeat = HB} = S) ->
    ?LOG_INFO("Rx Heartbeat Response (SeqNr=~p): ~p", [SeqNr, RspIEs]),
    s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_HEARTBEAT_RESP_RX),
    case HB of
        %% SeqNr is already bound, so this matches the pending request only
        #heartbeat_state{from = From, seq_nr = SeqNr, pid = Pid} ->
            case From of
                undefined -> ok;
                _ -> gen_statem:reply(From, ok)
            end,
            Pid ! heartbeat_response,
            {ok, S#peer_state{heartbeat = undefined}};
        _ ->
            ?LOG_NOTICE("Heartbeat Response (SeqNr=~p) was not expected", [SeqNr]),
            {{error, unexpected}, S}
    end.


%% Handle a Heartbeat Request by sending a Heartbeat Response.
%% The response echoes the SeqNr of the request and does not consume
%% a local sequence number, so the state is returned unchanged.
recv_heartbeat_request(#pfcp{type = heartbeat_request,
                             seq_no = SeqNr,
                             ie = ReqIEs},
                       {FromIp, FromPort},
                       #peer_state{loc_rts = LRTS, sock = Sock} = S) ->
    ?LOG_INFO("Rx Heartbeat Request from ~p:~p: ~p", [FromIp, FromPort, ReqIEs]),
    s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_HEARTBEAT_REQ_RX),
    s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_HEARTBEAT_RESP_TX),
    RspIEs = #{recovery_time_stamp => #recovery_time_stamp{time = LRTS}},
    PDU = #pfcp{version = v1,
                type = heartbeat_response,
                seid = undefined,
                seq_no = SeqNr,
                ie = RspIEs},
    ?LOG_DEBUG("Tx PFCP PDU: ~p", [PDU]),
    Data = pfcp_packet:encode(PDU),
    %% 3GPP TS 29.244, section 4.2 "UDP Header and Port Numbers": a response
    %% shall be sent to the UDP source port (and address) of the request,
    %% which is not necessarily the well-known PFCP port.
    ?LOG_DEBUG("Tx encoded PFCP PDU to ~p:~p: ~p", [FromIp, FromPort, Data]),
    Result = gen_udp:send(Sock, FromIp, FromPort, Data),
    {Result, S}.

%% vim:set ts=4 sw=4 et: