watcher_demo: add more listeners to the mix
[raindrops.git] / ext / raindrops / linux_inet_diag.c
blobf43a8c96a6bc0dd21928d43f5642a1dd47767095
1 #include <ruby.h>
2 #ifdef HAVE_RUBY_ST_H
3 # include <ruby/st.h>
4 #else
5 # include <st.h>
6 #endif
7 #include "my_fileno.h"
8 #ifdef __linux__
/*
 * Older Ruby 1.8 headers lack RSTRING_LEN (it appeared in 1.8.6 to ease
 * porting to 1.9); fall back to reading the struct field directly.
 */
#if !defined(RSTRING_LEN)
#  define RSTRING_LEN(str) (RSTRING(str)->len)
#endif
15 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
16 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
17 # include <rubysig.h>
18 typedef void rb_unblock_function_t(void *);
19 typedef VALUE rb_blocking_function_t(void *);
20 static VALUE
21 rb_thread_blocking_region(
22 rb_blocking_function_t *func, void *data1,
23 rb_unblock_function_t *ubf, void *data2)
25 VALUE rv;
27 TRAP_BEG;
28 rv = func(data1);
29 TRAP_END;
31 return rv;
33 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
35 #include <assert.h>
36 #include <errno.h>
37 #include <sys/socket.h>
38 #include <sys/types.h>
39 #include <netdb.h>
40 #include <unistd.h>
41 #include <string.h>
42 #include <asm/types.h>
43 #include <netinet/in.h>
44 #include <arpa/inet.h>
45 #include <netinet/tcp.h>
46 #include <linux/netlink.h>
47 #include <linux/rtnetlink.h>
48 #include <linux/inet_diag.h>
50 static size_t page_size;
51 static unsigned g_seq;
52 static VALUE cListenStats, cIDSock;
53 static ID id_new;
/* per-address accumulator filled while walking inet_diag replies */
struct listen_stats {
	/* number of TCP_ESTABLISHED sockets seen for this address */
	uint32_t active;
	/* set once a TCP_LISTEN socket was seen for this address */
	uint32_t listener_p:1;
	/* idiag_rqueue of the listening socket */
	uint32_t queued:31;
};
/*
 * Byte length of the one-condition bytecode program built by
 * gen_bytecode()/gen_bytecode_all(): an op header, a host condition, and
 * sizeof(struct sockaddr_storage) bytes of room for the address stored
 * after the condition.
 */
#define OPLEN ( \
	sizeof(struct inet_diag_bc_op) \
	+ sizeof(struct inet_diag_hostcond) \
	+ sizeof(struct sockaddr_storage))
65 struct nogvl_args {
66 st_table *table;
67 struct iovec iov[3]; /* last iov holds inet_diag bytecode */
68 struct listen_stats stats;
69 int fd;
73 * call-seq:
74 * Raindrops::InetDiagSocket.new -> Socket
76 * Creates a new Socket object for the netlink inet_diag facility
78 static VALUE ids_s_new(VALUE klass)
80 VALUE argv[3];
81 argv[0] = INT2NUM(AF_NETLINK);
82 argv[1] = INT2NUM(SOCK_RAW);
83 argv[2] = INT2NUM(NETLINK_INET_DIAG);
85 return rb_call_super(3, argv);
88 /* creates a Ruby ListenStats Struct based on our internal listen_stats */
89 static VALUE rb_listen_stats(struct listen_stats *stats)
91 VALUE active = UINT2NUM(stats->active);
92 VALUE queued = UINT2NUM(stats->queued);
94 return rb_struct_new(cListenStats, active, queued);
97 static int st_free_data(st_data_t key, st_data_t value, st_data_t ignored)
99 xfree((void *)key);
100 xfree((void *)value);
102 return ST_DELETE;
105 static int st_to_hash(st_data_t key, st_data_t value, VALUE hash)
107 struct listen_stats *stats = (struct listen_stats *)value;
109 if (stats->listener_p) {
110 VALUE k = rb_str_new2((const char *)key);
111 VALUE v = rb_listen_stats(stats);
113 OBJ_FREEZE(k);
114 rb_hash_aset(hash, k, v);
116 return st_free_data(key, value, 0);
119 static int st_AND_hash(st_data_t key, st_data_t value, VALUE hash)
121 struct listen_stats *stats = (struct listen_stats *)value;
123 if (stats->listener_p) {
124 VALUE k = rb_str_new2((const char *)key);
126 if (rb_hash_lookup(hash, k) == Qtrue) {
127 VALUE v = rb_listen_stats(stats);
128 OBJ_FREEZE(k);
129 rb_hash_aset(hash, k, v);
132 return st_free_data(key, value, 0);
/*
 * Returns the textual wildcard address for +family+ in the same format
 * used for table keys: "0.0.0.0" for AF_INET, "[::]" for AF_INET6.
 * Any other family trips an assertion (and yields "[::]" under NDEBUG).
 */
static const char *addr_any(sa_family_t family)
{
	static const char any4[] = "0.0.0.0";
	static const char any6[] = "[::]";

	if (family != AF_INET) {
		assert(family == AF_INET6 && "unknown family");
		return any6;
	}
	return any4;
}
/* Asks the user (on stderr) to report an unexpected internal failure. */
static void bug_warn(void)
{
	fputs("Please report how you produced this at "
	      "raindrops@librelist.com\n", stderr);
	fflush(stderr);
}
153 static struct listen_stats *stats_for(st_table *table, struct inet_diag_msg *r)
155 char *key, *port, *old_key;
156 size_t alloca_len;
157 struct listen_stats *stats;
158 size_t keylen;
159 size_t portlen = sizeof("65535");
160 struct sockaddr_storage ss = { 0 };
161 socklen_t len = sizeof(struct sockaddr_storage);
162 int rc;
163 int flags = NI_NUMERICHOST | NI_NUMERICSERV;
165 switch ((ss.ss_family = r->idiag_family)) {
166 case AF_INET: {
167 struct sockaddr_in *in = (struct sockaddr_in *)&ss;
169 in->sin_port = r->id.idiag_sport;
170 in->sin_addr.s_addr = r->id.idiag_src[0];
171 keylen = INET_ADDRSTRLEN;
172 alloca_len = keylen + 1 + portlen;
173 key = alloca(alloca_len);
174 key[keylen] = 0; /* will be ':' later */
175 port = key + keylen + 1;
176 rc = getnameinfo((struct sockaddr *)&ss, len,
177 key, keylen, port, portlen, flags);
178 break;
180 case AF_INET6: {
181 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;
182 in6->sin6_port = r->id.idiag_sport;
183 memcpy(&in6->sin6_addr.in6_u.u6_addr32,
184 &r->id.idiag_src, sizeof(__be32[4]));
185 keylen = INET6_ADDRSTRLEN;
186 /* [ ] */
187 alloca_len = 1 + keylen + 1 + 1 + portlen;
188 key = alloca(alloca_len);
189 *key = '[';
190 key[1 + keylen + 1] = 0; /* will be ':' later */
191 port = 1 + key + keylen + 1 + 1;
192 rc = getnameinfo((struct sockaddr *)&ss, len,
193 key + 1, keylen, port, portlen, flags);
194 break;
196 default:
197 assert(0 && "unsupported address family, could that be IPv7?!");
199 if (rc != 0) {
200 fprintf(stderr, "BUG: getnameinfo: %s\n", gai_strerror(rc));
201 bug_warn();
202 *key = 0;
205 keylen = strlen(key);
206 portlen = strlen(port);
208 switch (ss.ss_family) {
209 case AF_INET:
210 key[keylen] = ':';
211 memmove(key + keylen + 1, port, portlen + 1);
212 break;
213 case AF_INET6:
214 key[keylen] = ']';
215 key[keylen + 1] = ':';
216 memmove(key + keylen + 2, port, portlen + 1);
217 keylen++;
218 break;
219 default:
220 assert(0 && "unsupported address family, could that be IPv7?!");
223 if (st_lookup(table, (st_data_t)key, (st_data_t *)&stats))
224 return stats;
226 old_key = key;
228 if (r->idiag_state == TCP_ESTABLISHED) {
229 int n = snprintf(key, alloca_len, "%s:%u",
230 addr_any(ss.ss_family),
231 ntohs(r->id.idiag_sport));
232 if (n <= 0) {
233 fprintf(stderr, "BUG: snprintf: %d\n", n);
234 bug_warn();
236 if (st_lookup(table, (st_data_t)key, (st_data_t *)&stats))
237 return stats;
238 if (n <= 0) {
239 key = xmalloc(1);
240 *key = '\0';
241 } else {
242 old_key = key;
243 key = xmalloc(n + 1);
244 memcpy(key, old_key, n + 1);
246 } else {
247 key = xmalloc(keylen + 1 + portlen + 1);
248 memcpy(key, old_key, keylen + 1 + portlen + 1);
250 stats = xcalloc(1, sizeof(struct listen_stats));
251 st_insert(table, (st_data_t)key, (st_data_t)stats);
252 return stats;
255 static void table_incr_active(st_table *table, struct inet_diag_msg *r)
257 struct listen_stats *stats = stats_for(table, r);
258 ++stats->active;
261 static void table_set_queued(st_table *table, struct inet_diag_msg *r)
263 struct listen_stats *stats = stats_for(table, r);
264 stats->listener_p = 1;
265 stats->queued = r->idiag_rqueue;
268 /* inner loop of inet_diag, called for every socket returned by netlink */
269 static inline void r_acc(struct nogvl_args *args, struct inet_diag_msg *r)
272 * inode == 0 means the connection is still in the listen queue
273 * and has not yet been accept()-ed by the server. The
274 * inet_diag bytecode cannot filter this for us.
276 if (r->idiag_inode == 0)
277 return;
278 if (r->idiag_state == TCP_ESTABLISHED) {
279 if (args->table)
280 table_incr_active(args->table, r);
281 else
282 args->stats.active++;
283 } else { /* if (r->idiag_state == TCP_LISTEN) */
284 if (args->table)
285 table_set_queued(args->table, r);
286 else
287 args->stats.queued = r->idiag_rqueue;
290 * we wont get anything else because of the idiag_states filter
/*
 * Error tags returned (cast to VALUE) by diag().  nl_errcheck() compares
 * them by address, so each must remain a distinct object; the text of
 * err_sendmsg/err_recvmsg is passed on to rb_sys_fail().
 */
static const char err_nlmsg[] = "nlmsg";
static const char err_recvmsg[] = "recvmsg";
static const char err_sendmsg[] = "sendmsg";
/* netlink header immediately followed by the inet_diag request body */
struct diag_req {
	struct nlmsghdr nlh;		/* netlink message header */
	struct inet_diag_req r;		/* inet_diag dump parameters */
};
303 static void prep_msghdr(
304 struct msghdr *msg,
305 struct nogvl_args *args,
306 struct sockaddr_nl *nladdr,
307 size_t iovlen)
309 memset(msg, 0, sizeof(struct msghdr));
310 msg->msg_name = (void *)nladdr;
311 msg->msg_namelen = sizeof(struct sockaddr_nl);
312 msg->msg_iov = args->iov;
313 msg->msg_iovlen = iovlen;
316 static void prep_diag_args(
317 struct nogvl_args *args,
318 struct sockaddr_nl *nladdr,
319 struct rtattr *rta,
320 struct diag_req *req,
321 struct msghdr *msg)
323 memset(req, 0, sizeof(struct diag_req));
324 memset(nladdr, 0, sizeof(struct sockaddr_nl));
326 nladdr->nl_family = AF_NETLINK;
328 req->nlh.nlmsg_len = sizeof(struct diag_req) +
329 RTA_LENGTH(args->iov[2].iov_len);
330 req->nlh.nlmsg_type = TCPDIAG_GETSOCK;
331 req->nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST;
332 req->nlh.nlmsg_pid = getpid();
333 req->r.idiag_states = (1<<TCP_ESTABLISHED) | (1<<TCP_LISTEN);
334 rta->rta_type = INET_DIAG_REQ_BYTECODE;
335 rta->rta_len = RTA_LENGTH(args->iov[2].iov_len);
337 args->iov[0].iov_base = req;
338 args->iov[0].iov_len = sizeof(struct diag_req);
339 args->iov[1].iov_base = rta;
340 args->iov[1].iov_len = sizeof(struct rtattr);
342 prep_msghdr(msg, args, nladdr, 3);
345 static void prep_recvmsg_buf(struct nogvl_args *args)
347 /* reuse buffer that was allocated for bytecode */
348 args->iov[0].iov_len = page_size;
349 args->iov[0].iov_base = args->iov[2].iov_base;
352 /* does the inet_diag stuff with netlink(), this is called w/o GVL */
353 static VALUE diag(void *ptr)
355 struct nogvl_args *args = ptr;
356 struct sockaddr_nl nladdr;
357 struct rtattr rta;
358 struct diag_req req;
359 struct msghdr msg;
360 const char *err = NULL;
361 unsigned seq = ++g_seq;
363 prep_diag_args(args, &nladdr, &rta, &req, &msg);
364 req.nlh.nlmsg_seq = seq;
366 if (sendmsg(args->fd, &msg, 0) < 0) {
367 err = err_sendmsg;
368 goto out;
371 prep_recvmsg_buf(args);
373 while (1) {
374 ssize_t readed;
375 size_t r;
376 struct nlmsghdr *h = (struct nlmsghdr *)args->iov[0].iov_base;
378 prep_msghdr(&msg, args, &nladdr, 1);
379 readed = recvmsg(args->fd, &msg, 0);
380 if (readed < 0) {
381 if (errno == EINTR)
382 continue;
383 err = err_recvmsg;
384 goto out;
386 if (readed == 0)
387 goto out;
388 r = (size_t)readed;
389 for ( ; NLMSG_OK(h, r); h = NLMSG_NEXT(h, r)) {
390 if (h->nlmsg_seq != seq)
391 continue;
392 if (h->nlmsg_type == NLMSG_DONE)
393 goto out;
394 if (h->nlmsg_type == NLMSG_ERROR) {
395 err = err_nlmsg;
396 goto out;
398 r_acc(args, NLMSG_DATA(h));
401 out:
403 int save_errno = errno;
404 if (err && args->table) {
405 st_foreach(args->table, st_free_data, 0);
406 st_free_table(args->table);
408 errno = save_errno;
410 return (VALUE)err;
413 /* populates sockaddr_storage struct by parsing +addr+ */
414 static void parse_addr(struct sockaddr_storage *inet, VALUE addr)
416 char *host_ptr;
417 char *check;
418 char *colon = NULL;
419 char *rbracket = NULL;
420 void *dst;
421 long host_len;
422 int af, rc;
423 uint16_t *portdst;
424 unsigned long port;
426 Check_Type(addr, T_STRING);
427 host_ptr = StringValueCStr(addr);
428 host_len = RSTRING_LEN(addr);
429 if (*host_ptr == '[') { /* ipv6 address format (rfc2732) */
430 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)inet;
431 rbracket = memchr(host_ptr + 1, ']', host_len - 1);
433 if (rbracket == NULL)
434 rb_raise(rb_eArgError, "']' not found in IPv6 addr=%s",
435 host_ptr);
436 if (rbracket[1] != ':')
437 rb_raise(rb_eArgError, "':' not found in IPv6 addr=%s",
438 host_ptr);
439 colon = rbracket + 1;
440 host_ptr++;
441 *rbracket = 0;
442 inet->ss_family = af = AF_INET6;
443 dst = &in6->sin6_addr;
444 portdst = &in6->sin6_port;
445 } else { /* ipv4 */
446 struct sockaddr_in *in = (struct sockaddr_in *)inet;
447 colon = memchr(host_ptr, ':', host_len);
448 inet->ss_family = af = AF_INET;
449 dst = &in->sin_addr;
450 portdst = &in->sin_port;
453 if (!colon)
454 rb_raise(rb_eArgError, "port not found in: `%s'", host_ptr);
455 port = strtoul(colon + 1, &check, 10);
456 *colon = 0;
457 rc = inet_pton(af, host_ptr, dst);
458 *colon = ':';
459 if (rbracket) *rbracket = ']';
460 if (*check || ((uint16_t)port != port))
461 rb_raise(rb_eArgError, "invalid port: %s", colon + 1);
462 if (rc != 1)
463 rb_raise(rb_eArgError, "inet_pton failed for: `%s' with %d",
464 host_ptr, rc);
465 *portdst = ntohs((uint16_t)port);
468 /* generates inet_diag bytecode to match all addrs */
469 static void gen_bytecode_all(struct iovec *iov)
471 struct inet_diag_bc_op *op;
472 struct inet_diag_hostcond *cond;
474 /* iov_len was already set and base allocated in a parent function */
475 assert(iov->iov_len == OPLEN && iov->iov_base && "iov invalid");
476 op = iov->iov_base;
477 op->code = INET_DIAG_BC_S_COND;
478 op->yes = OPLEN;
479 op->no = sizeof(struct inet_diag_bc_op) + OPLEN;
480 cond = (struct inet_diag_hostcond *)(op + 1);
481 cond->family = AF_UNSPEC;
482 cond->port = -1;
483 cond->prefix_len = 0;
486 /* generates inet_diag bytecode to match a single addr */
487 static void gen_bytecode(struct iovec *iov, struct sockaddr_storage *inet)
489 struct inet_diag_bc_op *op;
490 struct inet_diag_hostcond *cond;
492 /* iov_len was already set and base allocated in a parent function */
493 assert(iov->iov_len == OPLEN && iov->iov_base && "iov invalid");
494 op = iov->iov_base;
495 op->code = INET_DIAG_BC_S_COND;
496 op->yes = OPLEN;
497 op->no = sizeof(struct inet_diag_bc_op) + OPLEN;
499 cond = (struct inet_diag_hostcond *)(op + 1);
500 cond->family = inet->ss_family;
501 switch (inet->ss_family) {
502 case AF_INET: {
503 struct sockaddr_in *in = (struct sockaddr_in *)inet;
505 cond->port = ntohs(in->sin_port);
506 cond->prefix_len = in->sin_addr.s_addr == 0 ? 0 :
507 sizeof(in->sin_addr.s_addr) * CHAR_BIT;
508 *cond->addr = in->sin_addr.s_addr;
510 break;
511 case AF_INET6: {
512 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)inet;
514 cond->port = ntohs(in6->sin6_port);
515 cond->prefix_len = memcmp(&in6addr_any, &in6->sin6_addr,
516 sizeof(struct in6_addr)) == 0 ?
517 0 : sizeof(in6->sin6_addr) * CHAR_BIT;
518 memcpy(&cond->addr, &in6->sin6_addr, sizeof(struct in6_addr));
520 break;
521 default:
522 assert(0 && "unsupported address family, could that be IPv7?!");
526 static void nl_errcheck(VALUE r)
528 const char *err = (const char *)r;
530 if (err) {
531 if (err == err_nlmsg)
532 rb_raise(rb_eRuntimeError, "NLMSG_ERROR");
533 else
534 rb_sys_fail(err);
538 static VALUE tcp_stats(struct nogvl_args *args, VALUE addr)
540 struct sockaddr_storage query_addr;
542 parse_addr(&query_addr, addr);
543 gen_bytecode(&args->iov[2], &query_addr);
545 memset(&args->stats, 0, sizeof(struct listen_stats));
546 nl_errcheck(rb_thread_blocking_region(diag, args, 0, 0));
548 return rb_listen_stats(&args->stats);
552 * call-seq:
553 * Raindrops::Linux.tcp_listener_stats([addrs[, sock]]) => hash
555 * If specified, +addr+ may be a string or array of strings representing
556 * listen addresses to filter for. Returns a hash with given addresses as
557 * keys and ListenStats objects as the values or a hash of all addresses.
559 * addrs = %w(0.0.0.0:80 127.0.0.1:8080)
561 * If +addr+ is nil or not specified, all (IPv4) addresses are returned.
562 * If +sock+ is specified, it should be a Raindrops::InetDiagSock object.
564 static VALUE tcp_listener_stats(int argc, VALUE *argv, VALUE self)
566 VALUE *ary;
567 long i;
568 VALUE rv = rb_hash_new();
569 struct nogvl_args args;
570 VALUE addrs, sock;
572 rb_scan_args(argc, argv, "02", &addrs, &sock);
575 * allocating page_size instead of OP_LEN since we'll reuse the
576 * buffer for recvmsg() later, we already checked for
577 * OPLEN <= page_size at initialization
579 args.iov[2].iov_len = OPLEN;
580 args.iov[2].iov_base = alloca(page_size);
581 args.table = NULL;
582 if (NIL_P(sock))
583 sock = rb_funcall(cIDSock, id_new, 0);
584 args.fd = my_fileno(sock);
586 switch (TYPE(addrs)) {
587 case T_STRING:
588 rb_hash_aset(rv, addrs, tcp_stats(&args, addrs));
589 return rv;
590 case T_ARRAY:
591 ary = RARRAY_PTR(addrs);
592 i = RARRAY_LEN(addrs);
593 if (i == 1) {
594 rb_hash_aset(rv, *ary, tcp_stats(&args, *ary));
595 return rv;
597 for (; --i >= 0; ary++) {
598 struct sockaddr_storage check;
600 parse_addr(&check, *ary);
601 rb_hash_aset(rv, *ary, Qtrue);
603 /* fall through */
604 case T_NIL:
605 args.table = st_init_strtable();
606 gen_bytecode_all(&args.iov[2]);
607 break;
608 default:
609 rb_raise(rb_eArgError,
610 "addr must be an array of strings, a string, or nil");
613 nl_errcheck(rb_thread_blocking_region(diag, &args, NULL, 0));
615 st_foreach(args.table, NIL_P(addrs) ? st_to_hash : st_AND_hash, rv);
616 st_free_table(args.table);
618 /* let GC deal with corner cases */
619 if (argc < 2) rb_io_close(sock);
620 return rv;
623 void Init_raindrops_linux_inet_diag(void)
625 VALUE cRaindrops = rb_const_get(rb_cObject, rb_intern("Raindrops"));
626 VALUE mLinux = rb_define_module_under(cRaindrops, "Linux");
628 rb_require("socket");
629 cIDSock = rb_const_get(rb_cObject, rb_intern("Socket"));
630 id_new = rb_intern("new");
633 * Document-class: Raindrops::InetDiagSocket
635 * This is a subclass of +Socket+ specifically for talking
636 * to the inet_diag facility of Netlink.
638 cIDSock = rb_define_class_under(cRaindrops, "InetDiagSocket", cIDSock);
639 rb_define_singleton_method(cIDSock, "new", ids_s_new, 0);
641 cListenStats = rb_const_get(cRaindrops, rb_intern("ListenStats"));
643 rb_define_module_function(mLinux, "tcp_listener_stats",
644 tcp_listener_stats, -1);
646 page_size = getpagesize();
648 assert(OPLEN <= page_size && "bytecode OPLEN is not <= PAGE_SIZE");
650 #endif /* __linux__ */