/*
****************************************************************************
*        Copyright IBM Corporation 1988, 1989 - All Rights Reserved        *
*                                                                          *
* Permission to use, copy, modify, and distribute this software and its    *
* documentation for any purpose and without fee is hereby granted,         *
* provided that the above copyright notice appear in all copies and        *
* that both that copyright notice and this permission notice appear in     *
* supporting documentation, and that the name of IBM not be used in        *
* advertising or publicity pertaining to distribution of the software      *
* without specific, written prior permission.                              *
*                                                                          *
* IBM DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL *
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL IBM *
* BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY      *
* DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER  *
* IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING   *
* OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.    *
****************************************************************************
*/

/* RX:  Extended Remote Procedure Call */

#include "rx_locl.h"

RCSID("$Id: rx.c,v 1.5 1998/02/24 01:36:30 art Exp $");

/*
 * quota system: each attached server process must be able to make
 *  progress to avoid system deadlock, so we ensure that we can always
 *  handle the arrival of the next unacknowledged data packet for an
 *  attached call.  rxi_dataQuota gives the max # of packets that must be
 *  reserved for active calls for them to be able to make progress, which is
 *  essentially enough to queue up a window-full of packets (the first packet
 *  may be missing, so these may not get read) + the # of packets the thread
 *  may use before reading all of its input (# free must be one more than send
 *  packet quota).  Thus, each thread allocates rx_Window+1 (max queued input
 *  packets) + an extra for sending data.  The system also reserves
 *  RX_MAX_QUOTA (must be more than RX_PACKET_QUOTA[i], which is 10), so that
 *  the extra packet can be sent (must be under the system-wide send packet
 *  quota to send any packets)
 */
/* # to reserve so that thread with input can still make calls (send packets)
   without blocking */
long rx_tq_dropped = 0;		       /* Solaris only temp variable */
long rxi_dataQuota = RX_MAX_QUOTA;     /* packets to reserve for active
				        * threads */

/*
 * Variables for handling the minProcs implementation.  availProcs gives the
 * number of threads available in the pool at this moment (not counting dudes
 * executing right now).  totalMin gives the total number of procs required
 * for handling all minProcs requests.  minDeficit is a dynamic variable
 * tracking the # of procs required to satisfy all of the remaining minProcs
 * demands.
 */

long rxi_availProcs = 0;	       /* number of threads in the pool */
long rxi_totalMin;		       /* Sum(minProcs) forall services */
long rxi_minDeficit = 0;	       /* number of procs needed to handle
				        * all minProcs */
long Rx0 = 0, Rx1 = 0;

struct rx_serverQueueEntry *rx_waitForPacket = 0;
struct rx_packet *rx_allocedP = 0;
#if 0
static char rxrcsid[] = "@(#)$Id: rx.c,v 1.5 1998/02/24 01:36:30 art Exp $";
#endif

/* ------------Exported Interfaces------------- */


/*
 * This function allows rxkad to set the epoch to a suitably random number
 * which rx_NewConnection will use in the future.  The principle purpose is to
 * get rxnull connections to use the same epoch as the rxkad connections do, at
 * least once the first rxkad connection is established.  This is important now
 * that the host/port addresses aren't used in FindConnection: the uniqueness
 * of epoch/cid matters and the start time won't do.
 */

void
rx_SetEpoch(u_long epoch)
{
    rx_epoch = epoch;
}

/*
 * Initialize rx.  A port number may be mentioned, in which case this
 * becomes the default port number for any service installed later.
 * If 0 is provided for the port number, a random port will be chosen
 * by the kernel.  Whether this will ever overlap anything in
 * /etc/services is anybody's guess...  Returns 0 on success, -1 on
 * error.
 */
static int rxinit_status = 1;

int
rx_Init(u_short port)
{
    struct timeval tv;
    char *htable, *ptable;

    SPLVAR;

    if (rxinit_status != 1)
	return rxinit_status;	       /* Already done; return previous error
				        * code. */

    /*
     * Allocate and initialize a socket for client and perhaps server
     * connections
     */
    rx_socket = rxi_GetUDPSocket(port);

    if (rx_socket == OSI_NULLSOCKET) {
	return RX_ADDRINUSE;
    }
#if	defined(AFS_GLOBAL_SUNLOCK) && defined(KERNEL)
    LOCK_INIT(&afs_rxglobal_lock, "afs_rxglobal_lock");
#endif
#ifdef	RX_ENABLE_LOCKS
    LOCK_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock");
    LOCK_INIT(&freeSQEList_lock, "freeSQEList lock");
    LOCK_INIT(&rx_waitingForPackets_lock, "rx_waitingForPackets lock");
    LOCK_INIT(&rx_freeCallQueue_lock, "rx_waitingForPackets lock");
#endif

    rxi_nCalls = 0;
    rx_connDeadTime = 12;
    memset((char *) &rx_stats, 0, sizeof(struct rx_stats));

    htable = (char *) osi_Alloc(rx_hashTableSize *
				sizeof(struct rx_connection *));
    PIN(htable, rx_hashTableSize *
	sizeof(struct rx_connection *));	/* XXXXX */
    memset(htable, 0, rx_hashTableSize *
	  sizeof(struct rx_connection *));
    ptable = (char *) osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
    PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *));	/* XXXXX */
    memset(ptable, 0, rx_hashTableSize * sizeof(struct rx_peer *));
    NETPRI;

    osi_GetTime(&tv);
    /*
     * *Slightly* random start time for the cid.  This is just to help
     * out with the hashing function at the peer
     */
    rx_port = port;
    rx_stats.minRtt.sec = 9999999;
    rx_SetEpoch(tv.tv_sec);	       /*
					* Start time of this package, rxkad
				        * will provide a randomer value. 
					*/
    rxi_dataQuota += rx_extraQuota;    /*
					* + extra packets caller asked to
				        * reserve
					*/
    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;	/* fudge */
    rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
    rx_connHashTable = (struct rx_connection **) htable;
    rx_peerHashTable = (struct rx_peer **) ptable;

    rx_lastAckDelay.sec = 0;
    rx_lastAckDelay.usec = 500000;

    clock_Init();
    rxevent_Init(20, rxi_ReScheduleEvents);

    /* Malloc up a bunch of packet buffers */
    rx_nFreePackets = 0;
    queue_Init(&rx_freePacketQueue);
    rx_nFreeCbufs = 0;
    rxi_NeedMoreCbufs = FALSE;
    queue_Init(&rx_freeCbufQueue);
    USERPRI;
    rxi_MorePackets(rx_nPackets);
    rx_CheckCbufs(0);
    NETPRI;

    /* Initialize various global queues */
    queue_Init(&rx_idleServerQueue);
    queue_Init(&rx_incomingCallQueue);
    queue_Init(&rx_freeCallQueue);

    /*
     * Start listener process (exact function is dependent on the
     * implementation environment--kernel or user space)
     */
    rxi_StartListener();

    USERPRI;
    rxinit_status = 0;
    return rxinit_status;
}

/*
 * called with unincremented nRequestsRunning to see if it is OK to start
 * a new thread in this service.  Could be "no" for two reasons: over the
 * max quota, or would prevent others from reaching their min quota.
 */
static int
QuotaOK(struct rx_service * aservice)
{
    /* check if over max quota */
    if (aservice->nRequestsRunning >= aservice->maxProcs)
	return 0;

    /* under min quota, we're OK */
    if (aservice->nRequestsRunning < aservice->minProcs)
	return 1;

    /*
     * otherwise, can use only if there are enough to allow everyone to go to
     * their min quota after this guy starts.
     */
    if (rxi_availProcs > rxi_minDeficit)
	return 1;
    else
	return 0;
}


/*
 * This routine must be called if any services are exported.  If the
 * donateMe flag is set, the calling process is donated to the server
 * process pool
 */
void
rx_StartServer(int donateMe)
{
    struct rx_service *service;
    int i;

    SPLVAR;
    clock_NewTime();

    NETPRI;
    /*
     * Start server processes, if necessary (exact function is dependent
     * on the implementation environment--kernel or user space).  DonateMe
     * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
     * case, one less new proc will be created rx_StartServerProcs.
     */
    rxi_StartServerProcs(donateMe);

    /*
     * count up the # of threads in minProcs, and add set the min deficit to
     * be that value, too.
     */
    for (i = 0; i < RX_MAX_SERVICES; i++) {
	service = rx_services[i];
	if (service == (struct rx_service *) 0)
	    break;
	rxi_totalMin += service->minProcs;
	/*
	 * below works even if a thread is running, since minDeficit would
	 * still have been decremented and later re-incremented.
	 */
	rxi_minDeficit += service->minProcs;
    }

    /* Turn on reaping of idle server connections */
    rxi_ReapConnections();

    if (donateMe)
	rx_ServerProc();	       /* Never returns */
    USERPRI;
    return;
}

/*
 * Create a new client connection to the specified service, using the
 * specified security object to implement the security model for this
 * connection.
 */
struct rx_connection *
rx_NewConnection(u_long shost, u_short sport, u_short sservice,
		 struct rx_securityClass * securityObject,
		 int serviceSecurityIndex)
{
    int hashindex;
    long cid;
    struct rx_connection *conn;

    SPLVAR;
#if defined(AFS_SGIMP_ENV)
    GLOCKSTATE ms;

#endif

    clock_NewTime();
    dpf(("rx_NewConnection(host %x, port %u, service %u, "
	 "securityObject %x, serviceSecurityIndex %d)\n",
	 shost, sport, sservice, securityObject, serviceSecurityIndex));
    GLOBAL_LOCK();
#if defined(AFS_SGIMP_ENV)
    AFS_GRELEASE(&ms);
    /* NETPRI protects Cid and Alloc */
    NETPRI;
#endif
    cid = (rx_nextCid += RX_MAXCALLS);
    conn = rxi_AllocConnection();
#if !defined(AFS_SGIMP_ENV)
    NETPRI;
#endif
    conn->type = RX_CLIENT_CONNECTION;
    conn->cid = cid;
    conn->epoch = rx_epoch;
    conn->peer = rxi_FindPeer(shost, sport);
    conn->serviceId = sservice;
    conn->securityObject = securityObject;
    conn->securityData = (void *) 0;
    conn->securityIndex = serviceSecurityIndex;
    conn->maxPacketSize = MIN(conn->peer->packetSize, OLD_MAX_PACKET_SIZE);
    rx_SetConnDeadTime(conn, rx_connDeadTime);

    RXS_NewConnection(securityObject, conn);
    hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch,
			  RX_CLIENT_CONNECTION);
    conn->next = rx_connHashTable[hashindex];
    rx_connHashTable[hashindex] = conn;
    rx_stats.nClientConns++;

    conn->refCount++;
    USERPRI;
#if defined(AFS_SGIMP_ENV)
    AFS_GACQUIRE(&ms);
#endif
    GLOBAL_UNLOCK();
    return conn;
}

void
rxi_SetConnDeadTime(struct rx_connection * conn, int seconds)
{
    /*
     * This is completely silly; perhaps exponential back off
     * would be more reasonable? XXXX
     */
    conn->secondsUntilDead = seconds;
    conn->secondsUntilPing = seconds / 6;
    if (conn->secondsUntilPing == 0)
	conn->secondsUntilPing = 1;    /* XXXX */
}

/* Destroy the specified connection */
void
rx_DestroyConnection(struct rx_connection * conn)
{
    struct rx_connection **conn_ptr;
    int i;
    int havecalls = 0;

    SPLVAR;

    clock_NewTime();

    NETPRI;
    if (--conn->refCount > 0) {
	/* Busy; wait till the last guy before proceeding */
	USERPRI;
	return;
    }
    /*
     * If the client previously called rx_NewCall, but it is still
     * waiting, treat this as a running call, and wait to destroy the
     * connection later when the call completes.
     */
    if ((conn->type == RX_CLIENT_CONNECTION) &&
	(conn->flags & RX_CONN_MAKECALL_WAITING)) {
	conn->flags |= RX_CONN_DESTROY_ME;
	USERPRI;
	return;
    }
    /* Check for extant references to this connection */
    for (i = 0; i < RX_MAXCALLS; i++) {
	struct rx_call *call = conn->call[i];

	if (call) {
	    havecalls = 1;
	    if (conn->type == RX_CLIENT_CONNECTION) {
		if (call->delayedAckEvent) {
		    /*
		     * Push the final acknowledgment out now--there
                     * won't be a subsequent call to acknowledge the
                     * last reply packets
		     */
		    rxevent_Cancel(call->delayedAckEvent);
		    rxi_AckAll((struct rxevent *) 0, call, 0);
		}
	    }
	}
    }

    if (havecalls) {
	/*
	 * Don't destroy the connection if there are any call
         * structures still in use
	 */
	conn->flags |= RX_CONN_DESTROY_ME;
	USERPRI;
	return;
    }
    /* Remove from connection hash table before proceeding */
    conn_ptr = &rx_connHashTable[CONN_HASH(peer->host, peer->port, conn->cid,
					   conn->epoch, conn->type)];
    for (; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
	if (*conn_ptr == conn) {
	    *conn_ptr = conn->next;
	    break;
	}
    }

    /*
     * Notify the service exporter, if requested, that this connection
     * is being destroyed
     */
    if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
	(*conn->service->destroyConnProc) (conn);

    /* Notify the security module that this connection is being destroyed */
    RXS_DestroyConnection(conn->securityObject, conn);

    /* Make sure that the connection is completely reset before deleting it. */
    rxi_ResetConnection(conn);

    if (--conn->peer->refCount == 0)
	conn->peer->idleWhen = clock_Sec();

    if (conn->type == RX_SERVER_CONNECTION)
	rx_stats.nServerConns--;
    else
	rx_stats.nClientConns--;


    MUTEX_DESTROY(&conn->lock);
    rxi_FreeConnection(conn);
    USERPRI;
}

/*
 * Start a new rx remote procedure call, on the specified connection.
 * If wait is set to 1, wait for a free call channel; otherwise return
 * 0.  Maxtime gives the maximum number of seconds this call may take,
 * after rx_MakeCall returns.  After this time interval, a call to any
 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
 */
struct rx_call *
rx_NewCall(struct rx_connection * conn)
{
    int i;
    struct rx_call *call;

    SPLVAR;
#if defined(AFS_SGIMP_ENV)
    GLOCKSTATE ms;

#endif
    clock_NewTime();
    dpf(("rx_MakeCall(conn %x)\n", conn));

    GLOBAL_LOCK();
#if defined(AFS_SGIMP_ENV)
    AFS_GRELEASE(&ms);
#endif
    NETPRI;
    for (;;) {
	for (i = 0; i < RX_MAXCALLS; i++) {
	    call = conn->call[i];
	    if (call) {
		if (call->state == RX_STATE_DALLY) {
		    MUTEX_ENTER(&call->lock);
		    rxi_ResetCall(call);
		    (*call->callNumber)++;
		    break;
		}
	    } else {
		call = rxi_NewCall(conn, i);
		MUTEX_ENTER(&call->lock);
		break;
	    }
	}
	if (i < RX_MAXCALLS) {
	    break;
	}
	conn->flags |= RX_CONN_MAKECALL_WAITING;
#ifdef	RX_ENABLE_LOCKS
	cv_wait(&conn->cv, &conn->lock);
#else
	osi_rxSleep(conn);
#endif
    }

    /* Client is initially in send mode */
    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_SENDING;

    /* remember start time for call in case we have hard dead time limit */
    call->startTime = clock_Sec();

    /* Turn on busy protocol. */
    rxi_KeepAliveOn(call);

    MUTEX_EXIT(&call->lock);
    USERPRI;
#if defined(AFS_SGIMP_ENV)
    AFS_GACQUIRE(&ms);
#endif
    GLOBAL_UNLOCK();
    return call;
}

int
rxi_HasActiveCalls(struct rx_connection * aconn)
{
    int i;
    struct rx_call *tcall;

    SPLVAR;

    NETPRI;
    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) != NULL) {
	    if ((tcall->state == RX_STATE_ACTIVE)
		|| (tcall->state == RX_STATE_PRECALL)) {
		USERPRI;
		return 1;
	    }
	}
    }
    USERPRI;
    return 0;
}

int
rxi_GetCallNumberVector(const struct rx_connection * aconn,
			int32_t *alongs)
{
    int i;
    struct rx_call *tcall;

    SPLVAR;

    NETPRI;
    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    alongs[i] = aconn->callNumber[i] + 1;
	else
	    alongs[i] = aconn->callNumber[i];
    }
    USERPRI;
    return 0;
}

int
rxi_SetCallNumberVector(struct rx_connection * aconn,
			int32_t *alongs)
{
    int i;
    struct rx_call *tcall;

    SPLVAR;

    NETPRI;
    for (i = 0; i < RX_MAXCALLS; i++) {
	if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
	    aconn->callNumber[i] = alongs[i] - 1;
	else
	    aconn->callNumber[i] = alongs[i];
    }
    USERPRI;
    return 0;
}

/*
 * Advertise a new service.  A service is named locally by a UDP port
 * number plus a 16-bit service id.  Returns (struct rx_service *) 0
 * on a failure.
 */
struct rx_service *
rx_NewService(u_short port, u_short serviceId, char *serviceName,
	      struct rx_securityClass ** securityObjects,
	      int nSecurityObjects, long (*serviceProc) ())
{
    osi_socket socket = OSI_NULLSOCKET;
    struct rx_service *tservice;
    int i;

    SPLVAR;

    clock_NewTime();

    if (serviceId == 0) {
	(osi_Msg "rx_NewService:  service id for service %s is not"
	 " non-zero.\n",
	 serviceName);
	return 0;
    }
    if (port == 0) {
	if (rx_port == 0) {
	    (osi_Msg "rx_NewService: A non-zero port must be specified on "
	     "this call if a non-zero port was not provided at Rx "
	     "initialization (service %s).\n", serviceName);
	    return 0;
	}
	port = rx_port;
	socket = rx_socket;
    }
    tservice = rxi_AllocService();
    NETPRI;
    for (i = 0; i < RX_MAX_SERVICES; i++) {
	struct rx_service *service = rx_services[i];

	if (service) {
	    if (port == service->servicePort) {
		if (service->serviceId == serviceId) {
		    /*
		     * The identical service has already been
                     * installed; if the caller was intending to
                     * change the security classes used by this
                     * service, he/she loses.
		     */
		    (osi_Msg "rx_NewService: tried to install service %s "
		     "with service id %d, which is already in use "
		     "for service %s\n", serviceName, serviceId,
		     service->serviceName);
		    USERPRI;
		    rxi_FreeService(tservice);
		    return service;
		}

		/*
		 * Different service, same port: re-use the socket which is
		 * bound to the same port
		 */
		socket = service->socket;
	    }
	} else {
	    if (socket == OSI_NULLSOCKET) {

		/*
		 * If we don't already have a socket (from another service on
		 * same port) get a new one
		 */
		socket = rxi_GetUDPSocket(port);
		if (socket == OSI_NULLSOCKET) {
		    USERPRI;
		    rxi_FreeService(tservice);
		    return 0;
		}
	    }
	    service = tservice;
	    service->socket = socket;
	    service->servicePort = port;
	    service->serviceId = serviceId;
	    service->serviceName = serviceName;
	    service->nSecurityObjects = nSecurityObjects;
	    service->securityObjects = securityObjects;
	    service->minProcs = 0;
	    service->maxProcs = 1;
	    service->idleDeadTime = 60;
	    service->connDeadTime = rx_connDeadTime;
	    service->executeRequestProc = serviceProc;
	    rx_services[i] = service;  /* not visible until now */
	    USERPRI;
	    return service;
	}
    }
    USERPRI;
    rxi_FreeService(tservice);
    (osi_Msg "rx_NewService: cannot support > %d services\n", RX_MAX_SERVICES);
    return 0;
}

/*
 * This is the server process's request loop.  This routine should
 * either be each server proc's entry point, or it should be called by
 * the server process (depending upon the rx implementation
 * environment).
 */
void
rx_ServerProc(void)
{
    struct rx_call *call;
    long code;
    struct rx_service *tservice;

#if defined(AFS_SGIMP_ENV)
    SPLVAR;
    GLOCKSTATE ms;

    AFS_GRELEASE(&ms);
    NETPRI;
#endif

    rxi_dataQuota += rx_Window + 2;    /*
				        * reserve a window of packets for
				        * hard times
				        */
    rxi_MorePackets(rx_Window + 2);    /* alloc more packets, too */
    rxi_availProcs++;		       /*
				        * one more thread handling incoming
				        * calls
				        */
#if defined(AFS_SGIMP_ENV)
    USERPRI;
#endif
    for (;;) {

	call = rx_GetCall();
#ifdef	KERNEL
	if (afs_termState == AFSOP_STOP_RXCALLBACK) {
	    MUTEX_ENTER(&afs_termStateLock);
	    afs_termState = AFSOP_STOP_AFS;
#ifdef	RX_ENABLE_LOCKS
	    cv_signal(&afs_termStateCv);
#else
	    osi_rxWakeup(&afs_termState);
#endif
	    MUTEX_EXIT(&afs_termStateLock);
	    return;
	}
#endif
	tservice = call->conn->service;
#if defined(AFS_SGIMP_ENV)
	AFS_GLOCK();
#endif
	if (tservice->beforeProc)
	    (*tservice->beforeProc) (call);

	code = call->conn->service->executeRequestProc(call);

	if (tservice->afterProc)
	    (*tservice->afterProc) (call, code);

	rx_EndCall(call, code);
	rxi_nCalls++;
#if defined(AFS_SGIMP_ENV)
	AFS_GUNLOCK();
#endif
    }
}


/*
 * Sleep until a call arrives.  Returns a pointer to the call, ready
 * for an rx_Read.
 */
struct rx_call *
rx_GetCall(void)
{
    struct rx_serverQueueEntry *sq;
    struct rx_call *call = (struct rx_call *) 0;
    struct rx_service *service = NULL;

    SPLVAR;

    GLOBAL_LOCK();
    MUTEX_ENTER(&freeSQEList_lock);

    if ((sq = rx_FreeSQEList) != NULL) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **) sq;
	MUTEX_EXIT(&freeSQEList_lock);
    } else {			       /* otherwise allocate a new one and
				        * return that */
	MUTEX_EXIT(&freeSQEList_lock);
	sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
#ifdef	RX_ENABLE_LOCKS
	LOCK_INIT(&sq->lock, "server Queue lock");
#endif
    }
    MUTEX_ENTER(&sq->lock);
#if defined(AFS_SGIMP_ENV)
    ASSERT(!isafs_glock());
#endif
    NETPRI;

    if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
	struct rx_call *tcall, *ncall;

	/*
	 * Scan for eligible incoming calls.  A call is not eligible
         * if the maximum number of calls for its service type are
         * already executing
	 */
	for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
	    service = tcall->conn->service;
	    if (QuotaOK(service)) {
		call = tcall;
		break;
	    }
	}
    }
    if (call) {
	queue_Remove(call);
	call->flags &= (~RX_CALL_WAIT_PROC);
	service->nRequestsRunning++;
	/*
	 * just started call in minProcs pool, need fewer to maintain
         * guarantee
	 */
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
	rx_nWaiting--;
    } else {
	/*
	 * If there are no eligible incoming calls, add this process
         * to the idle server queue, to wait for one
	 */
	sq->newcall = 0;
	queue_Append(&rx_idleServerQueue, sq);
	rx_waitForPacket = sq;
	do {
#ifdef	RX_ENABLE_LOCKS
	    cv_wait(&sq->cv, &sq->lock);
#else
	    osi_rxSleep(sq);
#endif
#ifdef	KERNEL
	    if (afs_termState == AFSOP_STOP_RXCALLBACK) {
		GLOBAL_UNLOCK();
		USERPRI;
		return (struct rx_call *) 0;
	    }
#endif
	} while (!(call = sq->newcall));
    }
    MUTEX_EXIT(&sq->lock);

    MUTEX_ENTER(&freeSQEList_lock);
    *(struct rx_serverQueueEntry **) sq = rx_FreeSQEList;
    rx_FreeSQEList = sq;
    MUTEX_EXIT(&freeSQEList_lock);

    call->state = RX_STATE_ACTIVE;
    call->mode = RX_MODE_RECEIVING;
    if (queue_IsEmpty(&call->rq)) {
	/* we can't schedule a call if there's no data!!! */
	rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY);
    }
    rxi_calltrace(RX_CALL_START, call);
    dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
	 call->conn->service->servicePort,
	 call->conn->service->serviceId, call));

    GLOBAL_UNLOCK();
    USERPRI;

    return call;
}



/*
 * Establish a procedure to be called when a packet arrives for a
 * call.  This routine will be called at most once after each call,
 * and will also be called if there is an error condition on the or
 * the call is complete.  Used by multi rx to build a selection
 * function which determines which of several calls is likely to be a
 * good one to read from.
 * NOTE: the way this is currently implemented it is probably only a
 * good idea to (1) use it immediately after a newcall (clients only)
 * and (2) only use it once.  Other uses currently void your warranty
 */
void
rx_SetArrivalProc(struct rx_call * call, void (*proc) (),
		   void *handle, void *arg)
{
    call->arrivalProc = proc;
    call->arrivalProcHandle = handle;
    call->arrivalProcArg = arg;
}

/*
 * Call is finished (possibly prematurely).  Return rc to the peer, if
 * appropriate, and return the final error code from the conversation
 * to the caller
 */
long
rx_EndCall(struct rx_call * call, long rc)
{
    struct rx_connection *conn = call->conn;
    struct rx_service *service;
    long error;

    SPLVAR;
#if defined(AFS_SGIMP_ENV)
    GLOCKSTATE ms;

#endif

    dpf(("rx_EndCall(call %x)\n", call));

#if defined(AFS_SGIMP_ENV)
    AFS_GRELEASE(&ms);
#endif
    NETPRI;
    GLOBAL_LOCK();
    MUTEX_ENTER(&call->lock);

    call->arrivalProc = (void (*) ()) 0;
    if (rc && call->error == 0) {
	rxi_CallError(call, rc);
	/*
	 * Send an abort message to the peer if this error code has
         * only just been set.  If it was set previously, assume the
         * peer has already been sent the error code or will request it
         */
	rxi_SendCallAbort(call, (struct rx_packet *) 0);
    }
    if (conn->type == RX_SERVER_CONNECTION) {
	/* Make sure reply or at least dummy reply is sent */
	if (call->mode == RX_MODE_RECEIVING) {
	    MUTEX_EXIT(&call->lock);
	    GLOBAL_UNLOCK();

	    rx_WriteProc(call, 0, 0);

	    GLOBAL_LOCK();
	    MUTEX_ENTER(&call->lock);
	}
	if (call->mode == RX_MODE_SENDING) {
	    rx_FlushWrite(call);
	}
	service = conn->service;
	service->nRequestsRunning--;
	if (service->nRequestsRunning < service->minProcs)
	    rxi_minDeficit++;
	rxi_availProcs++;
	rxi_calltrace(RX_CALL_END, call);
    } else {			       /* Client connection */
	char dummy;

	/*
	 * Make sure server receives input packets, in the case where
         * no reply arguments are expected
	 */
	if (call->mode == RX_MODE_SENDING) {
	    MUTEX_EXIT(&call->lock);
	    GLOBAL_UNLOCK();

	    (void) rx_ReadProc(call, &dummy, 1);

	    GLOBAL_LOCK();
	    MUTEX_ENTER(&call->lock);
	}
	if (conn->flags & RX_CONN_MAKECALL_WAITING) {
	    conn->flags &= (~RX_CONN_MAKECALL_WAITING);
#ifdef	RX_ENABLE_LOCKS
	    cv_signal(&conn->cv);
#else
	    osi_rxWakeup(conn);
#endif
	}
    }
    call->state = RX_STATE_DALLY;
    error = call->error;

    /*
     * currentPacket, nLeft, and NFree must be zeroed here, because
     * ResetCall cannot: ResetCall may be called at splnet(), in the
     * kernel version, and may interrupt the macros rx_Read or
     * rx_Write, which run at normal priority for efficiency.
     */
    if (call->currentPacket) {
	rxi_FreePacket(call->currentPacket);
	call->currentPacket = (struct rx_packet *) 0;
	call->nLeft = call->nFree = 0;
    } else
	call->nLeft = call->nFree = 0;


    MUTEX_EXIT(&call->lock);
    GLOBAL_UNLOCK();
    USERPRI;
#if defined(AFS_SGIMP_ENV)
    AFS_GACQUIRE(&ms);
#endif
    /*
     * Map errors to the local host's errno.h format.
     */
    error = ntoh_syserr_conv(error);
    return error;
}

/*
 * Call this routine when shutting down a server or client (especially
 * clients).  This will allow Rx to gracefully garbage collect server
 * connections, and reduce the number of retries that a server might
 * make to a dead client
 */
void
rx_Finalize(void)
{
    struct rx_connection **conn_ptr, **conn_end;

    SPLVAR;
    NETPRI;
    if (rx_connHashTable)
	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;

	    for (conn = *conn_ptr; conn; conn = next) {
		next = conn->next;
		if (conn->type == RX_CLIENT_CONNECTION)
		    rx_DestroyConnection(conn);
	    }
	}

    rxi_flushtrace();
    USERPRI;
}

/*
 * if we wakeup packet waiter too often, can get in loop with two
 * AllocSendPackets each waking each other up (from ReclaimPacket calls)
 */
void
rxi_PacketsUnWait(void)
{

    MUTEX_ENTER(&rx_waitingForPackets_lock);
    if (!rx_waitingForPackets) {
	MUTEX_EXIT(&rx_waitingForPackets_lock);
	return;
    }
    if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
	MUTEX_EXIT(&rx_waitingForPackets_lock);
	return;			       /* still over quota */
    }
    rx_waitingForPackets = 0;
#ifdef	RX_ENABLE_LOCKS
    cv_signal(&rx_waitingForPackets_cv);
#else
    osi_rxWakeup(&rx_waitingForPackets);
#endif
    MUTEX_EXIT(&rx_waitingForPackets_lock);
    return;
}


/* ------------------Internal interfaces------------------------- */

/*
 * Return this process's service structure for the
 * specified socket and service
 */
struct rx_service *
rxi_FindService(osi_socket socket, u_short serviceId)
{
    struct rx_service **sp;

    for (sp = &rx_services[0]; *sp; sp++) {
	if ((*sp)->serviceId == serviceId && (*sp)->socket == socket)
	    return *sp;
    }
    return 0;
}

/*
 * Allocate a call structure, for the indicated channel of the
 * supplied connection.  The mode and state of the call must be set by
 * the caller.
 */
struct rx_call *
rxi_NewCall(struct rx_connection * conn, int channel)
{
    struct rx_call *call;

    /*
     * Grab an existing call structure, or allocate a new one.
     * Existing call structures are assumed to have been left reset by
     * rxi_FreeCall
     */
    MUTEX_ENTER(&rx_freeCallQueue_lock);

    if (queue_IsNotEmpty(&rx_freeCallQueue)) {
	call = queue_First(&rx_freeCallQueue, rx_call);
	queue_Remove(call);
	rx_stats.nFreeCallStructs--;
	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_ENTER(&call->lock);

#ifdef	ADAPT_PERF
	/* Bind the call to its connection structure */
	call->conn = conn;
#endif
    } else {
	call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));

	MUTEX_EXIT(&rx_freeCallQueue_lock);
	MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
	MUTEX_INIT(&call->lockw, "call wlock", MUTEX_DEFAULT, NULL);
	MUTEX_ENTER(&call->lock);

	rx_stats.nCallStructs++;
	/* Initialize once-only items */
	queue_Init(&call->tq);
	queue_Init(&call->rq);
#ifdef	ADAPT_PERF
	/* Bind the call to its connection structure (prereq for reset) */
	call->conn = conn;
#endif
	rxi_ResetCall(call);
    }
#ifndef	ADAPT_PERF
    /* Bind the call to its connection structure (prereq for reset) */
    call->conn = conn;
#endif
    call->channel = channel;
    call->callNumber = &conn->callNumber[channel];
    /*
     * Note that the next expected call number is retained (in
     * conn->callNumber[i]), even if we reallocate the call structure
     */
    conn->call[channel] = call;
    /*
     * if the channel's never been used (== 0), we should start at 1, otherwise
     * the call number is valid from the last time this channel was used
     */
    if (*call->callNumber == 0)
	*call->callNumber = 1;

    MUTEX_EXIT(&call->lock);
    return call;
}

/*
 * A call has been inactive long enough that so we can throw away
 * state, including the call structure, which is placed on the call
 * free list.
 */
void
rxi_FreeCall(struct rx_call * call)
{
    int channel = call->channel;
    struct rx_connection *conn = call->conn;


    MUTEX_ENTER(&call->lock);

    if (call->state == RX_STATE_DALLY)
	(*call->callNumber)++;
    rxi_ResetCall(call);
    call->conn->call[channel] = (struct rx_call *) 0;

    MUTEX_ENTER(&rx_freeCallQueue_lock);

    queue_Append(&rx_freeCallQueue, call);
    rx_stats.nFreeCallStructs++;

    MUTEX_EXIT(&rx_freeCallQueue_lock);
    MUTEX_EXIT(&call->lock);

    /*
     * Destroy the connection if it was previously slated for
     * destruction, i.e. the Rx client code previously called
     * rx_DestroyConnection (client connections), or
     * rxi_ReapConnections called the same routine (server
     * connections).  Only do this, however, if there are no
     * outstanding calls
     */
    if (conn->flags & RX_CONN_DESTROY_ME)
	rx_DestroyConnection(conn);
}


long rxi_Alloccnt = 0, rxi_Allocsize = 0;
char *
rxi_Alloc(int size)
{
    char *p;

    rxi_Alloccnt++;
    rxi_Allocsize += size;
#if	(defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && defined(KERNEL)
    p = (char *) osi_AllocSmall(size, 1);
#else
    p = (char *) osi_Alloc(size);
#endif
    if (!p)
	osi_Panic("rxi_Alloc error");
    memset(p, 0, size);
    return p;
}

void
rxi_Free(void *addr, int size)
{
    rxi_Alloccnt--;
    rxi_Allocsize -= size;
#if	(defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && defined(KERNEL)
    osi_FreeSmall(addr);
#else
    osi_Free(addr, size);
#endif
}

/*
 * Find the peer process represented by the supplied (host,port)
 * combination.  If there is no appropriate active peer structure, a
 * new one will be allocated and initialized
 */
struct rx_peer *
rxi_FindPeer(u_long host, u_short port)
{
    struct rx_peer *pp;
    int hashIndex;

    hashIndex = PEER_HASH(host, port);
    for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
	if ((pp->host == host) && (pp->port == port))
	    break;
    }
    if (!pp) {
	pp = rxi_AllocPeer();	       /*
					* This bzero's *pp: anything not
				        * explicitly
					*/
	pp->host = host;	       /*
					* set here or in InitPeerParams is
				        * zero
					*/
	pp->port = port;
	queue_Init(&pp->congestionQueue);
	pp->next = rx_peerHashTable[hashIndex];
	rx_peerHashTable[hashIndex] = pp;
	rxi_InitPeerParams(pp);
	rx_stats.nPeerStructs++;
    }
    pp->refCount++;
    return pp;
}

/*
 * Destroy the specified peer structure, removing it from the peer hash
 * table
 */
void
rxi_DestroyPeer(struct rx_peer * peer)
{
    struct rx_peer **peer_ptr;

    for (peer_ptr = &rx_peerHashTable[PEER_HASH(peer->host, peer->port)];
	 *peer_ptr; peer_ptr = &(*peer_ptr)->next) {
	if (*peer_ptr == peer) {
	    *peer_ptr = peer->next;
	    rxi_FreePeer(peer);
	    rx_stats.nPeerStructs--;
	    return;
	}
    }
}

/*
 * Find the connection at (host, port) started at epoch, and with the
 * given connection id.  Creates the server connection if necessary.
 * The type specifies whether a client connection or a server
 * connection is desired.  In both cases, (host, port) specify the
 * peer's (host, pair) pair.  Client connections are not made
 * automatically by this routine.  The parameter socket gives the
 * socket descriptor on which the packet was received.  This is used,
 * in the case of server connections, to check that *new* connections
 * come via a valid (port, serviceId).  Finally, the securityIndex
 * parameter must match the existing index for the connection.  If a
 * server connection is created, it will be created using the supplied
 * index, if the index is valid for this service
 */
struct rx_connection *
rxi_FindConnection(osi_socket socket, long host,
		   u_short port, u_short serviceId, u_long cid,
		   u_long epoch, int type, u_int securityIndex)
{
    int hashindex;
    struct rx_connection *conn;

    hashindex = CONN_HASH(host, port, cid, epoch, type);
    for (conn = rx_connHashTable[hashindex]; conn; conn = conn->next) {
	if ((conn->type == type) && ((cid & RX_CIDMASK) == conn->cid) &&
	    (epoch == conn->epoch) &&
	    (securityIndex == conn->securityIndex)) {
	    struct rx_peer *pp = conn->peer;

	    /*
	     * epoch's high order bits mean route for security reasons only on
	     * the cid, not the host and port fields.
	     */
	    if (conn->epoch & 0x80000000)
		break;
	    if (((type == RX_CLIENT_CONNECTION)
		 || (pp->host == host)) && (pp->port == port))
		break;
	}
    }
    if (!conn) {
	struct rx_service *service;

	if (type == RX_CLIENT_CONNECTION)
	    return (struct rx_connection *) 0;
	service = rxi_FindService(socket, serviceId);
	if (!service || (securityIndex >= service->nSecurityObjects)
	    || (service->securityObjects[securityIndex] == 0))
	    return (struct rx_connection *) 0;
	conn = rxi_AllocConnection();  /* This bzero's the connection */
#ifdef	RX_ENABLE_LOCKS
	LOCK_INIT(&conn->lock, "conn lock");
#endif
	conn->next = rx_connHashTable[hashindex];
	rx_connHashTable[hashindex] = conn;
	conn->peer = rxi_FindPeer(host, port);
	conn->maxPacketSize = MIN(conn->peer->packetSize, OLD_MAX_PACKET_SIZE);
	conn->type = RX_SERVER_CONNECTION;
	conn->lastSendTime = clock_Sec();	/* don't GC immediately */
	conn->epoch = epoch;
	conn->cid = cid & RX_CIDMASK;
	/* conn->serial = conn->lastSerial = 0; */
	/* conn->rock = 0; */
	/* conn->timeout = 0; */
	conn->service = service;
	conn->serviceId = serviceId;
	conn->securityIndex = securityIndex;
	conn->securityObject = service->securityObjects[securityIndex];
	rx_SetConnDeadTime(conn, service->connDeadTime);
	/* Notify security object of the new connection */
	RXS_NewConnection(conn->securityObject, conn);
	/* XXXX Connection timeout? */
	if (service->newConnProc)
	    (*service->newConnProc) (conn);
	rx_stats.nServerConns++;
    }
    conn->refCount++;
    return conn;
}

/*
 * There are two packet tracing routines available for testing and monitoring
 * Rx.  One is called just after every packet is received and the other is
 * called just before every packet is sent.  Received packets, have had their
 * headers decoded, and packets to be sent have not yet had their headers
 * encoded.  Both take two parameters: a pointer to the packet and a sockaddr
 * containing the network address.  Both can be modified.  The return value, if
 * non-zero, indicates that the packet should be dropped.
 */

int (*rx_justReceived)() = NULL;
int (*rx_almostSent)() = NULL;

/*
 * A packet has been received off the interface.  Np is the packet, socket is
 * the socket number it was received from (useful in determining which service
 * this packet corresponds to), and (host, port) reflect the host,port of the
 * sender.  This call returns the packet to the caller if it is finished with
 * it, rather than de-allocating it, just as a small performance hack
 */

struct rx_packet *
rxi_ReceivePacket(struct rx_packet * np, osi_socket socket,
		  u_long host, u_short port)
{
    struct rx_call *call;
    struct rx_connection *conn;
    int channel;
    unsigned long currentCallNumber;
    int type;
    int skew;

#ifdef RXDEBUG
    char *packetType;

#endif
    struct rx_packet *tnp;

#ifdef RXDEBUG
/*
 * We don't print out the packet until now because (1) the time may not be
 * accurate enough until now in the lwp implementation (rx_Listener only gets
 * the time after the packet is read) and (2) from a protocol point of view,
 * this is the first time the packet has been seen
 */
    packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
	? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
    dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
	 np->header.serial, packetType, host, port, np->header.serviceId,
	 np->header.epoch, np->header.cid, np->header.callNumber,
	 np->header.seq, np->header.flags, np));
#endif

    if (np->header.type == RX_PACKET_TYPE_VERSION)
	return rxi_ReceiveVersionPacket(np, socket, host, port);

    if (np->header.type == RX_PACKET_TYPE_DEBUG)
	return rxi_ReceiveDebugPacket(np, socket, host, port);

#ifdef RXDEBUG

    /*
     * If an input tracer function is defined, call it with the packet and
     * network address.  Note this function may modify its arguments.
     */
    if (rx_justReceived) {
	struct sockaddr_in addr;
	int drop;

	addr.sin_family = AF_INET;
	addr.sin_port = port;
	addr.sin_addr.s_addr = host;
	drop = (*rx_justReceived) (np, &addr);
	/* drop packet if return value is non-zero */
	if (drop)
	    return np;
	port = addr.sin_port;	       /* in case fcn changed addr */
	host = addr.sin_addr.s_addr;
    }
#endif

    /* If packet was not sent by the client, then *we* must be the client */
    type = ((np->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
	? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;

    /*
     * Find the connection (or fabricate one, if we're the server & if
     * necessary) associated with this packet
     */
    conn = rxi_FindConnection(socket, host, port, np->header.serviceId,
			      np->header.cid, np->header.epoch, type,
			      np->header.securityIndex);

    if (!conn) {
	/*
         * If no connection found or fabricated, just ignore the packet.
         * (An argument could be made for sending an abort packet for
         * the conn)
         */
	return np;
    }
    /* compute the max serial number seen on this connection */
    if (conn->maxSerial < np->header.serial)
	conn->maxSerial = np->header.serial;

    /*
     * If the connection is in an error state, send an abort packet and
     * ignore the incoming packet
     */
    if (conn->error) {
	/* Don't respond to an abort packet--we don't want loops! */
	if (np->header.type != RX_PACKET_TYPE_ABORT)
	    np = rxi_SendConnectionAbort(conn, np);
	conn->refCount--;
	return np;
    }
    /* Check for connection-only requests (i.e. not call specific). */
    if (np->header.callNumber == 0) {
	switch (np->header.type) {
	case RX_PACKET_TYPE_ABORT:
	    /* What if the supplied error is zero? */
	    rxi_ConnectionError(conn, ntohl(rx_GetLong(np, 0)));
	    conn->refCount--;
	    return np;
	case RX_PACKET_TYPE_CHALLENGE:
	    tnp = rxi_ReceiveChallengePacket(conn, np);
	    conn->refCount--;
	    return tnp;
	case RX_PACKET_TYPE_RESPONSE:
	    tnp = rxi_ReceiveResponsePacket(conn, np);
	    conn->refCount--;
	    return tnp;
	case RX_PACKET_TYPE_PARAMS:
	case RX_PACKET_TYPE_PARAMS + 1:
	case RX_PACKET_TYPE_PARAMS + 2:
	    /* ignore these packet types for now */
	    conn->refCount--;
	    return np;


	default:
	    /*
	     * Should not reach here, unless the peer is broken: send an
             * abort packet
	     */
	    rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
	    tnp = rxi_SendConnectionAbort(conn, np);
	    conn->refCount--;
	    return tnp;
	}
    }
    channel = np->header.cid & RX_CHANNELMASK;
    call = conn->call[channel];
#ifdef	RX_ENABLE_LOCKSX
    if (call)
	mutex_enter(&call->lock);
#endif
    currentCallNumber = conn->callNumber[channel];

    if (type == RX_SERVER_CONNECTION) {/* We're the server */
	if (np->header.callNumber < currentCallNumber) {
	    rx_stats.spuriousPacketsRead++;
#ifdef	RX_ENABLE_LOCKSX
	    if (call)
		mutex_exit(&call->lock);
#endif
	    conn->refCount--;
	    return np;
	}
	if (!call) {
	    call = rxi_NewCall(conn, channel);
#ifdef	RX_ENABLE_LOCKSX
	    mutex_enter(&call->lock);
#endif
	    *call->callNumber = np->header.callNumber;
	    call->state = RX_STATE_PRECALL;
	    rxi_KeepAliveOn(call);
	} else if (np->header.callNumber != currentCallNumber) {
	    /*
	     * If the new call cannot be taken right now send a busy and set
             * the error condition in this call, so that it terminates as
             * quickly as possible
	     */
	    if (call->state == RX_STATE_ACTIVE) {
		struct rx_packet *tp;

		rxi_CallError(call, RX_CALL_DEAD);
		tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0);
#ifdef	RX_ENABLE_LOCKSX
		mutex_exit(&call->lock);
#endif
		conn->refCount--;
		return tp;
	    }
	    /*
	     * If the new call can be taken right now (it's not busy) then
             * accept it.
	     */
	    rxi_ResetCall(call);
	    *call->callNumber = np->header.callNumber;
	    call->state = RX_STATE_PRECALL;
	    rxi_KeepAliveOn(call);
	} else {
	    /* Continuing call; do nothing here. */
	}
    } else {			       /* we're the client */

	/*
	 * Ignore anything that's not relevant to the current call.  If there
	 * isn't a current call, then no packet is relevant.
	 */
	if (!call || (np->header.callNumber != currentCallNumber)) {
	    rx_stats.spuriousPacketsRead++;
#ifdef	RX_ENABLE_LOCKSX
	    if (call)
		mutex_exit(&call->lock);
#endif
	    conn->refCount--;
	    return np;
	}
	/*
	 * If the service security object index stamped in the packet does not
         * match the connection's security index, ignore the packet
	 */
	if (np->header.securityIndex != conn->securityIndex) {
	    conn->refCount--;
#ifdef	RX_ENABLE_LOCKSX
	    mutex_exit(&call->lock);
#endif
	    return np;
	}
	/*
	 * If we're receiving the response, then all transmit packets are
         * implicitly acknowledged.  Get rid of them.
	 */
	if (np->header.type == RX_PACKET_TYPE_DATA) {
#ifdef	AFS_SUN5_ENV
	    /*
	     * XXX Hack. Because we can't release the global rx lock when
	     * sending packets (osi_NetSend) we drop all acks while we're
	     * traversing the tq in rxi_Start sending packets out because
	     * packets may move to the freePacketQueue as result of being
	     * here! So we drop these packets until we're safely out of the
	     * traversing. Really ugly!
	     */
	    if (call->flags & RX_CALL_TQ_BUSY) {
		rx_tq_dropped++;
		return np;	       /* xmitting; drop packet */
	    }
#endif
	    rxi_ClearTransmitQueue(call);
	} else {
	    if (np->header.type == RX_PACKET_TYPE_ACK) {
		/*
		 * now check to see if this is an ack packet acknowledging
		 * that the server actually *lost* some hard-acked data.  If
		 * this happens we ignore this packet, as it may indicate that
		 * the server restarted in the middle of a call.  It is also
		 * possible that this is an old ack packet.  We don't abort
		 * the connection in this case, because this *might* just be
		 * an old ack packet.  The right way to detect a server restart
		 * in the midst of a call is to notice that the server epoch
		 * changed, btw.
		 */
		/*
		 * LWSXXX I'm not sure this is exactly right, since tfirst
		 * LWSXXX **IS** unacknowledged.  I think that this is
		 * LWSXXX off-by-one, but I don't dare change it just yet,
		 * LWSXXX since it will interact badly with the
		 * LWSXXX server-restart detection code in receiveackpacket.
		 */
		if (ntohl(rx_GetLong(np, FIRSTACKOFFSET)) < call->tfirst) {
		    rx_stats.spuriousPacketsRead++;
#ifdef	RX_ENABLE_LOCKSX
		    mutex_exit(&call->lock);
#endif
		    conn->refCount--;
		    return np;
		}
	    }
	}			       /* else not a data packet */
    }

    /* Set remote user defined status from packet */
    call->remoteStatus = np->header.userStatus;

    /*
     * Note the gap between the expected next packet and the actual packet
     * that arrived, when the new packet has a smaller serial number than
     * expected.  Rioses frequently reorder packets all by themselves, so
     * this will be quite important with very large window sizes. Skew is
     * checked against 0 here to avoid any dependence on the type of
     * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is
     * always true! The inPacketSkew should be a smoothed running value, not
     * just a maximum.  MTUXXX see CalculateRoundTripTime for an example of
     * how to keep smoothed values. I think using a beta of 1/8 is probably
     * appropriate.  lws 93.04.21
     */
    skew = conn->lastSerial - np->header.serial;
    conn->lastSerial = np->header.serial;
    if (skew > 0) {
	struct rx_peer *peer;

	peer = conn->peer;
	if (skew > peer->inPacketSkew) {
	    dpf(("*** In skew changed from %d to %d\n",
		 peer->inPacketSkew, skew));
	    peer->inPacketSkew = skew;
	}
    }
    /* Now do packet type-specific processing */
    switch (np->header.type) {
    case RX_PACKET_TYPE_DATA:
	np = rxi_ReceiveDataPacket(call, np);
	break;
    case RX_PACKET_TYPE_ACK:
	/*
         * Respond immediately to ack packets requesting acknowledgement
         * (ping packets)
         */
	if (np->header.flags & RX_REQUEST_ACK) {
	    if (call->error)
		(void) rxi_SendCallAbort(call, 0);
	    else
		(void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE);
	}
	np = rxi_ReceiveAckPacket(call, np);
	break;
    case RX_PACKET_TYPE_ABORT:
	/*
         * An abort packet: reset the connection, passing the error up to
         * the user
         */
	/* What if error is zero? */
	rxi_CallError(call, ntohl(*(long *) rx_DataOf(np)));
	break;
    case RX_PACKET_TYPE_BUSY:
	/* XXXX */
	break;
    case RX_PACKET_TYPE_ACKALL:
	/*
         * All packets acknowledged, so we can drop all packets previously
         * readied for sending
         */
#ifdef	AFS_SUN5_ENV
	/*
         * XXX Hack. We because we can't release the global rx lock
         * when sending packets (osi_NetSend) we drop all ack pkts while
         * we're traversing the tq in rxi_Start sending packets out
         * because packets may move to the freePacketQueue as result of
         * being here! So we drop these packets until we're
         * safely out of the traversing. Really ugly!
         */
	if (call->flags & RX_CALL_TQ_BUSY) {
	    rx_tq_dropped++;
	    return np;		       /* xmitting; drop packet */
	}
#endif
	rxi_ClearTransmitQueue(call);
	break;
    default:
	/*
         * Should not reach here, unless the peer is broken: send an abort
         * packet
         */
	rxi_CallError(call, RX_PROTOCOL_ERROR);
	np = rxi_SendCallAbort(call, np);
	break;
    };
    /*
     * Note when this last legitimate packet was received, for keep-alive
     * processing.  Note, we delay getting the time until now in the hope that
     * the packet will be delivered to the user before any get time is required
     * (if not, then the time won't actually be re-evaluated here).
     */
    call->lastReceiveTime = clock_Sec();
#ifdef	RX_ENABLE_LOCKSX
    mutex_exit(&call->lock);
#endif
    conn->refCount--;
    return np;
}

/*
 * return true if this is an "interesting" connection from the point of view
 * of someone trying to debug the system
 */
int
rxi_IsConnInteresting(struct rx_connection * aconn)
{
    int i;
    struct rx_call *tcall;

    if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
	return 1;
    for (i = 0; i < RX_MAXCALLS; i++) {
	tcall = aconn->call[i];
	if (tcall) {
	    if ((tcall->state == RX_STATE_PRECALL) ||
		(tcall->state == RX_STATE_ACTIVE))
		return 1;
	    if ((tcall->mode == RX_MODE_SENDING) ||
		(tcall->mode == RX_MODE_RECEIVING))
		return 1;
	}
    }
    return 0;
}

/*
 * if this is one of the last few packets AND it wouldn't be used by the
 * receiving call to immediately satisfy a read request, then drop it on
 * the floor, since accepting it might prevent a lock-holding thread from
 * making progress in its reading
 */

static int
TooLow(struct rx_packet * ap, struct rx_call * acall)
{
    if ((rx_nFreePackets < rxi_dataQuota + 2) &&
	!((ap->header.seq == acall->rnext) &&
	  (acall->flags & RX_CALL_READER_WAIT))) {
	return 1;
    } else
	return 0;
}

/* try to attach call, if authentication is complete */
static void
TryAttach(struct rx_call * acall)
{
    struct rx_connection *conn;

    conn = acall->conn;
    if ((conn->type == RX_SERVER_CONNECTION) &&
	(acall->state == RX_STATE_PRECALL)) {
	/* Don't attach until we have any req'd. authentication. */
	if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
	    rxi_AttachServerProc(acall);
	    /*
	     * Note:  this does not necessarily succeed; there
	     * may not any proc available
	     */
	} else {
	    rxi_ChallengeOn(acall->conn);
	}
    }
}

/*
 * A data packet has been received off the interface.  This packet is
 * appropriate to the call (the call is in the right state, etc.).  This
 * routine can return a packet to the caller, for re-use
 */
struct rx_packet *
rxi_ReceiveDataPacket(struct rx_call * call,
		      struct rx_packet * np)
{
    u_long seq, serial, flags;

    seq = np->header.seq;
    rx_stats.dataPacketsRead++;

    /* If the call is in an error state, send an abort message */
    /* XXXX this will send too many aborts for multi-packet calls */
    if (call->error)
	return rxi_SendCallAbort(call, np);

    if (np->header.spare != 0)
	call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;

    /*
     * If there are no packet buffers, drop this new packet, unless we can find
     * packet buffers from inactive calls
     */
    if (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call)) {
	rx_stats.noPacketBuffersOnRead++;
	call->rprev = seq;
	TryAttach(call);
	rxi_calltrace(RX_TRACE_DROP, call);
	return np;
    }
    /* The usual case is that this is the expected next packet */
    if (np->header.seq == call->rnext) {

	/* Check to make sure it is not a duplicate of one already queued */
	if (queue_IsNotEmpty(&call->rq)
	    && queue_First(&call->rq, rx_packet)->header.seq == seq) {
	    rx_stats.dupPacketsRead++;
	    np = rxi_SendAck(call, np, seq, np->header.serial,
			     np->header.flags, RX_ACK_DUPLICATE);
	    call->rprev = seq;
	    return np;
	}
	/*
	 * It's the next packet.  Stick it on the receive queue for this call
	 */
	queue_Prepend(&call->rq, np);

	/* I don't think these two variables are needed (lws) */
	serial = np->header.serial;
	flags = np->header.flags;
#ifndef	ADAPT_PERF
	np = 0;			       /* We can't use this any more */

	/*
	 * Provide asynchronous notification for those who want it
	 * (e.g. multi rx)
	 */
	if (call->arrivalProc) {
	    (*call->arrivalProc) (call, call->arrivalProcHandle,
				  call->arrivalProcArg);
	    call->arrivalProc = (void (*) ()) 0;
	}
	/* Wakeup the reader, if any */
	if (call->flags & RX_CALL_READER_WAIT) {
	    call->flags &= ~RX_CALL_READER_WAIT;
	    MUTEX_ENTER(&call->lockq);

#ifdef	RX_ENABLE_LOCKS
	    cv_broadcast(&call->cv_rq);
#else
	    osi_rxWakeup(&call->rq);
#endif
	    MUTEX_EXIT(&call->lockq);
	}
#endif

	/*
	 * ACK packet right away, in order to keep the window going, and to
	 * reduce variability in round-trip-time estimates.
	 */

	if (flags & RX_REQUEST_ACK) {

	    /*
	     * Acknowledge, if requested.  Also take this opportunity to
	     * revise MTU estimate.
	     */
#ifdef MISCMTU
	    /* Copy a lower estimate of the MTU from the other end.  (cfe) */
	    /*
	     * We can figure out what the other end is using by checking out
	     * the size of its packets, and then adding back the header size.
	     * We don't count the last packet, since it might be partly empty.
	     * We shouldn't do this check on every packet, it's overkill.
	     * Perhaps it would be better done in
	     * ComputeRate if I decide it's ever worth doing. (lws)
	     */
	    if (!(np->header.flags & RX_LAST_PACKET) &&
		call->conn && call->conn->peer) {
		u_long length;
		struct rx_peer *peer = call->conn->peer;

		length = np->length + RX_HEADER_SIZE;
		if (length < peer->packetSize) {
		    dpf(("CONG peer %lx/%u: packetsize %lu=>%lu (rtt %u)",
		     ntohl(peer->host), ntohs(peer->port), peer->packetSize,
			 length, peer->rtt));
		    peer->packetSize = length;
		}
	    }
#endif				       /* MISCMTU */
	    np = rxi_SendAck(call, 0, seq, serial, flags, RX_ACK_REQUESTED);
	} else if (flags & RX_LAST_PACKET) {
	    struct clock when;

	    /* Or schedule an acknowledgement later on. */
	    rxevent_Cancel(call->delayedAckEvent);
	    clock_GetTime(&when);
	    clock_Add(&when, &rx_lastAckDelay);
	    call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
						 call, 0);
	}
#ifdef	ADAPT_PERF
	/*
	 * Provide asynchronous notification for those who want it
	 * (e.g. multi rx)
	 */
	if (call->arrivalProc) {
	    (*call->arrivalProc) (call, call->arrivalProcHandle,
				  call->arrivalProcArg);
	    call->arrivalProc = (void (*) ()) 0;
	}
	/* Wakeup the reader, if any */
	MUTEX_ENTER(&call->lockq);
	if (call->flags & RX_CALL_READER_WAIT) {
	    call->flags &= ~RX_CALL_READER_WAIT;
#ifdef	RX_ENABLE_LOCKS
/*	    cv_signal(&call->cv_rq);*/
	    cv_broadcast(&call->cv_rq);
#else
	    osi_rxWakeup(&call->rq);
#endif
	}
	MUTEX_EXIT(&call->lockq);

	np = 0;			       /* We can't use this any more */
#endif				       /* ADAPT_PERF */

	/* Update last packet received */
	call->rprev = seq;

	/*
	 * If there is no server process serving this call, grab one, if
	 * available
	 */
	TryAttach(call);
    }
    /* This is not the expected next packet */
    else {
	/*
	 * Determine whether this is a new or old packet, and if it's
         * a new one, whether it fits into the current receive window.
         * It's also useful to know if the packet arrived out of
         * sequence, so that we can force an acknowledgement in that
         * case.  We have a slightly complex definition of
         * out-of-sequence: the previous packet number received off
         * the wire is remembered.  If the new arrival's sequence
         * number is less than previous, then previous is reset (to
         * 0).  MTUXXX  This should change slightly if skew is taken into
	 * consideration. lws 93.04.20
	 * The new packet is then declared out-of-sequence if
         * there are any packets missing between the "previous" packet
         * and the one which just arrived (because the missing packets
         * should have been filled in between the previous packet and
         * the new arrival).  This works regardless of whether the
         * peer's retransmission algorithm has been invoked, or not
         * (i.e. whether this is the first or subsequent pass over the
         * sequence of packets).  All this assumes that "most" of the
         * time, packets are delivered in the same *order* as they are
         * transmitted, with, possibly, some packets lost due to
         * transmission errors along the way.
	 */

	u_long prev;		       /* "Previous packet" sequence number */
	struct rx_packet *tp; /* Temporary packet pointer */
	struct rx_packet *nxp;/*
				        * Next packet pointer, for queue_Scan
				        */
	int nTwixt;		       /*
				        * Number of packets between previous
				        * and new one
				        */

	/*
	 * If the new packet's sequence number has been sent to the
         * application already, then this is a duplicate
	 */
	if (np->header.seq < call->rnext) {
	    rx_stats.dupPacketsRead++;
	    np = rxi_SendAck(call, np, seq, np->header.serial,
			     np->header.flags, RX_ACK_DUPLICATE);
	    call->rprev = seq;
	    return np;
	}
	/*
	 * If the sequence number is greater than what can be
         * accomodated by the current window, then send a negative
         * acknowledge and drop the packet
	 */
	if ((call->rnext + call->rwind) <= np->header.seq) {
	    np = rxi_SendAck(call, np, seq, np->header.serial,
			     np->header.flags, RX_ACK_EXCEEDS_WINDOW);
	    call->rprev = seq;
	    return np;
	}
	/* Look for the packet in the queue of old received packets */
	prev = call->rprev;
	if (prev > np->header.seq)
	    prev = 0;
	for (nTwixt = 0, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
	    /* Check for duplicate packet */
	    if (np->header.seq == tp->header.seq) {
		rx_stats.dupPacketsRead++;
		np = rxi_SendAck(call, np, np->header.seq, np->header.serial,
				 np->header.flags, RX_ACK_DUPLICATE);
		call->rprev = seq;
		return np;
	    }

	    /*
	     * Count the number of packets received 'twixt the previous
	     * packet and the new packet
	     */
	    if (tp->header.seq > prev && tp->header.seq < np->header.seq)
		nTwixt++;

	    /*
	     * If we find a higher sequence packet, break out and insert the
	     * new packet here.
	     */
	    if (np->header.seq < tp->header.seq)
		break;
	}

	/*
	 * It's within the window: add it to the the receive queue.
         * tp is left by the previous loop either pointing at the
         * packet before which to insert the new packet, or at the
         * queue head if the queue is empty or the packet should be
         * appended.
	 */
	queue_InsertBefore(tp, np);

	/*
	 * Acknowledge the packet if it's out of sequence or if
         * requested by peer
	 */
	if (np->header.flags & RX_REQUEST_ACK) {
	    np = rxi_SendAck(call, 0, np->header.seq, np->header.serial,
			     np->header.flags, RX_ACK_REQUESTED);
	    call->rprev = seq;
	    return np;
	}
	call->rprev = seq;
	np = 0;
    }
    return np;
}

#ifdef	ADAPT_WINDOW
static void rxi_ComputeRate();

#endif

/* The real smarts of the whole thing.  Right now somewhat short-changed. */
struct rx_packet *
rxi_ReceiveAckPacket(struct rx_call * call, struct rx_packet * np)
{
    struct rx_ackPacket *ap;
    int nAcks;
    struct rx_packet *tp;
    struct rx_packet *nxp;    /*
				        * Next packet pointer for queue_Scan
				        */
    struct rx_connection *conn = call->conn;
    struct rx_peer *peer = conn->peer;
    u_long first;
    u_long serial;

    /* because there are CM's that are bogus, sending weird values for this. */
    u_long skew = 0;
    int needRxStart = 0;
    int nbytes;

    rx_stats.ackPacketsRead++;
    ap = (struct rx_ackPacket *) rx_DataOf(np);
    nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *) ap);
    if (nbytes < 0)
	return np;		       /* truncated ack packet */

    nAcks = MIN(nbytes, ap->nAcks);    /* depends on ack packet struct */
    first = ntohl(ap->firstPacket);
    serial = ntohl(ap->serial);
#ifdef notdef
    skew = ntohs(ap->maxSkew);
#endif


#ifdef RXDEBUG
    if (Log) {
	fprintf(Log,
		"RACK: reason %x previous %lu seq %lu serial %lu skew %lu "
		"first %lu", ap->reason, 
		(unsigned long)ntohl(ap->previousPacket), 
		(unsigned long)np->header.seq, 
		(unsigned long)serial, 
		(unsigned long)skew, 
		(unsigned long)ntohl(ap->firstPacket));
	if (nAcks) {
	    int offset;

	    for (offset = 0; offset < nAcks; offset++)
		putc(ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*', Log);
	}
	putc('\n', Log);
    }
#endif

    /*
     * if a server connection has been re-created, it doesn't remember what
     * serial # it was up to.  An ack will tell us, since the serial field
     * contains the largest serial received by the other side
     */
    if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
	conn->serial = serial + 1;
    }

    /*
     * Update the outgoing packet skew value to the latest value of the
     * peer's incoming packet skew value.  The ack packet, of course, could
     * arrive out of order, but that won't affect things much
     */
    peer->outPacketSkew = skew;

#ifdef	AFS_SUN5_ENV
    /*
     * XXX Hack. Because we can't release the global rx lock when sending
     * packets (osi_NetSend) we drop all acks while we're traversing the tq in
     * rxi_Start sending packets out because packets
     * may move to the freePacketQueue as result of being here! So we drop
     * these packets until we're safely out of the traversing. Really ugly!
     */
    if (call->flags & RX_CALL_TQ_BUSY) {
	rx_tq_dropped++;
	return np;		       /* xmitting; drop packet */
    }
#endif
    /*
     * Check for packets that no longer need to be transmitted, and
     * discard them.  This only applies to packets positively
     * acknowledged as having been sent to the peer's upper level.
     * All other packets must be retained.  So only packets with
     * sequence numbers < ap->firstPacket are candidates.
     */
    while (queue_IsNotEmpty(&call->tq)) {
	tp = queue_First(&call->tq, rx_packet);
	if (tp->header.seq >= first)
	    break;
	call->tfirst = tp->header.seq + 1;
	if (tp->header.serial == serial) {
#ifdef	ADAPT_PERF
	    rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#else
	    rxi_ComputeRoundTripTime(tp, 0, 0);
#endif
#ifdef ADAPT_WINDOW
	    rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	}
#ifdef	ADAPT_PERF
	else if ((tp->firstSerial == serial)) {
	    rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
	    rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	}
#endif				       /* ADAPT_PERF */
	queue_Remove(tp);
	rxi_FreePacket(tp);	       /*
				        * rxi_FreePacket mustn't wake up anyone,
				        * preemptively.
				        */
    }

#ifdef ADAPT_WINDOW
    /* Give rate detector a chance to respond to ping requests */
    if (ap->reason == RX_ACK_PING_RESPONSE) {
	rxi_ComputeRate(peer, call, 0, np, ap->reason);
    }
#endif

    /*
     * N.B. we don't turn off any timers here.  They'll go away by themselves,
     * anyway
     */

    /*
     * Now go through explicit acks/nacks and record the results in
     * the waiting packets.  These are packets that can't be released
     * yet, even with a positive acknowledge.  This positive
     * acknowledge only means the packet has been received by the
     * peer, not that it will be retained long enough to be sent to
     * the peer's upper level.  In addition, reset the transmit timers
     * of any missing packets (those packets that must be missing
     * because this packet was out of sequence)
     */

    for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {

	/*
	 * Update round trip time if the ack was stimulated on receipt of
	 * this packet
	 */
	if (tp->header.serial == serial) {
#ifdef	ADAPT_PERF
	    rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
#else
	    rxi_ComputeRoundTripTime(tp, 0, 0);
#endif
#ifdef ADAPT_WINDOW
	    rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	}
#ifdef	ADAPT_PERF
	else if ((tp->firstSerial == serial)) {
	    rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
#ifdef ADAPT_WINDOW
	    rxi_ComputeRate(peer, call, tp, np, ap->reason);
#endif
	}
#endif				       /* ADAPT_PERF */

	/*
	 * Set the acknowledge flag per packet based on the
         * information in the ack packet.  It's possible for an
         * acknowledged packet to be downgraded
	 */
	if (tp->header.seq < first + nAcks) {
	    /* Explicit ack information:  set it in the packet appropriately */
	    tp->acked = (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK);
	} else {
	    /*
	     * No ack information: the packet may have been
             * acknowledged previously, but that is now rescinded (the
             * peer may have reduced the window size)
	     */
	    tp->acked = 0;
	}


#ifdef	ADAPT_PERF
	/*
	 * If packet isn't yet acked, and it has been transmitted at least
	 * once, reset retransmit time using latest timeout
	 * ie, this should readjust the retransmit timer for all outstanding
	 * packets...  So we don't just retransmit when we should know better
	 */

	if (!tp->acked && tp->header.serial) {
	    tp->retryTime = tp->timeSent;
	    clock_Add(&tp->retryTime, &peer->timeout);
	    /* shift by eight because one quarter-sec ~ 256 milliseconds */
	    clock_Addmsec(&(tp->retryTime), ((unsigned long) tp->backoff) << 8);
	}
#endif				       /* ADAPT_PERF */

	/*
	 * If the packet isn't yet acked, and its serial number
         * indicates that it was transmitted before the packet which
         * prompted the acknowledge (that packet's serial number is
         * supplied in the ack packet), then schedule the packet to be
         * transmitted *soon*.  This is done by resetting the
         * retransmit time in the packet to the current time.
         * Actually this is slightly more intelligent: to guard
         * against packets that have been transmitted out-of-order by
         * the network (this even happens on the local token ring with
         * our IBM RT's!), the degree of out-of-orderness (skew) of
         * the packet is compared against the maximum skew for this
         * peer.  If it is less, we don't retransmit yet.  Note that
         * we don't do this for packets with zero serial numbers: they
         * never have been transmitted.
	 */

	/*
	 * I don't know if we should add in the new retransmit backoff time
	 * here or not.  I think that we should also consider reducing
	 * the "congestion window" size as an alternative.  LWSXXX
	 */

	if (!tp->acked && tp->header.serial
	    && ((tp->header.serial + skew) <= serial)) {
	    rx_stats.dataPacketsPushed++;
	    clock_GetTime(&tp->retryTime);
	    needRxStart = 1;

	    dpf(("Pushed packet seq %d serial %d, new time %d.%d\n",
		 tp->header.seq, tp->header.serial, tp->retryTime.sec,
		 tp->retryTime.usec / 1000));
	}
    }

    /*
     * If the window has been extended by this acknowledge packet,
     * then wakeup a sender waiting in alloc for window space, or try
     * sending packets now, if he's been sitting on packets due to
     * lack of window space
     */
    if (call->tnext < (call->tfirst + call->twind)) {
#ifdef	RX_ENABLE_LOCKS
	MUTEX_ENTER(&call->lockw);
	cv_signal(&call->cv_twind);
	MUTEX_EXIT(&call->lockw);
#else
	if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
	    call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
	    osi_rxWakeup(&call->twind);
	}
#endif
	if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
	    call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
	    needRxStart = 1;
	}
    }
    /*
     * if the ack packet has a receivelen field hanging off it,
     * update our state
     */
    if (np->length >= rx_AckDataSize(ap->nAcks) + 4) {
	unsigned long maxPacketSize;

	rx_packetread(np, rx_AckDataSize(ap->nAcks), 4, &maxPacketSize);
	maxPacketSize = (unsigned long) ntohl(maxPacketSize);
	dpf(("maxPacketSize=%ul\n", maxPacketSize));

	/*
	 * sanity check - peer might have restarted with different params.
	 * If peer says "send less", dammit, send less...  Peer should never
	 * be unable to accept packets of the size that prior AFS versions
	 * would send without asking.
	 */
	if (OLD_MAX_PACKET_SIZE <= maxPacketSize)
	    conn->maxPacketSize = MIN(maxPacketSize, conn->peer->packetSize);
    }
    /* if (needRxStart) rxi_Start(0, call); */
    rxi_Start(0, call);		       /* Force rxi_Restart for now:  skew
				        * problems!!! */
    return np;
}

/* Received a response to a challenge packet */
struct rx_packet *
rxi_ReceiveResponsePacket(struct rx_connection * conn,
			  struct rx_packet * np)
{
    int error;

    /* Ignore the packet if we're the client */
    if (conn->type == RX_CLIENT_CONNECTION)
	return np;

    /* If already authenticated, ignore the packet (it's probably a retry) */
    if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
	return np;

    /* Otherwise, have the security object evaluate the response packet */
    error = RXS_CheckResponse(conn->securityObject, conn, np);
    if (error) {

	/*
	 * If the response is invalid, reset the connection, sending an abort
	 * to the peer
	 */
#ifndef KERNEL
	IOMGR_Sleep(1);
#endif
	rxi_ConnectionError(conn, error);
	return rxi_SendConnectionAbort(conn, np);
    } else {

	/*
	 * If the response is valid, any calls waiting to attach servers can
	 * now do so
	 */
	int i;

	for (i = 0; i < RX_MAXCALLS; i++) {
	    struct rx_call *call = conn->call[i];

	    if (call && (call->state == RX_STATE_PRECALL))
		rxi_AttachServerProc(call);
	}
    }
    return np;
}

/*
 * A client has received an authentication challenge: the security
 * object is asked to cough up a respectable response packet to send
 * back to the server.  The server is responsible for retrying the
 * challenge if it fails to get a response.
 */

struct rx_packet *
rxi_ReceiveChallengePacket(struct rx_connection * conn,
			   struct rx_packet * np)
{
    int error;

    /* Ignore the challenge if we're the server */
    if (conn->type == RX_SERVER_CONNECTION)
	return np;

    /*
     * Ignore the challenge if the connection is otherwise idle; someone's
     * trying to use us as an oracle.
     */
    if (!rxi_HasActiveCalls(conn))
	return np;

    /*
     * Send the security object the challenge packet.  It is expected to fill
     * in the response.
     */
    error = RXS_GetResponse(conn->securityObject, conn, np);

    /*
     * If the security object is unable to return a valid response, reset the
     * connection and send an abort to the peer.  Otherwise send the response
     * packet to the peer connection.
     */
    if (error) {
	rxi_ConnectionError(conn, error);
	np = rxi_SendConnectionAbort(conn, np);
    } else {
	np = rxi_SendSpecial((struct rx_call *) 0, conn, np,
			     RX_PACKET_TYPE_RESPONSE, (char *) 0, -1);
    }
    return np;
}


/*
 * Find an available server process to service the current request in
 * the given call structure.  If one isn't available, queue up this
 * call so it eventually gets one
 */
void
rxi_AttachServerProc(struct rx_call * call)
{
    struct rx_serverQueueEntry *sq;
    struct rx_service *service = call->conn->service;

    /* May already be attached */
    if (call->state == RX_STATE_ACTIVE)
	return;

    if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
	/*
	 * If there are no processes available to service this call,
         * put the call on the incoming call queue (unless it's
         * already on the queue).
         */
	if (!(call->flags & RX_CALL_WAIT_PROC)) {
	    call->flags |= RX_CALL_WAIT_PROC;
	    rx_nWaiting++;
	    rxi_calltrace(RX_CALL_ARRIVAL, call);
	    queue_Append(&rx_incomingCallQueue, call);
	}
    } else {
	sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);

	MUTEX_ENTER(&sq->lock);

	queue_Remove(sq);
	sq->newcall = call;
	if (call->flags & RX_CALL_WAIT_PROC) {
	    /* Conservative:  I don't think this should happen */
	    call->flags &= ~RX_CALL_WAIT_PROC;
	    rx_nWaiting--;
	    queue_Remove(call);
	}
	call->state = RX_STATE_ACTIVE;
	call->mode = RX_MODE_RECEIVING;
	if (call->flags & RX_CALL_CLEARED) {
	    /* send an ack now to start the packet flow up again */
	    call->flags &= ~RX_CALL_CLEARED;
	    rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY);
	}
	service->nRequestsRunning++;
	if (service->nRequestsRunning <= service->minProcs)
	    rxi_minDeficit--;
	rxi_availProcs--;
#ifdef	RX_ENABLE_LOCKS
	cv_signal(&sq->cv);
#else
	osi_rxWakeup(sq);
#endif
	MUTEX_EXIT(&sq->lock);
    }
}

/*
 * Delay the sending of an acknowledge event for a short while, while
 * a new call is being prepared (in the case of a client) or a reply
 * is being prepared (in the case of a server).  Rather than sending
 * an ack packet, an ACKALL packet is sent.
 */
void
rxi_AckAll(struct rxevent * event, struct rx_call * call, char *dummy)
{
    if (event)
	call->delayedAckEvent = (struct rxevent *) 0;
    rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
		    RX_PACKET_TYPE_ACKALL, (char *) 0, 0);
}

void
rxi_SendDelayedAck(struct rxevent * event, struct rx_call * call,
		   char *dummy)
{
    if (event)
	call->delayedAckEvent = (struct rxevent *) 0;
    (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY);
}

/*
 * Clear out the transmit queue for the current call (all packets have
 * been received by peer)
 */
void
rxi_ClearTransmitQueue(struct rx_call * call)
{
    struct rx_packet *p, *tp;

    for (queue_Scan(&call->tq, p, tp, rx_packet)) {
	queue_Remove(p);
	rxi_FreePacket(p);
    }

    rxevent_Cancel(call->resendEvent);
    call->tfirst = call->tnext;	       /*
				        * implicitly acknowledge all data already sent
				        */
    MUTEX_ENTER(&call->lockw);
#ifdef	RX_ENABLE_LOCKS
    cv_signal(&call->cv_twind);
#else
    osi_rxWakeup(&call->twind);
#endif
    MUTEX_EXIT(&call->lockw);
}

void
rxi_ClearReceiveQueue(struct rx_call * call)
{
    struct rx_packet *p, *tp;

    for (queue_Scan(&call->rq, p, tp, rx_packet)) {
	queue_Remove(p);
	rxi_FreePacket(p);
    }
    if (call->state == RX_STATE_PRECALL)
	call->flags |= RX_CALL_CLEARED;
}

/* Send an abort packet for the specified call */
struct rx_packet *
rxi_SendCallAbort(struct rx_call * call, struct rx_packet * packet)
{
    if (call->error) {
	long error;

	error = htonl(call->error);
	packet = rxi_SendSpecial(call, call->conn, packet, RX_PACKET_TYPE_ABORT,
				 (char *) &error, sizeof(error));
    }
    return packet;
}

/*
 * Send an abort packet for the specified connection.  Np is an
 * optional packet that can be used to send the abort.
 */
struct rx_packet *
rxi_SendConnectionAbort(struct rx_connection * conn,
			struct rx_packet * packet)
{
    if (conn->error) {
	long error;

	error = htonl(conn->error);
	packet = rxi_SendSpecial((struct rx_call *) 0, conn, packet,
		      RX_PACKET_TYPE_ABORT, (char *) &error, sizeof(error));
    }
    return packet;
}

/*
 * Associate an error all of the calls owned by a connection.  Called
 * with error non-zero.  This is only for really fatal things, like
 * bad authentication responses.  The connection itself is set in
 * error at this point, so that future packets received will be
 * rejected.
 */
void
rxi_ConnectionError(struct rx_connection * conn, long error)
{
    if (error) {
	int i;

	if (conn->challengeEvent)
	    rxevent_Cancel(conn->challengeEvent);
	for (i = 0; i < RX_MAXCALLS; i++) {
	    struct rx_call *call = conn->call[i];

	    if (call)
		rxi_CallError(call, error);
	}
	conn->error = error;
	rx_stats.fatalErrors++;
    }
}

/* Reset all of the calls associated with a connection. */
void
rxi_ResetConnection(struct rx_connection * conn)
{
    int i;

    for (i = 0; i < RX_MAXCALLS; i++) {
	struct rx_call *call = conn->call[i];

	if (call)
	    rxi_ResetCall(call);
    }

    /* get rid of pending events that could zap us later */
    if (conn->challengeEvent)
	rxevent_Cancel(conn->challengeEvent);
}

void
rxi_CallError(struct rx_call * call, long error)
{
    if (call->error)
	error = call->error;
    rxi_ResetCall(call);
    call->error = error;
    call->mode = RX_MODE_ERROR;
}

/*
 * Reset various fields in a call structure, and wakeup waiting
 * processes.  Some fields aren't changed: state & mode are not
 * touched (these must be set by the caller), and bufptr, nLeft, and
 * nFree are not reset, since these fields are manipulated by
 * unprotected macros, and may only be reset by non-interrupting code.
 */
#ifdef ADAPT_WINDOW
/* this code requires that call->conn be set properly as a pre-condition. */
#endif				       /* ADAPT_WINDOW */

void
rxi_ResetCall(struct rx_call * call)
{
    int flags;

    /* Notify anyone who is waiting for asynchronous packet arrival */
    if (call->arrivalProc) {
	(*call->arrivalProc) (call, call->arrivalProcHandle,
			      call->arrivalProcArg);
	call->arrivalProc = (void (*) ()) 0;
    }
    flags = call->flags;
    rxi_ClearReceiveQueue(call);
    rxi_ClearTransmitQueue(call);
    call->error = 0;
    call->flags = 0;
    call->rwind = rx_Window;	       /* XXXX */
#ifdef ADAPT_WINDOW
    call->twind = call->conn->peer->maxWindow;	/* XXXX */
#else
    call->twind = rx_Window;	       /* XXXX */
#endif
    call->tfirst = call->rnext = call->tnext = 1;
    call->rprev = 0;
    call->lastAcked = 0;
    call->localStatus = call->remoteStatus = 0;

    MUTEX_ENTER(&call->lockq);
    if (flags & RX_CALL_READER_WAIT) {
#ifdef	RX_ENABLE_LOCKS
/*	cv_signal(&call->cv_rq);*/
	cv_broadcast(&call->cv_rq);
#else
	osi_rxWakeup(&call->rq);
#endif
    }
    MUTEX_EXIT(&call->lockq);
    if (flags & RX_CALL_WAIT_PACKETS)
	rxi_PacketsUnWait();	       /* XXX */
    MUTEX_ENTER(&call->lockw);

#ifdef	RX_ENABLE_LOCKS
    cv_signal(&call->cv_twind);
#else
    if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
	osi_rxWakeup(&call->twind);
#endif
    MUTEX_EXIT(&call->lockw);

    if (queue_IsOnQueue(call)) {
	queue_Remove(call);
	if (flags & RX_CALL_WAIT_PROC)
	    rx_nWaiting--;
    }
    rxi_KeepAliveOff(call);
    rxevent_Cancel(call->delayedAckEvent);
}

/*
 * Send an acknowledge for the indicated packet (seq,serial) of the
 * indicated call, for the indicated reason (reason).  This
 * acknowledge will specifically acknowledge receiving the packet, and
 * will also specify which other packets for this call have been
 * received.  This routine returns the packet that was used to the
 * caller.  The caller is responsible for freeing it or re-using it.
 * This acknowledgement also returns the highest sequence number
 * actually read out by the higher level to the sender; the sender
 * promises to keep around packets that have not been read by the
 * higher level yet (unless, of course, the sender decides to abort
 * the call altogether).  Any of p, seq, serial, pflags, or reason may
 * be set to zero without ill effect.  That is, if they are zero, they
 * will not convey any information.
 * NOW there is a trailer field, after the ack where it will safely be
 * ignored by mundanes, which indicates the maximum size packet this
 * host can swallow.
 */
struct rx_packet *
rxi_SendAck(struct rx_call * call,
	    struct rx_packet * optionalPacket, int seq, int serial,
	    int pflags, int reason)
#if 0
    struct rx_call *call;
    struct rx_packet *optionalPacket;	/* use to send ack (or null) */
    int seq;			       /* Sequence number of the packet we
				        * are acking */
    int serial;			       /* Serial number of the packet */
    int pflags;			       /* Flags field from packet header */
    int reason;			       /* Reason an acknowledge was prompted */

#endif
{
    struct rx_ackPacket *ap;
    struct rx_packet *rqp;
    struct rx_packet *nxp;    /* For queue_Scan */
    struct rx_packet *p;
    u_char offset;
    long templ;

    if (call->rnext > call->lastAcked)
	call->lastAcked = call->rnext;
    p = optionalPacket;

    if (p) {
	rx_computelen(p, p->length);   /* reset length, you never know */
    }
     /* where that's been...         */ 
    else if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL)))
	osi_Panic("rxi_SendAck");

    templ = rx_AckDataSize(call->rwind) + 4 - rx_GetDataSize(p);
    if (templ > 0) {
	if (rxi_AllocDataBuf(p, templ))
	    return optionalPacket;
	templ = rx_AckDataSize(call->rwind) + 4;
	if (rx_Contiguous(p) < templ)
	    return optionalPacket;
    }				       /* MTUXXX failing to send an ack is
				        * very serious.  We should */
    /* try as hard as possible to send even a partial ack; it's */
    /* better than nothing. */
    ap = (struct rx_ackPacket *) rx_DataOf(p);
    ap->bufferSpace = htonl(0);	       /* Something should go here, sometime */
    ap->reason = reason;

    /* The skew computation used to bullshit, I think it's better now. */
    /* We should start paying attention to skew.    XXX  */
    ap->serial = htonl(call->conn->maxSerial);
    ap->maxSkew = 0;		       /* used to be peer->inPacketSkew */

    ap->firstPacket = htonl(call->rnext);	/*
					         * First packet not yet forwarded
					         * to reader
					         */
    ap->previousPacket = htonl(call->rprev);	/* Previous packet received */

    /*
     * No fear of running out of ack packet here because there can only be
     * at most one window full of unacknowledged packets.  The window size
     * must be constrained to be less than the maximum ack size, of course.
     * Also, an ack should always  fit into a single packet -- it should not
     * ever be fragmented.
     */
    for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
	while (rqp->header.seq > call->rnext + offset)
	    ap->acks[offset++] = RX_ACK_TYPE_NACK;
	ap->acks[offset++] = RX_ACK_TYPE_ACK;
    }
    ap->nAcks = offset;
    p->length = rx_AckDataSize(offset) + 4;
    templ = htonl(rx_maxReceiveSize);
    rx_packetwrite(p, rx_AckDataSize(offset), 4, &templ);
    p->header.serviceId = call->conn->serviceId;
    p->header.cid = (call->conn->cid | call->channel);
    p->header.callNumber = *call->callNumber;
    p->header.seq = seq;
    p->header.securityIndex = call->conn->securityIndex;
    p->header.epoch = call->conn->epoch;
    p->header.type = RX_PACKET_TYPE_ACK;
    p->header.flags = 0;
    if (reason == RX_ACK_PING) {
	p->header.flags |= RX_REQUEST_ACK;
#ifdef ADAPT_WINDOW
	clock_GetTime(&call->pingRequestTime);
#endif
    }
    if (call->conn->type == RX_CLIENT_CONNECTION)
	p->header.flags |= RX_CLIENT_INITIATED;


#ifdef RXDEBUG
    if (Log) {
	fprintf(Log, "SACK: reason %x previous %lu seq %lu first %lu",
		ap->reason, 
		(unsigned long)ntohl(ap->previousPacket), 
		(unsigned long)p->header.seq,
		(unsigned long)ntohl(ap->firstPacket));
	if (ap->nAcks) {
	    for (offset = 0; offset < ap->nAcks; offset++)
		putc(ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*', Log);
	}
	putc('\n', Log);
    }
#endif

    {
	int i, nbytes = p->length;

	for (i = 1; i < p->niovecs; i++) {	/* vec 0 is ALWAYS header */
	    if (nbytes <= p->wirevec[i].iov_len) {
		int savelen, saven;

		savelen = p->wirevec[i].iov_len;
		saven = p->niovecs;
		p->wirevec[i].iov_len = nbytes;
		p->niovecs = i + 1;
		rxi_Send(call, p);
		p->wirevec[i].iov_len = savelen;
		p->niovecs = saven;
		break;
	    } else
		nbytes -= p->wirevec[i].iov_len;
	}
    }
    rx_stats.ackPacketsSent++;
    if (!optionalPacket)
	rxi_FreePacket(p);
    return optionalPacket;	       /* Return packet for re-use by caller */
}


/*
 * This routine is called when new packets are readied for
 * transmission and when retransmission may be necessary, or when the
 * transmission window or burst count are favourable.  This should be
 * better optimized for new packets, the usual case, now that we've
 * got rid of queues of send packets. XXXXXXXXXXX
 */
void
rxi_Start(struct rxevent * event, struct rx_call * call)
{
    int nSent = 0;
    struct rx_packet *p;
    struct rx_packet *nxp;    /* Next pointer for queue_Scan */
    struct rx_packet *lastPacket;
    struct rx_peer *peer = call->conn->peer;
    struct clock now, retryTime;
    int haveEvent;

    /*
     * If rxi_Start is being called as a result of a resend event,
     * then make sure that the event pointer is removed from the call
     * structure, since there is no longer a per-call retransmission
     * event pending.
     */
    if (event && event == call->resendEvent)
	call->resendEvent = 0;

    if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
	/*
	 * Get clock to compute the re-transmit time for any packets
         * in this burst.  Note, if we back off, it's reasonable to
         * back off all of the packets in the same manner, even if
         * some of them have been retransmitted more times than more
         * recent additions
	 */
	clock_GetTime(&now);
	retryTime = now;	       /* initialize before use */
	clock_Add(&retryTime, &peer->timeout);

	/*
	 * Send (or resend) any packets that need it, subject to
         * window restrictions and congestion burst control
         * restrictions.  Ask for an ack on the last packet sent in
         * this burst.  For now, we're relying upon the window being
         * considerably bigger than the largest number of packets that
         * are typically sent at once by one initial call to
         * rxi_Start.  This is probably bogus (perhaps we should ask
         * for an ack when we're half way through the current
         * window?).  Also, for non file transfer applications, this
         * may end up asking for an ack for every packet.  Bogus. XXXX
         */
	call->flags |= RX_CALL_TQ_BUSY;
	for (lastPacket = (struct rx_packet *) 0,
	     queue_Scan(&call->tq, p, nxp, rx_packet)) {
	    if (p->acked) {
		rx_stats.ignoreAckedPacket++;
		continue;	       /* Ignore this packet if it has been
				        * acknowledged */
	    }
	    /*
	     * Turn off all flags except these ones, which are the same
             * on each transmission
	     */
	    p->header.flags &= RX_PRESET_FLAGS;

	    if (p->header.seq >= call->tfirst + call->twind) {
		call->flags |= RX_CALL_WAIT_WINDOW_SEND;	/*
							         * Wait for transmit
							         * window
							         */
		/*
	         * Note: if we're waiting for more window space, we can
                 * still send retransmits; hence we don't return here, but
                 * break out to schedule a retransmit event
	         */
		break;
	    }
	    /*
	     * If we're not allowed to send any more in the current
             * burst, make sure we get scheduled later.  Also schedule
             * an event to "give back" the packets we've used, when the
             * burst time has elapsed (if we used any packets at all).
             */
	    if (peer->burstSize && !peer->burst) {
		if (nSent) {
		    int foo; /* XXX - ugly */
		    /* Send off the prior packet */
		    /*
	             * Don't request an ack if it's a short packet, 'cuz the
		     * peer will cut down its MTU as a result.
	             */
		    if (((lastPacket->header.flags & RX_LAST_PACKET) == 0)) {
			if ((lastPacket->length + RX_HEADER_SIZE +
			     call->conn->securityMaxTrailerSize) >= call->conn->maxPacketSize)
			    foo = ACKHACK(lastPacket);
			else
			    Rx0++;
		    }
		    rxi_Send(call, lastPacket);
		    rxi_ScheduleDecongestionEvent(call, nSent);
		}
		rxi_CongestionWait(call);
		/*
	         * Note: if we're waiting for congestion to ease, we can't
                 * send any packets, including retransmits.  Hence we do
                 * not schedule a new retransmit event right now
	         */
		call->flags &= ~RX_CALL_TQ_BUSY;
		return;
	    }
	    /*
	     * Transmit the packet if it has never been sent, or
             * retransmit it if the current timeout for this host for
             * this packet has elapsed
	     */
	    if (!clock_IsZero(&p->retryTime)) {
		if (clock_Lt(&now, &p->retryTime))
		    continue;

		/*
	         * Always request an ack on a retransmitted packet; this
                 * will help to get the data moving again, especially if
                 * the packet is near the beginning of the window.
                 * Actually, XXXX, we may want to do just that: only
                 * request the acks if the packet is in, say, the first
                 * half of the window
	         */
		p->header.flags |= RX_REQUEST_ACK;
		peer->reSends++, rx_stats.dataPacketsReSent++;
		p->retryTime = retryTime;

		/*
	         * if a packet gets dropped, don't keep hammering on it --
	         * back off exponentially, at least up to a point.  I had
	         * to trade off global congestion avoidance against individual
	         * performance.  Note that most connections will time out
	         * after 20 - 60 seconds.  In pathological cases, retransmits
	         * must still continue to disperse.  For instance, there is
	         * a condition where the server discards received packets, but
	         * still sends keep-alives on the call, so the call may live
	         * much longer than 60 seconds.
	         */
		if (p->backoff < MAXBACKOFF)
		    p->backoff = (p->backoff << 1) + 1;	/* so it can't stay == 0 */
		else
		    p->backoff++;
		clock_Addmsec(&(p->retryTime), ((unsigned long) p->backoff) << 8);
		/* consider shrinking the packet size? XXXX */
		/* no, shrink the burst size.  LWSXXX */
	    } else {
		peer->nSent++, rx_stats.dataPacketsSent++;
		p->firstSent = now;    /* improved RTO calculation- not Karn */
		p->retryTime = retryTime;
		/*
	         * Ask for an ack for the first packet on a new
                 * connection, since it may carry some interesting info
                 * like maxReceiveSize.  It will also help us train to a
                 * new estimate of RTT, for good or bad.  This has one
	         * neat side effect:  since the first call on a connection
	         * usually triggers a challenge/response exchange, the
	         * first packet was often retransmitted before the call
	         * could complete.  Getting this ack prevents those
	         * retransmissions.  Admittedly, this is straining at gnats.
	         */
		if ((p->header.callNumber == 1) && (p->header.seq == 1) &&
		    (p->length >= call->conn->maxPacketSize)) {
		    p->header.flags |= RX_REQUEST_ACK;
		}
	    }

	    /*
	     * Install the new retransmit time for the packet, and
             * record the time sent
	     */
	    p->timeSent = now;

	    /*
	     * Send the previous packet, and remember this one--we don't
             * send it immediately, so we can tag it as requiring an ack
             * later, if necessary
	     */
	    if (peer->burstSize)
		peer->burst--;
	    nSent++;
	    if (lastPacket) {
		/*
	         * Tag this packet as not being the last in this group,
                 * for the receiver's benefit
	         */
		lastPacket->header.flags |= RX_MORE_PACKETS;
		rxi_Send(call, lastPacket);
	    }
	    lastPacket = p;
	}
	call->flags &= ~RX_CALL_TQ_BUSY;

	/*
	 * If any packets are to be sent, send them and post a
         * decongestion event to bump the burst count that we used up
         * sending the packets
	 */
	if (nSent) {

	    /*
	     * we don't ask for an ack on the final packet, since the
	     * response from the peer implicitly acks it, but we do wait a
	     * little longer for the ack on the last packet on server conns.
	     */
	    if (((lastPacket->header.flags & RX_LAST_PACKET) == 0)) {
		int foo; /* XXX - ugly */
		if ((lastPacket->length + RX_HEADER_SIZE +
		     call->conn->securityMaxTrailerSize) >= 
		    call->conn->maxPacketSize)
		    foo = ACKHACK(lastPacket);
		else
		    Rx1++;
	    } else if (!(lastPacket->header.flags & RX_CLIENT_INITIATED))
		clock_Addmsec(&(lastPacket->retryTime), 400);

	    rxi_Send(call, lastPacket);
	    if (peer->burstSize)
		rxi_ScheduleDecongestionEvent(call, nSent);
	}

	/*
	 * Always post a resend event, if there is anything in the queue, and
	 * resend is possible.  There should be at least one unacknowledged
	 * packet in the queue ... otherwise none of these packets should be
	 * on the queue in the first place.
	 */
	if (call->resendEvent) {

	    /*
	     * If there's an existing resend event, then if its expiry time
	     * is sooner than the new one, then it must be less than any
	     * possible expiry time (because it represents all previous
	     * packets sent that may still need retransmitting).  In this
	     * case, just leave that event as scheduled
	     */
	    if (clock_Le(&call->resendEvent->eventTime, &retryTime))
		return;
	    /* Otherwise, cancel the existing event and post a new one */
	    rxevent_Cancel(call->resendEvent);
	}

	/*
	 * Loop to find the earliest event.  I *know* XXXX that this can be
	 * coded more elegantly (perhaps rolled into the above code)
	 */
	for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
	    if (!p->acked && !clock_IsZero(&p->retryTime)) {
		haveEvent = 1;
		if (clock_Lt(&p->retryTime, &retryTime))
		    retryTime = p->retryTime;
	    }
	}

	/* Post a new event to re-run rxi_Start when retries may be needed */
	if (haveEvent) {
	    call->resendEvent = rxevent_Post(&retryTime, rxi_Start, (char *) call, 0);
	}
    }
}

/*
 * Also adjusts the keep alive parameters for the call, to reflect
 * that we have just sent a packet (so keep alives aren't sent
 * immediately)
 */
void
rxi_Send(struct rx_call *call, struct rx_packet *p)
{
    struct rx_connection *conn = call->conn;

    /* Stamp each packet with the user supplied status */
    p->header.userStatus = call->localStatus;

    /*
     * Allow the security object controlling this call's security to make any
     * last-minute changes to the packet
     */
    RXS_SendPacket(conn->securityObject, call, p);

    /* Actually send the packet, filling in more connection-specific fields */
    rxi_SendPacket(conn, p);

    /*
     * Update last send time for this call (for keep-alive processing), and
     * for the connection (so that we can discover idle connections)
     */
    conn->lastSendTime = call->lastSendTime = clock_Sec();

    /*
     * Since we've just sent SOME sort of packet to the peer, it's safe to
     * nuke any scheduled end-of-packets ack
     */
    rxevent_Cancel(call->delayedAckEvent);
}


/*
 * Check if a call needs to be destroyed.  Called by keep-alive code to ensure
 * that things are fine.  Also called periodically to guarantee that nothing
 * falls through the cracks (e.g. (error + dally) connections have keepalive
 * turned off.  Returns 0 if conn is well, -1 otherwise.  If otherwise, call
 * may be freed! 
 */
int 
rxi_CheckCall(struct rx_call *call)
{
    struct rx_connection *conn = call->conn;
    struct rx_service *tservice;
    u_long now;

    now = clock_Sec();

    /*
     * These are computed to the second (+- 1 second).  But that's good
     * enough for these values, which should be a significant number of
     * seconds.
     */
    if (now > (call->lastReceiveTime + conn->secondsUntilDead)) {
	if (call->state == RX_STATE_ACTIVE)
	    rxi_CallError(call, RX_CALL_DEAD);
	else
	    rxi_FreeCall(call);

	/*
	 * Non-active calls are destroyed if they are not responding to
	 * pings; active calls are simply flagged in error, so the attached
	 * process can die reasonably gracefully.
	 */
	return -1;
    }
    /* see if we have a non-activity timeout */
    tservice = conn->service;
    if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
	&& tservice->idleDeadTime
	&& ((call->startWait + tservice->idleDeadTime) < now)) {
	if (call->state == RX_STATE_ACTIVE) {
	    rxi_CallError(call, RX_CALL_TIMEOUT);
	    return -1;
	}
    }
    /* see if we have a hard timeout */
    if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime))) {
	if (call->state == RX_STATE_ACTIVE)
	    rxi_CallError(call, RX_CALL_TIMEOUT);
	return -1;
    }
    return 0;
}


/*
 * When a call is in progress, this routine is called occasionally to
 * make sure that some traffic has arrived (or been sent to) the peer.
 * If nothing has arrived in a reasonable amount of time, the call is
 * declared dead; if nothing has been sent for a while, we send a
 * keep-alive packet (if we're actually trying to keep the call alive)
 */
void 
rxi_KeepAliveEvent(struct rxevent *event, struct rx_call *call,
		   char *dummy)
{
    struct rx_connection *conn = call->conn;
    u_long now;

    call->keepAliveEvent = (struct rxevent *) 0;
    now = clock_Sec();

    if (rxi_CheckCall(call))
	return;

    /* Don't try to keep alive dallying calls */
    if ((call->state != RX_STATE_DALLY)
	&& ((now - call->lastSendTime) > conn->secondsUntilPing)) {
	/* Don't try to send keepalives if there is unacknowledged data */

	/*
	 * the rexmit code should be good enough, this little hack doesn't
	 * quite work LWSXXX
	 */
	(void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING);
    }
    rxi_ScheduleKeepAliveEvent(call);
}


void 
rxi_ScheduleKeepAliveEvent(struct rx_call *call)
{
    if (!call->keepAliveEvent) {
	struct clock when;

	clock_GetTime(&when);
	when.sec += call->conn->secondsUntilPing;
	call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
    }
}

/* N.B. rxi_KeepAliveOff:  is defined earlier as a macro */
void 
rxi_KeepAliveOn(struct rx_call *call)
{

    /*
     * Pretend last packet received was received now--i.e. if another packet
     * isn't received within the keep alive time, then the call will die;
     * Initialize last send time to the current time--even if a packet hasn't
     * been sent yet.  This will guarantee that a keep-alive is sent within
     * the ping time
     */
    call->lastReceiveTime = call->lastSendTime = clock_Sec();
    rxi_ScheduleKeepAliveEvent(call);
}

/*
 * This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
 * seconds) to ask the client to authenticate itself.  The routine
 * issues a challenge to the client, which is obtained from the
 * security object associated with the connection
 */
void 
rxi_ChallengeEvent(struct rxevent *event, struct rx_connection *conn,
		   char *dummy)
{
    conn->challengeEvent = (struct rxevent *) 0;
    if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
	struct rx_packet *packet;
	struct clock when;

	packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
	if (!packet)
	    osi_Panic("rxi_ChallengeEvent");
	RXS_GetChallenge(conn->securityObject, conn, packet);
	rxi_SendSpecial((struct rx_call *) 0, conn, packet,
			RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1);
	rxi_FreePacket(packet);
	clock_GetTime(&when);
	when.sec += RX_CHALLENGE_TIMEOUT;
	conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent,
					    conn, 0);
    }
}

/*
 * Call this routine to start requesting the client to authenticate
 * itself.  This will continue until authentication is established,
 * the call times out, or an invalid response is returned.  The
 * security object associated with the connection is asked to create
 * the challenge at this time.  N.B.  rxi_ChallengeOff is a macro,
 * defined earlier.
 */
void 
rxi_ChallengeOn(struct rx_connection *conn)
{
    if (!conn->challengeEvent) {
	RXS_CreateChallenge(conn->securityObject, conn);
	rxi_ChallengeEvent((struct rxevent *) 0, conn, 0);
    };
}

/*
 * Called by event.c when a decongestion event (setup by
 * rxi_CongestionWait) occurs.  This adds back in to the burst count
 * for the specified host the number of packets that were sent at the
 * time the event was scheduled.  It also calls rxi_Start on as many
 * waiting calls as possible before the burst count goes down to zero,
 * again.
 */
void 
rxi_DecongestionEvent(struct rxevent *event, struct rx_peer *peer,
		      int nPackets)
{
    struct rx_call *call;
    struct rx_call *nxcall;   /* Next pointer for queue_Scan */

    peer->refCount--;		       /* It was bumped by the callee */
    peer->burst += nPackets;
    if (peer->burst > peer->burstSize)
	peer->burst = peer->burstSize;
    for (queue_Scan(&peer->congestionQueue, call, nxcall, rx_call)) {
	queue_Remove(call);

	/*
	 * The rxi_Start may put the call back on the congestion queue.  In
	 * that case, peer->burst should be 0 (otherwise no congestion was
	 * encountered).  It should go on the end of the queue, to allow
	 * other calls to proceed when the next burst is allowed
	 */
	rxi_Start((struct rxevent *) 0, call);
	if (!peer->burst)
	    return;
    }
}

/*
 * Schedule an event at a host-dependent time in the future which will
 * add back nPackets to the current allowed burst window.  Any number
 * of these events may be scheduled.
 */
void 
rxi_ScheduleDecongestionEvent(struct rx_call *call, int nPackets)
{
    struct rx_peer *peer = call->conn->peer;
    struct clock tmp;

    clock_GetTime(&tmp);
    clock_Add(&tmp, &peer->burstWait);
    peer->refCount++;		       /* So it won't disappear underneath
				        * us! */
    /* this is stupid  - sending an int as a pointer is begging for trouble */
    rxevent_Post(&tmp, rxi_DecongestionEvent, (void *) peer, (void *)nPackets);
}

/*
 * The caller wishes to have rxi_Start called when the burst count has
 * gone up, and more packets can therefore be sent.  Add the caller to
 * the end of the list of calls waiting for decongestion events to
 * happen.  It's important that it's added to the end so that the
 * rxi_DecongestionEvent procedure always terminates (aside from
 * matters of scheduling fairness).
 */
void 
rxi_CongestionWait(struct rx_call *call)
{
    if (queue_IsOnQueue(call))
	return;
    queue_Append(&call->conn->peer->congestionQueue, call);
}

/*
 * Compute round trip time of the packet provided, in *rttp.
 */
#ifdef ADAPT_PERF
#define ADAPT_RTO
#endif

void 
rxi_ComputeRoundTripTime(struct rx_packet *p, 
			 struct clock *sentp, 
			 struct rx_peer *peer)
{
    struct clock thisRtt, *rttp = &thisRtt;

#ifdef	ADAPT_RTO
    int rtt_timeout;
    static char id[] = "@(#)adaptive RTO";
    *id = *id; /* so it won't complain about unsed variables */

    clock_GetTime(rttp);
    if (clock_Lt(rttp, sentp)) {
	clock_Zero(rttp);
	return;			       /* somebody set the clock back, don't
				        * count this time. */
    }
    clock_Sub(rttp, sentp);
#else
    clock_GetTime(rttp);
    clock_Sub(rttp, &p->timeSent);
#endif
    if (clock_Lt(rttp, &rx_stats.minRtt))
	rx_stats.minRtt = *rttp;
    if (clock_Gt(rttp, &rx_stats.maxRtt)) {
	if (rttp->sec > 110)
	    return;		       /* somebody set the clock ahead */
	rx_stats.maxRtt = *rttp;
    }
    clock_Add(&rx_stats.totalRtt, rttp);
    rx_stats.nRttSamples++;

#ifdef	ADAPT_RTO
    /* better rtt calculation courtesy of UMich crew (dave,larry,peter,???) */

    /* Apply VanJacobson round-trip estimations */
    if (peer->rtt) {
	int delta;

	/*
         * srtt (peer->rtt) is in units of one-eighth-milliseconds.
	 * srtt is stored as fixed point with 3 bits after the binary
	 * point (i.e., scaled by 8). The following magic is
	 * equivalent to the smoothing algorithm in rfc793 with an
	 * alpha of .875 (srtt = rtt/8 + srtt*7/8 in fixed point).
         * srtt*8 = srtt*8 + rtt - srtt
         * srtt = srtt + rtt/8 - srtt/8
	 */

	delta = MSEC(rttp) - (peer->rtt >> 3);
	peer->rtt += delta;

	/*
	 * We accumulate a smoothed rtt variance (actually, a smoothed
	 * mean difference), then set the retransmit timer to smoothed
         * rtt + 4 times the smoothed variance (was 2x in van's original
         * paper, but 4x works better for me, and apparently for him as
         * well).
         * rttvar is stored as
	 * fixed point with 2 bits after the binary point (scaled by
	 * 4).  The following is equivalent to rfc793 smoothing with
	 * an alpha of .75 (rttvar = rttvar*3/4 + |delta| / 4).  This
	 * replaces rfc793's wired-in beta.
         * dev*4 = dev*4 + (|actual - expected| - dev)
	 */

	if (delta < 0)
	    delta = -delta;

	delta -= (peer->rtt_dev >> 2);
	peer->rtt_dev += delta;
    } else {
	peer->rtt = MSEC(rttp) << 3;
	peer->rtt_dev = peer->rtt >> 2;/* rtt/2 */
    }

    /*
     * the timeout is RTT + 4*MDEV + 0.4 sec   This is because one end or the
     * other of these connections is usually in a user process, and can be
     * switched and/or swapped out.  So on fast, reliable networks, the
     * timeout would otherwise be too short.
     */
    rtt_timeout = (peer->rtt >> 3) + peer->rtt_dev + 400;
    clock_Zero(&(peer->timeout));
    clock_Addmsec(&(peer->timeout), rtt_timeout);

    dpf(("rxi_ComputeRoundTripTime(rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%0.3d sec)\n",
	 MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2,
	 (peer->timeout.sec), (peer->timeout.usec)));
#endif				       /* ADAPT_RTO */
}


/*
 * Find all server connections that have not been active for a long time,
 * and toss them
 */
void 
rxi_ReapConnections(void)
{
    struct clock now;

    clock_GetTime(&now);

    /*
     * Find server connection structures that haven't been used for greater
     * than rx_idleConnectionTime
     */
    {
	struct rx_connection **conn_ptr, **conn_end;
	int i, havecalls = 0;

	for (conn_ptr = &rx_connHashTable[0],
	     conn_end = &rx_connHashTable[rx_hashTableSize];
	     conn_ptr < conn_end; conn_ptr++) {
	    struct rx_connection *conn, *next;

	    for (conn = *conn_ptr; conn; conn = next) {
		next = conn->next;
		/* once a minute look at everything to see what's up */
		havecalls = 0;
		for (i = 0; i < RX_MAXCALLS; i++) {
		    if (conn->call[i]) {
			havecalls = 1;
			rxi_CheckCall(conn->call[i]);
		    }
		}
		if (conn->type == RX_SERVER_CONNECTION) {

		    /*
		     * This only actually destroys the connection if there
		     * are no outstanding calls
		     */
		    if (!havecalls && !conn->refCount &&
			((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
			conn->refCount++;	/* it will be decr in
						 * rx_DestroyConn */
			rx_DestroyConnection(conn);
		    }
		}
	    }
	}
    }

    /*
     * Find any peer structures that haven't been used (haven't had an
     * associated connection) for greater than rx_idlePeerTime
     */
    {
	struct rx_peer **peer_ptr, **peer_end;

	for (peer_ptr = &rx_peerHashTable[0],
	     peer_end = &rx_peerHashTable[rx_hashTableSize];
	     peer_ptr < peer_end; peer_ptr++) {
	    struct rx_peer *peer, *next;

	    for (peer = *peer_ptr; peer; peer = next) {
		next = peer->next;
		if (peer->refCount == 0
		    && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
		    rxi_DestroyPeer(peer);
		}
	    }
	}
    }

    /*
     * THIS HACK IS A TEMPORARY HACK.  The idea is that the race condition in
     * rxi_AllocSendPacket, if it hits, will be handled at the next conn GC,
     * just below.  Really, we shouldn't have to keep moving packets from one
     * place to another, but instead ought to always know if we can afford to
     * hold onto a packet in its particular use.
     */
    MUTEX_ENTER(&rx_waitingForPackets_lock);
    if (rx_waitingForPackets) {
	rx_waitingForPackets = 0;
#ifdef	RX_ENABLE_LOCKS
	cv_signal(&rx_waitingForPackets_cv);
#else
	osi_rxWakeup(&rx_waitingForPackets);
#endif
    }
    MUTEX_EXIT(&rx_waitingForPackets_lock);

    now.sec += RX_REAP_TIME;	       /* Check every RX_REAP_TIME seconds */
    rxevent_Post(&now, rxi_ReapConnections, 0, 0);
}


/*
 * rxs_Release - This isn't strictly necessary but, since the macro name from
 * rx.h is sort of strange this is better.  This is called with a security
 * object before it is discarded.  Each connection using a security object has
 * its own refcount to the object so it won't actually be freed until the last
 * connection is destroyed.
 *
 * This is the only rxs module call.  A hold could also be written but no one
 * needs it.
 */

int 
rxs_Release(struct rx_securityClass *aobj)
{
    return RXS_Close(aobj);
}

#ifdef ADAPT_WINDOW
#define	RXRATE_PKT_OH	(RX_HEADER_SIZE + RX_IPUDP_SIZE)
#define	RXRATE_SMALL_PKT    (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
#define	RXRATE_AVG_SMALL_PKT	(RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
#define	RXRATE_LARGE_PKT    (RXRATE_SMALL_PKT + 256)

/*
 * Adjust our estimate of the transmission rate to this peer, given
 * that the packet p was just acked. We can adjust peer->timeout and
 * call->twind (and peer->maxWindow). Pragmatically, this is called
 * only with packets of maximal length.
 */

static void 
rxi_ComputeRate(struct rx_peer *peer, struct rx_call *call,
		struct rx_packet *p, struct rx_packet *ackp, u_char ackReason)
{
    long xferSize, xferMs;
    long minTime;
    struct clock newTO;

    /* Count down packets */
    if (peer->rateFlag > 0)
	peer->rateFlag--;
    /* Do nothing until we're enabled */
    if (peer->rateFlag != 0)
	return;
    if (!call->conn)
	return;

    /* Count only when the ack seems legitimate */
    switch (ackReason) {
    case RX_ACK_REQUESTED:
	xferSize = p->length + RX_HEADER_SIZE +
	    call->conn->securityMaxTrailerSize;
	xferMs = peer->rtt;
	break;

    case RX_ACK_PING_RESPONSE:
	if (p)			       /* want the response to ping-request,
				        * not data send */
	    return;
	clock_GetTime(&newTO);
	if (clock_Gt(&newTO, &call->pingRequestTime)) {
	    clock_Sub(&newTO, &call->pingRequestTime);
	    xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
	} else {
	    return;
	}
	xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
	break;

    default:
	return;
    }

    dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, "
         "rtt %u, win %u, ps %u)",
	 ntohl(peer->host), ntohs(peer->port),
	 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
	 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
	 peer->maxWindow, peer->packetSize));

    /* Track only packets that are big enough. */
    if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
	peer->packetSize)
	return;

    /* absorb RTT data (in milliseconds) for these big packets */
    if (peer->smRtt == 0) {
	peer->smRtt = xferMs;
    } else {
	peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
	if (!peer->smRtt)
	    peer->smRtt = 1;
    }

    if (peer->countDown) {
	peer->countDown--;
	return;
    }
    peer->countDown = 10;	       /* recalculate only every so often */

#if 0

    /*
     * We here assume that we can approximate the total elapsed time for a
     * window-full of full packets as: time = RTT + ((winSize *
     * (packetSize+overhead)) - minPktSize) / byteRate
     */
    /* The RTT and byteRate numbers are what is measured above. */

    /*
     * In principle, we can change the other parameters: - winSize, the
     * number of packets in the transmission window; - packetSize, the max
     * size of a data packet; - the timeout, which must be larger than the
     * expected time.
     */

    /*
     * In practice, we do this in two steps: (a) ensure that the timeout is
     * large enough for a single packet to get through; (b) ensure that the
     * transmit-window is small enough to fit in the timeout.
     */

    /* First, an expression for the expected RTT for a full packet */
    minTime = peer->smRtt + ((1000 * (peer->packetSize +
			    RX_HEADER_SIZE + RX_IPUDP_SIZE)) / peer->smBps);

    /* Get a reasonable estimate for a timeout period */
    minTime += minTime;
    newTO.sec = minTime / 1000;
    newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;

    /*
     * Increase the timeout period so that we can always do at least one
     * packet exchange
     */
    if (clock_Gt(&newTO, &peer->timeout)) {

	dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu "
	     "(rtt %u, win %u, ps %u, Bps %u)",
	     ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
	     peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
	     peer->maxWindow, peer->packetSize, peer->smBps));

	peer->timeout = newTO;
    }
    /* Now, get an estimate for the transmit window size. */
    minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);

    /*
     * Now, convert to the number of full packets that could fit in that
     * interval
     */
    minTime = ((((minTime - peer->smRtt) * peer->smBps) / 1000) +
	       RXRATE_AVG_SMALL_PKT) /
	(peer->packetSize + RX_HEADER_SIZE + RX_IPUDP_SIZE);
    minTime >>= 1;		       /* Take half that many */
    xferSize = minTime;		       /* (make a copy) */

    /* Now clamp the size to reasonable bounds. */
    if (minTime <= 1)
	minTime = 1;
    else if (minTime > rx_Window)
	minTime = rx_Window;
    if (minTime != peer->maxWindow) {
	dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, "
	     "rtt %u, ps %u, Bps %u)",
	     ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
	     peer->timeout.sec, peer->timeout.usec, peer->smRtt,
	     peer->packetSize, peer->smBps));

	peer->maxWindow = minTime;
	/* call->twind = minTime; */
    }

    /*
     * Cut back on the peer timeout if it has grown unreasonably. Discern
     * this by calculating the timeout necessary for rx_Window packets.
     */
    if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
	/* calculate estimate for transmission interval in milliseconds */
	minTime = (((1000 * rx_Window *
		     (peer->packetSize + RX_HEADER_SIZE + RX_IPUDP_SIZE))
		    - RXRATE_AVG_SMALL_PKT) / peer->smBps) + peer->smRtt;
	if (minTime < 1000) {

	    dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, "
		 "win %u, ps %u, Bps %u)",
		 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
		 peer->timeout.usec, peer->smRtt, peer->maxWindow,
		 peer->packetSize, peer->smBps));

	    newTO.sec = 0;	       /* cut back on timeout by half a
				        * second */
	    newTO.usec = 500000;
	    clock_Sub(&peer->timeout, &newTO);
	}
    }
#endif				       /* 0 */

    /*
     * In practice, we can measure only the RTT for full packets, because of
     * the way Rx acks the data that it receives.  (If it's smaller than a
     * full packet, it often gets implicitly acked either by the call
     * response (from a server) or by the next call (from a client), and
     * either case confuses transmission times with processing times.)
     * Therefore, replace the above more-sophisticated processing with a
     * simpler version, where the smoothed RTT is kept for full-size packets,
     * and the time to transmit a windowful of full-size packets is simply
     * RTT * windowSize. Again, we take two steps: - ensure the timeout is
     * large enough for a single packet's RTT; - ensure that the window is
     * small enough to fit in the desired timeout.
     */

    /* First, the timeout check. */
    minTime = peer->smRtt;
    /* Get a reasonable estimate for a timeout period */
    minTime += minTime;
    newTO.sec = minTime / 1000;
    newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;

    /*
     * Increase the timeout period so that we can always do at least one
     * packet exchange
     */
    if (clock_Gt(&newTO, &peer->timeout)) {

	dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, "
	     "win %u, ps %u)",
	     ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
	     peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
	     peer->maxWindow, peer->packetSize));

	peer->timeout = newTO;
    }
    /* Now, get an estimate for the transmit window size. */
    minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);

    /*
     * Now, convert to the number of full packets that could fit in a
     * reasonable fraction of that interval
     */
    minTime /= (peer->smRtt << 1);
    xferSize = minTime;		       /* (make a copy) */

    /* Now clamp the size to reasonable bounds. */
    if (minTime <= 1)
	minTime = 1;
    else if (minTime > rx_Window)
	minTime = rx_Window;
    if (minTime != peer->maxWindow) {
	dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, "
	     "rtt %u, ps %u)",
	     ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
	     peer->timeout.sec, peer->timeout.usec, peer->smRtt,
	     peer->packetSize));
	peer->maxWindow = minTime;
	/* call->twind = minTime; */
    }

    /*
     * Cut back on the peer timeout if it had earlier grown unreasonably.
     * Discern this by calculating the timeout necessary for rx_Window
     * packets.
     */
    if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
	/* calculate estimate for transmission interval in milliseconds */
	minTime = rx_Window * peer->smRtt;
	if (minTime < 1000) {
	    dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, "
		 "win %u, ps %u)",
		 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
		 peer->timeout.usec, peer->smRtt, peer->maxWindow,
		 peer->packetSize));

	    newTO.sec = 0;	       /* cut back on timeout by half a
				        * second */
	    newTO.usec = 500000;
	    clock_Sub(&peer->timeout, &newTO);
	}
    }
    return;
}				       /* end of rxi_ComputeRate */

#endif				       /* ADAPT_WINDOW */






#ifdef RXDEBUG
/* Don't call this debugging routine directly; use dpf */
void
rxi_DebugPrint(const char *fmt, ...)
{
    struct clock now;
    va_list ap;

    clock_GetTime(&now);
    
    fprintf(Log, " %lu.%.3lu:", now.sec, now.usec / 1000);
    va_start(ap, fmt);
    vfprintf(Log, fmt, ap);
    va_end(ap);
    putc('\n', Log);
}

#endif

#if defined(RXDEBUG) || 1
void 
rx_PrintTheseStats(FILE *file, struct rx_stats *s, int size)
{
    int i;

    if (size != sizeof(struct rx_stats))
	fprintf(file, "Unexpected size of stats structure: was %d, "
		"expected %d\n", 
		size, 
		(int)sizeof(struct rx_stats));

    fprintf(file, 
	    "rx stats: free packets %d, allocs %d, "
	    "alloc-failures(rcv %d,send %d,ack %d)\n", rx_nFreePackets,
	    s->packetRequests, s->noPackets[0], s->noPackets[1],
	    s->noPackets[2]);
    fprintf(file, 
	    "   greedy %d, bogusReads %d (last from host %x), "
	    "noPackets %d, noBuffers %d, selects %d, sendSelects %d\n", 
	    s->socketGreedy, s->bogusPacketOnRead, s->bogusHost, 
	    s->noPacketOnRead, s->noPacketBuffersOnRead, s->selects, 
	    s->sendSelects);
    fprintf(file, "   packets read: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++)
	fprintf(file, "%s %d ", rx_packetTypes[i], s->packetsRead[i]);

    fprintf(file, "\n");
    fprintf(file,
	    "   other read counters: data %d, ack %d, dup %d "
	    "spurious %d\n", s->dataPacketsRead, s->ackPacketsRead,
	    s->dupPacketsRead, s->spuriousPacketsRead);
    fprintf(file, "   packets sent: ");
    for (i = 0; i < RX_N_PACKET_TYPES; i++)
	fprintf(file, "%s %d ", rx_packetTypes[i], s->packetsSent[i]);
    fprintf(file, "\n");
    fprintf(file,
	    "   other send counters: ack %d, data %d (not resends), "
	    "resends %d, pushed %d, acked&ignored %d\n", s->ackPacketsSent,
	    s->dataPacketsSent, s->dataPacketsReSent, s->dataPacketsPushed,
	    s->ignoreAckedPacket);
    fprintf(file,
	    "   \t(these should be small) sendFailed %d, "
	    "fatalErrors %ld\n",
	    s->netSendFailures, s->fatalErrors);
    if (s->nRttSamples) {
	fprintf(file, "   Average rtt is %0.3f, with %d samples\n",
		clock_Float(&s->totalRtt) / s->nRttSamples, s->nRttSamples);

	fprintf(file, "   Minimum rtt is %0.3f, maximum is %0.3f\n",
		clock_Float(&s->minRtt), clock_Float(&s->maxRtt));
    }
    fprintf(file,
	    "   %d server connections, %d client connections, %d "
	    "peer structs, %d call structs, %d free call structs\n",
	    s->nServerConns, s->nClientConns, s->nPeerStructs,
	    s->nCallStructs, s->nFreeCallStructs);
    fprintf(file, "   %d clock updates\n", clock_nUpdates);
}

/* for backward compatibility */
void 
rx_PrintStats(FILE *file)
{
    rx_PrintTheseStats(file, &rx_stats, sizeof(rx_stats));
}

void 
rx_PrintPeerStats(FILE *file, struct rx_peer *peer)
{
    fprintf(file, "Peer %lx.%d.  Burst size %d, burst wait %lu.%ld.\n",
	    (unsigned long)ntohl(peer->host), 
	    peer->port, 
	    peer->burstSize,
	    (unsigned long)peer->burstWait.sec, 
	    (unsigned long)peer->burstWait.usec);
    fprintf(file, "   Rtt %d, retry time %lu.%06ld, total sent %d, resent %d\n",
	    peer->rtt, 
	    (unsigned long)peer->timeout.sec, 
	    (long)peer->timeout.usec, 
	    peer->nSent,
	    peer->reSends);
    fprintf(file,
	    "   Packet size %d, max in packet skew %ld, max out packet "
	    "skew %ld\n", peer->packetSize, peer->inPacketSkew,
	    peer->outPacketSkew);
}

#endif				       /* RXDEBUG */

void
shutdown_rx(void)
{
    struct rx_serverQueueEntry *np;
    int i, j;

    rxinit_status = 0;
    {
	struct rx_peer **peer_ptr, **peer_end;

	for (peer_ptr = &rx_peerHashTable[0],
	     peer_end = &rx_peerHashTable[rx_hashTableSize];
	     peer_ptr < peer_end; peer_ptr++) {
	    struct rx_peer *peer, *next;

	    for (peer = *peer_ptr; peer; peer = next) {
		next = peer->next;
		rxi_DestroyPeer(peer);
	    }
	}
    }
    for (i = 0; i < RX_MAX_SERVICES; i++) {
	if (rx_services[i])
	    rxi_Free(rx_services[i], sizeof(*rx_services));
    }
    for (i = 0; i < rx_hashTableSize; i++) {
	struct rx_connection *tc, *ntc;

	for (tc = rx_connHashTable[i]; tc; tc = ntc) {
	    ntc = tc->next;
	    for (j = 0; j < RX_MAXCALLS; j++) {
		if (tc->call[j]) {
		    rxi_Free(tc->call[j], sizeof(*(tc->call)));
		}
	    }
	    rxi_Free(tc, sizeof(tc));
	}
    }

    MUTEX_ENTER(&freeSQEList_lock);

    while ((np = rx_FreeSQEList) != NULL) {
	rx_FreeSQEList = *(struct rx_serverQueueEntry **) np;
	MUTEX_DESTROY(&np->lock);
	rxi_Free(np, sizeof(np));
    }

    MUTEX_EXIT(&freeSQEList_lock);
    MUTEX_DESTROY(&freeSQEList_lock);
    MUTEX_DESTROY(&rx_waitingForPackets_lock);
    MUTEX_DESTROY(&rx_freeCallQueue_lock);

    osi_Free(rx_connHashTable, 
	     rx_hashTableSize * sizeof(struct rx_connection *));
    osi_Free(rx_peerHashTable, rx_hashTableSize * sizeof(struct rx_peer *));
    osi_Free(rx_allocedP, sizeof(struct rx_packet) * rx_nPackets);

    UNPIN(rx_connHashTable, rx_hashTableSize * sizeof(struct rx_connection *));
    UNPIN(rx_peerHashTable, rx_hashTableSize * sizeof(struct rx_peer *));
    UNPIN(rx_allocedP, sizeof(struct rx_packet) * rx_nPackets);

    rxi_FreeAllPackets();

    rxi_dataQuota = RX_MAX_QUOTA;
    rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
}
