remove autopush support and make it a no-op
[kgio.git] / ext / kgio / writev.c
blob736aa6fb6b8ea56c607c47216f618ffdc9c30dfa
/*
 * we're currently too lazy to use rb_ensure to free an allocation, so we
 * abuse the rb_str_* API for a temporary buffer
 */
5 #define RSTRING_MODIFIED 1
7 #include "kgio.h"
8 #include "my_fileno.h"
9 #include "nonblock.h"
#ifdef HAVE_WRITEV
#  include <sys/uio.h>
#  define USE_WRITEV 1
#else
#  define USE_WRITEV 0
/*
 * Stand-in that the name "writev" is mapped to when HAVE_WRITEV is not
 * defined.  Calling it is a programming error, so it trips an assertion.
 * When assertions are compiled out (NDEBUG) it fails with ENOSYS instead,
 * so that a caller never acts on a stale errno value.
 *
 * fd, iov, len: ignored; they only mirror the shape of a writev() call.
 *
 * Returns -1 with errno set to ENOSYS, if it returns at all.
 */
static ssize_t assert_writev(int fd, void *iov, int len)
{
	(void)fd;
	(void)iov;
	(void)len;

	assert(0 && "you should not try to call writev");
	errno = ENOSYS;
	return -1;
}
#  define writev assert_writev
#endif
23 #ifndef HAVE_RB_ARY_SUBSEQ
24 static inline VALUE my_ary_subseq(VALUE ary, long idx, long len)
26 VALUE args[2] = { LONG2FIX(idx), LONG2FIX(len) };
28 return rb_ary_aref(2, args, ary);
30 #define MY_ARY_SUBSEQ(ary,idx,len) my_ary_subseq((ary),(idx),(len))
31 #else
32 #define MY_ARY_SUBSEQ(ary,idx,len) rb_ary_subseq((ary),(idx),(len))
33 #endif
35 static VALUE sym_wait_writable;
#ifndef HAVE_WRITEV
/*
 * <sys/uio.h> is only included when HAVE_WRITEV is defined, so supply a
 * replacement with the two members this file uses and let the rest of the
 * file keep spelling it "struct iovec".
 */
#define iovec my_iovec
struct my_iovec {
	void *iov_base;	/* start of the memory region */
	size_t iov_len;	/* number of bytes in the region */
};
#endif
/*
 * The constants below were chosen from tests run on Linux 3.0 x86_64
 * (Ubuntu 12.04), Core i3 i3-2330M slowed to 1600MHz
 * testing script: https://gist.github.com/2850641
 * Feel free to test more thoroughly and choose better values.
 */

/*
 * Tests show it is pointless to set WRITEV_MEMLIMIT to more than 1M, even
 * when tcp_wmem is set to a relatively high value (2M); in fact, it becomes
 * even slower.  512K performs a bit better in the average case.
 */
#define WRITEV_MEMLIMIT (512*1024)

/*
 * The same tests show that custom_writev is faster than glibc writev when
 * the average string is smaller than ~500 bytes, and slower when the
 * average string is greater than ~600 bytes.  512 bytes was chosen because
 * current compilers turn x/512 into x>>9.
 */
#define WRITEV_IMPL_THRESHOLD 512

static int iov_max = 1024; /* this could be overridden in init */
63 struct wrv_args {
64 VALUE io;
65 VALUE buf;
66 VALUE vec_buf; /* FIXME: this requires RSTRING_MODIFY for rbx */
67 struct iovec *vec;
68 int iov_cnt;
69 size_t batch_len;
70 int something_written;
71 int fd;
/*
 * Emulates writev(2) by copying every iovec entry into one temporary
 * buffer and issuing a single write(2).
 *
 * fd:        descriptor to write to
 * vec:       array of iov_cnt entries to gather
 * iov_cnt:   number of entries in vec
 * total_len: sum of the iov_len values in vec, computed by the caller
 *
 * Returns whatever write(2) returns (errno is left as write(2) set it),
 * or -1 with errno set to ENOMEM if the temporary buffer cannot be
 * allocated.
 */
static ssize_t custom_writev(int fd, const struct iovec *vec, int iov_cnt, size_t total_len)
{
	int i;
	int saved_errno;
	ssize_t result;
	char *buf, *curbuf;
	const struct iovec *curvec = vec;

	/*
	 * we do not want to use ruby's xmalloc because
	 * it can fire GC, and we'll free buffer shortly anyway.
	 * malloc(0) is allowed to return NULL, so never ask for zero bytes:
	 * an empty batch must not look like an allocation failure.
	 */
	curbuf = buf = malloc(total_len ? total_len : 1);
	if (buf == NULL) {
		errno = ENOMEM; /* not every libc sets errno here */
		return -1;
	}

	for (i = 0; i < iov_cnt; i++, curvec++) {
		if (curvec->iov_len == 0)
			continue; /* nothing to copy; iov_base may be NULL */
		memcpy(curbuf, curvec->iov_base, curvec->iov_len);
		curbuf += curvec->iov_len;
	}

	result = write(fd, buf, total_len);

	/* free() may alter errno */
	saved_errno = errno;
	free(buf);
	errno = saved_errno;

	return result;
}
101 static void prepare_writev(struct wrv_args *a, VALUE io, VALUE ary)
103 a->io = io;
104 a->fd = my_fileno(io);
105 a->something_written = 0;
107 if (TYPE(ary) == T_ARRAY)
108 /* rb_ary_subseq will not copy array unless it modified */
109 a->buf = MY_ARY_SUBSEQ(ary, 0, RARRAY_LEN(ary));
110 else
111 a->buf = rb_Array(ary);
113 a->vec_buf = rb_str_new(0, 0);
114 a->vec = NULL;
117 #ifndef RARRAY_LENINT
118 static inline int rarray_int(VALUE val)
120 long num = RARRAY_LEN(val);
122 if ((long)(int)num != num)
123 rb_raise(rb_eRangeError, "%ld cannot to be an int", num);
125 return (int)num;
127 #define RARRAY_LENINT(n) rarray_int(n)
128 #endif
130 static void fill_iovec(struct wrv_args *a)
132 int i;
133 struct iovec *curvec;
135 a->iov_cnt = RARRAY_LENINT(a->buf);
136 a->batch_len = 0;
137 if (a->iov_cnt == 0) return;
138 if (a->iov_cnt > iov_max) a->iov_cnt = iov_max;
139 rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt);
140 curvec = a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
142 for (i=0; i < a->iov_cnt; i++, curvec++) {
143 VALUE str = rb_ary_entry(a->buf, i);
144 long str_len, next_len;
146 if (TYPE(str) != T_STRING) {
147 str = rb_obj_as_string(str);
148 rb_ary_store(a->buf, i, str);
151 str_len = RSTRING_LEN(str);
153 /* lets limit total memory to write,
154 * but always take first string */
155 next_len = a->batch_len + str_len;
156 if (i && next_len > WRITEV_MEMLIMIT) {
157 a->iov_cnt = i;
158 break;
160 a->batch_len = next_len;
162 curvec->iov_base = RSTRING_PTR(str);
163 curvec->iov_len = str_len;
167 static long trim_writev_buffer(struct wrv_args *a, ssize_t n)
169 long i;
170 long ary_len = RARRAY_LEN(a->buf);
172 if (n == (ssize_t)a->batch_len) {
173 i = a->iov_cnt;
174 n = 0;
175 } else {
176 for (i = 0; n && i < ary_len; i++) {
177 VALUE entry = rb_ary_entry(a->buf, i);
178 n -= (ssize_t)RSTRING_LEN(entry);
179 if (n < 0) break;
183 /* all done */
184 if (i == ary_len) {
185 assert(n == 0 && "writev system call is broken");
186 a->buf = Qnil;
187 return 0;
190 /* partially done, remove fully-written buffers */
191 if (i > 0)
192 a->buf = MY_ARY_SUBSEQ(a->buf, i, ary_len - i);
194 /* setup+replace partially written buffer */
195 if (n < 0) {
196 VALUE str = rb_ary_entry(a->buf, 0);
197 long str_len = RSTRING_LEN(str);
198 str = MY_STR_SUBSEQ(str, str_len + n, -n);
199 rb_ary_store(a->buf, 0, str);
201 return RARRAY_LEN(a->buf);
204 static long
205 writev_check(struct wrv_args *a, ssize_t n, const char *msg, int io_wait)
207 if (n >= 0) {
208 if (n > 0) a->something_written = 1;
209 return trim_writev_buffer(a, n);
210 } else if (n < 0) {
211 if (errno == EINTR) {
212 a->fd = my_fileno(a->io);
213 return -1;
215 if (errno == EAGAIN) {
216 if (io_wait) {
217 (void)kgio_call_wait_writable(a->io);
218 return -1;
219 } else if (!a->something_written) {
220 a->buf = sym_wait_writable;
222 return 0;
224 kgio_wr_sys_fail(msg);
226 return 0;
229 static VALUE my_writev(VALUE io, VALUE ary, int io_wait)
231 struct wrv_args a;
232 ssize_t n;
234 prepare_writev(&a, io, ary);
235 set_nonblocking(a.fd);
237 do {
238 fill_iovec(&a);
239 if (a.iov_cnt == 0)
240 n = 0;
241 else if (a.iov_cnt == 1)
242 n = write(a.fd, a.vec[0].iov_base, a.vec[0].iov_len);
243 /* for big strings use library function */
244 else if (USE_WRITEV &&
245 ((long)(a.batch_len/WRITEV_IMPL_THRESHOLD) > a.iov_cnt))
246 n = writev(a.fd, a.vec, a.iov_cnt);
247 else
248 n = custom_writev(a.fd, a.vec, a.iov_cnt, a.batch_len);
249 } while (writev_check(&a, n, "writev", io_wait) != 0);
250 rb_str_resize(a.vec_buf, 0);
252 return a.buf;
256 * call-seq:
258 * io.kgio_writev(array) -> nil
260 * Returns nil when the write completes.
262 * This may block and call any method defined to +kgio_wait_writable+
263 * for the class.
265 * Note: it uses +Array()+ semantic for converting argument, so that
266 * it will succeed if you pass something else.
268 static VALUE kgio_writev(VALUE io, VALUE ary)
270 return my_writev(io, ary, 1);
274 * call-seq:
276 * io.kgio_trywritev(array) -> nil, Array or :wait_writable
278 * Returns nil if the write was completed in full.
280 * Returns an Array of strings containing the unwritten portion
281 * if EAGAIN was encountered, but some portion was successfully written.
283 * Returns :wait_writable if EAGAIN is encountered and nothing
284 * was written.
286 * Note: it uses +Array()+ semantic for converting argument, so that
287 * it will succeed if you pass something else.
289 static VALUE kgio_trywritev(VALUE io, VALUE ary)
291 return my_writev(io, ary, 0);
295 * call-seq:
297 * Kgio.trywritev(io, array) -> nil, Array or :wait_writable
299 * Returns nil if the write was completed in full.
301 * Returns a Array of strings containing the unwritten portion if EAGAIN
302 * was encountered, but some portion was successfully written.
304 * Returns :wait_writable if EAGAIN is encountered and nothing
305 * was written.
307 * Maybe used in place of PipeMethods#kgio_trywritev for non-Kgio objects
309 static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary)
311 return kgio_trywritev(io, ary);
314 void init_kgio_writev(void)
316 #ifdef IOV_MAX
317 int sys_iov_max = IOV_MAX;
318 #else
319 int sys_iov_max = (int)sysconf(_SC_IOV_MAX);
320 #endif
322 VALUE mPipeMethods, mSocketMethods;
323 VALUE mKgio = rb_define_module("Kgio");
325 if (sys_iov_max < iov_max)
326 iov_max = sys_iov_max;
328 sym_wait_writable = ID2SYM(rb_intern("wait_writable"));
330 rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2);
332 mPipeMethods = rb_define_module_under(mKgio, "PipeMethods");
333 rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1);
334 rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, 1);
336 mSocketMethods = rb_define_module_under(mKgio, "SocketMethods");
337 rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1);
338 rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, 1);