/*
 * Symbols handed back as the result when a try* call hits EAGAIN
 * (:wait_readable / :wait_writable); both are interned in
 * init_kgio_read_write().
 */
2 static VALUE sym_wait_readable
, sym_wait_writable
;
/*
 * Errno exception classes raised through raise_empty_bt() by wr_sys_fail();
 * both are looked up under rb_mErrno in init_kgio_read_write().
 */
3 static VALUE eErrno_EPIPE
, eErrno_ECONNRESET
;
6 * we know MSG_DONTWAIT works properly on all stream sockets under Linux
7 * we can define this macro for other platforms as people care and
/*
 * Enable the MSG_DONTWAIT (recv/send) code paths by default on Linux,
 * unless the build has already defined USE_MSG_DONTWAIT itself.
 */
10 #if defined(__linux__) && ! defined(USE_MSG_DONTWAIT)
11 # define USE_MSG_DONTWAIT
/*
 * Forward declarations of the raising helpers, wrapped in NORETURN().
 * NOTE(review): NORETURN is defined outside this file section; presumably
 * it marks the function as never returning to its caller -- confirm.
 */
14 NORETURN(static void raise_empty_bt(VALUE
, const char *));
15 NORETURN(static void my_eof_error(void));
16 NORETURN(static void wr_sys_fail(const char *));
/*
 * Build an exception of class +err+ with message +msg+ and give it an
 * empty Array as its backtrace by calling Exception#set_backtrace.
 *
 * err - exception class to instantiate
 * msg - NUL-terminated message for the exception
 *
 * NOTE(review): this helper is declared NORETURN, so the statement that
 * actually raises +exc+ is expected to follow the set_backtrace call --
 * confirm against the full function body.
 */
18 static void raise_empty_bt(VALUE err
, const char *msg
)
20 VALUE exc
= rb_exc_new2(err
, msg
);
/* empty array used as the replacement backtrace */
21 VALUE bt
= rb_ary_new();
23 rb_funcall(exc
, rb_intern("set_backtrace"), 1, bt
);
/*
 * Raise EOFError ("end of file reached") through raise_empty_bt(), i.e.
 * with an empty backtrace.  Used by the bang readers when a read
 * returned nil.
 */
27 static void my_eof_error(void)
29 raise_empty_bt(rb_eEOFError
, "end of file reached");
/*
 * Raise Errno::EPIPE or Errno::ECONNRESET with message +msg+ through
 * raise_empty_bt(), i.e. with an empty backtrace.
 *
 * msg - message for the exception
 *
 * NOTE(review): the errno dispatch that selects between the two raises,
 * and the handling of every other errno value, must be confirmed against
 * the full function body.
 */
32 static void wr_sys_fail(const char *msg
)
37 raise_empty_bt(eErrno_EPIPE
, msg
);
40 raise_empty_bt(eErrno_ECONNRESET
, msg
);
/*
 * Populate +a+ for a read/recv call from the Ruby-level arguments.
 *
 * a    - io_args to fill in; fd, len, buf and ptr are assigned here
 * argc - number of Ruby arguments
 * argv - Ruby arguments: a length, optionally followed by a buffer
 * io   - IO object whose descriptor is obtained with my_fileno()
 */
45 static void prepare_read(struct io_args
*a
, int argc
, VALUE
*argv
, VALUE io
)
50 a
->fd
= my_fileno(io
);
/* "11": one required argument (length) and one optional one (buffer) */
51 rb_scan_args(argc
, argv
, "11", &length
, &a
->buf
);
52 a
->len
= NUM2LONG(length
);
/*
 * NOTE(review): two buffer paths follow -- a newly allocated string of
 * a->len bytes, and a resize of a->buf to a->len.  Presumably the choice
 * depends on whether the caller supplied a buffer -- confirm.
 */
54 a
->buf
= rb_str_new(NULL
, a
->len
);
57 rb_str_resize(a
->buf
, a
->len
);
/* take the data pointer only once the buffer has its final size */
59 a
->ptr
= RSTRING_PTR(a
->buf
);
/*
 * Post-process the result +n+ of a read(2)/recv(2) attempt on +a+.
 *
 * a       - io_args filled by prepare_read(); a->buf is updated in place
 *           and may be replaced by a non-String result value
 * n       - return value of the syscall
 * msg     - syscall name passed by the caller ("read" or "recv")
 * io_wait - 1 from the waiting entry points, 0 from the try* ones
 *
 * Callers compare the int result against 0.
 * NOTE(review): the branch conditions on +n+ and +io_wait+, and the exact
 * return values, must be confirmed against the full function body.
 */
62 static int read_check(struct io_args
*a
, long n
, const char *msg
, int io_wait
)
/* truncate the buffer to zero length before examining errno */
67 rb_str_set_len(a
->buf
, 0);
68 if (errno
== EAGAIN
) {
/* wait for readability; the waiter's return value is ignored */
70 (void)kgio_call_wait_readable(a
->io
);
72 /* buf may be modified in other thread/fiber */
73 rb_str_resize(a
->buf
, a
->len
);
74 a
->ptr
= RSTRING_PTR(a
->buf
);
/* result becomes the :wait_readable symbol instead of a buffer */
77 a
->buf
= sym_wait_readable
;
/* set the buffer length to the byte count reported by the syscall */
83 rb_str_set_len(a
->buf
, n
);
/*
 * Shared implementation behind kgio_read, kgio_read! and kgio_tryread.
 *
 * io_wait    - forwarded to read_check(); 1 for the waiting variants,
 *              0 for kgio_tryread
 * argc, argv - Ruby arguments, parsed by prepare_read()
 * io         - IO object to read from
 */
89 static VALUE
my_read(int io_wait
, int argc
, VALUE
*argv
, VALUE io
)
94 prepare_read(&a
, argc
, argv
, io
);
/* calls set_nonblocking() on the descriptor before issuing read(2) */
97 set_nonblocking(a
.fd
);
99 n
= (long)read(a
.fd
, a
.ptr
, a
.len
);
/*
 * NOTE(review): the action taken on a non-zero read_check() result
 * (presumably retrying the read) should be confirmed.
 */
100 if (read_check(&a
, n
, "read", io_wait
) != 0)
109 * io.kgio_read(maxlen) -> buffer
110 * io.kgio_read(maxlen, buffer) -> buffer
112 * Reads at most maxlen bytes from the stream socket. Returns with a
113 * newly allocated buffer, or may reuse an existing buffer if supplied.
115 * Calls whatever is defined to be the kgio_wait_readable method
118 * Returns nil on EOF.
120 * This behaves like read(2) and IO#readpartial, NOT fread(3) or
121 * IO#read which possess read-in-full behavior.
123 static VALUE
kgio_read(int argc
, VALUE
*argv
, VALUE io
)
/* io_wait = 1: the waiting variant of my_read() */
125 return my_read(1, argc
, argv
, io
);
129 * Same as Kgio::PipeMethods#kgio_read, except EOFError is raised
130 * on EOF without a backtrace. This method is intended as a
131 * drop-in replacement for places where IO#readpartial is used, and
132 * may be aliased as such.
134 static VALUE
kgio_read_bang(int argc
, VALUE
*argv
, VALUE io
)
/* same waiting read as kgio_read ... */
136 VALUE rv
= my_read(1, argc
, argv
, io
);
/* ... but a nil result (EOF) is turned into EOFError */
138 if (NIL_P(rv
)) my_eof_error();
145 * io.kgio_tryread(maxlen) -> buffer
146 * io.kgio_tryread(maxlen, buffer) -> buffer
148 * Reads at most maxlen bytes from the stream socket. Returns with a
149 * newly allocated buffer, or may reuse an existing buffer if supplied.
151 * Returns nil on EOF.
153 * Returns :wait_readable if EAGAIN is encountered.
155 static VALUE
kgio_tryread(int argc
, VALUE
*argv
, VALUE io
)
/* io_wait = 0: the non-waiting variant of my_read() */
157 return my_read(0, argc
, argv
, io
);
160 #ifdef USE_MSG_DONTWAIT
/*
 * recv(2) counterpart of my_read(), compiled only when USE_MSG_DONTWAIT
 * is defined.  The per-call MSG_DONTWAIT flag is passed to recv(2); no
 * set_nonblocking() call appears in this function.
 *
 * io_wait    - forwarded to read_check(); 1 for the waiting variants,
 *              0 for kgio_tryrecv
 * argc, argv - Ruby arguments, parsed by prepare_read()
 * io         - socket object to read from
 */
161 static VALUE
my_recv(int io_wait
, int argc
, VALUE
*argv
, VALUE io
)
166 prepare_read(&a
, argc
, argv
, io
);
170 n
= (long)recv(a
.fd
, a
.ptr
, a
.len
, MSG_DONTWAIT
);
/*
 * NOTE(review): the action taken on a non-zero read_check() result
 * (presumably retrying the recv) should be confirmed.
 */
171 if (read_check(&a
, n
, "recv", io_wait
) != 0)
178 * This method may be optimized on some systems (e.g. GNU/Linux) to use
179 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
180 * Otherwise this is the same as Kgio::PipeMethods#kgio_read
182 static VALUE
kgio_recv(int argc
, VALUE
*argv
, VALUE io
)
/* io_wait = 1: the waiting variant of my_recv() */
184 return my_recv(1, argc
, argv
, io
);
188 * Same as Kgio::SocketMethods#kgio_read, except EOFError is raised
189 * on EOF without a backtrace
191 static VALUE
kgio_recv_bang(int argc
, VALUE
*argv
, VALUE io
)
/* same waiting recv as kgio_recv ... */
193 VALUE rv
= my_recv(1, argc
, argv
, io
);
/* ... but a nil result (EOF) is turned into EOFError */
195 if (NIL_P(rv
)) my_eof_error();
200 * This method may be optimized on some systems (e.g. GNU/Linux) to use
201 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
202 * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread
204 static VALUE
kgio_tryrecv(int argc
, VALUE
*argv
, VALUE io
)
/* io_wait = 0: the non-waiting variant of my_recv() */
206 return my_recv(0, argc
, argv
, io
);
208 #else /* ! USE_MSG_DONTWAIT */
/*
 * Without MSG_DONTWAIT support the recv-flavoured names are plain aliases
 * for the read(2)-based implementations, so the method table in
 * init_kgio_read_write() can reference them unconditionally.
 */
209 # define kgio_recv kgio_read
210 # define kgio_recv_bang kgio_read_bang
211 # define kgio_tryrecv kgio_tryread
212 #endif /* USE_MSG_DONTWAIT */
/*
 * Populate +a+ for a write/send call.
 *
 * a   - io_args to fill in; buf, ptr, len and fd are assigned here
 * io  - IO object whose descriptor is obtained with my_fileno()
 * str - data to write
 */
214 static void prepare_write(struct io_args
*a
, VALUE io
, VALUE str
)
/* anything that is not already a T_STRING is converted with rb_obj_as_string() */
216 a
->buf
= (TYPE(str
) == T_STRING
) ? str
: rb_obj_as_string(str
);
/* start with the whole string pending: pointer to its data, full length */
217 a
->ptr
= RSTRING_PTR(a
->buf
);
218 a
->len
= RSTRING_LEN(a
->buf
);
220 a
->fd
= my_fileno(io
);
/*
 * Post-process the result +n+ of a write(2)/send(2) attempt on +a+.
 *
 * a       - io_args filled by prepare_write(); a->ptr / a->len describe
 *           the pending portion of a->buf, and a->buf may be replaced by
 *           the value to hand back to Ruby
 * n       - return value of the syscall
 * msg     - syscall name passed by the caller ("write" or "send")
 * io_wait - 1 from the waiting entry points, 0 from the try* ones
 *
 * Callers compare the int result against 0.
 * NOTE(review): the first branch of the if/else chain, the io_wait test
 * and the return values must be confirmed against the full function body.
 */
223 static int write_check(struct io_args
*a
, long n
, const char *msg
, int io_wait
)
228 } else if (n
== -1) {
231 if (errno
== EAGAIN
) {
/* bytes of a->buf that are no longer pending: total length minus remainder */
232 long written
= RSTRING_LEN(a
->buf
) - a
->len
;
/* wait for writability; the waiter's return value is ignored */
235 (void)kgio_call_wait_writable(a
->io
);
237 /* buf may be modified in other thread/fiber */
238 a
->len
= RSTRING_LEN(a
->buf
) - written
;
/* re-derive the pointer as well, using the same offset */
241 a
->ptr
= RSTRING_PTR(a
->buf
) + written
;
/* some data already went out: result is a new String copied from the pending a->len bytes at a->ptr */
243 } else if (written
> 0) {
244 a
->buf
= rb_str_new(a
->ptr
, a
->len
);
/* result becomes the :wait_writable symbol */
246 a
->buf
= sym_wait_writable
;
/* sanity check on the byte count reported by the syscall */
252 assert(n
>= 0 && n
< a
->len
&& "write/send syscall broken?");
/*
 * Shared implementation behind kgio_write and kgio_trywrite.
 *
 * io      - IO object to write to
 * str     - data to write (converted to a String by prepare_write())
 * io_wait - forwarded to write_check(); 1 for kgio_write,
 *           0 for kgio_trywrite
 */
260 static VALUE
my_write(VALUE io
, VALUE str
, int io_wait
)
265 prepare_write(&a
, io
, str
);
/* calls set_nonblocking() on the descriptor before issuing write(2) */
266 set_nonblocking(a
.fd
);
268 n
= (long)write(a
.fd
, a
.ptr
, a
.len
);
/*
 * NOTE(review): the action taken on a non-zero write_check() result
 * (presumably retrying the write) should be confirmed.
 */
269 if (write_check(&a
, n
, "write", io_wait
) != 0)
277 * io.kgio_write(str) -> nil
279 * Returns nil when the write completes.
281 * Calls whatever is defined to be the kgio_wait_writable method
284 static VALUE
kgio_write(VALUE io
, VALUE str
)
/* io_wait = 1: the waiting variant of my_write() */
286 return my_write(io
, str
, 1);
292 * io.kgio_trywrite(str) -> nil, String or :wait_writable
294 * Returns nil if the write was completed in full.
296 * Returns a String containing the unwritten portion if EAGAIN
297 * was encountered, but some portion was successfully written.
299 * Returns :wait_writable if EAGAIN is encountered and nothing
302 static VALUE
kgio_trywrite(VALUE io
, VALUE str
)
/* io_wait = 0: the non-waiting variant of my_write() */
304 return my_write(io
, str
, 0);
307 #ifdef USE_MSG_DONTWAIT
309 * This method behaves like Kgio::PipeMethods#kgio_write, except
310 * it will use send(2) with the MSG_DONTWAIT flag on sockets to
311 * avoid unnecessary calls to fcntl(2).
313 static VALUE
my_send(VALUE io
, VALUE str
, int io_wait
)
318 prepare_write(&a
, io
, str
);
/* the per-call MSG_DONTWAIT flag is passed; no set_nonblocking() call appears here */
320 n
= (long)send(a
.fd
, a
.ptr
, a
.len
, MSG_DONTWAIT
);
/*
 * NOTE(review): the action taken on a non-zero write_check() result
 * (presumably retrying the send) should be confirmed.
 */
321 if (write_check(&a
, n
, "send", io_wait
) != 0)
327 * This method may be optimized on some systems (e.g. GNU/Linux) to use
328 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
329 * Otherwise this is the same as Kgio::PipeMethods#kgio_write
331 static VALUE
kgio_send(VALUE io
, VALUE str
)
/* io_wait = 1: the waiting variant of my_send() */
333 return my_send(io
, str
, 1);
337 * This method may be optimized on some systems (e.g. GNU/Linux) to use
338 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
339 * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite
341 static VALUE
kgio_trysend(VALUE io
, VALUE str
)
/* io_wait = 0: the non-waiting variant of my_send() */
343 return my_send(io
, str
, 0);
345 #else /* ! USE_MSG_DONTWAIT */
/*
 * Without MSG_DONTWAIT support the send-flavoured names are plain aliases
 * for the write(2)-based implementations.
 */
346 # define kgio_send kgio_write
347 # define kgio_trysend kgio_trywrite
348 #endif /* ! USE_MSG_DONTWAIT */
/*
 * Initializer for the read/write part of the extension: sets up the
 * file-scope symbols and exception classes and defines the
 * Kgio::PipeMethods and Kgio::SocketMethods modules.
 */
350 void init_kgio_read_write(void)
352 VALUE mPipeMethods
, mSocketMethods
;
353 VALUE mKgio
= rb_define_module("Kgio");
/* Kgio::DefaultWaiters is fetched here and included into both method modules later on */
354 VALUE mWaiters
= rb_const_get(mKgio
, rb_intern("DefaultWaiters"));
/* results used by the try* methods when EAGAIN is hit */
356 sym_wait_readable
= ID2SYM(rb_intern("wait_readable"));
357 sym_wait_writable
= ID2SYM(rb_intern("wait_writable"));
360 * Document-module: Kgio::PipeMethods
362 * This module may be used to create classes that respond to
363 * various Kgio methods for reading and writing. This is included
364 * in Kgio::Pipe by default.
/*
 * read(2)/write(2)-based methods.  The readers are registered with arity
 * -1 (argc/argv calling convention, since the buffer argument is
 * optional); the writers take exactly one argument.
 */
366 mPipeMethods
= rb_define_module_under(mKgio
, "PipeMethods");
367 rb_define_method(mPipeMethods
, "kgio_read", kgio_read
, -1);
368 rb_define_method(mPipeMethods
, "kgio_read!", kgio_read_bang
, -1);
369 rb_define_method(mPipeMethods
, "kgio_write", kgio_write
, 1);
370 rb_define_method(mPipeMethods
, "kgio_tryread", kgio_tryread
, -1);
371 rb_define_method(mPipeMethods
, "kgio_trywrite", kgio_trywrite
, 1);
374 * Document-module: Kgio::SocketMethods
376 * This module behaves like Kgio::PipeMethods, but contains
377 * optimizations for sockets on certain operating systems
/*
 * Same Ruby method names as Kgio::PipeMethods, bound to the recv/send
 * flavours.  When USE_MSG_DONTWAIT is not defined those C names are
 * macros for the read/write implementations.
 */
380 mSocketMethods
= rb_define_module_under(mKgio
, "SocketMethods");
381 rb_define_method(mSocketMethods
, "kgio_read", kgio_recv
, -1);
382 rb_define_method(mSocketMethods
, "kgio_read!", kgio_recv_bang
, -1);
383 rb_define_method(mSocketMethods
, "kgio_write", kgio_send
, 1);
384 rb_define_method(mSocketMethods
, "kgio_tryread", kgio_tryrecv
, -1);
385 rb_define_method(mSocketMethods
, "kgio_trywrite", kgio_trysend
, 1);
388 * Returns the client IPv4 address of the socket in dotted quad
389 * form as a string. This is always the value of the
390 * Kgio::LOCALHOST constant for UNIX domain sockets.
/* kgio_addr is defined with both a reader and a writer (read = 1, write = 1) */
392 rb_define_attr(mSocketMethods
, "kgio_addr", 1, 1);
/* cache the Errno classes that wr_sys_fail() raises */
394 eErrno_EPIPE
= rb_const_get(rb_mErrno
, rb_intern("EPIPE"));
395 eErrno_ECONNRESET
= rb_const_get(rb_mErrno
, rb_intern("ECONNRESET"));
/* both method modules pull in the waiter methods from Kgio::DefaultWaiters */
396 rb_include_module(mPipeMethods
, mWaiters
);
397 rb_include_module(mSocketMethods
, mWaiters
);