2 * QEMU paravirtual RDMA - rdmacm-mux implementation
4 * Copyright (C) 2018 Oracle
5 * Copyright (C) 2018 Red Hat Inc
8 * Yuval Shaia <yuval.shaia@oracle.com>
9 * Marcel Apfelbaum <marcel@redhat.com>
11 * This work is licensed under the terms of the GNU GPL, version 2 or later.
12 * See the COPYING file in the top-level directory.
16 #include "qemu/osdep.h"
18 #include <sys/ioctl.h>
22 #include <infiniband/verbs.h>
23 #include <infiniband/umad.h>
24 #include <infiniband/umad_types.h>
25 #include <infiniband/umad_sa.h>
26 #include <infiniband/umad_cm.h>
28 #include "rdmacm-mux.h"
/*
 * Number of MAD receive timeouts a comm_id -> fd entry survives before the
 * receiver thread drops it.
 */
#define COMMID_TTL 2
#define SLEEP_SECS 5 /* Used both for poll() and for the MAD receive timeout */
#define SERVER_LISTEN_BACKLOG 10
#define MAX_CLIENTS 4096
#define MAD_RMPP_VERSION 0
#define MAD_METHOD_MASK0 0x8

#define IB_USER_MAD_LONGS_PER_METHOD_MASK (128 / (8 * sizeof(long)))

/*
 * Byte offsets, counted from the end of the MAD header, from which get_fd()
 * copies the destination interface id of CM REQ and SIDR REQ messages.
 */
#define CM_REQ_DGID_POS      80
#define CM_SIDR_REQ_DGID_POS 44

/* The values below can be overridden by command line parameters */
#define UNIX_SOCKET_PATH "/var/run/rdmacm-mux"
/* Final name has the format "%s-%s-%d": <path>-<rdma-dev-name>-<port> */
#define SOCKET_PATH_MAX (PATH_MAX - NAME_MAX - sizeof(int) - 2)
#define RDMA_PORT_NUM 1
49 typedef struct RdmaCmServerArgs
{
50 char unix_socket_path
[PATH_MAX
];
51 char rdma_dev_name
[NAME_MAX
];
55 typedef struct CommId2FdEntry
{
57 int ttl
; /* Initialized to 2, decrement each timeout, entry delete when 0 */
61 typedef struct RdmaCmUMadAgent
{
64 GHashTable
*gid2fd
; /* Used to find fd of a given gid */
65 GHashTable
*commid2fd
; /* Used to find fd on of a given comm_id */
68 typedef struct RdmaCmServer
{
70 RdmaCMServerArgs args
;
71 struct pollfd fds
[MAX_CLIENTS
];
73 RdmaCmUMadAgent umad_agent
;
74 pthread_t umad_recv_thread
;
75 pthread_rwlock_t lock
;
78 static RdmaCMServer server
= {0};
80 static void usage(const char *progname
)
82 printf("Usage: %s [OPTION]...\n"
83 "Start a RDMA-CM multiplexer\n"
85 "\t-h Show this help\n"
86 "\t-d rdma-device-name Name of RDMA device to register with\n"
87 "\t-s unix-socket-path Path to unix socket to listen on (default %s)\n"
88 "\t-p rdma-device-port Port number of RDMA device to register with (default %d)\n",
89 progname
, UNIX_SOCKET_PATH
, RDMA_PORT_NUM
);
/* Print a one-line hint to stderr pointing the user at -h. */
static void help(const char *progname)
{
    fprintf(stderr, "Try '%s -h' for more information.\n", progname);
}
97 static void parse_args(int argc
, char *argv
[])
100 char unix_socket_path
[SOCKET_PATH_MAX
];
102 strcpy(server
.args
.rdma_dev_name
, "");
103 strcpy(unix_socket_path
, UNIX_SOCKET_PATH
);
104 server
.args
.rdma_port_num
= RDMA_PORT_NUM
;
106 while ((c
= getopt(argc
, argv
, "hs:d:p:")) != -1) {
113 strncpy(server
.args
.rdma_dev_name
, optarg
, NAME_MAX
- 1);
117 /* This is temporary, final name will build below */
118 strncpy(unix_socket_path
, optarg
, SOCKET_PATH_MAX
- 1);
122 server
.args
.rdma_port_num
= atoi(optarg
);
131 if (!strcmp(server
.args
.rdma_dev_name
, "")) {
132 fprintf(stderr
, "Missing RDMA device name\n");
137 /* Build unique unix-socket file name */
138 snprintf(server
.args
.unix_socket_path
, PATH_MAX
, "%s-%s-%d",
139 unix_socket_path
, server
.args
.rdma_dev_name
,
140 server
.args
.rdma_port_num
);
142 syslog(LOG_INFO
, "unix_socket_path=%s", server
.args
.unix_socket_path
);
143 syslog(LOG_INFO
, "rdma-device-name=%s", server
.args
.rdma_dev_name
);
144 syslog(LOG_INFO
, "rdma-device-port=%d", server
.args
.rdma_port_num
);
147 static void hash_tbl_alloc(void)
150 server
.umad_agent
.gid2fd
= g_hash_table_new_full(g_int64_hash
,
153 server
.umad_agent
.commid2fd
= g_hash_table_new_full(g_int_hash
,
158 static void hash_tbl_free(void)
160 if (server
.umad_agent
.commid2fd
) {
161 g_hash_table_destroy(server
.umad_agent
.commid2fd
);
163 if (server
.umad_agent
.gid2fd
) {
164 g_hash_table_destroy(server
.umad_agent
.gid2fd
);
169 static int _hash_tbl_search_fd_by_ifid(__be64
*gid_ifid
)
173 fd
= g_hash_table_lookup(server
.umad_agent
.gid2fd
, gid_ifid
);
176 *gid_ifid
|= 0x00000000ffff0000;
177 fd
= g_hash_table_lookup(server
.umad_agent
.gid2fd
, gid_ifid
);
183 static int hash_tbl_search_fd_by_ifid(int *fd
, __be64
*gid_ifid
)
185 pthread_rwlock_rdlock(&server
.lock
);
186 *fd
= _hash_tbl_search_fd_by_ifid(gid_ifid
);
187 pthread_rwlock_unlock(&server
.lock
);
190 syslog(LOG_WARNING
, "Can't find matching for ifid 0x%llx\n", *gid_ifid
);
197 static int hash_tbl_search_fd_by_comm_id(uint32_t comm_id
, int *fd
,
202 pthread_rwlock_rdlock(&server
.lock
);
203 fde
= g_hash_table_lookup(server
.umad_agent
.commid2fd
, &comm_id
);
204 pthread_rwlock_unlock(&server
.lock
);
207 syslog(LOG_WARNING
, "Can't find matching for comm_id 0x%x\n", comm_id
);
212 *gid_idid
= fde
->gid_ifid
;
217 static RdmaCmMuxErrCode
add_fd_ifid_pair(int fd
, __be64 gid_ifid
)
221 pthread_rwlock_wrlock(&server
.lock
);
223 fd1
= _hash_tbl_search_fd_by_ifid(&gid_ifid
);
224 if (fd1
) { /* record already exist - an error */
225 pthread_rwlock_unlock(&server
.lock
);
226 return fd
== fd1
? RDMACM_MUX_ERR_CODE_EEXIST
:
227 RDMACM_MUX_ERR_CODE_EACCES
;
230 g_hash_table_insert(server
.umad_agent
.gid2fd
, g_memdup(&gid_ifid
,
231 sizeof(gid_ifid
)), g_memdup(&fd
, sizeof(fd
)));
233 pthread_rwlock_unlock(&server
.lock
);
235 syslog(LOG_INFO
, "0x%lx registered on socket %d",
236 be64toh((uint64_t)gid_ifid
), fd
);
238 return RDMACM_MUX_ERR_CODE_OK
;
241 static RdmaCmMuxErrCode
delete_fd_ifid_pair(int fd
, __be64 gid_ifid
)
245 pthread_rwlock_wrlock(&server
.lock
);
247 fd1
= _hash_tbl_search_fd_by_ifid(&gid_ifid
);
248 if (!fd1
) { /* record not exist - an error */
249 pthread_rwlock_unlock(&server
.lock
);
250 return RDMACM_MUX_ERR_CODE_ENOTFOUND
;
253 g_hash_table_remove(server
.umad_agent
.gid2fd
, g_memdup(&gid_ifid
,
255 pthread_rwlock_unlock(&server
.lock
);
257 syslog(LOG_INFO
, "0x%lx unregistered on socket %d",
258 be64toh((uint64_t)gid_ifid
), fd
);
260 return RDMACM_MUX_ERR_CODE_OK
;
263 static void hash_tbl_save_fd_comm_id_pair(int fd
, uint32_t comm_id
,
266 CommId2FdEntry fde
= {fd
, COMMID_TTL
, gid_ifid
};
268 pthread_rwlock_wrlock(&server
.lock
);
269 g_hash_table_insert(server
.umad_agent
.commid2fd
,
270 g_memdup(&comm_id
, sizeof(comm_id
)),
271 g_memdup(&fde
, sizeof(fde
)));
272 pthread_rwlock_unlock(&server
.lock
);
275 static gboolean
remove_old_comm_ids(gpointer key
, gpointer value
,
278 CommId2FdEntry
*fde
= (CommId2FdEntry
*)value
;
283 static gboolean
remove_entry_from_gid2fd(gpointer key
, gpointer value
,
286 if (*(int *)value
== *(int *)user_data
) {
287 syslog(LOG_INFO
, "0x%lx unregistered on socket %d",
288 be64toh(*(uint64_t *)key
), *(int *)value
);
295 static void hash_tbl_remove_fd_ifid_pair(int fd
)
297 pthread_rwlock_wrlock(&server
.lock
);
298 g_hash_table_foreach_remove(server
.umad_agent
.gid2fd
,
299 remove_entry_from_gid2fd
, (gpointer
)&fd
);
300 pthread_rwlock_unlock(&server
.lock
);
303 static int get_fd(const char *mad
, int umad_len
, int *fd
, __be64
*gid_ifid
)
305 struct umad_hdr
*hdr
= (struct umad_hdr
*)mad
;
306 char *data
= (char *)hdr
+ sizeof(*hdr
);
308 uint16_t attr_id
= be16toh(hdr
->attr_id
);
311 if (umad_len
<= sizeof(*hdr
)) {
313 syslog(LOG_DEBUG
, "Ignoring MAD packets with header only\n");
318 case UMAD_CM_ATTR_REQ
:
319 if (unlikely(umad_len
< sizeof(*hdr
) + CM_REQ_DGID_POS
+
320 sizeof(*gid_ifid
))) {
323 "Invalid MAD packet size (%d) for attr_id 0x%x\n", umad_len
,
327 memcpy(gid_ifid
, data
+ CM_REQ_DGID_POS
, sizeof(*gid_ifid
));
328 rc
= hash_tbl_search_fd_by_ifid(fd
, gid_ifid
);
331 case UMAD_CM_ATTR_SIDR_REQ
:
332 if (unlikely(umad_len
< sizeof(*hdr
) + CM_SIDR_REQ_DGID_POS
+
333 sizeof(*gid_ifid
))) {
336 "Invalid MAD packet size (%d) for attr_id 0x%x\n", umad_len
,
340 memcpy(gid_ifid
, data
+ CM_SIDR_REQ_DGID_POS
, sizeof(*gid_ifid
));
341 rc
= hash_tbl_search_fd_by_ifid(fd
, gid_ifid
);
344 case UMAD_CM_ATTR_REP
:
346 case UMAD_CM_ATTR_REJ
:
348 case UMAD_CM_ATTR_DREQ
:
350 case UMAD_CM_ATTR_DREP
:
352 case UMAD_CM_ATTR_RTU
:
353 data
+= sizeof(comm_id
);
355 case UMAD_CM_ATTR_SIDR_REP
:
356 if (unlikely(umad_len
< sizeof(*hdr
) + sizeof(comm_id
))) {
359 "Invalid MAD packet size (%d) for attr_id 0x%x\n", umad_len
,
363 memcpy(&comm_id
, data
, sizeof(comm_id
));
365 rc
= hash_tbl_search_fd_by_comm_id(comm_id
, fd
, gid_ifid
);
371 syslog(LOG_WARNING
, "Unsupported attr_id 0x%x\n", attr_id
);
374 syslog(LOG_DEBUG
, "mad_to_vm: %d 0x%x 0x%x\n", *fd
, attr_id
, comm_id
);
380 static void *umad_recv_thread_func(void *args
)
383 RdmaCmMuxMsg msg
= {};
386 msg
.hdr
.msg_type
= RDMACM_MUX_MSG_TYPE_REQ
;
387 msg
.hdr
.op_code
= RDMACM_MUX_OP_CODE_MAD
;
391 msg
.umad_len
= sizeof(msg
.umad
.mad
);
392 rc
= umad_recv(server
.umad_agent
.port_id
, &msg
.umad
, &msg
.umad_len
,
393 SLEEP_SECS
* SCALE_US
);
394 if ((rc
== -EIO
) || (rc
== -EINVAL
)) {
395 syslog(LOG_CRIT
, "Fatal error while trying to read MAD");
398 if (rc
== -ETIMEDOUT
) {
399 g_hash_table_foreach_remove(server
.umad_agent
.commid2fd
,
400 remove_old_comm_ids
, NULL
);
402 } while (rc
&& server
.run
);
405 rc
= get_fd(msg
.umad
.mad
, msg
.umad_len
, &fd
,
406 &msg
.hdr
.sgid
.global
.interface_id
);
411 send(fd
, &msg
, sizeof(msg
), 0);
418 static int read_and_process(int fd
)
421 RdmaCmMuxMsg msg
= {};
422 struct umad_hdr
*hdr
;
423 uint32_t *comm_id
= 0;
426 rc
= recv(fd
, &msg
, sizeof(msg
), 0);
427 syslog(LOG_DEBUG
, "Socket %d, recv %d\n", fd
, rc
);
429 if (rc
< 0 && errno
!= EWOULDBLOCK
) {
430 syslog(LOG_ERR
, "Fail to read from socket %d\n", fd
);
435 syslog(LOG_ERR
, "Fail to read from socket %d\n", fd
);
439 if (msg
.hdr
.msg_type
!= RDMACM_MUX_MSG_TYPE_REQ
) {
440 syslog(LOG_WARNING
, "Got non-request message (%d) from socket %d\n",
441 msg
.hdr
.msg_type
, fd
);
445 switch (msg
.hdr
.op_code
) {
446 case RDMACM_MUX_OP_CODE_REG
:
447 rc
= add_fd_ifid_pair(fd
, msg
.hdr
.sgid
.global
.interface_id
);
450 case RDMACM_MUX_OP_CODE_UNREG
:
451 rc
= delete_fd_ifid_pair(fd
, msg
.hdr
.sgid
.global
.interface_id
);
454 case RDMACM_MUX_OP_CODE_MAD
:
455 /* If this is REQ or REP then store the pair comm_id,fd to be later
456 * used for other messages where gid is unknown */
457 hdr
= (struct umad_hdr
*)msg
.umad
.mad
;
458 attr_id
= be16toh(hdr
->attr_id
);
459 if ((attr_id
== UMAD_CM_ATTR_REQ
) || (attr_id
== UMAD_CM_ATTR_DREQ
) ||
460 (attr_id
== UMAD_CM_ATTR_SIDR_REQ
) ||
461 (attr_id
== UMAD_CM_ATTR_REP
) || (attr_id
== UMAD_CM_ATTR_DREP
)) {
462 comm_id
= (uint32_t *)(msg
.umad
.mad
+ sizeof(*hdr
));
463 hash_tbl_save_fd_comm_id_pair(fd
, *comm_id
,
464 msg
.hdr
.sgid
.global
.interface_id
);
467 syslog(LOG_DEBUG
, "vm_to_mad: %d 0x%x 0x%x\n", fd
, attr_id
,
468 comm_id
? *comm_id
: 0);
469 rc
= umad_send(server
.umad_agent
.port_id
, server
.umad_agent
.agent_id
,
470 &msg
.umad
, msg
.umad_len
, 1, 0);
473 "Fail to send MAD message (0x%x) from socket %d, err=%d",
479 syslog(LOG_ERR
, "Got invalid op_code (%d) from socket %d",
480 msg
.hdr
.msg_type
, fd
);
481 rc
= RDMACM_MUX_ERR_CODE_EINVAL
;
484 msg
.hdr
.msg_type
= RDMACM_MUX_MSG_TYPE_RESP
;
485 msg
.hdr
.err_code
= rc
;
486 rc
= send(fd
, &msg
, sizeof(msg
), 0);
488 return rc
== sizeof(msg
) ? 0 : -EPIPE
;
491 static int accept_all(void)
495 pthread_rwlock_wrlock(&server
.lock
);
498 if ((server
.nfds
+ 1) > MAX_CLIENTS
) {
499 syslog(LOG_WARNING
, "Too many clients (%d)", server
.nfds
);
504 fd
= accept(server
.fds
[0].fd
, NULL
, NULL
);
506 if (errno
!= EWOULDBLOCK
) {
507 syslog(LOG_WARNING
, "accept() failed");
514 syslog(LOG_INFO
, "Client connected on socket %d\n", fd
);
515 server
.fds
[server
.nfds
].fd
= fd
;
516 server
.fds
[server
.nfds
].events
= POLLIN
;
521 pthread_rwlock_unlock(&server
.lock
);
525 static void compress_fds(void)
530 pthread_rwlock_wrlock(&server
.lock
);
532 for (i
= 1; i
< server
.nfds
; i
++) {
533 if (!server
.fds
[i
].fd
) {
535 for (j
= i
; j
< server
.nfds
- 1; j
++) {
536 server
.fds
[j
] = server
.fds
[j
+ 1];
541 server
.nfds
-= closed
;
543 pthread_rwlock_unlock(&server
.lock
);
546 static void close_fd(int idx
)
548 close(server
.fds
[idx
].fd
);
549 syslog(LOG_INFO
, "Socket %d closed\n", server
.fds
[idx
].fd
);
550 hash_tbl_remove_fd_ifid_pair(server
.fds
[idx
].fd
);
551 server
.fds
[idx
].fd
= 0;
554 static void run(void)
557 bool compress
= false;
559 syslog(LOG_INFO
, "Service started");
562 rc
= poll(server
.fds
, server
.nfds
, SLEEP_SECS
* SCALE_US
);
564 if (errno
!= EINTR
) {
565 syslog(LOG_WARNING
, "poll() failed");
575 for (i
= 0; i
< nfds
; i
++) {
576 syslog(LOG_DEBUG
, "pollfd[%d]: revents 0x%x, events 0x%x\n", i
,
577 server
.fds
[i
].revents
, server
.fds
[i
].events
);
578 if (server
.fds
[i
].revents
== 0) {
582 if (server
.fds
[i
].revents
!= POLLIN
) {
584 syslog(LOG_NOTICE
, "Unexpected poll() event (0x%x)\n",
585 server
.fds
[i
].revents
);
599 rc
= read_and_process(server
.fds
[i
].fd
);
614 static void fini_listener(void)
618 if (server
.fds
[0].fd
<= 0) {
622 for (i
= server
.nfds
- 1; i
>= 0; i
--) {
623 if (server
.fds
[i
].fd
) {
624 close(server
.fds
[i
].fd
);
628 unlink(server
.args
.unix_socket_path
);
631 static void fini_umad(void)
633 if (server
.umad_agent
.agent_id
) {
634 umad_unregister(server
.umad_agent
.port_id
, server
.umad_agent
.agent_id
);
637 if (server
.umad_agent
.port_id
) {
638 umad_close_port(server
.umad_agent
.port_id
);
644 static void fini(void)
646 if (server
.umad_recv_thread
) {
647 pthread_join(server
.umad_recv_thread
, NULL
);
648 server
.umad_recv_thread
= 0;
652 pthread_rwlock_destroy(&server
.lock
);
654 syslog(LOG_INFO
, "Service going down");
657 static int init_listener(void)
659 struct sockaddr_un sun
;
662 server
.fds
[0].fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
663 if (server
.fds
[0].fd
< 0) {
664 syslog(LOG_ALERT
, "socket() failed");
668 rc
= setsockopt(server
.fds
[0].fd
, SOL_SOCKET
, SO_REUSEADDR
, (char *)&on
,
671 syslog(LOG_ALERT
, "setsockopt() failed");
676 rc
= ioctl(server
.fds
[0].fd
, FIONBIO
, (char *)&on
);
678 syslog(LOG_ALERT
, "ioctl() failed");
683 if (strlen(server
.args
.unix_socket_path
) >= sizeof(sun
.sun_path
)) {
685 "Invalid unix_socket_path, size must be less than %ld\n",
686 sizeof(sun
.sun_path
));
691 sun
.sun_family
= AF_UNIX
;
692 rc
= snprintf(sun
.sun_path
, sizeof(sun
.sun_path
), "%s",
693 server
.args
.unix_socket_path
);
694 if (rc
< 0 || rc
>= sizeof(sun
.sun_path
)) {
695 syslog(LOG_ALERT
, "Could not copy unix socket path\n");
700 rc
= bind(server
.fds
[0].fd
, (struct sockaddr
*)&sun
, sizeof(sun
));
702 syslog(LOG_ALERT
, "bind() failed");
707 rc
= listen(server
.fds
[0].fd
, SERVER_LISTEN_BACKLOG
);
709 syslog(LOG_ALERT
, "listen() failed");
714 server
.fds
[0].events
= POLLIN
;
721 close(server
.fds
[0].fd
);
725 static int init_umad(void)
727 long method_mask
[IB_USER_MAD_LONGS_PER_METHOD_MASK
];
729 server
.umad_agent
.port_id
= umad_open_port(server
.args
.rdma_dev_name
,
730 server
.args
.rdma_port_num
);
732 if (server
.umad_agent
.port_id
< 0) {
733 syslog(LOG_WARNING
, "umad_open_port() failed");
737 memset(&method_mask
, 0, sizeof(method_mask
));
738 method_mask
[0] = MAD_METHOD_MASK0
;
739 server
.umad_agent
.agent_id
= umad_register(server
.umad_agent
.port_id
,
741 UMAD_SA_CLASS_VERSION
,
742 MAD_RMPP_VERSION
, method_mask
);
743 if (server
.umad_agent
.agent_id
< 0) {
744 syslog(LOG_WARNING
, "umad_register() failed");
753 static void signal_handler(int sig
, siginfo_t
*siginfo
, void *context
)
757 /* Prevent stop if clients are connected */
758 if (server
.nfds
!= 1) {
761 "Can't stop while active client exist, resend SIGINT to overid");
775 static int init(void)
778 struct sigaction sig
= {};
780 rc
= init_listener();
790 pthread_rwlock_init(&server
.lock
, 0);
792 rc
= pthread_create(&server
.umad_recv_thread
, NULL
, umad_recv_thread_func
,
795 syslog(LOG_ERR
, "Fail to create UMAD receiver thread (%d)\n", rc
);
799 sig
.sa_sigaction
= &signal_handler
;
800 sig
.sa_flags
= SA_SIGINFO
;
801 rc
= sigaction(SIGINT
, &sig
, NULL
);
803 syslog(LOG_ERR
, "Fail to install SIGINT handler (%d)\n", errno
);
810 int main(int argc
, char *argv
[])
814 memset(&server
, 0, sizeof(server
));
816 parse_args(argc
, argv
);
820 syslog(LOG_ERR
, "Fail to initialize server (%d)\n", rc
);