use rb_thread_io_blocking_region() under 1.9.3dev
[socket_dontwait.git] / ext / socket_dontwait / socket_dontwait_ext.c
blob9a57f5b435a702629894cc309053098b04495399
1 #include <ruby.h>
2 #include <ruby/io.h>
3 #include <errno.h>
4 #include <sys/types.h>
5 #include <sys/socket.h>
6 #include <fcntl.h>
7 #ifndef MSG_DONTWAIT
8 # error MSG_DONTWAIT not defined!
9 #endif
11 struct io_args {
12 VALUE buf;
13 char *ptr;
14 long len;
15 int fd;
18 #ifndef HAVE_RB_THREAD_IO_BLOCKING_REGION
19 # define rb_thread_io_blocking_region(fn,data,fd) \
20 rb_thread_blocking_region((fn),(data), RUBY_UBF_IO, 0)
21 #endif
22 static long my_tbr(rb_blocking_function_t *fn, void *ptr)
24 struct io_args *args = (struct io_args *)ptr;
26 return (long)rb_thread_io_blocking_region(fn, ptr, args->fd);
29 static int set_blocking(int fd)
31 int flags = fcntl(fd, F_GETFL);
33 if (flags == -1)
34 return flags;
36 if ((flags & O_NONBLOCK))
37 flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
39 return flags;
42 static int can_retry(int fd)
44 if (errno == EINTR)
45 return 1;
46 if (errno == EAGAIN)
47 return set_blocking(fd) == -1 ? 0 : 1;
48 return 0;
51 static void prepare_read_buf(struct io_args *a, VALUE length)
53 a->len = NUM2LONG(length);
54 if (NIL_P(a->buf)) {
55 a->buf = rb_str_new(NULL, a->len);
56 } else {
57 StringValue(a->buf);
58 rb_str_resize(a->buf, a->len);
60 a->ptr = RSTRING_PTR(a->buf);
63 static void prepare_read_io(struct io_args *a, VALUE io)
65 rb_io_t *fptr;
67 GetOpenFile(io, fptr);
68 rb_io_check_readable(fptr);
69 a->fd = fptr->fd;
72 static void read_args(struct io_args *a, int argc, VALUE *argv, VALUE io)
74 VALUE length;
76 prepare_read_io(a, io);
77 rb_scan_args(argc, argv, "11", &length, &a->buf);
78 prepare_read_buf(a, length);
81 static VALUE read_retval(struct io_args *a, long n, const char *msg)
83 if (n == -1) {
84 rb_str_set_len(a->buf, 0);
85 rb_sys_fail(msg);
87 rb_str_set_len(a->buf, n);
88 if (n == 0)
89 rb_eof_error();
91 return a->buf;
95 * call-seq:
96 * ios.read_nonblock(maxlen) -> string
97 * ios.read_nonblock(maxlen, outbuf) -> outbuf
99 * This behaves like IO#read_nonblock in Ruby core.
100 * Unlike IO#read_nonblock, this does not have the side effect of
101 * setting the O_NONBLOCK flag on the file descriptor. It should
102 * otherwise behave exactly like IO#read_nonblock when dealing with
103 * sockets.
105 * Unlike BasicSocket#recv_nonblock, this allows +outbuf+ to be
106 * specified and reused to reduce impact on GC. This method never
107 * releases the GVL.
109 static VALUE read_nonblock(int argc, VALUE *argv, VALUE io)
111 struct io_args a;
112 long n;
114 read_args(&a, argc, argv, io);
115 if (a.len == 0)
116 return a.buf;
118 n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT);
120 return read_retval(&a, n, "recv");
123 /* used to implement readpartial if the initial recv() fails with EAGAIN */
124 static VALUE recv_once(void *arg)
126 struct io_args *a = arg;
127 long n = (long)recv(a->fd, a->ptr, a->len, 0);
129 return (VALUE)n;
133 * call-seq:
134 * ios.readpartial(maxlen) -> string
135 * ios.readpartial(maxlen, outbuf) -> outbuf
137 * This behaves like IO#readpartial from Ruby core.
138 * If line buffering (IO#gets) is never used for +ios+, then
139 * this can be safely used with file descriptors higher than 1023.
140 * If data is immediately not available, this will _unset_ the
141 * O_NONBLOCK flag, release the GVL, and block on the socket.
143 static VALUE readpartial(int argc, VALUE *argv, VALUE io)
145 struct io_args a;
146 long n;
148 read_args(&a, argc, argv, io);
149 if (a.len == 0)
150 return a.buf;
152 /* try optimistically first */
153 n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT);
155 while (n < 0 && can_retry(a.fd)) {
156 rb_str_locktmp(a.buf);
157 /* ugh, nothing available: block on the socket */
158 n = my_tbr(recv_once, &a);
159 rb_str_unlocktmp(a.buf);
162 return read_retval(&a, n, "recv");
165 /* used to implement BasicSocket#read */
166 static VALUE recv_all(void *arg)
168 struct io_args *a = arg;
169 long n = (long)recv(a->fd, a->ptr, a->len, MSG_WAITALL);
171 if (n >= 0) {
172 a->ptr += n;
173 a->len -= n;
175 return (VALUE)n;
178 /* used to implement BasicSocket#read() */
179 static VALUE read_all(struct io_args *a)
181 int rd_size = 16384;
182 long cur_len, n;
184 prepare_read_buf(a, INT2FIX(rd_size));
186 rb_str_locktmp(a->buf);
187 for (;;) {
188 do {
189 n = my_tbr(recv_all, a);
190 } while ((n < 0 && can_retry(a->fd)) || (n > 0 && a->len > 0));
192 cur_len = RSTRING_LEN(a->buf);
194 if (n == 0 || a->len > 0) {
195 rb_str_unlocktmp(a->buf);
196 rb_str_set_len(a->buf, cur_len - a->len);
197 return a->buf;
200 /* everything was fully read, allocate more */
201 rb_str_unlocktmp(a->buf);
202 rb_str_resize(a->buf, cur_len + rd_size);
203 rb_str_locktmp(a->buf);
204 a->ptr = RSTRING_PTR(a->buf) + cur_len;
205 a->len = rd_size;
210 * call-seq:
211 * ios.read([length [, buffer]]) -> string, buffer, or nil
213 * This behaves like IO#readfrom Ruby core.
214 * If line buffering (IO#gets) is never used for +ios+, then
215 * this can be safely used with file descriptors higher than 1023.
216 * If data is not immediately available, this will _unset_ the
217 * O_NONBLOCK flag, release the GVL, and block on the socket.
218 * This will use the MSG_WAITALL flag for the recv(2) syscall to
219 * reduce context switching.
221 static VALUE xread(int argc, VALUE *argv, VALUE io)
223 struct io_args a;
224 VALUE length;
225 long n;
227 prepare_read_io(&a, io);
228 rb_scan_args(argc, argv, "02", &length, &a.buf);
230 if (NIL_P(length))
231 return read_all(&a);
233 prepare_read_buf(&a, length);
234 if (a.len == 0)
235 return a.buf;
237 /* try to read as much as possible without blocking */
239 n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT);
240 while (n > 0 && (a.ptr += n) && (a.len -= n) > 0);
242 /* release the GVL to block on whatever's left */
243 rb_str_locktmp(a.buf);
244 while (a.len > 0 && n != 0) {
245 n = my_tbr(recv_all, &a);
246 if (n < 0) {
247 if (!can_retry(a.fd))
248 break;
251 rb_str_unlocktmp(a.buf);
252 n = RSTRING_LEN(a.buf) - a.len;
253 rb_str_set_len(a.buf, n);
254 if (n == 0) {
255 if (errno)
256 rb_sys_fail("recv");
257 return Qnil;
260 return a.buf;
263 static void prepare_write_args(struct io_args *a, VALUE io, VALUE str)
265 rb_io_t *fptr;
267 a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str);
268 a->ptr = RSTRING_PTR(a->buf);
269 a->len = RSTRING_LEN(a->buf);
270 GetOpenFile(io, fptr);
271 rb_io_check_writable(fptr);
272 a->fd = fptr->fd;
276 * call-seq:
277 * ios.write_nonblock(string) -> integer
279 * This behaves like IO#write_nonblock in Ruby core.
280 * Unlike IO#write_nonblock, this does not have the side effect of
281 * setting the O_NONBLOCK flag on the file descriptor. It should
282 * otherwise behave exactly like IO#write_nonblock when dealing with
283 * sockets.
285 * This method never releases the GVL.
287 static VALUE write_nonblock(VALUE io, VALUE str)
289 struct io_args a;
290 long n;
292 prepare_write_args(&a, io, str);
293 n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
294 if (n == -1)
295 rb_sys_fail("send");
297 return LONG2FIX(n);
300 /* used to implement BasicSocket#write */
301 static VALUE send_once(void *args)
303 struct io_args *a = args;
304 long n = (long)send(a->fd, a->ptr, a->len, 0);
306 if (n >= 0) {
307 a->ptr += n;
308 a->len -= n;
310 return (VALUE)n;
314 * call-seq:
315 * ios.write(string) -> integer
317 * This behaves like IO#write in Ruby core.
319 * This can be safely used to block on file descriptors higher than 1023.
320 * If socket buffer space is not immediately available in the kernel,
321 * this will _unset_ the O_NONBLOCK flag, release the GVL, and block
322 * on the socket until data is written.
324 static VALUE xwrite(VALUE io, VALUE str)
326 struct io_args a;
327 long n;
329 prepare_write_args(&a, io, str);
331 /* optimistically try to send everything w/o releasing GVL */
332 n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
333 if (n == a.len)
334 return LONG2FIX(n);
336 /* buffer may be expanded in the kernel, keep trying w/o blocking */
337 while (n >= 0 && (a.ptr += n) && (a.len -= n) > 0)
338 n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
340 /* all done, we managed to finish without releasing the GVL */
341 if (a.len == 0)
342 return LONG2FIX(RSTRING_LEN(a.buf));
344 if (n < 0 && !can_retry(a.fd))
345 rb_sys_fail("send");
347 rb_str_locktmp(a.buf);
348 while (a.len > 0) {
349 n = my_tbr(send_once, &a);
350 if (n < 0) {
351 if (!can_retry(a.fd))
352 break;
355 rb_str_unlocktmp(a.buf);
356 n = RSTRING_LEN(a.buf) - a.len;
357 rb_str_set_len(a.buf, n);
359 if (a.len > 0)
360 rb_sys_fail("send");
361 return LONG2FIX(n);
365 * call-seq:
366 * ios.sync = boolean -> boolean
368 * socket_dontwait makes BasicSocket#sync= a no-op.
369 * Ruby sockets already default to synchronized operation,
370 * and socket_dontwait prevents users from changing this default
371 * as it increases complexity.
373 static VALUE set_sync(VALUE io, VALUE boolean)
375 return boolean;
378 void Init_socket_dontwait_ext(void)
380 VALUE mod = rb_define_module("SocketDontwait");
382 rb_define_method(mod, "sync=", set_sync, 1);
383 rb_define_method(mod, "read", xread, -1);
384 rb_define_method(mod, "read_nonblock", read_nonblock, -1);
385 rb_define_method(mod, "readpartial", readpartial, -1);
386 rb_define_method(mod, "write_nonblock", write_nonblock, 1);
387 rb_define_method(mod, "write", xwrite, 1);