/****************************************************************************
 * Copyright IBM Corporation 1988, 1989 - All Rights Reserved              *
 *                                                                          *
 * Permission to use, copy, modify, and distribute this software and its    *
 * documentation for any purpose and without fee is hereby granted,         *
 * provided that the above copyright notice appear in all copies and        *
 * that both that copyright notice and this permission notice appear in     *
 * supporting documentation, and that the name of IBM not be used in        *
 * advertising or publicity pertaining to distribution of the software      *
 * without specific, written prior permission.                              *
 *                                                                          *
 * IBM DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL *
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL IBM *
 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY      *
 * DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER  *
 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING   *
 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.    *
 ****************************************************************************/
/* RX: Extended Remote Procedure Call */

/*
 * quota system: each attached server process must be able to make
 * progress to avoid system deadlock, so we ensure that we can always
 * handle the arrival of the next unacknowledged data packet for an
 * attached call.  rxi_dataQuota gives the max # of packets that must be
 * reserved for active calls for them to be able to make progress, which is
 * essentially enough to queue up a window-full of packets (the first packet
 * may be missing, so these may not get read) + the # of packets the thread
 * may use before reading all of its input (# free must be one more than send
 * packet quota).  Thus, each thread allocates rx_Window+1 (max queued input
 * packets) + an extra for sending data.  The system also reserves
 * RX_MAX_QUOTA (must be more than RX_PACKET_QUOTA[i], which is 10), so that
 * the extra packet can be sent (must be under the system-wide send packet
 * quota to send any packets)
 */
/* # to reserve so that thread with input can still make calls (send packets) */
46 long rx_tq_dropped
= 0; /* Solaris only temp variable */
47 long rxi_dataQuota
= RX_MAX_QUOTA
; /* packets to reserve for active
51 * Variables for handling the minProcs implementation. availProcs gives the
52 * number of threads available in the pool at this moment (not counting dudes
53 * executing right now). totalMin gives the total number of procs required
54 * for handling all minProcs requests. minDeficit is a dynamic variable
55 * tracking the # of procs required to satisfy all of the remaining minProcs
59 long rxi_availProcs
= 0; /* number of threads in the pool */
60 long rxi_totalMin
; /* Sum(minProcs) forall services */
61 long rxi_minDeficit
= 0; /* number of procs needed to handle
63 long Rx0
= 0, Rx1
= 0;
65 struct rx_serverQueueEntry
*rx_waitForPacket
= 0;
66 struct rx_packet
*rx_allocedP
= 0;
68 /* ------------Exported Interfaces------------- */
72 * This function allows rxkad to set the epoch to a suitably random number
73 * which rx_NewConnection will use in the future. The principle purpose is to
74 * get rxnull connections to use the same epoch as the rxkad connections do, at
75 * least once the first rxkad connection is established. This is important now
76 * that the host/port addresses aren't used in FindConnection: the uniqueness
77 * of epoch/cid matters and the start time won't do.
81 rx_SetEpoch(uint32_t epoch
)
87 * Initialize rx. A port number may be mentioned, in which case this
88 * becomes the default port number for any service installed later.
89 * If 0 is provided for the port number, a random port will be chosen
90 * by the kernel. Whether this will ever overlap anything in
91 * /etc/services is anybody's guess... Returns 0 on success, -1 on
/*
 * 1 until rx_Init() has completed; afterwards it holds rx_Init()'s result,
 * which rx_Init() returns directly on any subsequent invocation (see the
 * "Already done" early-return in rx_Init).
 */
static int rxinit_status = 1;
97 rx_Init(uint16_t port
)
100 char *htable
, *ptable
;
105 if (rxinit_status
!= 1)
106 return rxinit_status
; /* Already done; return previous error
110 * Allocate and initialize a socket for client and perhaps server
113 rx_socket
= rxi_GetUDPSocket(port
, &rport
);
114 rx_socket_icmp
= rxi_GetICMPSocket();
116 if (rx_socket
== OSI_NULLSOCKET
) {
119 #if defined(AFS_GLOBAL_SUNLOCK) && defined(KERNEL)
120 LOCK_INIT(&afs_rxglobal_lock
, "afs_rxglobal_lock");
122 #ifdef RX_ENABLE_LOCKS
123 LOCK_INIT(&rx_freePktQ_lock
, "rx_freePktQ_lock");
124 LOCK_INIT(&freeSQEList_lock
, "freeSQEList lock");
125 LOCK_INIT(&rx_waitingForPackets_lock
, "rx_waitingForPackets lock");
126 LOCK_INIT(&rx_freeCallQueue_lock
, "rx_waitingForPackets lock");
130 rx_connDeadTime
= 12;
131 memset((char *) &rx_stats
, 0, sizeof(struct rx_stats
));
133 htable
= (char *) osi_Alloc(rx_hashTableSize
*
134 sizeof(struct rx_connection
*));
135 PIN(htable
, rx_hashTableSize
*
136 sizeof(struct rx_connection
*)); /* XXXXX */
137 memset(htable
, 0, rx_hashTableSize
*
138 sizeof(struct rx_connection
*));
139 ptable
= (char *) osi_Alloc(rx_hashTableSize
* sizeof(struct rx_peer
*));
140 PIN(ptable
, rx_hashTableSize
* sizeof(struct rx_peer
*)); /* XXXXX */
141 memset(ptable
, 0, rx_hashTableSize
* sizeof(struct rx_peer
*));
146 * *Slightly* random start time for the cid. This is just to help
147 * out with the hashing function at the peer
150 rx_stats
.minRtt
.sec
= 9999999;
151 rx_SetEpoch(tv
.tv_sec
); /*
152 * Start time of this package, rxkad
153 * will provide a randomer value.
155 rxi_dataQuota
+= rx_extraQuota
; /*
156 * + extra packets caller asked to
159 rx_nPackets
= rx_extraPackets
+ RX_MAX_QUOTA
+ 2; /* fudge */
160 rx_nextCid
= ((tv
.tv_sec
^ tv
.tv_usec
) << RX_CIDSHIFT
);
161 rx_connHashTable
= (struct rx_connection
**) htable
;
162 rx_peerHashTable
= (struct rx_peer
**) ptable
;
165 rxevent_Init(20, rxi_ReScheduleEvents
);
167 /* Malloc up a bunch of packet buffers */
169 queue_Init(&rx_freePacketQueue
);
171 queue_Init(&rx_freeCbufQueue
);
173 rxi_MorePackets(rx_nPackets
);
174 rxi_MoreCbufs(rx_nPackets
);
178 rx_lastAckDelay
.sec
= 0;
179 rx_lastAckDelay
.usec
= 400000; /* 400 ms */
181 rx_hardAckDelay
.sec
= 0;
182 rx_hardAckDelay
.usec
= 100000; /* 100 ms */
183 rx_softAckDelay
.sec
= 0;
184 rx_softAckDelay
.usec
= 100000; /* 100 ms */
185 #endif /* SOFT_ACK */
187 /* Initialize various global queues */
188 queue_Init(&rx_idleServerQueue
);
189 queue_Init(&rx_incomingCallQueue
);
190 queue_Init(&rx_freeCallQueue
);
193 * Start listener process (exact function is dependent on the
194 * implementation environment--kernel or user space)
200 return rxinit_status
;
204 * called with unincremented nRequestsRunning to see if it is OK to start
205 * a new thread in this service. Could be "no" for two reasons: over the
206 * max quota, or would prevent others from reaching their min quota.
209 QuotaOK(struct rx_service
* aservice
)
211 /* check if over max quota */
212 if (aservice
->nRequestsRunning
>= aservice
->maxProcs
)
215 /* under min quota, we're OK */
216 if (aservice
->nRequestsRunning
< aservice
->minProcs
)
220 * otherwise, can use only if there are enough to allow everyone to go to
221 * their min quota after this guy starts.
223 if (rxi_availProcs
> rxi_minDeficit
)
231 * This routine must be called if any services are exported. If the
232 * donateMe flag is set, the calling process is donated to the server
236 rx_StartServer(int donateMe
)
238 struct rx_service
*service
;
246 * Start server processes, if necessary (exact function is dependent
247 * on the implementation environment--kernel or user space). DonateMe
248 * will be 1 if there is 1 pre-existing proc, i.e. this one. In this
249 * case, one less new proc will be created rx_StartServerProcs.
251 rxi_StartServerProcs(donateMe
);
254 * count up the # of threads in minProcs, and add set the min deficit to
255 * be that value, too.
257 for (i
= 0; i
< RX_MAX_SERVICES
; i
++) {
258 service
= rx_services
[i
];
259 if (service
== (struct rx_service
*) 0)
261 rxi_totalMin
+= service
->minProcs
;
263 * below works even if a thread is running, since minDeficit would
264 * still have been decremented and later re-incremented.
266 rxi_minDeficit
+= service
->minProcs
;
269 /* Turn on reaping of idle server connections */
270 rxi_ReapConnections();
273 rx_ServerProc(); /* Never returns */
279 * Create a new client connection to the specified service, using the
280 * specified security object to implement the security model for this
283 struct rx_connection
*
284 rx_NewConnection(uint32_t shost
, uint16_t sport
, uint16_t sservice
,
285 struct rx_securityClass
* securityObject
,
286 int serviceSecurityIndex
)
288 int hashindex
, error
;
290 struct rx_connection
*conn
;
293 #if defined(AFS_SGIMP_ENV)
299 dpf(("rx_NewConnection(host %x, port %u, service %u, "
300 "securityObject %x, serviceSecurityIndex %d)\n",
301 shost
, sport
, sservice
, securityObject
, serviceSecurityIndex
));
303 #if defined(AFS_SGIMP_ENV)
305 /* NETPRI protects Cid and Alloc */
308 cid
= (rx_nextCid
+= RX_MAXCALLS
);
309 conn
= rxi_AllocConnection();
310 #if !defined(AFS_SGIMP_ENV)
313 conn
->type
= RX_CLIENT_CONNECTION
;
315 conn
->epoch
= rx_epoch
;
316 conn
->peer
= rxi_FindPeer(shost
, sport
);
317 queue_Append(&conn
->peer
->connQueue
, conn
);
318 conn
->serviceId
= sservice
;
319 conn
->securityObject
= securityObject
;
320 conn
->securityData
= (void *) 0;
321 conn
->securityIndex
= serviceSecurityIndex
;
322 conn
->maxPacketSize
= MIN(conn
->peer
->packetSize
, OLD_MAX_PACKET_SIZE
);
323 rx_SetConnDeadTime(conn
, rx_connDeadTime
);
325 error
= RXS_NewConnection(securityObject
, conn
);
328 hashindex
= CONN_HASH(shost
, sport
, conn
->cid
, conn
->epoch
,
329 RX_CLIENT_CONNECTION
);
330 conn
->next
= rx_connHashTable
[hashindex
];
331 rx_connHashTable
[hashindex
] = conn
;
332 rx_stats
.nClientConns
++;
336 #if defined(AFS_SGIMP_ENV)
344 rxi_SetConnDeadTime(struct rx_connection
* conn
, int seconds
)
347 * This is completely silly; perhaps exponential back off
348 * would be more reasonable? XXXX
350 conn
->secondsUntilDead
= seconds
;
351 conn
->secondsUntilPing
= seconds
/ 6;
352 if (conn
->secondsUntilPing
== 0)
353 conn
->secondsUntilPing
= 1; /* XXXX */
356 /* Destroy the specified connection */
358 rx_DestroyConnection(struct rx_connection
* conn
)
360 struct rx_connection
**conn_ptr
;
369 if (--conn
->refCount
> 0) {
370 /* Busy; wait till the last guy before proceeding */
375 * If the client previously called rx_NewCall, but it is still
376 * waiting, treat this as a running call, and wait to destroy the
377 * connection later when the call completes.
379 if ((conn
->type
== RX_CLIENT_CONNECTION
) &&
380 (conn
->flags
& RX_CONN_MAKECALL_WAITING
)) {
381 conn
->flags
|= RX_CONN_DESTROY_ME
;
385 /* Check for extant references to this connection */
386 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
387 struct rx_call
*call
= conn
->call
[i
];
391 if (conn
->type
== RX_CLIENT_CONNECTION
) {
392 if (call
->delayedAckEvent
) {
394 * Push the final acknowledgment out now--there
395 * won't be a subsequent call to acknowledge the
398 rxevent_Cancel(call
->delayedAckEvent
);
399 rxi_AckAll((struct rxevent
*) 0, call
, 0);
407 * Don't destroy the connection if there are any call
408 * structures still in use
410 conn
->flags
|= RX_CONN_DESTROY_ME
;
414 /* Remove from connection hash table before proceeding */
415 conn_ptr
= &rx_connHashTable
[CONN_HASH(peer
->host
, peer
->port
, conn
->cid
,
416 conn
->epoch
, conn
->type
)];
417 for (; *conn_ptr
; conn_ptr
= &(*conn_ptr
)->next
) {
418 if (*conn_ptr
== conn
) {
419 *conn_ptr
= conn
->next
;
425 * Notify the service exporter, if requested, that this connection
428 if (conn
->type
== RX_SERVER_CONNECTION
&& conn
->service
->destroyConnProc
)
429 (*conn
->service
->destroyConnProc
) (conn
);
431 /* Notify the security module that this connection is being destroyed */
432 RXS_DestroyConnection(conn
->securityObject
, conn
);
434 /* Make sure that the connection is completely reset before deleting it. */
435 rxi_ResetConnection(conn
);
438 if (--conn
->peer
->refCount
== 0)
439 conn
->peer
->idleWhen
= clock_Sec();
441 if (conn
->type
== RX_SERVER_CONNECTION
)
442 rx_stats
.nServerConns
--;
444 rx_stats
.nClientConns
--;
447 RX_MUTEX_DESTROY(&conn
->lock
);
448 rxi_FreeConnection(conn
);
453 * Start a new rx remote procedure call, on the specified connection.
454 * If wait is set to 1, wait for a free call channel; otherwise return
455 * 0. Maxtime gives the maximum number of seconds this call may take,
456 * after rx_MakeCall returns. After this time interval, a call to any
457 * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
460 rx_NewCall(struct rx_connection
* conn
)
463 struct rx_call
*call
;
466 #if defined(AFS_SGIMP_ENV)
471 dpf(("rx_NewCall(conn %x)\n", conn
));
474 #if defined(AFS_SGIMP_ENV)
479 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
480 call
= conn
->call
[i
];
482 if (call
->state
== RX_STATE_DALLY
) {
483 RX_MUTEX_ENTER(&call
->lock
);
485 (*call
->callNumber
)++;
489 call
= rxi_NewCall(conn
, i
);
490 RX_MUTEX_ENTER(&call
->lock
);
494 if (i
< RX_MAXCALLS
) {
497 conn
->flags
|= RX_CONN_MAKECALL_WAITING
;
498 #ifdef RX_ENABLE_LOCKS
499 cv_wait(&conn
->cv
, &conn
->lock
);
505 /* Client is initially in send mode */
506 call
->state
= RX_STATE_ACTIVE
;
507 call
->mode
= RX_MODE_SENDING
;
509 /* remember start time for call in case we have hard dead time limit */
510 call
->startTime
= clock_Sec();
512 /* Turn on busy protocol. */
513 rxi_KeepAliveOn(call
);
515 RX_MUTEX_EXIT(&call
->lock
);
517 #if defined(AFS_SGIMP_ENV)
525 rxi_HasActiveCalls(struct rx_connection
* aconn
)
528 struct rx_call
*tcall
;
533 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
534 if ((tcall
= aconn
->call
[i
]) != NULL
) {
535 if ((tcall
->state
== RX_STATE_ACTIVE
)
536 || (tcall
->state
== RX_STATE_PRECALL
)) {
547 rxi_GetCallNumberVector(const struct rx_connection
* aconn
,
551 struct rx_call
*tcall
;
556 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
557 if ((tcall
= aconn
->call
[i
]) && (tcall
->state
== RX_STATE_DALLY
))
558 alongs
[i
] = aconn
->callNumber
[i
] + 1;
560 alongs
[i
] = aconn
->callNumber
[i
];
567 rxi_SetCallNumberVector(struct rx_connection
* aconn
,
571 struct rx_call
*tcall
;
576 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
577 if ((tcall
= aconn
->call
[i
]) && (tcall
->state
== RX_STATE_DALLY
))
578 aconn
->callNumber
[i
] = alongs
[i
] - 1;
580 aconn
->callNumber
[i
] = alongs
[i
];
587 * Advertise a new service. A service is named locally by a UDP port
588 * number plus a 16-bit service id. Returns (struct rx_service *) 0
592 rx_NewService(uint16_t port
, uint16_t serviceId
, char *serviceName
,
593 struct rx_securityClass
** securityObjects
,
594 int nSecurityObjects
,
595 int32_t (*serviceProc
) (struct rx_call
*))
597 osi_socket asocket
= OSI_NULLSOCKET
;
598 struct rx_securityClass
**sersec
;
599 struct rx_service
*tservice
;
607 if (serviceId
== 0) {
608 osi_Msg(("rx_NewService: service id for service %s is not"
609 " non-zero.\n", serviceName
));
614 osi_Msg(("rx_NewService: A non-zero port must be specified on "
615 "this call if a non-zero port was not provided at Rx "
616 "initialization (service %s).\n", serviceName
));
622 sersec
= (void *)rxi_Alloc(sizeof(*sersec
) * nSecurityObjects
);
623 for (i
= 0; i
< nSecurityObjects
; i
++)
624 sersec
[i
] = securityObjects
[i
];
625 tservice
= rxi_AllocService();
627 for (i
= 0; i
< RX_MAX_SERVICES
; i
++) {
628 struct rx_service
*service
= rx_services
[i
];
631 if (port
== service
->servicePort
) {
632 if (service
->serviceId
== serviceId
) {
634 * The identical service has already been
635 * installed; if the caller was intending to
636 * change the security classes used by this
637 * service, he/she loses.
639 osi_Msg(("rx_NewService: tried to install service %s "
640 "with service id %d, which is already in use "
641 "for service %s\n", serviceName
, serviceId
,
642 service
->serviceName
));
644 rxi_FreeService(tservice
);
648 * Different service, same port: re-use the socket which is
649 * bound to the same port
651 asocket
= service
->socket
;
655 if (asocket
== OSI_NULLSOCKET
) {
658 * If we don't already have a socket (from another service on
659 * same port) get a new one
661 asocket
= rxi_GetUDPSocket(port
, NULL
);
662 if (asocket
== OSI_NULLSOCKET
) {
664 rxi_FreeService(tservice
);
669 service
->socket
= asocket
;
670 service
->servicePort
= port
;
671 service
->serviceId
= serviceId
;
672 service
->serviceName
= serviceName
;
673 service
->nSecurityObjects
= nSecurityObjects
;
674 service
->securityObjects
= sersec
;
675 service
->minProcs
= 0;
676 service
->maxProcs
= 1;
677 service
->idleDeadTime
= 60;
678 service
->connDeadTime
= rx_connDeadTime
;
679 service
->executeRequestProc
= serviceProc
;
680 rx_services
[i
] = service
; /* not visible until now */
681 for (i
= 0; i
< nSecurityObjects
; i
++) {
682 if (securityObjects
[i
])
683 RXS_NewService(securityObjects
[i
], service
, reuse
);
690 rxi_FreeService(tservice
);
691 osi_Msg(("rx_NewService: cannot support > %d services\n",
697 * This is the server process's request loop. This routine should
698 * either be each server proc's entry point, or it should be called by
699 * the server process (depending upon the rx implementation
705 struct rx_call
*call
;
707 struct rx_service
*tservice
;
709 #if defined(AFS_SGIMP_ENV)
717 rxi_dataQuota
+= rx_Window
+ 2; /*
718 * reserve a window of packets for
721 rxi_MorePackets(rx_Window
+ 2); /* alloc more packets, too */
723 * one more thread handling incoming
726 #if defined(AFS_SGIMP_ENV)
733 if (afs_termState
== AFSOP_STOP_RXCALLBACK
) {
734 RX_MUTEX_ENTER(&afs_termStateLock
);
735 afs_termState
= AFSOP_STOP_AFS
;
736 #ifdef RX_ENABLE_LOCKS
737 cv_signal(&afs_termStateCv
);
739 osi_rxWakeup(&afs_termState
);
741 RX_MUTEX_EXIT(&afs_termStateLock
);
745 tservice
= call
->conn
->service
;
746 #if defined(AFS_SGIMP_ENV)
749 if (tservice
->beforeProc
)
750 (*tservice
->beforeProc
) (call
);
752 code
= call
->conn
->service
->executeRequestProc(call
);
754 if (tservice
->afterProc
)
755 (*tservice
->afterProc
) (call
, code
);
757 rx_EndCall(call
, code
);
759 #if defined(AFS_SGIMP_ENV)
767 * Sleep until a call arrives. Returns a pointer to the call, ready
773 struct rx_serverQueueEntry
*sq
;
774 struct rx_call
*call
= (struct rx_call
*) 0;
775 struct rx_service
*service
= NULL
;
780 RX_MUTEX_ENTER(&freeSQEList_lock
);
782 if ((sq
= rx_FreeSQEList
) != NULL
) {
783 rx_FreeSQEList
= *(struct rx_serverQueueEntry
**) sq
;
784 RX_MUTEX_EXIT(&freeSQEList_lock
);
785 } else { /* otherwise allocate a new one and
787 RX_MUTEX_EXIT(&freeSQEList_lock
);
788 sq
= (struct rx_serverQueueEntry
*) rxi_Alloc(sizeof(struct rx_serverQueueEntry
));
789 #ifdef RX_ENABLE_LOCKS
790 LOCK_INIT(&sq
->lock
, "server Queue lock");
793 RX_MUTEX_ENTER(&sq
->lock
);
794 #if defined(AFS_SGIMP_ENV)
795 ASSERT(!isafs_glock());
799 if (queue_IsNotEmpty(&rx_incomingCallQueue
)) {
800 struct rx_call
*tcall
, *ncall
;
803 * Scan for eligible incoming calls. A call is not eligible
804 * if the maximum number of calls for its service type are
807 for (queue_Scan(&rx_incomingCallQueue
, tcall
, ncall
, rx_call
)) {
808 service
= tcall
->conn
->service
;
809 if (QuotaOK(service
)) {
817 call
->flags
&= (~RX_CALL_WAIT_PROC
);
818 service
->nRequestsRunning
++;
820 * just started call in minProcs pool, need fewer to maintain
823 if (service
->nRequestsRunning
<= service
->minProcs
)
829 * If there are no eligible incoming calls, add this process
830 * to the idle server queue, to wait for one
833 queue_Append(&rx_idleServerQueue
, sq
);
834 rx_waitForPacket
= sq
;
836 #ifdef RX_ENABLE_LOCKS
837 cv_wait(&sq
->cv
, &sq
->lock
);
842 if (afs_termState
== AFSOP_STOP_RXCALLBACK
) {
845 return (struct rx_call
*) 0;
848 } while (!(call
= sq
->newcall
));
850 RX_MUTEX_EXIT(&sq
->lock
);
852 RX_MUTEX_ENTER(&freeSQEList_lock
);
853 *(struct rx_serverQueueEntry
**) sq
= rx_FreeSQEList
;
855 RX_MUTEX_EXIT(&freeSQEList_lock
);
857 call
->state
= RX_STATE_ACTIVE
;
858 call
->mode
= RX_MODE_RECEIVING
;
859 if (queue_IsEmpty(&call
->rq
)) {
860 /* we can't schedule a call if there's no data!!! */
861 rxi_SendAck(call
, 0, 0, 0, 0, RX_ACK_DELAY
);
863 rxi_calltrace(RX_CALL_START
, call
);
864 dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
865 call
->conn
->service
->servicePort
,
866 call
->conn
->service
->serviceId
, call
));
877 * Establish a procedure to be called when a packet arrives for a
878 * call. This routine will be called at most once after each call,
879 * and will also be called if there is an error condition on the or
880 * the call is complete. Used by multi rx to build a selection
881 * function which determines which of several calls is likely to be a
882 * good one to read from.
883 * NOTE: the way this is currently implemented it is probably only a
884 * good idea to (1) use it immediately after a newcall (clients only)
885 * and (2) only use it once. Other uses currently void your warranty
888 rx_SetArrivalProc(struct rx_call
* call
, void (*proc
) (),
889 void *handle
, void *arg
)
891 call
->arrivalProc
= proc
;
892 call
->arrivalProcHandle
= handle
;
893 call
->arrivalProcArg
= arg
;
897 * Call is finished (possibly prematurely). Return rc to the peer, if
898 * appropriate, and return the final error code from the conversation
902 rx_EndCall(struct rx_call
* call
, int32_t rc
)
904 struct rx_connection
*conn
= call
->conn
;
905 struct rx_service
*service
;
909 #if defined(AFS_SGIMP_ENV)
914 dpf(("rx_EndCall(call %x)\n", call
));
916 #if defined(AFS_SGIMP_ENV)
921 RX_MUTEX_ENTER(&call
->lock
);
923 call
->arrivalProc
= (void (*) ()) 0;
924 if (rc
&& call
->error
== 0) {
925 rxi_CallError(call
, rc
);
927 * Send an abort message to the peer if this error code has
928 * only just been set. If it was set previously, assume the
929 * peer has already been sent the error code or will request it
931 rxi_SendCallAbort(call
, (struct rx_packet
*) 0);
933 if (conn
->type
== RX_SERVER_CONNECTION
) {
934 /* Make sure reply or at least dummy reply is sent */
935 if (call
->mode
== RX_MODE_RECEIVING
) {
936 RX_MUTEX_EXIT(&call
->lock
);
939 rx_Write(call
, 0, 0);
942 RX_MUTEX_ENTER(&call
->lock
);
944 if (call
->mode
== RX_MODE_SENDING
) {
947 service
= conn
->service
;
948 service
->nRequestsRunning
--;
949 if (service
->nRequestsRunning
< service
->minProcs
)
952 rxi_calltrace(RX_CALL_END
, call
);
953 } else { /* Client connection */
957 * Make sure server receives input packets, in the case where
958 * no reply arguments are expected
960 if (call
->mode
== RX_MODE_SENDING
) {
961 RX_MUTEX_EXIT(&call
->lock
);
964 (void) rx_Read(call
, &dummy
, 1);
967 RX_MUTEX_ENTER(&call
->lock
);
969 if (conn
->flags
& RX_CONN_MAKECALL_WAITING
) {
970 conn
->flags
&= (~RX_CONN_MAKECALL_WAITING
);
971 #ifdef RX_ENABLE_LOCKS
972 cv_signal(&conn
->cv
);
978 call
->state
= RX_STATE_DALLY
;
982 * currentPacket, nLeft, and NFree must be zeroed here, because
983 * ResetCall cannot: ResetCall may be called at splnet(), in the
984 * kernel version, and may interrupt the macros rx_Read or
985 * rx_Write, which run at normal priority for efficiency.
987 if (call
->currentPacket
) {
988 rxi_FreePacket(call
->currentPacket
);
989 call
->currentPacket
= (struct rx_packet
*) 0;
990 call
->nLeft
= call
->nFree
= 0;
992 call
->nLeft
= call
->nFree
= 0;
995 RX_MUTEX_EXIT(&call
->lock
);
998 #if defined(AFS_SGIMP_ENV)
1002 * Map errors to the local host's errno.h format.
1004 error
= ntoh_syserr_conv(error
);
1009 * Call this routine when shutting down a server or client (especially
1010 * clients). This will allow Rx to gracefully garbage collect server
1011 * connections, and reduce the number of retries that a server might
1012 * make to a dead client
1017 struct rx_connection
**conn_ptr
, **conn_end
;
1021 if (rx_connHashTable
)
1022 for (conn_ptr
= &rx_connHashTable
[0],
1023 conn_end
= &rx_connHashTable
[rx_hashTableSize
];
1024 conn_ptr
< conn_end
; conn_ptr
++) {
1025 struct rx_connection
*conn
, *next
;
1027 for (conn
= *conn_ptr
; conn
; conn
= next
) {
1029 if (conn
->type
== RX_CLIENT_CONNECTION
)
1030 rx_DestroyConnection(conn
);
1036 fclose(rx_debugFile
);
1037 rx_debugFile
= NULL
;
1045 * if we wakeup packet waiter too often, can get in loop with two
1046 * AllocSendPackets each waking each other up (from ReclaimPacket calls)
1049 rxi_PacketsUnWait(void)
1052 RX_MUTEX_ENTER(&rx_waitingForPackets_lock
);
1053 if (!rx_waitingForPackets
) {
1054 RX_MUTEX_EXIT(&rx_waitingForPackets_lock
);
1057 if (rxi_OverQuota(RX_PACKET_CLASS_SEND
)) {
1058 RX_MUTEX_EXIT(&rx_waitingForPackets_lock
);
1059 return; /* still over quota */
1061 rx_waitingForPackets
= 0;
1062 #ifdef RX_ENABLE_LOCKS
1063 cv_signal(&rx_waitingForPackets_cv
);
1065 osi_rxWakeup(&rx_waitingForPackets
);
1067 RX_MUTEX_EXIT(&rx_waitingForPackets_lock
);
1072 /* ------------------Internal interfaces------------------------- */
1075 * Return this process's service structure for the
1076 * specified socket and service
1078 static struct rx_service
*
1079 rxi_FindService(osi_socket asocket
, uint16_t serviceId
)
1081 struct rx_service
**sp
;
1083 for (sp
= &rx_services
[0]; *sp
; sp
++) {
1084 if ((*sp
)->serviceId
== serviceId
&& (*sp
)->socket
== asocket
)
1091 * Allocate a call structure, for the indicated channel of the
1092 * supplied connection. The mode and state of the call must be set by
1096 rxi_NewCall(struct rx_connection
* conn
, int channel
)
1098 struct rx_call
*call
;
1101 * Grab an existing call structure, or allocate a new one.
1102 * Existing call structures are assumed to have been left reset by
1105 RX_MUTEX_ENTER(&rx_freeCallQueue_lock
);
1107 if (queue_IsNotEmpty(&rx_freeCallQueue
)) {
1108 call
= queue_First(&rx_freeCallQueue
, rx_call
);
1110 rx_stats
.nFreeCallStructs
--;
1111 RX_MUTEX_EXIT(&rx_freeCallQueue_lock
);
1112 RX_MUTEX_ENTER(&call
->lock
);
1115 /* Bind the call to its connection structure */
1119 call
= (struct rx_call
*) rxi_Alloc(sizeof(struct rx_call
));
1121 RX_MUTEX_EXIT(&rx_freeCallQueue_lock
);
1122 RX_MUTEX_INIT(&call
->lock
, "call lock", RX_MUTEX_DEFAULT
, NULL
);
1123 RX_MUTEX_INIT(&call
->lockw
, "call wlock", RX_MUTEX_DEFAULT
, NULL
);
1124 RX_MUTEX_ENTER(&call
->lock
);
1126 rx_stats
.nCallStructs
++;
1127 /* Initialize once-only items */
1128 queue_Init(&call
->tq
);
1129 queue_Init(&call
->rq
);
1131 /* Bind the call to its connection structure (prereq for reset) */
1134 rxi_ResetCall(call
);
1137 /* Bind the call to its connection structure (prereq for reset) */
1140 call
->channel
= channel
;
1141 call
->callNumber
= &conn
->callNumber
[channel
];
1143 * Note that the next expected call number is retained (in
1144 * conn->callNumber[i]), even if we reallocate the call structure
1146 conn
->call
[channel
] = call
;
1148 * if the channel's never been used (== 0), we should start at 1, otherwise
1149 * the call number is valid from the last time this channel was used
1151 if (*call
->callNumber
== 0)
1152 *call
->callNumber
= 1;
1154 RX_MUTEX_EXIT(&call
->lock
);
1159 * A call has been inactive long enough that so we can throw away
1160 * state, including the call structure, which is placed on the call
1164 rxi_FreeCall(struct rx_call
* call
)
1166 int channel
= call
->channel
;
1167 struct rx_connection
*conn
= call
->conn
;
1170 RX_MUTEX_ENTER(&call
->lock
);
1172 if (call
->state
== RX_STATE_DALLY
)
1173 (*call
->callNumber
)++;
1174 rxi_ResetCall(call
);
1175 call
->conn
->call
[channel
] = (struct rx_call
*) 0;
1177 RX_MUTEX_ENTER(&rx_freeCallQueue_lock
);
1179 queue_Append(&rx_freeCallQueue
, call
);
1180 rx_stats
.nFreeCallStructs
++;
1182 RX_MUTEX_EXIT(&rx_freeCallQueue_lock
);
1183 RX_MUTEX_EXIT(&call
->lock
);
1186 * Destroy the connection if it was previously slated for
1187 * destruction, i.e. the Rx client code previously called
1188 * rx_DestroyConnection (client connections), or
1189 * rxi_ReapConnections called the same routine (server
1190 * connections). Only do this, however, if there are no
1193 if (conn
->flags
& RX_CONN_DESTROY_ME
) {
1197 rx_DestroyConnection(conn
);
/* Running count and total size of rxi_Alloc allocations (debug bookkeeping). */
long rxi_Alloccnt = 0, rxi_Allocsize = 0;
1210 rxi_Allocsize
+= size
;
1211 p
= osi_Alloc(size
);
1213 osi_Panic("rxi_Alloc error");
1219 rxi_Free(void *addr
, int size
)
1222 rxi_Allocsize
-= size
;
1223 #if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && defined(KERNEL)
1224 osi_FreeSmall(addr
);
1226 osi_Free(addr
, size
);
1231 * Find the peer process represented by the supplied (host,port)
1232 * combination. If there is no appropriate active peer structure, a
1233 * new one will be allocated and initialized
1236 rxi_FindPeer(uint32_t host
, uint16_t port
)
1241 hashIndex
= PEER_HASH(host
, port
);
1242 for (pp
= rx_peerHashTable
[hashIndex
]; pp
; pp
= pp
->next
) {
1243 if ((pp
->host
== host
) && (pp
->port
== port
))
1247 pp
= rxi_AllocPeer(); /*
1248 * This bzero's *pp: anything not
1252 * set here or in InitPeerParams is
1256 queue_Init(&pp
->congestionQueue
);
1257 queue_Init(&pp
->connQueue
);
1258 pp
->next
= rx_peerHashTable
[hashIndex
];
1259 rx_peerHashTable
[hashIndex
] = pp
;
1260 rxi_InitPeerParams(pp
);
1261 rx_stats
.nPeerStructs
++;
1268 * Remove `peer' from the hash table.
1272 rxi_RemovePeer(struct rx_peer
*peer
)
1274 struct rx_peer
**peer_ptr
;
1276 for (peer_ptr
= &rx_peerHashTable
[PEER_HASH(peer
->host
, peer
->port
)];
1277 *peer_ptr
; peer_ptr
= &(*peer_ptr
)->next
) {
1278 if (*peer_ptr
== peer
) {
1279 *peer_ptr
= peer
->next
;
1286 * Destroy the specified peer structure, removing it from the peer hash
1291 rxi_DestroyPeer(struct rx_peer
* peer
)
1293 rxi_RemovePeer(peer
);
1294 assert(queue_IsEmpty(&peer
->connQueue
));
1296 rx_stats
.nPeerStructs
--;
1300 * Add `peer' to the hash table.
1303 static struct rx_peer
*
1304 rxi_InsertPeer(struct rx_peer
*peer
)
1309 hashIndex
= PEER_HASH(peer
->host
, peer
->port
);
1310 for (pp
= rx_peerHashTable
[hashIndex
]; pp
; pp
= pp
->next
) {
1311 if ((pp
->host
== peer
->host
) && (pp
->port
== peer
->port
))
1315 struct rx_connection
*conn
, *next
;
1317 pp
->refCount
+= peer
->refCount
;
1318 pp
->nSent
+= peer
->nSent
;
1319 pp
->reSends
+= peer
->reSends
;
1321 for (queue_Scan(&peer
->connQueue
, conn
, next
, rx_connection
)) {
1324 queue_Append(&pp
->connQueue
, conn
);
1327 assert(queue_IsEmpty(&peer
->connQueue
));
1329 rx_stats
.nPeerStructs
--;
1332 peer
->next
= rx_peerHashTable
[hashIndex
];
1333 rx_peerHashTable
[hashIndex
] = peer
;
1339 * Change the key of a given peer
1342 static struct rx_peer
*
1343 rxi_ChangePeer(struct rx_peer
*peer
, uint32_t host
, uint16_t port
)
1345 rxi_RemovePeer(peer
);
1348 return rxi_InsertPeer(peer
);
1352 * Find the connection at (host, port) started at epoch, and with the
1353 * given connection id. Creates the server connection if necessary.
1354 * The type specifies whether a client connection or a server
1355 * connection is desired. In both cases, (host, port) specify the
1356 * peer's (host, pair) pair. Client connections are not made
1357 * automatically by this routine. The parameter socket gives the
1358 * socket descriptor on which the packet was received. This is used,
1359 * in the case of server connections, to check that *new* connections
1360 * come via a valid (port, serviceId). Finally, the securityIndex
1361 * parameter must match the existing index for the connection. If a
1362 * server connection is created, it will be created using the supplied
1363 * index, if the index is valid for this service
1365 static struct rx_connection
*
1366 rxi_FindConnection(osi_socket asocket
, uint32_t host
,
1367 uint16_t port
, uint16_t serviceId
, uint32_t cid
,
1368 uint32_t epoch
, int type
, u_int securityIndex
)
1371 struct rx_connection
*conn
;
1372 struct rx_peer
*pp
= NULL
;
1374 hashindex
= CONN_HASH(host
, port
, cid
, epoch
, type
);
1375 for (conn
= rx_connHashTable
[hashindex
]; conn
; conn
= conn
->next
) {
1376 if ((conn
->type
== type
) && ((cid
& RX_CIDMASK
) == conn
->cid
) &&
1377 (epoch
== conn
->epoch
) &&
1378 (securityIndex
== conn
->securityIndex
)) {
1381 if (type
== RX_CLIENT_CONNECTION
|| pp
->host
== host
)
1386 if (pp
->host
!= host
|| pp
->port
!= port
)
1387 conn
->peer
= rxi_ChangePeer (pp
, host
, port
);
1389 struct rx_service
*service
;
1391 if (type
== RX_CLIENT_CONNECTION
)
1392 return (struct rx_connection
*) 0;
1393 service
= rxi_FindService(asocket
, serviceId
);
1394 if (!service
|| (securityIndex
>= service
->nSecurityObjects
)
1395 || (service
->securityObjects
[securityIndex
] == 0))
1396 return (struct rx_connection
*) 0;
1397 conn
= rxi_AllocConnection(); /* This bzero's the connection */
1398 #ifdef RX_ENABLE_LOCKS
1399 LOCK_INIT(&conn
->lock
, "conn lock");
1401 conn
->next
= rx_connHashTable
[hashindex
];
1402 rx_connHashTable
[hashindex
] = conn
;
1403 conn
->peer
= rxi_FindPeer(host
, port
);
1404 queue_Append(&conn
->peer
->connQueue
, conn
);
1405 conn
->maxPacketSize
= MIN(conn
->peer
->packetSize
, OLD_MAX_PACKET_SIZE
);
1406 conn
->type
= RX_SERVER_CONNECTION
;
1407 conn
->lastSendTime
= clock_Sec(); /* don't GC immediately */
1408 conn
->epoch
= epoch
;
1409 conn
->cid
= cid
& RX_CIDMASK
;
1410 /* conn->serial = conn->lastSerial = 0; */
1411 /* conn->rock = 0; */
1412 /* conn->timeout = 0; */
1413 conn
->service
= service
;
1414 conn
->serviceId
= serviceId
;
1415 conn
->securityIndex
= securityIndex
;
1416 conn
->securityObject
= service
->securityObjects
[securityIndex
];
1417 rx_SetConnDeadTime(conn
, service
->connDeadTime
);
1418 /* Notify security object of the new connection */
1419 RXS_NewConnection(conn
->securityObject
, conn
);
1420 /* XXXX Connection timeout? */
1421 if (service
->newConnProc
)
1422 (*service
->newConnProc
) (conn
);
1423 rx_stats
.nServerConns
++;
1430 * Force a timeout on the connection for `host', `port'.
1434 rxi_KillConnection(uint32_t host
, u_short port
)
1436 struct rx_connection
*conn
;
1439 for (hashindex
= 0; hashindex
< rx_hashTableSize
; hashindex
++) {
1440 for (conn
= rx_connHashTable
[hashindex
]; conn
; conn
= conn
->next
) {
1441 if (htonl(conn
->peer
->host
) == host
&&
1442 htons(conn
->peer
->port
) == port
)
1444 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
1445 struct rx_call
*call
= conn
->call
[i
];
1446 if (call
&& call
->state
== RX_STATE_ACTIVE
)
1447 rxi_CallError(call
, RX_CALL_TIMEOUT
);
1455 * There are two packet tracing routines available for testing and monitoring
1456 * Rx. One is called just after every packet is received and the other is
1457 * called just before every packet is sent. Received packets, have had their
1458 * headers decoded, and packets to be sent have not yet had their headers
1459 * encoded. Both take two parameters: a pointer to the packet and a sockaddr
1460 * containing the network address. Both can be modified. The return value, if
1461 * non-zero, indicates that the packet should be dropped.
1464 int (*rx_justReceived
)() = NULL
;
1465 int (*rx_almostSent
)() = NULL
;
1468 * A packet has been received off the interface. Np is the packet, socket is
1469 * the socket number it was received from (useful in determining which service
1470 * this packet corresponds to), and (host, port) reflect the host,port of the
1471 * sender. This call returns the packet to the caller if it is finished with
1472 * it, rather than de-allocating it, just as a small performance hack
1476 rxi_ReceivePacket(struct rx_packet
* np
, osi_socket asocket
,
1477 uint32_t host
, uint16_t port
)
1479 struct rx_call
*call
;
1480 struct rx_connection
*conn
;
1482 unsigned long currentCallNumber
;
1490 struct rx_packet
*tnp
;
1494 * We don't print out the packet until now because (1) the time may not be
1495 * accurate enough until now in the lwp implementation (rx_Listener only gets
1496 * the time after the packet is read) and (2) from a protocol point of view,
1497 * this is the first time the packet has been seen
1499 packetType
= (np
->header
.type
> 0 && np
->header
.type
< RX_N_PACKET_TYPES
)
1500 ? rx_packetTypes
[np
->header
.type
- 1] : "*UNKNOWN*";
1501 dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
1502 np
->header
.serial
, packetType
, host
, port
, np
->header
.serviceId
,
1503 (int)np
->header
.epoch
, np
->header
.cid
, np
->header
.callNumber
,
1504 np
->header
.seq
, np
->header
.flags
, np
));
1507 if (np
->header
.type
== RX_PACKET_TYPE_VERSION
)
1508 return rxi_ReceiveVersionPacket(np
, asocket
, host
, port
);
1510 if (np
->header
.type
== RX_PACKET_TYPE_DEBUG
)
1511 return rxi_ReceiveDebugPacket(np
, asocket
, host
, port
);
1516 * If an input tracer function is defined, call it with the packet and
1517 * network address. Note this function may modify its arguments.
1519 if (rx_justReceived
) {
1520 struct sockaddr_in addr
;
1523 addr
.sin_family
= AF_INET
;
1524 addr
.sin_port
= port
;
1525 addr
.sin_addr
.s_addr
= host
;
1526 drop
= (*rx_justReceived
) (np
, &addr
);
1527 /* drop packet if return value is non-zero */
1530 port
= addr
.sin_port
; /* in case fcn changed addr */
1531 host
= addr
.sin_addr
.s_addr
;
1535 /* If packet was not sent by the client, then *we* must be the client */
1536 type
= ((np
->header
.flags
& RX_CLIENT_INITIATED
) != RX_CLIENT_INITIATED
)
1537 ? RX_CLIENT_CONNECTION
: RX_SERVER_CONNECTION
;
1540 * Find the connection (or fabricate one, if we're the server & if
1541 * necessary) associated with this packet
1543 conn
= rxi_FindConnection(asocket
, host
, port
, np
->header
.serviceId
,
1544 np
->header
.cid
, np
->header
.epoch
, type
,
1545 np
->header
.securityIndex
);
1549 * If no connection found or fabricated, just ignore the packet.
1550 * (An argument could be made for sending an abort packet for
1555 /* compute the max serial number seen on this connection */
1556 if (conn
->maxSerial
< np
->header
.serial
)
1557 conn
->maxSerial
= np
->header
.serial
;
1560 * If the connection is in an error state, send an abort packet and
1561 * ignore the incoming packet
1564 rxi_ConnectionError (conn
, conn
->error
);
1565 /* Don't respond to an abort packet--we don't want loops! */
1566 if (np
->header
.type
!= RX_PACKET_TYPE_ABORT
)
1567 np
= rxi_SendConnectionAbort(conn
, np
);
1571 /* Check for connection-only requests (i.e. not call specific). */
1572 if (np
->header
.callNumber
== 0) {
1573 switch (np
->header
.type
) {
1574 case RX_PACKET_TYPE_ABORT
:
1575 /* What if the supplied error is zero? */
1576 rxi_ConnectionError(conn
, ntohl(rx_SlowGetLong(np
, 0)));
1579 case RX_PACKET_TYPE_CHALLENGE
:
1580 tnp
= rxi_ReceiveChallengePacket(conn
, np
);
1583 case RX_PACKET_TYPE_RESPONSE
:
1584 tnp
= rxi_ReceiveResponsePacket(conn
, np
);
1587 case RX_PACKET_TYPE_PARAMS
:
1588 case RX_PACKET_TYPE_PARAMS
+ 1:
1589 case RX_PACKET_TYPE_PARAMS
+ 2:
1590 /* ignore these packet types for now */
1597 * Should not reach here, unless the peer is broken: send an
1600 rxi_ConnectionError(conn
, RX_PROTOCOL_ERROR
);
1601 tnp
= rxi_SendConnectionAbort(conn
, np
);
1606 channel
= np
->header
.cid
& RX_CHANNELMASK
;
1607 call
= conn
->call
[channel
];
1608 #ifdef RX_ENABLE_LOCKSX
1610 mutex_enter(&call
->lock
);
1612 currentCallNumber
= conn
->callNumber
[channel
];
1614 if (type
== RX_SERVER_CONNECTION
) {/* We're the server */
1615 if (np
->header
.callNumber
< currentCallNumber
) {
1616 rx_stats
.spuriousPacketsRead
++;
1617 #ifdef RX_ENABLE_LOCKSX
1619 mutex_exit(&call
->lock
);
1625 call
= rxi_NewCall(conn
, channel
);
1626 #ifdef RX_ENABLE_LOCKSX
1627 mutex_enter(&call
->lock
);
1629 *call
->callNumber
= np
->header
.callNumber
;
1630 call
->state
= RX_STATE_PRECALL
;
1631 rxi_KeepAliveOn(call
);
1632 } else if (np
->header
.callNumber
!= currentCallNumber
) {
1634 * If the new call cannot be taken right now send a busy and set
1635 * the error condition in this call, so that it terminates as
1636 * quickly as possible
1638 if (call
->state
== RX_STATE_ACTIVE
) {
1639 struct rx_packet
*tp
;
1641 rxi_CallError(call
, RX_CALL_DEAD
);
1642 tp
= rxi_SendSpecial(call
, conn
, np
, RX_PACKET_TYPE_BUSY
, (char *) 0, 0);
1643 #ifdef RX_ENABLE_LOCKSX
1644 mutex_exit(&call
->lock
);
1650 * If the new call can be taken right now (it's not busy) then
1653 rxi_ResetCall(call
);
1654 *call
->callNumber
= np
->header
.callNumber
;
1655 call
->state
= RX_STATE_PRECALL
;
1656 rxi_KeepAliveOn(call
);
1658 /* Continuing call; do nothing here. */
1660 } else { /* we're the client */
1663 * Ignore anything that's not relevant to the current call. If there
1664 * isn't a current call, then no packet is relevant.
1666 if (!call
|| (np
->header
.callNumber
!= currentCallNumber
)) {
1667 rx_stats
.spuriousPacketsRead
++;
1668 #ifdef RX_ENABLE_LOCKSX
1670 mutex_exit(&call
->lock
);
1676 * If the service security object index stamped in the packet does not
1677 * match the connection's security index, ignore the packet
1679 if (np
->header
.securityIndex
!= conn
->securityIndex
) {
1681 #ifdef RX_ENABLE_LOCKSX
1682 mutex_exit(&call
->lock
);
1687 * If we're receiving the response, then all transmit packets are
1688 * implicitly acknowledged. Get rid of them.
1690 if (np
->header
.type
== RX_PACKET_TYPE_DATA
) {
1693 * XXX Hack. Because we can't release the global rx lock when
1694 * sending packets (osi_NetSend) we drop all acks while we're
1695 * traversing the tq in rxi_Start sending packets out because
1696 * packets may move to the freePacketQueue as result of being
1697 * here! So we drop these packets until we're safely out of the
1698 * traversing. Really ugly!
1700 if (call
->flags
& RX_CALL_TQ_BUSY
) {
1702 return np
; /* xmitting; drop packet */
1705 rxi_ClearTransmitQueue(call
);
1707 if (np
->header
.type
== RX_PACKET_TYPE_ACK
) {
1709 * now check to see if this is an ack packet acknowledging
1710 * that the server actually *lost* some hard-acked data. If
1711 * this happens we ignore this packet, as it may indicate that
1712 * the server restarted in the middle of a call. It is also
1713 * possible that this is an old ack packet. We don't abort
1714 * the connection in this case, because this *might* just be
1715 * an old ack packet. The right way to detect a server restart
1716 * in the midst of a call is to notice that the server epoch
1720 * LWSXXX I'm not sure this is exactly right, since tfirst
1721 * LWSXXX **IS** unacknowledged. I think that this is
1722 * LWSXXX off-by-one, but I don't dare change it just yet,
1723 * LWSXXX since it will interact badly with the
1724 * LWSXXX server-restart detection code in receiveackpacket.
1726 if (ntohl(rx_SlowGetLong(np
, FIRSTACKOFFSET
)) < call
->tfirst
) {
1727 rx_stats
.spuriousPacketsRead
++;
1728 #ifdef RX_ENABLE_LOCKSX
1729 mutex_exit(&call
->lock
);
1735 } /* else not a data packet */
1738 /* Set remote user defined status from packet */
1739 call
->remoteStatus
= np
->header
.userStatus
;
1742 * Note the gap between the expected next packet and the actual packet
1743 * that arrived, when the new packet has a smaller serial number than
1744 * expected. Rioses frequently reorder packets all by themselves, so
1745 * this will be quite important with very large window sizes. Skew is
1746 * checked against 0 here to avoid any dependence on the type of
1747 * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is
1748 * always true! The inPacketSkew should be a smoothed running value, not
1749 * just a maximum. MTUXXX see CalculateRoundTripTime for an example of
1750 * how to keep smoothed values. I think using a beta of 1/8 is probably
1751 * appropriate. lws 93.04.21
1753 skew
= conn
->lastSerial
- np
->header
.serial
;
1754 conn
->lastSerial
= np
->header
.serial
;
1756 struct rx_peer
*peer
;
1759 if (skew
> peer
->inPacketSkew
) {
1760 dpf(("*** In skew changed from %d to %d\n",
1761 peer
->inPacketSkew
, skew
));
1762 peer
->inPacketSkew
= skew
;
1765 /* Now do packet type-specific processing */
1766 switch (np
->header
.type
) {
1767 case RX_PACKET_TYPE_DATA
:
1768 np
= rxi_ReceiveDataPacket(call
, np
);
1770 case RX_PACKET_TYPE_ACK
:
1772 * Respond immediately to ack packets requesting acknowledgement
1775 if (np
->header
.flags
& RX_REQUEST_ACK
) {
1777 (void) rxi_SendCallAbort(call
, 0);
1779 (void) rxi_SendAck(call
, 0, 0, 0, 0, RX_ACK_PING_RESPONSE
);
1781 np
= rxi_ReceiveAckPacket(call
, np
);
1783 case RX_PACKET_TYPE_ABORT
:
1785 * An abort packet: reset the connection, passing the error up to
1788 /* XXX What if error is zero? and length of packet is 0 */
1789 rxi_CallError(call
, ntohl(*(uint32_t *) rx_DataOf(np
)));
1791 case RX_PACKET_TYPE_BUSY
:
1794 case RX_PACKET_TYPE_ACKALL
:
1796 * All packets acknowledged, so we can drop all packets previously
1797 * readied for sending
1801 * XXX Hack. We because we can't release the global rx lock
1802 * when sending packets (osi_NetSend) we drop all ack pkts while
1803 * we're traversing the tq in rxi_Start sending packets out
1804 * because packets may move to the freePacketQueue as result of
1805 * being here! So we drop these packets until we're
1806 * safely out of the traversing. Really ugly!
1808 if (call
->flags
& RX_CALL_TQ_BUSY
) {
1810 return np
; /* xmitting; drop packet */
1813 rxi_ClearTransmitQueue(call
);
1817 * Should not reach here, unless the peer is broken: send an abort
1820 rxi_CallError(call
, RX_PROTOCOL_ERROR
);
1821 np
= rxi_SendCallAbort(call
, np
);
1825 * Note when this last legitimate packet was received, for keep-alive
1826 * processing. Note, we delay getting the time until now in the hope that
1827 * the packet will be delivered to the user before any get time is required
1828 * (if not, then the time won't actually be re-evaluated here).
1830 call
->lastReceiveTime
= clock_Sec();
1831 #ifdef RX_ENABLE_LOCKSX
1832 mutex_exit(&call
->lock
);
1839 * return true if this is an "interesting" connection from the point of view
1840 * of someone trying to debug the system
1843 rxi_IsConnInteresting(struct rx_connection
* aconn
)
1846 struct rx_call
*tcall
;
1848 if (aconn
->flags
& (RX_CONN_MAKECALL_WAITING
| RX_CONN_DESTROY_ME
))
1850 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
1851 tcall
= aconn
->call
[i
];
1853 if ((tcall
->state
== RX_STATE_PRECALL
) ||
1854 (tcall
->state
== RX_STATE_ACTIVE
))
1856 if ((tcall
->mode
== RX_MODE_SENDING
) ||
1857 (tcall
->mode
== RX_MODE_RECEIVING
))
1865 * if this is one of the last few packets AND it wouldn't be used by the
1866 * receiving call to immediately satisfy a read request, then drop it on
1867 * the floor, since accepting it might prevent a lock-holding thread from
1868 * making progress in its reading
1872 TooLow(struct rx_packet
* ap
, struct rx_call
* acall
)
1874 if ((rx_nFreePackets
< rxi_dataQuota
+ 2) &&
1875 !((ap
->header
.seq
== acall
->rnext
) &&
1876 (acall
->flags
& RX_CALL_READER_WAIT
))) {
1882 /* try to attach call, if authentication is complete */
1884 TryAttach(struct rx_call
* acall
)
1886 struct rx_connection
*conn
;
1889 if ((conn
->type
== RX_SERVER_CONNECTION
) &&
1890 (acall
->state
== RX_STATE_PRECALL
)) {
1891 /* Don't attach until we have any req'd. authentication. */
1892 if (RXS_CheckAuthentication(conn
->securityObject
, conn
) == 0) {
1893 rxi_AttachServerProc(acall
);
1895 * Note: this does not necessarily succeed; there
1896 * may not any proc available
1899 rxi_ChallengeOn(acall
->conn
);
1905 * A data packet has been received off the interface. This packet is
1906 * appropriate to the call (the call is in the right state, etc.). This
1907 * routine can return a packet to the caller, for re-use
1910 rxi_ReceiveDataPacket(struct rx_call
* call
,
1911 struct rx_packet
* np
)
1913 u_long seq
, serial
, flags
;
1918 seq
= np
->header
.seq
;
1919 serial
= np
->header
.serial
;
1920 flags
= np
->header
.flags
;
1922 rx_stats
.dataPacketsRead
++;
1924 /* If the call is in an error state, send an abort message */
1925 /* XXXX this will send too many aborts for multi-packet calls */
1927 return rxi_SendCallAbort(call
, np
);
1929 if (np
->header
.spare
!= 0)
1930 call
->conn
->flags
|= RX_CONN_USING_PACKET_CKSUM
;
1933 * If there are no packet buffers, drop this new packet, unless we can find
1934 * packet buffers from inactive calls
1936 if (rxi_OverQuota(RX_PACKET_CLASS_RECEIVE
) || TooLow(np
, call
)) {
1937 rx_stats
.noPacketBuffersOnRead
++;
1940 rxi_calltrace(RX_TRACE_DROP
, call
);
1943 /* The usual case is that this is the expected next packet */
1944 if (seq
== call
->rnext
) {
1946 /* Check to make sure it is not a duplicate of one already queued */
1947 if (queue_IsNotEmpty(&call
->rq
)
1948 && queue_First(&call
->rq
, rx_packet
)->header
.seq
== seq
) {
1949 rx_stats
.dupPacketsRead
++;
1950 np
= rxi_SendAck(call
, np
, seq
, serial
, flags
, RX_ACK_DUPLICATE
);
1955 * It's the next packet. Stick it on the receive queue for this call
1957 queue_Prepend(&call
->rq
, np
);
1960 #endif /* SOFT_ACK */
1963 np
= 0; /* We can't use this any more */
1966 * Provide asynchronous notification for those who want it
1969 if (call
->arrivalProc
) {
1970 (*call
->arrivalProc
) (call
, call
->arrivalProcHandle
,
1971 call
->arrivalProcArg
);
1972 call
->arrivalProc
= (void (*) ()) 0;
1974 /* Wakeup the reader, if any */
1975 if (call
->flags
& RX_CALL_READER_WAIT
) {
1976 call
->flags
&= ~RX_CALL_READER_WAIT
;
1977 RX_MUTEX_ENTER(&call
->lockq
);
1979 #ifdef RX_ENABLE_LOCKS
1980 cv_broadcast(&call
->cv_rq
);
1982 osi_rxWakeup(&call
->rq
);
1984 RX_MUTEX_EXIT(&call
->lockq
);
1989 * ACK packet right away, in order to keep the window going, and to
1990 * reduce variability in round-trip-time estimates.
1993 if (flags
& RX_REQUEST_ACK
) {
1996 * Acknowledge, if requested. Also take this opportunity to
1997 * revise MTU estimate.
2000 /* Copy a lower estimate of the MTU from the other end. (cfe) */
2002 * We can figure out what the other end is using by checking out
2003 * the size of its packets, and then adding back the header size.
2004 * We don't count the last packet, since it might be partly empty.
2005 * We shouldn't do this check on every packet, it's overkill.
2006 * Perhaps it would be better done in
2007 * ComputeRate if I decide it's ever worth doing. (lws)
2009 if (!(flags
& RX_LAST_PACKET
) && call
->conn
&& call
->conn
->peer
) {
2011 struct rx_peer
*peer
= call
->conn
->peer
;
2013 length
= np
->length
+ RX_HEADER_SIZE
;
2014 if (length
< peer
->packetSize
) {
2015 dpf(("CONG peer %lx/%u: packetsize %lu=>%lu (rtt %u)",
2016 ntohl(peer
->host
), ntohs(peer
->port
), peer
->packetSize
,
2017 length
, peer
->srtt
));
2018 peer
->packetSize
= length
;
2021 #endif /* MISCMTU */
2022 } else if (flags
& RX_LAST_PACKET
) {
2025 /* Or schedule an acknowledgement later on. */
2026 rxevent_Cancel(call
->delayedAckEvent
);
2027 clock_GetTime(&when
);
2028 clock_Add(&when
, &rx_lastAckDelay
);
2030 call
->delayedAckEvent
= rxevent_Post(&when
, rxi_SendDelayedAck
,
2037 * Provide asynchronous notification for those who want it
2040 if (call
->arrivalProc
) {
2041 (*call
->arrivalProc
) (call
, call
->arrivalProcHandle
,
2042 call
->arrivalProcArg
);
2043 call
->arrivalProc
= (void (*) ()) 0;
2045 /* Wakeup the reader, if any */
2046 RX_MUTEX_ENTER(&call
->lockq
);
2047 if (call
->flags
& RX_CALL_READER_WAIT
) {
2048 call
->flags
&= ~RX_CALL_READER_WAIT
;
2049 #ifdef RX_ENABLE_LOCKS
2050 /* cv_signal(&call->cv_rq);*/
2051 cv_broadcast(&call
->cv_rq
);
2053 osi_rxWakeup(&call
->rq
);
2056 RX_MUTEX_EXIT(&call
->lockq
);
2058 np
= 0; /* We can't use this any more */
2059 #endif /* ADAPT_PERF */
2061 /* Update last packet received */
2065 * If there is no server process serving this call, grab one, if
2070 /* This is not the expected next packet */
2073 * Determine whether this is a new or old packet, and if it's
2074 * a new one, whether it fits into the current receive window.
2075 * It's also useful to know if the packet arrived out of
2076 * sequence, so that we can force an acknowledgement in that
2077 * case. We have a slightly complex definition of
2078 * out-of-sequence: the previous packet number received off
2079 * the wire is remembered. If the new arrival's sequence
2080 * number is less than previous, then previous is reset (to
2081 * 0). MTUXXX This should change slightly if skew is taken into
2082 * consideration. lws 93.04.20
2083 * The new packet is then declared out-of-sequence if
2084 * there are any packets missing between the "previous" packet
2085 * and the one which just arrived (because the missing packets
2086 * should have been filled in between the previous packet and
2087 * the new arrival). This works regardless of whether the
2088 * peer's retransmission algorithm has been invoked, or not
2089 * (i.e. whether this is the first or subsequent pass over the
2090 * sequence of packets). All this assumes that "most" of the
2091 * time, packets are delivered in the same *order* as they are
2092 * transmitted, with, possibly, some packets lost due to
2093 * transmission errors along the way.
2096 u_long prev
; /* "Previous packet" sequence number */
2097 struct rx_packet
*tp
; /* Temporary packet pointer */
2098 struct rx_packet
*nxp
;/*
2099 * Next packet pointer, for queue_Scan
2102 * Number of packets between previous
2107 * If the new packet's sequence number has been sent to the
2108 * application already, then this is a duplicate
2110 if (seq
< call
->rnext
) {
2111 rx_stats
.dupPacketsRead
++;
2112 np
= rxi_SendAck(call
, np
, seq
, serial
, flags
, RX_ACK_DUPLICATE
);
2117 * If the sequence number is greater than what can be
2118 * accomodated by the current window, then send a negative
2119 * acknowledge and drop the packet
2121 if ((call
->rnext
+ call
->rwind
) <= seq
) {
2122 np
= rxi_SendAck(call
, np
, seq
, serial
, flags
,
2123 RX_ACK_EXCEEDS_WINDOW
);
2127 /* Look for the packet in the queue of old received packets */
2131 for (nTwixt
= 0, queue_Scan(&call
->rq
, tp
, nxp
, rx_packet
)) {
2132 /* Check for duplicate packet */
2133 if (seq
== tp
->header
.seq
) {
2134 rx_stats
.dupPacketsRead
++;
2135 np
= rxi_SendAck(call
, np
, seq
, serial
, flags
,
2142 * Count the number of packets received 'twixt the previous
2143 * packet and the new packet
2145 if (tp
->header
.seq
> prev
&& tp
->header
.seq
< seq
)
2149 * If we find a higher sequence packet, break out and insert the
2152 if (seq
< tp
->header
.seq
)
2157 * It's within the window: add it to the the receive queue.
2158 * tp is left by the previous loop either pointing at the
2159 * packet before which to insert the new packet, or at the
2160 * queue head if the queue is empty or the packet should be
2163 queue_InsertBefore(tp
, np
);
2166 #endif /* SOFT_ACK */
2173 * Acknowledge the packet if requested by peer, or we are doing
2176 * Add a timed ack to make sure we send out a ack to before we get
2177 * a request from the client that they send a REQUEST-ACK packet.
2180 /* ack is already taken care of */
2181 } else if (flags
& RX_REQUEST_ACK
) {
2182 rxi_SendAck(call
, 0, seq
, serial
, flags
, RX_ACK_REQUESTED
);
2185 } else if (call
->nSoftAcks
> rxi_SoftAckRate
) {
2186 rxevent_Cancel(call
->delayedAckEvent
);
2187 rxi_SendAck(call
, 0, seq
, serial
, flags
, RX_ACK_IDLE
);
2188 } else if (call
->nSoftAcks
) {
2191 rxevent_Cancel(call
->delayedAckEvent
);
2192 clock_GetTime(&when
);
2193 clock_Add(&when
, &rx_softAckDelay
);
2194 call
->delayedAckEvent
= rxevent_Post(&when
, rxi_SendDelayedAck
,
2196 #endif /* SOFT_ACK */
2203 static void rxi_ComputeRate();
2207 /* Timeout is set to RTT + 4*MDEV. */
2210 update_timeout(struct rx_peer
*peer
)
2213 rtt_timeout
= peer
->srtt
+ 4*peer
->mdev
;
2215 * Add 100ms to hide the effects of unpredictable
2216 * scheduling. 100ms is *very* conservative and should probably be
2217 * much smaller. We don't want to generate any redundant
2218 * retransmits so for now, let's use 100ms.
2220 rtt_timeout
+= 100*1000;
2221 if (rtt_timeout
< 1000) /* 1000 = 1ms */
2222 rtt_timeout
= 1000; /* Minimum timeout */
2223 peer
->timeout
.usec
= rtt_timeout
% 1000000;
2224 peer
->timeout
.sec
= rtt_timeout
/ 1000000;;
2227 /* On a dubious timeout double MDEV but within reason.
2228 * Timeout is limited by 5*RTT.
2232 dubious_timeout(struct rx_peer
*peer
)
2234 if (peer
->mdev
>= peer
->srtt
)
2238 if (peer
->mdev
> peer
->srtt
)
2239 peer
->mdev
= peer
->srtt
;
2240 update_timeout(peer
);
2243 /* The real smarts of the whole thing. Right now somewhat short-changed. */
2245 rxi_ReceiveAckPacket(struct rx_call
* call
, struct rx_packet
* np
)
2247 struct rx_ackPacket
*ap
;
2249 struct rx_packet
*tp
;
2250 struct rx_packet
*nxp
; /*
2251 * Next packet pointer for queue_Scan
2253 struct rx_connection
*conn
= call
->conn
;
2254 struct rx_peer
*peer
= conn
->peer
;
2258 /* because there are CM's that are bogus, sending weird values for this. */
2260 int needRxStart
= 0;
2263 rx_stats
.ackPacketsRead
++;
2264 ap
= (struct rx_ackPacket
*) rx_DataOf(np
);
2265 nbytes
= rx_Contiguous(np
) - ((ap
->acks
) - (u_char
*) ap
);
2267 return np
; /* truncated ack packet */
2269 nAcks
= MIN(nbytes
, ap
->nAcks
); /* depends on ack packet struct */
2270 first
= ntohl(ap
->firstPacket
);
2271 serial
= ntohl(ap
->serial
);
2273 skew
= ntohs(ap
->maxSkew
);
2280 "RACK: reason %x previous %lu seq %lu serial %lu skew %lu "
2281 "first %lu", ap
->reason
,
2282 (unsigned long)ntohl(ap
->previousPacket
),
2283 (unsigned long)np
->header
.seq
,
2284 (unsigned long)serial
,
2285 (unsigned long)skew
,
2286 (unsigned long)ntohl(ap
->firstPacket
));
2290 for (offset
= 0; offset
< nAcks
; offset
++)
2291 putc(ap
->acks
[offset
] == RX_ACK_TYPE_NACK
? '-' : '*', Log
);
2297 #if 0 /* need congestion avoidance stuff first */
2298 if (np
->header
.flags
& RX_SLOW_START_OK
)
2299 call
->flags
|= RX_CALL_SLOW_START_OK
;
2304 * if a server connection has been re-created, it doesn't remember what
2305 * serial # it was up to. An ack will tell us, since the serial field
2306 * contains the largest serial received by the other side
2308 if ((conn
->type
== RX_SERVER_CONNECTION
) && (conn
->serial
< serial
)) {
2309 conn
->serial
= serial
+ 1;
2313 * Update the outgoing packet skew value to the latest value of the
2314 * peer's incoming packet skew value. The ack packet, of course, could
2315 * arrive out of order, but that won't affect things much
2317 peer
->outPacketSkew
= skew
;
2321 * XXX Hack. Because we can't release the global rx lock when sending
2322 * packets (osi_NetSend) we drop all acks while we're traversing the tq in
2323 * rxi_Start sending packets out because packets
2324 * may move to the freePacketQueue as result of being here! So we drop
2325 * these packets until we're safely out of the traversing. Really ugly!
2327 if (call
->flags
& RX_CALL_TQ_BUSY
) {
2329 return np
; /* xmitting; drop packet */
2333 * Check for packets that no longer need to be transmitted, and
2334 * discard them. This only applies to packets positively
2335 * acknowledged as having been sent to the peer's upper level.
2336 * All other packets must be retained. So only packets with
2337 * sequence numbers < ap->firstPacket are candidates.
2339 while (queue_IsNotEmpty(&call
->tq
)) {
2340 tp
= queue_First(&call
->tq
, rx_packet
);
2341 if (tp
->header
.seq
>= first
)
2343 call
->tfirst
= tp
->header
.seq
+ 1;
2344 if (tp
->header
.serial
== serial
) {
2345 if (ap
->reason
!= RX_ACK_DELAY
) {
2347 rxi_ComputeRoundTripTime(tp
, &tp
->timeSent
, peer
);
2349 rxi_ComputeRoundTripTime(tp
, 0, 0);
2353 rxi_ComputeRate(peer
, call
, tp
, np
, ap
->reason
);
2357 else if ((tp
->firstSerial
== serial
)) {
2358 if (ap
->reason
!= RX_ACK_DELAY
)
2359 rxi_ComputeRoundTripTime(tp
, &tp
->firstSent
, peer
);
2361 rxi_ComputeRate(peer
, call
, tp
, np
, ap
->reason
);
2364 #endif /* ADAPT_PERF */
2366 rxi_FreePacket(tp
); /*
2367 * rxi_FreePacket mustn't wake up anyone,
2373 /* Give rate detector a chance to respond to ping requests */
2374 if (ap
->reason
== RX_ACK_PING_RESPONSE
) {
2375 rxi_ComputeRate(peer
, call
, 0, np
, ap
->reason
);
2378 /* "Slow start" every call. */
2379 if (call
->twind
< rx_Window
) call
->twind
+= 1;
2382 * N.B. we don't turn off any timers here. They'll go away by themselves,
2387 * Now go through explicit acks/nacks and record the results in
2388 * the waiting packets. These are packets that can't be released
2389 * yet, even with a positive acknowledge. This positive
2390 * acknowledge only means the packet has been received by the
2391 * peer, not that it will be retained long enough to be sent to
2392 * the peer's upper level. In addition, reset the transmit timers
2393 * of any missing packets (those packets that must be missing
2394 * because this packet was out of sequence)
2397 for (queue_Scan(&call
->tq
, tp
, nxp
, rx_packet
)) {
2400 * Update round trip time if the ack was stimulated on receipt of
2403 if (tp
->header
.serial
== serial
) {
2404 if (ap
->reason
!= RX_ACK_DELAY
) {
2406 rxi_ComputeRoundTripTime(tp
, &tp
->timeSent
, peer
);
2408 rxi_ComputeRoundTripTime(tp
, 0, 0);
2412 rxi_ComputeRate(peer
, call
, tp
, np
, ap
->reason
);
2416 else if ((tp
->firstSerial
== serial
)) {
2417 if (ap
->reason
!= RX_ACK_DELAY
)
2418 rxi_ComputeRoundTripTime(tp
, &tp
->firstSent
, peer
);
2420 rxi_ComputeRate(peer
, call
, tp
, np
, ap
->reason
);
2423 #endif /* ADAPT_PERF */
2426 * Set the acknowledge flag per packet based on the
2427 * information in the ack packet. It's possible for an
2428 * acknowledged packet to be downgraded
2430 if (tp
->header
.seq
< first
+ nAcks
) {
2431 /* Explicit ack information: set it in the packet appropriately */
2432 tp
->acked
= (ap
->acks
[tp
->header
.seq
- first
] == RX_ACK_TYPE_ACK
);
2435 * No ack information: the packet may have been
2436 * acknowledged previously, but that is now rescinded (the
2437 * peer may have reduced the window size)
2445 * If packet isn't yet acked, and it has been transmitted at least
2446 * once, reset retransmit time using latest timeout
2447 * ie, this should readjust the retransmit timer for all outstanding
2448 * packets... So we don't just retransmit when we should know better
2451 if (!tp
->acked
&& tp
->header
.serial
) {
2452 tp
->retryTime
= tp
->timeSent
;
2453 clock_Add(&tp
->retryTime
, &peer
->timeout
);
2454 /* shift by eight because one quarter-sec ~ 256 milliseconds */
2455 clock_Addmsec(&(tp
->retryTime
), ((unsigned long) tp
->backoff
) << 8);
2457 #endif /* ADAPT_PERF */
2460 * If the packet isn't yet acked, and its serial number
2461 * indicates that it was transmitted before the packet which
2462 * prompted the acknowledge (that packet's serial number is
2463 * supplied in the ack packet), then schedule the packet to be
2464 * transmitted *soon*. This is done by resetting the
2465 * retransmit time in the packet to the current time.
2466 * Actually this is slightly more intelligent: to guard
2467 * against packets that have been transmitted out-of-order by
2468 * the network (this even happens on the local token ring with
2469 * our IBM RT's!), the degree of out-of-orderness (skew) of
2470 * the packet is compared against the maximum skew for this
2471 * peer. If it is less, we don't retransmit yet. Note that
2472 * we don't do this for packets with zero serial numbers: they
2473 * never have been transmitted.
2477 * I don't know if we should add in the new retransmit backoff time
2478 * here or not. I think that we should also consider reducing
2479 * the "congestion window" size as an alternative. LWSXXX
2482 if (!tp
->acked
&& tp
->header
.serial
2483 && ((tp
->header
.serial
+ skew
) <= serial
)) {
2484 rx_stats
.dataPacketsPushed
++;
2485 clock_GetTime(&tp
->retryTime
);
2488 dpf(("Pushed packet seq %d serial %d, new time %d.%d\n",
2489 tp
->header
.seq
, tp
->header
.serial
, tp
->retryTime
.sec
,
2490 tp
->retryTime
.usec
/ 1000));
2494 if (ap
->reason
== RX_ACK_DUPLICATE
) {
2496 * Other end receives duplicates because either:
2497 * A. acks where lost
2498 * B. receiver gets scheduled in an unpredictable way
2499 * C. we have a busted timer
2501 * To fix B & C wait for new acks to update srtt and mdev. In
2502 * the meantime, increase mdev to increase the retransmission
2505 dubious_timeout(peer
);
2509 * If the window has been extended by this acknowledge packet,
2510 * then wakeup a sender waiting in alloc for window space, or try
2511 * sending packets now, if he's been sitting on packets due to
2512 * lack of window space
2514 if (call
->tnext
< (call
->tfirst
+ call
->twind
)) {
2515 #ifdef RX_ENABLE_LOCKS
2516 RX_MUTEX_ENTER(&call
->lockw
);
2517 cv_signal(&call
->cv_twind
);
2518 RX_MUTEX_EXIT(&call
->lockw
);
2520 if (call
->flags
& RX_CALL_WAIT_WINDOW_ALLOC
) {
2521 call
->flags
&= ~RX_CALL_WAIT_WINDOW_ALLOC
;
2522 osi_rxWakeup(&call
->twind
);
2525 if (call
->flags
& RX_CALL_WAIT_WINDOW_SEND
) {
2526 call
->flags
&= ~RX_CALL_WAIT_WINDOW_SEND
;
2531 * if the ack packet has a receivelen field hanging off it,
2534 if (np
->length
>= rx_AckDataSize(ap
->nAcks
) + 4) {
2535 unsigned long maxPacketSize
;
2537 rx_packetread(np
, rx_AckDataSize(ap
->nAcks
), 4, &maxPacketSize
);
2538 maxPacketSize
= (unsigned long) ntohl(maxPacketSize
);
2539 dpf(("maxPacketSize=%ul\n", maxPacketSize
));
2542 * sanity check - peer might have restarted with different params.
2543 * If peer says "send less", dammit, send less... Peer should never
2544 * be unable to accept packets of the size that prior AFS versions
2545 * would send without asking.
2547 if (OLD_MAX_PACKET_SIZE
<= maxPacketSize
)
2548 conn
->maxPacketSize
= MIN(maxPacketSize
, conn
->peer
->packetSize
);
2550 /* if (needRxStart) rxi_Start(0, call); */
2551 rxi_Start(0, call
); /* Force rxi_Restart for now: skew
2556 /* Post a new challenge-event, this is to resend lost packets. */
2558 rxi_resend_ChallengeEvent(struct rx_connection
*conn
)
2562 if (conn
->challengeEvent
)
2563 rxevent_Cancel(conn
->challengeEvent
);
2565 clock_GetTime(&when
);
2566 when
.sec
+= RX_CHALLENGE_TIMEOUT
;
2567 conn
->challengeEvent
= rxevent_Post(&when
, rxi_ChallengeEvent
,
2571 /* Received a response to a challenge packet */
2573 rxi_ReceiveResponsePacket(struct rx_connection
* conn
,
2574 struct rx_packet
* np
)
2578 /* Ignore the packet if we're the client */
2579 if (conn
->type
== RX_CLIENT_CONNECTION
)
2582 /* If already authenticated, ignore the packet (it's probably a retry) */
2583 if (RXS_CheckAuthentication(conn
->securityObject
, conn
) == 0)
2586 /* Otherwise, have the security object evaluate the response packet */
2587 error
= RXS_CheckResponse(conn
->securityObject
, conn
, np
);
2588 if (error
== RX_AUTH_REPLY
) {
2589 rxi_SendSpecial(NULL
, conn
, np
, RX_PACKET_TYPE_CHALLENGE
,
2591 rxi_resend_ChallengeEvent(conn
);
2594 * If the response is invalid, reset the connection, sending an abort
2600 rxi_ConnectionError(conn
, error
);
2601 return rxi_SendConnectionAbort(conn
, np
);
2604 * If the response is valid, any calls waiting to attach servers can
2609 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
2610 struct rx_call
*call
= conn
->call
[i
];
2612 if (call
&& (call
->state
== RX_STATE_PRECALL
))
2613 rxi_AttachServerProc(call
);
2620 * A client has received an authentication challenge: the security
2621 * object is asked to cough up a respectable response packet to send
2622 * back to the server. The server is responsible for retrying the
2623 * challenge if it fails to get a response.
2627 rxi_ReceiveChallengePacket(struct rx_connection
* conn
,
2628 struct rx_packet
* np
)
2632 /* Ignore the challenge if we're the server */
2633 if (conn
->type
== RX_SERVER_CONNECTION
)
2637 * Ignore the challenge if the connection is otherwise idle; someone's
2638 * trying to use us as an oracle.
2640 if (!rxi_HasActiveCalls(conn
))
2644 * Send the security object the challenge packet. It is expected to fill
2647 error
= RXS_GetResponse(conn
->securityObject
, conn
, np
);
2650 * If the security object is unable to return a valid response, reset the
2651 * connection and send an abort to the peer. Otherwise send the response
2652 * packet to the peer connection.
2655 rxi_ConnectionError(conn
, error
);
2656 np
= rxi_SendConnectionAbort(conn
, np
);
2658 np
= rxi_SendSpecial((struct rx_call
*) 0, conn
, np
,
2659 RX_PACKET_TYPE_RESPONSE
, (char *) 0, -1);
2666 * Find an available server process to service the current request in
2667 * the given call structure. If one isn't available, queue up this
2668 * call so it eventually gets one
2671 rxi_AttachServerProc(struct rx_call
* call
)
2673 struct rx_serverQueueEntry
*sq
;
2674 struct rx_service
*service
= call
->conn
->service
;
2676 /* May already be attached */
2677 if (call
->state
== RX_STATE_ACTIVE
)
2680 if (!QuotaOK(service
) || queue_IsEmpty(&rx_idleServerQueue
)) {
2682 * If there are no processes available to service this call,
2683 * put the call on the incoming call queue (unless it's
2684 * already on the queue).
2686 if (!(call
->flags
& RX_CALL_WAIT_PROC
)) {
2687 call
->flags
|= RX_CALL_WAIT_PROC
;
2689 rxi_calltrace(RX_CALL_ARRIVAL
, call
);
2690 queue_Append(&rx_incomingCallQueue
, call
);
2693 sq
= queue_First(&rx_idleServerQueue
, rx_serverQueueEntry
);
2695 RX_MUTEX_ENTER(&sq
->lock
);
2699 if (call
->flags
& RX_CALL_WAIT_PROC
) {
2700 /* Conservative: I don't think this should happen */
2701 call
->flags
&= ~RX_CALL_WAIT_PROC
;
2705 call
->state
= RX_STATE_ACTIVE
;
2706 call
->mode
= RX_MODE_RECEIVING
;
2707 if (call
->flags
& RX_CALL_CLEARED
) {
2708 /* send an ack now to start the packet flow up again */
2709 call
->flags
&= ~RX_CALL_CLEARED
;
2710 rxi_SendAck(call
, 0, 0, 0, 0, RX_ACK_DELAY
);
2712 service
->nRequestsRunning
++;
2713 if (service
->nRequestsRunning
<= service
->minProcs
)
2716 #ifdef RX_ENABLE_LOCKS
2721 RX_MUTEX_EXIT(&sq
->lock
);
2726 * Delay the sending of an acknowledge event for a short while, while
2727 * a new call is being prepared (in the case of a client) or a reply
2728 * is being prepared (in the case of a server). Rather than sending
2729 * an ack packet, an ACKALL packet is sent.
2732 rxi_AckAll(struct rxevent
* event
, struct rx_call
* call
, char *dummy
)
2735 call
->delayedAckEvent
= (struct rxevent
*) 0;
2736 rxi_SendSpecial(call
, call
->conn
, (struct rx_packet
*) 0,
2737 RX_PACKET_TYPE_ACKALL
, (char *) 0, 0);
2741 rxi_SendDelayedAck(struct rxevent
* event
, struct rx_call
* call
,
2745 call
->delayedAckEvent
= (struct rxevent
*) 0;
2746 (void) rxi_SendAck(call
, 0, 0, 0, 0, RX_ACK_DELAY
);
2750 * Clear out the transmit queue for the current call (all packets have
2751 * been received by peer)
2754 rxi_ClearTransmitQueue(struct rx_call
* call
)
2756 struct rx_packet
*p
, *tp
;
2758 for (queue_Scan(&call
->tq
, p
, tp
, rx_packet
)) {
2763 rxevent_Cancel(call
->resendEvent
);
2764 call
->tfirst
= call
->tnext
; /*
2765 * implicitly acknowledge all data already sent
2767 RX_MUTEX_ENTER(&call
->lockw
);
2768 #ifdef RX_ENABLE_LOCKS
2769 cv_signal(&call
->cv_twind
);
2771 osi_rxWakeup(&call
->twind
);
2773 RX_MUTEX_EXIT(&call
->lockw
);
2777 rxi_ClearReceiveQueue(struct rx_call
* call
)
2779 struct rx_packet
*p
, *tp
;
2781 for (queue_Scan(&call
->rq
, p
, tp
, rx_packet
)) {
2785 if (call
->state
== RX_STATE_PRECALL
)
2786 call
->flags
|= RX_CALL_CLEARED
;
2789 /* Send an abort packet for the specified call */
2791 rxi_SendCallAbort(struct rx_call
* call
, struct rx_packet
* packet
)
2796 error
= htonl(call
->error
);
2797 packet
= rxi_SendSpecial(call
, call
->conn
, packet
, RX_PACKET_TYPE_ABORT
,
2798 (char *) &error
, sizeof(error
));
2804 * Send an abort packet for the specified connection. Np is an
2805 * optional packet that can be used to send the abort.
2808 rxi_SendConnectionAbort(struct rx_connection
* conn
,
2809 struct rx_packet
* packet
)
2814 error
= htonl(conn
->error
);
2815 packet
= rxi_SendSpecial((struct rx_call
*) 0, conn
, packet
,
2816 RX_PACKET_TYPE_ABORT
, (char *) &error
, sizeof(error
));
2822 * Associate an error all of the calls owned by a connection. Called
2823 * with error non-zero. This is only for really fatal things, like
2824 * bad authentication responses. The connection itself is set in
2825 * error at this point, so that future packets received will be
2829 rxi_ConnectionError(struct rx_connection
* conn
, int32_t error
)
2834 if (conn
->challengeEvent
)
2835 rxevent_Cancel(conn
->challengeEvent
);
2836 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
2837 struct rx_call
*call
= conn
->call
[i
];
2840 rxi_CallError(call
, error
);
2842 conn
->error
= error
;
2843 rx_stats
.fatalErrors
++;
2847 /* Reset all of the calls associated with a connection. */
2849 rxi_ResetConnection(struct rx_connection
* conn
)
2853 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
2854 struct rx_call
*call
= conn
->call
[i
];
2857 rxi_ResetCall(call
);
2860 /* get rid of pending events that could zap us later */
2861 if (conn
->challengeEvent
)
2862 rxevent_Cancel(conn
->challengeEvent
);
2866 rxi_CallError(struct rx_call
* call
, int32_t error
)
2869 error
= call
->error
;
2870 rxi_ResetCall(call
);
2871 call
->error
= error
;
2872 call
->mode
= RX_MODE_ERROR
;
2876 * Reset various fields in a call structure, and wakeup waiting
2877 * processes. Some fields aren't changed: state & mode are not
2878 * touched (these must be set by the caller), and bufptr, nLeft, and
2879 * nFree are not reset, since these fields are manipulated by
2880 * unprotected macros, and may only be reset by non-interrupting code.
2883 /* this code requires that call->conn be set properly as a pre-condition. */
2884 #endif /* ADAPT_WINDOW */
2887 rxi_ResetCall(struct rx_call
* call
)
2891 /* Notify anyone who is waiting for asynchronous packet arrival */
2892 if (call
->arrivalProc
) {
2893 (*call
->arrivalProc
) (call
, call
->arrivalProcHandle
,
2894 call
->arrivalProcArg
);
2895 call
->arrivalProc
= (void (*) ()) 0;
2897 flags
= call
->flags
;
2898 rxi_ClearReceiveQueue(call
);
2899 rxi_ClearTransmitQueue(call
);
2902 call
->rwind
= rx_Window
; /* XXXX */
2904 call
->twind
= call
->conn
->peer
->maxWindow
; /* XXXX */
2906 /* "Slow start" every call. */
2907 call
->twind
= rx_initialWindow
;
2910 call
->tfirst
= call
->rnext
= call
->tnext
= 1;
2912 call
->lastAcked
= 0;
2913 call
->localStatus
= call
->remoteStatus
= 0;
2915 RX_MUTEX_ENTER(&call
->lockq
);
2916 if (flags
& RX_CALL_READER_WAIT
) {
2917 #ifdef RX_ENABLE_LOCKS
2918 /* cv_signal(&call->cv_rq);*/
2919 cv_broadcast(&call
->cv_rq
);
2921 osi_rxWakeup(&call
->rq
);
2924 RX_MUTEX_EXIT(&call
->lockq
);
2925 if (flags
& RX_CALL_WAIT_PACKETS
)
2926 rxi_PacketsUnWait(); /* XXX */
2927 RX_MUTEX_ENTER(&call
->lockw
);
2929 #ifdef RX_ENABLE_LOCKS
2930 cv_signal(&call
->cv_twind
);
2932 if (flags
& RX_CALL_WAIT_WINDOW_ALLOC
)
2933 osi_rxWakeup(&call
->twind
);
2935 RX_MUTEX_EXIT(&call
->lockw
);
2937 if (queue_IsOnQueue(call
)) {
2939 if (flags
& RX_CALL_WAIT_PROC
)
2942 rxi_KeepAliveOff(call
);
2943 rxevent_Cancel(call
->delayedAckEvent
);
2947 * Send an acknowledge for the indicated packet (seq,serial) of the
2948 * indicated call, for the indicated reason (reason). This
2949 * acknowledge will specifically acknowledge receiving the packet, and
2950 * will also specify which other packets for this call have been
2951 * received. This routine returns the packet that was used to the
2952 * caller. The caller is responsible for freeing it or re-using it.
2953 * This acknowledgement also returns the highest sequence number
2954 * actually read out by the higher level to the sender; the sender
2955 * promises to keep around packets that have not been read by the
2956 * higher level yet (unless, of course, the sender decides to abort
2957 * the call altogether). Any of p, seq, serial, pflags, or reason may
2958 * be set to zero without ill effect. That is, if they are zero, they
2959 * will not convey any information.
2960 * NOW there is a trailer field, after the ack where it will safely be
2961 * ignored by mundanes, which indicates the maximum size packet this
2965 rxi_SendAck(struct rx_call
* call
,
2966 struct rx_packet
* optionalPacket
, int seq
, int serial
,
2967 int pflags
, int reason
)
2969 struct rx_call
*call
;
2970 struct rx_packet
*optionalPacket
; /* use to send ack (or null) */
2971 int seq
; /* Sequence number of the packet we
2973 int serial
; /* Serial number of the packet */
2974 int pflags
; /* Flags field from packet header */
2975 int reason
; /* Reason an acknowledge was prompted */
2979 struct rx_ackPacket
*ap
;
2980 struct rx_packet
*rqp
;
2981 struct rx_packet
*nxp
; /* For queue_Scan */
2982 struct rx_packet
*p
;
2986 if (call
->rnext
> call
->lastAcked
)
2987 call
->lastAcked
= call
->rnext
;
2992 rx_computelen(p
, p
->length
); /* reset length, you never know */
2994 /* where that's been... */
2995 else if (!(p
= rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL
)))
2996 osi_Panic("rxi_SendAck");
2999 call
->nSoftAcks
= 0;
3000 call
->nHardAcks
= 0;
3001 #endif /* SOFT_ACK */
3003 templ
= rx_AckDataSize(call
->rwind
) + 4 - rx_GetDataSize(p
);
3005 if (rxi_AllocDataBuf(p
, templ
))
3006 return optionalPacket
;
3007 templ
= rx_AckDataSize(call
->rwind
) + 4;
3008 if (rx_Contiguous(p
) < templ
)
3009 return optionalPacket
;
3010 } /* MTUXXX failing to send an ack is
3011 * very serious. We should */
3012 /* try as hard as possible to send even a partial ack; it's */
3013 /* better than nothing. */
3014 ap
= (struct rx_ackPacket
*) rx_DataOf(p
);
3015 ap
->bufferSpace
= htonl(0); /* Something should go here, sometime */
3016 ap
->reason
= reason
;
3018 /* The skew computation used to bullshit, I think it's better now. */
3019 /* We should start paying attention to skew. XXX */
3020 ap
->serial
= htonl(call
->conn
->maxSerial
);
3021 ap
->maxSkew
= 0; /* used to be peer->inPacketSkew */
3023 ap
->firstPacket
= htonl(call
->rnext
); /*
3024 * First packet not yet forwarded
3027 ap
->previousPacket
= htonl(call
->rprev
); /* Previous packet received */
3030 * No fear of running out of ack packet here because there can only be
3031 * at most one window full of unacknowledged packets. The window size
3032 * must be constrained to be less than the maximum ack size, of course.
3033 * Also, an ack should always fit into a single packet -- it should not
3034 * ever be fragmented.
3036 for (offset
= 0, queue_Scan(&call
->rq
, rqp
, nxp
, rx_packet
)) {
3037 while (rqp
->header
.seq
> call
->rnext
+ offset
)
3038 ap
->acks
[offset
++] = RX_ACK_TYPE_NACK
;
3039 ap
->acks
[offset
++] = RX_ACK_TYPE_ACK
;
3042 p
->length
= rx_AckDataSize(offset
) + 4;
3043 templ
= htonl(rx_maxReceiveSize
);
3044 rx_packetwrite(p
, rx_AckDataSize(offset
), 4, &templ
);
3045 p
->header
.serviceId
= call
->conn
->serviceId
;
3046 p
->header
.cid
= (call
->conn
->cid
| call
->channel
);
3047 p
->header
.callNumber
= *call
->callNumber
;
3048 p
->header
.seq
= seq
;
3049 p
->header
.securityIndex
= call
->conn
->securityIndex
;
3050 p
->header
.epoch
= call
->conn
->epoch
;
3051 p
->header
.type
= RX_PACKET_TYPE_ACK
;
3052 p
->header
.flags
= 0;
3053 if (reason
== RX_ACK_PING
) {
3054 p
->header
.flags
|= RX_REQUEST_ACK
;
3056 clock_GetTime(&call
->pingRequestTime
);
3059 if (call
->conn
->type
== RX_CLIENT_CONNECTION
)
3060 p
->header
.flags
|= RX_CLIENT_INITIATED
;
3065 fprintf(Log
, "SACK: reason %x previous %lu seq %lu first %lu",
3067 (unsigned long)ntohl(ap
->previousPacket
),
3068 (unsigned long)p
->header
.seq
,
3069 (unsigned long)ntohl(ap
->firstPacket
));
3071 for (offset
= 0; offset
< ap
->nAcks
; offset
++)
3072 putc(ap
->acks
[offset
] == RX_ACK_TYPE_NACK
? '-' : '*', Log
);
3079 int i
, nbytes
= p
->length
;
3081 for (i
= 1; i
< p
->niovecs
; i
++) { /* vec 0 is ALWAYS header */
3082 if (nbytes
<= p
->wirevec
[i
].iov_len
) {
3085 savelen
= p
->wirevec
[i
].iov_len
;
3087 p
->wirevec
[i
].iov_len
= nbytes
;
3090 p
->wirevec
[i
].iov_len
= savelen
;
3094 nbytes
-= p
->wirevec
[i
].iov_len
;
3097 rx_stats
.ackPacketsSent
++;
3098 if (!optionalPacket
)
3100 return optionalPacket
; /* Return packet for re-use by caller */
3105 * This routine is called when new packets are readied for
3106 * transmission and when retransmission may be necessary, or when the
3107 * transmission window or burst count are favourable. This should be
3108 * better optimized for new packets, the usual case, now that we've
3109 * got rid of queues of send packets. XXXXXXXXXXX
3112 rxi_Start(struct rxevent
* event
, struct rx_call
* call
)
3115 struct rx_packet
*p
;
3116 struct rx_packet
*nxp
; /* Next pointer for queue_Scan */
3117 struct rx_packet
*lastPacket
;
3118 struct rx_peer
*peer
= call
->conn
->peer
;
3119 struct clock now
, retryTime
;
3123 * If rxi_Start is being called as a result of a resend event,
3124 * then make sure that the event pointer is removed from the call
3125 * structure, since there is no longer a per-call retransmission
3128 if (event
&& event
== call
->resendEvent
)
3129 call
->resendEvent
= 0;
3131 if (queue_IsNotEmpty(&call
->tq
)) { /* If we have anything to send */
3133 * Get clock to compute the re-transmit time for any packets
3134 * in this burst. Note, if we back off, it's reasonable to
3135 * back off all of the packets in the same manner, even if
3136 * some of them have been retransmitted more times than more
3139 clock_GetTime(&now
);
3140 retryTime
= now
; /* initialize before use */
3141 clock_Add(&retryTime
, &peer
->timeout
);
3144 * Send (or resend) any packets that need it, subject to
3145 * window restrictions and congestion burst control
3146 * restrictions. Ask for an ack on the last packet sent in
3147 * this burst. For now, we're relying upon the window being
3148 * considerably bigger than the largest number of packets that
3149 * are typically sent at once by one initial call to
3150 * rxi_Start. This is probably bogus (perhaps we should ask
3151 * for an ack when we're half way through the current
3152 * window?). Also, for non file transfer applications, this
3153 * may end up asking for an ack for every packet. Bogus. XXXX
3155 call
->flags
|= RX_CALL_TQ_BUSY
;
3156 for (lastPacket
= (struct rx_packet
*) 0,
3157 queue_Scan(&call
->tq
, p
, nxp
, rx_packet
)) {
3159 rx_stats
.ignoreAckedPacket
++;
3160 continue; /* Ignore this packet if it has been
3164 * Turn off all flags except these ones, which are the same
3165 * on each transmission
3167 p
->header
.flags
&= RX_PRESET_FLAGS
;
3169 if (p
->header
.seq
>= call
->tfirst
+ call
->twind
) {
3170 call
->flags
|= RX_CALL_WAIT_WINDOW_SEND
; /*
3175 * Note: if we're waiting for more window space, we can
3176 * still send retransmits; hence we don't return here, but
3177 * break out to schedule a retransmit event
3182 * If we're not allowed to send any more in the current
3183 * burst, make sure we get scheduled later. Also schedule
3184 * an event to "give back" the packets we've used, when the
3185 * burst time has elapsed (if we used any packets at all).
3187 /* XXX this need to go away and replaced with congestion
3189 if (peer
->burstSize
&& !peer
->burst
) {
3191 /* Send off the prior packet */
3193 * Don't request an ack if it's a short packet, 'cuz the
3194 * peer will cut down its MTU as a result on 3.3 clients.
3196 if ((lastPacket
->header
.flags
& RX_LAST_PACKET
) == 0) {
3197 if (/* call->cwind <= (u_short)call->ackRate || */
3198 (!(call
->flags
& RX_CALL_SLOW_START_OK
)
3199 && (lastPacket
->header
.seq
& 1))) {
3201 lastPacket
->header
.flags
|= RX_REQUEST_ACK
;
3204 rxi_Send(call
, lastPacket
);
3205 rxi_ScheduleDecongestionEvent(call
, nSent
);
3207 rxi_CongestionWait(call
);
3209 * Note: if we're waiting for congestion to ease, we can't
3210 * send any packets, including retransmits. Hence we do
3211 * not schedule a new retransmit event right now
3213 call
->flags
&= ~RX_CALL_TQ_BUSY
;
3217 * Transmit the packet if it has never been sent, or
3218 * retransmit it if the current timeout for this host for
3219 * this packet has elapsed
3221 if (!clock_IsZero(&p
->retryTime
)) {
3222 struct clock updatedRetryTime
;
3224 if (clock_Lt(&now
, &p
->retryTime
))
3228 * If the RTT has gone up since the packet
3229 * was sent, don't retransmit just yet!
3231 updatedRetryTime
= p
->timeSent
;
3232 clock_Add(&updatedRetryTime
, &peer
->timeout
);
3233 if (clock_Lt(&now
, &updatedRetryTime
)) {
3234 p
->retryTime
= updatedRetryTime
;
3239 * If we have to retransmit chances are that we have a
3240 * busted timer. Increase MDEV to reflect this
3241 * fact. If we are wrong, MDEV will come down quickly
3242 * as new acks arrive.
3244 dubious_timeout(peer
);
3247 * Always request an ack on a retransmitted packet; this
3248 * will help to get the data moving again, especially if
3249 * the packet is near the beginning of the window.
3250 * Actually, XXXX, we may want to do just that: only
3251 * request the acks if the packet is in, say, the first
3252 * half of the window
3254 p
->header
.flags
|= RX_REQUEST_ACK
;
3255 peer
->reSends
++, rx_stats
.dataPacketsReSent
++;
3256 p
->retryTime
= retryTime
;
3259 * if a packet gets dropped, don't keep hammering on it --
3260 * back off exponentially, at least up to a point. I had
3261 * to trade off global congestion avoidance against individual
3262 * performance. Note that most connections will time out
3263 * after 20 - 60 seconds. In pathological cases, retransmits
3264 * must still continue to disperse. For instance, there is
3265 * a condition where the server discards received packets, but
3266 * still sends keep-alives on the call, so the call may live
3267 * much longer than 60 seconds.
3269 if (p
->backoff
< MAXBACKOFF
)
3270 p
->backoff
= (p
->backoff
<< 1) + 1; /* so it can't stay == 0 */
3273 clock_Addmsec(&(p
->retryTime
), ((unsigned long) p
->backoff
) << 8);
3274 /* consider shrinking the packet size? XXXX */
3275 /* no, shrink the burst size. LWSXXX */
3277 peer
->nSent
++, rx_stats
.dataPacketsSent
++;
3278 p
->firstSent
= now
; /* improved RTO calculation- not Karn */
3279 p
->retryTime
= retryTime
;
3281 * Ask for an ack for the first packet on a new
3282 * connection, since it may carry some interesting info
3283 * like maxReceiveSize. It will also help us train to a
3284 * new estimate of RTT, for good or bad. This has one
3285 * neat side effect: since the first call on a connection
3286 * usually triggers a challenge/response exchange, the
3287 * first packet was often retransmitted before the call
3288 * could complete. Getting this ack prevents those
3289 * retransmissions. Admittedly, this is straining at gnats.
3291 if ((p
->header
.callNumber
== 1) && (p
->header
.seq
== 1) &&
3292 (p
->length
>= call
->conn
->maxPacketSize
)) {
3293 p
->header
.flags
|= RX_REQUEST_ACK
;
3298 * Install the new retransmit time for the packet, and
3299 * record the time sent
3304 * Send the previous packet, and remember this one--we don't
3305 * send it immediately, so we can tag it as requiring an ack
3306 * later, if necessary
3308 if (peer
->burstSize
)
3313 * Tag this packet as not being the last in this group,
3314 * for the receiver's benefit
3316 lastPacket
->header
.flags
|= RX_MORE_PACKETS
;
3317 rxi_Send(call
, lastPacket
);
3321 call
->flags
&= ~RX_CALL_TQ_BUSY
;
3324 * If any packets are to be sent, send them and post a
3325 * decongestion event to bump the burst count that we used up
3326 * sending the packets
3331 * we don't ask for an ack on the final packet, since the
3332 * response from the peer implicitly acks it, but we do wait a
3333 * little longer for the ack on the last packet on server conns.
3335 if ((lastPacket
->header
.flags
& RX_LAST_PACKET
) == 0) {
3338 * to get the window up faster we ack every packet as
3339 * long as we are below the fast ack window, or if the
3340 * client doesn't support slow start, every second packet
3342 if (/* call->cwind <= (u_short)call->ackRate || */
3343 (!(call
->flags
& RX_CALL_SLOW_START_OK
)
3344 && (lastPacket
->header
.seq
& 1))) {
3346 lastPacket
->header
.flags
|= RX_REQUEST_ACK
;
3348 } else if (!(lastPacket
->header
.flags
& RX_CLIENT_INITIATED
))
3349 clock_Addmsec(&(lastPacket
->retryTime
), 400);
3351 rxi_Send(call
, lastPacket
);
3352 if (peer
->burstSize
)
3353 rxi_ScheduleDecongestionEvent(call
, nSent
);
3357 * Always post a resend event, if there is anything in the queue, and
3358 * resend is possible. There should be at least one unacknowledged
3359 * packet in the queue ... otherwise none of these packets should be
3360 * on the queue in the first place.
3362 if (call
->resendEvent
) {
3365 * If there's an existing resend event, then if its expiry time
3366 * is sooner than the new one, then it must be less than any
3367 * possible expiry time (because it represents all previous
3368 * packets sent that may still need retransmitting). In this
3369 * case, just leave that event as scheduled
3371 if (clock_Le(&call
->resendEvent
->eventTime
, &retryTime
))
3373 /* Otherwise, cancel the existing event and post a new one */
3374 rxevent_Cancel(call
->resendEvent
);
3378 * Loop to find the earliest event. I *know* XXXX that this can be
3379 * coded more elegantly (perhaps rolled into the above code)
3381 for (haveEvent
= 0, queue_Scan(&call
->tq
, p
, nxp
, rx_packet
)) {
3382 if (!p
->acked
&& !clock_IsZero(&p
->retryTime
)) {
3384 if (clock_Lt(&p
->retryTime
, &retryTime
))
3385 retryTime
= p
->retryTime
;
3389 /* Post a new event to re-run rxi_Start when retries may be needed */
3391 call
->resendEvent
= rxevent_Post(&retryTime
, rxi_Start
, (void *) call
, NULL
);
3397 * Also adjusts the keep alive parameters for the call, to reflect
3398 * that we have just sent a packet (so keep alives aren't sent
3402 rxi_Send(struct rx_call
*call
, struct rx_packet
*p
)
3404 struct rx_connection
*conn
= call
->conn
;
3406 /* Stamp each packet with the user supplied status */
3407 p
->header
.userStatus
= call
->localStatus
;
3410 * Allow the security object controlling this call's security to make any
3411 * last-minute changes to the packet
3413 RXS_SendPacket(conn
->securityObject
, call
, p
);
3415 /* Actually send the packet, filling in more connection-specific fields */
3416 rxi_SendPacket(conn
, p
);
3419 * Update last send time for this call (for keep-alive processing), and
3420 * for the connection (so that we can discover idle connections)
3422 conn
->lastSendTime
= call
->lastSendTime
= clock_Sec();
3425 * Since we've just sent SOME sort of packet to the peer, it's safe to
3426 * nuke any scheduled end-of-packets ack
3428 rxevent_Cancel(call
->delayedAckEvent
);
3433 * Check if a call needs to be destroyed. Called by keep-alive code to ensure
3434 * that things are fine. Also called periodically to guarantee that nothing
3435 * falls through the cracks (e.g. (error + dally) connections have keepalive
3436 * turned off. Returns 0 if conn is well, negativ otherwise.
3437 * -1 means that the call still exists, -2 means that the call is freed.
3441 rxi_CheckCall(struct rx_call
*call
)
3443 struct rx_connection
*conn
= call
->conn
;
3444 struct rx_service
*tservice
;
3450 * These are computed to the second (+- 1 second). But that's good
3451 * enough for these values, which should be a significant number of
3454 if (now
> (call
->lastReceiveTime
+ conn
->secondsUntilDead
)) {
3456 if (call
->state
== RX_STATE_ACTIVE
) {
3457 rxi_CallError(call
, RX_CALL_DEAD
);
3465 * Non-active calls are destroyed if they are not responding to
3466 * pings; active calls are simply flagged in error, so the attached
3467 * process can die reasonably gracefully.
3471 /* see if we have a non-activity timeout */
3472 tservice
= conn
->service
;
3473 if ((conn
->type
== RX_SERVER_CONNECTION
) && call
->startWait
3474 && tservice
->idleDeadTime
3475 && ((call
->startWait
+ tservice
->idleDeadTime
) < now
)) {
3476 if (call
->state
== RX_STATE_ACTIVE
) {
3477 rxi_CallError(call
, RX_CALL_TIMEOUT
);
3481 /* see if we have a hard timeout */
3482 if (conn
->hardDeadTime
&& (now
> (conn
->hardDeadTime
+ call
->startTime
))) {
3483 if (call
->state
== RX_STATE_ACTIVE
)
3484 rxi_CallError(call
, RX_CALL_TIMEOUT
);
3492 * When a call is in progress, this routine is called occasionally to
3493 * make sure that some traffic has arrived (or been sent to) the peer.
3494 * If nothing has arrived in a reasonable amount of time, the call is
3495 * declared dead; if nothing has been sent for a while, we send a
3496 * keep-alive packet (if we're actually trying to keep the call alive)
3499 rxi_KeepAliveEvent(struct rxevent
*event
, struct rx_call
*call
,
3502 struct rx_connection
*conn
= call
->conn
;
3505 call
->keepAliveEvent
= (struct rxevent
*) 0;
3508 if (rxi_CheckCall(call
))
3511 /* Don't try to keep alive dallying calls */
3512 if ((call
->state
!= RX_STATE_DALLY
)
3513 && ((now
- call
->lastSendTime
) > conn
->secondsUntilPing
)) {
3514 /* Don't try to send keepalives if there is unacknowledged data */
3517 * the rexmit code should be good enough, this little hack doesn't
3520 (void) rxi_SendAck(call
, 0, 0, 0, 0, RX_ACK_PING
);
3522 rxi_ScheduleKeepAliveEvent(call
);
3527 rxi_ScheduleKeepAliveEvent(struct rx_call
*call
)
3529 if (!call
->keepAliveEvent
) {
3532 clock_GetTime(&when
);
3533 when
.sec
+= call
->conn
->secondsUntilPing
;
3534 call
->keepAliveEvent
= rxevent_Post(&when
, rxi_KeepAliveEvent
, call
, NULL
);
3538 /* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
3540 rxi_KeepAliveOn(struct rx_call
*call
)
3544 * Pretend last packet received was received now--i.e. if another packet
3545 * isn't received within the keep alive time, then the call will die;
3546 * Initialize last send time to the current time--even if a packet hasn't
3547 * been sent yet. This will guarantee that a keep-alive is sent within
3550 call
->lastReceiveTime
= call
->lastSendTime
= clock_Sec();
3551 rxi_ScheduleKeepAliveEvent(call
);
3555 * This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
3556 * seconds) to ask the client to authenticate itself. The routine
3557 * issues a challenge to the client, which is obtained from the
3558 * security object associated with the connection
3561 rxi_ChallengeEvent(struct rxevent
*event
, struct rx_connection
*conn
,
3564 conn
->challengeEvent
= (struct rxevent
*) 0;
3565 if (RXS_CheckAuthentication(conn
->securityObject
, conn
) != 0) {
3566 struct rx_packet
*packet
;
3568 packet
= rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL
);
3570 osi_Panic("rxi_ChallengeEvent");
3571 RXS_GetChallenge(conn
->securityObject
, conn
, packet
);
3572 rxi_SendSpecial((struct rx_call
*) 0, conn
, packet
,
3573 RX_PACKET_TYPE_CHALLENGE
, (char *) 0, -1);
3574 rxi_FreePacket(packet
);
3575 rxi_resend_ChallengeEvent(conn
);
3580 * Call this routine to start requesting the client to authenticate
3581 * itself. This will continue until authentication is established,
3582 * the call times out, or an invalid response is returned. The
3583 * security object associated with the connection is asked to create
3584 * the challenge at this time. N.B. rxi_ChallengeOff is a macro,
3588 rxi_ChallengeOn(struct rx_connection
*conn
)
3590 if (!conn
->challengeEvent
) {
3591 RXS_CreateChallenge(conn
->securityObject
, conn
);
3592 rxi_ChallengeEvent((struct rxevent
*) 0, conn
, 0);
/*
 * rxi_DecongestionEvent -- event.c callback that returns nPackets of burst
 * credit to a peer and restarts calls waiting on its congestion queue.
 *
 * NOTE(review): this region is a damaged extraction -- each statement is
 * split over several physical lines and some original lines (e.g. 3602-3607,
 * 3617-3619, 3626-3629 by the embedded line numbers) are missing entirely,
 * including the loop's queue-removal/termination logic.  Text preserved
 * byte-for-byte; only comments added.
 */
3597 * Called by event.c when a decongestion event (setup by
3598 * rxi_CongestionWait) occurs. This adds back in to the burst count
3599 * for the specified host the number of packets that were sent at the
3600 * time the event was scheduled. It also calls rxi_Start on as many
3601 * waiting calls as possible before the burst count goes down to zero,
3605 rxi_DecongestionEvent(struct rxevent
*event
, struct rx_peer
*peer
,
3608 struct rx_call
*call
;
3609 struct rx_call
*nxcall
; /* Next pointer for queue_Scan */
/* Credit the peer's burst allowance, clamped at peer->burstSize. */
3611 peer
->burst
+= nPackets
;
3612 if (peer
->burst
> peer
->burstSize
)
3613 peer
->burst
= peer
->burstSize
;
/* Restart each call blocked on congestion, while credit remains. */
3614 for (queue_Scan(&peer
->congestionQueue
, call
, nxcall
, rx_call
)) {
3615 assert(queue_IsNotEmpty(&peer
->congestionQueue
));
3616 assert(queue_Prev(&peer
->congestionQueue
, rx_call
));
3620 * The rxi_Start may put the call back on the congestion queue. In
3621 * that case, peer->burst should be 0 (otherwise no congestion was
3622 * encountered). It should go on the end of the queue, to allow
3623 * other calls to proceed when the next burst is allowed
3625 rxi_Start((struct rxevent
*) 0, call
);
/* Drop the reference taken when this event was scheduled
 * (see rxi_ScheduleDecongestionEvent). */
3630 peer
->refCount
--; /* It was bumped by the callee */
3635 * Schedule an event at a host-dependent time in the future which will
3636 * add back nPackets to the current allowed burst window. Any number
3637 * of these events may be scheduled.
3640 rxi_ScheduleDecongestionEvent(struct rx_call
*call
, int nPackets
)
3642 struct rx_peer
*peer
= call
->conn
->peer
;
3645 clock_GetTime(&tmp
);
3646 clock_Add(&tmp
, &peer
->burstWait
);
3647 peer
->refCount
++; /* So it won't disappear underneath
3649 /* this is stupid - sending an int as a pointer is begging for trouble */
3650 rxevent_Post(&tmp
, rxi_DecongestionEvent
, (void *) peer
, (void *)(long)nPackets
);
/*
 * rxi_CongestionWait -- append a call to its peer's congestion queue so
 * rxi_Start is re-invoked when burst credit returns.
 *
 * NOTE(review): damaged extraction.  The statement guarded by the
 * queue_IsOnQueue test (original line 3665 -- presumably an early return)
 * is missing, so the exact control flow around the asserts cannot be
 * confirmed from here.  Text preserved byte-for-byte; only comments added.
 */
3654 * The caller wishes to have rxi_Start called when the burst count has
3655 * gone up, and more packets can therefore be sent. Add the caller to
3656 * the end of the list of calls waiting for decongestion events to
3657 * happen. It's important that it's added to the end so that the
3658 * rxi_DecongestionEvent procedure always terminates (aside from
3659 * matters of scheduling fairness).
3662 rxi_CongestionWait(struct rx_call
*call
)
/* Already queued: nothing to do (body of this test lost in extraction). */
3664 if (queue_IsOnQueue(call
))
3666 assert(queue_IsNotEmpty(&call
->conn
->peer
->congestionQueue
));
3667 assert(queue_Prev(&call
->conn
->peer
->congestionQueue
, rx_call
));
/* Append at the tail so rxi_DecongestionEvent's scan terminates. */
3668 queue_Append(&call
->conn
->peer
->congestionQueue
, call
);
/*
 * rxi_ComputeRoundTripTime -- measure the RTT of an acked packet and fold
 * it into the peer's smoothed RTT (srtt) and mean-deviation (mdev)
 * estimators, RFC 793 / Van Jacobson style (see the original comments
 * below), then recompute the retransmit timeout via update_timeout().
 *
 * NOTE(review): damaged extraction.  Conditional-compilation lines
 * (the #ifdef matching the trailing "#endif ADAPT_RTO"), the declarations
 * of rtt/error, and several braces are missing; the embedded "36xx/37xx"
 * numbers are the original file's line numbers.  Text preserved
 * byte-for-byte; only comments added.
 */
3672 * Compute round trip time of the packet provided, in *rttp.
3679 rxi_ComputeRoundTripTime(struct rx_packet
*p
,
3680 struct clock
*sentp
,
3681 struct rx_peer
*peer
)
3683 struct clock thisRtt
, *rttp
= &thisRtt
;
/* SCCS-style marker; the self-assignment below suppresses
 * unused-variable warnings. */
3686 static char id
[] = "@(#)adaptive RTO";
3687 *id
= *id
; /* so it won't complain about unused variables */
/* rtt = now - sentp; discard the sample if the clock went backwards. */
3689 clock_GetTime(rttp
);
3690 if (clock_Lt(rttp
, sentp
)) {
3692 return; /* somebody set the clock back, don't
3693 * count this time. */
3695 clock_Sub(rttp
, sentp
);
/* Alternate path: measure against the packet's own timeSent stamp. */
3697 clock_GetTime(rttp
);
3698 clock_Sub(rttp
, &p
->timeSent
);
/* Update global min/max/total RTT statistics. */
3700 if (clock_Lt(rttp
, &rx_stats
.minRtt
))
3701 rx_stats
.minRtt
= *rttp
;
3702 if (clock_Gt(rttp
, &rx_stats
.maxRtt
)) {
3703 if (rttp
->sec
> 110)
3704 return; /* somebody set the clock ahead */
3705 rx_stats
.maxRtt
= *rttp
;
3707 clock_Add(&rx_stats
.totalRtt
, rttp
);
3708 rx_stats
.nRttSamples
++;
3711 /* better rtt calculation courtesy of UMich crew (dave,larry,peter,???) */
3713 /* Apply VanJacobson round-trip estimations */
/* Convert the sample to microseconds and compute |sample - srtt|. */
3718 rtt
= rttp
->usec
+ rttp
->sec
*1000000;
3719 if (rtt
>= peer
->srtt
)
3720 error
= rtt
- peer
->srtt
;
3722 error
= peer
->srtt
- rtt
;
3725 * The following magic is equivalent to the smoothing
3726 * algorithm in rfc793 with an alpha of .875
3727 * (srtt = rtt/8 + srtt*7/8 in fixed point).
3730 peer
->srtt
= (peer
->srtt
*7 + rtt
)/8;
3733 * We accumulate a smoothed rtt variance (actually, a smoothed
3734 * mean difference), then set the retransmit timer to smoothed
3735 * rtt + 4 times the smoothed variance (was 2x in van's
3736 * original paper, but 4x works better for me, and apparently
3739 * The following is equivalent to rfc793 smoothing with an
3740 * alpha of .75 (rttvar = rttvar*3/4 + |err| / 4). This
3741 * replaces rfc793's wired-in beta.
3744 peer
->mdev
= (peer
->mdev
*3 + error
)/4;
/* First sample for this peer: seed srtt directly and mdev at srtt/2. */
3746 peer
->srtt
= rttp
->usec
+ rttp
->sec
*1000000;
3747 peer
->mdev
= peer
->srtt
/2;
3748 /* One single measurement is a real poor estimate of RTT&MDEV */
3749 if (peer
->mdev
< 1000)
3750 peer
->mdev
= 1000; /* 1ms */
/* Recompute the peer's retransmit timeout from srtt/mdev. */
3753 update_timeout(peer
);
3755 dpf(("rtt=%.2f ms, srtt=%.2f ms, mdev=%.2f ms, timeout=%.2f ms\n",
3756 rttp
->usec
/1000.0 + rttp
->sec
*1000.0,
3757 peer
->srtt
/1000.0, peer
->mdev
/1000.0,
3758 peer
->timeout
.sec
*1000.0 + peer
->timeout
.usec
/1000.0));
3759 #endif /* ADAPT_RTO */
/*
 * rxi_ReapConnections -- periodic garbage collector.  Checks every call on
 * every hashed connection, destroys idle server connections and idle peer
 * structures, wakes anyone stuck waiting for packets, and finally
 * reschedules itself RX_REAP_TIME seconds from now.
 *
 * NOTE(review): damaged extraction -- loop increments, brace lines, and
 * several statements (e.g. around original lines 3785-3788, 3801-3805,
 * 3815-3823, 3840-3845, 3857-3860) are missing.  Text preserved
 * byte-for-byte; only comments added.
 */
3764 * Find all server connections that have not been active for a long time,
3768 rxi_ReapConnections(void)
3772 clock_GetTime(&now
);
3775 * Find server connection structures that haven't been used for greater
3776 * than rx_idleConnectionTime
3779 struct rx_connection
**conn_ptr
, **conn_end
;
3780 int i
, havecalls
= 0, ret
;
/* Walk every bucket of the connection hash table. */
3782 for (conn_ptr
= &rx_connHashTable
[0],
3783 conn_end
= &rx_connHashTable
[rx_hashTableSize
];
3784 conn_ptr
< conn_end
;
3786 struct rx_connection
*conn
, *next
;
3789 for (conn
= *conn_ptr
; conn
; conn
= next
) {
3791 /* once a minute look at everything to see what's up */
/* Health-check each call slot on the connection. */
3793 for (i
= 0; i
< RX_MAXCALLS
; i
++) {
3794 if (conn
->call
[i
]) {
3796 ret
= rxi_CheckCall(conn
->call
[i
]);
3798 /* If CheckCall freed the call, it might
3799 * have destroyed the connection as well,
3800 * which screws up the linked lists.
3806 if (conn
->type
== RX_SERVER_CONNECTION
) {
3809 * This only actually destroys the connection if there
3810 * are no outstanding calls
3812 if (!havecalls
&& !conn
->refCount
&&
3813 ((conn
->lastSendTime
+ rx_idleConnectionTime
) < now
.sec
)) {
3814 conn
->refCount
++; /* it will be decr in
3816 rx_DestroyConnection(conn
);
3824 * Find any peer structures that haven't been used (haven't had an
3825 * associated connection) for greater than rx_idlePeerTime
3828 struct rx_peer
**peer_ptr
, **peer_end
;
3830 for (peer_ptr
= &rx_peerHashTable
[0],
3831 peer_end
= &rx_peerHashTable
[rx_hashTableSize
];
3832 peer_ptr
< peer_end
; peer_ptr
++) {
3833 struct rx_peer
*peer
, *next
;
3835 for (peer
= *peer_ptr
; peer
; peer
= next
) {
/* Reap peers with no references that have been idle too long. */
3837 if (peer
->refCount
== 0
3838 && ((peer
->idleWhen
+ rx_idlePeerTime
) < now
.sec
)) {
3839 rxi_DestroyPeer(peer
);
3846 * THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
3847 * rxi_AllocSendPacket, if it hits, will be handled at the next conn GC,
3848 * just below. Really, we shouldn't have to keep moving packets from one
3849 * place to another, but instead ought to always know if we can afford to
3850 * hold onto a packet in its particular use.
/* Wake anyone blocked waiting for free packets (cv under locks,
 * osi_rxWakeup otherwise). */
3852 RX_MUTEX_ENTER(&rx_waitingForPackets_lock
);
3853 if (rx_waitingForPackets
) {
3854 rx_waitingForPackets
= 0;
3855 #ifdef RX_ENABLE_LOCKS
3856 cv_signal(&rx_waitingForPackets_cv
);
3858 osi_rxWakeup(&rx_waitingForPackets
);
3861 RX_MUTEX_EXIT(&rx_waitingForPackets_lock
);
/* Re-arm: run again RX_REAP_TIME seconds from the time captured above. */
3863 now
.sec
+= RX_REAP_TIME
; /* Check every RX_REAP_TIME seconds */
3864 rxevent_Post(&now
, rxi_ReapConnections
, NULL
, NULL
);
/*
 * rxs_Release - This isn't strictly necessary but, since the macro name from
 * rx.h is sort of strange this is better.  This is called with a security
 * object before it is discarded.  Each connection using a security object
 * has its own refcount to the object so it won't actually be freed until
 * the last connection is destroyed.
 *
 * This is the only rxs module call.  A hold could also be written but no
 * one needs it.
 */
int
rxs_Release(struct rx_securityClass *aobj)
{
    /* Delegate to the security object's own Close operation. */
    return RXS_Close(aobj);
}
/* Per-packet byte overheads used by the rate estimator below:
 * PKT_OH = Rx header + IP/UDP headers; SMALL/AVG_SMALL/LARGE are derived
 * thresholds based on the ack-packet size. */
3886 #define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
3887 #define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
3888 #define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
3889 #define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
/*
 * rxi_ComputeRate -- adaptive-window rate estimator (compiled under the
 * ADAPT_WINDOW #ifdef whose opening line was lost; note the trailing
 * "#endif ADAPT_WINDOW").  Updates peer->smRtt, peer->timeout and
 * peer->maxWindow from ack timing samples.
 *
 * NOTE(review): damaged extraction -- many original lines are missing
 * (returns, #else/#ifdef lines separating the two alternative window
 * computations, declarations of minTime/newTO, several braces).  Two
 * variants of the window computation appear below; the original comments
 * at 4068-4078 say the second, simpler one replaces the first.  Text
 * preserved byte-for-byte; only comments added.
 */
3892 * Adjust our estimate of the transmission rate to this peer, given
3893 * that the packet p was just acked. We can adjust peer->timeout and
3894 * call->twind (and peer->maxWindow). Pragmatically, this is called
3895 * only with packets of maximal length.
3899 rxi_ComputeRate(struct rx_peer
*peer
, struct rx_call
*call
,
3900 struct rx_packet
*p
, struct rx_packet
*ackp
, u_char ackReason
)
3902 long xferSize
, xferMs
;
/* Rate sampling is gated by peer->rateFlag; the statements these two
 * tests guard were lost in extraction. */
3906 /* Count down packets */
3907 if (peer
->rateFlag
> 0)
3909 /* Do nothing until we're enabled */
3910 if (peer
->rateFlag
!= 0)
/* Derive a (size, elapsed-ms) sample depending on why we were acked. */
3915 /* Count only when the ack seems legitimate */
3916 switch (ackReason
) {
3917 case RX_ACK_REQUESTED
:
3918 xferSize
= p
->length
+ RX_HEADER_SIZE
+
3919 call
->conn
->securityMaxTrailerSize
;
3923 case RX_ACK_PING_RESPONSE
:
3924 if (p
) /* want the response to ping-request,
3927 clock_GetTime(&newTO
);
3928 if (clock_Gt(&newTO
, &call
->pingRequestTime
)) {
3929 clock_Sub(&newTO
, &call
->pingRequestTime
);
3930 xferMs
= (newTO
.sec
* 1000) + (newTO
.usec
/ 1000);
3934 xferSize
= rx_AckDataSize(rx_Window
) + RX_HEADER_SIZE
;
3941 dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %lu.%06lu, "
3942 "rtt %u, win %u, ps %u)",
3943 ntohl(peer
->host
), ntohs(peer
->port
),
3944 (ackReason
== RX_ACK_REQUESTED
? "dataack" : "pingack"),
3945 xferSize
, xferMs
, peer
->timeout
.sec
, peer
->timeout
.usec
, peer
->smRtt
,
3946 peer
->maxWindow
, peer
->packetSize
));
3948 /* Track only packets that are big enough. */
3949 if ((p
->length
+ RX_HEADER_SIZE
+ call
->conn
->securityMaxTrailerSize
) <
/* Fold the sample into smRtt: seed on first sample, else 15/16
 * exponential smoothing (the +4 rounds). */
3953 /* absorb RTT data (in milliseconds) for these big packets */
3954 if (peer
->smRtt
== 0) {
3955 peer
->smRtt
= xferMs
;
3957 peer
->smRtt
= ((peer
->smRtt
* 15) + xferMs
+ 4) >> 4;
/* Only recompute timeout/window every 10th qualifying sample. */
3962 if (peer
->countDown
) {
3966 peer
->countDown
= 10; /* recalculate only every so often */
3971 * We here assume that we can approximate the total elapsed time for a
3972 * window-full of full packets as: time = RTT + ((winSize *
3973 * (packetSize+overhead)) - minPktSize) / byteRate
3975 /* The RTT and byteRate numbers are what is measured above. */
3978 * In principle, we can change the other parameters: - winSize, the
3979 * number of packets in the transmission window; - packetSize, the max
3980 * size of a data packet; - the timeout, which must be larger than the
3985 * In practice, we do this in two steps: (a) ensure that the timeout is
3986 * large enough for a single packet to get through; (b) ensure that the
3987 * transmit-window is small enough to fit in the timeout.
/* ---- Variant 1: byte-rate (smBps) based timeout/window estimate ---- */
3990 /* First, an expression for the expected RTT for a full packet */
3991 minTime
= peer
->smRtt
+ ((1000 * (peer
->packetSize
+
3992 RX_HEADER_SIZE
+ RX_IPUDP_SIZE
)) / peer
->smBps
);
3994 /* Get a reasonable estimate for a timeout period */
3996 newTO
.sec
= minTime
/ 1000;
3997 newTO
.usec
= (minTime
- (newTO
.sec
* 1000)) * 1000;
4000 * Increase the timeout period so that we can always do at least one
4003 if (clock_Gt(&newTO
, &peer
->timeout
)) {
4005 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu "
4006 "(rtt %u, win %u, ps %u, Bps %u)",
4007 ntohl(peer
->host
), ntohs(peer
->port
), peer
->timeout
.sec
,
4008 peer
->timeout
.usec
, newTO
.sec
, newTO
.usec
, peer
->smRtt
,
4009 peer
->maxWindow
, peer
->packetSize
, peer
->smBps
));
4011 peer
->timeout
= newTO
;
4013 /* Now, get an estimate for the transmit window size. */
4014 minTime
= peer
->timeout
.sec
* 1000 + (peer
->timeout
.usec
/ 1000);
4017 * Now, convert to the number of full packets that could fit in that
4020 minTime
= ((((minTime
- peer
->smRtt
) * peer
->smBps
) / 1000) +
4021 RXRATE_AVG_SMALL_PKT
) /
4022 (peer
->packetSize
+ RX_HEADER_SIZE
+ RX_IPUDP_SIZE
);
4023 minTime
>>= 1; /* Take half that many */
4024 xferSize
= minTime
; /* (make a copy) */
4026 /* Now clamp the size to reasonable bounds. */
4029 else if (minTime
> rx_Window
)
4030 minTime
= rx_Window
;
4031 if (minTime
!= peer
->maxWindow
) {
4032 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, "
4033 "rtt %u, ps %u, Bps %u)",
4034 ntohl(peer
->host
), ntohs(peer
->port
), peer
->maxWindow
, minTime
,
4035 peer
->timeout
.sec
, peer
->timeout
.usec
, peer
->smRtt
,
4036 peer
->packetSize
, peer
->smBps
));
4038 peer
->maxWindow
= minTime
;
4039 /* call->twind = minTime; */
4043 * Cut back on the peer timeout if it has grown unreasonably. Discern
4044 * this by calculating the timeout necessary for rx_Window packets.
4046 if ((xferSize
> rx_Window
) && (peer
->timeout
.sec
>= 3)) {
4047 /* calculate estimate for transmission interval in milliseconds */
4048 minTime
= (((1000 * rx_Window
*
4049 (peer
->packetSize
+ RX_HEADER_SIZE
+ RX_IPUDP_SIZE
))
4050 - RXRATE_AVG_SMALL_PKT
) / peer
->smBps
) + peer
->smRtt
;
4051 if (minTime
< 1000) {
4053 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, "
4054 "win %u, ps %u, Bps %u)",
4055 ntohl(peer
->host
), ntohs(peer
->port
), peer
->timeout
.sec
,
4056 peer
->timeout
.usec
, peer
->smRtt
, peer
->maxWindow
,
4057 peer
->packetSize
, peer
->smBps
));
4059 newTO
.sec
= 0; /* cut back on timeout by half a
4061 newTO
.usec
= 500000;
4062 clock_Sub(&peer
->timeout
, &newTO
);
4068 * In practice, we can measure only the RTT for full packets, because of
4069 * the way Rx acks the data that it receives. (If it's smaller than a
4070 * full packet, it often gets implicitly acked either by the call
4071 * response (from a server) or by the next call (from a client), and
4072 * either case confuses transmission times with processing times.)
4073 * Therefore, replace the above more-sophisticated processing with a
4074 * simpler version, where the smoothed RTT is kept for full-size packets,
4075 * and the time to transmit a windowful of full-size packets is simply
4076 * RTT * windowSize. Again, we take two steps: - ensure the timeout is
4077 * large enough for a single packet's RTT; - ensure that the window is
4078 * small enough to fit in the desired timeout.
/* ---- Variant 2: simpler RTT-only estimate (per comment above) ---- */
4081 /* First, the timeout check. */
4082 minTime
= peer
->smRtt
;
4083 /* Get a reasonable estimate for a timeout period */
4085 newTO
.sec
= minTime
/ 1000;
4086 newTO
.usec
= (minTime
- (newTO
.sec
* 1000)) * 1000;
4089 * Increase the timeout period so that we can always do at least one
4092 if (clock_Gt(&newTO
, &peer
->timeout
)) {
4094 dpf(("CONG peer %lx/%u: timeout %lu.%06lu ==> %lu.%06lu (rtt %u, "
4096 ntohl(peer
->host
), ntohs(peer
->port
), peer
->timeout
.sec
,
4097 peer
->timeout
.usec
, newTO
.sec
, newTO
.usec
, peer
->smRtt
,
4098 peer
->maxWindow
, peer
->packetSize
));
4100 peer
->timeout
= newTO
;
4102 /* Now, get an estimate for the transmit window size. */
4103 minTime
= peer
->timeout
.sec
* 1000 + (peer
->timeout
.usec
/ 1000);
4106 * Now, convert to the number of full packets that could fit in a
4107 * reasonable fraction of that interval
4109 minTime
/= (peer
->smRtt
<< 1);
4110 xferSize
= minTime
; /* (make a copy) */
4112 /* Now clamp the size to reasonable bounds. */
4115 else if (minTime
> rx_Window
)
4116 minTime
= rx_Window
;
4117 if (minTime
!= peer
->maxWindow
) {
4118 dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, "
4120 ntohl(peer
->host
), ntohs(peer
->port
), peer
->maxWindow
, minTime
,
4121 peer
->timeout
.sec
, peer
->timeout
.usec
, peer
->smRtt
,
4123 peer
->maxWindow
= minTime
;
4124 /* call->twind = minTime; */
4128 * Cut back on the peer timeout if it had earlier grown unreasonably.
4129 * Discern this by calculating the timeout necessary for rx_Window
4132 if ((xferSize
> rx_Window
) && (peer
->timeout
.sec
>= 3)) {
4133 /* calculate estimate for transmission interval in milliseconds */
4134 minTime
= rx_Window
* peer
->smRtt
;
4135 if (minTime
< 1000) {
4136 dpf(("CONG peer %lx/%u: cut TO %lu.%06lu by 0.5 (rtt %u, "
4138 ntohl(peer
->host
), ntohs(peer
->port
), peer
->timeout
.sec
,
4139 peer
->timeout
.usec
, peer
->smRtt
, peer
->maxWindow
,
4142 newTO
.sec
= 0; /* cut back on timeout by half a
4144 newTO
.usec
= 500000;
4145 clock_Sub(&peer
->timeout
, &newTO
);
4149 } /* end of rxi_ComputeRate */
4151 #endif /* ADAPT_WINDOW */
/*
 * rxi_DebugPrint -- variadic debug logger: prints a " sec.msec:" timestamp
 * to Log, then the caller's formatted message via vfprintf.
 *
 * NOTE(review): damaged extraction.  The return type, braces, and the
 * declarations of `now` (struct clock, per clock_GetTime) and `ap`
 * (va_list, per vfprintf) plus the va_start/va_end calls were dropped --
 * verify against upstream before rebuilding.  Text preserved
 * byte-for-byte; only comments added.
 */
4159 /* Don't call this debugging routine directly; use dpf */
4161 rxi_DebugPrint(const char *fmt
, ...)
/* Timestamp prefix: seconds and milliseconds of the current clock. */
4166 clock_GetTime(&now
);
4168 fprintf(Log
, " %lu.%.3lu:", now
.sec
, now
.usec
/ 1000);
4170 vfprintf(Log
, fmt
, ap
);
/*
 * rx_PrintTheseStats -- dump an rx_stats structure to `file` in human
 * readable form; `size` is sanity-checked against sizeof(struct rx_stats).
 * Compiled under RXDEBUG (note trailing "#endif RXDEBUG").
 *
 * NOTE(review): damaged extraction -- several of the fprintf(file, ...)
 * opening lines, the declaration of `i`, and braces are missing.  Text
 * preserved byte-for-byte; only comments added.
 */
4178 rx_PrintTheseStats(FILE *file
, struct rx_stats
*s
, int size
)
/* Warn if the caller's idea of struct rx_stats disagrees with ours. */
4183 if (size
!= sizeof(struct rx_stats
))
4184 fprintf(file
, "Unexpected size of stats structure: was %d, "
4187 (int)sizeof(struct rx_stats
));
/* Packet-pool and allocation-failure counters. */
4190 "rx stats: free packets %d, allocs %d, "
4191 "alloc-failures(rcv %d,send %d,ack %d)\n", rx_nFreePackets
,
4192 s
->packetRequests
, s
->noPackets
[0], s
->noPackets
[1],
4195 " greedy %d, bogusReads %d (last from host %x), "
4196 "noPackets %d, noBuffers %d, selects %d, sendSelects %d\n",
4197 s
->socketGreedy
, s
->bogusPacketOnRead
, s
->bogusHost
,
4198 s
->noPacketOnRead
, s
->noPacketBuffersOnRead
, s
->selects
,
/* Per-packet-type receive counters. */
4200 fprintf(file
, " packets read: ");
4201 for (i
= 0; i
< RX_N_PACKET_TYPES
; i
++)
4202 fprintf(file
, "%s %d ", rx_packetTypes
[i
], s
->packetsRead
[i
]);
4204 fprintf(file
, "\n");
4206 " other read counters: data %d, ack %d, dup %d "
4207 "spurious %d\n", s
->dataPacketsRead
, s
->ackPacketsRead
,
4208 s
->dupPacketsRead
, s
->spuriousPacketsRead
);
/* Per-packet-type send counters. */
4209 fprintf(file
, " packets sent: ");
4210 for (i
= 0; i
< RX_N_PACKET_TYPES
; i
++)
4211 fprintf(file
, "%s %d ", rx_packetTypes
[i
], s
->packetsSent
[i
]);
4212 fprintf(file
, "\n");
4214 " other send counters: ack %d, data %d (not resends), "
4215 "resends %d, pushed %d, acked&ignored %d\n", s
->ackPacketsSent
,
4216 s
->dataPacketsSent
, s
->dataPacketsReSent
, s
->dataPacketsPushed
,
4217 s
->ignoreAckedPacket
);
4219 " \t(these should be small) sendFailed %lu, "
4220 "fatalErrors %lu\n",
4221 (unsigned long)s
->netSendFailures
,
4222 (unsigned long)s
->fatalErrors
);
/* RTT summary, only when at least one sample exists (avoids div by 0). */
4223 if (s
->nRttSamples
) {
4224 fprintf(file
, " Average rtt is %0.3f, with %d samples\n",
4225 clock_Float(&s
->totalRtt
) / s
->nRttSamples
, s
->nRttSamples
);
4227 fprintf(file
, " Minimum rtt is %0.3f, maximum is %0.3f\n",
4228 clock_Float(&s
->minRtt
), clock_Float(&s
->maxRtt
));
/* Structure-count summary. */
4231 " %d server connections, %d client connections, %d "
4232 "peer structs, %d call structs, %d free call structs\n",
4233 s
->nServerConns
, s
->nClientConns
, s
->nPeerStructs
,
4234 s
->nCallStructs
, s
->nFreeCallStructs
);
4235 fprintf(file
, " %d clock updates\n", clock_nUpdates
);
4236 #endif /* RXDEBUG */
4239 /* for backward compatibility */
4241 rx_PrintStats(FILE *file
)
4244 rx_PrintTheseStats(file
, &rx_stats
, sizeof(rx_stats
));
/*
 * rx_PrintPeerStats -- dump one peer's burst, timing, and packet-skew
 * statistics to `file`.  Compiled under RXDEBUG (trailing "#endif").
 *
 * NOTE(review): damaged extraction -- several fprintf arguments are
 * missing (e.g. the port/burst-size args for the first format, and the
 * rtt/sent/resent args for the second), so the format strings below have
 * more conversions than visible arguments.  Text preserved byte-for-byte;
 * only comments added.
 */
4249 rx_PrintPeerStats(FILE *file
, struct rx_peer
*peer
)
/* Identity plus burst parameters (host printed in host byte order). */
4252 fprintf(file
, "Peer %lx.%d. Burst size %d, burst wait %lu.%ld.\n",
4253 (unsigned long)ntohl(peer
->host
),
4256 (unsigned long)peer
->burstWait
.sec
,
4257 (unsigned long)peer
->burstWait
.usec
);
/* Timing and retransmission summary. */
4258 fprintf(file
, " Rtt %lu us, retry time %lu.%06ld, total sent %d, resent %d\n",
4260 (unsigned long)peer
->timeout
.sec
,
4261 (long)peer
->timeout
.usec
,
4265 " Packet size %d, max in packet skew %ld, max out packet "
4266 "skew %ld\n", peer
->packetSize
, peer
->inPacketSkew
,
4267 peer
->outPacketSkew
);
4268 #endif /* RXDEBUG */
/*
 * Rx shutdown/finalization body (the function header line itself was lost
 * in extraction; by content -- destroying all peers, services, connections,
 * the free server-queue-entry list, hash tables, and the packet pool, then
 * resetting the quota globals -- this is the package finalizer).
 *
 * NOTE(review): damaged extraction -- declarations of i/j, loop `next`
 * assignments, and several braces are missing.  Text preserved
 * byte-for-byte; only comments added.
 */
4274 struct rx_serverQueueEntry
*np
;
4279 struct rx_peer
**peer_ptr
, **peer_end
;
/* Destroy every peer in the peer hash table. */
4281 for (peer_ptr
= &rx_peerHashTable
[0],
4282 peer_end
= &rx_peerHashTable
[rx_hashTableSize
];
4283 peer_ptr
< peer_end
; peer_ptr
++) {
4284 struct rx_peer
*peer
, *next
;
4286 for (peer
= *peer_ptr
; peer
; peer
= next
) {
4288 rxi_DestroyPeer(peer
);
/* Free all registered services.
 * NOTE(review): sizeof(*rx_services) is the size of one ELEMENT of the
 * rx_services array; if that array holds struct rx_service pointers this
 * passes a pointer size rather than sizeof(struct rx_service) -- verify. */
4292 for (i
= 0; i
< RX_MAX_SERVICES
; i
++) {
4294 rxi_Free(rx_services
[i
], sizeof(*rx_services
));
/* Free all calls and connections in the connection hash table. */
4296 for (i
= 0; i
< rx_hashTableSize
; i
++) {
4297 struct rx_connection
*tc
, *ntc
;
4299 for (tc
= rx_connHashTable
[i
]; tc
; tc
= ntc
) {
4301 for (j
= 0; j
< RX_MAXCALLS
; j
++) {
/* NOTE(review): sizeof(*(tc->call)) is sizeof(struct rx_call *) -- a
 * pointer -- while the object freed is a struct rx_call; this likely
 * should be sizeof(*tc->call[j]). */
4303 rxi_Free(tc
->call
[j
], sizeof(*(tc
->call
)));
/* NOTE(review): sizeof(tc) is the size of a POINTER, not of
 * struct rx_connection; almost certainly should be sizeof(*tc). */
4306 rxi_Free(tc
, sizeof(tc
));
/* Drain the free server-queue-entry list (next pointer is stored in the
 * first word of each entry). */
4310 RX_MUTEX_ENTER(&freeSQEList_lock
);
4312 while ((np
= rx_FreeSQEList
) != NULL
) {
4313 rx_FreeSQEList
= *(struct rx_serverQueueEntry
**) np
;
4314 RX_MUTEX_DESTROY(&np
->lock
);
/* NOTE(review): sizeof(np) is pointer size; likely should be
 * sizeof(*np). */
4315 rxi_Free(np
, sizeof(np
));
4318 RX_MUTEX_EXIT(&freeSQEList_lock
);
4319 RX_MUTEX_DESTROY(&freeSQEList_lock
);
4320 RX_MUTEX_DESTROY(&rx_waitingForPackets_lock
);
4321 RX_MUTEX_DESTROY(&rx_freeCallQueue_lock
);
/* Release (and unpin) the hash tables and the packet array. */
4323 osi_Free(rx_connHashTable
,
4324 rx_hashTableSize
* sizeof(struct rx_connection
*));
4325 osi_Free(rx_peerHashTable
, rx_hashTableSize
* sizeof(struct rx_peer
*));
4326 osi_Free(rx_allocedP
, sizeof(struct rx_packet
) * rx_nPackets
);
4328 UNPIN(rx_connHashTable
, rx_hashTableSize
* sizeof(struct rx_connection
*));
4329 UNPIN(rx_peerHashTable
, rx_hashTableSize
* sizeof(struct rx_peer
*));
4330 UNPIN(rx_allocedP
, sizeof(struct rx_packet
) * rx_nPackets
);
4332 rxi_FreeAllPackets();
/* Reset quota accounting to its pre-init state (see file header for the
 * quota-system description). */
4334 rxi_dataQuota
= RX_MAX_QUOTA
;
4335 rxi_availProcs
= rxi_totalMin
= rxi_minDeficit
= 0;
4339 rx_getServiceRock(struct rx_service
*service
)
4341 return service
->serviceRock
;
4345 rx_setServiceRock(struct rx_service
*service
, void *rock
)
4347 service
->serviceRock
= rock
;