net: Move && and || to end of previous line
[linux-2.6/linux-acpi-2.6/ibm-acpi-2.6.git] / net / sunrpc / xprtrdma / verbs.c
blob2209aa87d899819152572e9ae87957d2b223d29d
1 /*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * verbs.c
43 * Encapsulates the major functions managing:
44 * o adapters
45 * o endpoints
46 * o connections
47 * o buffer memory
50 #include <linux/pci.h> /* for Tavor hack below */
52 #include "xprt_rdma.h"
55 * Globals/Macros
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY RPCDBG_TRANS
60 #endif
63 * internal functions
67 * handle replies in tasklet context, using a single, global list
68 * rdma tasklet function -- just turn around and call the func
69 * for all replies on the list
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
78 struct rpcrdma_rep *rep;
79 void (*func)(struct rpcrdma_rep *);
80 unsigned long flags;
82 data = data;
83 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84 while (!list_empty(&rpcrdma_tasklets_g)) {
85 rep = list_entry(rpcrdma_tasklets_g.next,
86 struct rpcrdma_rep, rr_list);
87 list_del(&rep->rr_list);
88 func = rep->rr_func;
89 rep->rr_func = NULL;
90 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
92 if (func)
93 func(rep);
94 else
95 rpcrdma_recv_buffer_put(rep);
97 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
99 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
107 unsigned long flags;
109 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110 list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112 tasklet_schedule(&rpcrdma_tasklet_g);
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
118 struct rpcrdma_ep *ep = context;
120 dprintk("RPC: %s: QP error %X on device %s ep %p\n",
121 __func__, event->event, event->device->name, context);
122 if (ep->rep_connected == 1) {
123 ep->rep_connected = -EIO;
124 ep->rep_func(ep);
125 wake_up_all(&ep->rep_connect_wait);
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
132 struct rpcrdma_ep *ep = context;
134 dprintk("RPC: %s: CQ error %X on device %s ep %p\n",
135 __func__, event->event, event->device->name, context);
136 if (ep->rep_connected == 1) {
137 ep->rep_connected = -EIO;
138 ep->rep_func(ep);
139 wake_up_all(&ep->rep_connect_wait);
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
146 struct rpcrdma_rep *rep =
147 (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
149 dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n",
150 __func__, rep, wc->status, wc->opcode, wc->byte_len);
152 if (!rep) /* send or bind completion that we don't care about */
153 return;
155 if (IB_WC_SUCCESS != wc->status) {
156 dprintk("RPC: %s: %s WC status %X, connection lost\n",
157 __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158 wc->status);
159 rep->rr_len = ~0U;
160 rpcrdma_schedule_tasklet(rep);
161 return;
164 switch (wc->opcode) {
165 case IB_WC_RECV:
166 rep->rr_len = wc->byte_len;
167 ib_dma_sync_single_for_cpu(
168 rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169 rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170 /* Keep (only) the most recent credits, after check validity */
171 if (rep->rr_len >= 16) {
172 struct rpcrdma_msg *p =
173 (struct rpcrdma_msg *) rep->rr_base;
174 unsigned int credits = ntohl(p->rm_credit);
175 if (credits == 0) {
176 dprintk("RPC: %s: server"
177 " dropped credits to 0!\n", __func__);
178 /* don't deadlock */
179 credits = 1;
180 } else if (credits > rep->rr_buffer->rb_max_requests) {
181 dprintk("RPC: %s: server"
182 " over-crediting: %d (%d)\n",
183 __func__, credits,
184 rep->rr_buffer->rb_max_requests);
185 credits = rep->rr_buffer->rb_max_requests;
187 atomic_set(&rep->rr_buffer->rb_credits, credits);
189 /* fall through */
190 case IB_WC_BIND_MW:
191 rpcrdma_schedule_tasklet(rep);
192 break;
193 default:
194 dprintk("RPC: %s: unexpected WC event %X\n",
195 __func__, wc->opcode);
196 break;
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
203 struct ib_wc wc;
204 int rc;
206 for (;;) {
207 rc = ib_poll_cq(cq, 1, &wc);
208 if (rc < 0) {
209 dprintk("RPC: %s: ib_poll_cq failed %i\n",
210 __func__, rc);
211 return rc;
213 if (rc == 0)
214 break;
216 rpcrdma_event_process(&wc);
219 return 0;
223 * rpcrdma_cq_event_upcall
225 * This upcall handles recv, send, bind and unbind events.
226 * It is reentrant but processes single events in order to maintain
227 * ordering of receives to keep server credits.
229 * It is the responsibility of the scheduled tasklet to return
230 * recv buffers to the pool. NOTE: this affects synchronization of
231 * connection shutdown. That is, the structures required for
232 * the completion of the reply handler must remain intact until
233 * all memory has been reclaimed.
235 * Note that send events are suppressed and do not result in an upcall.
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
240 int rc;
242 rc = rpcrdma_cq_poll(cq);
243 if (rc)
244 return;
246 rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247 if (rc) {
248 dprintk("RPC: %s: ib_req_notify_cq failed %i\n",
249 __func__, rc);
250 return;
253 rpcrdma_cq_poll(cq);
256 #ifdef RPC_DEBUG
257 static const char * const conn[] = {
258 "address resolved",
259 "address error",
260 "route resolved",
261 "route error",
262 "connect request",
263 "connect response",
264 "connect error",
265 "unreachable",
266 "rejected",
267 "established",
268 "disconnected",
269 "device removal"
271 #endif
273 static int
274 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
276 struct rpcrdma_xprt *xprt = id->context;
277 struct rpcrdma_ia *ia = &xprt->rx_ia;
278 struct rpcrdma_ep *ep = &xprt->rx_ep;
279 #ifdef RPC_DEBUG
280 struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
281 #endif
282 struct ib_qp_attr attr;
283 struct ib_qp_init_attr iattr;
284 int connstate = 0;
286 switch (event->event) {
287 case RDMA_CM_EVENT_ADDR_RESOLVED:
288 case RDMA_CM_EVENT_ROUTE_RESOLVED:
289 ia->ri_async_rc = 0;
290 complete(&ia->ri_done);
291 break;
292 case RDMA_CM_EVENT_ADDR_ERROR:
293 ia->ri_async_rc = -EHOSTUNREACH;
294 dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
295 __func__, ep);
296 complete(&ia->ri_done);
297 break;
298 case RDMA_CM_EVENT_ROUTE_ERROR:
299 ia->ri_async_rc = -ENETUNREACH;
300 dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
301 __func__, ep);
302 complete(&ia->ri_done);
303 break;
304 case RDMA_CM_EVENT_ESTABLISHED:
305 connstate = 1;
306 ib_query_qp(ia->ri_id->qp, &attr,
307 IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
308 &iattr);
309 dprintk("RPC: %s: %d responder resources"
310 " (%d initiator)\n",
311 __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
312 goto connected;
313 case RDMA_CM_EVENT_CONNECT_ERROR:
314 connstate = -ENOTCONN;
315 goto connected;
316 case RDMA_CM_EVENT_UNREACHABLE:
317 connstate = -ENETDOWN;
318 goto connected;
319 case RDMA_CM_EVENT_REJECTED:
320 connstate = -ECONNREFUSED;
321 goto connected;
322 case RDMA_CM_EVENT_DISCONNECTED:
323 connstate = -ECONNABORTED;
324 goto connected;
325 case RDMA_CM_EVENT_DEVICE_REMOVAL:
326 connstate = -ENODEV;
327 connected:
328 dprintk("RPC: %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
329 __func__,
330 (event->event <= 11) ? conn[event->event] :
331 "unknown connection error",
332 &addr->sin_addr.s_addr,
333 ntohs(addr->sin_port),
334 ep, event->event);
335 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
336 dprintk("RPC: %s: %sconnected\n",
337 __func__, connstate > 0 ? "" : "dis");
338 ep->rep_connected = connstate;
339 ep->rep_func(ep);
340 wake_up_all(&ep->rep_connect_wait);
341 break;
342 default:
343 dprintk("RPC: %s: unexpected CM event %d\n",
344 __func__, event->event);
345 break;
348 #ifdef RPC_DEBUG
349 if (connstate == 1) {
350 int ird = attr.max_dest_rd_atomic;
351 int tird = ep->rep_remote_cma.responder_resources;
352 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
353 "on %s, memreg %d slots %d ird %d%s\n",
354 &addr->sin_addr.s_addr,
355 ntohs(addr->sin_port),
356 ia->ri_id->device->name,
357 ia->ri_memreg_strategy,
358 xprt->rx_buf.rb_max_requests,
359 ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
360 } else if (connstate < 0) {
361 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
362 &addr->sin_addr.s_addr,
363 ntohs(addr->sin_port),
364 connstate);
366 #endif
368 return 0;
371 static struct rdma_cm_id *
372 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
373 struct rpcrdma_ia *ia, struct sockaddr *addr)
375 struct rdma_cm_id *id;
376 int rc;
378 init_completion(&ia->ri_done);
380 id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
381 if (IS_ERR(id)) {
382 rc = PTR_ERR(id);
383 dprintk("RPC: %s: rdma_create_id() failed %i\n",
384 __func__, rc);
385 return id;
388 ia->ri_async_rc = -ETIMEDOUT;
389 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
390 if (rc) {
391 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
392 __func__, rc);
393 goto out;
395 wait_for_completion_interruptible_timeout(&ia->ri_done,
396 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
397 rc = ia->ri_async_rc;
398 if (rc)
399 goto out;
401 ia->ri_async_rc = -ETIMEDOUT;
402 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
403 if (rc) {
404 dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
405 __func__, rc);
406 goto out;
408 wait_for_completion_interruptible_timeout(&ia->ri_done,
409 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
410 rc = ia->ri_async_rc;
411 if (rc)
412 goto out;
414 return id;
416 out:
417 rdma_destroy_id(id);
418 return ERR_PTR(rc);
422 * Drain any cq, prior to teardown.
424 static void
425 rpcrdma_clean_cq(struct ib_cq *cq)
427 struct ib_wc wc;
428 int count = 0;
430 while (1 == ib_poll_cq(cq, 1, &wc))
431 ++count;
433 if (count)
434 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
435 __func__, count, wc.opcode);
439 * Exported functions.
443 * Open and initialize an Interface Adapter.
444 * o initializes fields of struct rpcrdma_ia, including
445 * interface and provider attributes and protection zone.
448 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
450 int rc, mem_priv;
451 struct ib_device_attr devattr;
452 struct rpcrdma_ia *ia = &xprt->rx_ia;
454 ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
455 if (IS_ERR(ia->ri_id)) {
456 rc = PTR_ERR(ia->ri_id);
457 goto out1;
460 ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
461 if (IS_ERR(ia->ri_pd)) {
462 rc = PTR_ERR(ia->ri_pd);
463 dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
464 __func__, rc);
465 goto out2;
469 * Query the device to determine if the requested memory
470 * registration strategy is supported. If it isn't, set the
471 * strategy to a globally supported model.
473 rc = ib_query_device(ia->ri_id->device, &devattr);
474 if (rc) {
475 dprintk("RPC: %s: ib_query_device failed %d\n",
476 __func__, rc);
477 goto out2;
480 if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
481 ia->ri_have_dma_lkey = 1;
482 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
485 switch (memreg) {
486 case RPCRDMA_MEMWINDOWS:
487 case RPCRDMA_MEMWINDOWS_ASYNC:
488 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
489 dprintk("RPC: %s: MEMWINDOWS registration "
490 "specified but not supported by adapter, "
491 "using slower RPCRDMA_REGISTER\n",
492 __func__);
493 memreg = RPCRDMA_REGISTER;
495 break;
496 case RPCRDMA_MTHCAFMR:
497 if (!ia->ri_id->device->alloc_fmr) {
498 #if RPCRDMA_PERSISTENT_REGISTRATION
499 dprintk("RPC: %s: MTHCAFMR registration "
500 "specified but not supported by adapter, "
501 "using riskier RPCRDMA_ALLPHYSICAL\n",
502 __func__);
503 memreg = RPCRDMA_ALLPHYSICAL;
504 #else
505 dprintk("RPC: %s: MTHCAFMR registration "
506 "specified but not supported by adapter, "
507 "using slower RPCRDMA_REGISTER\n",
508 __func__);
509 memreg = RPCRDMA_REGISTER;
510 #endif
512 break;
513 case RPCRDMA_FRMR:
514 /* Requires both frmr reg and local dma lkey */
515 if ((devattr.device_cap_flags &
516 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
517 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
518 #if RPCRDMA_PERSISTENT_REGISTRATION
519 dprintk("RPC: %s: FRMR registration "
520 "specified but not supported by adapter, "
521 "using riskier RPCRDMA_ALLPHYSICAL\n",
522 __func__);
523 memreg = RPCRDMA_ALLPHYSICAL;
524 #else
525 dprintk("RPC: %s: FRMR registration "
526 "specified but not supported by adapter, "
527 "using slower RPCRDMA_REGISTER\n",
528 __func__);
529 memreg = RPCRDMA_REGISTER;
530 #endif
532 break;
536 * Optionally obtain an underlying physical identity mapping in
537 * order to do a memory window-based bind. This base registration
538 * is protected from remote access - that is enabled only by binding
539 * for the specific bytes targeted during each RPC operation, and
540 * revoked after the corresponding completion similar to a storage
541 * adapter.
543 switch (memreg) {
544 case RPCRDMA_BOUNCEBUFFERS:
545 case RPCRDMA_REGISTER:
546 case RPCRDMA_FRMR:
547 break;
548 #if RPCRDMA_PERSISTENT_REGISTRATION
549 case RPCRDMA_ALLPHYSICAL:
550 mem_priv = IB_ACCESS_LOCAL_WRITE |
551 IB_ACCESS_REMOTE_WRITE |
552 IB_ACCESS_REMOTE_READ;
553 goto register_setup;
554 #endif
555 case RPCRDMA_MEMWINDOWS_ASYNC:
556 case RPCRDMA_MEMWINDOWS:
557 mem_priv = IB_ACCESS_LOCAL_WRITE |
558 IB_ACCESS_MW_BIND;
559 goto register_setup;
560 case RPCRDMA_MTHCAFMR:
561 if (ia->ri_have_dma_lkey)
562 break;
563 mem_priv = IB_ACCESS_LOCAL_WRITE;
564 register_setup:
565 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
566 if (IS_ERR(ia->ri_bind_mem)) {
567 printk(KERN_ALERT "%s: ib_get_dma_mr for "
568 "phys register failed with %lX\n\t"
569 "Will continue with degraded performance\n",
570 __func__, PTR_ERR(ia->ri_bind_mem));
571 memreg = RPCRDMA_REGISTER;
572 ia->ri_bind_mem = NULL;
574 break;
575 default:
576 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
577 __func__, memreg);
578 rc = -EINVAL;
579 goto out2;
581 dprintk("RPC: %s: memory registration strategy is %d\n",
582 __func__, memreg);
584 /* Else will do memory reg/dereg for each chunk */
585 ia->ri_memreg_strategy = memreg;
587 return 0;
588 out2:
589 rdma_destroy_id(ia->ri_id);
590 ia->ri_id = NULL;
591 out1:
592 return rc;
596 * Clean up/close an IA.
597 * o if event handles and PD have been initialized, free them.
598 * o close the IA
600 void
601 rpcrdma_ia_close(struct rpcrdma_ia *ia)
603 int rc;
605 dprintk("RPC: %s: entering\n", __func__);
606 if (ia->ri_bind_mem != NULL) {
607 rc = ib_dereg_mr(ia->ri_bind_mem);
608 dprintk("RPC: %s: ib_dereg_mr returned %i\n",
609 __func__, rc);
611 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
612 if (ia->ri_id->qp)
613 rdma_destroy_qp(ia->ri_id);
614 rdma_destroy_id(ia->ri_id);
615 ia->ri_id = NULL;
617 if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
618 rc = ib_dealloc_pd(ia->ri_pd);
619 dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
620 __func__, rc);
625 * Create unconnected endpoint.
628 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
629 struct rpcrdma_create_data_internal *cdata)
631 struct ib_device_attr devattr;
632 int rc, err;
634 rc = ib_query_device(ia->ri_id->device, &devattr);
635 if (rc) {
636 dprintk("RPC: %s: ib_query_device failed %d\n",
637 __func__, rc);
638 return rc;
641 /* check provider's send/recv wr limits */
642 if (cdata->max_requests > devattr.max_qp_wr)
643 cdata->max_requests = devattr.max_qp_wr;
645 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
646 ep->rep_attr.qp_context = ep;
647 /* send_cq and recv_cq initialized below */
648 ep->rep_attr.srq = NULL;
649 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
650 switch (ia->ri_memreg_strategy) {
651 case RPCRDMA_FRMR:
652 /* Add room for frmr register and invalidate WRs */
653 ep->rep_attr.cap.max_send_wr *= 3;
654 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
655 return -EINVAL;
656 break;
657 case RPCRDMA_MEMWINDOWS_ASYNC:
658 case RPCRDMA_MEMWINDOWS:
659 /* Add room for mw_binds+unbinds - overkill! */
660 ep->rep_attr.cap.max_send_wr++;
661 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
662 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
663 return -EINVAL;
664 break;
665 default:
666 break;
668 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
669 ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
670 ep->rep_attr.cap.max_recv_sge = 1;
671 ep->rep_attr.cap.max_inline_data = 0;
672 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
673 ep->rep_attr.qp_type = IB_QPT_RC;
674 ep->rep_attr.port_num = ~0;
676 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
677 "iovs: send %d recv %d\n",
678 __func__,
679 ep->rep_attr.cap.max_send_wr,
680 ep->rep_attr.cap.max_recv_wr,
681 ep->rep_attr.cap.max_send_sge,
682 ep->rep_attr.cap.max_recv_sge);
684 /* set trigger for requesting send completion */
685 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /* - 1*/;
686 switch (ia->ri_memreg_strategy) {
687 case RPCRDMA_MEMWINDOWS_ASYNC:
688 case RPCRDMA_MEMWINDOWS:
689 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
690 break;
691 default:
692 break;
694 if (ep->rep_cqinit <= 2)
695 ep->rep_cqinit = 0;
696 INIT_CQCOUNT(ep);
697 ep->rep_ia = ia;
698 init_waitqueue_head(&ep->rep_connect_wait);
701 * Create a single cq for receive dto and mw_bind (only ever
702 * care about unbind, really). Send completions are suppressed.
703 * Use single threaded tasklet upcalls to maintain ordering.
705 ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
706 rpcrdma_cq_async_error_upcall, NULL,
707 ep->rep_attr.cap.max_recv_wr +
708 ep->rep_attr.cap.max_send_wr + 1, 0);
709 if (IS_ERR(ep->rep_cq)) {
710 rc = PTR_ERR(ep->rep_cq);
711 dprintk("RPC: %s: ib_create_cq failed: %i\n",
712 __func__, rc);
713 goto out1;
716 rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
717 if (rc) {
718 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
719 __func__, rc);
720 goto out2;
723 ep->rep_attr.send_cq = ep->rep_cq;
724 ep->rep_attr.recv_cq = ep->rep_cq;
726 /* Initialize cma parameters */
728 /* RPC/RDMA does not use private data */
729 ep->rep_remote_cma.private_data = NULL;
730 ep->rep_remote_cma.private_data_len = 0;
732 /* Client offers RDMA Read but does not initiate */
733 ep->rep_remote_cma.initiator_depth = 0;
734 if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
735 ep->rep_remote_cma.responder_resources = 0;
736 else if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */
737 ep->rep_remote_cma.responder_resources = 32;
738 else
739 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
741 ep->rep_remote_cma.retry_count = 7;
742 ep->rep_remote_cma.flow_control = 0;
743 ep->rep_remote_cma.rnr_retry_count = 0;
745 return 0;
747 out2:
748 err = ib_destroy_cq(ep->rep_cq);
749 if (err)
750 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
751 __func__, err);
752 out1:
753 return rc;
757 * rpcrdma_ep_destroy
759 * Disconnect and destroy endpoint. After this, the only
760 * valid operations on the ep are to free it (if dynamically
761 * allocated) or re-create it.
763 * The caller's error handling must be sure to not leak the endpoint
764 * if this function fails.
767 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
769 int rc;
771 dprintk("RPC: %s: entering, connected is %d\n",
772 __func__, ep->rep_connected);
774 if (ia->ri_id->qp) {
775 rc = rpcrdma_ep_disconnect(ep, ia);
776 if (rc)
777 dprintk("RPC: %s: rpcrdma_ep_disconnect"
778 " returned %i\n", __func__, rc);
779 rdma_destroy_qp(ia->ri_id);
780 ia->ri_id->qp = NULL;
783 /* padding - could be done in rpcrdma_buffer_destroy... */
784 if (ep->rep_pad_mr) {
785 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
786 ep->rep_pad_mr = NULL;
789 rpcrdma_clean_cq(ep->rep_cq);
790 rc = ib_destroy_cq(ep->rep_cq);
791 if (rc)
792 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
793 __func__, rc);
795 return rc;
799 * Connect unconnected endpoint.
802 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
804 struct rdma_cm_id *id;
805 int rc = 0;
806 int retry_count = 0;
808 if (ep->rep_connected != 0) {
809 struct rpcrdma_xprt *xprt;
810 retry:
811 rc = rpcrdma_ep_disconnect(ep, ia);
812 if (rc && rc != -ENOTCONN)
813 dprintk("RPC: %s: rpcrdma_ep_disconnect"
814 " status %i\n", __func__, rc);
815 rpcrdma_clean_cq(ep->rep_cq);
817 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
818 id = rpcrdma_create_id(xprt, ia,
819 (struct sockaddr *)&xprt->rx_data.addr);
820 if (IS_ERR(id)) {
821 rc = PTR_ERR(id);
822 goto out;
824 /* TEMP TEMP TEMP - fail if new device:
825 * Deregister/remarshal *all* requests!
826 * Close and recreate adapter, pd, etc!
827 * Re-determine all attributes still sane!
828 * More stuff I haven't thought of!
829 * Rrrgh!
831 if (ia->ri_id->device != id->device) {
832 printk("RPC: %s: can't reconnect on "
833 "different device!\n", __func__);
834 rdma_destroy_id(id);
835 rc = -ENETDOWN;
836 goto out;
838 /* END TEMP */
839 rdma_destroy_qp(ia->ri_id);
840 rdma_destroy_id(ia->ri_id);
841 ia->ri_id = id;
844 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
845 if (rc) {
846 dprintk("RPC: %s: rdma_create_qp failed %i\n",
847 __func__, rc);
848 goto out;
851 /* XXX Tavor device performs badly with 2K MTU! */
852 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
853 struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
854 if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
855 (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
856 pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
857 struct ib_qp_attr attr = {
858 .path_mtu = IB_MTU_1024
860 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
864 ep->rep_connected = 0;
866 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
867 if (rc) {
868 dprintk("RPC: %s: rdma_connect() failed with %i\n",
869 __func__, rc);
870 goto out;
873 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
876 * Check state. A non-peer reject indicates no listener
877 * (ECONNREFUSED), which may be a transient state. All
878 * others indicate a transport condition which has already
879 * undergone a best-effort.
881 if (ep->rep_connected == -ECONNREFUSED &&
882 ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
883 dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
884 goto retry;
886 if (ep->rep_connected <= 0) {
887 /* Sometimes, the only way to reliably connect to remote
888 * CMs is to use same nonzero values for ORD and IRD. */
889 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
890 (ep->rep_remote_cma.responder_resources == 0 ||
891 ep->rep_remote_cma.initiator_depth !=
892 ep->rep_remote_cma.responder_resources)) {
893 if (ep->rep_remote_cma.responder_resources == 0)
894 ep->rep_remote_cma.responder_resources = 1;
895 ep->rep_remote_cma.initiator_depth =
896 ep->rep_remote_cma.responder_resources;
897 goto retry;
899 rc = ep->rep_connected;
900 } else {
901 dprintk("RPC: %s: connected\n", __func__);
904 out:
905 if (rc)
906 ep->rep_connected = rc;
907 return rc;
911 * rpcrdma_ep_disconnect
913 * This is separate from destroy to facilitate the ability
914 * to reconnect without recreating the endpoint.
916 * This call is not reentrant, and must not be made in parallel
917 * on the same endpoint.
920 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
922 int rc;
924 rpcrdma_clean_cq(ep->rep_cq);
925 rc = rdma_disconnect(ia->ri_id);
926 if (!rc) {
927 /* returns without wait if not connected */
928 wait_event_interruptible(ep->rep_connect_wait,
929 ep->rep_connected != 1);
930 dprintk("RPC: %s: after wait, %sconnected\n", __func__,
931 (ep->rep_connected == 1) ? "still " : "dis");
932 } else {
933 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
934 ep->rep_connected = rc;
936 return rc;
940 * Initialize buffer memory
943 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
944 struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
946 char *p;
947 size_t len;
948 int i, rc;
949 struct rpcrdma_mw *r;
951 buf->rb_max_requests = cdata->max_requests;
952 spin_lock_init(&buf->rb_lock);
953 atomic_set(&buf->rb_credits, 1);
955 /* Need to allocate:
956 * 1. arrays for send and recv pointers
957 * 2. arrays of struct rpcrdma_req to fill in pointers
958 * 3. array of struct rpcrdma_rep for replies
959 * 4. padding, if any
960 * 5. mw's, fmr's or frmr's, if any
961 * Send/recv buffers in req/rep need to be registered
964 len = buf->rb_max_requests *
965 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
966 len += cdata->padding;
967 switch (ia->ri_memreg_strategy) {
968 case RPCRDMA_FRMR:
969 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
970 sizeof(struct rpcrdma_mw);
971 break;
972 case RPCRDMA_MTHCAFMR:
973 /* TBD we are perhaps overallocating here */
974 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
975 sizeof(struct rpcrdma_mw);
976 break;
977 case RPCRDMA_MEMWINDOWS_ASYNC:
978 case RPCRDMA_MEMWINDOWS:
979 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
980 sizeof(struct rpcrdma_mw);
981 break;
982 default:
983 break;
986 /* allocate 1, 4 and 5 in one shot */
987 p = kzalloc(len, GFP_KERNEL);
988 if (p == NULL) {
989 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
990 __func__, len);
991 rc = -ENOMEM;
992 goto out;
994 buf->rb_pool = p; /* for freeing it later */
996 buf->rb_send_bufs = (struct rpcrdma_req **) p;
997 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
998 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
999 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1002 * Register the zeroed pad buffer, if any.
1004 if (cdata->padding) {
1005 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1006 &ep->rep_pad_mr, &ep->rep_pad);
1007 if (rc)
1008 goto out;
1010 p += cdata->padding;
1013 * Allocate the fmr's, or mw's for mw_bind chunk registration.
1014 * We "cycle" the mw's in order to minimize rkey reuse,
1015 * and also reduce unbind-to-bind collision.
1017 INIT_LIST_HEAD(&buf->rb_mws);
1018 r = (struct rpcrdma_mw *)p;
1019 switch (ia->ri_memreg_strategy) {
1020 case RPCRDMA_FRMR:
1021 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1022 r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1023 RPCRDMA_MAX_SEGS);
1024 if (IS_ERR(r->r.frmr.fr_mr)) {
1025 rc = PTR_ERR(r->r.frmr.fr_mr);
1026 dprintk("RPC: %s: ib_alloc_fast_reg_mr"
1027 " failed %i\n", __func__, rc);
1028 goto out;
1030 r->r.frmr.fr_pgl =
1031 ib_alloc_fast_reg_page_list(ia->ri_id->device,
1032 RPCRDMA_MAX_SEGS);
1033 if (IS_ERR(r->r.frmr.fr_pgl)) {
1034 rc = PTR_ERR(r->r.frmr.fr_pgl);
1035 dprintk("RPC: %s: "
1036 "ib_alloc_fast_reg_page_list "
1037 "failed %i\n", __func__, rc);
1038 goto out;
1040 list_add(&r->mw_list, &buf->rb_mws);
1041 ++r;
1043 break;
1044 case RPCRDMA_MTHCAFMR:
1045 /* TBD we are perhaps overallocating here */
1046 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1047 static struct ib_fmr_attr fa =
1048 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1049 r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1050 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1051 &fa);
1052 if (IS_ERR(r->r.fmr)) {
1053 rc = PTR_ERR(r->r.fmr);
1054 dprintk("RPC: %s: ib_alloc_fmr"
1055 " failed %i\n", __func__, rc);
1056 goto out;
1058 list_add(&r->mw_list, &buf->rb_mws);
1059 ++r;
1061 break;
1062 case RPCRDMA_MEMWINDOWS_ASYNC:
1063 case RPCRDMA_MEMWINDOWS:
1064 /* Allocate one extra request's worth, for full cycling */
1065 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1066 r->r.mw = ib_alloc_mw(ia->ri_pd);
1067 if (IS_ERR(r->r.mw)) {
1068 rc = PTR_ERR(r->r.mw);
1069 dprintk("RPC: %s: ib_alloc_mw"
1070 " failed %i\n", __func__, rc);
1071 goto out;
1073 list_add(&r->mw_list, &buf->rb_mws);
1074 ++r;
1076 break;
1077 default:
1078 break;
1082 * Allocate/init the request/reply buffers. Doing this
1083 * using kmalloc for now -- one for each buf.
1085 for (i = 0; i < buf->rb_max_requests; i++) {
1086 struct rpcrdma_req *req;
1087 struct rpcrdma_rep *rep;
1089 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1090 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1091 /* Typical ~2400b, so rounding up saves work later */
1092 if (len < 4096)
1093 len = 4096;
1094 req = kmalloc(len, GFP_KERNEL);
1095 if (req == NULL) {
1096 dprintk("RPC: %s: request buffer %d alloc"
1097 " failed\n", __func__, i);
1098 rc = -ENOMEM;
1099 goto out;
1101 memset(req, 0, sizeof(struct rpcrdma_req));
1102 buf->rb_send_bufs[i] = req;
1103 buf->rb_send_bufs[i]->rl_buffer = buf;
1105 rc = rpcrdma_register_internal(ia, req->rl_base,
1106 len - offsetof(struct rpcrdma_req, rl_base),
1107 &buf->rb_send_bufs[i]->rl_handle,
1108 &buf->rb_send_bufs[i]->rl_iov);
1109 if (rc)
1110 goto out;
1112 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1114 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1115 rep = kmalloc(len, GFP_KERNEL);
1116 if (rep == NULL) {
1117 dprintk("RPC: %s: reply buffer %d alloc failed\n",
1118 __func__, i);
1119 rc = -ENOMEM;
1120 goto out;
1122 memset(rep, 0, sizeof(struct rpcrdma_rep));
1123 buf->rb_recv_bufs[i] = rep;
1124 buf->rb_recv_bufs[i]->rr_buffer = buf;
1125 init_waitqueue_head(&rep->rr_unbind);
1127 rc = rpcrdma_register_internal(ia, rep->rr_base,
1128 len - offsetof(struct rpcrdma_rep, rr_base),
1129 &buf->rb_recv_bufs[i]->rr_handle,
1130 &buf->rb_recv_bufs[i]->rr_iov);
1131 if (rc)
1132 goto out;
1135 dprintk("RPC: %s: max_requests %d\n",
1136 __func__, buf->rb_max_requests);
1137 /* done */
1138 return 0;
1139 out:
1140 rpcrdma_buffer_destroy(buf);
1141 return rc;
1145 * Unregister and destroy buffer memory. Need to deal with
1146 * partial initialization, so it's callable from failed create.
1147 * Must be called before destroying endpoint, as registrations
1148 * reference it.
1150 void
1151 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1153 int rc, i;
1154 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1155 struct rpcrdma_mw *r;
1157 /* clean up in reverse order from create
1158 * 1. recv mr memory (mr free, then kfree)
1159 * 1a. bind mw memory
1160 * 2. send mr memory (mr free, then kfree)
1161 * 3. padding (if any) [moved to rpcrdma_ep_destroy]
1162 * 4. arrays
1164 dprintk("RPC: %s: entering\n", __func__);
1166 for (i = 0; i < buf->rb_max_requests; i++) {
1167 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1168 rpcrdma_deregister_internal(ia,
1169 buf->rb_recv_bufs[i]->rr_handle,
1170 &buf->rb_recv_bufs[i]->rr_iov);
1171 kfree(buf->rb_recv_bufs[i]);
1173 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1174 while (!list_empty(&buf->rb_mws)) {
1175 r = list_entry(buf->rb_mws.next,
1176 struct rpcrdma_mw, mw_list);
1177 list_del(&r->mw_list);
1178 switch (ia->ri_memreg_strategy) {
1179 case RPCRDMA_FRMR:
1180 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1181 if (rc)
1182 dprintk("RPC: %s:"
1183 " ib_dereg_mr"
1184 " failed %i\n",
1185 __func__, rc);
1186 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1187 break;
1188 case RPCRDMA_MTHCAFMR:
1189 rc = ib_dealloc_fmr(r->r.fmr);
1190 if (rc)
1191 dprintk("RPC: %s:"
1192 " ib_dealloc_fmr"
1193 " failed %i\n",
1194 __func__, rc);
1195 break;
1196 case RPCRDMA_MEMWINDOWS_ASYNC:
1197 case RPCRDMA_MEMWINDOWS:
1198 rc = ib_dealloc_mw(r->r.mw);
1199 if (rc)
1200 dprintk("RPC: %s:"
1201 " ib_dealloc_mw"
1202 " failed %i\n",
1203 __func__, rc);
1204 break;
1205 default:
1206 break;
1209 rpcrdma_deregister_internal(ia,
1210 buf->rb_send_bufs[i]->rl_handle,
1211 &buf->rb_send_bufs[i]->rl_iov);
1212 kfree(buf->rb_send_bufs[i]);
1216 kfree(buf->rb_pool);
1220 * Get a set of request/reply buffers.
1222 * Reply buffer (if needed) is attached to send buffer upon return.
1223 * Rule:
1224 * rb_send_index and rb_recv_index MUST always be pointing to the
1225 * *next* available buffer (non-NULL). They are incremented after
1226 * removing buffers, and decremented *before* returning them.
1228 struct rpcrdma_req *
1229 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1231 struct rpcrdma_req *req;
1232 unsigned long flags;
1233 int i;
1234 struct rpcrdma_mw *r;
1236 spin_lock_irqsave(&buffers->rb_lock, flags);
1237 if (buffers->rb_send_index == buffers->rb_max_requests) {
1238 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1239 dprintk("RPC: %s: out of request buffers\n", __func__);
1240 return ((struct rpcrdma_req *)NULL);
1243 req = buffers->rb_send_bufs[buffers->rb_send_index];
1244 if (buffers->rb_send_index < buffers->rb_recv_index) {
1245 dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
1246 __func__,
1247 buffers->rb_recv_index - buffers->rb_send_index);
1248 req->rl_reply = NULL;
1249 } else {
1250 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1251 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1253 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1254 if (!list_empty(&buffers->rb_mws)) {
1255 i = RPCRDMA_MAX_SEGS - 1;
1256 do {
1257 r = list_entry(buffers->rb_mws.next,
1258 struct rpcrdma_mw, mw_list);
1259 list_del(&r->mw_list);
1260 req->rl_segments[i].mr_chunk.rl_mw = r;
1261 } while (--i >= 0);
1263 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1264 return req;
1268 * Put request/reply buffers back into pool.
1269 * Pre-decrement counter/array index.
1271 void
1272 rpcrdma_buffer_put(struct rpcrdma_req *req)
1274 struct rpcrdma_buffer *buffers = req->rl_buffer;
1275 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1276 int i;
1277 unsigned long flags;
1279 BUG_ON(req->rl_nchunks != 0);
1280 spin_lock_irqsave(&buffers->rb_lock, flags);
1281 buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1282 req->rl_niovs = 0;
1283 if (req->rl_reply) {
1284 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1285 init_waitqueue_head(&req->rl_reply->rr_unbind);
1286 req->rl_reply->rr_func = NULL;
1287 req->rl_reply = NULL;
1289 switch (ia->ri_memreg_strategy) {
1290 case RPCRDMA_FRMR:
1291 case RPCRDMA_MTHCAFMR:
1292 case RPCRDMA_MEMWINDOWS_ASYNC:
1293 case RPCRDMA_MEMWINDOWS:
1295 * Cycle mw's back in reverse order, and "spin" them.
1296 * This delays and scrambles reuse as much as possible.
1298 i = 1;
1299 do {
1300 struct rpcrdma_mw **mw;
1301 mw = &req->rl_segments[i].mr_chunk.rl_mw;
1302 list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1303 *mw = NULL;
1304 } while (++i < RPCRDMA_MAX_SEGS);
1305 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1306 &buffers->rb_mws);
1307 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1308 break;
1309 default:
1310 break;
1312 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1316 * Recover reply buffers from pool.
1317 * This happens when recovering from error conditions.
1318 * Post-increment counter/array index.
1320 void
1321 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1323 struct rpcrdma_buffer *buffers = req->rl_buffer;
1324 unsigned long flags;
1326 if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
1327 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1328 spin_lock_irqsave(&buffers->rb_lock, flags);
1329 if (buffers->rb_recv_index < buffers->rb_max_requests) {
1330 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1331 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1333 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1337 * Put reply buffers back into pool when not attached to
1338 * request. This happens in error conditions, and when
1339 * aborting unbinds. Pre-decrement counter/array index.
1341 void
1342 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1344 struct rpcrdma_buffer *buffers = rep->rr_buffer;
1345 unsigned long flags;
1347 rep->rr_func = NULL;
1348 spin_lock_irqsave(&buffers->rb_lock, flags);
1349 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1350 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1354 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1358 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1359 struct ib_mr **mrp, struct ib_sge *iov)
1361 struct ib_phys_buf ipb;
1362 struct ib_mr *mr;
1363 int rc;
1366 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1368 iov->addr = ib_dma_map_single(ia->ri_id->device,
1369 va, len, DMA_BIDIRECTIONAL);
1370 iov->length = len;
1372 if (ia->ri_have_dma_lkey) {
1373 *mrp = NULL;
1374 iov->lkey = ia->ri_dma_lkey;
1375 return 0;
1376 } else if (ia->ri_bind_mem != NULL) {
1377 *mrp = NULL;
1378 iov->lkey = ia->ri_bind_mem->lkey;
1379 return 0;
1382 ipb.addr = iov->addr;
1383 ipb.size = iov->length;
1384 mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1385 IB_ACCESS_LOCAL_WRITE, &iov->addr);
1387 dprintk("RPC: %s: phys convert: 0x%llx "
1388 "registered 0x%llx length %d\n",
1389 __func__, (unsigned long long)ipb.addr,
1390 (unsigned long long)iov->addr, len);
1392 if (IS_ERR(mr)) {
1393 *mrp = NULL;
1394 rc = PTR_ERR(mr);
1395 dprintk("RPC: %s: failed with %i\n", __func__, rc);
1396 } else {
1397 *mrp = mr;
1398 iov->lkey = mr->lkey;
1399 rc = 0;
1402 return rc;
1406 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1407 struct ib_mr *mr, struct ib_sge *iov)
1409 int rc;
1411 ib_dma_unmap_single(ia->ri_id->device,
1412 iov->addr, iov->length, DMA_BIDIRECTIONAL);
1414 if (NULL == mr)
1415 return 0;
1417 rc = ib_dereg_mr(mr);
1418 if (rc)
1419 dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
1420 return rc;
1424 * Wrappers for chunk registration, shared by read/write chunk code.
1427 static void
1428 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1430 seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1431 seg->mr_dmalen = seg->mr_len;
1432 if (seg->mr_page)
1433 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1434 seg->mr_page, offset_in_page(seg->mr_offset),
1435 seg->mr_dmalen, seg->mr_dir);
1436 else
1437 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1438 seg->mr_offset,
1439 seg->mr_dmalen, seg->mr_dir);
1442 static void
1443 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1445 if (seg->mr_page)
1446 ib_dma_unmap_page(ia->ri_id->device,
1447 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1448 else
1449 ib_dma_unmap_single(ia->ri_id->device,
1450 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1453 static int
1454 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1455 int *nsegs, int writing, struct rpcrdma_ia *ia,
1456 struct rpcrdma_xprt *r_xprt)
1458 struct rpcrdma_mr_seg *seg1 = seg;
1459 struct ib_send_wr frmr_wr, *bad_wr;
1460 u8 key;
1461 int len, pageoff;
1462 int i, rc;
1464 pageoff = offset_in_page(seg1->mr_offset);
1465 seg1->mr_offset -= pageoff; /* start of page */
1466 seg1->mr_len += pageoff;
1467 len = -pageoff;
1468 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1469 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1470 for (i = 0; i < *nsegs;) {
1471 rpcrdma_map_one(ia, seg, writing);
1472 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1473 len += seg->mr_len;
1474 ++seg;
1475 ++i;
1476 /* Check for holes */
1477 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1478 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1479 break;
1481 dprintk("RPC: %s: Using frmr %p to map %d segments\n",
1482 __func__, seg1->mr_chunk.rl_mw, i);
1484 /* Bump the key */
1485 key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1486 ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1488 /* Prepare FRMR WR */
1489 memset(&frmr_wr, 0, sizeof frmr_wr);
1490 frmr_wr.opcode = IB_WR_FAST_REG_MR;
1491 frmr_wr.send_flags = 0; /* unsignaled */
1492 frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
1493 frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1494 frmr_wr.wr.fast_reg.page_list_len = i;
1495 frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1496 frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1497 frmr_wr.wr.fast_reg.access_flags = (writing ?
1498 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1499 IB_ACCESS_REMOTE_READ);
1500 frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1501 DECR_CQCOUNT(&r_xprt->rx_ep);
1503 rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1505 if (rc) {
1506 dprintk("RPC: %s: failed ib_post_send for register,"
1507 " status %i\n", __func__, rc);
1508 while (i--)
1509 rpcrdma_unmap_one(ia, --seg);
1510 } else {
1511 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1512 seg1->mr_base = seg1->mr_dma + pageoff;
1513 seg1->mr_nsegs = i;
1514 seg1->mr_len = len;
1516 *nsegs = i;
1517 return rc;
1520 static int
1521 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1522 struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1524 struct rpcrdma_mr_seg *seg1 = seg;
1525 struct ib_send_wr invalidate_wr, *bad_wr;
1526 int rc;
1528 while (seg1->mr_nsegs--)
1529 rpcrdma_unmap_one(ia, seg++);
1531 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1532 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1533 invalidate_wr.send_flags = 0; /* unsignaled */
1534 invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1535 DECR_CQCOUNT(&r_xprt->rx_ep);
1537 rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1538 if (rc)
1539 dprintk("RPC: %s: failed ib_post_send for invalidate,"
1540 " status %i\n", __func__, rc);
1541 return rc;
1544 static int
1545 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1546 int *nsegs, int writing, struct rpcrdma_ia *ia)
1548 struct rpcrdma_mr_seg *seg1 = seg;
1549 u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1550 int len, pageoff, i, rc;
1552 pageoff = offset_in_page(seg1->mr_offset);
1553 seg1->mr_offset -= pageoff; /* start of page */
1554 seg1->mr_len += pageoff;
1555 len = -pageoff;
1556 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1557 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1558 for (i = 0; i < *nsegs;) {
1559 rpcrdma_map_one(ia, seg, writing);
1560 physaddrs[i] = seg->mr_dma;
1561 len += seg->mr_len;
1562 ++seg;
1563 ++i;
1564 /* Check for holes */
1565 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1566 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1567 break;
1569 rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1570 physaddrs, i, seg1->mr_dma);
1571 if (rc) {
1572 dprintk("RPC: %s: failed ib_map_phys_fmr "
1573 "%u@0x%llx+%i (%d)... status %i\n", __func__,
1574 len, (unsigned long long)seg1->mr_dma,
1575 pageoff, i, rc);
1576 while (i--)
1577 rpcrdma_unmap_one(ia, --seg);
1578 } else {
1579 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1580 seg1->mr_base = seg1->mr_dma + pageoff;
1581 seg1->mr_nsegs = i;
1582 seg1->mr_len = len;
1584 *nsegs = i;
1585 return rc;
1588 static int
1589 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1590 struct rpcrdma_ia *ia)
1592 struct rpcrdma_mr_seg *seg1 = seg;
1593 LIST_HEAD(l);
1594 int rc;
1596 list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1597 rc = ib_unmap_fmr(&l);
1598 while (seg1->mr_nsegs--)
1599 rpcrdma_unmap_one(ia, seg++);
1600 if (rc)
1601 dprintk("RPC: %s: failed ib_unmap_fmr,"
1602 " status %i\n", __func__, rc);
1603 return rc;
1606 static int
1607 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1608 int *nsegs, int writing, struct rpcrdma_ia *ia,
1609 struct rpcrdma_xprt *r_xprt)
1611 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1612 IB_ACCESS_REMOTE_READ);
1613 struct ib_mw_bind param;
1614 int rc;
1616 *nsegs = 1;
1617 rpcrdma_map_one(ia, seg, writing);
1618 param.mr = ia->ri_bind_mem;
1619 param.wr_id = 0ULL; /* no send cookie */
1620 param.addr = seg->mr_dma;
1621 param.length = seg->mr_len;
1622 param.send_flags = 0;
1623 param.mw_access_flags = mem_priv;
1625 DECR_CQCOUNT(&r_xprt->rx_ep);
1626 rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1627 if (rc) {
1628 dprintk("RPC: %s: failed ib_bind_mw "
1629 "%u@0x%llx status %i\n",
1630 __func__, seg->mr_len,
1631 (unsigned long long)seg->mr_dma, rc);
1632 rpcrdma_unmap_one(ia, seg);
1633 } else {
1634 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1635 seg->mr_base = param.addr;
1636 seg->mr_nsegs = 1;
1638 return rc;
1641 static int
1642 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1643 struct rpcrdma_ia *ia,
1644 struct rpcrdma_xprt *r_xprt, void **r)
1646 struct ib_mw_bind param;
1647 LIST_HEAD(l);
1648 int rc;
1650 BUG_ON(seg->mr_nsegs != 1);
1651 param.mr = ia->ri_bind_mem;
1652 param.addr = 0ULL; /* unbind */
1653 param.length = 0;
1654 param.mw_access_flags = 0;
1655 if (*r) {
1656 param.wr_id = (u64) (unsigned long) *r;
1657 param.send_flags = IB_SEND_SIGNALED;
1658 INIT_CQCOUNT(&r_xprt->rx_ep);
1659 } else {
1660 param.wr_id = 0ULL;
1661 param.send_flags = 0;
1662 DECR_CQCOUNT(&r_xprt->rx_ep);
1664 rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1665 rpcrdma_unmap_one(ia, seg);
1666 if (rc)
1667 dprintk("RPC: %s: failed ib_(un)bind_mw,"
1668 " status %i\n", __func__, rc);
1669 else
1670 *r = NULL; /* will upcall on completion */
1671 return rc;
1674 static int
1675 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1676 int *nsegs, int writing, struct rpcrdma_ia *ia)
1678 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1679 IB_ACCESS_REMOTE_READ);
1680 struct rpcrdma_mr_seg *seg1 = seg;
1681 struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1682 int len, i, rc = 0;
1684 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1685 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1686 for (len = 0, i = 0; i < *nsegs;) {
1687 rpcrdma_map_one(ia, seg, writing);
1688 ipb[i].addr = seg->mr_dma;
1689 ipb[i].size = seg->mr_len;
1690 len += seg->mr_len;
1691 ++seg;
1692 ++i;
1693 /* Check for holes */
1694 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1695 offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1696 break;
1698 seg1->mr_base = seg1->mr_dma;
1699 seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1700 ipb, i, mem_priv, &seg1->mr_base);
1701 if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1702 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1703 dprintk("RPC: %s: failed ib_reg_phys_mr "
1704 "%u@0x%llx (%d)... status %i\n",
1705 __func__, len,
1706 (unsigned long long)seg1->mr_dma, i, rc);
1707 while (i--)
1708 rpcrdma_unmap_one(ia, --seg);
1709 } else {
1710 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1711 seg1->mr_nsegs = i;
1712 seg1->mr_len = len;
1714 *nsegs = i;
1715 return rc;
1718 static int
1719 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1720 struct rpcrdma_ia *ia)
1722 struct rpcrdma_mr_seg *seg1 = seg;
1723 int rc;
1725 rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1726 seg1->mr_chunk.rl_mr = NULL;
1727 while (seg1->mr_nsegs--)
1728 rpcrdma_unmap_one(ia, seg++);
1729 if (rc)
1730 dprintk("RPC: %s: failed ib_dereg_mr,"
1731 " status %i\n", __func__, rc);
1732 return rc;
1736 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1737 int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1739 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1740 int rc = 0;
1742 switch (ia->ri_memreg_strategy) {
1744 #if RPCRDMA_PERSISTENT_REGISTRATION
1745 case RPCRDMA_ALLPHYSICAL:
1746 rpcrdma_map_one(ia, seg, writing);
1747 seg->mr_rkey = ia->ri_bind_mem->rkey;
1748 seg->mr_base = seg->mr_dma;
1749 seg->mr_nsegs = 1;
1750 nsegs = 1;
1751 break;
1752 #endif
1754 /* Registration using frmr registration */
1755 case RPCRDMA_FRMR:
1756 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1757 break;
1759 /* Registration using fmr memory registration */
1760 case RPCRDMA_MTHCAFMR:
1761 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1762 break;
1764 /* Registration using memory windows */
1765 case RPCRDMA_MEMWINDOWS_ASYNC:
1766 case RPCRDMA_MEMWINDOWS:
1767 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1768 break;
1770 /* Default registration each time */
1771 default:
1772 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1773 break;
1775 if (rc)
1776 return -1;
1778 return nsegs;
1782 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1783 struct rpcrdma_xprt *r_xprt, void *r)
1785 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1786 int nsegs = seg->mr_nsegs, rc;
1788 switch (ia->ri_memreg_strategy) {
1790 #if RPCRDMA_PERSISTENT_REGISTRATION
1791 case RPCRDMA_ALLPHYSICAL:
1792 BUG_ON(nsegs != 1);
1793 rpcrdma_unmap_one(ia, seg);
1794 rc = 0;
1795 break;
1796 #endif
1798 case RPCRDMA_FRMR:
1799 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1800 break;
1802 case RPCRDMA_MTHCAFMR:
1803 rc = rpcrdma_deregister_fmr_external(seg, ia);
1804 break;
1806 case RPCRDMA_MEMWINDOWS_ASYNC:
1807 case RPCRDMA_MEMWINDOWS:
1808 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1809 break;
1811 default:
1812 rc = rpcrdma_deregister_default_external(seg, ia);
1813 break;
1815 if (r) {
1816 struct rpcrdma_rep *rep = r;
1817 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1818 rep->rr_func = NULL;
1819 func(rep); /* dereg done, callback now */
1821 return nsegs;
1825 * Prepost any receive buffer, then post send.
1827 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1830 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1831 struct rpcrdma_ep *ep,
1832 struct rpcrdma_req *req)
1834 struct ib_send_wr send_wr, *send_wr_fail;
1835 struct rpcrdma_rep *rep = req->rl_reply;
1836 int rc;
1838 if (rep) {
1839 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1840 if (rc)
1841 goto out;
1842 req->rl_reply = NULL;
1845 send_wr.next = NULL;
1846 send_wr.wr_id = 0ULL; /* no send cookie */
1847 send_wr.sg_list = req->rl_send_iov;
1848 send_wr.num_sge = req->rl_niovs;
1849 send_wr.opcode = IB_WR_SEND;
1850 if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
1851 ib_dma_sync_single_for_device(ia->ri_id->device,
1852 req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1853 DMA_TO_DEVICE);
1854 ib_dma_sync_single_for_device(ia->ri_id->device,
1855 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1856 DMA_TO_DEVICE);
1857 ib_dma_sync_single_for_device(ia->ri_id->device,
1858 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1859 DMA_TO_DEVICE);
1861 if (DECR_CQCOUNT(ep) > 0)
1862 send_wr.send_flags = 0;
1863 else { /* Provider must take a send completion every now and then */
1864 INIT_CQCOUNT(ep);
1865 send_wr.send_flags = IB_SEND_SIGNALED;
1868 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1869 if (rc)
1870 dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
1871 rc);
1872 out:
1873 return rc;
1877 * (Re)post a receive buffer.
1880 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1881 struct rpcrdma_ep *ep,
1882 struct rpcrdma_rep *rep)
1884 struct ib_recv_wr recv_wr, *recv_wr_fail;
1885 int rc;
1887 recv_wr.next = NULL;
1888 recv_wr.wr_id = (u64) (unsigned long) rep;
1889 recv_wr.sg_list = &rep->rr_iov;
1890 recv_wr.num_sge = 1;
1892 ib_dma_sync_single_for_cpu(ia->ri_id->device,
1893 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1895 DECR_CQCOUNT(ep);
1896 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1898 if (rc)
1899 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
1900 rc);
1901 return rc;