2 static VALUE sym_wait_readable
, sym_wait_writable
;
3 static VALUE eErrno_EPIPE
, eErrno_ECONNRESET
;
4 static ID id_set_backtrace
;
7 * we know MSG_DONTWAIT works properly on all stream sockets under Linux
8 * we can define this macro for other platforms as people care and
11 #if defined(__linux__) && ! defined(USE_MSG_DONTWAIT)
12 # define USE_MSG_DONTWAIT
13 static const int peek_flags
= MSG_DONTWAIT
|MSG_PEEK
;
15 static const int peek_flags
= MSG_PEEK
;
18 NORETURN(static void raise_empty_bt(VALUE
, const char *));
19 NORETURN(static void my_eof_error(void));
20 NORETURN(static void wr_sys_fail(const char *));
21 NORETURN(static void rd_sys_fail(const char *));
23 static void raise_empty_bt(VALUE err
, const char *msg
)
25 VALUE exc
= rb_exc_new2(err
, msg
);
26 VALUE bt
= rb_ary_new();
28 rb_funcall(exc
, id_set_backtrace
, 1, bt
);
32 static void my_eof_error(void)
34 raise_empty_bt(rb_eEOFError
, "end of file reached");
37 static void wr_sys_fail(const char *msg
)
42 raise_empty_bt(eErrno_EPIPE
, msg
);
45 raise_empty_bt(eErrno_ECONNRESET
, msg
);
50 static void rd_sys_fail(const char *msg
)
52 if (errno
== ECONNRESET
) {
54 raise_empty_bt(eErrno_ECONNRESET
, msg
);
59 static void prepare_read(struct io_args
*a
, int argc
, VALUE
*argv
, VALUE io
)
64 a
->fd
= my_fileno(io
);
65 rb_scan_args(argc
, argv
, "11", &length
, &a
->buf
);
66 a
->len
= NUM2LONG(length
);
68 a
->buf
= rb_str_new(NULL
, a
->len
);
71 rb_str_modify(a
->buf
);
72 rb_str_resize(a
->buf
, a
->len
);
74 a
->ptr
= RSTRING_PTR(a
->buf
);
77 static int read_check(struct io_args
*a
, long n
, const char *msg
, int io_wait
)
81 a
->fd
= my_fileno(a
->io
);
84 rb_str_set_len(a
->buf
, 0);
85 if (errno
== EAGAIN
) {
87 (void)kgio_call_wait_readable(a
->io
);
89 /* buf may be modified in other thread/fiber */
90 rb_str_modify(a
->buf
);
91 rb_str_resize(a
->buf
, a
->len
);
92 a
->ptr
= RSTRING_PTR(a
->buf
);
95 a
->buf
= sym_wait_readable
;
101 rb_str_set_len(a
->buf
, n
);
107 static VALUE
my_read(int io_wait
, int argc
, VALUE
*argv
, VALUE io
)
112 prepare_read(&a
, argc
, argv
, io
);
115 set_nonblocking(a
.fd
);
117 n
= (long)read(a
.fd
, a
.ptr
, a
.len
);
118 if (read_check(&a
, n
, "read", io_wait
) != 0)
127 * io.kgio_read(maxlen) -> buffer
128 * io.kgio_read(maxlen, buffer) -> buffer
130 * Reads at most maxlen bytes from the stream socket. Returns with a
131 * newly allocated buffer, or may reuse an existing buffer if supplied.
133 * This may block and call any method defined to +kgio_wait_readable+
136 * Returns nil on EOF.
138 * This behaves like read(2) and IO#readpartial, NOT fread(3) or
139 * IO#read which possess read-in-full behavior.
141 static VALUE
kgio_read(int argc
, VALUE
*argv
, VALUE io
)
143 return my_read(1, argc
, argv
, io
);
147 * Same as Kgio::PipeMethods#kgio_read, except EOFError is raised
148 * on EOF without a backtrace. This method is intended as a
149 * drop-in replacement for places where IO#readpartial is used, and
150 * may be aliased as such.
152 static VALUE
kgio_read_bang(int argc
, VALUE
*argv
, VALUE io
)
154 VALUE rv
= my_read(1, argc
, argv
, io
);
156 if (NIL_P(rv
)) my_eof_error();
163 * io.kgio_tryread(maxlen) -> buffer
164 * io.kgio_tryread(maxlen, buffer) -> buffer
166 * Reads at most maxlen bytes from the stream socket. Returns with a
167 * newly allocated buffer, or may reuse an existing buffer if supplied.
169 * Returns nil on EOF.
171 * Returns :wait_readable if EAGAIN is encountered.
173 static VALUE
kgio_tryread(int argc
, VALUE
*argv
, VALUE io
)
175 return my_read(0, argc
, argv
, io
);
178 #ifdef USE_MSG_DONTWAIT
179 static VALUE
my_recv(int io_wait
, int argc
, VALUE
*argv
, VALUE io
)
184 prepare_read(&a
, argc
, argv
, io
);
185 kgio_autopush_recv(io
);
189 n
= (long)recv(a
.fd
, a
.ptr
, a
.len
, MSG_DONTWAIT
);
190 if (read_check(&a
, n
, "recv", io_wait
) != 0)
197 * This method may be optimized on some systems (e.g. GNU/Linux) to use
198 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
199 * Otherwise this is the same as Kgio::PipeMethods#kgio_read
201 static VALUE
kgio_recv(int argc
, VALUE
*argv
, VALUE io
)
203 return my_recv(1, argc
, argv
, io
);
207 * Same as Kgio::SocketMethods#kgio_read, except EOFError is raised
208 * on EOF without a backtrace
210 static VALUE
kgio_recv_bang(int argc
, VALUE
*argv
, VALUE io
)
212 VALUE rv
= my_recv(1, argc
, argv
, io
);
214 if (NIL_P(rv
)) my_eof_error();
219 * This method may be optimized on some systems (e.g. GNU/Linux) to use
220 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
221 * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread
223 static VALUE
kgio_tryrecv(int argc
, VALUE
*argv
, VALUE io
)
225 return my_recv(0, argc
, argv
, io
);
227 #else /* ! USE_MSG_DONTWAIT */
228 # define kgio_recv kgio_read
229 # define kgio_recv_bang kgio_read_bang
230 # define kgio_tryrecv kgio_tryread
231 #endif /* USE_MSG_DONTWAIT */
233 static VALUE
my_peek(int io_wait
, int argc
, VALUE
*argv
, VALUE io
)
238 prepare_read(&a
, argc
, argv
, io
);
239 kgio_autopush_recv(io
);
242 if (peek_flags
== MSG_PEEK
)
243 set_nonblocking(a
.fd
);
245 n
= (long)recv(a
.fd
, a
.ptr
, a
.len
, peek_flags
);
246 if (read_check(&a
, n
, "recv(MSG_PEEK)", io_wait
) != 0)
255 * socket.kgio_trypeek(maxlen) -> buffer
256 * socket.kgio_trypeek(maxlen, buffer) -> buffer
258 * Like kgio_tryread, except it uses MSG_PEEK so it does not drain the
259 * socket buffer. A subsequent read of any type (including another peek)
260 * will return the same data.
262 static VALUE
kgio_trypeek(int argc
, VALUE
*argv
, VALUE io
)
264 return my_peek(0, argc
, argv
, io
);
270 * socket.kgio_peek(maxlen) -> buffer
271 * socket.kgio_peek(maxlen, buffer) -> buffer
273 * Like kgio_read, except it uses MSG_PEEK so it does not drain the
274 * socket buffer. A subsequent read of any type (including another peek)
275 * will return the same data.
277 static VALUE
kgio_peek(int argc
, VALUE
*argv
, VALUE io
)
279 return my_peek(1, argc
, argv
, io
);
285 * Kgio.trypeek(socket, maxlen) -> buffer
286 * Kgio.trypeek(socket, maxlen, buffer) -> buffer
288 * Like Kgio.tryread, except it uses MSG_PEEK so it does not drain the
289 * socket buffer. This can only be used on sockets and not pipe objects.
290 * Maybe used in place of SocketMethods#kgio_trypeek for non-Kgio objects
292 static VALUE
s_trypeek(int argc
, VALUE
*argv
, VALUE mod
)
295 rb_raise(rb_eArgError
, "wrong number of arguments");
296 return my_peek(0, argc
- 1, &argv
[1], argv
[0]);
299 static void prepare_write(struct io_args
*a
, VALUE io
, VALUE str
)
301 a
->buf
= (TYPE(str
) == T_STRING
) ? str
: rb_obj_as_string(str
);
302 a
->ptr
= RSTRING_PTR(a
->buf
);
303 a
->len
= RSTRING_LEN(a
->buf
);
305 a
->fd
= my_fileno(io
);
308 static int write_check(struct io_args
*a
, long n
, const char *msg
, int io_wait
)
313 } else if (n
== -1) {
314 if (errno
== EINTR
) {
315 a
->fd
= my_fileno(a
->io
);
318 if (errno
== EAGAIN
) {
319 long written
= RSTRING_LEN(a
->buf
) - a
->len
;
322 (void)kgio_call_wait_writable(a
->io
);
324 /* buf may be modified in other thread/fiber */
325 a
->len
= RSTRING_LEN(a
->buf
) - written
;
328 a
->ptr
= RSTRING_PTR(a
->buf
) + written
;
330 } else if (written
> 0) {
331 a
->buf
= rb_str_new(a
->ptr
, a
->len
);
333 a
->buf
= sym_wait_writable
;
339 assert(n
>= 0 && n
< a
->len
&& "write/send syscall broken?");
347 static VALUE
my_write(VALUE io
, VALUE str
, int io_wait
)
352 prepare_write(&a
, io
, str
);
353 set_nonblocking(a
.fd
);
355 n
= (long)write(a
.fd
, a
.ptr
, a
.len
);
356 if (write_check(&a
, n
, "write", io_wait
) != 0)
364 * io.kgio_write(str) -> nil
366 * Returns nil when the write completes.
368 * This may block and call any method defined to +kgio_wait_writable+
371 static VALUE
kgio_write(VALUE io
, VALUE str
)
373 return my_write(io
, str
, 1);
379 * io.kgio_trywrite(str) -> nil, String or :wait_writable
381 * Returns nil if the write was completed in full.
383 * Returns a String containing the unwritten portion if EAGAIN
384 * was encountered, but some portion was successfully written.
386 * Returns :wait_writable if EAGAIN is encountered and nothing
389 static VALUE
kgio_trywrite(VALUE io
, VALUE str
)
391 return my_write(io
, str
, 0);
394 #ifdef USE_MSG_DONTWAIT
396 * This method behaves like Kgio::PipeMethods#kgio_write, except
397 * it will use send(2) with the MSG_DONTWAIT flag on sockets to
398 * avoid unnecessary calls to fcntl(2).
400 static VALUE
my_send(VALUE io
, VALUE str
, int io_wait
)
405 prepare_write(&a
, io
, str
);
407 n
= (long)send(a
.fd
, a
.ptr
, a
.len
, MSG_DONTWAIT
);
408 if (write_check(&a
, n
, "send", io_wait
) != 0)
410 if (TYPE(a
.buf
) != T_SYMBOL
)
411 kgio_autopush_send(io
);
416 * This method may be optimized on some systems (e.g. GNU/Linux) to use
417 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
418 * Otherwise this is the same as Kgio::PipeMethods#kgio_write
420 static VALUE
kgio_send(VALUE io
, VALUE str
)
422 return my_send(io
, str
, 1);
426 * This method may be optimized on some systems (e.g. GNU/Linux) to use
427 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
428 * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite
430 static VALUE
kgio_trysend(VALUE io
, VALUE str
)
432 return my_send(io
, str
, 0);
434 #else /* ! USE_MSG_DONTWAIT */
435 # define kgio_send kgio_write
436 # define kgio_trysend kgio_trywrite
437 #endif /* ! USE_MSG_DONTWAIT */
442 * Kgio.tryread(io, maxlen) -> buffer
443 * Kgio.tryread(io, maxlen, buffer) -> buffer
445 * Returns nil on EOF.
446 * Returns :wait_readable if EAGAIN is encountered.
448 * Maybe used in place of PipeMethods#kgio_tryread for non-Kgio objects
450 static VALUE
s_tryread(int argc
, VALUE
*argv
, VALUE mod
)
453 rb_raise(rb_eArgError
, "wrong number of arguments");
454 return my_read(0, argc
- 1, &argv
[1], argv
[0]);
460 * Kgio.trywrite(io, str) -> nil, String or :wait_writable
462 * Returns nil if the write was completed in full.
464 * Returns a String containing the unwritten portion if EAGAIN
465 * was encountered, but some portion was successfully written.
467 * Returns :wait_writable if EAGAIN is encountered and nothing
470 * Maybe used in place of PipeMethods#kgio_trywrite for non-Kgio objects
472 static VALUE
s_trywrite(VALUE mod
, VALUE io
, VALUE str
)
474 return my_write(io
, str
, 0);
477 void init_kgio_read_write(void)
479 VALUE mPipeMethods
, mSocketMethods
;
480 VALUE mKgio
= rb_define_module("Kgio");
481 VALUE mWaiters
= rb_const_get(mKgio
, rb_intern("DefaultWaiters"));
483 sym_wait_readable
= ID2SYM(rb_intern("wait_readable"));
484 sym_wait_writable
= ID2SYM(rb_intern("wait_writable"));
486 rb_define_singleton_method(mKgio
, "tryread", s_tryread
, -1);
487 rb_define_singleton_method(mKgio
, "trywrite", s_trywrite
, 2);
488 rb_define_singleton_method(mKgio
, "trypeek", s_trypeek
, -1);
491 * Document-module: Kgio::PipeMethods
493 * This module may be used used to create classes that respond to
494 * various Kgio methods for reading and writing. This is included
495 * in Kgio::Pipe by default.
497 mPipeMethods
= rb_define_module_under(mKgio
, "PipeMethods");
498 rb_define_method(mPipeMethods
, "kgio_read", kgio_read
, -1);
499 rb_define_method(mPipeMethods
, "kgio_read!", kgio_read_bang
, -1);
500 rb_define_method(mPipeMethods
, "kgio_write", kgio_write
, 1);
501 rb_define_method(mPipeMethods
, "kgio_tryread", kgio_tryread
, -1);
502 rb_define_method(mPipeMethods
, "kgio_trywrite", kgio_trywrite
, 1);
505 * Document-module: Kgio::SocketMethods
507 * This method behaves like Kgio::PipeMethods, but contains
508 * optimizations for sockets on certain operating systems
511 mSocketMethods
= rb_define_module_under(mKgio
, "SocketMethods");
512 rb_define_method(mSocketMethods
, "kgio_read", kgio_recv
, -1);
513 rb_define_method(mSocketMethods
, "kgio_read!", kgio_recv_bang
, -1);
514 rb_define_method(mSocketMethods
, "kgio_write", kgio_send
, 1);
515 rb_define_method(mSocketMethods
, "kgio_tryread", kgio_tryrecv
, -1);
516 rb_define_method(mSocketMethods
, "kgio_trywrite", kgio_trysend
, 1);
517 rb_define_method(mSocketMethods
, "kgio_trypeek", kgio_trypeek
, -1);
518 rb_define_method(mSocketMethods
, "kgio_peek", kgio_peek
, -1);
521 * Returns the client IPv4 address of the socket in dotted quad
522 * form as a string. This is always the value of the
523 * Kgio::LOCALHOST constant for UNIX domain sockets.
525 rb_define_attr(mSocketMethods
, "kgio_addr", 1, 1);
526 id_set_backtrace
= rb_intern("set_backtrace");
527 eErrno_EPIPE
= rb_const_get(rb_mErrno
, rb_intern("EPIPE"));
528 eErrno_ECONNRESET
= rb_const_get(rb_mErrno
, rb_intern("ECONNRESET"));
529 rb_include_module(mPipeMethods
, mWaiters
);
530 rb_include_module(mSocketMethods
, mWaiters
);