/*
 * @(#)StreamDemultiplexor.java				0.2-2 23/03/1997
 *
 *  This file is part of the HTTPClient package 
 *  Copyright (C) 1996,1997  Ronald Tschalaer
 *
 *  This library is free software; you can redistribute it and/or
 *  modify it under the terms of the GNU Library General Public
 *  License as published by the Free Software Foundation; either
 *  version 2 of the License, or (at your option) any later version.
 *
 *  This library is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 *  Library General Public License for more details.
 *
 *  You should have received a copy of the GNU Library General Public
 *  License along with this library; if not, write to the Free
 *  Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
 *  MA 02111-1307, USA
 *
 *  For questions, suggestions, bug-reports, enhancement-requests etc.
 *  I may be contacted at:
 *
 *  ronald@innovation.ch
 *  Ronald.Tschalaer@psi.ch
 *
 */

package HTTPClient;


import java.io.*;
import java.net.Socket;
import java.util.Vector;
import java.util.Enumeration;

/**
 * This class handles the demultiplexing of the input stream. This is needed
 * for things like keep-alive in HTTP/1.0, persistent connections in HTTP/1.1
 * and in HTTP-NG.
 *
 * @version	0.2 (bug fix 2)  23/03/1997
 * @author	Ronald Tschal&auml;r
 */

class StreamDemultiplexor
{
    /** the protocol were handling request for */
    private int                    Protocol;

    /** the connection we're working for */
    private HTTPConnection         Connection;
    /** the input stream to demultiplex */
    private ExtBufferedInputStream Stream;

    /** the socket this hangs off */
    private Socket                 Sock = null;

    /** used to mark the socket for close */
    private boolean                MarkedForClose;

    /** timer used to close the socket if unused for a given time */
    private SocketTimeout          Timer;

    /** marks if we are currrently trimming closed streams */
    private boolean                TrimInProgress;

    /** marks if we are currrently retrying aborted requests */
    private boolean                RetryInProgress = false;

    /** a Vector to hold the list of response handlers were serving */
    private ResponseHandlerList    RespHandlerList;

    /** number of unread bytes in current chunk (if transf-enc == chunked) */
    private int                    chunk_len;


    // Constructors

    /**
     * a simple contructor.
     *
     * @param protocol   the protocol used on this stream.
     * @param sock       the socket which we're to demux.
     * @param connection the http-connection this socket belongs to.
     */
    StreamDemultiplexor(int protocol, Socket sock, HTTPConnection connection)
	    throws IOException
    {
	this.Protocol   = protocol;
	this.Connection = connection;
	RespHandlerList = new ResponseHandlerList();
	init(sock);

	// start a timer to close the socket after 10 seconds
	Timer = new SocketTimeout(10000, this);
	Timer.start();
    }


    /**
     * Initializes the demultiplexor with a new socket.
     *
     * @param stream   the stream to demultiplex
     */
    private void init(Socket sock)  throws IOException
    {
	if (HTTPConnection.debug)
	    System.err.println("Initializing Stream Demultiplexor (" +
				this.hashCode() + ")");

	this.Sock       = sock;
	this.Stream     = new ExtBufferedInputStream(sock.getInputStream());
	MarkedForClose  = false;
	chunk_len       = -1;
	TrimInProgress  = false;
    }


    // Methods

    /**
     * Each HTTPResponse must register with us.
     */
    void register(HTTPResponse resp_handler, String[] con_hdrs,
		  ByteArrayOutputStream headers, byte[] data)
    {
	RespHandlerList.addToEnd(
	    new ResponseHandler(resp_handler, con_hdrs, headers, data, this));
    }

    /**
     * creates an input stream for the response.
     *
     * @param resp the response structure requesting the stream
     * @return an InputStream
     */
    InputStream getStream(HTTPResponse resp)
    {
	ResponseHandler resph = RespHandlerList.find(resp);
	if (resp != null)
	    return resph.stream;
	else
	    return null;
    }


    /**
     * Restarts the timer thread that will close an unused socket after
     * 10 seconds.
     */
    void restartTimer()
    {
	Timer.reset();
    }


    /**
     * reads an array of bytes from the master stream.
     */
    int read(byte[] b, int off, int len, ResponseHandler resph)
	    throws IOException
    {
	if (resph.eof)
	    return -1;

	try
	{
	    // read the headers and data for all responses preceding us.

	    while (RespHandlerList.Head != resph  &&
		   RespHandlerList.Head != null)
		RespHandlerList.Head.stream.readAll();


	    // Now we can read from the stream.

	    synchronized(this)
	    {
		if (resph.exception != null)
		    throw new IOException(resph.exception.getMessage());

		int rcvd = -1;

		switch (resph.resp.cl_type)
		{
		    case HTTPResponse.CL_HDRS:
			rcvd = Stream.read(b, off, len);
			if (rcvd == -1)
			    throw new EOFException("Premature EOF encountered");
			break;
		    case HTTPResponse.CL_0:
			rcvd = -1;
			close(resph);
			break;
		    case HTTPResponse.CL_CLOSE:
			rcvd = Stream.read(b, off, len);
			if (rcvd == -1)
			{
			    MarkedForClose = true;
			    close(resph);
			}
			break;
		    case HTTPResponse.CL_CONTLEN:
			int cl = resph.resp.getHeaderAsInt("Content-Length");
			if (len > cl - resph.stream.count)
			    len = cl - resph.stream.count;

			rcvd = Stream.read(b, off, len);
			if (rcvd == -1)
			    throw new EOFException("Premature EOF encountered");

			if (resph.stream.count+rcvd == cl)
			    close(resph);

			break;
		    case HTTPResponse.CL_CHUNKED:
			if (chunk_len == -1)	// it's a new chunk
			    chunk_len = Codecs.getChunkLength(Stream);

			if (chunk_len > 0)		// it's data
			{
			    if (len > chunk_len)  len = chunk_len;
			    rcvd = Stream.read(b, off, len);
			    if (rcvd == -1)
				throw new EOFException("Premature EOF encountered");
			    chunk_len -= rcvd;
			    if (chunk_len == 0)	// got the whole chunk
			    {
				Stream.read();	// CR
				Stream.read();	// LF
				chunk_len = -1;
			    }
			}
			else	// the footers
			{
			    rcvd = -1;
			    close(resph);
			    chunk_len = -1;

			    // remove footers
			    DataInputStream datain = new DataInputStream(Stream);
			    String line;
			    while ((line = datain.readLine()) != null  &&
				   line.length() > 0) ;
			    if (line == null)
				throw new EOFException("Premature EOF encountered");
			}
			break;
		    case HTTPResponse.CL_MP_BR:
			byte[] endbndry = resph.getEndBoundary(Stream);
			int[]  end_cmp  = resph.getEndCompiled(Stream);

			rcvd = Stream.read(b, off, len);
			if (rcvd == -1)
			    throw new EOFException("Premature EOF encountered");

			int ovf = Stream.pastEnd(endbndry, end_cmp);
			if (ovf != -1)
			{
			    rcvd -= ovf;
			    Stream.reset();
			    close(resph);
			}

			break;
		    default:
			throw new Error("Internal Error in StreamDemultiplexor: " +
					"Invalid cl_type " + resph.resp.cl_type);
		}

		return rcvd;
	    }

	}
	catch (IOException ioe)
	{
	    if (resph.exception != null)	// we threw this internally
		throw ioe;

	    if (HTTPConnection.debug)
		ioe.printStackTrace();

	    try { close(ioe); }
	    catch (IOException ioe2) { }

	    throw ioe;
	}
	catch (ParseException pe)
	{
	    if (HTTPConnection.debug)
		pe.printStackTrace();

	    try { close(new IOException(pe.toString())); }
	    catch (IOException ioe2) { }

	    throw new IOException(pe.toString());
	}
    }

    /**
     * skips a number of bytes in the master stream. This is done via a
     * dummy read, as the socket input stream doesn't like skip()'s.
     */
    synchronized long skip(long num, ResponseHandler resph) throws IOException
    {
	if (resph.eof)
	    return 0;

	byte[] dummy = new byte[(int) num];
	int rcvd = read(dummy, 0, (int) num, resph);
	if (rcvd == -1)
	    return 0;
	else
	    return rcvd;
    }

    /**
     * Determines the number of available bytes.
     */
    synchronized int available(ResponseHandler resph) throws IOException
    {
	int avail = Stream.available();

	if (resph.eof)
	    return 0;

	switch (resph.resp.cl_type)
	{
	    case HTTPResponse.CL_0:
		return 0;
	    case HTTPResponse.CL_HDRS:
		// this is something of a hack; I could return 0, but then
		// if you were waiting for something on a response that
		// wasn't first in line (and you didn't try to read the
		// other response) you'd wait forever. On the other hand,
		// we might be making a false promise here...
		return (avail > 0 ? 1 : 0);
	    case HTTPResponse.CL_CLOSE:
		return avail;
	    case HTTPResponse.CL_CONTLEN:
		int cl = resph.resp.getHeaderAsInt("Content-Length");
		cl -= resph.stream.count;
		return (avail < cl ? avail : cl);
	    case HTTPResponse.CL_CHUNKED:
		return avail;	// not perfect...
	    case HTTPResponse.CL_MP_BR:
		return avail;	// not perfect...
	    default:
		throw new Error("Internal Error in StreamDemultiplexor: " +
				"Invalid cl_type " + resph.resp.cl_type);
	}

    }


    /**
     * Closes the socket and all associated streams.
     *
     * @param exception the IOException to be sent to the streams.
     * @exception IOException If an exception occurs during Socket.close().
     */
    synchronized void close(IOException exception) throws IOException
    {
	if (Sock == null)	// already cleaned up
	    return;

	if (HTTPConnection.debug)
	    System.err.println("Closing all streams and socket (" +
				this.hashCode() + ")");

	Timer.kill();
	try
	    { Stream.close(); }
	catch (IOException ioe) { }
	try
	    { Sock.close(); }
	catch (IOException ioe) { }
	Sock = null;


	// Here comes the tricky part: redo outstanding requests!

	// For the moment however we just send the exception to every
	// outstanding request.

	ResponseHandler resph;
	while ((resph = RespHandlerList.Head) != null)
	{
	    resph.exception = exception;
	    RespHandlerList.remove(resph);
	}
    }


    /**
     * Closes the associated stream. Also flushes and removes all streams
     * next in line which the user has closed.
     */
    synchronized void close(ResponseHandler resph) throws IOException
    {
	if (resph != RespHandlerList.Head)
	    return;

	if (HTTPConnection.debug)
	    System.err.println("Closing stream (" + resph.stream.hashCode()
				+ ")");

	RespHandlerList.remove(resph);
	resph.eof = true;

	if (TrimInProgress)  return;
	TrimInProgress = true;

	trim: while (true)
	{
	    if (RespHandlerList.Head == null)
	    {
		if ((MarkedForClose  ||  !resph.keepAlive())  &&
		    Sock != null)
		{
		    if (HTTPConnection.debug)
			System.err.println("Closing socket (" +
					    this.hashCode() + ")");

		    Timer.kill();
		    Stream.close();
		    Sock.close();
		    Sock = null;
		}
		break trim;
	    }

	    resph = RespHandlerList.Head;
	    if (resph == null  ||  !resph.stream.closed)	// can't flush
		break trim;

	    resph.stream.readAll();
	}

	TrimInProgress = false;
    }


    /**
     * returns the socket associated with this demux
     */
    synchronized protected Socket getSocket()
    {
	if (MarkedForClose)
	    return null;

	Timer.hyber();
	return Sock;
    }


    /**
     * Mark this demux to not accept any more request and to close the
     * stream after all request have been processed, or close immediately
     * if no requests are registered.
     */
    synchronized void markForClose()
    {
	MarkedForClose = true;

	if (RespHandlerList.Head == null)	// no active request,
	{
	    try
		{ close((IOException) null); }	// so close the socket
	    catch (IOException ioe)
		{ }
	}
	else
	    Timer.hyber();
    }


    /**
     * A safety net to close the connection.
     */
    protected void finalize()
    {
	try
	    { close((IOException) null); }
	catch (IOException ioe)
	    { }
    }


    /**
     * produces a string. 
     * @return a string containing the class name and protocol number
     */
    public String toString()
    {
	return getClass().getName() + "[Protocol=" + Protocol + "]";
    }
}


/**
 * Timer thread for a StreamDemultiplexor. While hibernating it only
 * polls its control flags; once reset() has been called it counts down
 * the timeout in slices and, if it is neither restarted, hibernated nor
 * killed in the meantime, calls markForClose() on the demux and ends.
 *
 * The control flags are written by other threads and polled by this
 * one, hence they are volatile.
 */
class SocketTimeout extends Thread
{
    long                 timeout;
    StreamDemultiplexor  demux;

    // Initialised here rather than in run(): a reset(), hyber() or kill()
    // issued after start() but before run() gets scheduled must not be
    // wiped out again.
    volatile boolean     restart = false,
                         hyber   = true,
                         die     = false;

    /**
     * Creates the timer as a daemon thread (if the security manager
     * allows it). The timer starts out hibernating.
     *
     * @param time  the timeout in milliseconds
     * @param demux the demultiplexor to mark for close on timeout
     */
    SocketTimeout(long time, StreamDemultiplexor demux)
    {
        super("SocketTimeout-"+demux.hashCode());
        try { setDaemon(true); }
        catch (SecurityException se) { }        // Oh well...
        timeout    = time;
        this.demux = demux;
    }

    /**
     * The timer loop; see the class description.
     */
    public void run()
    {
        setPriority(MAX_PRIORITY);

        // split the timeout into slices of roughly one second each;
        // guard against timeouts below one second (no division by zero,
        // no zero or negative sleep)
        long num_sec = timeout / 1000;
        if (num_sec < 1)
            num_sec = 1;
        long one_sec = timeout / num_sec;
        if (one_sec < 1)
            one_sec = 1;

        forever: while (true)
        {
            while (hyber)
                { try { sleep(one_sec); } catch (InterruptedException ie) { } }

            if (die)
                break forever;

            if (restart)
                restart = false;

            // this loop is a hack to be able to restart the timer more
            // precisely; if interrupt would work we could use that instead
            for (long idx=num_sec; idx>0; idx--)
            {
                try
                    { sleep(one_sec); }
                catch (InterruptedException ie)
                    { }

                if (restart  ||  hyber)
                    continue forever;
                if (die)
                    break forever;
            }

            // re-check under the demux lock so a concurrent reset/hyber/kill
            // made by a thread holding that lock is not missed
            synchronized(demux)
            {
                if (restart  ||  hyber)
                    continue forever;
                if (die)
                    break forever;

                demux.markForClose();
            }

            break forever;
        }
    }


    /**
     * (Re)starts the countdown.
     * Ideally this would just call interrupt(), but the Thread stuff is
     * not fully implemented.
     */
    void reset()
    {
        restart = true;
        hyber   = false;
    }

    /**
     * Suspends the timer; suspend() ought to suffice, but Netscape seem to
     * be overtaxed when it comes to implementing this correctly (not that
     * it's trivial), so they opt to remove it instead...
     */
    void hyber()
    {
        restart = false;
        hyber   = true;
    }

    /**
     * Stops this timer if called by a different thread.
     */
    void kill()
    {
        die   = true;
        hyber = false;
    }
}


/**
 * Extends BufferedInputStream to allow searching for a string in the
 * internal buffer. Used for multipart content-types.
 */
final class ExtBufferedInputStream extends BufferedInputStream
{
    // Constructors

    /**
     * Wraps the given stream using the default buffer size.
     *
     * @param inp_stream the stream to buffer
     */
    ExtBufferedInputStream(InputStream inp_stream)
    {
        super(inp_stream);
    }

    /**
     * Wraps the given stream using a buffer of the given size.
     *
     * @param inp_stream the stream to buffer
     * @param size       the buffer size
     */
    ExtBufferedInputStream(InputStream inp_stream, int size)
    {
        super(inp_stream, size);
    }


    // Methods

    /**
     * Searches the buffer between the mark and the current position for
     * the end boundary and reports how far beyond it we have read.
     * As a side effect the mark is moved: to just behind the boundary if
     * it was found, otherwise back by the boundary's length from the
     * current position so that a boundary straddling two reads can still
     * be found next time.
     *
     * NOTE(review): when the boundary is not found and fewer than
     * search.length bytes precede the current position, the new mark is
     * negative - confirm that Util.findStr and the callers cope with that.
     *
     * @param search     the search string (end boundary)
     * @param search_cmp the compiled info of the search string
     * @return the number of bytes read beyond the end of the boundary,
     *         or -1 if the boundary was not found.
     */
    int pastEnd(byte[] search, int[] search_cmp)
    {
        int found = Util.findStr(search, search_cmp, buf, markpos, pos);

        if (found == -1)
        {
            markpos = pos - search.length;
            return -1;
        }

        markpos = found + search.length;
        return pos - markpos;
    }

    /**
     * Sets the mark at the current position, using the buffer length as
     * the marklimit.
     */
    void initMark()
    {
        mark(buf.length);
    }
}


/**
 * Implements a linked list of response handlers.
 */
final class ResponseHandlerList
{
    /** the first handler in the list, or null if the list is empty */
    ResponseHandler Head = null;


    /**
     * Appends a handler to the end of the list. The handler's next link
     * is always cleared, so a stale link can never drag further handlers
     * into the list. A null handler is ignored.
     *
     * @param resph the handler to append
     */
    synchronized void addToEnd(ResponseHandler resph)
    {
        if (resph == null)  return;

        resph.next = null;

        if (Head == null)
        {
            Head = resph;
            return;
        }

        ResponseHandler curr = Head;
        while (curr.next != null)
            curr = curr.next;
        curr.next = resph;
    }


    /**
     * Unlinks a handler from the list and clears its next link. Does
     * nothing if the handler is not in the list.
     *
     * @param resph the handler to remove
     */
    synchronized void remove(ResponseHandler resph)
    {
        if (Head == null)  return;

        if (Head == resph)
        {
            Head = resph.next;
            resph.next = null;
            return;
        }

        ResponseHandler curr = Head;
        while (curr != null)
        {
            if (curr.next == resph)
            {
                curr.next  = resph.next;
                resph.next = null;
                break;
            }
            curr = curr.next;
        }

        return;
    }


    /**
     * Looks up the handler belonging to a response; responses are
     * compared by identity.
     *
     * @param resp the response to look for
     * @return the first handler whose resp is <var>resp</var>, or null
     *         if there is none
     */
    synchronized ResponseHandler find(HTTPResponse resp)
    {
        ResponseHandler curr = Head;
        while (curr != null  &&  curr.resp != resp)
            curr = curr.next;
        return curr;
    }


    /** the current position of the enumeration */
    private ResponseHandler enum_pos = null;

    /**
     * Starts an enumeration of the list. Unlike the methods above this
     * is not synchronized, and there is only one enumeration position
     * per list.
     *
     * @return the first handler, or null if the list is empty
     */
    ResponseHandler enumerate()
    {
        enum_pos = Head;
        return enum_pos;
    }

    /**
     * Advances the enumeration started by enumerate().
     *
     * @return the next handler, or null if the end has been reached
     */
    ResponseHandler next()
    {
        if (enum_pos != null)
            enum_pos = enum_pos.next;
        return enum_pos;
    }
}


/**
 * This holds various information about an active response.
 */
final class ResponseHandler
{
    /** our input demultiplexor */
    StreamDemultiplexor demux;

    /** the response stream */
    RespInputStream     stream;

    /** the response class */
    HTTPResponse        resp;

    /** any connection headers used in the request */
    String[]            con_hdrs;

    /** the full request headers used - needed for retransmission */
    ByteArrayOutputStream  headers;

    /** the data used in the request */
    byte[]                 data;

    /** next handler in linked list  */
    ResponseHandler     next = null;

    /** signals that the demux has closed the response stream, and that
        therefore no more data can be read */
    boolean             eof = false;

    /** this is non-null if the stream has an exception pending */
    IOException         exception = null;

    /** counts the number of retries this request has been subject to */
    int                 retries = 0;

    /** the byte sequence marking the end of this stream; only used for
        multipart delimited responses, and set up on first use */
    private byte[] endbndry = null;

    /** the compiled search info for endbndry */
    private int[]  end_cmp  = null;


    /**
     * Creates a new handler and allocates the response input stream
     * that goes with it.
     *
     * @param resp     the response class.
     * @param con_hdrs an array of connection/keep-alive headers that were
     *                 used in sending the request.
     * @param headers  the complete headers as sent in the original request.
     * @param data     the data sent in the original request.
     * @param demux    our stream demultiplexor.
     */
    ResponseHandler(HTTPResponse resp, String[] con_hdrs,
                    ByteArrayOutputStream headers, byte[] data,
                    StreamDemultiplexor demux)
    {
        this.resp     = resp;
        this.con_hdrs = con_hdrs;
        this.headers  = headers;
        this.data     = data;
        this.demux    = demux;
        this.stream   = new RespInputStream(demux, this);

        if (HTTPConnection.debug)
        {
            System.err.println("Opening stream ("+this.stream.hashCode()+")");
        }
    }


    /**
     * Returns the end boundary of this response, setting up the boundary
     * information (and the mark on the master stream) on first use.
     *
     * @param  MasterStream the input stream from which the stream demux
     *                      is reading.
     * @return the end boundary as bytes.
     * @exception IOException if the boundary cannot be set up.
     */
    byte[] getEndBoundary(ExtBufferedInputStream MasterStream)
                throws IOException
    {
        if (endbndry != null)
            return endbndry;

        setupBoundary(MasterStream);
        return endbndry;
    }

    /**
     * Returns the compiled search info for the end boundary of this
     * response, setting up the boundary information (and the mark on the
     * master stream) on first use.
     *
     * @param  MasterStream the input stream from which the stream demux
     *                      is reading.
     * @return the compiled end boundary.
     * @exception IOException if the boundary cannot be set up.
     */
    int[] getEndCompiled(ExtBufferedInputStream MasterStream)
                throws IOException
    {
        if (end_cmp != null)
            return end_cmp;

        setupBoundary(MasterStream);
        return end_cmp;
    }

    /**
     * Builds the end boundary from the boundary parameter of the
     * response's Content-Type header, compiles it for searching, and
     * initializes the mark on the master stream.
     *
     * @param  MasterStream the input stream from which the stream demux
     *                      is reading.
     * @exception IOException if getting the header results in an exception.
     */
    void setupBoundary(ExtBufferedInputStream MasterStream)  throws IOException
    {
        String ctype    = resp.getHeader("Content-Type");
        String boundary = Codecs.getParameter("boundary", ctype);
        String endstr   = "--" + boundary + "--\r\n";

        int blen = endstr.length();
        endbndry = new byte[blen];
        endstr.getBytes(0, blen, endbndry, 0);  // low byte of each char
        end_cmp = Util.compile_search(endbndry);

        MasterStream.initMark();
    }

    /**
     * Should the connection be kept alive after this request? This is
     * the case for an HTTP/1.1 response carrying a Connection header
     * without "close", and for an HTTP/1.0 response carrying a
     * Connection header with "keep-alive".
     *
     * NOTE(review): an HTTP/1.1 response without any Connection header is
     * treated as not keep-alive here - confirm that this is intended.
     *
     * @return true if the connection is to be kept alive.
     * @exception IOException If getting the headers results in an exception.
     */
    boolean keepAlive()  throws IOException
    {
        if (resp.getVersion().toUpperCase().equals("HTTP/1.1"))
        {
            String con = resp.getHeader("Connection");
            if (con != null  &&  con.toLowerCase().indexOf("close") == -1)
                return true;
        }

        if (resp.getVersion().toUpperCase().equals("HTTP/1.0"))
        {
            String con = resp.getHeader("Connection");
            if (con != null  &&  con.toLowerCase().indexOf("keep-alive") != -1)
                return true;
        }

        return false;
    }
}


/**
 * This is the InputStream that gets returned to the user. The extensions
 * consist of the capability to have the data pushed into a buffer if the
 * stream demux needs to.
 */
final class RespInputStream extends InputStream
{
    /** the stream demultiplexor */
    private StreamDemultiplexor demux = null;

    /** our response handler */
    private ResponseHandler     resph;

    /** signals that the user has closed the stream and will therefore
        not read any further data */
            boolean             closed = false;

    /** this buffer is used to buffer data that the demux has to get rid of;
        null as long as the data is read directly from the demux */
    private byte[]              buffer = null;

    /** the offset at which the unread data starts in the buffer */
    private int                 offset = 0;

    /** the total number of bytes of entity data read from the demux so far */
            int                 count = 0;


    // Constructors

    /**
     * Creates the stream for a response.
     *
     * @param demux the demultiplexor to read from
     * @param resph the handler of the response this stream belongs to
     */
    RespInputStream(StreamDemultiplexor demux, ResponseHandler resph)
    {
        this.demux = demux;
        this.resph = resph;
    }


    // public Methods

    /** one-byte scratch buffer used by read() */
    private byte[] ch = new byte[1];

    /**
     * Reads a single byte.
     *
     * @return the byte read, or -1 if EOF.
     * @exception IOException if any exception occured on the connection.
     */
    public synchronized int read() throws IOException
    {
        int rcvd = read(ch, 0, 1);
        if (rcvd == 1)
            return ch[0] & 0xff;
        else
            return -1;
    }


    /**
     * Reads <var>len</var> bytes into <var>b</var>, starting at offset
     * <var>off</var>. The data comes from the internal buffer if the
     * response has been read ahead, and from the demux otherwise.
     *
     * @return the number of bytes actually read, or -1 if EOF.
     * @exception IOException if any exception occured on the connection.
     */
    public synchronized int read(byte[] b, int off, int len) throws IOException
    {
        if (closed)
            return -1;

        if (buffer != null)
        {
            int left = buffer.length - offset;
            if (left == 0)  return -1;

            len = (len > left ? left : len);
            System.arraycopy(buffer, offset, b, off, len);
            offset += len;

            return len;
        }
        else
        {
            int rcvd = demux.read(b, off, len, resph);
            // only entity data is counted, not header data
            if (rcvd != -1  &&  resph.resp.got_headers)
                count += rcvd;
            return rcvd;
        }
    }

    /**
     * skips <var>num</var> bytes. A negative <var>num</var> skips nothing.
     *
     * @return the number of bytes actually skipped.
     * @exception IOException if any exception occured on the connection.
     */
    public synchronized long skip(long num) throws IOException
    {
        // a negative count must neither move the buffer offset backwards
        // nor reach the demux
        if (closed  ||  num < 0)
            return 0;

        if (buffer != null)
        {
            int left = buffer.length - offset;
            num = (num > left ? left : num);
            offset  += num;
            return num;
        }
        else
        {
            long skpd = demux.skip(num, resph);
            if (resph.resp.got_headers)
                count += skpd;
            return skpd;
        }
    }

    /**
     * gets the number of bytes available for reading without blocking.
     *
     * @return the number of bytes available.
     * @exception IOException if any exception occured on the connection.
     */
    public synchronized int available() throws IOException
    {
        if (closed)
            return 0;

        if (buffer != null)
            return buffer.length-offset;
        else
            return demux.available(resph);
    }

    /**
     * closes the stream. Whatever is left of the response is read off
     * the demux first, so that following responses become accessible.
     *
     * @exception IOException if any exception occured on the connection
     *            before or during close.
     */
    public synchronized void close() throws IOException
    {
        if (!closed)
        {
            closed = true;
            readAll();
            demux.close(resph);
        }
    }


    /**
     * A safety net to clean up.
     */
    protected void finalize()
    {
        try
            { close(); }
        catch (IOException ioe)
            { }
    }


    // local Methods

    /**
     * Reads all remaining data into the buffer. Does nothing if the
     * data has already been buffered. If reading fails the partially
     * filled buffer is dropped again, so that later reads go back to
     * the demux instead of returning the unfilled part of the buffer.
     *
     * @exception IOException If any exception occurs while reading stream.
     */
    void readAll()  throws IOException
    {
        if (!resph.resp.got_headers)
            resph.resp.getHeader("Content-length"); // force headers to be read

        synchronized(this)
        {
            if (buffer != null)  return;

            buffer   = new byte[0];
            offset   = 0;
            int rcvd = 0;
            try
            {
                do
                {
                    count  += rcvd;
                    offset += rcvd;
                    buffer  = Util.resizeArray(buffer, offset+1000);
                    rcvd    = demux.read(buffer, offset, 1000, resph);
                } while (rcvd != -1);
            }
            catch (IOException ioe)
            {
                // don't leave a half-filled, over-sized buffer behind
                buffer = null;
                offset = 0;
                throw ioe;
            }

            // trim the buffer to the data actually read and rewind
            buffer = Util.resizeArray(buffer, offset);
            offset = 0;
        }
    }
}

