/*
 * (C) 2021-2022 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
 * All Rights Reserved.
 *
 * Author: Neels Janosch Hofmeyr <nhofmeyr@sysmocom.de>
 *
 * SPDX-License-Identifier: GPL-2.0+
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */

#include <errno.h>

#include <osmocom/core/utils.h>
#include <osmocom/core/fsm.h>

#include <osmocom/pfcp/pfcp_msg.h>
#include <osmocom/pfcp/pfcp_endpoint.h>

#include <osmocom/upf/upf.h>
#include <osmocom/upf/up_endpoint.h>
#include <osmocom/upf/up_peer.h>
#include <osmocom/upf/up_session.h>
#include <osmocom/upf/up_gtp_action.h>
#include <osmocom/upf/netinst.h>

static enum osmo_pfcp_cause up_session_setup_gtp(struct up_session *session);

void up_session_set_msg_ctx(struct up_session *session, struct osmo_pfcp_msg *m)
{
	if (!m->ctx.peer_fi)
		up_peer_set_msg_ctx(session->up_peer, m);

	OSMO_ASSERT(!m->ctx.session_fi);

	m->ctx.session_fi = session->fi;
	m->ctx.session_use_count = &session->use_count;
	m->ctx.session_use_token = (m->rx ? UP_USE_MSG_RX : UP_USE_MSG_TX);
	OSMO_ASSERT(osmo_use_count_get_put(m->ctx.session_use_count, m->ctx.session_use_token, 1) == 0);
}

enum up_session_fsm_state {
	UP_SESSION_ST_INIT,
	UP_SESSION_ST_ESTABLISHED,
	UP_SESSION_ST_WAIT_USE_COUNT,
};

static const struct value_string up_session_fsm_event_names[] = {
	OSMO_VALUE_STRING(UP_SESSION_EV_RX_SESSION_EST_REQ),
	OSMO_VALUE_STRING(UP_SESSION_EV_RX_SESSION_MOD_REQ),
	OSMO_VALUE_STRING(UP_SESSION_EV_RX_SESSION_DEL_REQ),
	OSMO_VALUE_STRING(UP_SESSION_EV_USE_COUNT_ZERO),
	{}
};

static struct osmo_fsm up_session_fsm;

static const struct osmo_tdef_state_timeout up_session_fsm_timeouts[32] = {
	[UP_SESSION_ST_INIT] = { .T = 0 },
	[UP_SESSION_ST_ESTABLISHED] = { .T = 0 },
	[UP_SESSION_ST_WAIT_USE_COUNT] = { .T = 0 },
};

/* Transition to a state, using the T timer defined in up_session_fsm_timeouts.
 * Assumes local variable fi exists. */
#define up_session_fsm_state_chg(STATE) do { \
		if (fi->state != STATE) \
			osmo_tdef_fsm_inst_state_chg(fi, STATE, \
						     up_session_fsm_timeouts, \
						     osmo_pfcp_tdefs, \
						     5); \
	} while (0)

static int up_session_fsm_timer_cb(struct osmo_fsm_inst *fi)
{
	/* Return 1 to terminate FSM instance, 0 to keep running */
	return 1;
}

struct osmo_pfcp_msg *up_session_init_tx(struct up_session *session, struct osmo_pfcp_msg *in_reply_to,
					 enum osmo_pfcp_message_type message_type)
{
	struct osmo_pfcp_msg *tx = up_peer_init_tx(session->up_peer, in_reply_to, message_type);
	tx->h.seid = session->cp_f_seid.seid;
	tx->h.seid_present = true;
	up_session_set_msg_ctx(session, tx);
	return tx;
}

struct chosen_f_teid {
	struct llist_head entry;
	uint8_t choose_id;
	struct osmo_pfcp_ie_f_teid f_teid;
};

struct chosen_f_teid *chosen_f_teid_find(struct llist_head *list, uint8_t choose_id)
{
	struct chosen_f_teid *chosen;
	llist_for_each_entry(chosen, list, entry) {
		if (chosen->choose_id == choose_id)
			return chosen;
	}
	return NULL;
}

/* Find local interface's IP address by Network Instance name. Return 0 on success, or an OSMO_PFCP_CAUSE_* value on
 * failure. */
static int up_session_choose_local_ip(struct up_session *session, struct osmo_pfcp_ip_addrs *local_addr,
				      const char *netinst_name)
{
	const struct network_instance *netinst;
	struct osmo_sockaddr osa = {};

	if (llist_empty(&g_upf->netinst)) {
		/* No network instances are configured in osmo-upf.cfg. Instead use the local address configured for
		 * PFCP, assuming that in a simplistic setup the host has only one interface. It is unlikely to be
		 * useful for a production environment where the entire point is to hand packet data from one interface
		 * to another, and where PFCP most probably happens on an entirely different interface, but may make
		 * things simpler for lab testing. */
		if (osmo_pfcp_ip_addrs_set(local_addr,
					   osmo_pfcp_endpoint_get_local_addr(session->up_peer->up_endpoint->pfcp_ep))) {
			LOGPFSML(session->fi, LOGL_ERROR, "Invalid local address in pfcp_endpoint cfg\n");
			return OSMO_PFCP_CAUSE_SYSTEM_FAILURE;
		}
		LOGPFSML(session->fi, LOGL_NOTICE,
			 "Cannot look up Network Instance %s: No 'netinst' is configured, setting up GTP on same local"
			 " interface as PFCP: %s (makes sense only for lab testing)\n",
			 osmo_quote_str_c(OTC_SELECT, netinst_name, -1),
			 osmo_pfcp_ip_addrs_to_str_c(OTC_SELECT, local_addr));
		return 0;
	}

	if (!netinst_name || !*netinst_name) {
		/* Empty or no Network Instance IE in incoming PFCP request. Pick the first network instance; makes
		 * sense only in a simplistic lab setup where packet data is forwarded to the same interface that it is
		 * received on, and where no Network Instance is indicated by the CPF. Warn if more than one network
		 * instance is configured to choose from. */
		if (llist_count(&g_upf->netinst) > 1)
			LOGPFSML(session->fi, LOGL_NOTICE,
				 "Missing Network Instance in incoming request, using the first 'netinst' from cfg\n");
		netinst = netinst_first(&g_upf->netinst);
		/* there has to be a first entry, because we handled the empty list above. */
		OSMO_ASSERT(netinst);
	} else {
		netinst = netinst_find(&g_upf->netinst, netinst_name);
		if (!netinst) {
			LOGPFSML(session->fi, LOGL_ERROR, "Network Instance from PFCP request not found: %s"
				 " -- ensure there is a 'netinst' / 'add %s <ip-addr>' entry in your config\n",
				 osmo_quote_str_c(OTC_SELECT, netinst_name, -1),
				 osmo_escape_str_c(OTC_SELECT, netinst_name, -1));
			return OSMO_PFCP_CAUSE_RULE_CREATION_MOD_FAILURE;
		}
	}

	/* Convert netinst IP address string first to osmo_sockaddr and then to osmo_pfcp_ip_addrs. */
	if (osmo_sockaddr_str_to_sockaddr(&netinst->addr, &osa.u.sas)
	    || osmo_pfcp_ip_addrs_set(local_addr, &osa)) {
		LOGPFSML(session->fi, LOGL_ERROR,
			 "Network Instance %s from PFCP request yields no valid IP address: "
				OSMO_SOCKADDR_STR_FMT "\n",
			 osmo_quote_str_c(OTC_SELECT, netinst_name, -1),
			 OSMO_SOCKADDR_STR_FMT_ARGS(&netinst->addr));
		return OSMO_PFCP_CAUSE_RULE_CREATION_MOD_FAILURE;
	}
	return 0;
}

/* Choose an F-TEID (when the peer has sent CHOOSE = 1).
 * If the peer also sent a CHOOSE_ID, then remember this F-TEID choice under the given ID, and re-use that choice when
 * the same ID re-appears. The chosen IDs are saved in session->chosen_f_teids.
 * Return 0 on success, or an OSMO_PFCP_CAUSE_* value on failure. */
static enum osmo_pfcp_cause up_session_choose_f_teid(struct up_session *session, struct osmo_pfcp_ie_f_teid *dst,
						     bool choose_id_present, uint8_t choose_id,
						     const char *netinst_name)
{
	struct chosen_f_teid *chosen;
	int rc;

	if (choose_id_present) {
		chosen = chosen_f_teid_find(&session->chosen_f_teids, choose_id);
		if (chosen) {
			/* Re-use a previous F-TEID */
			*dst = chosen->f_teid;
			return OSMO_PFCP_CAUSE_REQUEST_ACCEPTED;
		}
		/* No previous F-TEID found, allocate a new one below */
	}

	*dst = (struct osmo_pfcp_ie_f_teid){
		.choose_flag = false,
	};

	/* Determine local IP address from Network Instance value received in PFCP request */
	rc = up_session_choose_local_ip(session, &dst->fixed.ip_addr, netinst_name);
	if (rc)
		return rc;

	/* Choose a new TEID */
	dst->fixed.teid = upf_next_local_teid();
	if (dst->fixed.teid == 0) {
		LOGPFSML(session->fi, LOGL_ERROR, "Failed to allocate an unused TEID\n");
		return OSMO_PFCP_CAUSE_PFCP_ENTITY_IN_CONGESTION;
	}

	LOGPFSML(session->fi, LOGL_INFO, "Allocated new local F-TEID %s\n",
		 osmo_pfcp_ie_f_teid_to_str_c(OTC_SELECT, dst));

	/* Save this choice */
	if (choose_id_present) {
		chosen = talloc(session, struct chosen_f_teid);
		*chosen = (struct chosen_f_teid){
			.f_teid = *dst,
			.choose_id = choose_id,
		};
		llist_add_tail(&chosen->entry, &session->chosen_f_teids);
	}
	return OSMO_PFCP_CAUSE_REQUEST_ACCEPTED;
}

static struct far *far_create(struct up_session *session,
			      const struct osmo_pfcp_ie_create_far *create_far)
{
	struct far *far = talloc(session, struct far);
	*far = (struct far){
		.session = session,
		.desc = *create_far,
	};
	llist_add_tail(&far->entry, &session->fars);

	return far;
}

static struct far *far_find(struct up_session *session, uint32_t far_id)
{
	struct far *far;
	llist_for_each_entry(far, &session->fars, entry) {
		if (far->desc.far_id == far_id)
			return far;
	}
	return NULL;
}

static void far_upd(struct far *far, const struct osmo_pfcp_ie_upd_far *upd)
{
	if (upd->apply_action_present)
		far->desc.apply_action = upd->apply_action;
	if (upd->upd_forw_params_present) {
		const struct osmo_pfcp_ie_upd_forw_params *u = &upd->upd_forw_params;
		struct osmo_pfcp_ie_forw_params *p = &far->desc.forw_params;
		far->desc.forw_params_present = true;
		if (u->destination_iface_present)
			p->destination_iface = u->destination_iface;
		if (u->network_inst_present) {
			p->network_inst = u->network_inst;
			p->network_inst_present = true;
		}
		if (u->outer_header_creation_present) {
			p->outer_header_creation = u->outer_header_creation;
			p->outer_header_creation_present = true;
		}
		if (u->linked_te_id_present) {
			p->linked_te_id = u->linked_te_id;
			p->linked_te_id_present = true;
		}
		if (u->destination_iface_type_present) {
			p->destination_iface_type = u->destination_iface_type;
			p->destination_iface_type_present = true;
		}
	}
}

struct pdr;
static void pdr_classify(struct pdr *pdr);

static void far_del(struct far *far)
{
	struct pdr *pdr;
	llist_for_each_entry(pdr, &far->session->pdrs, entry) {
		if (pdr->far == far) {
			pdr->far = NULL;
			pdr_classify(pdr);
		}
	}

	llist_del(&far->entry);
	talloc_free(far);
}

static int far_to_str_buf(char *buf, size_t len, const struct far *far)
{
	struct osmo_strbuf sb = { .buf = buf, .len = len };
	const struct osmo_pfcp_ie_create_far *f = &far->desc;

	OSMO_STRBUF_PRINTF(sb, "FAR-%u{", f->far_id);
	OSMO_STRBUF_APPEND(sb, osmo_pfcp_bits_to_str_buf, f->apply_action.bits, osmo_pfcp_apply_action_strs);
	if (f->forw_params_present) {
		OSMO_STRBUF_PRINTF(sb, " dst:%s", osmo_pfcp_dest_iface_str(f->forw_params.destination_iface));
		if (f->forw_params.outer_header_creation_present) {
			OSMO_STRBUF_PRINTF(sb, ",");
			OSMO_STRBUF_APPEND(sb, osmo_pfcp_ie_outer_header_creation_to_str_buf,
					   &f->forw_params.outer_header_creation);
		}
	}
	OSMO_STRBUF_PRINTF(sb, "}");
	return sb.chars_needed;
}

#if 0
static char *far_to_str_c(void *ctx, const struct far *far)
{
	OSMO_NAME_C_IMPL(ctx, 64, "ERROR", far_to_str_buf, far)
}
#endif

int pdr_to_str_buf(char *buf, size_t buflen, const struct pdr *pdr)
{
	struct osmo_strbuf sb = { .buf = buf, .len = buflen };
	const struct osmo_pfcp_ie_create_pdr *d = &pdr->desc;

	OSMO_STRBUF_PRINTF(sb, "PDR-%u{src:%s", d->pdr_id, osmo_pfcp_source_iface_str(d->pdi.source_iface));
	if (pdr->desc.pdi.ue_ip_address_present) {
		if (pdr->desc.pdi.ue_ip_address.ip_addr.v4_present) {
			OSMO_STRBUF_PRINTF(sb, " %s",
					   pdr->desc.pdi.ue_ip_address.ip_is_destination ? "dst:" : "src:");
			OSMO_STRBUF_APPEND(sb, osmo_sockaddr_to_str_buf2, &pdr->desc.pdi.ue_ip_address.ip_addr.v4);
		}
		if (pdr->desc.pdi.ue_ip_address.ip_addr.v6_present) {
			OSMO_STRBUF_PRINTF(sb, " ");
			OSMO_STRBUF_APPEND(sb, osmo_sockaddr_to_str_buf2, &pdr->desc.pdi.ue_ip_address.ip_addr.v6);
		}
	}
	if (pdr->desc.pdi.network_inst_present) {
		OSMO_STRBUF_PRINTF(sb, " netinst:");
		OSMO_STRBUF_APPEND(sb, osmo_quote_str_buf3, pdr->desc.pdi.network_inst.str, -1);
	}
	if (pdr->local_f_teid) {
		OSMO_STRBUF_PRINTF(sb, " ");
		OSMO_STRBUF_APPEND(sb, osmo_pfcp_ie_f_teid_to_str_buf, pdr->local_f_teid);
	}
	if (d->outer_header_removal_present)
		OSMO_STRBUF_PRINTF(sb, " decaps-%s",
				   osmo_pfcp_outer_header_removal_desc_str(d->outer_header_removal.desc));
	OSMO_STRBUF_PRINTF(sb, "}");

	if (pdr->far) {
		OSMO_STRBUF_PRINTF(sb, " --> ");
		OSMO_STRBUF_APPEND(sb, far_to_str_buf, pdr->far);
	}

	return sb.chars_needed;
}

char *pdr_to_str_c(void *ctx, const struct pdr *pdr)
{
	OSMO_NAME_C_IMPL(ctx, 128, "ERROR", pdr_to_str_buf, pdr)
}

static struct pdr *pdr_find(struct up_session *session, uint16_t pdr_id)
{
	struct pdr *pdr;
	llist_for_each_entry(pdr, &session->pdrs, entry) {
		if (pdr->desc.pdr_id == pdr_id)
			return pdr;
	}
	return NULL;
}

static void pdr_del(struct pdr *pdr)
{
	if (!hlist_unhashed(&pdr->node_by_local_f_teid))
		hash_del(&pdr->node_by_local_f_teid);
	llist_del(&pdr->entry);
	talloc_free(pdr);
}

static void pdr_set_far(struct pdr *pdr, struct far *far)
{
	pdr->far = far;
}

/* Set up a new Packet Detection Rule, append the response to the end of the created_pdr/created_pdr_count array. */
static struct pdr *pdr_create(struct up_session *session,
			      const struct osmo_pfcp_ie_create_pdr *create_pdr,
			      enum osmo_pfcp_cause *cause,
			      bool *offending_ie_present,
			      enum osmo_pfcp_iei *offending_ie,
			      struct osmo_pfcp_ie_created_pdr created_pdr[],
			      unsigned int *created_pdr_count,
			      size_t created_pdr_maxcount)
{
	struct pdr *pdr = NULL;

	/* Is there still room in the response for 'Created PDR' IEs? */
	if (*created_pdr_count >= created_pdr_maxcount) {
		*cause = OSMO_PFCP_CAUSE_NO_RESOURCES_AVAILABLE;
		goto nack_resp;
	}

	pdr = talloc(session, struct pdr);
	*pdr = (struct pdr){
		.session = session,
		.desc = *create_pdr,
	};
	INIT_HLIST_NODE(&pdr->node_by_local_f_teid);
	llist_add_tail(&pdr->entry, &session->pdrs);

	if (pdr->desc.far_id_present) {
		struct far *far = far_find(session, pdr->desc.far_id);
		if (!far) {
			LOGPFSML(session->fi, LOGL_ERROR, "PDR-%u requests FAR-%u, but there is no such FAR\n",
				 pdr->desc.pdr_id, pdr->desc.far_id);
			*cause = OSMO_PFCP_CAUSE_REQUEST_REJECTED;
			*offending_ie_present = true;
			*offending_ie = OSMO_PFCP_IEI_FAR_ID;
			goto nack_resp;
		}
		pdr_set_far(pdr, far);
	} else if (pdr->desc.activate_predefined_rules_present) {
		LOGPFSML(session->fi, LOGL_ERROR,
			 "Predefined Rules feature not implemented: PDR-%u requests predefined rule '%s'\n",
			 pdr->desc.pdr_id, pdr->desc.activate_predefined_rules.str);
		*cause = OSMO_PFCP_CAUSE_RULE_CREATION_MOD_FAILURE;
		*offending_ie_present = true;
		*offending_ie = OSMO_PFCP_IEI_ACTIVATE_PREDEFINED_RULES;
		goto nack_resp;
	} else {
		LOGPFSML(session->fi, LOGL_ERROR, "No FAR defined for PDR-%u\n", pdr->desc.pdr_id);
		*cause = OSMO_PFCP_CAUSE_CONDITIONAL_IE_MISSING;
		*offending_ie_present = true;
		*offending_ie = OSMO_PFCP_IEI_FAR_ID;
		goto nack_resp;
	}

	/* Figure out the TEID and IP address for this PDR */
	if (pdr->desc.pdi.local_f_teid_present) {
		if (pdr->desc.pdi.local_f_teid.choose_flag) {
			/* CHOOSE = 1: we need to pick our own local F-TEID */
			struct osmo_pfcp_ie_f_teid local_f_teid;
			const char *netinst_name = NULL;
			if (pdr->desc.pdi.network_inst_present)
				netinst_name = pdr->desc.pdi.network_inst.str;
			*cause = up_session_choose_f_teid(session, &local_f_teid,
							  pdr->desc.pdi.local_f_teid.choose.choose_id_present,
							  pdr->desc.pdi.local_f_teid.choose.choose_id,
							  netinst_name);
			if (*cause != OSMO_PFCP_CAUSE_REQUEST_ACCEPTED) {
				*offending_ie = OSMO_PFCP_IEI_F_TEID;
				*offending_ie_present = true;
				goto nack_resp;
			}
			pdr->_local_f_teid_buf = local_f_teid;
			pdr->local_f_teid = &pdr->_local_f_teid_buf;
		} else {
			/* CHOOSE = 0: just use the provided TEID and IP addr */
			pdr->local_f_teid = &pdr->desc.pdi.local_f_teid;
			if (!(pdr->local_f_teid->fixed.ip_addr.v4_present
			      || pdr->local_f_teid->fixed.ip_addr.v6_present)) {
				LOGPFSML(session->fi, LOGL_ERROR,
					 "peer requested to choose an F-TEID, but neither IPv4 nor IPv6 is selected"
					 " in the incoming Create PDR IE for PDR-%u.\n",
					 pdr->desc.pdr_id);
				*cause = OSMO_PFCP_CAUSE_RULE_CREATION_MOD_FAILURE;
				*offending_ie = OSMO_PFCP_IEI_F_TEID;
				*offending_ie_present = true;
				goto nack_resp;
			}
		}
	}

	if (pdr->local_f_teid) {
		created_pdr[*created_pdr_count] = (struct osmo_pfcp_ie_created_pdr){
			.pdr_id = pdr->desc.pdr_id,
			.local_f_teid_present = true,
			.local_f_teid = *pdr->local_f_teid,
		};
		hash_add(g_upf->gtp.pdrs_by_local_f_teid, &pdr->node_by_local_f_teid, pdr->local_f_teid->fixed.teid);
	} else {
		created_pdr[*created_pdr_count] = (struct osmo_pfcp_ie_created_pdr){
			.pdr_id = pdr->desc.pdr_id,
		};
	}
	(*created_pdr_count)++;

	LOGPFSML(session->fi, LOGL_INFO, "New %s\n", pdr_to_str_c(OTC_SELECT, pdr));

	return pdr;

nack_resp:
	if (pdr)
		pdr_del(pdr);
	if (!*offending_ie_present) {
		*offending_ie = OSMO_PFCP_IEI_CREATE_PDR;
		*offending_ie_present = true;
	}
	return NULL;
}

static struct pdr *pdr_upd(struct pdr *pdr,
			   const struct osmo_pfcp_ie_upd_pdr *update_pdr,
			   enum osmo_pfcp_cause *cause,
			   bool *offending_ie_present,
			   enum osmo_pfcp_iei *offending_ie,
			   struct osmo_pfcp_ie_updated_pdr updated_pdr[],
			   unsigned int *updated_pdr_count,
			   size_t updated_pdr_maxcount)
{
	struct up_session *session = pdr->session;

	/* Is there still room in the response for 'Updated PDR' IEs? */
	if (*updated_pdr_count >= updated_pdr_maxcount) {
		*cause = OSMO_PFCP_CAUSE_NO_RESOURCES_AVAILABLE;
		goto nack_resp;
	}

	if (update_pdr->outer_header_removal_present) {
		pdr->desc.outer_header_removal = update_pdr->outer_header_removal;
		pdr->desc.outer_header_removal_present = true;
	}
	if (update_pdr->pdi_present)
		pdr->desc.pdi = update_pdr->pdi;

	if (update_pdr->far_id_present) {
		struct far *far = far_find(session, update_pdr->far_id);
		if (!far) {
			LOGPFSML(session->fi, LOGL_ERROR, "PDR-%u requests FAR-%u, but there is no such FAR\n",
				 pdr->desc.pdr_id, pdr->desc.far_id);
			*cause = OSMO_PFCP_CAUSE_REQUEST_REJECTED;
			*offending_ie_present = true;
			*offending_ie = OSMO_PFCP_IEI_FAR_ID;
			goto nack_resp;
		}
		pdr_set_far(pdr, far);
	}

	if (pdr->local_f_teid) {
		updated_pdr[*updated_pdr_count] = (struct osmo_pfcp_ie_updated_pdr){
			.pdr_id = pdr->desc.pdr_id,
			.local_f_teid_present = true,
			.local_f_teid = *pdr->local_f_teid,
		};
	} else {
		updated_pdr[*updated_pdr_count] = (struct osmo_pfcp_ie_updated_pdr){
			.pdr_id = pdr->desc.pdr_id,
		};
	}
	(*updated_pdr_count)++;

	pdr_classify(pdr);
	LOGPFSML(session->fi, LOGL_INFO, "Updated %s\n", pdr_to_str_c(OTC_SELECT, pdr));

	return pdr;

nack_resp:
	pdr_del(pdr);
	if (!*offending_ie_present) {
		*offending_ie = OSMO_PFCP_IEI_UPD_PDR;
		*offending_ie_present = true;
	}
	return NULL;
}

char *up_session_gtp_status(struct up_session *session)
{
	struct pdr *pdr;
	struct far *far;
	int active_pdrs = 0;
	int total_pdrs = 0;
	int active_fars = 0;
	int total_fars = 0;

	llist_for_each_entry(pdr, &session->pdrs, entry) {
		if (pdr->active)
			active_pdrs++;
		total_pdrs++;
	}

	llist_for_each_entry(far, &session->fars, entry) {
		if (far->active)
			active_fars++;
		total_fars++;
	}

	return talloc_asprintf(OTC_SELECT, "PDR-active:%d/%d FAR-active:%d/%d GTP-active:%u",
			       active_pdrs, total_pdrs, active_fars, total_fars,
			       llist_count(&session->active_gtp_actions));
}

static void up_session_est(struct up_session *session, struct osmo_pfcp_msg *m)
{
	struct osmo_fsm_inst *fi = session->fi;
	struct up_peer *peer = session->up_peer;
	struct osmo_pfcp_msg_session_est_req *req = &m->ies.session_est_req;
	struct osmo_pfcp_msg_session_est_resp *resp;
	struct osmo_pfcp_msg *tx;
	int i;
	int rc;

	tx = up_session_init_tx(session, m, OSMO_PFCP_MSGT_SESSION_EST_RESP);
	resp = &tx->ies.session_est_resp;

	for (i = 0; i < req->create_far_count; i++) {
		struct far *far = far_create(session, &req->create_far[i]);
		if (!far)
			goto nack_response;
	}

	for (i = 0; i < req->create_pdr_count; i++) {
		struct pdr *pdr = pdr_create(session, &req->create_pdr[i],
					     &resp->cause,
					     &resp->offending_ie_present, &resp->offending_ie,
					     resp->created_pdr, &resp->created_pdr_count,
					     ARRAY_SIZE(resp->created_pdr));
		if (!pdr)
			goto nack_response;
	}

	resp->cause = up_session_setup_gtp(session);
	if (resp->cause != OSMO_PFCP_CAUSE_REQUEST_ACCEPTED)
		goto nack_response;

	/* Success, send ACK */
	osmo_pfcp_ie_f_seid_set(&resp->up_f_seid, session->up_seid,
				osmo_pfcp_endpoint_get_local_addr(peer->up_endpoint->pfcp_ep));
	resp->up_f_seid_present = true;

	rc = osmo_pfcp_endpoint_tx(peer->up_endpoint->pfcp_ep, tx);
	if (rc) {
		/* sending ACK failed, discard session. It might seem like a good idea to keep the session around,
		 * because the creation succeeded, only the ACK failed. But in the greater scheme of things, if we
		 * cannot ACK to the PFCP peer, all is lost. Rather not keep stale sessions around. */
		up_session_fsm_state_chg(UP_SESSION_ST_WAIT_USE_COUNT);
		return;
	}
	up_session_fsm_state_chg(UP_SESSION_ST_ESTABLISHED);
	return;

nack_response:
	resp->created_pdr_count = 0;
	osmo_pfcp_endpoint_tx(peer->up_endpoint->pfcp_ep, tx);
	/* No matter if sending the NACK succeeded or not, discard the session. */
	up_session_fsm_state_chg(UP_SESSION_ST_WAIT_USE_COUNT);
}

static void up_session_mod(struct up_session *session, struct osmo_pfcp_msg *m)
{
	struct osmo_fsm_inst *fi = session->fi;
	struct up_peer *peer = session->up_peer;
	struct osmo_pfcp_msg_session_mod_req *req = &m->ies.session_mod_req;
	struct osmo_pfcp_msg_session_mod_resp *resp;
	struct osmo_pfcp_msg *tx;
	int i;

	tx = up_session_init_tx(session, m, OSMO_PFCP_MSGT_SESSION_MOD_RESP);
	resp = &tx->ies.session_mod_resp;

	for (i = 0; i < req->remove_far_count; i++) {
		uint32_t far_id = req->remove_far[i].far_id;
		struct far *far = far_find(session, far_id);
		if (!far) {
			OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "Cannot remove, does not exist: FAR-%u\n", far_id);
			continue;
		}
		far_del(far);
	}

	for (i = 0; i < req->remove_pdr_count; i++) {
		uint16_t pdr_id = req->remove_pdr[i].pdr_id;
		struct pdr *pdr = pdr_find(session, pdr_id);
		if (!pdr) {
			OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "Cannot remove, does not exist: PDR-%u\n", pdr_id);
			continue;
		}
		pdr_del(pdr);
	}

	for (i = 0; i < req->create_far_count; i++) {
		struct far *far = far_create(session, &req->create_far[i]);
		if (!far)
			goto nack_response;
	}

	for (i = 0; i < req->create_pdr_count; i++) {
		struct pdr *pdr = pdr_create(session, &req->create_pdr[i],
					     &resp->cause,
					     &resp->offending_ie_present, &resp->offending_ie,
					     resp->created_pdr, &resp->created_pdr_count,
					     ARRAY_SIZE(resp->created_pdr));
		if (!pdr)
			goto nack_response;
	}

	for (i = 0; i < req->upd_far_count; i++) {
		uint32_t far_id = req->upd_far[i].far_id;
		struct far *far = far_find(session, req->upd_far[i].far_id);
		if (!far) {
			OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "Cannot update, does not exist: FAR-%u\n", far_id);
			goto nack_response;
		}
		far_upd(far, &req->upd_far[i]);
	}

	for (i = 0; i < req->upd_pdr_count; i++) {
		uint16_t pdr_id = req->upd_pdr[i].pdr_id;
		struct pdr *pdr = pdr_find(session, req->upd_pdr[i].pdr_id);
		if (!pdr) {
			OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "Cannot update, does not exist: PDR-%u\n", pdr_id);
			goto nack_response;
		}
		pdr_upd(pdr, &req->upd_pdr[i],
			&resp->cause,
			&resp->offending_ie_present, &resp->offending_ie,
			resp->updated_pdr, &resp->updated_pdr_count,
			ARRAY_SIZE(resp->updated_pdr));
	}

	resp->cause = up_session_setup_gtp(session);
	if (resp->cause != OSMO_PFCP_CAUSE_REQUEST_ACCEPTED)
		goto nack_response;

	/* Success, send ACK */
	if (osmo_pfcp_endpoint_tx(peer->up_endpoint->pfcp_ep, tx)) {
		/* sending ACK failed, discard session. It might seem like a good idea to keep the session around,
		 * because the modification succeeded, only the ACK failed. But in the greater scheme of things, if we
		 * cannot ACK to the PFCP peer, all is lost. Rather not keep stale sessions around. */
		up_session_fsm_state_chg(UP_SESSION_ST_WAIT_USE_COUNT);
		return;
	}

	LOGPFSML(fi, LOGL_NOTICE, "Session modified: %s\n", up_session_gtp_status(session));
	return;

nack_response:
	resp->created_pdr_count = 0;
	osmo_pfcp_endpoint_tx(peer->up_endpoint->pfcp_ep, tx);
	/* No matter if sending the NACK succeeded or not, discard the session. */
	up_session_fsm_state_chg(UP_SESSION_ST_WAIT_USE_COUNT);
}

static void up_session_del(struct up_session *session, struct osmo_pfcp_msg *m)
{
	struct osmo_fsm_inst *fi = session->fi;
	struct up_peer *peer = session->up_peer;
	struct osmo_pfcp_msg *tx;

	tx = up_session_init_tx(session, m, OSMO_PFCP_MSGT_SESSION_DEL_RESP);
	tx->ies.session_del_resp = (struct osmo_pfcp_msg_session_del_resp){
		.cause = OSMO_PFCP_CAUSE_REQUEST_ACCEPTED
	};
	osmo_pfcp_endpoint_tx(peer->up_endpoint->pfcp_ep, tx);
	/* No matter if sending the deletion ACK succeeded or not, discard the session. */
	up_session_fsm_state_chg(UP_SESSION_ST_WAIT_USE_COUNT);
}

static void up_session_init_action(struct osmo_fsm_inst *fi, uint32_t event, void *data)
{
	struct up_session *up_session = fi->priv;

	switch (event) {

	case UP_SESSION_EV_RX_SESSION_EST_REQ:
		up_session_est(up_session, data);
		break;

	case UP_SESSION_EV_RX_SESSION_DEL_REQ:
		up_session_del(up_session, data);
		break;

	case UP_SESSION_EV_USE_COUNT_ZERO:
		/* ignore */
		break;

	default:
		OSMO_ASSERT(false);
	}
}

static void up_session_established_onenter(struct osmo_fsm_inst *fi, uint32_t prev_state)
{
	struct up_session *session = fi->priv;
	LOGPFSML(fi, LOGL_NOTICE, "Session established: %s %s\n", up_session_to_str_c(OTC_SELECT, session),
		 up_session_gtp_status(session));
}

static void up_session_established_action(struct osmo_fsm_inst *fi, uint32_t event, void *data)
{
	struct up_session *up_session = fi->priv;

	switch (event) {

	case UP_SESSION_EV_RX_SESSION_MOD_REQ:
		up_session_mod(up_session, data);
		break;

	case UP_SESSION_EV_RX_SESSION_DEL_REQ:
		up_session_del(up_session, data);
		break;

	case UP_SESSION_EV_USE_COUNT_ZERO:
		/* ignore */
		break;

	default:
		OSMO_ASSERT(false);
	}
}

static void up_session_established_onleave(struct osmo_fsm_inst *fi, uint32_t next_state)
{
	struct up_session *session = fi->priv;
	struct up_gtp_action *a;
	LOGPFSML(fi, LOGL_NOTICE, "Session releasing: %s %s\n", up_session_to_str_c(OTC_SELECT, session), up_session_gtp_status(session));

	/* Shut down all active GTP rules */
	while ((a = llist_first_entry_or_null(&session->active_gtp_actions, struct up_gtp_action, entry))) {
		up_gtp_action_free(a);
	}
}


static void up_session_clear_pdr_far(struct up_session *session);
static void drop_gtp_actions(struct up_session *session);

int up_session_discard(struct up_session *session)
{
	struct osmo_fsm_inst *fi = session->fi;
	if (fi->state == UP_SESSION_ST_WAIT_USE_COUNT)
		return 0;
	up_session_fsm_state_chg(UP_SESSION_ST_WAIT_USE_COUNT);
	return 1;
}

static void up_session_wait_use_count_onenter(struct osmo_fsm_inst *fi, uint32_t prev_state)
{
	struct up_session *session = fi->priv;

	drop_gtp_actions(session);
	up_session_clear_pdr_far(session);

	if (!osmo_use_count_total(&session->use_count))
		osmo_fsm_inst_term(fi, OSMO_FSM_TERM_REGULAR, NULL);
}

static void up_session_wait_use_count_action(struct osmo_fsm_inst *fi, uint32_t event, void *data)
{
	switch (event) {

	case UP_SESSION_EV_USE_COUNT_ZERO:
		osmo_fsm_inst_term(fi, OSMO_FSM_TERM_REGULAR, NULL);
		return;

	default:
		OSMO_ASSERT(false);
	}
}

static void up_session_clear_pdr_far(struct up_session *session)
{
	struct pdr *pdr;
	struct far *far;
	while ((pdr = llist_first_entry_or_null(&session->pdrs, struct pdr, entry)))
		pdr_del(pdr);
	while ((far = llist_first_entry_or_null(&session->fars, struct far, entry)))
		far_del(far);
}

static void drop_gtp_actions(struct up_session *session);

static void up_session_fsm_cleanup(struct osmo_fsm_inst *fi, enum osmo_fsm_term_cause cause)
{
	struct up_session *session = fi->priv;

	drop_gtp_actions(session);

	up_session_clear_pdr_far(session);

	hash_del(&session->ep_node_by_up_seid);
	hash_del(&session->node_by_up_seid);
	hash_del(&session->node_by_cp_seid);
}

#define S(x)    (1 << (x))

static const struct osmo_fsm_state up_session_fsm_states[] = {
	[UP_SESSION_ST_INIT] = {
		.name = "INIT",
		.in_event_mask = 0
			| S(UP_SESSION_EV_RX_SESSION_EST_REQ)
			| S(UP_SESSION_EV_RX_SESSION_DEL_REQ)
			| S(UP_SESSION_EV_USE_COUNT_ZERO)
			,
		.out_state_mask = 0
			| S(UP_SESSION_ST_ESTABLISHED)
			| S(UP_SESSION_ST_WAIT_USE_COUNT)
			,
		.action = up_session_init_action,
	},
	[UP_SESSION_ST_ESTABLISHED] = {
		.name = "ESTABLISHED",
		.in_event_mask = 0
			| S(UP_SESSION_EV_RX_SESSION_MOD_REQ)
			| S(UP_SESSION_EV_RX_SESSION_DEL_REQ)
			| S(UP_SESSION_EV_USE_COUNT_ZERO)
			,
		.out_state_mask = 0
			| S(UP_SESSION_ST_WAIT_USE_COUNT)
			,
		.onenter = up_session_established_onenter,
		.action = up_session_established_action,
		.onleave = up_session_established_onleave,
	},
	[UP_SESSION_ST_WAIT_USE_COUNT] = {
		.name = "WAIT_USE_COUNT",
		.in_event_mask = 0
			| S(UP_SESSION_EV_USE_COUNT_ZERO)
			,
		.onenter = up_session_wait_use_count_onenter,
		.action = up_session_wait_use_count_action,
	},
};

static struct osmo_fsm up_session_fsm = {
	.name = "up_session",
	.states = up_session_fsm_states,
	.num_states = ARRAY_SIZE(up_session_fsm_states),
	.log_subsys = DSESSION,
	.event_names = up_session_fsm_event_names,
	.timer_cb = up_session_fsm_timer_cb,
	.cleanup = up_session_fsm_cleanup,
};

static __attribute__((constructor)) void up_session_fsm_register(void)
{
	OSMO_ASSERT(osmo_fsm_register(&up_session_fsm) == 0);
}

static int up_session_use_cb(struct osmo_use_count_entry *e, int32_t old_use_count, const char *file, int line)
{
	struct up_session *session = e->use_count->talloc_object;
	int32_t total;
	int level;

	if (!e->use)
		return -EINVAL;

	total = osmo_use_count_total(&session->use_count);

	if (total == 0
	    || (total == 1 && old_use_count == 0 && e->count == 1))
		level = LOGL_INFO;
	else
		level = LOGL_DEBUG;

	LOGPFSMSLSRC(session->fi, DREF, level, file, line,
		     "%s %s: now used by %s\n",
		     (e->count - old_use_count) > 0 ? "+" : "-", e->use,
		     osmo_use_count_to_str_c(OTC_SELECT, &session->use_count));

	if (e->count < 0)
		return -ERANGE;

	if (total == 0)
		osmo_fsm_inst_dispatch(session->fi, UP_SESSION_EV_USE_COUNT_ZERO, NULL);
	return 0;
}

static void up_session_update_id(struct up_session *session)
{
	osmo_fsm_inst_update_id_f_sanitize(session->fi, '-', "%s-0x%" PRIx64,
					   up_peer_remote_addr_str(session->up_peer),
					   session->up_seid);
	LOGPFSML(session->fi, LOGL_DEBUG, "Updated id\n");
}

static inline uint64_t up_session_key(uint64_t cp_seid, uint64_t up_seid)
{
	return cp_seid + up_seid;
}

static struct up_session *up_session_add(struct up_peer *peer, const struct osmo_pfcp_ie_f_seid *cp_f_seid)
{
	struct up_session *session;
	uint64_t up_seid = up_endpoint_next_up_seid(peer->up_endpoint);

	if (!up_seid)
		return NULL;

	struct osmo_fsm_inst *fi = osmo_fsm_inst_alloc_child(&up_session_fsm, peer->fi, UP_PEER_EV_SESSION_TERM);
	OSMO_ASSERT(fi);

	session = talloc(fi, struct up_session);
	OSMO_ASSERT(session);
	fi->priv = session;

	*session = (struct up_session) {
		.fi = fi,
		.up_peer = peer,
		.cp_f_seid = *cp_f_seid,
		.up_seid = up_seid,
		.use_count = {
			.talloc_object = session,
			.use_cb = up_session_use_cb,
		},
	};
	INIT_LLIST_HEAD(&session->pdrs);
	INIT_LLIST_HEAD(&session->fars);
	INIT_LLIST_HEAD(&session->chosen_f_teids);
	INIT_LLIST_HEAD(&session->active_gtp_actions);
	osmo_use_count_make_static_entries(&session->use_count, session->use_count_buf, ARRAY_SIZE(session->use_count_buf));
	LOGPFSML(session->fi, LOGL_INFO, "Allocated new UP-SEID: 0x%" PRIx64 "\n", session->up_seid);
	up_session_update_id(session);

	hash_add(peer->sessions_by_up_seid, &session->node_by_up_seid, session->up_seid);
	hash_add(peer->sessions_by_cp_seid, &session->node_by_cp_seid, session->cp_f_seid.seid);
	hash_add(peer->up_endpoint->sessions_by_up_seid, &session->ep_node_by_up_seid, session->up_seid);
	return session;
}

struct up_session *up_session_find_or_add(struct up_peer *peer, const struct osmo_pfcp_ie_f_seid *cp_f_seid)
{
	struct up_session *session;
	OSMO_ASSERT(cp_f_seid);
	session = up_session_find_by_cp_f_seid(peer, cp_f_seid);
	if (session)
		return session;

	return up_session_add(peer, cp_f_seid);
}

struct up_session *up_session_find_by_up_seid(struct up_peer *peer, uint64_t up_seid)
{
	struct up_session *session;
	hash_for_each_possible(peer->sessions_by_up_seid, session, node_by_up_seid, up_seid) {
		if (up_seid == session->up_seid)
			return session;
	}
	return NULL;
}

struct up_session *up_session_find_by_cp_f_seid(struct up_peer *peer, const struct osmo_pfcp_ie_f_seid *cp_f_seid)
{
	struct up_session *session;
	hash_for_each_possible(peer->sessions_by_cp_seid, session, node_by_cp_seid, cp_f_seid->seid) {
		if (osmo_pfcp_ie_f_seid_cmp(&session->cp_f_seid, cp_f_seid) == 0)
			return session;
	}
	return NULL;
}

static bool action_is_forw(const struct osmo_pfcp_ie_apply_action *aa)
{
	return osmo_pfcp_bits_get(aa->bits, OSMO_PFCP_APPLY_ACTION_FORW)
		&& !osmo_pfcp_bits_get(aa->bits, OSMO_PFCP_APPLY_ACTION_DROP);
}

static void pdr_classify(struct pdr *pdr)
{
	pdr->rx_decaps = false;
	pdr->forw_encaps = false;
	pdr->access_to_core = false;
	pdr->core_to_access = false;
	if (!pdr->far)
		return;

	pdr->rx_decaps = (pdr->desc.outer_header_removal_present
			  && pdr->desc.outer_header_removal.desc == OSMO_PFCP_OUTER_HEADER_REMOVAL_GTP_U_UDP_IPV4);
	pdr->forw_encaps = (pdr->far->desc.forw_params_present
			    && pdr->far->desc.forw_params.outer_header_creation_present);

	if (!action_is_forw(&pdr->far->desc.apply_action))
		return;

	pdr->access_to_core = (pdr->desc.pdi.source_iface == OSMO_PFCP_SOURCE_IFACE_ACCESS
			       && pdr->far->desc.forw_params.destination_iface == OSMO_PFCP_DEST_IFACE_CORE);

	pdr->core_to_access = (pdr->desc.pdi.source_iface == OSMO_PFCP_SOURCE_IFACE_CORE
			       && pdr->far->desc.forw_params.destination_iface == OSMO_PFCP_DEST_IFACE_ACCESS);
}

void pdr_reverse_set(struct pdr *a, struct pdr *b)
{
	if (a)
		a->reverse_pdr = b;
	if (b)
		b->reverse_pdr = a;
}

void pdr_reverse_unset(struct pdr *pdr)
{
	if (!pdr->reverse_pdr)
		return;
	pdr->reverse_pdr->reverse_pdr = NULL;
	pdr->reverse_pdr = NULL;
}

/* Log that a PDR (and its reverse-PDR) is inactive.
 * \param pdr  The Access-to-Core PDR.
 * \param desc  Why it is inactive.
 * \param pdr_to_str  The PDR that desc describes, can be pdr or the reverse Core-to-Access PDR.
 */
static void log_inactive_pdr_set(struct pdr *pdr, const char *desc, const struct pdr *pdr_to_str)
{
	struct pdr *rpdr = pdr->reverse_pdr;
	pdr_to_str = pdr_to_str ? : pdr;
	osmo_talloc_replace_string_fmt(pdr, &pdr->inactive_reason, "%s (in PDR-%u %s)", desc,
				       pdr_to_str->desc.pdr_id,
				       osmo_pfcp_source_iface_str(pdr_to_str->desc.pdi.source_iface));
	if (rpdr)
		osmo_talloc_replace_string_fmt(rpdr, &rpdr->inactive_reason, "%s (in PDR-%u %s)", desc,
					       pdr_to_str->desc.pdr_id,
					       osmo_pfcp_source_iface_str(pdr_to_str->desc.pdi.source_iface));
	if (rpdr)
		LOGPFSML(pdr->session->fi, LOGL_INFO, "Inactive set: PDR-%u + PDR-%u: %s: %s\n",
			 pdr->desc.pdr_id, rpdr->desc.pdr_id, desc, pdr_to_str_c(OTC_SELECT, pdr_to_str));
	else
		LOGPFSML(pdr->session->fi, LOGL_INFO, "Inactive: PDR-%u: %s: %s\n",
			 pdr->desc.pdr_id, desc, pdr_to_str_c(OTC_SELECT, pdr_to_str));
}

/* A GTP tunnel on Access side, plain IP on Core side.
 * The given PDR must have an outer-header-removal and a local F-TEID.
 * Its reverse-PDR must have a UE address flagged as "Destination" IP addr.
 * Its reverse-PDR's FAR must have an outer-header creation with a remote TEID.
 * \param pdr  A rule detecting packets on Access, where pdr->reverse_pdr detects packets on Core.
 */
static void add_gtp_action_tunend(void *ctx, struct llist_head *dst, struct pdr *pdr)
{
	struct up_session *session = pdr->session;
	struct up_gtp_action *a;
	struct pdr *rpdr;
	struct far *rfar;
	struct osmo_pfcp_ie_forw_params *rfar_forw;

	OSMO_ASSERT(pdr->far);
	OSMO_ASSERT(pdr->reverse_pdr);
	OSMO_ASSERT(pdr->reverse_pdr->far);
	rpdr = pdr->reverse_pdr;
	rfar = rpdr->far;
	rfar_forw = &rfar->desc.forw_params;

	OSMO_ASSERT(pdr->access_to_core);
	OSMO_ASSERT(rpdr->core_to_access);

	/* To decaps incoming on Access, we need to have a local F-TEID assigned for which to receive GTP packets. */
	if (!pdr->local_f_teid || pdr->local_f_teid->choose_flag) {
		log_inactive_pdr_set(pdr, "missing local F-TEID", pdr);
		return;
	}

	/* To encaps outgoing on Access, we need to have a remote F-TEID assigned to send out in GTP packets */
	if (!rfar->desc.forw_params_present) {
		log_inactive_pdr_set(pdr, "missing FAR Forwarding Parameters", rpdr);
		return;
	}
	if (!rfar_forw->outer_header_creation_present) {
		log_inactive_pdr_set(pdr, "missing FAR Outer Header Creation", rpdr);
		return;
	}
	if (!rfar_forw->outer_header_creation.teid_present) {
		log_inactive_pdr_set(pdr, "missing TEID in FAR Outer Header Creation", rpdr);
		return;
	}
	if (!rfar_forw->outer_header_creation.ip_addr.v4_present) {
		log_inactive_pdr_set(pdr, "missing IPv4 in FAR Outer Header Creation", rpdr);
		return;
	}

	/* To receive IP packets incoming on Core, we need to know the assigned IP address for the UE */
	if (!rpdr->desc.pdi.ue_ip_address_present) {
		log_inactive_pdr_set(pdr, "missing UE IP Address in PDI", rpdr);
		return;
	}
	if (!rpdr->desc.pdi.ue_ip_address.ip_addr.v4_present) {
		log_inactive_pdr_set(pdr, "UE IP Address in PDI is not IPv4", rpdr);
		return;
	}
	if (!rpdr->desc.pdi.ue_ip_address.ip_is_destination) {
		log_inactive_pdr_set(pdr, "UE IP Address in PDI is not flagged as destination", rpdr);
		return;
	}

	pdr->active = true;
	pdr->far->active = true;
	rpdr->active = true;
	rpdr->far->active = true;
	LOGPFSML(session->fi, LOGL_DEBUG, "Active PDR set:   %s\n", pdr_to_str_c(OTC_SELECT, pdr));
	LOGPFSML(session->fi, LOGL_DEBUG, "Active PDR set: + %s\n", pdr_to_str_c(OTC_SELECT, rpdr));

	talloc_free(pdr->inactive_reason);
	pdr->inactive_reason = NULL;
	talloc_free(rpdr->inactive_reason);
	rpdr->inactive_reason = NULL;

	a = up_gtp_action_alloc(ctx, session, UP_GTP_U_TUNEND, dst);
	a->pdr_access = pdr->desc.pdr_id;
	a->pdr_core = rpdr->desc.pdr_id;
	a->tunend.access.local.addr = pdr->local_f_teid->fixed.ip_addr.v4;
	a->tunend.access.local.teid = pdr->local_f_teid->fixed.teid;
	a->tunend.access.remote.addr = rfar_forw->outer_header_creation.ip_addr.v4;
	a->tunend.access.remote.teid = rfar_forw->outer_header_creation.teid;
	a->tunend.core.ue_local_addr = rpdr->desc.pdi.ue_ip_address.ip_addr.v4;
}

/* A GTP tunnel on Access side, mapping to another GTP tunnel on Core side and vice versa.
 * The PDR and its reverse PDR must both have an outer-header-removal and a local F-TEID.
 * Both FARs must have an outer-header creation with a remote F-TEID.
 * \param pdr  A rule detecting packets on Access, where pdr->reverse_pdr detects packets on Core.
 */
static void add_gtp_action_tunmap(void *ctx, struct llist_head *dst, struct pdr *pdr)
{
	struct up_session *session = pdr->session;
	struct up_gtp_action *a;
	struct pdr *rpdr;
	struct far *far;
	struct osmo_pfcp_ie_forw_params *far_forw;
	struct far *rfar;
	struct osmo_pfcp_ie_forw_params *rfar_forw;

	OSMO_ASSERT(pdr->far);
	OSMO_ASSERT(pdr->reverse_pdr);
	OSMO_ASSERT(pdr->reverse_pdr->far);

	far = pdr->far;
	far_forw = &far->desc.forw_params;
	rpdr = pdr->reverse_pdr;
	rfar = rpdr->far;
	rfar_forw = &rfar->desc.forw_params;

	OSMO_ASSERT(pdr->access_to_core);
	OSMO_ASSERT(rpdr->core_to_access);

	/* To decaps incoming on Access, we need to have a local F-TEID assigned for which to receive GTP packets. */
	if (!pdr->local_f_teid || pdr->local_f_teid->choose_flag) {
		log_inactive_pdr_set(pdr, "missing local F-TEID (Access side)", pdr);
		return;
	}
	/* To decaps incoming on Core, we need to have a local F-TEID assigned for which to receive GTP packets. */
	if (!rpdr->local_f_teid || rpdr->local_f_teid->choose_flag) {
		log_inactive_pdr_set(pdr, "missing local F-TEID (Core side)", pdr);
		return;
	}

	/* To encaps outgoing on Core, we need to have a remote F-TEID assigned to send out in GTP packets */
	if (!far->desc.forw_params_present) {
		log_inactive_pdr_set(pdr, "missing FAR Forwarding Parameters (Access side)", pdr);
		return;
	}
	if (!far_forw->outer_header_creation_present) {
		log_inactive_pdr_set(pdr, "missing FAR Outer Header Creation (Access side)", pdr);
		return;
	}
	if (!far_forw->outer_header_creation.teid_present) {
		log_inactive_pdr_set(pdr, "missing TEID in FAR Outer Header Creation (Access side)", pdr);
		return;
	}
	if (!far_forw->outer_header_creation.ip_addr.v4_present) {
		log_inactive_pdr_set(pdr, "missing IPv4 in FAR Outer Header Creation (Access side)", pdr);
		return;
	}

	/* To encaps outgoing on Access, we need to have a remote F-TEID assigned to send out in GTP packets */
	if (!rfar->desc.forw_params_present) {
		log_inactive_pdr_set(pdr, "missing FAR Forwarding Parameters (Access side)", rpdr);
		return;
	}
	if (!rfar_forw->outer_header_creation_present) {
		log_inactive_pdr_set(pdr, "missing FAR Outer Header Creation (Access side)", rpdr);
		return;
	}
	if (!rfar_forw->outer_header_creation.teid_present) {
		log_inactive_pdr_set(pdr, "missing TEID in FAR Outer Header Creation (Access side)", rpdr);
		return;
	}
	if (!rfar_forw->outer_header_creation.ip_addr.v4_present) {
		log_inactive_pdr_set(pdr, "missing IPv4 in FAR Outer Header Creation (Access side)", rpdr);
		return;
	}

	pdr->active = true;
	far->active = true;
	rpdr->active = true;
	rfar->active = true;
	LOGPFSML(session->fi, LOGL_DEBUG, "Active PDR set:   %s\n", pdr_to_str_c(OTC_SELECT, pdr));
	LOGPFSML(session->fi, LOGL_DEBUG, "Active PDR set: + %s\n", pdr_to_str_c(OTC_SELECT, rpdr));

	talloc_free(pdr->inactive_reason);
	pdr->inactive_reason = NULL;
	talloc_free(rpdr->inactive_reason);
	rpdr->inactive_reason = NULL;

	a = up_gtp_action_alloc(ctx, session, UP_GTP_U_TUNMAP, dst);
	a->pdr_access = pdr->desc.pdr_id;
	a->pdr_core = rpdr->desc.pdr_id;
	a->tunmap.access.tun.local.addr = pdr->local_f_teid->fixed.ip_addr.v4;
	a->tunmap.access.tun.local.teid = pdr->local_f_teid->fixed.teid;
	a->tunmap.access.tun.remote.addr = rfar_forw->outer_header_creation.ip_addr.v4;
	a->tunmap.access.tun.remote.teid = rfar_forw->outer_header_creation.teid;
	a->tunmap.core.tun.local.addr = rpdr->local_f_teid->fixed.ip_addr.v4;
	a->tunmap.core.tun.local.teid = rpdr->local_f_teid->fixed.teid;
	a->tunmap.core.tun.remote.addr = far_forw->outer_header_creation.ip_addr.v4;
	a->tunmap.core.tun.remote.teid = far_forw->outer_header_creation.teid;
}

/* Analyse all PDRs and FARs and find configurations that match either a GTP encaps/decaps or a GTP forward rule. Add to
 * list dst, containing struct up_gtp_action instances allocated from ctx. */
static enum osmo_pfcp_cause find_gtp_actions(void *ctx, struct llist_head *dst, struct up_session *session)
{
	struct far *far;
	struct pdr *pdr;

	llist_for_each_entry(far, &session->fars, entry) {
		far->active = false;
	}

	llist_for_each_entry(pdr, &session->pdrs, entry) {
		pdr->reverse_pdr = NULL;
		pdr_classify(pdr);
		pdr->active = false;
	}

	llist_for_each_entry(pdr, &session->pdrs, entry) {
		struct pdr *other;

		/* Already paired up in an earlier iteration? */
		if (pdr->reverse_pdr)
			continue;

		/* In this outer loop, only follow the access_to_core directed PDRs, in the inner loop find the matching
		 * core_to_access PDR. i.e. we are looking only at PDRs detecting packets on the Access side, pairing up
		 * with "reverse PDRs" detecting packets on the Core side. */
		if (!pdr->access_to_core)
			continue;

		/* If a required local addr + TEID is not known, we cannot pair this PDR up */
		if (pdr->rx_decaps && !pdr->local_f_teid)
			continue;

		/* Try to find a matching PDR+FAR that points in the reverse direction. */
		llist_for_each_entry(other, &session->pdrs, entry) {
			/* Already paired up in an earlier iteration? */
			if (other->reverse_pdr)
				continue;

			/* Looking for a PDR facing the other way */
			if (!other->core_to_access)
				continue;
			/* GTP header-ness must match, in reverse. */
			if (pdr->rx_decaps != other->forw_encaps
			    || pdr->forw_encaps != other->rx_decaps)
				continue;

			/* TEID: when adding a GTP header, we only know the remote side TEID sent out.
			 * When removing a GTP header, we only know the local TEID that the remote side sends here.
			 * So can't match up TEIDs. */

			/* Just match these up, simply based on direction and encaps/decaps match */
			pdr_reverse_set(pdr, other);
		}
	}

	/* Iterate again to create the GTP actions (and debug log what is going on) */
	llist_for_each_entry(pdr, &session->pdrs, entry) {
		if (!pdr->reverse_pdr) {
			LOGPFSML(session->fi, LOGL_INFO, "Inactive PDR: no matching reverse PDR for: %s\n", pdr_to_str_c(OTC_SELECT, pdr));
			continue;
		}

		/* Iterate in direction Access-to-Core, where pdr->reverse_pdr will be the Core-to-Access counterpart. */
		if (!pdr->access_to_core)
			continue;

		if (pdr->rx_decaps && !pdr->forw_encaps)
			add_gtp_action_tunend(ctx, dst, pdr);
		else if (pdr->rx_decaps && pdr->forw_encaps)
			add_gtp_action_tunmap(ctx, dst, pdr);
		else {
			/* log the details of both PDRs in two separate log lines */
			log_inactive_pdr_set(pdr, "not implemented", pdr);
			log_inactive_pdr_set(pdr, "not implemented", pdr->reverse_pdr);
		}

	}

	return OSMO_PFCP_CAUSE_REQUEST_ACCEPTED;
}

static enum osmo_pfcp_cause setup_gtp_actions(struct up_session *session, struct llist_head *want)
{
	struct up_gtp_action *a, *a_safe;
	struct up_gtp_action *w, *w_safe;

	LOGPFSML(session->fi, LOGL_DEBUG, "GTP actions: %u previously active; want active: %u\n",
		 llist_count(&session->active_gtp_actions), llist_count(want));

	llist_for_each_entry(w, want, entry) {
		LOGPFSML(session->fi, LOGL_DEBUG, "want: %s\n", up_gtp_action_to_str_c(OTC_SELECT, w));
		w->handle = NULL;
	}

	/* Match up the wanted GTP tunnels with the already active ones */
	llist_for_each_entry(a, &session->active_gtp_actions, entry) {
		LOGPFSML(session->fi, LOGL_DEBUG, "active: %s\n", up_gtp_action_to_str_c(OTC_SELECT, a));
		a->handle = NULL;
		llist_for_each_entry(w, want, entry) {
			/* Already matched up? */
			if (w->handle)
				continue;
			if (up_gtp_action_cmp(a, w))
				continue;
			/* Found a match, mark. */
			a->handle = w;
			w->handle = a;
		}
	}

	/* At this point, all matching entries in session->active_gtp_actions and the 'want' list have a handle != NULL.
	 * If handle == NULL in active_gtp_actions, it means it has no match in the wanted list and is to be torn down.
	 * If handle == NULL in 'want', it means it has no match in the active list and should be created. */

	/* Shut down all active GTP rules that no longer appear in the session setup. */
	llist_for_each_entry_safe(a, a_safe, &session->active_gtp_actions, entry) {
		if (a->handle)
			continue;

		LOGPFSML(session->fi, LOGL_DEBUG, "disabling: %s\n", up_gtp_action_to_str_c(OTC_SELECT, a));
		up_gtp_action_free(a);
	}

	/* Set up all GTP tunnels requested in the session setup, but not active yet */
	llist_for_each_entry_safe(w, w_safe, want, entry) {
		if (w->handle)
			continue;

		LOGPFSML(session->fi, LOGL_DEBUG, "enabling: %s\n", up_gtp_action_to_str_c(OTC_SELECT, w));

		/* If enabling fails, don't add to the active list. Error logging is done in up_gtp_action_enable(). */
		if (up_gtp_action_enable(w))
			continue;

		/* Successfully activated, move the entry from 'want' to 'active_gtp_actions' */
		llist_del(&w->entry);
		talloc_steal(session, w);
		llist_add_tail(&w->entry, &session->active_gtp_actions);
	}

	return OSMO_PFCP_CAUSE_REQUEST_ACCEPTED;
}

static void drop_gtp_actions(struct up_session *session)
{
	struct llist_head empty;
	INIT_LLIST_HEAD(&empty);
	/* Call setup_gtp_actions() with an empty list, to clean up all active GTP actions */
	setup_gtp_actions(session, &empty);
}

/* Check whether the Packet Detection and Forwarding Action Rules amount to an encaps/decaps of GTP or a GTP tunnel
 * mapping, or none of the two. */
static enum osmo_pfcp_cause up_session_setup_gtp(struct up_session *session)
{
	enum osmo_pfcp_cause cause;
	struct llist_head want_gtp_actions;
	INIT_LLIST_HEAD(&want_gtp_actions);
	cause = find_gtp_actions(OTC_SELECT, &want_gtp_actions, session);
	if (cause != OSMO_PFCP_CAUSE_REQUEST_ACCEPTED)
		return cause;
	cause = setup_gtp_actions(session, &want_gtp_actions);
	return cause;
}

/* Return true when the session is in Established state and has active GTP actions. */
bool up_session_is_active(struct up_session *session)
{
	return session && (session->fi->state == UP_SESSION_ST_ESTABLISHED) && !llist_empty(&session->active_gtp_actions);
}

/* Return true when up_session_is_active() == true *and* it has only active PDR/FAR pairs.
 * A PDR/FAR is inactive when it is not part of an active GTP action. Reasons may be that it has no PDR-to-FAR relation,
 * there is no matching reverse PDR/FAR, that a FAR is not set to FORW, an ignored Source/Destination Interface, ...
 */
bool up_session_is_fully_active(struct up_session *session, int *active_p, int *inactive_p)
{
	struct pdr *pdr;
	int inactive = 0;
	int active = 0;
	llist_for_each_entry(pdr, &session->pdrs, entry) {
		if (pdr->active)
			active++;
		else
			inactive++;
	}
	if (!up_session_is_active(session)) {
		inactive += active;
		active = 0;
	}
	if (active_p)
		*active_p = active;
	if (inactive_p)
		*inactive_p = inactive;
	return active && !inactive;
}

int up_session_to_str_buf(char *buf, size_t buflen, struct up_session *session)
{
	struct osmo_strbuf sb = { .buf = buf, .len = buflen };
	if (!session) {
		OSMO_STRBUF_PRINTF(sb, "session=null");
		return sb.chars_needed;
	}
	OSMO_STRBUF_PRINTF(sb, "peer:%s SEID-r:0x%"PRIx64" SEID-l:0x%"PRIx64" state:%s",
			   up_peer_remote_addr_str(session->up_peer),
			   session->cp_f_seid.seid, session->up_seid,
			   osmo_fsm_inst_state_name(session->fi));
	return sb.chars_needed;
}

char *up_session_to_str_c(void *ctx, struct up_session *session)
{
	OSMO_NAME_C_IMPL(ctx, 128, "ERROR", up_session_to_str_buf, session)
}