import java.net.*;
import java.util.*;
import java.io.*;


/** The RealBridge class talks to a Distributor and relays those messages to
 * the Hyphos network.
 */

public class RealBridge extends Bridge implements Runnable {
  Hashtable expectAcks = new Hashtable();
  
  RealBridge(boolean type){
    super(type);
  }

  public void run() {
    Hyphos hyphos = new Hyphos(this);
    Random rand;
    DatagramSocket dist;
    byte packetData[] = new byte [Distributor.MAX_DATA_SIZE];
    byte addr[] = new byte[4];
    DatagramPacket packet=new DatagramPacket(packetData, Distributor.MAX_DATA_SIZE);
    Vector queue = new Vector();
    int ct, ct2, i, re;
    HyphosPayload hp;
    Byte x;
    byte pid;
    long now, retransmitTime;
    
    retransmitTime = 500;

    if(listener)
      {
	// Open the socket to the distributor
	try { dist = new DatagramSocket(Distributor.AM_PORT); 	dist.setSoTimeout(150);
	}
	catch(SocketException e) { System.out.println("Uh oh!"); return; }
	pid = 0;
	rand = new Random();


	while(true){
	  // Any pending
	  now  = System.currentTimeMillis();
	  for(Enumeration e = expectAcks.keys(); e.hasMoreElements(); ){
	    hp = (HyphosPayload) (expectAcks.get(x = (Byte) e.nextElement()));
	    if((hp.transmitTime+retransmitTime) > now){
	      hp.nTimeouts++;
	      if(hp.nTimeouts > 5){
		// Give up
		expectAcks.remove(x);
		// *** Add code here to queue nak with x.byteValue() as the id
		//		queue.addElement(new DatagramPacket());
	      }
	      else{
		// Retransmit
		hp.transmitTime = System.currentTimeMillis();
		hyphos.transmit(hp.addr, hp.payload,
				hp.payload.length, x.byteValue());
	      }
	    }
	  }
	  // Receive the packet
	  try { dist.receive(packet); }
	  catch (IOException e) { continue; }


	  // Send if off
	  packetData = packet.getData();
	  byte packetData2[] = new byte [packetData.length-4];
	  for(ct=0;ct<packetData.length;ct++){
	    if(ct<=3)
	      addr[ct] = packetData[ct];
	    else
	      packetData2[ct-4] = packetData[ct];
	  }
	  expectAcks.put(new Byte(pid), 
			 new HyphosPayload(addr, 
					   packetData2, 
					   System.currentTimeMillis()));
	  hyphos.transmit(addr, packetData2, packetData2.length, pid++);
	}
      }
    else
      {
	try { dist = new DatagramSocket(Distributor.DIST_PORT); }
	catch(SocketException e) { System.out.println("Uh oh(dist)!"); return; }
	rand = new Random();

	while(true){
	  // Are there any queued responses to send?
	  if(queue.size() >= 1){
	    // Pop the packet off the queue
	    packet = (DatagramPacket) queue.firstElement();
	    queue.removeElementAt(1);
	    
	    // And off it goes!
	    try { dist.send(packet); }
	    catch (IOException e) { }
	  }
	}
      }
  }

  public void fromAmulet(byte amulet[], byte uid, byte msg[]){
    Byte responseID;

    if(msg[0] == TSConstants.ACK){
      for(Enumeration e = expectAcks.keys(); e.hasMoreElements(); ){
	responseID = (Byte) e.nextElement();
	if(uid == responseID.byteValue()){
	  expectAcks.remove(responseID);
	}
      }
    }
    else {
      // *** Do something with other packets.  Put them on the queue or something 

    }
  }

}

class HyphosPayload {
  byte addr[];
  byte payload[];
  long transmitTime;
  int nTimeouts;

  HyphosPayload(byte a[], byte p[], long t){
    addr = a;
    payload = p;
    transmitTime = t;
  }
}
