/* ==== gzip.c ============================================================
 * Copyright (c) 1995 by Chris Provenzano, proven@mit.edu
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. All advertising materials mentioning features or use of this software
 *    must display the following acknowledgement:
 *  This product includes software developed by Chris Provenzano.
 * 4. The name of Chris Provenzano may not be used to endorse or promote 
 *	  products derived from this software without specific prior written
 *	  permission.
 *
 * THIS SOFTWARE IS PROVIDED BY CHRIS PROVENZANO ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL CHRIS PROVENZANO BE LIABLE FOR ANY 
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 
 * SUCH DAMAGE.
 *
 * Description : Push a pthread that compresses a stream but only compress 
 *				 if the I/O is too slow.
 *
 *  1.00 95/10/28 proven
 *      -Started coding this file.
 */

#include <errno.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
/*
 * Flag values for pthread_push_stream(): control whether/when the pushed
 * stream thread applies the compression function to the data it pumps.
 */
#define PPS_ALWAYS			0x0000
#define PPS_NEVER			0x0001		/* Kinda silly */
#define PPS_CHANGE_NEVER	0x0002
#define PPS_CHANGE_ONCE		0x0004
#define PPS_USE_FUNCTION	0x0008
/* =============================================================================
 *
 */
/*
 * Push a filter thread onto a stream: a worker created with pthread_create()
 * would pump data from in_fd to out_fd, running function_name() over it
 * according to the PPS_* flags.
 *
 * NOTE(review): the original draft contained only the invalid statement
 * "pthread_create()." and the signature did not parse (function-pointer
 * parameter was malformed, no return type).  Stubbed out to fail cleanly
 * with ENOSYS until the worker wiring is actually written.
 *
 * Returns 0 on success (never, currently), -1 with errno set on failure.
 */
int pthread_push_stream(int in_fd, int out_fd, int flags,
  void *(*function_name)(int, int))
{
	(void)in_fd;
	(void)out_fd;
	(void)flags;
	(void)function_name;
	errno = ENOSYS;		/* not implemented yet */
	return(-1);
}

/* =============================================================================
 *
 */
/*
 * Initialize the stream machinery for the in_fd -> out_fd pair before a
 * filter thread is pushed (see pthread_push_stream()).
 *
 * NOTE(review): the original draft had an empty body and a signature that
 * did not parse.  Stubbed out to fail cleanly with ENOSYS until implemented.
 *
 * Returns 0 on success (never, currently), -1 with errno set on failure.
 */
int pthread_init_stream(int in_fd, int out_fd, int flags,
  void *(*function_name)(int, int))
{
	(void)in_fd;
	(void)out_fd;
	(void)flags;
	(void)function_name;
	errno = ENOSYS;		/* not implemented yet */
	return(-1);
}

/*
 * NOTE(review): abandoned first draft of the stream buffer record --
 * superseded by "typedef struct stream_buf" further down.  As written it is
 * not valid C (the "struct" keyword is missing), so it was never compiled;
 * kept only as a record of the original sketch.
 */
stream_buf {
	ssize_t	buf_count;	/* bytes of valid data currently in buf_data */
	size_t	buf_size;	/* allocated size of buf_data */
	void *	buf_data;	/* the data itself */
};

/* =============================================================================
 *
 */
stream_read(int in, stream_fd out  int out_fd, int flags,
  void * (* function_name(int in, int out)))
{
	stream_buf stream;

	stream.buf_size = mtu;
	if ((stream.buf_data = stream_alloc(mtu))

	while (stream.buf_count = read(in_fd, stream.buf_data, mtu)) {
		if (stream.buf_count < 0)
		
		stream_put
	}
}

/* pthread_stream_read -- planned entry point, never written (TODO) */

/* Opaque handle handed to callers: points at the payload area of a
 * stream_buf whose header sits immediately before it in the same
 * allocation (see stream_buf_alloc()). */
typedef void * 		stream_buf_t;

typedef struct stream_buf {
	size_t			buf_count;	/* bytes of valid data */
	size_t			buf_size;	/* capacity of buf[] */
	void *			buf_data;	/* current unconsumed position within buf[] */
	char			buf[];		/* payload; header+payload in one malloc.
					 * NOTE(review): draft had "void buf;",
					 * which is not a legal member. */
} stream_buf;

/*
 * One-slot handoff channel between a producer (stream_put) and a consumer
 * (stream_get).  buf is the single in-flight buffer; NULL means the slot
 * is free for the producer.
 */
typedef struct stream {
	pthread_mutex_t	  mutex;	/* guards buf and both conditions */
	pthread_cond_t	  empty;	/* signalled by the producer once buf is posted */
	pthread_cond_t	  full;	/* signalled by the consumer once buf is drained */
	stream_buf *	  buf;	/* the posted buffer, or NULL if none */
} stream;

/*
 * Initialize a stream handoff channel: mutex, both condition variables,
 * and an empty buffer slot.
 *
 * NOTE(review): the draft referenced a nonexistent stream->cond member and
 * assigned NULL to the pthread_cond_t members; both conditions must be
 * initialized and the buffer slot cleared instead.
 */
void stream_init(stream * s)
{
	pthread_mutex_init(&s->mutex, NULL);
	pthread_cond_init(&s->empty, NULL);
	pthread_cond_init(&s->full, NULL);
	s->buf = NULL;
}

/*
 * Allocate a stream buffer of `size` payload bytes.  Header and payload
 * live in one malloc() block; *buf receives a pointer to the payload so
 * callers can read()/write() straight into it.
 *
 * NOTE(review): the draft declared a local named "stream_buf", shadowing
 * the typedef -- that made "(stream_buf *)" a syntax error and turned
 * "sizeof(stream_buf)" into the size of the pointer variable.
 *
 * Returns OK on success; ENOMEM (and sets errno) on allocation failure.
 */
int stream_buf_alloc(size_t size, stream_buf_t *buf)
{
	stream_buf * sb;

	if ((sb = malloc(sizeof(stream_buf) + size)) != NULL) {
		sb->buf_data = &sb->buf;	/* nothing consumed yet */
		sb->buf_size = size;
		sb->buf_count = 0;
		*buf = &sb->buf;		/* hand caller the payload area */
		return(OK);
	}
	errno = ENOMEM;
	return(ENOMEM);
}

/*
 * Release a buffer obtained from stream_buf_alloc().
 *
 * NOTE(review): the payload sits at offsetof(stream_buf, buf), not at
 * sizeof(stream_buf) -- the draft's "(*buf) - sizeof(stream_buf)" (with
 * the local shadowing the typedef, sizeof of a pointer!) recovered the
 * wrong address and would have corrupted the heap.
 */
void stream_buf_free(stream_buf_t * buf)
{
	stream_buf * sb = (stream_buf *)((char *)(*buf) - offsetof(stream_buf, buf));
	free(sb);
	*buf = NULL;	/* defend against use-after-free through the handle */
}

/*
 * Acquire the stream's mutex.
 * NOTE(review): the draft locked "fd->mutex" -- no "fd" exists here -- and
 * passed the mutex by value; pthread_mutex_lock() takes a pointer.
 */
void stream_lock(stream * s)
{
	pthread_mutex_lock(&s->mutex);
}

/*
 * Release the stream's mutex (counterpart of stream_lock()).
 * NOTE(review): same "fd" / missing-& defects as the draft stream_lock().
 */
void stream_unlock(stream * s)
{
	pthread_mutex_unlock(&s->mutex);
}

/* =================================================================
 * stream_get()
 *
 * This routine has the option to reuse the current stream_buf_t or
 * free it and allocate a new stream_buf_t
 */
/*
 * Take up to `count` bytes from the stream into the caller's buffer *buf.
 * Blocks until the producer has posted a buffer.  Two paths:
 *
 *  - Partial: the posted buffer has already been consumed from, or holds
 *    more data than requested -- copy `count` bytes out and advance the
 *    producer buffer's cursor.  The caller's buffer is grown if too small.
 *  - Handoff: otherwise the caller's buffer is freed and swapped for the
 *    producer's whole buffer, and the producer is woken.
 *
 * NOTE(review): rewritten from a draft that passed conds/mutexes by value,
 * called stream_buf_alloc() with its arguments swapped, read buf_count
 * through a just-freed void* handle, and never detached the handed-off
 * buffer from the stream (s->buf stayed non-NULL).
 *
 * Returns the number of bytes delivered, 0 if count == 0, NOTOK on
 * allocation failure.
 */
ssize_t stream_get(stream * s, stream_buf_t * buf, size_t count)
{
	/* Header of the caller's current buffer (may be replaced below). */
	stream_buf * sb = (stream_buf *)((char *)(*buf) - offsetof(stream_buf, buf));
	ssize_t ret;

	if ((ret = (ssize_t)count) != 0) {
		stream_lock(s);
		/* Sleep until the producer posts a buffer. */
		while (s->buf == NULL)
			pthread_cond_wait(&s->empty, &s->mutex);

		if (((void *)&s->buf->buf != s->buf->buf_data) ||
		    (count < s->buf->buf_count)) {
			/* Partial read: copy out of the posted buffer. */
			if (count > sb->buf_size) {
				/* Caller's buffer too small; grow it. */
				stream_buf_free(buf);
				if (stream_buf_alloc(count, buf)) {
					stream_unlock(s);
					return(NOTOK);
				}
				sb = (stream_buf *)((char *)(*buf) -
				    offsetof(stream_buf, buf));
			}
			memcpy(*buf, s->buf->buf_data, count);
			s->buf->buf_count -= count;
			s->buf->buf_data = (char *)s->buf->buf_data + count;
		} else {
			/* Whole-buffer handoff: swap buffers, wake producer. */
			stream_buf_free(buf);
			ret = (ssize_t)s->buf->buf_count;
			*buf = &s->buf->buf;
			s->buf = NULL;	/* slot is free again */
			pthread_cond_signal(&s->full);
		}
		stream_unlock(s);
	}
	return(ret);
}

/* =================================================================
 * stream_put()
 *
 * Upon return the stream_buf is no longer valid
 */
ssize_t stream_put(stream * stream, stream_buf * buf, size_t count)
{
	stream_buf * stream_buf = (stream_buf *)((*buf) - sizeof(stream_buf));
	ssize_t ret;

	if (
	if (ret = count) {
		stream_lock(stream);
		while (stream->buf != NULL) {
			pthread_cond_wait(stream->full, stream->mutex);
		}
		pthread_cond_signal(stream->empty);
		stream_buf->buf_count = count;
		stream_buf->buf_data = *buf;
		stream->buf = stream_buf;
		stream_unlock(stream);
	}
	return(count);
}

/* Example streams */

/*
 * Argument record shared between stream_func_comp() and the worker
 * comp_thread_routine().
 * NOTE(review): the draft lacked the "finished" member that
 * comp_thread_routine() sets -- added here.
 */
typedef struct comp_thread_arg {
	stream_buf_t 	  in;		/* uncompressed input buffer */
	stream_buf_t 	  out;		/* compressed output buffer */
	pthread_mutex_t	  mutex;	/* guards count and finished */
	size_t			  count;	/* bytes produced into out */
	int				  finished;	/* set nonzero by the worker when done */
} comp_thread_arg;

static void * comp_thread_routine(void * arg)
{
	comp_thread_arg * comp_thread_arg = (comp_thread_arg *)arg;

	pthread_cancel_settype(ASYNC);
	pthread_cancel_settype(DEFFERED);
	pthread_mutex_lock(comp_thread_arg.mutex);
		comp_thread_arg.finished = TRUE;
	pthread_mutex_unlock(comp_thread_arg.mutex);
}

/*
 * Initialize a comp_thread_arg before handing it to the worker.
 * NOTE(review): the draft called pthread_mutex_init(NULL, &mutex) --
 * arguments reversed.
 */
static void comp_thread_arg_init(comp_thread_arg * a)
{
	pthread_mutex_init(&a->mutex, NULL);
	a->count = 0;
	a->out = NULL;
	a->in = NULL;
}

/* =============================================================================
 *
 */
/*
 * Compression pump (UNFINISHED DRAFT -- does not compile as written).
 *
 * Apparent design: for each input buffer from stream_in, spawn a low-
 * priority compression thread, wait for the output stream to free up, and
 * then race: if the compressor has not beaten the raw copy (count >=
 * stream_count), ship the uncompressed buffer and cancel the compressor;
 * otherwise ship the compressed output.
 *
 * NOTE(review): left byte-for-byte as drafted because too much is missing
 * to reconstruct safely: comp_thread_attr is never declared;
 * pthread_get_schedparam()/pthread_attr_setschedparam()/pthread_schedparam()
 * calls are left unterminated mid-argument-list; the do{ }while is missing
 * its closing brace; pthread_cond_wait() needs &stream_out->empty /
 * &stream_out->mutex; "comp_thread_arg->out" applies -> to a struct;
 * stream_alloc/stream_put_basic do not exist (presumably
 * stream_buf_alloc/stream_put); stream_in/stream_out are never initialized
 * from arg; pthread_join() is missing its second argument.
 */
void * stream_func_comp(void * arg)
{
	stream_buf_t stream_buf, comp_stream_buf;
	stream * stream_in, * stream_out;

	comp_thread_arg comp_thread_arg;
	pthread_t comp_thread;

	ssize_t stream_count;

	/* Do some initialization first */
	pthread_attr_init(&comp_thread_attr);
	pthread_get_schedparam();
	pthread_attr_setschedparam(comp_thread_attr, 

	comp_thread_arg_init(&comp_thread_arg);
	stream_alloc(30000, &comp_thread_arg->out);

	while (stream_count = stream_get(stream_in, &stream_buf, 30000)) {
		stream_lock(stream_out);
		if (stream_out->buf == NULL) {
			/* Hand the new input to a freshly spawned compressor. */
			comp_thread_arg.in = stream_buf;
			comp_thread_arg.count = stream_count;
			pthread_create(&comp_thread, comp_thread_attr, 
			  comp_thread_routine, &comp_thread_arg);
			do { /* Now just wait until the out stream is ready */
				pthread_cond_wait(stream_out->empty, stream_out->mutex);
			while (stream_out->buf == NULL);

			pthread_mutex_lock(comp_thread_arg.mutex);
			if (comp_thread_arg.count >= stream_count) {
				/* Compression lost the race: ship raw, cancel worker. */
				stream_put_basic(stream_out, stream_buf, stream_count);
				pthread_schedparam(comp_thread, /* Raise priority to that of current thread */
				pthread_mutex_unlock(comp_thread_arg.mutex);
				pthread_cancel(comp_thread);
				pthread_join(comp_thread);
				stream_unlock(stream_out);
			} else {
				/* Compression won: ship the compressed buffer. */
				stream_put_basic(stream_out, &comp_thread_arg->out,
				  comp_thread_arg.count);
				stream_unlock(stream_out);

				stream_alloc(30000, &comp_thread_arg->out);
			}
		}
	}
}


/* =============================================================================
 *
 */
void * stream_func_rread(void * arg)
{
	stream_buf_t stream_buf;
	stream * stream;

	ssize_t count;

	while (stream_alloc(&stream_buf, 30000) == OK) {
		if ((count = rread(fd, stream_buf, 30000)) < OK) {
			stream_free(&stream_buf);
			return(NULL);
		}
		stream_put(stream, stream_buf, count);
	}
	
}

/*
 * FIFO of pending I/O requests, serviced by rdo() worker threads.
 * "queue" is the single global instance used by rread().
 */
struct request_queue {
	struct request	* first;	/* head: next request to service */
	struct request	* last;	/* tail: where rread() appends */
	pthread_cond_t	  cond;	/* signalled when the queue becomes non-empty */
	pthread_mutex_t	  mutex;	/* guards first/last */
} queue;
	
/* Operation codes for struct request; only reads are implemented so far. */
enum request_op {
	R_NONE = 0,
	R_READ,
};

/*
 * One queued I/O request.  Lives on the requesting thread's stack (see
 * rread()); the worker fills in count, sets finished, and signals cond.
 */
struct request {
	struct request *next;	/* queue linkage */
	pthread_mutex_t	mutex;	/* guards finished/count */
	pthread_cond_t	cond;	/* signalled by the worker on completion */
	enum request_op op;	/* what to do (R_READ) */
	int				fd;	/* file descriptor to operate on */
	void *			buf;	/* caller's data buffer */
	ssize_t			count;	/* in: bytes requested; out: read() result */
	int				finished;	/* completion flag */
};

/* =============================================================================
 *
 */
/*
 * Queued read: enqueue an R_READ request on the global queue and block
 * until a worker (rdo()) services it.  Returns the read() result stored
 * in request.count.
 *
 * NOTE(review): rewritten from a draft that called pthread_mutex_init /
 * pthread_cond_init on bare names, used "->" on the global struct "queue",
 * never advanced queue.last when appending to a non-empty queue (losing
 * every request after the second), called the nonexistent pthread_signal(),
 * and returned no value from a ssize_t function.
 */
ssize_t rread(int fd, void * buf, size_t count)
{
	struct request request;	/* lives on our stack until serviced */

	pthread_mutex_init(&request.mutex, NULL);
	pthread_cond_init(&request.cond, NULL);
	request.op = R_READ;
	request.fd = fd;
	request.buf = buf;
	request.count = (ssize_t)count;
	request.finished = FALSE;
	request.next = NULL;

	/* Append to the global queue; wake a worker if it was empty. */
	pthread_mutex_lock(&queue.mutex);
	if (queue.last == NULL) {
		queue.first = &request;
		queue.last = &request;
		pthread_cond_signal(&queue.cond);
	} else {
		queue.last->next = &request;
		queue.last = &request;
	}
	pthread_mutex_unlock(&queue.mutex);

	/* Sleep until the worker marks the request finished. */
	pthread_mutex_lock(&request.mutex);
	while (request.finished == FALSE)
		pthread_cond_wait(&request.cond, &request.mutex);
	pthread_mutex_unlock(&request.mutex);

	pthread_cond_destroy(&request.cond);
	pthread_mutex_destroy(&request.mutex);
	return(request.count);	/* the worker stored read()'s result here */
}

/*
 * Worker loop: service n requests from the queue (n == 0 means loop
 * forever).  For each request, perform the read() and wake the waiter.
 *
 * NOTE(review): the draft passed mutexes/conds by value (missing &), used
 * "." on the request pointer, and called the nonexistent pthread_signal().
 */
void rdo(struct request_queue * q, int n)
{
	struct request * request;
	int i;

	for (i = 0; (!n) || (i < n); i++) {
		/* Dequeue the next request, sleeping while the queue is empty. */
		pthread_mutex_lock(&q->mutex);
		while ((request = q->first) == NULL)
			pthread_cond_wait(&q->cond, &q->mutex);

		if ((q->first = request->next) == NULL)
			q->last = NULL;	/* queue drained */

		pthread_mutex_unlock(&q->mutex);

		/* Perform the I/O and hand the result back to the waiter. */
		pthread_mutex_lock(&request->mutex);
		request->count = read(request->fd, request->buf,
		    (size_t)request->count);
		request->finished = TRUE;
		pthread_cond_signal(&request->cond);
		pthread_mutex_unlock(&request->mutex);
	}
}
