expand Kgio::*#kgio_read! documentation
[kgio.git] / ext / kgio / read_write.c
blob890392c857051327ad7cf8c50834c716d8304d30
1 #include "kgio.h"
2 static VALUE mKgio_WaitReadable, mKgio_WaitWritable;
3 static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
5 /*
6 * we know MSG_DONTWAIT works properly on all stream sockets under Linux
7 * we can define this macro for other platforms as people care and
8 * notice.
9 */
10 #if defined(__linux__) && ! defined(USE_MSG_DONTWAIT)
11 # define USE_MSG_DONTWAIT
12 #endif
14 NORETURN(static void raise_empty_bt(VALUE, const char *));
15 NORETURN(static void my_eof_error(void));
16 NORETURN(static void wr_sys_fail(const char *));
18 static void raise_empty_bt(VALUE err, const char *msg)
20 VALUE exc = rb_exc_new2(err, msg);
21 VALUE bt = rb_ary_new();
23 rb_funcall(exc, rb_intern("set_backtrace"), 1, bt);
24 rb_exc_raise(exc);
27 static void my_eof_error(void)
29 raise_empty_bt(rb_eEOFError, "");
32 static void wr_sys_fail(const char *msg)
34 switch (errno) {
35 case EPIPE:
36 errno = 0;
37 raise_empty_bt(eErrno_EPIPE, msg);
38 case ECONNRESET:
39 errno = 0;
40 raise_empty_bt(eErrno_ECONNRESET, msg);
42 rb_sys_fail(msg);
45 static void prepare_read(struct io_args *a, int argc, VALUE *argv, VALUE io)
47 VALUE length;
49 a->io = io;
50 a->fd = my_fileno(io);
51 rb_scan_args(argc, argv, "11", &length, &a->buf);
52 a->len = NUM2LONG(length);
53 if (NIL_P(a->buf)) {
54 a->buf = rb_str_new(NULL, a->len);
55 } else {
56 StringValue(a->buf);
57 rb_str_resize(a->buf, a->len);
59 a->ptr = RSTRING_PTR(a->buf);
62 static int read_check(struct io_args *a, long n, const char *msg, int io_wait)
64 if (n == -1) {
65 if (errno == EINTR)
66 return -1;
67 rb_str_set_len(a->buf, 0);
68 if (errno == EAGAIN) {
69 if (io_wait) {
70 kgio_wait_readable(a->io, a->fd);
72 /* buf may be modified in other thread/fiber */
73 rb_str_resize(a->buf, a->len);
74 a->ptr = RSTRING_PTR(a->buf);
75 return -1;
76 } else {
77 a->buf = mKgio_WaitReadable;
78 return 0;
81 rb_sys_fail(msg);
83 rb_str_set_len(a->buf, n);
84 if (n == 0)
85 a->buf = Qnil;
86 return 0;
89 static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io)
91 struct io_args a;
92 long n;
94 prepare_read(&a, argc, argv, io);
96 if (a.len > 0) {
97 set_nonblocking(a.fd);
98 retry:
99 n = (long)read(a.fd, a.ptr, a.len);
100 if (read_check(&a, n, "read", io_wait) != 0)
101 goto retry;
103 return a.buf;
107 * call-seq:
109 * io.kgio_read(maxlen) -> buffer
110 * io.kgio_read(maxlen, buffer) -> buffer
112 * Reads at most maxlen bytes from the stream socket. Returns with a
113 * newly allocated buffer, or may reuse an existing buffer if supplied.
115 * Calls the method assigned to Kgio.wait_readable, or blocks in a
116 * thread-safe manner for writability.
118 * Returns nil on EOF.
120 * This behaves like read(2) and IO#readpartial, NOT fread(3) or
121 * IO#read which possess read-in-full behavior.
123 static VALUE kgio_read(int argc, VALUE *argv, VALUE io)
125 return my_read(1, argc, argv, io);
129 * Same as Kgio::PipeMethods#kgio_read, except EOFError is raised
130 * on EOF without a backtrace. This method is intended as a
131 * drop-in replacement for places where IO#readpartial is used, and
132 * may be aliased as such.
134 static VALUE kgio_read_bang(int argc, VALUE *argv, VALUE io)
136 VALUE rv = my_read(1, argc, argv, io);
138 if (NIL_P(rv)) my_eof_error();
139 return rv;
143 * call-seq:
145 * io.kgio_tryread(maxlen) -> buffer
146 * io.kgio_tryread(maxlen, buffer) -> buffer
148 * Reads at most maxlen bytes from the stream socket. Returns with a
149 * newly allocated buffer, or may reuse an existing buffer if supplied.
151 * Returns nil on EOF.
153 * Returns Kgio::WaitReadable if EAGAIN is encountered.
155 static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io)
157 return my_read(0, argc, argv, io);
160 #ifdef USE_MSG_DONTWAIT
161 static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io)
163 struct io_args a;
164 long n;
166 prepare_read(&a, argc, argv, io);
168 if (a.len > 0) {
169 retry:
170 n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT);
171 if (read_check(&a, n, "recv", io_wait) != 0)
172 goto retry;
174 return a.buf;
178 * This method may be optimized on some systems (e.g. GNU/Linux) to use
179 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
180 * Otherwise this is the same as Kgio::PipeMethods#kgio_read
182 static VALUE kgio_recv(int argc, VALUE *argv, VALUE io)
184 return my_recv(1, argc, argv, io);
188 * Same as Kgio::SocketMethods#kgio_read, except EOFError is raised
189 * on EOF without a backtrace
191 static VALUE kgio_recv_bang(int argc, VALUE *argv, VALUE io)
193 VALUE rv = my_recv(1, argc, argv, io);
195 if (NIL_P(rv)) my_eof_error();
196 return rv;
200 * This method may be optimized on some systems (e.g. GNU/Linux) to use
201 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
202 * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread
204 static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io)
206 return my_recv(0, argc, argv, io);
208 #else /* ! USE_MSG_DONTWAIT */
/* without USE_MSG_DONTWAIT the socket readers are the read(2) versions */
#  define kgio_recv kgio_read
#  define kgio_recv_bang kgio_read_bang
#  define kgio_tryrecv kgio_tryread
212 #endif /* USE_MSG_DONTWAIT */
214 static void prepare_write(struct io_args *a, VALUE io, VALUE str)
216 a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str);
217 a->ptr = RSTRING_PTR(a->buf);
218 a->len = RSTRING_LEN(a->buf);
219 a->io = io;
220 a->fd = my_fileno(io);
223 static int write_check(struct io_args *a, long n, const char *msg, int io_wait)
225 if (a->len == n) {
226 done:
227 a->buf = Qnil;
228 } else if (n == -1) {
229 if (errno == EINTR)
230 return -1;
231 if (errno == EAGAIN) {
232 long written = RSTRING_LEN(a->buf) - a->len;
234 if (io_wait) {
235 kgio_wait_writable(a->io, a->fd);
237 /* buf may be modified in other thread/fiber */
238 a->len = RSTRING_LEN(a->buf) - written;
239 if (a->len <= 0)
240 goto done;
241 a->ptr = RSTRING_PTR(a->buf) + written;
242 return -1;
243 } else if (written > 0) {
244 a->buf = rb_str_new(a->ptr, a->len);
245 } else {
246 a->buf = mKgio_WaitWritable;
248 return 0;
250 wr_sys_fail(msg);
251 } else {
252 assert(n >= 0 && n < a->len && "write/send syscall broken?");
253 a->ptr += n;
254 a->len -= n;
255 return -1;
257 return 0;
260 static VALUE my_write(VALUE io, VALUE str, int io_wait)
262 struct io_args a;
263 long n;
265 prepare_write(&a, io, str);
266 set_nonblocking(a.fd);
267 retry:
268 n = (long)write(a.fd, a.ptr, a.len);
269 if (write_check(&a, n, "write", io_wait) != 0)
270 goto retry;
271 return a.buf;
275 * call-seq:
277 * io.kgio_write(str) -> nil
279 * Returns nil when the write completes.
281 * Calls the method Kgio.wait_writable if it is set. Otherwise this
282 * blocks in a thread-safe manner until all data is written or a
283 * fatal error occurs.
285 static VALUE kgio_write(VALUE io, VALUE str)
287 return my_write(io, str, 1);
291 * call-seq:
293 * io.kgio_trywrite(str) -> nil or Kgio::WaitWritable
295 * Returns nil if the write was completed in full.
297 * Returns a String containing the unwritten portion if EAGAIN
298 * was encountered, but some portion was successfully written.
300 * Returns Kgio::WaitWritable if EAGAIN is encountered and nothing
301 * was written.
303 static VALUE kgio_trywrite(VALUE io, VALUE str)
305 return my_write(io, str, 0);
308 #ifdef USE_MSG_DONTWAIT
310 * This method behaves like Kgio::PipeMethods#kgio_write, except
311 * it will use send(2) with the MSG_DONTWAIT flag on sockets to
312 * avoid unnecessary calls to fcntl(2).
314 static VALUE my_send(VALUE io, VALUE str, int io_wait)
316 struct io_args a;
317 long n;
319 prepare_write(&a, io, str);
320 retry:
321 n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
322 if (write_check(&a, n, "send", io_wait) != 0)
323 goto retry;
324 return a.buf;
328 * This method may be optimized on some systems (e.g. GNU/Linux) to use
329 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
330 * Otherwise this is the same as Kgio::PipeMethods#kgio_write
332 static VALUE kgio_send(VALUE io, VALUE str)
334 return my_send(io, str, 1);
338 * This method may be optimized on some systems (e.g. GNU/Linux) to use
339 * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
340 * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite
342 static VALUE kgio_trysend(VALUE io, VALUE str)
344 return my_send(io, str, 0);
346 #else /* ! USE_MSG_DONTWAIT */
/* without USE_MSG_DONTWAIT the socket writers are the write(2) versions */
#  define kgio_send kgio_write
#  define kgio_trysend kgio_trywrite
349 #endif /* ! USE_MSG_DONTWAIT */
351 void init_kgio_read_write(void)
353 VALUE mPipeMethods, mSocketMethods;
354 VALUE mKgio = rb_define_module("Kgio");
356 mKgio_WaitReadable = rb_const_get(mKgio, rb_intern("WaitReadable"));
357 mKgio_WaitWritable = rb_const_get(mKgio, rb_intern("WaitWritable"));
360 * Document-module: Kgio::PipeMethods
362 * This module may be used used to create classes that respond to
363 * various Kgio methods for reading and writing. This is included
364 * in Kgio::Pipe by default.
366 mPipeMethods = rb_define_module_under(mKgio, "PipeMethods");
367 rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
368 rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
369 rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
370 rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
371 rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
374 * Document-module: Kgio::SocketMethods
376 * This method behaves like Kgio::PipeMethods, but contains
377 * optimizations for sockets on certain operating systems
378 * (e.g. GNU/Linux).
380 mSocketMethods = rb_define_module_under(mKgio, "SocketMethods");
381 rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
382 rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
383 rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
384 rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
385 rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
388 * Returns the client IPv4 address of the socket in dotted quad
389 * form as a string. This is always the value of the
390 * Kgio::LOCALHOST constant for UNIX domain sockets.
392 rb_define_attr(mSocketMethods, "kgio_addr", 1, 1);
394 eErrno_EPIPE = rb_const_get(rb_mErrno, rb_intern("EPIPE"));
395 eErrno_ECONNRESET = rb_const_get(rb_mErrno, rb_intern("ECONNRESET"));