[arla.git] / rx / rx.c
1 /*
2 ****************************************************************************
3 * Copyright IBM Corporation 1988, 1989 - All Rights Reserved *
4 * *
5 * Permission to use, copy, modify, and distribute this software and its *
6 * documentation for any purpose and without fee is hereby granted, *
7 * provided that the above copyright notice appear in all copies and *
8 * that both that copyright notice and this permission notice appear in *
9 * supporting documentation, and that the name of IBM not be used in *
10 * advertising or publicity pertaining to distribution of the software *
11 * without specific, written prior permission. *
12 * *
13 * IBM DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL *
14 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL IBM *
15 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY *
16 * DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER *
17 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING *
18 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *
19 ****************************************************************************
22 /* RX: Extended Remote Procedure Call */
24 #include <assert.h>
25 #include "rx_locl.h"
27 RCSID("$Id$");
30 * quota system: each attached server process must be able to make
31 * progress to avoid system deadlock, so we ensure that we can always
32 * handle the arrival of the next unacknowledged data packet for an
33 * attached call. rxi_dataQuota gives the max # of packets that must be
34 * reserved for active calls for them to be able to make progress, which is
35 * essentially enough to queue up a window-full of packets (the first packet
36 * may be missing, so these may not get read) + the # of packets the thread
37 * may use before reading all of its input (# free must be one more than send
38 * packet quota). Thus, each thread allocates rx_Window+1 (max queued input
39 * packets) + an extra for sending data. The system also reserves
40 * RX_MAX_QUOTA (must be more than RX_PACKET_QUOTA[i], which is 10), so that
41 * the extra packet can be sent (must be under the system-wide send packet
42 * quota to send any packets)
44 /* # to reserve so that thread with input can still make calls (send packets)
45 without blocking */
46 long rx_tq_dropped = 0; /* Solaris only temp variable */
47 long rxi_dataQuota = RX_MAX_QUOTA; /* packets to reserve for active
48 * threads */
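/*
 * Editor's sketch, not part of the original source: the per-thread packet
 * reservation described in the comment above, under the assumptions spelled
 * out there (a window-full of queued input packets, whose first packet may
 * be missing, plus one packet reserved for sending).  It mirrors the
 * "rxi_dataQuota += rx_Window + 2" done in rx_ServerProc below; the helper
 * name is illustrative only.
 */
#if 0
static long
example_per_thread_quota(long window)
{
    long queued_input = window + 1;	/* max queued input packets */
    long send_reserve = 1;		/* so the thread can still send */

    return queued_input + send_reserve;	/* == rx_Window + 2 per thread */
}
#endif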
51 * Variables for handling the minProcs implementation. availProcs gives the
52 * number of threads available in the pool at this moment (not counting dudes
53 * executing right now). totalMin gives the total number of procs required
54 * for handling all minProcs requests. minDeficit is a dynamic variable
55 * tracking the # of procs required to satisfy all of the remaining minProcs
56 * demands.
59 long rxi_availProcs = 0; /* number of threads in the pool */
60 long rxi_totalMin; /* Sum(minProcs) forall services */
61 long rxi_minDeficit = 0; /* number of procs needed to handle
62 * all minProcs */
63 long Rx0 = 0, Rx1 = 0;
65 struct rx_serverQueueEntry *rx_waitForPacket = 0;
66 struct rx_packet *rx_allocedP = 0;
68 /* ------------Exported Interfaces------------- */
72 * This function allows rxkad to set the epoch to a suitably random number
73  * which rx_NewConnection will use in the future. The principal purpose is to
74 * get rxnull connections to use the same epoch as the rxkad connections do, at
75 * least once the first rxkad connection is established. This is important now
76 * that the host/port addresses aren't used in FindConnection: the uniqueness
77 * of epoch/cid matters and the start time won't do.
80 void
81 rx_SetEpoch(uint32_t epoch)
83 rx_epoch = epoch;
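/*
 * Editor's sketch, not part of the original source: how a security layer
 * such as rxkad might seed the epoch before creating any connections, per
 * the comment above.  The random source here is purely illustrative.
 */
#if 0
static void
example_seed_epoch(void)
{
    /* any suitably random 32-bit value; rxkad derives its own */
    rx_SetEpoch((uint32_t) random());
}
#endif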
87 * Initialize rx. A port number may be mentioned, in which case this
88 * becomes the default port number for any service installed later.
89 * If 0 is provided for the port number, a random port will be chosen
90 * by the kernel. Whether this will ever overlap anything in
91 * /etc/services is anybody's guess... Returns 0 on success, -1 on
92 * error.
94 static int rxinit_status = 1;
96 int
97 rx_Init(uint16_t port)
99 struct timeval tv;
100 char *htable, *ptable;
101 uint16_t rport;
103 SPLVAR;
105 if (rxinit_status != 1)
106 return rxinit_status; /* Already done; return previous error
107 * code. */
110 * Allocate and initialize a socket for client and perhaps server
111 * connections
113 rx_socket = rxi_GetUDPSocket(port, &rport);
114 rx_socket_icmp = rxi_GetICMPSocket();
116 if (rx_socket == OSI_NULLSOCKET) {
117 return RX_ADDRINUSE;
119 #if defined(AFS_GLOBAL_SUNLOCK) && defined(KERNEL)
120 LOCK_INIT(&afs_rxglobal_lock, "afs_rxglobal_lock");
121 #endif
122 #ifdef RX_ENABLE_LOCKS
123 LOCK_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock");
124 LOCK_INIT(&freeSQEList_lock, "freeSQEList lock");
125 LOCK_INIT(&rx_waitingForPackets_lock, "rx_waitingForPackets lock");
126     LOCK_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue lock");
127 #endif
129 rxi_nCalls = 0;
130 rx_connDeadTime = 12;
131 memset((char *) &rx_stats, 0, sizeof(struct rx_stats));
133 htable = (char *) osi_Alloc(rx_hashTableSize *
134 sizeof(struct rx_connection *));
135 PIN(htable, rx_hashTableSize *
136 sizeof(struct rx_connection *)); /* XXXXX */
137 memset(htable, 0, rx_hashTableSize *
138 sizeof(struct rx_connection *));
139 ptable = (char *) osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
140 PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *)); /* XXXXX */
141 memset(ptable, 0, rx_hashTableSize * sizeof(struct rx_peer *));
142 NETPRI;
144 osi_GetTime(&tv);
146 * *Slightly* random start time for the cid. This is just to help
147 * out with the hashing function at the peer
149 rx_port = rport;
150 rx_stats.minRtt.sec = 9999999;
151 rx_SetEpoch(tv.tv_sec); /*
152 * Start time of this package, rxkad
153                                      * will provide a more random value.
155 rxi_dataQuota += rx_extraQuota; /*
156 * + extra packets caller asked to
157 * reserve
159 rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
160 rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
161 rx_connHashTable = (struct rx_connection **) htable;
162 rx_peerHashTable = (struct rx_peer **) ptable;
164 clock_Init();
165 rxevent_Init(20, rxi_ReScheduleEvents);
167 /* Malloc up a bunch of packet buffers */
168 rx_nFreePackets = 0;
169 queue_Init(&rx_freePacketQueue);
170 rx_nFreeCbufs = 0;
171 queue_Init(&rx_freeCbufQueue);
172 USERPRI;
173 rxi_MorePackets(rx_nPackets);
174 rxi_MoreCbufs(rx_nPackets);
175 rx_CheckCbufs(0);
176 NETPRI;
178 rx_lastAckDelay.sec = 0;
179 rx_lastAckDelay.usec = 400000; /* 400 ms */
180 #ifdef SOFT_ACK
181 rx_hardAckDelay.sec = 0;
182 rx_hardAckDelay.usec = 100000; /* 100 ms */
183 rx_softAckDelay.sec = 0;
184 rx_softAckDelay.usec = 100000; /* 100 ms */
185 #endif /* SOFT_ACK */
187 /* Initialize various global queues */
188 queue_Init(&rx_idleServerQueue);
189 queue_Init(&rx_incomingCallQueue);
190 queue_Init(&rx_freeCallQueue);
193 * Start listener process (exact function is dependent on the
194 * implementation environment--kernel or user space)
196 rxi_StartListener();
198 USERPRI;
199 rxinit_status = 0;
200 return rxinit_status;
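/*
 * Editor's sketch, not part of the original source: minimal client-side use
 * of the entry points defined in this file (rx_Init, rx_NewConnection,
 * rx_NewCall, rx_EndCall, rx_DestroyConnection).  The host, service id and
 * security object are placeholders and error handling is elided.
 */
#if 0
static void
example_client(uint32_t host, uint16_t port, uint16_t service_id,
	       struct rx_securityClass *sec)
{
    struct rx_connection *conn;
    struct rx_call *call;

    if (rx_Init(0) != 0)		/* 0: let the kernel pick our port */
	return;
    conn = rx_NewConnection(host, port, service_id, sec, 0);
    call = rx_NewCall(conn);
    /* ... rx_Write the request, rx_Read the reply ... */
    (void) rx_EndCall(call, 0);
    rx_DestroyConnection(conn);
}
#endif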
204 * called with unincremented nRequestsRunning to see if it is OK to start
205 * a new thread in this service. Could be "no" for two reasons: over the
206 * max quota, or would prevent others from reaching their min quota.
208 static int
209 QuotaOK(struct rx_service * aservice)
211 /* check if over max quota */
212 if (aservice->nRequestsRunning >= aservice->maxProcs)
213 return 0;
215 /* under min quota, we're OK */
216 if (aservice->nRequestsRunning < aservice->minProcs)
217 return 1;
220 * otherwise, can use only if there are enough to allow everyone to go to
221 * their min quota after this guy starts.
223 if (rxi_availProcs > rxi_minDeficit)
224 return 1;
225 else
226 return 0;
231 * This routine must be called if any services are exported. If the
232 * donateMe flag is set, the calling process is donated to the server
233 * process pool
235 void
236 rx_StartServer(int donateMe)
238 struct rx_service *service;
239 int i;
241 SPLVAR;
242 clock_NewTime();
244 NETPRI;
246 * Start server processes, if necessary (exact function is dependent
247 * on the implementation environment--kernel or user space). DonateMe
248 * will be 1 if there is 1 pre-existing proc, i.e. this one. In this
249      * case, one less new proc will be created by rx_StartServerProcs.
251 rxi_StartServerProcs(donateMe);
254      * count up the # of threads in minProcs, and set the min deficit to
255 * be that value, too.
257 for (i = 0; i < RX_MAX_SERVICES; i++) {
258 service = rx_services[i];
259 if (service == (struct rx_service *) 0)
260 break;
261 rxi_totalMin += service->minProcs;
263 * below works even if a thread is running, since minDeficit would
264 * still have been decremented and later re-incremented.
266 rxi_minDeficit += service->minProcs;
269 /* Turn on reaping of idle server connections */
270 rxi_ReapConnections();
272 if (donateMe)
273 rx_ServerProc(); /* Never returns */
274 USERPRI;
275 return;
279 * Create a new client connection to the specified service, using the
280 * specified security object to implement the security model for this
281 * connection.
283 struct rx_connection *
284 rx_NewConnection(uint32_t shost, uint16_t sport, uint16_t sservice,
285 struct rx_securityClass * securityObject,
286 int serviceSecurityIndex)
288 int hashindex, error;
289 long cid;
290 struct rx_connection *conn;
292 SPLVAR;
293 #if defined(AFS_SGIMP_ENV)
294 GLOCKSTATE ms;
296 #endif
298 clock_NewTime();
299 dpf(("rx_NewConnection(host %x, port %u, service %u, "
300 "securityObject %x, serviceSecurityIndex %d)\n",
301 shost, sport, sservice, securityObject, serviceSecurityIndex));
302 GLOBAL_LOCK();
303 #if defined(AFS_SGIMP_ENV)
304 AFS_GRELEASE(&ms);
305 /* NETPRI protects Cid and Alloc */
306 NETPRI;
307 #endif
308 cid = (rx_nextCid += RX_MAXCALLS);
309 conn = rxi_AllocConnection();
310 #if !defined(AFS_SGIMP_ENV)
311 NETPRI;
312 #endif
313 conn->type = RX_CLIENT_CONNECTION;
314 conn->cid = cid;
315 conn->epoch = rx_epoch;
316 conn->peer = rxi_FindPeer(shost, sport);
317 queue_Append(&conn->peer->connQueue, conn);
318 conn->serviceId = sservice;
319 conn->securityObject = securityObject;
320 conn->securityData = (void *) 0;
321 conn->securityIndex = serviceSecurityIndex;
322 conn->maxPacketSize = MIN(conn->peer->packetSize, OLD_MAX_PACKET_SIZE);
323 rx_SetConnDeadTime(conn, rx_connDeadTime);
325 error = RXS_NewConnection(securityObject, conn);
326 if (error)
327 conn->error = error;
328 hashindex = CONN_HASH(shost, sport, conn->cid, conn->epoch,
329 RX_CLIENT_CONNECTION);
330 conn->next = rx_connHashTable[hashindex];
331 rx_connHashTable[hashindex] = conn;
332 rx_stats.nClientConns++;
334 conn->refCount++;
335 USERPRI;
336 #if defined(AFS_SGIMP_ENV)
337 AFS_GACQUIRE(&ms);
338 #endif
339 GLOBAL_UNLOCK();
340 return conn;
343 void
344 rxi_SetConnDeadTime(struct rx_connection * conn, int seconds)
347 * This is completely silly; perhaps exponential back off
348 * would be more reasonable? XXXX
350 conn->secondsUntilDead = seconds;
351 conn->secondsUntilPing = seconds / 6;
352 if (conn->secondsUntilPing == 0)
353 conn->secondsUntilPing = 1; /* XXXX */
356 /* Destroy the specified connection */
357 void
358 rx_DestroyConnection(struct rx_connection * conn)
360 struct rx_connection **conn_ptr;
361 int i;
362 int havecalls = 0;
364 SPLVAR;
366 clock_NewTime();
368 NETPRI;
369 if (--conn->refCount > 0) {
370 /* Busy; wait till the last guy before proceeding */
371 USERPRI;
372 return;
375 * If the client previously called rx_NewCall, but it is still
376 * waiting, treat this as a running call, and wait to destroy the
377 * connection later when the call completes.
379 if ((conn->type == RX_CLIENT_CONNECTION) &&
380 (conn->flags & RX_CONN_MAKECALL_WAITING)) {
381 conn->flags |= RX_CONN_DESTROY_ME;
382 USERPRI;
383 return;
385 /* Check for extant references to this connection */
386 for (i = 0; i < RX_MAXCALLS; i++) {
387 struct rx_call *call = conn->call[i];
389 if (call) {
390 havecalls = 1;
391 if (conn->type == RX_CLIENT_CONNECTION) {
392 if (call->delayedAckEvent) {
394 * Push the final acknowledgment out now--there
395 * won't be a subsequent call to acknowledge the
396 * last reply packets
398 rxevent_Cancel(call->delayedAckEvent);
399 rxi_AckAll((struct rxevent *) 0, call, 0);
405 if (havecalls) {
407 * Don't destroy the connection if there are any call
408 * structures still in use
410 conn->flags |= RX_CONN_DESTROY_ME;
411 USERPRI;
412 return;
414 /* Remove from connection hash table before proceeding */
415 conn_ptr = &rx_connHashTable[CONN_HASH(peer->host, peer->port, conn->cid,
416 conn->epoch, conn->type)];
417 for (; *conn_ptr; conn_ptr = &(*conn_ptr)->next) {
418 if (*conn_ptr == conn) {
419 *conn_ptr = conn->next;
420 break;
425 * Notify the service exporter, if requested, that this connection
426 * is being destroyed
428 if (conn->type == RX_SERVER_CONNECTION && conn->service->destroyConnProc)
429 (*conn->service->destroyConnProc) (conn);
431 /* Notify the security module that this connection is being destroyed */
432 RXS_DestroyConnection(conn->securityObject, conn);
434 /* Make sure that the connection is completely reset before deleting it. */
435 rxi_ResetConnection(conn);
437 queue_Remove(conn);
438 if (--conn->peer->refCount == 0)
439 conn->peer->idleWhen = clock_Sec();
441 if (conn->type == RX_SERVER_CONNECTION)
442 rx_stats.nServerConns--;
443 else
444 rx_stats.nClientConns--;
447 RX_MUTEX_DESTROY(&conn->lock);
448 rxi_FreeConnection(conn);
449 USERPRI;
453 * Start a new rx remote procedure call, on the specified connection.
454 * If wait is set to 1, wait for a free call channel; otherwise return
455 * 0. Maxtime gives the maximum number of seconds this call may take,
456 * after rx_MakeCall returns. After this time interval, a call to any
457 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
459 struct rx_call *
460 rx_NewCall(struct rx_connection * conn)
462 int i;
463 struct rx_call *call;
465 SPLVAR;
466 #if defined(AFS_SGIMP_ENV)
467 GLOCKSTATE ms;
469 #endif
470 clock_NewTime();
471 dpf(("rx_NewCall(conn %x)\n", conn));
473 GLOBAL_LOCK();
474 #if defined(AFS_SGIMP_ENV)
475 AFS_GRELEASE(&ms);
476 #endif
477 NETPRI;
478 for (;;) {
479 for (i = 0; i < RX_MAXCALLS; i++) {
480 call = conn->call[i];
481 if (call) {
482 if (call->state == RX_STATE_DALLY) {
483 RX_MUTEX_ENTER(&call->lock);
484 rxi_ResetCall(call);
485 (*call->callNumber)++;
486 break;
488 } else {
489 call = rxi_NewCall(conn, i);
490 RX_MUTEX_ENTER(&call->lock);
491 break;
494 if (i < RX_MAXCALLS) {
495 break;
497 conn->flags |= RX_CONN_MAKECALL_WAITING;
498 #ifdef RX_ENABLE_LOCKS
499 cv_wait(&conn->cv, &conn->lock);
500 #else
501 osi_rxSleep(conn);
502 #endif
505 /* Client is initially in send mode */
506 call->state = RX_STATE_ACTIVE;
507 call->mode = RX_MODE_SENDING;
509 /* remember start time for call in case we have hard dead time limit */
510 call->startTime = clock_Sec();
512 /* Turn on busy protocol. */
513 rxi_KeepAliveOn(call);
515 RX_MUTEX_EXIT(&call->lock);
516 USERPRI;
517 #if defined(AFS_SGIMP_ENV)
518 AFS_GACQUIRE(&ms);
519 #endif
520 GLOBAL_UNLOCK();
521 return call;
524 static int
525 rxi_HasActiveCalls(struct rx_connection * aconn)
527 int i;
528 struct rx_call *tcall;
530 SPLVAR;
532 NETPRI;
533 for (i = 0; i < RX_MAXCALLS; i++) {
534 if ((tcall = aconn->call[i]) != NULL) {
535 if ((tcall->state == RX_STATE_ACTIVE)
536 || (tcall->state == RX_STATE_PRECALL)) {
537 USERPRI;
538 return 1;
542 USERPRI;
543 return 0;
547 rxi_GetCallNumberVector(const struct rx_connection * aconn,
548 int32_t *alongs)
550 int i;
551 struct rx_call *tcall;
553 SPLVAR;
555 NETPRI;
556 for (i = 0; i < RX_MAXCALLS; i++) {
557 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
558 alongs[i] = aconn->callNumber[i] + 1;
559 else
560 alongs[i] = aconn->callNumber[i];
562 USERPRI;
563 return 0;
567 rxi_SetCallNumberVector(struct rx_connection * aconn,
568 int32_t *alongs)
570 int i;
571 struct rx_call *tcall;
573 SPLVAR;
575 NETPRI;
576 for (i = 0; i < RX_MAXCALLS; i++) {
577 if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
578 aconn->callNumber[i] = alongs[i] - 1;
579 else
580 aconn->callNumber[i] = alongs[i];
582 USERPRI;
583 return 0;
587 * Advertise a new service. A service is named locally by a UDP port
588 * number plus a 16-bit service id. Returns (struct rx_service *) 0
589 * on a failure.
591 struct rx_service *
592 rx_NewService(uint16_t port, uint16_t serviceId, char *serviceName,
593 struct rx_securityClass ** securityObjects,
594 int nSecurityObjects,
595 int32_t (*serviceProc) (struct rx_call *))
597 osi_socket asocket = OSI_NULLSOCKET;
598 struct rx_securityClass **sersec;
599 struct rx_service *tservice;
600 int reuse = 0;
601 int i;
603 SPLVAR;
605 clock_NewTime();
607 if (serviceId == 0) {
608 osi_Msg(("rx_NewService: service id for service %s is not"
609 " non-zero.\n", serviceName));
610 return 0;
612 if (port == 0) {
613 if (rx_port == 0) {
614 osi_Msg(("rx_NewService: A non-zero port must be specified on "
615 "this call if a non-zero port was not provided at Rx "
616 "initialization (service %s).\n", serviceName));
617 return 0;
619 port = rx_port;
620 asocket = rx_socket;
622 sersec = (void *)rxi_Alloc(sizeof(*sersec) * nSecurityObjects);
623 for (i = 0; i < nSecurityObjects; i++)
624 sersec[i] = securityObjects[i];
625 tservice = rxi_AllocService();
626 NETPRI;
627 for (i = 0; i < RX_MAX_SERVICES; i++) {
628 struct rx_service *service = rx_services[i];
630 if (service) {
631 if (port == service->servicePort) {
632 if (service->serviceId == serviceId) {
634 * The identical service has already been
635 * installed; if the caller was intending to
636 * change the security classes used by this
637 * service, he/she loses.
639 osi_Msg(("rx_NewService: tried to install service %s "
640 "with service id %d, which is already in use "
641 "for service %s\n", serviceName, serviceId,
642 service->serviceName));
643 USERPRI;
644 rxi_FreeService(tservice);
645 return service;
648 * Different service, same port: re-use the socket which is
649 * bound to the same port
651 asocket = service->socket;
652 reuse = 1;
654 } else {
655 if (asocket == OSI_NULLSOCKET) {
658 * If we don't already have a socket (from another service on
659 * same port) get a new one
661 asocket = rxi_GetUDPSocket(port, NULL);
662 if (asocket == OSI_NULLSOCKET) {
663 USERPRI;
664 rxi_FreeService(tservice);
665 return 0;
668 service = tservice;
669 service->socket = asocket;
670 service->servicePort = port;
671 service->serviceId = serviceId;
672 service->serviceName = serviceName;
673 service->nSecurityObjects = nSecurityObjects;
674 service->securityObjects = sersec;
675 service->minProcs = 0;
676 service->maxProcs = 1;
677 service->idleDeadTime = 60;
678 service->connDeadTime = rx_connDeadTime;
679 service->executeRequestProc = serviceProc;
680 rx_services[i] = service; /* not visible until now */
681 for (i = 0; i < nSecurityObjects; i++) {
682 if (securityObjects[i])
683 RXS_NewService(securityObjects[i], service, reuse);
685 USERPRI;
686 return service;
689 USERPRI;
690 rxi_FreeService(tservice);
691 osi_Msg(("rx_NewService: cannot support > %d services\n",
692 RX_MAX_SERVICES));
693 return 0;
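/*
 * Editor's sketch, not part of the original source: typical server-side
 * setup using rx_Init, rx_NewService and rx_StartServer as described in the
 * comments above.  The service name, id, request procedure and quota values
 * are placeholders only.
 */
#if 0
static int32_t
example_request_proc(struct rx_call *call)
{
    /* rx_Read the request from `call', do the work, rx_Write the reply */
    return 0;
}

static void
example_server(uint16_t port, struct rx_securityClass **secobjs, int nsec)
{
    struct rx_service *svc;

    if (rx_Init(port) != 0)
	return;
    svc = rx_NewService(0, 1, "example", secobjs, nsec, example_request_proc);
    if (svc == NULL)
	return;
    svc->minProcs = 2;			/* see QuotaOK above */
    svc->maxProcs = 8;
    rx_StartServer(1);			/* donate this thread; never returns */
}
#endif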
697 * This is the server process's request loop. This routine should
698 * either be each server proc's entry point, or it should be called by
699 * the server process (depending upon the rx implementation
700 * environment).
702 void
703 rx_ServerProc(void)
705 struct rx_call *call;
706 int32_t code;
707 struct rx_service *tservice;
709 #if defined(AFS_SGIMP_ENV)
710 SPLVAR;
711 GLOCKSTATE ms;
713 AFS_GRELEASE(&ms);
714 NETPRI;
715 #endif
717 rxi_dataQuota += rx_Window + 2; /*
718 * reserve a window of packets for
719 * hard times
721 rxi_MorePackets(rx_Window + 2); /* alloc more packets, too */
722 rxi_availProcs++; /*
723 * one more thread handling incoming
724 * calls
726 #if defined(AFS_SGIMP_ENV)
727 USERPRI;
728 #endif
729 for (;;) {
731 call = rx_GetCall();
732 #ifdef KERNEL
733 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
734 RX_MUTEX_ENTER(&afs_termStateLock);
735 afs_termState = AFSOP_STOP_AFS;
736 #ifdef RX_ENABLE_LOCKS
737 cv_signal(&afs_termStateCv);
738 #else
739 osi_rxWakeup(&afs_termState);
740 #endif
741 RX_MUTEX_EXIT(&afs_termStateLock);
742 return;
744 #endif
745 tservice = call->conn->service;
746 #if defined(AFS_SGIMP_ENV)
747 AFS_GLOCK();
748 #endif
749 if (tservice->beforeProc)
750 (*tservice->beforeProc) (call);
752 code = call->conn->service->executeRequestProc(call);
754 if (tservice->afterProc)
755 (*tservice->afterProc) (call, code);
757 rx_EndCall(call, code);
758 rxi_nCalls++;
759 #if defined(AFS_SGIMP_ENV)
760 AFS_GUNLOCK();
761 #endif
767 * Sleep until a call arrives. Returns a pointer to the call, ready
768 * for an rx_Read.
770 struct rx_call *
771 rx_GetCall(void)
773 struct rx_serverQueueEntry *sq;
774 struct rx_call *call = (struct rx_call *) 0;
775 struct rx_service *service = NULL;
777 SPLVAR;
779 GLOBAL_LOCK();
780 RX_MUTEX_ENTER(&freeSQEList_lock);
782 if ((sq = rx_FreeSQEList) != NULL) {
783 rx_FreeSQEList = *(struct rx_serverQueueEntry **) sq;
784 RX_MUTEX_EXIT(&freeSQEList_lock);
785 } else { /* otherwise allocate a new one and
786 * return that */
787 RX_MUTEX_EXIT(&freeSQEList_lock);
788 sq = (struct rx_serverQueueEntry *) rxi_Alloc(sizeof(struct rx_serverQueueEntry));
789 #ifdef RX_ENABLE_LOCKS
790 LOCK_INIT(&sq->lock, "server Queue lock");
791 #endif
793 RX_MUTEX_ENTER(&sq->lock);
794 #if defined(AFS_SGIMP_ENV)
795 ASSERT(!isafs_glock());
796 #endif
797 NETPRI;
799 if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
800 struct rx_call *tcall, *ncall;
803 * Scan for eligible incoming calls. A call is not eligible
804 * if the maximum number of calls for its service type are
805 * already executing
807 for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
808 service = tcall->conn->service;
809 if (QuotaOK(service)) {
810 call = tcall;
811 break;
815 if (call) {
816 queue_Remove(call);
817 call->flags &= (~RX_CALL_WAIT_PROC);
818 service->nRequestsRunning++;
820 * just started call in minProcs pool, need fewer to maintain
821 * guarantee
823 if (service->nRequestsRunning <= service->minProcs)
824 rxi_minDeficit--;
825 rxi_availProcs--;
826 rx_nWaiting--;
827 } else {
829 * If there are no eligible incoming calls, add this process
830 * to the idle server queue, to wait for one
832 sq->newcall = 0;
833 queue_Append(&rx_idleServerQueue, sq);
834 rx_waitForPacket = sq;
835 do {
836 #ifdef RX_ENABLE_LOCKS
837 cv_wait(&sq->cv, &sq->lock);
838 #else
839 osi_rxSleep(sq);
840 #endif
841 #ifdef KERNEL
842 if (afs_termState == AFSOP_STOP_RXCALLBACK) {
843 GLOBAL_UNLOCK();
844 USERPRI;
845 return (struct rx_call *) 0;
847 #endif
848 } while (!(call = sq->newcall));
850 RX_MUTEX_EXIT(&sq->lock);
852 RX_MUTEX_ENTER(&freeSQEList_lock);
853 *(struct rx_serverQueueEntry **) sq = rx_FreeSQEList;
854 rx_FreeSQEList = sq;
855 RX_MUTEX_EXIT(&freeSQEList_lock);
857 call->state = RX_STATE_ACTIVE;
858 call->mode = RX_MODE_RECEIVING;
859 if (queue_IsEmpty(&call->rq)) {
860 /* we can't schedule a call if there's no data!!! */
861 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY);
863 rxi_calltrace(RX_CALL_START, call);
864 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
865 call->conn->service->servicePort,
866 call->conn->service->serviceId, call));
868 GLOBAL_UNLOCK();
869 USERPRI;
871 return call;
877 * Establish a procedure to be called when a packet arrives for a
878 * call. This routine will be called at most once after each call,
879  * and will also be called if there is an error condition on the call or
880 * the call is complete. Used by multi rx to build a selection
881 * function which determines which of several calls is likely to be a
882 * good one to read from.
883 * NOTE: the way this is currently implemented it is probably only a
884 * good idea to (1) use it immediately after a newcall (clients only)
885 * and (2) only use it once. Other uses currently void your warranty
887 void
888 rx_SetArrivalProc(struct rx_call * call, void (*proc) (),
889 void *handle, void *arg)
891 call->arrivalProc = proc;
892 call->arrivalProcHandle = handle;
893 call->arrivalProcArg = arg;
897 * Call is finished (possibly prematurely). Return rc to the peer, if
898 * appropriate, and return the final error code from the conversation
899 * to the caller
901 int32_t
902 rx_EndCall(struct rx_call * call, int32_t rc)
904 struct rx_connection *conn = call->conn;
905 struct rx_service *service;
906 int32_t error;
908 SPLVAR;
909 #if defined(AFS_SGIMP_ENV)
910 GLOCKSTATE ms;
912 #endif
914 dpf(("rx_EndCall(call %x)\n", call));
916 #if defined(AFS_SGIMP_ENV)
917 AFS_GRELEASE(&ms);
918 #endif
919 NETPRI;
920 GLOBAL_LOCK();
921 RX_MUTEX_ENTER(&call->lock);
923 call->arrivalProc = (void (*) ()) 0;
924 if (rc && call->error == 0) {
925 rxi_CallError(call, rc);
927 * Send an abort message to the peer if this error code has
928 * only just been set. If it was set previously, assume the
929 * peer has already been sent the error code or will request it
931 rxi_SendCallAbort(call, (struct rx_packet *) 0);
933 if (conn->type == RX_SERVER_CONNECTION) {
934 /* Make sure reply or at least dummy reply is sent */
935 if (call->mode == RX_MODE_RECEIVING) {
936 RX_MUTEX_EXIT(&call->lock);
937 GLOBAL_UNLOCK();
939 rx_Write(call, 0, 0);
941 GLOBAL_LOCK();
942 RX_MUTEX_ENTER(&call->lock);
944 if (call->mode == RX_MODE_SENDING) {
945 rx_FlushWrite(call);
947 service = conn->service;
948 service->nRequestsRunning--;
949 if (service->nRequestsRunning < service->minProcs)
950 rxi_minDeficit++;
951 rxi_availProcs++;
952 rxi_calltrace(RX_CALL_END, call);
953 } else { /* Client connection */
954 char dummy;
957 * Make sure server receives input packets, in the case where
958 * no reply arguments are expected
960 if (call->mode == RX_MODE_SENDING) {
961 RX_MUTEX_EXIT(&call->lock);
962 GLOBAL_UNLOCK();
964 (void) rx_Read(call, &dummy, 1);
966 GLOBAL_LOCK();
967 RX_MUTEX_ENTER(&call->lock);
969 if (conn->flags & RX_CONN_MAKECALL_WAITING) {
970 conn->flags &= (~RX_CONN_MAKECALL_WAITING);
971 #ifdef RX_ENABLE_LOCKS
972 cv_signal(&conn->cv);
973 #else
974 osi_rxWakeup(conn);
975 #endif
978 call->state = RX_STATE_DALLY;
979 error = call->error;
982      * currentPacket, nLeft, and nFree must be zeroed here, because
983 * ResetCall cannot: ResetCall may be called at splnet(), in the
984 * kernel version, and may interrupt the macros rx_Read or
985 * rx_Write, which run at normal priority for efficiency.
987 if (call->currentPacket) {
988 rxi_FreePacket(call->currentPacket);
989 call->currentPacket = (struct rx_packet *) 0;
990 call->nLeft = call->nFree = 0;
991 } else
992 call->nLeft = call->nFree = 0;
995 RX_MUTEX_EXIT(&call->lock);
996 GLOBAL_UNLOCK();
997 USERPRI;
998 #if defined(AFS_SGIMP_ENV)
999 AFS_GACQUIRE(&ms);
1000 #endif
1002 * Map errors to the local host's errno.h format.
1004 error = ntoh_syserr_conv(error);
1005 return error;
1009 * Call this routine when shutting down a server or client (especially
1010 * clients). This will allow Rx to gracefully garbage collect server
1011 * connections, and reduce the number of retries that a server might
1012 * make to a dead client
1014 void
1015 rx_Finalize(void)
1017 struct rx_connection **conn_ptr, **conn_end;
1019 SPLVAR;
1020 NETPRI;
1021 if (rx_connHashTable)
1022 for (conn_ptr = &rx_connHashTable[0],
1023 conn_end = &rx_connHashTable[rx_hashTableSize];
1024 conn_ptr < conn_end; conn_ptr++) {
1025 struct rx_connection *conn, *next;
1027 for (conn = *conn_ptr; conn; conn = next) {
1028 next = conn->next;
1029 if (conn->type == RX_CLIENT_CONNECTION)
1030 rx_DestroyConnection(conn);
1034 #ifdef RXDEBUG
1035 if (rx_debugFile) {
1036 fclose(rx_debugFile);
1037 rx_debugFile = NULL;
1039 #endif
1040 rxi_flushtrace();
1041 USERPRI;
1045 * if we wakeup packet waiter too often, can get in loop with two
1046 * AllocSendPackets each waking each other up (from ReclaimPacket calls)
1048 void
1049 rxi_PacketsUnWait(void)
1052 RX_MUTEX_ENTER(&rx_waitingForPackets_lock);
1053 if (!rx_waitingForPackets) {
1054 RX_MUTEX_EXIT(&rx_waitingForPackets_lock);
1055 return;
1057 if (rxi_OverQuota(RX_PACKET_CLASS_SEND)) {
1058 RX_MUTEX_EXIT(&rx_waitingForPackets_lock);
1059 return; /* still over quota */
1061 rx_waitingForPackets = 0;
1062 #ifdef RX_ENABLE_LOCKS
1063 cv_signal(&rx_waitingForPackets_cv);
1064 #else
1065 osi_rxWakeup(&rx_waitingForPackets);
1066 #endif
1067 RX_MUTEX_EXIT(&rx_waitingForPackets_lock);
1068 return;
1072 /* ------------------Internal interfaces------------------------- */
1075 * Return this process's service structure for the
1076 * specified socket and service
1078 static struct rx_service *
1079 rxi_FindService(osi_socket asocket, uint16_t serviceId)
1081 struct rx_service **sp;
1083 for (sp = &rx_services[0]; *sp; sp++) {
1084 if ((*sp)->serviceId == serviceId && (*sp)->socket == asocket)
1085 return *sp;
1087 return 0;
1091 * Allocate a call structure, for the indicated channel of the
1092 * supplied connection. The mode and state of the call must be set by
1093 * the caller.
1095 struct rx_call *
1096 rxi_NewCall(struct rx_connection * conn, int channel)
1098 struct rx_call *call;
1101 * Grab an existing call structure, or allocate a new one.
1102 * Existing call structures are assumed to have been left reset by
1103 * rxi_FreeCall
1105 RX_MUTEX_ENTER(&rx_freeCallQueue_lock);
1107 if (queue_IsNotEmpty(&rx_freeCallQueue)) {
1108 call = queue_First(&rx_freeCallQueue, rx_call);
1109 queue_Remove(call);
1110 rx_stats.nFreeCallStructs--;
1111 RX_MUTEX_EXIT(&rx_freeCallQueue_lock);
1112 RX_MUTEX_ENTER(&call->lock);
1114 #ifdef ADAPT_PERF
1115 /* Bind the call to its connection structure */
1116 call->conn = conn;
1117 #endif
1118 } else {
1119 call = (struct rx_call *) rxi_Alloc(sizeof(struct rx_call));
1121 RX_MUTEX_EXIT(&rx_freeCallQueue_lock);
1122 RX_MUTEX_INIT(&call->lock, "call lock", RX_MUTEX_DEFAULT, NULL);
1123 RX_MUTEX_INIT(&call->lockw, "call wlock", RX_MUTEX_DEFAULT, NULL);
1124 RX_MUTEX_ENTER(&call->lock);
1126 rx_stats.nCallStructs++;
1127 /* Initialize once-only items */
1128 queue_Init(&call->tq);
1129 queue_Init(&call->rq);
1130 #ifdef ADAPT_PERF
1131 /* Bind the call to its connection structure (prereq for reset) */
1132 call->conn = conn;
1133 #endif
1134 rxi_ResetCall(call);
1136 #ifndef ADAPT_PERF
1137 /* Bind the call to its connection structure (prereq for reset) */
1138 call->conn = conn;
1139 #endif
1140 call->channel = channel;
1141 call->callNumber = &conn->callNumber[channel];
1143 * Note that the next expected call number is retained (in
1144 * conn->callNumber[i]), even if we reallocate the call structure
1146 conn->call[channel] = call;
1148 * if the channel's never been used (== 0), we should start at 1, otherwise
1149 * the call number is valid from the last time this channel was used
1151 if (*call->callNumber == 0)
1152 *call->callNumber = 1;
1154 RX_MUTEX_EXIT(&call->lock);
1155 return call;
1159  * A call has been inactive long enough that we can throw away
1160 * state, including the call structure, which is placed on the call
1161 * free list.
1163 void
1164 rxi_FreeCall(struct rx_call * call)
1166 int channel = call->channel;
1167 struct rx_connection *conn = call->conn;
1170 RX_MUTEX_ENTER(&call->lock);
1172 if (call->state == RX_STATE_DALLY)
1173 (*call->callNumber)++;
1174 rxi_ResetCall(call);
1175 call->conn->call[channel] = (struct rx_call *) 0;
1177 RX_MUTEX_ENTER(&rx_freeCallQueue_lock);
1179 queue_Append(&rx_freeCallQueue, call);
1180 rx_stats.nFreeCallStructs++;
1182 RX_MUTEX_EXIT(&rx_freeCallQueue_lock);
1183 RX_MUTEX_EXIT(&call->lock);
1186 * Destroy the connection if it was previously slated for
1187 * destruction, i.e. the Rx client code previously called
1188 * rx_DestroyConnection (client connections), or
1189 * rxi_ReapConnections called the same routine (server
1190 * connections). Only do this, however, if there are no
1191 * outstanding calls
1193 if (conn->flags & RX_CONN_DESTROY_ME) {
1194 #if 0
1195 conn->refCount++;
1196 #endif
1197 rx_DestroyConnection(conn);
1202 long rxi_Alloccnt = 0, rxi_Allocsize = 0;
1204 void *
1205 rxi_Alloc(int size)
1207 void *p;
1209 rxi_Alloccnt++;
1210 rxi_Allocsize += size;
1211 p = osi_Alloc(size);
1212 if (!p)
1213 osi_Panic("rxi_Alloc error");
1214 memset(p, 0, size);
1215 return p;
1218 void
1219 rxi_Free(void *addr, int size)
1221 rxi_Alloccnt--;
1222 rxi_Allocsize -= size;
1223 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && defined(KERNEL)
1224 osi_FreeSmall(addr);
1225 #else
1226 osi_Free(addr, size);
1227 #endif
1231 * Find the peer process represented by the supplied (host,port)
1232 * combination. If there is no appropriate active peer structure, a
1233 * new one will be allocated and initialized
1235 struct rx_peer *
1236 rxi_FindPeer(uint32_t host, uint16_t port)
1238 struct rx_peer *pp;
1239 int hashIndex;
1241 hashIndex = PEER_HASH(host, port);
1242 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
1243 if ((pp->host == host) && (pp->port == port))
1244 break;
1246 if (!pp) {
1247 pp = rxi_AllocPeer(); /*
1248 * This bzero's *pp: anything not
1249 * explicitly
1251 pp->host = host; /*
1252 * set here or in InitPeerParams is
1253 * zero
1255 pp->port = port;
1256 queue_Init(&pp->congestionQueue);
1257 queue_Init(&pp->connQueue);
1258 pp->next = rx_peerHashTable[hashIndex];
1259 rx_peerHashTable[hashIndex] = pp;
1260 rxi_InitPeerParams(pp);
1261 rx_stats.nPeerStructs++;
1263 pp->refCount++;
1264 return pp;
1268 * Remove `peer' from the hash table.
1271 static void
1272 rxi_RemovePeer(struct rx_peer *peer)
1274 struct rx_peer **peer_ptr;
1276 for (peer_ptr = &rx_peerHashTable[PEER_HASH(peer->host, peer->port)];
1277 *peer_ptr; peer_ptr = &(*peer_ptr)->next) {
1278 if (*peer_ptr == peer) {
1279 *peer_ptr = peer->next;
1280 return;
1286 * Destroy the specified peer structure, removing it from the peer hash
1287 * table
1290 static void
1291 rxi_DestroyPeer(struct rx_peer * peer)
1293 rxi_RemovePeer(peer);
1294 assert(queue_IsEmpty(&peer->connQueue));
1295 rxi_FreePeer(peer);
1296 rx_stats.nPeerStructs--;
1300 * Add `peer' to the hash table.
1303 static struct rx_peer *
1304 rxi_InsertPeer(struct rx_peer *peer)
1306 struct rx_peer *pp;
1307 int hashIndex;
1309 hashIndex = PEER_HASH(peer->host, peer->port);
1310 for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
1311 if ((pp->host == peer->host) && (pp->port == peer->port))
1312 break;
1314 if (pp != NULL) {
1315 struct rx_connection *conn, *next;
1317 pp->refCount += peer->refCount;
1318 pp->nSent += peer->nSent;
1319 pp->reSends += peer->reSends;
1321 for (queue_Scan(&peer->connQueue, conn, next, rx_connection)) {
1322 conn->peer = pp;
1323 queue_Remove(conn);
1324 queue_Append(&pp->connQueue, conn);
1327 assert(queue_IsEmpty(&peer->connQueue));
1328 rxi_FreePeer(peer);
1329 rx_stats.nPeerStructs--;
1330 return pp;
1331 } else {
1332 peer->next = rx_peerHashTable[hashIndex];
1333 rx_peerHashTable[hashIndex] = peer;
1334 return peer;
1339 * Change the key of a given peer
1342 static struct rx_peer *
1343 rxi_ChangePeer(struct rx_peer *peer, uint32_t host, uint16_t port)
1345 rxi_RemovePeer(peer);
1346 peer->host = host;
1347 peer->port = port;
1348 return rxi_InsertPeer(peer);
1352 * Find the connection at (host, port) started at epoch, and with the
1353 * given connection id. Creates the server connection if necessary.
1354 * The type specifies whether a client connection or a server
1355 * connection is desired. In both cases, (host, port) specify the
1356 * peer's (host, pair) pair. Client connections are not made
1357 * automatically by this routine. The parameter socket gives the
1358 * socket descriptor on which the packet was received. This is used,
1359 * in the case of server connections, to check that *new* connections
1360 * come via a valid (port, serviceId). Finally, the securityIndex
1361 * parameter must match the existing index for the connection. If a
1362 * server connection is created, it will be created using the supplied
1363 * index, if the index is valid for this service
1365 static struct rx_connection *
1366 rxi_FindConnection(osi_socket asocket, uint32_t host,
1367 uint16_t port, uint16_t serviceId, uint32_t cid,
1368 uint32_t epoch, int type, u_int securityIndex)
1370 int hashindex;
1371 struct rx_connection *conn;
1372 struct rx_peer *pp = NULL;
1374 hashindex = CONN_HASH(host, port, cid, epoch, type);
1375 for (conn = rx_connHashTable[hashindex]; conn; conn = conn->next) {
1376 if ((conn->type == type) && ((cid & RX_CIDMASK) == conn->cid) &&
1377 (epoch == conn->epoch) &&
1378 (securityIndex == conn->securityIndex)) {
1379 pp = conn->peer;
1381 if (type == RX_CLIENT_CONNECTION || pp->host == host)
1382 break;
1385 if (conn != NULL) {
1386 if (pp->host != host || pp->port != port)
1387 conn->peer = rxi_ChangePeer (pp, host, port);
1388 } else {
1389 struct rx_service *service;
1391 if (type == RX_CLIENT_CONNECTION)
1392 return (struct rx_connection *) 0;
1393 service = rxi_FindService(asocket, serviceId);
1394 if (!service || (securityIndex >= service->nSecurityObjects)
1395 || (service->securityObjects[securityIndex] == 0))
1396 return (struct rx_connection *) 0;
1397 conn = rxi_AllocConnection(); /* This bzero's the connection */
1398 #ifdef RX_ENABLE_LOCKS
1399 LOCK_INIT(&conn->lock, "conn lock");
1400 #endif
1401 conn->next = rx_connHashTable[hashindex];
1402 rx_connHashTable[hashindex] = conn;
1403 conn->peer = rxi_FindPeer(host, port);
1404 queue_Append(&conn->peer->connQueue, conn);
1405 conn->maxPacketSize = MIN(conn->peer->packetSize, OLD_MAX_PACKET_SIZE);
1406 conn->type = RX_SERVER_CONNECTION;
1407 conn->lastSendTime = clock_Sec(); /* don't GC immediately */
1408 conn->epoch = epoch;
1409 conn->cid = cid & RX_CIDMASK;
1410 /* conn->serial = conn->lastSerial = 0; */
1411 /* conn->rock = 0; */
1412 /* conn->timeout = 0; */
1413 conn->service = service;
1414 conn->serviceId = serviceId;
1415 conn->securityIndex = securityIndex;
1416 conn->securityObject = service->securityObjects[securityIndex];
1417 rx_SetConnDeadTime(conn, service->connDeadTime);
1418 /* Notify security object of the new connection */
1419 RXS_NewConnection(conn->securityObject, conn);
1420 /* XXXX Connection timeout? */
1421 if (service->newConnProc)
1422 (*service->newConnProc) (conn);
1423 rx_stats.nServerConns++;
1425 conn->refCount++;
1426 return conn;
1430 * Force a timeout on the connection for `host', `port'.
1433 void
1434 rxi_KillConnection(uint32_t host, u_short port)
1436 struct rx_connection *conn;
1437 int hashindex, i;
1439 for (hashindex = 0; hashindex < rx_hashTableSize; hashindex++) {
1440 for (conn = rx_connHashTable[hashindex]; conn; conn = conn->next) {
1441 if (htonl(conn->peer->host) == host &&
1442 htons(conn->peer->port) == port)
1444 for (i = 0; i < RX_MAXCALLS; i++) {
1445 struct rx_call *call = conn->call[i];
1446 if (call && call->state == RX_STATE_ACTIVE)
1447 rxi_CallError(call, RX_CALL_TIMEOUT);
1455 * There are two packet tracing routines available for testing and monitoring
1456 * Rx. One is called just after every packet is received and the other is
1457  * called just before every packet is sent. Received packets have had their
1458 * headers decoded, and packets to be sent have not yet had their headers
1459 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
1460 * containing the network address. Both can be modified. The return value, if
1461 * non-zero, indicates that the packet should be dropped.
1464 int (*rx_justReceived)() = NULL;
1465 int (*rx_almostSent)() = NULL;
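/*
 * Editor's sketch, not part of the original source: a no-op receive-side
 * tracer matching the way the hook is invoked in rxi_ReceivePacket below,
 * i.e. (*rx_justReceived)(packet, &addr), where a non-zero return value
 * drops the packet.
 */
#if 0
static int
example_trace_received(struct rx_packet *pkt, struct sockaddr_in *addr)
{
    /* observe or rewrite pkt/addr here; return non-zero to drop */
    (void) pkt;
    (void) addr;
    return 0;
}
/* installed with:  rx_justReceived = example_trace_received; */
#endif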
1468 * A packet has been received off the interface. Np is the packet, socket is
1469 * the socket number it was received from (useful in determining which service
1470 * this packet corresponds to), and (host, port) reflect the host,port of the
1471 * sender. This call returns the packet to the caller if it is finished with
1472 * it, rather than de-allocating it, just as a small performance hack
1475 struct rx_packet *
1476 rxi_ReceivePacket(struct rx_packet * np, osi_socket asocket,
1477 uint32_t host, uint16_t port)
1479 struct rx_call *call;
1480 struct rx_connection *conn;
1481 int channel;
1482 unsigned long currentCallNumber;
1483 int type;
1484 int skew;
1486 #ifdef RXDEBUG
1487 char *packetType;
1489 #endif
1490 struct rx_packet *tnp;
1492 #ifdef RXDEBUG
1494 * We don't print out the packet until now because (1) the time may not be
1495 * accurate enough until now in the lwp implementation (rx_Listener only gets
1496 * the time after the packet is read) and (2) from a protocol point of view,
1497 * this is the first time the packet has been seen
1499 packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
1500 ? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
1501 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
1502 np->header.serial, packetType, host, port, np->header.serviceId,
1503 (int)np->header.epoch, np->header.cid, np->header.callNumber,
1504 np->header.seq, np->header.flags, np));
1505 #endif
1507 if (np->header.type == RX_PACKET_TYPE_VERSION)
1508 return rxi_ReceiveVersionPacket(np, asocket, host, port);
1510 if (np->header.type == RX_PACKET_TYPE_DEBUG)
1511 return rxi_ReceiveDebugPacket(np, asocket, host, port);
1513 #ifdef RXDEBUG
1516 * If an input tracer function is defined, call it with the packet and
1517 * network address. Note this function may modify its arguments.
1519 if (rx_justReceived) {
1520 struct sockaddr_in addr;
1521 int drop;
1523 addr.sin_family = AF_INET;
1524 addr.sin_port = port;
1525 addr.sin_addr.s_addr = host;
1526 drop = (*rx_justReceived) (np, &addr);
1527 /* drop packet if return value is non-zero */
1528 if (drop)
1529 return np;
1530 port = addr.sin_port; /* in case fcn changed addr */
1531 host = addr.sin_addr.s_addr;
1533 #endif
1535 /* If packet was not sent by the client, then *we* must be the client */
1536 type = ((np->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
1537 ? RX_CLIENT_CONNECTION : RX_SERVER_CONNECTION;
1540 * Find the connection (or fabricate one, if we're the server & if
1541 * necessary) associated with this packet
1543 conn = rxi_FindConnection(asocket, host, port, np->header.serviceId,
1544 np->header.cid, np->header.epoch, type,
1545 np->header.securityIndex);
1547 if (!conn) {
1549 * If no connection found or fabricated, just ignore the packet.
1550 * (An argument could be made for sending an abort packet for
1551 * the conn)
1553 return np;
1555 /* compute the max serial number seen on this connection */
1556 if (conn->maxSerial < np->header.serial)
1557 conn->maxSerial = np->header.serial;
1560 * If the connection is in an error state, send an abort packet and
1561 * ignore the incoming packet
1563 if (conn->error) {
1564 rxi_ConnectionError (conn, conn->error);
1565 /* Don't respond to an abort packet--we don't want loops! */
1566 if (np->header.type != RX_PACKET_TYPE_ABORT)
1567 np = rxi_SendConnectionAbort(conn, np);
1568 conn->refCount--;
1569 return np;
1571 /* Check for connection-only requests (i.e. not call specific). */
1572 if (np->header.callNumber == 0) {
1573 switch (np->header.type) {
1574 case RX_PACKET_TYPE_ABORT:
1575 /* What if the supplied error is zero? */
1576 rxi_ConnectionError(conn, ntohl(rx_SlowGetLong(np, 0)));
1577 conn->refCount--;
1578 return np;
1579 case RX_PACKET_TYPE_CHALLENGE:
1580 tnp = rxi_ReceiveChallengePacket(conn, np);
1581 conn->refCount--;
1582 return tnp;
1583 case RX_PACKET_TYPE_RESPONSE:
1584 tnp = rxi_ReceiveResponsePacket(conn, np);
1585 conn->refCount--;
1586 return tnp;
1587 case RX_PACKET_TYPE_PARAMS:
1588 case RX_PACKET_TYPE_PARAMS + 1:
1589 case RX_PACKET_TYPE_PARAMS + 2:
1590 /* ignore these packet types for now */
1591 conn->refCount--;
1592 return np;
1595 default:
1597 * Should not reach here, unless the peer is broken: send an
1598 * abort packet
1600 rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
1601 tnp = rxi_SendConnectionAbort(conn, np);
1602 conn->refCount--;
1603 return tnp;
1606 channel = np->header.cid & RX_CHANNELMASK;
1607 call = conn->call[channel];
1608 #ifdef RX_ENABLE_LOCKSX
1609 if (call)
1610 mutex_enter(&call->lock);
1611 #endif
1612 currentCallNumber = conn->callNumber[channel];
1614 if (type == RX_SERVER_CONNECTION) {/* We're the server */
1615 if (np->header.callNumber < currentCallNumber) {
1616 rx_stats.spuriousPacketsRead++;
1617 #ifdef RX_ENABLE_LOCKSX
1618 if (call)
1619 mutex_exit(&call->lock);
1620 #endif
1621 conn->refCount--;
1622 return np;
1624 if (!call) {
1625 call = rxi_NewCall(conn, channel);
1626 #ifdef RX_ENABLE_LOCKSX
1627 mutex_enter(&call->lock);
1628 #endif
1629 *call->callNumber = np->header.callNumber;
1630 call->state = RX_STATE_PRECALL;
1631 rxi_KeepAliveOn(call);
1632 } else if (np->header.callNumber != currentCallNumber) {
1634 * If the new call cannot be taken right now send a busy and set
1635 * the error condition in this call, so that it terminates as
1636 * quickly as possible
1638 if (call->state == RX_STATE_ACTIVE) {
1639 struct rx_packet *tp;
1641 rxi_CallError(call, RX_CALL_DEAD);
1642 tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY, (char *) 0, 0);
1643 #ifdef RX_ENABLE_LOCKSX
1644 mutex_exit(&call->lock);
1645 #endif
1646 conn->refCount--;
1647 return tp;
1650 * If the new call can be taken right now (it's not busy) then
1651 * accept it.
1653 rxi_ResetCall(call);
1654 *call->callNumber = np->header.callNumber;
1655 call->state = RX_STATE_PRECALL;
1656 rxi_KeepAliveOn(call);
1657 } else {
1658 /* Continuing call; do nothing here. */
1660 } else { /* we're the client */
1663 * Ignore anything that's not relevant to the current call. If there
1664 * isn't a current call, then no packet is relevant.
1666 if (!call || (np->header.callNumber != currentCallNumber)) {
1667 rx_stats.spuriousPacketsRead++;
1668 #ifdef RX_ENABLE_LOCKSX
1669 if (call)
1670 mutex_exit(&call->lock);
1671 #endif
1672 conn->refCount--;
1673 return np;
1676 * If the service security object index stamped in the packet does not
1677 * match the connection's security index, ignore the packet
1679 if (np->header.securityIndex != conn->securityIndex) {
1680 conn->refCount--;
1681 #ifdef RX_ENABLE_LOCKSX
1682 mutex_exit(&call->lock);
1683 #endif
1684 return np;
1687 * If we're receiving the response, then all transmit packets are
1688 * implicitly acknowledged. Get rid of them.
1690 if (np->header.type == RX_PACKET_TYPE_DATA) {
1691 #ifdef AFS_SUN5_ENV
1693 * XXX Hack. Because we can't release the global rx lock when
1694 * sending packets (osi_NetSend) we drop all acks while we're
1695 * traversing the tq in rxi_Start sending packets out because
1696          * packets may move to the freePacketQueue as a result of being
1697 * here! So we drop these packets until we're safely out of the
1698 * traversing. Really ugly!
1700 if (call->flags & RX_CALL_TQ_BUSY) {
1701 rx_tq_dropped++;
1702 return np; /* xmitting; drop packet */
1704 #endif
1705 rxi_ClearTransmitQueue(call);
1706 } else {
1707 if (np->header.type == RX_PACKET_TYPE_ACK) {
1709 * now check to see if this is an ack packet acknowledging
1710 * that the server actually *lost* some hard-acked data. If
1711 * this happens we ignore this packet, as it may indicate that
1712 * the server restarted in the middle of a call. It is also
1713 * possible that this is an old ack packet. We don't abort
1714 * the connection in this case, because this *might* just be
1715 * an old ack packet. The right way to detect a server restart
1716 * in the midst of a call is to notice that the server epoch
1717 * changed, btw.
1720 * LWSXXX I'm not sure this is exactly right, since tfirst
1721 * LWSXXX **IS** unacknowledged. I think that this is
1722 * LWSXXX off-by-one, but I don't dare change it just yet,
1723 * LWSXXX since it will interact badly with the
1724 * LWSXXX server-restart detection code in receiveackpacket.
1726 if (ntohl(rx_SlowGetLong(np, FIRSTACKOFFSET)) < call->tfirst) {
1727 rx_stats.spuriousPacketsRead++;
1728 #ifdef RX_ENABLE_LOCKSX
1729 mutex_exit(&call->lock);
1730 #endif
1731 conn->refCount--;
1732 return np;
1735 } /* else not a data packet */
1738 /* Set remote user defined status from packet */
1739 call->remoteStatus = np->header.userStatus;
1742 * Note the gap between the expected next packet and the actual packet
1743 * that arrived, when the new packet has a smaller serial number than
1744 * expected. Rioses frequently reorder packets all by themselves, so
1745 * this will be quite important with very large window sizes. Skew is
1746 * checked against 0 here to avoid any dependence on the type of
1747 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is
1748 * always true! The inPacketSkew should be a smoothed running value, not
1749 * just a maximum. MTUXXX see CalculateRoundTripTime for an example of
1750 * how to keep smoothed values. I think using a beta of 1/8 is probably
1751 * appropriate. lws 93.04.21
1753 skew = conn->lastSerial - np->header.serial;
1754 conn->lastSerial = np->header.serial;
1755 if (skew > 0) {
1756 struct rx_peer *peer;
1758 peer = conn->peer;
1759 if (skew > peer->inPacketSkew) {
1760 dpf(("*** In skew changed from %d to %d\n",
1761 peer->inPacketSkew, skew));
1762 peer->inPacketSkew = skew;
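	    /*
	     * Editor's sketch, not part of the original code: the comment
	     * above suggests keeping inPacketSkew as a smoothed running
	     * value with a gain (beta) of 1/8 instead of a maximum; that
	     * update would look roughly like this.
	     */
#if 0
	    peer->inPacketSkew += (skew - peer->inPacketSkew) / 8;
#endif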
1765 /* Now do packet type-specific processing */
1766 switch (np->header.type) {
1767 case RX_PACKET_TYPE_DATA:
1768 np = rxi_ReceiveDataPacket(call, np);
1769 break;
1770 case RX_PACKET_TYPE_ACK:
1772 * Respond immediately to ack packets requesting acknowledgement
1773 * (ping packets)
1775 if (np->header.flags & RX_REQUEST_ACK) {
1776 if (call->error)
1777 (void) rxi_SendCallAbort(call, 0);
1778 else
1779 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING_RESPONSE);
1781 np = rxi_ReceiveAckPacket(call, np);
1782 break;
1783 case RX_PACKET_TYPE_ABORT:
1785 * An abort packet: reset the connection, passing the error up to
1786 * the user
1788 /* XXX What if error is zero? and length of packet is 0 */
1789 rxi_CallError(call, ntohl(*(uint32_t *) rx_DataOf(np)));
1790 break;
1791 case RX_PACKET_TYPE_BUSY:
1792 /* XXXX */
1793 break;
1794 case RX_PACKET_TYPE_ACKALL:
1796 * All packets acknowledged, so we can drop all packets previously
1797 * readied for sending
1799 #ifdef AFS_SUN5_ENV
1801          * XXX Hack. Because we can't release the global rx lock
1802 * when sending packets (osi_NetSend) we drop all ack pkts while
1803 * we're traversing the tq in rxi_Start sending packets out
1804          * because packets may move to the freePacketQueue as a result of
1805 * being here! So we drop these packets until we're
1806 * safely out of the traversing. Really ugly!
1808 if (call->flags & RX_CALL_TQ_BUSY) {
1809 rx_tq_dropped++;
1810 return np; /* xmitting; drop packet */
1812 #endif
1813 rxi_ClearTransmitQueue(call);
1814 break;
1815 default:
1817 * Should not reach here, unless the peer is broken: send an abort
1818 * packet
1820 rxi_CallError(call, RX_PROTOCOL_ERROR);
1821 np = rxi_SendCallAbort(call, np);
1822 break;
1825 * Note when this last legitimate packet was received, for keep-alive
1826 * processing. Note, we delay getting the time until now in the hope that
1827 * the packet will be delivered to the user before any get time is required
1828 * (if not, then the time won't actually be re-evaluated here).
1830 call->lastReceiveTime = clock_Sec();
1831 #ifdef RX_ENABLE_LOCKSX
1832 mutex_exit(&call->lock);
1833 #endif
1834 conn->refCount--;
1835 return np;
1839 * return true if this is an "interesting" connection from the point of view
1840 * of someone trying to debug the system
1843 rxi_IsConnInteresting(struct rx_connection * aconn)
1845 int i;
1846 struct rx_call *tcall;
1848 if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
1849 return 1;
1850 for (i = 0; i < RX_MAXCALLS; i++) {
1851 tcall = aconn->call[i];
1852 if (tcall) {
1853 if ((tcall->state == RX_STATE_PRECALL) ||
1854 (tcall->state == RX_STATE_ACTIVE))
1855 return 1;
1856 if ((tcall->mode == RX_MODE_SENDING) ||
1857 (tcall->mode == RX_MODE_RECEIVING))
1858 return 1;
1861 return 0;
1865 * if this is one of the last few packets AND it wouldn't be used by the
1866 * receiving call to immediately satisfy a read request, then drop it on
1867 * the floor, since accepting it might prevent a lock-holding thread from
1868 * making progress in its reading
1871 static int
1872 TooLow(struct rx_packet * ap, struct rx_call * acall)
1874 if ((rx_nFreePackets < rxi_dataQuota + 2) &&
1875 !((ap->header.seq == acall->rnext) &&
1876 (acall->flags & RX_CALL_READER_WAIT))) {
1877 return 1;
1878 } else
1879 return 0;
1882 /* try to attach call, if authentication is complete */
1883 static void
1884 TryAttach(struct rx_call * acall)
1886 struct rx_connection *conn;
1888 conn = acall->conn;
1889 if ((conn->type == RX_SERVER_CONNECTION) &&
1890 (acall->state == RX_STATE_PRECALL)) {
1891 /* Don't attach until we have any req'd. authentication. */
1892 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
1893 rxi_AttachServerProc(acall);
1895 * Note: this does not necessarily succeed; there
1896              * may not be any proc available
1898 } else {
1899 rxi_ChallengeOn(acall->conn);
1905 * A data packet has been received off the interface. This packet is
1906 * appropriate to the call (the call is in the right state, etc.). This
1907 * routine can return a packet to the caller, for re-use
1909 struct rx_packet *
1910 rxi_ReceiveDataPacket(struct rx_call * call,
1911 struct rx_packet * np)
1913 u_long seq, serial, flags;
1914 int ack_done;
1916 ack_done = 0;
1918 seq = np->header.seq;
1919 serial = np->header.serial;
1920 flags = np->header.flags;
1922 rx_stats.dataPacketsRead++;
1924 /* If the call is in an error state, send an abort message */
1925 /* XXXX this will send too many aborts for multi-packet calls */
1926 if (call->error)
1927 return rxi_SendCallAbort(call, np);
1929 if (np->header.spare != 0)
1930 call->conn->flags |= RX_CONN_USING_PACKET_CKSUM;
1933 * If there are no packet buffers, drop this new packet, unless we can find
1934 * packet buffers from inactive calls
1936 if (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE) || TooLow(np, call)) {
1937 rx_stats.noPacketBuffersOnRead++;
1938 call->rprev = seq;
1939 TryAttach(call);
1940 rxi_calltrace(RX_TRACE_DROP, call);
1941 return np;
1943 /* The usual case is that this is the expected next packet */
1944 if (seq == call->rnext) {
1946 /* Check to make sure it is not a duplicate of one already queued */
1947 if (queue_IsNotEmpty(&call->rq)
1948 && queue_First(&call->rq, rx_packet)->header.seq == seq) {
1949 rx_stats.dupPacketsRead++;
1950 np = rxi_SendAck(call, np, seq, serial, flags, RX_ACK_DUPLICATE);
1951 call->rprev = seq;
1952 return np;
1955 * It's the next packet. Stick it on the receive queue for this call
1957 queue_Prepend(&call->rq, np);
1958 #ifdef SOFT_ACK
1959 call->nSoftAcks++;
1960 #endif /* SOFT_ACK */
1962 #ifndef ADAPT_PERF
1963 np = 0; /* We can't use this any more */
1966 * Provide asynchronous notification for those who want it
1967 * (e.g. multi rx)
1969 if (call->arrivalProc) {
1970 (*call->arrivalProc) (call, call->arrivalProcHandle,
1971 call->arrivalProcArg);
1972 call->arrivalProc = (void (*) ()) 0;
1974 /* Wakeup the reader, if any */
1975 if (call->flags & RX_CALL_READER_WAIT) {
1976 call->flags &= ~RX_CALL_READER_WAIT;
1977 RX_MUTEX_ENTER(&call->lockq);
1979 #ifdef RX_ENABLE_LOCKS
1980 cv_broadcast(&call->cv_rq);
1981 #else
1982 osi_rxWakeup(&call->rq);
1983 #endif
1984 RX_MUTEX_EXIT(&call->lockq);
1986 #endif
1989 * ACK packet right away, in order to keep the window going, and to
1990 * reduce variability in round-trip-time estimates.
1993 if (flags & RX_REQUEST_ACK) {
1996 * Acknowledge, if requested. Also take this opportunity to
1997 * revise MTU estimate.
1999 #ifdef MISCMTU
2000 /* Copy a lower estimate of the MTU from the other end. (cfe) */
2002 * We can figure out what the other end is using by checking out
2003 * the size of its packets, and then adding back the header size.
2004 * We don't count the last packet, since it might be partly empty.
2005 * We shouldn't do this check on every packet, it's overkill.
2006 * Perhaps it would be better done in
2007 * ComputeRate if I decide it's ever worth doing. (lws)
2009 if (!(flags & RX_LAST_PACKET) && call->conn && call->conn->peer) {
2010 u_long length;
2011 struct rx_peer *peer = call->conn->peer;
2013 length = np->length + RX_HEADER_SIZE;
2014 if (length < peer->packetSize) {
2015 dpf(("CONG peer %lx/%u: packetsize %lu=>%lu (rtt %u)",
2016 ntohl(peer->host), ntohs(peer->port), peer->packetSize,
2017 length, peer->srtt));
2018 peer->packetSize = length;
2021 #endif /* MISCMTU */
2022 } else if (flags & RX_LAST_PACKET) {
2023 struct clock when;
2025 /* Or schedule an acknowledgement later on. */
2026 rxevent_Cancel(call->delayedAckEvent);
2027 clock_GetTime(&when);
2028 clock_Add(&when, &rx_lastAckDelay);
2030 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2031 call, NULL);
2033 ack_done = 1;
2035 #ifdef ADAPT_PERF
2037 * Provide asynchronous notification for those who want it
2038 * (e.g. multi rx)
2040 if (call->arrivalProc) {
2041 (*call->arrivalProc) (call, call->arrivalProcHandle,
2042 call->arrivalProcArg);
2043 call->arrivalProc = (void (*) ()) 0;
2045 /* Wakeup the reader, if any */
2046 RX_MUTEX_ENTER(&call->lockq);
2047 if (call->flags & RX_CALL_READER_WAIT) {
2048 call->flags &= ~RX_CALL_READER_WAIT;
2049 #ifdef RX_ENABLE_LOCKS
2050 /* cv_signal(&call->cv_rq);*/
2051 cv_broadcast(&call->cv_rq);
2052 #else
2053 osi_rxWakeup(&call->rq);
2054 #endif
2056 RX_MUTEX_EXIT(&call->lockq);
2058 np = 0; /* We can't use this any more */
2059 #endif /* ADAPT_PERF */
2061 /* Update last packet received */
2062 call->rprev = seq;
2065 * If there is no server process serving this call, grab one, if
2066 * available
2068 TryAttach(call);
2070 /* This is not the expected next packet */
2071 else {
2073 * Determine whether this is a new or old packet, and if it's
2074 * a new one, whether it fits into the current receive window.
2075 * It's also useful to know if the packet arrived out of
2076 * sequence, so that we can force an acknowledgement in that
2077 * case. We have a slightly complex definition of
2078 * out-of-sequence: the previous packet number received off
2079 * the wire is remembered. If the new arrival's sequence
2080 * number is less than previous, then previous is reset (to
2081 * 0). MTUXXX This should change slightly if skew is taken into
2082 * consideration. lws 93.04.20
2083 * The new packet is then declared out-of-sequence if
2084 * there are any packets missing between the "previous" packet
2085 * and the one which just arrived (because the missing packets
2086 * should have been filled in between the previous packet and
2087 * the new arrival). This works regardless of whether the
2088 * peer's retransmission algorithm has been invoked, or not
2089 * (i.e. whether this is the first or subsequent pass over the
2090 * sequence of packets). All this assumes that "most" of the
2091 * time, packets are delivered in the same *order* as they are
2092 * transmitted, with, possibly, some packets lost due to
2093 * transmission errors along the way.
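 * For example, with rprev == 5, an arriving packet 8 counts as in
 * sequence if 6 and 7 are already sitting on the receive queue, but
 * out of sequence if either of them is still missing.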
2096 u_long prev; /* "Previous packet" sequence number */
2097 struct rx_packet *tp; /* Temporary packet pointer */
2098 struct rx_packet *nxp;/*
2099 * Next packet pointer, for queue_Scan
2101 int nTwixt; /*
2102 * Number of packets between previous
2103 * and new one
2107 * If the new packet's sequence number has been sent to the
2108 * application already, then this is a duplicate
2110 if (seq < call->rnext) {
2111 rx_stats.dupPacketsRead++;
2112 np = rxi_SendAck(call, np, seq, serial, flags, RX_ACK_DUPLICATE);
2113 call->rprev = seq;
2114 return np;
2117 * If the sequence number is greater than what can be
2118	 * accommodated by the current window, then send a negative
2119 * acknowledge and drop the packet
2121 if ((call->rnext + call->rwind) <= seq) {
2122 np = rxi_SendAck(call, np, seq, serial, flags,
2123 RX_ACK_EXCEEDS_WINDOW);
2124 call->rprev = seq;
2125 return np;
2127 /* Look for the packet in the queue of old received packets */
2128 prev = call->rprev;
2129 if (prev > seq)
2130 prev = 0;
2131 for (nTwixt = 0, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
2132 /* Check for duplicate packet */
2133 if (seq == tp->header.seq) {
2134 rx_stats.dupPacketsRead++;
2135 np = rxi_SendAck(call, np, seq, serial, flags,
2136 RX_ACK_DUPLICATE);
2137 call->rprev = seq;
2138 return np;
2142 * Count the number of packets received 'twixt the previous
2143 * packet and the new packet
2145 if (tp->header.seq > prev && tp->header.seq < seq)
2146 nTwixt++;
2149 * If we find a higher sequence packet, break out and insert the
2150 * new packet here.
2152 if (seq < tp->header.seq)
2153 break;
2157	 * It's within the window: add it to the receive queue.
2158 * tp is left by the previous loop either pointing at the
2159 * packet before which to insert the new packet, or at the
2160 * queue head if the queue is empty or the packet should be
2161 * appended.
2163 queue_InsertBefore(tp, np);
2164 #ifdef SOFT_ACK
2165 call->nSoftAcks++;
2166 #endif /* SOFT_ACK */
2168 call->rprev = seq;
2169 np = 0;
2173 * Acknowledge the packet if requested by peer, or we are doing
2174 * softack.
2176	 * Add a timed ack to make sure we send out an ack before the client
2177	 * has to ask for one by sending a REQUEST-ACK packet.
2179 if (ack_done) {
2180 /* ack is already taken care of */
2181 } else if (flags & RX_REQUEST_ACK) {
2182 rxi_SendAck(call, 0, seq, serial, flags, RX_ACK_REQUESTED);
2183 call->rprev = seq;
2184 #ifdef SOFT_ACK
2185 } else if (call->nSoftAcks > rxi_SoftAckRate) {
2186 rxevent_Cancel(call->delayedAckEvent);
2187 rxi_SendAck(call, 0, seq, serial, flags, RX_ACK_IDLE);
2188 } else if (call->nSoftAcks) {
2189 struct clock when;
2191 rxevent_Cancel(call->delayedAckEvent);
2192 clock_GetTime(&when);
2193 clock_Add(&when, &rx_softAckDelay);
2194 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
2195 call, NULL);
2196 #endif /* SOFT_ACK */
2199 return np;
2202 #ifdef ADAPT_WINDOW
2203 static void rxi_ComputeRate();
2205 #endif
2207 /* Timeout is set to RTT + 4*MDEV. */
2208 static
2209 void
2210 update_timeout(struct rx_peer *peer)
2212 u_long rtt_timeout;
2213 rtt_timeout = peer->srtt + 4*peer->mdev;
2215 * Add 100ms to hide the effects of unpredictable
2216 * scheduling. 100ms is *very* conservative and should probably be
2217 * much smaller. We don't want to generate any redundant
2218	 * retransmits, so for now let's use 100ms.
2220 rtt_timeout += 100*1000;
2221 if (rtt_timeout < 1000) /* 1000 = 1ms */
2222 rtt_timeout = 1000; /* Minimum timeout */
2223 peer->timeout.usec = rtt_timeout % 1000000;
2224	    peer->timeout.sec = rtt_timeout / 1000000;
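/*
 * Worked example: with srtt = 20 ms (20000 usec) and mdev = 5 ms,
 * rtt_timeout = 20000 + 4*5000 + 100000 = 140000 usec, so the
 * retransmission timeout becomes 0.140000 seconds.
 */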
2227 /* On a dubious timeout double MDEV but within reason.
2228 * Timeout is limited by 5*RTT.
2230 static
2231 void
2232 dubious_timeout(struct rx_peer *peer)
2234 if (peer->mdev >= peer->srtt)
2235 return;
2237 peer->mdev *= 2;
2238 if (peer->mdev > peer->srtt)
2239 peer->mdev = peer->srtt;
2240 update_timeout(peer);
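/*
 * Continuing the example above: a dubious timeout with srtt = 20 ms and
 * mdev = 5 ms doubles mdev to 10 ms, and update_timeout() then stretches
 * the retransmission timeout from 140 ms to 160 ms.
 */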
2243 /* The real smarts of the whole thing. Right now somewhat short-changed. */
2244 struct rx_packet *
2245 rxi_ReceiveAckPacket(struct rx_call * call, struct rx_packet * np)
2247 struct rx_ackPacket *ap;
2248 int nAcks;
2249 struct rx_packet *tp;
2250 struct rx_packet *nxp; /*
2251 * Next packet pointer for queue_Scan
2253 struct rx_connection *conn = call->conn;
2254 struct rx_peer *peer = conn->peer;
2255 u_long first;
2256 u_long serial;
2258 /* because there are CM's that are bogus, sending weird values for this. */
2259 u_long skew = 0;
2260 int needRxStart = 0;
2261 int nbytes;
2263 rx_stats.ackPacketsRead++;
2264 ap = (struct rx_ackPacket *) rx_DataOf(np);
2265 nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *) ap);
2266 if (nbytes < 0)
2267 return np; /* truncated ack packet */
2269 nAcks = MIN(nbytes, ap->nAcks); /* depends on ack packet struct */
2270 first = ntohl(ap->firstPacket);
2271 serial = ntohl(ap->serial);
2272 #ifdef notdef
2273 skew = ntohs(ap->maxSkew);
2274 #endif
2277 #ifdef RXDEBUG
2278 if (Log) {
2279 fprintf(Log,
2280 "RACK: reason %x previous %lu seq %lu serial %lu skew %lu "
2281 "first %lu", ap->reason,
2282 (unsigned long)ntohl(ap->previousPacket),
2283 (unsigned long)np->header.seq,
2284 (unsigned long)serial,
2285 (unsigned long)skew,
2286 (unsigned long)ntohl(ap->firstPacket));
2287 if (nAcks) {
2288 int offset;
2290 for (offset = 0; offset < nAcks; offset++)
2291 putc(ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*', Log);
2293 putc('\n', Log);
2295 #endif
2297 #if 0 /* need congestion avoidance stuff first */
2298 if (np->header.flags & RX_SLOW_START_OK)
2299 call->flags |= RX_CALL_SLOW_START_OK;
2300 #endif
2304 * if a server connection has been re-created, it doesn't remember what
2305 * serial # it was up to. An ack will tell us, since the serial field
2306 * contains the largest serial received by the other side
2308 if ((conn->type == RX_SERVER_CONNECTION) && (conn->serial < serial)) {
2309 conn->serial = serial + 1;
2313 * Update the outgoing packet skew value to the latest value of the
2314 * peer's incoming packet skew value. The ack packet, of course, could
2315 * arrive out of order, but that won't affect things much
2317 peer->outPacketSkew = skew;
2319 #ifdef AFS_SUN5_ENV
2321 * XXX Hack. Because we can't release the global rx lock when sending
2322 * packets (osi_NetSend) we drop all acks while we're traversing the tq in
2323 * rxi_Start sending packets out because packets
2324	 * may move to the freePacketQueue as a result of being here! So we drop
2325	 * these packets until we're safely out of the traversal. Really ugly!
2327 if (call->flags & RX_CALL_TQ_BUSY) {
2328 rx_tq_dropped++;
2329 return np; /* xmitting; drop packet */
2331 #endif
2333 * Check for packets that no longer need to be transmitted, and
2334 * discard them. This only applies to packets positively
2335 * acknowledged as having been sent to the peer's upper level.
2336 * All other packets must be retained. So only packets with
2337 * sequence numbers < ap->firstPacket are candidates.
2339 while (queue_IsNotEmpty(&call->tq)) {
2340 tp = queue_First(&call->tq, rx_packet);
2341 if (tp->header.seq >= first)
2342 break;
2343 call->tfirst = tp->header.seq + 1;
2344 if (tp->header.serial == serial) {
2345 if (ap->reason != RX_ACK_DELAY) {
2346 #ifdef ADAPT_PERF
2347 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
2348 #else
2349 rxi_ComputeRoundTripTime(tp, 0, 0);
2350 #endif
2352 #ifdef ADAPT_WINDOW
2353 rxi_ComputeRate(peer, call, tp, np, ap->reason);
2354 #endif
2356 #ifdef ADAPT_PERF
2357 else if ((tp->firstSerial == serial)) {
2358 if (ap->reason != RX_ACK_DELAY)
2359 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
2360 #ifdef ADAPT_WINDOW
2361 rxi_ComputeRate(peer, call, tp, np, ap->reason);
2362 #endif
2364 #endif /* ADAPT_PERF */
2365 queue_Remove(tp);
2366 rxi_FreePacket(tp); /*
2367 * rxi_FreePacket mustn't wake up anyone,
2368 * preemptively.
2372 #ifdef ADAPT_WINDOW
2373 /* Give rate detector a chance to respond to ping requests */
2374 if (ap->reason == RX_ACK_PING_RESPONSE) {
2375 rxi_ComputeRate(peer, call, 0, np, ap->reason);
2377 #endif
2378 /* "Slow start" every call. */
2379 if (call->twind < rx_Window) call->twind += 1;
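/*
 * Each ack processed here can therefore open the transmit window by at
 * most one packet, from its initial value up to the rx_Window ceiling.
 */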
2382 * N.B. we don't turn off any timers here. They'll go away by themselves,
2383 * anyway
2387 * Now go through explicit acks/nacks and record the results in
2388 * the waiting packets. These are packets that can't be released
2389 * yet, even with a positive acknowledge. This positive
2390 * acknowledge only means the packet has been received by the
2391 * peer, not that it will be retained long enough to be sent to
2392 * the peer's upper level. In addition, reset the transmit timers
2393 * of any missing packets (those packets that must be missing
2394 * because this packet was out of sequence)
2397 for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
2400 * Update round trip time if the ack was stimulated on receipt of
2401 * this packet
2403 if (tp->header.serial == serial) {
2404 if (ap->reason != RX_ACK_DELAY) {
2405 #ifdef ADAPT_PERF
2406 rxi_ComputeRoundTripTime(tp, &tp->timeSent, peer);
2407 #else
2408 rxi_ComputeRoundTripTime(tp, 0, 0);
2409 #endif
2411 #ifdef ADAPT_WINDOW
2412 rxi_ComputeRate(peer, call, tp, np, ap->reason);
2413 #endif
2415 #ifdef ADAPT_PERF
2416 else if ((tp->firstSerial == serial)) {
2417 if (ap->reason != RX_ACK_DELAY)
2418 rxi_ComputeRoundTripTime(tp, &tp->firstSent, peer);
2419 #ifdef ADAPT_WINDOW
2420 rxi_ComputeRate(peer, call, tp, np, ap->reason);
2421 #endif
2423 #endif /* ADAPT_PERF */
2426 * Set the acknowledge flag per packet based on the
2427 * information in the ack packet. It's possible for an
2428 * acknowledged packet to be downgraded
2430 if (tp->header.seq < first + nAcks) {
2431 /* Explicit ack information: set it in the packet appropriately */
2432 tp->acked = (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK);
2433 } else {
2435 * No ack information: the packet may have been
2436 * acknowledged previously, but that is now rescinded (the
2437 * peer may have reduced the window size)
2439 tp->acked = 0;
2443 #ifdef ADAPT_PERF
2445 * If packet isn't yet acked, and it has been transmitted at least
2446 * once, reset retransmit time using latest timeout
2447 * ie, this should readjust the retransmit timer for all outstanding
2448 * packets... So we don't just retransmit when we should know better
2451 if (!tp->acked && tp->header.serial) {
2452 tp->retryTime = tp->timeSent;
2453 clock_Add(&tp->retryTime, &peer->timeout);
2454 /* shift by eight because one quarter-sec ~ 256 milliseconds */
2455 clock_Addmsec(&(tp->retryTime), ((unsigned long) tp->backoff) << 8);
2457 #endif /* ADAPT_PERF */
2460 * If the packet isn't yet acked, and its serial number
2461 * indicates that it was transmitted before the packet which
2462 * prompted the acknowledge (that packet's serial number is
2463 * supplied in the ack packet), then schedule the packet to be
2464 * transmitted *soon*. This is done by resetting the
2465 * retransmit time in the packet to the current time.
2466 * Actually this is slightly more intelligent: to guard
2467 * against packets that have been transmitted out-of-order by
2468 * the network (this even happens on the local token ring with
2469 * our IBM RT's!), the degree of out-of-orderness (skew) of
2470 * the packet is compared against the maximum skew for this
2471 * peer. If it is less, we don't retransmit yet. Note that
2472 * we don't do this for packets with zero serial numbers: they
2473 * never have been transmitted.
2477 * I don't know if we should add in the new retransmit backoff time
2478 * here or not. I think that we should also consider reducing
2479 * the "congestion window" size as an alternative. LWSXXX
2482 if (!tp->acked && tp->header.serial
2483 && ((tp->header.serial + skew) <= serial)) {
2484 rx_stats.dataPacketsPushed++;
2485 clock_GetTime(&tp->retryTime);
2486 needRxStart = 1;
2488 dpf(("Pushed packet seq %d serial %d, new time %d.%d\n",
2489 tp->header.seq, tp->header.serial, tp->retryTime.sec,
2490 tp->retryTime.usec / 1000));
2494 if (ap->reason == RX_ACK_DUPLICATE) {
2496 * Other end receives duplicates because either:
2497	 * A. acks were lost
2498 * B. receiver gets scheduled in an unpredictable way
2499 * C. we have a busted timer
2501 * To fix B & C wait for new acks to update srtt and mdev. In
2502 * the meantime, increase mdev to increase the retransmission
2503 * timeout.
2505 dubious_timeout(peer);
2509 * If the window has been extended by this acknowledge packet,
2510 * then wakeup a sender waiting in alloc for window space, or try
2511 * sending packets now, if he's been sitting on packets due to
2512 * lack of window space
2514 if (call->tnext < (call->tfirst + call->twind)) {
2515 #ifdef RX_ENABLE_LOCKS
2516 RX_MUTEX_ENTER(&call->lockw);
2517 cv_signal(&call->cv_twind);
2518 RX_MUTEX_EXIT(&call->lockw);
2519 #else
2520 if (call->flags & RX_CALL_WAIT_WINDOW_ALLOC) {
2521 call->flags &= ~RX_CALL_WAIT_WINDOW_ALLOC;
2522 osi_rxWakeup(&call->twind);
2524 #endif
2525 if (call->flags & RX_CALL_WAIT_WINDOW_SEND) {
2526 call->flags &= ~RX_CALL_WAIT_WINDOW_SEND;
2527 needRxStart = 1;
2531 * if the ack packet has a receivelen field hanging off it,
2532 * update our state
2534 if (np->length >= rx_AckDataSize(ap->nAcks) + 4) {
2535 unsigned long maxPacketSize;
2537 rx_packetread(np, rx_AckDataSize(ap->nAcks), 4, &maxPacketSize);
2538 maxPacketSize = (unsigned long) ntohl(maxPacketSize);
2539 dpf(("maxPacketSize=%ul\n", maxPacketSize));
2542 * sanity check - peer might have restarted with different params.
2543 * If peer says "send less", dammit, send less... Peer should never
2544 * be unable to accept packets of the size that prior AFS versions
2545 * would send without asking.
2547 if (OLD_MAX_PACKET_SIZE <= maxPacketSize)
2548 conn->maxPacketSize = MIN(maxPacketSize, conn->peer->packetSize);
2550 /* if (needRxStart) rxi_Start(0, call); */
2551 rxi_Start(0, call); /* Force rxi_Restart for now: skew
2552 * problems!!! */
2553 return np;
2556 /* Post a new challenge-event, this is to resend lost packets. */
2557 static void
2558 rxi_resend_ChallengeEvent(struct rx_connection *conn)
2560 struct clock when;
2562 if (conn->challengeEvent)
2563 rxevent_Cancel(conn->challengeEvent);
2565 clock_GetTime(&when);
2566 when.sec += RX_CHALLENGE_TIMEOUT;
2567 conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent,
2568 conn, NULL);
2571 /* Received a response to a challenge packet */
2572 struct rx_packet *
2573 rxi_ReceiveResponsePacket(struct rx_connection * conn,
2574 struct rx_packet * np)
2576 int error;
2578 /* Ignore the packet if we're the client */
2579 if (conn->type == RX_CLIENT_CONNECTION)
2580 return np;
2582 /* If already authenticated, ignore the packet (it's probably a retry) */
2583 if (RXS_CheckAuthentication(conn->securityObject, conn) == 0)
2584 return np;
2586 /* Otherwise, have the security object evaluate the response packet */
2587 error = RXS_CheckResponse(conn->securityObject, conn, np);
2588 if (error == RX_AUTH_REPLY) {
2589 rxi_SendSpecial(NULL, conn, np, RX_PACKET_TYPE_CHALLENGE,
2590 NULL, -1);
2591 rxi_resend_ChallengeEvent(conn);
2592 } else if (error) {
2594 * If the response is invalid, reset the connection, sending an abort
2595 * to the peer
2597 #ifndef KERNEL
2598 IOMGR_Sleep(1);
2599 #endif
2600 rxi_ConnectionError(conn, error);
2601 return rxi_SendConnectionAbort(conn, np);
2602 } else {
2604 * If the response is valid, any calls waiting to attach servers can
2605 * now do so
2607 int i;
2609 for (i = 0; i < RX_MAXCALLS; i++) {
2610 struct rx_call *call = conn->call[i];
2612 if (call && (call->state == RX_STATE_PRECALL))
2613 rxi_AttachServerProc(call);
2616 return np;
2620 * A client has received an authentication challenge: the security
2621 * object is asked to cough up a respectable response packet to send
2622 * back to the server. The server is responsible for retrying the
2623 * challenge if it fails to get a response.
2626 struct rx_packet *
2627 rxi_ReceiveChallengePacket(struct rx_connection * conn,
2628 struct rx_packet * np)
2630 int error;
2632 /* Ignore the challenge if we're the server */
2633 if (conn->type == RX_SERVER_CONNECTION)
2634 return np;
2637 * Ignore the challenge if the connection is otherwise idle; someone's
2638 * trying to use us as an oracle.
2640 if (!rxi_HasActiveCalls(conn))
2641 return np;
2644 * Send the security object the challenge packet. It is expected to fill
2645 * in the response.
2647 error = RXS_GetResponse(conn->securityObject, conn, np);
2650 * If the security object is unable to return a valid response, reset the
2651 * connection and send an abort to the peer. Otherwise send the response
2652 * packet to the peer connection.
2654 if (error) {
2655 rxi_ConnectionError(conn, error);
2656 np = rxi_SendConnectionAbort(conn, np);
2657 } else {
2658 np = rxi_SendSpecial((struct rx_call *) 0, conn, np,
2659 RX_PACKET_TYPE_RESPONSE, (char *) 0, -1);
2661 return np;
2666 * Find an available server process to service the current request in
2667 * the given call structure. If one isn't available, queue up this
2668 * call so it eventually gets one
2670 void
2671 rxi_AttachServerProc(struct rx_call * call)
2673 struct rx_serverQueueEntry *sq;
2674 struct rx_service *service = call->conn->service;
2676 /* May already be attached */
2677 if (call->state == RX_STATE_ACTIVE)
2678 return;
2680 if (!QuotaOK(service) || queue_IsEmpty(&rx_idleServerQueue)) {
2682 * If there are no processes available to service this call,
2683 * put the call on the incoming call queue (unless it's
2684 * already on the queue).
2686 if (!(call->flags & RX_CALL_WAIT_PROC)) {
2687 call->flags |= RX_CALL_WAIT_PROC;
2688 rx_nWaiting++;
2689 rxi_calltrace(RX_CALL_ARRIVAL, call);
2690 queue_Append(&rx_incomingCallQueue, call);
2692 } else {
2693 sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
2695 RX_MUTEX_ENTER(&sq->lock);
2697 queue_Remove(sq);
2698 sq->newcall = call;
2699 if (call->flags & RX_CALL_WAIT_PROC) {
2700 /* Conservative: I don't think this should happen */
2701 call->flags &= ~RX_CALL_WAIT_PROC;
2702 rx_nWaiting--;
2703 queue_Remove(call);
2705 call->state = RX_STATE_ACTIVE;
2706 call->mode = RX_MODE_RECEIVING;
2707 if (call->flags & RX_CALL_CLEARED) {
2708 /* send an ack now to start the packet flow up again */
2709 call->flags &= ~RX_CALL_CLEARED;
2710 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY);
2712 service->nRequestsRunning++;
2713 if (service->nRequestsRunning <= service->minProcs)
2714 rxi_minDeficit--;
2715 rxi_availProcs--;
2716 #ifdef RX_ENABLE_LOCKS
2717 cv_signal(&sq->cv);
2718 #else
2719 osi_rxWakeup(sq);
2720 #endif
2721 RX_MUTEX_EXIT(&sq->lock);
2726 * Delay the sending of an acknowledge event for a short while, while
2727 * a new call is being prepared (in the case of a client) or a reply
2728 * is being prepared (in the case of a server). Rather than sending
2729 * an ack packet, an ACKALL packet is sent.
2731 void
2732 rxi_AckAll(struct rxevent * event, struct rx_call * call, char *dummy)
2734 if (event)
2735 call->delayedAckEvent = (struct rxevent *) 0;
2736 rxi_SendSpecial(call, call->conn, (struct rx_packet *) 0,
2737 RX_PACKET_TYPE_ACKALL, (char *) 0, 0);
2740 void
2741 rxi_SendDelayedAck(struct rxevent * event, struct rx_call * call,
2742 char *dummy)
2744 if (event)
2745 call->delayedAckEvent = (struct rxevent *) 0;
2746 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY);
2750 * Clear out the transmit queue for the current call (all packets have
2751 * been received by peer)
2753 void
2754 rxi_ClearTransmitQueue(struct rx_call * call)
2756 struct rx_packet *p, *tp;
2758 for (queue_Scan(&call->tq, p, tp, rx_packet)) {
2759 queue_Remove(p);
2760 rxi_FreePacket(p);
2763 rxevent_Cancel(call->resendEvent);
2764 call->tfirst = call->tnext; /*
2765 * implicitly acknowledge all data already sent
2767 RX_MUTEX_ENTER(&call->lockw);
2768 #ifdef RX_ENABLE_LOCKS
2769 cv_signal(&call->cv_twind);
2770 #else
2771 osi_rxWakeup(&call->twind);
2772 #endif
2773 RX_MUTEX_EXIT(&call->lockw);
2776 void
2777 rxi_ClearReceiveQueue(struct rx_call * call)
2779 struct rx_packet *p, *tp;
2781 for (queue_Scan(&call->rq, p, tp, rx_packet)) {
2782 queue_Remove(p);
2783 rxi_FreePacket(p);
2785 if (call->state == RX_STATE_PRECALL)
2786 call->flags |= RX_CALL_CLEARED;
2789 /* Send an abort packet for the specified call */
2790 struct rx_packet *
2791 rxi_SendCallAbort(struct rx_call * call, struct rx_packet * packet)
2793 if (call->error) {
2794 int32_t error;
2796 error = htonl(call->error);
2797 packet = rxi_SendSpecial(call, call->conn, packet, RX_PACKET_TYPE_ABORT,
2798 (char *) &error, sizeof(error));
2800 return packet;
2804 * Send an abort packet for the specified connection. Np is an
2805 * optional packet that can be used to send the abort.
2807 struct rx_packet *
2808 rxi_SendConnectionAbort(struct rx_connection * conn,
2809 struct rx_packet * packet)
2811 if (conn->error) {
2812 int32_t error;
2814 error = htonl(conn->error);
2815 packet = rxi_SendSpecial((struct rx_call *) 0, conn, packet,
2816 RX_PACKET_TYPE_ABORT, (char *) &error, sizeof(error));
2818 return packet;
2822 * Associate an error all of the calls owned by a connection. Called
2823 * with error non-zero. This is only for really fatal things, like
2824 * bad authentication responses. The connection itself is set in
2825 * error at this point, so that future packets received will be
2826 * rejected.
2828 void
2829 rxi_ConnectionError(struct rx_connection * conn, int32_t error)
2831 if (error) {
2832 int i;
2834 if (conn->challengeEvent)
2835 rxevent_Cancel(conn->challengeEvent);
2836 for (i = 0; i < RX_MAXCALLS; i++) {
2837 struct rx_call *call = conn->call[i];
2839 if (call)
2840 rxi_CallError(call, error);
2842 conn->error = error;
2843 rx_stats.fatalErrors++;
2847 /* Reset all of the calls associated with a connection. */
2848 void
2849 rxi_ResetConnection(struct rx_connection * conn)
2851 int i;
2853 for (i = 0; i < RX_MAXCALLS; i++) {
2854 struct rx_call *call = conn->call[i];
2856 if (call)
2857 rxi_ResetCall(call);
2860 /* get rid of pending events that could zap us later */
2861 if (conn->challengeEvent)
2862 rxevent_Cancel(conn->challengeEvent);
2865 void
2866 rxi_CallError(struct rx_call * call, int32_t error)
2868 if (call->error)
2869 error = call->error;
2870 rxi_ResetCall(call);
2871 call->error = error;
2872 call->mode = RX_MODE_ERROR;
2876 * Reset various fields in a call structure, and wakeup waiting
2877 * processes. Some fields aren't changed: state & mode are not
2878 * touched (these must be set by the caller), and bufptr, nLeft, and
2879 * nFree are not reset, since these fields are manipulated by
2880 * unprotected macros, and may only be reset by non-interrupting code.
2882 #ifdef ADAPT_WINDOW
2883 /* this code requires that call->conn be set properly as a pre-condition. */
2884 #endif /* ADAPT_WINDOW */
2886 void
2887 rxi_ResetCall(struct rx_call * call)
2889 int flags;
2891 /* Notify anyone who is waiting for asynchronous packet arrival */
2892 if (call->arrivalProc) {
2893 (*call->arrivalProc) (call, call->arrivalProcHandle,
2894 call->arrivalProcArg);
2895 call->arrivalProc = (void (*) ()) 0;
2897 flags = call->flags;
2898 rxi_ClearReceiveQueue(call);
2899 rxi_ClearTransmitQueue(call);
2900 call->error = 0;
2901 call->flags = 0;
2902 call->rwind = rx_Window; /* XXXX */
2903 #ifdef ADAPT_WINDOW
2904 call->twind = call->conn->peer->maxWindow; /* XXXX */
2905 #else
2906 /* "Slow start" every call. */
2907 call->twind = rx_initialWindow;
2908 #endif
2910 call->tfirst = call->rnext = call->tnext = 1;
2911 call->rprev = 0;
2912 call->lastAcked = 0;
2913 call->localStatus = call->remoteStatus = 0;
2915 RX_MUTEX_ENTER(&call->lockq);
2916 if (flags & RX_CALL_READER_WAIT) {
2917 #ifdef RX_ENABLE_LOCKS
2918 /* cv_signal(&call->cv_rq);*/
2919 cv_broadcast(&call->cv_rq);
2920 #else
2921 osi_rxWakeup(&call->rq);
2922 #endif
2924 RX_MUTEX_EXIT(&call->lockq);
2925 if (flags & RX_CALL_WAIT_PACKETS)
2926 rxi_PacketsUnWait(); /* XXX */
2927 RX_MUTEX_ENTER(&call->lockw);
2929 #ifdef RX_ENABLE_LOCKS
2930 cv_signal(&call->cv_twind);
2931 #else
2932 if (flags & RX_CALL_WAIT_WINDOW_ALLOC)
2933 osi_rxWakeup(&call->twind);
2934 #endif
2935 RX_MUTEX_EXIT(&call->lockw);
2937 if (queue_IsOnQueue(call)) {
2938 queue_Remove(call);
2939 if (flags & RX_CALL_WAIT_PROC)
2940 rx_nWaiting--;
2942 rxi_KeepAliveOff(call);
2943 rxevent_Cancel(call->delayedAckEvent);
2947 * Send an acknowledge for the indicated packet (seq,serial) of the
2948 * indicated call, for the indicated reason (reason). This
2949 * acknowledge will specifically acknowledge receiving the packet, and
2950 * will also specify which other packets for this call have been
2951 * received. This routine returns the packet that was used to the
2952 * caller. The caller is responsible for freeing it or re-using it.
2953 * This acknowledgement also returns the highest sequence number
2954 * actually read out by the higher level to the sender; the sender
2955 * promises to keep around packets that have not been read by the
2956 * higher level yet (unless, of course, the sender decides to abort
2957 * the call altogether). Any of p, seq, serial, pflags, or reason may
2958 * be set to zero without ill effect. That is, if they are zero, they
2959 * will not convey any information.
2960 * NOW there is a trailer field, after the ack where it will safely be
2961 * ignored by mundanes, which indicates the maximum size packet this
2962 * host can swallow.
2964 struct rx_packet *
2965 rxi_SendAck(struct rx_call * call,
2966 struct rx_packet * optionalPacket, int seq, int serial,
2967 int pflags, int reason)
2968 #if 0
2969 struct rx_call *call;
2970 struct rx_packet *optionalPacket; /* use to send ack (or null) */
2971 int seq; /* Sequence number of the packet we
2972 * are acking */
2973 int serial; /* Serial number of the packet */
2974 int pflags; /* Flags field from packet header */
2975 int reason; /* Reason an acknowledge was prompted */
2977 #endif
2979 struct rx_ackPacket *ap;
2980 struct rx_packet *rqp;
2981 struct rx_packet *nxp; /* For queue_Scan */
2982 struct rx_packet *p;
2983 u_char offset;
2984 long templ;
2986 if (call->rnext > call->lastAcked)
2987 call->lastAcked = call->rnext;
2989 p = optionalPacket;
2991 if (p) {
2992 rx_computelen(p, p->length); /* reset length, you never know */
2994 /* where that's been... */
2995 else if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL)))
2996 osi_Panic("rxi_SendAck");
2998 #ifdef SOFT_ACK
2999 call->nSoftAcks = 0;
3000 call->nHardAcks = 0;
3001 #endif /* SOFT_ACK */
3003 templ = rx_AckDataSize(call->rwind) + 4 - rx_GetDataSize(p);
3004 if (templ > 0) {
3005 if (rxi_AllocDataBuf(p, templ))
3006 return optionalPacket;
3007 templ = rx_AckDataSize(call->rwind) + 4;
3008 if (rx_Contiguous(p) < templ)
3009 return optionalPacket;
3010 } /* MTUXXX failing to send an ack is
3011 * very serious. We should */
3012 /* try as hard as possible to send even a partial ack; it's */
3013 /* better than nothing. */
3014 ap = (struct rx_ackPacket *) rx_DataOf(p);
3015 ap->bufferSpace = htonl(0); /* Something should go here, sometime */
3016 ap->reason = reason;
3018 /* The skew computation used to bullshit, I think it's better now. */
3019 /* We should start paying attention to skew. XXX */
3020 ap->serial = htonl(call->conn->maxSerial);
3021 ap->maxSkew = 0; /* used to be peer->inPacketSkew */
3023 ap->firstPacket = htonl(call->rnext); /*
3024 * First packet not yet forwarded
3025 * to reader
3027 ap->previousPacket = htonl(call->rprev); /* Previous packet received */
3030 * No fear of running out of ack packet here because there can only be
3031 * at most one window full of unacknowledged packets. The window size
3032 * must be constrained to be less than the maximum ack size, of course.
3033 * Also, an ack should always fit into a single packet -- it should not
3034 * ever be fragmented.
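 * For example, with rnext == 10 and packets 11 and 13 queued, the loop
 * below emits acks[] = { NACK, ACK, NACK, ACK } for sequence numbers
 * 10 through 13, with firstPacket = 10 and nAcks = 4.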
3036 for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
3037 while (rqp->header.seq > call->rnext + offset)
3038 ap->acks[offset++] = RX_ACK_TYPE_NACK;
3039 ap->acks[offset++] = RX_ACK_TYPE_ACK;
3041 ap->nAcks = offset;
3042 p->length = rx_AckDataSize(offset) + 4;
3043 templ = htonl(rx_maxReceiveSize);
3044 rx_packetwrite(p, rx_AckDataSize(offset), 4, &templ);
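/*
 * The 4 bytes appended after the ack array advertise this host's
 * rx_maxReceiveSize; the peer reads them back in rxi_ReceiveAckPacket
 * (the receivelen check there) and may lower its maxPacketSize for the
 * connection accordingly.
 */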
3045 p->header.serviceId = call->conn->serviceId;
3046 p->header.cid = (call->conn->cid | call->channel);
3047 p->header.callNumber = *call->callNumber;
3048 p->header.seq = seq;
3049 p->header.securityIndex = call->conn->securityIndex;
3050 p->header.epoch = call->conn->epoch;
3051 p->header.type = RX_PACKET_TYPE_ACK;
3052 p->header.flags = 0;
3053 if (reason == RX_ACK_PING) {
3054 p->header.flags |= RX_REQUEST_ACK;
3055 #ifdef ADAPT_WINDOW
3056 clock_GetTime(&call->pingRequestTime);
3057 #endif
3059 if (call->conn->type == RX_CLIENT_CONNECTION)
3060 p->header.flags |= RX_CLIENT_INITIATED;
3063 #ifdef RXDEBUG
3064 if (Log) {
3065 fprintf(Log, "SACK: reason %x previous %lu seq %lu first %lu",
3066 ap->reason,
3067 (unsigned long)ntohl(ap->previousPacket),
3068 (unsigned long)p->header.seq,
3069 (unsigned long)ntohl(ap->firstPacket));
3070 if (ap->nAcks) {
3071 for (offset = 0; offset < ap->nAcks; offset++)
3072 putc(ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*', Log);
3074 putc('\n', Log);
3076 #endif
3079 int i, nbytes = p->length;
3081 for (i = 1; i < p->niovecs; i++) { /* vec 0 is ALWAYS header */
3082 if (nbytes <= p->wirevec[i].iov_len) {
3083 int savelen, saven;
3085 savelen = p->wirevec[i].iov_len;
3086 saven = p->niovecs;
3087 p->wirevec[i].iov_len = nbytes;
3088 p->niovecs = i + 1;
3089 rxi_Send(call, p);
3090 p->wirevec[i].iov_len = savelen;
3091 p->niovecs = saven;
3092 break;
3093 } else
3094 nbytes -= p->wirevec[i].iov_len;
3097 rx_stats.ackPacketsSent++;
3098 if (!optionalPacket)
3099 rxi_FreePacket(p);
3100 return optionalPacket; /* Return packet for re-use by caller */
3105 * This routine is called when new packets are readied for
3106 * transmission and when retransmission may be necessary, or when the
3107	 * transmission window or burst count is favourable. This should be
3108 * better optimized for new packets, the usual case, now that we've
3109 * got rid of queues of send packets. XXXXXXXXXXX
3111 void
3112 rxi_Start(struct rxevent * event, struct rx_call * call)
3114 int nSent = 0;
3115 struct rx_packet *p;
3116 struct rx_packet *nxp; /* Next pointer for queue_Scan */
3117 struct rx_packet *lastPacket;
3118 struct rx_peer *peer = call->conn->peer;
3119 struct clock now, retryTime;
3120 int haveEvent;
3123 * If rxi_Start is being called as a result of a resend event,
3124 * then make sure that the event pointer is removed from the call
3125 * structure, since there is no longer a per-call retransmission
3126 * event pending.
3128 if (event && event == call->resendEvent)
3129 call->resendEvent = 0;
3131 if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
3133 * Get clock to compute the re-transmit time for any packets
3134 * in this burst. Note, if we back off, it's reasonable to
3135 * back off all of the packets in the same manner, even if
3136 * some of them have been retransmitted more times than more
3137 * recent additions
3139 clock_GetTime(&now);
3140 retryTime = now; /* initialize before use */
3141 clock_Add(&retryTime, &peer->timeout);
3144 * Send (or resend) any packets that need it, subject to
3145 * window restrictions and congestion burst control
3146 * restrictions. Ask for an ack on the last packet sent in
3147 * this burst. For now, we're relying upon the window being
3148 * considerably bigger than the largest number of packets that
3149 * are typically sent at once by one initial call to
3150 * rxi_Start. This is probably bogus (perhaps we should ask
3151 * for an ack when we're half way through the current
3152 * window?). Also, for non file transfer applications, this
3153 * may end up asking for an ack for every packet. Bogus. XXXX
3155 call->flags |= RX_CALL_TQ_BUSY;
3156 for (lastPacket = (struct rx_packet *) 0,
3157 queue_Scan(&call->tq, p, nxp, rx_packet)) {
3158 if (p->acked) {
3159 rx_stats.ignoreAckedPacket++;
3160 continue; /* Ignore this packet if it has been
3161 * acknowledged */
3164 * Turn off all flags except these ones, which are the same
3165 * on each transmission
3167 p->header.flags &= RX_PRESET_FLAGS;
3169 if (p->header.seq >= call->tfirst + call->twind) {
3170 call->flags |= RX_CALL_WAIT_WINDOW_SEND; /*
3171 * Wait for transmit
3172 * window
3175 * Note: if we're waiting for more window space, we can
3176 * still send retransmits; hence we don't return here, but
3177 * break out to schedule a retransmit event
3179 break;
3182 * If we're not allowed to send any more in the current
3183 * burst, make sure we get scheduled later. Also schedule
3184 * an event to "give back" the packets we've used, when the
3185 * burst time has elapsed (if we used any packets at all).
3187 /* XXX this need to go away and replaced with congestion
3188 * avoidance */
3189 if (peer->burstSize && !peer->burst) {
3190 if (nSent) {
3191 /* Send off the prior packet */
3193 * Don't request an ack if it's a short packet, 'cuz the
3194 * peer will cut down its MTU as a result on 3.3 clients.
3196 if ((lastPacket->header.flags & RX_LAST_PACKET) == 0) {
3197 if (/* call->cwind <= (u_short)call->ackRate || */
3198 (!(call->flags & RX_CALL_SLOW_START_OK)
3199 && (lastPacket->header.seq & 1))) {
3201 lastPacket->header.flags |= RX_REQUEST_ACK;
3204 rxi_Send(call, lastPacket);
3205 rxi_ScheduleDecongestionEvent(call, nSent);
3207 rxi_CongestionWait(call);
3209 * Note: if we're waiting for congestion to ease, we can't
3210 * send any packets, including retransmits. Hence we do
3211 * not schedule a new retransmit event right now
3213 call->flags &= ~RX_CALL_TQ_BUSY;
3214 return;
3217 * Transmit the packet if it has never been sent, or
3218 * retransmit it if the current timeout for this host for
3219 * this packet has elapsed
3221 if (!clock_IsZero(&p->retryTime)) {
3222 struct clock updatedRetryTime;
3224 if (clock_Lt(&now, &p->retryTime))
3225 continue;
3228 * If the RTT has gone up since the packet
3229 * was sent, don't retransmit just yet!
3231 updatedRetryTime = p->timeSent;
3232 clock_Add(&updatedRetryTime, &peer->timeout);
3233 if (clock_Lt(&now, &updatedRetryTime)) {
3234 p->retryTime = updatedRetryTime;
3235 continue;
3239 * If we have to retransmit chances are that we have a
3240 * busted timer. Increase MDEV to reflect this
3241 * fact. If we are wrong, MDEV will come down quickly
3242 * as new acks arrive.
3244 dubious_timeout(peer);
3247 * Always request an ack on a retransmitted packet; this
3248 * will help to get the data moving again, especially if
3249 * the packet is near the beginning of the window.
3250 * Actually, XXXX, we may want to do just that: only
3251 * request the acks if the packet is in, say, the first
3252 * half of the window
3254 p->header.flags |= RX_REQUEST_ACK;
3255 peer->reSends++, rx_stats.dataPacketsReSent++;
3256 p->retryTime = retryTime;
3259 * if a packet gets dropped, don't keep hammering on it --
3260 * back off exponentially, at least up to a point. I had
3261 * to trade off global congestion avoidance against individual
3262 * performance. Note that most connections will time out
3263 * after 20 - 60 seconds. In pathological cases, retransmits
3264 * must still continue to disperse. For instance, there is
3265 * a condition where the server discards received packets, but
3266 * still sends keep-alives on the call, so the call may live
3267 * much longer than 60 seconds.
3269 if (p->backoff < MAXBACKOFF)
3270 p->backoff = (p->backoff << 1) + 1; /* so it can't stay == 0 */
3271 else
3272 p->backoff++;
3273 clock_Addmsec(&(p->retryTime), ((unsigned long) p->backoff) << 8);
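/*
 * backoff grows as 1, 3, 7, 15, ... until MAXBACKOFF, so the extra delay
 * added here is backoff * 256 ms: roughly 0.25 s, 0.75 s, 1.75 s, and so
 * on for successive retransmissions of the same packet.
 */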
3274 /* consider shrinking the packet size? XXXX */
3275 /* no, shrink the burst size. LWSXXX */
3276 } else {
3277 peer->nSent++, rx_stats.dataPacketsSent++;
3278 p->firstSent = now; /* improved RTO calculation- not Karn */
3279 p->retryTime = retryTime;
3281 * Ask for an ack for the first packet on a new
3282 * connection, since it may carry some interesting info
3283 * like maxReceiveSize. It will also help us train to a
3284 * new estimate of RTT, for good or bad. This has one
3285 * neat side effect: since the first call on a connection
3286 * usually triggers a challenge/response exchange, the
3287 * first packet was often retransmitted before the call
3288 * could complete. Getting this ack prevents those
3289 * retransmissions. Admittedly, this is straining at gnats.
3291 if ((p->header.callNumber == 1) && (p->header.seq == 1) &&
3292 (p->length >= call->conn->maxPacketSize)) {
3293 p->header.flags |= RX_REQUEST_ACK;
3298 * Install the new retransmit time for the packet, and
3299 * record the time sent
3301 p->timeSent = now;
3304 * Send the previous packet, and remember this one--we don't
3305 * send it immediately, so we can tag it as requiring an ack
3306 * later, if necessary
3308 if (peer->burstSize)
3309 peer->burst--;
3310 nSent++;
3311 if (lastPacket) {
3313 * Tag this packet as not being the last in this group,
3314 * for the receiver's benefit
3316 lastPacket->header.flags |= RX_MORE_PACKETS;
3317 rxi_Send(call, lastPacket);
3319 lastPacket = p;
3321 call->flags &= ~RX_CALL_TQ_BUSY;
3324 * If any packets are to be sent, send them and post a
3325 * decongestion event to bump the burst count that we used up
3326 * sending the packets
3328 if (nSent) {
3331 * we don't ask for an ack on the final packet, since the
3332 * response from the peer implicitly acks it, but we do wait a
3333 * little longer for the ack on the last packet on server conns.
3335 if ((lastPacket->header.flags & RX_LAST_PACKET) == 0) {
3338 * to get the window up faster we ack every packet as
3339 * long as we are below the fast ack window, or if the
3340 * client doesn't support slow start, every second packet
3342 if (/* call->cwind <= (u_short)call->ackRate || */
3343 (!(call->flags & RX_CALL_SLOW_START_OK)
3344 && (lastPacket->header.seq & 1))) {
3346 lastPacket->header.flags |= RX_REQUEST_ACK;
3348 } else if (!(lastPacket->header.flags & RX_CLIENT_INITIATED))
3349 clock_Addmsec(&(lastPacket->retryTime), 400);
3351 rxi_Send(call, lastPacket);
3352 if (peer->burstSize)
3353 rxi_ScheduleDecongestionEvent(call, nSent);
3357 * Always post a resend event, if there is anything in the queue, and
3358 * resend is possible. There should be at least one unacknowledged
3359 * packet in the queue ... otherwise none of these packets should be
3360 * on the queue in the first place.
3362 if (call->resendEvent) {
3365 * If there's an existing resend event, then if its expiry time
3366 * is sooner than the new one, then it must be less than any
3367 * possible expiry time (because it represents all previous
3368 * packets sent that may still need retransmitting). In this
3369 * case, just leave that event as scheduled
3371 if (clock_Le(&call->resendEvent->eventTime, &retryTime))
3372 return;
3373 /* Otherwise, cancel the existing event and post a new one */
3374 rxevent_Cancel(call->resendEvent);
3378 * Loop to find the earliest event. I *know* XXXX that this can be
3379 * coded more elegantly (perhaps rolled into the above code)
3381 for (haveEvent = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
3382 if (!p->acked && !clock_IsZero(&p->retryTime)) {
3383 haveEvent = 1;
3384 if (clock_Lt(&p->retryTime, &retryTime))
3385 retryTime = p->retryTime;
3389 /* Post a new event to re-run rxi_Start when retries may be needed */
3390 if (haveEvent) {
3391 call->resendEvent = rxevent_Post(&retryTime, rxi_Start, (void *) call, NULL);
3397 * Also adjusts the keep alive parameters for the call, to reflect
3398 * that we have just sent a packet (so keep alives aren't sent
3399 * immediately)
3401 void
3402 rxi_Send(struct rx_call *call, struct rx_packet *p)
3404 struct rx_connection *conn = call->conn;
3406 /* Stamp each packet with the user supplied status */
3407 p->header.userStatus = call->localStatus;
3410 * Allow the security object controlling this call's security to make any
3411 * last-minute changes to the packet
3413 RXS_SendPacket(conn->securityObject, call, p);
3415 /* Actually send the packet, filling in more connection-specific fields */
3416 rxi_SendPacket(conn, p);
3419 * Update last send time for this call (for keep-alive processing), and
3420 * for the connection (so that we can discover idle connections)
3422 conn->lastSendTime = call->lastSendTime = clock_Sec();
3425 * Since we've just sent SOME sort of packet to the peer, it's safe to
3426 * nuke any scheduled end-of-packets ack
3428 rxevent_Cancel(call->delayedAckEvent);
3433 * Check if a call needs to be destroyed. Called by keep-alive code to ensure
3434 * that things are fine. Also called periodically to guarantee that nothing
3435	 * falls through the cracks (e.g. (error + dally) connections have keepalive
3436	 * turned off). Returns 0 if conn is well, negative otherwise.
3437 * -1 means that the call still exists, -2 means that the call is freed.
3440 static int
3441 rxi_CheckCall(struct rx_call *call)
3443 struct rx_connection *conn = call->conn;
3444 struct rx_service *tservice;
3445 u_long now;
3447 now = clock_Sec();
3450 * These are computed to the second (+- 1 second). But that's good
3451 * enough for these values, which should be a significant number of
3452 * seconds.
3454 if (now > (call->lastReceiveTime + conn->secondsUntilDead)) {
3456 if (call->state == RX_STATE_ACTIVE) {
3457 rxi_CallError(call, RX_CALL_DEAD);
3458 return -1;
3459 } else {
3460 rxi_FreeCall(call);
3461 return -2;
3465 * Non-active calls are destroyed if they are not responding to
3466 * pings; active calls are simply flagged in error, so the attached
3467 * process can die reasonably gracefully.
3471 /* see if we have a non-activity timeout */
3472 tservice = conn->service;
3473 if ((conn->type == RX_SERVER_CONNECTION) && call->startWait
3474 && tservice->idleDeadTime
3475 && ((call->startWait + tservice->idleDeadTime) < now)) {
3476 if (call->state == RX_STATE_ACTIVE) {
3477 rxi_CallError(call, RX_CALL_TIMEOUT);
3478 return -1;
3481 /* see if we have a hard timeout */
3482 if (conn->hardDeadTime && (now > (conn->hardDeadTime + call->startTime))) {
3483 if (call->state == RX_STATE_ACTIVE)
3484 rxi_CallError(call, RX_CALL_TIMEOUT);
3485 return -1;
3487 return 0;
3492 * When a call is in progress, this routine is called occasionally to
3493 * make sure that some traffic has arrived (or been sent to) the peer.
3494 * If nothing has arrived in a reasonable amount of time, the call is
3495 * declared dead; if nothing has been sent for a while, we send a
3496 * keep-alive packet (if we're actually trying to keep the call alive)
3498 void
3499 rxi_KeepAliveEvent(struct rxevent *event, struct rx_call *call,
3500 char *dummy)
3502 struct rx_connection *conn = call->conn;
3503 u_long now;
3505 call->keepAliveEvent = (struct rxevent *) 0;
3506 now = clock_Sec();
3508 if (rxi_CheckCall(call))
3509 return;
3511 /* Don't try to keep alive dallying calls */
3512 if ((call->state != RX_STATE_DALLY)
3513 && ((now - call->lastSendTime) > conn->secondsUntilPing)) {
3514 /* Don't try to send keepalives if there is unacknowledged data */
3517 * the rexmit code should be good enough, this little hack doesn't
3518 * quite work LWSXXX
3520 (void) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_PING);
3522 rxi_ScheduleKeepAliveEvent(call);
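/*
 * Net effect: an otherwise quiet active call emits a ping roughly every
 * conn->secondsUntilPing seconds, while rxi_CheckCall (above) declares the
 * call dead if nothing has arrived for conn->secondsUntilDead seconds.
 */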
3526 void
3527 rxi_ScheduleKeepAliveEvent(struct rx_call *call)
3529 if (!call->keepAliveEvent) {
3530 struct clock when;
3532 clock_GetTime(&when);
3533 when.sec += call->conn->secondsUntilPing;
3534 call->keepAliveEvent = rxevent_Post(&when, rxi_KeepAliveEvent, call, NULL);
3538 /* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
3539 void
3540 rxi_KeepAliveOn(struct rx_call *call)
3544 * Pretend last packet received was received now--i.e. if another packet
3545 * isn't received within the keep alive time, then the call will die;
3546 * Initialize last send time to the current time--even if a packet hasn't
3547 * been sent yet. This will guarantee that a keep-alive is sent within
3548 * the ping time
3550 call->lastReceiveTime = call->lastSendTime = clock_Sec();
3551 rxi_ScheduleKeepAliveEvent(call);
3555 * This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
3556 * seconds) to ask the client to authenticate itself. The routine
3557 * issues a challenge to the client, which is obtained from the
3558 * security object associated with the connection
3560 void
3561 rxi_ChallengeEvent(struct rxevent *event, struct rx_connection *conn,
3562 char *dummy)
3564 conn->challengeEvent = (struct rxevent *) 0;
3565 if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
3566 struct rx_packet *packet;
3568 packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
3569 if (!packet)
3570 osi_Panic("rxi_ChallengeEvent");
3571 RXS_GetChallenge(conn->securityObject, conn, packet);
3572 rxi_SendSpecial((struct rx_call *) 0, conn, packet,
3573 RX_PACKET_TYPE_CHALLENGE, (char *) 0, -1);
3574 rxi_FreePacket(packet);
3575 rxi_resend_ChallengeEvent(conn);
3580 * Call this routine to start requesting the client to authenticate
3581 * itself. This will continue until authentication is established,
3582 * the call times out, or an invalid response is returned. The
3583 * security object associated with the connection is asked to create
3584 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
3585 * defined earlier.
3587 void
3588 rxi_ChallengeOn(struct rx_connection *conn)
3590 if (!conn->challengeEvent) {
3591 RXS_CreateChallenge(conn->securityObject, conn);
3592 rxi_ChallengeEvent((struct rxevent *) 0, conn, 0);
3597 * Called by event.c when a decongestion event (setup by
3598 * rxi_CongestionWait) occurs. This adds back in to the burst count
3599 * for the specified host the number of packets that were sent at the
3600 * time the event was scheduled. It also calls rxi_Start on as many
3601 * waiting calls as possible before the burst count goes down to zero,
3602 * again.
3604 static void
3605 rxi_DecongestionEvent(struct rxevent *event, struct rx_peer *peer,
3606 int nPackets)
3608 struct rx_call *call;
3609 struct rx_call *nxcall; /* Next pointer for queue_Scan */
3611 peer->burst += nPackets;
3612 if (peer->burst > peer->burstSize)
3613 peer->burst = peer->burstSize;
3614 for (queue_Scan(&peer->congestionQueue, call, nxcall, rx_call)) {
3615 assert(queue_IsNotEmpty(&peer->congestionQueue));
3616 assert(queue_Prev(&peer->congestionQueue, rx_call));
3617 queue_Remove(call);
3620 * The rxi_Start may put the call back on the congestion queue. In
3621 * that case, peer->burst should be 0 (otherwise no congestion was
3622 * encountered). It should go on the end of the queue, to allow
3623 * other calls to proceed when the next burst is allowed
3625 rxi_Start((struct rxevent *) 0, call);
3626 if (!peer->burst)
3627 goto done;
3629 done:
3630 peer->refCount--; /* It was bumped by the callee */
3631 return;
3635 * Schedule an event at a host-dependent time in the future which will
3636 * add back nPackets to the current allowed burst window. Any number
3637 * of these events may be scheduled.
3639 void
3640 rxi_ScheduleDecongestionEvent(struct rx_call *call, int nPackets)
3642 struct rx_peer *peer = call->conn->peer;
3643 struct clock tmp;
3645 clock_GetTime(&tmp);
3646 clock_Add(&tmp, &peer->burstWait);
3647 peer->refCount++; /* So it won't disappear underneath
3648 * us! */
3649 /* this is stupid - sending an int as a pointer is begging for trouble */
3650 rxevent_Post(&tmp, rxi_DecongestionEvent, (void *) peer, (void *)(long)nPackets);
3654 * The caller wishes to have rxi_Start called when the burst count has
3655 * gone up, and more packets can therefore be sent. Add the caller to
3656 * the end of the list of calls waiting for decongestion events to
3657 * happen. It's important that it's added to the end so that the
3658 * rxi_DecongestionEvent procedure always terminates (aside from
3659 * matters of scheduling fairness).
3661 void
3662 rxi_CongestionWait(struct rx_call *call)
3664 if (queue_IsOnQueue(call))
3665 return;
3666 assert(queue_IsNotEmpty(&call->conn->peer->congestionQueue));
3667 assert(queue_Prev(&call->conn->peer->congestionQueue, rx_call));
3668 queue_Append(&call->conn->peer->congestionQueue, call);
3672 * Compute round trip time of the packet provided, in *rttp.
3674 #ifdef ADAPT_PERF
3675 #define ADAPT_RTO
3676 #endif
3678 void
3679 rxi_ComputeRoundTripTime(struct rx_packet *p,
3680 struct clock *sentp,
3681 struct rx_peer *peer)
3683 struct clock thisRtt, *rttp = &thisRtt;
3685 #ifdef ADAPT_RTO
3686 static char id[] = "@(#)adaptive RTO";
3687	    *id = *id;		/* so it won't complain about unused variables */
3689 clock_GetTime(rttp);
3690 if (clock_Lt(rttp, sentp)) {
3691 clock_Zero(rttp);
3692 return; /* somebody set the clock back, don't
3693 * count this time. */
3695 clock_Sub(rttp, sentp);
3696 #else
3697 clock_GetTime(rttp);
3698 clock_Sub(rttp, &p->timeSent);
3699 #endif
3700 if (clock_Lt(rttp, &rx_stats.minRtt))
3701 rx_stats.minRtt = *rttp;
3702 if (clock_Gt(rttp, &rx_stats.maxRtt)) {
3703 if (rttp->sec > 110)
3704 return; /* somebody set the clock ahead */
3705 rx_stats.maxRtt = *rttp;
3707 clock_Add(&rx_stats.totalRtt, rttp);
3708 rx_stats.nRttSamples++;
3710 #ifdef ADAPT_RTO
3711 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,???) */
3713 /* Apply VanJacobson round-trip estimations */
3714 if (peer->srtt) {
3715 u_long rtt;
3716 u_long error;
3718 rtt = rttp->usec + rttp->sec*1000000;
3719 if (rtt >= peer->srtt)
3720 error = rtt - peer->srtt;
3721 else
3722 error = peer->srtt - rtt;
3725 * The following magic is equivalent to the smoothing
3726 * algorithm in rfc793 with an alpha of .875
3727 * (srtt = rtt/8 + srtt*7/8 in fixed point).
3730 peer->srtt = (peer->srtt*7 + rtt)/8;
3733 * We accumulate a smoothed rtt variance (actually, a smoothed
3734 * mean difference), then set the retransmit timer to smoothed
3735 * rtt + 4 times the smoothed variance (was 2x in van's
3736 * original paper, but 4x works better for me, and apparently
3737 * for him as well).
3739 * The following is equivalent to rfc793 smoothing with an
3740 * alpha of .75 (rttvar = rttvar*3/4 + |err| / 4). This
3741 * replaces rfc793's wired-in beta.
3744 peer->mdev = (peer->mdev*3 + error)/4;
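/*
 * Worked example: srtt = 24 ms, mdev = 4 ms and a new sample rtt = 32 ms
 * give error = 8 ms, srtt = (24*7 + 32)/8 = 25 ms and
 * mdev = (4*3 + 8)/4 = 5 ms; update_timeout() below then yields
 * 25 + 4*5 + 100 = 145 ms.
 */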
3745 } else {
3746 peer->srtt = rttp->usec + rttp->sec*1000000;
3747 peer->mdev = peer->srtt/2;
3748 /* One single measurement is a real poor estimate of RTT&MDEV */
3749 if (peer->mdev < 1000)
3750 peer->mdev = 1000; /* 1ms */
3753 update_timeout(peer);
3755 dpf(("rtt=%.2f ms, srtt=%.2f ms, mdev=%.2f ms, timeout=%.2f ms\n",
3756 rttp->usec/1000.0 + rttp->sec*1000.0,
3757 peer->srtt/1000.0, peer->mdev/1000.0,
3758 peer->timeout.sec*1000.0 + peer->timeout.usec/1000.0));
3759 #endif /* ADAPT_RTO */
3764 * Find all server connections that have not been active for a long time,
3765 * and toss them
3767 void
3768 rxi_ReapConnections(void)
3770 struct clock now;
3772 clock_GetTime(&now);
3775 * Find server connection structures that haven't been used for greater
3776 * than rx_idleConnectionTime
3779 struct rx_connection **conn_ptr, **conn_end;
3780 int i, havecalls = 0, ret;
3782 for (conn_ptr = &rx_connHashTable[0],
3783 conn_end = &rx_connHashTable[rx_hashTableSize];
3784 conn_ptr < conn_end;
3785 conn_ptr++) {
3786 struct rx_connection *conn, *next;
3788 rereap:
3789 for (conn = *conn_ptr; conn; conn = next) {
3790 next = conn->next;
3791 /* once a minute look at everything to see what's up */
3792 havecalls = 0;
3793 for (i = 0; i < RX_MAXCALLS; i++) {
3794 if (conn->call[i]) {
3795 havecalls = 1;
3796 ret = rxi_CheckCall(conn->call[i]);
3797 if (ret == -2) {
3798 /* If CheckCall freed the call, it might
3799 * have destroyed the connection as well,
3800 * which screws up the linked lists.
3802 goto rereap;
3806 if (conn->type == RX_SERVER_CONNECTION) {
3809 * This only actually destroys the connection if there
3810 * are no outstanding calls
3812 if (!havecalls && !conn->refCount &&
3813 ((conn->lastSendTime + rx_idleConnectionTime) < now.sec)) {
3814 conn->refCount++; /* it will be decr in
3815 * rx_DestroyConn */
3816 rx_DestroyConnection(conn);
3824 * Find any peer structures that haven't been used (haven't had an
3825 * associated connection) for greater than rx_idlePeerTime
3828 struct rx_peer **peer_ptr, **peer_end;
3830 for (peer_ptr = &rx_peerHashTable[0],
3831 peer_end = &rx_peerHashTable[rx_hashTableSize];
3832 peer_ptr < peer_end; peer_ptr++) {
3833 struct rx_peer *peer, *next;
3835 for (peer = *peer_ptr; peer; peer = next) {
3836 next = peer->next;
3837 if (peer->refCount == 0
3838 && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
3839 rxi_DestroyPeer(peer);
3846 * THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
3847 * rxi_AllocSendPacket, if it hits, will be handled at the next conn GC,
3848 * just below. Really, we shouldn't have to keep moving packets from one
3849 * place to another, but instead ought to always know if we can afford to
3850 * hold onto a packet in its particular use.
3852 RX_MUTEX_ENTER(&rx_waitingForPackets_lock);
3853 if (rx_waitingForPackets) {
3854 rx_waitingForPackets = 0;
3855 #ifdef RX_ENABLE_LOCKS
3856 cv_signal(&rx_waitingForPackets_cv);
3857 #else
3858 osi_rxWakeup(&rx_waitingForPackets);
3859 #endif
3860 }
3861 RX_MUTEX_EXIT(&rx_waitingForPackets_lock);
3863 now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
3864 rxevent_Post(&now, rxi_ReapConnections, NULL, NULL);
3865 }
3868 /*
3869 * rxs_Release - This isn't strictly necessary, but since the macro name from
3870 * rx.h is sort of strange, this is better. This is called with a security
3871 * object before it is discarded. Each connection using a security object has
3872 * its own refcount to the object, so it won't actually be freed until the last
3873 * connection is destroyed.
3874 *
3875 * This is the only rxs module call. A hold could also be written, but no one
3876 * needs it.
3877 */
3879 int
3880 rxs_Release(struct rx_securityClass *aobj)
3881 {
3882 return RXS_Close(aobj);
3883 }
3885 #ifdef ADAPT_WINDOW
3886 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
3887 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
3888 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
3889 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
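/*
 * The RXRATE_* constants above capture the fixed per-packet overhead: the
 * Rx header plus the IP/UDP headers, with RXRATE_SMALL_PKT sized for a full
 * ack packet and RXRATE_AVG_SMALL_PKT for one carrying roughly half the
 * ack array.
 */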
3891 /*
3892 * Adjust our estimate of the transmission rate to this peer, given
3893 * that the packet p was just acked. We can adjust peer->timeout and
3894 * call->twind (and peer->maxWindow). Pragmatically, this is called
3895 * only with packets of maximal length.
3896 */
3898 static void
3899 rxi_ComputeRate(struct rx_peer *peer, struct rx_call *call,
3900 struct rx_packet *p, struct rx_packet *ackp, u_char ackReason)
3901 {
3902 long xferSize, xferMs;
3903 long minTime;
3904 struct clock newTO;
3906 /* Count down packets */
3907 if (peer->rateFlag > 0)
3908 peer->rateFlag--;
3909 /* Do nothing until we're enabled */
3910 if (peer->rateFlag != 0)
3911 return;
3912 if (!call->conn)
3913 return;
3915 /* Count only when the ack seems legitimate */
3916 switch (ackReason) {
3917 case RX_ACK_REQUESTED:
3918 xferSize = p->length + RX_HEADER_SIZE +
3919 call->conn->securityMaxTrailerSize;
3920 xferMs = peer->rtt;
3921 break;
3923 case RX_ACK_PING_RESPONSE:
3924 if (p) /* want the response to ping-request,
3925 * not data send */
3926 return;
3927 clock_GetTime(&newTO);
3928 if (clock_Gt(&newTO, &call->pingRequestTime)) {
3929 clock_Sub(&newTO, &call->pingRequestTime);
3930 xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
3931 } else {
3932 return;
3933 }
3934 xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
3935 break;
3937 default:
3938 return;
3939 }
3941 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, "
3942 "rtt %u, win %u, ps %u)",
3943 ntohl(peer->host), ntohs(peer->port),
3944 (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
3945 xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt,
3946 peer->maxWindow, peer->packetSize));
3948 /* Track only packets that are big enough. */
3949 if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
3950 peer->packetSize)
3951 return;
3953 /* absorb RTT data (in milliseconds) for these big packets */
3954 if (peer->smRtt == 0) {
3955 peer->smRtt = xferMs;
3956 } else {
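/*
 * Exponentially weighted moving average with gain 1/16:
 * smRtt <- (15*smRtt + sample + 4) / 16.  The "+ 4" is a rounding bias,
 * and the check below still enforces a floor of 1 so the estimate never
 * decays to zero.
 */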
3957 peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
3958 if (!peer->smRtt)
3959 peer->smRtt = 1;
3960 }
3962 if (peer->countDown) {
3963 peer->countDown--;
3964 return;
3965 }
3966 peer->countDown = 10; /* recalculate only every so often */
3968 #if 0
3970 /*
3971 * We here assume that we can approximate the total elapsed time for a
3972 * window-full of full packets as: time = RTT + ((winSize *
3973 * (packetSize+overhead)) - minPktSize) / byteRate
3974 */
3975 /* The RTT and byteRate numbers are what is measured above. */
3977 /*
3978 * In principle, we can change the other parameters: - winSize, the
3979 * number of packets in the transmission window; - packetSize, the max
3980 * size of a data packet; - the timeout, which must be larger than the
3981 * expected time.
3982 */
3984 /*
3985 * In practice, we do this in two steps: (a) ensure that the timeout is
3986 * large enough for a single packet to get through; (b) ensure that the
3987 * transmit-window is small enough to fit in the timeout.
3988 */
3990 /* First, an expression for the expected RTT for a full packet */
3991 minTime = peer->smRtt + ((1000 * (peer->packetSize +
3992 RX_HEADER_SIZE + RX_IPUDP_SIZE)) / peer->smBps);
3994 /* Get a reasonable estimate for a timeout period */
3995 minTime += minTime;
3996 newTO.sec = minTime / 1000;
3997 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
3999 /*
4000 * Increase the timeout period so that we can always do at least one
4001 * packet exchange
4002 */
4003 if (clock_Gt(&newTO, &peer->timeout)) {
4005 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu "
4006 "(rtt %u, win %u, ps %u, Bps %u)",
4007 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
4008 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
4009 peer->maxWindow, peer->packetSize, peer->smBps));
4011 peer->timeout = newTO;
4012 }
4013 /* Now, get an estimate for the transmit window size. */
4014 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
4016 /*
4017 * Now, convert to the number of full packets that could fit in that
4018 * interval
4019 */
4020 minTime = ((((minTime - peer->smRtt) * peer->smBps) / 1000) +
4021 RXRATE_AVG_SMALL_PKT) /
4022 (peer->packetSize + RX_HEADER_SIZE + RX_IPUDP_SIZE);
4023 minTime >>= 1; /* Take half that many */
4024 xferSize = minTime; /* (make a copy) */
4026 /* Now clamp the size to reasonable bounds. */
4027 if (minTime <= 1)
4028 minTime = 1;
4029 else if (minTime > rx_Window)
4030 minTime = rx_Window;
4031 if (minTime != peer->maxWindow) {
4032 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, "
4033 "rtt %u, ps %u, Bps %u)",
4034 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
4035 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
4036 peer->packetSize, peer->smBps));
4038 peer->maxWindow = minTime;
4039 /* call->twind = minTime; */
4040 }
4042 /*
4043 * Cut back on the peer timeout if it has grown unreasonably. Discern
4044 * this by calculating the timeout necessary for rx_Window packets.
4045 */
4046 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
4047 /* calculate estimate for transmission interval in milliseconds */
4048 minTime = (((1000 * rx_Window *
4049 (peer->packetSize + RX_HEADER_SIZE + RX_IPUDP_SIZE))
4050 - RXRATE_AVG_SMALL_PKT) / peer->smBps) + peer->smRtt;
4051 if (minTime < 1000) {
4053 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, "
4054 "win %u, ps %u, Bps %u)",
4055 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
4056 peer->timeout.usec, peer->smRtt, peer->maxWindow,
4057 peer->packetSize, peer->smBps));
4059 newTO.sec = 0; /* cut back on timeout by half a
4060 * second */
4061 newTO.usec = 500000;
4062 clock_Sub(&peer->timeout, &newTO);
4063 }
4064 }
4065 #endif /* 0 */
4067 /*
4068 * In practice, we can measure only the RTT for full packets, because of
4069 * the way Rx acks the data that it receives. (If it's smaller than a
4070 * full packet, it often gets implicitly acked either by the call
4071 * response (from a server) or by the next call (from a client), and
4072 * either case confuses transmission times with processing times.)
4073 * Therefore, replace the above more-sophisticated processing with a
4074 * simpler version, where the smoothed RTT is kept for full-size packets,
4075 * and the time to transmit a windowful of full-size packets is simply
4076 * RTT * windowSize. Again, we take two steps: - ensure the timeout is
4077 * large enough for a single packet's RTT; - ensure that the window is
4078 * small enough to fit in the desired timeout.
4079 */
4081 /* First, the timeout check. */
4082 minTime = peer->smRtt;
4083 /* Get a reasonable estimate for a timeout period */
4084 minTime += minTime;
4085 newTO.sec = minTime / 1000;
4086 newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
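/*
 * newTO now holds twice the smoothed RTT, converted from milliseconds into
 * the sec/usec form used by struct clock; it only replaces peer->timeout
 * below if it is the larger of the two.
 */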
4088 /*
4089 * Increase the timeout period so that we can always do at least one
4090 * packet exchange
4091 */
4092 if (clock_Gt(&newTO, &peer->timeout)) {
4094 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, "
4095 "win %u, ps %u)",
4096 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
4097 peer->timeout.usec, newTO.sec, newTO.usec, peer->smRtt,
4098 peer->maxWindow, peer->packetSize));
4100 peer->timeout = newTO;
4101 }
4102 /* Now, get an estimate for the transmit window size. */
4103 minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
4105 /*
4106 * Now, convert to the number of full packets that could fit in a
4107 * reasonable fraction of that interval
4108 */
4109 minTime /= (peer->smRtt << 1);
4110 xferSize = minTime; /* (make a copy) */
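/*
 * minTime is now the timeout expressed in units of two smoothed RTTs,
 * i.e. roughly how many full-sized packets could be exchanged within the
 * timeout; it is clamped to [1, rx_Window] below and becomes the new
 * transmit window estimate.
 */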
4112 /* Now clamp the size to reasonable bounds. */
4113 if (minTime <= 1)
4114 minTime = 1;
4115 else if (minTime > rx_Window)
4116 minTime = rx_Window;
4117 if (minTime != peer->maxWindow) {
4118 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, "
4119 "rtt %u, ps %u)",
4120 ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
4121 peer->timeout.sec, peer->timeout.usec, peer->smRtt,
4122 peer->packetSize));
4123 peer->maxWindow = minTime;
4124 /* call->twind = minTime; */
4125 }
4127 /*
4128 * Cut back on the peer timeout if it had earlier grown unreasonably.
4129 * Discern this by calculating the timeout necessary for rx_Window
4130 * packets.
4131 */
4132 if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
4133 /* calculate estimate for transmission interval in milliseconds */
4134 minTime = rx_Window * peer->smRtt;
4135 if (minTime < 1000) {
4136 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, "
4137 "win %u, ps %u)",
4138 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
4139 peer->timeout.usec, peer->smRtt, peer->maxWindow,
4140 peer->packetSize));
4142 newTO.sec = 0; /* cut back on timeout by half a
4143 * second */
4144 newTO.usec = 500000;
4145 clock_Sub(&peer->timeout, &newTO);
4146 }
4147 }
4148 return;
4149 } /* end of rxi_ComputeRate */
4151 #endif /* ADAPT_WINDOW */
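/*
 * Illustrative example (not in the original source): if the smoothed RTT
 * settles at 80 ms, the candidate timeout above is 2 * 80 = 160 ms.  Should
 * the stored timeout still be a stale 3 seconds, the window estimate works
 * out to 3000 / (2 * 80) = 18 full packets before clamping to rx_Window,
 * and the final "cut back" test only trims that stale timeout by half a
 * second when rx_Window packets can be exchanged in under a second
 * (rx_Window * 80 ms < 1000 ms).
 */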
4158 #ifdef RXDEBUG
4159 /* Don't call this debugging routine directly; use dpf */
4160 void
4161 rxi_DebugPrint(const char *fmt, ...)
4162 {
4163 struct clock now;
4164 va_list ap;
4166 clock_GetTime(&now);
4168 fprintf(Log, " %lu.%.3lu:", now.sec, now.usec / 1000);
4169 va_start(ap, fmt);
4170 vfprintf(Log, fmt, ap);
4171 va_end(ap);
4172 putc('\n', Log);
4173 }
4175 #endif
4177 void
4178 rx_PrintTheseStats(FILE *file, struct rx_stats *s, int size)
4179 {
4180 #ifdef RXDEBUG
4181 int i;
4183 if (size != sizeof(struct rx_stats))
4184 fprintf(file, "Unexpected size of stats structure: was %d, "
4185 "expected %d\n",
4186 size,
4187 (int)sizeof(struct rx_stats));
4189 fprintf(file,
4190 "rx stats: free packets %d, allocs %d, "
4191 "alloc-failures(rcv %d,send %d,ack %d)\n", rx_nFreePackets,
4192 s->packetRequests, s->noPackets[0], s->noPackets[1],
4193 s->noPackets[2]);
4194 fprintf(file,
4195 " greedy %d, bogusReads %d (last from host %x), "
4196 "noPackets %d, noBuffers %d, selects %d, sendSelects %d\n",
4197 s->socketGreedy, s->bogusPacketOnRead, s->bogusHost,
4198 s->noPacketOnRead, s->noPacketBuffersOnRead, s->selects,
4199 s->sendSelects);
4200 fprintf(file, " packets read: ");
4201 for (i = 0; i < RX_N_PACKET_TYPES; i++)
4202 fprintf(file, "%s %d ", rx_packetTypes[i], s->packetsRead[i]);
4204 fprintf(file, "\n");
4205 fprintf(file,
4206 " other read counters: data %d, ack %d, dup %d "
4207 "spurious %d\n", s->dataPacketsRead, s->ackPacketsRead,
4208 s->dupPacketsRead, s->spuriousPacketsRead);
4209 fprintf(file, " packets sent: ");
4210 for (i = 0; i < RX_N_PACKET_TYPES; i++)
4211 fprintf(file, "%s %d ", rx_packetTypes[i], s->packetsSent[i]);
4212 fprintf(file, "\n");
4213 fprintf(file,
4214 " other send counters: ack %d, data %d (not resends), "
4215 "resends %d, pushed %d, acked&ignored %d\n", s->ackPacketsSent,
4216 s->dataPacketsSent, s->dataPacketsReSent, s->dataPacketsPushed,
4217 s->ignoreAckedPacket);
4218 fprintf(file,
4219 " \t(these should be small) sendFailed %lu, "
4220 "fatalErrors %lu\n",
4221 (unsigned long)s->netSendFailures,
4222 (unsigned long)s->fatalErrors);
4223 if (s->nRttSamples) {
4224 fprintf(file, " Average rtt is %0.3f, with %d samples\n",
4225 clock_Float(&s->totalRtt) / s->nRttSamples, s->nRttSamples);
4227 fprintf(file, " Minimum rtt is %0.3f, maximum is %0.3f\n",
4228 clock_Float(&s->minRtt), clock_Float(&s->maxRtt));
4229 }
4230 fprintf(file,
4231 " %d server connections, %d client connections, %d "
4232 "peer structs, %d call structs, %d free call structs\n",
4233 s->nServerConns, s->nClientConns, s->nPeerStructs,
4234 s->nCallStructs, s->nFreeCallStructs);
4235 fprintf(file, " %d clock updates\n", clock_nUpdates);
4236 #endif /* RXDEBUG */
4237 }
4239 /* for backward compatibility */
4240 void
4241 rx_PrintStats(FILE *file)
4242 {
4243 #ifdef RXDEBUG
4244 rx_PrintTheseStats(file, &rx_stats, sizeof(rx_stats));
4245 #endif
4246 }
4248 void
4249 rx_PrintPeerStats(FILE *file, struct rx_peer *peer)
4250 {
4251 #ifdef RXDEBUG
4252 fprintf(file, "Peer %lx.%d. Burst size %d, burst wait %lu.%ld.\n",
4253 (unsigned long)ntohl(peer->host),
4254 peer->port,
4255 peer->burstSize,
4256 (unsigned long)peer->burstWait.sec,
4257 (unsigned long)peer->burstWait.usec);
4258 fprintf(file, " Rtt %lu us, retry time %lu.%06ld, total sent %d, resent %d\n",
4259 peer->srtt,
4260 (unsigned long)peer->timeout.sec,
4261 (long)peer->timeout.usec,
4262 peer->nSent,
4263 peer->reSends);
4264 fprintf(file,
4265 " Packet size %d, max in packet skew %ld, max out packet "
4266 "skew %ld\n", peer->packetSize, peer->inPacketSkew,
4267 peer->outPacketSkew);
4268 #endif /* RXDEBUG */
4269 }
4271 void
4272 shutdown_rx(void)
4273 {
4274 struct rx_serverQueueEntry *np;
4275 int i, j;
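/*
 * Teardown runs roughly in the reverse order of initialization: destroy the
 * peer structures, free the services, walk the connection hash table
 * freeing calls and connections, drain the free server-queue-entry list,
 * destroy the global locks, release the hash tables and the packet pool,
 * and finally reset the quota counters.
 */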
4277 rxinit_status = 0;
4278 {
4279 struct rx_peer **peer_ptr, **peer_end;
4281 for (peer_ptr = &rx_peerHashTable[0],
4282 peer_end = &rx_peerHashTable[rx_hashTableSize];
4283 peer_ptr < peer_end; peer_ptr++) {
4284 struct rx_peer *peer, *next;
4286 for (peer = *peer_ptr; peer; peer = next) {
4287 next = peer->next;
4288 rxi_DestroyPeer(peer);
4289 }
4290 }
4291 }
4292 for (i = 0; i < RX_MAX_SERVICES; i++) {
4293 if (rx_services[i])
4294 rxi_Free(rx_services[i], sizeof(*rx_services[i]));
4295 }
4296 for (i = 0; i < rx_hashTableSize; i++) {
4297 struct rx_connection *tc, *ntc;
4299 for (tc = rx_connHashTable[i]; tc; tc = ntc) {
4300 ntc = tc->next;
4301 for (j = 0; j < RX_MAXCALLS; j++) {
4302 if (tc->call[j]) {
4303 rxi_Free(tc->call[j], sizeof(*tc->call[j]));
4304 }
4305 }
4306 rxi_Free(tc, sizeof(*tc));
4307 }
4308 }
4310 RX_MUTEX_ENTER(&freeSQEList_lock);
4312 while ((np = rx_FreeSQEList) != NULL) {
4313 rx_FreeSQEList = *(struct rx_serverQueueEntry **) np;
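/*
 * The free list threads its "next" pointer through the first bytes of each
 * entry itself, so popping the head is just reinterpreting the entry as a
 * pointer to the next one.
 */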
4314 RX_MUTEX_DESTROY(&np->lock);
4315 rxi_Free(np, sizeof(*np));
4316 }
4318 RX_MUTEX_EXIT(&freeSQEList_lock);
4319 RX_MUTEX_DESTROY(&freeSQEList_lock);
4320 RX_MUTEX_DESTROY(&rx_waitingForPackets_lock);
4321 RX_MUTEX_DESTROY(&rx_freeCallQueue_lock);
4323 osi_Free(rx_connHashTable,
4324 rx_hashTableSize * sizeof(struct rx_connection *));
4325 osi_Free(rx_peerHashTable, rx_hashTableSize * sizeof(struct rx_peer *));
4326 osi_Free(rx_allocedP, sizeof(struct rx_packet) * rx_nPackets);
4328 UNPIN(rx_connHashTable, rx_hashTableSize * sizeof(struct rx_connection *));
4329 UNPIN(rx_peerHashTable, rx_hashTableSize * sizeof(struct rx_peer *));
4330 UNPIN(rx_allocedP, sizeof(struct rx_packet) * rx_nPackets);
4332 rxi_FreeAllPackets();
4334 rxi_dataQuota = RX_MAX_QUOTA;
4335 rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
4336 }
4338 void *
4339 rx_getServiceRock(struct rx_service *service)
4340 {
4341 return service->serviceRock;
4342 }
4344 void
4345 rx_setServiceRock(struct rx_service *service, void *rock)
4346 {
4347 service->serviceRock = rock;
4348 }
4350 void *
4351 rx_getConnRock(struct rx_connection *conn)
4352 {
4353 return conn->rock;
4354 }
4356 void
4357 rx_setConnRock(struct rx_connection *conn, void *rock)
4358 {
4359 conn->rock = rock;
4360 }
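/*
 * Usage sketch (illustrative, not from the original source): a server can
 * attach per-service context at registration time and read it back inside
 * a request handler, e.g.
 *
 *     rx_setServiceRock(service, my_ctx);
 *     ...
 *     struct my_ctx *ctx = rx_getServiceRock(call->conn->service);
 *
 * where my_ctx is whatever state the application wants to carry (and the
 * second line assumes the service back-pointer that struct rx_connection
 * keeps for server connections); the connection-level rock is handled the
 * same way with rx_getConnRock and rx_setConnRock.
 */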