/*
* Copyright 2008, 2011 Free Software Foundation, Inc.
*
* SPDX-License-Identifier: AGPL-3.0+
*
* This software is distributed under the terms of the GNU Affero Public License.
* See the COPYING file in the main directory for details.
*
* This use of this software may be subject to additional restrictions.
* See the LEGAL file in the main directory for details.

	This program is free software: you can redistribute it and/or modify
	it under the terms of the GNU Affero General Public License as published by
	the Free Software Foundation, either version 3 of the License, or
	(at your option) any later version.

	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
	GNU Affero General Public License for more details.

	You should have received a copy of the GNU Affero General Public License
	along with this program.  If not, see <http://www.gnu.org/licenses/>.

*/


#ifndef INTERTHREAD_H
#define INTERTHREAD_H

#include "Timeval.h"
#include "Threads.h"
#include "LinkedLists.h"
#include <map>
#include <vector>
#include <queue>





/**@defgroup Templates for interthread mechanisms. */
//@{


/** Pointer FIFO for interthread operations.  */
// (pat) The elements in the queue are type T*, and
// the Fifo class implements the underlying queue.
// The default is class PointerFIFO, which does not place any restrictions on the type of T,
// and is implemented by allocating auxiliary structures for the queue,
// or SingleLinkedList, which implements the queue using an internal pointer in type T,
// which must implement the functional interface of class SingleLinkListNode,
// namely: functions T*next() and void setNext(T*).
template <class T, class Fifo=PointerFIFO> class InterthreadQueue {

	protected:

	Fifo mQ;
	mutable Mutex mLock;
	mutable Signal mWriteSignal;

	public:

	/** Delete contents. */
	void clear()
	{
		ScopedLock lock(mLock);
		while (mQ.size()>0) delete (T*)mQ.get();
	}

	/** Empty the queue, but don't delete. */
	void flushNoDelete()
	{
		ScopedLock lock(mLock);
		while (mQ.size()>0) mQ.get();
	}


	~InterthreadQueue()
		{ clear(); }


	size_t size() const
	{
		ScopedLock lock(mLock);
		return mQ.size();
	}

	size_t totalSize() const		// pat added
	{
		ScopedLock lock(mLock);
		return mQ.totalSize();
	}

	/**
		Blocking read.
		@return Pointer to object (will not be NULL).
	*/
	T* read()
	{
		ScopedLock lock(mLock);
		T* retVal = (T*)mQ.get();
		while (retVal==NULL) {
			mWriteSignal.wait(mLock);
			retVal = (T*)mQ.get();
		}
		return retVal;
	}

	/** Non-blocking peek at the first element; returns NULL if empty. */
	T* front()
	{
		ScopedLock lock(mLock);
		return (T*) mQ.front();
	}

	/**
		Blocking read with a timeout.
		@param timeout The read timeout in ms.
		@return Pointer to object or NULL on timeout.
	*/
	T* read(unsigned timeout)
	{
		if (timeout==0) return readNoBlock();
		Timeval waitTime(timeout);
		ScopedLock lock(mLock);
		while ((mQ.size()==0) && (!waitTime.passed()))
			mWriteSignal.wait(mLock,waitTime.remaining());
		T* retVal = (T*)mQ.get();
		return retVal;
	}

	/**
		Non-blocking read.
		@return Pointer to object or NULL if FIFO is empty.
	*/
	T* readNoBlock()
	{
		ScopedLock lock(mLock);
		return (T*)mQ.get();
	}

	/** Non-blocking write. */
	void write(T* val)
	{
		ScopedLock lock(mLock);
		mQ.put(val);
		mWriteSignal.signal();
	}

	/** Non-block write to the front of the queue. */
	void write_front(T* val)	// pat added
	{
		ScopedLock lock(mLock);
		mQ.push_front(val);
		mWriteSignal.signal();
	}
};

// (pat) Identical to above but with the threading problem fixed.
template <class T, class Fifo=PointerFIFO> class InterthreadQueue2 {

	protected:

	Fifo mQ;
	mutable Mutex mLock;
	mutable Signal mWriteSignal;

	public:

	/** Delete contents. */
	void clear()
	{
		ScopedLock lock(mLock);
		while (mQ.size()>0) delete (T*)mQ.get();
	}

	/** Empty the queue, but don't delete. */
	void flushNoDelete()
	{
		ScopedLock lock(mLock);
		while (mQ.size()>0) mQ.get();
	}


	~InterthreadQueue2()
		{ clear(); }


	size_t size() const
	{
		ScopedLock lock(mLock);
		return mQ.size();
	}

	size_t totalSize() const		// pat added
	{
		ScopedLock lock(mLock);
		return mQ.totalSize();
	}

	/**
		Blocking read.
		@return Pointer to object (will not be NULL).
	*/
	T* read()
	{
		ScopedLock lock(mLock);
		T* retVal = (T*)mQ.get();
		while (retVal==NULL) {
			mWriteSignal.wait(mLock);
			retVal = (T*)mQ.get();
		}
		return retVal;
	}

	/** Non-blocking peek at the first element; returns NULL if empty. */
	T* front()
	{
		ScopedLock lock(mLock);
		return (T*) mQ.front();
	}

	/**
		Blocking read with a timeout.
		@param timeout The read timeout in ms.
		@return Pointer to object or NULL on timeout.
	*/
	T* read(unsigned timeout)
	{
		if (timeout==0) return readNoBlock();
		Timeval waitTime(timeout);
		ScopedLock lock(mLock);
		while ((mQ.size()==0) && (!waitTime.passed()))
			mWriteSignal.wait(mLock,waitTime.remaining());
		T* retVal = (T*)mQ.get();
		return retVal;
	}

	/**
		Non-blocking read.
		@return Pointer to object or NULL if FIFO is empty.
	*/
	T* readNoBlock()
	{
		ScopedLock lock(mLock);
		return (T*)mQ.get();
	}

	/** Non-blocking write. */
	void write(T* val)
	{
		// (pat) The Mutex mLock must be released before signaling the mWriteSignal condition.
		// This is an implicit requirement of pthread_cond_wait() called from signal().
		// If you do not do that, the InterthreadQueue read() function cannot start
		// because the mutex is still locked by the thread calling the write(),
		// so the read() thread yields its immediate execution opportunity.
		// This recurs (and the InterthreadQueue fills up with data)
		// until the read thread's accumulated temporary priority causes it to
		// get a second pre-emptive activation over the writing thread,
		// resulting in bursts of activity by the read thread.
		{ ScopedLock lock(mLock);
		  mQ.put(val);
		}
		mWriteSignal.signal();
	}

	/** Non-block write to the front of the queue. */
	void write_front(T* val)	// pat added
	{
		// (pat) See comments above.
		{ ScopedLock lock(mLock);
		  mQ.push_front(val);
		}
		mWriteSignal.signal();
	}
};



/** Pointer FIFO for interthread operations.  */
template <class T> class InterthreadQueueWithWait {

	protected:

	PointerFIFO mQ;
	mutable Mutex mLock;
	mutable Signal mWriteSignal;
	mutable Signal mReadSignal;

	virtual void freeElement(T* element) const { delete element; };

	public:

	/** Delete contents. */
	void clear()
	{
		ScopedLock lock(mLock);
		while (mQ.size()>0) freeElement((T*)mQ.get());
		mReadSignal.signal();
	}



	virtual ~InterthreadQueueWithWait()
		{ clear(); }


	size_t size() const
	{
		ScopedLock lock(mLock);
		return mQ.size();
	}

	/**
		Blocking read.
		@return Pointer to object (will not be NULL).
	*/
	T* read()
	{
		ScopedLock lock(mLock);
		T* retVal = (T*)mQ.get();
		while (retVal==NULL) {
			mWriteSignal.wait(mLock);
			retVal = (T*)mQ.get();
		}
		mReadSignal.signal();
		return retVal;
	}

	/**
		Blocking read with a timeout.
		@param timeout The read timeout in ms.
		@return Pointer to object or NULL on timeout.
	*/
	T* read(unsigned timeout)
	{
		if (timeout==0) return readNoBlock();
		Timeval waitTime(timeout);
		ScopedLock lock(mLock);
		while ((mQ.size()==0) && (!waitTime.passed()))
			mWriteSignal.wait(mLock,waitTime.remaining());
		T* retVal = (T*)mQ.get();
		if (retVal!=NULL) mReadSignal.signal();
		return retVal;
	}

	/**
		Non-blocking read.
		@return Pointer to object or NULL if FIFO is empty.
	*/
	T* readNoBlock()
	{
		ScopedLock lock(mLock);
		T* retVal = (T*)mQ.get();
		if (retVal!=NULL) mReadSignal.signal();
		return retVal;
	}

	/** Non-blocking write. */
	void write(T* val)
	{
		// (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field.
		ScopedLock lock(mLock);
		mQ.put(val);
		mWriteSignal.signal();
	}

	/** Wait until the queue falls below a low water mark. */
	// (pat) This function suffers from the same problem as documented
	// at InterthreadQueue.write(), but I am not fixing it because I cannot test it.
	// The caller of this function will eventually get to run, just not immediately
	// after the mReadSignal condition is fulfilled.
	void wait(size_t sz=0)
	{
		ScopedLock lock(mLock);
		while (mQ.size()>sz) mReadSignal.wait(mLock);
	}

};





/** Thread-safe map of pointers to class D, keyed by class K. */
template <class K, class D > class InterthreadMap {

protected:

	typedef std::map<K,D*> Map;
	Map mMap;
	mutable Mutex mLock;
	Signal mWriteSignal;

public:

	void clear()
	{
		// Delete everything in the map.
		ScopedLock lock(mLock);
		typename Map::iterator iter = mMap.begin();
		while (iter != mMap.end()) {
			delete iter->second;
			++iter;
		}
		mMap.clear();
	}

	~InterthreadMap() { clear(); }

	/**
		Non-blocking write.
		@param key The index to write to.
		@param wData Pointer to data, not to be deleted until removed from the map.
	*/
	void write(const K &key, D * wData)
	{
		ScopedLock lock(mLock);
		typename Map::iterator iter = mMap.find(key);
		if (iter!=mMap.end()) {
			delete iter->second;
			iter->second = wData;
		} else {
			mMap[key] = wData;
		}
		mWriteSignal.broadcast();
	}

	/**
		Non-blocking read with element removal.
		@param key Key to read from.
		@return Pointer at key or NULL if key not found, to be deleted by caller.
	*/
	D* getNoBlock(const K& key)
	{
		ScopedLock lock(mLock);
		typename Map::iterator iter = mMap.find(key);
		if (iter==mMap.end()) return NULL;
		D* retVal = iter->second;
		mMap.erase(iter);
		return retVal;
	}

	/**
		Blocking read with a timeout and element removal.
		@param key The key to read from.
		@param timeout The blocking timeout in ms.
		@return Pointer at key or NULL on timeout, to be deleted by caller.
	*/
	D* get(const K &key, unsigned timeout)
	{
		if (timeout==0) return getNoBlock(key);
		Timeval waitTime(timeout);
		ScopedLock lock(mLock);
		typename Map::iterator iter = mMap.find(key);
		while ((iter==mMap.end()) && (!waitTime.passed())) {
			mWriteSignal.wait(mLock,waitTime.remaining());
			iter = mMap.find(key);
		}
		if (iter==mMap.end()) return NULL;
		D* retVal = iter->second;
		mMap.erase(iter);
		return retVal;
	}

	/**
		Blocking read with and element removal.
		@param key The key to read from.
		@return Pointer at key, to be deleted by caller.
	*/
	D* get(const K &key)
	{
		ScopedLock lock(mLock);
		typename Map::iterator iter = mMap.find(key);
		while (iter==mMap.end()) {
			mWriteSignal.wait(mLock);
			iter = mMap.find(key);
		}
		D* retVal = iter->second;
		mMap.erase(iter);
		return retVal;
	}


	/**
		Remove an entry and delete it.
		@param key The key of the entry to delete.
		@return True if it was actually found and deleted.
	*/
	bool remove(const  K &key )
	{
		D* val = getNoBlock(key);
		if (!val) return false;
		delete val;
		return true;
	}


	/**
		Non-blocking read.
		@param key Key to read from.
		@return Pointer at key or NULL if key not found.
	*/
	D* readNoBlock(const K& key) const
	{
		D* retVal=NULL;
		ScopedLock lock(mLock);
		typename Map::const_iterator iter = mMap.find(key);
		if (iter!=mMap.end()) retVal = iter->second;
		return retVal;
	}

	/**
		Blocking read with a timeout.
		@param key The key to read from.
		@param timeout The blocking timeout in ms.
		@return Pointer at key or NULL on timeout.
	*/
	D* read(const K &key, unsigned timeout)
	{
		if (timeout==0) return readNoBlock(key);
		ScopedLock lock(mLock);
		Timeval waitTime(timeout);
		typename Map::const_iterator iter = mMap.find(key);
		while ((iter==mMap.end()) && (!waitTime.passed())) {
			mWriteSignal.wait(mLock,waitTime.remaining());
			iter = mMap.find(key);
		}
		if (iter==mMap.end()) return NULL;
		D* retVal = iter->second;
		return retVal;
	}

	/**
		Blocking read.
		@param key The key to read from.
		@return Pointer at key.
	*/
	D* read(const K &key)
	{
		ScopedLock lock(mLock);
		typename Map::const_iterator iter = mMap.find(key);
		while (iter==mMap.end()) {
			mWriteSignal.wait(mLock);
			iter = mMap.find(key);
		}
		D* retVal = iter->second;
		return retVal;
	}

};







/** This class is used to provide pointer-based comparison in priority_queues. */
template <class T> class PointerCompare {

	public:

	/** Compare the objects pointed to, not the pointers themselves. */
	bool operator()(const T *v1, const T *v2)
		{ return (*v1)>(*v2); }

};



/**
	Priority queue for interthread operations.
	Passes pointers to objects.
*/
template <class T, class C = std::vector<T*>, class Cmp = PointerCompare<T> > class InterthreadPriorityQueue {

	protected:

	std::priority_queue<T*,C,Cmp> mQ;
	mutable Mutex mLock;
	mutable Signal mWriteSignal;

	public:


	/** Clear the FIFO. */
	void clear()
	{
		ScopedLock lock(mLock);
		while (mQ.size()>0)	{
			T* ptr = mQ.top();
			mQ.pop();
			delete ptr;
		}
	}


	~InterthreadPriorityQueue()
	{
		clear();
	}

	size_t size() const
	{
		ScopedLock lock(mLock);
		return mQ.size();
	}


	/** Non-blocking read. */
	T* readNoBlock()
	{
		ScopedLock lock(mLock);
		T* retVal = NULL;
		if (mQ.size()!=0) {
			retVal = mQ.top();
			mQ.pop();
		}
		return retVal;
	}

	/** Blocking read. */
	T* read()
	{
		ScopedLock lock(mLock);
		T* retVal;
		while (mQ.size()==0) mWriteSignal.wait(mLock);
		retVal = mQ.top();
		mQ.pop();
		return retVal;
	}

	/** Non-blocking write. */
	void write(T* val)
	{
		// (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field.
		ScopedLock lock(mLock);
		mQ.push(val);
		mWriteSignal.signal();
	}

};





class Semaphore {

	private:

	bool mFlag;
	Signal mSignal;
	mutable Mutex mLock;

	public:

	Semaphore()
		:mFlag(false)
	{ }

	void post()
	{
		ScopedLock lock(mLock);
		mFlag=true;
		mSignal.signal();
	}

	void get()
	{
		ScopedLock lock(mLock);
		while (!mFlag) mSignal.wait(mLock);
		mFlag=false;
	}

	bool semtry()
	{
		ScopedLock lock(mLock);
		bool retVal = mFlag;
		mFlag = false;
		return retVal;
	}

};





//@}




#endif
// vim: ts=4 sw=4
