[tomato.git] / release / src-rt-6.x.4708 / linux / linux-2.6.36 / net / sunrpc / xprtrdma / verbs.c
blob c0a0fb16c4f61dda5fade6176db54ef7cf558ed8
1 /*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * verbs.c
43 * Encapsulates the major functions managing:
44 * o adapters
45 * o endpoints
46 * o connections
47 * o buffer memory
50 #include <linux/pci.h> /* for Tavor hack below */
51 #include <linux/slab.h>
53 #include "xprt_rdma.h"
56 * Globals/Macros
59 #ifdef RPC_DEBUG
60 # define RPCDBG_FACILITY RPCDBG_TRANS
61 #endif
64 * internal functions
68 * handle replies in tasklet context, using a single, global list
69 * rdma tasklet function -- just turn around and call the func
70 * for all replies on the list
73 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
74 static LIST_HEAD(rpcrdma_tasklets_g);
76 static void
77 rpcrdma_run_tasklet(unsigned long data)
79 struct rpcrdma_rep *rep;
80 void (*func)(struct rpcrdma_rep *);
81 unsigned long flags;
83 data = data; /* tasklet argument is unused; self-assignment quiets compiler warnings */
84 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
85 while (!list_empty(&rpcrdma_tasklets_g)) {
86 rep = list_entry(rpcrdma_tasklets_g.next,
87 struct rpcrdma_rep, rr_list);
88 list_del(&rep->rr_list);
89 func = rep->rr_func;
90 rep->rr_func = NULL;
91 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93 if (func)
94 func(rep);
95 else
96 rpcrdma_recv_buffer_put(rep);
98 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
103 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105 static inline void
106 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 unsigned long flags;
110 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
111 list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
112 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
113 tasklet_schedule(&rpcrdma_tasklet_g);
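/*
 * Reply dispatch: the completion upcalls further down run in interrupt
 * context, so they only queue each struct rpcrdma_rep on
 * rpcrdma_tasklets_g (rpcrdma_schedule_tasklet) and let
 * rpcrdma_run_tasklet drain the list in softirq context, where each
 * rep's rr_func callback (the RPC reply handler) is invoked, or the
 * buffer is returned to the pool if no callback is set.
 */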
116 static void
117 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 struct rpcrdma_ep *ep = context;
121 dprintk("RPC: %s: QP error %X on device %s ep %p\n",
122 __func__, event->event, event->device->name, context);
123 if (ep->rep_connected == 1) {
124 ep->rep_connected = -EIO;
125 ep->rep_func(ep);
126 wake_up_all(&ep->rep_connect_wait);
130 static void
131 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 struct rpcrdma_ep *ep = context;
135 dprintk("RPC: %s: CQ error %X on device %s ep %p\n",
136 __func__, event->event, event->device->name, context);
137 if (ep->rep_connected == 1) {
138 ep->rep_connected = -EIO;
139 ep->rep_func(ep);
140 wake_up_all(&ep->rep_connect_wait);
144 static inline
145 void rpcrdma_event_process(struct ib_wc *wc)
147 struct rpcrdma_rep *rep =
148 (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
150 dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n",
151 __func__, rep, wc->status, wc->opcode, wc->byte_len);
153 if (!rep) /* send or bind completion that we don't care about */
154 return;
156 if (IB_WC_SUCCESS != wc->status) {
157 dprintk("RPC: %s: %s WC status %X, connection lost\n",
158 __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
159 wc->status);
160 rep->rr_len = ~0U;
161 rpcrdma_schedule_tasklet(rep);
162 return;
165 switch (wc->opcode) {
166 case IB_WC_RECV:
167 rep->rr_len = wc->byte_len;
168 ib_dma_sync_single_for_cpu(
169 rdmab_to_ia(rep->rr_buffer)->ri_id->device,
170 rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
171 /* Keep (only) the most recent credits, after checking validity */
172 if (rep->rr_len >= 16) {
173 struct rpcrdma_msg *p =
174 (struct rpcrdma_msg *) rep->rr_base;
175 unsigned int credits = ntohl(p->rm_credit);
176 if (credits == 0) {
177 dprintk("RPC: %s: server"
178 " dropped credits to 0!\n", __func__);
179 /* don't deadlock */
180 credits = 1;
181 } else if (credits > rep->rr_buffer->rb_max_requests) {
182 dprintk("RPC: %s: server"
183 " over-crediting: %d (%d)\n",
184 __func__, credits,
185 rep->rr_buffer->rb_max_requests);
186 credits = rep->rr_buffer->rb_max_requests;
188 atomic_set(&rep->rr_buffer->rb_credits, credits);
190 /* fall through */
191 case IB_WC_BIND_MW:
192 rpcrdma_schedule_tasklet(rep);
193 break;
194 default:
195 dprintk("RPC: %s: unexpected WC event %X\n",
196 __func__, wc->opcode);
197 break;
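/*
 * Credit update: the 16-byte check above ensures a complete RPC/RDMA
 * header has arrived before rm_credit is read (the header declared in
 * xprt_rdma.h opens with xid, version, credit and type words, 32 bits
 * each, so 16 bytes covers all four). The advertised credit value is
 * clamped to [1, rb_max_requests]: zero would deadlock the transport,
 * and anything above the local request pool size cannot be used.
 */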
201 static inline int
202 rpcrdma_cq_poll(struct ib_cq *cq)
204 struct ib_wc wc;
205 int rc;
207 for (;;) {
208 rc = ib_poll_cq(cq, 1, &wc);
209 if (rc < 0) {
210 dprintk("RPC: %s: ib_poll_cq failed %i\n",
211 __func__, rc);
212 return rc;
214 if (rc == 0)
215 break;
217 rpcrdma_event_process(&wc);
220 return 0;
224 * rpcrdma_cq_event_upcall
226 * This upcall handles recv, send, bind and unbind events.
227 * It is reentrant but processes single events in order to maintain
228 * ordering of receives to keep server credits.
230 * It is the responsibility of the scheduled tasklet to return
231 * recv buffers to the pool. NOTE: this affects synchronization of
232 * connection shutdown. That is, the structures required for
233 * the completion of the reply handler must remain intact until
234 * all memory has been reclaimed.
236 * Note that send events are suppressed and do not result in an upcall.
238 static void
239 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
241 int rc;
243 rc = rpcrdma_cq_poll(cq);
244 if (rc)
245 return;
247 rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
248 if (rc) {
249 dprintk("RPC: %s: ib_req_notify_cq failed %i\n",
250 __func__, rc);
251 return;
254 rpcrdma_cq_poll(cq);
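/*
 * The poll / re-arm / poll-again sequence above closes the window in
 * which a completion could land after the first rpcrdma_cq_poll() but
 * before ib_req_notify_cq() re-arms the CQ; such a completion would
 * otherwise generate no further upcall. A common alternative idiom
 * (not used by this file, shown only as a sketch) lets the verbs layer
 * report that race directly:
 */
#if 0
	do {
		while (ib_poll_cq(cq, 1, &wc) > 0)
			rpcrdma_event_process(&wc);
	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
#endif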
257 #ifdef RPC_DEBUG
258 static const char * const conn[] = {
259 "address resolved",
260 "address error",
261 "route resolved",
262 "route error",
263 "connect request",
264 "connect response",
265 "connect error",
266 "unreachable",
267 "rejected",
268 "established",
269 "disconnected",
270 "device removal"
272 #endif
274 static int
275 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
277 struct rpcrdma_xprt *xprt = id->context;
278 struct rpcrdma_ia *ia = &xprt->rx_ia;
279 struct rpcrdma_ep *ep = &xprt->rx_ep;
280 #ifdef RPC_DEBUG
281 struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
282 #endif
283 struct ib_qp_attr attr;
284 struct ib_qp_init_attr iattr;
285 int connstate = 0;
287 switch (event->event) {
288 case RDMA_CM_EVENT_ADDR_RESOLVED:
289 case RDMA_CM_EVENT_ROUTE_RESOLVED:
290 ia->ri_async_rc = 0;
291 complete(&ia->ri_done);
292 break;
293 case RDMA_CM_EVENT_ADDR_ERROR:
294 ia->ri_async_rc = -EHOSTUNREACH;
295 dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
296 __func__, ep);
297 complete(&ia->ri_done);
298 break;
299 case RDMA_CM_EVENT_ROUTE_ERROR:
300 ia->ri_async_rc = -ENETUNREACH;
301 dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
302 __func__, ep);
303 complete(&ia->ri_done);
304 break;
305 case RDMA_CM_EVENT_ESTABLISHED:
306 connstate = 1;
307 ib_query_qp(ia->ri_id->qp, &attr,
308 IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
309 &iattr);
310 dprintk("RPC: %s: %d responder resources"
311 " (%d initiator)\n",
312 __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
313 goto connected;
314 case RDMA_CM_EVENT_CONNECT_ERROR:
315 connstate = -ENOTCONN;
316 goto connected;
317 case RDMA_CM_EVENT_UNREACHABLE:
318 connstate = -ENETDOWN;
319 goto connected;
320 case RDMA_CM_EVENT_REJECTED:
321 connstate = -ECONNREFUSED;
322 goto connected;
323 case RDMA_CM_EVENT_DISCONNECTED:
324 connstate = -ECONNABORTED;
325 goto connected;
326 case RDMA_CM_EVENT_DEVICE_REMOVAL:
327 connstate = -ENODEV;
328 connected:
329 dprintk("RPC: %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
330 __func__,
331 (event->event <= 11) ? conn[event->event] :
332 "unknown connection error",
333 &addr->sin_addr.s_addr,
334 ntohs(addr->sin_port),
335 ep, event->event);
336 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
337 dprintk("RPC: %s: %sconnected\n",
338 __func__, connstate > 0 ? "" : "dis");
339 ep->rep_connected = connstate;
340 ep->rep_func(ep);
341 wake_up_all(&ep->rep_connect_wait);
342 break;
343 default:
344 dprintk("RPC: %s: unexpected CM event %d\n",
345 __func__, event->event);
346 break;
349 #ifdef RPC_DEBUG
350 if (connstate == 1) {
351 int ird = attr.max_dest_rd_atomic;
352 int tird = ep->rep_remote_cma.responder_resources;
353 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
354 "on %s, memreg %d slots %d ird %d%s\n",
355 &addr->sin_addr.s_addr,
356 ntohs(addr->sin_port),
357 ia->ri_id->device->name,
358 ia->ri_memreg_strategy,
359 xprt->rx_buf.rb_max_requests,
360 ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
361 } else if (connstate < 0) {
362 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
363 &addr->sin_addr.s_addr,
364 ntohs(addr->sin_port),
365 connstate);
367 #endif
369 return 0;
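/*
 * State handed back through ep->rep_connected by the upcall above: it
 * stays 0 while address/route resolution is outstanding, becomes 1 on
 * RDMA_CM_EVENT_ESTABLISHED, and becomes a negative errno (-ENOTCONN,
 * -ENETDOWN, -ECONNREFUSED, -ECONNABORTED or -ENODEV) on the failure
 * events. rpcrdma_ep_connect() and rpcrdma_ep_disconnect() sleep on
 * rep_connect_wait until this value changes.
 */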
372 static struct rdma_cm_id *
373 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
374 struct rpcrdma_ia *ia, struct sockaddr *addr)
376 struct rdma_cm_id *id;
377 int rc;
379 init_completion(&ia->ri_done);
381 id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
382 if (IS_ERR(id)) {
383 rc = PTR_ERR(id);
384 dprintk("RPC: %s: rdma_create_id() failed %i\n",
385 __func__, rc);
386 return id;
389 ia->ri_async_rc = -ETIMEDOUT;
390 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
391 if (rc) {
392 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
393 __func__, rc);
394 goto out;
396 wait_for_completion_interruptible_timeout(&ia->ri_done,
397 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
398 rc = ia->ri_async_rc;
399 if (rc)
400 goto out;
402 ia->ri_async_rc = -ETIMEDOUT;
403 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
404 if (rc) {
405 dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
406 __func__, rc);
407 goto out;
409 wait_for_completion_interruptible_timeout(&ia->ri_done,
410 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
411 rc = ia->ri_async_rc;
412 if (rc)
413 goto out;
415 return id;
417 out:
418 rdma_destroy_id(id);
419 return ERR_PTR(rc);
423 * Drain any cq, prior to teardown.
425 static void
426 rpcrdma_clean_cq(struct ib_cq *cq)
428 struct ib_wc wc;
429 int count = 0;
431 while (1 == ib_poll_cq(cq, 1, &wc))
432 ++count;
434 if (count)
435 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
436 __func__, count, wc.opcode);
440 * Exported functions.
444 * Open and initialize an Interface Adapter.
445 * o initializes fields of struct rpcrdma_ia, including
446 * interface and provider attributes and protection zone.
449 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
451 int rc, mem_priv;
452 struct ib_device_attr devattr;
453 struct rpcrdma_ia *ia = &xprt->rx_ia;
455 ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
456 if (IS_ERR(ia->ri_id)) {
457 rc = PTR_ERR(ia->ri_id);
458 goto out1;
461 ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
462 if (IS_ERR(ia->ri_pd)) {
463 rc = PTR_ERR(ia->ri_pd);
464 dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
465 __func__, rc);
466 goto out2;
470 * Query the device to determine if the requested memory
471 * registration strategy is supported. If it isn't, set the
472 * strategy to a globally supported model.
474 rc = ib_query_device(ia->ri_id->device, &devattr);
475 if (rc) {
476 dprintk("RPC: %s: ib_query_device failed %d\n",
477 __func__, rc);
478 goto out2;
481 if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
482 ia->ri_have_dma_lkey = 1;
483 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
486 switch (memreg) {
487 case RPCRDMA_MEMWINDOWS:
488 case RPCRDMA_MEMWINDOWS_ASYNC:
489 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
490 dprintk("RPC: %s: MEMWINDOWS registration "
491 "specified but not supported by adapter, "
492 "using slower RPCRDMA_REGISTER\n",
493 __func__);
494 memreg = RPCRDMA_REGISTER;
496 break;
497 case RPCRDMA_MTHCAFMR:
498 if (!ia->ri_id->device->alloc_fmr) {
499 #if RPCRDMA_PERSISTENT_REGISTRATION
500 dprintk("RPC: %s: MTHCAFMR registration "
501 "specified but not supported by adapter, "
502 "using riskier RPCRDMA_ALLPHYSICAL\n",
503 __func__);
504 memreg = RPCRDMA_ALLPHYSICAL;
505 #else
506 dprintk("RPC: %s: MTHCAFMR registration "
507 "specified but not supported by adapter, "
508 "using slower RPCRDMA_REGISTER\n",
509 __func__);
510 memreg = RPCRDMA_REGISTER;
511 #endif
513 break;
514 case RPCRDMA_FRMR:
515 /* Requires both frmr reg and local dma lkey */
516 if ((devattr.device_cap_flags &
517 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
518 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
519 #if RPCRDMA_PERSISTENT_REGISTRATION
520 dprintk("RPC: %s: FRMR registration "
521 "specified but not supported by adapter, "
522 "using riskier RPCRDMA_ALLPHYSICAL\n",
523 __func__);
524 memreg = RPCRDMA_ALLPHYSICAL;
525 #else
526 dprintk("RPC: %s: FRMR registration "
527 "specified but not supported by adapter, "
528 "using slower RPCRDMA_REGISTER\n",
529 __func__);
530 memreg = RPCRDMA_REGISTER;
531 #endif
533 break;
537 * Optionally obtain an underlying physical identity mapping in
538 * order to do a memory window-based bind. This base registration
539 * is protected from remote access - that is enabled only by binding
540 * for the specific bytes targeted during each RPC operation, and
541 * revoked after the corresponding completion similar to a storage
542 * adapter.
544 switch (memreg) {
545 case RPCRDMA_BOUNCEBUFFERS:
546 case RPCRDMA_REGISTER:
547 case RPCRDMA_FRMR:
548 break;
549 #if RPCRDMA_PERSISTENT_REGISTRATION
550 case RPCRDMA_ALLPHYSICAL:
551 mem_priv = IB_ACCESS_LOCAL_WRITE |
552 IB_ACCESS_REMOTE_WRITE |
553 IB_ACCESS_REMOTE_READ;
554 goto register_setup;
555 #endif
556 case RPCRDMA_MEMWINDOWS_ASYNC:
557 case RPCRDMA_MEMWINDOWS:
558 mem_priv = IB_ACCESS_LOCAL_WRITE |
559 IB_ACCESS_MW_BIND;
560 goto register_setup;
561 case RPCRDMA_MTHCAFMR:
562 if (ia->ri_have_dma_lkey)
563 break;
564 mem_priv = IB_ACCESS_LOCAL_WRITE;
565 register_setup:
566 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
567 if (IS_ERR(ia->ri_bind_mem)) {
568 printk(KERN_ALERT "%s: ib_get_dma_mr for "
569 "phys register failed with %lX\n\t"
570 "Will continue with degraded performance\n",
571 __func__, PTR_ERR(ia->ri_bind_mem));
572 memreg = RPCRDMA_REGISTER;
573 ia->ri_bind_mem = NULL;
575 break;
576 default:
577 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
578 __func__, memreg);
579 rc = -EINVAL;
580 goto out2;
582 dprintk("RPC: %s: memory registration strategy is %d\n",
583 __func__, memreg);
585 /* Else will do memory reg/dereg for each chunk */
586 ia->ri_memreg_strategy = memreg;
588 return 0;
589 out2:
590 rdma_destroy_id(ia->ri_id);
591 ia->ri_id = NULL;
592 out1:
593 return rc;
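/*
 * The requested memreg mode is only a hint: if the adapter lacks the
 * needed capability the strategy is downgraded (to RPCRDMA_ALLPHYSICAL
 * when persistent registration is compiled in, otherwise to
 * RPCRDMA_REGISTER) with only a dprintk, and the mode actually chosen
 * is saved in ia->ri_memreg_strategy. A minimal caller sketch follows;
 * the variable names are hypothetical, not taken from this file.
 */
#if 0
	struct sockaddr_in sin = { .sin_family = AF_INET /* ... */ };
	int rc = rpcrdma_ia_open(r_xprt, (struct sockaddr *)&sin,
				 RPCRDMA_FRMR);
	if (rc == 0)
		pr_info("memreg strategy in use: %d\n",
			r_xprt->rx_ia.ri_memreg_strategy);
#endif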
597 * Clean up/close an IA.
598 * o if event handles and PD have been initialized, free them.
599 * o close the IA
601 void
602 rpcrdma_ia_close(struct rpcrdma_ia *ia)
604 int rc;
606 dprintk("RPC: %s: entering\n", __func__);
607 if (ia->ri_bind_mem != NULL) {
608 rc = ib_dereg_mr(ia->ri_bind_mem);
609 dprintk("RPC: %s: ib_dereg_mr returned %i\n",
610 __func__, rc);
612 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
613 if (ia->ri_id->qp)
614 rdma_destroy_qp(ia->ri_id);
615 rdma_destroy_id(ia->ri_id);
616 ia->ri_id = NULL;
618 if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
619 rc = ib_dealloc_pd(ia->ri_pd);
620 dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
621 __func__, rc);
626 * Create unconnected endpoint.
629 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
630 struct rpcrdma_create_data_internal *cdata)
632 struct ib_device_attr devattr;
633 int rc, err;
635 rc = ib_query_device(ia->ri_id->device, &devattr);
636 if (rc) {
637 dprintk("RPC: %s: ib_query_device failed %d\n",
638 __func__, rc);
639 return rc;
642 /* check provider's send/recv wr limits */
643 if (cdata->max_requests > devattr.max_qp_wr)
644 cdata->max_requests = devattr.max_qp_wr;
646 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
647 ep->rep_attr.qp_context = ep;
648 /* send_cq and recv_cq initialized below */
649 ep->rep_attr.srq = NULL;
650 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
651 switch (ia->ri_memreg_strategy) {
652 case RPCRDMA_FRMR:
653 /* Add room for frmr register and invalidate WRs.
654 * 1. FRMR reg WR for head
655 * 2. FRMR invalidate WR for head
656 * 3. FRMR reg WR for pagelist
657 * 4. FRMR invalidate WR for pagelist
658 * 5. FRMR reg WR for tail
659 * 6. FRMR invalidate WR for tail
660 * 7. The RDMA_SEND WR
662 ep->rep_attr.cap.max_send_wr *= 7;
663 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
664 cdata->max_requests = devattr.max_qp_wr / 7;
665 if (!cdata->max_requests)
666 return -EINVAL;
667 ep->rep_attr.cap.max_send_wr = cdata->max_requests * 7;
669 break;
670 case RPCRDMA_MEMWINDOWS_ASYNC:
671 case RPCRDMA_MEMWINDOWS:
672 /* Add room for mw_binds+unbinds - overkill! */
673 ep->rep_attr.cap.max_send_wr++;
674 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
675 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
676 return -EINVAL;
677 break;
678 default:
679 break;
681 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
682 ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
683 ep->rep_attr.cap.max_recv_sge = 1;
684 ep->rep_attr.cap.max_inline_data = 0;
685 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
686 ep->rep_attr.qp_type = IB_QPT_RC;
687 ep->rep_attr.port_num = ~0;
689 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
690 "iovs: send %d recv %d\n",
691 __func__,
692 ep->rep_attr.cap.max_send_wr,
693 ep->rep_attr.cap.max_recv_wr,
694 ep->rep_attr.cap.max_send_sge,
695 ep->rep_attr.cap.max_recv_sge);
697 /* set trigger for requesting send completion */
698 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /* - 1*/;
699 switch (ia->ri_memreg_strategy) {
700 case RPCRDMA_MEMWINDOWS_ASYNC:
701 case RPCRDMA_MEMWINDOWS:
702 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
703 break;
704 default:
705 break;
707 if (ep->rep_cqinit <= 2)
708 ep->rep_cqinit = 0;
709 INIT_CQCOUNT(ep);
710 ep->rep_ia = ia;
711 init_waitqueue_head(&ep->rep_connect_wait);
714 * Create a single cq for receive dto and mw_bind (only ever
715 * care about unbind, really). Send completions are suppressed.
716 * Use single threaded tasklet upcalls to maintain ordering.
718 ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
719 rpcrdma_cq_async_error_upcall, NULL,
720 ep->rep_attr.cap.max_recv_wr +
721 ep->rep_attr.cap.max_send_wr + 1, 0);
722 if (IS_ERR(ep->rep_cq)) {
723 rc = PTR_ERR(ep->rep_cq);
724 dprintk("RPC: %s: ib_create_cq failed: %i\n",
725 __func__, rc);
726 goto out1;
729 rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
730 if (rc) {
731 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
732 __func__, rc);
733 goto out2;
736 ep->rep_attr.send_cq = ep->rep_cq;
737 ep->rep_attr.recv_cq = ep->rep_cq;
739 /* Initialize cma parameters */
741 /* RPC/RDMA does not use private data */
742 ep->rep_remote_cma.private_data = NULL;
743 ep->rep_remote_cma.private_data_len = 0;
745 /* Client offers RDMA Read but does not initiate */
746 ep->rep_remote_cma.initiator_depth = 0;
747 if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
748 ep->rep_remote_cma.responder_resources = 0;
749 else if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */
750 ep->rep_remote_cma.responder_resources = 32;
751 else
752 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
754 ep->rep_remote_cma.retry_count = 7;
755 ep->rep_remote_cma.flow_control = 0;
756 ep->rep_remote_cma.rnr_retry_count = 0;
758 return 0;
760 out2:
761 err = ib_destroy_cq(ep->rep_cq);
762 if (err)
763 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
764 __func__, err);
765 out1:
766 return rc;
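/*
 * Worked example of the sizing above (illustrative numbers only): with
 * cdata->max_requests = 32 and the FRMR strategy, max_send_wr becomes
 * 32 * 7 = 224 (register/invalidate WRs for head, pagelist and tail,
 * plus the SEND itself), max_recv_wr is 32, and the shared CQ is sized
 * at 224 + 32 + 1 = 257 entries. rep_cqinit = 224 / 2 = 112, so roughly
 * one send in every 112 is posted IB_SEND_SIGNALED, giving the provider
 * a periodic chance to reap send work requests.
 */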
770 * rpcrdma_ep_destroy
772 * Disconnect and destroy endpoint. After this, the only
773 * valid operations on the ep are to free it (if dynamically
774 * allocated) or re-create it.
776 * The caller's error handling must be sure to not leak the endpoint
777 * if this function fails.
780 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
782 int rc;
784 dprintk("RPC: %s: entering, connected is %d\n",
785 __func__, ep->rep_connected);
787 if (ia->ri_id->qp) {
788 rc = rpcrdma_ep_disconnect(ep, ia);
789 if (rc)
790 dprintk("RPC: %s: rpcrdma_ep_disconnect"
791 " returned %i\n", __func__, rc);
792 rdma_destroy_qp(ia->ri_id);
793 ia->ri_id->qp = NULL;
796 /* padding - could be done in rpcrdma_buffer_destroy... */
797 if (ep->rep_pad_mr) {
798 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
799 ep->rep_pad_mr = NULL;
802 rpcrdma_clean_cq(ep->rep_cq);
803 rc = ib_destroy_cq(ep->rep_cq);
804 if (rc)
805 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
806 __func__, rc);
808 return rc;
812 * Connect unconnected endpoint.
815 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
817 struct rdma_cm_id *id;
818 int rc = 0;
819 int retry_count = 0;
821 if (ep->rep_connected != 0) {
822 struct rpcrdma_xprt *xprt;
823 retry:
824 rc = rpcrdma_ep_disconnect(ep, ia);
825 if (rc && rc != -ENOTCONN)
826 dprintk("RPC: %s: rpcrdma_ep_disconnect"
827 " status %i\n", __func__, rc);
828 rpcrdma_clean_cq(ep->rep_cq);
830 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
831 id = rpcrdma_create_id(xprt, ia,
832 (struct sockaddr *)&xprt->rx_data.addr);
833 if (IS_ERR(id)) {
834 rc = PTR_ERR(id);
835 goto out;
837 /* TEMP TEMP TEMP - fail if new device:
838 * Deregister/remarshal *all* requests!
839 * Close and recreate adapter, pd, etc!
840 * Re-determine all attributes still sane!
841 * More stuff I haven't thought of!
842 * Rrrgh!
844 if (ia->ri_id->device != id->device) {
845 printk("RPC: %s: can't reconnect on "
846 "different device!\n", __func__);
847 rdma_destroy_id(id);
848 rc = -ENETDOWN;
849 goto out;
851 /* END TEMP */
852 rdma_destroy_qp(ia->ri_id);
853 rdma_destroy_id(ia->ri_id);
854 ia->ri_id = id;
857 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
858 if (rc) {
859 dprintk("RPC: %s: rdma_create_qp failed %i\n",
860 __func__, rc);
861 goto out;
864 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
865 struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
866 if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
867 (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
868 pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
869 struct ib_qp_attr attr = {
870 .path_mtu = IB_MTU_1024
872 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
876 ep->rep_connected = 0;
878 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
879 if (rc) {
880 dprintk("RPC: %s: rdma_connect() failed with %i\n",
881 __func__, rc);
882 goto out;
885 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
888 * Check state. A non-peer reject indicates no listener
889 * (ECONNREFUSED), which may be a transient state. All
890 * others indicate a transport condition for which a best-effort
891 * recovery attempt has already been made.
893 if (ep->rep_connected == -ECONNREFUSED &&
894 ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
895 dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
896 goto retry;
898 if (ep->rep_connected <= 0) {
899 /* Sometimes, the only way to reliably connect to remote
900 * CMs is to use same nonzero values for ORD and IRD. */
901 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
902 (ep->rep_remote_cma.responder_resources == 0 ||
903 ep->rep_remote_cma.initiator_depth !=
904 ep->rep_remote_cma.responder_resources)) {
905 if (ep->rep_remote_cma.responder_resources == 0)
906 ep->rep_remote_cma.responder_resources = 1;
907 ep->rep_remote_cma.initiator_depth =
908 ep->rep_remote_cma.responder_resources;
909 goto retry;
911 rc = ep->rep_connected;
912 } else {
913 dprintk("RPC: %s: connected\n", __func__);
916 out:
917 if (rc)
918 ep->rep_connected = rc;
919 return rc;
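/*
 * Reconnect notes for the function above: a reject with no listener
 * (-ECONNREFUSED) is retried up to RDMA_CONNECT_RETRY_MAX times, since
 * the server may simply not be listening yet. Other failures get one
 * more round of retries in which responder_resources is forced to at
 * least 1 and initiator_depth is set equal to it, because some remote
 * CMs only accept connections when ORD and IRD are equal and nonzero.
 * The PCI probe before rdma_connect() clamps the path MTU to 1024 on
 * Mellanox Tavor HCAs (the "Tavor hack" referenced by the linux/pci.h
 * include at the top of this file).
 */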
923 * rpcrdma_ep_disconnect
925 * This is separate from destroy to facilitate the ability
926 * to reconnect without recreating the endpoint.
928 * This call is not reentrant, and must not be made in parallel
929 * on the same endpoint.
932 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
934 int rc;
936 rpcrdma_clean_cq(ep->rep_cq);
937 rc = rdma_disconnect(ia->ri_id);
938 if (!rc) {
939 /* returns without wait if not connected */
940 wait_event_interruptible(ep->rep_connect_wait,
941 ep->rep_connected != 1);
942 dprintk("RPC: %s: after wait, %sconnected\n", __func__,
943 (ep->rep_connected == 1) ? "still " : "dis");
944 } else {
945 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
946 ep->rep_connected = rc;
948 return rc;
952 * Initialize buffer memory
955 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
956 struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
958 char *p;
959 size_t len;
960 int i, rc;
961 struct rpcrdma_mw *r;
963 buf->rb_max_requests = cdata->max_requests;
964 spin_lock_init(&buf->rb_lock);
965 atomic_set(&buf->rb_credits, 1);
967 /* Need to allocate:
968 * 1. arrays for send and recv pointers
969 * 2. arrays of struct rpcrdma_req to fill in pointers
970 * 3. array of struct rpcrdma_rep for replies
971 * 4. padding, if any
972 * 5. mw's, fmr's or frmr's, if any
973 * Send/recv buffers in req/rep need to be registered
976 len = buf->rb_max_requests *
977 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
978 len += cdata->padding;
979 switch (ia->ri_memreg_strategy) {
980 case RPCRDMA_FRMR:
981 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
982 sizeof(struct rpcrdma_mw);
983 break;
984 case RPCRDMA_MTHCAFMR:
985 /* TBD we are perhaps overallocating here */
986 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
987 sizeof(struct rpcrdma_mw);
988 break;
989 case RPCRDMA_MEMWINDOWS_ASYNC:
990 case RPCRDMA_MEMWINDOWS:
991 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
992 sizeof(struct rpcrdma_mw);
993 break;
994 default:
995 break;
998 /* allocate 1, 4 and 5 in one shot */
999 p = kzalloc(len, GFP_KERNEL);
1000 if (p == NULL) {
1001 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1002 __func__, len);
1003 rc = -ENOMEM;
1004 goto out;
1006 buf->rb_pool = p; /* for freeing it later */
1008 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1009 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1010 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1011 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1014 * Register the zeroed pad buffer, if any.
1016 if (cdata->padding) {
1017 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1018 &ep->rep_pad_mr, &ep->rep_pad);
1019 if (rc)
1020 goto out;
1022 p += cdata->padding;
1025 * Allocate the fmr's, or mw's for mw_bind chunk registration.
1026 * We "cycle" the mw's in order to minimize rkey reuse,
1027 * and also reduce unbind-to-bind collision.
1029 INIT_LIST_HEAD(&buf->rb_mws);
1030 r = (struct rpcrdma_mw *)p;
1031 switch (ia->ri_memreg_strategy) {
1032 case RPCRDMA_FRMR:
1033 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1034 r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1035 RPCRDMA_MAX_SEGS);
1036 if (IS_ERR(r->r.frmr.fr_mr)) {
1037 rc = PTR_ERR(r->r.frmr.fr_mr);
1038 dprintk("RPC: %s: ib_alloc_fast_reg_mr"
1039 " failed %i\n", __func__, rc);
1040 goto out;
1042 r->r.frmr.fr_pgl =
1043 ib_alloc_fast_reg_page_list(ia->ri_id->device,
1044 RPCRDMA_MAX_SEGS);
1045 if (IS_ERR(r->r.frmr.fr_pgl)) {
1046 rc = PTR_ERR(r->r.frmr.fr_pgl);
1047 dprintk("RPC: %s: "
1048 "ib_alloc_fast_reg_page_list "
1049 "failed %i\n", __func__, rc);
1050 goto out;
1052 list_add(&r->mw_list, &buf->rb_mws);
1053 ++r;
1055 break;
1056 case RPCRDMA_MTHCAFMR:
1057 /* TBD we are perhaps overallocating here */
1058 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1059 static struct ib_fmr_attr fa =
1060 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1061 r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1062 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1063 &fa);
1064 if (IS_ERR(r->r.fmr)) {
1065 rc = PTR_ERR(r->r.fmr);
1066 dprintk("RPC: %s: ib_alloc_fmr"
1067 " failed %i\n", __func__, rc);
1068 goto out;
1070 list_add(&r->mw_list, &buf->rb_mws);
1071 ++r;
1073 break;
1074 case RPCRDMA_MEMWINDOWS_ASYNC:
1075 case RPCRDMA_MEMWINDOWS:
1076 /* Allocate one extra request's worth, for full cycling */
1077 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1078 r->r.mw = ib_alloc_mw(ia->ri_pd);
1079 if (IS_ERR(r->r.mw)) {
1080 rc = PTR_ERR(r->r.mw);
1081 dprintk("RPC: %s: ib_alloc_mw"
1082 " failed %i\n", __func__, rc);
1083 goto out;
1085 list_add(&r->mw_list, &buf->rb_mws);
1086 ++r;
1088 break;
1089 default:
1090 break;
1094 * Allocate/init the request/reply buffers. Doing this
1095 * using kmalloc for now -- one for each buf.
1097 for (i = 0; i < buf->rb_max_requests; i++) {
1098 struct rpcrdma_req *req;
1099 struct rpcrdma_rep *rep;
1101 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1102 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1103 /* Typical ~2400b, so rounding up saves work later */
1104 if (len < 4096)
1105 len = 4096;
1106 req = kmalloc(len, GFP_KERNEL);
1107 if (req == NULL) {
1108 dprintk("RPC: %s: request buffer %d alloc"
1109 " failed\n", __func__, i);
1110 rc = -ENOMEM;
1111 goto out;
1113 memset(req, 0, sizeof(struct rpcrdma_req));
1114 buf->rb_send_bufs[i] = req;
1115 buf->rb_send_bufs[i]->rl_buffer = buf;
1117 rc = rpcrdma_register_internal(ia, req->rl_base,
1118 len - offsetof(struct rpcrdma_req, rl_base),
1119 &buf->rb_send_bufs[i]->rl_handle,
1120 &buf->rb_send_bufs[i]->rl_iov);
1121 if (rc)
1122 goto out;
1124 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1126 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1127 rep = kmalloc(len, GFP_KERNEL);
1128 if (rep == NULL) {
1129 dprintk("RPC: %s: reply buffer %d alloc failed\n",
1130 __func__, i);
1131 rc = -ENOMEM;
1132 goto out;
1134 memset(rep, 0, sizeof(struct rpcrdma_rep));
1135 buf->rb_recv_bufs[i] = rep;
1136 buf->rb_recv_bufs[i]->rr_buffer = buf;
1137 init_waitqueue_head(&rep->rr_unbind);
1139 rc = rpcrdma_register_internal(ia, rep->rr_base,
1140 len - offsetof(struct rpcrdma_rep, rr_base),
1141 &buf->rb_recv_bufs[i]->rr_handle,
1142 &buf->rb_recv_bufs[i]->rr_iov);
1143 if (rc)
1144 goto out;
1147 dprintk("RPC: %s: max_requests %d\n",
1148 __func__, buf->rb_max_requests);
1149 /* done */
1150 return 0;
1151 out:
1152 rpcrdma_buffer_destroy(buf);
1153 return rc;
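/*
 * Layout of the single allocation made above (rb_pool), in order:
 *
 *   [ rb_send_bufs : rb_max_requests pointers to struct rpcrdma_req ]
 *   [ rb_recv_bufs : rb_max_requests pointers to struct rpcrdma_rep ]
 *   [ zeroed pad buffer of cdata->padding bytes, if any             ]
 *   [ struct rpcrdma_mw array, count depending on memreg strategy   ]
 *
 * The req and rep buffers themselves are kmalloc'ed individually in the
 * loop and registered with rpcrdma_register_internal(); only the
 * pointer arrays, the pad and the MW bookkeeping live in rb_pool.
 */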
1157 * Unregister and destroy buffer memory. Need to deal with
1158 * partial initialization, so it's callable from failed create.
1159 * Must be called before destroying endpoint, as registrations
1160 * reference it.
1162 void
1163 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1165 int rc, i;
1166 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1167 struct rpcrdma_mw *r;
1169 /* clean up in reverse order from create
1170 * 1. recv mr memory (mr free, then kfree)
1171 * 1a. bind mw memory
1172 * 2. send mr memory (mr free, then kfree)
1173 * 3. padding (if any) [moved to rpcrdma_ep_destroy]
1174 * 4. arrays
1176 dprintk("RPC: %s: entering\n", __func__);
1178 for (i = 0; i < buf->rb_max_requests; i++) {
1179 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1180 rpcrdma_deregister_internal(ia,
1181 buf->rb_recv_bufs[i]->rr_handle,
1182 &buf->rb_recv_bufs[i]->rr_iov);
1183 kfree(buf->rb_recv_bufs[i]);
1185 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1186 while (!list_empty(&buf->rb_mws)) {
1187 r = list_entry(buf->rb_mws.next,
1188 struct rpcrdma_mw, mw_list);
1189 list_del(&r->mw_list);
1190 switch (ia->ri_memreg_strategy) {
1191 case RPCRDMA_FRMR:
1192 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1193 if (rc)
1194 dprintk("RPC: %s:"
1195 " ib_dereg_mr"
1196 " failed %i\n",
1197 __func__, rc);
1198 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1199 break;
1200 case RPCRDMA_MTHCAFMR:
1201 rc = ib_dealloc_fmr(r->r.fmr);
1202 if (rc)
1203 dprintk("RPC: %s:"
1204 " ib_dealloc_fmr"
1205 " failed %i\n",
1206 __func__, rc);
1207 break;
1208 case RPCRDMA_MEMWINDOWS_ASYNC:
1209 case RPCRDMA_MEMWINDOWS:
1210 rc = ib_dealloc_mw(r->r.mw);
1211 if (rc)
1212 dprintk("RPC: %s:"
1213 " ib_dealloc_mw"
1214 " failed %i\n",
1215 __func__, rc);
1216 break;
1217 default:
1218 break;
1221 rpcrdma_deregister_internal(ia,
1222 buf->rb_send_bufs[i]->rl_handle,
1223 &buf->rb_send_bufs[i]->rl_iov);
1224 kfree(buf->rb_send_bufs[i]);
1228 kfree(buf->rb_pool);
1232 * Get a set of request/reply buffers.
1234 * Reply buffer (if needed) is attached to send buffer upon return.
1235 * Rule:
1236 * rb_send_index and rb_recv_index MUST always be pointing to the
1237 * *next* available buffer (non-NULL). They are incremented after
1238 * removing buffers, and decremented *before* returning them.
1240 struct rpcrdma_req *
1241 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1243 struct rpcrdma_req *req;
1244 unsigned long flags;
1245 int i;
1246 struct rpcrdma_mw *r;
1248 spin_lock_irqsave(&buffers->rb_lock, flags);
1249 if (buffers->rb_send_index == buffers->rb_max_requests) {
1250 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1251 dprintk("RPC: %s: out of request buffers\n", __func__);
1252 return ((struct rpcrdma_req *)NULL);
1255 req = buffers->rb_send_bufs[buffers->rb_send_index];
1256 if (buffers->rb_send_index < buffers->rb_recv_index) {
1257 dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
1258 __func__,
1259 buffers->rb_recv_index - buffers->rb_send_index);
1260 req->rl_reply = NULL;
1261 } else {
1262 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1263 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1265 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1266 if (!list_empty(&buffers->rb_mws)) {
1267 i = RPCRDMA_MAX_SEGS - 1;
1268 do {
1269 r = list_entry(buffers->rb_mws.next,
1270 struct rpcrdma_mw, mw_list);
1271 list_del(&r->mw_list);
1272 req->rl_segments[i].mr_chunk.rl_mw = r;
1273 } while (--i >= 0);
1275 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1276 return req;
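/*
 * Invariants maintained by rpcrdma_buffer_get/put: rb_send_index and
 * rb_recv_index always point at the next available (non-NULL) slot; a
 * slot is NULLed when its buffer is handed out and refilled when the
 * buffer comes back. On get, a reply buffer is attached to the request
 * unless extra receives are already outstanding, and RPCRDMA_MAX_SEGS
 * MWs are moved from rb_mws into req->rl_segments for later chunk
 * registration.
 */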
1280 * Put request/reply buffers back into pool.
1281 * Pre-decrement counter/array index.
1283 void
1284 rpcrdma_buffer_put(struct rpcrdma_req *req)
1286 struct rpcrdma_buffer *buffers = req->rl_buffer;
1287 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1288 int i;
1289 unsigned long flags;
1291 BUG_ON(req->rl_nchunks != 0);
1292 spin_lock_irqsave(&buffers->rb_lock, flags);
1293 buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1294 req->rl_niovs = 0;
1295 if (req->rl_reply) {
1296 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1297 init_waitqueue_head(&req->rl_reply->rr_unbind);
1298 req->rl_reply->rr_func = NULL;
1299 req->rl_reply = NULL;
1301 switch (ia->ri_memreg_strategy) {
1302 case RPCRDMA_FRMR:
1303 case RPCRDMA_MTHCAFMR:
1304 case RPCRDMA_MEMWINDOWS_ASYNC:
1305 case RPCRDMA_MEMWINDOWS:
1307 * Cycle mw's back in reverse order, and "spin" them.
1308 * This delays and scrambles reuse as much as possible.
1310 i = 1;
1311 do {
1312 struct rpcrdma_mw **mw;
1313 mw = &req->rl_segments[i].mr_chunk.rl_mw;
1314 list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1315 *mw = NULL;
1316 } while (++i < RPCRDMA_MAX_SEGS);
1317 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1318 &buffers->rb_mws);
1319 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1320 break;
1321 default:
1322 break;
1324 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1328 * Recover reply buffers from pool.
1329 * This happens when recovering from error conditions.
1330 * Post-increment counter/array index.
1332 void
1333 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1335 struct rpcrdma_buffer *buffers = req->rl_buffer;
1336 unsigned long flags;
1338 if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
1339 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1340 spin_lock_irqsave(&buffers->rb_lock, flags);
1341 if (buffers->rb_recv_index < buffers->rb_max_requests) {
1342 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1343 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1345 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1349 * Put reply buffers back into pool when not attached to
1350 * request. This happens in error conditions, and when
1351 * aborting unbinds. Pre-decrement counter/array index.
1353 void
1354 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1356 struct rpcrdma_buffer *buffers = rep->rr_buffer;
1357 unsigned long flags;
1359 rep->rr_func = NULL;
1360 spin_lock_irqsave(&buffers->rb_lock, flags);
1361 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1362 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1366 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1370 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1371 struct ib_mr **mrp, struct ib_sge *iov)
1373 struct ib_phys_buf ipb;
1374 struct ib_mr *mr;
1375 int rc;
1378 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1380 iov->addr = ib_dma_map_single(ia->ri_id->device,
1381 va, len, DMA_BIDIRECTIONAL);
1382 iov->length = len;
1384 if (ia->ri_have_dma_lkey) {
1385 *mrp = NULL;
1386 iov->lkey = ia->ri_dma_lkey;
1387 return 0;
1388 } else if (ia->ri_bind_mem != NULL) {
1389 *mrp = NULL;
1390 iov->lkey = ia->ri_bind_mem->lkey;
1391 return 0;
1394 ipb.addr = iov->addr;
1395 ipb.size = iov->length;
1396 mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1397 IB_ACCESS_LOCAL_WRITE, &iov->addr);
1399 dprintk("RPC: %s: phys convert: 0x%llx "
1400 "registered 0x%llx length %d\n",
1401 __func__, (unsigned long long)ipb.addr,
1402 (unsigned long long)iov->addr, len);
1404 if (IS_ERR(mr)) {
1405 *mrp = NULL;
1406 rc = PTR_ERR(mr);
1407 dprintk("RPC: %s: failed with %i\n", __func__, rc);
1408 } else {
1409 *mrp = mr;
1410 iov->lkey = mr->lkey;
1411 rc = 0;
1414 return rc;
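/*
 * rpcrdma_register_internal() above picks the cheapest usable lkey for
 * kmalloc'ed control buffers: the device-wide local DMA lkey when the
 * adapter advertises IB_DEVICE_LOCAL_DMA_LKEY, otherwise the
 * pre-registered ri_bind_mem MR set up in rpcrdma_ia_open(), and only
 * as a last resort a one-off ib_reg_phys_mr() covering the single
 * physically contiguous buffer.
 */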
1418 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1419 struct ib_mr *mr, struct ib_sge *iov)
1421 int rc;
1423 ib_dma_unmap_single(ia->ri_id->device,
1424 iov->addr, iov->length, DMA_BIDIRECTIONAL);
1426 if (NULL == mr)
1427 return 0;
1429 rc = ib_dereg_mr(mr);
1430 if (rc)
1431 dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
1432 return rc;
1436 * Wrappers for chunk registration, shared by read/write chunk code.
1439 static void
1440 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1442 seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1443 seg->mr_dmalen = seg->mr_len;
1444 if (seg->mr_page)
1445 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1446 seg->mr_page, offset_in_page(seg->mr_offset),
1447 seg->mr_dmalen, seg->mr_dir);
1448 else
1449 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1450 seg->mr_offset,
1451 seg->mr_dmalen, seg->mr_dir);
1454 static void
1455 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1457 if (seg->mr_page)
1458 ib_dma_unmap_page(ia->ri_id->device,
1459 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1460 else
1461 ib_dma_unmap_single(ia->ri_id->device,
1462 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1465 static int
1466 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1467 int *nsegs, int writing, struct rpcrdma_ia *ia,
1468 struct rpcrdma_xprt *r_xprt)
1470 struct rpcrdma_mr_seg *seg1 = seg;
1471 struct ib_send_wr frmr_wr, *bad_wr;
1472 u8 key;
1473 int len, pageoff;
1474 int i, rc;
1476 pageoff = offset_in_page(seg1->mr_offset);
1477 seg1->mr_offset -= pageoff; /* start of page */
1478 seg1->mr_len += pageoff;
1479 len = -pageoff;
1480 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1481 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1482 for (i = 0; i < *nsegs;) {
1483 rpcrdma_map_one(ia, seg, writing);
1484 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1485 len += seg->mr_len;
1486 ++seg;
1487 ++i;
1488 /* Check for holes */
1489 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1490 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1491 break;
1493 dprintk("RPC: %s: Using frmr %p to map %d segments\n",
1494 __func__, seg1->mr_chunk.rl_mw, i);
1496 /* Bump the key */
1497 key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1498 ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1500 /* Prepare FRMR WR */
1501 memset(&frmr_wr, 0, sizeof frmr_wr);
1502 frmr_wr.opcode = IB_WR_FAST_REG_MR;
1503 frmr_wr.send_flags = 0; /* unsignaled */
1504 frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1505 frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1506 frmr_wr.wr.fast_reg.page_list_len = i;
1507 frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1508 frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1509 frmr_wr.wr.fast_reg.access_flags = (writing ?
1510 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1511 IB_ACCESS_REMOTE_READ);
1512 frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1513 DECR_CQCOUNT(&r_xprt->rx_ep);
1515 rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1517 if (rc) {
1518 dprintk("RPC: %s: failed ib_post_send for register,"
1519 " status %i\n", __func__, rc);
1520 while (i--)
1521 rpcrdma_unmap_one(ia, --seg);
1522 } else {
1523 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1524 seg1->mr_base = seg1->mr_dma + pageoff;
1525 seg1->mr_nsegs = i;
1526 seg1->mr_len = len;
1528 *nsegs = i;
1529 return rc;
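/*
 * FRMR registration above in brief: segments are DMA-mapped and their
 * addresses collected into the fast_reg page list until a page-interior
 * boundary ("hole") forces a stop; the low byte of the MR's rkey is
 * bumped so that a previously advertised rkey no longer matches the new
 * registration, and a single unsignaled IB_WR_FAST_REG_MR is posted on
 * the QP. The matching IB_WR_LOCAL_INV in
 * rpcrdma_deregister_frmr_external() below undoes the mapping after the
 * RPC completes.
 */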
1532 static int
1533 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1534 struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1536 struct rpcrdma_mr_seg *seg1 = seg;
1537 struct ib_send_wr invalidate_wr, *bad_wr;
1538 int rc;
1540 while (seg1->mr_nsegs--)
1541 rpcrdma_unmap_one(ia, seg++);
1543 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1544 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1545 invalidate_wr.send_flags = 0; /* unsignaled */
1546 invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1547 DECR_CQCOUNT(&r_xprt->rx_ep);
1549 rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1550 if (rc)
1551 dprintk("RPC: %s: failed ib_post_send for invalidate,"
1552 " status %i\n", __func__, rc);
1553 return rc;
1556 static int
1557 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1558 int *nsegs, int writing, struct rpcrdma_ia *ia)
1560 struct rpcrdma_mr_seg *seg1 = seg;
1561 u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1562 int len, pageoff, i, rc;
1564 pageoff = offset_in_page(seg1->mr_offset);
1565 seg1->mr_offset -= pageoff; /* start of page */
1566 seg1->mr_len += pageoff;
1567 len = -pageoff;
1568 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1569 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1570 for (i = 0; i < *nsegs;) {
1571 rpcrdma_map_one(ia, seg, writing);
1572 physaddrs[i] = seg->mr_dma;
1573 len += seg->mr_len;
1574 ++seg;
1575 ++i;
1576 /* Check for holes */
1577 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1578 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1579 break;
1581 rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1582 physaddrs, i, seg1->mr_dma);
1583 if (rc) {
1584 dprintk("RPC: %s: failed ib_map_phys_fmr "
1585 "%u@0x%llx+%i (%d)... status %i\n", __func__,
1586 len, (unsigned long long)seg1->mr_dma,
1587 pageoff, i, rc);
1588 while (i--)
1589 rpcrdma_unmap_one(ia, --seg);
1590 } else {
1591 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1592 seg1->mr_base = seg1->mr_dma + pageoff;
1593 seg1->mr_nsegs = i;
1594 seg1->mr_len = len;
1596 *nsegs = i;
1597 return rc;
1600 static int
1601 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1602 struct rpcrdma_ia *ia)
1604 struct rpcrdma_mr_seg *seg1 = seg;
1605 LIST_HEAD(l);
1606 int rc;
1608 list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1609 rc = ib_unmap_fmr(&l);
1610 while (seg1->mr_nsegs--)
1611 rpcrdma_unmap_one(ia, seg++);
1612 if (rc)
1613 dprintk("RPC: %s: failed ib_unmap_fmr,"
1614 " status %i\n", __func__, rc);
1615 return rc;
1618 static int
1619 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1620 int *nsegs, int writing, struct rpcrdma_ia *ia,
1621 struct rpcrdma_xprt *r_xprt)
1623 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1624 IB_ACCESS_REMOTE_READ);
1625 struct ib_mw_bind param;
1626 int rc;
1628 *nsegs = 1;
1629 rpcrdma_map_one(ia, seg, writing);
1630 param.mr = ia->ri_bind_mem;
1631 param.wr_id = 0ULL; /* no send cookie */
1632 param.addr = seg->mr_dma;
1633 param.length = seg->mr_len;
1634 param.send_flags = 0;
1635 param.mw_access_flags = mem_priv;
1637 DECR_CQCOUNT(&r_xprt->rx_ep);
1638 rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1639 if (rc) {
1640 dprintk("RPC: %s: failed ib_bind_mw "
1641 "%u@0x%llx status %i\n",
1642 __func__, seg->mr_len,
1643 (unsigned long long)seg->mr_dma, rc);
1644 rpcrdma_unmap_one(ia, seg);
1645 } else {
1646 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1647 seg->mr_base = param.addr;
1648 seg->mr_nsegs = 1;
1650 return rc;
1653 static int
1654 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1655 struct rpcrdma_ia *ia,
1656 struct rpcrdma_xprt *r_xprt, void **r)
1658 struct ib_mw_bind param;
1659 LIST_HEAD(l);
1660 int rc;
1662 BUG_ON(seg->mr_nsegs != 1);
1663 param.mr = ia->ri_bind_mem;
1664 param.addr = 0ULL; /* unbind */
1665 param.length = 0;
1666 param.mw_access_flags = 0;
1667 if (*r) {
1668 param.wr_id = (u64) (unsigned long) *r;
1669 param.send_flags = IB_SEND_SIGNALED;
1670 INIT_CQCOUNT(&r_xprt->rx_ep);
1671 } else {
1672 param.wr_id = 0ULL;
1673 param.send_flags = 0;
1674 DECR_CQCOUNT(&r_xprt->rx_ep);
1676 rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1677 rpcrdma_unmap_one(ia, seg);
1678 if (rc)
1679 dprintk("RPC: %s: failed ib_(un)bind_mw,"
1680 " status %i\n", __func__, rc);
1681 else
1682 *r = NULL; /* will upcall on completion */
1683 return rc;
1686 static int
1687 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1688 int *nsegs, int writing, struct rpcrdma_ia *ia)
1690 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1691 IB_ACCESS_REMOTE_READ);
1692 struct rpcrdma_mr_seg *seg1 = seg;
1693 struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1694 int len, i, rc = 0;
1696 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1697 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1698 for (len = 0, i = 0; i < *nsegs;) {
1699 rpcrdma_map_one(ia, seg, writing);
1700 ipb[i].addr = seg->mr_dma;
1701 ipb[i].size = seg->mr_len;
1702 len += seg->mr_len;
1703 ++seg;
1704 ++i;
1705 /* Check for holes */
1706 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1707 offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1708 break;
1710 seg1->mr_base = seg1->mr_dma;
1711 seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1712 ipb, i, mem_priv, &seg1->mr_base);
1713 if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1714 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1715 dprintk("RPC: %s: failed ib_reg_phys_mr "
1716 "%u@0x%llx (%d)... status %i\n",
1717 __func__, len,
1718 (unsigned long long)seg1->mr_dma, i, rc);
1719 while (i--)
1720 rpcrdma_unmap_one(ia, --seg);
1721 } else {
1722 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1723 seg1->mr_nsegs = i;
1724 seg1->mr_len = len;
1726 *nsegs = i;
1727 return rc;
1730 static int
1731 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1732 struct rpcrdma_ia *ia)
1734 struct rpcrdma_mr_seg *seg1 = seg;
1735 int rc;
1737 rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1738 seg1->mr_chunk.rl_mr = NULL;
1739 while (seg1->mr_nsegs--)
1740 rpcrdma_unmap_one(ia, seg++);
1741 if (rc)
1742 dprintk("RPC: %s: failed ib_dereg_mr,"
1743 " status %i\n", __func__, rc);
1744 return rc;
1748 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1749 int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1751 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1752 int rc = 0;
1754 switch (ia->ri_memreg_strategy) {
1756 #if RPCRDMA_PERSISTENT_REGISTRATION
1757 case RPCRDMA_ALLPHYSICAL:
1758 rpcrdma_map_one(ia, seg, writing);
1759 seg->mr_rkey = ia->ri_bind_mem->rkey;
1760 seg->mr_base = seg->mr_dma;
1761 seg->mr_nsegs = 1;
1762 nsegs = 1;
1763 break;
1764 #endif
1766 /* Registration using frmr registration */
1767 case RPCRDMA_FRMR:
1768 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1769 break;
1771 /* Registration using fmr memory registration */
1772 case RPCRDMA_MTHCAFMR:
1773 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1774 break;
1776 /* Registration using memory windows */
1777 case RPCRDMA_MEMWINDOWS_ASYNC:
1778 case RPCRDMA_MEMWINDOWS:
1779 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1780 break;
1782 /* Default registration each time */
1783 default:
1784 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1785 break;
1787 if (rc)
1788 return -1;
1790 return nsegs;
1794 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1795 struct rpcrdma_xprt *r_xprt, void *r)
1797 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1798 int nsegs = seg->mr_nsegs, rc;
1800 switch (ia->ri_memreg_strategy) {
1802 #if RPCRDMA_PERSISTENT_REGISTRATION
1803 case RPCRDMA_ALLPHYSICAL:
1804 BUG_ON(nsegs != 1);
1805 rpcrdma_unmap_one(ia, seg);
1806 rc = 0;
1807 break;
1808 #endif
1810 case RPCRDMA_FRMR:
1811 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1812 break;
1814 case RPCRDMA_MTHCAFMR:
1815 rc = rpcrdma_deregister_fmr_external(seg, ia);
1816 break;
1818 case RPCRDMA_MEMWINDOWS_ASYNC:
1819 case RPCRDMA_MEMWINDOWS:
1820 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1821 break;
1823 default:
1824 rc = rpcrdma_deregister_default_external(seg, ia);
1825 break;
1827 if (r) {
1828 struct rpcrdma_rep *rep = r;
1829 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1830 rep->rr_func = NULL;
1831 func(rep); /* dereg done, callback now */
1833 return nsegs;
1837 * Prepost any receive buffer, then post send.
1839 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1842 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1843 struct rpcrdma_ep *ep,
1844 struct rpcrdma_req *req)
1846 struct ib_send_wr send_wr, *send_wr_fail;
1847 struct rpcrdma_rep *rep = req->rl_reply;
1848 int rc;
1850 if (rep) {
1851 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1852 if (rc)
1853 goto out;
1854 req->rl_reply = NULL;
1857 send_wr.next = NULL;
1858 send_wr.wr_id = 0ULL; /* no send cookie */
1859 send_wr.sg_list = req->rl_send_iov;
1860 send_wr.num_sge = req->rl_niovs;
1861 send_wr.opcode = IB_WR_SEND;
1862 if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
1863 ib_dma_sync_single_for_device(ia->ri_id->device,
1864 req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1865 DMA_TO_DEVICE);
1866 ib_dma_sync_single_for_device(ia->ri_id->device,
1867 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1868 DMA_TO_DEVICE);
1869 ib_dma_sync_single_for_device(ia->ri_id->device,
1870 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1871 DMA_TO_DEVICE);
1873 if (DECR_CQCOUNT(ep) > 0)
1874 send_wr.send_flags = 0;
1875 else { /* Provider must take a send completion every now and then */
1876 INIT_CQCOUNT(ep);
1877 send_wr.send_flags = IB_SEND_SIGNALED;
1880 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1881 if (rc)
1882 dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
1883 rc);
1884 out:
1885 return rc;
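/*
 * Send-completion throttling used above: sends are normally posted
 * unsignaled, and DECR_CQCOUNT counts rep_cqcount down from rep_cqinit;
 * when the budget is exhausted one send is posted IB_SEND_SIGNALED and
 * the counter reset by INIT_CQCOUNT, so the provider can reap send
 * WQEs without an upcall per send. rpcrdma_event_process() ignores the
 * resulting completions, since their wr_id is 0.
 */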
1889 * (Re)post a receive buffer.
1892 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1893 struct rpcrdma_ep *ep,
1894 struct rpcrdma_rep *rep)
1896 struct ib_recv_wr recv_wr, *recv_wr_fail;
1897 int rc;
1899 recv_wr.next = NULL;
1900 recv_wr.wr_id = (u64) (unsigned long) rep;
1901 recv_wr.sg_list = &rep->rr_iov;
1902 recv_wr.num_sge = 1;
1904 ib_dma_sync_single_for_cpu(ia->ri_id->device,
1905 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1907 DECR_CQCOUNT(ep);
1908 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1910 if (rc)
1911 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
1912 rc);
1913 return rc;
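/*
 * rpcrdma_ep_post_recv() hands the rep to the hardware with wr_id set
 * to the struct rpcrdma_rep pointer itself; rpcrdma_event_process()
 * casts that wr_id back to recover the rep when the receive completes,
 * and the buffer is then reclaimed through the tasklet path at the top
 * of this file.
 */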