/* Processing Queue Management */
/*
 * This file is part of gapk (GSM Audio Pocket Knife).
 *
 * gapk is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * gapk is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with gapk.  If not, see <http://www.gnu.org/licenses/>.
 */
#include 
#include 
#include 
#include 
#include 
#include 
#include 
/* Internal root talloc context (only declared here); it is used as the
 * parent context of every queue allocated by osmo_gapk_pq_create() */
extern TALLOC_CTX *gapk_root_ctx;
/*! create a new (empty) processing queue
 *  \param[in] name optional queue name; may be NULL. The pointer is
 *                  stored as-is (the string is not copied)
 *  \returns newly allocated queue; NULL if the allocation failed */
struct osmo_gapk_pq *
osmo_gapk_pq_create(const char *name)
{
	struct osmo_gapk_pq *queue;

	/* The queue is a zero-initialized child of the root context */
	queue = talloc_zero(gapk_root_ctx, struct osmo_gapk_pq);
	if (queue == NULL)
		return NULL;

	/* A fresh queue starts with an empty list of items */
	INIT_LLIST_HEAD(&queue->items);

	/* Anonymous queue: keep the default talloc name, leave name zeroed */
	if (name == NULL)
		return queue;

	/* Make the talloc chunk carry the queue name as well */
	talloc_set_name(queue, "struct osmo_gapk_pq '%s'", name);
	queue->name = name;

	return queue;
}
/*! destroy a processing queue and all of its items
 *
 *  For every item: the output buffer is released, the exit() callback
 *  (if set) is invoked with the item's state, and the item itself is
 *  unlinked and freed. Finally the queue is freed.
 *
 *  \param[in] pq Processing Queue to be destroyed; NULL is a no-op */
void
osmo_gapk_pq_destroy(struct osmo_gapk_pq *pq)
{
	struct osmo_gapk_pq_item *cur, *tmp;

	if (pq == NULL)
		return;

	/* The 'safe' iterator is required: the current entry gets unlinked */
	llist_for_each_entry_safe(cur, tmp, &pq->items, list) {
		/* Drop the output buffer first */
		talloc_free(cur->buf);

		/* Let the item clean up its private state, if it wants to */
		if (cur->exit != NULL)
			cur->exit(cur->state);

		/* Unlink from the queue, then release the item */
		llist_del(&cur->list);
		talloc_free(cur);
	}

	talloc_free(pq);
}
/*! allocate a new item and append it to a processing queue
 *  \param[in] pq Processing Queue the item is appended to; it also
 *                becomes the talloc parent of the item
 *  \returns new (zero-initialized) PQ item; NULL on allocation failure */
struct osmo_gapk_pq_item *
osmo_gapk_pq_add_item(struct osmo_gapk_pq *pq)
{
	struct osmo_gapk_pq_item *new_item;

	new_item = talloc_zero(pq, struct osmo_gapk_pq_item);
	if (new_item == NULL)
		return NULL;

	/* Account for the item and link it at the tail of the queue */
	pq->n_items += 1;
	llist_add_tail(&new_item->list, &pq->items);

	return new_item;
}
/*! check a processing queue; make sure I/O data lengths are equal
 *
 *  Every item with a non-zero input length must match the output length
 *  of the item preceding it. In strict mode the queue must additionally
 *  be non-empty, start with a source and end with a sink.
 *
 *  \param[in] pq     Processing Queue to be checked
 *  \param[in] strict Make sure both source and sink are present
 *  \returns 0 on success; -EINVAL if any of the checks fails */
int
osmo_gapk_pq_check(struct osmo_gapk_pq *pq, int strict)
{
	struct osmo_gapk_pq_item *item_prev = NULL;
	struct osmo_gapk_pq_item *item;

	/* Make sure I/O data lengths are equal */
	llist_for_each_entry(item, &pq->items, list) {
		/* The first item has no predecessor, and an input length
		 * of zero is not compared against the predecessor */
		if (item_prev && item->len_in) {
			if (item->len_in != item_prev->len_out) {
				LOGPGAPK(LOGL_ERROR, "PQ '%s': item '%s/%s' requires "
					"input size %u, but previous '%s/%s' has %u\n",
					pq->name, item->cat_name, item->sub_name,
					item->len_in, item_prev->cat_name,
					item_prev->sub_name, item_prev->len_out);
				return -EINVAL;
			}
		}

		/* Save pointer to the previous item */
		item_prev = item;
	}

	if (strict) {
		/* item_prev is still NULL only if the loop body never ran,
		 * i.e. the queue is empty. In that case the first / last
		 * entry lookups below would not yield a real item, so an
		 * empty queue is rejected right away. */
		if (!item_prev)
			goto src_sink_err;

		/* Make sure the first item is a source */
		item = llist_first_entry(&pq->items,
			struct osmo_gapk_pq_item, list);
		if (item->type != OSMO_GAPK_ITEM_TYPE_SOURCE)
			goto src_sink_err;

		/* Make sure the last item is a sink */
		item = llist_last_entry(&pq->items,
			struct osmo_gapk_pq_item, list);
		if (item->type != OSMO_GAPK_ITEM_TYPE_SINK)
			goto src_sink_err;
	}

	return 0;

src_sink_err:
	LOGPGAPK(LOGL_ERROR, "PQ '%s': the first item should be a source, "
		"and the last one should be a sink\n", pq->name);
	return -EINVAL;
}
/*! prepare a processing queue; allocates an output buffer per item
 *
 *  Sink items are skipped. Every other item gets a buffer of len_out
 *  bytes, or VAR_BUF_SIZE bytes if its len_out is zero. The buffer is
 *  allocated as a talloc child of the item.
 *
 *  \param[in] pq Processing Queue to be prepared
 *  \returns 0 on success; -ENOMEM if a buffer could not be allocated */
int
osmo_gapk_pq_prepare(struct osmo_gapk_pq *pq)
{
	struct osmo_gapk_pq_item *cur;

	llist_for_each_entry(cur, &pq->items, list) {
		unsigned int out_size;

		/* Only non-sink items need an output buffer */
		if (cur->type != OSMO_GAPK_ITEM_TYPE_SINK) {
			out_size = cur->len_out;

			/* Zero length (variable-length output):
			 * fall back to the maximum known size */
			if (out_size == 0)
				out_size = VAR_BUF_SIZE;

			cur->buf = talloc_named_const(cur, out_size, ".buffer");
			if (cur->buf == NULL)
				return -ENOMEM;
		}
	}

	return 0;
}
/*! execute a processing queue; run the proc() handler of each item
 *
 *  Each handler receives its own state and output buffer, plus the
 *  output buffer of the preceding item and the value that item's
 *  handler returned (used as the input length). The first item is
 *  called with a NULL input buffer and a length of 0.
 *
 *  \param[in] pq Processing Queue to be executed
 *  \returns 0 on success; the (negative) value returned by the first
 *           failing item otherwise */
int
osmo_gapk_pq_execute(struct osmo_gapk_pq *pq)
{
	struct osmo_gapk_pq_item *cur;
	uint8_t *in_buf = NULL;
	unsigned int in_len = 0;

	llist_for_each_entry(cur, &pq->items, list) {
		int ret = cur->proc(cur->state, cur->buf, in_buf, in_len);

		if (ret >= 0) {
			/* This item's output feeds the next item */
			in_buf = cur->buf;
			in_len = ret;
			continue;
		}

		/* A negative result aborts the whole run */
		LOGPGAPK(LOGL_ERROR, "PQ '%s': execution aborted: "
			"item '%s/%s' returned %d\n", pq->name,
			cur->cat_name, cur->sub_name, ret);
		return ret;
	}

	return 0;
}
/*! build a textual description of a processing queue
 *
 *  The description lists the items in queue order as
 *  "cat_name/sub_name", joined by " -> ".
 *
 *  \param[in] pq Processing Queue to be described
 *  \returns talloc-allocated string named ".description" whose talloc
 *           parent is \a pq; NULL if the queue has no items or if
 *           memory allocation failed */
char *
osmo_gapk_pq_describe(struct osmo_gapk_pq *pq)
{
	struct osmo_gapk_pq_item *item;
	char *result = NULL;
	char *tmp;

	/* Nothing to describe */
	if (!pq->n_items)
		return NULL;

	/* Iterate over all items in queue */
	llist_for_each_entry(item, &pq->items, list) {
		/* The separator goes in front of every item but the first
		 * one (result is still NULL on the first pass) */
		tmp = talloc_asprintf_append(result, "%s%s/%s",
			result ? " -> " : "",
			item->cat_name, item->sub_name);
		if (!tmp) {
			/* Do not leak the partial string, and do not
			 * return a silently truncated description */
			talloc_free(result);
			return NULL;
		}
		result = tmp;
	}

	/* The list was empty despite a non-zero items count */
	if (!result)
		return NULL;

	/* Change talloc context name */
	talloc_set_name_const(result, ".description");

	/* Change parent talloc context to pq */
	talloc_steal(pq, result);

	return result;
}