/* Processing Queue Management */
/*
* This file is part of gapk (GSM Audio Pocket Knife).
*
* gapk is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* gapk is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with gapk. If not, see <http://www.gnu.org/licenses/>.
*/
#include
#include
#include
#include
#include
#include
#include
/* Internal root talloc context */
extern TALLOC_CTX *gapk_root_ctx;
/*! create a new (empty) processing queue
 *  \param[in] name optional queue name (may be NULL); the pointer itself
 *  is stored in the queue, the string is not copied
 *  \returns new processing queue; NULL on allocation failure */
struct osmo_gapk_pq *
osmo_gapk_pq_create(const char *name)
{
	struct osmo_gapk_pq *queue = talloc_zero(gapk_root_ctx, struct osmo_gapk_pq);

	if (queue == NULL)
		return NULL;

	/* The item list must be a valid (empty) list before anything is added */
	INIT_LLIST_HEAD(&queue->items);

	/* Anonymous queue: leave both the talloc name and queue->name untouched */
	if (name == NULL)
		return queue;

	/* Make the queue recognizable in talloc reports, and remember its name */
	talloc_set_name(queue, "struct osmo_gapk_pq '%s'", name);
	queue->name = name;

	return queue;
}
/*! destroy a processing queue together with all of its items
 *  For every item the output buffer is freed first, then the item's
 *  exit() callback (if set) is called with the item's state, and finally
 *  the item is unlinked and freed.
 *  \param[in] pq Processing Queue to be destroyed (NULL is a no-op) */
void
osmo_gapk_pq_destroy(struct osmo_gapk_pq *pq)
{
	struct osmo_gapk_pq_item *cur, *next;

	if (pq == NULL)
		return;

	/* The _safe iterator is needed: the current entry is unlinked and
	 * freed inside the loop body */
	llist_for_each_entry_safe(cur, next, &pq->items, list) {
		/* Release the output buffer */
		talloc_free(cur->buf);

		/* Give the item a chance to clean up its own state */
		if (cur->exit != NULL)
			cur->exit(cur->state);

		/* Unlink and release the item itself */
		llist_del(&cur->list);
		talloc_free(cur);
	}

	talloc_free(pq);
}
/*! allocate a zero-initialized item and append it to a processing queue
 *  The item is allocated as a talloc child of the queue.
 *  \param[in] pq Processing Queue to which the item is added
 *  \returns new PQ item; NULL on allocation failure */
struct osmo_gapk_pq_item *
osmo_gapk_pq_add_item(struct osmo_gapk_pq *pq)
{
	struct osmo_gapk_pq_item *new_item = talloc_zero(pq, struct osmo_gapk_pq_item);

	if (new_item != NULL) {
		/* Append to the tail and keep the item counter in sync */
		llist_add_tail(&new_item->list, &pq->items);
		pq->n_items++;
	}

	return new_item;
}
/*! check a processing queue; make sure I/O data lengths are equal
* \param[in] pq Make sure both source and sink are preset
* \param[in] strict Processing Queue to be checked
* \returns 0 on succcess; negative on error */
int
osmo_gapk_pq_check(struct osmo_gapk_pq *pq, int strict)
{
struct osmo_gapk_pq_item *item_prev = NULL;
struct osmo_gapk_pq_item *item;
/* Make sure I/O data lengths are equal */
llist_for_each_entry(item, &pq->items, list) {
if (item_prev && item->len_in) {
if (item->len_in != item_prev->len_out) {
LOGPGAPK(LOGL_ERROR, "PQ '%s': item '%s/%s' requires "
"input size %u, but previous '%s/%s' has %u\n",
pq->name, item->cat_name, item->sub_name,
item->len_in, item_prev->cat_name,
item_prev->sub_name, item_prev->len_out);
return -EINVAL;
}
}
/* Save pointer to the previous item */
item_prev = item;
}
if (strict) {
/* Make sure the first item is a source */
item = llist_first_entry(&pq->items,
struct osmo_gapk_pq_item, list);
if (item->type != OSMO_GAPK_ITEM_TYPE_SOURCE)
goto src_sink_err;
/* Make sure the last item is a sink */
item = llist_last_entry(&pq->items,
struct osmo_gapk_pq_item, list);
if (item->type != OSMO_GAPK_ITEM_TYPE_SINK)
goto src_sink_err;
}
return 0;
src_sink_err:
LOGPGAPK(LOGL_ERROR, "PQ '%s': the first item should be a source, "
"and the last one should be a sink\n", pq->name);
return -EINVAL;
}
/*! prepare a processing queue; allocates an output buffer per item
 *  Sink items are skipped. Each buffer is allocated as a talloc child
 *  of its item.
 *  \param[in] pq Processing Queue to be prepared
 *  \returns 0 on success; -ENOMEM if a buffer could not be allocated */
int
osmo_gapk_pq_prepare(struct osmo_gapk_pq *pq)
{
	struct osmo_gapk_pq_item *cur;
	unsigned int out_size;

	llist_for_each_entry(cur, &pq->items, list) {
		/* A sink produces no output, so it needs no buffer */
		if (cur->type == OSMO_GAPK_ITEM_TYPE_SINK)
			continue;

		/* A zero output length means variable-length output:
		 * fall back to the maximum known buffer size */
		out_size = cur->len_out;
		if (out_size == 0)
			out_size = VAR_BUF_SIZE;

		cur->buf = talloc_named_const(cur, out_size, ".buffer");
		if (cur->buf == NULL)
			return -ENOMEM;
	}

	return 0;
}
/*! execute a processing queue; run each item's proc() handler in order
 *  Every handler receives its own output buffer plus the output buffer of
 *  the preceding item and the length that item returned (NULL / 0 for
 *  the first item).
 *  \param[in] pq Processing Queue to be executed
 *  \returns 0 on success; the negative value returned by the first
 *  failing item otherwise */
int
osmo_gapk_pq_execute(struct osmo_gapk_pq *pq)
{
	struct osmo_gapk_pq_item *cur;
	uint8_t *in_buf = NULL;
	unsigned int in_len = 0;

	llist_for_each_entry(cur, &pq->items, list) {
		int rc = cur->proc(cur->state, cur->buf, in_buf, in_len);

		/* A negative result aborts the whole run */
		if (rc < 0) {
			LOGPGAPK(LOGL_ERROR, "PQ '%s': execution aborted: "
				"item '%s/%s' returned %d\n", pq->name,
				cur->cat_name, cur->sub_name, rc);
			return rc;
		}

		/* This item's output becomes the next item's input */
		in_len = rc;
		in_buf = cur->buf;
	}

	return 0;
}
/*! build a textual description of a processing queue
 *  The result lists the items in queue order as "cat_name/sub_name",
 *  joined by " -> ".
 *  \param[in] pq Processing Queue to be described
 *  \returns talloc-allocated string whose talloc parent is pq;
 *  NULL if the queue has no items or on allocation failure */
char *
osmo_gapk_pq_describe(struct osmo_gapk_pq *pq)
{
	struct osmo_gapk_pq_item *item;
	char *result = NULL;
	char *grown;

	/* Nothing to describe */
	if (!pq->n_items)
		return NULL;

	/* Iterate over all items in queue */
	llist_for_each_entry(item, &pq->items, list) {
		/* Emit the separator before every item but the first one, so
		 * the output does not depend on a separate item counter */
		grown = talloc_asprintf_append(result, "%s%s/%s",
			result ? " -> " : "",
			item->cat_name, item->sub_name);
		if (!grown) {
			/* Allocation failed: drop the partial description
			 * rather than returning a truncated string.
			 * NOTE(review): assumes a failed append leaves the
			 * original string valid - confirm against talloc */
			talloc_free(result);
			return NULL;
		}

		result = grown;
	}

	/* The item list was empty despite a non-zero item count */
	if (!result)
		return NULL;

	/* Change talloc context name */
	talloc_set_name_const(result, ".description");

	/* Change parent talloc context to pq */
	talloc_steal(pq, result);

	return result;
}