read/write: account for buffer changes during wait
[kgio.git] / ext / kgio / kgio_ext.c
blobac6f448fea1a584f86a519e8bc6a6a1422731c28
1 #include <ruby.h>
2 #ifdef HAVE_RUBY_IO_H
3 # include <ruby/io.h>
4 #else
5 # include <rubyio.h>
6 #endif
7 #include <errno.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <sys/un.h>
11 #include <netinet/in.h>
12 #include <fcntl.h>
13 #include <unistd.h>
14 #include <arpa/inet.h>
15 #include <assert.h>
17 #include "missing/accept4.h"
18 #include "nonblock.h"
19 #include "my_fileno.h"
20 #include "sock_for_fd.h"
22 #if defined(__linux__)
24 * we know MSG_DONTWAIT works properly on all stream sockets under Linux
25 * we can define this macro for other platforms as people care and
26 * notice.
28 # define USE_MSG_DONTWAIT
29 static int accept4_flags = A4_SOCK_CLOEXEC;
30 #else /* ! linux */
31 static int accept4_flags = A4_SOCK_CLOEXEC | A4_SOCK_NONBLOCK;
32 #endif /* ! linux */
34 static VALUE cSocket;
35 static VALUE localhost;
36 static VALUE mKgio_WaitReadable, mKgio_WaitWritable;
37 static ID io_wait_rd, io_wait_wr;
38 static ID iv_kgio_addr;
40 struct io_args {
41 VALUE io;
42 VALUE buf;
43 char *ptr;
44 long len;
45 int fd;
/* arguments bundled for xaccept(), which only receives one void pointer */
struct accept_args {
	int fd;				/* listening socket */
	struct sockaddr *addr;		/* peer address output, may be NULL */
	socklen_t *addrlen;		/* in/out length of addr, may be NULL */
};
54 static void wait_readable(VALUE io, int fd)
56 if (io_wait_rd) {
57 (void)rb_funcall(io, io_wait_rd, 0, 0);
58 } else {
59 if (!rb_io_wait_readable(fd))
60 rb_sys_fail("wait readable");
64 static void wait_writable(VALUE io, int fd)
66 if (io_wait_wr) {
67 (void)rb_funcall(io, io_wait_wr, 0, 0);
68 } else {
69 if (!rb_io_wait_writable(fd))
70 rb_sys_fail("wait writable");
74 static void prepare_read(struct io_args *a, int argc, VALUE *argv, VALUE io)
76 VALUE length;
78 a->io = io;
79 a->fd = my_fileno(io);
80 rb_scan_args(argc, argv, "11", &length, &a->buf);
81 a->len = NUM2LONG(length);
82 if (NIL_P(a->buf)) {
83 a->buf = rb_str_new(NULL, a->len);
84 } else {
85 StringValue(a->buf);
86 rb_str_resize(a->buf, a->len);
88 a->ptr = RSTRING_PTR(a->buf);
91 static int read_check(struct io_args *a, long n, const char *msg, int io_wait)
93 if (n == -1) {
94 if (errno == EINTR)
95 return -1;
96 rb_str_set_len(a->buf, 0);
97 if (errno == EAGAIN) {
98 if (io_wait) {
99 wait_readable(a->io, a->fd);
101 /* buf may be modified in other thread/fiber */
102 rb_str_resize(a->buf, a->len);
103 a->ptr = RSTRING_PTR(a->buf);
104 return -1;
105 } else {
106 a->buf = mKgio_WaitReadable;
107 return 0;
110 rb_sys_fail(msg);
112 rb_str_set_len(a->buf, n);
113 if (n == 0)
114 a->buf = Qnil;
115 return 0;
118 static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io)
120 struct io_args a;
121 long n;
123 prepare_read(&a, argc, argv, io);
124 set_nonblocking(a.fd);
125 retry:
126 n = (long)read(a.fd, a.ptr, a.len);
127 if (read_check(&a, n, "read", io_wait) != 0)
128 goto retry;
129 return a.buf;
133 * call-seq:
135 * io.kgio_read(maxlen) -> buffer
136 * io.kgio_read(maxlen, buffer) -> buffer
138 * Reads at most maxlen bytes from the stream socket. Returns with a
139 * newly allocated buffer, or may reuse an existing buffer if supplied.
141 * Calls the method assigned to Kgio.wait_readable, or blocks in a
142 * thread-safe manner for writability.
144 * Returns nil on EOF.
146 * This behaves like read(2) and IO#readpartial, NOT fread(3) or
147 * IO#read which possess read-in-full behavior.
149 static VALUE kgio_read(int argc, VALUE *argv, VALUE io)
151 return my_read(1, argc, argv, io);
155 * call-seq:
157 * io.kgio_tryread(maxlen) -> buffer
158 * io.kgio_tryread(maxlen, buffer) -> buffer
160 * Reads at most maxlen bytes from the stream socket. Returns with a
161 * newly allocated buffer, or may reuse an existing buffer if supplied.
163 * Returns nil on EOF.
165 * Returns Kgio::WaitReadable if EAGAIN is encountered.
167 static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io)
169 return my_read(0, argc, argv, io);
172 #ifdef USE_MSG_DONTWAIT
173 static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io)
175 struct io_args a;
176 long n;
178 prepare_read(&a, argc, argv, io);
179 retry:
180 n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT);
181 if (read_check(&a, n, "recv", io_wait) != 0)
182 goto retry;
183 return a.buf;
187 * This method may be optimized on some systems (e.g. GNU/Linux) to use
188 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
189 * Otherwise this is the same as Kgio::PipeMethods#kgio_read
191 static VALUE kgio_recv(int argc, VALUE *argv, VALUE io)
193 return my_recv(1, argc, argv, io);
197 * This method may be optimized on some systems (e.g. GNU/Linux) to use
198 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
199 * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread
201 static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io)
203 return my_recv(0, argc, argv, io);
205 #else /* ! USE_MSG_DONTWAIT */
206 # define kgio_recv kgio_read
207 # define kgio_tryrecv kgio_tryread
208 #endif /* USE_MSG_DONTWAIT */
210 static void prepare_write(struct io_args *a, VALUE io, VALUE str)
212 a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str);
213 a->ptr = RSTRING_PTR(a->buf);
214 a->len = RSTRING_LEN(a->buf);
215 a->io = io;
216 a->fd = my_fileno(io);
219 static int write_check(struct io_args *a, long n, const char *msg, int io_wait)
221 if (a->len == n) {
222 done:
223 a->buf = Qnil;
224 } else if (n == -1) {
225 if (errno == EINTR)
226 return -1;
227 if (errno == EAGAIN) {
228 if (io_wait) {
229 long written = RSTRING_LEN(a->buf) - a->len;
231 wait_writable(a->io, a->fd);
233 /* buf may be modified in other thread/fiber */
234 a->len = RSTRING_LEN(a->buf) - written;
235 if (a->len <= 0)
236 goto done;
237 a->ptr = RSTRING_PTR(a->buf) + written;
238 return -1;
239 } else {
240 a->buf = mKgio_WaitWritable;
241 return 0;
244 rb_sys_fail(msg);
245 } else {
246 assert(n >= 0 && n < a->len && "write/send syscall broken?");
247 if (io_wait) {
248 a->ptr += n;
249 a->len -= n;
250 return -1;
252 a->buf = rb_str_new(a->ptr + n, a->len - n);
254 return 0;
257 static VALUE my_write(VALUE io, VALUE str, int io_wait)
259 struct io_args a;
260 long n;
262 prepare_write(&a, io, str);
263 set_nonblocking(a.fd);
264 retry:
265 n = (long)write(a.fd, a.ptr, a.len);
266 if (write_check(&a, n, "write", io_wait) != 0)
267 goto retry;
268 return a.buf;
272 * call-seq:
274 * io.kgio_write(str) -> nil
276 * Returns nil when the write completes.
278 * Calls the method Kgio.wait_writable if it is set. Otherwise this
279 * blocks in a thread-safe manner until all data is written or a
280 * fatal error occurs.
282 static VALUE kgio_write(VALUE io, VALUE str)
284 return my_write(io, str, 1);
288 * call-seq:
290 * io.kgio_trywrite(str) -> nil or Kgio::WaitWritable
292 * Returns nil if the write was completed in full.
294 * Returns a String containing the unwritten portion if there was a
295 * partial write.
297 * Returns Kgio::WaitWritable if EAGAIN is encountered.
299 static VALUE kgio_trywrite(VALUE io, VALUE str)
301 return my_write(io, str, 0);
304 #ifdef USE_MSG_DONTWAIT
306 * This method behaves like Kgio::PipeMethods#kgio_write, except
307 * it will use send(2) with the MSG_DONTWAIT flag on sockets to
308 * avoid unnecessary calls to fcntl(2).
310 static VALUE my_send(VALUE io, VALUE str, int io_wait)
312 struct io_args a;
313 long n;
315 prepare_write(&a, io, str);
316 retry:
317 n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
318 if (write_check(&a, n, "send", io_wait) != 0)
319 goto retry;
320 return a.buf;
324 * This method may be optimized on some systems (e.g. GNU/Linux) to use
325 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
326 * Otherwise this is the same as Kgio::PipeMethods#kgio_write
328 static VALUE kgio_send(VALUE io, VALUE str)
330 return my_send(io, str, 1);
334 * This method may be optimized on some systems (e.g. GNU/Linux) to use
335 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
336 * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite
338 static VALUE kgio_trysend(VALUE io, VALUE str)
340 return my_send(io, str, 0);
342 #else /* ! USE_MSG_DONTWAIT */
343 # define kgio_send kgio_write
344 # define kgio_trysend kgio_trywrite
345 #endif /* ! USE_MSG_DONTWAIT */
348 * call-seq:
350 * Kgio.wait_readable = :method_name
351 * Kgio.wait_readable = nil
353 * Sets a method for kgio_read to call when a read would block.
354 * This is useful for non-blocking frameworks that use Fibers,
355 * as the method referred to this may cause the current Fiber
356 * to yield execution.
358 * A special value of nil will cause Ruby to wait using the
359 * rb_io_wait_readable() function.
361 static VALUE set_wait_rd(VALUE mod, VALUE sym)
363 switch (TYPE(sym)) {
364 case T_SYMBOL:
365 io_wait_rd = SYM2ID(sym);
366 return sym;
367 case T_NIL:
368 io_wait_rd = 0;
369 return sym;
371 rb_raise(rb_eTypeError, "must be a symbol or nil");
372 return sym;
376 * call-seq:
378 * Kgio.wait_writable = :method_name
379 * Kgio.wait_writable = nil
381 * Sets a method for kgio_write to call when a read would block.
382 * This is useful for non-blocking frameworks that use Fibers,
383 * as the method referred to this may cause the current Fiber
384 * to yield execution.
386 * A special value of nil will cause Ruby to wait using the
387 * rb_io_wait_writable() function.
389 static VALUE set_wait_wr(VALUE mod, VALUE sym)
391 switch (TYPE(sym)) {
392 case T_SYMBOL:
393 io_wait_wr = SYM2ID(sym);
394 return sym;
395 case T_NIL:
396 io_wait_wr = 0;
397 return sym;
399 rb_raise(rb_eTypeError, "must be a symbol or nil");
400 return sym;
404 * call-seq:
406 * Kgio.wait_writable -> Symbol or nil
408 * Returns the symbolic method name of the method assigned to
409 * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_write
410 * or Kgio::SocketMethods#kgio_write call
412 static VALUE wait_wr(VALUE mod)
414 return io_wait_wr ? ID2SYM(io_wait_wr) : Qnil;
418 * call-seq:
420 * Kgio.wait_readable -> Symbol or nil
422 * Returns the symbolic method name of the method assigned to
423 * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_read
424 * or Kgio::SocketMethods#kgio_read call.
426 static VALUE wait_rd(VALUE mod)
428 return io_wait_rd ? ID2SYM(io_wait_rd) : Qnil;
431 static VALUE xaccept(void *ptr)
433 struct accept_args *a = ptr;
435 return (VALUE)accept4(a->fd, a->addr, a->addrlen, accept4_flags);
438 #ifdef HAVE_RB_THREAD_BLOCKING_REGION
439 # include <time.h>
441 * Try to use a (real) blocking accept() since that can prevent
442 * thundering herds under Linux:
443 * http://www.citi.umich.edu/projects/linux-scalability/reports/accept.html
445 * So we periodically disable non-blocking, but not too frequently
446 * because other processes may set non-blocking (especially during
447 * a process upgrade) with Rainbows! concurrency model changes.
449 static int thread_accept(struct accept_args *a, int force_nonblock)
451 if (force_nonblock)
452 set_nonblocking(a->fd);
453 return (int)rb_thread_blocking_region(xaccept, a, RUBY_UBF_IO, 0);
/*
 * Called after a blocking-style accept hit EAGAIN on +fd+.
 *
 * On the first call, and whenever the last switch to blocking mode
 * happened no more than five seconds ago, this simply waits for +fd+
 * to become readable.  Otherwise O_NONBLOCK is cleared on +fd+ (if it
 * was set) and the time of the switch is recorded.
 */
static void set_blocking_or_block(int fd)
{
	static time_t last_set_blocking;
	time_t now = time(NULL);
	int flags;

	if (last_set_blocking == 0 || (now - last_set_blocking) <= 5) {
		/* the very first call only starts the clock */
		if (last_set_blocking == 0)
			last_set_blocking = now;
		(void)rb_io_wait_readable(fd);
		return;
	}

	flags = fcntl(fd, F_GETFL);
	if (flags == -1)
		rb_sys_fail("fcntl(F_GETFL)");

	if (flags & O_NONBLOCK) {
		flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
		if (flags == -1)
			rb_sys_fail("fcntl(F_SETFL)");
	}
	last_set_blocking = now;
}
478 #else /* ! HAVE_RB_THREAD_BLOCKING_REGION */
479 # include <rubysig.h>
480 static int thread_accept(struct accept_args *a, int force_nonblock)
482 int rv;
484 /* always use non-blocking accept() under 1.8 for green threads */
485 set_nonblocking(a->fd);
486 TRAP_BEG;
487 rv = (int)xaccept(a);
488 TRAP_END;
489 return rv;
491 #define set_blocking_or_block(fd) (void)rb_io_wait_readable(fd)
492 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
494 static VALUE
495 my_accept(VALUE io, struct sockaddr *addr, socklen_t *addrlen, int nonblock)
497 int client;
498 struct accept_args a;
500 a.fd = my_fileno(io);
501 a.addr = addr;
502 a.addrlen = addrlen;
503 retry:
504 client = thread_accept(&a, nonblock);
505 if (client == -1) {
506 switch (errno) {
507 case EAGAIN:
508 if (nonblock)
509 return Qnil;
510 set_blocking_or_block(a.fd);
511 #ifdef ECONNABORTED
512 case ECONNABORTED:
513 #endif /* ECONNABORTED */
514 #ifdef EPROTO
515 case EPROTO:
516 #endif /* EPROTO */
517 case EINTR:
518 goto retry;
519 case ENOMEM:
520 case EMFILE:
521 case ENFILE:
522 #ifdef ENOBUFS
523 case ENOBUFS:
524 #endif /* ENOBUFS */
525 errno = 0;
526 rb_gc();
527 client = thread_accept(&a, nonblock);
529 if (client == -1) {
530 if (errno == EINTR)
531 goto retry;
532 rb_sys_fail("accept");
535 return sock_for_fd(cSocket, client);
538 static void in_addr_set(VALUE io, struct sockaddr_in *addr)
540 VALUE host = rb_str_new(0, INET_ADDRSTRLEN);
541 socklen_t addrlen = (socklen_t)INET_ADDRSTRLEN;
542 const char *name;
544 name = inet_ntop(AF_INET, &addr->sin_addr, RSTRING_PTR(host), addrlen);
545 if (name == NULL)
546 rb_sys_fail("inet_ntop");
547 rb_str_set_len(host, strlen(name));
548 rb_ivar_set(io, iv_kgio_addr, host);
552 * call-seq:
554 * server = Kgio::TCPServer.new('0.0.0.0', 80)
555 * server.kgio_tryaccept -> Kgio::Socket or nil
557 * Initiates a non-blocking accept and returns a generic Kgio::Socket
558 * object with the kgio_addr attribute set to the IP address of the
559 * connected client on success.
561 * Returns nil on EAGAIN, and raises on other errors.
563 static VALUE tcp_tryaccept(VALUE io)
565 struct sockaddr_in addr;
566 socklen_t addrlen = sizeof(struct sockaddr_in);
567 VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 1);
569 if (!NIL_P(rv))
570 in_addr_set(rv, &addr);
571 return rv;
575 * call-seq:
577 * server = Kgio::TCPServer.new('0.0.0.0', 80)
578 * server.kgio_accept -> Kgio::Socket or nil
580 * Initiates a blocking accept and returns a generic Kgio::Socket
581 * object with the kgio_addr attribute set to the IP address of
582 * the client on success.
584 * On Ruby implementations using native threads, this can use a blocking
585 * accept(2) (or accept4(2)) system call to avoid thundering herds.
587 static VALUE tcp_accept(VALUE io)
589 struct sockaddr_in addr;
590 socklen_t addrlen = sizeof(struct sockaddr_in);
591 VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 0);
593 in_addr_set(rv, &addr);
594 return rv;
598 * call-seq:
600 * server = Kgio::UNIXServer.new("/path/to/unix/socket")
601 * server.kgio_tryaccept -> Kgio::Socket or nil
603 * Initiates a non-blocking accept and returns a generic Kgio::Socket
604 * object with the kgio_addr attribute set (to the value of
605 * Kgio::LOCALHOST) on success.
607 * Returns nil on EAGAIN, and raises on other errors.
609 static VALUE unix_tryaccept(VALUE io)
611 VALUE rv = my_accept(io, NULL, NULL, 1);
613 if (!NIL_P(rv))
614 rb_ivar_set(rv, iv_kgio_addr, localhost);
615 return rv;
619 * call-seq:
621 * server = Kgio::UNIXServer.new("/path/to/unix/socket")
622 * server.kgio_accept -> Kgio::Socket or nil
624 * Initiates a blocking accept and returns a generic Kgio::Socket
625 * object with the kgio_addr attribute set (to the value of
626 * Kgio::LOCALHOST) on success.
628 * On Ruby implementations using native threads, this can use a blocking
629 * accept(2) (or accept4(2)) system call to avoid thundering herds.
631 static VALUE unix_accept(VALUE io)
633 VALUE rv = my_accept(io, NULL, NULL, 0);
635 rb_ivar_set(rv, iv_kgio_addr, localhost);
636 return rv;
640 * call-seq:
642 * Kgio.accept_cloexec? -> true or false
644 * Returns true if newly accepted Kgio::Sockets are created with the
645 * FD_CLOEXEC file descriptor flag, false if not.
647 static VALUE get_cloexec(VALUE mod)
649 return (accept4_flags & A4_SOCK_CLOEXEC) ==
650 A4_SOCK_CLOEXEC ? Qtrue : Qfalse;
655 * call-seq:
657 * Kgio.accept_nonblock? -> true or false
659 * Returns true if newly accepted Kgio::Sockets are created with the
660 * O_NONBLOCK file status flag, false if not.
662 static VALUE get_nonblock(VALUE mod)
664 return (accept4_flags & A4_SOCK_NONBLOCK) ==
665 A4_SOCK_NONBLOCK ? Qtrue : Qfalse;
669 * call-seq:
671 * Kgio.accept_cloexec = true
672 * Kgio.accept_clocexec = false
674 * Sets whether or not Kgio::Socket objects created by
675 * TCPServer#kgio_accept,
676 * TCPServer#kgio_tryaccept,
677 * UNIXServer#kgio_accept,
678 * and UNIXServer#kgio_tryaccept
679 * are created with the FD_CLOEXEC file descriptor flag.
681 * This is on by default, as there is little reason to deal to enable
682 * it for client sockets on a socket server.
684 static VALUE set_cloexec(VALUE mod, VALUE boolean)
686 switch (TYPE(boolean)) {
687 case T_TRUE:
688 accept4_flags |= A4_SOCK_CLOEXEC;
689 return boolean;
690 case T_FALSE:
691 accept4_flags &= ~A4_SOCK_CLOEXEC;
692 return boolean;
694 rb_raise(rb_eTypeError, "not true or false");
695 return Qnil;
699 * call-seq:
701 * Kgio.accept_nonblock = true
702 * Kgio.accept_nonblock = false
704 * Sets whether or not Kgio::Socket objects created by
705 * TCPServer#kgio_accept,
706 * TCPServer#kgio_tryaccept,
707 * UNIXServer#kgio_accept,
708 * and UNIXServer#kgio_tryaccept
709 * are created with the O_NONBLOCK file status flag.
711 * This defaults to +false+ for GNU/Linux where MSG_DONTWAIT is
712 * available (and on newer GNU/Linux, accept4() may also set
713 * the non-blocking flag. This defaults to +true+ on non-GNU/Linux
714 * systems.
716 static VALUE set_nonblock(VALUE mod, VALUE boolean)
718 switch (TYPE(boolean)) {
719 case T_TRUE:
720 accept4_flags |= A4_SOCK_NONBLOCK;
721 return boolean;
722 case T_FALSE:
723 accept4_flags &= ~A4_SOCK_NONBLOCK;
724 return boolean;
726 rb_raise(rb_eTypeError, "not true or false");
727 return Qnil;
/*
 * Closes +fd+ and raises via rb_sys_fail(msg) for the error that was
 * in errno on entry; errno is saved around close() so a failure of
 * close() itself cannot replace the original error.
 */
static void close_fail(int fd, const char *msg)
{
	const int original_errno = errno;

	(void)close(fd);
	errno = original_errno;
	rb_sys_fail(msg);
}
738 #ifdef SOCK_NONBLOCK
739 # define MY_SOCK_STREAM (SOCK_STREAM|SOCK_NONBLOCK)
740 #else
741 # define MY_SOCK_STREAM SOCK_STREAM
742 #endif /* ! SOCK_NONBLOCK */
744 static VALUE
745 my_connect(VALUE klass, int io_wait, int domain, void *addr, socklen_t addrlen)
747 int fd = socket(domain, MY_SOCK_STREAM, 0);
749 if (fd == -1) {
750 switch (errno) {
751 case EMFILE:
752 case ENFILE:
753 #ifdef ENOBUFS
754 case ENOBUFS:
755 #endif /* ENOBUFS */
756 errno = 0;
757 rb_gc();
758 fd = socket(domain, MY_SOCK_STREAM, 0);
760 if (fd == -1)
761 rb_sys_fail("socket");
764 #ifndef SOCK_NONBLOCK
765 if (fcntl(fd, F_SETFL, O_RDWR | O_NONBLOCK) == -1)
766 close_fail(fd, "fcntl(F_SETFL, O_RDWR | O_NONBLOCK)");
767 #endif /* SOCK_NONBLOCK */
769 if (connect(fd, addr, addrlen) == -1) {
770 if (errno == EINPROGRESS) {
771 VALUE io = sock_for_fd(klass, fd);
773 if (io_wait) {
774 errno = EAGAIN;
775 wait_writable(io, fd);
777 return io;
779 close_fail(fd, "connect");
781 return sock_for_fd(klass, fd);
784 static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait)
786 struct sockaddr_in addr = { 0 };
788 addr.sin_family = AF_INET;
789 addr.sin_port = htons((unsigned short)NUM2INT(port));
791 switch (inet_pton(AF_INET, StringValuePtr(ip), &addr.sin_addr)) {
792 case 1:
793 return my_connect(klass, io_wait, PF_INET, &addr, sizeof(addr));
794 case -1:
795 rb_sys_fail("inet_pton");
797 rb_raise(rb_eArgError, "invalid address: %s", StringValuePtr(ip));
799 return Qnil;
803 * call-seq:
805 * Kgio::TCPSocket.new('127.0.0.1', 80) -> socket
807 * Creates a new Kgio::TCPSocket object and initiates a
808 * non-blocking connection.
810 * This may block and call any method assigned to Kgio.wait_writable.
812 * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
813 * lookups (which is subject to a different set of timeouts and
814 * best handled elsewhere).
816 static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port)
818 return tcp_connect(klass, ip, port, 1);
822 * call-seq:
824 * Kgio::TCPSocket.start('127.0.0.1', 80) -> socket
826 * Creates a new Kgio::TCPSocket object and initiates a
827 * non-blocking connection. The caller should select/poll
828 * on the socket for writability before attempting to write
829 * or optimistically attempt a write and handle Kgio::WaitWritable
830 * or Errno::EAGAIN.
832 * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
833 * lookups (which is subject to a different set of timeouts and
834 * best handled elsewhere).
836 static VALUE kgio_tcp_start(VALUE klass, VALUE ip, VALUE port)
838 return tcp_connect(klass, ip, port, 0);
841 static VALUE unix_connect(VALUE klass, VALUE path, int io_wait)
843 struct sockaddr_un addr = { 0 };
844 long len;
846 StringValue(path);
847 len = RSTRING_LEN(path);
848 if (sizeof(addr.sun_path) <= len)
849 rb_raise(rb_eArgError,
850 "too long unix socket path (max: %dbytes)",
851 (int)sizeof(addr.sun_path)-1);
853 memcpy(addr.sun_path, RSTRING_PTR(path), len);
854 addr.sun_family = AF_UNIX;
856 return my_connect(klass, io_wait, PF_UNIX, &addr, sizeof(addr));
860 * call-seq:
862 * Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket
864 * Creates a new Kgio::UNIXSocket object and initiates a
865 * non-blocking connection.
867 * This may block and call any method assigned to Kgio.wait_writable.
869 static VALUE kgio_unix_connect(VALUE klass, VALUE path)
871 return unix_connect(klass, path, 1);
875 * call-seq:
877 * Kgio::UNIXSocket.start("/path/to/unix/socket") -> socket
879 * Creates a new Kgio::UNIXSocket object and initiates a
880 * non-blocking connection. The caller should select/poll
881 * on the socket for writability before attempting to write
882 * or optimistically attempt a write and handle Kgio::WaitWritable
883 * or Errno::EAGAIN.
885 static VALUE kgio_unix_start(VALUE klass, VALUE path)
887 return unix_connect(klass, path, 0);
890 static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait)
892 int domain;
893 socklen_t addrlen;
894 struct sockaddr *sockaddr;
896 if (TYPE(addr) == T_STRING) {
897 sockaddr = (struct sockaddr *)(RSTRING_PTR(addr));
898 addrlen = (socklen_t)RSTRING_LEN(addr);
899 } else {
900 rb_raise(rb_eTypeError, "invalid address");
902 switch (((struct sockaddr_in *)(sockaddr))->sin_family) {
903 case AF_UNIX: domain = PF_UNIX; break;
904 case AF_INET: domain = PF_INET; break;
905 #ifdef AF_INET6 /* IPv6 support incomplete */
906 case AF_INET6: domain = PF_INET6; break;
907 #endif /* AF_INET6 */
908 default:
909 rb_raise(rb_eArgError, "invalid address family");
912 return my_connect(klass, io_wait, domain, sockaddr, addrlen);
915 /* call-seq:
917 * addr = Socket.pack_sockaddr_in(80, 'example.com')
918 * Kgio::Socket.connect(addr) -> socket
920 * addr = Socket.pack_sockaddr_un("/path/to/unix/socket")
921 * Kgio::Socket.connect(addr) -> socket
923 * Creates a generic Kgio::Socket object and initiates a
924 * non-blocking connection.
926 * This may block and call any method assigned to Kgio.wait_writable.
928 static VALUE kgio_connect(VALUE klass, VALUE addr)
930 return stream_connect(klass, addr, 1);
933 /* call-seq:
935 * addr = Socket.pack_sockaddr_in(80, 'example.com')
936 * Kgio::Socket.start(addr) -> socket
938 * addr = Socket.pack_sockaddr_un("/path/to/unix/socket")
939 * Kgio::Socket.start(addr) -> socket
941 * Creates a generic Kgio::Socket object and initiates a
942 * non-blocking connection. The caller should select/poll
943 * on the socket for writability before attempting to write
944 * or optimistically attempt a write and handle Kgio::WaitWritable
945 * or Errno::EAGAIN.
947 static VALUE kgio_start(VALUE klass, VALUE addr)
949 return stream_connect(klass, addr, 0);
952 void Init_kgio_ext(void)
954 VALUE mKgio = rb_define_module("Kgio");
955 VALUE mPipeMethods, mSocketMethods;
956 VALUE cUNIXServer, cTCPServer, cUNIXSocket, cTCPSocket;
958 rb_require("socket");
961 * Document-module: Kgio::Socket
963 * A generic socket class with Kgio::SocketMethods included.
964 * This is returned by all Kgio methods that accept(2) a connected
965 * stream socket.
967 cSocket = rb_const_get(rb_cObject, rb_intern("Socket"));
968 cSocket = rb_define_class_under(mKgio, "Socket", cSocket);
970 localhost = rb_str_new2("127.0.0.1");
973 * The IPv4 address of UNIX domain sockets, useful for creating
974 * Rack (and CGI) servers that also serve HTTP traffic over
975 * UNIX domain sockets.
977 rb_const_set(mKgio, rb_intern("LOCALHOST"), localhost);
980 * Document-module: Kgio::WaitReadable
982 * PipeMethods#kgio_tryread and SocketMethods#kgio_tryread will
983 * return this constant when waiting for a read is required.
985 mKgio_WaitReadable = rb_define_module_under(mKgio, "WaitReadable");
988 * Document-module: Kgio::WaitWritable
990 * PipeMethods#kgio_trywrite and SocketMethods#kgio_trywrite will
991 * return this constant when waiting for a read is required.
993 mKgio_WaitWritable = rb_define_module_under(mKgio, "WaitWritable");
995 rb_define_singleton_method(mKgio, "wait_readable=", set_wait_rd, 1);
996 rb_define_singleton_method(mKgio, "wait_writable=", set_wait_wr, 1);
997 rb_define_singleton_method(mKgio, "wait_readable", wait_rd, 0);
998 rb_define_singleton_method(mKgio, "wait_writable", wait_wr, 0);
999 rb_define_singleton_method(mKgio, "accept_cloexec?", get_cloexec, 0);
1000 rb_define_singleton_method(mKgio, "accept_cloexec=", set_cloexec, 1);
1001 rb_define_singleton_method(mKgio, "accept_nonblock?", get_nonblock, 0);
1002 rb_define_singleton_method(mKgio, "accept_nonblock=", set_nonblock, 1);
1005 * Document-module: Kgio::PipeMethods
1007 * This module may be used used to create classes that respond to
1008 * various Kgio methods for reading and writing. This is included
1009 * in Kgio::Pipe by default.
1011 mPipeMethods = rb_define_module_under(mKgio, "PipeMethods");
1012 rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
1013 rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
1014 rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
1015 rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
1018 * Document-module: Kgio::SocketMethods
1020 * This method behaves like Kgio::PipeMethods, but contains
1021 * optimizations for sockets on certain operating systems
1022 * (e.g. GNU/Linux).
1024 mSocketMethods = rb_define_module_under(mKgio, "SocketMethods");
1025 rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
1026 rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
1027 rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
1028 rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
1031 * Returns the client IPv4 address of the socket in dotted quad
1032 * form as a string. This is always the value of the
1033 * Kgio::LOCALHOST constant for UNIX domain sockets.
1035 rb_define_attr(mSocketMethods, "kgio_addr", 1, 1);
1037 rb_include_module(cSocket, mSocketMethods);
1038 rb_define_singleton_method(cSocket, "new", kgio_connect, 1);
1039 rb_define_singleton_method(cSocket, "start", kgio_start, 1);
1041 cUNIXServer = rb_const_get(rb_cObject, rb_intern("UNIXServer"));
1042 cUNIXServer = rb_define_class_under(mKgio, "UNIXServer", cUNIXServer);
1043 rb_define_method(cUNIXServer, "kgio_tryaccept", unix_tryaccept, 0);
1044 rb_define_method(cUNIXServer, "kgio_accept", unix_accept, 0);
1046 cTCPServer = rb_const_get(rb_cObject, rb_intern("TCPServer"));
1047 cTCPServer = rb_define_class_under(mKgio, "TCPServer", cTCPServer);
1048 rb_define_method(cTCPServer, "kgio_tryaccept", tcp_tryaccept, 0);
1049 rb_define_method(cTCPServer, "kgio_accept", tcp_accept, 0);
1051 cTCPSocket = rb_const_get(rb_cObject, rb_intern("TCPSocket"));
1052 cTCPSocket = rb_define_class_under(mKgio, "TCPSocket", cTCPSocket);
1053 rb_include_module(cTCPSocket, mSocketMethods);
1054 rb_define_singleton_method(cTCPSocket, "new", kgio_tcp_connect, 2);
1055 rb_define_singleton_method(cTCPSocket, "start", kgio_tcp_start, 2);
1057 cUNIXSocket = rb_const_get(rb_cObject, rb_intern("UNIXSocket"));
1058 cUNIXSocket = rb_define_class_under(mKgio, "UNIXSocket", cUNIXSocket);
1059 rb_include_module(cUNIXSocket, mSocketMethods);
1060 rb_define_singleton_method(cUNIXSocket, "new", kgio_unix_connect, 1);
1061 rb_define_singleton_method(cUNIXSocket, "start", kgio_unix_start, 1);
1063 iv_kgio_addr = rb_intern("@kgio_addr");
1064 init_sock_for_fd();