add support for recv() with MSG_PEEK
[kgio.git] / ext / kgio / read_write.c
blob54e5c82d7f80924f7a5248304483029861625483
1 #include "kgio.h"
2 static VALUE sym_wait_readable, sym_wait_writable;
3 static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
4 static ID id_set_backtrace;
6 /*
7 * we know MSG_DONTWAIT works properly on all stream sockets under Linux
8 * we can define this macro for other platforms as people care and
9 * notice.
11 #if defined(__linux__) && ! defined(USE_MSG_DONTWAIT)
12 # define USE_MSG_DONTWAIT
13 static const int peek_flags = MSG_DONTWAIT|MSG_PEEK;
14 #else
15 static const int peek_flags = MSG_PEEK;
16 #endif
18 NORETURN(static void raise_empty_bt(VALUE, const char *));
19 NORETURN(static void my_eof_error(void));
20 NORETURN(static void wr_sys_fail(const char *));
21 NORETURN(static void rd_sys_fail(const char *));
23 static void raise_empty_bt(VALUE err, const char *msg)
25 VALUE exc = rb_exc_new2(err, msg);
26 VALUE bt = rb_ary_new();
28 rb_funcall(exc, id_set_backtrace, 1, bt);
29 rb_exc_raise(exc);
32 static void my_eof_error(void)
34 raise_empty_bt(rb_eEOFError, "end of file reached");
37 static void wr_sys_fail(const char *msg)
39 switch (errno) {
40 case EPIPE:
41 errno = 0;
42 raise_empty_bt(eErrno_EPIPE, msg);
43 case ECONNRESET:
44 errno = 0;
45 raise_empty_bt(eErrno_ECONNRESET, msg);
47 rb_sys_fail(msg);
50 static void rd_sys_fail(const char *msg)
52 if (errno == ECONNRESET) {
53 errno = 0;
54 raise_empty_bt(eErrno_ECONNRESET, msg);
56 rb_sys_fail(msg);
59 static void prepare_read(struct io_args *a, int argc, VALUE *argv, VALUE io)
61 VALUE length;
63 a->io = io;
64 a->fd = my_fileno(io);
65 rb_scan_args(argc, argv, "11", &length, &a->buf);
66 a->len = NUM2LONG(length);
67 if (NIL_P(a->buf)) {
68 a->buf = rb_str_new(NULL, a->len);
69 } else {
70 StringValue(a->buf);
71 rb_str_resize(a->buf, a->len);
73 a->ptr = RSTRING_PTR(a->buf);
76 static int read_check(struct io_args *a, long n, const char *msg, int io_wait)
78 if (n == -1) {
79 if (errno == EINTR)
80 return -1;
81 rb_str_set_len(a->buf, 0);
82 if (errno == EAGAIN) {
83 if (io_wait) {
84 (void)kgio_call_wait_readable(a->io);
86 /* buf may be modified in other thread/fiber */
87 rb_str_resize(a->buf, a->len);
88 a->ptr = RSTRING_PTR(a->buf);
89 return -1;
90 } else {
91 a->buf = sym_wait_readable;
92 return 0;
95 rd_sys_fail(msg);
97 rb_str_set_len(a->buf, n);
98 if (n == 0)
99 a->buf = Qnil;
100 return 0;
103 static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io)
105 struct io_args a;
106 long n;
108 prepare_read(&a, argc, argv, io);
110 if (a.len > 0) {
111 set_nonblocking(a.fd);
112 retry:
113 n = (long)read(a.fd, a.ptr, a.len);
114 if (read_check(&a, n, "read", io_wait) != 0)
115 goto retry;
117 return a.buf;
121 * call-seq:
123 * io.kgio_read(maxlen) -> buffer
124 * io.kgio_read(maxlen, buffer) -> buffer
126 * Reads at most maxlen bytes from the stream socket. Returns with a
127 * newly allocated buffer, or may reuse an existing buffer if supplied.
129 * Calls whatever is is defined to be the kgio_wait_readable method
130 * for the class.
132 * Returns nil on EOF.
134 * This behaves like read(2) and IO#readpartial, NOT fread(3) or
135 * IO#read which possess read-in-full behavior.
137 static VALUE kgio_read(int argc, VALUE *argv, VALUE io)
139 return my_read(1, argc, argv, io);
143 * Same as Kgio::PipeMethods#kgio_read, except EOFError is raised
144 * on EOF without a backtrace. This method is intended as a
145 * drop-in replacement for places where IO#readpartial is used, and
146 * may be aliased as such.
148 static VALUE kgio_read_bang(int argc, VALUE *argv, VALUE io)
150 VALUE rv = my_read(1, argc, argv, io);
152 if (NIL_P(rv)) my_eof_error();
153 return rv;
157 * call-seq:
159 * io.kgio_tryread(maxlen) -> buffer
160 * io.kgio_tryread(maxlen, buffer) -> buffer
162 * Reads at most maxlen bytes from the stream socket. Returns with a
163 * newly allocated buffer, or may reuse an existing buffer if supplied.
165 * Returns nil on EOF.
167 * Returns :wait_readable if EAGAIN is encountered.
169 static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io)
171 return my_read(0, argc, argv, io);
174 #ifdef USE_MSG_DONTWAIT
175 static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io)
177 struct io_args a;
178 long n;
180 prepare_read(&a, argc, argv, io);
181 kgio_autopush_recv(io);
183 if (a.len > 0) {
184 retry:
185 n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT);
186 if (read_check(&a, n, "recv", io_wait) != 0)
187 goto retry;
189 return a.buf;
193 * This method may be optimized on some systems (e.g. GNU/Linux) to use
194 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
195 * Otherwise this is the same as Kgio::PipeMethods#kgio_read
197 static VALUE kgio_recv(int argc, VALUE *argv, VALUE io)
199 return my_recv(1, argc, argv, io);
203 * Same as Kgio::SocketMethods#kgio_read, except EOFError is raised
204 * on EOF without a backtrace
206 static VALUE kgio_recv_bang(int argc, VALUE *argv, VALUE io)
208 VALUE rv = my_recv(1, argc, argv, io);
210 if (NIL_P(rv)) my_eof_error();
211 return rv;
215 * This method may be optimized on some systems (e.g. GNU/Linux) to use
216 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
217 * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread
219 static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io)
221 return my_recv(0, argc, argv, io);
223 #else /* ! USE_MSG_DONTWAIT */
224 # define kgio_recv kgio_read
225 # define kgio_recv_bang kgio_read_bang
226 # define kgio_tryrecv kgio_tryread
227 #endif /* USE_MSG_DONTWAIT */
229 static VALUE my_peek(int io_wait, int argc, VALUE *argv, VALUE io)
231 struct io_args a;
232 long n;
234 prepare_read(&a, argc, argv, io);
235 kgio_autopush_recv(io);
237 if (a.len > 0) {
238 if (peek_flags == MSG_PEEK)
239 set_nonblocking(a.fd);
240 retry:
241 n = (long)recv(a.fd, a.ptr, a.len, peek_flags);
242 if (read_check(&a, n, "recv(MSG_PEEK)", io_wait) != 0)
243 goto retry;
245 return a.buf;
249 * call-seq:
251 * socket.kgio_trypeek(maxlen) -> buffer
252 * socket.kgio_trypeek(maxlen, buffer) -> buffer
254 * Like kgio_tryread, except it uses MSG_PEEK so it does not drain the
255 * socket buffer. A subsequent read of any type (including another peek)
256 * will return the same data.
258 static VALUE kgio_trypeek(int argc, VALUE *argv, VALUE io)
260 return my_peek(0, argc, argv, io);
264 * call-seq:
266 * socket.kgio_peek(maxlen) -> buffer
267 * socket.kgio_peek(maxlen, buffer) -> buffer
269 * Like kgio_read, except it uses MSG_PEEK so it does not drain the
270 * socket buffer. A subsequent read of any type (including another peek)
271 * will return the same data.
273 static VALUE kgio_peek(int argc, VALUE *argv, VALUE io)
275 return my_peek(1, argc, argv, io);
279 * call-seq:
281 * Kgio.trypeek(socket, maxlen) -> buffer
282 * Kgio.trypeek(socket, maxlen, buffer) -> buffer
284 * Like Kgio.tryread, except it uses MSG_PEEK so it does not drain the
285 * socket buffer. This can only be used on sockets and not pipe objects.
286 * Maybe used in place of SocketMethods#kgio_trypeek for non-Kgio objects
288 static VALUE s_trypeek(int argc, VALUE *argv, VALUE mod)
290 if (argc <= 1)
291 rb_raise(rb_eArgError, "wrong number of arguments");
292 return my_peek(0, argc - 1, &argv[1], argv[0]);
295 static void prepare_write(struct io_args *a, VALUE io, VALUE str)
297 a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str);
298 a->ptr = RSTRING_PTR(a->buf);
299 a->len = RSTRING_LEN(a->buf);
300 a->io = io;
301 a->fd = my_fileno(io);
304 static int write_check(struct io_args *a, long n, const char *msg, int io_wait)
306 if (a->len == n) {
307 done:
308 a->buf = Qnil;
309 } else if (n == -1) {
310 if (errno == EINTR)
311 return -1;
312 if (errno == EAGAIN) {
313 long written = RSTRING_LEN(a->buf) - a->len;
315 if (io_wait) {
316 (void)kgio_call_wait_writable(a->io);
318 /* buf may be modified in other thread/fiber */
319 a->len = RSTRING_LEN(a->buf) - written;
320 if (a->len <= 0)
321 goto done;
322 a->ptr = RSTRING_PTR(a->buf) + written;
323 return -1;
324 } else if (written > 0) {
325 a->buf = rb_str_new(a->ptr, a->len);
326 } else {
327 a->buf = sym_wait_writable;
329 return 0;
331 wr_sys_fail(msg);
332 } else {
333 assert(n >= 0 && n < a->len && "write/send syscall broken?");
334 a->ptr += n;
335 a->len -= n;
336 return -1;
338 return 0;
341 static VALUE my_write(VALUE io, VALUE str, int io_wait)
343 struct io_args a;
344 long n;
346 prepare_write(&a, io, str);
347 set_nonblocking(a.fd);
348 retry:
349 n = (long)write(a.fd, a.ptr, a.len);
350 if (write_check(&a, n, "write", io_wait) != 0)
351 goto retry;
352 return a.buf;
356 * call-seq:
358 * io.kgio_write(str) -> nil
360 * Returns nil when the write completes.
362 * Calls whatever is is defined to be the kgio_wait_writable method
363 * for the class.
365 static VALUE kgio_write(VALUE io, VALUE str)
367 return my_write(io, str, 1);
371 * call-seq:
373 * io.kgio_trywrite(str) -> nil or :wait_writable
375 * Returns nil if the write was completed in full.
377 * Returns a String containing the unwritten portion if EAGAIN
378 * was encountered, but some portion was successfully written.
380 * Returns :wait_writable if EAGAIN is encountered and nothing
381 * was written.
383 static VALUE kgio_trywrite(VALUE io, VALUE str)
385 return my_write(io, str, 0);
388 #ifdef USE_MSG_DONTWAIT
390 * This method behaves like Kgio::PipeMethods#kgio_write, except
391 * it will use send(2) with the MSG_DONTWAIT flag on sockets to
392 * avoid unnecessary calls to fcntl(2).
394 static VALUE my_send(VALUE io, VALUE str, int io_wait)
396 struct io_args a;
397 long n;
399 prepare_write(&a, io, str);
400 retry:
401 n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
402 if (write_check(&a, n, "send", io_wait) != 0)
403 goto retry;
404 if (TYPE(a.buf) != T_SYMBOL)
405 kgio_autopush_send(io);
406 return a.buf;
410 * This method may be optimized on some systems (e.g. GNU/Linux) to use
411 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
412 * Otherwise this is the same as Kgio::PipeMethods#kgio_write
414 static VALUE kgio_send(VALUE io, VALUE str)
416 return my_send(io, str, 1);
420 * This method may be optimized on some systems (e.g. GNU/Linux) to use
421 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
422 * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite
424 static VALUE kgio_trysend(VALUE io, VALUE str)
426 return my_send(io, str, 0);
428 #else /* ! USE_MSG_DONTWAIT */
429 # define kgio_send kgio_write
430 # define kgio_trysend kgio_trywrite
431 #endif /* ! USE_MSG_DONTWAIT */
434 * call-seq:
436 * Kgio.tryread(io, maxlen) -> buffer
437 * Kgio.tryread(io, maxlen, buffer) -> buffer
439 * Returns nil on EOF.
440 * Returns :wait_readable if EAGAIN is encountered.
442 * Maybe used in place of PipeMethods#kgio_tryread for non-Kgio objects
444 static VALUE s_tryread(int argc, VALUE *argv, VALUE mod)
446 if (argc <= 1)
447 rb_raise(rb_eArgError, "wrong number of arguments");
448 return my_read(0, argc - 1, &argv[1], argv[0]);
452 * call-seq:
454 * Kgio.trywrite(io, str) -> nil or :wait_writable
456 * Returns nil if the write was completed in full.
458 * Returns a String containing the unwritten portion if EAGAIN
459 * was encountered, but some portion was successfully written.
461 * Returns :wait_writable if EAGAIN is encountered and nothing
462 * was written.
464 * Maybe used in place of PipeMethods#kgio_trywrite for non-Kgio objects
466 static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str)
468 return my_write(io, str, 0);
471 void init_kgio_read_write(void)
473 VALUE mPipeMethods, mSocketMethods;
474 VALUE mKgio = rb_define_module("Kgio");
475 VALUE mWaiters = rb_const_get(mKgio, rb_intern("DefaultWaiters"));
477 sym_wait_readable = ID2SYM(rb_intern("wait_readable"));
478 sym_wait_writable = ID2SYM(rb_intern("wait_writable"));
480 rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
481 rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
482 rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
485 * Document-module: Kgio::PipeMethods
487 * This module may be used used to create classes that respond to
488 * various Kgio methods for reading and writing. This is included
489 * in Kgio::Pipe by default.
491 mPipeMethods = rb_define_module_under(mKgio, "PipeMethods");
492 rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
493 rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
494 rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
495 rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
496 rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
499 * Document-module: Kgio::SocketMethods
501 * This method behaves like Kgio::PipeMethods, but contains
502 * optimizations for sockets on certain operating systems
503 * (e.g. GNU/Linux).
505 mSocketMethods = rb_define_module_under(mKgio, "SocketMethods");
506 rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
507 rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
508 rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
509 rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
510 rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
511 rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
512 rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
515 * Returns the client IPv4 address of the socket in dotted quad
516 * form as a string. This is always the value of the
517 * Kgio::LOCALHOST constant for UNIX domain sockets.
519 rb_define_attr(mSocketMethods, "kgio_addr", 1, 1);
520 id_set_backtrace = rb_intern("set_backtrace");
521 eErrno_EPIPE = rb_const_get(rb_mErrno, rb_intern("EPIPE"));
522 eErrno_ECONNRESET = rb_const_get(rb_mErrno, rb_intern("ECONNRESET"));
523 rb_include_module(mPipeMethods, mWaiters);
524 rb_include_module(mSocketMethods, mWaiters);