make timed kgio_wait_* implementation safer
[kgio.git] / ext / kgio / read_write.c
blob7df70704a1bef99fde3e632d0b22452feb236e36
1 #include "kgio.h"
2 static VALUE sym_wait_readable, sym_wait_writable;
3 static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
4 static ID id_set_backtrace;
/*
 * MSG_DONTWAIT is known to work properly on all stream sockets under
 * Linux, so it is switched on there automatically.  Other platforms may
 * opt in by defining USE_MSG_DONTWAIT at build time as people care and
 * notice.
 */
#if defined(__linux__) && ! defined(USE_MSG_DONTWAIT)
#  define USE_MSG_DONTWAIT
#endif

/*
 * Flags passed to recv(2) when peeking.  The choice follows
 * USE_MSG_DONTWAIT itself (not the auto-detection above) so that a build
 * which defines the macro by hand gets the same treatment for peeking as
 * it already does for the recv/send paths.
 */
#ifdef USE_MSG_DONTWAIT
static const int peek_flags = MSG_DONTWAIT|MSG_PEEK;
#else
static const int peek_flags = MSG_PEEK;
#endif
18 NORETURN(static void raise_empty_bt(VALUE, const char *));
19 NORETURN(static void my_eof_error(void));
20 NORETURN(static void wr_sys_fail(const char *));
21 NORETURN(static void rd_sys_fail(const char *));
23 static void raise_empty_bt(VALUE err, const char *msg)
25 VALUE exc = rb_exc_new2(err, msg);
26 VALUE bt = rb_ary_new();
28 rb_funcall(exc, id_set_backtrace, 1, bt);
29 rb_exc_raise(exc);
32 static void my_eof_error(void)
34 raise_empty_bt(rb_eEOFError, "end of file reached");
37 static void wr_sys_fail(const char *msg)
39 switch (errno) {
40 case EPIPE:
41 errno = 0;
42 raise_empty_bt(eErrno_EPIPE, msg);
43 case ECONNRESET:
44 errno = 0;
45 raise_empty_bt(eErrno_ECONNRESET, msg);
47 rb_sys_fail(msg);
50 static void rd_sys_fail(const char *msg)
52 if (errno == ECONNRESET) {
53 errno = 0;
54 raise_empty_bt(eErrno_ECONNRESET, msg);
56 rb_sys_fail(msg);
59 static void prepare_read(struct io_args *a, int argc, VALUE *argv, VALUE io)
61 VALUE length;
63 a->io = io;
64 a->fd = my_fileno(io);
65 rb_scan_args(argc, argv, "11", &length, &a->buf);
66 a->len = NUM2LONG(length);
67 if (NIL_P(a->buf)) {
68 a->buf = rb_str_new(NULL, a->len);
69 } else {
70 StringValue(a->buf);
71 rb_str_modify(a->buf);
72 rb_str_resize(a->buf, a->len);
74 a->ptr = RSTRING_PTR(a->buf);
77 static int read_check(struct io_args *a, long n, const char *msg, int io_wait)
79 if (n == -1) {
80 if (errno == EINTR) {
81 a->fd = my_fileno(a->io);
82 return -1;
84 rb_str_set_len(a->buf, 0);
85 if (errno == EAGAIN) {
86 if (io_wait) {
87 (void)kgio_call_wait_readable(a->io);
89 /* buf may be modified in other thread/fiber */
90 rb_str_modify(a->buf);
91 rb_str_resize(a->buf, a->len);
92 a->ptr = RSTRING_PTR(a->buf);
93 return -1;
94 } else {
95 a->buf = sym_wait_readable;
96 return 0;
99 rd_sys_fail(msg);
101 rb_str_set_len(a->buf, n);
102 if (n == 0)
103 a->buf = Qnil;
104 return 0;
107 static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io)
109 struct io_args a;
110 long n;
112 prepare_read(&a, argc, argv, io);
114 if (a.len > 0) {
115 set_nonblocking(a.fd);
116 retry:
117 n = (long)read(a.fd, a.ptr, a.len);
118 if (read_check(&a, n, "read", io_wait) != 0)
119 goto retry;
121 return a.buf;
125 * call-seq:
127 * io.kgio_read(maxlen) -> buffer
128 * io.kgio_read(maxlen, buffer) -> buffer
130 * Reads at most maxlen bytes from the stream socket. Returns with a
131 * newly allocated buffer, or may reuse an existing buffer if supplied.
133 * This may block and call any method defined to +kgio_wait_readable+
134 * for the class.
136 * Returns nil on EOF.
138 * This behaves like read(2) and IO#readpartial, NOT fread(3) or
139 * IO#read which possess read-in-full behavior.
141 static VALUE kgio_read(int argc, VALUE *argv, VALUE io)
143 return my_read(1, argc, argv, io);
147 * Same as Kgio::PipeMethods#kgio_read, except EOFError is raised
148 * on EOF without a backtrace. This method is intended as a
149 * drop-in replacement for places where IO#readpartial is used, and
150 * may be aliased as such.
152 static VALUE kgio_read_bang(int argc, VALUE *argv, VALUE io)
154 VALUE rv = my_read(1, argc, argv, io);
156 if (NIL_P(rv)) my_eof_error();
157 return rv;
161 * call-seq:
163 * io.kgio_tryread(maxlen) -> buffer
164 * io.kgio_tryread(maxlen, buffer) -> buffer
166 * Reads at most maxlen bytes from the stream socket. Returns with a
167 * newly allocated buffer, or may reuse an existing buffer if supplied.
169 * Returns nil on EOF.
171 * Returns :wait_readable if EAGAIN is encountered.
173 static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io)
175 return my_read(0, argc, argv, io);
178 #ifdef USE_MSG_DONTWAIT
179 static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io)
181 struct io_args a;
182 long n;
184 prepare_read(&a, argc, argv, io);
185 kgio_autopush_recv(io);
187 if (a.len > 0) {
188 retry:
189 n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT);
190 if (read_check(&a, n, "recv", io_wait) != 0)
191 goto retry;
193 return a.buf;
197 * This method may be optimized on some systems (e.g. GNU/Linux) to use
198 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
199 * Otherwise this is the same as Kgio::PipeMethods#kgio_read
201 static VALUE kgio_recv(int argc, VALUE *argv, VALUE io)
203 return my_recv(1, argc, argv, io);
207 * Same as Kgio::SocketMethods#kgio_read, except EOFError is raised
208 * on EOF without a backtrace
210 static VALUE kgio_recv_bang(int argc, VALUE *argv, VALUE io)
212 VALUE rv = my_recv(1, argc, argv, io);
214 if (NIL_P(rv)) my_eof_error();
215 return rv;
219 * This method may be optimized on some systems (e.g. GNU/Linux) to use
220 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
221 * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread
223 static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io)
225 return my_recv(0, argc, argv, io);
227 #else /* ! USE_MSG_DONTWAIT */
/* without MSG_DONTWAIT the recv names simply refer to the read(2) versions */
#  define kgio_tryrecv kgio_tryread
#  define kgio_recv_bang kgio_read_bang
#  define kgio_recv kgio_read
231 #endif /* USE_MSG_DONTWAIT */
233 static VALUE my_peek(int io_wait, int argc, VALUE *argv, VALUE io)
235 struct io_args a;
236 long n;
238 prepare_read(&a, argc, argv, io);
239 kgio_autopush_recv(io);
241 if (a.len > 0) {
242 if (peek_flags == MSG_PEEK)
243 set_nonblocking(a.fd);
244 retry:
245 n = (long)recv(a.fd, a.ptr, a.len, peek_flags);
246 if (read_check(&a, n, "recv(MSG_PEEK)", io_wait) != 0)
247 goto retry;
249 return a.buf;
253 * call-seq:
255 * socket.kgio_trypeek(maxlen) -> buffer
256 * socket.kgio_trypeek(maxlen, buffer) -> buffer
258 * Like kgio_tryread, except it uses MSG_PEEK so it does not drain the
259 * socket buffer. A subsequent read of any type (including another peek)
260 * will return the same data.
262 static VALUE kgio_trypeek(int argc, VALUE *argv, VALUE io)
264 return my_peek(0, argc, argv, io);
268 * call-seq:
270 * socket.kgio_peek(maxlen) -> buffer
271 * socket.kgio_peek(maxlen, buffer) -> buffer
273 * Like kgio_read, except it uses MSG_PEEK so it does not drain the
274 * socket buffer. A subsequent read of any type (including another peek)
275 * will return the same data.
277 static VALUE kgio_peek(int argc, VALUE *argv, VALUE io)
279 return my_peek(1, argc, argv, io);
283 * call-seq:
285 * Kgio.trypeek(socket, maxlen) -> buffer
286 * Kgio.trypeek(socket, maxlen, buffer) -> buffer
288 * Like Kgio.tryread, except it uses MSG_PEEK so it does not drain the
289 * socket buffer. This can only be used on sockets and not pipe objects.
290 * Maybe used in place of SocketMethods#kgio_trypeek for non-Kgio objects
292 static VALUE s_trypeek(int argc, VALUE *argv, VALUE mod)
294 if (argc <= 1)
295 rb_raise(rb_eArgError, "wrong number of arguments");
296 return my_peek(0, argc - 1, &argv[1], argv[0]);
299 static void prepare_write(struct io_args *a, VALUE io, VALUE str)
301 a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str);
302 a->ptr = RSTRING_PTR(a->buf);
303 a->len = RSTRING_LEN(a->buf);
304 a->io = io;
305 a->fd = my_fileno(io);
308 static int write_check(struct io_args *a, long n, const char *msg, int io_wait)
310 if (a->len == n) {
311 done:
312 a->buf = Qnil;
313 } else if (n == -1) {
314 if (errno == EINTR) {
315 a->fd = my_fileno(a->io);
316 return -1;
318 if (errno == EAGAIN) {
319 long written = RSTRING_LEN(a->buf) - a->len;
321 if (io_wait) {
322 (void)kgio_call_wait_writable(a->io);
324 /* buf may be modified in other thread/fiber */
325 a->len = RSTRING_LEN(a->buf) - written;
326 if (a->len <= 0)
327 goto done;
328 a->ptr = RSTRING_PTR(a->buf) + written;
329 return -1;
330 } else if (written > 0) {
331 a->buf = rb_str_new(a->ptr, a->len);
332 } else {
333 a->buf = sym_wait_writable;
335 return 0;
337 wr_sys_fail(msg);
338 } else {
339 assert(n >= 0 && n < a->len && "write/send syscall broken?");
340 a->ptr += n;
341 a->len -= n;
342 return -1;
344 return 0;
347 static VALUE my_write(VALUE io, VALUE str, int io_wait)
349 struct io_args a;
350 long n;
352 prepare_write(&a, io, str);
353 set_nonblocking(a.fd);
354 retry:
355 n = (long)write(a.fd, a.ptr, a.len);
356 if (write_check(&a, n, "write", io_wait) != 0)
357 goto retry;
358 return a.buf;
362 * call-seq:
364 * io.kgio_write(str) -> nil
366 * Returns nil when the write completes.
368 * This may block and call any method defined to +kgio_wait_writable+
369 * for the class.
371 static VALUE kgio_write(VALUE io, VALUE str)
373 return my_write(io, str, 1);
377 * call-seq:
379 * io.kgio_trywrite(str) -> nil, String or :wait_writable
381 * Returns nil if the write was completed in full.
383 * Returns a String containing the unwritten portion if EAGAIN
384 * was encountered, but some portion was successfully written.
386 * Returns :wait_writable if EAGAIN is encountered and nothing
387 * was written.
389 static VALUE kgio_trywrite(VALUE io, VALUE str)
391 return my_write(io, str, 0);
394 #ifdef USE_MSG_DONTWAIT
396 * This method behaves like Kgio::PipeMethods#kgio_write, except
397 * it will use send(2) with the MSG_DONTWAIT flag on sockets to
398 * avoid unnecessary calls to fcntl(2).
400 static VALUE my_send(VALUE io, VALUE str, int io_wait)
402 struct io_args a;
403 long n;
405 prepare_write(&a, io, str);
406 retry:
407 n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
408 if (write_check(&a, n, "send", io_wait) != 0)
409 goto retry;
410 if (TYPE(a.buf) != T_SYMBOL)
411 kgio_autopush_send(io);
412 return a.buf;
416 * This method may be optimized on some systems (e.g. GNU/Linux) to use
417 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
418 * Otherwise this is the same as Kgio::PipeMethods#kgio_write
420 static VALUE kgio_send(VALUE io, VALUE str)
422 return my_send(io, str, 1);
426 * This method may be optimized on some systems (e.g. GNU/Linux) to use
427 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
428 * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite
430 static VALUE kgio_trysend(VALUE io, VALUE str)
432 return my_send(io, str, 0);
434 #else /* ! USE_MSG_DONTWAIT */
/* without MSG_DONTWAIT the send names simply refer to the write(2) versions */
#  define kgio_trysend kgio_trywrite
#  define kgio_send kgio_write
437 #endif /* ! USE_MSG_DONTWAIT */
440 * call-seq:
442 * Kgio.tryread(io, maxlen) -> buffer
443 * Kgio.tryread(io, maxlen, buffer) -> buffer
445 * Returns nil on EOF.
446 * Returns :wait_readable if EAGAIN is encountered.
448 * Maybe used in place of PipeMethods#kgio_tryread for non-Kgio objects
450 static VALUE s_tryread(int argc, VALUE *argv, VALUE mod)
452 if (argc <= 1)
453 rb_raise(rb_eArgError, "wrong number of arguments");
454 return my_read(0, argc - 1, &argv[1], argv[0]);
458 * call-seq:
460 * Kgio.trywrite(io, str) -> nil, String or :wait_writable
462 * Returns nil if the write was completed in full.
464 * Returns a String containing the unwritten portion if EAGAIN
465 * was encountered, but some portion was successfully written.
467 * Returns :wait_writable if EAGAIN is encountered and nothing
468 * was written.
470 * Maybe used in place of PipeMethods#kgio_trywrite for non-Kgio objects
472 static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str)
474 return my_write(io, str, 0);
477 void init_kgio_read_write(void)
479 VALUE mPipeMethods, mSocketMethods;
480 VALUE mKgio = rb_define_module("Kgio");
481 VALUE mWaiters = rb_const_get(mKgio, rb_intern("DefaultWaiters"));
483 sym_wait_readable = ID2SYM(rb_intern("wait_readable"));
484 sym_wait_writable = ID2SYM(rb_intern("wait_writable"));
486 rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
487 rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
488 rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
491 * Document-module: Kgio::PipeMethods
493 * This module may be used used to create classes that respond to
494 * various Kgio methods for reading and writing. This is included
495 * in Kgio::Pipe by default.
497 mPipeMethods = rb_define_module_under(mKgio, "PipeMethods");
498 rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
499 rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
500 rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
501 rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
502 rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
505 * Document-module: Kgio::SocketMethods
507 * This method behaves like Kgio::PipeMethods, but contains
508 * optimizations for sockets on certain operating systems
509 * (e.g. GNU/Linux).
511 mSocketMethods = rb_define_module_under(mKgio, "SocketMethods");
512 rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
513 rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
514 rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
515 rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
516 rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
517 rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
518 rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
521 * Returns the client IPv4 address of the socket in dotted quad
522 * form as a string. This is always the value of the
523 * Kgio::LOCALHOST constant for UNIX domain sockets.
525 rb_define_attr(mSocketMethods, "kgio_addr", 1, 1);
526 id_set_backtrace = rb_intern("set_backtrace");
527 eErrno_EPIPE = rb_const_get(rb_mErrno, rb_intern("EPIPE"));
528 eErrno_ECONNRESET = rb_const_get(rb_mErrno, rb_intern("ECONNRESET"));
529 rb_include_module(mPipeMethods, mWaiters);
530 rb_include_module(mSocketMethods, mWaiters);