1 /* We do not modify RSTRING in this file, so RSTRING_MODIFIED is not needed */
4 #include "sock_for_fd.h"
5 #include "blocking_io_region.h"
7 static void close_fail(int fd
, const char *msg
)
9 int saved_errno
= errno
;
15 static int MY_SOCK_STREAM
=
16 #if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
17 # ifdef HAVE_RB_FD_FIX_CLOEXEC
18 (SOCK_STREAM
|SOCK_NONBLOCK
|SOCK_CLOEXEC
)
20 (SOCK_STREAM
|SOCK_NONBLOCK
)
24 #endif /* ! SOCK_NONBLOCK */
27 /* do not set close-on-exec by default on Ruby <2.0.0 */
28 #ifndef HAVE_RB_FD_FIX_CLOEXEC
29 # define rb_fd_fix_cloexec(fd) for (;0;)
30 #endif /* HAVE_RB_FD_FIX_CLOEXEC */
32 /* try to use SOCK_NONBLOCK and SOCK_CLOEXEC */
33 static int my_socket(int domain
)
38 fd
= socket(domain
, MY_SOCK_STREAM
, 0);
49 fd
= socket(domain
, MY_SOCK_STREAM
, 0);
52 if (MY_SOCK_STREAM
!= SOCK_STREAM
) {
53 MY_SOCK_STREAM
= SOCK_STREAM
;
58 rb_sys_fail("socket");
61 if (MY_SOCK_STREAM
== SOCK_STREAM
) {
62 if (fcntl(fd
, F_SETFL
, O_RDWR
| O_NONBLOCK
) < 0)
63 close_fail(fd
, "fcntl(F_SETFL, O_RDWR | O_NONBLOCK)");
64 rb_fd_fix_cloexec(fd
);
71 my_connect(VALUE klass
, int io_wait
, int domain
,
72 const void *addr
, socklen_t addrlen
)
74 int fd
= my_socket(domain
);
76 if (connect(fd
, addr
, addrlen
) < 0) {
77 if (errno
== EINPROGRESS
) {
78 VALUE io
= sock_for_fd(klass
, fd
);
82 (void)kgio_call_wait_writable(io
);
86 close_fail(fd
, "connect");
88 return sock_for_fd(klass
, fd
);
92 tcp_getaddr(struct addrinfo
*hints
, struct sockaddr_storage
*addr
,
97 const char *ipname
= StringValuePtr(ip
);
101 if (TYPE(port
) != T_FIXNUM
)
102 rb_raise(rb_eTypeError
, "port must be a non-negative integer");
103 uport
= FIX2UINT(port
);
105 rc
= snprintf(ipport
, sizeof(ipport
), "%u", uport
);
106 if (rc
>= (int)sizeof(ipport
) || rc
<= 0)
107 rb_raise(rb_eArgError
, "invalid TCP port: %u", uport
);
108 memset(hints
, 0, sizeof(struct addrinfo
));
109 hints
->ai_family
= AF_UNSPEC
;
110 hints
->ai_socktype
= SOCK_STREAM
;
111 hints
->ai_protocol
= IPPROTO_TCP
;
112 /* disallow non-deterministic DNS lookups */
113 hints
->ai_flags
= AI_NUMERICHOST
;
115 rc
= getaddrinfo(ipname
, ipport
, hints
, &res
);
117 rb_raise(rb_eArgError
, "getaddrinfo(%s:%s): %s",
118 ipname
, ipport
, gai_strerror(rc
));
120 /* copy needed data and free ASAP to avoid needing rb_ensure */
121 hints
->ai_family
= res
->ai_family
;
122 hints
->ai_addrlen
= res
->ai_addrlen
;
123 memcpy(addr
, res
->ai_addr
, res
->ai_addrlen
);
127 static VALUE
tcp_connect(VALUE klass
, VALUE ip
, VALUE port
, int io_wait
)
129 struct addrinfo hints
;
130 struct sockaddr_storage addr
;
132 tcp_getaddr(&hints
, &addr
, ip
, port
);
134 return my_connect(klass
, io_wait
, hints
.ai_family
,
135 &addr
, hints
.ai_addrlen
);
138 static const struct sockaddr
*sockaddr_from(socklen_t
*addrlen
, VALUE addr
)
140 if (TYPE(addr
) == T_STRING
) {
141 *addrlen
= (socklen_t
)RSTRING_LEN(addr
);
142 return (const struct sockaddr
*)(RSTRING_PTR(addr
));
144 rb_raise(rb_eTypeError
, "invalid address");
148 #if defined(MSG_FASTOPEN) && defined(KGIO_HAVE_THREAD_CALL_WITHOUT_GVL)
149 #ifndef HAVE_RB_STR_SUBSEQ
150 #define rb_str_subseq rb_str_substr
156 const struct sockaddr
*addr
;
160 static VALUE
tfo_sendto(void *_a
)
162 struct tfo_args
*a
= _a
;
165 w
= sendto(a
->fd
, a
->buf
, a
->buflen
, MSG_FASTOPEN
, a
->addr
, a
->addrlen
);
172 * s = Kgio::Socket.new(:INET, :STREAM)
173 * addr = Socket.pack_sockaddr_in(80, "example.com")
174 * s.kgio_fastopen("hello world", addr) -> nil
176 * Starts a TCP connection using TCP Fast Open. This uses a blocking
177 * sendto() syscall and is only available on Ruby 1.9 or later.
178 * This raises exceptions (including Errno::EINPROGRESS/Errno::EAGAIN)
179 * on errors. Using this is only recommended for blocking sockets.
181 * Timeouts may be set with setsockopt:
183 * s.setsockopt(:SOCKET, :SNDTIMEO, [1,0].pack("l_l_"))
185 static VALUE
fastopen(VALUE sock
, VALUE buf
, VALUE addr
)
188 VALUE str
= (TYPE(buf
) == T_STRING
) ? buf
: rb_obj_as_string(buf
);
191 a
.fd
= my_fileno(sock
);
192 a
.buf
= RSTRING_PTR(str
);
193 a
.buflen
= (size_t)RSTRING_LEN(str
);
194 a
.addr
= sockaddr_from(&a
.addrlen
, addr
);
196 /* n.b. rb_thread_blocking_region preserves errno */
197 w
= (ssize_t
)rb_thread_io_blocking_region(tfo_sendto
, &a
, a
.fd
);
199 rb_sys_fail("sendto");
200 if ((size_t)w
== a
.buflen
)
203 return rb_str_subseq(str
, w
, a
.buflen
- w
);
205 #endif /* MSG_FASTOPEN */
210 * Kgio::TCPSocket.new('127.0.0.1', 80) -> socket
212 * Creates a new Kgio::TCPSocket object and initiates a
213 * non-blocking connection.
215 * This may block and call any method defined to +kgio_wait_writable+
218 * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
219 * lookups (which is subject to a different set of timeouts and
220 * best handled elsewhere).
222 static VALUE
kgio_tcp_connect(VALUE klass
, VALUE ip
, VALUE port
)
224 return tcp_connect(klass
, ip
, port
, 1);
230 * Kgio::TCPSocket.start('127.0.0.1', 80) -> socket
232 * Creates a new Kgio::TCPSocket object and initiates a
233 * non-blocking connection. The caller should select/poll
234 * on the socket for writability before attempting to write
235 * or optimistically attempt a write and handle :wait_writable
238 * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
239 * lookups (which is subject to a different set of timeouts and
240 * best handled elsewhere).
242 static VALUE
kgio_tcp_start(VALUE klass
, VALUE ip
, VALUE port
)
244 return tcp_connect(klass
, ip
, port
, 0);
247 static VALUE
unix_connect(VALUE klass
, VALUE path
, int io_wait
)
249 struct sockaddr_un addr
= { 0 };
253 len
= RSTRING_LEN(path
);
254 if ((long)sizeof(addr
.sun_path
) <= len
)
255 rb_raise(rb_eArgError
,
256 "too long unix socket path (max: %dbytes)",
257 (int)sizeof(addr
.sun_path
)-1);
259 memcpy(addr
.sun_path
, RSTRING_PTR(path
), len
);
260 addr
.sun_family
= AF_UNIX
;
262 return my_connect(klass
, io_wait
, PF_UNIX
, &addr
, sizeof(addr
));
268 * Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket
270 * Creates a new Kgio::UNIXSocket object and initiates a
271 * non-blocking connection.
273 * This may block and call any method defined to +kgio_wait_writable+
276 static VALUE
kgio_unix_connect(VALUE klass
, VALUE path
)
278 return unix_connect(klass
, path
, 1);
284 * Kgio::UNIXSocket.start("/path/to/unix/socket") -> socket
286 * Creates a new Kgio::UNIXSocket object and initiates a
287 * non-blocking connection. The caller should select/poll
288 * on the socket for writability before attempting to write
289 * or optimistically attempt a write and handle :wait_writable
292 static VALUE
kgio_unix_start(VALUE klass
, VALUE path
)
294 return unix_connect(klass
, path
, 0);
297 static VALUE
stream_connect(VALUE klass
, VALUE addr
, int io_wait
)
301 const struct sockaddr
*sockaddr
= sockaddr_from(&addrlen
, addr
);
303 switch (((const struct sockaddr_storage
*)(sockaddr
))->ss_family
) {
304 case AF_UNIX
: domain
= PF_UNIX
; break;
305 case AF_INET
: domain
= PF_INET
; break;
306 case AF_INET6
: domain
= PF_INET6
; break;
308 rb_raise(rb_eArgError
, "invalid address family");
311 return my_connect(klass
, io_wait
, domain
, sockaddr
, addrlen
);
316 * addr = Socket.pack_sockaddr_in(80, 'example.com')
317 * Kgio::Socket.connect(addr) -> socket
319 * addr = Socket.pack_sockaddr_un("/path/to/unix/socket")
320 * Kgio::Socket.connect(addr) -> socket
322 * Creates a generic Kgio::Socket object and initiates a
323 * non-blocking connection.
325 * This may block and call any method defined to +kgio_wait_writable+
328 static VALUE
kgio_connect(VALUE klass
, VALUE addr
)
330 return stream_connect(klass
, addr
, 1);
334 * If passed one argument, this is identical to Kgio::Socket.connect.
335 * If passed two or three arguments, it uses its superclass method:
337 * Socket.new(domain, socktype [, protocol ])
339 static VALUE
kgio_new(int argc
, VALUE
*argv
, VALUE klass
)
342 /* backwards compat, the only way for kgio <= 2.7.4 */
343 return stream_connect(klass
, argv
[0], 1);
345 return rb_call_super(argc
, argv
);
350 * addr = Socket.pack_sockaddr_in(80, 'example.com')
351 * Kgio::Socket.start(addr) -> socket
353 * addr = Socket.pack_sockaddr_un("/path/to/unix/socket")
354 * Kgio::Socket.start(addr) -> socket
356 * Creates a generic Kgio::Socket object and initiates a
357 * non-blocking connection. The caller should select/poll
358 * on the socket for writability before attempting to write
359 * or optimistically attempt a write and handle :wait_writable
362 static VALUE
kgio_start(VALUE klass
, VALUE addr
)
364 return stream_connect(klass
, addr
, 0);
367 void init_kgio_connect(void)
369 VALUE mKgio
= rb_define_module("Kgio");
370 VALUE cSocket
= rb_const_get(rb_cObject
, rb_intern("Socket"));
371 VALUE mSocketMethods
= rb_const_get(mKgio
, rb_intern("SocketMethods"));
372 VALUE cKgio_Socket
, cTCPSocket
, cUNIXSocket
;
375 * Document-class: Kgio::Socket
377 * A generic socket class with Kgio::SocketMethods included.
378 * This is returned by all Kgio methods that accept(2) a connected
381 cKgio_Socket
= rb_define_class_under(mKgio
, "Socket", cSocket
);
382 rb_include_module(cKgio_Socket
, mSocketMethods
);
383 rb_define_singleton_method(cKgio_Socket
, "new", kgio_new
, -1);
384 rb_define_singleton_method(cKgio_Socket
, "connect", kgio_connect
, 1);
385 rb_define_singleton_method(cKgio_Socket
, "start", kgio_start
, 1);
386 #if defined(MSG_FASTOPEN) && defined(KGIO_HAVE_THREAD_CALL_WITHOUT_GVL)
387 rb_define_method(cKgio_Socket
, "kgio_fastopen", fastopen
, 2);
390 * Document-class: Kgio::TCPSocket
392 * Kgio::TCPSocket should be used in place of the plain TCPSocket
393 * when kgio_* methods are needed.
395 cTCPSocket
= rb_const_get(rb_cObject
, rb_intern("TCPSocket"));
396 cTCPSocket
= rb_define_class_under(mKgio
, "TCPSocket", cTCPSocket
);
397 rb_include_module(cTCPSocket
, mSocketMethods
);
398 rb_define_singleton_method(cTCPSocket
, "new", kgio_tcp_connect
, 2);
399 rb_define_singleton_method(cTCPSocket
, "start", kgio_tcp_start
, 2);
402 * Document-class: Kgio::UNIXSocket
404 * Kgio::UNIXSocket should be used in place of the plain UNIXSocket
405 * when kgio_* methods are needed.
407 cUNIXSocket
= rb_const_get(rb_cObject
, rb_intern("UNIXSocket"));
408 cUNIXSocket
= rb_define_class_under(mKgio
, "UNIXSocket", cUNIXSocket
);
409 rb_include_module(cUNIXSocket
, mSocketMethods
);
410 rb_define_singleton_method(cUNIXSocket
, "new", kgio_unix_connect
, 1);
411 rb_define_singleton_method(cUNIXSocket
, "start", kgio_unix_start
, 1);