1 /* call.c: Rx call routines
3 * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
12 #include <linux/sched.h>
13 #include <linux/slab.h>
14 #include <linux/module.h>
15 #include <rxrpc/rxrpc.h>
16 #include <rxrpc/transport.h>
17 #include <rxrpc/peer.h>
18 #include <rxrpc/connection.h>
19 #include <rxrpc/call.h>
20 #include <rxrpc/message.h>
/* accounting counters for extant call and message records; presumably
 * only compiled in when __RXACCT_DECL expands to its argument — TODO
 * confirm against the rxrpc headers */
23 __RXACCT_DECL(atomic_t rxrpc_call_count
);
24 __RXACCT_DECL(atomic_t rxrpc_message_count
);
/* global list of all extant calls, guarded by rxrpc_calls_sem */
26 LIST_HEAD(rxrpc_calls
);
27 DECLARE_RWSEM(rxrpc_calls_sem
);
/* timeouts expressed in jiffies (fractions of HZ) */
29 unsigned rxrpc_call_rcv_timeout
= HZ
/3;
30 static unsigned rxrpc_call_acks_timeout
= HZ
/3;
31 static unsigned rxrpc_call_dfr_ack_timeout
= HZ
/20;
/* NOTE(review): a resend limit initialised from HZ/10 looks like a
 * time value stored in a count-sized variable — verify the units */
32 static unsigned short rxrpc_call_max_resend
= HZ
/10;
/* human-readable names for the call and error states (array contents
 * not visible in this excerpt) */
34 const char *rxrpc_call_states
[] = {
47 const char *rxrpc_call_error_states
[] = {
/* packet-type names indexed by the packet header's type field */
55 const char *rxrpc_pkts
[] = {
57 "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug",
58 "?09", "?10", "?11", "?12", "?13", "?14", "?15"
/* ACK-reason names indexed by the RXRPC_ACK_* reason codes */
61 static const char *rxrpc_acks
[] = {
62 "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
/* NOTE(review): a leading-underscore identifier at file scope sits in
 * the implementation's reserved namespace — consider renaming */
66 static const char _acktype
[] = "NA-";
/* forward declarations of the internal call-handling routines defined
 * later in this file */
68 static void rxrpc_call_receive_packet(struct rxrpc_call
*call
);
69 static void rxrpc_call_receive_data_packet(struct rxrpc_call
*call
,
70 struct rxrpc_message
*msg
);
71 static void rxrpc_call_receive_ack_packet(struct rxrpc_call
*call
,
72 struct rxrpc_message
*msg
);
73 static void rxrpc_call_definitively_ACK(struct rxrpc_call
*call
,
75 static void rxrpc_call_resend(struct rxrpc_call
*call
, rxrpc_seq_t highest
);
76 static int __rxrpc_call_read_data(struct rxrpc_call
*call
);
78 static int rxrpc_call_record_ACK(struct rxrpc_call
*call
,
79 struct rxrpc_message
*msg
,
83 static int rxrpc_call_flush(struct rxrpc_call
*call
);
/* emit the call's current application state name via the debug macro */
85 #define _state(call) \
86 _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]);
/* default application attention callback: just wake anyone waiting on
 * the call's wait queue */
88 static void rxrpc_call_default_attn_func(struct rxrpc_call
*call
)
90 wake_up(&call
->waitq
);
/* default application error callback: just wake anyone waiting on the
 * call's wait queue */
93 static void rxrpc_call_default_error_func(struct rxrpc_call
*call
)
95 wake_up(&call
->waitq
);
/* default abort <-> errno mapping callback, dispatched on who aborted
 * the call */
98 static void rxrpc_call_default_aemap_func(struct rxrpc_call
*call
)
100 switch (call
->app_err_state
) {
101 case RXRPC_ESTATE_LOCAL_ABORT
:
/* local abort: derive the wire abort code from the local errno */
102 call
->app_abort_code
= -call
->app_errno
;
/* NOTE(review): there is no break above in the visible text, so
 * LOCAL_ABORT would fall through and overwrite app_errno with
 * -ECONNABORTED — confirm whether a break is among the missing lines */
103 case RXRPC_ESTATE_PEER_ABORT
:
/* peer abort: report it to the application as a connection abort */
104 call
->app_errno
= -ECONNABORTED
;
/* acks_timeout timer expired: flag the event on the call and queue it
 * for the krxiod daemon, which resends unacked data in response
 * (see rxrpc_call_do_stuff) */
110 static void __rxrpc_call_acks_timeout(unsigned long _call
)
112 struct rxrpc_call
*call
= (struct rxrpc_call
*) _call
;
114 _debug("ACKS TIMEOUT %05lu", jiffies
- call
->cjif
);
/* NOTE(review): flags is modified here without call->lock — presumably
 * acceptable in this design, but verify against the other flag users */
116 call
->flags
|= RXRPC_CALL_ACKS_TIMO
;
117 rxrpc_krxiod_queue_call(call
);
/* rcv_timeout timer expired: flag loss of reception on the call and
 * queue it for krxiod, which aborts the call with -EIO in response
 * (see rxrpc_call_do_stuff) */
120 static void __rxrpc_call_rcv_timeout(unsigned long _call
)
122 struct rxrpc_call
*call
= (struct rxrpc_call
*) _call
;
124 _debug("RCV TIMEOUT %05lu", jiffies
- call
->cjif
);
126 call
->flags
|= RXRPC_CALL_RCV_TIMO
;
127 rxrpc_krxiod_queue_call(call
);
/* deferred-ACK timer expired: flag the call and queue it for krxiod,
 * which transmits the pending deferred ACK in response
 * (see rxrpc_call_do_stuff) */
130 static void __rxrpc_call_ackr_timeout(unsigned long _call
)
132 struct rxrpc_call
*call
= (struct rxrpc_call
*) _call
;
134 _debug("ACKR TIMEOUT %05lu",jiffies
- call
->cjif
);
136 call
->flags
|= RXRPC_CALL_ACKR_TIMO
;
137 rxrpc_krxiod_queue_call(call
);
140 /*****************************************************************************/
142 * calculate a timeout based on an RTT value
/* convert the peer's measured RTT into jiffies (the 1000000/HZ divisor
 * suggests the RTT is kept in microseconds — confirm) and return an
 * absolute expiry time; an expiry under HZ/25 takes a special path
 * whose body is not visible in this excerpt (presumably a floor) */
144 static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call
*call
,
147 unsigned long expiry
= call
->conn
->peer
->rtt
/ (1000000 / HZ
);
150 if (expiry
< HZ
/ 25)
155 _leave(" = %lu jiffies", expiry
);
156 return jiffies
+ expiry
;
157 } /* end __rxrpc_rtt_based_timeout() */
159 /*****************************************************************************/
161 * create a new call record
163 static inline int __rxrpc_create_call(struct rxrpc_connection
*conn
,
164 struct rxrpc_call
**_call
)
166 struct rxrpc_call
*call
;
170 /* allocate and initialise a call record */
171 call
= (struct rxrpc_call
*) get_zeroed_page(GFP_KERNEL
);
177 atomic_set(&call
->usage
, 1);
179 init_waitqueue_head(&call
->waitq
);
180 spin_lock_init(&call
->lock
);
181 INIT_LIST_HEAD(&call
->link
);
182 INIT_LIST_HEAD(&call
->acks_pendq
);
183 INIT_LIST_HEAD(&call
->rcv_receiveq
);
184 INIT_LIST_HEAD(&call
->rcv_krxiodq_lk
);
185 INIT_LIST_HEAD(&call
->app_readyq
);
186 INIT_LIST_HEAD(&call
->app_unreadyq
);
187 INIT_LIST_HEAD(&call
->app_link
);
188 INIT_LIST_HEAD(&call
->app_attn_link
);
190 init_timer(&call
->acks_timeout
);
191 call
->acks_timeout
.data
= (unsigned long) call
;
192 call
->acks_timeout
.function
= __rxrpc_call_acks_timeout
;
194 init_timer(&call
->rcv_timeout
);
195 call
->rcv_timeout
.data
= (unsigned long) call
;
196 call
->rcv_timeout
.function
= __rxrpc_call_rcv_timeout
;
198 init_timer(&call
->ackr_dfr_timo
);
199 call
->ackr_dfr_timo
.data
= (unsigned long) call
;
200 call
->ackr_dfr_timo
.function
= __rxrpc_call_ackr_timeout
;
203 call
->ackr_win_bot
= 1;
204 call
->ackr_win_top
= call
->ackr_win_bot
+ RXRPC_CALL_ACK_WINDOW_SIZE
- 1;
205 call
->ackr_prev_seq
= 0;
206 call
->app_mark
= RXRPC_APP_MARK_EOF
;
207 call
->app_attn_func
= rxrpc_call_default_attn_func
;
208 call
->app_error_func
= rxrpc_call_default_error_func
;
209 call
->app_aemap_func
= rxrpc_call_default_aemap_func
;
210 call
->app_scr_alloc
= call
->app_scratch
;
212 call
->cjif
= jiffies
;
214 _leave(" = 0 (%p)", call
);
219 } /* end __rxrpc_create_call() */
221 /*****************************************************************************/
223 * create a new call record for outgoing calls
225 int rxrpc_create_call(struct rxrpc_connection
*conn
,
226 rxrpc_call_attn_func_t attn
,
227 rxrpc_call_error_func_t error
,
228 rxrpc_call_aemap_func_t aemap
,
229 struct rxrpc_call
**_call
)
231 DECLARE_WAITQUEUE(myself
, current
);
233 struct rxrpc_call
*call
;
238 /* allocate and initialise a call record */
239 ret
= __rxrpc_create_call(conn
, &call
);
241 _leave(" = %d", ret
);
245 call
->app_call_state
= RXRPC_CSTATE_CLNT_SND_ARGS
;
247 call
->app_attn_func
= attn
;
249 call
->app_error_func
= error
;
251 call
->app_aemap_func
= aemap
;
255 spin_lock(&conn
->lock
);
256 set_current_state(TASK_INTERRUPTIBLE
);
257 add_wait_queue(&conn
->chanwait
, &myself
);
260 /* try to find an unused channel */
261 for (cix
= 0; cix
< 4; cix
++)
262 if (!conn
->channels
[cix
])
265 /* no free channels - wait for one to become available */
267 if (signal_pending(current
))
270 spin_unlock(&conn
->lock
);
273 set_current_state(TASK_INTERRUPTIBLE
);
275 spin_lock(&conn
->lock
);
278 /* got a channel - now attach to the connection */
280 remove_wait_queue(&conn
->chanwait
, &myself
);
281 set_current_state(TASK_RUNNING
);
283 /* concoct a unique call number */
285 call
->call_id
= htonl(++conn
->call_counter
);
286 for (loop
= 0; loop
< 4; loop
++)
287 if (conn
->channels
[loop
] &&
288 conn
->channels
[loop
]->call_id
== call
->call_id
)
291 rxrpc_get_connection(conn
);
292 conn
->channels
[cix
] = call
; /* assign _after_ done callid check loop */
293 do_gettimeofday(&conn
->atime
);
294 call
->chan_ix
= htonl(cix
);
296 spin_unlock(&conn
->lock
);
298 down_write(&rxrpc_calls_sem
);
299 list_add_tail(&call
->call_link
, &rxrpc_calls
);
300 up_write(&rxrpc_calls_sem
);
302 __RXACCT(atomic_inc(&rxrpc_call_count
));
305 _leave(" = 0 (call=%p cix=%u)", call
, cix
);
309 remove_wait_queue(&conn
->chanwait
, &myself
);
310 set_current_state(TASK_RUNNING
);
311 spin_unlock(&conn
->lock
);
313 free_page((unsigned long) call
);
314 _leave(" = %d", ret
);
316 } /* end rxrpc_create_call() */
318 /*****************************************************************************/
320 * create a new call record for incoming calls
/* build a call record for a caller-initiated call carried by @msg on
 * @conn, attach it to the channel encoded in the packet's connection
 * ID, publish it on the global calls list and return it via @_call */
322 int rxrpc_incoming_call(struct rxrpc_connection
*conn
,
323 struct rxrpc_message
*msg
,
324 struct rxrpc_call
**_call
)
326 struct rxrpc_call
*call
;
/* the channel index is carried in the low bits of the connection ID */
330 cix
= ntohl(msg
->hdr
.cid
) & RXRPC_CHANNELMASK
;
332 _enter("%p,%u,%u", conn
, ntohl(msg
->hdr
.callNumber
), cix
);
334 /* allocate and initialise a call record */
335 ret
= __rxrpc_create_call(conn
, &call
);
337 _leave(" = %d", ret
);
/* we have already received the packet that initiated this call */
341 call
->pkt_rcv_count
= 1;
342 call
->app_call_state
= RXRPC_CSTATE_SRVR_RCV_OPID
;
/* the first thing the app reads is the 32-bit operation ID */
343 call
->app_mark
= sizeof(uint32_t);
347 /* attach to the connection */
349 call
->chan_ix
= htonl(cix
);
/* the call ID is taken verbatim (network order) from the header */
350 call
->call_id
= msg
->hdr
.callNumber
;
352 spin_lock(&conn
->lock
);
/* the channel may only be claimed if it is empty or its previous
 * occupant has finished (completed or errored) */
354 if (!conn
->channels
[cix
] ||
355 conn
->channels
[cix
]->app_call_state
== RXRPC_CSTATE_COMPLETE
||
356 conn
->channels
[cix
]->app_call_state
== RXRPC_CSTATE_ERROR
358 conn
->channels
[cix
] = call
;
359 rxrpc_get_connection(conn
);
363 spin_unlock(&conn
->lock
);
/* presumably the channel-busy failure path: release the page that
 * __rxrpc_create_call() obtained via get_zeroed_page() — confirm,
 * as intermediate lines are missing from this excerpt */
366 free_page((unsigned long) call
);
/* publish the new call on the global calls list */
371 down_write(&rxrpc_calls_sem
);
372 list_add_tail(&call
->call_link
, &rxrpc_calls
);
373 up_write(&rxrpc_calls_sem
);
374 __RXACCT(atomic_inc(&rxrpc_call_count
));
378 _leave(" = %d [%p]", ret
, call
);
380 } /* end rxrpc_incoming_call() */
382 /*****************************************************************************/
386 void rxrpc_put_call(struct rxrpc_call
*call
)
388 struct rxrpc_connection
*conn
= call
->conn
;
389 struct rxrpc_message
*msg
;
391 _enter("%p{u=%d}",call
,atomic_read(&call
->usage
));
394 if (atomic_read(&call
->usage
) <= 0)
397 /* to prevent a race, the decrement and the de-list must be effectively
399 spin_lock(&conn
->lock
);
400 if (likely(!atomic_dec_and_test(&call
->usage
))) {
401 spin_unlock(&conn
->lock
);
406 if (conn
->channels
[ntohl(call
->chan_ix
)] == call
)
407 conn
->channels
[ntohl(call
->chan_ix
)] = NULL
;
409 spin_unlock(&conn
->lock
);
411 wake_up(&conn
->chanwait
);
413 rxrpc_put_connection(conn
);
415 /* clear the timers and dequeue from krxiod */
416 del_timer_sync(&call
->acks_timeout
);
417 del_timer_sync(&call
->rcv_timeout
);
418 del_timer_sync(&call
->ackr_dfr_timo
);
420 rxrpc_krxiod_dequeue_call(call
);
422 /* clean up the contents of the struct */
423 if (call
->snd_nextmsg
)
424 rxrpc_put_message(call
->snd_nextmsg
);
427 rxrpc_put_message(call
->snd_ping
);
429 while (!list_empty(&call
->acks_pendq
)) {
430 msg
= list_entry(call
->acks_pendq
.next
,
431 struct rxrpc_message
, link
);
432 list_del(&msg
->link
);
433 rxrpc_put_message(msg
);
436 while (!list_empty(&call
->rcv_receiveq
)) {
437 msg
= list_entry(call
->rcv_receiveq
.next
,
438 struct rxrpc_message
, link
);
439 list_del(&msg
->link
);
440 rxrpc_put_message(msg
);
443 while (!list_empty(&call
->app_readyq
)) {
444 msg
= list_entry(call
->app_readyq
.next
,
445 struct rxrpc_message
, link
);
446 list_del(&msg
->link
);
447 rxrpc_put_message(msg
);
450 while (!list_empty(&call
->app_unreadyq
)) {
451 msg
= list_entry(call
->app_unreadyq
.next
,
452 struct rxrpc_message
, link
);
453 list_del(&msg
->link
);
454 rxrpc_put_message(msg
);
457 module_put(call
->owner
);
459 down_write(&rxrpc_calls_sem
);
460 list_del(&call
->call_link
);
461 up_write(&rxrpc_calls_sem
);
463 __RXACCT(atomic_dec(&rxrpc_call_count
));
464 free_page((unsigned long) call
);
466 _leave(" [destroyed]");
467 } /* end rxrpc_put_call() */
469 /*****************************************************************************/
471 * actually generate a normal ACK
473 static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call
*call
,
476 struct rxrpc_message
*msg
;
481 /* ACKs default to DELAY */
482 if (!call
->ackr
.reason
)
483 call
->ackr
.reason
= RXRPC_ACK_DELAY
;
485 _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
486 jiffies
- call
->cjif
,
487 ntohs(call
->ackr
.maxSkew
),
488 ntohl(call
->ackr
.firstPacket
),
489 ntohl(call
->ackr
.previousPacket
),
490 ntohl(call
->ackr
.serial
),
491 rxrpc_acks
[call
->ackr
.reason
],
494 aux
[0] = htonl(call
->conn
->peer
->if_mtu
); /* interface MTU */
495 aux
[1] = htonl(1444); /* max MTU */
496 aux
[2] = htonl(16); /* rwind */
497 aux
[3] = htonl(4); /* max packets */
499 diov
[0].iov_len
= sizeof(struct rxrpc_ackpacket
);
500 diov
[0].iov_base
= &call
->ackr
;
501 diov
[1].iov_len
= call
->ackr_pend_cnt
+ 3;
502 diov
[1].iov_base
= call
->ackr_array
;
503 diov
[2].iov_len
= sizeof(aux
);
504 diov
[2].iov_base
= &aux
;
506 /* build and send the message */
507 ret
= rxrpc_conn_newmsg(call
->conn
,call
, RXRPC_PACKET_TYPE_ACK
,
508 3, diov
, GFP_KERNEL
, &msg
);
513 msg
->hdr
.seq
= htonl(seq
);
514 msg
->hdr
.flags
|= RXRPC_SLOW_START_OK
;
516 ret
= rxrpc_conn_sendmsg(call
->conn
, msg
);
517 rxrpc_put_message(msg
);
520 call
->pkt_snd_count
++;
522 /* count how many actual ACKs there were at the front */
523 for (delta
= 0; delta
< call
->ackr_pend_cnt
; delta
++)
524 if (call
->ackr_array
[delta
] != RXRPC_ACK_TYPE_ACK
)
527 call
->ackr_pend_cnt
-= delta
; /* all ACK'd to this point */
529 /* crank the ACK window around */
531 /* un-ACK'd window */
533 else if (delta
< RXRPC_CALL_ACK_WINDOW_SIZE
) {
534 /* partially ACK'd window
535 * - shuffle down to avoid losing out-of-sequence packets
537 call
->ackr_win_bot
+= delta
;
538 call
->ackr_win_top
+= delta
;
540 memmove(&call
->ackr_array
[0],
541 &call
->ackr_array
[delta
],
542 call
->ackr_pend_cnt
);
544 memset(&call
->ackr_array
[call
->ackr_pend_cnt
],
546 sizeof(call
->ackr_array
) - call
->ackr_pend_cnt
);
549 /* fully ACK'd window
550 * - just clear the whole thing
552 memset(&call
->ackr_array
,
554 sizeof(call
->ackr_array
));
558 memset(&call
->ackr
, 0, sizeof(call
->ackr
));
561 if (!call
->app_call_state
)
562 printk("___ STATE 0 ___\n");
564 } /* end __rxrpc_call_gen_normal_ACK() */
566 /*****************************************************************************/
568 * note the reception of a packet in the call's ACK records and generate an
569 * appropriate ACK packet if necessary
570 * - returns 0 if packet should be processed, 1 if packet should be ignored
571 * and -ve on an error
573 static int rxrpc_call_generate_ACK(struct rxrpc_call
*call
,
574 struct rxrpc_header
*hdr
,
575 struct rxrpc_ackpacket
*ack
)
577 struct rxrpc_message
*msg
;
581 u8 special_ACK
, do_ACK
, force
;
583 _enter("%p,%p { seq=%d tp=%d fl=%02x }",
584 call
, hdr
, ntohl(hdr
->seq
), hdr
->type
, hdr
->flags
);
586 seq
= ntohl(hdr
->seq
);
587 offset
= seq
- call
->ackr_win_bot
;
588 do_ACK
= RXRPC_ACK_DELAY
;
592 if (call
->ackr_high_seq
< seq
)
593 call
->ackr_high_seq
= seq
;
595 /* deal with generation of obvious special ACKs first */
596 if (ack
&& ack
->reason
== RXRPC_ACK_PING
) {
597 special_ACK
= RXRPC_ACK_PING_RESPONSE
;
602 if (seq
< call
->ackr_win_bot
) {
603 special_ACK
= RXRPC_ACK_DUPLICATE
;
608 if (seq
>= call
->ackr_win_top
) {
609 special_ACK
= RXRPC_ACK_EXCEEDS_WINDOW
;
614 if (call
->ackr_array
[offset
] != RXRPC_ACK_TYPE_NACK
) {
615 special_ACK
= RXRPC_ACK_DUPLICATE
;
620 /* okay... it's a normal data packet inside the ACK window */
621 call
->ackr_array
[offset
] = RXRPC_ACK_TYPE_ACK
;
623 if (offset
< call
->ackr_pend_cnt
) {
625 else if (offset
> call
->ackr_pend_cnt
) {
626 do_ACK
= RXRPC_ACK_OUT_OF_SEQUENCE
;
627 call
->ackr_pend_cnt
= offset
;
631 if (hdr
->flags
& RXRPC_REQUEST_ACK
) {
632 do_ACK
= RXRPC_ACK_REQUESTED
;
635 /* generate an ACK on the final packet of a reply just received */
636 if (hdr
->flags
& RXRPC_LAST_PACKET
) {
637 if (call
->conn
->out_clientflag
)
640 else if (!(hdr
->flags
& RXRPC_MORE_PACKETS
)) {
641 do_ACK
= RXRPC_ACK_REQUESTED
;
644 /* re-ACK packets previously received out-of-order */
645 for (offset
++; offset
< RXRPC_CALL_ACK_WINDOW_SIZE
; offset
++)
646 if (call
->ackr_array
[offset
] != RXRPC_ACK_TYPE_ACK
)
649 call
->ackr_pend_cnt
= offset
;
651 /* generate an ACK if we fill up the window */
652 if (call
->ackr_pend_cnt
>= RXRPC_CALL_ACK_WINDOW_SIZE
)
656 _debug("%05lu ACKs pend=%u norm=%s special=%s%s",
657 jiffies
- call
->cjif
,
660 rxrpc_acks
[special_ACK
],
661 force
? " immediate" :
662 do_ACK
== RXRPC_ACK_REQUESTED
? " merge-req" :
663 hdr
->flags
& RXRPC_LAST_PACKET
? " finalise" :
667 /* send any pending normal ACKs if need be */
668 if (call
->ackr_pend_cnt
> 0) {
669 /* fill out the appropriate form */
670 call
->ackr
.bufferSpace
= htons(RXRPC_CALL_ACK_WINDOW_SIZE
);
671 call
->ackr
.maxSkew
= htons(min(call
->ackr_high_seq
- seq
,
673 call
->ackr
.firstPacket
= htonl(call
->ackr_win_bot
);
674 call
->ackr
.previousPacket
= call
->ackr_prev_seq
;
675 call
->ackr
.serial
= hdr
->serial
;
676 call
->ackr
.nAcks
= call
->ackr_pend_cnt
;
678 if (do_ACK
== RXRPC_ACK_REQUESTED
)
679 call
->ackr
.reason
= do_ACK
;
681 /* generate the ACK immediately if necessary */
682 if (special_ACK
|| force
) {
683 err
= __rxrpc_call_gen_normal_ACK(
684 call
, do_ACK
== RXRPC_ACK_DELAY
? 0 : seq
);
692 if (call
->ackr
.reason
== RXRPC_ACK_REQUESTED
)
693 call
->ackr_dfr_seq
= seq
;
695 /* start the ACK timer if not running if there are any pending deferred
697 if (call
->ackr_pend_cnt
> 0 &&
698 call
->ackr
.reason
!= RXRPC_ACK_REQUESTED
&&
699 !timer_pending(&call
->ackr_dfr_timo
)
703 timo
= rxrpc_call_dfr_ack_timeout
+ jiffies
;
705 _debug("START ACKR TIMER for cj=%lu", timo
- call
->cjif
);
707 spin_lock(&call
->lock
);
708 mod_timer(&call
->ackr_dfr_timo
, timo
);
709 spin_unlock(&call
->lock
);
711 else if ((call
->ackr_pend_cnt
== 0 ||
712 call
->ackr
.reason
== RXRPC_ACK_REQUESTED
) &&
713 timer_pending(&call
->ackr_dfr_timo
)
715 /* stop timer if no pending ACKs */
716 _debug("CLEAR ACKR TIMER");
717 del_timer_sync(&call
->ackr_dfr_timo
);
720 /* send a special ACK if one is required */
722 struct rxrpc_ackpacket ack
;
724 uint8_t acks
[1] = { RXRPC_ACK_TYPE_ACK
};
726 /* fill out the appropriate form */
727 ack
.bufferSpace
= htons(RXRPC_CALL_ACK_WINDOW_SIZE
);
728 ack
.maxSkew
= htons(min(call
->ackr_high_seq
- seq
,
730 ack
.firstPacket
= htonl(call
->ackr_win_bot
);
731 ack
.previousPacket
= call
->ackr_prev_seq
;
732 ack
.serial
= hdr
->serial
;
733 ack
.reason
= special_ACK
;
736 _proto("Rx Sending s-ACK"
737 " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
739 ntohl(ack
.firstPacket
),
740 ntohl(ack
.previousPacket
),
742 rxrpc_acks
[ack
.reason
],
745 diov
[0].iov_len
= sizeof(struct rxrpc_ackpacket
);
746 diov
[0].iov_base
= &ack
;
747 diov
[1].iov_len
= sizeof(acks
);
748 diov
[1].iov_base
= acks
;
750 /* build and send the message */
751 err
= rxrpc_conn_newmsg(call
->conn
,call
, RXRPC_PACKET_TYPE_ACK
,
752 hdr
->seq
? 2 : 1, diov
,
761 msg
->hdr
.seq
= htonl(seq
);
762 msg
->hdr
.flags
|= RXRPC_SLOW_START_OK
;
764 err
= rxrpc_conn_sendmsg(call
->conn
, msg
);
765 rxrpc_put_message(msg
);
770 call
->pkt_snd_count
++;
775 call
->ackr_prev_seq
= hdr
->seq
;
777 _leave(" = %d", ret
);
779 } /* end rxrpc_call_generate_ACK() */
781 /*****************************************************************************/
783 * handle work to be done on a call
784 * - includes packet reception and timeout processing
/* krxiod worker entry: test each event bit in call->flags, clear it,
 * and perform the corresponding action (receive queued packets, resend
 * on ACK timeout, abort on reception timeout, emit deferred ACKs) */
786 void rxrpc_call_do_stuff(struct rxrpc_call
*call
)
788 _enter("%p{flags=%lx}", call
, call
->flags
);
790 /* handle packet reception */
791 if (call
->flags
& RXRPC_CALL_RCV_PKT
) {
792 _debug("- receive packet");
793 call
->flags
&= ~RXRPC_CALL_RCV_PKT
;
794 rxrpc_call_receive_packet(call
);
797 /* handle overdue ACKs */
798 if (call
->flags
& RXRPC_CALL_ACKS_TIMO
) {
799 _debug("- overdue ACK timeout");
800 call
->flags
&= ~RXRPC_CALL_ACKS_TIMO
;
/* resend everything up to the last sequence number we issued */
801 rxrpc_call_resend(call
, call
->snd_seq_count
);
804 /* handle lack of reception */
805 if (call
->flags
& RXRPC_CALL_RCV_TIMO
) {
806 _debug("- reception timeout");
807 call
->flags
&= ~RXRPC_CALL_RCV_TIMO
;
808 rxrpc_call_abort(call
, -EIO
);
811 /* handle deferred ACKs */
812 if (call
->flags
& RXRPC_CALL_ACKR_TIMO
||
813 (call
->ackr
.nAcks
> 0 && call
->ackr
.reason
== RXRPC_ACK_REQUESTED
)
815 _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u",
816 jiffies
- call
->cjif
,
817 rxrpc_acks
[call
->ackr
.reason
],
820 call
->flags
&= ~RXRPC_CALL_ACKR_TIMO
;
/* only emit the deferred ACK if the call hasn't been aborted */
822 if (call
->ackr
.nAcks
> 0 &&
823 call
->app_call_state
!= RXRPC_CSTATE_ERROR
) {
825 __rxrpc_call_gen_normal_ACK(call
, call
->ackr_dfr_seq
);
826 call
->ackr_dfr_seq
= 0;
832 } /* end rxrpc_call_do_stuff() */
834 /*****************************************************************************/
836 * send an abort message at call or connection level
837 * - must be called with call->lock held
838 * - the supplied error code is sent as the packet data
840 static int __rxrpc_call_abort(struct rxrpc_call
*call
, int errno
)
842 struct rxrpc_connection
*conn
= call
->conn
;
843 struct rxrpc_message
*msg
;
848 _enter("%p{%08x},%p{%d},%d",
849 conn
, ntohl(conn
->conn_id
), call
, ntohl(call
->call_id
), errno
);
851 /* if this call is already aborted, then just wake up any waiters */
852 if (call
->app_call_state
== RXRPC_CSTATE_ERROR
) {
853 spin_unlock(&call
->lock
);
854 call
->app_error_func(call
);
859 rxrpc_get_call(call
);
861 /* change the state _with_ the lock still held */
862 call
->app_call_state
= RXRPC_CSTATE_ERROR
;
863 call
->app_err_state
= RXRPC_ESTATE_LOCAL_ABORT
;
864 call
->app_errno
= errno
;
865 call
->app_mark
= RXRPC_APP_MARK_EOF
;
866 call
->app_read_buf
= NULL
;
867 call
->app_async_read
= 0;
871 /* ask the app to translate the error code */
872 call
->app_aemap_func(call
);
874 spin_unlock(&call
->lock
);
876 /* flush any outstanding ACKs */
877 del_timer_sync(&call
->acks_timeout
);
878 del_timer_sync(&call
->rcv_timeout
);
879 del_timer_sync(&call
->ackr_dfr_timo
);
881 if (rxrpc_call_is_ack_pending(call
))
882 __rxrpc_call_gen_normal_ACK(call
, 0);
884 /* send the abort packet only if we actually traded some other
887 if (call
->pkt_snd_count
|| call
->pkt_rcv_count
) {
888 /* actually send the abort */
889 _proto("Rx Sending Call ABORT { data=%d }",
890 call
->app_abort_code
);
892 _error
= htonl(call
->app_abort_code
);
894 diov
[0].iov_len
= sizeof(_error
);
895 diov
[0].iov_base
= &_error
;
897 ret
= rxrpc_conn_newmsg(conn
, call
, RXRPC_PACKET_TYPE_ABORT
,
898 1, diov
, GFP_KERNEL
, &msg
);
900 ret
= rxrpc_conn_sendmsg(conn
, msg
);
901 rxrpc_put_message(msg
);
905 /* tell the app layer to let go */
906 call
->app_error_func(call
);
908 rxrpc_put_call(call
);
910 _leave(" = %d", ret
);
912 } /* end __rxrpc_call_abort() */
914 /*****************************************************************************/
916 * send an abort message at call or connection level
917 * - the supplied error code is sent as the packet data
/* public abort entry point: takes call->lock and delegates to
 * __rxrpc_call_abort(), which is responsible for dropping the lock
 * (it calls spin_unlock(&call->lock) on its visible paths) */
919 int rxrpc_call_abort(struct rxrpc_call
*call
, int error
)
921 spin_lock(&call
->lock
);
923 return __rxrpc_call_abort(call
, error
);
925 } /* end rxrpc_call_abort() */
927 /*****************************************************************************/
929 * process packets waiting for this call
931 static void rxrpc_call_receive_packet(struct rxrpc_call
*call
)
933 struct rxrpc_message
*msg
;
934 struct list_head
*_p
;
938 rxrpc_get_call(call
); /* must not go away too soon if aborted by
941 while (!list_empty(&call
->rcv_receiveq
)) {
942 /* try to get next packet */
944 spin_lock(&call
->lock
);
945 if (!list_empty(&call
->rcv_receiveq
)) {
946 _p
= call
->rcv_receiveq
.next
;
949 spin_unlock(&call
->lock
);
954 msg
= list_entry(_p
, struct rxrpc_message
, link
);
956 _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)",
957 jiffies
- call
->cjif
,
958 rxrpc_pkts
[msg
->hdr
.type
],
959 ntohl(msg
->hdr
.serial
),
961 msg
->hdr
.flags
& RXRPC_JUMBO_PACKET
? 'j' : '-',
962 msg
->hdr
.flags
& RXRPC_MORE_PACKETS
? 'm' : '-',
963 msg
->hdr
.flags
& RXRPC_LAST_PACKET
? 'l' : '-',
964 msg
->hdr
.flags
& RXRPC_REQUEST_ACK
? 'r' : '-',
965 msg
->hdr
.flags
& RXRPC_CLIENT_INITIATED
? 'C' : 'S'
968 switch (msg
->hdr
.type
) {
969 /* deal with data packets */
970 case RXRPC_PACKET_TYPE_DATA
:
971 /* ACK the packet if necessary */
972 switch (rxrpc_call_generate_ACK(call
, &msg
->hdr
,
974 case 0: /* useful packet */
975 rxrpc_call_receive_data_packet(call
, msg
);
977 case 1: /* duplicate or out-of-window packet */
980 rxrpc_put_message(msg
);
985 /* deal with ACK packets */
986 case RXRPC_PACKET_TYPE_ACK
:
987 rxrpc_call_receive_ack_packet(call
, msg
);
990 /* deal with abort packets */
991 case RXRPC_PACKET_TYPE_ABORT
: {
994 dp
= skb_header_pointer(msg
->pkt
, msg
->offset
,
995 sizeof(_dbuf
), &_dbuf
);
997 printk("Rx Received short ABORT packet\n");
999 _proto("Rx Received Call ABORT { data=%d }",
1000 (dp
? ntohl(*dp
) : 0));
1002 spin_lock(&call
->lock
);
1003 call
->app_call_state
= RXRPC_CSTATE_ERROR
;
1004 call
->app_err_state
= RXRPC_ESTATE_PEER_ABORT
;
1005 call
->app_abort_code
= (dp
? ntohl(*dp
) : 0);
1006 call
->app_errno
= -ECONNABORTED
;
1007 call
->app_mark
= RXRPC_APP_MARK_EOF
;
1008 call
->app_read_buf
= NULL
;
1009 call
->app_async_read
= 0;
1011 /* ask the app to translate the error code */
1012 call
->app_aemap_func(call
);
1014 spin_unlock(&call
->lock
);
1015 call
->app_error_func(call
);
1019 /* deal with other packet types */
1020 _proto("Rx Unsupported packet type %u (#%u)",
1021 msg
->hdr
.type
, msg
->seq
);
1025 rxrpc_put_message(msg
);
1029 rxrpc_put_call(call
);
1031 } /* end rxrpc_call_receive_packet() */
1033 /*****************************************************************************/
1035 * process next data packet
1036 * - as the next data packet arrives:
1037 * - it is queued on app_readyq _if_ it is the next one expected
1039 * - it is queued on app_unreadyq _if_ it is not the next one expected
1040 * - if a packet placed on app_readyq completely fills a hole leading up to
1041 * the first packet on app_unreadyq, then packets now in sequence are
1042 * tranferred to app_readyq
1043 * - the application layer can only see packets on app_readyq
1044 * (app_ready_qty bytes)
1045 * - the application layer is prodded every time a new packet arrives
1047 static void rxrpc_call_receive_data_packet(struct rxrpc_call
*call
,
1048 struct rxrpc_message
*msg
)
1050 const struct rxrpc_operation
*optbl
, *op
;
1051 struct rxrpc_message
*pmsg
;
1052 struct list_head
*_p
;
1053 int ret
, lo
, hi
, rmtimo
;
1056 _enter("%p{%u},%p{%u}", call
, ntohl(call
->call_id
), msg
, msg
->seq
);
1058 rxrpc_get_message(msg
);
1060 /* add to the unready queue if we'd have to create a hole in the ready
1061 * queue otherwise */
1062 if (msg
->seq
!= call
->app_ready_seq
+ 1) {
1063 _debug("Call add packet %d to unreadyq", msg
->seq
);
1065 /* insert in seq order */
1066 list_for_each(_p
, &call
->app_unreadyq
) {
1067 pmsg
= list_entry(_p
, struct rxrpc_message
, link
);
1068 if (pmsg
->seq
> msg
->seq
)
1072 list_add_tail(&msg
->link
, _p
);
1074 _leave(" [unreadyq]");
1078 /* next in sequence - simply append into the call's ready queue */
1079 _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)",
1080 msg
->seq
, msg
->dsize
, call
->app_ready_qty
);
1082 spin_lock(&call
->lock
);
1083 call
->app_ready_seq
= msg
->seq
;
1084 call
->app_ready_qty
+= msg
->dsize
;
1085 list_add_tail(&msg
->link
, &call
->app_readyq
);
1087 /* move unready packets to the readyq if we got rid of a hole */
1088 while (!list_empty(&call
->app_unreadyq
)) {
1089 pmsg
= list_entry(call
->app_unreadyq
.next
,
1090 struct rxrpc_message
, link
);
1092 if (pmsg
->seq
!= call
->app_ready_seq
+ 1)
1095 /* next in sequence - just move list-to-list */
1096 _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)",
1097 pmsg
->seq
, pmsg
->dsize
, call
->app_ready_qty
);
1099 call
->app_ready_seq
= pmsg
->seq
;
1100 call
->app_ready_qty
+= pmsg
->dsize
;
1101 list_del_init(&pmsg
->link
);
1102 list_add_tail(&pmsg
->link
, &call
->app_readyq
);
1105 /* see if we've got the last packet yet */
1106 if (!list_empty(&call
->app_readyq
)) {
1107 pmsg
= list_entry(call
->app_readyq
.prev
,
1108 struct rxrpc_message
, link
);
1109 if (pmsg
->hdr
.flags
& RXRPC_LAST_PACKET
) {
1110 call
->app_last_rcv
= 1;
1111 _debug("Last packet on readyq");
1115 switch (call
->app_call_state
) {
1116 /* do nothing if call already aborted */
1117 case RXRPC_CSTATE_ERROR
:
1118 spin_unlock(&call
->lock
);
1122 /* extract the operation ID from an incoming call if that's not
1124 case RXRPC_CSTATE_SRVR_RCV_OPID
:
1125 spin_unlock(&call
->lock
);
1127 /* handle as yet insufficient data for the operation ID */
1128 if (call
->app_ready_qty
< 4) {
1129 if (call
->app_last_rcv
)
1130 /* trouble - last packet seen */
1131 rxrpc_call_abort(call
, -EINVAL
);
1137 /* pull the operation ID out of the buffer */
1138 ret
= rxrpc_call_read_data(call
, &opid
, sizeof(opid
), 0);
1140 printk("Unexpected error from read-data: %d\n", ret
);
1141 if (call
->app_call_state
!= RXRPC_CSTATE_ERROR
)
1142 rxrpc_call_abort(call
, ret
);
1146 call
->app_opcode
= ntohl(opid
);
1148 /* locate the operation in the available ops table */
1149 optbl
= call
->conn
->service
->ops_begin
;
1151 hi
= call
->conn
->service
->ops_end
- optbl
;
1154 int mid
= (hi
+ lo
) / 2;
1156 if (call
->app_opcode
== op
->id
)
1158 if (call
->app_opcode
> op
->id
)
1165 kproto("Rx Client requested operation %d from %s service",
1166 call
->app_opcode
, call
->conn
->service
->name
);
1167 rxrpc_call_abort(call
, -EINVAL
);
1172 _proto("Rx Client requested operation %s from %s service",
1173 op
->name
, call
->conn
->service
->name
);
1175 /* we're now waiting for the argument block (unless the call
1177 spin_lock(&call
->lock
);
1178 if (call
->app_call_state
== RXRPC_CSTATE_SRVR_RCV_OPID
||
1179 call
->app_call_state
== RXRPC_CSTATE_SRVR_SND_REPLY
) {
1180 if (!call
->app_last_rcv
)
1181 call
->app_call_state
=
1182 RXRPC_CSTATE_SRVR_RCV_ARGS
;
1183 else if (call
->app_ready_qty
> 0)
1184 call
->app_call_state
=
1185 RXRPC_CSTATE_SRVR_GOT_ARGS
;
1187 call
->app_call_state
=
1188 RXRPC_CSTATE_SRVR_SND_REPLY
;
1189 call
->app_mark
= op
->asize
;
1190 call
->app_user
= op
->user
;
1192 spin_unlock(&call
->lock
);
1197 case RXRPC_CSTATE_SRVR_RCV_ARGS
:
1198 /* change state if just received last packet of arg block */
1199 if (call
->app_last_rcv
)
1200 call
->app_call_state
= RXRPC_CSTATE_SRVR_GOT_ARGS
;
1201 spin_unlock(&call
->lock
);
1206 case RXRPC_CSTATE_CLNT_RCV_REPLY
:
1207 /* change state if just received last packet of reply block */
1209 if (call
->app_last_rcv
) {
1210 call
->app_call_state
= RXRPC_CSTATE_CLNT_GOT_REPLY
;
1213 spin_unlock(&call
->lock
);
1216 del_timer_sync(&call
->acks_timeout
);
1217 del_timer_sync(&call
->rcv_timeout
);
1218 del_timer_sync(&call
->ackr_dfr_timo
);
1225 /* deal with data reception in an unexpected state */
1226 printk("Unexpected state [[[ %u ]]]\n", call
->app_call_state
);
1227 __rxrpc_call_abort(call
, -EBADMSG
);
1232 if (call
->app_call_state
== RXRPC_CSTATE_CLNT_RCV_REPLY
&&
1236 /* otherwise just invoke the data function whenever we can satisfy its desire for more
1239 _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s",
1240 call
->app_call_state
, call
->app_ready_qty
, call
->app_mark
,
1241 call
->app_last_rcv
? " last-rcvd" : "");
1243 spin_lock(&call
->lock
);
1245 ret
= __rxrpc_call_read_data(call
);
1248 spin_unlock(&call
->lock
);
1249 call
->app_attn_func(call
);
1252 spin_unlock(&call
->lock
);
1255 spin_unlock(&call
->lock
);
1258 __rxrpc_call_abort(call
, ret
);
1266 } /* end rxrpc_call_receive_data_packet() */
1268 /*****************************************************************************/
1270 * received an ACK packet
1272 static void rxrpc_call_receive_ack_packet(struct rxrpc_call
*call
,
1273 struct rxrpc_message
*msg
)
1275 struct rxrpc_ackpacket _ack
, *ap
;
1276 rxrpc_serial_net_t serial
;
1280 _enter("%p{%u},%p{%u}", call
, ntohl(call
->call_id
), msg
, msg
->seq
);
1282 /* extract the basic ACK record */
1283 ap
= skb_header_pointer(msg
->pkt
, msg
->offset
, sizeof(_ack
), &_ack
);
1285 printk("Rx Received short ACK packet\n");
1288 msg
->offset
+= sizeof(_ack
);
1290 serial
= ap
->serial
;
1291 seq
= ntohl(ap
->firstPacket
);
1293 _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }",
1294 ntohl(msg
->hdr
.serial
),
1295 ntohs(ap
->bufferSpace
),
1298 ntohl(ap
->previousPacket
),
1300 rxrpc_acks
[ap
->reason
],
1304 /* check the other side isn't ACK'ing a sequence number I haven't sent
1306 if (ap
->nAcks
> 0 &&
1307 (seq
> call
->snd_seq_count
||
1308 seq
+ ap
->nAcks
- 1 > call
->snd_seq_count
)) {
1309 printk("Received ACK (#%u-#%u) for unsent packet\n",
1310 seq
, seq
+ ap
->nAcks
- 1);
1311 rxrpc_call_abort(call
, -EINVAL
);
1316 /* deal with RTT calculation */
1318 struct rxrpc_message
*rttmsg
;
1320 /* find the prompting packet */
1321 spin_lock(&call
->lock
);
1322 if (call
->snd_ping
&& call
->snd_ping
->hdr
.serial
== serial
) {
1323 /* it was a ping packet */
1324 rttmsg
= call
->snd_ping
;
1325 call
->snd_ping
= NULL
;
1326 spin_unlock(&call
->lock
);
1329 rttmsg
->rttdone
= 1;
1330 rxrpc_peer_calculate_rtt(call
->conn
->peer
,
1332 rxrpc_put_message(rttmsg
);
1336 struct list_head
*_p
;
1338 /* it ought to be a data packet - look in the pending
1340 list_for_each(_p
, &call
->acks_pendq
) {
1341 rttmsg
= list_entry(_p
, struct rxrpc_message
,
1343 if (rttmsg
->hdr
.serial
== serial
) {
1344 if (rttmsg
->rttdone
)
1345 /* never do RTT twice without
1349 rttmsg
->rttdone
= 1;
1350 rxrpc_peer_calculate_rtt(
1351 call
->conn
->peer
, rttmsg
, msg
);
1355 spin_unlock(&call
->lock
);
1359 switch (ap
->reason
) {
1360 /* deal with negative/positive acknowledgement of data
1362 case RXRPC_ACK_REQUESTED
:
1363 case RXRPC_ACK_DELAY
:
1364 case RXRPC_ACK_IDLE
:
1365 rxrpc_call_definitively_ACK(call
, seq
- 1);
1367 case RXRPC_ACK_DUPLICATE
:
1368 case RXRPC_ACK_OUT_OF_SEQUENCE
:
1369 case RXRPC_ACK_EXCEEDS_WINDOW
:
1370 call
->snd_resend_cnt
= 0;
1371 ret
= rxrpc_call_record_ACK(call
, msg
, seq
, ap
->nAcks
);
1373 rxrpc_call_abort(call
, ret
);
1376 /* respond to ping packets immediately */
1377 case RXRPC_ACK_PING
:
1378 rxrpc_call_generate_ACK(call
, &msg
->hdr
, ap
);
1381 /* only record RTT on ping response packets */
1382 case RXRPC_ACK_PING_RESPONSE
:
1383 if (call
->snd_ping
) {
1384 struct rxrpc_message
*rttmsg
;
1386 /* only do RTT stuff if the response matches the
1389 spin_lock(&call
->lock
);
1390 if (call
->snd_ping
&&
1391 call
->snd_ping
->hdr
.serial
== ap
->serial
) {
1392 rttmsg
= call
->snd_ping
;
1393 call
->snd_ping
= NULL
;
1395 spin_unlock(&call
->lock
);
1398 rttmsg
->rttdone
= 1;
1399 rxrpc_peer_calculate_rtt(call
->conn
->peer
,
1401 rxrpc_put_message(rttmsg
);
1407 printk("Unsupported ACK reason %u\n", ap
->reason
);
1412 } /* end rxrpc_call_receive_ack_packet() */
1414 /*****************************************************************************/
1416 * record definitive ACKs for all messages up to and including the one with the
1419 static void rxrpc_call_definitively_ACK(struct rxrpc_call
*call
,
1420 rxrpc_seq_t highest
)
1422 struct rxrpc_message
*msg
;
1425 _enter("%p{ads=%u},%u", call
, call
->acks_dftv_seq
, highest
);
1427 while (call
->acks_dftv_seq
< highest
) {
1428 call
->acks_dftv_seq
++;
1430 _proto("Definitive ACK on packet #%u", call
->acks_dftv_seq
);
1432 /* discard those at front of queue until message with highest
1434 spin_lock(&call
->lock
);
1436 if (!list_empty(&call
->acks_pendq
)) {
1437 msg
= list_entry(call
->acks_pendq
.next
,
1438 struct rxrpc_message
, link
);
1439 list_del_init(&msg
->link
); /* dequeue */
1440 if (msg
->state
== RXRPC_MSG_SENT
)
1441 call
->acks_pend_cnt
--;
1443 spin_unlock(&call
->lock
);
1445 /* insanity check */
1447 panic("%s(): acks_pendq unexpectedly empty\n",
1450 if (msg
->seq
!= call
->acks_dftv_seq
)
1451 panic("%s(): Packet #%u expected at front of acks_pendq"
1453 __FUNCTION__
, call
->acks_dftv_seq
, msg
->seq
);
1455 /* discard the message */
1456 msg
->state
= RXRPC_MSG_DONE
;
1457 rxrpc_put_message(msg
);
1460 /* if all sent packets are definitively ACK'd then prod any sleepers just in case */
1462 spin_lock(&call
->lock
);
1463 if (call
->acks_dftv_seq
== call
->snd_seq_count
) {
1464 if (call
->app_call_state
!= RXRPC_CSTATE_COMPLETE
) {
1465 call
->app_call_state
= RXRPC_CSTATE_COMPLETE
;
1470 spin_unlock(&call
->lock
);
1473 del_timer_sync(&call
->acks_timeout
);
1474 del_timer_sync(&call
->rcv_timeout
);
1475 del_timer_sync(&call
->ackr_dfr_timo
);
1476 call
->app_attn_func(call
);
1480 } /* end rxrpc_call_definitively_ACK() */
1482 /*****************************************************************************/
1484 * record the specified amount of ACKs/NAKs
1486 static int rxrpc_call_record_ACK(struct rxrpc_call
*call
,
1487 struct rxrpc_message
*msg
,
1491 struct rxrpc_message
*dmsg
;
1492 struct list_head
*_p
;
1493 rxrpc_seq_t highest
;
1496 char resend
, now_complete
;
1499 _enter("%p{apc=%u ads=%u},%p,%u,%Zu",
1500 call
, call
->acks_pend_cnt
, call
->acks_dftv_seq
,
1503 /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order
1505 if (seq
<= call
->acks_dftv_seq
) {
1506 unsigned delta
= call
->acks_dftv_seq
- seq
;
1508 if (count
<= delta
) {
1509 _leave(" = 0 [all definitively ACK'd]");
1515 msg
->offset
+= delta
;
1518 highest
= seq
+ count
- 1;
1521 /* extract up to 16 ACK slots at a time */
1522 chunk
= min(count
, sizeof(acks
));
1525 memset(acks
, 2, sizeof(acks
));
1527 if (skb_copy_bits(msg
->pkt
, msg
->offset
, &acks
, chunk
) < 0) {
1528 printk("Rx Received short ACK packet\n");
1529 _leave(" = -EINVAL");
1532 msg
->offset
+= chunk
;
1534 /* check that the ACK set is valid */
1535 for (ix
= 0; ix
< chunk
; ix
++) {
1537 case RXRPC_ACK_TYPE_ACK
:
1539 case RXRPC_ACK_TYPE_NACK
:
1543 printk("Rx Received unsupported ACK state"
1545 _leave(" = -EINVAL");
1550 _proto("Rx ACK of packets #%u-#%u "
1551 "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)",
1552 seq
, (unsigned) (seq
+ chunk
- 1),
1553 _acktype
[acks
[0x0]],
1554 _acktype
[acks
[0x1]],
1555 _acktype
[acks
[0x2]],
1556 _acktype
[acks
[0x3]],
1557 _acktype
[acks
[0x4]],
1558 _acktype
[acks
[0x5]],
1559 _acktype
[acks
[0x6]],
1560 _acktype
[acks
[0x7]],
1561 _acktype
[acks
[0x8]],
1562 _acktype
[acks
[0x9]],
1563 _acktype
[acks
[0xA]],
1564 _acktype
[acks
[0xB]],
1565 _acktype
[acks
[0xC]],
1566 _acktype
[acks
[0xD]],
1567 _acktype
[acks
[0xE]],
1568 _acktype
[acks
[0xF]],
1572 /* mark the packets in the ACK queue as being provisionally
1575 spin_lock(&call
->lock
);
1577 /* find the first packet ACK'd/NAK'd here */
1578 list_for_each(_p
, &call
->acks_pendq
) {
1579 dmsg
= list_entry(_p
, struct rxrpc_message
, link
);
1580 if (dmsg
->seq
== seq
)
1582 _debug("- %u: skipping #%u", ix
, dmsg
->seq
);
1588 _debug("- %u: processing #%u (%c) apc=%u",
1589 ix
, dmsg
->seq
, _acktype
[acks
[ix
]],
1590 call
->acks_pend_cnt
);
1592 if (acks
[ix
] == RXRPC_ACK_TYPE_ACK
) {
1593 if (dmsg
->state
== RXRPC_MSG_SENT
)
1594 call
->acks_pend_cnt
--;
1595 dmsg
->state
= RXRPC_MSG_ACKED
;
1598 if (dmsg
->state
== RXRPC_MSG_ACKED
)
1599 call
->acks_pend_cnt
++;
1600 dmsg
->state
= RXRPC_MSG_SENT
;
1605 _p
= dmsg
->link
.next
;
1606 dmsg
= list_entry(_p
, struct rxrpc_message
, link
);
1607 } while(ix
< chunk
&&
1608 _p
!= &call
->acks_pendq
&&
1614 spin_unlock(&call
->lock
);
1618 rxrpc_call_resend(call
, highest
);
1620 /* if all packets are provisionally ACK'd, then wake up anyone who's
1621 * waiting for that */
1623 spin_lock(&call
->lock
);
1624 if (call
->acks_pend_cnt
== 0) {
1625 if (call
->app_call_state
== RXRPC_CSTATE_SRVR_RCV_FINAL_ACK
) {
1626 call
->app_call_state
= RXRPC_CSTATE_COMPLETE
;
1631 spin_unlock(&call
->lock
);
1634 _debug("- wake up waiters");
1635 del_timer_sync(&call
->acks_timeout
);
1636 del_timer_sync(&call
->rcv_timeout
);
1637 del_timer_sync(&call
->ackr_dfr_timo
);
1638 call
->app_attn_func(call
);
1641 _leave(" = 0 (apc=%u)", call
->acks_pend_cnt
);
1645 panic("%s(): acks_pendq in bad state (packet #%u absent)\n",
1648 } /* end rxrpc_call_record_ACK() */
1650 /*****************************************************************************/
1652 * transfer data from the ready packet queue to the asynchronous read buffer
1653 * - since this func is the only one going to look at packets queued on
1654 * app_readyq, we don't need a lock to modify or access them, only to modify
1655 * the queue pointers
1656 * - called with call->lock held
1657 * - the buffer must be in kernel space
1659 * 0 if buffer filled
1660 * -EAGAIN if buffer not filled and more data to come
1661 * -EBADMSG if last packet received and insufficient data left
1662 * -ECONNABORTED if the call has in an error state
1664 static int __rxrpc_call_read_data(struct rxrpc_call
*call
)
1666 struct rxrpc_message
*msg
;
1670 _enter("%p{as=%d buf=%p qty=%Zu/%Zu}",
1672 call
->app_async_read
, call
->app_read_buf
,
1673 call
->app_ready_qty
, call
->app_mark
);
1675 /* check the state */
1676 switch (call
->app_call_state
) {
1677 case RXRPC_CSTATE_SRVR_RCV_ARGS
:
1678 case RXRPC_CSTATE_CLNT_RCV_REPLY
:
1679 if (call
->app_last_rcv
) {
1680 printk("%s(%p,%p,%Zd):"
1681 " Inconsistent call state (%s, last pkt)",
1683 call
, call
->app_read_buf
, call
->app_mark
,
1684 rxrpc_call_states
[call
->app_call_state
]);
1689 case RXRPC_CSTATE_SRVR_RCV_OPID
:
1690 case RXRPC_CSTATE_SRVR_GOT_ARGS
:
1691 case RXRPC_CSTATE_CLNT_GOT_REPLY
:
1694 case RXRPC_CSTATE_SRVR_SND_REPLY
:
1695 if (!call
->app_last_rcv
) {
1696 printk("%s(%p,%p,%Zd):"
1697 " Inconsistent call state (%s, not last pkt)",
1699 call
, call
->app_read_buf
, call
->app_mark
,
1700 rxrpc_call_states
[call
->app_call_state
]);
1703 _debug("Trying to read data from call in SND_REPLY state");
1706 case RXRPC_CSTATE_ERROR
:
1707 _leave(" = -ECONNABORTED");
1708 return -ECONNABORTED
;
1711 printk("reading in unexpected state [[[ %u ]]]\n",
1712 call
->app_call_state
);
1716 /* handle the case of not having an async buffer */
1717 if (!call
->app_async_read
) {
1718 if (call
->app_mark
== RXRPC_APP_MARK_EOF
) {
1719 ret
= call
->app_last_rcv
? 0 : -EAGAIN
;
1722 if (call
->app_mark
>= call
->app_ready_qty
) {
1723 call
->app_mark
= RXRPC_APP_MARK_EOF
;
1727 ret
= call
->app_last_rcv
? -EBADMSG
: -EAGAIN
;
1731 _leave(" = %d [no buf]", ret
);
1735 while (!list_empty(&call
->app_readyq
) && call
->app_mark
> 0) {
1736 msg
= list_entry(call
->app_readyq
.next
,
1737 struct rxrpc_message
, link
);
1739 /* drag as much data as we need out of this packet */
1740 qty
= min(call
->app_mark
, msg
->dsize
);
1742 _debug("reading %Zu from skb=%p off=%lu",
1743 qty
, msg
->pkt
, msg
->offset
);
1745 if (call
->app_read_buf
)
1746 if (skb_copy_bits(msg
->pkt
, msg
->offset
,
1747 call
->app_read_buf
, qty
) < 0)
1748 panic("%s: Failed to copy data from packet:"
1751 call
, call
->app_read_buf
, qty
);
1753 /* if that packet is now empty, discard it */
1754 call
->app_ready_qty
-= qty
;
1757 if (msg
->dsize
== 0) {
1758 list_del_init(&msg
->link
);
1759 rxrpc_put_message(msg
);
1765 call
->app_mark
-= qty
;
1766 if (call
->app_read_buf
)
1767 call
->app_read_buf
+= qty
;
1770 if (call
->app_mark
== 0) {
1771 call
->app_async_read
= 0;
1772 call
->app_mark
= RXRPC_APP_MARK_EOF
;
1773 call
->app_read_buf
= NULL
;
1775 /* adjust the state if used up all packets */
1776 if (list_empty(&call
->app_readyq
) && call
->app_last_rcv
) {
1777 switch (call
->app_call_state
) {
1778 case RXRPC_CSTATE_SRVR_RCV_OPID
:
1779 call
->app_call_state
= RXRPC_CSTATE_SRVR_SND_REPLY
;
1780 call
->app_mark
= RXRPC_APP_MARK_EOF
;
1782 del_timer_sync(&call
->rcv_timeout
);
1784 case RXRPC_CSTATE_SRVR_GOT_ARGS
:
1785 call
->app_call_state
= RXRPC_CSTATE_SRVR_SND_REPLY
;
1787 del_timer_sync(&call
->rcv_timeout
);
1790 call
->app_call_state
= RXRPC_CSTATE_COMPLETE
;
1792 del_timer_sync(&call
->acks_timeout
);
1793 del_timer_sync(&call
->ackr_dfr_timo
);
1794 del_timer_sync(&call
->rcv_timeout
);
1803 if (call
->app_last_rcv
) {
1804 _debug("Insufficient data (%Zu/%Zu)",
1805 call
->app_ready_qty
, call
->app_mark
);
1806 call
->app_async_read
= 0;
1807 call
->app_mark
= RXRPC_APP_MARK_EOF
;
1808 call
->app_read_buf
= NULL
;
1810 _leave(" = -EBADMSG");
1814 _leave(" = -EAGAIN");
1816 } /* end __rxrpc_call_read_data() */
1818 /*****************************************************************************/
1820 * attempt to read the specified amount of data from the call's ready queue
1821 * into the buffer provided
1822 * - since this func is the only one going to look at packets queued on
1823 * app_readyq, we don't need a lock to modify or access them, only to modify
1824 * the queue pointers
1825 * - if the buffer pointer is NULL, then data is merely drained, not copied
1826 * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is
1827 * enough data or an error will be generated
1828 * - note that the caller must have added the calling task to the call's wait
1830 * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this
1831 * function doesn't read all available data
1833 int rxrpc_call_read_data(struct rxrpc_call
*call
,
1834 void *buffer
, size_t size
, int flags
)
1838 _enter("%p{arq=%Zu},%p,%Zd,%x",
1839 call
, call
->app_ready_qty
, buffer
, size
, flags
);
1841 spin_lock(&call
->lock
);
1843 if (unlikely(!!call
->app_read_buf
)) {
1844 spin_unlock(&call
->lock
);
1845 _leave(" = -EBUSY");
1849 call
->app_mark
= size
;
1850 call
->app_read_buf
= buffer
;
1851 call
->app_async_read
= 1;
1852 call
->app_read_count
++;
1854 /* read as much data as possible */
1855 ret
= __rxrpc_call_read_data(call
);
1858 if (flags
& RXRPC_CALL_READ_ALL
&&
1859 (!call
->app_last_rcv
|| call
->app_ready_qty
> 0)) {
1860 _leave(" = -EBADMSG");
1861 __rxrpc_call_abort(call
, -EBADMSG
);
1865 spin_unlock(&call
->lock
);
1866 call
->app_attn_func(call
);
1871 spin_unlock(&call
->lock
);
1872 _leave(" = %d [aborted]", ret
);
1876 __rxrpc_call_abort(call
, ret
);
1877 _leave(" = %d", ret
);
1881 spin_unlock(&call
->lock
);
1883 if (!(flags
& RXRPC_CALL_READ_BLOCK
)) {
1884 _leave(" = -EAGAIN");
1888 /* wait for the data to arrive */
1889 _debug("blocking for data arrival");
1892 set_current_state(TASK_INTERRUPTIBLE
);
1893 if (!call
->app_async_read
|| signal_pending(current
))
1897 set_current_state(TASK_RUNNING
);
1899 if (signal_pending(current
)) {
1900 _leave(" = -EINTR");
1904 if (call
->app_call_state
== RXRPC_CSTATE_ERROR
) {
1905 _leave(" = -ECONNABORTED");
1906 return -ECONNABORTED
;
1913 } /* end rxrpc_call_read_data() */
1915 /*****************************************************************************/
1917 * write data to a call
1918 * - the data may not be sent immediately if it doesn't fill a buffer
1919 * - if we can't queue all the data for buffering now, siov[] will have been
1920 * adjusted to take account of what has been sent
1922 int rxrpc_call_write_data(struct rxrpc_call
*call
,
1930 struct rxrpc_message
*msg
;
1932 size_t space
, size
, chunk
, tmp
;
1936 _enter("%p,%Zu,%p,%02x,%x,%d,%p",
1937 call
, sioc
, siov
, rxhdr_flags
, alloc_flags
, dup_data
,
1944 /* can't send more if we've sent last packet from this end */
1945 switch (call
->app_call_state
) {
1946 case RXRPC_CSTATE_SRVR_SND_REPLY
:
1947 case RXRPC_CSTATE_CLNT_SND_ARGS
:
1949 case RXRPC_CSTATE_ERROR
:
1950 ret
= call
->app_errno
;
1955 /* calculate how much data we've been given */
1957 for (; sioc
> 0; sptr
++, sioc
--) {
1961 if (!sptr
->iov_base
)
1964 size
+= sptr
->iov_len
;
1967 _debug("- size=%Zu mtu=%Zu", size
, call
->conn
->mtu_size
);
1970 /* make sure there's a message under construction */
1971 if (!call
->snd_nextmsg
) {
1972 /* no - allocate a message with no data yet attached */
1973 ret
= rxrpc_conn_newmsg(call
->conn
, call
,
1974 RXRPC_PACKET_TYPE_DATA
,
1975 0, NULL
, alloc_flags
,
1976 &call
->snd_nextmsg
);
1979 _debug("- allocated new message [ds=%Zu]",
1980 call
->snd_nextmsg
->dsize
);
1983 msg
= call
->snd_nextmsg
;
1984 msg
->hdr
.flags
|= rxhdr_flags
;
1986 /* deal with zero-length terminal packet */
1988 if (rxhdr_flags
& RXRPC_LAST_PACKET
) {
1989 ret
= rxrpc_call_flush(call
);
1996 /* work out how much space current packet has available */
1997 space
= call
->conn
->mtu_size
- msg
->dsize
;
1998 chunk
= min(space
, size
);
2000 _debug("- [before] space=%Zu chunk=%Zu", space
, chunk
);
2002 while (!siov
->iov_len
)
2005 /* if we are going to have to duplicate the data then coalesce
2008 /* don't allocate more that 1 page at a time */
2009 if (chunk
> PAGE_SIZE
)
2012 /* allocate a data buffer and attach to the message */
2013 buf
= kmalloc(chunk
, alloc_flags
);
2014 if (unlikely(!buf
)) {
2016 sizeof(struct rxrpc_header
)) {
2017 /* discard an empty msg and wind back
2018 * the seq counter */
2019 rxrpc_put_message(msg
);
2020 call
->snd_nextmsg
= NULL
;
2021 call
->snd_seq_count
--;
2028 tmp
= msg
->dcount
++;
2029 set_bit(tmp
, &msg
->dfree
);
2030 msg
->data
[tmp
].iov_base
= buf
;
2031 msg
->data
[tmp
].iov_len
= chunk
;
2032 msg
->dsize
+= chunk
;
2033 *size_sent
+= chunk
;
2036 /* load the buffer with data */
2038 tmp
= min(chunk
, siov
->iov_len
);
2039 memcpy(buf
, siov
->iov_base
, tmp
);
2041 siov
->iov_base
+= tmp
;
2042 siov
->iov_len
-= tmp
;
2049 /* we want to attach the supplied buffers directly */
2051 msg
->dcount
< RXRPC_MSG_MAX_IOCS
) {
2052 tmp
= msg
->dcount
++;
2053 msg
->data
[tmp
].iov_base
= siov
->iov_base
;
2054 msg
->data
[tmp
].iov_len
= siov
->iov_len
;
2055 msg
->dsize
+= siov
->iov_len
;
2056 *size_sent
+= siov
->iov_len
;
2057 size
-= siov
->iov_len
;
2058 chunk
-= siov
->iov_len
;
2063 _debug("- [loaded] chunk=%Zu size=%Zu", chunk
, size
);
2065 /* dispatch the message when full, final or requesting ACK */
2066 if (msg
->dsize
>= call
->conn
->mtu_size
|| rxhdr_flags
) {
2067 ret
= rxrpc_call_flush(call
);
2076 _leave(" = %d (%Zd queued, %Zd rem)", ret
, *size_sent
, size
);
2079 } /* end rxrpc_call_write_data() */
2081 /*****************************************************************************/
2083 * flush outstanding packets to the network
2085 static int rxrpc_call_flush(struct rxrpc_call
*call
)
2087 struct rxrpc_message
*msg
;
2092 rxrpc_get_call(call
);
2094 /* if there's a packet under construction, then dispatch it now */
2095 if (call
->snd_nextmsg
) {
2096 msg
= call
->snd_nextmsg
;
2097 call
->snd_nextmsg
= NULL
;
2099 if (msg
->hdr
.flags
& RXRPC_LAST_PACKET
) {
2100 msg
->hdr
.flags
&= ~RXRPC_MORE_PACKETS
;
2101 if (call
->app_call_state
!= RXRPC_CSTATE_CLNT_SND_ARGS
)
2102 msg
->hdr
.flags
|= RXRPC_REQUEST_ACK
;
2105 msg
->hdr
.flags
|= RXRPC_MORE_PACKETS
;
2108 _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }",
2109 msg
->dsize
, msg
->dcount
, msg
->dfree
);
2111 /* queue and adjust call state */
2112 spin_lock(&call
->lock
);
2113 list_add_tail(&msg
->link
, &call
->acks_pendq
);
2115 /* decide what to do depending on current state and if this is
2116 * the last packet */
2118 switch (call
->app_call_state
) {
2119 case RXRPC_CSTATE_SRVR_SND_REPLY
:
2120 if (msg
->hdr
.flags
& RXRPC_LAST_PACKET
) {
2121 call
->app_call_state
=
2122 RXRPC_CSTATE_SRVR_RCV_FINAL_ACK
;
2127 case RXRPC_CSTATE_CLNT_SND_ARGS
:
2128 if (msg
->hdr
.flags
& RXRPC_LAST_PACKET
) {
2129 call
->app_call_state
=
2130 RXRPC_CSTATE_CLNT_RCV_REPLY
;
2135 case RXRPC_CSTATE_ERROR
:
2136 ret
= call
->app_errno
;
2138 spin_unlock(&call
->lock
);
2142 call
->acks_pend_cnt
++;
2144 mod_timer(&call
->acks_timeout
,
2145 __rxrpc_rtt_based_timeout(call
,
2146 rxrpc_call_acks_timeout
));
2148 spin_unlock(&call
->lock
);
2150 ret
= rxrpc_conn_sendmsg(call
->conn
, msg
);
2152 call
->pkt_snd_count
++;
2156 rxrpc_put_call(call
);
2158 _leave(" = %d", ret
);
2161 } /* end rxrpc_call_flush() */
2163 /*****************************************************************************/
2165 * resend NAK'd or unacknowledged packets up to the highest one specified
2167 static void rxrpc_call_resend(struct rxrpc_call
*call
, rxrpc_seq_t highest
)
2169 struct rxrpc_message
*msg
;
2170 struct list_head
*_p
;
2171 rxrpc_seq_t seq
= 0;
2173 _enter("%p,%u", call
, highest
);
2175 _proto("Rx Resend required");
2177 /* handle too many resends */
2178 if (call
->snd_resend_cnt
>= rxrpc_call_max_resend
) {
2179 _debug("Aborting due to too many resends (rcv=%d)",
2180 call
->pkt_rcv_count
);
2181 rxrpc_call_abort(call
,
2182 call
->pkt_rcv_count
> 0 ? -EIO
: -ETIMEDOUT
);
2187 spin_lock(&call
->lock
);
2188 call
->snd_resend_cnt
++;
2190 /* determine which the next packet we might need to ACK is */
2191 if (seq
<= call
->acks_dftv_seq
)
2192 seq
= call
->acks_dftv_seq
;
2198 /* look for the packet in the pending-ACK queue */
2199 list_for_each(_p
, &call
->acks_pendq
) {
2200 msg
= list_entry(_p
, struct rxrpc_message
, link
);
2201 if (msg
->seq
== seq
)
2206 " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n",
2207 __FUNCTION__
, call
, highest
,
2208 call
->acks_dftv_seq
, call
->snd_seq_count
, seq
);
2211 if (msg
->state
!= RXRPC_MSG_SENT
)
2212 continue; /* only un-ACK'd packets */
2214 rxrpc_get_message(msg
);
2215 spin_unlock(&call
->lock
);
2217 /* send each message again (and ignore any errors we might
2219 _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }",
2220 msg
->dsize
, msg
->dcount
, msg
->dfree
);
2222 if (rxrpc_conn_sendmsg(call
->conn
, msg
) == 0)
2223 call
->pkt_snd_count
++;
2225 rxrpc_put_message(msg
);
2227 spin_lock(&call
->lock
);
2230 /* reset the timeout */
2231 mod_timer(&call
->acks_timeout
,
2232 __rxrpc_rtt_based_timeout(call
, rxrpc_call_acks_timeout
));
2234 spin_unlock(&call
->lock
);
2237 } /* end rxrpc_call_resend() */
2239 /*****************************************************************************/
2241 * handle an ICMP error being applied to a call
2243 void rxrpc_call_handle_error(struct rxrpc_call
*call
, int local
, int errno
)
2245 _enter("%p{%u},%d", call
, ntohl(call
->call_id
), errno
);
2247 /* if this call is already aborted, then just wake up any waiters */
2248 if (call
->app_call_state
== RXRPC_CSTATE_ERROR
) {
2249 call
->app_error_func(call
);
2252 /* tell the app layer what happened */
2253 spin_lock(&call
->lock
);
2254 call
->app_call_state
= RXRPC_CSTATE_ERROR
;
2257 call
->app_err_state
= RXRPC_ESTATE_LOCAL_ERROR
;
2259 call
->app_err_state
= RXRPC_ESTATE_REMOTE_ERROR
;
2260 call
->app_errno
= errno
;
2261 call
->app_mark
= RXRPC_APP_MARK_EOF
;
2262 call
->app_read_buf
= NULL
;
2263 call
->app_async_read
= 0;
2266 call
->app_aemap_func(call
);
2268 del_timer_sync(&call
->acks_timeout
);
2269 del_timer_sync(&call
->rcv_timeout
);
2270 del_timer_sync(&call
->ackr_dfr_timo
);
2272 spin_unlock(&call
->lock
);
2274 call
->app_error_func(call
);
2278 } /* end rxrpc_call_handle_error() */