2 * Unix SMB/CIFS implementation.
3 * Wrap Infiniband calls.
5 * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006
7 * Major code contributions by Peter Somogyi <psomogyi@gamax.hu>
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
28 #include <sys/types.h>
29 #include <netinet/in.h>
30 #include <sys/socket.h>
32 #include <arpa/inet.h>
38 #include "lib/events/events.h"
39 #include "ibwrapper.h"
41 #include <rdma/rdma_cma.h>
43 #include "ibwrapper_internal.h"
44 #include "lib/util/dlinklist.h"
/* Capacity, in bytes, of the module-wide "last error" text buffer. */
#define IBW_LASTERR_BUFSIZE 512

/* Text of the most recent error recorded by the functions in this file. */
static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
/* Forward declarations for helpers that are used before their definition. */

/* fd-event callback attached to a connection's completion channel. */
static void ibw_event_handler_verbs(struct event_context *ev,
				    struct fd_event *fde,
				    uint16_t flags,
				    void *private_data);

/* Posts the initial set of receive work requests for a connection. */
static int ibw_fill_cq(struct ibw_conn *conn);

/* Per-completion handlers invoked from ibw_event_handler_verbs(). */
static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
55 static void *ibw_alloc_mr(struct ibw_ctx_priv
*pctx
, struct ibw_conn_priv
*pconn
,
56 uint32_t n
, struct ibv_mr
**ppmr
)
60 DEBUG(10, ("ibw_alloc_mr(cmid=%u, n=%u)\n", (uint32_t)pconn
->cm_id
, n
));
61 buf
= memalign(pctx
->pagesize
, n
);
63 sprintf(ibw_lasterr
, "couldn't allocate memory\n");
67 *ppmr
= ibv_reg_mr(pctx
->pd
, buf
, n
, IBV_ACCESS_LOCAL_WRITE
);
69 sprintf(ibw_lasterr
, "couldn't allocate mr\n");
/**
 * Release a buffer/memory-region pair.
 *
 * Both slots are reset to NULL, so calling this again on the same pair is
 * harmless.
 *
 * @param ppbuf  address of the buffer pointer
 * @param ppmr   address of the memory region handle
 */
static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
{
	/* %p: a (uint32_t) cast would truncate the pointers on 64-bit targets */
	DEBUG(10, ("ibw_free_mr(%p %p)\n", (void *)*ppbuf, (void *)*ppmr));

	/* deregister first: the region still refers to the buffer */
	if (*ppmr != NULL) {
		ibv_dereg_mr(*ppmr);
		*ppmr = NULL;
	}
	if (*ppbuf != NULL) {
		free(*ppbuf);
		*ppbuf = NULL;
	}
}
90 static int ibw_init_memory(struct ibw_conn
*conn
)
92 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
93 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
94 struct ibw_opts
*opts
= &pctx
->opts
;
98 DEBUG(10, ("ibw_init_memory(cmid: %u)\n", (uint32_t)pconn
->cm_id
));
99 pconn
->buf_send
= ibw_alloc_mr(pctx
, pconn
,
100 opts
->max_send_wr
* opts
->avg_send_size
, &pconn
->mr_send
);
101 if (!pconn
->buf_send
) {
102 sprintf(ibw_lasterr
, "couldn't allocate work send buf\n");
106 pconn
->buf_recv
= ibw_alloc_mr(pctx
, pconn
,
107 opts
->max_recv_wr
* opts
->recv_bufsize
, &pconn
->mr_recv
);
108 if (!pconn
->buf_recv
) {
109 sprintf(ibw_lasterr
, "couldn't allocate work recv buf\n");
113 pconn
->wr_index
= talloc_size(pconn
, opts
->max_send_wr
* sizeof(struct ibw_wr
*));
114 assert(pconn
->wr_index
!=NULL
);
116 for(i
=0; i
<opts
->max_send_wr
; i
++) {
117 p
= pconn
->wr_index
[i
] = talloc_zero(pconn
, struct ibw_wr
);
118 p
->msg
= pconn
->buf_send
+ (i
* opts
->avg_send_size
);
119 p
->wr_id
= i
+ opts
->max_recv_wr
;
121 DLIST_ADD(pconn
->wr_list_avail
, p
);
127 static int ibw_ctx_priv_destruct(struct ibw_ctx_priv
*pctx
)
129 DEBUG(10, ("ibw_ctx_priv_destruct(%u)\n", (uint32_t)pctx
));
132 ibv_dealloc_pd(pctx
->pd
);
137 if (pctx
->cm_channel
) {
138 rdma_destroy_event_channel(pctx
->cm_channel
);
139 pctx
->cm_channel
= NULL
;
141 if (pctx
->cm_channel_event
) {
142 /* TODO: do we have to do this here? */
143 talloc_free(pctx
->cm_channel_event
);
144 pctx
->cm_channel_event
= NULL
;
147 rdma_destroy_id(pctx
->cm_id
);
/**
 * talloc destructor for struct ibw_ctx; it only traces the call.
 *
 * @param ctx  context being freed
 * @return always 0
 */
static int ibw_ctx_destruct(struct ibw_ctx *ctx)
{
	/* %p: a (uint32_t) cast would truncate the pointer on 64-bit targets */
	DEBUG(10, ("ibw_ctx_destruct(%p)\n", (void *)ctx));
	return 0;
}
160 static int ibw_conn_priv_destruct(struct ibw_conn_priv
*pconn
)
162 DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %u)\n",
163 (uint32_t)pconn
, (uint32_t)pconn
->cm_id
));
165 /* free memory regions */
166 ibw_free_mr(&pconn
->buf_send
, &pconn
->mr_send
);
167 ibw_free_mr(&pconn
->buf_recv
, &pconn
->mr_recv
);
169 /* pconn->wr_index is freed by talloc */
170 /* pconn->wr_index[i] are freed by talloc */
173 if (pconn
->cm_id
->qp
) {
174 ibv_destroy_qp(pconn
->cm_id
->qp
);
175 pconn
->cm_id
->qp
= NULL
;
178 ibv_destroy_cq(pconn
->cq
);
181 if (pconn
->verbs_channel
) {
182 ibv_destroy_comp_channel(pconn
->verbs_channel
);
183 pconn
->verbs_channel
= NULL
;
185 if (pconn
->verbs_channel_event
) {
186 /* TODO: do we have to do this here? */
187 talloc_free(pconn
->verbs_channel_event
);
188 pconn
->verbs_channel_event
= NULL
;
191 rdma_destroy_id(pconn
->cm_id
);
197 static int ibw_conn_destruct(struct ibw_conn
*conn
)
199 DEBUG(10, ("ibw_conn_destruct(%u)\n", (uint32_t)conn
));
201 /* important here: ctx is a talloc _parent_ */
202 DLIST_REMOVE(conn
->ctx
->conn_list
, conn
);
206 static struct ibw_conn
*ibw_conn_new(struct ibw_ctx
*ctx
)
208 struct ibw_conn
*conn
;
209 struct ibw_conn_priv
*pconn
;
211 conn
= talloc_zero(ctx
, struct ibw_conn
);
213 talloc_set_destructor(conn
, ibw_conn_destruct
);
215 pconn
= talloc_zero(ctx
, struct ibw_conn_priv
);
217 talloc_set_destructor(pconn
, ibw_conn_priv_destruct
);
221 DLIST_ADD(ctx
->conn_list
, conn
);
226 static int ibw_setup_cq_qp(struct ibw_conn
*conn
)
228 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
229 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
230 struct ibv_qp_init_attr init_attr
;
233 DEBUG(10, ("ibw_setup_cq_qp(cmid: %u)\n", (uint32_t)pconn
->cm_id
));
236 if (ibw_init_memory(conn
))
240 pconn
->verbs_channel
= ibv_create_comp_channel(pconn
->cm_id
->verbs
);
241 if (!pconn
->verbs_channel
) {
242 sprintf(ibw_lasterr
, "ibv_create_comp_channel failed %d\n", errno
);
245 DEBUG(10, ("created channel %p\n", pconn
->verbs_channel
));
247 pconn
->verbs_channel_event
= event_add_fd(pctx
->ectx
, conn
,
248 pconn
->verbs_channel
->fd
, EVENT_FD_READ
, ibw_event_handler_verbs
, conn
);
251 pconn
->cq
= ibv_create_cq(pconn
->cm_id
->verbs
,
252 pctx
->opts
.max_recv_wr
+ pctx
->opts
.max_send_wr
,
253 conn
, pconn
->verbs_channel
, 0);
254 if (pconn
->cq
==NULL
) {
255 sprintf(ibw_lasterr
, "ibv_create_cq failed\n");
259 rc
= ibv_req_notify_cq(pconn
->cq
, 0);
261 sprintf(ibw_lasterr
, "ibv_req_notify_cq failed with %d\n", rc
);
266 memset(&init_attr
, 0, sizeof(init_attr
));
267 init_attr
.cap
.max_send_wr
= pctx
->opts
.max_send_wr
;
268 init_attr
.cap
.max_recv_wr
= pctx
->opts
.max_recv_wr
;
269 init_attr
.cap
.max_recv_sge
= 1;
270 init_attr
.cap
.max_send_sge
= 1;
271 init_attr
.qp_type
= IBV_QPT_RC
;
272 init_attr
.send_cq
= pconn
->cq
;
273 init_attr
.recv_cq
= pconn
->cq
;
275 rc
= rdma_create_qp(pconn
->cm_id
, pctx
->pd
, &init_attr
);
277 sprintf(ibw_lasterr
, "rdma_create_qp failed with %d\n", rc
);
280 /* elase result is in pconn->cm_id->qp */
282 return ibw_fill_cq(conn
);
285 static int ibw_refill_cq_recv(struct ibw_conn
*conn
)
287 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
288 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
290 struct ibv_sge list
= {
291 .addr
= (uintptr_t) NULL
,
292 .length
= pctx
->opts
.recv_bufsize
,
293 .lkey
= pconn
->mr_recv
->lkey
295 struct ibv_recv_wr wr
= {
300 struct ibv_recv_wr
*bad_wr
;
302 DEBUG(10, ("ibw_refill_cq_recv(cmid: %u)\n", (uint32_t)pconn
->cm_id
));
304 list
.addr
= (uintptr_t) pconn
->buf_recv
+ pctx
->opts
.recv_bufsize
* pconn
->recv_index
;
305 wr
.wr_id
= pconn
->recv_index
;
306 pconn
->recv_index
= (pconn
->recv_index
+ 1) % pctx
->opts
.max_recv_wr
;
308 rc
= ibv_post_recv(pconn
->cm_id
->qp
, &wr
, &bad_wr
);
310 sprintf(ibw_lasterr
, "ibv_post_recv failed with %d\n", rc
);
311 DEBUG(0, (ibw_lasterr
));
318 static int ibw_fill_cq(struct ibw_conn
*conn
)
320 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
321 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
323 struct ibv_sge list
= {
324 .addr
= (uintptr_t) NULL
,
325 .length
= pctx
->opts
.recv_bufsize
,
326 .lkey
= pconn
->mr_recv
->lkey
328 struct ibv_recv_wr wr
= {
333 struct ibv_recv_wr
*bad_wr
;
335 DEBUG(10, ("ibw_fill_cq(cmid: %u)\n", (uint32_t)pconn
->cm_id
));
337 for(i
= pctx
->opts
.max_recv_wr
; i
!=0; i
--) {
338 list
.addr
= (uintptr_t) pconn
->buf_recv
+ pctx
->opts
.recv_bufsize
* pconn
->recv_index
;
339 wr
.wr_id
= pconn
->recv_index
;
340 pconn
->recv_index
= (pconn
->recv_index
+ 1) % pctx
->opts
.max_recv_wr
;
342 rc
= ibv_post_recv(pconn
->cm_id
->qp
, &wr
, &bad_wr
);
344 sprintf(ibw_lasterr
, "ibv_post_recv failed with %d\n", rc
);
345 DEBUG(0, (ibw_lasterr
));
353 static int ibw_manage_connect(struct ibw_conn
*conn
, struct rdma_cm_id
*cma_id
)
355 struct rdma_conn_param conn_param
;
358 DEBUG(10, ("ibw_manage_connect(cmid: %u)", (uint32_t)cma_id
));
359 rc
= ibw_setup_cq_qp(conn
);
364 memset(&conn_param
, 0, sizeof conn_param
);
365 conn_param
.responder_resources
= 1;
366 conn_param
.initiator_depth
= 1;
367 conn_param
.retry_count
= 10;
369 rc
= rdma_connect(cma_id
, &conn_param
);
371 sprintf(ibw_lasterr
, "rdma_connect error %d\n", rc
);
376 static void ibw_event_handler_cm(struct event_context
*ev
,
377 struct fd_event
*fde
, uint16_t flags
, void *private_data
)
380 struct ibw_ctx
*ctx
= talloc_get_type(private_data
, struct ibw_ctx
);
381 struct ibw_ctx_priv
*pctx
= talloc_get_type(ctx
->internal
, struct ibw_ctx_priv
);
382 struct ibw_conn
*conn
= NULL
;
383 struct ibw_conn_priv
*pconn
= NULL
;
384 struct rdma_cm_id
*cma_id
= NULL
;
385 struct rdma_cm_event
*event
= NULL
;
389 rc
= rdma_get_cm_event(pctx
->cm_channel
, &event
);
391 ctx
->state
= IBWS_ERROR
;
392 sprintf(ibw_lasterr
, "rdma_get_cm_event error %d\n", rc
);
397 DEBUG(10, ("cma_event type %d cma_id %p (%s)\n", event
->event
, cma_id
,
398 (cma_id
== pctx
->cm_id
) ? "parent" : "child"));
400 switch (event
->event
) {
401 case RDMA_CM_EVENT_ADDR_RESOLVED
:
402 /* continuing from ibw_connect ... */
403 rc
= rdma_resolve_route(cma_id
, 2000);
405 sprintf(ibw_lasterr
, "rdma_resolve_route error %d\n", rc
);
408 /* continued at RDMA_CM_EVENT_ROUTE_RESOLVED */
411 case RDMA_CM_EVENT_ROUTE_RESOLVED
:
412 /* after RDMA_CM_EVENT_ADDR_RESOLVED: */
413 assert(cma_id
->context
!=NULL
);
414 conn
= talloc_get_type(cma_id
->context
, struct ibw_conn
);
416 rc
= ibw_manage_connect(conn
, cma_id
);
422 case RDMA_CM_EVENT_CONNECT_REQUEST
:
423 ctx
->state
= IBWS_CONNECT_REQUEST
;
424 conn
= ibw_conn_new(ctx
);
425 pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
426 pconn
->cm_id
= cma_id
; /* !!! event will be freed but id not */
427 cma_id
->context
= (void *)conn
;
428 DEBUG(10, ("pconn->cm_id %p\n", pconn
->cm_id
));
430 conn
->state
= IBWC_INIT
;
431 pctx
->connstate_func(ctx
, conn
);
433 /* continued at ibw_accept when invoked by the func above */
434 if (!pconn
->is_accepted
) {
436 DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn
->cm_id
));
438 if (ibw_setup_cq_qp(conn
))
442 /* TODO: clarify whether if it's needed by upper layer: */
443 ctx
->state
= IBWS_READY
;
444 pctx
->connstate_func(ctx
, NULL
);
446 /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */
449 case RDMA_CM_EVENT_ESTABLISHED
:
450 /* expected after ibw_accept and ibw_connect[not directly] */
451 DEBUG(0, ("ESTABLISHED (conn: %u)\n", (unsigned int)cma_id
->context
));
452 conn
= talloc_get_type(cma_id
->context
, struct ibw_conn
);
453 assert(conn
!=NULL
); /* important assumption */
455 /* client conn is up */
456 conn
->state
= IBWC_CONNECTED
;
458 /* both ctx and conn have changed */
459 pctx
->connstate_func(ctx
, conn
);
462 case RDMA_CM_EVENT_ADDR_ERROR
:
463 case RDMA_CM_EVENT_ROUTE_ERROR
:
464 case RDMA_CM_EVENT_CONNECT_ERROR
:
465 case RDMA_CM_EVENT_UNREACHABLE
:
466 case RDMA_CM_EVENT_REJECTED
:
467 sprintf(ibw_lasterr
, "cma event %d, error %d\n", event
->event
, event
->status
);
470 case RDMA_CM_EVENT_DISCONNECTED
:
471 if (cma_id
!=pctx
->cm_id
) {
472 DEBUG(0, ("client DISCONNECT event\n"));
473 conn
= talloc_get_type(cma_id
->context
, struct ibw_conn
);
474 conn
->state
= IBWC_DISCONNECTED
;
475 pctx
->connstate_func(NULL
, conn
);
479 /* if we are the last... */
480 if (ctx
->conn_list
==NULL
)
481 rdma_disconnect(pctx
->cm_id
);
483 DEBUG(0, ("server DISCONNECT event\n"));
484 ctx
->state
= IBWS_STOPPED
; /* ??? TODO: try it... */
485 /* talloc_free(ctx) should be called within or after this func */
486 pctx
->connstate_func(ctx
, NULL
);
490 case RDMA_CM_EVENT_DEVICE_REMOVAL
:
491 sprintf(ibw_lasterr
, "cma detected device removal!\n");
495 sprintf(ibw_lasterr
, "unknown event %d\n", event
->event
);
499 if ((rc
=rdma_ack_cm_event(event
))) {
500 sprintf(ibw_lasterr
, "rdma_ack_cm_event failed with %d\n", rc
);
506 DEBUG(0, ("cm event handler: %s", ibw_lasterr
));
507 if (cma_id
!=pctx
->cm_id
) {
508 conn
= talloc_get_type(cma_id
->context
, struct ibw_conn
);
510 conn
->state
= IBWC_ERROR
;
511 pctx
->connstate_func(NULL
, conn
);
513 ctx
->state
= IBWS_ERROR
;
514 pctx
->connstate_func(ctx
, NULL
);
518 static void ibw_event_handler_verbs(struct event_context
*ev
,
519 struct fd_event
*fde
, uint16_t flags
, void *private_data
)
521 struct ibw_conn
*conn
= talloc_get_type(private_data
, struct ibw_conn
);
522 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
523 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
527 struct ibv_cq
*ev_cq
;
530 DEBUG(10, ("ibw_event_handler_verbs(%u)\n", (uint32_t)flags
));
532 /* TODO: check whether if it's good to have more channels here... */
533 rc
= ibv_get_cq_event(pconn
->verbs_channel
, &ev_cq
, &ev_ctx
);
535 sprintf(ibw_lasterr
, "Failed to get cq_event with %d\n", rc
);
538 if (ev_cq
!= pconn
->cq
) {
539 sprintf(ibw_lasterr
, "ev_cq(%u) != pconn->cq(%u)\n",
540 (unsigned int)ev_cq
, (unsigned int)pconn
->cq
);
543 rc
= ibv_req_notify_cq(pconn
->cq
, 0);
545 sprintf(ibw_lasterr
, "Couldn't request CQ notification (%d)\n", rc
);
549 while((rc
=ibv_poll_cq(pconn
->cq
, 1, &wc
))==1) {
551 sprintf(ibw_lasterr
, "cq completion failed status %d\n",
558 DEBUG(10, ("send completion\n"));
559 if (ibw_wc_send(conn
, &wc
))
563 case IBV_WC_RDMA_WRITE
:
564 DEBUG(10, ("rdma write completion\n"));
567 case IBV_WC_RDMA_READ
:
568 DEBUG(10, ("rdma read completion\n"));
572 DEBUG(10, ("recv completion\n"));
573 if (ibw_wc_recv(conn
, &wc
))
578 sprintf(ibw_lasterr
, "unknown completion %d\n", wc
.opcode
);
583 sprintf(ibw_lasterr
, "ibv_poll_cq error %d\n", rc
);
589 DEBUG(0, (ibw_lasterr
));
590 conn
->state
= IBWC_ERROR
;
591 pctx
->connstate_func(NULL
, conn
);
594 static inline int ibw_wc_send(struct ibw_conn
*conn
, struct ibv_wc
*wc
)
596 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
597 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
601 DEBUG(10, ("ibw_wc_send(cmid: %u, wr_id: %u, bl: %u)\n",
602 (uint32_t)pconn
->cm_id
, (uint32_t)wc
->wr_id
, (uint32_t)wc
->byte_len
));
604 assert(pconn
->cm_id
->qp
->qp_num
==wc
->qp_num
);
605 assert(wc
->wr_id
> pctx
->opts
.max_recv_wr
);
606 send_index
= wc
->wr_id
- pctx
->opts
.max_recv_wr
;
609 if (send_index
< pctx
->opts
.max_send_wr
) {
610 DEBUG(10, ("ibw_wc_send#1 %u", (int)wc
->wr_id
));
611 p
= pconn
->wr_index
[send_index
];
613 ibw_free_mr(&p
->msg_large
, &p
->mr_large
);
614 DLIST_REMOVE(pconn
->wr_list_used
, p
);
615 DLIST_ADD(pconn
->wr_list_avail
, p
);
616 } else { /* "extra" request - not optimized */
617 DEBUG(10, ("ibw_wc_send#2 %u", (int)wc
->wr_id
));
618 for(p
=pconn
->extra_sent
; p
!=NULL
; p
=p
->next
)
619 if (p
->wr_id
==(int)wc
->wr_id
)
622 sprintf(ibw_lasterr
, "failed to find wr_id %d\n", (int)wc
->wr_id
);
625 ibw_free_mr(&p
->msg_large
, &p
->mr_large
);
626 DLIST_REMOVE(pconn
->extra_sent
, p
);
627 DLIST_ADD(pconn
->extra_avail
, p
);
633 DEBUG(10, ("ibw_wc_send#queue %u", (int)wc
->wr_id
));
635 DLIST_REMOVE(pconn
->queue
, p
);
637 buf
= (p
->msg_large
!=NULL
) ? p
->msg_large
: p
->msg
;
638 ibw_send(conn
, buf
, p
, ntohl(*(uint32_t *)buf
));
644 static inline int ibw_append_to_part(struct ibw_conn_priv
*pconn
,
645 struct ibw_part
*part
, char **pp
, uint32_t add_len
, int info
)
647 DEBUG(10, ("ibw_append_to_part: cmid=%u, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n",
648 (uint32_t)pconn
->cm_id
, part
->bufsize
, part
->len
, part
->to_read
, add_len
, info
));
650 /* allocate more if necessary - it's an "evergrowing" buffer... */
651 if (part
->len
+ add_len
> part
->bufsize
) {
652 if (part
->buf
==NULL
) {
653 assert(part
->len
==0);
654 part
->buf
= talloc_size(pconn
, add_len
);
655 if (part
->buf
==NULL
) {
656 sprintf(ibw_lasterr
, "recv talloc_size error (%u) #%d\n",
660 part
->bufsize
= add_len
;
662 part
->buf
= talloc_realloc_size(pconn
,
663 part
->buf
, part
->len
+ add_len
);
664 if (part
->buf
==NULL
) {
665 sprintf(ibw_lasterr
, "recv realloc error (%u + %u) #%d\n",
666 part
->len
, add_len
, info
);
670 part
->bufsize
= part
->len
+ add_len
;
674 memcpy(part
->buf
+ part
->len
, *pp
, add_len
);
676 part
->len
+= add_len
;
677 part
->to_read
-= add_len
;
682 static inline int ibw_wc_mem_threshold(struct ibw_conn_priv
*pconn
,
683 struct ibw_part
*part
, uint32_t threshold
)
685 DEBUG(10, ("ibw_wc_mem_threshold: cmid=%u, (bs=%u, len=%u, tr=%u), thr=%u\n",
686 (uint32_t)pconn
->cm_id
, part
->bufsize
, part
->len
, part
->to_read
, threshold
));
688 if (part
->bufsize
> threshold
) {
689 DEBUG(3, ("ibw_wc_mem_threshold: cmid=%u, %u > %u\n",
690 (uint32_t)pconn
->cm_id
, part
->bufsize
, threshold
));
691 talloc_free(part
->buf
);
692 part
->buf
= talloc_size(pconn
, threshold
);
693 if (part
->buf
==NULL
) {
694 sprintf(ibw_lasterr
, "talloc_size failed\n");
697 part
->bufsize
= threshold
;
702 static inline int ibw_wc_recv(struct ibw_conn
*conn
, struct ibv_wc
*wc
)
704 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
705 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
706 struct ibw_part
*part
= &pconn
->part
;
708 uint32_t remain
= wc
->byte_len
;
710 DEBUG(10, ("ibw_wc_recv: cmid=%u, wr_id: %u, bl: %u\n",
711 (uint32_t)pconn
->cm_id
, (uint32_t)wc
->wr_id
, remain
));
713 assert(pconn
->cm_id
->qp
->qp_num
==wc
->qp_num
);
714 assert((int)wc
->wr_id
< pctx
->opts
.max_recv_wr
);
715 assert(wc
->byte_len
<= pctx
->opts
.recv_bufsize
);
717 p
= pconn
->buf_recv
+ ((int)wc
->wr_id
* pctx
->opts
.recv_bufsize
);
720 /* here always true: (part->len!=0 && part->to_read!=0) ||
721 (part->len==0 && part->to_read==0) */
722 if (part
->len
) { /* is there a partial msg to be continued? */
723 int read_len
= (part
->to_read
<=remain
) ? part
->to_read
: remain
;
724 if (ibw_append_to_part(pconn
, part
, &p
, read_len
, 421))
728 if (part
->len
<=sizeof(uint32_t) && part
->to_read
==0) {
729 assert(part
->len
==sizeof(uint32_t));
730 /* set it again now... */
731 part
->to_read
= ntohl(*((uint32_t *)(part
->buf
)));
732 if (part
->to_read
<sizeof(uint32_t)) {
733 sprintf(ibw_lasterr
, "got msglen=%u #2\n", part
->to_read
);
736 part
->to_read
-= sizeof(uint32_t); /* it's already read */
739 if (part
->to_read
==0) {
740 pctx
->receive_func(conn
, part
->buf
, part
->len
);
741 part
->len
= 0; /* tells not having partial data (any more) */
742 if (ibw_wc_mem_threshold(pconn
, part
, pctx
->opts
.recv_threshold
))
746 if (remain
>=sizeof(uint32_t)) {
747 uint32_t msglen
= ntohl(*(uint32_t *)p
);
748 if (msglen
<sizeof(uint32_t)) {
749 sprintf(ibw_lasterr
, "got msglen=%u\n", msglen
);
753 /* mostly awaited case: */
754 if (msglen
<=remain
) {
755 pctx
->receive_func(conn
, p
, msglen
);
759 part
->to_read
= msglen
;
760 /* part->len is already 0 */
761 if (ibw_append_to_part(pconn
, part
, &p
, remain
, 422))
763 remain
= 0; /* to be continued ... */
764 /* part->to_read > 0 here */
766 } else { /* edge case: */
767 part
->to_read
= sizeof(uint32_t);
768 /* part->len is already 0 */
769 if (ibw_append_to_part(pconn
, part
, &p
, remain
, 423))
772 /* part->to_read > 0 here */
775 } /* <remain> is always decreased at least by 1 */
777 if (ibw_refill_cq_recv(conn
))
783 DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr
));
787 static int ibw_process_init_attrs(struct ibw_initattr
*attr
, int nattr
, struct ibw_opts
*opts
)
790 const char *name
, *value
;
792 DEBUG(10, ("ibw_process_init_attrs: nattr: %d\n", nattr
));
794 opts
->max_send_wr
= 256;
795 opts
->max_recv_wr
= 1024;
796 opts
->avg_send_size
= 1024;
797 opts
->recv_bufsize
= 256;
798 opts
->recv_threshold
= 1 * 1024 * 1024;
800 for(i
=0; i
<nattr
; i
++) {
802 value
= attr
[i
].value
;
804 assert(name
!=NULL
&& value
!=NULL
);
805 if (strcmp(name
, "max_send_wr")==0)
806 opts
->max_send_wr
= atoi(value
);
807 else if (strcmp(name
, "max_recv_wr")==0)
808 opts
->max_recv_wr
= atoi(value
);
809 else if (strcmp(name
, "avg_send_size")==0)
810 opts
->avg_send_size
= atoi(value
);
811 else if (strcmp(name
, "recv_bufsize")==0)
812 opts
->recv_bufsize
= atoi(value
);
813 else if (strcmp(name
, "recv_threshold")==0)
814 opts
->recv_threshold
= atoi(value
);
816 sprintf(ibw_lasterr
, "ibw_init: unknown name %s\n", name
);
823 struct ibw_ctx
*ibw_init(struct ibw_initattr
*attr
, int nattr
,
825 ibw_connstate_fn_t ibw_connstate
,
826 ibw_receive_fn_t ibw_receive
,
827 struct event_context
*ectx
)
829 struct ibw_ctx
*ctx
= talloc_zero(NULL
, struct ibw_ctx
);
830 struct ibw_ctx_priv
*pctx
;
833 DEBUG(10, ("ibw_init(ctx_userdata: %u, ectx: %u)\n",
834 (uint32_t)ctx_userdata
, (uint32_t)ectx
));
836 /* initialize basic data structures */
837 memset(ibw_lasterr
, 0, IBW_LASTERR_BUFSIZE
);
840 ibw_lasterr
[0] = '\0';
841 talloc_set_destructor(ctx
, ibw_ctx_destruct
);
842 ctx
->ctx_userdata
= ctx_userdata
;
844 pctx
= talloc_zero(ctx
, struct ibw_ctx_priv
);
845 talloc_set_destructor(pctx
, ibw_ctx_priv_destruct
);
846 ctx
->internal
= (void *)pctx
;
849 pctx
->connstate_func
= ibw_connstate
;
850 pctx
->receive_func
= ibw_receive
;
854 /* process attributes */
855 if (ibw_process_init_attrs(attr
, nattr
, &pctx
->opts
))
859 pctx
->cm_channel
= rdma_create_event_channel();
860 if (!pctx
->cm_channel
) {
861 sprintf(ibw_lasterr
, "rdma_create_event_channel error %d\n", errno
);
865 pctx
->cm_channel_event
= event_add_fd(pctx
->ectx
, pctx
,
866 pctx
->cm_channel
->fd
, EVENT_FD_READ
, ibw_event_handler_cm
, ctx
);
868 rc
= rdma_create_id(pctx
->cm_channel
, &pctx
->cm_id
, ctx
, RDMA_PS_TCP
);
871 sprintf(ibw_lasterr
, "rdma_create_id error %d\n", rc
);
874 DEBUG(10, ("created cm_id %p\n", pctx
->cm_id
));
877 pctx
->pd
= ibv_alloc_pd(pctx
->cm_id
->verbs
);
879 sprintf(ibw_lasterr
, "ibv_alloc_pd failed %d\n", errno
);
882 DEBUG(10, ("created pd %p\n", pctx
->pd
));
884 pctx
->pagesize
= sysconf(_SC_PAGESIZE
);
887 /* don't put code here */
889 DEBUG(0, (ibw_lasterr
));
897 int ibw_stop(struct ibw_ctx
*ctx
)
901 DEBUG(10, ("ibw_stop\n"));
902 for(p
=ctx
->conn_list
; p
!=NULL
; p
=p
->next
) {
903 if (ctx
->state
==IBWC_ERROR
|| ctx
->state
==IBWC_CONNECTED
) {
904 if (ibw_disconnect(p
))
912 int ibw_bind(struct ibw_ctx
*ctx
, struct sockaddr_in
*my_addr
)
914 struct ibw_ctx_priv
*pctx
= (struct ibw_ctx_priv
*)ctx
->internal
;
917 DEBUG(10, ("ibw_bind: addr=%s, port=%u\n",
918 inet_ntoa(my_addr
->sin_addr
), my_addr
->sin_port
));
919 rc
= rdma_bind_addr(pctx
->cm_id
, (struct sockaddr
*) my_addr
);
921 sprintf(ibw_lasterr
, "rdma_bind_addr error %d\n", rc
);
922 DEBUG(0, (ibw_lasterr
));
925 DEBUG(10, ("rdma_bind_addr successful\n"));
930 int ibw_listen(struct ibw_ctx
*ctx
, int backlog
)
932 struct ibw_ctx_priv
*pctx
= talloc_get_type(ctx
->internal
, struct ibw_ctx_priv
);
935 DEBUG(10, ("ibw_listen\n"));
936 rc
= rdma_listen(pctx
->cm_id
, backlog
);
938 sprintf(ibw_lasterr
, "rdma_listen failed: %d\n", rc
);
939 DEBUG(0, (ibw_lasterr
));
946 int ibw_accept(struct ibw_ctx
*ctx
, struct ibw_conn
*conn
, void *conn_userdata
)
948 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
949 struct rdma_conn_param conn_param
;
952 DEBUG(10, ("ibw_accept: cmid=%u\n", (uint32_t)pconn
->cm_id
));
953 conn
->conn_userdata
= conn_userdata
;
955 memset(&conn_param
, 0, sizeof(struct rdma_conn_param
));
956 conn_param
.responder_resources
= 1;
957 conn_param
.initiator_depth
= 1;
958 rc
= rdma_accept(pconn
->cm_id
, &conn_param
);
960 sprintf(ibw_lasterr
, "rdma_accept failed %d\n", rc
);
961 DEBUG(0, (ibw_lasterr
));
965 pconn
->is_accepted
= 1;
967 /* continued at RDMA_CM_EVENT_ESTABLISHED */
972 int ibw_connect(struct ibw_ctx
*ctx
, struct sockaddr_in
*serv_addr
, void *conn_userdata
)
974 struct ibw_ctx_priv
*pctx
= talloc_get_type(ctx
->internal
, struct ibw_ctx_priv
);
975 struct ibw_conn
*conn
= NULL
;
976 struct ibw_conn_priv
*pconn
= NULL
;
979 DEBUG(10, ("ibw_connect: cmid=%u, addr=%s, port=%u\n", (uint32_t)pconn
->cm_id
,
980 inet_ntoa(serv_addr
->sin_addr
), serv_addr
->sin_port
));
981 conn
= ibw_conn_new(ctx
);
982 conn
->conn_userdata
= conn_userdata
;
983 pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
985 rc
= rdma_create_id(pctx
->cm_channel
, &pconn
->cm_id
, conn
, RDMA_PS_TCP
);
988 sprintf(ibw_lasterr
, "rdma_create_id error %d\n", rc
);
992 rc
= rdma_resolve_addr(pconn
->cm_id
, NULL
, (struct sockaddr
*) &serv_addr
, 2000);
994 sprintf(ibw_lasterr
, "rdma_resolve_addr error %d\n", rc
);
995 DEBUG(0, (ibw_lasterr
));
999 /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */
1004 int ibw_disconnect(struct ibw_conn
*conn
)
1007 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
1008 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
1010 DEBUG(10, ("ibw_disconnect: cmid=%u\n", (uint32_t)pconn
->cm_id
));
1012 rc
= rdma_disconnect(pctx
->cm_id
);
1014 sprintf(ibw_lasterr
, "ibw_disconnect failed with %d", rc
);
1015 DEBUG(0, (ibw_lasterr
));
1019 /* continued at RDMA_CM_EVENT_DISCONNECTED */
1024 int ibw_alloc_send_buf(struct ibw_conn
*conn
, void **buf
, void **key
, uint32_t len
)
1026 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
1027 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
1028 struct ibw_wr
*p
= pconn
->wr_list_avail
;
1031 DEBUG(10, ("ibw_alloc_send_buf#1: cmid=%u, len=%d\n", (uint32_t)pconn
->cm_id
, len
));
1033 DLIST_REMOVE(pconn
->wr_list_avail
, p
);
1034 DLIST_ADD(pconn
->wr_list_used
, p
);
1036 if (len
+ sizeof(uint32_t) <= pctx
->opts
.avg_send_size
) {
1037 *buf
= (void *)(p
->msg
+ sizeof(uint32_t));
1039 p
->msg_large
= ibw_alloc_mr(pctx
, pconn
, len
+ sizeof(uint32_t), &p
->mr_large
);
1040 if (!p
->msg_large
) {
1041 sprintf(ibw_lasterr
, "ibw_alloc_mr#1 failed\n");
1044 *buf
= (void *)(p
->msg_large
+ sizeof(uint32_t));
1047 DEBUG(10, ("ibw_alloc_send_buf#2: cmid=%u, len=%d\n", (uint32_t)pconn
->cm_id
, len
));
1049 p
= pconn
->extra_avail
;
1051 p
= pconn
->extra_avail
= talloc_zero(pconn
, struct ibw_wr
);
1053 sprintf(ibw_lasterr
, "talloc_zero failed (emax: %u)", pconn
->extra_max
);
1056 p
->wr_id
= pctx
->opts
.max_send_wr
+ pconn
->extra_max
;
1058 switch(pconn
->extra_max
) {
1059 case 1: DEBUG(2, ("warning: queue performed\n")); break;
1060 case 10: DEBUG(0, ("warning: queue reached 10\n")); break;
1061 case 100: DEBUG(0, ("warning: queue reached 100\n")); break;
1062 case 1000: DEBUG(0, ("warning: queue reached 1000\n")); break;
1066 DLIST_REMOVE(pconn
->extra_avail
, p
);
1068 p
->msg_large
= ibw_alloc_mr(pctx
, pconn
, len
+ sizeof(uint32_t), &p
->mr_large
);
1069 if (!p
->msg_large
) {
1070 sprintf(ibw_lasterr
, "ibw_alloc_mr#2 failed");
1073 *buf
= (void *)(p
->msg_large
+ sizeof(uint32_t));
1080 DEBUG(0, ("ibw_alloc_send_buf error: %s\n", ibw_lasterr
));
1085 int ibw_send(struct ibw_conn
*conn
, void *buf
, void *key
, uint32_t len
)
1087 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
1088 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
1089 struct ibw_wr
*p
= talloc_get_type(key
, struct ibw_wr
);
1092 *((uint32_t *)buf
) = htonl(len
);
1094 /* can we send it right now? */
1095 if (pconn
->wr_sent
<=pctx
->opts
.max_send_wr
) {
1096 struct ibv_sge list
= {
1097 .addr
= (uintptr_t) NULL
,
1101 struct ibv_send_wr wr
= {
1102 .wr_id
= p
->wr_id
+ pctx
->opts
.max_recv_wr
,
1105 .opcode
= IBV_WR_SEND
,
1106 .send_flags
= IBV_SEND_SIGNALED
,
1108 struct ibv_send_wr
*bad_wr
;
1110 DEBUG(10, ("ibw_wc_send#1(cmid: %u, wrid: %u, n: %d)\n",
1111 (uint32_t)pconn
->cm_id
, (uint32_t)wr
.wr_id
, len
));
1113 if (p
->msg_large
==NULL
) {
1114 list
.lkey
= pconn
->mr_send
->lkey
;
1115 list
.addr
= (uintptr_t) p
->msg
;
1117 assert(p
->mr_large
!=NULL
);
1118 list
.lkey
= p
->mr_large
->lkey
;
1119 list
.addr
= (uintptr_t) p
->msg_large
;
1122 rc
= ibv_post_send(pconn
->cm_id
->qp
, &wr
, &bad_wr
);
1124 sprintf(ibw_lasterr
, "ibv_post_send error %d (%d)\n",
1125 rc
, pconn
->wr_sent
);
1126 DEBUG(0, (ibw_lasterr
));
1129 if (p
->wr_id
>=pctx
->opts
.max_send_wr
) {
1130 /* we don't have prepared index for this, so that
1131 * we will have to find this later on */
1132 DLIST_ADD(pconn
->extra_sent
, p
);
1137 } /* else put the request into our own queue: */
1139 DEBUG(10, ("ibw_wc_send#2(cmid: %u, len: %u)\n", (uint32_t)pconn
->cm_id
, len
));
1141 /* to be sent by ibw_wc_send */
1142 DLIST_ADD_END(pconn
->queue
, p
, struct ibw_wr
*); /* TODO: optimize */
1147 int ibw_cancel_send_buf(struct ibw_conn
*conn
, void *buf
, void *key
)
1149 struct ibw_ctx_priv
*pctx
= talloc_get_type(conn
->ctx
->internal
, struct ibw_ctx_priv
);
1150 struct ibw_conn_priv
*pconn
= talloc_get_type(conn
->internal
, struct ibw_conn_priv
);
1151 struct ibw_wr
*p
= talloc_get_type(key
, struct ibw_wr
);
1158 ibw_free_mr(&p
->msg_large
, &p
->mr_large
);
1161 if (p
->wr_id
< pctx
->opts
.max_send_wr
) {
1162 DEBUG(10, ("ibw_cancel_send_buf#1 %u", (int)p
->wr_id
));
1163 DLIST_REMOVE(pconn
->wr_list_used
, p
);
1164 DLIST_ADD(pconn
->wr_list_avail
, p
);
1165 } else { /* "extra" packet */
1166 DEBUG(10, ("ibw_cancel_send_buf#2 %u", (int)p
->wr_id
));
1167 DLIST_ADD(pconn
->extra_avail
, p
);
1173 const char *ibw_getLastError(void)