RPC/RDMA: optionally emit useful transport info upon connect/disconnect.
net/sunrpc/xprtrdma/verbs.c
1 /*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * verbs.c
43 * Encapsulates the major functions managing:
44 * o adapters
45 * o endpoints
46 * o connections
47 * o buffer memory
50 #include <linux/pci.h> /* for Tavor hack below */
52 #include "xprt_rdma.h"
55 * Globals/Macros
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY RPCDBG_TRANS
60 #endif
63 * internal functions
67 * handle replies in tasklet context, using a single, global list
68 * rdma tasklet function -- just turn around and call the func
69 * for all replies on the list
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
78 struct rpcrdma_rep *rep;
79 void (*func)(struct rpcrdma_rep *);
80 unsigned long flags;
82 data = data;
83 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84 while (!list_empty(&rpcrdma_tasklets_g)) {
85 rep = list_entry(rpcrdma_tasklets_g.next,
86 struct rpcrdma_rep, rr_list);
87 list_del(&rep->rr_list);
88 func = rep->rr_func;
89 rep->rr_func = NULL;
90 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
92 if (func)
93 func(rep);
94 else
95 rpcrdma_recv_buffer_put(rep);
97 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
99 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
107 unsigned long flags;
109 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110 list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112 tasklet_schedule(&rpcrdma_tasklet_g);
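/*
 * Illustrative sketch of the reply dispatch path implemented above
 * (not a literal trace):
 *
 *   CQ upcall -> rpcrdma_event_process() sets rep->rr_len
 *             -> rpcrdma_schedule_tasklet(rep) queues rep on
 *                rpcrdma_tasklets_g and schedules rpcrdma_tasklet_g
 *   tasklet   -> rpcrdma_run_tasklet() pops each rep and calls
 *                rep->rr_func(rep), or rpcrdma_recv_buffer_put(rep)
 *                when no reply handler is registered.
 */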
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
118 struct rpcrdma_ep *ep = context;
120 dprintk("RPC: %s: QP error %X on device %s ep %p\n",
121 __func__, event->event, event->device->name, context);
122 if (ep->rep_connected == 1) {
123 ep->rep_connected = -EIO;
124 ep->rep_func(ep);
125 wake_up_all(&ep->rep_connect_wait);
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
132 struct rpcrdma_ep *ep = context;
134 dprintk("RPC: %s: CQ error %X on device %s ep %p\n",
135 __func__, event->event, event->device->name, context);
136 if (ep->rep_connected == 1) {
137 ep->rep_connected = -EIO;
138 ep->rep_func(ep);
139 wake_up_all(&ep->rep_connect_wait);
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
146 struct rpcrdma_rep *rep =
147 (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
149 dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n",
150 __func__, rep, wc->status, wc->opcode, wc->byte_len);
152 if (!rep) /* send or bind completion that we don't care about */
153 return;
155 if (IB_WC_SUCCESS != wc->status) {
156 dprintk("RPC: %s: %s WC status %X, connection lost\n",
157 __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158 wc->status);
159 rep->rr_len = ~0U;
160 rpcrdma_schedule_tasklet(rep);
161 return;
164 switch (wc->opcode) {
165 case IB_WC_RECV:
166 rep->rr_len = wc->byte_len;
167 ib_dma_sync_single_for_cpu(
168 rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169 rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
 170                 /* Keep (only) the most recent credits, after checking validity */
171 if (rep->rr_len >= 16) {
172 struct rpcrdma_msg *p =
173 (struct rpcrdma_msg *) rep->rr_base;
174 unsigned int credits = ntohl(p->rm_credit);
175 if (credits == 0) {
176 dprintk("RPC: %s: server"
177 " dropped credits to 0!\n", __func__);
178 /* don't deadlock */
179 credits = 1;
180 } else if (credits > rep->rr_buffer->rb_max_requests) {
181 dprintk("RPC: %s: server"
182 " over-crediting: %d (%d)\n",
183 __func__, credits,
184 rep->rr_buffer->rb_max_requests);
185 credits = rep->rr_buffer->rb_max_requests;
187 atomic_set(&rep->rr_buffer->rb_credits, credits);
189 /* fall through */
190 case IB_WC_BIND_MW:
191 rpcrdma_schedule_tasklet(rep);
192 break;
193 default:
194 dprintk("RPC: %s: unexpected WC event %X\n",
195 __func__, wc->opcode);
196 break;
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
203 struct ib_wc wc;
204 int rc;
206 for (;;) {
207 rc = ib_poll_cq(cq, 1, &wc);
208 if (rc < 0) {
209 dprintk("RPC: %s: ib_poll_cq failed %i\n",
210 __func__, rc);
211 return rc;
213 if (rc == 0)
214 break;
216 rpcrdma_event_process(&wc);
219 return 0;
223 * rpcrdma_cq_event_upcall
225 * This upcall handles recv, send, bind and unbind events.
 226  * It is reentrant but processes single events in order to maintain
 227  * the ordering of receives, on which server credit accounting depends.
229 * It is the responsibility of the scheduled tasklet to return
230 * recv buffers to the pool. NOTE: this affects synchronization of
231 * connection shutdown. That is, the structures required for
232 * the completion of the reply handler must remain intact until
233 * all memory has been reclaimed.
235 * Note that send events are suppressed and do not result in an upcall.
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
240 int rc;
242 rc = rpcrdma_cq_poll(cq);
243 if (rc)
244 return;
246 rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247 if (rc) {
248 dprintk("RPC: %s: ib_req_notify_cq failed %i\n",
249 __func__, rc);
250 return;
253 rpcrdma_cq_poll(cq);
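/*
 * The poll / re-arm / poll-again sequence above is the usual verbs
 * pattern for avoiding lost completions: events that slip in between
 * draining the CQ and re-arming it would otherwise go unnoticed until
 * the next completion, so the CQ is polled once more after
 * ib_req_notify_cq() has re-armed it.
 */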
256 #ifdef RPC_DEBUG
257 static const char * const conn[] = {
258 "address resolved",
259 "address error",
260 "route resolved",
261 "route error",
262 "connect request",
263 "connect response",
264 "connect error",
265 "unreachable",
266 "rejected",
267 "established",
268 "disconnected",
269 "device removal"
271 #endif
273 static int
274 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
276 struct rpcrdma_xprt *xprt = id->context;
277 struct rpcrdma_ia *ia = &xprt->rx_ia;
278 struct rpcrdma_ep *ep = &xprt->rx_ep;
279 struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280 struct ib_qp_attr attr;
281 struct ib_qp_init_attr iattr;
282 int connstate = 0;
284 switch (event->event) {
285 case RDMA_CM_EVENT_ADDR_RESOLVED:
286 case RDMA_CM_EVENT_ROUTE_RESOLVED:
287 ia->ri_async_rc = 0;
288 complete(&ia->ri_done);
289 break;
290 case RDMA_CM_EVENT_ADDR_ERROR:
291 ia->ri_async_rc = -EHOSTUNREACH;
292 dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
293 __func__, ep);
294 complete(&ia->ri_done);
295 break;
296 case RDMA_CM_EVENT_ROUTE_ERROR:
297 ia->ri_async_rc = -ENETUNREACH;
298 dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
299 __func__, ep);
300 complete(&ia->ri_done);
301 break;
302 case RDMA_CM_EVENT_ESTABLISHED:
303 connstate = 1;
304 ib_query_qp(ia->ri_id->qp, &attr,
305 IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
306 &iattr);
307 dprintk("RPC: %s: %d responder resources"
308 " (%d initiator)\n",
309 __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
310 goto connected;
311 case RDMA_CM_EVENT_CONNECT_ERROR:
312 connstate = -ENOTCONN;
313 goto connected;
314 case RDMA_CM_EVENT_UNREACHABLE:
315 connstate = -ENETDOWN;
316 goto connected;
317 case RDMA_CM_EVENT_REJECTED:
318 connstate = -ECONNREFUSED;
319 goto connected;
320 case RDMA_CM_EVENT_DISCONNECTED:
321 connstate = -ECONNABORTED;
322 goto connected;
323 case RDMA_CM_EVENT_DEVICE_REMOVAL:
324 connstate = -ENODEV;
325 connected:
326 dprintk("RPC: %s: %s: %u.%u.%u.%u:%u"
327 " (ep 0x%p event 0x%x)\n",
328 __func__,
329 (event->event <= 11) ? conn[event->event] :
330 "unknown connection error",
331 NIPQUAD(addr->sin_addr.s_addr),
332 ntohs(addr->sin_port),
333 ep, event->event);
334 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
335 dprintk("RPC: %s: %sconnected\n",
336 __func__, connstate > 0 ? "" : "dis");
337 ep->rep_connected = connstate;
338 ep->rep_func(ep);
339 wake_up_all(&ep->rep_connect_wait);
340 break;
341 default:
342 dprintk("RPC: %s: unexpected CM event %d\n",
343 __func__, event->event);
344 break;
347 #ifdef RPC_DEBUG
348 if (connstate == 1) {
349 int ird = attr.max_dest_rd_atomic;
350 int tird = ep->rep_remote_cma.responder_resources;
351 printk(KERN_INFO "rpcrdma: connection to %u.%u.%u.%u:%u "
352 "on %s, memreg %d slots %d ird %d%s\n",
353 NIPQUAD(addr->sin_addr.s_addr),
354 ntohs(addr->sin_port),
355 ia->ri_id->device->name,
356 ia->ri_memreg_strategy,
357 xprt->rx_buf.rb_max_requests,
358 ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
359 } else if (connstate < 0) {
360 printk(KERN_INFO "rpcrdma: connection to %u.%u.%u.%u:%u "
361 "closed (%d)\n",
362 NIPQUAD(addr->sin_addr.s_addr),
363 ntohs(addr->sin_port),
364 connstate);
366 #endif
368 return 0;
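/*
 * The RPC_DEBUG block above is the "useful transport info" referred to
 * in the patch title.  On a successful connect it emits a line roughly
 * of the form (values are only an example):
 *
 *   rpcrdma: connection to 192.168.1.10:20049 on mthca0, memreg 5 slots 32 ird 16
 *
 * and on a failed or closed connection:
 *
 *   rpcrdma: connection to 192.168.1.10:20049 closed (-103)
 *
 * "ird" is the peer's responder resources; it is flagged " (low!)" when
 * it is both less than 4 and less than half of the locally configured
 * responder_resources.
 */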
371 static struct rdma_cm_id *
372 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
373 struct rpcrdma_ia *ia, struct sockaddr *addr)
375 struct rdma_cm_id *id;
376 int rc;
378 init_completion(&ia->ri_done);
380 id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
381 if (IS_ERR(id)) {
382 rc = PTR_ERR(id);
383 dprintk("RPC: %s: rdma_create_id() failed %i\n",
384 __func__, rc);
385 return id;
388 ia->ri_async_rc = -ETIMEDOUT;
389 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
390 if (rc) {
391 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
392 __func__, rc);
393 goto out;
395 wait_for_completion_interruptible_timeout(&ia->ri_done,
396 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
397 rc = ia->ri_async_rc;
398 if (rc)
399 goto out;
401 ia->ri_async_rc = -ETIMEDOUT;
402 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
403 if (rc) {
404 dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
405 __func__, rc);
406 goto out;
408 wait_for_completion_interruptible_timeout(&ia->ri_done,
409 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
410 rc = ia->ri_async_rc;
411 if (rc)
412 goto out;
414 return id;
416 out:
417 rdma_destroy_id(id);
418 return ERR_PTR(rc);
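/*
 * Address and route resolution above are asynchronous: rdma_resolve_addr()
 * and rdma_resolve_route() each complete via rpcrdma_conn_upcall(), which
 * records the outcome in ia->ri_async_rc and signals ia->ri_done.
 * ri_async_rc is primed to -ETIMEDOUT before each step so that a wait
 * that expires without an upcall is reported as a timeout.
 */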
422 * Drain any cq, prior to teardown.
424 static void
425 rpcrdma_clean_cq(struct ib_cq *cq)
427 struct ib_wc wc;
428 int count = 0;
430 while (1 == ib_poll_cq(cq, 1, &wc))
431 ++count;
433 if (count)
434 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
435 __func__, count, wc.opcode);
439 * Exported functions.
443 * Open and initialize an Interface Adapter.
444 * o initializes fields of struct rpcrdma_ia, including
445 * interface and provider attributes and protection zone.
448 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
450 int rc, mem_priv;
451 struct ib_device_attr devattr;
452 struct rpcrdma_ia *ia = &xprt->rx_ia;
454 ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
455 if (IS_ERR(ia->ri_id)) {
456 rc = PTR_ERR(ia->ri_id);
457 goto out1;
460 ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
461 if (IS_ERR(ia->ri_pd)) {
462 rc = PTR_ERR(ia->ri_pd);
463 dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
464 __func__, rc);
465 goto out2;
469 * Query the device to determine if the requested memory
470 * registration strategy is supported. If it isn't, set the
471 * strategy to a globally supported model.
473 rc = ib_query_device(ia->ri_id->device, &devattr);
474 if (rc) {
475 dprintk("RPC: %s: ib_query_device failed %d\n",
476 __func__, rc);
477 goto out2;
480 if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
481 ia->ri_have_dma_lkey = 1;
482 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
485 switch (memreg) {
486 case RPCRDMA_MEMWINDOWS:
487 case RPCRDMA_MEMWINDOWS_ASYNC:
488 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
489 dprintk("RPC: %s: MEMWINDOWS registration "
490 "specified but not supported by adapter, "
491 "using slower RPCRDMA_REGISTER\n",
492 __func__);
493 memreg = RPCRDMA_REGISTER;
495 break;
496 case RPCRDMA_MTHCAFMR:
497 if (!ia->ri_id->device->alloc_fmr) {
498 #if RPCRDMA_PERSISTENT_REGISTRATION
499 dprintk("RPC: %s: MTHCAFMR registration "
500 "specified but not supported by adapter, "
501 "using riskier RPCRDMA_ALLPHYSICAL\n",
502 __func__);
503 memreg = RPCRDMA_ALLPHYSICAL;
504 #else
505 dprintk("RPC: %s: MTHCAFMR registration "
506 "specified but not supported by adapter, "
507 "using slower RPCRDMA_REGISTER\n",
508 __func__);
509 memreg = RPCRDMA_REGISTER;
510 #endif
512 break;
513 case RPCRDMA_FRMR:
514 /* Requires both frmr reg and local dma lkey */
515 if ((devattr.device_cap_flags &
516 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
517 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
518 #if RPCRDMA_PERSISTENT_REGISTRATION
519 dprintk("RPC: %s: FRMR registration "
520 "specified but not supported by adapter, "
521 "using riskier RPCRDMA_ALLPHYSICAL\n",
522 __func__);
523 memreg = RPCRDMA_ALLPHYSICAL;
524 #else
525 dprintk("RPC: %s: FRMR registration "
526 "specified but not supported by adapter, "
527 "using slower RPCRDMA_REGISTER\n",
528 __func__);
529 memreg = RPCRDMA_REGISTER;
530 #endif
532 break;
536 * Optionally obtain an underlying physical identity mapping in
537 * order to do a memory window-based bind. This base registration
538 * is protected from remote access - that is enabled only by binding
539 * for the specific bytes targeted during each RPC operation, and
540 * revoked after the corresponding completion similar to a storage
541 * adapter.
543 switch (memreg) {
544 case RPCRDMA_BOUNCEBUFFERS:
545 case RPCRDMA_REGISTER:
546 case RPCRDMA_FRMR:
547 break;
548 #if RPCRDMA_PERSISTENT_REGISTRATION
549 case RPCRDMA_ALLPHYSICAL:
550 mem_priv = IB_ACCESS_LOCAL_WRITE |
551 IB_ACCESS_REMOTE_WRITE |
552 IB_ACCESS_REMOTE_READ;
553 goto register_setup;
554 #endif
555 case RPCRDMA_MEMWINDOWS_ASYNC:
556 case RPCRDMA_MEMWINDOWS:
557 mem_priv = IB_ACCESS_LOCAL_WRITE |
558 IB_ACCESS_MW_BIND;
559 goto register_setup;
560 case RPCRDMA_MTHCAFMR:
561 if (ia->ri_have_dma_lkey)
562 break;
563 mem_priv = IB_ACCESS_LOCAL_WRITE;
564 register_setup:
565 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
566 if (IS_ERR(ia->ri_bind_mem)) {
567 printk(KERN_ALERT "%s: ib_get_dma_mr for "
568 "phys register failed with %lX\n\t"
569 "Will continue with degraded performance\n",
570 __func__, PTR_ERR(ia->ri_bind_mem));
571 memreg = RPCRDMA_REGISTER;
572 ia->ri_bind_mem = NULL;
574 break;
575 default:
576 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
577 __func__, memreg);
578 rc = -EINVAL;
579 goto out2;
581 dprintk("RPC: %s: memory registration strategy is %d\n",
582 __func__, memreg);
584 /* Else will do memory reg/dereg for each chunk */
585 ia->ri_memreg_strategy = memreg;
587 return 0;
588 out2:
589 rdma_destroy_id(ia->ri_id);
590 ia->ri_id = NULL;
591 out1:
592 return rc;
596 * Clean up/close an IA.
597 * o if event handles and PD have been initialized, free them.
598 * o close the IA
600 void
601 rpcrdma_ia_close(struct rpcrdma_ia *ia)
603 int rc;
605 dprintk("RPC: %s: entering\n", __func__);
606 if (ia->ri_bind_mem != NULL) {
607 rc = ib_dereg_mr(ia->ri_bind_mem);
608 dprintk("RPC: %s: ib_dereg_mr returned %i\n",
609 __func__, rc);
611 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
612 if (ia->ri_id->qp)
613 rdma_destroy_qp(ia->ri_id);
614 rdma_destroy_id(ia->ri_id);
615 ia->ri_id = NULL;
617 if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
618 rc = ib_dealloc_pd(ia->ri_pd);
619 dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
620 __func__, rc);
625 * Create unconnected endpoint.
628 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
629 struct rpcrdma_create_data_internal *cdata)
631 struct ib_device_attr devattr;
632 int rc, err;
634 rc = ib_query_device(ia->ri_id->device, &devattr);
635 if (rc) {
636 dprintk("RPC: %s: ib_query_device failed %d\n",
637 __func__, rc);
638 return rc;
641 /* check provider's send/recv wr limits */
642 if (cdata->max_requests > devattr.max_qp_wr)
643 cdata->max_requests = devattr.max_qp_wr;
645 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
646 ep->rep_attr.qp_context = ep;
647 /* send_cq and recv_cq initialized below */
648 ep->rep_attr.srq = NULL;
649 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
650 switch (ia->ri_memreg_strategy) {
651 case RPCRDMA_FRMR:
652 /* Add room for frmr register and invalidate WRs */
653 ep->rep_attr.cap.max_send_wr *= 3;
654 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
655 return -EINVAL;
656 break;
657 case RPCRDMA_MEMWINDOWS_ASYNC:
658 case RPCRDMA_MEMWINDOWS:
659 /* Add room for mw_binds+unbinds - overkill! */
660 ep->rep_attr.cap.max_send_wr++;
661 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
662 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
663 return -EINVAL;
664 break;
665 default:
666 break;
668 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
669 ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
670 ep->rep_attr.cap.max_recv_sge = 1;
671 ep->rep_attr.cap.max_inline_data = 0;
672 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
673 ep->rep_attr.qp_type = IB_QPT_RC;
674 ep->rep_attr.port_num = ~0;
676 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
677 "iovs: send %d recv %d\n",
678 __func__,
679 ep->rep_attr.cap.max_send_wr,
680 ep->rep_attr.cap.max_recv_wr,
681 ep->rep_attr.cap.max_send_sge,
682 ep->rep_attr.cap.max_recv_sge);
684 /* set trigger for requesting send completion */
685 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /* - 1*/;
686 switch (ia->ri_memreg_strategy) {
687 case RPCRDMA_MEMWINDOWS_ASYNC:
688 case RPCRDMA_MEMWINDOWS:
689 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
690 break;
691 default:
692 break;
694 if (ep->rep_cqinit <= 2)
695 ep->rep_cqinit = 0;
696 INIT_CQCOUNT(ep);
697 ep->rep_ia = ia;
698 init_waitqueue_head(&ep->rep_connect_wait);
701 * Create a single cq for receive dto and mw_bind (only ever
702 * care about unbind, really). Send completions are suppressed.
703 * Use single threaded tasklet upcalls to maintain ordering.
705 ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
706 rpcrdma_cq_async_error_upcall, NULL,
707 ep->rep_attr.cap.max_recv_wr +
708 ep->rep_attr.cap.max_send_wr + 1, 0);
709 if (IS_ERR(ep->rep_cq)) {
710 rc = PTR_ERR(ep->rep_cq);
711 dprintk("RPC: %s: ib_create_cq failed: %i\n",
712 __func__, rc);
713 goto out1;
716 rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
717 if (rc) {
718 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
719 __func__, rc);
720 goto out2;
723 ep->rep_attr.send_cq = ep->rep_cq;
724 ep->rep_attr.recv_cq = ep->rep_cq;
726 /* Initialize cma parameters */
728 /* RPC/RDMA does not use private data */
729 ep->rep_remote_cma.private_data = NULL;
730 ep->rep_remote_cma.private_data_len = 0;
732 /* Client offers RDMA Read but does not initiate */
733 ep->rep_remote_cma.initiator_depth = 0;
734 if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
735 ep->rep_remote_cma.responder_resources = 0;
736 else if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */
737 ep->rep_remote_cma.responder_resources = 32;
738 else
739 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
741 ep->rep_remote_cma.retry_count = 7;
742 ep->rep_remote_cma.flow_control = 0;
743 ep->rep_remote_cma.rnr_retry_count = 0;
745 return 0;
747 out2:
748 err = ib_destroy_cq(ep->rep_cq);
749 if (err)
750 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
751 __func__, err);
752 out1:
753 return rc;
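/*
 * Rough CQ accounting for the endpoint created above (illustrative only):
 * the single completion queue is sized for max_recv_wr + max_send_wr + 1
 * entries, and rep_cqinit throttles send-completion signaling -- with,
 * say, max_send_wr = 64 the code asks for a signaled send roughly every
 * 32 sends (less RPCRDMA_MAX_SEGS in the memory-window cases), so the
 * provider can retire send WRs without a per-send completion.
 */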
757 * rpcrdma_ep_destroy
759 * Disconnect and destroy endpoint. After this, the only
760 * valid operations on the ep are to free it (if dynamically
761 * allocated) or re-create it.
763 * The caller's error handling must be sure to not leak the endpoint
764 * if this function fails.
767 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
769 int rc;
771 dprintk("RPC: %s: entering, connected is %d\n",
772 __func__, ep->rep_connected);
774 if (ia->ri_id->qp) {
775 rc = rpcrdma_ep_disconnect(ep, ia);
776 if (rc)
777 dprintk("RPC: %s: rpcrdma_ep_disconnect"
778 " returned %i\n", __func__, rc);
779 rdma_destroy_qp(ia->ri_id);
780 ia->ri_id->qp = NULL;
783 /* padding - could be done in rpcrdma_buffer_destroy... */
784 if (ep->rep_pad_mr) {
785 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
786 ep->rep_pad_mr = NULL;
789 rpcrdma_clean_cq(ep->rep_cq);
790 rc = ib_destroy_cq(ep->rep_cq);
791 if (rc)
792 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
793 __func__, rc);
795 return rc;
799 * Connect unconnected endpoint.
802 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
804 struct rdma_cm_id *id;
805 int rc = 0;
806 int retry_count = 0;
807 int reconnect = (ep->rep_connected != 0);
809 if (reconnect) {
810 struct rpcrdma_xprt *xprt;
811 retry:
812 rc = rpcrdma_ep_disconnect(ep, ia);
813 if (rc && rc != -ENOTCONN)
814 dprintk("RPC: %s: rpcrdma_ep_disconnect"
815 " status %i\n", __func__, rc);
816 rpcrdma_clean_cq(ep->rep_cq);
818 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
819 id = rpcrdma_create_id(xprt, ia,
820 (struct sockaddr *)&xprt->rx_data.addr);
821 if (IS_ERR(id)) {
822 rc = PTR_ERR(id);
823 goto out;
825 /* TEMP TEMP TEMP - fail if new device:
826 * Deregister/remarshal *all* requests!
827 * Close and recreate adapter, pd, etc!
828 * Re-determine all attributes still sane!
829 * More stuff I haven't thought of!
830 * Rrrgh!
832 if (ia->ri_id->device != id->device) {
833 printk("RPC: %s: can't reconnect on "
834 "different device!\n", __func__);
835 rdma_destroy_id(id);
836 rc = -ENETDOWN;
837 goto out;
839 /* END TEMP */
840 rdma_destroy_qp(ia->ri_id);
841 rdma_destroy_id(ia->ri_id);
842 ia->ri_id = id;
845 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
846 if (rc) {
847 dprintk("RPC: %s: rdma_create_qp failed %i\n",
848 __func__, rc);
849 goto out;
852 /* XXX Tavor device performs badly with 2K MTU! */
853 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
854 struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
855 if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
856 (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
857 pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
858 struct ib_qp_attr attr = {
859 .path_mtu = IB_MTU_1024
861 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
865 ep->rep_connected = 0;
867 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
868 if (rc) {
869 dprintk("RPC: %s: rdma_connect() failed with %i\n",
870 __func__, rc);
871 goto out;
874 if (reconnect)
875 return 0;
877 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
880 * Check state. A non-peer reject indicates no listener
881 * (ECONNREFUSED), which may be a transient state. All
882 * others indicate a transport condition which has already
 883  * undergone a best-effort attempt.
885 if (ep->rep_connected == -ECONNREFUSED
886 && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
887 dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
888 goto retry;
890 if (ep->rep_connected <= 0) {
891 /* Sometimes, the only way to reliably connect to remote
892 * CMs is to use same nonzero values for ORD and IRD. */
893 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
894 (ep->rep_remote_cma.responder_resources == 0 ||
895 ep->rep_remote_cma.initiator_depth !=
896 ep->rep_remote_cma.responder_resources)) {
897 if (ep->rep_remote_cma.responder_resources == 0)
898 ep->rep_remote_cma.responder_resources = 1;
899 ep->rep_remote_cma.initiator_depth =
900 ep->rep_remote_cma.responder_resources;
901 goto retry;
903 rc = ep->rep_connected;
904 } else {
905 dprintk("RPC: %s: connected\n", __func__);
908 out:
909 if (rc)
910 ep->rep_connected = rc;
911 return rc;
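/*
 * Connect retry policy, as implemented above: a plain -ECONNREFUSED
 * (no listener, possibly transient) is retried up to
 * RDMA_CONNECT_RETRY_MAX times.  Any other failure gets an additional
 * attempt with initiator_depth forced equal to a nonzero
 * responder_resources, since some remote CMs only connect reliably when
 * ORD and IRD carry the same nonzero value.
 */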
915 * rpcrdma_ep_disconnect
917 * This is separate from destroy to facilitate the ability
918 * to reconnect without recreating the endpoint.
920 * This call is not reentrant, and must not be made in parallel
921 * on the same endpoint.
924 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
926 int rc;
928 rpcrdma_clean_cq(ep->rep_cq);
929 rc = rdma_disconnect(ia->ri_id);
930 if (!rc) {
931 /* returns without wait if not connected */
932 wait_event_interruptible(ep->rep_connect_wait,
933 ep->rep_connected != 1);
934 dprintk("RPC: %s: after wait, %sconnected\n", __func__,
935 (ep->rep_connected == 1) ? "still " : "dis");
936 } else {
937 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
938 ep->rep_connected = rc;
940 return rc;
944 * Initialize buffer memory
947 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
948 struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
950 char *p;
951 size_t len;
952 int i, rc;
953 struct rpcrdma_mw *r;
955 buf->rb_max_requests = cdata->max_requests;
956 spin_lock_init(&buf->rb_lock);
957 atomic_set(&buf->rb_credits, 1);
959 /* Need to allocate:
960 * 1. arrays for send and recv pointers
961 * 2. arrays of struct rpcrdma_req to fill in pointers
962 * 3. array of struct rpcrdma_rep for replies
963 * 4. padding, if any
964 * 5. mw's, fmr's or frmr's, if any
965 * Send/recv buffers in req/rep need to be registered
968 len = buf->rb_max_requests *
969 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
970 len += cdata->padding;
971 switch (ia->ri_memreg_strategy) {
972 case RPCRDMA_FRMR:
973 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
974 sizeof(struct rpcrdma_mw);
975 break;
976 case RPCRDMA_MTHCAFMR:
977 /* TBD we are perhaps overallocating here */
978 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
979 sizeof(struct rpcrdma_mw);
980 break;
981 case RPCRDMA_MEMWINDOWS_ASYNC:
982 case RPCRDMA_MEMWINDOWS:
983 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
984 sizeof(struct rpcrdma_mw);
985 break;
986 default:
987 break;
990 /* allocate 1, 4 and 5 in one shot */
991 p = kzalloc(len, GFP_KERNEL);
992 if (p == NULL) {
993 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
994 __func__, len);
995 rc = -ENOMEM;
996 goto out;
998 buf->rb_pool = p; /* for freeing it later */
1000 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1001 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1002 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1003 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1006 * Register the zeroed pad buffer, if any.
1008 if (cdata->padding) {
1009 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1010 &ep->rep_pad_mr, &ep->rep_pad);
1011 if (rc)
1012 goto out;
1014 p += cdata->padding;
1017 * Allocate the fmr's, or mw's for mw_bind chunk registration.
1018 * We "cycle" the mw's in order to minimize rkey reuse,
1019 * and also reduce unbind-to-bind collision.
1021 INIT_LIST_HEAD(&buf->rb_mws);
1022 r = (struct rpcrdma_mw *)p;
1023 switch (ia->ri_memreg_strategy) {
1024 case RPCRDMA_FRMR:
1025 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1026 r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1027 RPCRDMA_MAX_SEGS);
1028 if (IS_ERR(r->r.frmr.fr_mr)) {
1029 rc = PTR_ERR(r->r.frmr.fr_mr);
1030 dprintk("RPC: %s: ib_alloc_fast_reg_mr"
1031 " failed %i\n", __func__, rc);
1032 goto out;
1034 r->r.frmr.fr_pgl =
1035 ib_alloc_fast_reg_page_list(ia->ri_id->device,
1036 RPCRDMA_MAX_SEGS);
1037 if (IS_ERR(r->r.frmr.fr_pgl)) {
1038 rc = PTR_ERR(r->r.frmr.fr_pgl);
1039 dprintk("RPC: %s: "
1040 "ib_alloc_fast_reg_page_list "
1041 "failed %i\n", __func__, rc);
1042 goto out;
1044 list_add(&r->mw_list, &buf->rb_mws);
1045 ++r;
1047 break;
1048 case RPCRDMA_MTHCAFMR:
1049 /* TBD we are perhaps overallocating here */
1050 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1051 static struct ib_fmr_attr fa =
1052 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1053 r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1054 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1055 &fa);
1056 if (IS_ERR(r->r.fmr)) {
1057 rc = PTR_ERR(r->r.fmr);
1058 dprintk("RPC: %s: ib_alloc_fmr"
1059 " failed %i\n", __func__, rc);
1060 goto out;
1062 list_add(&r->mw_list, &buf->rb_mws);
1063 ++r;
1065 break;
1066 case RPCRDMA_MEMWINDOWS_ASYNC:
1067 case RPCRDMA_MEMWINDOWS:
1068 /* Allocate one extra request's worth, for full cycling */
1069 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1070 r->r.mw = ib_alloc_mw(ia->ri_pd);
1071 if (IS_ERR(r->r.mw)) {
1072 rc = PTR_ERR(r->r.mw);
1073 dprintk("RPC: %s: ib_alloc_mw"
1074 " failed %i\n", __func__, rc);
1075 goto out;
1077 list_add(&r->mw_list, &buf->rb_mws);
1078 ++r;
1080 break;
1081 default:
1082 break;
1086 * Allocate/init the request/reply buffers. Doing this
1087 * using kmalloc for now -- one for each buf.
1089 for (i = 0; i < buf->rb_max_requests; i++) {
1090 struct rpcrdma_req *req;
1091 struct rpcrdma_rep *rep;
1093 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1094 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1095 /* Typical ~2400b, so rounding up saves work later */
1096 if (len < 4096)
1097 len = 4096;
1098 req = kmalloc(len, GFP_KERNEL);
1099 if (req == NULL) {
1100 dprintk("RPC: %s: request buffer %d alloc"
1101 " failed\n", __func__, i);
1102 rc = -ENOMEM;
1103 goto out;
1105 memset(req, 0, sizeof(struct rpcrdma_req));
1106 buf->rb_send_bufs[i] = req;
1107 buf->rb_send_bufs[i]->rl_buffer = buf;
1109 rc = rpcrdma_register_internal(ia, req->rl_base,
1110 len - offsetof(struct rpcrdma_req, rl_base),
1111 &buf->rb_send_bufs[i]->rl_handle,
1112 &buf->rb_send_bufs[i]->rl_iov);
1113 if (rc)
1114 goto out;
1116 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1118 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1119 rep = kmalloc(len, GFP_KERNEL);
1120 if (rep == NULL) {
1121 dprintk("RPC: %s: reply buffer %d alloc failed\n",
1122 __func__, i);
1123 rc = -ENOMEM;
1124 goto out;
1126 memset(rep, 0, sizeof(struct rpcrdma_rep));
1127 buf->rb_recv_bufs[i] = rep;
1128 buf->rb_recv_bufs[i]->rr_buffer = buf;
1129 init_waitqueue_head(&rep->rr_unbind);
1131 rc = rpcrdma_register_internal(ia, rep->rr_base,
1132 len - offsetof(struct rpcrdma_rep, rr_base),
1133 &buf->rb_recv_bufs[i]->rr_handle,
1134 &buf->rb_recv_bufs[i]->rr_iov);
1135 if (rc)
1136 goto out;
1139 dprintk("RPC: %s: max_requests %d\n",
1140 __func__, buf->rb_max_requests);
1141 /* done */
1142 return 0;
1143 out:
1144 rpcrdma_buffer_destroy(buf);
1145 return rc;
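/*
 * Resulting layout of the single rb_pool allocation (sketch; the mw
 * array size depends on the registration strategy chosen at IA open):
 *
 *   [ rb_send_bufs[max_requests] | rb_recv_bufs[max_requests] |
 *     pad buffer (cdata->padding bytes) | struct rpcrdma_mw array ]
 *
 * The req and rep buffers themselves are separate kmalloc'ed regions,
 * each registered with rpcrdma_register_internal().
 */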
1149 * Unregister and destroy buffer memory. Need to deal with
1150 * partial initialization, so it's callable from failed create.
1151 * Must be called before destroying endpoint, as registrations
1152 * reference it.
1154 void
1155 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1157 int rc, i;
1158 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1159 struct rpcrdma_mw *r;
1161 /* clean up in reverse order from create
1162 * 1. recv mr memory (mr free, then kfree)
1163 * 1a. bind mw memory
1164 * 2. send mr memory (mr free, then kfree)
1165 * 3. padding (if any) [moved to rpcrdma_ep_destroy]
1166 * 4. arrays
1168 dprintk("RPC: %s: entering\n", __func__);
1170 for (i = 0; i < buf->rb_max_requests; i++) {
1171 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1172 rpcrdma_deregister_internal(ia,
1173 buf->rb_recv_bufs[i]->rr_handle,
1174 &buf->rb_recv_bufs[i]->rr_iov);
1175 kfree(buf->rb_recv_bufs[i]);
1177 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1178 while (!list_empty(&buf->rb_mws)) {
1179 r = list_entry(buf->rb_mws.next,
1180 struct rpcrdma_mw, mw_list);
1181 list_del(&r->mw_list);
1182 switch (ia->ri_memreg_strategy) {
1183 case RPCRDMA_FRMR:
1184 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1185 if (rc)
1186 dprintk("RPC: %s:"
1187 " ib_dereg_mr"
1188 " failed %i\n",
1189 __func__, rc);
1190 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1191 break;
1192 case RPCRDMA_MTHCAFMR:
1193 rc = ib_dealloc_fmr(r->r.fmr);
1194 if (rc)
1195 dprintk("RPC: %s:"
1196 " ib_dealloc_fmr"
1197 " failed %i\n",
1198 __func__, rc);
1199 break;
1200 case RPCRDMA_MEMWINDOWS_ASYNC:
1201 case RPCRDMA_MEMWINDOWS:
1202 rc = ib_dealloc_mw(r->r.mw);
1203 if (rc)
1204 dprintk("RPC: %s:"
1205 " ib_dealloc_mw"
1206 " failed %i\n",
1207 __func__, rc);
1208 break;
1209 default:
1210 break;
1213 rpcrdma_deregister_internal(ia,
1214 buf->rb_send_bufs[i]->rl_handle,
1215 &buf->rb_send_bufs[i]->rl_iov);
1216 kfree(buf->rb_send_bufs[i]);
1220 kfree(buf->rb_pool);
1224 * Get a set of request/reply buffers.
1226 * Reply buffer (if needed) is attached to send buffer upon return.
1227 * Rule:
1228 * rb_send_index and rb_recv_index MUST always be pointing to the
1229 * *next* available buffer (non-NULL). They are incremented after
1230 * removing buffers, and decremented *before* returning them.
1232 struct rpcrdma_req *
1233 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1235 struct rpcrdma_req *req;
1236 unsigned long flags;
1237 int i;
1238 struct rpcrdma_mw *r;
1240 spin_lock_irqsave(&buffers->rb_lock, flags);
1241 if (buffers->rb_send_index == buffers->rb_max_requests) {
1242 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1243 dprintk("RPC: %s: out of request buffers\n", __func__);
1244 return ((struct rpcrdma_req *)NULL);
1247 req = buffers->rb_send_bufs[buffers->rb_send_index];
1248 if (buffers->rb_send_index < buffers->rb_recv_index) {
1249 dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
1250 __func__,
1251 buffers->rb_recv_index - buffers->rb_send_index);
1252 req->rl_reply = NULL;
1253 } else {
1254 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1255 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1257 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1258 if (!list_empty(&buffers->rb_mws)) {
1259 i = RPCRDMA_MAX_SEGS - 1;
1260 do {
1261 r = list_entry(buffers->rb_mws.next,
1262 struct rpcrdma_mw, mw_list);
1263 list_del(&r->mw_list);
1264 req->rl_segments[i].mr_chunk.rl_mw = r;
1265 } while (--i >= 0);
1267 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1268 return req;
1272 * Put request/reply buffers back into pool.
1273 * Pre-decrement counter/array index.
1275 void
1276 rpcrdma_buffer_put(struct rpcrdma_req *req)
1278 struct rpcrdma_buffer *buffers = req->rl_buffer;
1279 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1280 int i;
1281 unsigned long flags;
1283 BUG_ON(req->rl_nchunks != 0);
1284 spin_lock_irqsave(&buffers->rb_lock, flags);
1285 buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1286 req->rl_niovs = 0;
1287 if (req->rl_reply) {
1288 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1289 init_waitqueue_head(&req->rl_reply->rr_unbind);
1290 req->rl_reply->rr_func = NULL;
1291 req->rl_reply = NULL;
1293 switch (ia->ri_memreg_strategy) {
1294 case RPCRDMA_FRMR:
1295 case RPCRDMA_MTHCAFMR:
1296 case RPCRDMA_MEMWINDOWS_ASYNC:
1297 case RPCRDMA_MEMWINDOWS:
1299 * Cycle mw's back in reverse order, and "spin" them.
1300 * This delays and scrambles reuse as much as possible.
1302 i = 1;
1303 do {
1304 struct rpcrdma_mw **mw;
1305 mw = &req->rl_segments[i].mr_chunk.rl_mw;
1306 list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1307 *mw = NULL;
1308 } while (++i < RPCRDMA_MAX_SEGS);
1309 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1310 &buffers->rb_mws);
1311 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1312 break;
1313 default:
1314 break;
1316 spin_unlock_irqrestore(&buffers->rb_lock, flags);
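/*
 * The "spin" above in more detail: rpcrdma_buffer_get() pops mws from
 * the head of rb_mws into rl_segments[RPCRDMA_MAX_SEGS-1] down to [0];
 * here they are appended to the tail starting with segment 1 and ending
 * with segment 0, so the rkeys just used land at the end of the list and
 * are the last to be handed out again.
 */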
1320 * Recover reply buffers from pool.
1321 * This happens when recovering from error conditions.
1322 * Post-increment counter/array index.
1324 void
1325 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1327 struct rpcrdma_buffer *buffers = req->rl_buffer;
1328 unsigned long flags;
1330 if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
1331 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1332 spin_lock_irqsave(&buffers->rb_lock, flags);
1333 if (buffers->rb_recv_index < buffers->rb_max_requests) {
1334 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1335 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1337 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1341 * Put reply buffers back into pool when not attached to
1342 * request. This happens in error conditions, and when
1343 * aborting unbinds. Pre-decrement counter/array index.
1345 void
1346 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1348 struct rpcrdma_buffer *buffers = rep->rr_buffer;
1349 unsigned long flags;
1351 rep->rr_func = NULL;
1352 spin_lock_irqsave(&buffers->rb_lock, flags);
1353 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1354 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1358 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1362 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1363 struct ib_mr **mrp, struct ib_sge *iov)
1365 struct ib_phys_buf ipb;
1366 struct ib_mr *mr;
1367 int rc;
1370 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1372 iov->addr = ib_dma_map_single(ia->ri_id->device,
1373 va, len, DMA_BIDIRECTIONAL);
1374 iov->length = len;
1376 if (ia->ri_have_dma_lkey) {
1377 *mrp = NULL;
1378 iov->lkey = ia->ri_dma_lkey;
1379 return 0;
1380 } else if (ia->ri_bind_mem != NULL) {
1381 *mrp = NULL;
1382 iov->lkey = ia->ri_bind_mem->lkey;
1383 return 0;
1386 ipb.addr = iov->addr;
1387 ipb.size = iov->length;
1388 mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1389 IB_ACCESS_LOCAL_WRITE, &iov->addr);
1391 dprintk("RPC: %s: phys convert: 0x%llx "
1392 "registered 0x%llx length %d\n",
1393 __func__, (unsigned long long)ipb.addr,
1394 (unsigned long long)iov->addr, len);
1396 if (IS_ERR(mr)) {
1397 *mrp = NULL;
1398 rc = PTR_ERR(mr);
1399 dprintk("RPC: %s: failed with %i\n", __func__, rc);
1400 } else {
1401 *mrp = mr;
1402 iov->lkey = mr->lkey;
1403 rc = 0;
1406 return rc;
1410 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1411 struct ib_mr *mr, struct ib_sge *iov)
1413 int rc;
1415 ib_dma_unmap_single(ia->ri_id->device,
1416 iov->addr, iov->length, DMA_BIDIRECTIONAL);
1418 if (NULL == mr)
1419 return 0;
1421 rc = ib_dereg_mr(mr);
1422 if (rc)
1423 dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
1424 return rc;
1428 * Wrappers for chunk registration, shared by read/write chunk code.
1431 static void
1432 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1434 seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1435 seg->mr_dmalen = seg->mr_len;
1436 if (seg->mr_page)
1437 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1438 seg->mr_page, offset_in_page(seg->mr_offset),
1439 seg->mr_dmalen, seg->mr_dir);
1440 else
1441 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1442 seg->mr_offset,
1443 seg->mr_dmalen, seg->mr_dir);
1446 static void
1447 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1449 if (seg->mr_page)
1450 ib_dma_unmap_page(ia->ri_id->device,
1451 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1452 else
1453 ib_dma_unmap_single(ia->ri_id->device,
1454 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1457 static int
1458 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1459 int *nsegs, int writing, struct rpcrdma_ia *ia,
1460 struct rpcrdma_xprt *r_xprt)
1462 struct rpcrdma_mr_seg *seg1 = seg;
1463 struct ib_send_wr frmr_wr, *bad_wr;
1464 u8 key;
1465 int len, pageoff;
1466 int i, rc;
1468 pageoff = offset_in_page(seg1->mr_offset);
1469 seg1->mr_offset -= pageoff; /* start of page */
1470 seg1->mr_len += pageoff;
1471 len = -pageoff;
1472 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1473 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1474 for (i = 0; i < *nsegs;) {
1475 rpcrdma_map_one(ia, seg, writing);
1476 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1477 len += seg->mr_len;
1478 ++seg;
1479 ++i;
1480 /* Check for holes */
1481 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1482 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1483 break;
1485 dprintk("RPC: %s: Using frmr %p to map %d segments\n",
1486 __func__, seg1->mr_chunk.rl_mw, i);
1488 /* Bump the key */
1489 key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1490 ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1492 /* Prepare FRMR WR */
1493 memset(&frmr_wr, 0, sizeof frmr_wr);
1494 frmr_wr.opcode = IB_WR_FAST_REG_MR;
1495 frmr_wr.send_flags = 0; /* unsignaled */
1496 frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
1497 frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1498 frmr_wr.wr.fast_reg.page_list_len = i;
1499 frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1500 frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1501 frmr_wr.wr.fast_reg.access_flags = (writing ?
1502 IB_ACCESS_REMOTE_WRITE : IB_ACCESS_REMOTE_READ);
1503 frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1504 DECR_CQCOUNT(&r_xprt->rx_ep);
1506 rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1508 if (rc) {
1509 dprintk("RPC: %s: failed ib_post_send for register,"
1510 " status %i\n", __func__, rc);
1511 while (i--)
1512 rpcrdma_unmap_one(ia, --seg);
1513 } else {
1514 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1515 seg1->mr_base = seg1->mr_dma + pageoff;
1516 seg1->mr_nsegs = i;
1517 seg1->mr_len = len;
1519 *nsegs = i;
1520 return rc;
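/*
 * FRMR registration, in outline (matching the code above): map up to
 * RPCRDMA_MAX_DATA_SEGS segments into the fast_reg page list, stopping
 * early at any page-alignment hole, advance the low byte of the rkey so
 * each registration presents a fresh key, then post an unsignaled
 * IB_WR_FAST_REG_MR on the send queue.  The rkey, base and total length
 * are recorded in the first segment for use in the RPC/RDMA chunk list.
 */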
1523 static int
1524 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1525 struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1527 struct rpcrdma_mr_seg *seg1 = seg;
1528 struct ib_send_wr invalidate_wr, *bad_wr;
1529 int rc;
1531 while (seg1->mr_nsegs--)
1532 rpcrdma_unmap_one(ia, seg++);
1534 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1535 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1536 invalidate_wr.send_flags = 0; /* unsignaled */
1537 invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1538 DECR_CQCOUNT(&r_xprt->rx_ep);
1540 rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1541 if (rc)
1542 dprintk("RPC: %s: failed ib_post_send for invalidate,"
1543 " status %i\n", __func__, rc);
1544 return rc;
1547 static int
1548 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1549 int *nsegs, int writing, struct rpcrdma_ia *ia)
1551 struct rpcrdma_mr_seg *seg1 = seg;
1552 u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1553 int len, pageoff, i, rc;
1555 pageoff = offset_in_page(seg1->mr_offset);
1556 seg1->mr_offset -= pageoff; /* start of page */
1557 seg1->mr_len += pageoff;
1558 len = -pageoff;
1559 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1560 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1561 for (i = 0; i < *nsegs;) {
1562 rpcrdma_map_one(ia, seg, writing);
1563 physaddrs[i] = seg->mr_dma;
1564 len += seg->mr_len;
1565 ++seg;
1566 ++i;
1567 /* Check for holes */
1568 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1569 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1570 break;
1572 rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1573 physaddrs, i, seg1->mr_dma);
1574 if (rc) {
1575 dprintk("RPC: %s: failed ib_map_phys_fmr "
1576 "%u@0x%llx+%i (%d)... status %i\n", __func__,
1577 len, (unsigned long long)seg1->mr_dma,
1578 pageoff, i, rc);
1579 while (i--)
1580 rpcrdma_unmap_one(ia, --seg);
1581 } else {
1582 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1583 seg1->mr_base = seg1->mr_dma + pageoff;
1584 seg1->mr_nsegs = i;
1585 seg1->mr_len = len;
1587 *nsegs = i;
1588 return rc;
1591 static int
1592 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1593 struct rpcrdma_ia *ia)
1595 struct rpcrdma_mr_seg *seg1 = seg;
1596 LIST_HEAD(l);
1597 int rc;
1599 list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1600 rc = ib_unmap_fmr(&l);
1601 while (seg1->mr_nsegs--)
1602 rpcrdma_unmap_one(ia, seg++);
1603 if (rc)
1604 dprintk("RPC: %s: failed ib_unmap_fmr,"
1605 " status %i\n", __func__, rc);
1606 return rc;
1609 static int
1610 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1611 int *nsegs, int writing, struct rpcrdma_ia *ia,
1612 struct rpcrdma_xprt *r_xprt)
1614 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1615 IB_ACCESS_REMOTE_READ);
1616 struct ib_mw_bind param;
1617 int rc;
1619 *nsegs = 1;
1620 rpcrdma_map_one(ia, seg, writing);
1621 param.mr = ia->ri_bind_mem;
1622 param.wr_id = 0ULL; /* no send cookie */
1623 param.addr = seg->mr_dma;
1624 param.length = seg->mr_len;
1625 param.send_flags = 0;
1626 param.mw_access_flags = mem_priv;
1628 DECR_CQCOUNT(&r_xprt->rx_ep);
1629 rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1630 if (rc) {
1631 dprintk("RPC: %s: failed ib_bind_mw "
1632 "%u@0x%llx status %i\n",
1633 __func__, seg->mr_len,
1634 (unsigned long long)seg->mr_dma, rc);
1635 rpcrdma_unmap_one(ia, seg);
1636 } else {
1637 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1638 seg->mr_base = param.addr;
1639 seg->mr_nsegs = 1;
1641 return rc;
1644 static int
1645 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1646 struct rpcrdma_ia *ia,
1647 struct rpcrdma_xprt *r_xprt, void **r)
1649 struct ib_mw_bind param;
1650 LIST_HEAD(l);
1651 int rc;
1653 BUG_ON(seg->mr_nsegs != 1);
1654 param.mr = ia->ri_bind_mem;
1655 param.addr = 0ULL; /* unbind */
1656 param.length = 0;
1657 param.mw_access_flags = 0;
1658 if (*r) {
1659 param.wr_id = (u64) (unsigned long) *r;
1660 param.send_flags = IB_SEND_SIGNALED;
1661 INIT_CQCOUNT(&r_xprt->rx_ep);
1662 } else {
1663 param.wr_id = 0ULL;
1664 param.send_flags = 0;
1665 DECR_CQCOUNT(&r_xprt->rx_ep);
1667 rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1668 rpcrdma_unmap_one(ia, seg);
1669 if (rc)
1670 dprintk("RPC: %s: failed ib_(un)bind_mw,"
1671 " status %i\n", __func__, rc);
1672 else
1673 *r = NULL; /* will upcall on completion */
1674 return rc;
1677 static int
1678 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1679 int *nsegs, int writing, struct rpcrdma_ia *ia)
1681 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1682 IB_ACCESS_REMOTE_READ);
1683 struct rpcrdma_mr_seg *seg1 = seg;
1684 struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1685 int len, i, rc = 0;
1687 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1688 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1689 for (len = 0, i = 0; i < *nsegs;) {
1690 rpcrdma_map_one(ia, seg, writing);
1691 ipb[i].addr = seg->mr_dma;
1692 ipb[i].size = seg->mr_len;
1693 len += seg->mr_len;
1694 ++seg;
1695 ++i;
1696 /* Check for holes */
1697 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1698 offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1699 break;
1701 seg1->mr_base = seg1->mr_dma;
1702 seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1703 ipb, i, mem_priv, &seg1->mr_base);
1704 if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1705 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1706 dprintk("RPC: %s: failed ib_reg_phys_mr "
1707 "%u@0x%llx (%d)... status %i\n",
1708 __func__, len,
1709 (unsigned long long)seg1->mr_dma, i, rc);
1710 while (i--)
1711 rpcrdma_unmap_one(ia, --seg);
1712 } else {
1713 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1714 seg1->mr_nsegs = i;
1715 seg1->mr_len = len;
1717 *nsegs = i;
1718 return rc;
1721 static int
1722 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1723 struct rpcrdma_ia *ia)
1725 struct rpcrdma_mr_seg *seg1 = seg;
1726 int rc;
1728 rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1729 seg1->mr_chunk.rl_mr = NULL;
1730 while (seg1->mr_nsegs--)
1731 rpcrdma_unmap_one(ia, seg++);
1732 if (rc)
1733 dprintk("RPC: %s: failed ib_dereg_mr,"
1734 " status %i\n", __func__, rc);
1735 return rc;
1739 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1740 int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1742 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1743 int rc = 0;
1745 switch (ia->ri_memreg_strategy) {
1747 #if RPCRDMA_PERSISTENT_REGISTRATION
1748 case RPCRDMA_ALLPHYSICAL:
1749 rpcrdma_map_one(ia, seg, writing);
1750 seg->mr_rkey = ia->ri_bind_mem->rkey;
1751 seg->mr_base = seg->mr_dma;
1752 seg->mr_nsegs = 1;
1753 nsegs = 1;
1754 break;
1755 #endif
1757 /* Registration using frmr registration */
1758 case RPCRDMA_FRMR:
1759 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1760 break;
1762 /* Registration using fmr memory registration */
1763 case RPCRDMA_MTHCAFMR:
1764 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1765 break;
1767 /* Registration using memory windows */
1768 case RPCRDMA_MEMWINDOWS_ASYNC:
1769 case RPCRDMA_MEMWINDOWS:
1770 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1771 break;
1773 /* Default registration each time */
1774 default:
1775 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1776 break;
1778 if (rc)
1779 return -1;
1781 return nsegs;
1785 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1786 struct rpcrdma_xprt *r_xprt, void *r)
1788 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1789 int nsegs = seg->mr_nsegs, rc;
1791 switch (ia->ri_memreg_strategy) {
1793 #if RPCRDMA_PERSISTENT_REGISTRATION
1794 case RPCRDMA_ALLPHYSICAL:
1795 BUG_ON(nsegs != 1);
1796 rpcrdma_unmap_one(ia, seg);
1797 rc = 0;
1798 break;
1799 #endif
1801 case RPCRDMA_FRMR:
1802 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1803 break;
1805 case RPCRDMA_MTHCAFMR:
1806 rc = rpcrdma_deregister_fmr_external(seg, ia);
1807 break;
1809 case RPCRDMA_MEMWINDOWS_ASYNC:
1810 case RPCRDMA_MEMWINDOWS:
1811 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1812 break;
1814 default:
1815 rc = rpcrdma_deregister_default_external(seg, ia);
1816 break;
1818 if (r) {
1819 struct rpcrdma_rep *rep = r;
1820 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1821 rep->rr_func = NULL;
1822 func(rep); /* dereg done, callback now */
1824 return nsegs;
1828 * Prepost any receive buffer, then post send.
1830 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1833 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1834 struct rpcrdma_ep *ep,
1835 struct rpcrdma_req *req)
1837 struct ib_send_wr send_wr, *send_wr_fail;
1838 struct rpcrdma_rep *rep = req->rl_reply;
1839 int rc;
1841 if (rep) {
1842 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1843 if (rc)
1844 goto out;
1845 req->rl_reply = NULL;
1848 send_wr.next = NULL;
1849 send_wr.wr_id = 0ULL; /* no send cookie */
1850 send_wr.sg_list = req->rl_send_iov;
1851 send_wr.num_sge = req->rl_niovs;
1852 send_wr.opcode = IB_WR_SEND;
1853 if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
1854 ib_dma_sync_single_for_device(ia->ri_id->device,
1855 req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1856 DMA_TO_DEVICE);
1857 ib_dma_sync_single_for_device(ia->ri_id->device,
1858 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1859 DMA_TO_DEVICE);
1860 ib_dma_sync_single_for_device(ia->ri_id->device,
1861 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1862 DMA_TO_DEVICE);
1864 if (DECR_CQCOUNT(ep) > 0)
1865 send_wr.send_flags = 0;
1866 else { /* Provider must take a send completion every now and then */
1867 INIT_CQCOUNT(ep);
1868 send_wr.send_flags = IB_SEND_SIGNALED;
1871 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1872 if (rc)
1873 dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
1874 rc);
1875 out:
1876 return rc;
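/*
 * Send-completion signaling: DECR_CQCOUNT() counts down from rep_cqinit
 * (set in rpcrdma_ep_create); only when the count is exhausted is
 * IB_SEND_SIGNALED requested, which lets the provider retire all of the
 * preceding unsignaled send WRs on one completion instead of one per send.
 */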
1880 * (Re)post a receive buffer.
1883 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1884 struct rpcrdma_ep *ep,
1885 struct rpcrdma_rep *rep)
1887 struct ib_recv_wr recv_wr, *recv_wr_fail;
1888 int rc;
1890 recv_wr.next = NULL;
1891 recv_wr.wr_id = (u64) (unsigned long) rep;
1892 recv_wr.sg_list = &rep->rr_iov;
1893 recv_wr.num_sge = 1;
1895 ib_dma_sync_single_for_cpu(ia->ri_id->device,
1896 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1898 DECR_CQCOUNT(ep);
1899 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1901 if (rc)
1902 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
1903 rc);
1904 return rc;