[SCSI] scsi_transport_fc: fix blocked bsg request when fc object deleted
[linux-2.6/linux-acpi-2.6/ibm-acpi-2.6.git] / net / sunrpc / xprtrdma / verbs.c
blob5f4c7b3bc7114c597703c2739a427815c3be741c
1 /*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * verbs.c
43 * Encapsulates the major functions managing:
44 * o adapters
45 * o endpoints
46 * o connections
47 * o buffer memory
50 #include <linux/pci.h> /* for Tavor hack below */
51 #include <linux/slab.h>
53 #include "xprt_rdma.h"
56 * Globals/Macros
59 #ifdef RPC_DEBUG
60 # define RPCDBG_FACILITY RPCDBG_TRANS
61 #endif
64 * internal functions
68 * handle replies in tasklet context, using a single, global list
69 * rdma tasklet function -- just turn around and call the func
70 * for all replies on the list
73 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
74 static LIST_HEAD(rpcrdma_tasklets_g);
76 static void
77 rpcrdma_run_tasklet(unsigned long data)
79 struct rpcrdma_rep *rep;
80 void (*func)(struct rpcrdma_rep *);
81 unsigned long flags;
83 data = data;
84 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
85 while (!list_empty(&rpcrdma_tasklets_g)) {
86 rep = list_entry(rpcrdma_tasklets_g.next,
87 struct rpcrdma_rep, rr_list);
88 list_del(&rep->rr_list);
89 func = rep->rr_func;
90 rep->rr_func = NULL;
91 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93 if (func)
94 func(rep);
95 else
96 rpcrdma_recv_buffer_put(rep);
98 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
103 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105 static inline void
106 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 unsigned long flags;
110 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
111 list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
112 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
113 tasklet_schedule(&rpcrdma_tasklet_g);
116 static void
117 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 struct rpcrdma_ep *ep = context;
121 dprintk("RPC: %s: QP error %X on device %s ep %p\n",
122 __func__, event->event, event->device->name, context);
123 if (ep->rep_connected == 1) {
124 ep->rep_connected = -EIO;
125 ep->rep_func(ep);
126 wake_up_all(&ep->rep_connect_wait);
130 static void
131 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 struct rpcrdma_ep *ep = context;
135 dprintk("RPC: %s: CQ error %X on device %s ep %p\n",
136 __func__, event->event, event->device->name, context);
137 if (ep->rep_connected == 1) {
138 ep->rep_connected = -EIO;
139 ep->rep_func(ep);
140 wake_up_all(&ep->rep_connect_wait);
144 static inline
145 void rpcrdma_event_process(struct ib_wc *wc)
147 struct rpcrdma_rep *rep =
148 (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
150 dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n",
151 __func__, rep, wc->status, wc->opcode, wc->byte_len);
153 if (!rep) /* send or bind completion that we don't care about */
154 return;
156 if (IB_WC_SUCCESS != wc->status) {
157 dprintk("RPC: %s: %s WC status %X, connection lost\n",
158 __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
159 wc->status);
160 rep->rr_len = ~0U;
161 rpcrdma_schedule_tasklet(rep);
162 return;
165 switch (wc->opcode) {
166 case IB_WC_RECV:
167 rep->rr_len = wc->byte_len;
168 ib_dma_sync_single_for_cpu(
169 rdmab_to_ia(rep->rr_buffer)->ri_id->device,
170 rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
171 /* Keep (only) the most recent credits, after check validity */
172 if (rep->rr_len >= 16) {
173 struct rpcrdma_msg *p =
174 (struct rpcrdma_msg *) rep->rr_base;
175 unsigned int credits = ntohl(p->rm_credit);
176 if (credits == 0) {
177 dprintk("RPC: %s: server"
178 " dropped credits to 0!\n", __func__);
179 /* don't deadlock */
180 credits = 1;
181 } else if (credits > rep->rr_buffer->rb_max_requests) {
182 dprintk("RPC: %s: server"
183 " over-crediting: %d (%d)\n",
184 __func__, credits,
185 rep->rr_buffer->rb_max_requests);
186 credits = rep->rr_buffer->rb_max_requests;
188 atomic_set(&rep->rr_buffer->rb_credits, credits);
190 /* fall through */
191 case IB_WC_BIND_MW:
192 rpcrdma_schedule_tasklet(rep);
193 break;
194 default:
195 dprintk("RPC: %s: unexpected WC event %X\n",
196 __func__, wc->opcode);
197 break;
201 static inline int
202 rpcrdma_cq_poll(struct ib_cq *cq)
204 struct ib_wc wc;
205 int rc;
207 for (;;) {
208 rc = ib_poll_cq(cq, 1, &wc);
209 if (rc < 0) {
210 dprintk("RPC: %s: ib_poll_cq failed %i\n",
211 __func__, rc);
212 return rc;
214 if (rc == 0)
215 break;
217 rpcrdma_event_process(&wc);
220 return 0;
224 * rpcrdma_cq_event_upcall
226 * This upcall handles recv, send, bind and unbind events.
227 * It is reentrant but processes single events in order to maintain
228 * ordering of receives to keep server credits.
230 * It is the responsibility of the scheduled tasklet to return
231 * recv buffers to the pool. NOTE: this affects synchronization of
232 * connection shutdown. That is, the structures required for
233 * the completion of the reply handler must remain intact until
234 * all memory has been reclaimed.
236 * Note that send events are suppressed and do not result in an upcall.
238 static void
239 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
241 int rc;
243 rc = rpcrdma_cq_poll(cq);
244 if (rc)
245 return;
247 rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
248 if (rc) {
249 dprintk("RPC: %s: ib_req_notify_cq failed %i\n",
250 __func__, rc);
251 return;
254 rpcrdma_cq_poll(cq);
#ifdef RPC_DEBUG
/*
 * Debug-only descriptions of connection manager events.  The table is
 * indexed with event->event by rpcrdma_conn_upcall().
 */
static const char * const conn[] = {
	"address resolved",	/*  0 */
	"address error",	/*  1 */
	"route resolved",	/*  2 */
	"route error",		/*  3 */
	"connect request",	/*  4 */
	"connect response",	/*  5 */
	"connect error",	/*  6 */
	"unreachable",		/*  7 */
	"rejected",		/*  8 */
	"established",		/*  9 */
	"disconnected",		/* 10 */
	"device removal"	/* 11 */
};
#endif
/*
 * Connection manager event handler, registered via rdma_create_id().
 *
 * @id:    the CM id; id->context is the owning struct rpcrdma_xprt.
 * @event: the CM event being delivered.
 *
 * Address/route resolution events store their outcome in
 * ia->ri_async_rc and complete ia->ri_done.  Connection state events
 * reset the credit count to 1, store the new state in
 * ep->rep_connected (1 when established, a negative errno otherwise),
 * invoke ep->rep_func and wake waiters on ep->rep_connect_wait.
 * Anything else is only logged.
 *
 * Always returns 0.
 */
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#ifdef RPC_DEBUG
	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
#endif
	struct ib_qp_attr attr;
	struct ib_qp_init_attr iattr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		/*
		 * The queried values are only used for logging.  If the
		 * query fails, report zeroes instead of whatever happens
		 * to be on the stack.
		 */
		if (ib_query_qp(ia->ri_id->qp, &attr,
			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			&iattr)) {
			attr.max_dest_rd_atomic = 0;
			attr.max_rd_atomic = 0;
		}
		dprintk("RPC: %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		/* bound the lookup by the table itself, not a magic number */
		dprintk("RPC: %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
			__func__,
			((size_t)event->event < sizeof(conn) / sizeof(conn[0])) ?
				conn[event->event] :
				"unknown connection error",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			ep, event->event);
		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
		dprintk("RPC: %s: %sconnected\n",
					__func__, connstate > 0 ? "" : "dis");
		ep->rep_connected = connstate;
		ep->rep_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		break;
	default:
		dprintk("RPC: %s: unexpected CM event %d\n",
			__func__, event->event);
		break;
	}

#ifdef RPC_DEBUG
	if (connstate == 1) {
		int ird = attr.max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;
		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
			"on %s, memreg %d slots %d ird %d%s\n",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			ia->ri_id->device->name,
			ia->ri_memreg_strategy,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			connstate);
	}
#endif

	return 0;
}
372 static struct rdma_cm_id *
373 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
374 struct rpcrdma_ia *ia, struct sockaddr *addr)
376 struct rdma_cm_id *id;
377 int rc;
379 init_completion(&ia->ri_done);
381 id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
382 if (IS_ERR(id)) {
383 rc = PTR_ERR(id);
384 dprintk("RPC: %s: rdma_create_id() failed %i\n",
385 __func__, rc);
386 return id;
389 ia->ri_async_rc = -ETIMEDOUT;
390 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
391 if (rc) {
392 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
393 __func__, rc);
394 goto out;
396 wait_for_completion_interruptible_timeout(&ia->ri_done,
397 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
398 rc = ia->ri_async_rc;
399 if (rc)
400 goto out;
402 ia->ri_async_rc = -ETIMEDOUT;
403 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
404 if (rc) {
405 dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
406 __func__, rc);
407 goto out;
409 wait_for_completion_interruptible_timeout(&ia->ri_done,
410 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
411 rc = ia->ri_async_rc;
412 if (rc)
413 goto out;
415 return id;
417 out:
418 rdma_destroy_id(id);
419 return ERR_PTR(rc);
423 * Drain any cq, prior to teardown.
425 static void
426 rpcrdma_clean_cq(struct ib_cq *cq)
428 struct ib_wc wc;
429 int count = 0;
431 while (1 == ib_poll_cq(cq, 1, &wc))
432 ++count;
434 if (count)
435 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
436 __func__, count, wc.opcode);
440 * Exported functions.
444 * Open and initialize an Interface Adapter.
445 * o initializes fields of struct rpcrdma_ia, including
446 * interface and provider attributes and protection zone.
449 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
451 int rc, mem_priv;
452 struct ib_device_attr devattr;
453 struct rpcrdma_ia *ia = &xprt->rx_ia;
455 ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
456 if (IS_ERR(ia->ri_id)) {
457 rc = PTR_ERR(ia->ri_id);
458 goto out1;
461 ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
462 if (IS_ERR(ia->ri_pd)) {
463 rc = PTR_ERR(ia->ri_pd);
464 dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
465 __func__, rc);
466 goto out2;
470 * Query the device to determine if the requested memory
471 * registration strategy is supported. If it isn't, set the
472 * strategy to a globally supported model.
474 rc = ib_query_device(ia->ri_id->device, &devattr);
475 if (rc) {
476 dprintk("RPC: %s: ib_query_device failed %d\n",
477 __func__, rc);
478 goto out2;
481 if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
482 ia->ri_have_dma_lkey = 1;
483 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
486 switch (memreg) {
487 case RPCRDMA_MEMWINDOWS:
488 case RPCRDMA_MEMWINDOWS_ASYNC:
489 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
490 dprintk("RPC: %s: MEMWINDOWS registration "
491 "specified but not supported by adapter, "
492 "using slower RPCRDMA_REGISTER\n",
493 __func__);
494 memreg = RPCRDMA_REGISTER;
496 break;
497 case RPCRDMA_MTHCAFMR:
498 if (!ia->ri_id->device->alloc_fmr) {
499 #if RPCRDMA_PERSISTENT_REGISTRATION
500 dprintk("RPC: %s: MTHCAFMR registration "
501 "specified but not supported by adapter, "
502 "using riskier RPCRDMA_ALLPHYSICAL\n",
503 __func__);
504 memreg = RPCRDMA_ALLPHYSICAL;
505 #else
506 dprintk("RPC: %s: MTHCAFMR registration "
507 "specified but not supported by adapter, "
508 "using slower RPCRDMA_REGISTER\n",
509 __func__);
510 memreg = RPCRDMA_REGISTER;
511 #endif
513 break;
514 case RPCRDMA_FRMR:
515 /* Requires both frmr reg and local dma lkey */
516 if ((devattr.device_cap_flags &
517 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
518 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
519 #if RPCRDMA_PERSISTENT_REGISTRATION
520 dprintk("RPC: %s: FRMR registration "
521 "specified but not supported by adapter, "
522 "using riskier RPCRDMA_ALLPHYSICAL\n",
523 __func__);
524 memreg = RPCRDMA_ALLPHYSICAL;
525 #else
526 dprintk("RPC: %s: FRMR registration "
527 "specified but not supported by adapter, "
528 "using slower RPCRDMA_REGISTER\n",
529 __func__);
530 memreg = RPCRDMA_REGISTER;
531 #endif
533 break;
537 * Optionally obtain an underlying physical identity mapping in
538 * order to do a memory window-based bind. This base registration
539 * is protected from remote access - that is enabled only by binding
540 * for the specific bytes targeted during each RPC operation, and
541 * revoked after the corresponding completion similar to a storage
542 * adapter.
544 switch (memreg) {
545 case RPCRDMA_BOUNCEBUFFERS:
546 case RPCRDMA_REGISTER:
547 case RPCRDMA_FRMR:
548 break;
549 #if RPCRDMA_PERSISTENT_REGISTRATION
550 case RPCRDMA_ALLPHYSICAL:
551 mem_priv = IB_ACCESS_LOCAL_WRITE |
552 IB_ACCESS_REMOTE_WRITE |
553 IB_ACCESS_REMOTE_READ;
554 goto register_setup;
555 #endif
556 case RPCRDMA_MEMWINDOWS_ASYNC:
557 case RPCRDMA_MEMWINDOWS:
558 mem_priv = IB_ACCESS_LOCAL_WRITE |
559 IB_ACCESS_MW_BIND;
560 goto register_setup;
561 case RPCRDMA_MTHCAFMR:
562 if (ia->ri_have_dma_lkey)
563 break;
564 mem_priv = IB_ACCESS_LOCAL_WRITE;
565 register_setup:
566 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
567 if (IS_ERR(ia->ri_bind_mem)) {
568 printk(KERN_ALERT "%s: ib_get_dma_mr for "
569 "phys register failed with %lX\n\t"
570 "Will continue with degraded performance\n",
571 __func__, PTR_ERR(ia->ri_bind_mem));
572 memreg = RPCRDMA_REGISTER;
573 ia->ri_bind_mem = NULL;
575 break;
576 default:
577 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
578 __func__, memreg);
579 rc = -EINVAL;
580 goto out2;
582 dprintk("RPC: %s: memory registration strategy is %d\n",
583 __func__, memreg);
585 /* Else will do memory reg/dereg for each chunk */
586 ia->ri_memreg_strategy = memreg;
588 return 0;
589 out2:
590 rdma_destroy_id(ia->ri_id);
591 ia->ri_id = NULL;
592 out1:
593 return rc;
597 * Clean up/close an IA.
598 * o if event handles and PD have been initialized, free them.
599 * o close the IA
601 void
602 rpcrdma_ia_close(struct rpcrdma_ia *ia)
604 int rc;
606 dprintk("RPC: %s: entering\n", __func__);
607 if (ia->ri_bind_mem != NULL) {
608 rc = ib_dereg_mr(ia->ri_bind_mem);
609 dprintk("RPC: %s: ib_dereg_mr returned %i\n",
610 __func__, rc);
612 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
613 if (ia->ri_id->qp)
614 rdma_destroy_qp(ia->ri_id);
615 rdma_destroy_id(ia->ri_id);
616 ia->ri_id = NULL;
618 if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
619 rc = ib_dealloc_pd(ia->ri_pd);
620 dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
621 __func__, rc);
626 * Create unconnected endpoint.
629 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
630 struct rpcrdma_create_data_internal *cdata)
632 struct ib_device_attr devattr;
633 int rc, err;
635 rc = ib_query_device(ia->ri_id->device, &devattr);
636 if (rc) {
637 dprintk("RPC: %s: ib_query_device failed %d\n",
638 __func__, rc);
639 return rc;
642 /* check provider's send/recv wr limits */
643 if (cdata->max_requests > devattr.max_qp_wr)
644 cdata->max_requests = devattr.max_qp_wr;
646 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
647 ep->rep_attr.qp_context = ep;
648 /* send_cq and recv_cq initialized below */
649 ep->rep_attr.srq = NULL;
650 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
651 switch (ia->ri_memreg_strategy) {
652 case RPCRDMA_FRMR:
653 /* Add room for frmr register and invalidate WRs.
654 * 1. FRMR reg WR for head
655 * 2. FRMR invalidate WR for head
656 * 3. FRMR reg WR for pagelist
657 * 4. FRMR invalidate WR for pagelist
658 * 5. FRMR reg WR for tail
659 * 6. FRMR invalidate WR for tail
660 * 7. The RDMA_SEND WR
662 ep->rep_attr.cap.max_send_wr *= 7;
663 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
664 cdata->max_requests = devattr.max_qp_wr / 7;
665 if (!cdata->max_requests)
666 return -EINVAL;
667 ep->rep_attr.cap.max_send_wr = cdata->max_requests * 7;
669 break;
670 case RPCRDMA_MEMWINDOWS_ASYNC:
671 case RPCRDMA_MEMWINDOWS:
672 /* Add room for mw_binds+unbinds - overkill! */
673 ep->rep_attr.cap.max_send_wr++;
674 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
675 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
676 return -EINVAL;
677 break;
678 default:
679 break;
681 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
682 ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
683 ep->rep_attr.cap.max_recv_sge = 1;
684 ep->rep_attr.cap.max_inline_data = 0;
685 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
686 ep->rep_attr.qp_type = IB_QPT_RC;
687 ep->rep_attr.port_num = ~0;
689 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
690 "iovs: send %d recv %d\n",
691 __func__,
692 ep->rep_attr.cap.max_send_wr,
693 ep->rep_attr.cap.max_recv_wr,
694 ep->rep_attr.cap.max_send_sge,
695 ep->rep_attr.cap.max_recv_sge);
697 /* set trigger for requesting send completion */
698 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /* - 1*/;
699 switch (ia->ri_memreg_strategy) {
700 case RPCRDMA_MEMWINDOWS_ASYNC:
701 case RPCRDMA_MEMWINDOWS:
702 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
703 break;
704 default:
705 break;
707 if (ep->rep_cqinit <= 2)
708 ep->rep_cqinit = 0;
709 INIT_CQCOUNT(ep);
710 ep->rep_ia = ia;
711 init_waitqueue_head(&ep->rep_connect_wait);
714 * Create a single cq for receive dto and mw_bind (only ever
715 * care about unbind, really). Send completions are suppressed.
716 * Use single threaded tasklet upcalls to maintain ordering.
718 ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
719 rpcrdma_cq_async_error_upcall, NULL,
720 ep->rep_attr.cap.max_recv_wr +
721 ep->rep_attr.cap.max_send_wr + 1, 0);
722 if (IS_ERR(ep->rep_cq)) {
723 rc = PTR_ERR(ep->rep_cq);
724 dprintk("RPC: %s: ib_create_cq failed: %i\n",
725 __func__, rc);
726 goto out1;
729 rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
730 if (rc) {
731 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
732 __func__, rc);
733 goto out2;
736 ep->rep_attr.send_cq = ep->rep_cq;
737 ep->rep_attr.recv_cq = ep->rep_cq;
739 /* Initialize cma parameters */
741 /* RPC/RDMA does not use private data */
742 ep->rep_remote_cma.private_data = NULL;
743 ep->rep_remote_cma.private_data_len = 0;
745 /* Client offers RDMA Read but does not initiate */
746 ep->rep_remote_cma.initiator_depth = 0;
747 if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
748 ep->rep_remote_cma.responder_resources = 0;
749 else if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */
750 ep->rep_remote_cma.responder_resources = 32;
751 else
752 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
754 ep->rep_remote_cma.retry_count = 7;
755 ep->rep_remote_cma.flow_control = 0;
756 ep->rep_remote_cma.rnr_retry_count = 0;
758 return 0;
760 out2:
761 err = ib_destroy_cq(ep->rep_cq);
762 if (err)
763 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
764 __func__, err);
765 out1:
766 return rc;
770 * rpcrdma_ep_destroy
772 * Disconnect and destroy endpoint. After this, the only
773 * valid operations on the ep are to free it (if dynamically
774 * allocated) or re-create it.
776 * The caller's error handling must be sure to not leak the endpoint
777 * if this function fails.
780 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
782 int rc;
784 dprintk("RPC: %s: entering, connected is %d\n",
785 __func__, ep->rep_connected);
787 if (ia->ri_id->qp) {
788 rc = rpcrdma_ep_disconnect(ep, ia);
789 if (rc)
790 dprintk("RPC: %s: rpcrdma_ep_disconnect"
791 " returned %i\n", __func__, rc);
792 rdma_destroy_qp(ia->ri_id);
793 ia->ri_id->qp = NULL;
796 /* padding - could be done in rpcrdma_buffer_destroy... */
797 if (ep->rep_pad_mr) {
798 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
799 ep->rep_pad_mr = NULL;
802 rpcrdma_clean_cq(ep->rep_cq);
803 rc = ib_destroy_cq(ep->rep_cq);
804 if (rc)
805 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
806 __func__, rc);
808 return rc;
812 * Connect unconnected endpoint.
815 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
817 struct rdma_cm_id *id;
818 int rc = 0;
819 int retry_count = 0;
821 if (ep->rep_connected != 0) {
822 struct rpcrdma_xprt *xprt;
823 retry:
824 rc = rpcrdma_ep_disconnect(ep, ia);
825 if (rc && rc != -ENOTCONN)
826 dprintk("RPC: %s: rpcrdma_ep_disconnect"
827 " status %i\n", __func__, rc);
828 rpcrdma_clean_cq(ep->rep_cq);
830 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
831 id = rpcrdma_create_id(xprt, ia,
832 (struct sockaddr *)&xprt->rx_data.addr);
833 if (IS_ERR(id)) {
834 rc = PTR_ERR(id);
835 goto out;
837 /* TEMP TEMP TEMP - fail if new device:
838 * Deregister/remarshal *all* requests!
839 * Close and recreate adapter, pd, etc!
840 * Re-determine all attributes still sane!
841 * More stuff I haven't thought of!
842 * Rrrgh!
844 if (ia->ri_id->device != id->device) {
845 printk("RPC: %s: can't reconnect on "
846 "different device!\n", __func__);
847 rdma_destroy_id(id);
848 rc = -ENETDOWN;
849 goto out;
851 /* END TEMP */
852 rdma_destroy_qp(ia->ri_id);
853 rdma_destroy_id(ia->ri_id);
854 ia->ri_id = id;
857 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
858 if (rc) {
859 dprintk("RPC: %s: rdma_create_qp failed %i\n",
860 __func__, rc);
861 goto out;
864 /* XXX Tavor device performs badly with 2K MTU! */
865 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
866 struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
867 if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
868 (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
869 pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
870 struct ib_qp_attr attr = {
871 .path_mtu = IB_MTU_1024
873 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
877 ep->rep_connected = 0;
879 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
880 if (rc) {
881 dprintk("RPC: %s: rdma_connect() failed with %i\n",
882 __func__, rc);
883 goto out;
886 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
889 * Check state. A non-peer reject indicates no listener
890 * (ECONNREFUSED), which may be a transient state. All
891 * others indicate a transport condition which has already
892 * undergone a best-effort.
894 if (ep->rep_connected == -ECONNREFUSED &&
895 ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
896 dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
897 goto retry;
899 if (ep->rep_connected <= 0) {
900 /* Sometimes, the only way to reliably connect to remote
901 * CMs is to use same nonzero values for ORD and IRD. */
902 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
903 (ep->rep_remote_cma.responder_resources == 0 ||
904 ep->rep_remote_cma.initiator_depth !=
905 ep->rep_remote_cma.responder_resources)) {
906 if (ep->rep_remote_cma.responder_resources == 0)
907 ep->rep_remote_cma.responder_resources = 1;
908 ep->rep_remote_cma.initiator_depth =
909 ep->rep_remote_cma.responder_resources;
910 goto retry;
912 rc = ep->rep_connected;
913 } else {
914 dprintk("RPC: %s: connected\n", __func__);
917 out:
918 if (rc)
919 ep->rep_connected = rc;
920 return rc;
924 * rpcrdma_ep_disconnect
926 * This is separate from destroy to facilitate the ability
927 * to reconnect without recreating the endpoint.
929 * This call is not reentrant, and must not be made in parallel
930 * on the same endpoint.
933 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
935 int rc;
937 rpcrdma_clean_cq(ep->rep_cq);
938 rc = rdma_disconnect(ia->ri_id);
939 if (!rc) {
940 /* returns without wait if not connected */
941 wait_event_interruptible(ep->rep_connect_wait,
942 ep->rep_connected != 1);
943 dprintk("RPC: %s: after wait, %sconnected\n", __func__,
944 (ep->rep_connected == 1) ? "still " : "dis");
945 } else {
946 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
947 ep->rep_connected = rc;
949 return rc;
953 * Initialize buffer memory
956 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
957 struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
959 char *p;
960 size_t len;
961 int i, rc;
962 struct rpcrdma_mw *r;
964 buf->rb_max_requests = cdata->max_requests;
965 spin_lock_init(&buf->rb_lock);
966 atomic_set(&buf->rb_credits, 1);
968 /* Need to allocate:
969 * 1. arrays for send and recv pointers
970 * 2. arrays of struct rpcrdma_req to fill in pointers
971 * 3. array of struct rpcrdma_rep for replies
972 * 4. padding, if any
973 * 5. mw's, fmr's or frmr's, if any
974 * Send/recv buffers in req/rep need to be registered
977 len = buf->rb_max_requests *
978 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
979 len += cdata->padding;
980 switch (ia->ri_memreg_strategy) {
981 case RPCRDMA_FRMR:
982 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
983 sizeof(struct rpcrdma_mw);
984 break;
985 case RPCRDMA_MTHCAFMR:
986 /* TBD we are perhaps overallocating here */
987 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
988 sizeof(struct rpcrdma_mw);
989 break;
990 case RPCRDMA_MEMWINDOWS_ASYNC:
991 case RPCRDMA_MEMWINDOWS:
992 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
993 sizeof(struct rpcrdma_mw);
994 break;
995 default:
996 break;
999 /* allocate 1, 4 and 5 in one shot */
1000 p = kzalloc(len, GFP_KERNEL);
1001 if (p == NULL) {
1002 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1003 __func__, len);
1004 rc = -ENOMEM;
1005 goto out;
1007 buf->rb_pool = p; /* for freeing it later */
1009 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1010 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1011 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1012 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1015 * Register the zeroed pad buffer, if any.
1017 if (cdata->padding) {
1018 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1019 &ep->rep_pad_mr, &ep->rep_pad);
1020 if (rc)
1021 goto out;
1023 p += cdata->padding;
1026 * Allocate the fmr's, or mw's for mw_bind chunk registration.
1027 * We "cycle" the mw's in order to minimize rkey reuse,
1028 * and also reduce unbind-to-bind collision.
1030 INIT_LIST_HEAD(&buf->rb_mws);
1031 r = (struct rpcrdma_mw *)p;
1032 switch (ia->ri_memreg_strategy) {
1033 case RPCRDMA_FRMR:
1034 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1035 r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1036 RPCRDMA_MAX_SEGS);
1037 if (IS_ERR(r->r.frmr.fr_mr)) {
1038 rc = PTR_ERR(r->r.frmr.fr_mr);
1039 dprintk("RPC: %s: ib_alloc_fast_reg_mr"
1040 " failed %i\n", __func__, rc);
1041 goto out;
1043 r->r.frmr.fr_pgl =
1044 ib_alloc_fast_reg_page_list(ia->ri_id->device,
1045 RPCRDMA_MAX_SEGS);
1046 if (IS_ERR(r->r.frmr.fr_pgl)) {
1047 rc = PTR_ERR(r->r.frmr.fr_pgl);
1048 dprintk("RPC: %s: "
1049 "ib_alloc_fast_reg_page_list "
1050 "failed %i\n", __func__, rc);
1051 goto out;
1053 list_add(&r->mw_list, &buf->rb_mws);
1054 ++r;
1056 break;
1057 case RPCRDMA_MTHCAFMR:
1058 /* TBD we are perhaps overallocating here */
1059 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1060 static struct ib_fmr_attr fa =
1061 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1062 r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1063 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1064 &fa);
1065 if (IS_ERR(r->r.fmr)) {
1066 rc = PTR_ERR(r->r.fmr);
1067 dprintk("RPC: %s: ib_alloc_fmr"
1068 " failed %i\n", __func__, rc);
1069 goto out;
1071 list_add(&r->mw_list, &buf->rb_mws);
1072 ++r;
1074 break;
1075 case RPCRDMA_MEMWINDOWS_ASYNC:
1076 case RPCRDMA_MEMWINDOWS:
1077 /* Allocate one extra request's worth, for full cycling */
1078 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1079 r->r.mw = ib_alloc_mw(ia->ri_pd);
1080 if (IS_ERR(r->r.mw)) {
1081 rc = PTR_ERR(r->r.mw);
1082 dprintk("RPC: %s: ib_alloc_mw"
1083 " failed %i\n", __func__, rc);
1084 goto out;
1086 list_add(&r->mw_list, &buf->rb_mws);
1087 ++r;
1089 break;
1090 default:
1091 break;
1095 * Allocate/init the request/reply buffers. Doing this
1096 * using kmalloc for now -- one for each buf.
1098 for (i = 0; i < buf->rb_max_requests; i++) {
1099 struct rpcrdma_req *req;
1100 struct rpcrdma_rep *rep;
1102 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1103 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1104 /* Typical ~2400b, so rounding up saves work later */
1105 if (len < 4096)
1106 len = 4096;
1107 req = kmalloc(len, GFP_KERNEL);
1108 if (req == NULL) {
1109 dprintk("RPC: %s: request buffer %d alloc"
1110 " failed\n", __func__, i);
1111 rc = -ENOMEM;
1112 goto out;
1114 memset(req, 0, sizeof(struct rpcrdma_req));
1115 buf->rb_send_bufs[i] = req;
1116 buf->rb_send_bufs[i]->rl_buffer = buf;
1118 rc = rpcrdma_register_internal(ia, req->rl_base,
1119 len - offsetof(struct rpcrdma_req, rl_base),
1120 &buf->rb_send_bufs[i]->rl_handle,
1121 &buf->rb_send_bufs[i]->rl_iov);
1122 if (rc)
1123 goto out;
1125 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1127 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1128 rep = kmalloc(len, GFP_KERNEL);
1129 if (rep == NULL) {
1130 dprintk("RPC: %s: reply buffer %d alloc failed\n",
1131 __func__, i);
1132 rc = -ENOMEM;
1133 goto out;
1135 memset(rep, 0, sizeof(struct rpcrdma_rep));
1136 buf->rb_recv_bufs[i] = rep;
1137 buf->rb_recv_bufs[i]->rr_buffer = buf;
1138 init_waitqueue_head(&rep->rr_unbind);
1140 rc = rpcrdma_register_internal(ia, rep->rr_base,
1141 len - offsetof(struct rpcrdma_rep, rr_base),
1142 &buf->rb_recv_bufs[i]->rr_handle,
1143 &buf->rb_recv_bufs[i]->rr_iov);
1144 if (rc)
1145 goto out;
1148 dprintk("RPC: %s: max_requests %d\n",
1149 __func__, buf->rb_max_requests);
1150 /* done */
1151 return 0;
1152 out:
1153 rpcrdma_buffer_destroy(buf);
1154 return rc;
1158 * Unregister and destroy buffer memory. Need to deal with
1159 * partial initialization, so it's callable from failed create.
1160 * Must be called before destroying endpoint, as registrations
1161 * reference it.
1163 void
1164 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1166 int rc, i;
1167 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1168 struct rpcrdma_mw *r;
1170 /* clean up in reverse order from create
1171 * 1. recv mr memory (mr free, then kfree)
1172 * 1a. bind mw memory
1173 * 2. send mr memory (mr free, then kfree)
1174 * 3. padding (if any) [moved to rpcrdma_ep_destroy]
1175 * 4. arrays
1177 dprintk("RPC: %s: entering\n", __func__);
1179 for (i = 0; i < buf->rb_max_requests; i++) {
1180 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1181 rpcrdma_deregister_internal(ia,
1182 buf->rb_recv_bufs[i]->rr_handle,
1183 &buf->rb_recv_bufs[i]->rr_iov);
1184 kfree(buf->rb_recv_bufs[i]);
1186 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1187 while (!list_empty(&buf->rb_mws)) {
1188 r = list_entry(buf->rb_mws.next,
1189 struct rpcrdma_mw, mw_list);
1190 list_del(&r->mw_list);
1191 switch (ia->ri_memreg_strategy) {
1192 case RPCRDMA_FRMR:
1193 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1194 if (rc)
1195 dprintk("RPC: %s:"
1196 " ib_dereg_mr"
1197 " failed %i\n",
1198 __func__, rc);
1199 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1200 break;
1201 case RPCRDMA_MTHCAFMR:
1202 rc = ib_dealloc_fmr(r->r.fmr);
1203 if (rc)
1204 dprintk("RPC: %s:"
1205 " ib_dealloc_fmr"
1206 " failed %i\n",
1207 __func__, rc);
1208 break;
1209 case RPCRDMA_MEMWINDOWS_ASYNC:
1210 case RPCRDMA_MEMWINDOWS:
1211 rc = ib_dealloc_mw(r->r.mw);
1212 if (rc)
1213 dprintk("RPC: %s:"
1214 " ib_dealloc_mw"
1215 " failed %i\n",
1216 __func__, rc);
1217 break;
1218 default:
1219 break;
1222 rpcrdma_deregister_internal(ia,
1223 buf->rb_send_bufs[i]->rl_handle,
1224 &buf->rb_send_bufs[i]->rl_iov);
1225 kfree(buf->rb_send_bufs[i]);
1229 kfree(buf->rb_pool);
1233 * Get a set of request/reply buffers.
1235 * Reply buffer (if needed) is attached to send buffer upon return.
1236 * Rule:
1237 * rb_send_index and rb_recv_index MUST always be pointing to the
1238 * *next* available buffer (non-NULL). They are incremented after
1239 * removing buffers, and decremented *before* returning them.
1241 struct rpcrdma_req *
1242 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1244 struct rpcrdma_req *req;
1245 unsigned long flags;
1246 int i;
1247 struct rpcrdma_mw *r;
1249 spin_lock_irqsave(&buffers->rb_lock, flags);
1250 if (buffers->rb_send_index == buffers->rb_max_requests) {
1251 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1252 dprintk("RPC: %s: out of request buffers\n", __func__);
1253 return ((struct rpcrdma_req *)NULL);
1256 req = buffers->rb_send_bufs[buffers->rb_send_index];
1257 if (buffers->rb_send_index < buffers->rb_recv_index) {
1258 dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
1259 __func__,
1260 buffers->rb_recv_index - buffers->rb_send_index);
1261 req->rl_reply = NULL;
1262 } else {
1263 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1264 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1266 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1267 if (!list_empty(&buffers->rb_mws)) {
1268 i = RPCRDMA_MAX_SEGS - 1;
1269 do {
1270 r = list_entry(buffers->rb_mws.next,
1271 struct rpcrdma_mw, mw_list);
1272 list_del(&r->mw_list);
1273 req->rl_segments[i].mr_chunk.rl_mw = r;
1274 } while (--i >= 0);
1276 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1277 return req;
1281 * Put request/reply buffers back into pool.
1282 * Pre-decrement counter/array index.
1284 void
1285 rpcrdma_buffer_put(struct rpcrdma_req *req)
1287 struct rpcrdma_buffer *buffers = req->rl_buffer;
1288 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1289 int i;
1290 unsigned long flags;
1292 BUG_ON(req->rl_nchunks != 0);
1293 spin_lock_irqsave(&buffers->rb_lock, flags);
1294 buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1295 req->rl_niovs = 0;
1296 if (req->rl_reply) {
1297 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1298 init_waitqueue_head(&req->rl_reply->rr_unbind);
1299 req->rl_reply->rr_func = NULL;
1300 req->rl_reply = NULL;
1302 switch (ia->ri_memreg_strategy) {
1303 case RPCRDMA_FRMR:
1304 case RPCRDMA_MTHCAFMR:
1305 case RPCRDMA_MEMWINDOWS_ASYNC:
1306 case RPCRDMA_MEMWINDOWS:
1308 * Cycle mw's back in reverse order, and "spin" them.
1309 * This delays and scrambles reuse as much as possible.
1311 i = 1;
1312 do {
1313 struct rpcrdma_mw **mw;
1314 mw = &req->rl_segments[i].mr_chunk.rl_mw;
1315 list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1316 *mw = NULL;
1317 } while (++i < RPCRDMA_MAX_SEGS);
1318 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1319 &buffers->rb_mws);
1320 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1321 break;
1322 default:
1323 break;
1325 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1329 * Recover reply buffers from pool.
1330 * This happens when recovering from error conditions.
1331 * Post-increment counter/array index.
1333 void
1334 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1336 struct rpcrdma_buffer *buffers = req->rl_buffer;
1337 unsigned long flags;
1339 if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
1340 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1341 spin_lock_irqsave(&buffers->rb_lock, flags);
1342 if (buffers->rb_recv_index < buffers->rb_max_requests) {
1343 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1344 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1346 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1350 * Put reply buffers back into pool when not attached to
1351 * request. This happens in error conditions, and when
1352 * aborting unbinds. Pre-decrement counter/array index.
1354 void
1355 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1357 struct rpcrdma_buffer *buffers = rep->rr_buffer;
1358 unsigned long flags;
1360 rep->rr_func = NULL;
1361 spin_lock_irqsave(&buffers->rb_lock, flags);
1362 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1363 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1367 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1371 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1372 struct ib_mr **mrp, struct ib_sge *iov)
1374 struct ib_phys_buf ipb;
1375 struct ib_mr *mr;
1376 int rc;
1379 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1381 iov->addr = ib_dma_map_single(ia->ri_id->device,
1382 va, len, DMA_BIDIRECTIONAL);
1383 iov->length = len;
1385 if (ia->ri_have_dma_lkey) {
1386 *mrp = NULL;
1387 iov->lkey = ia->ri_dma_lkey;
1388 return 0;
1389 } else if (ia->ri_bind_mem != NULL) {
1390 *mrp = NULL;
1391 iov->lkey = ia->ri_bind_mem->lkey;
1392 return 0;
1395 ipb.addr = iov->addr;
1396 ipb.size = iov->length;
1397 mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1398 IB_ACCESS_LOCAL_WRITE, &iov->addr);
1400 dprintk("RPC: %s: phys convert: 0x%llx "
1401 "registered 0x%llx length %d\n",
1402 __func__, (unsigned long long)ipb.addr,
1403 (unsigned long long)iov->addr, len);
1405 if (IS_ERR(mr)) {
1406 *mrp = NULL;
1407 rc = PTR_ERR(mr);
1408 dprintk("RPC: %s: failed with %i\n", __func__, rc);
1409 } else {
1410 *mrp = mr;
1411 iov->lkey = mr->lkey;
1412 rc = 0;
1415 return rc;
1419 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1420 struct ib_mr *mr, struct ib_sge *iov)
1422 int rc;
1424 ib_dma_unmap_single(ia->ri_id->device,
1425 iov->addr, iov->length, DMA_BIDIRECTIONAL);
1427 if (NULL == mr)
1428 return 0;
1430 rc = ib_dereg_mr(mr);
1431 if (rc)
1432 dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
1433 return rc;
1437 * Wrappers for chunk registration, shared by read/write chunk code.
1440 static void
1441 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1443 seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1444 seg->mr_dmalen = seg->mr_len;
1445 if (seg->mr_page)
1446 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1447 seg->mr_page, offset_in_page(seg->mr_offset),
1448 seg->mr_dmalen, seg->mr_dir);
1449 else
1450 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1451 seg->mr_offset,
1452 seg->mr_dmalen, seg->mr_dir);
1455 static void
1456 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1458 if (seg->mr_page)
1459 ib_dma_unmap_page(ia->ri_id->device,
1460 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1461 else
1462 ib_dma_unmap_single(ia->ri_id->device,
1463 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1466 static int
1467 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1468 int *nsegs, int writing, struct rpcrdma_ia *ia,
1469 struct rpcrdma_xprt *r_xprt)
1471 struct rpcrdma_mr_seg *seg1 = seg;
1472 struct ib_send_wr frmr_wr, *bad_wr;
1473 u8 key;
1474 int len, pageoff;
1475 int i, rc;
1477 pageoff = offset_in_page(seg1->mr_offset);
1478 seg1->mr_offset -= pageoff; /* start of page */
1479 seg1->mr_len += pageoff;
1480 len = -pageoff;
1481 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1482 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1483 for (i = 0; i < *nsegs;) {
1484 rpcrdma_map_one(ia, seg, writing);
1485 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1486 len += seg->mr_len;
1487 ++seg;
1488 ++i;
1489 /* Check for holes */
1490 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1491 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1492 break;
1494 dprintk("RPC: %s: Using frmr %p to map %d segments\n",
1495 __func__, seg1->mr_chunk.rl_mw, i);
1497 /* Bump the key */
1498 key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1499 ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1501 /* Prepare FRMR WR */
1502 memset(&frmr_wr, 0, sizeof frmr_wr);
1503 frmr_wr.opcode = IB_WR_FAST_REG_MR;
1504 frmr_wr.send_flags = 0; /* unsignaled */
1505 frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1506 frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1507 frmr_wr.wr.fast_reg.page_list_len = i;
1508 frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1509 frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1510 frmr_wr.wr.fast_reg.access_flags = (writing ?
1511 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1512 IB_ACCESS_REMOTE_READ);
1513 frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1514 DECR_CQCOUNT(&r_xprt->rx_ep);
1516 rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1518 if (rc) {
1519 dprintk("RPC: %s: failed ib_post_send for register,"
1520 " status %i\n", __func__, rc);
1521 while (i--)
1522 rpcrdma_unmap_one(ia, --seg);
1523 } else {
1524 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1525 seg1->mr_base = seg1->mr_dma + pageoff;
1526 seg1->mr_nsegs = i;
1527 seg1->mr_len = len;
1529 *nsegs = i;
1530 return rc;
1533 static int
1534 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1535 struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1537 struct rpcrdma_mr_seg *seg1 = seg;
1538 struct ib_send_wr invalidate_wr, *bad_wr;
1539 int rc;
1541 while (seg1->mr_nsegs--)
1542 rpcrdma_unmap_one(ia, seg++);
1544 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1545 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1546 invalidate_wr.send_flags = 0; /* unsignaled */
1547 invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1548 DECR_CQCOUNT(&r_xprt->rx_ep);
1550 rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1551 if (rc)
1552 dprintk("RPC: %s: failed ib_post_send for invalidate,"
1553 " status %i\n", __func__, rc);
1554 return rc;
1557 static int
1558 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1559 int *nsegs, int writing, struct rpcrdma_ia *ia)
1561 struct rpcrdma_mr_seg *seg1 = seg;
1562 u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1563 int len, pageoff, i, rc;
1565 pageoff = offset_in_page(seg1->mr_offset);
1566 seg1->mr_offset -= pageoff; /* start of page */
1567 seg1->mr_len += pageoff;
1568 len = -pageoff;
1569 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1570 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1571 for (i = 0; i < *nsegs;) {
1572 rpcrdma_map_one(ia, seg, writing);
1573 physaddrs[i] = seg->mr_dma;
1574 len += seg->mr_len;
1575 ++seg;
1576 ++i;
1577 /* Check for holes */
1578 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1579 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1580 break;
1582 rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1583 physaddrs, i, seg1->mr_dma);
1584 if (rc) {
1585 dprintk("RPC: %s: failed ib_map_phys_fmr "
1586 "%u@0x%llx+%i (%d)... status %i\n", __func__,
1587 len, (unsigned long long)seg1->mr_dma,
1588 pageoff, i, rc);
1589 while (i--)
1590 rpcrdma_unmap_one(ia, --seg);
1591 } else {
1592 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1593 seg1->mr_base = seg1->mr_dma + pageoff;
1594 seg1->mr_nsegs = i;
1595 seg1->mr_len = len;
1597 *nsegs = i;
1598 return rc;
1601 static int
1602 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1603 struct rpcrdma_ia *ia)
1605 struct rpcrdma_mr_seg *seg1 = seg;
1606 LIST_HEAD(l);
1607 int rc;
1609 list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1610 rc = ib_unmap_fmr(&l);
1611 while (seg1->mr_nsegs--)
1612 rpcrdma_unmap_one(ia, seg++);
1613 if (rc)
1614 dprintk("RPC: %s: failed ib_unmap_fmr,"
1615 " status %i\n", __func__, rc);
1616 return rc;
1619 static int
1620 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1621 int *nsegs, int writing, struct rpcrdma_ia *ia,
1622 struct rpcrdma_xprt *r_xprt)
1624 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1625 IB_ACCESS_REMOTE_READ);
1626 struct ib_mw_bind param;
1627 int rc;
1629 *nsegs = 1;
1630 rpcrdma_map_one(ia, seg, writing);
1631 param.mr = ia->ri_bind_mem;
1632 param.wr_id = 0ULL; /* no send cookie */
1633 param.addr = seg->mr_dma;
1634 param.length = seg->mr_len;
1635 param.send_flags = 0;
1636 param.mw_access_flags = mem_priv;
1638 DECR_CQCOUNT(&r_xprt->rx_ep);
1639 rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1640 if (rc) {
1641 dprintk("RPC: %s: failed ib_bind_mw "
1642 "%u@0x%llx status %i\n",
1643 __func__, seg->mr_len,
1644 (unsigned long long)seg->mr_dma, rc);
1645 rpcrdma_unmap_one(ia, seg);
1646 } else {
1647 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1648 seg->mr_base = param.addr;
1649 seg->mr_nsegs = 1;
1651 return rc;
1654 static int
1655 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1656 struct rpcrdma_ia *ia,
1657 struct rpcrdma_xprt *r_xprt, void **r)
1659 struct ib_mw_bind param;
1660 LIST_HEAD(l);
1661 int rc;
1663 BUG_ON(seg->mr_nsegs != 1);
1664 param.mr = ia->ri_bind_mem;
1665 param.addr = 0ULL; /* unbind */
1666 param.length = 0;
1667 param.mw_access_flags = 0;
1668 if (*r) {
1669 param.wr_id = (u64) (unsigned long) *r;
1670 param.send_flags = IB_SEND_SIGNALED;
1671 INIT_CQCOUNT(&r_xprt->rx_ep);
1672 } else {
1673 param.wr_id = 0ULL;
1674 param.send_flags = 0;
1675 DECR_CQCOUNT(&r_xprt->rx_ep);
1677 rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1678 rpcrdma_unmap_one(ia, seg);
1679 if (rc)
1680 dprintk("RPC: %s: failed ib_(un)bind_mw,"
1681 " status %i\n", __func__, rc);
1682 else
1683 *r = NULL; /* will upcall on completion */
1684 return rc;
1687 static int
1688 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1689 int *nsegs, int writing, struct rpcrdma_ia *ia)
1691 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1692 IB_ACCESS_REMOTE_READ);
1693 struct rpcrdma_mr_seg *seg1 = seg;
1694 struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1695 int len, i, rc = 0;
1697 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1698 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1699 for (len = 0, i = 0; i < *nsegs;) {
1700 rpcrdma_map_one(ia, seg, writing);
1701 ipb[i].addr = seg->mr_dma;
1702 ipb[i].size = seg->mr_len;
1703 len += seg->mr_len;
1704 ++seg;
1705 ++i;
1706 /* Check for holes */
1707 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1708 offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1709 break;
1711 seg1->mr_base = seg1->mr_dma;
1712 seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1713 ipb, i, mem_priv, &seg1->mr_base);
1714 if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1715 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1716 dprintk("RPC: %s: failed ib_reg_phys_mr "
1717 "%u@0x%llx (%d)... status %i\n",
1718 __func__, len,
1719 (unsigned long long)seg1->mr_dma, i, rc);
1720 while (i--)
1721 rpcrdma_unmap_one(ia, --seg);
1722 } else {
1723 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1724 seg1->mr_nsegs = i;
1725 seg1->mr_len = len;
1727 *nsegs = i;
1728 return rc;
1731 static int
1732 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1733 struct rpcrdma_ia *ia)
1735 struct rpcrdma_mr_seg *seg1 = seg;
1736 int rc;
1738 rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1739 seg1->mr_chunk.rl_mr = NULL;
1740 while (seg1->mr_nsegs--)
1741 rpcrdma_unmap_one(ia, seg++);
1742 if (rc)
1743 dprintk("RPC: %s: failed ib_dereg_mr,"
1744 " status %i\n", __func__, rc);
1745 return rc;
1749 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1750 int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1752 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1753 int rc = 0;
1755 switch (ia->ri_memreg_strategy) {
1757 #if RPCRDMA_PERSISTENT_REGISTRATION
1758 case RPCRDMA_ALLPHYSICAL:
1759 rpcrdma_map_one(ia, seg, writing);
1760 seg->mr_rkey = ia->ri_bind_mem->rkey;
1761 seg->mr_base = seg->mr_dma;
1762 seg->mr_nsegs = 1;
1763 nsegs = 1;
1764 break;
1765 #endif
1767 /* Registration using frmr registration */
1768 case RPCRDMA_FRMR:
1769 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1770 break;
1772 /* Registration using fmr memory registration */
1773 case RPCRDMA_MTHCAFMR:
1774 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1775 break;
1777 /* Registration using memory windows */
1778 case RPCRDMA_MEMWINDOWS_ASYNC:
1779 case RPCRDMA_MEMWINDOWS:
1780 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1781 break;
1783 /* Default registration each time */
1784 default:
1785 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1786 break;
1788 if (rc)
1789 return -1;
1791 return nsegs;
1795 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1796 struct rpcrdma_xprt *r_xprt, void *r)
1798 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1799 int nsegs = seg->mr_nsegs, rc;
1801 switch (ia->ri_memreg_strategy) {
1803 #if RPCRDMA_PERSISTENT_REGISTRATION
1804 case RPCRDMA_ALLPHYSICAL:
1805 BUG_ON(nsegs != 1);
1806 rpcrdma_unmap_one(ia, seg);
1807 rc = 0;
1808 break;
1809 #endif
1811 case RPCRDMA_FRMR:
1812 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1813 break;
1815 case RPCRDMA_MTHCAFMR:
1816 rc = rpcrdma_deregister_fmr_external(seg, ia);
1817 break;
1819 case RPCRDMA_MEMWINDOWS_ASYNC:
1820 case RPCRDMA_MEMWINDOWS:
1821 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1822 break;
1824 default:
1825 rc = rpcrdma_deregister_default_external(seg, ia);
1826 break;
1828 if (r) {
1829 struct rpcrdma_rep *rep = r;
1830 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1831 rep->rr_func = NULL;
1832 func(rep); /* dereg done, callback now */
1834 return nsegs;
1838 * Prepost any receive buffer, then post send.
1840 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1843 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1844 struct rpcrdma_ep *ep,
1845 struct rpcrdma_req *req)
1847 struct ib_send_wr send_wr, *send_wr_fail;
1848 struct rpcrdma_rep *rep = req->rl_reply;
1849 int rc;
1851 if (rep) {
1852 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1853 if (rc)
1854 goto out;
1855 req->rl_reply = NULL;
1858 send_wr.next = NULL;
1859 send_wr.wr_id = 0ULL; /* no send cookie */
1860 send_wr.sg_list = req->rl_send_iov;
1861 send_wr.num_sge = req->rl_niovs;
1862 send_wr.opcode = IB_WR_SEND;
1863 if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
1864 ib_dma_sync_single_for_device(ia->ri_id->device,
1865 req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1866 DMA_TO_DEVICE);
1867 ib_dma_sync_single_for_device(ia->ri_id->device,
1868 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1869 DMA_TO_DEVICE);
1870 ib_dma_sync_single_for_device(ia->ri_id->device,
1871 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1872 DMA_TO_DEVICE);
1874 if (DECR_CQCOUNT(ep) > 0)
1875 send_wr.send_flags = 0;
1876 else { /* Provider must take a send completion every now and then */
1877 INIT_CQCOUNT(ep);
1878 send_wr.send_flags = IB_SEND_SIGNALED;
1881 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1882 if (rc)
1883 dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
1884 rc);
1885 out:
1886 return rc;
1890 * (Re)post a receive buffer.
1893 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1894 struct rpcrdma_ep *ep,
1895 struct rpcrdma_rep *rep)
1897 struct ib_recv_wr recv_wr, *recv_wr_fail;
1898 int rc;
1900 recv_wr.next = NULL;
1901 recv_wr.wr_id = (u64) (unsigned long) rep;
1902 recv_wr.sg_list = &rep->rr_iov;
1903 recv_wr.num_sge = 1;
1905 ib_dma_sync_single_for_cpu(ia->ri_id->device,
1906 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1908 DECR_CQCOUNT(ep);
1909 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1911 if (rc)
1912 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
1913 rc);
1914 return rc;